mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-13 04:02:34 +00:00
881 lines
33 KiB
Python
881 lines
33 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Dict, List, Optional, cast
|
|
from unittest.mock import MagicMock, Mock, patch
|
|
from uuid import UUID
|
|
|
|
import pytest
|
|
from prefect.client.schemas import FlowRun, TaskRun, Workspace
|
|
from prefect.futures import PrefectFuture
|
|
from prefect.server.schemas.core import Flow
|
|
from prefect.task_runners import SequentialTaskRunner
|
|
from requests.models import Response
|
|
|
|
from datahub.api.entities.datajob import DataJob
|
|
from datahub.utilities.urns.dataset_urn import DatasetUrn
|
|
from prefect_datahub.datahub_emitter import DatahubEmitter
|
|
from prefect_datahub.entities import Dataset, _Entity
|
|
|
|
# Static attributes of the "transform" task. The run-context fixture in this
# module copies these values onto the mocked task object.
mock_transform_task_json: Dict = {
    "name": "transform",
    "description": "Transform the actual data",
    "task_key": "__main__.transform",
    "tags": ["etl flow task"],
}
|
|
|
|
# Task-run payload for the "extract" task ("Extract-0"). It has no task inputs,
# i.e. no upstream task run is recorded for it.
mock_extract_task_run_json: Dict = {
    "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
    "created": "2023-06-06T05:51:54.822707+00:00",
    "updated": "2023-06-06T05:51:55.126000+00:00",
    "name": "Extract-0",
    "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
    "task_key": "__main__.extract",
    "dynamic_key": "0",
    "cache_key": None,
    "cache_expiration": None,
    "task_version": None,
    "empirical_policy": {
        "max_retries": 0,
        "retry_delay_seconds": 0.0,
        "retries": 0,
        "retry_delay": 0,
        "retry_jitter_factor": None,
    },
    "tags": [],
    "state_id": "e280decd-2cc8-4428-a70f-149bcaf95b3c",
    "task_inputs": {},
    "state_type": "COMPLETED",
    "state_name": "Completed",
    "run_count": 1,
    "flow_run_run_count": 1,
    "expected_start_time": "2023-06-06T05:51:54.822183+00:00",
    "next_scheduled_start_time": None,
    "start_time": "2023-06-06T05:51:55.016264+00:00",
    "end_time": "2023-06-06T05:51:55.096534+00:00",
    "total_run_time": 0.08027,
    "estimated_run_time": 0.08027,
    "estimated_start_time_delta": 0.194081,
    "state": {
        "id": "e280decd-2cc8-4428-a70f-149bcaf95b3c",
        "type": "COMPLETED",
        "name": "Completed",
        "timestamp": "2023-06-06T05:51:55.096534+00:00",
        "message": None,
        "data": {"type": "unpersisted"},
        "state_details": {
            "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
            "task_run_id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
            "child_flow_run_id": None,
            "scheduled_time": None,
            "cache_key": None,
            "cache_expiration": None,
            "untrackable_result": False,
            "pause_timeout": None,
            "pause_reschedule": False,
            "pause_key": None,
            "refresh_cache": None,
        },
    },
}
|
|
|
|
# Task-run payload for the "transform" task ("transform-0"). Its single task
# input references the id of the extract task run.
mock_transform_task_run_json: Dict = {
    "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
    "created": "2023-06-06T05:51:55.160372+00:00",
    "updated": "2023-06-06T05:51:55.358000+00:00",
    "name": "transform-0",
    "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
    "task_key": "__main__.transform",
    "dynamic_key": "0",
    "cache_key": None,
    "cache_expiration": None,
    "task_version": None,
    "empirical_policy": {
        "max_retries": 0,
        "retry_delay_seconds": 0.0,
        "retries": 0,
        "retry_delay": 0,
        "retry_jitter_factor": None,
    },
    "tags": [],
    "state_id": "971ad82e-6e5f-4691-abab-c900358e96c2",
    "task_inputs": {
        "actual_data": [
            {"input_type": "task_run", "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b"}
        ]
    },
    "state_type": "COMPLETED",
    "state_name": "Completed",
    "run_count": 1,
    "flow_run_run_count": 1,
    "expected_start_time": "2023-06-06T05:51:55.159416+00:00",
    "next_scheduled_start_time": None,
    "start_time": "2023-06-06T05:51:55.243159+00:00",
    "end_time": "2023-06-06T05:51:55.332950+00:00",
    "total_run_time": 0.089791,
    "estimated_run_time": 0.089791,
    "estimated_start_time_delta": 0.083743,
    "state": {
        "id": "971ad82e-6e5f-4691-abab-c900358e96c2",
        "type": "COMPLETED",
        "name": "Completed",
        "timestamp": "2023-06-06T05:51:55.332950+00:00",
        "message": None,
        "data": {"type": "unpersisted"},
        "state_details": {
            "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
            "task_run_id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
            "child_flow_run_id": None,
            "scheduled_time": None,
            "cache_key": None,
            "cache_expiration": None,
            "untrackable_result": False,
            "pause_timeout": None,
            "pause_reschedule": False,
            "pause_key": None,
            "refresh_cache": None,
        },
    },
}
|
|
# Task-run payload for the "load" task ("Load_task-0"). Its single task input
# references the id of the transform task run, and its state is flagged
# "untrackable_result": True (the other two task runs use False).
mock_load_task_run_json: Dict = {
    "id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
    "created": "2023-06-06T05:51:55.389823+00:00",
    "updated": "2023-06-06T05:51:55.566000+00:00",
    "name": "Load_task-0",
    "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
    "task_key": "__main__.load",
    "dynamic_key": "0",
    "cache_key": None,
    "cache_expiration": None,
    "task_version": None,
    "empirical_policy": {
        "max_retries": 0,
        "retry_delay_seconds": 0.0,
        "retries": 0,
        "retry_delay": 0,
        "retry_jitter_factor": None,
    },
    "tags": [],
    "state_id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4",
    "task_inputs": {
        "data": [
            {"input_type": "task_run", "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7"}
        ]
    },
    "state_type": "COMPLETED",
    "state_name": "Completed",
    "run_count": 1,
    "flow_run_run_count": 1,
    "expected_start_time": "2023-06-06T05:51:55.389075+00:00",
    "next_scheduled_start_time": None,
    "start_time": "2023-06-06T05:51:55.461812+00:00",
    "end_time": "2023-06-06T05:51:55.535954+00:00",
    "total_run_time": 0.074142,
    "estimated_run_time": 0.074142,
    "estimated_start_time_delta": 0.072737,
    "state": {
        "id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4",
        "type": "COMPLETED",
        "name": "Completed",
        "timestamp": "2023-06-06T05:51:55.535954+00:00",
        "message": None,
        "data": {"type": "unpersisted"},
        "state_details": {
            "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
            "task_run_id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
            "child_flow_run_id": None,
            "scheduled_time": None,
            "cache_key": None,
            "cache_expiration": None,
            "untrackable_result": True,
            "pause_timeout": None,
            "pause_reschedule": False,
            "pause_key": None,
            "refresh_cache": None,
        },
    },
}
|
|
# Flow payload for the "etl" flow. Its id is the "flow_id" referenced by
# mock_flow_run_json.
mock_flow_json: Dict = {
    "id": "cc65498f-d950-4114-8cc1-7af9e8fdf91b",
    "created": "2023-06-02T12:31:10.988697+00:00",
    "updated": "2023-06-02T12:31:10.988710+00:00",
    "name": "etl",
    "description": "Extract transform load flow",
    "tags": [],
}
|
|
# Flow-run payload ("olivine-beagle") for the "etl" flow. Its id is the
# "flow_run_id" used by all three task-run payloads in this module.
mock_flow_run_json: Dict = {
    "id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
    "created": "2023-06-06T05:51:54.544266+00:00",
    "updated": "2023-06-06T05:51:55.622000+00:00",
    "name": "olivine-beagle",
    "flow_id": "cc65498f-d950-4114-8cc1-7af9e8fdf91b",
    "state_id": "ca2db325-d98f-40e7-862e-449cd0cc9a6e",
    "deployment_id": None,
    "work_queue_name": None,
    "flow_version": "3ba54dfa31a7c9af4161aa4cd020a527",
    "parameters": {},
    "idempotency_key": None,
    "context": {},
    "empirical_policy": {
        "max_retries": 0,
        "retry_delay_seconds": 0.0,
        "retries": 0,
        "retry_delay": 0,
        "pause_keys": [],
        "resuming": False,
    },
    "tags": [],
    "parent_task_run_id": None,
    "state_type": "COMPLETED",
    "state_name": "Completed",
    "run_count": 1,
    "expected_start_time": "2023-06-06T05:51:54.543357+00:00",
    "next_scheduled_start_time": None,
    "start_time": "2023-06-06T05:51:54.750523+00:00",
    "end_time": "2023-06-06T05:51:55.596446+00:00",
    "total_run_time": 0.845923,
    "estimated_run_time": 0.845923,
    "estimated_start_time_delta": 0.207166,
    "auto_scheduled": False,
    "infrastructure_document_id": None,
    "infrastructure_pid": None,
    "created_by": None,
    "work_pool_name": None,
    "state": {
        "id": "ca2db325-d98f-40e7-862e-449cd0cc9a6e",
        "type": "COMPLETED",
        "name": "Completed",
        "timestamp": "2023-06-06T05:51:55.596446+00:00",
        "message": "All states completed.",
        "data": {"type": "unpersisted"},
        "state_details": {
            "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
            "task_run_id": None,
            "child_flow_run_id": None,
            "scheduled_time": None,
            "cache_key": None,
            "cache_expiration": None,
            "untrackable_result": False,
            "pause_timeout": None,
            "pause_reschedule": False,
            "pause_key": None,
            "refresh_cache": None,
        },
    },
}
|
|
# Flow-run graph payload: one node per task run, listed in the order
# extract, load, transform. Each node's "upstream_dependencies" names the task
# run it depends on (extract has none).
mock_graph_json: List[Dict] = [
    {
        "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
        "name": "Extract-0",
        "upstream_dependencies": [],
        "state": {
            "id": "e280decd-2cc8-4428-a70f-149bcaf95b3c",
            "type": "COMPLETED",
            "name": "Completed",
            "timestamp": "2023-06-06T05:51:55.096534+00:00",
            "message": None,
            "data": {"type": "unpersisted"},
            "state_details": {
                "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
                "task_run_id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
                "child_flow_run_id": None,
                "scheduled_time": None,
                "cache_key": None,
                "cache_expiration": None,
                "untrackable_result": False,
                "pause_timeout": None,
                "pause_reschedule": False,
                "pause_key": None,
                "refresh_cache": None,
            },
        },
        "expected_start_time": "2023-06-06T05:51:54.822183+00:00",
        "start_time": "2023-06-06T05:51:55.016264+00:00",
        "end_time": "2023-06-06T05:51:55.096534+00:00",
        "total_run_time": 0.08027,
        "estimated_run_time": 0.08027,
        "untrackable_result": False,
    },
    {
        "id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
        "name": "Load_task-0",
        "upstream_dependencies": [
            {"input_type": "task_run", "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7"}
        ],
        "state": {
            "id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4",
            "type": "COMPLETED",
            "name": "Completed",
            "timestamp": "2023-06-06T05:51:55.535954+00:00",
            "message": None,
            "data": {"type": "unpersisted"},
            "state_details": {
                "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
                "task_run_id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
                "child_flow_run_id": None,
                "scheduled_time": None,
                "cache_key": None,
                "cache_expiration": None,
                "untrackable_result": True,
                "pause_timeout": None,
                "pause_reschedule": False,
                "pause_key": None,
                "refresh_cache": None,
            },
        },
        "expected_start_time": "2023-06-06T05:51:55.389075+00:00",
        "start_time": "2023-06-06T05:51:55.461812+00:00",
        "end_time": "2023-06-06T05:51:55.535954+00:00",
        "total_run_time": 0.074142,
        "estimated_run_time": 0.074142,
        "untrackable_result": True,
    },
    {
        "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
        "name": "transform-0",
        "upstream_dependencies": [
            {"input_type": "task_run", "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b"}
        ],
        "state": {
            "id": "971ad82e-6e5f-4691-abab-c900358e96c2",
            "type": "COMPLETED",
            "name": "Completed",
            "timestamp": "2023-06-06T05:51:55.332950+00:00",
            "message": None,
            "data": {"type": "unpersisted"},
            "state_details": {
                "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
                "task_run_id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
                "child_flow_run_id": None,
                "scheduled_time": None,
                "cache_key": None,
                "cache_expiration": None,
                "untrackable_result": False,
                "pause_timeout": None,
                "pause_reschedule": False,
                "pause_key": None,
                "refresh_cache": None,
            },
        },
        "expected_start_time": "2023-06-06T05:51:55.159416+00:00",
        "start_time": "2023-06-06T05:51:55.243159+00:00",
        "end_time": "2023-06-06T05:51:55.332950+00:00",
        "total_run_time": 0.089791,
        "estimated_run_time": 0.089791,
        "untrackable_result": False,
    },
]
|
|
# Workspace payload. The account and workspace ids match the ones embedded in
# the API URL patched in by the mock_prefect_cloud_client fixture.
mock_workspace_json: Dict = {
    "account_id": "33e98cfe-ad06-4ceb-a500-c11148499f75",
    "account_name": "shubhamjagtapgslabcom",
    "account_handle": "shubhamjagtapgslabcom",
    "workspace_id": "157eb822-1b3b-4338-ae80-98edd5d00cb9",
    "workspace_name": "datahub",
    "workspace_description": "",
    "workspace_handle": "datahub",
}
|
|
|
|
|
|
async def mock_task_run_future():
    """Build one ``PrefectFuture`` per mocked task run.

    Returns:
        A list of three futures in the order extract, transform, load. Each
        future is named after its task-run payload, has a fixed key, its own
        ``SequentialTaskRunner`` and a ``task_run`` parsed from the payload.
    """
    # NOTE(review): this is a coroutine although nothing is awaited;
    # presumably PrefectFuture has to be created inside a running event loop.
    # Confirm before turning it into a plain function.
    payloads_and_keys = (
        (mock_extract_task_run_json, "4552629a-ac04-4590-b286-27642292739f"),
        (mock_transform_task_run_json, "40fff3e5-5ef4-4b8b-9cc8-786f91bcc656"),
        (mock_load_task_run_json, "7565f596-9eb0-4330-ba34-963e7839883e"),
    )

    futures = []
    for task_run_json, key in payloads_and_keys:
        prefect_future: PrefectFuture = PrefectFuture(
            name=task_run_json["name"],
            key=UUID(key),
            task_runner=SequentialTaskRunner(),
        )
        # cast(None, ...) is kept from the original; it only affects static
        # typing of the assignment, not the runtime value.
        prefect_future.task_run = cast(None, TaskRun.parse_obj(task_run_json))
        futures.append(prefect_future)
    return futures
