lineage updates

Jonny Dixon 2025-03-18 12:36:36 +00:00
parent 7d14004c99
commit eca67cb5ea


@@ -353,6 +353,9 @@ class FivetranSource(StatefulIngestionSourceBase):
        if not source_urn or not dest_urn:
            return

        # Initialize field-level lineage collection
        lineage_field_map: Dict[str, List[str]] = {}

        # If there are explicit column mappings, use them
        if lineage.column_lineage:
            for column_lineage in lineage.column_lineage:
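For reference, lineage_field_map keys each destination schema-field URN to the list of source field URNs feeding it. A minimal sketch of one entry, with hypothetical platform, table, and column values chosen purely for illustration:

# Hypothetical example of the map's shape; all URN values are illustrative only.
lineage_field_map = {
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.dest_table,PROD),customer_id)": [
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,db.public.src_table,PROD),CUSTOMER_ID)",
    ],
}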
@@ -381,13 +384,20 @@ class FivetranSource(StatefulIngestionSourceBase):
                            downstreams=[dest_field_urn],
                        )
                    )
                    # Group source fields by destination field
                    if dest_field_urn not in lineage_field_map:
                        lineage_field_map[dest_field_urn] = []
                    lineage_field_map[dest_field_urn].append(source_field_urn)
                except Exception as e:
                    logger.warning(f"Failed to create column lineage: {e}")

        # If we have field mappings, create dataset-level lineage with field granularity
        if lineage_field_map:
            self._create_field_lineage_mcp(source_urn, dest_urn, lineage_field_map)
        else:
            # If no column mappings are provided, attempt to infer column lineage.
            # Fivetran generally preserves column names, though it may adjust
            # their case depending on the destination platform.
            try:
                logger.info(
                    f"No explicit column lineage for {lineage.source_table} -> {lineage.destination_table}. "
@@ -405,7 +415,7 @@ class FivetranSource(StatefulIngestionSourceBase):
                    context=f"{lineage.source_table} -> {lineage.destination_table}",
                )

                # Add a placeholder entry to indicate column lineage is inferred
                fine_grained_lineage.append(
                    FineGrainedLineage(
                        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
@@ -415,12 +425,73 @@ class FivetranSource(StatefulIngestionSourceBase):
                    )
                )

                # Note: In a more comprehensive solution, we would query the actual
                # schema of both tables and create proper column-level lineage.
            except Exception as e:
                logger.warning(f"Failed to auto-generate column lineage: {e}")

    def _create_field_lineage_mcp(
        self,
        source_urn: DatasetUrn,
        dest_urn: DatasetUrn,
        lineage_field_map: Dict[str, List[str]],
    ) -> Optional[MetadataWorkUnit]:
        """
        Create field-level lineage between datasets using a MetadataChangeProposal.

        Args:
            source_urn: Source dataset URN
            dest_urn: Destination dataset URN
            lineage_field_map: Map of destination field URNs to lists of source field URNs

        Returns:
            A MetadataWorkUnit wrapping the lineage MCP, or None if creation failed.
        """
        try:
            from datahub.emitter.mcp import MetadataChangeProposalWrapper
            from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
                DatasetLineageType,
                FineGrainedLineage,
                FineGrainedLineageDownstreamType,
                FineGrainedLineageUpstreamType,
                Upstream,
                UpstreamLineage,
            )

            # Create the upstream relationship
            upstream = Upstream(
                dataset=str(source_urn), type=DatasetLineageType.TRANSFORMED
            )

            # Create fine-grained lineages for each field mapping
            fine_grained_lineages = []
            for dest_field, source_fields in lineage_field_map.items():
                fine_grained_lineages.append(
                    FineGrainedLineage(
                        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
                        upstreams=source_fields,
                        downstreamType=FineGrainedLineageDownstreamType.FIELD,
                        downstreams=[dest_field],
                    )
                )

            # Create the lineage aspect
            upstream_lineage = UpstreamLineage(
                upstreams=[upstream], fineGrainedLineages=fine_grained_lineages
            )

            # Create the MCP carrying the lineage aspect (it is not emitted here)
            lineage_mcp = MetadataChangeProposalWrapper(
                entityUrn=str(dest_urn),
                aspect=upstream_lineage,
            )

            # Wrap the MCP in a workunit
            wu = MetadataWorkUnit(id=f"{dest_urn}-field-lineage", mcp=lineage_mcp)

            # Return the workunit - it will be collected and emitted by the main process
            return wu
        except Exception as e:
            logger.error(f"Error creating field-level lineage MCP: {e}", exc_info=True)
            return None
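A sketch of how a generator-based caller would consume the returned workunit; the call site shown here is assumed for illustration:

# Hypothetical call site inside a workunit generator; the returned workunit
# must be yielded (or buffered for later emission), not discarded.
wu = self._create_field_lineage_mcp(source_urn, dest_urn, lineage_field_map)
if wu is not None:
    yield wu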

    def _build_lineage_properties(
        self,
        connector: Connector,
@@ -1009,13 +1080,30 @@ class FivetranSource(StatefulIngestionSourceBase):
        for mcp in dataflow.generate_mcp():
            yield mcp.as_workunit()

        # Store field lineage workunits to emit after dataset workunits
        field_lineage_workunits = []

        # Check if we should create one datajob per table or one per connector
        if self.config.datajob_mode == DataJobMode.PER_TABLE:
            # Create one datajob per table
            for wu in self._get_per_table_datajob_workunits(connector, dataflow):
                # If this is a field lineage workunit, store it for later
                if wu.id.endswith("-field-lineage"):
                    field_lineage_workunits.append(wu)
                else:
                    yield wu
        else:
            # Default: consolidated mode - one datajob per connector
            for wu in self._get_consolidated_datajob_workunits(connector, dataflow):
                # If this is a field lineage workunit, store it for later
                if wu.id.endswith("-field-lineage"):
                    field_lineage_workunits.append(wu)
                else:
                    yield wu

        # Now emit the field lineage workunits after all dataset workunits
        for wu in field_lineage_workunits:
            yield wu
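The buffering above is a defer-and-flush pattern: field-lineage workunits are held back until every other workunit has been yielded. A standalone sketch of the same pattern (hypothetical function, not part of this source):

# Hypothetical standalone illustration of the defer-and-flush pattern.
from typing import Iterable, Iterator, List

from datahub.ingestion.api.workunit import MetadataWorkUnit

def emit_field_lineage_last(
    workunits: Iterable[MetadataWorkUnit],
) -> Iterator[MetadataWorkUnit]:
    deferred: List[MetadataWorkUnit] = []
    for wu in workunits:
        if wu.id.endswith("-field-lineage"):
            deferred.append(wu)  # hold field lineage until everything else is out
        else:
            yield wu
    yield from deferred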

    def _report_lineage_truncation(self, connector: Connector) -> None:
        """Report warning about truncated lineage."""