diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index 1b9b4bc4f6..d88a5c392f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -353,6 +353,9 @@ class FivetranSource(StatefulIngestionSourceBase): if not source_urn or not dest_urn: return + # Initialize field-level lineage collection + lineage_field_map: Dict[str, List[str]] = {} + # If there are explicit column mappings, use them if lineage.column_lineage: for column_lineage in lineage.column_lineage: @@ -381,13 +384,20 @@ class FivetranSource(StatefulIngestionSourceBase): downstreams=[dest_field_urn], ) ) + + # Group source fields by destination field + if dest_field_urn not in lineage_field_map: + lineage_field_map[dest_field_urn] = [] + lineage_field_map[dest_field_urn].append(source_field_urn) + except Exception as e: logger.warning(f"Failed to create column lineage: {e}") + + # If we have field mappings, create dataset-level lineage with field granularity + if lineage_field_map: + self._create_field_lineage_mcp(source_urn, dest_urn, lineage_field_map) else: # If no column mappings are provided, we'll need to attempt to infer column lineage - # We know that Fivetran generally follows a pattern of preserving column names - # but may adjust case based on destination platform - try: logger.info( f"No explicit column lineage for {lineage.source_table} -> {lineage.destination_table}. " @@ -405,7 +415,7 @@ class FivetranSource(StatefulIngestionSourceBase): context=f"{lineage.source_table} → {lineage.destination_table}", ) - # Add a placeholder entry to indicate column lineage is inferred rather than explicit + # Add a placeholder entry to indicate column lineage is inferred fine_grained_lineage.append( FineGrainedLineage( upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, @@ -415,12 +425,73 @@ class FivetranSource(StatefulIngestionSourceBase): ) ) - # Note: In a more comprehensive solution, we would query the actual schema - # of both tables and create proper column-level lineage - except Exception as e: logger.warning(f"Failed to auto-generate column lineage: {e}") + def _create_field_lineage_mcp( + self, + source_urn: DatasetUrn, + dest_urn: DatasetUrn, + lineage_field_map: Dict[str, List[str]], + ) -> Optional[MetadataWorkUnit]: + """ + Create field-level lineage between datasets using MetadataChangeProposal. + + Args: + source_urn: Source dataset URN + dest_urn: Destination dataset URN + lineage_field_map: Map of destination field URNs to lists of source field URNs + """ + try: + from datahub.emitter.mcp import MetadataChangeProposalWrapper + from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( + DatasetLineageType, + FineGrainedLineage, + FineGrainedLineageDownstreamType, + FineGrainedLineageUpstreamType, + Upstream, + UpstreamLineage, + ) + + # Create the upstream relationship + upstream = Upstream( + dataset=str(source_urn), type=DatasetLineageType.TRANSFORMED + ) + + # Create fine-grained lineages for each field mapping + fine_grained_lineages = [] + + for dest_field, source_fields in lineage_field_map.items(): + fine_grained_lineages.append( + FineGrainedLineage( + upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, + upstreams=source_fields, + downstreamType=FineGrainedLineageDownstreamType.FIELD, + downstreams=[dest_field], + ) + ) + + # Create the lineage aspect + upstream_lineage = UpstreamLineage( + upstreams=[upstream], fineGrainedLineages=fine_grained_lineages + ) + + # Create and emit the MCP + lineage_mcp = MetadataChangeProposalWrapper( + entityUrn=str(dest_urn), + aspect=upstream_lineage, + ) + + # Now create a workunit from this MCP + wu = MetadataWorkUnit(id=f"{dest_urn}-field-lineage", mcp=lineage_mcp) + + # Return the workunit - it will be collected and emitted by the main process + return wu + + except Exception as e: + logger.error(f"Error creating field-level lineage MCP: {e}", exc_info=True) + return None + def _build_lineage_properties( self, connector: Connector, @@ -1009,13 +1080,30 @@ class FivetranSource(StatefulIngestionSourceBase): for mcp in dataflow.generate_mcp(): yield mcp.as_workunit() + # Store field lineage workunits to emit after dataset workunits + field_lineage_workunits = [] + # Check if we should create one datajob per table or one per connector if self.config.datajob_mode == DataJobMode.PER_TABLE: # Create one datajob per table - yield from self._get_per_table_datajob_workunits(connector, dataflow) + for wu in self._get_per_table_datajob_workunits(connector, dataflow): + # If this is a field lineage workunit, store it for later + if wu.id.endswith("-field-lineage"): + field_lineage_workunits.append(wu) + else: + yield wu else: # Default: consolidated mode - one datajob per connector - yield from self._get_consolidated_datajob_workunits(connector, dataflow) + for wu in self._get_consolidated_datajob_workunits(connector, dataflow): + # If this is a field lineage workunit, store it for later + if wu.id.endswith("-field-lineage"): + field_lineage_workunits.append(wu) + else: + yield wu + + # Now emit the field lineage workunits after all dataset workunits + for wu in field_lineage_workunits: + yield wu def _report_lineage_truncation(self, connector: Connector) -> None: """Report warning about truncated lineage."""