lineage updates

This commit is contained in:
Jonny Dixon 2025-03-18 16:55:22 +00:00
parent eca67cb5ea
commit 5a25dfd0a9

View File

@ -353,11 +353,14 @@ class FivetranSource(StatefulIngestionSourceBase):
if not source_urn or not dest_urn: if not source_urn or not dest_urn:
return return
# Initialize field-level lineage collection # Log details for debugging
lineage_field_map: Dict[str, List[str]] = {} logger.info(f"Creating column lineage from {source_urn} to {dest_urn}")
# If there are explicit column mappings, use them # If there are explicit column mappings, use them directly
if lineage.column_lineage: if lineage.column_lineage:
# Log the number of column mappings we're processing
logger.info(f"Processing {len(lineage.column_lineage)} column mappings")
for column_lineage in lineage.column_lineage: for column_lineage in lineage.column_lineage:
if ( if (
not column_lineage.source_column not column_lineage.source_column
@ -365,17 +368,37 @@ class FivetranSource(StatefulIngestionSourceBase):
or column_lineage.destination_column.startswith("_fivetran") or column_lineage.destination_column.startswith("_fivetran")
): ):
continue continue
# Log the column mapping
logger.debug(
f"Column mapping: {column_lineage.source_column} -> {column_lineage.destination_column}"
)
try: try:
# Use column names directly as they should already be in the correct form
source_column = column_lineage.source_column
destination_column = column_lineage.destination_column
# For BigQuery specifically, ensure the field names are correctly formed
dest_platform = str(dest_urn).split(",")[0].split(":")[-1]
if dest_platform.lower() == "bigquery":
# BigQuery fields are case-sensitive, ensure proper case
destination_column = destination_column.lower()
# Make sure we don't have any special characters in field path
destination_column = destination_column.replace(".", "_")
# Create field URNs for source and destination
source_field_urn = builder.make_schema_field_urn( source_field_urn = builder.make_schema_field_urn(
str(source_urn), str(source_urn),
column_lineage.source_column, source_column,
) )
dest_field_urn = builder.make_schema_field_urn( dest_field_urn = builder.make_schema_field_urn(
str(dest_urn), str(dest_urn),
column_lineage.destination_column, destination_column,
) )
# Add to fine-grained lineage
fine_grained_lineage.append( fine_grained_lineage.append(
FineGrainedLineage( FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
@ -385,37 +408,31 @@ class FivetranSource(StatefulIngestionSourceBase):
) )
) )
# Group source fields by destination field logger.debug(
if dest_field_urn not in lineage_field_map: f"Added field lineage: {source_field_urn} -> {dest_field_urn}"
lineage_field_map[dest_field_urn] = [] )
lineage_field_map[dest_field_urn].append(source_field_urn)
except Exception as e: except Exception as e:
logger.warning(f"Failed to create column lineage: {e}") logger.warning(
f"Failed to create column lineage for {column_lineage.source_column} -> {column_lineage.destination_column}: {e}"
)
# If we have field mappings, create dataset-level lineage with field granularity # Log the total number of lineage entries created
if lineage_field_map: if fine_grained_lineage:
self._create_field_lineage_mcp(source_urn, dest_urn, lineage_field_map)
else:
# If no column mappings are provided, we'll need to attempt to infer column lineage
try:
logger.info( logger.info(
f"No explicit column lineage for {lineage.source_table} -> {lineage.destination_table}. " f"Created {len(fine_grained_lineage)} field lineage entries for {source_urn} -> {dest_urn}"
f"Auto-generating column lineage based on assumed name preservation." )
else:
logger.warning(
f"No valid column lineage mappings found for {source_urn} -> {dest_urn}"
)
else:
# No column mappings provided - log a warning
logger.warning(
f"No column lineage data available for {lineage.source_table} -> {lineage.destination_table}. "
f"This may indicate an issue with schema retrieval from the Fivetran API."
) )
# Add a special note in the report # Add a placeholder entry to indicate table-level lineage only
self.report.info(
title="Auto-generated column lineage",
message=(
"Column lineage was automatically generated for some tables based on Fivetran's "
"standard column mapping patterns. Columns are assumed to keep the same name "
"with possible case adjustments based on the destination platform."
),
context=f"{lineage.source_table}{lineage.destination_table}",
)
# Add a placeholder entry to indicate column lineage is inferred
fine_grained_lineage.append( fine_grained_lineage.append(
FineGrainedLineage( FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
@ -425,9 +442,6 @@ class FivetranSource(StatefulIngestionSourceBase):
) )
) )
except Exception as e:
logger.warning(f"Failed to auto-generate column lineage: {e}")
def _create_field_lineage_mcp( def _create_field_lineage_mcp(
self, self,
source_urn: DatasetUrn, source_urn: DatasetUrn,