|
|
|
|
|
|
@pytest.fixture(scope="module")
def mock_run_logger():
    """Patch ``get_run_logger`` in the emitter module for the whole test module.

    The patched function returns the root logger from ``logging.getLogger()``.
    Yields the mock that replaces ``get_run_logger``.
    """
    with patch(
        "prefect_datahub.datahub_emitter.get_run_logger",
        return_value=logging.getLogger(),
    ) as mock_logger:
        yield mock_logger
|
|
|
|
|
|
@pytest.fixture(scope="module")
def mock_run_context(mock_run_logger):
    """Patch ``TaskRunContext`` and ``FlowRunContext`` in the emitter module.

    While the fixture is active, ``TaskRunContext.get()`` returns a mock
    describing the "transform" task and ``FlowRunContext.get()`` returns a mock
    describing the "etl" flow, its flow run and the three task-run futures.

    Yields:
        A ``(task_run_ctx, flow_run_ctx)`` tuple of the two context mocks.
    """
    task_run_ctx = MagicMock()
    task_run_ctx.task.task_key = mock_transform_task_json["task_key"]
    task_run_ctx.task.name = mock_transform_task_json["name"]
    task_run_ctx.task.description = mock_transform_task_json["description"]
    task_run_ctx.task.tags = mock_transform_task_json["tags"]

    flow_run_ctx = MagicMock()
    flow_run_ctx.flow.name = mock_flow_json["name"]
    flow_run_ctx.flow.description = mock_flow_json["description"]
    # Parse the payload so id/flow_id/start_time carry the parsed model's
    # values rather than the raw JSON strings.
    flow_run_obj = FlowRun.parse_obj(mock_flow_run_json)
    flow_run_ctx.flow_run.id = flow_run_obj.id
    flow_run_ctx.flow_run.name = flow_run_obj.name
    flow_run_ctx.flow_run.flow_id = flow_run_obj.flow_id
    flow_run_ctx.flow_run.start_time = flow_run_obj.start_time
    flow_run_ctx.task_run_futures = asyncio.run(mock_task_run_future())

    with patch(
        "prefect_datahub.datahub_emitter.TaskRunContext"
    ) as mock_task_run_ctx, patch(
        "prefect_datahub.datahub_emitter.FlowRunContext"
    ) as mock_flow_run_ctx:
        mock_task_run_ctx.get.return_value = task_run_ctx
        mock_flow_run_ctx.get.return_value = flow_run_ctx
        yield (task_run_ctx, flow_run_ctx)
