diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index 1ad8512b9e..e47d829651 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -792,7 +792,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
         )

         view = cast(BigqueryView, table)
-        view_definition_string = view.ddl
+        view_definition_string = view.view_definition
         view_properties_aspect = ViewProperties(
             materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
         )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
index 683d369973..152394b4eb 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
@@ -6,13 +6,13 @@ from typing import Counter, Dict, List, Optional

 import pydantic

-from datahub.ingestion.source.sql.sql_common import SQLSourceReport
+from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
 from datahub.utilities.lossy_collections import LossyDict, LossyList
 from datahub.utilities.stats_collections import TopKDict


 @dataclass
-class BigQueryV2Report(SQLSourceReport):
+class BigQueryV2Report(ProfilingSqlReport):
     num_total_lineage_entries: TopKDict[str, int] = field(default_factory=TopKDict)
     num_skipped_lineage_entries_missing_data: TopKDict[str, int] = field(
         default_factory=TopKDict
@@ -55,7 +55,6 @@ class BigQueryV2Report(SQLSourceReport):
     upstream_lineage: LossyDict = field(default_factory=LossyDict)
     partition_info: Dict[str, str] = field(default_factory=TopKDict)
     profile_table_selection_criteria: Dict[str, str] = field(default_factory=TopKDict)
-    num_tables_not_eligible_profiling: Dict[str, int] = field(default_factory=TopKDict)
     selected_profile_tables: Dict[str, List[str]] = field(default_factory=TopKDict)
     invalid_partition_ids: Dict[str, str] = field(default_factory=TopKDict)
     allow_pattern: Optional[str] = None
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
index fa475f66d0..3302c873bd 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
@@ -8,49 +8,33 @@ from google.cloud import bigquery
 from google.cloud.bigquery.table import RowIterator, TableListItem, TimePartitioning

 from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
+from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView

 logger: logging.Logger = logging.getLogger(__name__)


 @dataclass(frozen=True, eq=True)
-class BigqueryColumn:
-    name: str
-    ordinal_position: int
+class BigqueryColumn(BaseColumn):
     field_path: str
-    is_nullable: bool
     is_partition_column: bool
-    data_type: str
-    comment: str


 @dataclass
-class BigqueryTable:
-    name: str
-    created: datetime
-    last_altered: Optional[datetime]
-    size_in_bytes: Optional[int]
-    rows_count: Optional[int]
-    expires: Optional[datetime]
-    clustering_fields: Optional[List[str]]
-    labels: Optional[Dict[str, str]]
-    num_partitions: Optional[int]
-    max_partition_id: Optional[str]
-    max_shard_id: Optional[str]
-    active_billable_bytes: Optional[int]
-    long_term_billable_bytes: Optional[int]
-    comment: str
-    ddl: str
-    time_partitioning: TimePartitioning
+class BigqueryTable(BaseTable):
+    expires: Optional[datetime] = None
+    clustering_fields: Optional[List[str]] = None
+    labels: Optional[Dict[str, str]] = None
+    num_partitions: Optional[int] = None
+    max_partition_id: Optional[str] = None
+    max_shard_id: Optional[str] = None
+    active_billable_bytes: Optional[int] = None
+    long_term_billable_bytes: Optional[int] = None
+    time_partitioning: Optional[TimePartitioning] = None
     columns: List[BigqueryColumn] = field(default_factory=list)


 @dataclass
-class BigqueryView:
-    name: str
-    created: datetime
-    last_altered: datetime
-    comment: str
-    ddl: str
+class BigqueryView(BaseView):
     columns: List[BigqueryColumn] = field(default_factory=list)


@@ -350,7 +334,7 @@ class BigQueryDataDictionary:
                     table.last_altered / 1000, tz=timezone.utc
                 )
                 if "last_altered" in table
-                else None,
+                else table.created,
                 size_in_bytes=table.get("bytes"),
                 rows_count=table.get("row_count"),
                 comment=table.comment,
