diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index df179b0d0d..94ab1b0611 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -12,6 +12,10 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #9257: The Python SDK urn types are now autogenerated. The new classes are largely backwards compatible with the previous, manually written classes, but many older methods are now deprecated in favor of a more uniform interface. The only breaking change is that the signature for the director constructor e.g. `TagUrn("tag", ["tag_name"])` is no longer supported, and the simpler `TagUrn("tag_name")` should be used instead. The canonical place to import the urn classes from is `datahub.metadata.urns.*`. Other import paths, like `datahub.utilities.urns.corpuser_urn.CorpuserUrn` are retained for backwards compatibility, but are considered deprecated. - #9286: The `DataHubRestEmitter.emit` method no longer returns anything. It previously returned a tuple of timestamps. +- #8951: A great expectations based profiler has been added for the Unity Catalog source. +To use the old profiler, set `method: analyze` under the `profiling` section in your recipe. +To use the new profiler, set `method: ge`. Profiling is disabled by default, so to enable it, +one of these methods must be specified. ### Potential Downtime diff --git a/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md b/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md index ae2883343d..12540e1977 100644 --- a/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md +++ b/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md @@ -15,7 +15,8 @@ * [Privileges documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/privileges.html) + To ingest your workspace's notebooks and respective lineage, your service principal must have `CAN_READ` privileges on the folders containing the notebooks you want to ingest: [guide](https://docs.databricks.com/en/security/auth-authz/access-control/workspace-acl.html#folder-permissions). + To `include_usage_statistics` (enabled by default), your service principal must have `CAN_MANAGE` permissions on any SQL Warehouses you want to ingest: [guide](https://docs.databricks.com/security/auth-authz/access-control/sql-endpoint-acl.html). - + To ingest `profiling` information with `call_analyze` (enabled by default), your service principal must have ownership or `MODIFY` privilege on any tables you want to profile. + + To ingest `profiling` information with `method: ge`, you need `SELECT` privileges on all profiled tables. + + To ingest `profiling` information with `method: analyze` and `call_analyze: true` (enabled by default), your service principal must have ownership or `MODIFY` privilege on any tables you want to profile. * Alternatively, you can run [ANALYZE TABLE](https://docs.databricks.com/sql/language-manual/sql-ref-syntax-aux-analyze-table.html) yourself on any tables you want to profile, then set `call_analyze` to `false`. You will still need `SELECT` privilege on those tables to fetch the results. - Check the starter recipe below and replace `workspace_url` and `token` with your information from the previous steps. diff --git a/metadata-ingestion/docs/sources/databricks/unity-catalog_recipe.yml b/metadata-ingestion/docs/sources/databricks/unity-catalog_recipe.yml index 7bc336d5f2..931552e734 100644 --- a/metadata-ingestion/docs/sources/databricks/unity-catalog_recipe.yml +++ b/metadata-ingestion/docs/sources/databricks/unity-catalog_recipe.yml @@ -2,24 +2,38 @@ source: type: unity-catalog config: workspace_url: https://my-workspace.cloud.databricks.com - token: "mygenerated_databricks_token" - #metastore_id_pattern: - # deny: - # - 11111-2222-33333-44-555555 - #catalog_pattern: - # allow: - # - my-catalog - #schema_pattern: - # deny: - # - information_schema - #table_pattern: - # allow: - # - test.lineagedemo.dinner - # First you have to create domains on Datahub by following this guide -> https://datahubproject.io/docs/domains/#domains-setup-prerequisites-and-permissions - #domain: - # urn:li:domain:1111-222-333-444-555: - # allow: - # - main.* + token: "" + include_metastore: false + include_ownership: true + profiling: + method: "ge" + enabled: true + warehouse_id: "" + profile_table_level_only: false + max_wait_secs: 60 + pattern: + deny: + - ".*\\.unwanted_schema" + +# profiling: +# method: "analyze" +# enabled: true +# warehouse_id: "" +# profile_table_level_only: true +# call_analyze: true + +# catalogs: ["my_catalog"] +# schema_pattern: +# deny: +# - information_schema +# table_pattern: +# allow: +# - my_catalog.my_schema.my_table +# First you have to create domains on Datahub by following this guide -> https://datahubproject.io/docs/domains/#domains-setup-prerequisites-and-permissions +# domain: +# urn:li:domain:1111-222-333-444-555: +# allow: +# - main.* stateful_ingestion: enabled: true @@ -27,4 +41,4 @@ source: pipeline_name: acme-corp-unity -# sink configs if needed \ No newline at end of file +# sink configs if needed diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 69cbe8d823..dac865d2da 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -262,7 +262,8 @@ databricks = { "databricks-sdk>=0.9.0", "pyspark~=3.3.0", "requests", - "databricks-sql-connector", + # Version 2.4.0 includes sqlalchemy dialect, 2.8.0 includes some bug fixes + "databricks-sql-connector>=2.8.0", } mysql = sql_common | {"pymysql>=1.0.2"} @@ -393,7 +394,7 @@ plugins: Dict[str, Set[str]] = { "powerbi": microsoft_common | {"lark[regex]==1.1.4", "sqlparse"} | sqlglot_lib, "powerbi-report-server": powerbi_report_server, "vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.1"}, - "unity-catalog": databricks | sqllineage_lib, + "unity-catalog": databricks | sql_common | sqllineage_lib, "fivetran": snowflake_common, } diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py index 8ae17600e0..4083eb6db7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py @@ -183,7 +183,7 @@ WHERE return yield from self.generate_profile_workunits( profile_requests, - self.config.profiling.max_workers, + max_workers=self.config.profiling.max_workers, platform=self.platform, profiler_args=self.get_profile_args(), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index c334a97680..abb415c90c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -27,6 +27,7 @@ from typing import ( import sqlalchemy as sa import sqlalchemy.sql.compiler +from great_expectations.core.profiler_types_mapping import ProfilerTypeMapping from great_expectations.core.util import convert_to_json_serializable from great_expectations.data_context import AbstractDataContext, BaseDataContext from great_expectations.data_context.types.base import ( @@ -77,8 +78,26 @@ MYSQL = "mysql" SNOWFLAKE = "snowflake" BIGQUERY = "bigquery" REDSHIFT = "redshift" +DATABRICKS = "databricks" TRINO = "trino" +# Type names for Databricks, to match Title Case types in sqlalchemy +ProfilerTypeMapping.INT_TYPE_NAMES.append("Integer") +ProfilerTypeMapping.INT_TYPE_NAMES.append("SmallInteger") +ProfilerTypeMapping.INT_TYPE_NAMES.append("BigInteger") +ProfilerTypeMapping.FLOAT_TYPE_NAMES.append("Float") +ProfilerTypeMapping.FLOAT_TYPE_NAMES.append("Numeric") +ProfilerTypeMapping.STRING_TYPE_NAMES.append("String") +ProfilerTypeMapping.STRING_TYPE_NAMES.append("Text") +ProfilerTypeMapping.STRING_TYPE_NAMES.append("Unicode") +ProfilerTypeMapping.STRING_TYPE_NAMES.append("UnicodeText") +ProfilerTypeMapping.BOOLEAN_TYPE_NAMES.append("Boolean") +ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Date") +ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("DateTime") +ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Time") +ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Interval") +ProfilerTypeMapping.BINARY_TYPE_NAMES.append("LargeBinary") + # The reason for this wacky structure is quite fun. GE basically assumes that # the config structures were generated directly from YML and further assumes that # they can be `deepcopy`'d without issue. The SQLAlchemy engine and connection @@ -697,6 +716,9 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase): 1, unique_count / non_null_count ) + if not profile.rowCount: + continue + self._get_dataset_column_sample_values(column_profile, column) if ( @@ -1172,7 +1194,7 @@ class DatahubGEProfiler: }, ) - if platform == BIGQUERY: + if platform == BIGQUERY or platform == DATABRICKS: # This is done as GE makes the name as DATASET.TABLE # but we want it to be PROJECT.DATASET.TABLE instead for multi-project setups name_parts = pretty_name.split(".") diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py index 771636e849..6fa3504ced 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py @@ -59,8 +59,7 @@ class RedshiftProfiler(GenericProfiler): yield from self.generate_profile_workunits( profile_requests, - self.config.profiling.max_workers, - db, + max_workers=self.config.profiling.max_workers, platform=self.platform, profiler_args=self.get_profile_args(), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py index 8e18d85d6f..67953de47e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py @@ -62,8 +62,7 @@ class SnowflakeProfiler(GenericProfiler, SnowflakeCommonMixin): yield from self.generate_profile_workunits( profile_requests, - self.config.profiling.max_workers, - database.name, + max_workers=self.config.profiling.max_workers, platform=self.platform, profiler_args=self.get_profile_args(), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py index aaeee5717a..e309ff0d15 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py @@ -69,8 +69,8 @@ class GenericProfiler: def generate_profile_workunits( self, requests: List[TableProfilerRequest], + *, max_workers: int, - db_name: Optional[str] = None, platform: Optional[str] = None, profiler_args: Optional[Dict] = None, ) -> Iterable[MetadataWorkUnit]: @@ -98,7 +98,7 @@ class GenericProfiler: return # Otherwise, if column level profiling is enabled, use GE profiler. - ge_profiler = self.get_profiler_instance(db_name) + ge_profiler = self.get_profiler_instance() for ge_profiler_request, profile in ge_profiler.generate_profiles( ge_profile_requests, max_workers, platform, profiler_args @@ -149,12 +149,18 @@ class GenericProfiler: profile_table_level_only = self.config.profiling.profile_table_level_only dataset_name = self.get_dataset_name(table.name, schema_name, db_name) if not self.is_dataset_eligible_for_profiling( - dataset_name, table.last_altered, table.size_in_bytes, table.rows_count + dataset_name, + last_altered=table.last_altered, + size_in_bytes=table.size_in_bytes, + rows_count=table.rows_count, ): # Profile only table level if dataset is filtered from profiling # due to size limits alone if self.is_dataset_eligible_for_profiling( - dataset_name, table.last_altered, 0, 0 + dataset_name, + last_altered=table.last_altered, + size_in_bytes=None, + rows_count=None, ): profile_table_level_only = True else: @@ -199,9 +205,7 @@ class GenericProfiler: inspector = inspect(conn) yield inspector - def get_profiler_instance( - self, db_name: Optional[str] = None - ) -> "DatahubGEProfiler": + def get_profiler_instance(self) -> "DatahubGEProfiler": logger.debug(f"Getting profiler instance from {self.platform}") url = self.config.get_sql_alchemy_url() @@ -221,9 +225,10 @@ class GenericProfiler: def is_dataset_eligible_for_profiling( self, dataset_name: str, - last_altered: Optional[datetime], - size_in_bytes: Optional[int], - rows_count: Optional[int], + *, + last_altered: Optional[datetime] = None, + size_in_bytes: Optional[int] = None, + rows_count: Optional[int] = None, ) -> bool: dataset_urn = make_dataset_urn_with_platform_instance( self.platform, diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/unity/analyze_profiler.py similarity index 96% rename from metadata-ingestion/src/datahub/ingestion/source/unity/profiler.py rename to metadata-ingestion/src/datahub/ingestion/source/unity/analyze_profiler.py index 8066932e3a..4c8b22f239 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/analyze_profiler.py @@ -6,7 +6,7 @@ from typing import Callable, Collection, Iterable, Optional from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source.unity.config import UnityCatalogProfilerConfig +from datahub.ingestion.source.unity.config import UnityCatalogAnalyzeProfilerConfig from datahub.ingestion.source.unity.proxy import UnityCatalogApiProxy from datahub.ingestion.source.unity.proxy_types import ( ColumnProfile, @@ -23,8 +23,8 @@ logger = logging.getLogger(__name__) @dataclass -class UnityCatalogProfiler: - config: UnityCatalogProfilerConfig +class UnityCatalogAnalyzeProfiler: + config: UnityCatalogAnalyzeProfilerConfig report: UnityCatalogReport proxy: UnityCatalogApiProxy dataset_urn_builder: Callable[[TableReference], str] diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py index 4e3deedddb..2c567120b4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py @@ -1,10 +1,12 @@ import logging import os from datetime import datetime, timedelta, timezone -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union +from urllib.parse import urlparse import pydantic from pydantic import Field +from typing_extensions import Literal from datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.configuration.source_common import ( @@ -13,6 +15,9 @@ from datahub.configuration.source_common import ( ) from datahub.configuration.validate_field_removal import pydantic_removed_field from datahub.configuration.validate_field_rename import pydantic_renamed_field +from datahub.ingestion.source.ge_data_profiler import DATABRICKS +from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig +from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri from datahub.ingestion.source.state.stale_entity_removal_handler import ( StatefulStaleMetadataRemovalConfig, ) @@ -31,24 +36,20 @@ logger = logging.getLogger(__name__) class UnityCatalogProfilerConfig(ConfigModel): - # TODO: Reduce duplicate code with DataLakeProfilerConfig, GEProfilingConfig, SQLAlchemyConfig - enabled: bool = Field( - default=False, description="Whether profiling should be done." - ) - operation_config: OperationConfig = Field( - default_factory=OperationConfig, - description="Experimental feature. To specify operation configs.", + method: str = Field( + description=( + "Profiling method to use." + " Options supported are `ge` and `analyze`." + " `ge` uses Great Expectations and runs SELECT SQL queries on profiled tables." + " `analyze` calls ANALYZE TABLE on profiled tables. Only works for delta tables." + ), ) + # TODO: Support cluster compute as well, for ge profiling warehouse_id: Optional[str] = Field( default=None, description="SQL Warehouse id, for running profiling queries." ) - profile_table_level_only: bool = Field( - default=False, - description="Whether to perform profiling at table-level only or include column-level profiling as well.", - ) - pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), description=( @@ -58,6 +59,24 @@ class UnityCatalogProfilerConfig(ConfigModel): ), ) + +class UnityCatalogAnalyzeProfilerConfig(UnityCatalogProfilerConfig): + method: Literal["analyze"] = "analyze" + + # TODO: Reduce duplicate code with DataLakeProfilerConfig, GEProfilingConfig, SQLAlchemyConfig + enabled: bool = Field( + default=False, description="Whether profiling should be done." + ) + operation_config: OperationConfig = Field( + default_factory=OperationConfig, + description="Experimental feature. To specify operation configs.", + ) + + profile_table_level_only: bool = Field( + default=False, + description="Whether to perform profiling at table-level only or include column-level profiling as well.", + ) + call_analyze: bool = Field( default=True, description=( @@ -89,7 +108,17 @@ class UnityCatalogProfilerConfig(ConfigModel): return not self.profile_table_level_only +class UnityCatalogGEProfilerConfig(UnityCatalogProfilerConfig, GEProfilingConfig): + method: Literal["ge"] = "ge" + + max_wait_secs: Optional[int] = Field( + default=None, + description="Maximum time to wait for a table to be profiled.", + ) + + class UnityCatalogSourceConfig( + SQLCommonConfig, StatefulIngestionConfigBase, BaseUsageConfig, DatasetSourceConfigMixin, @@ -217,15 +246,34 @@ class UnityCatalogSourceConfig( description="Generate usage statistics.", ) - profiling: UnityCatalogProfilerConfig = Field( - default=UnityCatalogProfilerConfig(), description="Data profiling configuration" + profiling: Union[UnityCatalogGEProfilerConfig, UnityCatalogAnalyzeProfilerConfig] = Field( # type: ignore + default=UnityCatalogGEProfilerConfig(), + description="Data profiling configuration", + discriminator="method", ) + scheme: str = DATABRICKS + + def get_sql_alchemy_url(self): + return make_sqlalchemy_uri( + scheme=self.scheme, + username="token", + password=self.token, + at=urlparse(self.workspace_url).netloc, + db=None, + uri_opts={ + "http_path": f"/sql/1.0/warehouses/{self.profiling.warehouse_id}" + }, + ) + def is_profiling_enabled(self) -> bool: return self.profiling.enabled and is_profiling_enabled( self.profiling.operation_config ) + def is_ge_profiling(self) -> bool: + return self.profiling.method == "ge" + stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field( default=None, description="Unity Catalog Stateful Ingestion Config." ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py new file mode 100644 index 0000000000..e24ca83307 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py @@ -0,0 +1,170 @@ +import logging +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass, field +from typing import Iterable, List, Optional + +from sqlalchemy import create_engine +from sqlalchemy.engine import Connection + +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.sql.sql_config import SQLCommonConfig +from datahub.ingestion.source.sql.sql_generic import BaseTable +from datahub.ingestion.source.sql.sql_generic_profiler import ( + GenericProfiler, + TableProfilerRequest, +) +from datahub.ingestion.source.unity.config import UnityCatalogGEProfilerConfig +from datahub.ingestion.source.unity.proxy_types import Table, TableReference +from datahub.ingestion.source.unity.report import UnityCatalogReport + +logger = logging.getLogger(__name__) + + +@dataclass(init=False) +class UnityCatalogSQLGenericTable(BaseTable): + ref: TableReference = field(init=False) + + def __init__(self, table: Table): + self.name = table.name + self.comment = table.comment + self.created = table.created_at + self.last_altered = table.updated_at + self.column_count = len(table.columns) + self.ref = table.ref + self.size_in_bytes = None + self.rows_count = None + self.ddl = None + + +class UnityCatalogGEProfiler(GenericProfiler): + sql_common_config: SQLCommonConfig + profiling_config: UnityCatalogGEProfilerConfig + report: UnityCatalogReport + + def __init__( + self, + sql_common_config: SQLCommonConfig, + profiling_config: UnityCatalogGEProfilerConfig, + report: UnityCatalogReport, + ) -> None: + super().__init__(sql_common_config, report, "databricks") + self.profiling_config = profiling_config + # TODO: Consider passing dataset urn builder directly + # So there is no repeated logic between this class and source.py + + def get_workunits(self, tables: List[Table]) -> Iterable[MetadataWorkUnit]: + # Extra default SQLAlchemy option for better connection pooling and threading. + # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow + self.config.options.setdefault( + "max_overflow", self.profiling_config.max_workers + ) + + url = self.config.get_sql_alchemy_url() + engine = create_engine(url, **self.config.options) + conn = engine.connect() + + profile_requests = [] + with ThreadPoolExecutor( + max_workers=self.profiling_config.max_workers + ) as executor: + futures = [ + executor.submit( + self.get_unity_profile_request, + UnityCatalogSQLGenericTable(table), + conn, + ) + for table in tables + ] + + try: + for i, completed in enumerate( + as_completed(futures, timeout=self.profiling_config.max_wait_secs) + ): + profile_request = completed.result() + if profile_request is not None: + profile_requests.append(profile_request) + if i > 0 and i % 100 == 0: + logger.info(f"Finished table-level profiling for {i} tables") + except TimeoutError: + logger.warning("Timed out waiting to complete table-level profiling.") + + if len(profile_requests) == 0: + return + + yield from self.generate_profile_workunits( + profile_requests, + max_workers=self.config.profiling.max_workers, + platform=self.platform, + profiler_args=self.get_profile_args(), + ) + + def get_dataset_name(self, table_name: str, schema_name: str, db_name: str) -> str: + # Note: unused... ideally should share logic with TableReference + return f"{db_name}.{schema_name}.{table_name}" + + def get_unity_profile_request( + self, table: UnityCatalogSQLGenericTable, conn: Connection + ) -> Optional[TableProfilerRequest]: + # TODO: Reduce code duplication with get_profile_request + skip_profiling = False + profile_table_level_only = self.profiling_config.profile_table_level_only + + dataset_name = table.ref.qualified_table_name + try: + table.size_in_bytes = _get_dataset_size_in_bytes(table, conn) + except Exception as e: + logger.warning(f"Failed to get table size for {dataset_name}: {e}") + + if table.size_in_bytes is None: + self.report.num_profile_missing_size_in_bytes += 1 + if not self.is_dataset_eligible_for_profiling( + dataset_name, + size_in_bytes=table.size_in_bytes, + last_altered=table.last_altered, + rows_count=0, # Can't get row count ahead of time + ): + # Profile only table level if dataset is filtered from profiling + # due to size limits alone + if self.is_dataset_eligible_for_profiling( + dataset_name, + last_altered=table.last_altered, + size_in_bytes=None, + rows_count=None, + ): + profile_table_level_only = True + else: + skip_profiling = True + + if table.column_count == 0: + skip_profiling = True + + if skip_profiling: + if self.profiling_config.report_dropped_profiles: + self.report.report_dropped(dataset_name) + return None + + self.report.report_entity_profiled(dataset_name) + logger.debug(f"Preparing profiling request for {dataset_name}") + return TableProfilerRequest( + table=table, + pretty_name=dataset_name, + batch_kwargs=dict(schema=table.ref.schema, table=table.name), + profile_table_level_only=profile_table_level_only, + ) + + +def _get_dataset_size_in_bytes( + table: UnityCatalogSQLGenericTable, conn: Connection +) -> Optional[int]: + name = ".".join( + conn.dialect.identifier_preparer.quote(c) + for c in [table.ref.catalog, table.ref.schema, table.ref.table] + ) + row = conn.execute(f"DESCRIBE DETAIL {name}").fetchone() + if row is None: + return None + else: + try: + return int(row._asdict()["sizeInBytes"]) + except Exception: + return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py index 4153d9dd88..7f19b6e210 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py @@ -2,15 +2,13 @@ from dataclasses import dataclass, field from typing import Tuple from datahub.ingestion.api.report import EntityFilterReport -from datahub.ingestion.source.state.stale_entity_removal_handler import ( - StaleEntityRemovalSourceReport, -) +from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.utilities.lossy_collections import LossyDict, LossyList @dataclass -class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport): +class UnityCatalogReport(IngestionStageReport, ProfilingSqlReport): metastores: EntityFilterReport = EntityFilterReport.field(type="metastore") catalogs: EntityFilterReport = EntityFilterReport.field(type="catalog") schemas: EntityFilterReport = EntityFilterReport.field(type="schema") @@ -36,5 +34,6 @@ class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport): profile_table_errors: LossyDict[str, LossyList[Tuple[str, str]]] = field( default_factory=LossyDict ) + num_profile_missing_size_in_bytes: int = 0 num_profile_failed_unsupported_column_type: int = 0 num_profile_failed_int_casts: int = 0 diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 44b5bbbcb0..03b4f61a51 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -2,7 +2,6 @@ import logging import re import time from concurrent.futures import ThreadPoolExecutor -from datetime import timedelta from typing import Dict, Iterable, List, Optional, Set, Union from urllib.parse import urljoin @@ -52,9 +51,14 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import ( from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionSourceBase, ) -from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig +from datahub.ingestion.source.unity.analyze_profiler import UnityCatalogAnalyzeProfiler +from datahub.ingestion.source.unity.config import ( + UnityCatalogAnalyzeProfilerConfig, + UnityCatalogGEProfilerConfig, + UnityCatalogSourceConfig, +) from datahub.ingestion.source.unity.connection_test import UnityCatalogConnectionTest -from datahub.ingestion.source.unity.profiler import UnityCatalogProfiler +from datahub.ingestion.source.unity.ge_profiler import UnityCatalogGEProfiler from datahub.ingestion.source.unity.proxy import UnityCatalogApiProxy from datahub.ingestion.source.unity.proxy_types import ( DATA_TYPE_REGISTRY, @@ -170,6 +174,9 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource): self.view_refs: Set[TableReference] = set() self.notebooks: FileBackedDict[Notebook] = FileBackedDict() + # Global map of tables, for profiling + self.tables: FileBackedDict[Table] = FileBackedDict() + @staticmethod def test_connection(config_dict: dict) -> TestConnectionReport: return UnityCatalogConnectionTest(config_dict).get_connection_test() @@ -233,16 +240,24 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource): if self.config.is_profiling_enabled(): self.report.report_ingestion_stage_start("Wait on warehouse") assert wait_on_warehouse - timeout = timedelta(seconds=self.config.profiling.max_wait_secs) - wait_on_warehouse.result(timeout) - profiling_extractor = UnityCatalogProfiler( - self.config.profiling, - self.report, - self.unity_catalog_api_proxy, - self.gen_dataset_urn, - ) + wait_on_warehouse.result() + self.report.report_ingestion_stage_start("Profiling") - yield from profiling_extractor.get_workunits(self.table_refs) + if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig): + yield from UnityCatalogAnalyzeProfiler( + self.config.profiling, + self.report, + self.unity_catalog_api_proxy, + self.gen_dataset_urn, + ).get_workunits(self.table_refs) + elif isinstance(self.config.profiling, UnityCatalogGEProfilerConfig): + yield from UnityCatalogGEProfiler( + sql_common_config=self.config, + profiling_config=self.config.profiling, + report=self.report, + ).get_workunits(list(self.tables.values())) + else: + raise ValueError("Unknown profiling config method") def build_service_principal_map(self) -> None: try: @@ -358,6 +373,16 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource): self.report.tables.dropped(table.id, f"table ({table.table_type})") continue + if ( + self.config.is_profiling_enabled() + and self.config.is_ge_profiling() + and self.config.profiling.pattern.allowed( + table.ref.qualified_table_name + ) + and not table.is_view + ): + self.tables[table.ref.qualified_table_name] = table + if table.is_view: self.view_refs.add(table.ref) else: diff --git a/metadata-ingestion/tests/integration/mysql/mysql_mces_no_db_golden.json b/metadata-ingestion/tests/integration/mysql/mysql_mces_no_db_golden.json index 38b03ce238..a86ed53406 100644 --- a/metadata-ingestion/tests/integration/mysql/mysql_mces_no_db_golden.json +++ b/metadata-ingestion/tests/integration/mysql/mysql_mces_no_db_golden.json @@ -2254,30 +2254,17 @@ { "fieldPath": "id", "uniqueCount": 0, - "nullCount": 0, - "min": "None", - "max": "None", - "mean": "None", - "median": "None", - "stdev": "0.0", - "sampleValues": [] + "nullCount": 0 }, { "fieldPath": "description", "uniqueCount": 0, - "nullCount": 0, - "sampleValues": [] + "nullCount": 0 }, { "fieldPath": "customer_id", "uniqueCount": 0, - "nullCount": 0, - "min": "None", - "max": "None", - "mean": "None", - "median": "None", - "stdev": "0.0", - "sampleValues": [] + "nullCount": 0 } ] } @@ -2625,8 +2612,7 @@ { "fieldPath": "col", "uniqueCount": 0, - "nullCount": 0, - "sampleValues": [] + "nullCount": 0 } ] } @@ -2655,8 +2641,7 @@ { "fieldPath": "dummy", "uniqueCount": 0, - "nullCount": 0, - "sampleValues": [] + "nullCount": 0 } ] } @@ -2738,4 +2723,4 @@ "lastRunId": "no-run-id-provided" } } -] \ No newline at end of file +] diff --git a/metadata-ingestion/tests/integration/mysql/mysql_mces_with_db_golden.json b/metadata-ingestion/tests/integration/mysql/mysql_mces_with_db_golden.json index 5cfba57247..b5ebca424d 100644 --- a/metadata-ingestion/tests/integration/mysql/mysql_mces_with_db_golden.json +++ b/metadata-ingestion/tests/integration/mysql/mysql_mces_with_db_golden.json @@ -16,7 +16,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -31,7 +32,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -46,7 +48,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -63,7 +66,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -80,7 +84,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -95,7 +100,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -110,7 +116,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -230,7 +237,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -247,7 +255,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -264,7 +273,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -284,7 +294,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -299,7 +310,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -395,7 +407,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -412,7 +425,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -429,7 +443,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -449,7 +464,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -572,7 +588,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -593,37 +610,25 @@ { "fieldPath": "id", "uniqueCount": 0, - "nullCount": 0, - "min": "None", - "max": "None", - "mean": "None", - "median": "None", - "stdev": "0.0", - "sampleValues": [] + "nullCount": 0 }, { "fieldPath": "description", "uniqueCount": 0, - "nullCount": 0, - "sampleValues": [] + "nullCount": 0 }, { "fieldPath": "customer_id", "uniqueCount": 0, - "nullCount": 0, - "min": "None", - "max": "None", - "mean": "None", - "median": "None", - "stdev": "0.0", - "sampleValues": [] + "nullCount": 0 } ] } }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/mysql/mysql_table_row_count_estimate_only.json b/metadata-ingestion/tests/integration/mysql/mysql_table_row_count_estimate_only.json index 7597013bd8..634e049849 100644 --- a/metadata-ingestion/tests/integration/mysql/mysql_table_row_count_estimate_only.json +++ b/metadata-ingestion/tests/integration/mysql/mysql_table_row_count_estimate_only.json @@ -16,7 +16,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -31,7 +32,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -46,7 +48,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -63,7 +66,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -78,7 +82,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -93,7 +98,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -213,7 +219,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -230,7 +237,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -250,7 +258,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -265,7 +274,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -361,7 +371,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -378,7 +389,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -398,7 +410,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -420,88 +433,44 @@ "fieldPath": "id", "uniqueCount": 5, "uniqueProportion": 1, - "nullCount": 0, - "min": "1", - "max": "5", - "mean": "3.0", - "median": "3", - "stdev": "1.5811388300841898", - "sampleValues": [ - "1", - "2", - "3", - "4", - "5" - ] + "nullCount": 0 }, { "fieldPath": "company", "uniqueCount": 5, "uniqueProportion": 1, - "nullCount": 0, - "sampleValues": [ - "Company A", - "Company B", - "Company C", - "Company D", - "Company E" - ] + "nullCount": 0 }, { "fieldPath": "last_name", "uniqueCount": 5, "uniqueProportion": 1, - "nullCount": 0, - "sampleValues": [ - "Axen", - "Bedecs", - "Donnell", - "Gratacos Solsona", - "Lee" - ] + "nullCount": 0 }, { "fieldPath": "first_name", "uniqueCount": 5, "uniqueProportion": 1, - "nullCount": 0, - "sampleValues": [ - "Anna", - "Antonio", - "Christina", - "Martin", - "Thomas" - ] + "nullCount": 0 }, { "fieldPath": "email_address", "uniqueCount": 0, - "nullCount": 0, - "sampleValues": [] + "nullCount": 0 }, { "fieldPath": "priority", "uniqueCount": 3, "uniqueProportion": 0.75, - "nullCount": 0, - "min": "3.8", - "max": "4.9", - "mean": "4.175000011920929", - "median": "4.0", - "stdev": "0.49244294899530355", - "sampleValues": [ - "4.0", - "4.9", - "4.0", - "3.8" - ] + "nullCount": 0 } ] } }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -522,37 +491,25 @@ { "fieldPath": "id", "uniqueCount": 0, - "nullCount": 0, - "min": "None", - "max": "None", - "mean": "None", - "median": "None", - "stdev": "0.0", - "sampleValues": [] + "nullCount": 0 }, { "fieldPath": "description", "uniqueCount": 0, - "nullCount": 0, - "sampleValues": [] + "nullCount": 0 }, { "fieldPath": "customer_id", "uniqueCount": 0, - "nullCount": 0, - "min": "None", - "max": "None", - "mean": "None", - "median": "None", - "stdev": "0.0", - "sampleValues": [] + "nullCount": 0 } ] } }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/test_unity_catalog_config.py b/metadata-ingestion/tests/unit/test_unity_catalog_config.py index 4be6f60171..4098ed4074 100644 --- a/metadata-ingestion/tests/unit/test_unity_catalog_config.py +++ b/metadata-ingestion/tests/unit/test_unity_catalog_config.py @@ -38,7 +38,11 @@ def test_profiling_requires_warehouses_id(): { "token": "token", "workspace_url": "https://workspace_url", - "profiling": {"enabled": True, "warehouse_id": "my_warehouse_id"}, + "profiling": { + "enabled": True, + "method": "ge", + "warehouse_id": "my_warehouse_id", + }, } ) assert config.profiling.enabled is True @@ -47,7 +51,7 @@ def test_profiling_requires_warehouses_id(): { "token": "token", "workspace_url": "https://workspace_url", - "profiling": {"enabled": False}, + "profiling": {"enabled": False, "method": "ge"}, } ) assert config.profiling.enabled is False