diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index d88a5c392f..cc47a68565 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -353,11 +353,14 @@ class FivetranSource(StatefulIngestionSourceBase): if not source_urn or not dest_urn: return - # Initialize field-level lineage collection - lineage_field_map: Dict[str, List[str]] = {} + # Log details for debugging + logger.info(f"Creating column lineage from {source_urn} to {dest_urn}") - # If there are explicit column mappings, use them + # If there are explicit column mappings, use them directly if lineage.column_lineage: + # Log the number of column mappings we're processing + logger.info(f"Processing {len(lineage.column_lineage)} column mappings") + for column_lineage in lineage.column_lineage: if ( not column_lineage.source_column @@ -365,17 +368,37 @@ class FivetranSource(StatefulIngestionSourceBase): or column_lineage.destination_column.startswith("_fivetran") ): continue + + # Log the column mapping + logger.debug( + f"Column mapping: {column_lineage.source_column} -> {column_lineage.destination_column}" + ) + try: + # Use column names directly as they should already be in the correct form + source_column = column_lineage.source_column + destination_column = column_lineage.destination_column + + # For BigQuery specifically, ensure the field names are correctly formed + dest_platform = str(dest_urn).split(",")[0].split(":")[-1] + if dest_platform.lower() == "bigquery": + # BigQuery fields are case-sensitive, ensure proper case + destination_column = destination_column.lower() + # Make sure we don't have any special characters in field path + destination_column = destination_column.replace(".", "_") + + # Create field URNs for source and destination source_field_urn = builder.make_schema_field_urn( str(source_urn), - column_lineage.source_column, + source_column, ) dest_field_urn = builder.make_schema_field_urn( str(dest_urn), - column_lineage.destination_column, + destination_column, ) + # Add to fine-grained lineage fine_grained_lineage.append( FineGrainedLineage( upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, @@ -385,48 +408,39 @@ class FivetranSource(StatefulIngestionSourceBase): ) ) - # Group source fields by destination field - if dest_field_urn not in lineage_field_map: - lineage_field_map[dest_field_urn] = [] - lineage_field_map[dest_field_urn].append(source_field_urn) - - except Exception as e: - logger.warning(f"Failed to create column lineage: {e}") - - # If we have field mappings, create dataset-level lineage with field granularity - if lineage_field_map: - self._create_field_lineage_mcp(source_urn, dest_urn, lineage_field_map) - else: - # If no column mappings are provided, we'll need to attempt to infer column lineage - try: - logger.info( - f"No explicit column lineage for {lineage.source_table} -> {lineage.destination_table}. " - f"Auto-generating column lineage based on assumed name preservation." - ) - - # Add a special note in the report - self.report.info( - title="Auto-generated column lineage", - message=( - "Column lineage was automatically generated for some tables based on Fivetran's " - "standard column mapping patterns. Columns are assumed to keep the same name " - "with possible case adjustments based on the destination platform." - ), - context=f"{lineage.source_table} → {lineage.destination_table}", - ) - - # Add a placeholder entry to indicate column lineage is inferred - fine_grained_lineage.append( - FineGrainedLineage( - upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, - upstreams=[str(source_urn)], - downstreamType=FineGrainedLineageDownstreamType.FIELD, - downstreams=[str(dest_urn)], + logger.debug( + f"Added field lineage: {source_field_urn} -> {dest_field_urn}" + ) + except Exception as e: + logger.warning( + f"Failed to create column lineage for {column_lineage.source_column} -> {column_lineage.destination_column}: {e}" ) - ) - except Exception as e: - logger.warning(f"Failed to auto-generate column lineage: {e}") + # Log the total number of lineage entries created + if fine_grained_lineage: + logger.info( + f"Created {len(fine_grained_lineage)} field lineage entries for {source_urn} -> {dest_urn}" + ) + else: + logger.warning( + f"No valid column lineage mappings found for {source_urn} -> {dest_urn}" + ) + else: + # No column mappings provided - log a warning + logger.warning( + f"No column lineage data available for {lineage.source_table} -> {lineage.destination_table}. " + f"This may indicate an issue with schema retrieval from the Fivetran API." + ) + + # Add a placeholder entry to indicate table-level lineage only + fine_grained_lineage.append( + FineGrainedLineage( + upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, + upstreams=[str(source_urn)], + downstreamType=FineGrainedLineageDownstreamType.FIELD, + downstreams=[str(dest_urn)], + ) + ) def _create_field_lineage_mcp( self,