|
|
|
|
|
|
async def mock_task_run(*args, **kwargs):
    """Stand-in for the client's ``read_task_run``.

    Args:
        **kwargs: must contain ``task_run_id``; it is compared as a string.

    Returns:
        The ``TaskRun`` parsed from the matching extract/transform/load
        payload, or ``None`` when the id matches none of them.

    Raises:
        KeyError: if ``task_run_id`` is not passed as a keyword argument.
    """
    task_run_id = str(kwargs["task_run_id"])
    # Key the lookup on the payloads' own ids so the two cannot drift apart.
    payload_by_id = {
        payload["id"]: payload
        for payload in (
            mock_extract_task_run_json,
            mock_transform_task_run_json,
            mock_load_task_run_json,
        )
    }
    payload = payload_by_id.get(task_run_id)
    if payload is None:
        return None
    return TaskRun.parse_obj(payload)
|
|
|
|
|
|
async def mock_flow(*args, **kwargs):
    """Stand-in for the client's ``read_flow``; arguments are ignored.

    Returns the ``Flow`` parsed from ``mock_flow_json``.
    """
    return Flow.parse_obj(mock_flow_json)
|
|
|
|
|
|
async def mock_flow_run(*args, **kwargs):
    """Stand-in for the client's ``read_flow_run``; arguments are ignored.

    Returns the ``FlowRun`` parsed from ``mock_flow_run_json``.
    """
    return FlowRun.parse_obj(mock_flow_run_json)