@@ -404,9 +388,11 @@ class BigQueryDataDictionary:
             BigqueryView(
                 name=table.table_name,
                 created=table.created,
-                last_altered=table.last_altered if "last_altered" in table else None,
+                last_altered=table.last_altered
+                if "last_altered" in table
+                else table.created,
                 comment=table.comment,
-                ddl=table.view_definition,
+                view_definition=table.view_definition,
             )
             for table in cur
         ]
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
index a4dd52e8d8..faf02649d5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
@@ -468,15 +468,15 @@ timestamp < "{end_time}"
         self, project: str, dataset: str, view: BigqueryView
     ) -> List[BigqueryTableIdentifier]:
         parsed_tables = set()
-        if view.ddl:
+        if view.view_definition:
             try:
                 parser = BigQuerySQLParser(
-                    view.ddl, self.config.sql_parser_use_external_process
+                    view.view_definition, self.config.sql_parser_use_external_process
                 )
                 tables = parser.get_tables()
             except Exception as ex:
                 logger.debug(
-                    f"View {view.name} definination sql parsing failed on query: {view.ddl}. Edge from physical table to view won't be added. The error was {ex}."
+                    f"View {view.name} definition SQL parsing failed on query: {view.view_definition}. Edge from physical table to view won't be added. The error was {ex}."
                 )
                 return []
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
index d584575349..a83787beb8 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
@@ -4,8 +4,6 @@ import logging
 from typing import Dict, Iterable, List, Optional, Tuple, cast

 from dateutil.relativedelta import relativedelta
-from sqlalchemy import create_engine, inspect
-from sqlalchemy.engine.reflection import Inspector

 from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
 from datahub.emitter.mcp_builder import wrap_aspect_as_workunit
@@ -17,12 +15,11 @@ from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
     BigqueryColumn,
     BigqueryTable,
 )
-from datahub.ingestion.source.ge_data_profiler import (
-    DatahubGEProfiler,
-    GEProfilerRequest,
+from datahub.ingestion.source.ge_data_profiler import GEProfilerRequest
+from datahub.ingestion.source.sql.sql_generic_profiler import (
+    GenericProfiler,
+    TableProfilerRequest,
 )
-from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile
-from datahub.metadata.schema_classes import DatasetProfileClass

 logger = logging.getLogger(__name__)

@@ -33,11 +30,14 @@ class BigqueryProfilerRequest(GEProfilerRequest):
     profile_table_level_only: bool = False


-class BigqueryProfiler:
+class BigqueryProfiler(GenericProfiler):
+    config: BigQueryV2Config
+    report: BigQueryV2Report
+
     def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report) -> None:
+        super().__init__(config, report, "bigquery")
         self.config = config
         self.report = report
