import time
import uuid

import pytest

import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import make_data_job_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
    DataJobInfoClass,
    DataJobInputOutputClass,
    EdgeClass,
)
from datahub.specific.datajob import DataJobPatchBuilder
from tests.patch.common_patch_tests import (
    helper_test_custom_properties_patch,
    helper_test_dataset_tags_patch,
    helper_test_entity_terms_patch,
    helper_test_ownership_patch,
)

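
# Builds a DataJob URN with uuid-suffixed flow and task ids so every test run
# operates on a fresh entity and does not collide with data from earlier runs.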
def _make_test_datajob_urn(
    seedFlow: str = "SampleAirflowDag", seedTask: str = "SampleAirflowTask"
):
    return make_data_job_urn(
        orchestrator="airflow",
        flow_id=f"{seedFlow}{uuid.uuid4()}",
        job_id=f"{seedTask}{uuid.uuid4()}",
    )


# Common Aspect Patch Tests
# Ownership
@pytest.mark.parametrize(
    "client_fixture_name", ["graph_client", "openapi_graph_client"]
)
def test_datajob_ownership_patch(request, client_fixture_name):
    graph_client = request.getfixturevalue(client_fixture_name)
    datajob_urn = _make_test_datajob_urn()
    helper_test_ownership_patch(graph_client, datajob_urn, DataJobPatchBuilder)


# Tags
@pytest.mark.parametrize(
    "client_fixture_name", ["graph_client", "openapi_graph_client"]
)
def test_datajob_tags_patch(request, client_fixture_name):
    graph_client = request.getfixturevalue(client_fixture_name)
    helper_test_dataset_tags_patch(
        graph_client, _make_test_datajob_urn(), DataJobPatchBuilder
    )


# Terms
@pytest.mark.parametrize(
    "client_fixture_name", ["graph_client", "openapi_graph_client"]
)
def test_datajob_terms_patch(request, client_fixture_name):
    graph_client = request.getfixturevalue(client_fixture_name)
    helper_test_entity_terms_patch(
        graph_client, _make_test_datajob_urn(), DataJobPatchBuilder
    )


# Custom Properties
@pytest.mark.parametrize(
    "client_fixture_name", ["graph_client", "openapi_graph_client"]
)
def test_custom_properties_patch(request, client_fixture_name):
    graph_client = request.getfixturevalue(client_fixture_name)
    orig_datajob_info = DataJobInfoClass(name="test_name", type="TestJobType")
    helper_test_custom_properties_patch(
        graph_client,
        test_entity_urn=_make_test_datajob_urn(),
        patch_builder_class=DataJobPatchBuilder,
        custom_properties_aspect_class=DataJobInfoClass,
        base_aspect=orig_datajob_info,
    )


# Specific Aspect Patch Tests
# Input/Output
@pytest.mark.parametrize(
    "client_fixture_name", ["graph_client", "openapi_graph_client"]
)
def test_datajob_inputoutput_dataset_patch(request, client_fixture_name):
    graph_client = request.getfixturevalue(client_fixture_name)
    datajob_urn = _make_test_datajob_urn()

    other_dataset_urn = make_dataset_urn(
        platform="hive", name=f"SampleHiveDataset2-{uuid.uuid4()}", env="PROD"
    )

    patch_dataset_urn = make_dataset_urn(
        platform="hive", name=f"SampleHiveDataset3-{uuid.uuid4()}", env="PROD"
    )

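    # Seed the aspect with a single input edge; the inputDatasets / outputDatasets
    # lists are left empty and lineage is carried on the edge-based fields.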
    inputoutput_lineage = DataJobInputOutputClass(
        inputDatasets=[],
        outputDatasets=[],
        inputDatasetEdges=[EdgeClass(destinationUrn=other_dataset_urn)],
    )
    dataset_input_lineage_to_add = EdgeClass(destinationUrn=patch_dataset_urn)
    mcpw = MetadataChangeProposalWrapper(
        entityUrn=datajob_urn, aspect=inputoutput_lineage
    )

    graph_client.emit_mcp(mcpw)
    inputoutput_lineage_read = graph_client.get_aspect(
        entity_urn=datajob_urn,
        aspect_type=DataJobInputOutputClass,
    )
    assert inputoutput_lineage_read is not None
    assert inputoutput_lineage_read.inputDatasetEdges is not None
    assert (
        inputoutput_lineage_read.inputDatasetEdges[0].destinationUrn
        == other_dataset_urn
    )

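    # Patch in a second input edge and verify that both edges are now present.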
    for patch_mcp in (
        DataJobPatchBuilder(datajob_urn)
        .add_input_dataset(dataset_input_lineage_to_add)
        .build()
    ):
        graph_client.emit_mcp(patch_mcp)

    inputoutput_lineage_read = graph_client.get_aspect(
        entity_urn=datajob_urn,
        aspect_type=DataJobInputOutputClass,
    )
    assert inputoutput_lineage_read is not None
    assert inputoutput_lineage_read.inputDatasetEdges is not None
    assert len(inputoutput_lineage_read.inputDatasetEdges) == 2
    assert (
        inputoutput_lineage_read.inputDatasetEdges[0].destinationUrn
        == other_dataset_urn
    )
    assert (
        inputoutput_lineage_read.inputDatasetEdges[1].destinationUrn
        == patch_dataset_urn
    )

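    # Remove the patched edge again and verify only the original edge remains.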
    for patch_mcp in (
        DataJobPatchBuilder(datajob_urn).remove_input_dataset(patch_dataset_urn).build()
    ):
        graph_client.emit_mcp(patch_mcp)

    inputoutput_lineage_read = graph_client.get_aspect(
        entity_urn=datajob_urn,
        aspect_type=DataJobInputOutputClass,
    )
    assert inputoutput_lineage_read is not None
    assert inputoutput_lineage_read.inputDatasetEdges is not None
    assert len(inputoutput_lineage_read.inputDatasetEdges) == 1
    assert (
        inputoutput_lineage_read.inputDatasetEdges[0].destinationUrn
        == other_dataset_urn
    )


@pytest.mark.parametrize(
    "client_fixture_name", ["graph_client", "openapi_graph_client"]
)
def test_datajob_multiple_inputoutput_dataset_patch(request, client_fixture_name):
    """Test creating a data job with multiple input and output datasets and verifying the aspects."""
    graph_client = request.getfixturevalue(client_fixture_name)

    # Create the data job
    datajob_urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,training,default),training)"

    # Create input and output dataset URNs
    input_datasets = ["input_data_1", "input_data_2"]
    output_datasets = ["output_data_1", "output_data_2"]

    input_dataset_urns = [
        make_dataset_urn(platform="s3", name=f"test_patch_{dataset}", env="PROD")
        for dataset in input_datasets
    ]
    output_dataset_urns = [
        make_dataset_urn(platform="s3", name=f"test_patch_{dataset}", env="PROD")
        for dataset in output_datasets
    ]

    # Create edges for datasets; an audit stamp is attached only when
    # generate_auditstamp is True.
    def make_edge(urn, generate_auditstamp=False):
        audit_stamp = models.AuditStampClass(
            time=int(time.time() * 1000.0),
            actor="urn:li:corpuser:datahub",
        )
        return EdgeClass(
            destinationUrn=str(urn),
            lastModified=audit_stamp if generate_auditstamp else None,
        )

    # Initialize empty input/output lineage
    initial_lineage = DataJobInputOutputClass(
        inputDatasets=[], outputDatasets=[], inputDatasetEdges=[], outputDatasetEdges=[]
    )

    # Emit initial lineage
    mcpw = MetadataChangeProposalWrapper(entityUrn=datajob_urn, aspect=initial_lineage)
    graph_client.emit_mcp(mcpw)

    # Create patches for input and output datasets
    patch_builder = DataJobPatchBuilder(datajob_urn)
    for input_urn in input_dataset_urns:
        patch_builder.add_input_dataset(make_edge(input_urn))
    for output_urn in output_dataset_urns:
        patch_builder.add_output_dataset(make_edge(output_urn))

    # Apply patches
    for patch_mcp in patch_builder.build():
        graph_client.emit_mcp(patch_mcp)

    # Verify the lineage was correctly applied
    lineage_aspect = graph_client.get_aspect(
        entity_urn=datajob_urn,
        aspect_type=DataJobInputOutputClass,
    )

    # Assert lineage was created
    assert lineage_aspect is not None
    assert lineage_aspect.inputDatasetEdges is not None
    assert lineage_aspect.outputDatasetEdges is not None

    # Verify input datasets
    assert len(lineage_aspect.inputDatasetEdges) == len(input_datasets)
    input_urns = {edge.destinationUrn for edge in lineage_aspect.inputDatasetEdges}
    expected_input_urns = {str(urn) for urn in input_dataset_urns}
    assert input_urns == expected_input_urns

    # Verify output datasets
    assert len(lineage_aspect.outputDatasetEdges) == len(output_datasets)
    output_urns = {edge.destinationUrn for edge in lineage_aspect.outputDatasetEdges}
    expected_output_urns = {str(urn) for urn in output_dataset_urns}
    assert output_urns == expected_output_urns

    # Test updating the same datasets again (idempotency)
    patch_builder = DataJobPatchBuilder(datajob_urn)
    for input_urn in input_dataset_urns:
        patch_builder.add_input_dataset(make_edge(input_urn))
    for output_urn in output_dataset_urns:
        patch_builder.add_output_dataset(make_edge(output_urn))

    for patch_mcp in patch_builder.build():
        graph_client.emit_mcp(patch_mcp)

    # Verify the aspect hasn't changed
    updated_lineage_aspect = graph_client.get_aspect(
        entity_urn=datajob_urn,
        aspect_type=DataJobInputOutputClass,
    )

    assert updated_lineage_aspect is not None
    assert updated_lineage_aspect.to_obj() == lineage_aspect.to_obj()