|
|
|
|
|
|
async def mock_flow_run_graph(*args, **kwargs):
    """Stand-in for the client's low-level ``_client.get``; arguments are ignored.

    Returns a ``requests`` ``Response`` with status 200 whose body is
    ``mock_graph_json`` serialized as compact, UTF-8 encoded JSON.
    """
    response = Response()
    response.status_code = 200
    # Response has no public setter for the body, so the private ``_content``
    # attribute is filled in directly.
    compact_json = json.dumps(mock_graph_json, separators=(",", ":"))
    response._content = compact_json.encode("utf-8")
    return response
|
|
|
|
|
|
async def mock_api_healthcheck(*args, **kwargs):
    """Stand-in for the cloud client's ``api_healthcheck``.

    Accepts and ignores any arguments and always returns ``None``.
    """
    return None
|
|
|
|
|
|
async def mock_read_workspaces(*args, **kwargs):
    """Stand-in for the cloud client's ``read_workspaces``; arguments are ignored.

    Returns a one-element list holding the ``Workspace`` parsed from
    ``mock_workspace_json``.
    """
    return [Workspace.parse_obj(mock_workspace_json)]
|
|
|
|
|
|
@pytest.fixture(scope="module")
def mock_prefect_client():
    """Patch ``orchestration`` in the emitter module with a mocked client.

    ``orchestration.get_client()`` returns a ``MagicMock`` whose
    ``read_flow``, ``read_flow_run``, ``read_task_run`` and ``_client.get``
    are backed by the async stand-ins defined in this module.

    Yields the mocked client.
    """
    prefect_client_mock = MagicMock()
    prefect_client_mock.read_flow.side_effect = mock_flow
    prefect_client_mock.read_flow_run.side_effect = mock_flow_run
    prefect_client_mock.read_task_run.side_effect = mock_task_run
    prefect_client_mock._client.get.side_effect = mock_flow_run_graph

    with patch("prefect_datahub.datahub_emitter.orchestration") as mock_client:
        mock_client.get_client.return_value = prefect_client_mock
        yield prefect_client_mock