-        self.platform = "bigquery"

     @staticmethod
     def get_partition_range_from_partition_id(
@@ -184,9 +184,9 @@ WHERE

             if len(profile_requests) == 0:
                 continue
-
+            table_profile_requests = cast(List[TableProfilerRequest], profile_requests)
             for request, profile in self.generate_profiles(
-                profile_requests,
+                table_profile_requests,
                 self.config.profiling.max_workers,
                 platform=self.platform,
                 profiler_args=self.get_profile_args(),
@@ -231,8 +231,11 @@ WHERE
             dataset_name, table.last_altered, table.size_in_bytes, table.rows_count
         ):
             profile_table_level_only = True
-            self.report.num_tables_not_eligible_profiling[dataset] = (
-                self.report.num_tables_not_eligible_profiling.get(dataset, 0) + 1
+            self.report.num_tables_not_eligible_profiling[f"{project}.{dataset}"] = (
+                self.report.num_tables_not_eligible_profiling.get(
+                    f"{project}.{dataset}", 0
+                )
+                + 1
             )

         if not table.columns:
@@ -276,107 +279,3 @@ WHERE
             profile_table_level_only=profile_table_level_only,
         )
         return profile_request
-
-    def is_dataset_eligible_for_profiling(
-        self,
-        dataset_name: str,
-        last_altered: Optional[datetime.datetime],
-        size_in_bytes: Optional[int],
-        rows_count: Optional[int],
-    ) -> bool:
-        threshold_time: Optional[datetime.datetime] = None
-        if self.config.profiling.profile_if_updated_since_days is not None:
-            threshold_time = datetime.datetime.now(
-                datetime.timezone.utc
-            ) - datetime.timedelta(self.config.profiling.profile_if_updated_since_days)
-
-        return (
-            (
-                self.config.table_pattern.allowed(dataset_name)
-                and self.config.profile_pattern.allowed(dataset_name)
-            )
-            and (
-                (threshold_time is None)
-                or (last_altered is not None and last_altered >= threshold_time)
-            )
-            and (
-                self.config.profiling.profile_table_size_limit is None
-                or (
-                    size_in_bytes is not None
-                    and size_in_bytes / (2**30)
-                    <= self.config.profiling.profile_table_size_limit
-                )  # Note: Profiling is not allowed is size_in_bytes is not available
-            )
-            and (
-                self.config.profiling.profile_table_row_limit is None
-                or (
-                    rows_count is not None
-                    and rows_count <= self.config.profiling.profile_table_row_limit
-                )  # Note: Profiling is not allowed is rows_count is not available
-            )
-        )
-
-    def get_inspectors(self) -> Iterable[Inspector]:
-        # This method can be overridden in the case that you want to dynamically
-        # run on multiple databases.
-        url = self.config.get_sql_alchemy_url()
-        logger.debug(f"sql_alchemy_url={url}")
-        engine = create_engine(url, **self.config.options)
-        with engine.connect() as conn:
-            inspector = inspect(conn)
-            yield inspector
-
-    def get_profiler_instance(self) -> "DatahubGEProfiler":
-        logger.debug("Getting profiler instance from bigquery")
-        url = self.config.get_sql_alchemy_url()
-
-        logger.debug(f"sql_alchemy_url={url}")
-
-        engine = create_engine(url, **self.config.options)
-        with engine.connect() as conn:
-            inspector = inspect(conn)
-
-        return DatahubGEProfiler(
-            conn=inspector.bind,
-            report=self.report,
-            config=self.config.profiling,
-            platform=self.platform,
-        )
-
-    def get_profile_args(self) -> Dict:
-        """Passed down to GE profiler"""
-        return {}
-
-    def generate_profiles(
-        self,
-        requests: List[BigqueryProfilerRequest],
-        max_workers: int,
-        platform: Optional[str] = None,
-        profiler_args: Optional[Dict] = None,
-    ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
-
-        ge_profile_requests: List[GEProfilerRequest] = [
-            cast(GEProfilerRequest, request)
-            for request in requests
-            if not request.profile_table_level_only
-        ]
-        table_level_profile_requests: List[BigqueryProfilerRequest] = [
-            request for request in requests if request.profile_table_level_only
-        ]
-        for request in table_level_profile_requests:
-            profile = DatasetProfile(
-                timestampMillis=int(datetime.datetime.now().timestamp() * 1000),
-                columnCount=len(request.table.columns),
-                rowCount=request.table.rows_count,
-                sizeInBytes=request.table.size_in_bytes,
-            )
-            yield (request, profile)
-
-        if not ge_profile_requests:
-            return
-
-        ge_profiler = self.get_profiler_instance()
-        yield from ge_profiler.generate_profiles(
-            ge_profile_requests, max_workers, platform, profiler_args
-        )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
index 1d48fdcb53..609dda0a73 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
@@ -102,12 +102,12 @@ class GEProfilingConfig(ConfigModel):
     )

     profile_table_size_limit: Optional[int] = Field(
-        default=1,
+        default=5,
         description="Profile tables only if their size is less then specified GBs. If set to `null`, no limit on the size of tables to profile. Supported only in `snowflake` and `BigQuery`",
     )

     profile_table_row_limit: Optional[int] = Field(
-        default=50000,
+        default=5000000,
         description="Profile tables only if their row count is less then specified count. If set to `null`, no limit on the row count of tables to profile. Supported only in `snowflake` and `BigQuery`",
     )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py
index 2cc2c91001..207c2d3dec 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py
@@ -1,7 +1,6 @@
 import dataclasses
-import datetime
 import logging
-from typing import Callable, Dict, Iterable, List, Optional, Tuple, cast
+from typing import Callable, Iterable, List, Optional, cast

 from snowflake.sqlalchemy import snowdialect
 from sqlalchemy import create_engine, inspect
@@ -21,9 +20,14 @@ from datahub.ingestion.source.snowflake.snowflake_schema import (
     SnowflakeDatabase,
     SnowflakeTable,
 )
-from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeCommonMixin
-from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile
-from datahub.metadata.schema_classes import DatasetProfileClass
+from datahub.ingestion.source.snowflake.snowflake_utils import (
+    SnowflakeCommonMixin,
+    SnowflakeCommonProtocol,
+)
+from datahub.ingestion.source.sql.sql_generic_profiler import (
+    GenericProfiler,
+    TableProfilerRequest,
+)

 snowdialect.ischema_names["GEOGRAPHY"] = sqltypes.NullType

@@ -36,14 +40,17 @@ class SnowflakeProfilerRequest(GEProfilerRequest):
     profile_table_level_only: bool = False


-class SnowflakeProfiler(SnowflakeCommonMixin):
+class SnowflakeProfiler(SnowflakeCommonMixin, GenericProfiler, SnowflakeCommonProtocol):
+    config: SnowflakeV2Config
+    report: SnowflakeV2Report
+
     def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None:
+        super().__init__(config, report, self.platform)
         self.config = config
         self.report = report
         self.logger = logger

     def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit]:
