mirror of
https://github.com/datahub-project/datahub.git
synced 2025-09-25 09:00:50 +00:00
fix(ingest): snowflake - fix view upstream lineages to eliminate false edges. (#4284)
This commit is contained in:
parent
a2a939d406
commit
ba0c912c88
@ -236,79 +236,48 @@ class SnowflakeSource(SQLAlchemySource):
|
||||
return f"{self.current_database.lower()}.{regular}"
|
||||
|
||||
def _populate_view_upstream_lineage(self, engine: sqlalchemy.engine.Engine) -> None:
|
||||
# NOTE: This query captures only the upstream lineage of a view.
|
||||
# For more details see https://docs.snowflake.com/en/sql-reference/account-usage/access_history.html#usage-notes for current limitations on capturing the lineage for views.
|
||||
# NOTE: This query captures only the upstream lineage of a view (with no column lineage).
|
||||
# For more details see: https://docs.snowflake.com/en/user-guide/object-dependencies.html#object-dependencies
|
||||
# and also https://docs.snowflake.com/en/sql-reference/account-usage/access_history.html#usage-notes for current limitations on capturing the lineage for views.
|
||||
view_upstream_lineage_query: str = """
|
||||
WITH view_upstream_lineage_history AS (
|
||||
SELECT
|
||||
vu.value : "objectName" AS view_name,
|
||||
vu.value : "objectDomain" AS view_domain,
|
||||
vu.value : "columns" AS view_columns,
|
||||
r.value : "objectName" AS upstream_table_name,
|
||||
r.value : "objectDomain" AS upstream_table_domain,
|
||||
r.value : "columns" AS upstream_table_columns,
|
||||
t.query_start_time AS query_start_time
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
snowflake.account_usage.access_history
|
||||
) t,
|
||||
lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) vu,
|
||||
lateral flatten(input => t.BASE_OBJECTS_ACCESSED) r
|
||||
WHERE
|
||||
vu.value : "objectId" IS NOT NULL
|
||||
AND r.value : "objectId" IS NOT NULL
|
||||
AND t.query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
|
||||
AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3)
|
||||
)
|
||||
SELECT
|
||||
view_name,
|
||||
view_columns,
|
||||
upstream_table_name,
|
||||
upstream_table_domain,
|
||||
upstream_table_columns
|
||||
concat(
|
||||
referenced_database, '.', referenced_schema,
|
||||
'.', referenced_object_name
|
||||
) AS view_upstream,
|
||||
concat(
|
||||
referencing_database, '.', referencing_schema,
|
||||
'.', referencing_object_name
|
||||
) AS downstream_view
|
||||
FROM
|
||||
view_upstream_lineage_history
|
||||
snowflake.account_usage.object_dependencies
|
||||
WHERE
|
||||
view_domain in ('View', 'Materialized view')
|
||||
AND view_name != upstream_table_name
|
||||
QUALIFY ROW_NUMBER() OVER (
|
||||
PARTITION BY view_name,
|
||||
upstream_table_name
|
||||
ORDER BY
|
||||
query_start_time DESC
|
||||
) = 1
|
||||
""".format(
|
||||
start_time_millis=int(self.config.start_time.timestamp() * 1000),
|
||||
end_time_millis=int(self.config.end_time.timestamp() * 1000),
|
||||
)
|
||||
referencing_object_domain in ('VIEW', 'MATERIALIZED VIEW')
|
||||
"""
|
||||
|
||||
assert self._lineage_map is not None
|
||||
num_edges: int = 0
|
||||
|
||||
try:
|
||||
for db_row in engine.execute(view_upstream_lineage_query):
|
||||
# Process UpstreamTable->View edge.
|
||||
|
||||
view_name: str = db_row[0].lower().replace('"', "")
|
||||
upstream_table: str = db_row[2].lower().replace('"', "")
|
||||
# Process UpstreamTable/View/ExternalTable/Materialized View->View edge.
|
||||
view_upstream: str = db_row["view_upstream"].lower()
|
||||
view_name: str = db_row["downstream_view"].lower()
|
||||
# key is the downstream view name
|
||||
self._lineage_map[view_name].append(
|
||||
# (<upstream_table_name>, <json_list_of_upstream_table_columns>, <json_list_of_downstream_view_columns>)
|
||||
(upstream_table, db_row[4], db_row[1])
|
||||
# (<upstream_table_name>, <empty_json_list_of_upstream_table_columns>, <empty_json_list_of_downstream_view_columns>)
|
||||
(view_upstream, "[]", "[]")
|
||||
)
|
||||
num_edges += 1
|
||||
logger.debug(
|
||||
f"Table->View: Lineage[View(Down)={view_name}]:Table(Up)={self._lineage_map[view_name]}, upstream_domain={db_row[3]}"
|
||||
f"Upstream->View: Lineage[View(Down)={view_name}]:Upstream={view_upstream}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Extracting the upstream view lineage from Snowflake failed."
|
||||
f"Please check your permissions. Continuing...\nError was {e}."
|
||||
)
|
||||
logger.info(f"A total of {num_edges} Table->View edges found.")
|
||||
logger.info(f"A total of {num_edges} View upstream edges found.")
|
||||
self.report.num_table_to_view_edges_scanned = num_edges
|
||||
|
||||
def _populate_view_downstream_lineage(
|
||||
|
Loading…
x
Reference in New Issue
Block a user