|
|
|
|
|
|
@pytest.fixture(scope="module")
def mock_prefect_cloud_client():
    """Patch ``cloud`` and ``PREFECT_API_URL.value`` in the emitter module.

    ``cloud.get_cloud_client()`` returns a ``MagicMock`` whose
    ``api_healthcheck`` and ``read_workspaces`` are backed by the async
    stand-ins defined in this module. ``PREFECT_API_URL.value()`` returns a
    Prefect Cloud URL containing the account and workspace ids from
    ``mock_workspace_json``.

    Yields the mocked cloud client.
    """
    prefect_cloud_client_mock = MagicMock()
    prefect_cloud_client_mock.api_healthcheck.side_effect = mock_api_healthcheck
    prefect_cloud_client_mock.read_workspaces.side_effect = mock_read_workspaces

    with patch("prefect_datahub.datahub_emitter.cloud") as mock_client, patch(
        "prefect_datahub.datahub_emitter.PREFECT_API_URL.value",
        return_value="https://api.prefect.cloud/api/accounts/33e98cfe-ad06-4ceb-"
        "a500-c11148499f75/workspaces/157eb822-1b3b-4338-ae80-98edd5d00cb9",
    ):
        mock_client.get_cloud_client.return_value = prefect_cloud_client_mock
        yield prefect_cloud_client_mock