-
         # Extra default SQLAlchemy option for better connection pooling and threading.
         # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
         if self.config.profiling.enabled:
@@ -74,10 +81,13 @@ class SnowflakeProfiler(SnowflakeCommonMixin):

             if len(profile_requests) == 0:
                 continue
+
+            table_profile_requests = cast(List[TableProfilerRequest], profile_requests)
+
             for request, profile in self.generate_profiles(
-                db.name,
-                profile_requests,
+                table_profile_requests,
                 self.config.profiling.max_workers,
+                db.name,
                 platform=self.platform,
                 profiler_args=self.get_profile_args(),
             ):
@@ -139,47 +149,11 @@ class SnowflakeProfiler(SnowflakeCommonMixin):
         )
         return profile_request

-    def is_dataset_eligible_for_profiling(
-        self,
-        dataset_name: str,
-        last_altered: datetime.datetime,
-        size_in_bytes: int,
-        rows_count: Optional[int],
-    ) -> bool:
-        threshold_time: Optional[datetime.datetime] = None
-        if self.config.profiling.profile_if_updated_since_days is not None:
-            threshold_time = datetime.datetime.now(
-                datetime.timezone.utc
-            ) - datetime.timedelta(self.config.profiling.profile_if_updated_since_days)
+    def get_profiler_instance(
+        self, db_name: Optional[str] = None
+    ) -> "DatahubGEProfiler":
+        assert db_name

-        return (
-            (
-                self.config.table_pattern.allowed(dataset_name)
-                and self.config.profile_pattern.allowed(dataset_name)
-            )
-            and (threshold_time is None or last_altered >= threshold_time)
-            and (
-                self.config.profiling.profile_table_size_limit is None
-                or (
-                    size_in_bytes is not None
-                    and size_in_bytes / (2**30)
-                    <= self.config.profiling.profile_table_size_limit
-                )
-                # Note: Profiling is not allowed is size_in_bytes is not available
-                # and self.config.profiling.profile_table_size_limit is set
-            )
-            and (
-                self.config.profiling.profile_table_row_limit is None
-                or (
-                    rows_count is not None
-                    and rows_count <= self.config.profiling.profile_table_row_limit
-                )
-                # Note: Profiling is not allowed is rows_count is not available
-                # and self.config.profiling.profile_table_row_limit is set
-            )
-        )
-
-    def get_profiler_instance(self, db_name: str) -> "DatahubGEProfiler":
         url = self.config.get_sql_alchemy_url(
             database=db_name,
             username=self.config.username,
@@ -204,10 +178,6 @@ class SnowflakeProfiler(SnowflakeCommonMixin):
             platform=self.platform,
         )

