"""Unit tests for ``prefect_datahub.datahub_emitter.DatahubEmitter``.

The Prefect run contexts, the Prefect (cloud) clients and the DataHub REST
emitter are all replaced by mocks, so the tests only inspect what
``DatahubEmitter`` stores and which calls it makes on the mocked emitter.
"""

import asyncio
import json
import logging
from typing import Dict, List, Optional, cast
from unittest.mock import MagicMock, Mock, patch
from uuid import UUID

import pytest
from prefect.client.schemas import FlowRun, TaskRun, Workspace
from prefect.futures import PrefectFuture
from prefect.server.schemas.core import Flow
from prefect.task_runners import SequentialTaskRunner
from requests.models import Response

from datahub.api.entities.datajob import DataJob
from datahub.utilities.urns.dataset_urn import DatasetUrn
from prefect_datahub.datahub_emitter import DatahubEmitter
from prefect_datahub.entities import Dataset, _Entity

# ---------------------------------------------------------------------------
# Canned payloads used to build the mocked Prefect objects below.
# ---------------------------------------------------------------------------

# Attributes copied onto the mocked task of the task-run context.
mock_transform_task_json: Dict = {
    "name": "transform",
    "description": "Transform the actual data",
    "task_key": "__main__.transform",
    "tags": ["etl flow task"],
}

mock_extract_task_run_json: Dict = {
    "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
    "created": "2023-06-06T05:51:54.822707+00:00",
    "updated": "2023-06-06T05:51:55.126000+00:00",
    "name": "Extract-0",
    "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
    "task_key": "__main__.extract",
    "dynamic_key": "0",
    "cache_key": None,
    "cache_expiration": None,
    "task_version": None,
    "empirical_policy": {
        "max_retries": 0,
        "retry_delay_seconds": 0.0,
        "retries": 0,
        "retry_delay": 0,
        "retry_jitter_factor": None,
    },
    "tags": [],
    "state_id": "e280decd-2cc8-4428-a70f-149bcaf95b3c",
    "task_inputs": {},
    "state_type": "COMPLETED",
    "state_name": "Completed",
    "run_count": 1,
    "flow_run_run_count": 1,
    "expected_start_time": "2023-06-06T05:51:54.822183+00:00",
    "next_scheduled_start_time": None,
    "start_time": "2023-06-06T05:51:55.016264+00:00",
    "end_time": "2023-06-06T05:51:55.096534+00:00",
    "total_run_time": 0.08027,
    "estimated_run_time": 0.08027,
    "estimated_start_time_delta": 0.194081,
    "state": {
        "id": "e280decd-2cc8-4428-a70f-149bcaf95b3c",
        "type": "COMPLETED",
        "name": "Completed",
        "timestamp": "2023-06-06T05:51:55.096534+00:00",
        "message": None,
        "data": {"type": "unpersisted"},
        "state_details": {
            "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
            "task_run_id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
            "child_flow_run_id": None,
            "scheduled_time": None,
            "cache_key": None,
            "cache_expiration": None,
            "untrackable_result": False,
            "pause_timeout": None,
            "pause_reschedule": False,
            "pause_key": None,
            "refresh_cache": None,
        },
    },
}

