mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-02 19:58:59 +00:00
fix(ingest/snowflake): fix error case in column lineage (#10808)
This commit is contained in:
parent
5f33bf3c42
commit
62e6b7ff78
@ -685,12 +685,7 @@ WHERE table_schema='{schema_name}' AND {extra_clause}"""
|
|||||||
t.query_start_time AS query_start_time,
|
t.query_start_time AS query_start_time,
|
||||||
t.query_id AS query_id
|
t.query_id AS query_id
|
||||||
FROM
|
FROM
|
||||||
(
|
snowflake.account_usage.access_history t,
|
||||||
SELECT * from snowflake.account_usage.access_history
|
|
||||||
WHERE
|
|
||||||
query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
|
|
||||||
AND query_start_time < to_timestamp_ltz({end_time_millis}, 3)
|
|
||||||
) t,
|
|
||||||
lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r,
|
lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r,
|
||||||
lateral flatten(input => t.OBJECTS_MODIFIED) w,
|
lateral flatten(input => t.OBJECTS_MODIFIED) w,
|
||||||
lateral flatten(input => w.value : "columns", outer => true) wcols,
|
lateral flatten(input => w.value : "columns", outer => true) wcols,
|
||||||
@ -780,12 +775,14 @@ WHERE table_schema='{schema_name}' AND {extra_clause}"""
|
|||||||
queries AS (
|
queries AS (
|
||||||
select qid.downstream_table_name, qid.query_id, query_history.query_text, query_history.start_time
|
select qid.downstream_table_name, qid.query_id, query_history.query_text, query_history.start_time
|
||||||
from query_ids qid
|
from query_ids qid
|
||||||
JOIN (
|
LEFT JOIN (
|
||||||
SELECT * FROM snowflake.account_usage.query_history
|
SELECT * FROM snowflake.account_usage.query_history
|
||||||
WHERE query_history.start_time >= to_timestamp_ltz({start_time_millis}, 3)
|
WHERE query_history.start_time >= to_timestamp_ltz({start_time_millis}, 3)
|
||||||
AND query_history.start_time < to_timestamp_ltz({end_time_millis}, 3)
|
AND query_history.start_time < to_timestamp_ltz({end_time_millis}, 3)
|
||||||
) query_history
|
) query_history
|
||||||
on qid.query_id = query_history.query_id
|
on qid.query_id = query_history.query_id
|
||||||
|
WHERE qid.query_id is not null
|
||||||
|
AND query_history.query_text is not null
|
||||||
)
|
)
|
||||||
SELECT
|
SELECT
|
||||||
h.downstream_table_name AS "DOWNSTREAM_TABLE_NAME",
|
h.downstream_table_name AS "DOWNSTREAM_TABLE_NAME",
|
||||||
@ -850,12 +847,7 @@ WHERE table_schema='{schema_name}' AND {extra_clause}"""
|
|||||||
t.query_start_time AS query_start_time,
|
t.query_start_time AS query_start_time,
|
||||||
t.query_id AS query_id
|
t.query_id AS query_id
|
||||||
FROM
|
FROM
|
||||||
(
|
snowflake.account_usage.access_history t,
|
||||||
SELECT * from snowflake.account_usage.access_history
|
|
||||||
WHERE
|
|
||||||
query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
|
|
||||||
AND query_start_time < to_timestamp_ltz({end_time_millis}, 3)
|
|
||||||
) t,
|
|
||||||
lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r,
|
lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r,
|
||||||
lateral flatten(input => t.OBJECTS_MODIFIED) w
|
lateral flatten(input => t.OBJECTS_MODIFIED) w
|
||||||
WHERE
|
WHERE
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user