-    def get_profile_args(self) -> Dict:
-        """Passed down to GE profiler"""
-        return {}
-
     def callable_for_db_connection(self, db_name: str) -> Callable:
         def get_db_connection():
             conn = self.config.get_connection()
@@ -215,38 +185,3 @@ class SnowflakeProfiler(SnowflakeCommonMixin):
             return conn

         return get_db_connection
-
-    def generate_profiles(
-        self,
-        db_name: str,
-        requests: List[SnowflakeProfilerRequest],
-        max_workers: int,
-        platform: Optional[str] = None,
-        profiler_args: Optional[Dict] = None,
-    ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
-
-        ge_profile_requests: List[GEProfilerRequest] = [
-            cast(GEProfilerRequest, request)
-            for request in requests
-            if not request.profile_table_level_only
-        ]
-        table_level_profile_requests: List[SnowflakeProfilerRequest] = [
-            request for request in requests if request.profile_table_level_only
-        ]
-        for request in table_level_profile_requests:
-            profile = DatasetProfile(
-                timestampMillis=round(datetime.datetime.now().timestamp() * 1000),
-                columnCount=len(request.table.columns),
-                rowCount=request.table.rows_count,
-                sizeInBytes=request.table.size_in_bytes,
-            )
-            yield (request, profile)
-
-        if len(ge_profile_requests) == 0:
-            return
-
-        # Otherwise, if column level profiling is enabled, use GE profiler.
-        ge_profiler = self.get_profiler_instance(db_name)
-        yield from ge_profiler.generate_profiles(
-            ge_profile_requests, max_workers, platform, profiler_args
-        )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py
index 1b4d2e4a4f..e70c48771c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py
@@ -1,8 +1,9 @@
+from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
 from datahub.ingestion.source_report.sql.snowflake import SnowflakeReport
 from datahub.ingestion.source_report.usage.snowflake_usage import SnowflakeUsageReport


-class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport):
+class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport, ProfilingSqlReport):
     schemas_scanned: int = 0
     databases_scanned: int = 0
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py
index ab5c0799bf..4a6f7760aa 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py
@@ -8,6 +8,7 @@ from snowflake.connector import SnowflakeConnection

 from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
 from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeQueryMixin
+from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView

 logger: logging.Logger = logging.getLogger(__name__)

@@ -28,13 +29,8 @@ class SnowflakeFK:
     referred_column_names: List[str]


-@dataclass
-class SnowflakeColumn:
-    name: str
-    ordinal_position: int
-    is_nullable: bool
-    data_type: str
-    comment: Optional[str]
+@dataclass(frozen=True, eq=True)
+class SnowflakeColumn(BaseColumn):
     character_maximum_length: Optional[int]
     numeric_precision: Optional[int]
     numeric_scale: Optional[int]
@@ -60,14 +56,8 @@ class SnowflakeColumn:


 @dataclass
-class SnowflakeTable:
-    name: str
-    created: datetime
-    last_altered: datetime
-    size_in_bytes: int
-    rows_count: int
-    comment: Optional[str]
-    clustering_key: str
+class SnowflakeTable(BaseTable):
+    clustering_key: Optional[str] = None
     pk: Optional[SnowflakePK] = None
     columns: List[SnowflakeColumn] = field(default_factory=list)
     foreign_keys: List[SnowflakeFK] = field(default_factory=list)
@@ -75,12 +65,7 @@


 @dataclass
