fix(ingest/snowflake): fix to not emit upstream external lineage for non-existent, non-allowed entities (#7778)

This commit is contained in:
Mayuri Nehate 2023-04-11 21:38:56 +05:30 committed by GitHub
parent 8123e36edd
commit cf365c32fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 47 additions and 35 deletions

View File

@ -13,7 +13,6 @@ from datahub.ingestion.source.aws.s3_util import make_s3_urn
from datahub.ingestion.source.snowflake.constants import (
LINEAGE_PERMISSION_ERROR,
SnowflakeEdition,
SnowflakeObjectDomain,
)
from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
@ -93,36 +92,34 @@ class SnowflakeLineageExtractor(
if self.connection is None:
return
self._populate_external_lineage_map()
self._populate_external_lineage_map(discovered_tables)
if self.config.include_view_lineage:
if len(discovered_views) > 0:
yield from self.get_view_upstream_workunits(discovered_views)
logger.info(
f"Upstream lineage detected for {self.report.num_views_with_upstreams} views.",
)
else:
logger.info("No views found. Skipping View Lineage Extraction.")
yield from self.get_table_upstream_workunits(discovered_tables)
if self._external_lineage_map: # Some external lineage is yet to be emitted
yield from self.get_table_external_upstream_workunits(discovered_tables)
def get_table_external_upstream_workunits(
self, discovered_tables: List[str]
) -> Iterable[MetadataWorkUnit]:
for (
dataset_name,
external_lineage,
) in self._external_lineage_map.items():
upstreams = self.get_external_upstreams(external_lineage)
if upstreams:
self.report.num_tables_with_external_upstreams_only += 1
yield self._create_upstream_lineage_workunit(dataset_name, upstreams)
logger.info(
f"Upstream lineage detected for {self.report.num_tables_with_upstreams} tables.",
f"Only upstream external lineage detected for {self.report.num_tables_with_external_upstreams_only} tables.",
)
yield from self.get_table_external_upstream_workunits()
def get_table_external_upstream_workunits(self) -> Iterable[MetadataWorkUnit]:
if self._external_lineage_map:
for (
dataset_name,
external_lineage,
) in self._external_lineage_map.items():
upstreams = self.get_external_upstreams(external_lineage)
if upstreams:
yield self._create_upstream_lineage_workunit(
dataset_name, upstreams
)
def get_table_upstream_workunits(
self, discovered_tables: List[str]
) -> Iterable[MetadataWorkUnit]:
@ -142,9 +139,12 @@ class SnowflakeLineageExtractor(
yield from self._build_upstream_lineage_workunits_from_query_result(
discovered_tables, results
)
logger.info(
f"Upstream lineage detected for {self.report.num_tables_with_upstreams} tables.",
)
def _build_upstream_lineage_workunits_from_query_result(
self, discovered_assets, results, report_upstream_for_view=False
self, discovered_assets, results, upstream_for_view=False
):
for db_row in results:
dataset_name = self.get_dataset_identifier_from_qualified_name(
@ -157,7 +157,7 @@ class SnowflakeLineageExtractor(
fine_upstreams,
) = self.get_upstreams_from_query_result_row(dataset_name, db_row)
if upstreams:
if report_upstream_for_view:
if upstream_for_view:
self.report.num_views_with_upstreams += 1
else:
self.report.num_tables_with_upstreams += 1
@ -179,7 +179,10 @@ class SnowflakeLineageExtractor(
return
yield from self._build_upstream_lineage_workunits_from_query_result(
discovered_views, results, report_upstream_for_view=True
discovered_views, results, upstream_for_view=True
)
logger.info(
f"Upstream lineage detected for {self.report.num_views_with_upstreams} views.",
)
def _create_upstream_lineage_workunit(
@ -240,7 +243,7 @@ class SnowflakeLineageExtractor(
return upstreams, fine_upstreams
def _populate_external_lineage_map(self) -> None:
def _populate_external_lineage_map(self, discovered_tables: List[str]) -> None:
with PerfTimer() as timer:
self.report.num_external_table_edges_scanned = 0
@ -249,9 +252,9 @@ class SnowflakeLineageExtractor(
"Snowflake Account is Standard Edition. External Lineage Feature via Access History is not supported."
) # See Edition Note above for why
else:
self._populate_external_lineage_from_access_history()
self._populate_external_lineage_from_access_history(discovered_tables)
self._populate_external_lineage_from_show_query()
self._populate_external_lineage_from_show_query(discovered_tables)
logger.info(
f"Found {self.report.num_external_table_edges_scanned} external lineage edges."
@ -263,7 +266,7 @@ class SnowflakeLineageExtractor(
# Handles the case for explicitly created external tables.
# NOTE: Snowflake does not log this information to the access_history table.
def _populate_external_lineage_from_show_query(self):
def _populate_external_lineage_from_show_query(self, discovered_tables):
external_tables_query: str = SnowflakeQuery.show_external_tables()
try:
for db_row in self.query(external_tables_query):
@ -271,9 +274,7 @@ class SnowflakeLineageExtractor(
db_row["name"], db_row["schema_name"], db_row["database_name"]
)
if not self._is_dataset_pattern_allowed(
key, SnowflakeObjectDomain.TABLE
):
if key not in discovered_tables:
continue
self._external_lineage_map[key].add(db_row["location"])
logger.debug(
@ -289,7 +290,9 @@ class SnowflakeLineageExtractor(
# Handles the case where a table is populated from an external location via copy.
# Eg: copy into category_english from 's3://acryl-snow-demo-olist/olist_raw_data/category_english'credentials=(aws_key_id='...' aws_secret_key='...') pattern='.*.csv';
def _populate_external_lineage_from_access_history(self):
def _populate_external_lineage_from_access_history(
self, discovered_tables: List[str]
) -> None:
query: str = SnowflakeQuery.external_table_lineage_history(
start_time_millis=int(self.config.start_time.timestamp() * 1000)
if not self.config.ignore_start_time_lineage
@ -299,7 +302,7 @@ class SnowflakeLineageExtractor(
try:
for db_row in self.query(query):
self._process_external_lineage_result_row(db_row)
self._process_external_lineage_result_row(db_row, discovered_tables)
except Exception as e:
if isinstance(e, SnowflakePermissionError):
error_msg = "Failed to get external lineage. Please grant imported privileges on SNOWFLAKE database. "
@ -311,12 +314,12 @@ class SnowflakeLineageExtractor(
f"Populating table external lineage from Snowflake failed due to error {e}.",
)
def _process_external_lineage_result_row(self, db_row):
def _process_external_lineage_result_row(self, db_row, discovered_tables):
# key is the down-stream table name
key: str = self.get_dataset_identifier_from_qualified_name(
db_row["DOWNSTREAM_TABLE_NAME"]
)
if not self._is_dataset_pattern_allowed(key, SnowflakeObjectDomain.TABLE):
if key not in discovered_tables:
return
if db_row["UPSTREAM_LOCATIONS"] is not None:

View File

@ -52,6 +52,7 @@ class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport, ProfilingSqlRepor
edition: Optional[SnowflakeEdition] = None
num_tables_with_external_upstreams_only: int = 0
num_tables_with_upstreams: int = 0
num_views_with_upstreams: int = 0

View File

@ -173,7 +173,7 @@ class SnowflakeUsageExtractor(
)
if dataset_identifier not in discovered_datasets:
logger.debug(
f"Skipping usage for table {dataset_identifier}, as table schema is not accessible"
f"Skipping usage for table {dataset_identifier}, as table schema is not accessible or not allowed by recipe."
)
continue

View File

@ -531,12 +531,20 @@ class SnowflakeV2Source(
for db in databases
for schema in db.schemas
for table_name in schema.tables
if self._is_dataset_pattern_allowed(
self.get_dataset_identifier(table_name, schema.name, db.name),
SnowflakeObjectDomain.TABLE,
)
]
discovered_views: List[str] = [
self.get_dataset_identifier(table_name, schema.name, db.name)
for db in databases
for schema in db.schemas
for table_name in schema.views
if self._is_dataset_pattern_allowed(
self.get_dataset_identifier(table_name, schema.name, db.name),
SnowflakeObjectDomain.VIEW,
)
]
if len(discovered_tables) == 0 and len(discovered_views) == 0: