Mirror of https://github.com/datahub-project/datahub.git (synced 2025-11-01 11:19:05 +00:00)
fix(ingest): profiling - Changing profiling defaults on Bigquery and Snowflake (#6640)
parent 15e9475efb
commit 9a1f78fc60
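
In practice this commit does two things: it raises the default profiling guards in GEProfilingConfig (profile_table_size_limit: 1 GB -> 5 GB, profile_table_row_limit: 50,000 -> 5,000,000 rows) and moves the shared BigQuery/Snowflake profiling logic onto a common GenericProfiler and ProfilingSqlReport. A minimal sketch of the new defaults; the import path is an assumption, not taken from this diff, so adjust it to your datahub version:

    # Hedged sketch: check the profiling defaults introduced by this commit.
    from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig  # assumed module path

    config = GEProfilingConfig()
    assert config.profile_table_size_limit == 5  # GB; previously 1
    assert config.profile_table_row_limit == 5_000_000  # rows; previously 50_000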
@@ -792,7 +792,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
         )

         view = cast(BigqueryView, table)
-        view_definition_string = view.ddl
+        view_definition_string = view.view_definition
         view_properties_aspect = ViewProperties(
             materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
         )
@@ -6,13 +6,13 @@ from typing import Counter, Dict, List, Optional

 import pydantic

-from datahub.ingestion.source.sql.sql_common import SQLSourceReport
+from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
 from datahub.utilities.lossy_collections import LossyDict, LossyList
 from datahub.utilities.stats_collections import TopKDict


 @dataclass
-class BigQueryV2Report(SQLSourceReport):
+class BigQueryV2Report(ProfilingSqlReport):
     num_total_lineage_entries: TopKDict[str, int] = field(default_factory=TopKDict)
     num_skipped_lineage_entries_missing_data: TopKDict[str, int] = field(
         default_factory=TopKDict
@@ -55,7 +55,6 @@ class BigQueryV2Report(SQLSourceReport):
     upstream_lineage: LossyDict = field(default_factory=LossyDict)
     partition_info: Dict[str, str] = field(default_factory=TopKDict)
     profile_table_selection_criteria: Dict[str, str] = field(default_factory=TopKDict)
-    num_tables_not_eligible_profiling: Dict[str, int] = field(default_factory=TopKDict)
     selected_profile_tables: Dict[str, List[str]] = field(default_factory=TopKDict)
     invalid_partition_ids: Dict[str, str] = field(default_factory=TopKDict)
     allow_pattern: Optional[str] = None
@@ -8,49 +8,33 @@ from google.cloud import bigquery
 from google.cloud.bigquery.table import RowIterator, TableListItem, TimePartitioning

 from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
+from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView

 logger: logging.Logger = logging.getLogger(__name__)


 @dataclass(frozen=True, eq=True)
-class BigqueryColumn:
-    name: str
-    ordinal_position: int
+class BigqueryColumn(BaseColumn):
     field_path: str
-    is_nullable: bool
     is_partition_column: bool
-    data_type: str
-    comment: str


 @dataclass
-class BigqueryTable:
-    name: str
-    created: datetime
-    last_altered: Optional[datetime]
-    size_in_bytes: Optional[int]
-    rows_count: Optional[int]
-    expires: Optional[datetime]
-    clustering_fields: Optional[List[str]]
-    labels: Optional[Dict[str, str]]
-    num_partitions: Optional[int]
-    max_partition_id: Optional[str]
-    max_shard_id: Optional[str]
-    active_billable_bytes: Optional[int]
-    long_term_billable_bytes: Optional[int]
-    comment: str
-    ddl: str
-    time_partitioning: TimePartitioning
+class BigqueryTable(BaseTable):
+    expires: Optional[datetime] = None
+    clustering_fields: Optional[List[str]] = None
+    labels: Optional[Dict[str, str]] = None
+    num_partitions: Optional[int] = None
+    max_partition_id: Optional[str] = None
+    max_shard_id: Optional[str] = None
+    active_billable_bytes: Optional[int] = None
+    long_term_billable_bytes: Optional[int] = None
+    time_partitioning: Optional[TimePartitioning] = None
     columns: List[BigqueryColumn] = field(default_factory=list)


 @dataclass
-class BigqueryView:
-    name: str
-    created: datetime
-    last_altered: datetime
-    comment: str
-    ddl: str
+class BigqueryView(BaseView):
     columns: List[BigqueryColumn] = field(default_factory=list)
@@ -350,7 +334,7 @@ class BigQueryDataDictionary:
                     table.last_altered / 1000, tz=timezone.utc
                 )
                 if "last_altered" in table
-                else None,
+                else table.created,
                 size_in_bytes=table.get("bytes"),
                 rows_count=table.get("row_count"),
                 comment=table.comment,
@@ -404,9 +388,11 @@ class BigQueryDataDictionary:
             BigqueryView(
                 name=table.table_name,
                 created=table.created,
-                last_altered=table.last_altered if "last_altered" in table else None,
+                last_altered=table.last_altered
+                if "last_altered" in table
+                else table.created,
                 comment=table.comment,
-                ddl=table.view_definition,
+                view_definition=table.view_definition,
             )
             for table in cur
         ]
@@ -468,15 +468,15 @@ timestamp < "{end_time}"
         self, project: str, dataset: str, view: BigqueryView
     ) -> List[BigqueryTableIdentifier]:
         parsed_tables = set()
-        if view.ddl:
+        if view.view_definition:
             try:
                 parser = BigQuerySQLParser(
-                    view.ddl, self.config.sql_parser_use_external_process
+                    view.view_definition, self.config.sql_parser_use_external_process
                 )
                 tables = parser.get_tables()
             except Exception as ex:
                 logger.debug(
-                    f"View {view.name} definition sql parsing failed on query: {view.ddl}. Edge from physical table to view won't be added. The error was {ex}."
+                    f"View {view.name} definition sql parsing failed on query: {view.view_definition}. Edge from physical table to view won't be added. The error was {ex}."
                 )
                 return []
@@ -4,8 +4,6 @@ import logging
 from typing import Dict, Iterable, List, Optional, Tuple, cast

 from dateutil.relativedelta import relativedelta
-from sqlalchemy import create_engine, inspect
-from sqlalchemy.engine.reflection import Inspector

 from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
 from datahub.emitter.mcp_builder import wrap_aspect_as_workunit
@@ -17,12 +15,11 @@ from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
     BigqueryColumn,
     BigqueryTable,
 )
-from datahub.ingestion.source.ge_data_profiler import (
-    DatahubGEProfiler,
-    GEProfilerRequest,
+from datahub.ingestion.source.ge_data_profiler import GEProfilerRequest
+from datahub.ingestion.source.sql.sql_generic_profiler import (
+    GenericProfiler,
+    TableProfilerRequest,
 )
-from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile
 from datahub.metadata.schema_classes import DatasetProfileClass

 logger = logging.getLogger(__name__)
@@ -33,11 +30,14 @@ class BigqueryProfilerRequest(GEProfilerRequest):
     profile_table_level_only: bool = False


-class BigqueryProfiler:
+class BigqueryProfiler(GenericProfiler):
+    config: BigQueryV2Config
+    report: BigQueryV2Report
+
     def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report) -> None:
+        super().__init__(config, report, "bigquery")
         self.config = config
         self.report = report
-        self.platform = "bigquery"

     @staticmethod
     def get_partition_range_from_partition_id(
@@ -184,9 +184,9 @@ WHERE

             if len(profile_requests) == 0:
                 continue

+            table_profile_requests = cast(List[TableProfilerRequest], profile_requests)
             for request, profile in self.generate_profiles(
-                profile_requests,
+                table_profile_requests,
                 self.config.profiling.max_workers,
                 platform=self.platform,
                 profiler_args=self.get_profile_args(),
@@ -231,8 +231,11 @@ WHERE
             dataset_name, table.last_altered, table.size_in_bytes, table.rows_count
         ):
             profile_table_level_only = True
-            self.report.num_tables_not_eligible_profiling[dataset] = (
-                self.report.num_tables_not_eligible_profiling.get(dataset, 0) + 1
+            self.report.num_tables_not_eligible_profiling[f"{project}.{dataset}"] = (
+                self.report.num_tables_not_eligible_profiling.get(
+                    f"{project}.{dataset}", 0
+                )
+                + 1
             )

         if not table.columns:
@@ -276,107 +279,3 @@ WHERE
             profile_table_level_only=profile_table_level_only,
         )
         return profile_request
-
-    def is_dataset_eligible_for_profiling(
-        self,
-        dataset_name: str,
-        last_altered: Optional[datetime.datetime],
-        size_in_bytes: Optional[int],
-        rows_count: Optional[int],
-    ) -> bool:
-        threshold_time: Optional[datetime.datetime] = None
-        if self.config.profiling.profile_if_updated_since_days is not None:
-            threshold_time = datetime.datetime.now(
-                datetime.timezone.utc
-            ) - datetime.timedelta(self.config.profiling.profile_if_updated_since_days)
-
-        return (
-            (
-                self.config.table_pattern.allowed(dataset_name)
-                and self.config.profile_pattern.allowed(dataset_name)
-            )
-            and (
-                (threshold_time is None)
-                or (last_altered is not None and last_altered >= threshold_time)
-            )
-            and (
-                self.config.profiling.profile_table_size_limit is None
-                or (
-                    size_in_bytes is not None
-                    and size_in_bytes / (2**30)
-                    <= self.config.profiling.profile_table_size_limit
-                )  # Note: Profiling is not allowed is size_in_bytes is not available
-            )
-            and (
-                self.config.profiling.profile_table_row_limit is None
-                or (
-                    rows_count is not None
-                    and rows_count <= self.config.profiling.profile_table_row_limit
-                )  # Note: Profiling is not allowed is rows_count is not available
-            )
-        )
-
-    def get_inspectors(self) -> Iterable[Inspector]:
-        # This method can be overridden in the case that you want to dynamically
-        # run on multiple databases.
-
-        url = self.config.get_sql_alchemy_url()
-        logger.debug(f"sql_alchemy_url={url}")
-        engine = create_engine(url, **self.config.options)
-        with engine.connect() as conn:
-            inspector = inspect(conn)
-            yield inspector
-
-    def get_profiler_instance(self) -> "DatahubGEProfiler":
-        logger.debug("Getting profiler instance from bigquery")
-        url = self.config.get_sql_alchemy_url()
-
-        logger.debug(f"sql_alchemy_url={url}")
-
-        engine = create_engine(url, **self.config.options)
-        with engine.connect() as conn:
-            inspector = inspect(conn)
-
-        return DatahubGEProfiler(
-            conn=inspector.bind,
-            report=self.report,
-            config=self.config.profiling,
-            platform=self.platform,
-        )
-
-    def get_profile_args(self) -> Dict:
-        """Passed down to GE profiler"""
-        return {}
-
-    def generate_profiles(
-        self,
-        requests: List[BigqueryProfilerRequest],
-        max_workers: int,
-        platform: Optional[str] = None,
-        profiler_args: Optional[Dict] = None,
-    ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
-
-        ge_profile_requests: List[GEProfilerRequest] = [
-            cast(GEProfilerRequest, request)
-            for request in requests
-            if not request.profile_table_level_only
-        ]
-        table_level_profile_requests: List[BigqueryProfilerRequest] = [
-            request for request in requests if request.profile_table_level_only
-        ]
-        for request in table_level_profile_requests:
-            profile = DatasetProfile(
-                timestampMillis=int(datetime.datetime.now().timestamp() * 1000),
-                columnCount=len(request.table.columns),
-                rowCount=request.table.rows_count,
-                sizeInBytes=request.table.size_in_bytes,
-            )
-            yield (request, profile)
-
-        if not ge_profile_requests:
-            return
-
-        ge_profiler = self.get_profiler_instance()
-        yield from ge_profiler.generate_profiles(
-            ge_profile_requests, max_workers, platform, profiler_args
-        )
@@ -102,12 +102,12 @@ class GEProfilingConfig(ConfigModel):
     )

     profile_table_size_limit: Optional[int] = Field(
-        default=1,
+        default=5,
         description="Profile tables only if their size is less than specified GBs. If set to `null`, no limit on the size of tables to profile. Supported only in `snowflake` and `BigQuery`",
     )

     profile_table_row_limit: Optional[int] = Field(
-        default=50000,
+        default=5000000,
         description="Profile tables only if their row count is less than specified count. If set to `null`, no limit on the row count of tables to profile. Supported only in `snowflake` and `BigQuery`",
     )
@@ -1,7 +1,6 @@
 import dataclasses
 import datetime
 import logging
-from typing import Callable, Dict, Iterable, List, Optional, Tuple, cast
+from typing import Callable, Iterable, List, Optional, cast

 from snowflake.sqlalchemy import snowdialect
 from sqlalchemy import create_engine, inspect
@@ -21,9 +20,14 @@ from datahub.ingestion.source.snowflake.snowflake_schema import (
     SnowflakeDatabase,
     SnowflakeTable,
 )
-from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeCommonMixin
-from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile
-from datahub.metadata.schema_classes import DatasetProfileClass
+from datahub.ingestion.source.snowflake.snowflake_utils import (
+    SnowflakeCommonMixin,
+    SnowflakeCommonProtocol,
+)
+from datahub.ingestion.source.sql.sql_generic_profiler import (
+    GenericProfiler,
+    TableProfilerRequest,
+)

 snowdialect.ischema_names["GEOGRAPHY"] = sqltypes.NullType
@@ -36,14 +40,17 @@ class SnowflakeProfilerRequest(GEProfilerRequest):
     profile_table_level_only: bool = False


-class SnowflakeProfiler(SnowflakeCommonMixin):
+class SnowflakeProfiler(SnowflakeCommonMixin, GenericProfiler, SnowflakeCommonProtocol):
+    config: SnowflakeV2Config
+    report: SnowflakeV2Report
+
     def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None:
+        super().__init__(config, report, self.platform)
         self.config = config
         self.report = report
         self.logger = logger

     def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit]:

         # Extra default SQLAlchemy option for better connection pooling and threading.
         # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
         if self.config.profiling.enabled:
@@ -74,10 +81,13 @@ class SnowflakeProfiler(SnowflakeCommonMixin):

             if len(profile_requests) == 0:
                 continue

+            table_profile_requests = cast(List[TableProfilerRequest], profile_requests)
+
             for request, profile in self.generate_profiles(
-                db.name,
-                profile_requests,
+                table_profile_requests,
                 self.config.profiling.max_workers,
+                db.name,
                 platform=self.platform,
                 profiler_args=self.get_profile_args(),
             ):
@@ -139,47 +149,11 @@ class SnowflakeProfiler(SnowflakeCommonMixin):
         )
         return profile_request

-    def is_dataset_eligible_for_profiling(
-        self,
-        dataset_name: str,
-        last_altered: datetime.datetime,
-        size_in_bytes: int,
-        rows_count: Optional[int],
-    ) -> bool:
-        threshold_time: Optional[datetime.datetime] = None
-        if self.config.profiling.profile_if_updated_since_days is not None:
-            threshold_time = datetime.datetime.now(
-                datetime.timezone.utc
-            ) - datetime.timedelta(self.config.profiling.profile_if_updated_since_days)
-
-        return (
-            (
-                self.config.table_pattern.allowed(dataset_name)
-                and self.config.profile_pattern.allowed(dataset_name)
-            )
-            and (threshold_time is None or last_altered >= threshold_time)
-            and (
-                self.config.profiling.profile_table_size_limit is None
-                or (
-                    size_in_bytes is not None
-                    and size_in_bytes / (2**30)
-                    <= self.config.profiling.profile_table_size_limit
-                )
-                # Note: Profiling is not allowed is size_in_bytes is not available
-                # and self.config.profiling.profile_table_size_limit is set
-            )
-            and (
-                self.config.profiling.profile_table_row_limit is None
-                or (
-                    rows_count is not None
-                    and rows_count <= self.config.profiling.profile_table_row_limit
-                )
-                # Note: Profiling is not allowed is rows_count is not available
-                # and self.config.profiling.profile_table_row_limit is set
-            )
-        )
-
-    def get_profiler_instance(self, db_name: str) -> "DatahubGEProfiler":
+    def get_profiler_instance(
+        self, db_name: Optional[str] = None
+    ) -> "DatahubGEProfiler":
+        assert db_name
+
         url = self.config.get_sql_alchemy_url(
             database=db_name,
             username=self.config.username,
@@ -204,10 +178,6 @@ class SnowflakeProfiler(SnowflakeCommonMixin):
             platform=self.platform,
         )

-    def get_profile_args(self) -> Dict:
-        """Passed down to GE profiler"""
-        return {}
-
     def callable_for_db_connection(self, db_name: str) -> Callable:
         def get_db_connection():
             conn = self.config.get_connection()
@@ -215,38 +185,3 @@ class SnowflakeProfiler(SnowflakeCommonMixin):
             return conn

         return get_db_connection
-
-    def generate_profiles(
-        self,
-        db_name: str,
-        requests: List[SnowflakeProfilerRequest],
-        max_workers: int,
-        platform: Optional[str] = None,
-        profiler_args: Optional[Dict] = None,
-    ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
-
-        ge_profile_requests: List[GEProfilerRequest] = [
-            cast(GEProfilerRequest, request)
-            for request in requests
-            if not request.profile_table_level_only
-        ]
-        table_level_profile_requests: List[SnowflakeProfilerRequest] = [
-            request for request in requests if request.profile_table_level_only
-        ]
-        for request in table_level_profile_requests:
-            profile = DatasetProfile(
-                timestampMillis=round(datetime.datetime.now().timestamp() * 1000),
-                columnCount=len(request.table.columns),
-                rowCount=request.table.rows_count,
-                sizeInBytes=request.table.size_in_bytes,
-            )
-            yield (request, profile)
-
-        if len(ge_profile_requests) == 0:
-            return
-
-        # Otherwise, if column level profiling is enabled, use GE profiler.
-        ge_profiler = self.get_profiler_instance(db_name)
-        yield from ge_profiler.generate_profiles(
-            ge_profile_requests, max_workers, platform, profiler_args
-        )
@@ -1,8 +1,9 @@
+from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
 from datahub.ingestion.source_report.sql.snowflake import SnowflakeReport
 from datahub.ingestion.source_report.usage.snowflake_usage import SnowflakeUsageReport


-class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport):
+class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport, ProfilingSqlReport):

     schemas_scanned: int = 0
     databases_scanned: int = 0
@@ -8,6 +8,7 @@ from snowflake.connector import SnowflakeConnection

 from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
 from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeQueryMixin
+from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView

 logger: logging.Logger = logging.getLogger(__name__)
@@ -28,13 +29,8 @@ class SnowflakeFK:
     referred_column_names: List[str]


-@dataclass
-class SnowflakeColumn:
-    name: str
-    ordinal_position: int
-    is_nullable: bool
-    data_type: str
-    comment: Optional[str]
+@dataclass(frozen=True, eq=True)
+class SnowflakeColumn(BaseColumn):
     character_maximum_length: Optional[int]
     numeric_precision: Optional[int]
     numeric_scale: Optional[int]
@@ -60,14 +56,8 @@ class SnowflakeColumn:


 @dataclass
-class SnowflakeTable:
-    name: str
-    created: datetime
-    last_altered: datetime
-    size_in_bytes: int
-    rows_count: int
-    comment: Optional[str]
-    clustering_key: str
+class SnowflakeTable(BaseTable):
+    clustering_key: Optional[str] = None
     pk: Optional[SnowflakePK] = None
     columns: List[SnowflakeColumn] = field(default_factory=list)
     foreign_keys: List[SnowflakeFK] = field(default_factory=list)
@@ -75,12 +65,7 @@ class SnowflakeTable:


 @dataclass
-class SnowflakeView:
-    name: str
-    created: datetime
-    comment: Optional[str]
-    view_definition: str
-    last_altered: Optional[datetime] = None
+class SnowflakeView(BaseView):
     columns: List[SnowflakeColumn] = field(default_factory=list)
@@ -225,6 +210,7 @@ class SnowflakeDataDictionary(SnowflakeQueryMixin):
                     # last_altered=table["last_altered"],
                     comment=table["comment"],
                     view_definition=table["text"],
+                    last_altered=table["created_on"],
                 )
             )
         return views
@@ -245,6 +231,7 @@ class SnowflakeDataDictionary(SnowflakeQueryMixin):
                     # last_altered=table["last_altered"],
                     comment=table["comment"],
                     view_definition=table["text"],
+                    last_altered=table["created_on"],
                 )
             )
         return views
@@ -31,23 +31,12 @@ SNOWFLAKE_DEFAULT_CLOUD = SnowflakeCloudProvider.AWS
 # Required only for mypy, since we are using mixin classes, and not inheritance.
 # Reference - https://mypy.readthedocs.io/en/latest/more_types.html#mixin-classes
 class SnowflakeLoggingProtocol(Protocol):
-    @property
-    def logger(self) -> logging.Logger:
-        ...
+    logger: logging.Logger


-class SnowflakeCommonProtocol(Protocol):
-    @property
-    def logger(self) -> logging.Logger:
-        ...
-
-    @property
-    def config(self) -> SnowflakeV2Config:
-        ...
-
-    @property
-    def report(self) -> SnowflakeV2Report:
-        ...
+class SnowflakeCommonProtocol(SnowflakeLoggingProtocol, Protocol):
+    config: SnowflakeV2Config
+    report: SnowflakeV2Report

     def get_dataset_identifier(
         self, table_name: str, schema_name: str, db_name: str
@@ -1,3 +1,7 @@
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Generic, List, Optional, TypeVar
+
 from pydantic.fields import Field

 from datahub.ingestion.api.common import PipelineContext
@@ -12,6 +16,42 @@ from datahub.ingestion.api.decorators import (
 from datahub.ingestion.source.sql.sql_common import SQLAlchemyConfig, SQLAlchemySource


+@dataclass(frozen=True, eq=True)
+class BaseColumn:
+    name: str
+    ordinal_position: int
+    is_nullable: bool
+    data_type: str
+    comment: Optional[str]
+
+
+SqlTableColumn = TypeVar("SqlTableColumn", bound="BaseColumn")
+
+
+@dataclass
+class BaseTable(Generic[SqlTableColumn]):
+    name: str
+    comment: Optional[str]
+    created: datetime
+    last_altered: datetime
+    size_in_bytes: int
+    rows_count: int
+    columns: List[SqlTableColumn] = field(default_factory=list)
+    ddl: Optional[str] = None
+
+
+@dataclass
+class BaseView(Generic[SqlTableColumn]):
+    name: str
+    comment: Optional[str]
+    created: datetime
+    last_altered: datetime
+    view_definition: str
+    size_in_bytes: Optional[int] = None
+    rows_count: Optional[int] = None
+    columns: List[SqlTableColumn] = field(default_factory=list)
+
+
 class SQLAlchemyGenericConfig(SQLAlchemyConfig):

     platform: str = Field(
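
With BaseColumn, BaseTable and BaseView in place, a source-specific dataclass only declares its extra fields and inherits the shared ones. A small illustrative sketch using the BigqueryTable class shown earlier in this diff (the import path appears in the diff; the values are made up):

    from datetime import datetime, timezone

    from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryTable

    table = BigqueryTable(
        name="events",
        comment=None,
        created=datetime(2022, 11, 1, tzinfo=timezone.utc),
        last_altered=datetime(2022, 11, 30, tzinfo=timezone.utc),
        size_in_bytes=2 * 2**30,  # 2 GB
        rows_count=1_200_000,
    )
    # BigQuery-only fields (expires, labels, time_partitioning, ...) default to None.
    assert table.expires is None and table.time_partitioning is None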
@@ -0,0 +1,165 @@
+import logging
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta, timezone
+from typing import Dict, Iterable, List, Optional, Tuple, Union, cast
+
+from sqlalchemy import create_engine, inspect
+from sqlalchemy.engine.reflection import Inspector
+
+from datahub.ingestion.source.ge_data_profiler import (
+    DatahubGEProfiler,
+    GEProfilerRequest,
+)
+from datahub.ingestion.source.sql.sql_common import SQLAlchemyConfig, SQLSourceReport
+from datahub.ingestion.source.sql.sql_generic import BaseTable, BaseView
+from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile
+from datahub.metadata.schema_classes import DatasetProfileClass
+from datahub.utilities.stats_collections import TopKDict
+
+
+@dataclass
+class DetailedProfilerReportMixin:
+    profiling_skipped_not_updated: TopKDict[str, int] = field(default_factory=TopKDict)
+    profiling_skipped_size_limit: TopKDict[str, int] = field(default_factory=TopKDict)
+
+    profiling_skipped_row_limit: TopKDict[str, int] = field(default_factory=TopKDict)
+    num_tables_not_eligible_profiling: Dict[str, int] = field(default_factory=TopKDict)
+
+
+class ProfilingSqlReport(DetailedProfilerReportMixin, SQLSourceReport):
+    pass
+
+
+@dataclass
+class TableProfilerRequest(GEProfilerRequest):
+    table: Union[BaseTable, BaseView]
+    profile_table_level_only: bool = False
+
+
+logger = logging.getLogger(__name__)
+
+
+class GenericProfiler:
+    def __init__(
+        self, config: SQLAlchemyConfig, report: ProfilingSqlReport, platform: str
+    ) -> None:
+        self.config = config
+        self.report = report
+        self.platform = platform
+
+    def generate_profiles(
+        self,
+        requests: List[TableProfilerRequest],
+        max_workers: int,
+        db_name: Optional[str] = None,
+        platform: Optional[str] = None,
+        profiler_args: Optional[Dict] = None,
+    ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
+
+        ge_profile_requests: List[GEProfilerRequest] = [
+            cast(GEProfilerRequest, request)
+            for request in requests
+            if not request.profile_table_level_only
+        ]
+        table_level_profile_requests: List[TableProfilerRequest] = [
+            request for request in requests if request.profile_table_level_only
+        ]
+        for request in table_level_profile_requests:
+            profile = DatasetProfile(
+                timestampMillis=int(datetime.now().timestamp() * 1000),
+                columnCount=len(request.table.columns),
+                rowCount=request.table.rows_count,
+                sizeInBytes=request.table.size_in_bytes,
+            )
+            yield (request, profile)
+
+        if not ge_profile_requests:
+            return
+
+        # Otherwise, if column level profiling is enabled, use GE profiler.
+        ge_profiler = self.get_profiler_instance(db_name)
+        yield from ge_profiler.generate_profiles(
+            ge_profile_requests, max_workers, platform, profiler_args
+        )
+
+    def get_inspectors(self) -> Iterable[Inspector]:
+        # This method can be overridden in the case that you want to dynamically
+        # run on multiple databases.
+
+        url = self.config.get_sql_alchemy_url()
+        logger.debug(f"sql_alchemy_url={url}")
+        engine = create_engine(url, **self.config.options)
+        with engine.connect() as conn:
+            inspector = inspect(conn)
+            yield inspector
+
+    def get_profiler_instance(
+        self, db_name: Optional[str] = None
+    ) -> "DatahubGEProfiler":
+        logger.debug(f"Getting profiler instance from {self.platform}")
+        url = self.config.get_sql_alchemy_url()
+
+        logger.debug(f"sql_alchemy_url={url}")
+
+        engine = create_engine(url, **self.config.options)
+        with engine.connect() as conn:
+            inspector = inspect(conn)
+
+        return DatahubGEProfiler(
+            conn=inspector.bind,
+            report=self.report,
+            config=self.config.profiling,
+            platform=self.platform,
+        )
+
+    def is_dataset_eligible_for_profiling(
+        self,
+        dataset_name: str,
+        last_altered: Optional[datetime],
+        size_in_bytes: Optional[int],
+        rows_count: Optional[int],
+    ) -> bool:
+        threshold_time: Optional[datetime] = None
+        if self.config.profiling.profile_if_updated_since_days is not None:
+            threshold_time = datetime.now(timezone.utc) - timedelta(
+                self.config.profiling.profile_if_updated_since_days
+            )
+
+        if not self.config.table_pattern.allowed(
+            dataset_name
+        ) or not self.config.profile_pattern.allowed(dataset_name):
+            return False
+
+        schema_name = dataset_name.rsplit(".", 1)[0]
+        if (threshold_time is not None) and (
+            last_altered is not None and last_altered < threshold_time
+        ):
+            self.report.profiling_skipped_not_updated[schema_name] = (
+                self.report.profiling_skipped_not_updated.get(schema_name, 0) + 1
+            )
+            return False
+
+        if self.config.profiling.profile_table_size_limit is not None and (
+            size_in_bytes is None
+            or size_in_bytes / (2**30)
+            > self.config.profiling.profile_table_size_limit
+        ):
+            self.report.profiling_skipped_size_limit[schema_name] = (
+                self.report.profiling_skipped_size_limit.get(schema_name, 0) + 1
+            )
+            return False
+
+        if self.config.profiling.profile_table_row_limit is not None and (
+            rows_count is None
+            or rows_count > self.config.profiling.profile_table_row_limit
+        ):
+            self.report.profiling_skipped_row_limit[schema_name] = (
+                self.report.profiling_skipped_row_limit.get(schema_name, 0) + 1
+            )
+            return False
+
+        return True
+
+    def get_profile_args(self) -> Dict:
+        """Passed down to GE profiler"""
+        return {}
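
The eligibility check above is where the new defaults bite: a table is only column-profiled if it passes the allow patterns, the optional freshness window, the size limit (in GB) and the row limit, and each skip is counted in the report. A dependency-free sketch of the same decision rules, simplified to plain values (this is not the DataHub API itself):

    from datetime import datetime, timedelta, timezone
    from typing import Optional

    def eligible_for_profiling(
        last_altered: Optional[datetime],
        size_in_bytes: Optional[int],
        rows_count: Optional[int],
        updated_since_days: Optional[int] = None,
        size_limit_gb: Optional[int] = 5,  # new default
        row_limit: Optional[int] = 5_000_000,  # new default
    ) -> bool:
        if updated_since_days is not None:
            threshold = datetime.now(timezone.utc) - timedelta(days=updated_since_days)
            # A known-but-stale table is skipped; an unknown last_altered is not.
            if last_altered is not None and last_altered < threshold:
                return False
        if size_limit_gb is not None:
            # An unknown size counts as too big once a limit is configured.
            if size_in_bytes is None or size_in_bytes / (2**30) > size_limit_gb:
                return False
        if row_limit is not None:
            # Likewise for an unknown row count.
            if rows_count is None or rows_count > row_limit:
                return False
        return True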
@@ -2,7 +2,7 @@ from dataclasses import dataclass, field
 from datetime import datetime
 from typing import Dict, List, Optional

-from datahub.ingestion.source.sql.sql_common import SQLSourceReport
+from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
 from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
@@ -12,7 +12,7 @@ class BaseSnowflakeReport(BaseTimeWindowReport):


 @dataclass
-class SnowflakeReport(BaseSnowflakeReport, SQLSourceReport):
+class SnowflakeReport(BaseSnowflakeReport, ProfilingSqlReport):
     num_table_to_table_edges_scanned: int = 0
     num_table_to_view_edges_scanned: int = 0
     num_view_to_table_edges_scanned: int = 0
@@ -24,7 +24,7 @@ FROM
         created=datetime.datetime.now(),
         last_altered=datetime.datetime.now(),
         comment="",
-        ddl=ddl,
+        view_definition=ddl,
     )
     tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
     assert 1 == len(tables)
@@ -42,7 +42,7 @@ def test_parse_view_lineage_with_two_part_table_name():
         created=datetime.datetime.now(),
         last_altered=datetime.datetime.now(),
         comment="",
-        ddl=ddl,
+        view_definition=ddl,
     )
     tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
     assert 1 == len(tables)
@@ -60,7 +60,7 @@ def test_one_part_table():
         created=datetime.datetime.now(),
         last_altered=datetime.datetime.now(),
         comment="",
-        ddl=ddl,
+        view_definition=ddl,
     )
     tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
     assert 1 == len(tables)
@@ -78,7 +78,7 @@ def test_create_statement_with_multiple_table():
         created=datetime.datetime.now(),
         last_altered=datetime.datetime.now(),
         comment="",
-        ddl=ddl,
+        view_definition=ddl,
     )
     tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
     tables.sort(key=lambda e: e.get_table_name())