mock_transform_task_run_json: Dict = {
    "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
    "created": "2023-06-06T05:51:55.160372+00:00",
    "updated": "2023-06-06T05:51:55.358000+00:00",
    "name": "transform-0",
    "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
    "task_key": "__main__.transform",
    "dynamic_key": "0",
    "cache_key": None,
    "cache_expiration": None,
    "task_version": None,
    "empirical_policy": {
        "max_retries": 0,
        "retry_delay_seconds": 0.0,
        "retries": 0,
        "retry_delay": 0,
        "retry_jitter_factor": None,
    },
    "tags": [],
    "state_id": "971ad82e-6e5f-4691-abab-c900358e96c2",
    "task_inputs": {
        "actual_data": [
            {"input_type": "task_run", "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b"}
        ]
    },
    "state_type": "COMPLETED",
    "state_name": "Completed",
    "run_count": 1,
    "flow_run_run_count": 1,
    "expected_start_time": "2023-06-06T05:51:55.159416+00:00",
    "next_scheduled_start_time": None,
    "start_time": "2023-06-06T05:51:55.243159+00:00",
    "end_time": "2023-06-06T05:51:55.332950+00:00",
    "total_run_time": 0.089791,
    "estimated_run_time": 0.089791,
    "estimated_start_time_delta": 0.083743,
    "state": {
        "id": "971ad82e-6e5f-4691-abab-c900358e96c2",
        "type": "COMPLETED",
        "name": "Completed",
        "timestamp": "2023-06-06T05:51:55.332950+00:00",
        "message": None,
        "data": {"type": "unpersisted"},
        "state_details": {
            "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
            "task_run_id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
            "child_flow_run_id": None,
            "scheduled_time": None,
            "cache_key": None,
            "cache_expiration": None,
            "untrackable_result": False,
            "pause_timeout": None,
            "pause_reschedule": False,
            "pause_key": None,
            "refresh_cache": None,
        },
    },
}

mock_load_task_run_json: Dict = {
    "id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
    "created": "2023-06-06T05:51:55.389823+00:00",
    "updated": "2023-06-06T05:51:55.566000+00:00",
    "name": "Load_task-0",
    "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
    "task_key": "__main__.load",
    "dynamic_key": "0",
    "cache_key": None,
    "cache_expiration": None,
    "task_version": None,
    "empirical_policy": {
        "max_retries": 0,
        "retry_delay_seconds": 0.0,
        "retries": 0,
        "retry_delay": 0,
        "retry_jitter_factor": None,
    },
    "tags": [],
    "state_id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4",
    "task_inputs": {
        "data": [
            {"input_type": "task_run", "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7"}
        ]
    },
    "state_type": "COMPLETED",
    "state_name": "Completed",
    "run_count": 1,
    "flow_run_run_count": 1,
    "expected_start_time": "2023-06-06T05:51:55.389075+00:00",
    "next_scheduled_start_time": None,
    "start_time": "2023-06-06T05:51:55.461812+00:00",
    "end_time": "2023-06-06T05:51:55.535954+00:00",
    "total_run_time": 0.074142,
    "estimated_run_time": 0.074142,
    "estimated_start_time_delta": 0.072737,
    "state": {
        "id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4",
        "type": "COMPLETED",
        "name": "Completed",
        "timestamp": "2023-06-06T05:51:55.535954+00:00",
        "message": None,
        "data": {"type": "unpersisted"},
        "state_details": {
            "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
            "task_run_id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
            "child_flow_run_id": None,
            "scheduled_time": None,
            "cache_key": None,
            "cache_expiration": None,
            "untrackable_result": True,
            "pause_timeout": None,
            "pause_reschedule": False,
            "pause_key": None,
            "refresh_cache": None,
        },
    },
}

mock_flow_json: Dict = {
    "id": "cc65498f-d950-4114-8cc1-7af9e8fdf91b",
    "created": "2023-06-02T12:31:10.988697+00:00",
    "updated": "2023-06-02T12:31:10.988710+00:00",
    "name": "etl",
    "description": "Extract transform load flow",
    "tags": [],
}

mock_flow_run_json: Dict = {
    "id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
    "created": "2023-06-06T05:51:54.544266+00:00",
    "updated": "2023-06-06T05:51:55.622000+00:00",
    "name": "olivine-beagle",
    "flow_id": "cc65498f-d950-4114-8cc1-7af9e8fdf91b",
    "state_id": "ca2db325-d98f-40e7-862e-449cd0cc9a6e",
    "deployment_id": None,
    "work_queue_name": None,
    "flow_version": "3ba54dfa31a7c9af4161aa4cd020a527",
    "parameters": {},
    "idempotency_key": None,
    "context": {},
    "empirical_policy": {
        "max_retries": 0,
        "retry_delay_seconds": 0.0,
        "retries": 0,
        "retry_delay": 0,
        "pause_keys": [],
        "resuming": False,
    },
    "tags": [],
    "parent_task_run_id": None,
    "state_type": "COMPLETED",
    "state_name": "Completed",
    "run_count": 1,
    "expected_start_time": "2023-06-06T05:51:54.543357+00:00",
    "next_scheduled_start_time": None,
    "start_time": "2023-06-06T05:51:54.750523+00:00",
    "end_time": "2023-06-06T05:51:55.596446+00:00",
    "total_run_time": 0.845923,
    "estimated_run_time": 0.845923,
    "estimated_start_time_delta": 0.207166,
    "auto_scheduled": False,
    "infrastructure_document_id": None,
    "infrastructure_pid": None,
    "created_by": None,
    "work_pool_name": None,
    "state": {
        "id": "ca2db325-d98f-40e7-862e-449cd0cc9a6e",
        "type": "COMPLETED",
        "name": "Completed",
        "timestamp": "2023-06-06T05:51:55.596446+00:00",
        "message": "All states completed.",
        "data": {"type": "unpersisted"},
        "state_details": {
            "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
            "task_run_id": None,
            "child_flow_run_id": None,
            "scheduled_time": None,
            "cache_key": None,
            "cache_expiration": None,
            "untrackable_result": False,
            "pause_timeout": None,
            "pause_reschedule": False,
            "pause_key": None,
            "refresh_cache": None,
        },
    },
}

# Body returned by the mocked ``_client.get`` call (see ``mock_flow_run_graph``).
# Entries are listed extract, load, transform -- the same task order that
# ``test_emit_flow`` expects in the emitted calls.
mock_graph_json: List[Dict] = [
    {
        "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
        "name": "Extract-0",
        "upstream_dependencies": [],
        "state": {
            "id": "e280decd-2cc8-4428-a70f-149bcaf95b3c",
            "type": "COMPLETED",
            "name": "Completed",
            "timestamp": "2023-06-06T05:51:55.096534+00:00",
            "message": None,
            "data": {"type": "unpersisted"},
            "state_details": {
                "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
                "task_run_id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
                "child_flow_run_id": None,
                "scheduled_time": None,
                "cache_key": None,
                "cache_expiration": None,
                "untrackable_result": False,
                "pause_timeout": None,
                "pause_reschedule": False,
                "pause_key": None,
                "refresh_cache": None,
            },
        },
        "expected_start_time": "2023-06-06T05:51:54.822183+00:00",
        "start_time": "2023-06-06T05:51:55.016264+00:00",
        "end_time": "2023-06-06T05:51:55.096534+00:00",
        "total_run_time": 0.08027,
        "estimated_run_time": 0.08027,
        "untrackable_result": False,
    },
    {
        "id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
        "name": "Load_task-0",
        "upstream_dependencies": [
            {"input_type": "task_run", "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7"}
        ],
        "state": {
            "id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4",
            "type": "COMPLETED",
            "name": "Completed",
            "timestamp": "2023-06-06T05:51:55.535954+00:00",
            "message": None,
            "data": {"type": "unpersisted"},
            "state_details": {
                "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
                "task_run_id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
                "child_flow_run_id": None,
                "scheduled_time": None,
                "cache_key": None,
                "cache_expiration": None,
                "untrackable_result": True,
                "pause_timeout": None,
                "pause_reschedule": False,
                "pause_key": None,
                "refresh_cache": None,
            },
        },
        "expected_start_time": "2023-06-06T05:51:55.389075+00:00",
        "start_time": "2023-06-06T05:51:55.461812+00:00",
        "end_time": "2023-06-06T05:51:55.535954+00:00",
        "total_run_time": 0.074142,
        "estimated_run_time": 0.074142,
        "untrackable_result": True,
    },
    {
        "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
        "name": "transform-0",
        "upstream_dependencies": [
            {"input_type": "task_run", "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b"}
        ],
        "state": {
            "id": "971ad82e-6e5f-4691-abab-c900358e96c2",
            "type": "COMPLETED",
            "name": "Completed",
            "timestamp": "2023-06-06T05:51:55.332950+00:00",
            "message": None,
            "data": {"type": "unpersisted"},
            "state_details": {
                "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
                "task_run_id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
                "child_flow_run_id": None,
                "scheduled_time": None,
                "cache_key": None,
                "cache_expiration": None,
                "untrackable_result": False,
                "pause_timeout": None,
                "pause_reschedule": False,
                "pause_key": None,
                "refresh_cache": None,
            },
        },
        "expected_start_time": "2023-06-06T05:51:55.159416+00:00",
        "start_time": "2023-06-06T05:51:55.243159+00:00",
        "end_time": "2023-06-06T05:51:55.332950+00:00",
        "total_run_time": 0.089791,
        "estimated_run_time": 0.089791,
        "untrackable_result": False,
    },
]

mock_workspace_json: Dict = {
    "account_id": "33e98cfe-ad06-4ceb-a500-c11148499f75",
    "account_name": "shubhamjagtapgslabcom",
    "account_handle": "shubhamjagtapgslabcom",
    "workspace_id": "157eb822-1b3b-4338-ae80-98edd5d00cb9",
    "workspace_name": "datahub",
    "workspace_description": "",
    "workspace_handle": "datahub",
}

# Task-run payloads keyed by their own "id", used by ``mock_task_run``.
_TASK_RUN_JSON_BY_ID: Dict[str, Dict] = {
    task_run_json["id"]: task_run_json
    for task_run_json in (
        mock_extract_task_run_json,
        mock_transform_task_run_json,
        mock_load_task_run_json,
    )
}

# ---------------------------------------------------------------------------
# Expected emission layout checked by ``test_emit_flow``.
# ---------------------------------------------------------------------------

# Aspects expected for the DataFlow entity (the first pass additionally
# expects a trailing "browsePaths").
_DATAFLOW_ASPECTS = (
    "dataFlowInfo",
    "status",
    "dataPlatformInstance",
    "ownership",
    "globalTags",
)

# Aspects expected for every DataJob entity.
_DATAJOB_ASPECTS = (
    "dataJobInfo",
    "dataPlatformInstance",
    "status",
    "dataJobInputOutput",
    "ownership",
    "globalTags",
    "browsePaths",
)

# Aspects expected for a DataProcessInstance; task runs expect one more
# "dataProcessInstanceRunEvent" after these.
_RUN_ASPECTS = (
    "dataProcessInstanceProperties",
    "dataProcessInstanceRelationships",
    "dataPlatformInstance",
    "dataProcessInstanceRunEvent",
)

_EXPECTED_FLOW_RUN_URN = "urn:li:dataProcessInstance:56231547bcc2781e0c14182ceab6c9ac"

# (task key, expected DataProcessInstance URN), in expected emission order.
_EXPECTED_TASK_RUNS = (
    ("__main__.extract", "urn:li:dataProcessInstance:b048ba729c1403f229a0760f8765d691"),
    ("__main__.load", "urn:li:dataProcessInstance:e7df9fe09bb4da19687b8199e5ee5038"),
    (
        "__main__.transform",
        "urn:li:dataProcessInstance:bfa255d4d1fba52d23a52c9de4f6d0a6",
    ),
)


def _expected_emit_sequence(dataflow_urn: str) -> List[tuple]:
    """Return the expected ``(aspectName, entityUrn)`` pairs for ``emit_flow``.

    Element ``k`` of the result describes ``mock_emitter.method_calls[k + 1]``;
    call 0 is not described because it is the connection call.

    Args:
        dataflow_urn: URN expected for the emitted DataFlow entity.

    Returns:
        51 pairs: the DataFlow aspects (with ``browsePaths``), the DataFlow
        aspects again (without it), the flow-run DataProcessInstance, and
        then, per task, its DataJob aspects followed by its
        DataProcessInstance aspects.
    """
    expected = [
        (aspect, dataflow_urn) for aspect in _DATAFLOW_ASPECTS + ("browsePaths",)
    ]
    expected += [(aspect, dataflow_urn) for aspect in _DATAFLOW_ASPECTS]
    expected += [(aspect, _EXPECTED_FLOW_RUN_URN) for aspect in _RUN_ASPECTS]
    for task_key, run_urn in _EXPECTED_TASK_RUNS:
        datajob_urn = f"urn:li:dataJob:({dataflow_urn},{task_key})"
        expected += [(aspect, datajob_urn) for aspect in _DATAJOB_ASPECTS]
        expected += [
            (aspect, run_urn)
            for aspect in _RUN_ASPECTS + ("dataProcessInstanceRunEvent",)
        ]
    return expected


# ---------------------------------------------------------------------------
# Mock builders and fixtures.
# ---------------------------------------------------------------------------


async def mock_task_run_future():
    """Build one ``PrefectFuture`` per mocked task run (extract, transform, load).

    Each future gets its own ``SequentialTaskRunner`` and has ``task_run`` set
    to the ``TaskRun`` parsed from the matching payload.

    Returns:
        The three futures in extract, transform, load order.
    """
    futures = []
    for task_run_json, future_key in (
        (mock_extract_task_run_json, "4552629a-ac04-4590-b286-27642292739f"),
        (mock_transform_task_run_json, "40fff3e5-5ef4-4b8b-9cc8-786f91bcc656"),
        (mock_load_task_run_json, "7565f596-9eb0-4330-ba34-963e7839883e"),
    ):
        prefect_future: PrefectFuture = PrefectFuture(
            name=task_run_json["name"],
            key=UUID(future_key),
            task_runner=SequentialTaskRunner(),
        )
        # cast() is a runtime no-op; it only affects static type checking of
        # the assignment.
        prefect_future.task_run = cast(None, TaskRun.parse_obj(task_run_json))
        futures.append(prefect_future)
    return futures


@pytest.fixture(scope="module")
def mock_run_logger():
    """Patch ``get_run_logger`` in the emitter module to return the root logger."""
    with patch(
        "prefect_datahub.datahub_emitter.get_run_logger",
        return_value=logging.getLogger(),
    ) as mock_logger:
        yield mock_logger


@pytest.fixture(scope="module")
def mock_run_context(mock_run_logger):
    """Patch ``TaskRunContext`` and ``FlowRunContext`` in the emitter module.

    ``TaskRunContext.get()`` returns a mock describing the "transform" task
    and ``FlowRunContext.get()`` returns a mock describing the "etl" flow,
    its flow run and the three task-run futures.

    Yields:
        The ``(task_run_ctx, flow_run_ctx)`` mocks.
    """
    task_run_ctx = MagicMock()
    task_run_ctx.task.task_key = mock_transform_task_json["task_key"]
    task_run_ctx.task.name = mock_transform_task_json["name"]
    task_run_ctx.task.description = mock_transform_task_json["description"]
    task_run_ctx.task.tags = mock_transform_task_json["tags"]

    flow_run_ctx = MagicMock()
    flow_run_ctx.flow.name = mock_flow_json["name"]
    flow_run_ctx.flow.description = mock_flow_json["description"]
    flow_run_obj = FlowRun.parse_obj(mock_flow_run_json)
    flow_run_ctx.flow_run.id = flow_run_obj.id
    flow_run_ctx.flow_run.name = flow_run_obj.name
    flow_run_ctx.flow_run.flow_id = flow_run_obj.flow_id
    flow_run_ctx.flow_run.start_time = flow_run_obj.start_time
    flow_run_ctx.task_run_futures = asyncio.run(mock_task_run_future())

    with patch(
        "prefect_datahub.datahub_emitter.TaskRunContext"
    ) as mock_task_run_ctx, patch(
        "prefect_datahub.datahub_emitter.FlowRunContext"
    ) as mock_flow_run_ctx:
        mock_task_run_ctx.get.return_value = task_run_ctx
        mock_flow_run_ctx.get.return_value = flow_run_ctx
        yield (task_run_ctx, flow_run_ctx)


async def mock_task_run(*args, **kwargs):
    """Stand-in for the client's ``read_task_run``.

    Requires a ``task_run_id`` keyword argument (a missing keyword raises
    ``KeyError``).

    Returns:
        The ``TaskRun`` parsed from the payload with that id, or ``None`` when
        the id is not one of the three mocked task runs.
    """
    task_run_json = _TASK_RUN_JSON_BY_ID.get(str(kwargs["task_run_id"]))
    if task_run_json is None:
        return None
    return TaskRun.parse_obj(task_run_json)