-class SnowflakeView:
-    name: str
-    created: datetime
-    comment: Optional[str]
-    view_definition: str
-    last_altered: Optional[datetime] = None
+class SnowflakeView(BaseView):
     columns: List[SnowflakeColumn] = field(default_factory=list)
@@ -225,6 +210,7 @@ class SnowflakeDataDictionary(SnowflakeQueryMixin):
                     # last_altered=table["last_altered"],
                     comment=table["comment"],
                     view_definition=table["text"],
+                    last_altered=table["created_on"],
                 )
             )
         return views
@@ -245,6 +231,7 @@ class SnowflakeDataDictionary(SnowflakeQueryMixin):
                     # last_altered=table["last_altered"],
                     comment=table["comment"],
                     view_definition=table["text"],
+                    last_altered=table["created_on"],
                 )
             )
         return views
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py
index c3b9be555a..6ea5c442dc 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py
@@ -31,23 +31,12 @@ SNOWFLAKE_DEFAULT_CLOUD = SnowflakeCloudProvider.AWS

 # Required only for mypy, since we are using mixin classes, and not inheritance.
 # Reference - https://mypy.readthedocs.io/en/latest/more_types.html#mixin-classes
 class SnowflakeLoggingProtocol(Protocol):
-    @property
-    def logger(self) -> logging.Logger:
-        ...
+    logger: logging.Logger


