diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index c0395b4e4e..12e362fa8a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -7,6 +7,7 @@ from datahub.api.entities.dataprocess.dataprocess_instance import ( DataProcessInstance, InstanceRunResult, ) +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( SourceCapability, @@ -248,13 +249,17 @@ class FivetranSource(StatefulIngestionSourceBase): # Map Fivetran's connector entity with Datahub's datajob entity datajob = self._generate_datajob_from_connector(connector) - for mcp in datajob.generate_mcp(materialize_iolets=True): - if mcp.entityType == "dataset" and isinstance(mcp.aspect, StatusClass): - # While we "materialize" the referenced datasets, we don't want them - # to be tracked by stateful ingestion. - yield mcp.as_workunit(is_primary_source=False) - else: - yield mcp.as_workunit() + for mcp in datajob.generate_mcp(materialize_iolets=False): + yield mcp.as_workunit() + + # Materialize the upstream referenced datasets. + # We assume that the downstreams are materialized by other ingestion sources. + for iolet in datajob.inlets: + # We don't want these to be tracked by stateful ingestion. + yield MetadataChangeProposalWrapper( + entityUrn=str(iolet), + aspect=StatusClass(removed=False), + ).as_workunit(is_primary_source=False) # Map Fivetran's job/sync history entity with Datahub's data process entity for job in connector.jobs: diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_golden.json index a72c960a72..b8f05fa6e9 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_golden.json @@ -178,38 +178,6 @@ "lastRunId": "no-run-id-provided" } }, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.employee,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.company,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",