fix(ingest/snowflake): propagate table list from main to query extractor (#11222)

This commit is contained in:
Harshal Sheth 2024-08-23 16:42:32 -04:00 committed by GitHub
parent e0c13fda27
commit 1a09cb2c2a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 20 additions and 5 deletions

View File

@ -48,7 +48,7 @@ class ClassificationReportMixin:
class ClassificationSourceConfigMixin(ConfigModel):
classification: ClassificationConfig = Field(
default=ClassificationConfig(),
description="For details, refer [Classification](../../../../metadata-ingestion/docs/dev_guides/classification.md).",
description="For details, refer to [Classification](../../../../metadata-ingestion/docs/dev_guides/classification.md).",
)

View File

@ -353,7 +353,7 @@ class SnowflakeLineageExtractor(SnowflakeCommonMixin, Closeable):
def _process_external_lineage_result_row(
cls,
db_row: dict,
discovered_tables: Optional[List[str]],
discovered_tables: Optional[Collection[str]],
identifiers: SnowflakeIdentifierBuilder,
) -> Optional[KnownLineageMapping]:
# key is the down-stream table name

View File

@ -131,7 +131,7 @@ class SnowflakeQueriesExtractor(SnowflakeStructuredReportMixin):
self.report = SnowflakeQueriesExtractorReport()
self.filters = filters
self.identifiers = identifiers
self.discovered_tables = discovered_tables
self.discovered_tables = set(discovered_tables) if discovered_tables else None
self._structured_report = structured_report
@ -175,10 +175,24 @@ class SnowflakeQueriesExtractor(SnowflakeStructuredReportMixin):
return path
def is_temp_table(self, name: str) -> bool:
return any(
if any(
re.match(pattern, name, flags=re.IGNORECASE)
for pattern in self.config.temporary_tables_pattern
)
):
return True
# This is also a temp table if
# 1. this name would be allowed by the dataset patterns, and
# 2. we have a list of discovered tables, and
# 3. it's not in the discovered tables list
if (
self.filters.is_dataset_pattern_allowed(name, SnowflakeObjectDomain.TABLE)
and self.discovered_tables
and name not in self.discovered_tables
):
return True
return False
def is_allowed_table(self, name: str) -> bool:
if self.discovered_tables and name not in self.discovered_tables:

View File

@ -526,6 +526,7 @@ class SnowflakeV2Source(
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=schema_resolver,
discovered_tables=discovered_datasets,
)
# TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs