Added caching and other perf improvements

This commit is contained in:
Piotr Skrydalewicz 2025-06-27 00:54:33 +02:00
parent e7e29a9071
commit 1263498ad0

View File

@ -1574,27 +1574,33 @@ class SqlParsingAggregator(Closeable):
@dataclasses.dataclass
class QueryLineageInfo:
upstreams: List[UrnStr] # this is direct upstreams, with *no temp tables*
column_lineage: List[ColumnLineageInfo]
upstreams: OrderedSet[
UrnStr
] # this is direct upstreams, with *no temp tables*
column_lineage: OrderedSet[ColumnLineageInfo]
confidence_score: float
def _merge_lineage_from(self, other_query: "QueryLineageInfo") -> None:
self.upstreams += other_query.upstreams
self.column_lineage += other_query.column_lineage
self.upstreams.update(other_query.upstreams)
self.column_lineage.update(other_query.column_lineage)
self.confidence_score = min(
self.confidence_score, other_query.confidence_score
)
cache: Dict[str, QueryLineageInfo] = {}
def _recurse_into_query(
query: QueryMetadata, recursion_path: List[QueryId]
) -> QueryLineageInfo:
if query.query_id in recursion_path:
# This is a cycle, so we just return the query as-is.
return QueryLineageInfo(
upstreams=query.upstreams,
column_lineage=query.column_lineage,
upstreams=OrderedSet(query.upstreams),
column_lineage=OrderedSet(query.column_lineage),
confidence_score=query.confidence_score,
)
if query.query_id in cache:
return cache[query.query_id]
recursion_path = [*recursion_path, query.query_id]
composed_of_queries.add(query.query_id)
@ -1669,11 +1675,14 @@ class SqlParsingAggregator(Closeable):
]
)
return QueryLineageInfo(
upstreams=list(new_upstreams),
column_lineage=new_cll,
ret = QueryLineageInfo(
upstreams=new_upstreams,
column_lineage=OrderedSet(new_cll),
confidence_score=new_confidence_score,
)
cache[query.query_id] = ret
return ret
resolved_lineage_info = _recurse_into_query(base_query, [])
@ -1713,8 +1722,8 @@ class SqlParsingAggregator(Closeable):
base_query,
query_id=composite_query_id,
formatted_query_string=merged_query_text,
upstreams=resolved_lineage_info.upstreams,
column_lineage=resolved_lineage_info.column_lineage,
upstreams=list(resolved_lineage_info.upstreams),
column_lineage=list(resolved_lineage_info.column_lineage),
confidence_score=resolved_lineage_info.confidence_score,
)