feat(ingest): snowflake profile tables only if they have been updated in the last N days (#5132)

mayurinehate 2022-06-13 14:59:16 +05:30, committed by GitHub
parent 503208beaa
commit 7b143b06fc
6 changed files with 98 additions and 9 deletions


@@ -11,6 +11,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
 ### Deprecations
 ### Other notable Changes
+- #5132 The `snowflake` source now profiles a table only if it has been updated within the configured number of days (default: `1`). Tune `profiling.profile_if_updated_since_days` to match your profiling schedule, or set it to `None` to keep the older profile-everything behaviour.
 ## `v0.8.38`


@@ -84,6 +84,11 @@ class GEProfilingConfig(ConfigModel):
         description="A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.",
     )
+    profile_if_updated_since_days: Optional[pydantic.PositiveFloat] = Field(
+        default=1,
+        description="Profile a table only if it has been updated within this many days. `None` implies profiling all allowed tables. Only the Snowflake source supports this.",
+    )
+
     # The default of (5 * cpu_count) is adopted from the default max_workers
     # parameter of ThreadPoolExecutor. Given that profiling is often an I/O-bound
     # task, it may make sense to increase this default value in the future.
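
For orientation, here is a minimal sketch of the new knob as it surfaces through the pydantic config. The import path is an assumption based on the class name in this hunk, not something the diff states:

```python
# Sketch only: module path assumed, not taken from this commit.
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig

# Profile only tables updated in the last 7 days.
config = GEProfilingConfig.parse_obj({"profile_if_updated_since_days": 7})
assert config.profile_if_updated_since_days == 7

# None restores the pre-#5132 behaviour of profiling every allowed table.
config = GEProfilingConfig.parse_obj({"profile_if_updated_since_days": None})
assert config.profile_if_updated_since_days is None

# pydantic.PositiveFloat rejects zero and negative values at validation time.
try:
    GEProfilingConfig.parse_obj({"profile_if_updated_since_days": -1})
except Exception as exc:  # pydantic.ValidationError
    print(type(exc).__name__)  # ValidationError
```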


@@ -810,13 +810,19 @@ WHERE
         return None, None

     def is_dataset_eligible_for_profiling(
-        self, dataset_name: str, sql_config: SQLAlchemyConfig
+        self,
+        dataset_name: str,
+        sql_config: SQLAlchemyConfig,
+        inspector: Inspector,
+        profile_candidates: Optional[List[str]],
     ) -> bool:
         """
         Overrides the default profiling filter, which checks eligibility against the allow/deny patterns.
         This override additionally skips sharded tables that are not the latest shard.
         """
-        if not super().is_dataset_eligible_for_profiling(dataset_name, sql_config):
+        if not super().is_dataset_eligible_for_profiling(
+            dataset_name, sql_config, inspector, profile_candidates
+        ):
             return False
         (project_id, schema, table) = dataset_name.split(".")
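
The BigQuery override above illustrates the new contract: any subclass that adds its own eligibility checks must accept the inspector and candidate list and forward them to the base class. A hypothetical sketch of a custom source following the same pattern (the class name is illustrative; import paths are assumed):

```python
from typing import List, Optional

from sqlalchemy.engine.reflection import Inspector

# Paths assumed from this diff's sql_common.py; adjust for your tree.
from datahub.ingestion.source.sql.sql_common import SQLAlchemyConfig, SQLAlchemySource


class MyCustomSource(SQLAlchemySource):  # hypothetical subclass
    def is_dataset_eligible_for_profiling(
        self,
        dataset_name: str,
        sql_config: SQLAlchemyConfig,
        inspector: Inspector,
        profile_candidates: Optional[List[str]],
    ) -> bool:
        # Defer to the base class first: allow/deny patterns plus the
        # freshness-based candidate filter added by this commit.
        if not super().is_dataset_eligible_for_profiling(
            dataset_name, sql_config, inspector, profile_candidates
        ):
            return False
        # Source-specific checks (e.g. skipping stale shards) go here.
        return True
```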


@@ -1,6 +1,7 @@
 import json
 import logging
 from collections import defaultdict
+from datetime import datetime
 from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union

 import pydantic
@@ -177,8 +178,12 @@ class SnowflakeSource(SQLAlchemySource):
         else:
             self.report.report_dropped(db)

-    def get_identifier(self, *, schema: str, entity: str, **kwargs: Any) -> str:
-        regular = super().get_identifier(schema=schema, entity=entity, **kwargs)
+    def get_identifier(
+        self, *, schema: str, entity: str, inspector: Inspector, **kwargs: Any
+    ) -> str:
+        regular = super().get_identifier(
+            schema=schema, entity=entity, inspector=inspector, **kwargs
+        )
         return f"{self.current_database.lower()}.{regular}"

     def _populate_view_upstream_lineage(self, engine: sqlalchemy.engine.Engine) -> None:
@@ -748,6 +753,37 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na
             return False
         return True

+    def generate_profile_candidates(
+        self, inspector: Inspector, threshold_time: datetime
+    ) -> List[str]:
+        self.report.profile_if_updated_since = threshold_time
+        db_name = self.current_database
+        logger.debug(f"Generating profiling candidates for db {db_name}")
+        _profile_candidates = []
+        db_rows = inspector.engine.execute(
+            text(
+                """
+select table_catalog, table_schema, table_name
+from information_schema.tables
+where last_altered >= to_timestamp_ltz({timestamp}, 3) and table_type = 'BASE TABLE'
+""".format(
+                    timestamp=round(threshold_time.timestamp() * 1000)
+                )
+            )
+        )
+        for db_row in db_rows:
+            _profile_candidates.append(
+                self.get_identifier(
+                    schema=db_row.table_schema,
+                    entity=db_row.table_name,
+                    inspector=inspector,
+                ).lower()
+            )
+        self.report.profile_candidates[db_name] = _profile_candidates
+        return _profile_candidates
+
     # Stateful Ingestion specific overrides
     # NOTE: There is no special state associated with this source beyond what sql_common provides.
     def get_platform_instance_id(self) -> str:
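
The query above filters `information_schema.tables.last_altered` by converting the Python cutoff to epoch milliseconds and letting Snowflake's `to_timestamp_ltz(<int>, 3)` read the value at millisecond scale. A small sketch of that correspondence (the date is illustrative):

```python
import datetime

threshold_time = datetime.datetime(2022, 6, 6, tzinfo=datetime.timezone.utc)
timestamp_ms = round(threshold_time.timestamp() * 1000)
print(timestamp_ms)  # 1654473600000

# On the Snowflake side (scale 3 = milliseconds):
#   select to_timestamp_ltz(1654473600000, 3);
#   -> 2022-06-06 00:00:00.000 in the session's time zone
# so `last_altered >= to_timestamp_ltz({timestamp}, 3)` keeps exactly the
# tables altered on or after the cutoff.
```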


@@ -1185,6 +1185,15 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
         except NotImplementedError:
             description: Optional[str] = None
             properties: Dict[str, str] = {}
+        except ProgrammingError as pe:
+            # Snowflake needs schema names quoted when fetching table comments.
+            logger.debug(
+                f"Encountered ProgrammingError. Retrying with quoted schema name for schema {schema} and view {view}",
+                pe,
+            )
+            description = None
+            properties = {}
+            view_info: dict = inspector.get_table_comment(view, f'"{schema}"')  # type: ignore
         else:
             description = view_info["text"]
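
The retry exists because Snowflake folds unquoted identifiers to upper case, so a mixed-case schema resolves only when double-quoted. A brief illustration (the schema name is hypothetical):

```python
schema = "MySchema"

# Unquoted, Snowflake looks up MYSCHEMA, which fails for a schema created
# as "MySchema" and surfaces as a ProgrammingError:
#   inspector.get_table_comment(view, schema)
# Quoted, the identifier is matched case-sensitively and succeeds:
#   inspector.get_table_comment(view, f'"{schema}"')
print(f'"{schema}"')  # "MySchema"
```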
@@ -1308,13 +1317,27 @@
     ) -> Optional[bool]:
         return None

+    # Override if needed
+    def generate_profile_candidates(
+        self, inspector: Inspector, threshold_time: datetime.datetime
+    ) -> Optional[List[str]]:
+        raise NotImplementedError()
+
     # Override if you want to do additional checks
     def is_dataset_eligible_for_profiling(
-        self, dataset_name: str, sql_config: SQLAlchemyConfig
+        self,
+        dataset_name: str,
+        sql_config: SQLAlchemyConfig,
+        inspector: Inspector,
+        profile_candidates: Optional[List[str]],
     ) -> bool:
-        return sql_config.table_pattern.allowed(
-            dataset_name
-        ) and sql_config.profile_pattern.allowed(dataset_name)
+        return (
+            sql_config.table_pattern.allowed(dataset_name)
+            and sql_config.profile_pattern.allowed(dataset_name)
+        ) and (
+            profile_candidates is None
+            or (profile_candidates is not None and dataset_name in profile_candidates)
+        )

     def loop_profiler_requests(
         self,
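
Spelled out, the new predicate keeps the old allow/deny behaviour and adds one clause: if a candidate list was computed, the dataset must appear in it; if no list is available (`None`), nothing extra is filtered. A toy restatement with the pattern checks collapsed into a single flag (the redundant `is not None` guard drops out):

```python
from typing import List, Optional


def eligible(
    allowed: bool, dataset_name: str, profile_candidates: Optional[List[str]]
) -> bool:
    # Mirrors the logic above with both pattern checks collapsed into `allowed`.
    return allowed and (
        profile_candidates is None or dataset_name in profile_candidates
    )


print(eligible(True, "db.s.t", None))        # True: no candidate list, old behaviour
print(eligible(True, "db.s.t", ["db.s.t"]))  # True: recently updated
print(eligible(True, "db.s.t", []))          # False: list computed, table not in it
```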
@@ -1325,6 +1348,19 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
         from datahub.ingestion.source.ge_data_profiler import GEProfilerRequest

         tables_seen: Set[str] = set()
+        profile_candidates = None  # Default value if profile candidates not available.
+        if sql_config.profiling.profile_if_updated_since_days is not None:
+            try:
+                threshold_time: datetime.datetime = datetime.datetime.now(
+                    datetime.timezone.utc
+                ) - datetime.timedelta(
+                    sql_config.profiling.profile_if_updated_since_days  # type: ignore
+                )
+                profile_candidates = self.generate_profile_candidates(
+                    inspector, threshold_time
+                )
+            except NotImplementedError:
+                logger.debug("Source does not support generating profile candidates.")

         for table in inspector.get_table_names(schema):
             schema, table = self.standardize_schema_table_names(
@@ -1333,7 +1369,9 @@
             dataset_name = self.get_identifier(
                 schema=schema, entity=table, inspector=inspector
             )
-            if not self.is_dataset_eligible_for_profiling(dataset_name, sql_config):
+            if not self.is_dataset_eligible_for_profiling(
+                dataset_name, sql_config, inspector, profile_candidates
+            ):
                 if self.config.profiling.report_dropped_profiles:
                     self.report.report_dropped(f"profile of {dataset_name}")
                 continue
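
One detail in the cutoff computation above: `datetime.timedelta` takes days as its first positional argument, which is why a `PositiveFloat` config value (for example `0.5` for twelve hours) works directly. A quick check:

```python
import datetime

profile_if_updated_since_days = 1.5  # as it would come from the profiling config

threshold_time = datetime.datetime.now(
    datetime.timezone.utc
) - datetime.timedelta(profile_if_updated_since_days)

# timedelta(1.5) is one and a half days, so tables untouched for longer
# than that fall outside the candidate list.
print(datetime.timedelta(profile_if_updated_since_days))  # 1 day, 12:00:00
```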


@@ -36,3 +36,6 @@ class SnowflakeReport(BaseSnowflakeReport, SQLSourceReport):
     role: str = ""
     check_role_grants: Optional[bool] = None
     role_grants: List[str] = field(default_factory=list)
+
+    profile_if_updated_since: Optional[datetime] = None
+    profile_candidates: Dict[str, List[str]] = field(default_factory=dict)
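
For illustration, a sketch of what these report fields carry after a run. The import path and values are assumptions, not part of this diff:

```python
import datetime

# Path assumed from this diff's report file; adjust for your tree.
from datahub.ingestion.source_report.sql.snowflake import SnowflakeReport

report = SnowflakeReport()
report.profile_if_updated_since = datetime.datetime.now(datetime.timezone.utc)
report.profile_candidates["my_db"] = ["my_db.public.orders", "my_db.public.users"]
print(report.profile_candidates)  # one candidate list per database
```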