async def mock_flow(*args, **kwargs):
    """Stand-in for the client's ``read_flow``; always returns the "etl" flow."""
    return Flow.parse_obj(mock_flow_json)


async def mock_flow_run(*args, **kwargs):
    """Stand-in for the client's ``read_flow_run``; always returns the same run."""
    return FlowRun.parse_obj(mock_flow_run_json)


async def mock_flow_run_graph(*args, **kwargs):
    """Stand-in for the client's ``_client.get``.

    Returns:
        A ``requests`` ``Response`` with status 200 whose body is
        ``mock_graph_json`` serialized as compact UTF-8 JSON.
    """
    response = Response()
    response.status_code = 200
    response._content = json.dumps(mock_graph_json, separators=(",", ":")).encode(
        "utf-8"
    )
    return response


async def mock_api_healthcheck(*args, **kwargs):
    """Stand-in for the cloud client's ``api_healthcheck``; returns ``None``."""
    return None


async def mock_read_workspaces(*args, **kwargs):
    """Stand-in for the cloud client's ``read_workspaces``; one "datahub" workspace."""
    return [Workspace.parse_obj(mock_workspace_json)]


@pytest.fixture(scope="module")
def mock_prefect_client():
    """Patch ``orchestration`` in the emitter module with a mocked Prefect client.

    ``orchestration.get_client()`` returns a ``MagicMock`` whose ``read_flow``,
    ``read_flow_run``, ``read_task_run`` and ``_client.get`` are backed by the
    coroutine stand-ins defined in this module.
    """
    prefect_client_mock = MagicMock()
    prefect_client_mock.read_flow.side_effect = mock_flow
    prefect_client_mock.read_flow_run.side_effect = mock_flow_run
    prefect_client_mock.read_task_run.side_effect = mock_task_run
    prefect_client_mock._client.get.side_effect = mock_flow_run_graph

    with patch("prefect_datahub.datahub_emitter.orchestration") as mock_client:
        mock_client.get_client.return_value = prefect_client_mock
        yield prefect_client_mock


@pytest.fixture(scope="module")
def mock_prefect_cloud_client():
    """Patch ``cloud`` and ``PREFECT_API_URL.value`` in the emitter module.

    ``cloud.get_cloud_client()`` returns a ``MagicMock`` backed by the
    healthcheck/workspace stand-ins, and ``PREFECT_API_URL.value()`` returns a
    Prefect Cloud URL carrying the mocked account and workspace ids.
    """
    prefect_cloud_client_mock = MagicMock()
    prefect_cloud_client_mock.api_healthcheck.side_effect = mock_api_healthcheck
    prefect_cloud_client_mock.read_workspaces.side_effect = mock_read_workspaces

    with patch("prefect_datahub.datahub_emitter.cloud") as mock_client, patch(
        "prefect_datahub.datahub_emitter.PREFECT_API_URL.value",
        return_value="https://api.prefect.cloud/api/accounts/33e98cfe-ad06-4ceb-"
        "a500-c11148499f75/workspaces/157eb822-1b3b-4338-ae80-98edd5d00cb9",
    ):
        mock_client.get_cloud_client.return_value = prefect_cloud_client_mock
        yield prefect_cloud_client_mock


# ---------------------------------------------------------------------------
# Tests.
# ---------------------------------------------------------------------------


@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_entities_to_urn_list(mock_emit):
    """Every item returned by ``_entities_to_urn_list`` is a ``DatasetUrn``."""
    dataset_urn_list = DatahubEmitter()._entities_to_urn_list(
        [Dataset("snowflake", "mydb.schema.tableA")]
    )
    for dataset_urn in dataset_urn_list:
        assert isinstance(dataset_urn, DatasetUrn)


@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_get_flow_run_graph(mock_emit, mock_prefect_client):
    """``_get_flow_run_graph`` returns a list for the mocked flow run."""
    graph_json = asyncio.run(
        DatahubEmitter()._get_flow_run_graph("c3b947e5-3fa1-4b46-a2e2-58d50c938f2e")
    )
    assert isinstance(graph_json, list)


