feat(ingest): Create zero usage aspects (#8205)

Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
Andrew Sikowitz 2023-06-22 17:07:50 -04:00 committed by GitHub
parent 620d245d57
commit aa5e02d0ec
24 changed files with 974 additions and 238 deletions

View File

@ -1,6 +1,6 @@
import enum
from datetime import datetime, timedelta, timezone
from typing import Any, Dict
from typing import Any, Dict, List
import pydantic
from pydantic.fields import Field
@ -62,3 +62,36 @@ class BaseTimeWindowConfig(ConfigModel):
'timezone is not UTC; try adding a "Z" to the value e.g. "2021-07-20T00:00:00Z"'
)
return v
def buckets(self) -> List[datetime]:
"""Returns list of timestamps for each DatasetUsageStatistics bucket.
Includes all buckets in the time window, including partially contained buckets.
"""
bucket_timedelta = get_bucket_duration_delta(self.bucket_duration)
curr_bucket = get_time_bucket(self.start_time, self.bucket_duration)
buckets = []
while curr_bucket < self.end_time:
buckets.append(curr_bucket)
curr_bucket += bucket_timedelta
return buckets
def majority_buckets(self) -> List[datetime]:
"""Returns list of timestamps for each DatasetUsageStatistics bucket.
Includes only buckets in the time window for which a majority of the bucket is ingested.
"""
bucket_timedelta = get_bucket_duration_delta(self.bucket_duration)
curr_bucket = get_time_bucket(self.start_time, self.bucket_duration)
buckets = []
while curr_bucket < self.end_time:
start = max(self.start_time, curr_bucket)
end = min(self.end_time, curr_bucket + bucket_timedelta)
if end - start >= bucket_timedelta / 2:
buckets.append(curr_bucket)
curr_bucket += bucket_timedelta
return buckets
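
The difference between the two helpers is easiest to see on a window that only partially covers its first and last bucket. Below is a minimal, self-contained sketch of the same bucketing logic using plain datetimes and DAY-sized buckets; it re-implements the two loops above rather than calling the DataHub helpers, so the names are illustrative only.

from datetime import datetime, timedelta, timezone

def day_buckets(start: datetime, end: datetime, majority_only: bool = False) -> list:
    # Floor the window start to midnight, then walk forward one day at a time,
    # mirroring get_time_bucket plus the while loops in buckets()/majority_buckets().
    delta = timedelta(days=1)
    curr = start.replace(hour=0, minute=0, second=0, microsecond=0)
    out = []
    while curr < end:
        covered = min(end, curr + delta) - max(start, curr)
        if not majority_only or covered >= delta / 2:
            out.append(curr)
        curr += delta
    return out

start = datetime(2023, 1, 1, 18, tzinfo=timezone.utc)
end = datetime(2023, 1, 3, 6, tzinfo=timezone.utc)
print(day_buckets(start, end))                      # Jan 1, Jan 2, Jan 3: every bucket touched
print(day_buckets(start, end, majority_only=True))  # Jan 2 only: the edge buckets cover < 12h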

View File

@ -1,4 +1,5 @@
import logging
from datetime import datetime, timezone
from typing import (
TYPE_CHECKING,
Callable,
@ -13,6 +14,7 @@ from typing import (
Union,
)
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mce_builder import make_dataplatform_instance_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
@ -20,13 +22,17 @@ from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
BrowsePathsClass,
BrowsePathsV2Class,
ChangeTypeClass,
ContainerClass,
DatasetUsageStatisticsClass,
MetadataChangeEventClass,
MetadataChangeProposalClass,
StatusClass,
TagKeyClass,
TimeWindowSizeClass,
)
from datahub.telemetry import telemetry
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.tag_urn import TagUrn
from datahub.utilities.urns.urn import guess_entity_type
from datahub.utilities.urns.urn_iter import list_urns
@ -149,11 +155,11 @@ def auto_materialize_referenced_tags(
for wu in stream:
for urn in list_urns(wu.metadata):
if guess_entity_type(urn) == "tag":
if guess_entity_type(urn) == TagUrn.ENTITY_TYPE:
referenced_tags.add(urn)
urn = wu.get_urn()
if guess_entity_type(urn) == "tag":
if guess_entity_type(urn) == TagUrn.ENTITY_TYPE:
tags_with_aspects.add(urn)
yield wu
@ -274,6 +280,64 @@ def auto_browse_path_v2(
telemetry.telemetry_instance.ping("incorrect_browse_path_v2", properties)
def auto_empty_dataset_usage_statistics(
stream: Iterable[MetadataWorkUnit],
*,
dataset_urns: Set[str],
config: BaseTimeWindowConfig,
all_buckets: bool = False, # TODO: Enable when CREATE changeType is supported for timeseries aspects
) -> Iterable[MetadataWorkUnit]:
"""Emit empty usage statistics aspect for all dataset_urns ingested with no usage."""
buckets = config.buckets() if all_buckets else config.majority_buckets()
bucket_timestamps = [int(bucket.timestamp() * 1000) for bucket in buckets]
# Maps time bucket -> urns with usage statistics for that bucket
usage_statistics_urns: Dict[int, Set[str]] = {ts: set() for ts in bucket_timestamps}
invalid_timestamps = set()
for wu in stream:
yield wu
if not wu.is_primary_source:
continue
urn = wu.get_urn()
if guess_entity_type(urn) == DatasetUrn.ENTITY_TYPE:
dataset_urns.add(urn)
usage_aspect = wu.get_aspect_of_type(DatasetUsageStatisticsClass)
if usage_aspect:
if usage_aspect.timestampMillis in bucket_timestamps:
usage_statistics_urns[usage_aspect.timestampMillis].add(urn)
elif all_buckets:
invalid_timestamps.add(usage_aspect.timestampMillis)
if invalid_timestamps:
logger.warning(
f"Usage statistics with unexpected timestamps, bucket_duration={config.bucket_duration}:\n"
", ".join(
str(datetime.fromtimestamp(ts, tz=timezone.utc))
for ts in invalid_timestamps
)
)
for bucket in bucket_timestamps:
for urn in dataset_urns - usage_statistics_urns[bucket]:
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=DatasetUsageStatisticsClass(
timestampMillis=bucket,
eventGranularity=TimeWindowSizeClass(unit=config.bucket_duration),
uniqueUserCount=0,
totalSqlQueries=0,
topSqlQueries=[],
userCounts=[],
fieldCounts=[],
),
changeType=ChangeTypeClass.CREATE
if all_buckets
else ChangeTypeClass.UPSERT,
).as_workunit()
def _batch_workunits_by_urn(
stream: Iterable[MetadataWorkUnit],
) -> Iterable[Tuple[str, List[MetadataWorkUnit]]]:

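The BigQuery, Redshift, Snowflake, and Unity changes below all wrap their usage workunit stream in this helper. A rough sketch of calling it directly, with an empty input stream and made-up urns, just to show what it emits; BaseTimeWindowConfig() defaults (a trailing one-day window with DAY buckets) are assumed here, matching the unit test added at the end of this commit.

from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics

# Illustrative urns only; a real source collects these while ingesting schemas.
urns = {make_dataset_urn("snowflake", f"db.schema.table_{i}") for i in range(3)}

# With no usage workunits in the input stream, every dataset gets a zero-usage
# DatasetUsageStatistics aspect for each majority-covered bucket. The aspects
# are emitted as UPSERTs; CREATE is only used when all_buckets=True, per the
# TODO above.
zero_usage = list(
    auto_empty_dataset_usage_statistics(
        iter([]),
        dataset_urns=urns,
        config=BaseTimeWindowConfig(),
    )
)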
View File

@ -154,6 +154,10 @@ def cleanup(config: BigQueryV2Config) -> None:
)
@capability(SourceCapability.DESCRIPTIONS, "Enabled by default")
@capability(SourceCapability.LINEAGE_COARSE, "Optionally enabled via configuration")
@capability(
SourceCapability.USAGE_STATS,
"Enabled by default, can be disabled via configuration `include_usage_statistics`",
)
@capability(
SourceCapability.DELETION_DETECTION,
"Optionally enabled via `stateful_ingestion.remove_stale_metadata`",
@ -218,7 +222,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
# For database, schema, tables, views, etc
self.lineage_extractor = BigqueryLineageExtractor(config, self.report)
self.usage_extractor = BigQueryUsageExtractor(config, self.report)
self.usage_extractor = BigQueryUsageExtractor(
config, self.report, dataset_urn_builder=self.gen_dataset_urn_from_ref
)
self.domain_registry: Optional[DomainRegistry] = None
if self.config.domain:
@ -330,7 +336,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
project_ids: List[str],
report: BigQueryV2Report,
) -> CapabilityReport:
usage_extractor = BigQueryUsageExtractor(connection_conf, report)
usage_extractor = BigQueryUsageExtractor(
connection_conf, report, lambda ref: ""
)
for project_id in project_ids:
try:
logger.info(f"Usage capability test for project {project_id}")
@ -488,7 +496,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
yield from self._process_project(conn, project_id)
if self._should_ingest_usage():
yield from self.usage_extractor.run(
yield from self.usage_extractor.get_usage_workunits(
[p.id for p in projects], self.table_refs
)
@ -1060,12 +1068,18 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
def gen_dataset_urn(self, project_id: str, dataset_name: str, table: str) -> str:
datahub_dataset_name = BigqueryTableIdentifier(project_id, dataset_name, table)
dataset_urn = make_dataset_urn(
return make_dataset_urn(
self.platform,
str(datahub_dataset_name),
self.config.env,
)
return dataset_urn
def gen_dataset_urn_from_ref(self, ref: BigQueryTableRef) -> str:
return self.gen_dataset_urn(
ref.table_identifier.project_id,
ref.table_identifier.dataset,
ref.table_identifier.table,
)
def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
schema_fields: List[SchemaField] = []

View File

@ -29,6 +29,7 @@ from datahub.configuration.time_window_config import get_time_bucket
from datahub.emitter.mce_builder import make_user_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
BQ_AUDIT_V2,
@ -371,9 +372,15 @@ class BigQueryUsageExtractor:
:::
"""
def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report):
def __init__(
self,
config: BigQueryV2Config,
report: BigQueryV2Report,
dataset_urn_builder: Callable[[BigQueryTableRef], str],
):
self.config: BigQueryV2Config = config
self.report: BigQueryV2Report = report
self.dataset_urn_builder = dataset_urn_builder
# Replace hash of query with uuid if there are hash conflicts
self.uuid_to_query: Dict[str, str] = {}
@ -384,13 +391,13 @@ class BigQueryUsageExtractor:
and self.config.table_pattern.allowed(table_ref.table_identifier.table)
)
def run(
def get_usage_workunits(
self, projects: Iterable[str], table_refs: Collection[str]
) -> Iterable[MetadataWorkUnit]:
events = self._get_usage_events(projects)
yield from self._run(events, table_refs)
yield from self._get_workunits_internal(events, table_refs)
def _run(
def _get_workunits_internal(
self, events: Iterable[AuditEvent], table_refs: Collection[str]
) -> Iterable[MetadataWorkUnit]:
try:
@ -404,7 +411,14 @@ class BigQueryUsageExtractor:
usage_state, table_refs
)
yield from self._generate_usage_workunits(usage_state)
yield from auto_empty_dataset_usage_statistics(
self._generate_usage_workunits(usage_state),
config=self.config,
dataset_urns={
self.dataset_urn_builder(BigQueryTableRef.from_string_name(ref))
for ref in table_refs
},
)
usage_state.report_disk_usage(self.report)
except Exception as e:
logger.error("Error processing usage", exc_info=True)
@ -526,9 +540,7 @@ class BigQueryUsageExtractor:
user_freq=entry.user_freq,
column_freq=entry.column_freq,
bucket_duration=self.config.bucket_duration,
resource_urn_builder=lambda resource: resource.to_urn(
self.config.env
),
resource_urn_builder=self.dataset_urn_builder,
top_n_queries=self.config.usage.top_n_queries,
format_sql_queries=self.config.usage.format_sql_queries,
)

View File

@ -108,7 +108,10 @@ logger: logging.Logger = logging.getLogger(__name__)
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
@capability(SourceCapability.DESCRIPTIONS, "Enabled by default")
@capability(SourceCapability.LINEAGE_COARSE, "Optionally enabled via configuration")
@capability(SourceCapability.USAGE_STATS, "Optionally enabled via configuration")
@capability(
SourceCapability.USAGE_STATS,
"Enabled by default, can be disabled via configuration `include_usage_statistics`",
)
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
"""
@ -593,7 +596,6 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
database=database,
schema=table.schema,
sub_type=DatasetSubTypes.TABLE,
tags_to_add=[],
custom_properties=custom_properties,
)
@ -609,17 +611,11 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
database=get_db_name(self.config),
schema=schema,
sub_type=DatasetSubTypes.VIEW,
tags_to_add=[],
custom_properties={},
)
datahub_dataset_name = f"{database}.{schema}.{view.name}"
dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=datahub_dataset_name,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
dataset_urn = self.gen_dataset_urn(datahub_dataset_name)
if view.ddl:
view_properties_aspect = ViewProperties(
materialized=view.type == "VIEW_MATERIALIZED",
@ -683,6 +679,14 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
entityUrn=dataset_urn, aspect=schema_metadata
).as_workunit()
def gen_dataset_urn(self, datahub_dataset_name: str) -> str:
return make_dataset_urn_with_platform_instance(
platform=self.platform,
name=datahub_dataset_name,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
# TODO: Move to common
def gen_dataset_workunits(
self,
@ -690,16 +694,10 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
database: str,
schema: str,
sub_type: str,
tags_to_add: Optional[List[str]] = None,
custom_properties: Optional[Dict[str, str]] = None,
) -> Iterable[MetadataWorkUnit]:
datahub_dataset_name = f"{database}.{schema}.{table.name}"
dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=datahub_dataset_name,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
dataset_urn = self.gen_dataset_urn(datahub_dataset_name)
yield from self.gen_schema_metadata(
dataset_urn, table, str(datahub_dataset_name)
@ -857,12 +855,12 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
return
with PerfTimer() as timer:
usage_extractor = RedshiftUsageExtractor(
yield from RedshiftUsageExtractor(
config=self.config,
connection=connection,
report=self.report,
)
yield from usage_extractor.generate_usage(all_tables=all_tables)
dataset_urn_builder=self.gen_dataset_urn,
).get_usage_workunits(all_tables=all_tables)
self.report.usage_extraction_sec[database] = round(
timer.elapsed_seconds(), 2
@ -917,12 +915,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
)
continue
datahub_dataset_name = f"{database}.{schema}.{table.name}"
dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=datahub_dataset_name,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
dataset_urn = self.gen_dataset_urn(datahub_dataset_name)
lineage_info = self.lineage_extractor.get_lineage(
table,
@ -937,12 +930,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
for schema in self.db_views[database]:
for view in self.db_views[database][schema]:
datahub_dataset_name = f"{database}.{schema}.{view.name}"
dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=datahub_dataset_name,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
dataset_urn = self.gen_dataset_urn(datahub_dataset_name)
lineage_info = self.lineage_extractor.get_lineage(
view,
dataset_urn,

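The recurring change in this commit is threading a dataset_urn_builder callback from each source into its usage and lineage extractors, so urn construction lives in one place instead of being repeated inline. A small sketch of a Redshift-style builder written as a standalone function (the platform_instance and env values here are illustrative, not read from a config):

from typing import Optional

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance

def gen_dataset_urn(datahub_dataset_name: str,
                    platform_instance: Optional[str] = None,
                    env: str = "PROD") -> str:
    # Same shape as RedshiftSource.gen_dataset_urn above: the source owns the
    # platform, instance, and env; callers only pass "database.schema.table".
    return make_dataset_urn_with_platform_instance(
        platform="redshift",
        name=datahub_dataset_name,
        platform_instance=platform_instance,
        env=env,
    )

Extractors receive this as dataset_urn_builder=gen_dataset_urn and call it wherever they previously built urns inline.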
View File

@ -2,7 +2,7 @@ import collections
import logging
import time
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Union
from typing import Callable, Dict, Iterable, List, Optional, Union
import pydantic.error_wrappers
import redshift_connector
@ -12,6 +12,7 @@ from pydantic.main import BaseModel
import datahub.emitter.mce_builder as builder
from datahub.configuration.time_window_config import get_time_bucket
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.redshift.config import RedshiftConfig
from datahub.ingestion.source.redshift.redshift_schema import (
@ -22,7 +23,6 @@ from datahub.ingestion.source.redshift.report import RedshiftReport
from datahub.ingestion.source.usage.usage_common import GenericAggregatedDataset
from datahub.metadata.schema_classes import OperationClass, OperationTypeClass
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.urns.dataset_urn import DatasetUrn
logger = logging.getLogger(__name__)
@ -169,19 +169,34 @@ class RedshiftUsageExtractor:
config: RedshiftConfig,
connection: redshift_connector.Connection,
report: RedshiftReport,
dataset_urn_builder: Callable[[str], str],
):
self.config = config
self.report = report
self.connection = connection
self.dataset_urn_builder = dataset_urn_builder
def generate_usage(
def get_usage_workunits(
self, all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]]
) -> Iterable[MetadataWorkUnit]:
yield from auto_empty_dataset_usage_statistics(
self._get_workunits_internal(all_tables),
config=self.config,
dataset_urns={
self.dataset_urn_builder(f"{database}.{schema}.{table.name}")
for database in all_tables
for schema in all_tables[database]
for table in all_tables[database][schema]
},
)
def _get_workunits_internal(
self, all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]]
) -> Iterable[MetadataWorkUnit]:
self.report.num_usage_workunits_emitted = 0
self.report.num_usage_stat_skipped = 0
self.report.num_operational_stats_skipped = 0
"""Gets Redshift usage stats as work units"""
if self.config.include_operational_stats:
with PerfTimer() as timer:
# Generate operation aspect workunits
@ -320,7 +335,6 @@ class RedshiftUsageExtractor:
assert event.operation_type in ["insert", "delete"]
resource: str = f"{event.database}.{event.schema_}.{event.table}"
reported_time: int = int(time.time() * 1000)
last_updated_timestamp: int = int(event.endtime.timestamp() * 1000)
user_email: str = event.username
@ -334,15 +348,10 @@ class RedshiftUsageExtractor:
else OperationTypeClass.DELETE
),
)
dataset_urn = DatasetUrn.create_from_ids(
platform_id="redshift",
table_name=resource.lower(),
platform_instance=self.config.platform_instance,
env=self.config.env,
)
resource: str = f"{event.database}.{event.schema_}.{event.table}".lower()
yield MetadataChangeProposalWrapper(
entityUrn=str(dataset_urn), aspect=operation_aspect
entityUrn=self.dataset_urn_builder(resource), aspect=operation_aspect
).as_workunit()
self.report.num_operational_stats_workunits_emitted += 1
@ -354,7 +363,7 @@ class RedshiftUsageExtractor:
floored_ts: datetime = get_time_bucket(
event.starttime, self.config.bucket_duration
)
resource: str = f"{event.database}.{event.schema_}.{event.table}"
resource: str = f"{event.database}.{event.schema_}.{event.table}".lower()
# Get a reference to the bucket value(or initialize not yet in dict) and update it.
agg_bucket: AggregatedDataset = datasets[floored_ts].setdefault(
resource,
@ -378,12 +387,7 @@ class RedshiftUsageExtractor:
def _make_usage_stat(self, agg: AggregatedDataset) -> MetadataWorkUnit:
return agg.make_usage_workunit(
self.config.bucket_duration,
lambda resource: builder.make_dataset_urn_with_platform_instance(
"redshift",
resource.lower(),
self.config.platform_instance,
self.config.env,
),
self.dataset_urn_builder,
self.config.top_n_queries,
self.config.format_sql_queries,
self.config.include_top_n_queries,

View File

@ -2,7 +2,7 @@ import json
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, FrozenSet, Iterable, List, Optional, Set
from typing import Callable, Dict, FrozenSet, Iterable, List, Optional, Set
from pydantic import Field
from pydantic.error_wrappers import ValidationError
@ -177,15 +177,20 @@ class SnowflakeLineageExtractor(
Edition Note - Snowflake Standard Edition does not have Access History Feature. So it does not support lineage extraction for edges 3, 4, 5 mentioned above.
"""
def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None:
def __init__(
self,
config: SnowflakeV2Config,
report: SnowflakeV2Report,
dataset_urn_builder: Callable[[str], str],
) -> None:
self._lineage_map: Dict[str, SnowflakeTableLineage] = defaultdict(
SnowflakeTableLineage
)
self._external_lineage_map: Dict[str, Set[str]] = defaultdict(set)
self.config = config
self.platform = "snowflake"
self.report = report
self.logger = logger
self.dataset_urn_builder = dataset_urn_builder
self.connection: Optional[SnowflakeConnection] = None
def get_workunits(
@ -228,31 +233,21 @@ class SnowflakeLineageExtractor(
def get_table_upstream_workunits(self, discovered_tables):
if self.config.include_table_lineage:
for dataset_name in discovered_tables:
dataset_urn = builder.make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
upstream_lineage = self._get_upstream_lineage_info(dataset_name)
if upstream_lineage is not None:
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=upstream_lineage
entityUrn=self.dataset_urn_builder(dataset_name),
aspect=upstream_lineage,
).as_workunit()
def get_view_upstream_workunits(self, discovered_views):
if self.config.include_view_lineage:
for view_name in discovered_views:
dataset_urn = builder.make_dataset_urn_with_platform_instance(
self.platform,
view_name,
self.config.platform_instance,
self.config.env,
)
upstream_lineage = self._get_upstream_lineage_info(view_name)
if upstream_lineage is not None:
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=upstream_lineage
entityUrn=self.dataset_urn_builder(view_name),
aspect=upstream_lineage,
).as_workunit()
def _get_upstream_lineage_info(
@ -267,17 +262,13 @@ class SnowflakeLineageExtractor(
upstream_tables: List[UpstreamClass] = []
finegrained_lineages: List[FineGrainedLineage] = []
dataset_urn = builder.make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
# Populate the table-lineage in aspect
self.update_upstream_tables_lineage(upstream_tables, lineage)
# Populate the column-lineage in aspect
self.update_upstream_columns_lineage(dataset_urn, finegrained_lineages, lineage)
self.update_upstream_columns_lineage(
self.dataset_urn_builder(dataset_name), finegrained_lineages, lineage
)
# Populate the external-table-lineage(s3->snowflake) in aspect
self.update_external_tables_lineage(upstream_tables, external_lineage)
@ -579,14 +570,8 @@ class SnowflakeLineageExtractor(
lineage.upstreamTables.values(), key=lambda x: x.upstreamDataset
):
upstream_table_name = lineage_entry.upstreamDataset
upstream_table_urn = builder.make_dataset_urn_with_platform_instance(
self.platform,
upstream_table_name,
self.config.platform_instance,
self.config.env,
)
upstream_table = UpstreamClass(
dataset=upstream_table_urn,
dataset=self.dataset_urn_builder(upstream_table_name),
type=DatasetLineageTypeClass.TRANSFORMED,
)
upstream_tables.append(upstream_table)
@ -657,15 +642,9 @@ class SnowflakeLineageExtractor(
upstream_dataset_name = self.get_dataset_identifier_from_qualified_name(
upstream_col.objectName
)
upstream_dataset_urn = builder.make_dataset_urn_with_platform_instance(
self.platform,
upstream_dataset_name,
self.config.platform_instance,
self.config.env,
)
column_upstreams.append(
builder.make_schema_field_urn(
upstream_dataset_urn,
self.dataset_urn_builder(upstream_dataset_name),
self.snowflake_identifier(upstream_col.columnName),
)
)

View File

@ -2,7 +2,7 @@ import json
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, FrozenSet, Iterable, List, Optional, Set
from typing import Callable, Dict, FrozenSet, Iterable, List, Optional, Set
from snowflake.connector import SnowflakeConnection
@ -77,12 +77,17 @@ class SnowflakeLineageExtractor(
Edition Note - Snowflake Standard Edition does not have Access History Feature. So it does not support lineage extraction for edges 3, 4, 5 mentioned above.
"""
def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None:
def __init__(
self,
config: SnowflakeV2Config,
report: SnowflakeV2Report,
dataset_urn_builder: Callable[[str], str],
) -> None:
self._external_lineage_map: Dict[str, Set[str]] = defaultdict(set)
self.config = config
self.platform = "snowflake"
self.report = report
self.logger = logger
self.dataset_urn_builder = dataset_urn_builder
self.connection: Optional[SnowflakeConnection] = None
def get_workunits(
@ -188,12 +193,6 @@ class SnowflakeLineageExtractor(
def _create_upstream_lineage_workunit(
self, dataset_name, upstreams, fine_upstreams=[]
):
dataset_urn = builder.make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
logger.debug(
f"Upstream lineage of '{dataset_name}': {[u.dataset for u in upstreams]}"
)
@ -209,7 +208,7 @@ class SnowflakeLineageExtractor(
or None,
)
return MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=upstream_lineage
entityUrn=self.dataset_urn_builder(dataset_name), aspect=upstream_lineage
).as_workunit()
def get_upstreams_from_query_result_row(self, dataset_name, db_row):
@ -226,14 +225,9 @@ class SnowflakeLineageExtractor(
and "UPSTREAM_COLUMNS" in db_row
and db_row["UPSTREAM_COLUMNS"] is not None
):
dataset_urn = builder.make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
fine_upstreams = self.map_query_result_fine_upstreams(
dataset_urn, json.loads(db_row["UPSTREAM_COLUMNS"])
self.dataset_urn_builder(dataset_name),
json.loads(db_row["UPSTREAM_COLUMNS"]),
)
# Populate the external-table-lineage(s3->snowflake), if present
@ -379,12 +373,7 @@ class SnowflakeLineageExtractor(
):
upstreams.append(
UpstreamClass(
dataset=builder.make_dataset_urn_with_platform_instance(
self.platform,
upstream_name,
self.config.platform_instance,
self.config.env,
),
dataset=self.dataset_urn_builder(upstream_name),
type=DatasetLineageTypeClass.TRANSFORMED,
)
)
@ -495,15 +484,9 @@ class SnowflakeLineageExtractor(
upstream_dataset_name = self.get_dataset_identifier_from_qualified_name(
upstream_col.objectName
)
upstream_dataset_urn = builder.make_dataset_urn_with_platform_instance(
self.platform,
upstream_dataset_name,
self.config.platform_instance,
self.config.env,
)
column_upstreams.append(
builder.make_schema_field_urn(
upstream_dataset_urn,
self.dataset_urn_builder(upstream_dataset_name),
self.snowflake_identifier(upstream_col.columnName),
)
)

View File

@ -2,16 +2,14 @@ import json
import logging
import time
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Optional
from typing import Any, Callable, Dict, Iterable, List, Optional
import pydantic
from snowflake.connector import SnowflakeConnection
from datahub.emitter.mce_builder import (
make_dataset_urn_with_platform_instance,
make_user_urn,
)
from datahub.emitter.mce_builder import make_user_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.snowflake.constants import SnowflakeEdition
from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
@ -104,13 +102,19 @@ class SnowflakeJoinedAccessEvent(PermissiveModel):
class SnowflakeUsageExtractor(
SnowflakeQueryMixin, SnowflakeConnectionMixin, SnowflakeCommonMixin
):
def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None:
def __init__(
self,
config: SnowflakeV2Config,
report: SnowflakeV2Report,
dataset_urn_builder: Callable[[str], str],
) -> None:
self.config: SnowflakeV2Config = config
self.report: SnowflakeV2Report = report
self.dataset_urn_builder = dataset_urn_builder
self.logger = logger
self.connection: Optional[SnowflakeConnection] = None
def get_workunits(
def get_usage_workunits(
self, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:
self.connection = self.create_connection()
@ -138,7 +142,14 @@ class SnowflakeUsageExtractor(
# Now, we report the usage as well as operation metadata even if user email is absent
if self.config.include_usage_stats:
yield from self.get_usage_workunits(discovered_datasets)
yield from auto_empty_dataset_usage_statistics(
self._get_workunits_internal(discovered_datasets),
config=self.config,
dataset_urns={
self.dataset_urn_builder(dataset_identifier)
for dataset_identifier in discovered_datasets
},
)
if self.config.include_operational_stats:
# Generate the operation workunits.
@ -148,7 +159,7 @@ class SnowflakeUsageExtractor(
event, discovered_datasets
)
def get_usage_workunits(
def _get_workunits_internal(
self, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:
with PerfTimer() as timer:
@ -210,15 +221,9 @@ class SnowflakeUsageExtractor(
userCounts=self._map_user_counts(json.loads(row["USER_COUNTS"])),
fieldCounts=self._map_field_counts(json.loads(row["FIELD_COUNTS"])),
)
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_identifier,
self.config.platform_instance,
self.config.env,
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=stats
entityUrn=self.dataset_urn_builder(dataset_identifier), aspect=stats
).as_workunit()
except Exception as e:
logger.debug(
@ -368,12 +373,6 @@ class SnowflakeUsageExtractor(
)
continue
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_identifier,
self.config.platform_instance,
self.config.env,
)
operation_aspect = OperationClass(
timestampMillis=reported_time,
lastUpdatedTimestamp=last_updated_timestamp,
@ -384,7 +383,7 @@ class SnowflakeUsageExtractor(
else None,
)
mcp = MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
entityUrn=self.dataset_urn_builder(dataset_identifier),
aspect=operation_aspect,
)
wu = MetadataWorkUnit(

View File

@ -195,7 +195,7 @@ SNOWFLAKE_FIELD_TYPE_MAPPINGS = {
)
@capability(
SourceCapability.USAGE_STATS,
"Enabled by default, can be disabled via configuration `include_usage_stats",
"Enabled by default, can be disabled via configuration `include_usage_stats`",
)
@capability(
SourceCapability.DELETION_DETECTION,
@ -245,14 +245,17 @@ class SnowflakeV2Source(
# For lineage
if self.config.use_legacy_lineage_method:
self.lineage_extractor = SnowflakeLineageLegacyExtractor(
config, self.report
config, self.report, dataset_urn_builder=self.gen_dataset_urn
)
else:
self.lineage_extractor = SnowflakeLineageExtractor(config, self.report)
self.lineage_extractor = SnowflakeLineageExtractor(
config, self.report, dataset_urn_builder=self.gen_dataset_urn
)
if config.include_usage_stats or config.include_operational_stats:
# For usage stats
self.usage_extractor = SnowflakeUsageExtractor(config, self.report)
self.usage_extractor = SnowflakeUsageExtractor(
config, self.report, dataset_urn_builder=self.gen_dataset_urn
)
self.tag_extractor = SnowflakeTagExtractor(
config, self.data_dictionary, self.report
@ -576,7 +579,7 @@ class SnowflakeV2Source(
end_time_millis=datetime_to_ts_millis(self.config.end_time),
)
yield from self.usage_extractor.get_workunits(discovered_datasets)
yield from self.usage_extractor.get_usage_workunits(discovered_datasets)
def report_warehouse_failure(self):
if self.config.warehouse is not None:
@ -976,6 +979,14 @@ class SnowflakeV2Source(
yield from self.gen_tag_workunits(tag)
def gen_dataset_urn(self, dataset_identifier: str) -> str:
return make_dataset_urn_with_platform_instance(
platform=self.platform,
name=dataset_identifier,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
def gen_dataset_workunits(
self,
table: Union[SnowflakeTable, SnowflakeView],
@ -983,12 +994,7 @@ class SnowflakeV2Source(
db_name: str,
) -> Iterable[MetadataWorkUnit]:
dataset_name = self.get_dataset_identifier(table.name, schema_name, db_name)
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
dataset_urn = self.gen_dataset_urn(dataset_name)
status = Status(removed=False)
yield MetadataChangeProposalWrapper(

View File

@ -196,7 +196,9 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
table_urn_builder=self.gen_dataset_urn,
user_urn_builder=self.gen_user_urn,
)
yield from usage_extractor.run(self.table_refs | self.view_refs)
yield from usage_extractor.get_usage_workunits(
self.table_refs | self.view_refs
)
if self.config.profiling.enabled:
assert wait_on_warehouse

View File

@ -10,6 +10,7 @@ from databricks.sdk.service.sql import QueryStatementType
from sqllineage.runner import LineageRunner
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig
from datahub.ingestion.source.unity.proxy import UnityCatalogApiProxy
@ -62,20 +63,24 @@ class UnityCatalogUsageExtractor:
)
return self._spark_sql_parser
def run(self, table_refs: Set[TableReference]) -> Iterable[MetadataWorkUnit]:
def get_usage_workunits(
self, table_refs: Set[TableReference]
) -> Iterable[MetadataWorkUnit]:
try:
table_map = defaultdict(list)
for ref in table_refs:
table_map[ref.table].append(ref)
table_map[f"{ref.schema}.{ref.table}"].append(ref)
table_map[ref.qualified_table_name].append(ref)
yield from self._generate_workunits(table_map)
yield from self._get_workunits_internal(table_refs)
except Exception as e:
logger.error("Error processing usage", exc_info=True)
self.report.report_warning("usage-extraction", str(e))
def _generate_workunits(self, table_map: TableMap) -> Iterable[MetadataWorkUnit]:
def _get_workunits_internal(
self, table_refs: Set[TableReference]
) -> Iterable[MetadataWorkUnit]:
table_map = defaultdict(list)
for ref in table_refs:
table_map[ref.table].append(ref)
table_map[f"{ref.schema}.{ref.table}"].append(ref)
table_map[ref.qualified_table_name].append(ref)
for query in self._get_queries():
self.report.num_queries += 1
table_info = self._parse_query(query, table_map)
@ -100,9 +105,13 @@ class UnityCatalogUsageExtractor:
)
return
yield from self.usage_aggregator.generate_workunits(
resource_urn_builder=self.table_urn_builder,
user_urn_builder=self.user_urn_builder,
yield from auto_empty_dataset_usage_statistics(
self.usage_aggregator.generate_workunits(
resource_urn_builder=self.table_urn_builder,
user_urn_builder=self.user_urn_builder,
),
dataset_urns={self.table_urn_builder(ref) for ref in table_refs},
config=self.config,
)
def _generate_operation_workunit(

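The usage extractor indexes every table reference under three lookup keys, so that queries mentioning a bare table name, a schema-qualified name, or a fully qualified name all resolve to the same reference. A plain-Python sketch of the idea, using a stand-in dataclass rather than Unity's TableReference class:

from collections import defaultdict
from dataclasses import dataclass

@dataclass(frozen=True)
class TableRef:  # stand-in for the Unity TableReference; fields are illustrative
    catalog: str
    schema: str
    table: str

    @property
    def qualified_table_name(self) -> str:
        return f"{self.catalog}.{self.schema}.{self.table}"

ref = TableRef("main", "sales", "orders")
table_map = defaultdict(list)
for key in (ref.table, f"{ref.schema}.{ref.table}", ref.qualified_table_name):
    table_map[key].append(ref)

# "orders", "sales.orders", and "main.sales.orders" in a parsed query all map
# back to the same reference.
assert table_map["orders"] == table_map["main.sales.orders"]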
View File

@ -1,26 +1,52 @@
[
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1629795600000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"actor\": \"urn:li:corpuser:test-name\", \"operationType\": \"INSERT\", \"lastUpdatedTimestamp\": 1631664000000}",
"contentType": "application/json"
"json": {
"timestampMillis": 1631696400000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:test-name",
"operationType": "INSERT",
"lastUpdatedTimestamp": 1631664000000
}
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select userid from users\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:test-name\", \"count\": 1, \"userEmail\": \"test-name@acryl.io\"}], \"fieldCounts\": []}",
"contentType": "application/json"
"json": {
"timestampMillis": 1631577600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 1,
"totalSqlQueries": 1,
"topSqlQueries": [
"select userid from users"
],
"userCounts": [
{
"user": "urn:li:corpuser:test-name",
"count": 1,
"userEmail": "test-name@acryl.io"
}
],
"fieldCounts": []
}
}
}
]

View File

@ -1,74 +1,152 @@
[
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1629795600000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"actor\": \"urn:li:corpuser:test-name\", \"operationType\": \"INSERT\", \"lastUpdatedTimestamp\": 1631664000000}",
"contentType": "application/json"
"json": {
"timestampMillis": 1631696400000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:test-name",
"operationType": "INSERT",
"lastUpdatedTimestamp": 1631664000000
}
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,db1.schema1.category,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1629795600000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"actor\": \"urn:li:corpuser:real_shirshanka\", \"operationType\": \"INSERT\", \"lastUpdatedTimestamp\": 1631664000000}",
"contentType": "application/json"
"json": {
"timestampMillis": 1631696400000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:real_shirshanka",
"operationType": "INSERT",
"lastUpdatedTimestamp": 1631664000000
}
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.orders,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1629795600000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"actor\": \"urn:li:corpuser:real_shirshanka\", \"operationType\": \"DELETE\", \"lastUpdatedTimestamp\": 1631664000000}",
"contentType": "application/json"
"json": {
"timestampMillis": 1631696400000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:real_shirshanka",
"operationType": "DELETE",
"lastUpdatedTimestamp": 1631664000000
}
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select userid from users\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:test-name\", \"count\": 1, \"userEmail\": \"test-name@acryl.io\"}], \"fieldCounts\": []}",
"contentType": "application/json"
"json": {
"timestampMillis": 1631577600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 1,
"totalSqlQueries": 1,
"topSqlQueries": [
"select userid from users"
],
"userCounts": [
{
"user": "urn:li:corpuser:test-name",
"count": 1,
"userEmail": "test-name@acryl.io"
}
],
"fieldCounts": []
}
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,db1.schema1.category,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select catid from category\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:real_shirshanka\", \"count\": 1, \"userEmail\": \"real_shirshanka@acryl.io\"}], \"fieldCounts\": []}",
"contentType": "application/json"
"json": {
"timestampMillis": 1631577600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 1,
"totalSqlQueries": 1,
"topSqlQueries": [
"select catid from category"
],
"userCounts": [
{
"user": "urn:li:corpuser:real_shirshanka",
"count": 1,
"userEmail": "real_shirshanka@acryl.io"
}
],
"fieldCounts": []
}
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.orders,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select cost from orders\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:real_shirshanka\", \"count\": 1, \"userEmail\": \"real_shirshanka@acryl.io\"}], \"fieldCounts\": []}",
"contentType": "application/json"
"json": {
"timestampMillis": 1631577600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 1,
"totalSqlQueries": 1,
"topSqlQueries": [
"select cost from orders"
],
"userCounts": [
{
"user": "urn:li:corpuser:real_shirshanka",
"count": 1,
"userEmail": "real_shirshanka@acryl.io"
}
],
"fieldCounts": []
}
}
}
]

View File

@ -6,6 +6,7 @@ from unittest.mock import Mock, patch
from freezegun import freeze_time
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.redshift.config import RedshiftConfig
@ -21,7 +22,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
)
from tests.test_helpers import mce_helpers
FROZEN_TIME = "2021-08-24 09:00:00"
FROZEN_TIME = "2021-09-15 09:00:00"
def test_redshift_usage_config():
@ -94,6 +95,7 @@ def test_redshift_usage_source(mock_cursor, mock_connection, pytestconfig, tmp_p
config=config,
connection=mock_connection,
report=source_report,
dataset_urn_builder=lambda table: make_dataset_urn("redshift", table),
)
all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]] = {
@ -127,7 +129,7 @@ def test_redshift_usage_source(mock_cursor, mock_connection, pytestconfig, tmp_p
]
},
}
mwus = usage_extractor.generate_usage(all_tables=all_tables)
mwus = usage_extractor.get_usage_workunits(all_tables=all_tables)
metadata: List[
Union[
MetadataChangeEvent,
@ -200,6 +202,7 @@ def test_redshift_usage_filtering(mock_cursor, mock_connection, pytestconfig, tm
config=config,
connection=mock_connection,
report=RedshiftReport(),
dataset_urn_builder=lambda table: make_dataset_urn("redshift", table),
)
all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]] = {
@ -215,7 +218,7 @@ def test_redshift_usage_filtering(mock_cursor, mock_connection, pytestconfig, tm
]
},
}
mwus = usage_extractor.generate_usage(all_tables=all_tables)
mwus = usage_extractor.get_usage_workunits(all_tables=all_tables)
metadata: List[
Union[
MetadataChangeEvent,

View File

@ -1,6 +1,7 @@
import json
from datetime import datetime, timezone
from datahub.configuration.time_window_config import BucketDuration
from datahub.ingestion.source.snowflake import snowflake_query
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
@ -247,6 +248,18 @@ def default_query_results(query): # noqa: C901
}
for op_idx in range(1, NUM_OPS + 1)
]
elif (
query
== snowflake_query.SnowflakeQuery.usage_per_object_per_time_bucket_for_time_window(
1654499820000,
1654586220000,
use_base_objects=False,
top_n_queries=10,
include_top_n_queries=True,
time_bucket_size=BucketDuration.DAY,
)
):
return []
elif query in (
snowflake_query.SnowflakeQuery.table_to_table_lineage_history(
1654499820000,

View File

@ -5108,6 +5108,342 @@
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"json": {
"timestampMillis": 1654473600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 0,
"totalSqlQueries": 0,
"topSqlQueries": [],
"userCounts": [],
"fieldCounts": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_3,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"json": {
"timestampMillis": 1654473600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 0,
"totalSqlQueries": 0,
"topSqlQueries": [],
"userCounts": [],
"fieldCounts": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_8,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"json": {
"timestampMillis": 1654473600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 0,
"totalSqlQueries": 0,
"topSqlQueries": [],
"userCounts": [],
"fieldCounts": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_10,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"json": {
"timestampMillis": 1654473600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 0,
"totalSqlQueries": 0,
"topSqlQueries": [],
"userCounts": [],
"fieldCounts": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.view_2,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"json": {
"timestampMillis": 1654473600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 0,
"totalSqlQueries": 0,
"topSqlQueries": [],
"userCounts": [],
"fieldCounts": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_6,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"json": {
"timestampMillis": 1654473600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 0,
"totalSqlQueries": 0,
"topSqlQueries": [],
"userCounts": [],
"fieldCounts": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_9,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"json": {
"timestampMillis": 1654473600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 0,
"totalSqlQueries": 0,
"topSqlQueries": [],
"userCounts": [],
"fieldCounts": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_4,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"json": {
"timestampMillis": 1654473600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 0,
"totalSqlQueries": 0,
"topSqlQueries": [],
"userCounts": [],
"fieldCounts": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"json": {
"timestampMillis": 1654473600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 0,
"totalSqlQueries": 0,
"topSqlQueries": [],
"userCounts": [],
"fieldCounts": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_5,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"json": {
"timestampMillis": 1654473600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 0,
"totalSqlQueries": 0,
"topSqlQueries": [],
"userCounts": [],
"fieldCounts": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_7,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"json": {
"timestampMillis": 1654473600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 0,
"totalSqlQueries": 0,
"topSqlQueries": [],
"userCounts": [],
"fieldCounts": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.view_1,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"json": {
"timestampMillis": 1654473600000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"uniqueUserCount": 0,
"totalSqlQueries": 0,
"topSqlQueries": [],
"userCounts": [],
"fieldCounts": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",

View File

@ -121,7 +121,7 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
include_technical_schema=True,
include_table_lineage=True,
include_view_lineage=True,
include_usage_stats=False,
include_usage_stats=True,
use_legacy_lineage_method=False,
validate_upstreams_against_patterns=False,
include_operational_stats=True,

View File

@ -95,7 +95,7 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
include_technical_schema=True,
include_table_lineage=True,
include_view_lineage=True,
include_usage_stats=False,
include_usage_stats=True,
use_legacy_lineage_method=True,
validate_upstreams_against_patterns=False,
include_operational_stats=True,

View File

@ -7,6 +7,7 @@ from typing import Iterable, Tuple
import humanfriendly
import psutil
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.bigquery_v2.bigquery_config import (
BigQueryUsageConfig,
@ -44,7 +45,11 @@ def run_test():
),
file_backed_cache_size=1000,
)
usage_extractor = BigQueryUsageExtractor(config, report)
usage_extractor = BigQueryUsageExtractor(
config,
report,
lambda ref: make_dataset_urn("bigquery", str(ref.table_identifier)),
)
report.set_ingestion_stage("All", "Event Generation")
num_projects = 100
@ -70,7 +75,7 @@ def run_test():
report.set_ingestion_stage("All", "Event Ingestion")
with PerfTimer() as timer:
workunits = usage_extractor._run(events, table_refs)
workunits = usage_extractor._get_workunits_internal(events, table_refs)
num_workunits, peak_memory_usage = workunit_sink(workunits)
report.set_ingestion_stage("All", "Done")
print(f"Workunits Generated: {num_workunits}")

View File

@ -1,13 +1,14 @@
import logging
import random
from datetime import datetime, timedelta, timezone
from typing import cast
from typing import Iterable, cast
from unittest.mock import MagicMock, patch
import pytest
from freezegun import freeze_time
from datahub.configuration.time_window_config import BucketDuration
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
@ -36,6 +37,7 @@ from datahub.metadata.schema_classes import (
from tests.performance.bigquery import generate_events, ref_from_table
from tests.performance.data_generation import generate_data, generate_queries
from tests.performance.data_model import Container, FieldAccess, Query, Table, View
from tests.test_helpers.mce_helpers import assert_mces_equal
PROJECT_1 = "project-1"
PROJECT_2 = "project-2"
@ -195,7 +197,37 @@ def config() -> BigQueryV2Config:
@pytest.fixture
def usage_extractor(config: BigQueryV2Config) -> BigQueryUsageExtractor:
report = BigQueryV2Report()
return BigQueryUsageExtractor(config, report)
return BigQueryUsageExtractor(
config,
report,
lambda ref: make_dataset_urn("bigquery", str(ref.table_identifier)),
)
def make_zero_usage_workunit(
table: Table, time: datetime, bucket_duration: BucketDuration = BucketDuration.DAY
) -> MetadataWorkUnit:
return make_usage_workunit(
table=table,
dataset_usage_statistics=DatasetUsageStatisticsClass(
timestampMillis=int(time.timestamp() * 1000),
eventGranularity=TimeWindowSizeClass(unit=bucket_duration, multiple=1),
totalSqlQueries=0,
uniqueUserCount=0,
topSqlQueries=[],
userCounts=[],
fieldCounts=[],
),
)
def compare_workunits(
output: Iterable[MetadataWorkUnit], expected: Iterable[MetadataWorkUnit]
) -> None:
assert_mces_equal(
[wu.metadata.to_obj() for wu in output],
[wu.metadata.to_obj() for wu in expected],
)
def test_usage_counts_single_bucket_resource_project(
@ -217,8 +249,8 @@ def test_usage_counts_single_bucket_resource_project(
proabability_of_project_mismatch=0.5,
)
workunits = usage_extractor._run(events, TABLE_REFS.values())
assert list(workunits) == [
workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values())
expected = [
make_usage_workunit(
table=TABLE_1,
dataset_usage_statistics=DatasetUsageStatisticsClass(
@ -256,8 +288,11 @@ def test_usage_counts_single_bucket_resource_project(
),
],
),
)
),
make_zero_usage_workunit(TABLE_2, TS_1),
make_zero_usage_workunit(VIEW_1, TS_1),
]
compare_workunits(workunits, expected)
def test_usage_counts_multiple_buckets_and_resources_view_usage(
@ -292,8 +327,8 @@ def test_usage_counts_multiple_buckets_and_resources_view_usage(
proabability_of_project_mismatch=0.5,
)
workunits = usage_extractor._run(events, TABLE_REFS.values())
assert list(workunits) == [
workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values())
expected = [
# TS 1
make_usage_workunit(
table=TABLE_1,
@ -493,6 +528,7 @@ def test_usage_counts_multiple_buckets_and_resources_view_usage(
),
),
]
compare_workunits(workunits, expected)
assert usage_extractor.report.num_view_query_events == 5
assert usage_extractor.report.num_view_query_events_failed_sql_parsing == 0
assert usage_extractor.report.num_view_query_events_failed_table_identification == 0
@ -531,8 +567,8 @@ def test_usage_counts_multiple_buckets_and_resources_no_view_usage(
proabability_of_project_mismatch=0.5,
)
workunits = usage_extractor._run(events, TABLE_REFS.values())
assert list(workunits) == [
workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values())
expected = [
# TS 1
make_usage_workunit(
table=TABLE_1,
@ -723,7 +759,10 @@ def test_usage_counts_multiple_buckets_and_resources_no_view_usage(
],
),
),
make_zero_usage_workunit(VIEW_1, TS_1),
# TS_2 not included as only 1 minute of it was ingested
]
compare_workunits(workunits, expected)
assert usage_extractor.report.num_view_query_events == 0
@ -745,8 +784,24 @@ def test_usage_counts_no_query_event(
payload=None,
)
)
workunits = usage_extractor._run([event], [str(ref)])
assert list(workunits) == []
workunits = usage_extractor._get_workunits_internal([event], [str(ref)])
expected = [
MetadataChangeProposalWrapper(
entityUrn=ref.to_urn("PROD"),
aspect=DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
eventGranularity=TimeWindowSizeClass(
unit=BucketDuration.DAY, multiple=1
),
totalSqlQueries=0,
uniqueUserCount=0,
topSqlQueries=[],
userCounts=[],
fieldCounts=[],
),
).as_workunit()
]
compare_workunits(workunits, expected)
assert not caplog.records
@ -787,8 +842,10 @@ def test_usage_counts_no_columns(
),
]
with caplog.at_level(logging.WARNING):
workunits = usage_extractor._run(events, TABLE_REFS.values())
assert list(workunits) == [
workunits = usage_extractor._get_workunits_internal(
events, [TABLE_REFS[TABLE_1.name]]
)
expected = [
make_usage_workunit(
table=TABLE_1,
dataset_usage_statistics=DatasetUsageStatisticsClass(
@ -810,6 +867,7 @@ def test_usage_counts_no_columns(
),
)
]
compare_workunits(workunits, expected)
assert not caplog.records
@ -849,8 +907,8 @@ def test_operational_stats(
)
events = generate_events(queries, projects, table_to_project, config=config)
workunits = usage_extractor._run(events, table_refs.values())
assert list(workunits) == [
workunits = usage_extractor._get_workunits_internal(events, table_refs.values())
expected = [
make_operational_workunit(
table_refs[query.object_modified.name],
OperationClass(
@ -887,6 +945,15 @@ def test_operational_stats(
for query in queries
if query.object_modified and query.type in OPERATION_STATEMENT_TYPES.values()
]
compare_workunits(
[
wu
for wu in workunits
if isinstance(wu.metadata, MetadataChangeProposalWrapper)
and isinstance(wu.metadata.aspect, OperationClass)
],
expected,
)
def test_get_tables_from_query(usage_extractor):

View File

@ -111,7 +111,9 @@ AND
OR
protoPayload.metadata.tableDataRead.reason = "JOB"
)""" # noqa: W293
source = BigQueryUsageExtractor(config, BigQueryV2Report())
source = BigQueryUsageExtractor(
config, BigQueryV2Report(), dataset_urn_builder=lambda _: ""
)
filter: str = source._generate_filter(BQ_AUDIT_V2)
assert filter == expected_filter

View File

@ -3,10 +3,20 @@ import pathlib
import re
import click
import pytest
from click.testing import CliRunner
from datahub.entrypoints import datahub
from datahub.utilities.logging_manager import get_log_buffer
from datahub.utilities.logging_manager import DATAHUB_PACKAGES, get_log_buffer
@pytest.fixture(autouse=True, scope="module")
def cleanup():
"""Attempt to clear undo the stateful changes done by invoking `my_logging_fn`, which calls `configure_logging`."""
yield
for lib in DATAHUB_PACKAGES:
lib_logger = logging.getLogger(lib)
lib_logger.propagate = True
@datahub.command()

View File

@ -1,7 +1,12 @@
import logging
from datetime import datetime
from typing import Any, Dict, Iterable, List, Union
from unittest.mock import patch
from freezegun import freeze_time
import datahub.metadata.schema_classes as models
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mce_builder import (
make_container_urn,
make_dataplatform_instance_urn,
@ -10,6 +15,7 @@ from datahub.emitter.mce_builder import (
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
auto_empty_dataset_usage_statistics,
auto_status_aspect,
auto_workunit,
)
@ -403,3 +409,100 @@ def test_auto_browse_path_v2_dry_run(telemetry_ping_mock):
== 0
)
assert telemetry_ping_mock.call_count == 1
@freeze_time("2023-01-02 00:00:00")
def test_auto_empty_dataset_usage_statistics(caplog):
has_urn = make_dataset_urn("my_platform", "has_aspect")
empty_urn = make_dataset_urn("my_platform", "no_aspect")
config = BaseTimeWindowConfig()
wus = [
MetadataChangeProposalWrapper(
entityUrn=has_urn,
aspect=models.DatasetUsageStatisticsClass(
timestampMillis=int(config.start_time.timestamp() * 1000),
eventGranularity=models.TimeWindowSizeClass(
models.CalendarIntervalClass.DAY
),
uniqueUserCount=1,
totalSqlQueries=1,
),
).as_workunit()
]
with caplog.at_level(logging.WARNING):
new_wus = list(
auto_empty_dataset_usage_statistics(
wus,
dataset_urns={has_urn, empty_urn},
config=config,
all_buckets=False,
)
)
assert not caplog.records
assert new_wus == [
*wus,
MetadataChangeProposalWrapper(
entityUrn=empty_urn,
aspect=models.DatasetUsageStatisticsClass(
timestampMillis=int(datetime(2023, 1, 1).timestamp() * 1000),
eventGranularity=models.TimeWindowSizeClass(
models.CalendarIntervalClass.DAY
),
uniqueUserCount=0,
totalSqlQueries=0,
topSqlQueries=[],
userCounts=[],
fieldCounts=[],
),
).as_workunit(),
]
@freeze_time("2023-01-02 00:00:00")
def test_auto_empty_dataset_usage_statistics_invalid_timestamp(caplog):
urn = make_dataset_urn("my_platform", "my_dataset")
config = BaseTimeWindowConfig()
wus = [
MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=models.DatasetUsageStatisticsClass(
timestampMillis=0,
eventGranularity=models.TimeWindowSizeClass(
models.CalendarIntervalClass.DAY
),
uniqueUserCount=1,
totalSqlQueries=1,
),
).as_workunit()
]
with caplog.at_level(logging.WARNING):
new_wus = list(
auto_empty_dataset_usage_statistics(
wus,
dataset_urns={urn},
config=config,
all_buckets=True,
)
)
assert len(caplog.records) == 1
assert "1970-01-01 00:00:00+00:00" in caplog.records[0].msg
assert new_wus == [
*wus,
MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=models.DatasetUsageStatisticsClass(
timestampMillis=int(config.start_time.timestamp() * 1000),
eventGranularity=models.TimeWindowSizeClass(
models.CalendarIntervalClass.DAY
),
uniqueUserCount=0,
totalSqlQueries=0,
topSqlQueries=[],
userCounts=[],
fieldCounts=[],
),
changeType=models.ChangeTypeClass.CREATE,
).as_workunit(),
]