fix(ingest): snowflake - handle external S3 bucket lineage for "External Tables". (#4192)

This commit is contained in:
Ravindra Lanka 2022-02-18 15:52:13 -08:00 committed by GitHub
parent e09542d3e8
commit ab633d0047
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,7 +1,7 @@
import json import json
import logging import logging
from collections import defaultdict from collections import defaultdict
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
import pydantic import pydantic
@ -185,7 +185,7 @@ class SnowflakeSource(SQLAlchemySource):
def __init__(self, config, ctx): def __init__(self, config, ctx):
super().__init__(config, ctx, "snowflake") super().__init__(config, ctx, "snowflake")
self._lineage_map: Optional[Dict[str, List[Tuple[str, str, str]]]] = None self._lineage_map: Optional[Dict[str, List[Tuple[str, str, str]]]] = None
self._external_lineage_map: Optional[Dict[str, List[str]]] = None self._external_lineage_map: Optional[Dict[str, Set[str]]] = None
@classmethod @classmethod
def create(cls, config_dict, ctx): def create(cls, config_dict, ctx):
@ -408,6 +408,8 @@ WHERE
url = self.config.get_sql_alchemy_url() url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}") logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options) engine = create_engine(url, **self.config.options)
# Handles the case where a table is populated from an external location via copy.
# E.g.: copy into category_english from 's3://acryl-snow-demo-olist/olist_raw_data/category_english' credentials=(aws_key_id='...' aws_secret_key='...') pattern='.*.csv';
query: str = """ query: str = """
WITH external_table_lineage_history AS ( WITH external_table_lineage_history AS (
SELECT SELECT
@ -432,16 +434,35 @@ WHERE
end_time_millis=int(self.config.end_time.timestamp() * 1000), end_time_millis=int(self.config.end_time.timestamp() * 1000),
) )
self._external_lineage_map = {} self._external_lineage_map = defaultdict(set)
try: try:
for db_row in engine.execute(query): for db_row in engine.execute(query):
# key is the down-stream table name # key is the down-stream table name
key: str = db_row[1].lower().replace('"', "") key: str = db_row[1].lower().replace('"', "")
self._external_lineage_map[key] = json.loads(db_row[0]) self._external_lineage_map[key] |= {*json.loads(db_row[0])}
logger.debug(f"Lineage[{key}]:{self._external_lineage_map[key]}") logger.debug(
f"ExternalLineage[{key}]:{self._external_lineage_map[key]}"
)
except Exception as e: except Exception as e:
logger.warning( logger.warning(
f"Extracting lineage from Snowflake failed." f"Populating table external lineage from Snowflake failed."
f"Please check your premissions. Continuing...\nError was {e}."
)
# Handles the case for explicitly created external tables.
# NOTE: Snowflake does not log this information to the access_history table.
external_tables_query: str = "show external tables"
try:
for db_row in engine.execute(external_tables_query):
key = (
f"{db_row.database_name}.{db_row.schema_name}.{db_row.name}".lower()
)
self._external_lineage_map[key].add(db_row.location)
logger.debug(
f"ExternalLineage[{key}]:{self._external_lineage_map[key]}"
)
except Exception as e:
logger.warning(
f"Populating external table lineage from Snowflake failed."
f"Please check your premissions. Continuing...\nError was {e}." f"Please check your premissions. Continuing...\nError was {e}."
) )
@ -475,7 +496,7 @@ WITH table_lineage_history AS (
AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3)) AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3))
SELECT upstream_table_name, downstream_table_name, upstream_table_columns, downstream_table_columns SELECT upstream_table_name, downstream_table_name, upstream_table_columns, downstream_table_columns
FROM table_lineage_history FROM table_lineage_history
WHERE upstream_table_domain = 'Table' and downstream_table_domain = 'Table' WHERE upstream_table_domain in ('Table', 'External table') and downstream_table_domain = 'Table'
QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_name ORDER BY query_start_time DESC) = 1 """.format( QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_name ORDER BY query_start_time DESC) = 1 """.format(
start_time_millis=int(self.config.start_time.timestamp() * 1000), start_time_millis=int(self.config.start_time.timestamp() * 1000),
end_time_millis=int(self.config.end_time.timestamp() * 1000), end_time_millis=int(self.config.end_time.timestamp() * 1000),
@ -513,8 +534,8 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na
assert self._lineage_map is not None assert self._lineage_map is not None
assert self._external_lineage_map is not None assert self._external_lineage_map is not None
dataset_name = dataset_key.name dataset_name = dataset_key.name
lineage = self._lineage_map.get(f"{dataset_name}", []) lineage = self._lineage_map[dataset_name]
external_lineage = self._external_lineage_map.get(f"{dataset_name}", []) external_lineage = self._external_lineage_map[dataset_name]
if not (lineage or external_lineage): if not (lineage or external_lineage):
logger.debug(f"No lineage found for {dataset_name}") logger.debug(f"No lineage found for {dataset_name}")
return None return None