fix(ingest/bigquery): Add to lineage, not overwrite, when using sql parser (#7814)

This commit is contained in:
Andrew Sikowitz 2023-04-14 02:46:10 -04:00 committed by GitHub
parent 4ec280ee20
commit d8d8176b1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -450,11 +450,13 @@ timestamp < "{end_time}"
self.report.num_skipped_lineage_entries_not_allowed[e.project_id] += 1
continue
lineage_from_event: Set[LineageEdge] = set()
destination_table_str = str(e.destinationTable)
has_table = False
for ref_table in e.referencedTables:
if str(ref_table) != destination_table_str:
lineage_map[destination_table_str].add(
lineage_from_event.add(
LineageEdge(
table=str(ref_table),
auditStamp=e.end_time
@ -466,7 +468,7 @@ timestamp < "{end_time}"
has_view = False
for ref_view in e.referencedViews:
if str(ref_view) != destination_table_str:
lineage_map[destination_table_str].add(
lineage_from_event.add(
LineageEdge(
table=str(ref_view),
auditStamp=e.end_time
@ -475,7 +477,10 @@ timestamp < "{end_time}"
)
)
has_view = True
if self.config.lineage_use_sql_parser and has_table and has_view:
if not lineage_from_event:
self.report.num_skipped_lineage_entries_other[e.project_id] += 1
elif self.config.lineage_use_sql_parser and has_table and has_view:
# If a view is referenced, BigQuery reports both the view and its underlying table
# in the references, with no distinction between direct and base objects accessed. We therefore parse
# the SQL to ensure that only directly accessed objects are used for lineage.
@ -496,15 +501,14 @@ timestamp < "{end_time}"
e.project_id
] += 1
continue
curr_lineage = lineage_map[destination_table_str]
new_lineage = set()
for lineage in curr_lineage:
for lineage in lineage_from_event:
name = lineage.table.split("/")[-1]
if name in referenced_objs:
new_lineage.add(lineage)
lineage_map[destination_table_str] = new_lineage
if not (has_table or has_view):
self.report.num_skipped_lineage_entries_other[e.project_id] += 1
lineage_from_event = new_lineage
lineage_map[destination_table_str].update(lineage_from_event)
logger.info("Exiting create lineage map function")
return lineage_map