diff --git a/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py b/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py index 046b615bd4..a8fe4f0df8 100644 --- a/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py +++ b/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py @@ -20,6 +20,7 @@ from datahub.metadata.schema_classes import ( UpstreamClass, UpstreamLineageClass, ) +from datahub.utilities.file_backed_collections import FileBackedDict from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult logger = logging.getLogger(__name__) @@ -80,10 +81,10 @@ class SqlParsingBuilder: generate_operations: bool = True usage_config: Optional[BaseUsageConfig] = None - # TODO: Make inner dict a FileBackedDict and make LineageEdge frozen + # Maps downstream urn -> upstream urn -> LineageEdge # Builds up a single LineageEdge for each upstream -> downstream pair - _lineage_map: Dict[DatasetUrn, Dict[DatasetUrn, LineageEdge]] = field( - default_factory=lambda: defaultdict(dict), init=False + _lineage_map: FileBackedDict[Dict[DatasetUrn, LineageEdge]] = field( + default_factory=FileBackedDict, init=False ) # TODO: Replace with FileBackedDict approach like in BigQuery usage @@ -128,13 +129,14 @@ class SqlParsingBuilder: if self.generate_lineage: for downstream_urn in downstreams_to_ingest: - _merge_lineage_data( + # Set explicitly so that FileBackedDict registers any mutations + self._lineage_map[downstream_urn] = _merge_lineage_data( downstream_urn=downstream_urn, upstream_urns=result.in_tables, column_lineage=result.column_lineage if include_column_lineage else None, - upstream_edges=self._lineage_map[downstream_urn], + upstream_edges=self._lineage_map.get(downstream_urn, {}), query_timestamp=query_timestamp, is_view_ddl=is_view_ddl, user=user, @@ -170,11 +172,12 @@ class SqlParsingBuilder: user: Optional[UserUrn] = None, ) -> None: """Manually add a single upstream -> downstream lineage edge, e.g. 
if sql parsing fails.""" - _merge_lineage_data( + # Set explicitly so that FileBackedDict registers any mutations + self._lineage_map[downstream_urn] = _merge_lineage_data( downstream_urn=downstream_urn, upstream_urns=upstream_urns, column_lineage=None, - upstream_edges=self._lineage_map[downstream_urn], + upstream_edges=self._lineage_map.get(downstream_urn, {}), query_timestamp=timestamp, is_view_ddl=is_view_ddl, user=user, @@ -225,7 +228,7 @@ def _merge_lineage_data( query_timestamp: Optional[datetime], is_view_ddl: bool, user: Optional[UserUrn], -) -> None: +) -> Dict[DatasetUrn, LineageEdge]: for upstream_urn in upstream_urns: edge = upstream_edges.setdefault( upstream_urn, @@ -255,6 +258,8 @@ def _merge_lineage_data( column_map = upstream_edges[upstream_column_info.table].column_map column_map[cl.downstream.column].add(upstream_column_info.column) + return upstream_edges + def _compute_upstream_fields( result: SqlParsingResult,