|
|
|
|
|
|
@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_entities_to_urn_list(mock_emit):
    """``_entities_to_urn_list`` turns ``Dataset`` entities into ``DatasetUrn`` objects."""
    dataset_urn_list = list(
        DatahubEmitter()._entities_to_urn_list(
            [Dataset("snowflake", "mydb.schema.tableA")]
        )
    )

    # Without this guard the loop below would pass vacuously on an empty result.
    assert dataset_urn_list
    for dataset_urn in dataset_urn_list:
        assert isinstance(dataset_urn, DatasetUrn)
|
|
|
|
|
|
@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_get_flow_run_graph(mock_emit, mock_prefect_client):
    """``_get_flow_run_graph`` returns a list for the mocked flow run id."""
    flow_run_id = "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e"
    graph_json = asyncio.run(DatahubEmitter()._get_flow_run_graph(flow_run_id))
    assert isinstance(graph_json, list)
|
|
|
|
|
|
@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test__get_workspace(mock_emit, mock_prefect_cloud_client):
    """``_get_workspace`` returns the workspace name served by the mocked cloud client."""
    workspace_name = DatahubEmitter()._get_workspace()
    assert workspace_name == "datahub"
|
|
|
|
|
|
@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_add_task(mock_emit, mock_run_context):
    """``add_task`` queues a ``DataJob`` for the current task and emits nothing.

    The queued job is looked up by its expected URN and checked against the
    mocked task/flow context plus the inlet and outlet datasets passed in.
    """
    mock_emitter = Mock()
    mock_emit.return_value = mock_emitter

    datahub_emitter = DatahubEmitter()
    inputs: Optional[List[_Entity]] = [Dataset("snowflake", "mydb.schema.tableA")]
    outputs: Optional[List[_Entity]] = [Dataset("snowflake", "mydb.schema.tableC")]
    datahub_emitter.add_task(
        inputs=inputs,
        outputs=outputs,
    )

    task_run_ctx = mock_run_context[0]
    flow_run_ctx = mock_run_context[1]

    expected_datajob_urn = (
        f"urn:li:dataJob:(urn:li:dataFlow:"
        f"(prefect,{flow_run_ctx.flow.name},PROD),{task_run_ctx.task.task_key})"
    )

    assert expected_datajob_urn in datahub_emitter._datajobs_to_emit
    actual_datajob = datahub_emitter._datajobs_to_emit[expected_datajob_urn]
    assert isinstance(actual_datajob, DataJob)
    assert str(actual_datajob.flow_urn) == "urn:li:dataFlow:(prefect,etl,PROD)"
    assert actual_datajob.name == task_run_ctx.task.name
    assert actual_datajob.description == task_run_ctx.task.description
    assert actual_datajob.tags == task_run_ctx.task.tags
    assert (
        str(actual_datajob.inlets[0])
        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)"
    )
    assert (
        str(actual_datajob.outlets[0])
        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
    )
    # Original check, kept for compatibility: it inspects the patched class.
    assert mock_emit.emit.call_count == 0
    # The emitter instance in use is mock_emit.return_value, so this is the
    # check that can actually detect an unexpected emit.
    assert mock_emitter.emit.call_count == 0
