From ab633d00476384dcb7f649bcab8ef9940df32bf6 Mon Sep 17 00:00:00 2001 From: Ravindra Lanka Date: Fri, 18 Feb 2022 15:52:13 -0800 Subject: [PATCH] fix(ingest): snowflake - handle external S3 bucket lineage for "External Tables". (#4192) --- .../datahub/ingestion/source/sql/snowflake.py | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py index c10e0c7f27..fddcb47685 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py @@ -1,7 +1,7 @@ import json import logging from collections import defaultdict -from typing import Any, Dict, Iterable, List, Optional, Tuple, Union +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union import pydantic @@ -185,7 +185,7 @@ class SnowflakeSource(SQLAlchemySource): def __init__(self, config, ctx): super().__init__(config, ctx, "snowflake") self._lineage_map: Optional[Dict[str, List[Tuple[str, str, str]]]] = None - self._external_lineage_map: Optional[Dict[str, List[str]]] = None + self._external_lineage_map: Optional[Dict[str, Set[str]]] = None @classmethod def create(cls, config_dict, ctx): @@ -408,6 +408,8 @@ WHERE url = self.config.get_sql_alchemy_url() logger.debug(f"sql_alchemy_url={url}") engine = create_engine(url, **self.config.options) + # Handles the case where a table is populated from an external location via copy. + # Eg: copy into category_english from 's3://acryl-snow-demo-olist/olist_raw_data/category_english'credentials=(aws_key_id='...' aws_secret_key='...') pattern='.*.csv'; query: str = """ WITH external_table_lineage_history AS ( SELECT @@ -432,16 +434,35 @@ WHERE end_time_millis=int(self.config.end_time.timestamp() * 1000), ) - self._external_lineage_map = {} + self._external_lineage_map = defaultdict(set) try: for db_row in engine.execute(query): # key is the down-stream table name key: str = db_row[1].lower().replace('"', "") - self._external_lineage_map[key] = json.loads(db_row[0]) - logger.debug(f"Lineage[{key}]:{self._external_lineage_map[key]}") + self._external_lineage_map[key] |= {*json.loads(db_row[0])} + logger.debug( + f"ExternalLineage[{key}]:{self._external_lineage_map[key]}" + ) except Exception as e: logger.warning( - f"Extracting lineage from Snowflake failed." + f"Populating table external lineage from Snowflake failed." + f"Please check your premissions. Continuing...\nError was {e}." + ) + # Handles the case for explicitly created external tables. + # NOTE: Snowflake does not log this information to the access_history table. + external_tables_query: str = "show external tables" + try: + for db_row in engine.execute(external_tables_query): + key = ( + f"{db_row.database_name}.{db_row.schema_name}.{db_row.name}".lower() + ) + self._external_lineage_map[key].add(db_row.location) + logger.debug( + f"ExternalLineage[{key}]:{self._external_lineage_map[key]}" + ) + except Exception as e: + logger.warning( + f"Populating external table lineage from Snowflake failed." f"Please check your premissions. Continuing...\nError was {e}." ) @@ -475,7 +496,7 @@ WITH table_lineage_history AS ( AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3)) SELECT upstream_table_name, downstream_table_name, upstream_table_columns, downstream_table_columns FROM table_lineage_history -WHERE upstream_table_domain = 'Table' and downstream_table_domain = 'Table' +WHERE upstream_table_domain in ('Table', 'External table') and downstream_table_domain = 'Table' QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_name ORDER BY query_start_time DESC) = 1 """.format( start_time_millis=int(self.config.start_time.timestamp() * 1000), end_time_millis=int(self.config.end_time.timestamp() * 1000), @@ -513,8 +534,8 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na assert self._lineage_map is not None assert self._external_lineage_map is not None dataset_name = dataset_key.name - lineage = self._lineage_map.get(f"{dataset_name}", []) - external_lineage = self._external_lineage_map.get(f"{dataset_name}", []) + lineage = self._lineage_map[dataset_name] + external_lineage = self._external_lineage_map[dataset_name] if not (lineage or external_lineage): logger.debug(f"No lineage found for {dataset_name}") return None