fix(ingest/fivetran): only materialize upstream lineage (#9490)

This commit is contained in:
Harshal Sheth 2023-12-21 13:35:34 -05:00 committed by GitHub
parent 55cb56821c
commit b80d2f471c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 39 deletions

View File

@@ -7,6 +7,7 @@ from datahub.api.entities.dataprocess.dataprocess_instance import (
     DataProcessInstance,
     InstanceRunResult,
 )
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
     SourceCapability,
@@ -248,14 +249,18 @@ class FivetranSource(StatefulIngestionSourceBase):
         # Map Fivetran's connector entity with Datahub's datajob entity
         datajob = self._generate_datajob_from_connector(connector)
-        for mcp in datajob.generate_mcp(materialize_iolets=True):
-            if mcp.entityType == "dataset" and isinstance(mcp.aspect, StatusClass):
-                # While we "materialize" the referenced datasets, we don't want them
-                # to be tracked by stateful ingestion.
-                yield mcp.as_workunit(is_primary_source=False)
-            else:
-                yield mcp.as_workunit()
+        for mcp in datajob.generate_mcp(materialize_iolets=False):
+            yield mcp.as_workunit()
+
+        # Materialize the upstream referenced datasets.
+        # We assume that the downstreams are materialized by other ingestion sources.
+        for iolet in datajob.inlets:
+            # We don't want these to be tracked by stateful ingestion.
+            yield MetadataChangeProposalWrapper(
+                entityUrn=str(iolet),
+                aspect=StatusClass(removed=False),
+            ).as_workunit(is_primary_source=False)

         # Map Fivetran's job/sync history entity with Datahub's data process entity
         for job in connector.jobs:
             dpi = self._generate_dpi_from_job(job, datajob)

View File

@@ -178,38 +178,6 @@
             "lastRunId": "no-run-id-provided"
         }
     },
-    {
-        "entityType": "dataset",
-        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.employee,PROD)",
-        "changeType": "UPSERT",
-        "aspectName": "status",
-        "aspect": {
-            "json": {
-                "removed": false
-            }
-        },
-        "systemMetadata": {
-            "lastObserved": 1654621200000,
-            "runId": "powerbi-test",
-            "lastRunId": "no-run-id-provided"
-        }
-    },
-    {
-        "entityType": "dataset",
-        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.company,PROD)",
-        "changeType": "UPSERT",
-        "aspectName": "status",
-        "aspect": {
-            "json": {
-                "removed": false
-            }
-        },
-        "systemMetadata": {
-            "lastObserved": 1654621200000,
-            "runId": "powerbi-test",
-            "lastRunId": "no-run-id-provided"
-        }
-    },
     {
         "entityType": "dataJob",
         "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",