|
|
|
|
|
|
@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_emit_flow(
    mock_emit, mock_run_context, mock_prefect_client, mock_prefect_cloud_client
):
    """``emit_flow`` emits the expected aspects, in order, for flow and tasks.

    The recorded calls on the mocked emitter are compared position by position
    (indices 1..51; index 0 is the connection call) against the expected
    ``(aspectName, entityUrn)`` sequence: the DataFlow aspects (emitted twice,
    the second time without ``browsePaths``), the flow's DataProcessInstance,
    then a DataJob plus DataProcessInstance group for each of extract, load
    and transform.
    """
    mock_emitter = Mock()
    mock_emit.return_value = mock_emitter

    platform_instance = "datahub_workspace"

    datahub_emitter = DatahubEmitter(platform_instance=platform_instance)
    datahub_emitter.add_task()
    datahub_emitter.emit_flow()

    task_run_ctx = mock_run_context[0]
    flow_run_ctx = mock_run_context[1]

    expected_dataflow_urn = (
        f"urn:li:dataFlow:(prefect,{platform_instance}.{flow_run_ctx.flow.name},PROD)"
    )

    # print method calls for debugging
    print("\nMethod calls indices:")
    for i, call in enumerate(mock_emitter.method_calls):
        if (
            len(call[1]) > 0
            and hasattr(call[1][0], "aspectName")
            and hasattr(call[1][0], "entityUrn")
        ):
            print(
                f"Index {i}: aspectName={call[1][0].aspectName}, entityUrn={call[1][0].entityUrn}"
            )

    # Aspect sequences emitted per entity kind, in emission order.
    dataflow_aspects = [
        "dataFlowInfo",
        "status",
        "dataPlatformInstance",
        "ownership",
        "globalTags",
    ]
    datajob_aspects = [
        "dataJobInfo",
        "dataPlatformInstance",
        "status",
        "dataJobInputOutput",
        "ownership",
        "globalTags",
        "browsePaths",
    ]
    flow_dpi_aspects = [
        "dataProcessInstanceProperties",
        "dataProcessInstanceRelationships",
        "dataPlatformInstance",
        "dataProcessInstanceRunEvent",
    ]
    # Task process instances carry two run events; the flow's carries one.
    task_dpi_aspects = flow_dpi_aspects + ["dataProcessInstanceRunEvent"]

    flow_dpi_urn = "urn:li:dataProcessInstance:56231547bcc2781e0c14182ceab6c9ac"
    # (task key, DataProcessInstance id) in the order the tasks are emitted.
    tasks = [
        ("__main__.extract", "b048ba729c1403f229a0760f8765d691"),
        ("__main__.load", "e7df9fe09bb4da19687b8199e5ee5038"),
        ("__main__.transform", "bfa255d4d1fba52d23a52c9de4f6d0a6"),
    ]

    expected_calls = []
    # DataFlow assertions
    expected_calls += [
        (aspect, expected_dataflow_urn) for aspect in dataflow_aspects + ["browsePaths"]
    ]
    # DataFlow assertions - duplicated (no browsePaths the second time)
    expected_calls += [(aspect, expected_dataflow_urn) for aspect in dataflow_aspects]
    # DataProcessInstance assertions for the flow
    expected_calls += [(aspect, flow_dpi_urn) for aspect in flow_dpi_aspects]
    # DataJob and DataProcessInstance assertions for each task
    for task_key, dpi_id in tasks:
        datajob_urn = f"urn:li:dataJob:({expected_dataflow_urn},{task_key})"
        dpi_urn = f"urn:li:dataProcessInstance:{dpi_id}"
        expected_calls += [(aspect, datajob_urn) for aspect in datajob_aspects]
        expected_calls += [(aspect, dpi_urn) for aspect in task_dpi_aspects]

    # Ignore the first call (index 0) which is a connection call
    for index, (aspect_name, entity_urn) in enumerate(expected_calls, start=1):
        emitted = mock_emitter.method_calls[index][1][0]
        assert emitted.aspectName == aspect_name, (
            f"method_calls[{index}]: expected aspectName {aspect_name!r}, "
            f"got {emitted.aspectName!r}"
        )
        assert emitted.entityUrn == entity_urn, (
            f"method_calls[{index}]: expected entityUrn {entity_urn!r}, "
            f"got {emitted.entityUrn!r}"
        )

    # The transform job's globalTags aspect must carry the task's tag.
    transform_urn = f"urn:li:dataJob:({expected_dataflow_urn},__main__.transform)"
    transform_tags_index = expected_calls.index(("globalTags", transform_urn)) + 1
    assert (
        mock_emitter.method_calls[transform_tags_index][1][0].aspect.tags[0].tag
        == f"urn:li:tag:{task_run_ctx.task.tags[0]}"
    )
|