-class SnowflakeCommonProtocol(Protocol):
-    @property
-    def logger(self) -> logging.Logger:
-        ...
-
-    @property
-    def config(self) -> SnowflakeV2Config:
-        ...
-
-    @property
-    def report(self) -> SnowflakeV2Report:
-        ...
+class SnowflakeCommonProtocol(SnowflakeLoggingProtocol, Protocol):
+    config: SnowflakeV2Config
+    report: SnowflakeV2Report

     def get_dataset_identifier(
         self, table_name: str, schema_name: str, db_name: str
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic.py
index 01fd5d1ebb..a06589b19a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic.py
@@ -1,3 +1,7 @@
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Generic, List, Optional, TypeVar
+
 from pydantic.fields import Field

 from datahub.ingestion.api.common import PipelineContext
@@ -12,6 +16,42 @@ from datahub.ingestion.api.decorators import (
 )
 from datahub.ingestion.source.sql.sql_common import SQLAlchemyConfig, SQLAlchemySource


+@dataclass(frozen=True, eq=True)
+class BaseColumn:
+    name: str
+    ordinal_position: int
+    is_nullable: bool
+    data_type: str
+    comment: Optional[str]
+
+
+SqlTableColumn = TypeVar("SqlTableColumn", bound="BaseColumn")
+
+
+@dataclass
+class BaseTable(Generic[SqlTableColumn]):
+    name: str
+    comment: Optional[str]
+    created: datetime
+    last_altered: datetime
+    size_in_bytes: int
+    rows_count: int
+    columns: List[SqlTableColumn] = field(default_factory=list)
+    ddl: Optional[str] = None
+
+
+@dataclass
+class BaseView(Generic[SqlTableColumn]):
+    name: str
+    comment: Optional[str]
+    created: datetime
+    last_altered: datetime
+    view_definition: str
+    size_in_bytes: Optional[int] = None
+    rows_count: Optional[int] = None
+    columns: List[SqlTableColumn] = field(default_factory=list)
+
+
 class SQLAlchemyGenericConfig(SQLAlchemyConfig):
     platform: str = Field(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
new file mode 100644
index 0000000000..13d6febf9d
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
@@ -0,0 +1,165 @@
+import logging
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta, timezone
+from typing import Dict, Iterable, List, Optional, Tuple, Union, cast
+
+from sqlalchemy import create_engine, inspect
+from sqlalchemy.engine.reflection import Inspector
+
+from datahub.ingestion.source.ge_data_profiler import (
+    DatahubGEProfiler,
+    GEProfilerRequest,
+)
+from datahub.ingestion.source.sql.sql_common import SQLAlchemyConfig, SQLSourceReport
+from datahub.ingestion.source.sql.sql_generic import BaseTable, BaseView
+from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile
+from datahub.metadata.schema_classes import DatasetProfileClass
+from datahub.utilities.stats_collections import TopKDict
+
+
+@dataclass
+class DetailedProfilerReportMixin:
+    profiling_skipped_not_updated: TopKDict[str, int] = field(default_factory=TopKDict)
+    profiling_skipped_size_limit: TopKDict[str, int] = field(default_factory=TopKDict)
+
+    profiling_skipped_row_limit: TopKDict[str, int] = field(default_factory=TopKDict)
+    num_tables_not_eligible_profiling: Dict[str, int] = field(default_factory=TopKDict)
+
+
+class ProfilingSqlReport(DetailedProfilerReportMixin, SQLSourceReport):
+    pass
+
+
+@dataclass
+class TableProfilerRequest(GEProfilerRequest):
+    table: Union[BaseTable, BaseView]
+    profile_table_level_only: bool = False
+
+
+logger = logging.getLogger(__name__)
+
+
+class GenericProfiler:
+    def __init__(
+        self, config: SQLAlchemyConfig, report: ProfilingSqlReport, platform: str
+    ) -> None:
+        self.config = config
+        self.report = report
+        self.platform = platform
+
+    def generate_profiles(
+        self,
+        requests: List[TableProfilerRequest],
+        max_workers: int,
+        db_name: Optional[str] = None,
+        platform: Optional[str] = None,
+        profiler_args: Optional[Dict] = None,
+    ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
+
+        ge_profile_requests: List[GEProfilerRequest] = [
+            cast(GEProfilerRequest, request)
+            for request in requests
+            if not request.profile_table_level_only
+        ]
+        table_level_profile_requests: List[TableProfilerRequest] = [
+            request for request in requests if request.profile_table_level_only
+        ]
+        for request in table_level_profile_requests:
+            profile = DatasetProfile(
+                timestampMillis=int(datetime.now().timestamp() * 1000),
+                columnCount=len(request.table.columns),
+                rowCount=request.table.rows_count,
+                sizeInBytes=request.table.size_in_bytes,
+            )
+            yield (request, profile)
+
+        if not ge_profile_requests:
+            return
+
+        # Otherwise, if column level profiling is enabled, use GE profiler.
+        ge_profiler = self.get_profiler_instance(db_name)
+        yield from ge_profiler.generate_profiles(
+            ge_profile_requests, max_workers, platform, profiler_args
+        )
+
+    def get_inspectors(self) -> Iterable[Inspector]:
+        # This method can be overridden in the case that you want to dynamically
+        # run on multiple databases.
+
+        url = self.config.get_sql_alchemy_url()
+        logger.debug(f"sql_alchemy_url={url}")
+        engine = create_engine(url, **self.config.options)
+        with engine.connect() as conn:
+            inspector = inspect(conn)
+            yield inspector
+
+    def get_profiler_instance(
+        self, db_name: Optional[str] = None
+    ) -> "DatahubGEProfiler":
+        logger.debug(f"Getting profiler instance from {self.platform}")
+        url = self.config.get_sql_alchemy_url()
+
+        logger.debug(f"sql_alchemy_url={url}")
+
+        engine = create_engine(url, **self.config.options)
+        with engine.connect() as conn:
+            inspector = inspect(conn)
+
+        return DatahubGEProfiler(
+            conn=inspector.bind,
+            report=self.report,
+            config=self.config.profiling,
+            platform=self.platform,
+        )
+
+    def is_dataset_eligible_for_profiling(
+        self,
+        dataset_name: str,
+        last_altered: Optional[datetime],
+        size_in_bytes: Optional[int],
+        rows_count: Optional[int],
+    ) -> bool:
+        threshold_time: Optional[datetime] = None
+        if self.config.profiling.profile_if_updated_since_days is not None:
+            threshold_time = datetime.now(timezone.utc) - timedelta(
+                self.config.profiling.profile_if_updated_since_days
+            )
+
+        if not self.config.table_pattern.allowed(
+            dataset_name
+        ) or not self.config.profile_pattern.allowed(dataset_name):
+            return False
+
+        schema_name = dataset_name.rsplit(".", 1)[0]
+        if (threshold_time is not None) and (
+            last_altered is not None and last_altered < threshold_time
+        ):
+            self.report.profiling_skipped_not_updated[schema_name] = (
+                self.report.profiling_skipped_not_updated.get(schema_name, 0) + 1
+            )
+            return False
+
+        if self.config.profiling.profile_table_size_limit is not None and (
+            size_in_bytes is None
+            or size_in_bytes / (2**30)
+            > self.config.profiling.profile_table_size_limit
+        ):
+            self.report.profiling_skipped_size_limit[schema_name] = (
+                self.report.profiling_skipped_size_limit.get(schema_name, 0) + 1
+            )
+            return False
+
+        if self.config.profiling.profile_table_row_limit is not None and (
+            rows_count is None
+            or rows_count > self.config.profiling.profile_table_row_limit
+        ):
+            self.report.profiling_skipped_row_limit[schema_name] = (
+                self.report.profiling_skipped_row_limit.get(schema_name, 0) + 1
+            )
+            return False
+
+        return True
+
+    def get_profile_args(self) -> Dict:
+        """Passed down to GE profiler"""
+        return {}
diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source_report/sql/snowflake.py
index 89c715efb1..7862e0780a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source_report/sql/snowflake.py
+++ b/metadata-ingestion/src/datahub/ingestion/source_report/sql/snowflake.py
@@ -2,7 +2,7 @@ from dataclasses import dataclass, field
 from datetime import datetime
 from typing import Dict, List, Optional

-from datahub.ingestion.source.sql.sql_common import SQLSourceReport
+from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
 from datahub.ingestion.source_report.time_window import BaseTimeWindowReport


@@ -12,7 +12,7 @@ class BaseSnowflakeReport(BaseTimeWindowReport):


 @dataclass
-class SnowflakeReport(BaseSnowflakeReport, SQLSourceReport):
+class SnowflakeReport(BaseSnowflakeReport, ProfilingSqlReport):
     num_table_to_table_edges_scanned: int = 0
     num_table_to_view_edges_scanned: int = 0
     num_view_to_table_edges_scanned: int = 0
diff --git a/metadata-ingestion/tests/unit/test_bigquery_lineage.py b/metadata-ingestion/tests/unit/test_bigquery_lineage.py
index a27c61be7a..a7717aec82 100644
--- a/metadata-ingestion/tests/unit/test_bigquery_lineage.py
+++ b/metadata-ingestion/tests/unit/test_bigquery_lineage.py
@@ -24,7 +24,7 @@ FROM
         created=datetime.datetime.now(),
         last_altered=datetime.datetime.now(),
         comment="",
-        ddl=ddl,
+        view_definition=ddl,
     )
     tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
     assert 1 == len(tables)
@@ -42,7 +42,7 @@ def test_parse_view_lineage_with_two_part_table_name():
         created=datetime.datetime.now(),
         last_altered=datetime.datetime.now(),
         comment="",
-        ddl=ddl,
+        view_definition=ddl,
     )
     tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
     assert 1 == len(tables)
@@ -60,7 +60,7 @@ def test_one_part_table():
         created=datetime.datetime.now(),
         last_altered=datetime.datetime.now(),
         comment="",
-        ddl=ddl,
+        view_definition=ddl,
     )
     tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
     assert 1 == len(tables)
@@ -78,7 +78,7 @@ def test_create_statement_with_multiple_table():
         created=datetime.datetime.now(),
         last_altered=datetime.datetime.now(),
         comment="",
-        ddl=ddl,
+        view_definition=ddl,
     )
     tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
     tables.sort(key=lambda e: e.get_table_name())