feat(ingest/unity): GE Profiling (#8951)

This commit is contained in:
Andrew Sikowitz 2023-12-06 13:59:23 -05:00 committed by GitHub
parent 7a2b8bf5f9
commit 27f23ecdd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 449 additions and 211 deletions

View File

@ -12,6 +12,10 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #9257: The Python SDK urn types are now autogenerated. The new classes are largely backwards compatible with the previous, manually written classes, but many older methods are now deprecated in favor of a more uniform interface. The only breaking change is that the signature for the director constructor e.g. `TagUrn("tag", ["tag_name"])` is no longer supported, and the simpler `TagUrn("tag_name")` should be used instead.
The canonical place to import the urn classes from is `datahub.metadata.urns.*`. Other import paths, like `datahub.utilities.urns.corpuser_urn.CorpuserUrn` are retained for backwards compatibility, but are considered deprecated.
- #9286: The `DataHubRestEmitter.emit` method no longer returns anything. It previously returned a tuple of timestamps.
- #8951: A great expectations based profiler has been added for the Unity Catalog source.
To use the old profiler, set `method: analyze` under the `profiling` section in your recipe.
To use the new profiler, set `method: ge`. Profiling is disabled by default, so to enable it,
one of these methods must be specified.
### Potential Downtime

View File

@ -15,7 +15,8 @@
* [Privileges documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/privileges.html)
+ To ingest your workspace's notebooks and respective lineage, your service principal must have `CAN_READ` privileges on the folders containing the notebooks you want to ingest: [guide](https://docs.databricks.com/en/security/auth-authz/access-control/workspace-acl.html#folder-permissions).
+ To `include_usage_statistics` (enabled by default), your service principal must have `CAN_MANAGE` permissions on any SQL Warehouses you want to ingest: [guide](https://docs.databricks.com/security/auth-authz/access-control/sql-endpoint-acl.html).
+ To ingest `profiling` information with `call_analyze` (enabled by default), your service principal must have ownership or `MODIFY` privilege on any tables you want to profile.
+ To ingest `profiling` information with `method: ge`, you need `SELECT` privileges on all profiled tables.
+ To ingest `profiling` information with `method: analyze` and `call_analyze: true` (enabled by default), your service principal must have ownership or `MODIFY` privilege on any tables you want to profile.
* Alternatively, you can run [ANALYZE TABLE](https://docs.databricks.com/sql/language-manual/sql-ref-syntax-aux-analyze-table.html) yourself on any tables you want to profile, then set `call_analyze` to `false`.
You will still need `SELECT` privilege on those tables to fetch the results.
- Check the starter recipe below and replace `workspace_url` and `token` with your information from the previous steps.

View File

@ -2,19 +2,33 @@ source:
type: unity-catalog
config:
workspace_url: https://my-workspace.cloud.databricks.com
token: "mygenerated_databricks_token"
#metastore_id_pattern:
# deny:
# - 11111-2222-33333-44-555555
#catalog_pattern:
# allow:
# - my-catalog
token: "<token>"
include_metastore: false
include_ownership: true
profiling:
method: "ge"
enabled: true
warehouse_id: "<warehouse_id>"
profile_table_level_only: false
max_wait_secs: 60
pattern:
deny:
- ".*\\.unwanted_schema"
# profiling:
# method: "analyze"
# enabled: true
# warehouse_id: "<warehouse_id>"
# profile_table_level_only: true
# call_analyze: true
# catalogs: ["my_catalog"]
# schema_pattern:
# deny:
# - information_schema
# table_pattern:
# allow:
# - test.lineagedemo.dinner
# - my_catalog.my_schema.my_table
# First you have to create domains on Datahub by following this guide -> https://datahubproject.io/docs/domains/#domains-setup-prerequisites-and-permissions
# domain:
# urn:li:domain:1111-222-333-444-555:

View File

@ -262,7 +262,8 @@ databricks = {
"databricks-sdk>=0.9.0",
"pyspark~=3.3.0",
"requests",
"databricks-sql-connector",
# Version 2.4.0 includes sqlalchemy dialect, 2.8.0 includes some bug fixes
"databricks-sql-connector>=2.8.0",
}
mysql = sql_common | {"pymysql>=1.0.2"}
@ -393,7 +394,7 @@ plugins: Dict[str, Set[str]] = {
"powerbi": microsoft_common | {"lark[regex]==1.1.4", "sqlparse"} | sqlglot_lib,
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.1"},
"unity-catalog": databricks | sqllineage_lib,
"unity-catalog": databricks | sql_common | sqllineage_lib,
"fivetran": snowflake_common,
}

View File

@ -183,7 +183,7 @@ WHERE
return
yield from self.generate_profile_workunits(
profile_requests,
self.config.profiling.max_workers,
max_workers=self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
)

View File

@ -27,6 +27,7 @@ from typing import (
import sqlalchemy as sa
import sqlalchemy.sql.compiler
from great_expectations.core.profiler_types_mapping import ProfilerTypeMapping
from great_expectations.core.util import convert_to_json_serializable
from great_expectations.data_context import AbstractDataContext, BaseDataContext
from great_expectations.data_context.types.base import (
@ -77,8 +78,26 @@ MYSQL = "mysql"
SNOWFLAKE = "snowflake"
BIGQUERY = "bigquery"
REDSHIFT = "redshift"
DATABRICKS = "databricks"
TRINO = "trino"
# Type names for Databricks, to match Title Case types in sqlalchemy
ProfilerTypeMapping.INT_TYPE_NAMES.append("Integer")
ProfilerTypeMapping.INT_TYPE_NAMES.append("SmallInteger")
ProfilerTypeMapping.INT_TYPE_NAMES.append("BigInteger")
ProfilerTypeMapping.FLOAT_TYPE_NAMES.append("Float")
ProfilerTypeMapping.FLOAT_TYPE_NAMES.append("Numeric")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("String")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("Text")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("Unicode")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("UnicodeText")
ProfilerTypeMapping.BOOLEAN_TYPE_NAMES.append("Boolean")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Date")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("DateTime")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Time")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Interval")
ProfilerTypeMapping.BINARY_TYPE_NAMES.append("LargeBinary")
# The reason for this wacky structure is quite fun. GE basically assumes that
# the config structures were generated directly from YML and further assumes that
# they can be `deepcopy`'d without issue. The SQLAlchemy engine and connection
@ -697,6 +716,9 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
1, unique_count / non_null_count
)
if not profile.rowCount:
continue
self._get_dataset_column_sample_values(column_profile, column)
if (
@ -1172,7 +1194,7 @@ class DatahubGEProfiler:
},
)
if platform == BIGQUERY:
if platform == BIGQUERY or platform == DATABRICKS:
# This is done as GE makes the name as DATASET.TABLE
# but we want it to be PROJECT.DATASET.TABLE instead for multi-project setups
name_parts = pretty_name.split(".")

View File

@ -59,8 +59,7 @@ class RedshiftProfiler(GenericProfiler):
yield from self.generate_profile_workunits(
profile_requests,
self.config.profiling.max_workers,
db,
max_workers=self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
)

View File

@ -62,8 +62,7 @@ class SnowflakeProfiler(GenericProfiler, SnowflakeCommonMixin):
yield from self.generate_profile_workunits(
profile_requests,
self.config.profiling.max_workers,
database.name,
max_workers=self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
)

View File

@ -69,8 +69,8 @@ class GenericProfiler:
def generate_profile_workunits(
self,
requests: List[TableProfilerRequest],
*,
max_workers: int,
db_name: Optional[str] = None,
platform: Optional[str] = None,
profiler_args: Optional[Dict] = None,
) -> Iterable[MetadataWorkUnit]:
@ -98,7 +98,7 @@ class GenericProfiler:
return
# Otherwise, if column level profiling is enabled, use GE profiler.
ge_profiler = self.get_profiler_instance(db_name)
ge_profiler = self.get_profiler_instance()
for ge_profiler_request, profile in ge_profiler.generate_profiles(
ge_profile_requests, max_workers, platform, profiler_args
@ -149,12 +149,18 @@ class GenericProfiler:
profile_table_level_only = self.config.profiling.profile_table_level_only
dataset_name = self.get_dataset_name(table.name, schema_name, db_name)
if not self.is_dataset_eligible_for_profiling(
dataset_name, table.last_altered, table.size_in_bytes, table.rows_count
dataset_name,
last_altered=table.last_altered,
size_in_bytes=table.size_in_bytes,
rows_count=table.rows_count,
):
# Profile only table level if dataset is filtered from profiling
# due to size limits alone
if self.is_dataset_eligible_for_profiling(
dataset_name, table.last_altered, 0, 0
dataset_name,
last_altered=table.last_altered,
size_in_bytes=None,
rows_count=None,
):
profile_table_level_only = True
else:
@ -199,9 +205,7 @@ class GenericProfiler:
inspector = inspect(conn)
yield inspector
def get_profiler_instance(
self, db_name: Optional[str] = None
) -> "DatahubGEProfiler":
def get_profiler_instance(self) -> "DatahubGEProfiler":
logger.debug(f"Getting profiler instance from {self.platform}")
url = self.config.get_sql_alchemy_url()
@ -221,9 +225,10 @@ class GenericProfiler:
def is_dataset_eligible_for_profiling(
self,
dataset_name: str,
last_altered: Optional[datetime],
size_in_bytes: Optional[int],
rows_count: Optional[int],
*,
last_altered: Optional[datetime] = None,
size_in_bytes: Optional[int] = None,
rows_count: Optional[int] = None,
) -> bool:
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,

View File

@ -6,7 +6,7 @@ from typing import Callable, Collection, Iterable, Optional
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.unity.config import UnityCatalogProfilerConfig
from datahub.ingestion.source.unity.config import UnityCatalogAnalyzeProfilerConfig
from datahub.ingestion.source.unity.proxy import UnityCatalogApiProxy
from datahub.ingestion.source.unity.proxy_types import (
ColumnProfile,
@ -23,8 +23,8 @@ logger = logging.getLogger(__name__)
@dataclass
class UnityCatalogProfiler:
config: UnityCatalogProfilerConfig
class UnityCatalogAnalyzeProfiler:
config: UnityCatalogAnalyzeProfilerConfig
report: UnityCatalogReport
proxy: UnityCatalogApiProxy
dataset_urn_builder: Callable[[TableReference], str]

View File

@ -1,10 +1,12 @@
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urlparse
import pydantic
from pydantic import Field
from typing_extensions import Literal
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import (
@ -13,6 +15,9 @@ from datahub.configuration.source_common import (
)
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.source.ge_data_profiler import DATABRICKS
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
)
@ -31,24 +36,20 @@ logger = logging.getLogger(__name__)
class UnityCatalogProfilerConfig(ConfigModel):
# TODO: Reduce duplicate code with DataLakeProfilerConfig, GEProfilingConfig, SQLAlchemyConfig
enabled: bool = Field(
default=False, description="Whether profiling should be done."
)
operation_config: OperationConfig = Field(
default_factory=OperationConfig,
description="Experimental feature. To specify operation configs.",
method: str = Field(
description=(
"Profiling method to use."
" Options supported are `ge` and `analyze`."
" `ge` uses Great Expectations and runs SELECT SQL queries on profiled tables."
" `analyze` calls ANALYZE TABLE on profiled tables. Only works for delta tables."
),
)
# TODO: Support cluster compute as well, for ge profiling
warehouse_id: Optional[str] = Field(
default=None, description="SQL Warehouse id, for running profiling queries."
)
profile_table_level_only: bool = Field(
default=False,
description="Whether to perform profiling at table-level only or include column-level profiling as well.",
)
pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description=(
@ -58,6 +59,24 @@ class UnityCatalogProfilerConfig(ConfigModel):
),
)
class UnityCatalogAnalyzeProfilerConfig(UnityCatalogProfilerConfig):
method: Literal["analyze"] = "analyze"
# TODO: Reduce duplicate code with DataLakeProfilerConfig, GEProfilingConfig, SQLAlchemyConfig
enabled: bool = Field(
default=False, description="Whether profiling should be done."
)
operation_config: OperationConfig = Field(
default_factory=OperationConfig,
description="Experimental feature. To specify operation configs.",
)
profile_table_level_only: bool = Field(
default=False,
description="Whether to perform profiling at table-level only or include column-level profiling as well.",
)
call_analyze: bool = Field(
default=True,
description=(
@ -89,7 +108,17 @@ class UnityCatalogProfilerConfig(ConfigModel):
return not self.profile_table_level_only
class UnityCatalogGEProfilerConfig(UnityCatalogProfilerConfig, GEProfilingConfig):
method: Literal["ge"] = "ge"
max_wait_secs: Optional[int] = Field(
default=None,
description="Maximum time to wait for a table to be profiled.",
)
class UnityCatalogSourceConfig(
SQLCommonConfig,
StatefulIngestionConfigBase,
BaseUsageConfig,
DatasetSourceConfigMixin,
@ -217,8 +246,24 @@ class UnityCatalogSourceConfig(
description="Generate usage statistics.",
)
profiling: UnityCatalogProfilerConfig = Field(
default=UnityCatalogProfilerConfig(), description="Data profiling configuration"
profiling: Union[UnityCatalogGEProfilerConfig, UnityCatalogAnalyzeProfilerConfig] = Field( # type: ignore
default=UnityCatalogGEProfilerConfig(),
description="Data profiling configuration",
discriminator="method",
)
scheme: str = DATABRICKS
def get_sql_alchemy_url(self):
return make_sqlalchemy_uri(
scheme=self.scheme,
username="token",
password=self.token,
at=urlparse(self.workspace_url).netloc,
db=None,
uri_opts={
"http_path": f"/sql/1.0/warehouses/{self.profiling.warehouse_id}"
},
)
def is_profiling_enabled(self) -> bool:
@ -226,6 +271,9 @@ class UnityCatalogSourceConfig(
self.profiling.operation_config
)
def is_ge_profiling(self) -> bool:
return self.profiling.method == "ge"
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
default=None, description="Unity Catalog Stateful Ingestion Config."
)

View File

@ -0,0 +1,170 @@
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from typing import Iterable, List, Optional
from sqlalchemy import create_engine
from sqlalchemy.engine import Connection
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
from datahub.ingestion.source.sql.sql_generic import BaseTable
from datahub.ingestion.source.sql.sql_generic_profiler import (
GenericProfiler,
TableProfilerRequest,
)
from datahub.ingestion.source.unity.config import UnityCatalogGEProfilerConfig
from datahub.ingestion.source.unity.proxy_types import Table, TableReference
from datahub.ingestion.source.unity.report import UnityCatalogReport
logger = logging.getLogger(__name__)
@dataclass(init=False)
class UnityCatalogSQLGenericTable(BaseTable):
ref: TableReference = field(init=False)
def __init__(self, table: Table):
self.name = table.name
self.comment = table.comment
self.created = table.created_at
self.last_altered = table.updated_at
self.column_count = len(table.columns)
self.ref = table.ref
self.size_in_bytes = None
self.rows_count = None
self.ddl = None
class UnityCatalogGEProfiler(GenericProfiler):
sql_common_config: SQLCommonConfig
profiling_config: UnityCatalogGEProfilerConfig
report: UnityCatalogReport
def __init__(
self,
sql_common_config: SQLCommonConfig,
profiling_config: UnityCatalogGEProfilerConfig,
report: UnityCatalogReport,
) -> None:
super().__init__(sql_common_config, report, "databricks")
self.profiling_config = profiling_config
# TODO: Consider passing dataset urn builder directly
# So there is no repeated logic between this class and source.py
def get_workunits(self, tables: List[Table]) -> Iterable[MetadataWorkUnit]:
# Extra default SQLAlchemy option for better connection pooling and threading.
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
self.config.options.setdefault(
"max_overflow", self.profiling_config.max_workers
)
url = self.config.get_sql_alchemy_url()
engine = create_engine(url, **self.config.options)
conn = engine.connect()
profile_requests = []
with ThreadPoolExecutor(
max_workers=self.profiling_config.max_workers
) as executor:
futures = [
executor.submit(
self.get_unity_profile_request,
UnityCatalogSQLGenericTable(table),
conn,
)
for table in tables
]
try:
for i, completed in enumerate(
as_completed(futures, timeout=self.profiling_config.max_wait_secs)
):
profile_request = completed.result()
if profile_request is not None:
profile_requests.append(profile_request)
if i > 0 and i % 100 == 0:
logger.info(f"Finished table-level profiling for {i} tables")
except TimeoutError:
logger.warning("Timed out waiting to complete table-level profiling.")
if len(profile_requests) == 0:
return
yield from self.generate_profile_workunits(
profile_requests,
max_workers=self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
)
def get_dataset_name(self, table_name: str, schema_name: str, db_name: str) -> str:
# Note: unused... ideally should share logic with TableReference
return f"{db_name}.{schema_name}.{table_name}"
def get_unity_profile_request(
self, table: UnityCatalogSQLGenericTable, conn: Connection
) -> Optional[TableProfilerRequest]:
# TODO: Reduce code duplication with get_profile_request
skip_profiling = False
profile_table_level_only = self.profiling_config.profile_table_level_only
dataset_name = table.ref.qualified_table_name
try:
table.size_in_bytes = _get_dataset_size_in_bytes(table, conn)
except Exception as e:
logger.warning(f"Failed to get table size for {dataset_name}: {e}")
if table.size_in_bytes is None:
self.report.num_profile_missing_size_in_bytes += 1
if not self.is_dataset_eligible_for_profiling(
dataset_name,
size_in_bytes=table.size_in_bytes,
last_altered=table.last_altered,
rows_count=0, # Can't get row count ahead of time
):
# Profile only table level if dataset is filtered from profiling
# due to size limits alone
if self.is_dataset_eligible_for_profiling(
dataset_name,
last_altered=table.last_altered,
size_in_bytes=None,
rows_count=None,
):
profile_table_level_only = True
else:
skip_profiling = True
if table.column_count == 0:
skip_profiling = True
if skip_profiling:
if self.profiling_config.report_dropped_profiles:
self.report.report_dropped(dataset_name)
return None
self.report.report_entity_profiled(dataset_name)
logger.debug(f"Preparing profiling request for {dataset_name}")
return TableProfilerRequest(
table=table,
pretty_name=dataset_name,
batch_kwargs=dict(schema=table.ref.schema, table=table.name),
profile_table_level_only=profile_table_level_only,
)
def _get_dataset_size_in_bytes(
table: UnityCatalogSQLGenericTable, conn: Connection
) -> Optional[int]:
name = ".".join(
conn.dialect.identifier_preparer.quote(c)
for c in [table.ref.catalog, table.ref.schema, table.ref.table]
)
row = conn.execute(f"DESCRIBE DETAIL {name}").fetchone()
if row is None:
return None
else:
try:
return int(row._asdict()["sizeInBytes"])
except Exception:
return None

View File

@ -2,15 +2,13 @@ from dataclasses import dataclass, field
from typing import Tuple
from datahub.ingestion.api.report import EntityFilterReport
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
)
from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.utilities.lossy_collections import LossyDict, LossyList
@dataclass
class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport):
class UnityCatalogReport(IngestionStageReport, ProfilingSqlReport):
metastores: EntityFilterReport = EntityFilterReport.field(type="metastore")
catalogs: EntityFilterReport = EntityFilterReport.field(type="catalog")
schemas: EntityFilterReport = EntityFilterReport.field(type="schema")
@ -36,5 +34,6 @@ class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport):
profile_table_errors: LossyDict[str, LossyList[Tuple[str, str]]] = field(
default_factory=LossyDict
)
num_profile_missing_size_in_bytes: int = 0
num_profile_failed_unsupported_column_type: int = 0
num_profile_failed_int_casts: int = 0

View File

@ -2,7 +2,6 @@ import logging
import re
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta
from typing import Dict, Iterable, List, Optional, Set, Union
from urllib.parse import urljoin
@ -52,9 +51,14 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import (
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig
from datahub.ingestion.source.unity.analyze_profiler import UnityCatalogAnalyzeProfiler
from datahub.ingestion.source.unity.config import (
UnityCatalogAnalyzeProfilerConfig,
UnityCatalogGEProfilerConfig,
UnityCatalogSourceConfig,
)
from datahub.ingestion.source.unity.connection_test import UnityCatalogConnectionTest
from datahub.ingestion.source.unity.profiler import UnityCatalogProfiler
from datahub.ingestion.source.unity.ge_profiler import UnityCatalogGEProfiler
from datahub.ingestion.source.unity.proxy import UnityCatalogApiProxy
from datahub.ingestion.source.unity.proxy_types import (
DATA_TYPE_REGISTRY,
@ -170,6 +174,9 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
self.view_refs: Set[TableReference] = set()
self.notebooks: FileBackedDict[Notebook] = FileBackedDict()
# Global map of tables, for profiling
self.tables: FileBackedDict[Table] = FileBackedDict()
@staticmethod
def test_connection(config_dict: dict) -> TestConnectionReport:
return UnityCatalogConnectionTest(config_dict).get_connection_test()
@ -233,16 +240,24 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
if self.config.is_profiling_enabled():
self.report.report_ingestion_stage_start("Wait on warehouse")
assert wait_on_warehouse
timeout = timedelta(seconds=self.config.profiling.max_wait_secs)
wait_on_warehouse.result(timeout)
profiling_extractor = UnityCatalogProfiler(
wait_on_warehouse.result()
self.report.report_ingestion_stage_start("Profiling")
if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig):
yield from UnityCatalogAnalyzeProfiler(
self.config.profiling,
self.report,
self.unity_catalog_api_proxy,
self.gen_dataset_urn,
)
self.report.report_ingestion_stage_start("Profiling")
yield from profiling_extractor.get_workunits(self.table_refs)
).get_workunits(self.table_refs)
elif isinstance(self.config.profiling, UnityCatalogGEProfilerConfig):
yield from UnityCatalogGEProfiler(
sql_common_config=self.config,
profiling_config=self.config.profiling,
report=self.report,
).get_workunits(list(self.tables.values()))
else:
raise ValueError("Unknown profiling config method")
def build_service_principal_map(self) -> None:
try:
@ -358,6 +373,16 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
self.report.tables.dropped(table.id, f"table ({table.table_type})")
continue
if (
self.config.is_profiling_enabled()
and self.config.is_ge_profiling()
and self.config.profiling.pattern.allowed(
table.ref.qualified_table_name
)
and not table.is_view
):
self.tables[table.ref.qualified_table_name] = table
if table.is_view:
self.view_refs.add(table.ref)
else:

View File

@ -2254,30 +2254,17 @@
{
"fieldPath": "id",
"uniqueCount": 0,
"nullCount": 0,
"min": "None",
"max": "None",
"mean": "None",
"median": "None",
"stdev": "0.0",
"sampleValues": []
"nullCount": 0
},
{
"fieldPath": "description",
"uniqueCount": 0,
"nullCount": 0,
"sampleValues": []
"nullCount": 0
},
{
"fieldPath": "customer_id",
"uniqueCount": 0,
"nullCount": 0,
"min": "None",
"max": "None",
"mean": "None",
"median": "None",
"stdev": "0.0",
"sampleValues": []
"nullCount": 0
}
]
}
@ -2625,8 +2612,7 @@
{
"fieldPath": "col",
"uniqueCount": 0,
"nullCount": 0,
"sampleValues": []
"nullCount": 0
}
]
}
@ -2655,8 +2641,7 @@
{
"fieldPath": "dummy",
"uniqueCount": 0,
"nullCount": 0,
"sampleValues": []
"nullCount": 0
}
]
}