@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test__get_workspace(mock_emit, mock_prefect_cloud_client):
    """``_get_workspace`` resolves to the mocked workspace name."""
    workspace_name = DatahubEmitter()._get_workspace()
    assert workspace_name == "datahub"


@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_add_task(mock_emit, mock_run_context):
    """``add_task`` records a ``DataJob`` for the current task without emitting."""
    mock_emitter = Mock()
    mock_emit.return_value = mock_emitter

    datahub_emitter = DatahubEmitter()
    inputs: Optional[List[_Entity]] = [Dataset("snowflake", "mydb.schema.tableA")]
    outputs: Optional[List[_Entity]] = [Dataset("snowflake", "mydb.schema.tableC")]
    datahub_emitter.add_task(
        inputs=inputs,
        outputs=outputs,
    )

    task_run_ctx = mock_run_context[0]
    flow_run_ctx = mock_run_context[1]

    expected_datajob_urn = (
        f"urn:li:dataJob:(urn:li:dataFlow:"
        f"(prefect,{flow_run_ctx.flow.name},PROD),{task_run_ctx.task.task_key})"
    )

    assert expected_datajob_urn in datahub_emitter._datajobs_to_emit.keys()
    actual_datajob = datahub_emitter._datajobs_to_emit[expected_datajob_urn]
    assert isinstance(actual_datajob, DataJob)
    assert str(actual_datajob.flow_urn) == "urn:li:dataFlow:(prefect,etl,PROD)"
    assert actual_datajob.name == task_run_ctx.task.name
    assert actual_datajob.description == task_run_ctx.task.description
    assert actual_datajob.tags == task_run_ctx.task.tags
    assert (
        str(actual_datajob.inlets[0])
        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)"
    )
    assert (
        str(actual_datajob.outlets[0])
        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
    )
    # Check the emitter *instance* handed to DatahubEmitter (the patched
    # class's return value). Asserting on the class mock itself
    # (``mock_emit.emit``) could never fail, because instance calls are not
    # recorded there once ``return_value`` is replaced.
    assert mock_emitter.emit.call_count == 0


@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_emit_flow(
    mock_emit, mock_run_context, mock_prefect_client, mock_prefect_cloud_client
):
    """``emit_flow`` makes the expected sequence of calls on the REST emitter.

    Calls 1..51 on the mocked emitter are compared, in order, with the
    aspect-name/URN pairs from ``_expected_emit_sequence``.
    """
    mock_emitter = Mock()
    mock_emit.return_value = mock_emitter

    platform_instance = "datahub_workspace"

    datahub_emitter = DatahubEmitter(platform_instance=platform_instance)
    datahub_emitter.add_task()
    datahub_emitter.emit_flow()

    task_run_ctx = mock_run_context[0]
    flow_run_ctx = mock_run_context[1]

    expected_dataflow_urn = (
        f"urn:li:dataFlow:(prefect,{platform_instance}.{flow_run_ctx.flow.name},PROD)"
    )
    expected_calls = _expected_emit_sequence(expected_dataflow_urn)

    # Ignore the first call (index 0) which is a connection call.
    for call_index, (aspect_name, entity_urn) in enumerate(expected_calls, start=1):
        emitted = mock_emitter.method_calls[call_index][1][0]
        assert (
            emitted.aspectName == aspect_name
        ), f"call {call_index}: unexpected aspectName {emitted.aspectName!r}"
        assert (
            emitted.entityUrn == entity_urn
        ), f"call {call_index}: unexpected entityUrn {emitted.entityUrn!r}"

    # The transform DataJob's globalTags aspect must carry the task's tag.
    transform_datajob_urn = f"urn:li:dataJob:({expected_dataflow_urn},__main__.transform)"
    tags_call_index = 1 + expected_calls.index(("globalTags", transform_datajob_urn))
    assert (
        mock_emitter.method_calls[tags_call_index][1][0].aspect.tags[0].tag
        == f"urn:li:tag:{task_run_ctx.task.tags[0]}"
    )