fix(ingest/dbt): respect convert_column_urns_to_lowercase in mapping CLL (#10132)

This commit is contained in:
Harshal Sheth 2024-03-26 14:42:47 -07:00 committed by GitHub
parent e97e6822ad
commit 1febe68b49
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -699,23 +699,27 @@ def get_upstreams_for_test(
def make_mapping_upstream_lineage( def make_mapping_upstream_lineage(
upstream_urn: str, downstream_urn: str, node: DBTNode upstream_urn: str,
downstream_urn: str,
node: DBTNode,
convert_column_urns_to_lowercase: bool,
) -> UpstreamLineageClass: ) -> UpstreamLineageClass:
cll = None cll = []
if node.columns: for column in node.columns or []:
cll = [ field_name = column.name
if convert_column_urns_to_lowercase:
field_name = field_name.lower()
cll.append(
FineGrainedLineage( FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[ upstreams=[mce_builder.make_schema_field_urn(upstream_urn, field_name)],
mce_builder.make_schema_field_urn(upstream_urn, column.name)
],
downstreamType=FineGrainedLineageDownstreamType.FIELD, downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[ downstreams=[
mce_builder.make_schema_field_urn(downstream_urn, column.name) mce_builder.make_schema_field_urn(downstream_urn, field_name)
], ],
) )
for column in node.columns )
]
return UpstreamLineageClass( return UpstreamLineageClass(
upstreams=[ upstreams=[
@ -727,7 +731,7 @@ def make_mapping_upstream_lineage(
), ),
) )
], ],
fineGrainedLineages=cll, fineGrainedLineages=cll or None,
) )
@ -1255,6 +1259,7 @@ class DBTSourceBase(StatefulIngestionSourceBase):
upstream_urn=upstream_dbt_urn, upstream_urn=upstream_dbt_urn,
downstream_urn=node_datahub_urn, downstream_urn=node_datahub_urn,
node=node, node=node,
convert_column_urns_to_lowercase=self.config.convert_column_urns_to_lowercase,
) )
if self.config.incremental_lineage: if self.config.incremental_lineage:
# We only generate incremental lineage for non-dbt nodes. # We only generate incremental lineage for non-dbt nodes.
@ -1601,6 +1606,7 @@ class DBTSourceBase(StatefulIngestionSourceBase):
), ),
downstream_urn=node_urn, downstream_urn=node_urn,
node=node, node=node,
convert_column_urns_to_lowercase=self.config.convert_column_urns_to_lowercase,
) )
else: else:
upstream_urns = get_upstreams( upstream_urns = get_upstreams(