View File

@ -16,7 +16,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -31,7 +32,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -46,7 +48,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -63,7 +66,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -80,7 +84,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -95,7 +100,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -110,7 +116,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -230,7 +237,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -247,7 +255,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -264,7 +273,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -284,7 +294,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -299,7 +310,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -395,7 +407,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -412,7 +425,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -429,7 +443,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -449,7 +464,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -572,7 +588,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -593,37 +610,25 @@
{
"fieldPath": "id",
"uniqueCount": 0,
"nullCount": 0,
"min": "None",
"max": "None",
"mean": "None",
"median": "None",
"stdev": "0.0",
"sampleValues": []
"nullCount": 0
},
{
"fieldPath": "description",
"uniqueCount": 0,
"nullCount": 0,
"sampleValues": []
"nullCount": 0
},
{
"fieldPath": "customer_id",
"uniqueCount": 0,
"nullCount": 0,
"min": "None",
"max": "None",
"mean": "None",
"median": "None",
"stdev": "0.0",
"sampleValues": []
"nullCount": 0
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test"
"runId": "mysql-test",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -16,7 +16,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
@ -31,7 +32,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
@ -46,7 +48,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
@ -63,7 +66,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
@ -78,7 +82,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
@ -93,7 +98,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
@ -213,7 +219,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
@ -230,7 +237,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
@ -250,7 +258,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
@ -265,7 +274,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
@ -361,7 +371,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
@ -378,7 +389,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
@ -398,7 +410,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
@ -420,88 +433,44 @@
"fieldPath": "id",
"uniqueCount": 5,
"uniqueProportion": 1,
"nullCount": 0,
"min": "1",
"max": "5",
"mean": "3.0",
"median": "3",
"stdev": "1.5811388300841898",
"sampleValues": [
"1",
"2",
"3",
"4",
"5"
]
"nullCount": 0
},
{
"fieldPath": "company",
"uniqueCount": 5,
"uniqueProportion": 1,
"nullCount": 0,
"sampleValues": [
"Company A",
"Company B",
"Company C",
"Company D",
"Company E"
]
"nullCount": 0
},
{
"fieldPath": "last_name",
"uniqueCount": 5,
"uniqueProportion": 1,
"nullCount": 0,
"sampleValues": [
"Axen",
"Bedecs",
"Donnell",
"Gratacos Solsona",
"Lee"
]
"nullCount": 0
},
{
"fieldPath": "first_name",
"uniqueCount": 5,
"uniqueProportion": 1,
"nullCount": 0,
"sampleValues": [
"Anna",
"Antonio",
"Christina",
"Martin",
"Thomas"
]
"nullCount": 0
},
{
"fieldPath": "email_address",
"uniqueCount": 0,
"nullCount": 0,
"sampleValues": []
"nullCount": 0
},
{
"fieldPath": "priority",
"uniqueCount": 3,
"uniqueProportion": 0.75,
"nullCount": 0,
"min": "3.8",
"max": "4.9",
"mean": "4.175000011920929",
"median": "4.0",
"stdev": "0.49244294899530355",
"sampleValues": [
"4.0",
"4.9",
"4.0",
"3.8"
]
"nullCount": 0
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
@ -522,37 +491,25 @@
{
"fieldPath": "id",
"uniqueCount": 0,
"nullCount": 0,
"min": "None",
"max": "None",
"mean": "None",
"median": "None",
"stdev": "0.0",
"sampleValues": []
"nullCount": 0
},
{
"fieldPath": "description",
"uniqueCount": 0,
"nullCount": 0,
"sampleValues": []
"nullCount": 0
},
{
"fieldPath": "customer_id",
"uniqueCount": 0,
"nullCount": 0,
"min": "None",
"max": "None",
"mean": "None",
"median": "None",
"stdev": "0.0",
"sampleValues": []
"nullCount": 0
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
"runId": "mysql-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -38,7 +38,11 @@ def test_profiling_requires_warehouses_id():
{
"token": "token",
"workspace_url": "https://workspace_url",
"profiling": {"enabled": True, "warehouse_id": "my_warehouse_id"},
"profiling": {
"enabled": True,
"method": "ge",
"warehouse_id": "my_warehouse_id",
},
}
)
assert config.profiling.enabled is True
@ -47,7 +51,7 @@ def test_profiling_requires_warehouses_id():
{
"token": "token",
"workspace_url": "https://workspace_url",
"profiling": {"enabled": False},
"profiling": {"enabled": False, "method": "ge"},
}
)
assert config.profiling.enabled is False