From aa5e02d0ec29b322a68fae9992ab4f5085c262e0 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Thu, 22 Jun 2023 17:07:50 -0400 Subject: [PATCH] feat(ingest): Create zero usage aspects (#8205) Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> --- .../configuration/time_window_config.py | 35 +- .../datahub/ingestion/api/source_helpers.py | 68 +++- .../ingestion/source/bigquery_v2/bigquery.py | 24 +- .../ingestion/source/bigquery_v2/usage.py | 28 +- .../ingestion/source/redshift/redshift.py | 50 +-- .../ingestion/source/redshift/usage.py | 42 ++- .../snowflake/snowflake_lineage_legacy.py | 55 +-- .../source/snowflake/snowflake_lineage_v2.py | 43 +-- .../source/snowflake/snowflake_usage_v2.py | 45 ++- .../source/snowflake/snowflake_v2.py | 30 +- .../datahub/ingestion/source/unity/source.py | 4 +- .../datahub/ingestion/source/unity/usage.py | 33 +- .../redshift_usages_filtered_golden.json | 42 ++- .../redshift_usages_golden.json | 126 +++++-- .../redshift-usage/test_redshift_usage.py | 9 +- .../tests/integration/snowflake/common.py | 13 + .../snowflake/snowflake_golden.json | 336 ++++++++++++++++++ .../integration/snowflake/test_snowflake.py | 2 +- .../test_snowflake_legacy_lineage.py | 2 +- .../tests/performance/test_bigquery_usage.py | 9 +- .../tests/unit/test_bigquery_usage.py | 97 ++++- .../unit/test_bigqueryv2_usage_source.py | 4 +- .../tests/unit/test_cli_logging.py | 12 +- .../tests/unit/test_source_helpers.py | 103 ++++++ 24 files changed, 974 insertions(+), 238 deletions(-) diff --git a/metadata-ingestion/src/datahub/configuration/time_window_config.py b/metadata-ingestion/src/datahub/configuration/time_window_config.py index 365534af76..a4b451f0cd 100644 --- a/metadata-ingestion/src/datahub/configuration/time_window_config.py +++ b/metadata-ingestion/src/datahub/configuration/time_window_config.py @@ -1,6 +1,6 @@ import enum from datetime import datetime, timedelta, timezone -from typing import Any, Dict +from typing import Any, Dict, List import pydantic from pydantic.fields import Field @@ -62,3 +62,36 @@ class BaseTimeWindowConfig(ConfigModel): 'timezone is not UTC; try adding a "Z" to the value e.g. "2021-07-20T00:00:00Z"' ) return v + + def buckets(self) -> List[datetime]: + """Returns list of timestamps for each DatasetUsageStatistics bucket. + + Includes all buckets in the time window, including partially contained buckets. + """ + bucket_timedelta = get_bucket_duration_delta(self.bucket_duration) + + curr_bucket = get_time_bucket(self.start_time, self.bucket_duration) + buckets = [] + while curr_bucket < self.end_time: + buckets.append(curr_bucket) + curr_bucket += bucket_timedelta + + return buckets + + def majority_buckets(self) -> List[datetime]: + """Returns list of timestamps for each DatasetUsageStatistics bucket. + + Includes only buckets in the time window for which a majority of the bucket is ingested. 
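+
+        For example (illustrative values): with bucket_duration=DAY,
+        start_time=2021-09-14T18:00Z and end_time=2021-09-16T06:00Z, buckets()
+        returns the 2021-09-14, 2021-09-15 and 2021-09-16 buckets, while
+        majority_buckets() returns only 2021-09-15, because less than half of
+        each of the other two buckets falls inside the time window.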
+ """ + bucket_timedelta = get_bucket_duration_delta(self.bucket_duration) + + curr_bucket = get_time_bucket(self.start_time, self.bucket_duration) + buckets = [] + while curr_bucket < self.end_time: + start = max(self.start_time, curr_bucket) + end = min(self.end_time, curr_bucket + bucket_timedelta) + if end - start >= bucket_timedelta / 2: + buckets.append(curr_bucket) + curr_bucket += bucket_timedelta + + return buckets diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 32aa6fc7e3..0eabd22e77 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -1,4 +1,5 @@ import logging +from datetime import datetime, timezone from typing import ( TYPE_CHECKING, Callable, @@ -13,6 +14,7 @@ from typing import ( Union, ) +from datahub.configuration.time_window_config import BaseTimeWindowConfig from datahub.emitter.mce_builder import make_dataplatform_instance_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -20,13 +22,17 @@ from datahub.metadata.schema_classes import ( BrowsePathEntryClass, BrowsePathsClass, BrowsePathsV2Class, + ChangeTypeClass, ContainerClass, + DatasetUsageStatisticsClass, MetadataChangeEventClass, MetadataChangeProposalClass, StatusClass, TagKeyClass, + TimeWindowSizeClass, ) from datahub.telemetry import telemetry +from datahub.utilities.urns.dataset_urn import DatasetUrn from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import guess_entity_type from datahub.utilities.urns.urn_iter import list_urns @@ -149,11 +155,11 @@ def auto_materialize_referenced_tags( for wu in stream: for urn in list_urns(wu.metadata): - if guess_entity_type(urn) == "tag": + if guess_entity_type(urn) == TagUrn.ENTITY_TYPE: referenced_tags.add(urn) urn = wu.get_urn() - if guess_entity_type(urn) == "tag": + if guess_entity_type(urn) == TagUrn.ENTITY_TYPE: tags_with_aspects.add(urn) yield wu @@ -274,6 +280,64 @@ def auto_browse_path_v2( telemetry.telemetry_instance.ping("incorrect_browse_path_v2", properties) +def auto_empty_dataset_usage_statistics( + stream: Iterable[MetadataWorkUnit], + *, + dataset_urns: Set[str], + config: BaseTimeWindowConfig, + all_buckets: bool = False, # TODO: Enable when CREATE changeType is supported for timeseries aspects +) -> Iterable[MetadataWorkUnit]: + """Emit empty usage statistics aspect for all dataset_urns ingested with no usage.""" + buckets = config.buckets() if all_buckets else config.majority_buckets() + bucket_timestamps = [int(bucket.timestamp() * 1000) for bucket in buckets] + + # Maps time bucket -> urns with usage statistics for that bucket + usage_statistics_urns: Dict[int, Set[str]] = {ts: set() for ts in bucket_timestamps} + invalid_timestamps = set() + + for wu in stream: + yield wu + if not wu.is_primary_source: + continue + + urn = wu.get_urn() + if guess_entity_type(urn) == DatasetUrn.ENTITY_TYPE: + dataset_urns.add(urn) + usage_aspect = wu.get_aspect_of_type(DatasetUsageStatisticsClass) + if usage_aspect: + if usage_aspect.timestampMillis in bucket_timestamps: + usage_statistics_urns[usage_aspect.timestampMillis].add(urn) + elif all_buckets: + invalid_timestamps.add(usage_aspect.timestampMillis) + + if invalid_timestamps: + logger.warning( + f"Usage statistics with unexpected timestamps, bucket_duration={config.bucket_duration}:\n" + ", ".join( + 
str(datetime.fromtimestamp(ts, tz=timezone.utc)) + for ts in invalid_timestamps + ) + ) + + for bucket in bucket_timestamps: + for urn in dataset_urns - usage_statistics_urns[bucket]: + yield MetadataChangeProposalWrapper( + entityUrn=urn, + aspect=DatasetUsageStatisticsClass( + timestampMillis=bucket, + eventGranularity=TimeWindowSizeClass(unit=config.bucket_duration), + uniqueUserCount=0, + totalSqlQueries=0, + topSqlQueries=[], + userCounts=[], + fieldCounts=[], + ), + changeType=ChangeTypeClass.CREATE + if all_buckets + else ChangeTypeClass.UPSERT, + ).as_workunit() + + def _batch_workunits_by_urn( stream: Iterable[MetadataWorkUnit], ) -> Iterable[Tuple[str, List[MetadataWorkUnit]]]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 4daee3a59e..82e9647c48 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -154,6 +154,10 @@ def cleanup(config: BigQueryV2Config) -> None: ) @capability(SourceCapability.DESCRIPTIONS, "Enabled by default") @capability(SourceCapability.LINEAGE_COARSE, "Optionally enabled via configuration") +@capability( + SourceCapability.USAGE_STATS, + "Enabled by default, can be disabled via configuration `include_usage_statistics`", +) @capability( SourceCapability.DELETION_DETECTION, "Optionally enabled via `stateful_ingestion.remove_stale_metadata`", @@ -218,7 +222,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource): # For database, schema, tables, views, etc self.lineage_extractor = BigqueryLineageExtractor(config, self.report) - self.usage_extractor = BigQueryUsageExtractor(config, self.report) + self.usage_extractor = BigQueryUsageExtractor( + config, self.report, dataset_urn_builder=self.gen_dataset_urn_from_ref + ) self.domain_registry: Optional[DomainRegistry] = None if self.config.domain: @@ -330,7 +336,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource): project_ids: List[str], report: BigQueryV2Report, ) -> CapabilityReport: - usage_extractor = BigQueryUsageExtractor(connection_conf, report) + usage_extractor = BigQueryUsageExtractor( + connection_conf, report, lambda ref: "" + ) for project_id in project_ids: try: logger.info(f"Usage capability test for project {project_id}") @@ -488,7 +496,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource): yield from self._process_project(conn, project_id) if self._should_ingest_usage(): - yield from self.usage_extractor.run( + yield from self.usage_extractor.get_usage_workunits( [p.id for p in projects], self.table_refs ) @@ -1060,12 +1068,18 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource): def gen_dataset_urn(self, project_id: str, dataset_name: str, table: str) -> str: datahub_dataset_name = BigqueryTableIdentifier(project_id, dataset_name, table) - dataset_urn = make_dataset_urn( + return make_dataset_urn( self.platform, str(datahub_dataset_name), self.config.env, ) - return dataset_urn + + def gen_dataset_urn_from_ref(self, ref: BigQueryTableRef) -> str: + return self.gen_dataset_urn( + ref.table_identifier.project_id, + ref.table_identifier.dataset, + ref.table_identifier.table, + ) def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]: schema_fields: List[SchemaField] = [] diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py 
b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py index d61134543d..1081dd8eec 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py @@ -29,6 +29,7 @@ from datahub.configuration.time_window_config import get_time_bucket from datahub.emitter.mce_builder import make_user_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.closeable import Closeable +from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.bigquery_v2.bigquery_audit import ( BQ_AUDIT_V2, @@ -371,9 +372,15 @@ class BigQueryUsageExtractor: ::: """ - def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report): + def __init__( + self, + config: BigQueryV2Config, + report: BigQueryV2Report, + dataset_urn_builder: Callable[[BigQueryTableRef], str], + ): self.config: BigQueryV2Config = config self.report: BigQueryV2Report = report + self.dataset_urn_builder = dataset_urn_builder # Replace hash of query with uuid if there are hash conflicts self.uuid_to_query: Dict[str, str] = {} @@ -384,13 +391,13 @@ class BigQueryUsageExtractor: and self.config.table_pattern.allowed(table_ref.table_identifier.table) ) - def run( + def get_usage_workunits( self, projects: Iterable[str], table_refs: Collection[str] ) -> Iterable[MetadataWorkUnit]: events = self._get_usage_events(projects) - yield from self._run(events, table_refs) + yield from self._get_workunits_internal(events, table_refs) - def _run( + def _get_workunits_internal( self, events: Iterable[AuditEvent], table_refs: Collection[str] ) -> Iterable[MetadataWorkUnit]: try: @@ -404,7 +411,14 @@ class BigQueryUsageExtractor: usage_state, table_refs ) - yield from self._generate_usage_workunits(usage_state) + yield from auto_empty_dataset_usage_statistics( + self._generate_usage_workunits(usage_state), + config=self.config, + dataset_urns={ + self.dataset_urn_builder(BigQueryTableRef.from_string_name(ref)) + for ref in table_refs + }, + ) usage_state.report_disk_usage(self.report) except Exception as e: logger.error("Error processing usage", exc_info=True) @@ -526,9 +540,7 @@ class BigQueryUsageExtractor: user_freq=entry.user_freq, column_freq=entry.column_freq, bucket_duration=self.config.bucket_duration, - resource_urn_builder=lambda resource: resource.to_urn( - self.config.env - ), + resource_urn_builder=self.dataset_urn_builder, top_n_queries=self.config.usage.top_n_queries, format_sql_queries=self.config.usage.format_sql_queries, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index c6af80641a..099e982691 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -108,7 +108,10 @@ logger: logging.Logger = logging.getLogger(__name__) @capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration") @capability(SourceCapability.DESCRIPTIONS, "Enabled by default") @capability(SourceCapability.LINEAGE_COARSE, "Optionally enabled via configuration") -@capability(SourceCapability.USAGE_STATS, "Optionally enabled via configuration") +@capability( + SourceCapability.USAGE_STATS, + "Enabled by default, can be disabled via configuration `include_usage_statistics`", +) 
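+# Usage extraction for this source is wired further down via
+# RedshiftUsageExtractor(..., dataset_urn_builder=self.gen_dataset_urn), so usage,
+# operation, and zero-usage aspects share the same dataset URNs as schema metadata.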
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") class RedshiftSource(StatefulIngestionSourceBase, TestableSource): """ @@ -593,7 +596,6 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): database=database, schema=table.schema, sub_type=DatasetSubTypes.TABLE, - tags_to_add=[], custom_properties=custom_properties, ) @@ -609,17 +611,11 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): database=get_db_name(self.config), schema=schema, sub_type=DatasetSubTypes.VIEW, - tags_to_add=[], custom_properties={}, ) datahub_dataset_name = f"{database}.{schema}.{view.name}" - dataset_urn = make_dataset_urn_with_platform_instance( - platform=self.platform, - name=datahub_dataset_name, - platform_instance=self.config.platform_instance, - env=self.config.env, - ) + dataset_urn = self.gen_dataset_urn(datahub_dataset_name) if view.ddl: view_properties_aspect = ViewProperties( materialized=view.type == "VIEW_MATERIALIZED", @@ -683,6 +679,14 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): entityUrn=dataset_urn, aspect=schema_metadata ).as_workunit() + def gen_dataset_urn(self, datahub_dataset_name: str) -> str: + return make_dataset_urn_with_platform_instance( + platform=self.platform, + name=datahub_dataset_name, + platform_instance=self.config.platform_instance, + env=self.config.env, + ) + # TODO: Move to common def gen_dataset_workunits( self, @@ -690,16 +694,10 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): database: str, schema: str, sub_type: str, - tags_to_add: Optional[List[str]] = None, custom_properties: Optional[Dict[str, str]] = None, ) -> Iterable[MetadataWorkUnit]: datahub_dataset_name = f"{database}.{schema}.{table.name}" - dataset_urn = make_dataset_urn_with_platform_instance( - platform=self.platform, - name=datahub_dataset_name, - platform_instance=self.config.platform_instance, - env=self.config.env, - ) + dataset_urn = self.gen_dataset_urn(datahub_dataset_name) yield from self.gen_schema_metadata( dataset_urn, table, str(datahub_dataset_name) @@ -857,12 +855,12 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): return with PerfTimer() as timer: - usage_extractor = RedshiftUsageExtractor( + yield from RedshiftUsageExtractor( config=self.config, connection=connection, report=self.report, - ) - yield from usage_extractor.generate_usage(all_tables=all_tables) + dataset_urn_builder=self.gen_dataset_urn, + ).get_usage_workunits(all_tables=all_tables) self.report.usage_extraction_sec[database] = round( timer.elapsed_seconds(), 2 @@ -917,12 +915,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): ) continue datahub_dataset_name = f"{database}.{schema}.{table.name}" - dataset_urn = make_dataset_urn_with_platform_instance( - platform=self.platform, - name=datahub_dataset_name, - platform_instance=self.config.platform_instance, - env=self.config.env, - ) + dataset_urn = self.gen_dataset_urn(datahub_dataset_name) lineage_info = self.lineage_extractor.get_lineage( table, @@ -937,12 +930,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): for schema in self.db_views[database]: for view in self.db_views[database][schema]: datahub_dataset_name = f"{database}.{schema}.{view.name}" - dataset_urn = make_dataset_urn_with_platform_instance( - platform=self.platform, - name=datahub_dataset_name, - platform_instance=self.config.platform_instance, - env=self.config.env, - ) + dataset_urn = self.gen_dataset_urn(datahub_dataset_name) 
lineage_info = self.lineage_extractor.get_lineage( view, dataset_urn, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py index 6e768c4674..653b41d690 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py @@ -2,7 +2,7 @@ import collections import logging import time from datetime import datetime -from typing import Dict, Iterable, List, Optional, Union +from typing import Callable, Dict, Iterable, List, Optional, Union import pydantic.error_wrappers import redshift_connector @@ -12,6 +12,7 @@ from pydantic.main import BaseModel import datahub.emitter.mce_builder as builder from datahub.configuration.time_window_config import get_time_bucket from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.redshift.config import RedshiftConfig from datahub.ingestion.source.redshift.redshift_schema import ( @@ -22,7 +23,6 @@ from datahub.ingestion.source.redshift.report import RedshiftReport from datahub.ingestion.source.usage.usage_common import GenericAggregatedDataset from datahub.metadata.schema_classes import OperationClass, OperationTypeClass from datahub.utilities.perf_timer import PerfTimer -from datahub.utilities.urns.dataset_urn import DatasetUrn logger = logging.getLogger(__name__) @@ -169,19 +169,34 @@ class RedshiftUsageExtractor: config: RedshiftConfig, connection: redshift_connector.Connection, report: RedshiftReport, + dataset_urn_builder: Callable[[str], str], ): self.config = config self.report = report self.connection = connection + self.dataset_urn_builder = dataset_urn_builder - def generate_usage( + def get_usage_workunits( + self, all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]] + ) -> Iterable[MetadataWorkUnit]: + yield from auto_empty_dataset_usage_statistics( + self._get_workunits_internal(all_tables), + config=self.config, + dataset_urns={ + self.dataset_urn_builder(f"{database}.{schema}.{table.name}") + for database in all_tables + for schema in all_tables[database] + for table in all_tables[database][schema] + }, + ) + + def _get_workunits_internal( self, all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]] ) -> Iterable[MetadataWorkUnit]: self.report.num_usage_workunits_emitted = 0 self.report.num_usage_stat_skipped = 0 self.report.num_operational_stats_skipped = 0 - """Gets Redshift usage stats as work units""" if self.config.include_operational_stats: with PerfTimer() as timer: # Generate operation aspect workunits @@ -320,7 +335,6 @@ class RedshiftUsageExtractor: assert event.operation_type in ["insert", "delete"] - resource: str = f"{event.database}.{event.schema_}.{event.table}" reported_time: int = int(time.time() * 1000) last_updated_timestamp: int = int(event.endtime.timestamp() * 1000) user_email: str = event.username @@ -334,15 +348,10 @@ class RedshiftUsageExtractor: else OperationTypeClass.DELETE ), ) - dataset_urn = DatasetUrn.create_from_ids( - platform_id="redshift", - table_name=resource.lower(), - platform_instance=self.config.platform_instance, - env=self.config.env, - ) + resource: str = f"{event.database}.{event.schema_}.{event.table}".lower() yield MetadataChangeProposalWrapper( - entityUrn=str(dataset_urn), aspect=operation_aspect + 
entityUrn=self.dataset_urn_builder(resource), aspect=operation_aspect ).as_workunit() self.report.num_operational_stats_workunits_emitted += 1 @@ -354,7 +363,7 @@ class RedshiftUsageExtractor: floored_ts: datetime = get_time_bucket( event.starttime, self.config.bucket_duration ) - resource: str = f"{event.database}.{event.schema_}.{event.table}" + resource: str = f"{event.database}.{event.schema_}.{event.table}".lower() # Get a reference to the bucket value(or initialize not yet in dict) and update it. agg_bucket: AggregatedDataset = datasets[floored_ts].setdefault( resource, @@ -378,12 +387,7 @@ class RedshiftUsageExtractor: def _make_usage_stat(self, agg: AggregatedDataset) -> MetadataWorkUnit: return agg.make_usage_workunit( self.config.bucket_duration, - lambda resource: builder.make_dataset_urn_with_platform_instance( - "redshift", - resource.lower(), - self.config.platform_instance, - self.config.env, - ), + self.dataset_urn_builder, self.config.top_n_queries, self.config.format_sql_queries, self.config.include_top_n_queries, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_legacy.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_legacy.py index 327537c989..5eb05e42ef 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_legacy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_legacy.py @@ -2,7 +2,7 @@ import json import logging from collections import defaultdict from dataclasses import dataclass, field -from typing import Dict, FrozenSet, Iterable, List, Optional, Set +from typing import Callable, Dict, FrozenSet, Iterable, List, Optional, Set from pydantic import Field from pydantic.error_wrappers import ValidationError @@ -177,15 +177,20 @@ class SnowflakeLineageExtractor( Edition Note - Snowflake Standard Edition does not have Access History Feature. So it does not support lineage extraction for edges 3, 4, 5 mentioned above. 
""" - def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None: + def __init__( + self, + config: SnowflakeV2Config, + report: SnowflakeV2Report, + dataset_urn_builder: Callable[[str], str], + ) -> None: self._lineage_map: Dict[str, SnowflakeTableLineage] = defaultdict( SnowflakeTableLineage ) self._external_lineage_map: Dict[str, Set[str]] = defaultdict(set) self.config = config - self.platform = "snowflake" self.report = report self.logger = logger + self.dataset_urn_builder = dataset_urn_builder self.connection: Optional[SnowflakeConnection] = None def get_workunits( @@ -228,31 +233,21 @@ class SnowflakeLineageExtractor( def get_table_upstream_workunits(self, discovered_tables): if self.config.include_table_lineage: for dataset_name in discovered_tables: - dataset_urn = builder.make_dataset_urn_with_platform_instance( - self.platform, - dataset_name, - self.config.platform_instance, - self.config.env, - ) upstream_lineage = self._get_upstream_lineage_info(dataset_name) if upstream_lineage is not None: yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=upstream_lineage + entityUrn=self.dataset_urn_builder(dataset_name), + aspect=upstream_lineage, ).as_workunit() def get_view_upstream_workunits(self, discovered_views): if self.config.include_view_lineage: for view_name in discovered_views: - dataset_urn = builder.make_dataset_urn_with_platform_instance( - self.platform, - view_name, - self.config.platform_instance, - self.config.env, - ) upstream_lineage = self._get_upstream_lineage_info(view_name) if upstream_lineage is not None: yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=upstream_lineage + entityUrn=self.dataset_urn_builder(view_name), + aspect=upstream_lineage, ).as_workunit() def _get_upstream_lineage_info( @@ -267,17 +262,13 @@ class SnowflakeLineageExtractor( upstream_tables: List[UpstreamClass] = [] finegrained_lineages: List[FineGrainedLineage] = [] - dataset_urn = builder.make_dataset_urn_with_platform_instance( - self.platform, - dataset_name, - self.config.platform_instance, - self.config.env, - ) # Populate the table-lineage in aspect self.update_upstream_tables_lineage(upstream_tables, lineage) # Populate the column-lineage in aspect - self.update_upstream_columns_lineage(dataset_urn, finegrained_lineages, lineage) + self.update_upstream_columns_lineage( + self.dataset_urn_builder(dataset_name), finegrained_lineages, lineage + ) # Populate the external-table-lineage(s3->snowflake) in aspect self.update_external_tables_lineage(upstream_tables, external_lineage) @@ -579,14 +570,8 @@ class SnowflakeLineageExtractor( lineage.upstreamTables.values(), key=lambda x: x.upstreamDataset ): upstream_table_name = lineage_entry.upstreamDataset - upstream_table_urn = builder.make_dataset_urn_with_platform_instance( - self.platform, - upstream_table_name, - self.config.platform_instance, - self.config.env, - ) upstream_table = UpstreamClass( - dataset=upstream_table_urn, + dataset=self.dataset_urn_builder(upstream_table_name), type=DatasetLineageTypeClass.TRANSFORMED, ) upstream_tables.append(upstream_table) @@ -657,15 +642,9 @@ class SnowflakeLineageExtractor( upstream_dataset_name = self.get_dataset_identifier_from_qualified_name( upstream_col.objectName ) - upstream_dataset_urn = builder.make_dataset_urn_with_platform_instance( - self.platform, - upstream_dataset_name, - self.config.platform_instance, - self.config.env, - ) column_upstreams.append( builder.make_schema_field_urn( - upstream_dataset_urn, + 
self.dataset_urn_builder(upstream_dataset_name), self.snowflake_identifier(upstream_col.columnName), ) ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py index 7a5fbf74cd..d889cfa7eb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py @@ -2,7 +2,7 @@ import json import logging from collections import defaultdict from dataclasses import dataclass, field -from typing import Dict, FrozenSet, Iterable, List, Optional, Set +from typing import Callable, Dict, FrozenSet, Iterable, List, Optional, Set from snowflake.connector import SnowflakeConnection @@ -77,12 +77,17 @@ class SnowflakeLineageExtractor( Edition Note - Snowflake Standard Edition does not have Access History Feature. So it does not support lineage extraction for edges 3, 4, 5 mentioned above. """ - def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None: + def __init__( + self, + config: SnowflakeV2Config, + report: SnowflakeV2Report, + dataset_urn_builder: Callable[[str], str], + ) -> None: self._external_lineage_map: Dict[str, Set[str]] = defaultdict(set) self.config = config - self.platform = "snowflake" self.report = report self.logger = logger + self.dataset_urn_builder = dataset_urn_builder self.connection: Optional[SnowflakeConnection] = None def get_workunits( @@ -188,12 +193,6 @@ class SnowflakeLineageExtractor( def _create_upstream_lineage_workunit( self, dataset_name, upstreams, fine_upstreams=[] ): - dataset_urn = builder.make_dataset_urn_with_platform_instance( - self.platform, - dataset_name, - self.config.platform_instance, - self.config.env, - ) logger.debug( f"Upstream lineage of '{dataset_name}': {[u.dataset for u in upstreams]}" ) @@ -209,7 +208,7 @@ class SnowflakeLineageExtractor( or None, ) return MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=upstream_lineage + entityUrn=self.dataset_urn_builder(dataset_name), aspect=upstream_lineage ).as_workunit() def get_upstreams_from_query_result_row(self, dataset_name, db_row): @@ -226,14 +225,9 @@ class SnowflakeLineageExtractor( and "UPSTREAM_COLUMNS" in db_row and db_row["UPSTREAM_COLUMNS"] is not None ): - dataset_urn = builder.make_dataset_urn_with_platform_instance( - self.platform, - dataset_name, - self.config.platform_instance, - self.config.env, - ) fine_upstreams = self.map_query_result_fine_upstreams( - dataset_urn, json.loads(db_row["UPSTREAM_COLUMNS"]) + self.dataset_urn_builder(dataset_name), + json.loads(db_row["UPSTREAM_COLUMNS"]), ) # Populate the external-table-lineage(s3->snowflake), if present @@ -379,12 +373,7 @@ class SnowflakeLineageExtractor( ): upstreams.append( UpstreamClass( - dataset=builder.make_dataset_urn_with_platform_instance( - self.platform, - upstream_name, - self.config.platform_instance, - self.config.env, - ), + dataset=self.dataset_urn_builder(upstream_name), type=DatasetLineageTypeClass.TRANSFORMED, ) ) @@ -495,15 +484,9 @@ class SnowflakeLineageExtractor( upstream_dataset_name = self.get_dataset_identifier_from_qualified_name( upstream_col.objectName ) - upstream_dataset_urn = builder.make_dataset_urn_with_platform_instance( - self.platform, - upstream_dataset_name, - self.config.platform_instance, - self.config.env, - ) column_upstreams.append( builder.make_schema_field_urn( - upstream_dataset_urn, + 
self.dataset_urn_builder(upstream_dataset_name), self.snowflake_identifier(upstream_col.columnName), ) ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py index b86b6054d0..7cdc2283a0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py @@ -2,16 +2,14 @@ import json import logging import time from datetime import datetime, timezone -from typing import Any, Dict, Iterable, List, Optional +from typing import Any, Callable, Dict, Iterable, List, Optional import pydantic from snowflake.connector import SnowflakeConnection -from datahub.emitter.mce_builder import ( - make_dataset_urn_with_platform_instance, - make_user_urn, -) +from datahub.emitter.mce_builder import make_user_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.snowflake.constants import SnowflakeEdition from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config @@ -104,13 +102,19 @@ class SnowflakeJoinedAccessEvent(PermissiveModel): class SnowflakeUsageExtractor( SnowflakeQueryMixin, SnowflakeConnectionMixin, SnowflakeCommonMixin ): - def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None: + def __init__( + self, + config: SnowflakeV2Config, + report: SnowflakeV2Report, + dataset_urn_builder: Callable[[str], str], + ) -> None: self.config: SnowflakeV2Config = config self.report: SnowflakeV2Report = report + self.dataset_urn_builder = dataset_urn_builder self.logger = logger self.connection: Optional[SnowflakeConnection] = None - def get_workunits( + def get_usage_workunits( self, discovered_datasets: List[str] ) -> Iterable[MetadataWorkUnit]: self.connection = self.create_connection() @@ -138,7 +142,14 @@ class SnowflakeUsageExtractor( # Now, we report the usage as well as operation metadata even if user email is absent if self.config.include_usage_stats: - yield from self.get_usage_workunits(discovered_datasets) + yield from auto_empty_dataset_usage_statistics( + self._get_workunits_internal(discovered_datasets), + config=self.config, + dataset_urns={ + self.dataset_urn_builder(dataset_identifier) + for dataset_identifier in discovered_datasets + }, + ) if self.config.include_operational_stats: # Generate the operation workunits. 
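A minimal sketch of what the new auto_empty_dataset_usage_statistics helper emits for a source that discovered datasets but produced no usage aspects (the URNs and time window below are illustrative, not taken from this patch):

    from datetime import datetime, timezone

    from datahub.configuration.time_window_config import BaseTimeWindowConfig
    from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics

    config = BaseTimeWindowConfig(
        start_time=datetime(2022, 6, 6, tzinfo=timezone.utc),
        end_time=datetime(2022, 6, 7, tzinfo=timezone.utc),
    )
    urns = {
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)",
    }
    # The input stream contains no datasetUsageStatistics workunits, so the helper emits
    # one zero-usage aspect per dataset per majority-covered bucket (here, one DAY bucket).
    workunits = list(
        auto_empty_dataset_usage_statistics(iter([]), dataset_urns=urns, config=config)
    )
    assert len(workunits) == 2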
@@ -148,7 +159,7 @@ class SnowflakeUsageExtractor( event, discovered_datasets ) - def get_usage_workunits( + def _get_workunits_internal( self, discovered_datasets: List[str] ) -> Iterable[MetadataWorkUnit]: with PerfTimer() as timer: @@ -210,15 +221,9 @@ class SnowflakeUsageExtractor( userCounts=self._map_user_counts(json.loads(row["USER_COUNTS"])), fieldCounts=self._map_field_counts(json.loads(row["FIELD_COUNTS"])), ) - dataset_urn = make_dataset_urn_with_platform_instance( - self.platform, - dataset_identifier, - self.config.platform_instance, - self.config.env, - ) yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=stats + entityUrn=self.dataset_urn_builder(dataset_identifier), aspect=stats ).as_workunit() except Exception as e: logger.debug( @@ -368,12 +373,6 @@ class SnowflakeUsageExtractor( ) continue - dataset_urn = make_dataset_urn_with_platform_instance( - self.platform, - dataset_identifier, - self.config.platform_instance, - self.config.env, - ) operation_aspect = OperationClass( timestampMillis=reported_time, lastUpdatedTimestamp=last_updated_timestamp, @@ -384,7 +383,7 @@ class SnowflakeUsageExtractor( else None, ) mcp = MetadataChangeProposalWrapper( - entityUrn=dataset_urn, + entityUrn=self.dataset_urn_builder(dataset_identifier), aspect=operation_aspect, ) wu = MetadataWorkUnit( diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 7ecefd19c8..4ac64b6106 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -195,7 +195,7 @@ SNOWFLAKE_FIELD_TYPE_MAPPINGS = { ) @capability( SourceCapability.USAGE_STATS, - "Enabled by default, can be disabled via configuration `include_usage_stats", + "Enabled by default, can be disabled via configuration `include_usage_stats`", ) @capability( SourceCapability.DELETION_DETECTION, @@ -245,14 +245,17 @@ class SnowflakeV2Source( # For lineage if self.config.use_legacy_lineage_method: self.lineage_extractor = SnowflakeLineageLegacyExtractor( - config, self.report + config, self.report, dataset_urn_builder=self.gen_dataset_urn ) else: - self.lineage_extractor = SnowflakeLineageExtractor(config, self.report) + self.lineage_extractor = SnowflakeLineageExtractor( + config, self.report, dataset_urn_builder=self.gen_dataset_urn + ) if config.include_usage_stats or config.include_operational_stats: - # For usage stats - self.usage_extractor = SnowflakeUsageExtractor(config, self.report) + self.usage_extractor = SnowflakeUsageExtractor( + config, self.report, dataset_urn_builder=self.gen_dataset_urn + ) self.tag_extractor = SnowflakeTagExtractor( config, self.data_dictionary, self.report @@ -576,7 +579,7 @@ class SnowflakeV2Source( end_time_millis=datetime_to_ts_millis(self.config.end_time), ) - yield from self.usage_extractor.get_workunits(discovered_datasets) + yield from self.usage_extractor.get_usage_workunits(discovered_datasets) def report_warehouse_failure(self): if self.config.warehouse is not None: @@ -976,6 +979,14 @@ class SnowflakeV2Source( yield from self.gen_tag_workunits(tag) + def gen_dataset_urn(self, dataset_identifier: str) -> str: + return make_dataset_urn_with_platform_instance( + platform=self.platform, + name=dataset_identifier, + platform_instance=self.config.platform_instance, + env=self.config.env, + ) + def gen_dataset_workunits( self, table: Union[SnowflakeTable, SnowflakeView], @@ 
-983,12 +994,7 @@ class SnowflakeV2Source( db_name: str, ) -> Iterable[MetadataWorkUnit]: dataset_name = self.get_dataset_identifier(table.name, schema_name, db_name) - dataset_urn = make_dataset_urn_with_platform_instance( - self.platform, - dataset_name, - self.config.platform_instance, - self.config.env, - ) + dataset_urn = self.gen_dataset_urn(dataset_name) status = Status(removed=False) yield MetadataChangeProposalWrapper( diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 5e152351b0..9d82a9e247 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -196,7 +196,9 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource): table_urn_builder=self.gen_dataset_urn, user_urn_builder=self.gen_user_urn, ) - yield from usage_extractor.run(self.table_refs | self.view_refs) + yield from usage_extractor.get_usage_workunits( + self.table_refs | self.view_refs + ) if self.config.profiling.enabled: assert wait_on_warehouse diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py index befa1fdaeb..d5da93c7be 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py @@ -10,6 +10,7 @@ from databricks.sdk.service.sql import QueryStatementType from sqllineage.runner import LineageRunner from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig from datahub.ingestion.source.unity.proxy import UnityCatalogApiProxy @@ -62,20 +63,24 @@ class UnityCatalogUsageExtractor: ) return self._spark_sql_parser - def run(self, table_refs: Set[TableReference]) -> Iterable[MetadataWorkUnit]: + def get_usage_workunits( + self, table_refs: Set[TableReference] + ) -> Iterable[MetadataWorkUnit]: try: - table_map = defaultdict(list) - for ref in table_refs: - table_map[ref.table].append(ref) - table_map[f"{ref.schema}.{ref.table}"].append(ref) - table_map[ref.qualified_table_name].append(ref) - - yield from self._generate_workunits(table_map) + yield from self._get_workunits_internal(table_refs) except Exception as e: logger.error("Error processing usage", exc_info=True) self.report.report_warning("usage-extraction", str(e)) - def _generate_workunits(self, table_map: TableMap) -> Iterable[MetadataWorkUnit]: + def _get_workunits_internal( + self, table_refs: Set[TableReference] + ) -> Iterable[MetadataWorkUnit]: + table_map = defaultdict(list) + for ref in table_refs: + table_map[ref.table].append(ref) + table_map[f"{ref.schema}.{ref.table}"].append(ref) + table_map[ref.qualified_table_name].append(ref) + for query in self._get_queries(): self.report.num_queries += 1 table_info = self._parse_query(query, table_map) @@ -100,9 +105,13 @@ class UnityCatalogUsageExtractor: ) return - yield from self.usage_aggregator.generate_workunits( - resource_urn_builder=self.table_urn_builder, - user_urn_builder=self.user_urn_builder, + yield from auto_empty_dataset_usage_statistics( + self.usage_aggregator.generate_workunits( + resource_urn_builder=self.table_urn_builder, + user_urn_builder=self.user_urn_builder, + ), + 
dataset_urns={self.table_urn_builder(ref) for ref in table_refs}, + config=self.config, ) def _generate_operation_workunit( diff --git a/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_filtered_golden.json b/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_filtered_golden.json index 99e0976024..001cff5dde 100644 --- a/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_filtered_golden.json +++ b/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_filtered_golden.json @@ -1,26 +1,52 @@ [ { - "auditHeader": null, "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)", - "entityKeyAspect": null, "changeType": "UPSERT", "aspectName": "operation", "aspect": { - "value": "{\"timestampMillis\": 1629795600000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"actor\": \"urn:li:corpuser:test-name\", \"operationType\": \"INSERT\", \"lastUpdatedTimestamp\": 1631664000000}", - "contentType": "application/json" + "json": { + "timestampMillis": 1631696400000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:test-name", + "operationType": "INSERT", + "lastUpdatedTimestamp": 1631664000000 + } } }, { - "auditHeader": null, "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)", - "entityKeyAspect": null, "changeType": "UPSERT", "aspectName": "datasetUsageStatistics", "aspect": { - "value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select userid from users\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:test-name\", \"count\": 1, \"userEmail\": \"test-name@acryl.io\"}], \"fieldCounts\": []}", - "contentType": "application/json" + "json": { + "timestampMillis": 1631577600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 1, + "totalSqlQueries": 1, + "topSqlQueries": [ + "select userid from users" + ], + "userCounts": [ + { + "user": "urn:li:corpuser:test-name", + "count": 1, + "userEmail": "test-name@acryl.io" + } + ], + "fieldCounts": [] + } } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_golden.json b/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_golden.json index 29914abce2..006d375cc7 100644 --- a/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_golden.json +++ b/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_golden.json @@ -1,74 +1,152 @@ [ { - "auditHeader": null, "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)", - "entityKeyAspect": null, "changeType": "UPSERT", "aspectName": "operation", "aspect": { - "value": "{\"timestampMillis\": 1629795600000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"actor\": \"urn:li:corpuser:test-name\", \"operationType\": \"INSERT\", \"lastUpdatedTimestamp\": 1631664000000}", - "contentType": "application/json" + "json": { + "timestampMillis": 1631696400000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": 
"FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:test-name", + "operationType": "INSERT", + "lastUpdatedTimestamp": 1631664000000 + } } }, { - "auditHeader": null, "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,db1.schema1.category,PROD)", - "entityKeyAspect": null, "changeType": "UPSERT", "aspectName": "operation", "aspect": { - "value": "{\"timestampMillis\": 1629795600000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"actor\": \"urn:li:corpuser:real_shirshanka\", \"operationType\": \"INSERT\", \"lastUpdatedTimestamp\": 1631664000000}", - "contentType": "application/json" + "json": { + "timestampMillis": 1631696400000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:real_shirshanka", + "operationType": "INSERT", + "lastUpdatedTimestamp": 1631664000000 + } } }, { - "auditHeader": null, "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.orders,PROD)", - "entityKeyAspect": null, "changeType": "UPSERT", "aspectName": "operation", "aspect": { - "value": "{\"timestampMillis\": 1629795600000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"actor\": \"urn:li:corpuser:real_shirshanka\", \"operationType\": \"DELETE\", \"lastUpdatedTimestamp\": 1631664000000}", - "contentType": "application/json" + "json": { + "timestampMillis": 1631696400000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:real_shirshanka", + "operationType": "DELETE", + "lastUpdatedTimestamp": 1631664000000 + } } }, { - "auditHeader": null, "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)", - "entityKeyAspect": null, "changeType": "UPSERT", "aspectName": "datasetUsageStatistics", "aspect": { - "value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select userid from users\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:test-name\", \"count\": 1, \"userEmail\": \"test-name@acryl.io\"}], \"fieldCounts\": []}", - "contentType": "application/json" + "json": { + "timestampMillis": 1631577600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 1, + "totalSqlQueries": 1, + "topSqlQueries": [ + "select userid from users" + ], + "userCounts": [ + { + "user": "urn:li:corpuser:test-name", + "count": 1, + "userEmail": "test-name@acryl.io" + } + ], + "fieldCounts": [] + } } }, { - "auditHeader": null, "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,db1.schema1.category,PROD)", - "entityKeyAspect": null, "changeType": "UPSERT", "aspectName": "datasetUsageStatistics", "aspect": { - "value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select catid from category\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:real_shirshanka\", \"count\": 1, \"userEmail\": \"real_shirshanka@acryl.io\"}], \"fieldCounts\": []}", - 
"contentType": "application/json" + "json": { + "timestampMillis": 1631577600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 1, + "totalSqlQueries": 1, + "topSqlQueries": [ + "select catid from category" + ], + "userCounts": [ + { + "user": "urn:li:corpuser:real_shirshanka", + "count": 1, + "userEmail": "real_shirshanka@acryl.io" + } + ], + "fieldCounts": [] + } } }, { - "auditHeader": null, "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.orders,PROD)", - "entityKeyAspect": null, "changeType": "UPSERT", "aspectName": "datasetUsageStatistics", "aspect": { - "value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select cost from orders\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:real_shirshanka\", \"count\": 1, \"userEmail\": \"real_shirshanka@acryl.io\"}], \"fieldCounts\": []}", - "contentType": "application/json" + "json": { + "timestampMillis": 1631577600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 1, + "totalSqlQueries": 1, + "topSqlQueries": [ + "select cost from orders" + ], + "userCounts": [ + { + "user": "urn:li:corpuser:real_shirshanka", + "count": 1, + "userEmail": "real_shirshanka@acryl.io" + } + ], + "fieldCounts": [] + } } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py b/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py index ad09464a2a..74eec82b39 100644 --- a/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py +++ b/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py @@ -6,6 +6,7 @@ from unittest.mock import Mock, patch from freezegun import freeze_time +from datahub.emitter.mce_builder import make_dataset_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.sink.file import write_metadata_file from datahub.ingestion.source.redshift.config import RedshiftConfig @@ -21,7 +22,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( ) from tests.test_helpers import mce_helpers -FROZEN_TIME = "2021-08-24 09:00:00" +FROZEN_TIME = "2021-09-15 09:00:00" def test_redshift_usage_config(): @@ -94,6 +95,7 @@ def test_redshift_usage_source(mock_cursor, mock_connection, pytestconfig, tmp_p config=config, connection=mock_connection, report=source_report, + dataset_urn_builder=lambda table: make_dataset_urn("redshift", table), ) all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]] = { @@ -127,7 +129,7 @@ def test_redshift_usage_source(mock_cursor, mock_connection, pytestconfig, tmp_p ] }, } - mwus = usage_extractor.generate_usage(all_tables=all_tables) + mwus = usage_extractor.get_usage_workunits(all_tables=all_tables) metadata: List[ Union[ MetadataChangeEvent, @@ -200,6 +202,7 @@ def test_redshift_usage_filtering(mock_cursor, mock_connection, pytestconfig, tm config=config, connection=mock_connection, report=RedshiftReport(), + dataset_urn_builder=lambda table: make_dataset_urn("redshift", table), ) all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]] 
= { @@ -215,7 +218,7 @@ def test_redshift_usage_filtering(mock_cursor, mock_connection, pytestconfig, tm ] }, } - mwus = usage_extractor.generate_usage(all_tables=all_tables) + mwus = usage_extractor.get_usage_workunits(all_tables=all_tables) metadata: List[ Union[ MetadataChangeEvent, diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py index ebb229d1c7..5b220c18e9 100644 --- a/metadata-ingestion/tests/integration/snowflake/common.py +++ b/metadata-ingestion/tests/integration/snowflake/common.py @@ -1,6 +1,7 @@ import json from datetime import datetime, timezone +from datahub.configuration.time_window_config import BucketDuration from datahub.ingestion.source.snowflake import snowflake_query from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery @@ -247,6 +248,18 @@ def default_query_results(query): # noqa: C901 } for op_idx in range(1, NUM_OPS + 1) ] + elif ( + query + == snowflake_query.SnowflakeQuery.usage_per_object_per_time_bucket_for_time_window( + 1654499820000, + 1654586220000, + use_base_objects=False, + top_n_queries=10, + include_top_n_queries=True, + time_bucket_size=BucketDuration.DAY, + ) + ): + return [] elif query in ( snowflake_query.SnowflakeQuery.table_to_table_lineage_history( 1654499820000, diff --git a/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json b/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json index 2e2b0be8ca..5fe9cb97e6 100644 --- a/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json +++ b/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json @@ -5108,6 +5108,342 @@ "runId": "snowflake-2022_06_07-17_00_00" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1654473600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 0, + "totalSqlQueries": 0, + "topSqlQueries": [], + "userCounts": [], + "fieldCounts": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_3,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1654473600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 0, + "totalSqlQueries": 0, + "topSqlQueries": [], + "userCounts": [], + "fieldCounts": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_8,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1654473600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 0, + "totalSqlQueries": 0, + "topSqlQueries": [], + "userCounts": [], + "fieldCounts": [] + } + }, + "systemMetadata": { 
+ "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_10,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1654473600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 0, + "totalSqlQueries": 0, + "topSqlQueries": [], + "userCounts": [], + "fieldCounts": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.view_2,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1654473600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 0, + "totalSqlQueries": 0, + "topSqlQueries": [], + "userCounts": [], + "fieldCounts": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_6,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1654473600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 0, + "totalSqlQueries": 0, + "topSqlQueries": [], + "userCounts": [], + "fieldCounts": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_9,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1654473600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 0, + "totalSqlQueries": 0, + "topSqlQueries": [], + "userCounts": [], + "fieldCounts": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_4,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1654473600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 0, + "totalSqlQueries": 0, + "topSqlQueries": [], + "userCounts": [], + "fieldCounts": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1654473600000, + "eventGranularity": { + "unit": "DAY", + 
"multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 0, + "totalSqlQueries": 0, + "topSqlQueries": [], + "userCounts": [], + "fieldCounts": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_5,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1654473600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 0, + "totalSqlQueries": 0, + "topSqlQueries": [], + "userCounts": [], + "fieldCounts": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_7,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1654473600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 0, + "totalSqlQueries": 0, + "topSqlQueries": [], + "userCounts": [], + "fieldCounts": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.view_1,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "json": { + "timestampMillis": 1654473600000, + "eventGranularity": { + "unit": "DAY", + "multiple": 1 + }, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "uniqueUserCount": 0, + "totalSqlQueries": 0, + "topSqlQueries": [], + "userCounts": [], + "fieldCounts": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-2022_06_07-17_00_00" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py index 62c4782120..4521f4091a 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py @@ -121,7 +121,7 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph): include_technical_schema=True, include_table_lineage=True, include_view_lineage=True, - include_usage_stats=False, + include_usage_stats=True, use_legacy_lineage_method=False, validate_upstreams_against_patterns=False, include_operational_stats=True, diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_legacy_lineage.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_legacy_lineage.py index 55dde8514d..43b05924ae 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_legacy_lineage.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_legacy_lineage.py @@ -95,7 +95,7 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph): include_technical_schema=True, 
include_table_lineage=True, include_view_lineage=True, - include_usage_stats=False, + include_usage_stats=True, use_legacy_lineage_method=True, validate_upstreams_against_patterns=False, include_operational_stats=True, diff --git a/metadata-ingestion/tests/performance/test_bigquery_usage.py b/metadata-ingestion/tests/performance/test_bigquery_usage.py index 8b347ec172..7e05ef070b 100644 --- a/metadata-ingestion/tests/performance/test_bigquery_usage.py +++ b/metadata-ingestion/tests/performance/test_bigquery_usage.py @@ -7,6 +7,7 @@ from typing import Iterable, Tuple import humanfriendly import psutil +from datahub.emitter.mce_builder import make_dataset_urn from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.bigquery_v2.bigquery_config import ( BigQueryUsageConfig, @@ -44,7 +45,11 @@ def run_test(): ), file_backed_cache_size=1000, ) - usage_extractor = BigQueryUsageExtractor(config, report) + usage_extractor = BigQueryUsageExtractor( + config, + report, + lambda ref: make_dataset_urn("bigquery", str(ref.table_identifier)), + ) report.set_ingestion_stage("All", "Event Generation") num_projects = 100 @@ -70,7 +75,7 @@ def run_test(): report.set_ingestion_stage("All", "Event Ingestion") with PerfTimer() as timer: - workunits = usage_extractor._run(events, table_refs) + workunits = usage_extractor._get_workunits_internal(events, table_refs) num_workunits, peak_memory_usage = workunit_sink(workunits) report.set_ingestion_stage("All", "Done") print(f"Workunits Generated: {num_workunits}") diff --git a/metadata-ingestion/tests/unit/test_bigquery_usage.py b/metadata-ingestion/tests/unit/test_bigquery_usage.py index 7a55762694..01a7b34f3d 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_usage.py +++ b/metadata-ingestion/tests/unit/test_bigquery_usage.py @@ -1,13 +1,14 @@ import logging import random from datetime import datetime, timedelta, timezone -from typing import cast +from typing import Iterable, cast from unittest.mock import MagicMock, patch import pytest from freezegun import freeze_time from datahub.configuration.time_window_config import BucketDuration +from datahub.emitter.mce_builder import make_dataset_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.bigquery_v2.bigquery_audit import ( @@ -36,6 +37,7 @@ from datahub.metadata.schema_classes import ( from tests.performance.bigquery import generate_events, ref_from_table from tests.performance.data_generation import generate_data, generate_queries from tests.performance.data_model import Container, FieldAccess, Query, Table, View +from tests.test_helpers.mce_helpers import assert_mces_equal PROJECT_1 = "project-1" PROJECT_2 = "project-2" @@ -195,7 +197,37 @@ def config() -> BigQueryV2Config: @pytest.fixture def usage_extractor(config: BigQueryV2Config) -> BigQueryUsageExtractor: report = BigQueryV2Report() - return BigQueryUsageExtractor(config, report) + return BigQueryUsageExtractor( + config, + report, + lambda ref: make_dataset_urn("bigquery", str(ref.table_identifier)), + ) + + +def make_zero_usage_workunit( + table: Table, time: datetime, bucket_duration: BucketDuration = BucketDuration.DAY +) -> MetadataWorkUnit: + return make_usage_workunit( + table=table, + dataset_usage_statistics=DatasetUsageStatisticsClass( + timestampMillis=int(time.timestamp() * 1000), + eventGranularity=TimeWindowSizeClass(unit=bucket_duration, multiple=1), + totalSqlQueries=0, + uniqueUserCount=0, + 
topSqlQueries=[], + userCounts=[], + fieldCounts=[], + ), + ) + + +def compare_workunits( + output: Iterable[MetadataWorkUnit], expected: Iterable[MetadataWorkUnit] +) -> None: + assert_mces_equal( + [wu.metadata.to_obj() for wu in output], + [wu.metadata.to_obj() for wu in expected], + ) def test_usage_counts_single_bucket_resource_project( @@ -217,8 +249,8 @@ def test_usage_counts_single_bucket_resource_project( proabability_of_project_mismatch=0.5, ) - workunits = usage_extractor._run(events, TABLE_REFS.values()) - assert list(workunits) == [ + workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values()) + expected = [ make_usage_workunit( table=TABLE_1, dataset_usage_statistics=DatasetUsageStatisticsClass( @@ -256,8 +288,11 @@ def test_usage_counts_single_bucket_resource_project( ), ], ), - ) + ), + make_zero_usage_workunit(TABLE_2, TS_1), + make_zero_usage_workunit(VIEW_1, TS_1), ] + compare_workunits(workunits, expected) def test_usage_counts_multiple_buckets_and_resources_view_usage( @@ -292,8 +327,8 @@ def test_usage_counts_multiple_buckets_and_resources_view_usage( proabability_of_project_mismatch=0.5, ) - workunits = usage_extractor._run(events, TABLE_REFS.values()) - assert list(workunits) == [ + workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values()) + expected = [ # TS 1 make_usage_workunit( table=TABLE_1, @@ -493,6 +528,7 @@ def test_usage_counts_multiple_buckets_and_resources_view_usage( ), ), ] + compare_workunits(workunits, expected) assert usage_extractor.report.num_view_query_events == 5 assert usage_extractor.report.num_view_query_events_failed_sql_parsing == 0 assert usage_extractor.report.num_view_query_events_failed_table_identification == 0 @@ -531,8 +567,8 @@ def test_usage_counts_multiple_buckets_and_resources_no_view_usage( proabability_of_project_mismatch=0.5, ) - workunits = usage_extractor._run(events, TABLE_REFS.values()) - assert list(workunits) == [ + workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values()) + expected = [ # TS 1 make_usage_workunit( table=TABLE_1, @@ -723,7 +759,10 @@ def test_usage_counts_multiple_buckets_and_resources_no_view_usage( ], ), ), + make_zero_usage_workunit(VIEW_1, TS_1), + # TS_2 not included as only 1 minute of it was ingested ] + compare_workunits(workunits, expected) assert usage_extractor.report.num_view_query_events == 0 @@ -745,8 +784,24 @@ def test_usage_counts_no_query_event( payload=None, ) ) - workunits = usage_extractor._run([event], [str(ref)]) - assert list(workunits) == [] + workunits = usage_extractor._get_workunits_internal([event], [str(ref)]) + expected = [ + MetadataChangeProposalWrapper( + entityUrn=ref.to_urn("PROD"), + aspect=DatasetUsageStatisticsClass( + timestampMillis=int(TS_1.timestamp() * 1000), + eventGranularity=TimeWindowSizeClass( + unit=BucketDuration.DAY, multiple=1 + ), + totalSqlQueries=0, + uniqueUserCount=0, + topSqlQueries=[], + userCounts=[], + fieldCounts=[], + ), + ).as_workunit() + ] + compare_workunits(workunits, expected) assert not caplog.records @@ -787,8 +842,10 @@ def test_usage_counts_no_columns( ), ] with caplog.at_level(logging.WARNING): - workunits = usage_extractor._run(events, TABLE_REFS.values()) - assert list(workunits) == [ + workunits = usage_extractor._get_workunits_internal( + events, [TABLE_REFS[TABLE_1.name]] + ) + expected = [ make_usage_workunit( table=TABLE_1, dataset_usage_statistics=DatasetUsageStatisticsClass( @@ -810,6 +867,7 @@ def test_usage_counts_no_columns( ), ) ] + 
compare_workunits(workunits, expected) assert not caplog.records @@ -849,8 +907,8 @@ def test_operational_stats( ) events = generate_events(queries, projects, table_to_project, config=config) - workunits = usage_extractor._run(events, table_refs.values()) - assert list(workunits) == [ + workunits = usage_extractor._get_workunits_internal(events, table_refs.values()) + expected = [ make_operational_workunit( table_refs[query.object_modified.name], OperationClass( @@ -887,6 +945,15 @@ def test_operational_stats( for query in queries if query.object_modified and query.type in OPERATION_STATEMENT_TYPES.values() ] + compare_workunits( + [ + wu + for wu in workunits + if isinstance(wu.metadata, MetadataChangeProposalWrapper) + and isinstance(wu.metadata.aspect, OperationClass) + ], + expected, + ) def test_get_tables_from_query(usage_extractor): diff --git a/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py b/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py index ead3640b0a..6ee1f05f05 100644 --- a/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py +++ b/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py @@ -111,7 +111,9 @@ AND OR protoPayload.metadata.tableDataRead.reason = "JOB" )""" # noqa: W293 - source = BigQueryUsageExtractor(config, BigQueryV2Report()) + source = BigQueryUsageExtractor( + config, BigQueryV2Report(), dataset_urn_builder=lambda _: "" + ) filter: str = source._generate_filter(BQ_AUDIT_V2) assert filter == expected_filter diff --git a/metadata-ingestion/tests/unit/test_cli_logging.py b/metadata-ingestion/tests/unit/test_cli_logging.py index 07d92c04e7..761b72e5e0 100644 --- a/metadata-ingestion/tests/unit/test_cli_logging.py +++ b/metadata-ingestion/tests/unit/test_cli_logging.py @@ -3,10 +3,20 @@ import pathlib import re import click +import pytest from click.testing import CliRunner from datahub.entrypoints import datahub -from datahub.utilities.logging_manager import get_log_buffer +from datahub.utilities.logging_manager import DATAHUB_PACKAGES, get_log_buffer + + +@pytest.fixture(autouse=True, scope="module") +def cleanup(): + """Attempt to clear undo the stateful changes done by invoking `my_logging_fn`, which calls `configure_logging`.""" + yield + for lib in DATAHUB_PACKAGES: + lib_logger = logging.getLogger(lib) + lib_logger.propagate = True @datahub.command() diff --git a/metadata-ingestion/tests/unit/test_source_helpers.py b/metadata-ingestion/tests/unit/test_source_helpers.py index dbf0c3ea50..b6ec6ebce2 100644 --- a/metadata-ingestion/tests/unit/test_source_helpers.py +++ b/metadata-ingestion/tests/unit/test_source_helpers.py @@ -1,7 +1,12 @@ +import logging +from datetime import datetime from typing import Any, Dict, Iterable, List, Union from unittest.mock import patch +from freezegun import freeze_time + import datahub.metadata.schema_classes as models +from datahub.configuration.time_window_config import BaseTimeWindowConfig from datahub.emitter.mce_builder import ( make_container_urn, make_dataplatform_instance_urn, @@ -10,6 +15,7 @@ from datahub.emitter.mce_builder import ( from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.source_helpers import ( auto_browse_path_v2, + auto_empty_dataset_usage_statistics, auto_status_aspect, auto_workunit, ) @@ -403,3 +409,100 @@ def test_auto_browse_path_v2_dry_run(telemetry_ping_mock): == 0 ) assert telemetry_ping_mock.call_count == 1 + + +@freeze_time("2023-01-02 00:00:00") +def test_auto_empty_dataset_usage_statistics(caplog): + has_urn 
= make_dataset_urn("my_platform", "has_aspect") + empty_urn = make_dataset_urn("my_platform", "no_aspect") + config = BaseTimeWindowConfig() + wus = [ + MetadataChangeProposalWrapper( + entityUrn=has_urn, + aspect=models.DatasetUsageStatisticsClass( + timestampMillis=int(config.start_time.timestamp() * 1000), + eventGranularity=models.TimeWindowSizeClass( + models.CalendarIntervalClass.DAY + ), + uniqueUserCount=1, + totalSqlQueries=1, + ), + ).as_workunit() + ] + with caplog.at_level(logging.WARNING): + new_wus = list( + auto_empty_dataset_usage_statistics( + wus, + dataset_urns={has_urn, empty_urn}, + config=config, + all_buckets=False, + ) + ) + assert not caplog.records + + assert new_wus == [ + *wus, + MetadataChangeProposalWrapper( + entityUrn=empty_urn, + aspect=models.DatasetUsageStatisticsClass( + timestampMillis=int(datetime(2023, 1, 1).timestamp() * 1000), + eventGranularity=models.TimeWindowSizeClass( + models.CalendarIntervalClass.DAY + ), + uniqueUserCount=0, + totalSqlQueries=0, + topSqlQueries=[], + userCounts=[], + fieldCounts=[], + ), + ).as_workunit(), + ] + + +@freeze_time("2023-01-02 00:00:00") +def test_auto_empty_dataset_usage_statistics_invalid_timestamp(caplog): + urn = make_dataset_urn("my_platform", "my_dataset") + config = BaseTimeWindowConfig() + wus = [ + MetadataChangeProposalWrapper( + entityUrn=urn, + aspect=models.DatasetUsageStatisticsClass( + timestampMillis=0, + eventGranularity=models.TimeWindowSizeClass( + models.CalendarIntervalClass.DAY + ), + uniqueUserCount=1, + totalSqlQueries=1, + ), + ).as_workunit() + ] + with caplog.at_level(logging.WARNING): + new_wus = list( + auto_empty_dataset_usage_statistics( + wus, + dataset_urns={urn}, + config=config, + all_buckets=True, + ) + ) + assert len(caplog.records) == 1 + assert "1970-01-01 00:00:00+00:00" in caplog.records[0].msg + + assert new_wus == [ + *wus, + MetadataChangeProposalWrapper( + entityUrn=urn, + aspect=models.DatasetUsageStatisticsClass( + timestampMillis=int(config.start_time.timestamp() * 1000), + eventGranularity=models.TimeWindowSizeClass( + models.CalendarIntervalClass.DAY + ), + uniqueUserCount=0, + totalSqlQueries=0, + topSqlQueries=[], + userCounts=[], + fieldCounts=[], + ), + changeType=models.ChangeTypeClass.CREATE, + ).as_workunit(), + ]
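
Usage note: the tests above pin down the behaviour of the new auto_empty_dataset_usage_statistics helper. As a rough sketch of how a connector is expected to wire it in (the function name emit_usage_workunits and the variable ingested_dataset_urns below are hypothetical placeholders, not identifiers introduced by this patch), the source passes its usage workunit stream plus the set of dataset URNs it ingested through the helper, which back-fills an empty datasetUsageStatistics aspect for any dataset/bucket combination that saw no usage:

    # Illustrative sketch only; emit_usage_workunits and ingested_dataset_urns
    # are placeholder names for this example, not part of the patch.
    from typing import Iterable, Set

    from datahub.configuration.time_window_config import BaseTimeWindowConfig
    from datahub.ingestion.api.source_helpers import (
        auto_empty_dataset_usage_statistics,
    )
    from datahub.ingestion.api.workunit import MetadataWorkUnit


    def emit_usage_workunits(
        usage_workunits: Iterable[MetadataWorkUnit],
        ingested_dataset_urns: Set[str],
        config: BaseTimeWindowConfig,
    ) -> Iterable[MetadataWorkUnit]:
        # Yields the original workunits unchanged, then appends an empty
        # datasetUsageStatistics aspect (zero queries, zero users, empty
        # top-query/user/field lists) for each dataset in
        # ingested_dataset_urns that produced no usage statistics for a
        # bucket in the configured time window.
        yield from auto_empty_dataset_usage_statistics(
            usage_workunits,
            dataset_urns=ingested_dataset_urns,
            config=config,
            all_buckets=False,
        )

With all_buckets=False only buckets whose majority falls inside the ingestion window are back-filled, which is why the BigQuery test above expects no zero-usage aspect for TS_2 ("only 1 minute of it was ingested"). With all_buckets=True every bucket in the window is filled and the empty aspect is emitted with changeType CREATE, as asserted in test_auto_empty_dataset_usage_statistics_invalid_timestamp.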