feat(ingest/sql-parsing): Support file backed dict in SqlParsingBuilder for lineage (#9654)

Andrew Sikowitz 2024-01-18 23:12:20 -05:00 committed by GitHub
parent 3682c5f1d0
commit f993f50a04

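Context for the diff below: _lineage_map changes from an in-memory defaultdict to a FileBackedDict, which keeps its values in external storage rather than holding everything in process memory. Because values are serialized when written, mutating an inner dict pulled out of the map is not guaranteed to be picked up, so both call sites now read the current value with .get(downstream_urn, {}), merge into it, and assign the result back (the "Set explicitly so that FileBackedDict registers any mutations" comments). A minimal sketch of that read-merge-write-back pattern, relying only on the FileBackedDict usage visible in this diff; the merge_edges helper and the urn strings are illustrative, not part of the commit:

from typing import Dict

from datahub.utilities.file_backed_collections import FileBackedDict

# Outer map is file backed; the inner per-downstream dicts stay plain dicts,
# matching FileBackedDict[Dict[DatasetUrn, LineageEdge]] in the diff.
lineage_map: FileBackedDict[Dict[str, str]] = FileBackedDict()


def merge_edges(edges: Dict[str, str], upstream_urn: str) -> Dict[str, str]:
    # Illustrative stand-in for _merge_lineage_data: mutate the mapping it is
    # given and return it so the caller can write it back into the map.
    edges.setdefault(upstream_urn, "lineage-edge")
    return edges


downstream = "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.downstream,PROD)"
upstream = "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.upstream,PROD)"

# Read with a default, merge, then set explicitly so the file-backed store
# registers the mutation: the same pattern as the two call sites below.
lineage_map[downstream] = merge_edges(lineage_map.get(downstream, {}), upstream)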

@@ -20,6 +20,7 @@ from datahub.metadata.schema_classes import (
     UpstreamClass,
     UpstreamLineageClass,
 )
+from datahub.utilities.file_backed_collections import FileBackedDict
 from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult
 
 logger = logging.getLogger(__name__)
@@ -80,10 +81,10 @@ class SqlParsingBuilder:
     generate_operations: bool = True
     usage_config: Optional[BaseUsageConfig] = None
 
+    # TODO: Make inner dict a FileBackedDict and make LineageEdge frozen
     # Maps downstream urn -> upstream urn -> LineageEdge
     # Builds up a single LineageEdge for each upstream -> downstream pair
-    _lineage_map: Dict[DatasetUrn, Dict[DatasetUrn, LineageEdge]] = field(
-        default_factory=lambda: defaultdict(dict), init=False
+    _lineage_map: FileBackedDict[Dict[DatasetUrn, LineageEdge]] = field(
+        default_factory=FileBackedDict, init=False
     )
 
     # TODO: Replace with FileBackedDict approach like in BigQuery usage
@@ -128,13 +129,14 @@ class SqlParsingBuilder:
         if self.generate_lineage:
             for downstream_urn in downstreams_to_ingest:
-                _merge_lineage_data(
+                # Set explicitly so that FileBackedDict registers any mutations
+                self._lineage_map[downstream_urn] = _merge_lineage_data(
                     downstream_urn=downstream_urn,
                     upstream_urns=result.in_tables,
                     column_lineage=result.column_lineage
                     if include_column_lineage
                     else None,
-                    upstream_edges=self._lineage_map[downstream_urn],
+                    upstream_edges=self._lineage_map.get(downstream_urn, {}),
                     query_timestamp=query_timestamp,
                     is_view_ddl=is_view_ddl,
                     user=user,
@@ -170,11 +172,12 @@ class SqlParsingBuilder:
         user: Optional[UserUrn] = None,
     ) -> None:
         """Manually add a single upstream -> downstream lineage edge, e.g. if sql parsing fails."""
-        _merge_lineage_data(
+        # Set explicitly so that FileBackedDict registers any mutations
+        self._lineage_map[downstream_urn] = _merge_lineage_data(
             downstream_urn=downstream_urn,
             upstream_urns=upstream_urns,
             column_lineage=None,
-            upstream_edges=self._lineage_map[downstream_urn],
+            upstream_edges=self._lineage_map.get(downstream_urn, {}),
             query_timestamp=timestamp,
             is_view_ddl=is_view_ddl,
             user=user,
@@ -225,7 +228,7 @@ def _merge_lineage_data(
     query_timestamp: Optional[datetime],
     is_view_ddl: bool,
     user: Optional[UserUrn],
-) -> None:
+) -> Dict[str, LineageEdge]:
     for upstream_urn in upstream_urns:
         edge = upstream_edges.setdefault(
             upstream_urn,
@@ -255,6 +258,8 @@ def _merge_lineage_data(
                 column_map = upstream_edges[upstream_column_info.table].column_map
                 column_map[cl.downstream.column].add(upstream_column_info.column)
 
+    return upstream_edges
+
 
 def _compute_upstream_fields(
     result: SqlParsingResult,
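A side note on the .get(downstream_urn, {}) change above: the old field used default_factory=lambda: defaultdict(dict), so indexing with an unseen downstream urn silently created and stored an empty inner dict. The new FileBackedDict is constructed without any such factory, so the call sites fetch an explicit default and rely on the write-back assignment to store the merged result. A small sketch of the difference, assuming FileBackedDict raises KeyError for missing keys like a plain dict; the names are illustrative:

from collections import defaultdict
from typing import Dict

from datahub.utilities.file_backed_collections import FileBackedDict

old_style: Dict[str, Dict[str, str]] = defaultdict(dict)
edges = old_style["unseen_downstream"]  # defaultdict creates and stores {} on access

new_style: FileBackedDict[Dict[str, str]] = FileBackedDict()
edges = new_style.get("unseen_downstream", {})  # explicit default; nothing stored yet
edges["some_upstream"] = "lineage-edge"
new_style["unseen_downstream"] = edges  # stored only by this explicit assignment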