fix(py-sdk): DataJobPatchBuilder handling timestamps, output edges (#12067)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
Shirshanka Das 2024-12-09 16:40:31 -08:00 committed by GitHub
parent 8a1c1804b7
commit e4ea993df1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 190 additions and 84 deletions

View File

@ -48,7 +48,14 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #11619 - schema field/column paths can no longer be duplicated within the schema
- #11570 - The `DatahubClientConfig`'s server field no longer defaults to `http://localhost:8080`. Be sure to explicitly set this.
- #11570 - If a `datahub_api` is explicitly passed to a stateful ingestion config provider, it will be used. We previously ignored it if the pipeline context also had a graph object.
- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted (after 10d) or are timeseries *entities* (dataprocess, execution requests) will be removed automatically using logic in the `datahub-gc` ingestion source.
- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted
(after 10d) or are timeseries *entities* (dataprocess, execution requests)
will be removed automatically using logic in the `datahub-gc` ingestion
source.
- #12067 - The default behavior of `DataJobPatchBuilder` in the Python SDK has
changed: it no longer fills out `created` and `lastModified` audit stamps by
default for input and output dataset edges. This should have no user-observable
impact (time-based lineage visualization will continue to work based on observed time), but it could break assumptions previously made by clients.
### Potential Downtime

View File

@ -102,7 +102,7 @@ class DataJobPatchBuilder(MetadataPatchProposal):
Notes:
If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
it is converted to an Edge object and added with default audit stamps.
it is converted to an Edge object and added without any audit stamps.
"""
if isinstance(input, Edge):
input_urn: str = input.destinationUrn
@ -114,8 +114,6 @@ class DataJobPatchBuilder(MetadataPatchProposal):
input_edge = Edge(
destinationUrn=input_urn,
created=self._mint_auditstamp(),
lastModified=self._mint_auditstamp(),
)
self._ensure_urn_type("dataJob", [input_edge], "add_input_datajob")
@ -185,7 +183,7 @@ class DataJobPatchBuilder(MetadataPatchProposal):
Notes:
If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
it is converted to an Edge object and added with default audit stamps.
it is converted to an Edge object and added without any audit stamps.
"""
if isinstance(input, Edge):
input_urn: str = input.destinationUrn
@ -197,8 +195,6 @@ class DataJobPatchBuilder(MetadataPatchProposal):
input_edge = Edge(
destinationUrn=input_urn,
created=self._mint_auditstamp(),
lastModified=self._mint_auditstamp(),
)
self._ensure_urn_type("dataset", [input_edge], "add_input_dataset")
@ -270,7 +266,7 @@ class DataJobPatchBuilder(MetadataPatchProposal):
Notes:
If `output` is an Edge object, it is used directly. If `output` is a Urn object or string,
it is converted to an Edge object and added with default audit stamps.
it is converted to an Edge object and added without any audit stamps.
"""
if isinstance(output, Edge):
output_urn: str = output.destinationUrn
@ -282,15 +278,13 @@ class DataJobPatchBuilder(MetadataPatchProposal):
output_edge = Edge(
destinationUrn=output_urn,
created=self._mint_auditstamp(),
lastModified=self._mint_auditstamp(),
)
self._ensure_urn_type("dataset", [output_edge], "add_output_dataset")
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
path=f"/outputDatasetEdges/{self.quote(str(output))}",
path=f"/outputDatasetEdges/{self.quote(output_urn)}",
value=output_edge,
)
return self

View File

@ -60,15 +60,7 @@
"op": "add",
"path": "/inputDatajobEdges/urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)",
"value": {
"destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)",
"created": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
},
"lastModified": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
}
"destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)"
}
}
]
@ -178,30 +170,14 @@
"op": "add",
"path": "/inputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"value": {
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"created": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
},
"lastModified": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
}
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)"
}
},
{
"op": "add",
"path": "/inputDatajobEdges/urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)",
"value": {
"destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)",
"created": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
},
"lastModified": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
}
"destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)"
}
}
]
@ -287,15 +263,7 @@
"op": "add",
"path": "/inputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"value": {
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"created": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
},
"lastModified": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
}
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)"
}
}
]

View File

@ -1,5 +1,6 @@
import json
import pathlib
from typing import Any, Dict, Union
import pytest
from freezegun.api import freeze_time
@ -15,7 +16,9 @@ from datahub.emitter.mce_builder import (
)
from datahub.ingestion.sink.file import write_metadata_file
from datahub.metadata.schema_classes import (
AuditStampClass,
DatasetLineageTypeClass,
EdgeClass,
FineGrainedLineageClass,
FineGrainedLineageDownstreamTypeClass,
FineGrainedLineageUpstreamTypeClass,
@ -182,8 +185,66 @@ def test_basic_dashboard_patch_builder():
]
@pytest.mark.parametrize(
"created_on,last_modified,expected_actor",
[
(1586847600000, 1586847600000, "urn:li:corpuser:datahub"),
(None, None, "urn:li:corpuser:datahub"),
(1586847600000, None, "urn:li:corpuser:datahub"),
(None, 1586847600000, "urn:li:corpuser:datahub"),
],
ids=["both_timestamps", "no_timestamps", "only_created", "only_modified"],
)
@freeze_time("2020-04-14 07:00:00")
def test_datajob_patch_builder():
def test_datajob_patch_builder(created_on, last_modified, expected_actor):
def make_edge_or_urn(urn: str) -> Union[EdgeClass, str]:
if created_on or last_modified:
return EdgeClass(
destinationUrn=str(urn),
created=(
AuditStampClass(
time=created_on,
actor=expected_actor,
)
if created_on
else None
),
lastModified=(
AuditStampClass(
time=last_modified,
actor=expected_actor,
)
if last_modified
else None
),
)
return urn
def get_edge_expectation(urn: str) -> Dict[str, Any]:
if created_on or last_modified:
expected = {
"destinationUrn": str(urn),
"created": (
AuditStampClass(
time=created_on,
actor=expected_actor,
).to_obj()
if created_on
else None
),
"lastModified": (
AuditStampClass(
time=last_modified,
actor=expected_actor,
).to_obj()
if last_modified
else None
),
}
# filter out None values
return {k: v for k, v in expected.items() if v is not None}
return {"destinationUrn": str(urn)}
flow_urn = make_data_flow_urn(
orchestrator="nifi", flow_id="252C34e5af19-0192-1000-b248-b1abee565b5d"
)
@ -193,13 +254,19 @@ def test_datajob_patch_builder():
patcher = DataJobPatchBuilder(job_urn)
patcher.add_output_dataset(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
make_edge_or_urn(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
)
)
patcher.add_output_dataset(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)"
make_edge_or_urn(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)"
)
)
patcher.add_output_dataset(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)"
make_edge_or_urn(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)"
)
)
assert patcher.build() == [
@ -214,47 +281,23 @@ def test_datajob_patch_builder():
{
"op": "add",
"path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder1,DEV)",
"value": {
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)",
"created": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub",
},
"lastModified": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub",
},
},
"value": get_edge_expectation(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
),
},
{
"op": "add",
"path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder3,DEV)",
"value": {
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)",
"created": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub",
},
"lastModified": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub",
},
},
"value": get_edge_expectation(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)"
),
},
{
"op": "add",
"path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder2,DEV)",
"value": {
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)",
"created": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub",
},
"lastModified": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub",
},
},
"value": get_edge_expectation(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)"
),
},
]
).encode("utf-8"),

View File

@ -1,5 +1,7 @@
import time
import uuid
import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import make_data_job_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
@ -136,3 +138,95 @@ def test_datajob_inputoutput_dataset_patch(graph_client):
inputoutput_lineage_read.inputDatasetEdges[0].destinationUrn
== other_dataset_urn
)
def test_datajob_multiple_inputoutput_dataset_patch(graph_client):
    """Patch several input/output dataset edges onto a data job and verify
    the resulting DataJobInputOutput aspect, including patch idempotency."""
    # Target data job for all patches in this test.
    datajob_urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,training,default),training)"

    # Build the input and output dataset URNs.
    input_datasets = ["input_data_1", "input_data_2"]
    output_datasets = ["output_data_1", "output_data_2"]
    input_dataset_urns = [
        make_dataset_urn(platform="s3", name=f"test_patch_{name}", env="PROD")
        for name in input_datasets
    ]
    output_dataset_urns = [
        make_dataset_urn(platform="s3", name=f"test_patch_{name}", env="PROD")
        for name in output_datasets
    ]

    def make_edge(urn, generate_auditstamp=False):
        # Attach lastModified only when explicitly requested; `created` is
        # deliberately left unset.
        stamp = models.AuditStampClass(
            time=int(time.time() * 1000.0),
            actor="urn:li:corpuser:datahub",
        )
        return EdgeClass(
            destinationUrn=str(urn),
            lastModified=stamp if generate_auditstamp else None,
        )

    # Start from an empty lineage aspect so the patches fully define it.
    initial_lineage = DataJobInputOutputClass(
        inputDatasets=[], outputDatasets=[], inputDatasetEdges=[], outputDatasetEdges=[]
    )
    graph_client.emit_mcp(
        MetadataChangeProposalWrapper(entityUrn=datajob_urn, aspect=initial_lineage)
    )

    # Patch in every input and output dataset edge.
    patch_builder = DataJobPatchBuilder(datajob_urn)
    for urn in input_dataset_urns:
        patch_builder.add_input_dataset(make_edge(urn))
    for urn in output_dataset_urns:
        patch_builder.add_output_dataset(make_edge(urn))
    for mcp in patch_builder.build():
        graph_client.emit_mcp(mcp)

    # Read the aspect back and check the lineage matches what was patched in.
    lineage_aspect = graph_client.get_aspect(
        entity_urn=datajob_urn,
        aspect_type=DataJobInputOutputClass,
    )
    assert lineage_aspect is not None
    assert lineage_aspect.inputDatasetEdges is not None
    assert lineage_aspect.outputDatasetEdges is not None

    # Input edges: same count, same destination URNs.
    assert len(lineage_aspect.inputDatasetEdges) == len(input_datasets)
    assert {edge.destinationUrn for edge in lineage_aspect.inputDatasetEdges} == {
        str(urn) for urn in input_dataset_urns
    }

    # Output edges: same count, same destination URNs.
    assert len(lineage_aspect.outputDatasetEdges) == len(output_datasets)
    assert {edge.destinationUrn for edge in lineage_aspect.outputDatasetEdges} == {
        str(urn) for urn in output_dataset_urns
    }

    # Re-apply the identical patches: the aspect must not change (idempotency).
    patch_builder = DataJobPatchBuilder(datajob_urn)
    for urn in input_dataset_urns:
        patch_builder.add_input_dataset(make_edge(urn))
    for urn in output_dataset_urns:
        patch_builder.add_output_dataset(make_edge(urn))
    for mcp in patch_builder.build():
        graph_client.emit_mcp(mcp)

    updated_lineage_aspect = graph_client.get_aspect(
        entity_urn=datajob_urn,
        aspect_type=DataJobInputOutputClass,
    )
    assert updated_lineage_aspect is not None
    assert updated_lineage_aspect.to_obj() == lineage_aspect.to_obj()