From ba0c912c882364b87cf9f085dbb07afb42567763 Mon Sep 17 00:00:00 2001 From: Ravindra Lanka Date: Tue, 1 Mar 2022 16:53:26 -0800 Subject: [PATCH] fix(ingest): snowflake - fix view upstream lineages to eliminate false edges. (#4284) --- .../datahub/ingestion/source/sql/snowflake.py | 73 ++++++------------- 1 file changed, 21 insertions(+), 52 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py index b33f6d378a..f28fc26a8d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py @@ -236,79 +236,48 @@ class SnowflakeSource(SQLAlchemySource): return f"{self.current_database.lower()}.{regular}" def _populate_view_upstream_lineage(self, engine: sqlalchemy.engine.Engine) -> None: - # NOTE: This query captures only the upstream lineage of a view. - # For more details see https://docs.snowflake.com/en/sql-reference/account-usage/access_history.html#usage-notes for current limitations on capturing the lineage for views. + # NOTE: This query captures only the upstream lineage of a view (with no column lineage). + # For more details see: https://docs.snowflake.com/en/user-guide/object-dependencies.html#object-dependencies + # and also https://docs.snowflake.com/en/sql-reference/account-usage/access_history.html#usage-notes for current limitations on capturing the lineage for views. 
view_upstream_lineage_query: str = """ -WITH view_upstream_lineage_history AS ( - SELECT - vu.value : "objectName" AS view_name, - vu.value : "objectDomain" AS view_domain, - vu.value : "columns" AS view_columns, - r.value : "objectName" AS upstream_table_name, - r.value : "objectDomain" AS upstream_table_domain, - r.value : "columns" AS upstream_table_columns, - t.query_start_time AS query_start_time - FROM - ( - SELECT - * - FROM - snowflake.account_usage.access_history - ) t, - lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) vu, - lateral flatten(input => t.BASE_OBJECTS_ACCESSED) r - WHERE - vu.value : "objectId" IS NOT NULL - AND r.value : "objectId" IS NOT NULL - AND t.query_start_time >= to_timestamp_ltz({start_time_millis}, 3) - AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3) -) SELECT - view_name, - view_columns, - upstream_table_name, - upstream_table_domain, - upstream_table_columns + concat( + referenced_database, '.', referenced_schema, + '.', referenced_object_name + ) AS view_upstream, + concat( + referencing_database, '.', referencing_schema, + '.', referencing_object_name + ) AS downstream_view FROM - view_upstream_lineage_history + snowflake.account_usage.object_dependencies WHERE - view_domain in ('View', 'Materialized view') - AND view_name != upstream_table_name - QUALIFY ROW_NUMBER() OVER ( - PARTITION BY view_name, - upstream_table_name - ORDER BY - query_start_time DESC - ) = 1 - """.format( - start_time_millis=int(self.config.start_time.timestamp() * 1000), - end_time_millis=int(self.config.end_time.timestamp() * 1000), - ) + referencing_object_domain in ('VIEW', 'MATERIALIZED VIEW') + """ assert self._lineage_map is not None num_edges: int = 0 try: for db_row in engine.execute(view_upstream_lineage_query): - # Process UpstreamTable->View edge. 
- - view_name: str = db_row[0].lower().replace('"', "") - upstream_table: str = db_row[2].lower().replace('"', "") + # Process UpstreamTable/View/ExternalTable/Materialized View->View edge. + view_upstream: str = db_row["view_upstream"].lower() + view_name: str = db_row["downstream_view"].lower() # key is the downstream view name self._lineage_map[view_name].append( - # (<upstream_table_name>, <json_list_of_upstream_table_columns>, <json_list_of_downstream_view_columns>) - (upstream_table, db_row[4], db_row[1]) + # (<upstream_table_name>, <empty_json_list_of_upstream_table_columns>, <empty_json_list_of_downstream_view_columns>) + (view_upstream, "[]", "[]") ) num_edges += 1 logger.debug( - f"Table->View: Lineage[View(Down)={view_name}]:Table(Up)={self._lineage_map[view_name]}, upstream_domain={db_row[3]}" + f"Upstream->View: Lineage[View(Down)={view_name}]:Upstream={view_upstream}" ) except Exception as e: logger.warning( f"Extracting the upstream view lineage from Snowflake failed." f"Please check your permissions. Continuing...\nError was {e}." ) - logger.info(f"A total of {num_edges} Table->View edges found.") + logger.info(f"A total of {num_edges} View upstream edges found.") self.report.num_table_to_view_edges_scanned = num_edges def _populate_view_downstream_lineage(