diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py
index e56fcec03c..379a773e24 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py
@@ -261,6 +261,8 @@ class QueryEvent:
     default_dataset: Optional[str] = None
     numAffectedRows: Optional[int] = None
 
+    query_on_view: bool = False
+
     @staticmethod
     def get_missing_key_entry(entry: AuditLogEntry) -> Optional[str]:
         return get_first_missing_key(
@@ -344,6 +346,8 @@ class QueryEvent:
                 BigQueryTableRef.from_spec_obj(spec).get_sanitized_table_ref()
                 for spec in raw_ref_views
             ]
+            query_event.query_on_view = True
+
         # payload
         query_event.payload = entry.payload if debug_include_full_payloads else None
         if not query_event.job_name:
@@ -420,6 +424,8 @@ class QueryEvent:
                 BigQueryTableRef.from_string_name(spec).get_sanitized_table_ref()
                 for spec in raw_ref_views
             ]
+            query_event.query_on_view = True
+
         # payload
         query_event.payload = payload if debug_include_full_payloads else None
@@ -487,6 +493,8 @@ class QueryEvent:
                 BigQueryTableRef.from_string_name(spec).get_sanitized_table_ref()
                 for spec in raw_ref_views
             ]
+            query_event.query_on_view = True
+
         # payload
         query_event.payload = payload if debug_include_full_payloads else None
@@ -519,6 +527,8 @@ class ReadEvent:
 
     payload: Any
 
+    from_query: bool = False
+
     # We really should use composition here since the query isn't actually
     # part of the read event, but this solution is just simpler.
     # query: Optional["QueryEvent"] = None  # populated via join
@@ -582,6 +592,27 @@ class ReadEvent:
         )
         return readEvent
 
+    @classmethod
+    def from_query_event(
+        cls,
+        read_resource: BigQueryTableRef,
+        query_event: QueryEvent,
+        debug_include_full_payloads: bool = False,
+    ) -> "ReadEvent":
+
+        readEvent = ReadEvent(
+            actor_email=query_event.actor_email,
+            timestamp=query_event.timestamp,
+            resource=read_resource,
+            fieldsRead=[],
+            readReason="JOB",
+            jobName=query_event.job_name,
+            payload=query_event.payload if debug_include_full_payloads else None,
+            from_query=True,
+        )
+
+        return readEvent
+
     @classmethod
     def from_exported_bigquery_audit_metadata(
         cls, row: BigQueryAuditMetadata, debug_include_full_payloads: bool = False
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
index f6dd66b668..f1023fa6c8 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
@@ -28,6 +28,11 @@ class BigQueryUsageConfig(BaseUsageConfig):
         description="Correction to pad start_time and end_time with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.",
     )
 
+    apply_view_usage_to_tables: bool = Field(
+        default=False,
+        description="Whether to apply a view's usage to its base tables. If set to False, uses the SQL parser and applies usage to the views and tables mentioned in the query. 
If set to True, usage is applied to base tables only.", + ) + class BigQueryV2Config( BigQueryBaseConfig, diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index 89f2218372..479fc69806 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -68,6 +68,9 @@ class BigQueryV2Report(ProfilingSqlReport): total_query_log_entries: int = 0 num_read_events: int = 0 num_query_events: int = 0 + num_view_query_events: int = 0 + num_view_query_events_failed_sql_parsing: int = 0 + num_view_query_events_failed_table_identification: int = 0 num_filtered_read_events: int = 0 num_filtered_query_events: int = 0 num_usage_query_hash_collisions: int = 0 diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py index 38aab3162d..41abb6f26d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py @@ -36,6 +36,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_audit import ( AuditEvent, AuditLogEntry, BigQueryAuditMetadata, + BigqueryTableIdentifier, BigQueryTableRef, QueryEvent, ReadEvent, @@ -53,6 +54,7 @@ from datahub.ingestion.source.usage.usage_common import ( make_usage_workunit, ) from datahub.metadata.schema_classes import OperationClass, OperationTypeClass +from datahub.utilities.bigquery_sql_parser import BigQuerySQLParser from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedDict from datahub.utilities.perf_timer import PerfTimer @@ -142,7 +144,7 @@ def bigquery_audit_metadata_query_template( AND ( ( - JSON_EXTRACT_SCALAR(protopayload_auditlog.methodName) IN + protopayload_auditlog.methodName IN ( "google.cloud.bigquery.v2.JobService.Query", "google.cloud.bigquery.v2.JobService.InsertJob" @@ -184,6 +186,7 @@ class BigQueryUsageState(Closeable): e.timestamp, config.bucket_duration ), "user": lambda e: e.actor_email, + "from_query": lambda e: int(e.from_query), }, cache_max_size=config.file_backed_cache_size, # Evict entire cache to reduce db calls. 
@@ -198,6 +201,7 @@ class BigQueryUsageState(Closeable):
             extra_columns={
                 "query": lambda e: e.query,
                 "is_read": lambda e: int(e.statementType in READ_STATEMENT_TYPES),
+                "on_view": lambda e: int(e.query_on_view),
             },
             cache_max_size=config.file_backed_cache_size,
             cache_eviction_batch_size=max(int(config.file_backed_cache_size * 0.9), 1),
@@ -328,6 +332,20 @@ class BigQueryUsageState(Closeable):
             column_freq=json.loads(row["column_freq"] or "[]"),
         )
 
+    def delete_original_read_events_for_view_query_events(self) -> None:
+        self.read_events.sql_query(
+            """
+            DELETE FROM
+                read_events
+            WHERE
+                read_events.from_query = 0 AND
+                read_events.name IN (
+                    SELECT q.key FROM query_events q WHERE q.on_view = 1
+                )
+            """,
+            refs=[self.query_events],
+        )
+
     def report_disk_usage(self, report: BigQueryV2Report) -> None:
         report.usage_state_size = str(
             {
@@ -342,7 +360,7 @@ class BigQueryUsageState(Closeable):
 class BigQueryUsageExtractor:
     """
     This plugin extracts the following:
-    * Statistics on queries issued and tables and columns accessed (excludes views)
+    * Statistics on queries issued and tables and columns accessed
     * Aggregation of these statistics into buckets, by day or hour granularity
 
     :::note
@@ -389,6 +407,26 @@ class BigQueryUsageExtractor:
             logger.error("Error processing usage", exc_info=True)
             self.report.report_warning("usage-ingestion", str(e))
 
+    def generate_read_events_from_query(
+        self, query_event_on_view: QueryEvent
+    ) -> Iterable[AuditEvent]:
+        try:
+            tables = self.get_tables_from_query(
+                query_event_on_view.project_id,
+                query_event_on_view.query,
+            )
+            assert tables is not None and len(tables) != 0
+            for table in tables:
+                yield AuditEvent.create(
+                    ReadEvent.from_query_event(table, query_event_on_view)
+                )
+        except Exception as ex:
+            logger.debug(
+                f"Generating read events failed for this query on a view: {query_event_on_view.query}. "
+                f"Usage will not be added. The error was: {ex}."
+            )
+            self.report.num_view_query_events_failed_sql_parsing += 1
+
     def _ingest_events(
         self,
         events: Iterable[AuditEvent],
@@ -397,8 +435,33 @@ class BigQueryUsageExtractor:
     ) -> None:
         """Read log and store events in usage_state."""
         num_aggregated = 0
+        num_generated = 0
         for audit_event in events:
             try:
+                # Note on view usage:
+                # If a query references a view, the BigQuery audit logs contain read events only
+                # for the view's base tables, not for the view itself. To extract usage for views,
+                # we parse the SQL to find the tables and views read by the query and generate
+                # read events for them ourselves (`from_query=True`). For such query events, we
+                # delete the original read events from the audit logs and keep only the generated ones.
+
+                # Caveats of the SQL parsing approach used here:
+                # 1. If query parsing fails, usage for that query is not counted.
+                # 2. Due to limitations of query parsing, field-level usage is not available.
+                # To limit the impact, we use query parsing only for queries that reference at least
+                # one view; for all other queries, field-level usage remains available via the audit logs.
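+
+                # Illustrative walk-through (assumed example, not taken from real audit logs):
+                # suppose `d.view_1` is a view over `d.table_1`. For `SELECT * FROM d.view_1`,
+                # the audit log yields a query event with referencedViews=[d.view_1] (so
+                # query_on_view=True) plus a read event on d.table_1 only. Below, we generate
+                # a read event on d.view_1 from the parsed SQL and, at the end of ingestion,
+                # delete the audit-log read event on d.table_1 for this job.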
+                if (
+                    audit_event.query_event
+                    and audit_event.query_event.query_on_view
+                    and not self.config.usage.apply_view_usage_to_tables
+                ):
+                    query_event = audit_event.query_event
+                    self.report.num_view_query_events += 1
+
+                    for new_event in self.generate_read_events_from_query(query_event):
+                        num_generated += self._store_usage_event(
+                            new_event, usage_state, table_refs
+                        )
                 num_aggregated += self._store_usage_event(
                     audit_event, usage_state, table_refs
                 )
@@ -409,6 +472,10 @@ class BigQueryUsageExtractor:
                 self._report_error("store-event", e)
         logger.info(f"Total number of events aggregated = {num_aggregated}.")
 
+        if self.report.num_view_query_events > 0:
+            logger.info(f"Total number of read events generated = {num_generated}.")
+            usage_state.delete_original_read_events_for_view_query_events()
+
     def _generate_operational_workunits(
         self, usage_state: BigQueryUsageState, table_refs: Collection[str]
     ) -> Iterable[MetadataWorkUnit]:
@@ -903,6 +970,56 @@ class BigQueryUsageExtractor:
                 f"log-parse-{project_id}", e, group="usage-log-parse"
             )
 
+    def get_tables_from_query(
+        self, default_project: str, query: str
+    ) -> Optional[List[BigQueryTableRef]]:
+        """
+        Attempts to parse the BigQuery tables and views read by the query.
+        """
+        if not query:
+            return None
+
+        parsed_tables = set()
+        try:
+            parser = BigQuerySQLParser(
+                query,
+                self.config.sql_parser_use_external_process,
+                use_raw_names=self.config.lineage_sql_parser_use_raw_names,
+            )
+            tables = parser.get_tables()
+        except Exception as ex:
+            logger.debug(
+                f"SQL parsing failed for this query on a view: {query}. "
+                f"Usage will not be added. The error was: {ex}."
+            )
+            return None
+
+        for table in tables:
+            parts = table.split(".")
+            if len(parts) == 2:
+                parsed_tables.add(
+                    BigQueryTableRef(
+                        BigqueryTableIdentifier(
+                            project_id=default_project, dataset=parts[0], table=parts[1]
+                        )
+                    ).get_sanitized_table_ref()
+                )
+            elif len(parts) == 3:
+                parsed_tables.add(
+                    BigQueryTableRef(
+                        BigqueryTableIdentifier(
+                            project_id=parts[0], dataset=parts[1], table=parts[2]
+                        )
+                    ).get_sanitized_table_ref()
+                )
+            else:
+                logger.debug(
+                    f"Invalid table identifier {table} when parsing a query on a view: {query}"
+                )
+                self.report.num_view_query_events_failed_table_identification += 1
+
+        return list(parsed_tables)
+
     def _report_error(
         self, label: str, e: Exception, group: Optional[str] = None
     ) -> None:
diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/usage/snowflake_usage.py b/metadata-ingestion/src/datahub/ingestion/source_config/usage/snowflake_usage.py
index 0930a75eae..747bde0a8b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source_config/usage/snowflake_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source_config/usage/snowflake_usage.py
@@ -15,5 +15,5 @@ class SnowflakeUsageConfig(BaseUsageConfig):
     )
     apply_view_usage_to_tables: bool = pydantic.Field(
         default=False,
-        description="Allow/deny patterns for views in snowflake dataset names.",
+        description="Whether to apply a view's usage to its base tables. 
If set to True, usage is applied to base tables only.",
     )
diff --git a/metadata-ingestion/tests/performance/bigquery.py b/metadata-ingestion/tests/performance/bigquery.py
index bd807d93fc..d9b5571a80 100644
--- a/metadata-ingestion/tests/performance/bigquery.py
+++ b/metadata-ingestion/tests/performance/bigquery.py
@@ -2,7 +2,7 @@ import dataclasses
 import random
 import uuid
 from collections import defaultdict
-from typing import Dict, Iterable, List
+from typing import Dict, Iterable, List, cast
 
 from typing_extensions import get_args
 
@@ -15,7 +15,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
 )
 from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
 from datahub.ingestion.source.bigquery_v2.usage import OPERATION_STATEMENT_TYPES
-from tests.performance.data_model import Query, StatementType, Table
+from tests.performance.data_model import Query, StatementType, Table, View
 
 # https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.TableDataRead.Reason
 READ_REASONS = [
@@ -55,6 +55,14 @@ def generate_events(
             else random.choice(projects)
         )
         job_name = str(uuid.uuid4())
+        referencedViews = list(
+            dict.fromkeys(
+                ref_from_table(field.table, table_to_project)
+                for field in query.fields_accessed
+                if field.table.is_view()
+            )
+        )
+
         yield AuditEvent.create(
             QueryEvent(
                 job_name=job_name,
@@ -72,24 +80,34 @@ def generate_events(
                         for field in query.fields_accessed
                         if not field.table.is_view()
                     )
-                ),
-                referencedViews=list(
-                    dict.fromkeys(
-                        ref_from_table(field.table, table_to_project)
+                )
+                + list(
+                    dict.fromkeys(  # Preserve order
+                        ref_from_table(parent, table_to_project)
                         for field in query.fields_accessed
                         if field.table.is_view()
+                        for parent in cast(View, field.table).parents
                     )
                 ),
+                referencedViews=referencedViews,
                 payload=dataclasses.asdict(query)
                 if config.debug_include_full_payloads
                 else None,
+                query_on_view=bool(referencedViews),
             )
         )
-        table_accesses = defaultdict(list)
+        table_accesses = defaultdict(set)
         for field in query.fields_accessed:
-            table_accesses[ref_from_table(field.table, table_to_project)].append(
-                field.column
-            )
+            if not field.table.is_view():
+                table_accesses[ref_from_table(field.table, table_to_project)].add(
+                    field.column
+                )
+            else:
+                # Assume the same fields are accessed in the parent tables.
+                for parent in cast(View, field.table).parents:
+                    table_accesses[ref_from_table(parent, table_to_project)].add(
+                        field.column
+                    )
 
         for ref, columns in table_accesses.items():
             yield AuditEvent.create(
@@ -98,7 +116,7 @@ def generate_events(
                     timestamp=query.timestamp,
                     actor_email=query.actor,
                     resource=ref,
-                    fieldsRead=columns,
+                    fieldsRead=list(columns),
                     readReason=random.choice(READ_REASONS),
                     payload=dataclasses.asdict(query)
                     if config.debug_include_full_payloads
diff --git a/metadata-ingestion/tests/performance/test_bigquery_usage.py b/metadata-ingestion/tests/performance/test_bigquery_usage.py
index 1e430d18f9..8b347ec172 100644
--- a/metadata-ingestion/tests/performance/test_bigquery_usage.py
+++ b/metadata-ingestion/tests/performance/test_bigquery_usage.py
@@ -37,7 +37,11 @@ def run_test():
     config = BigQueryV2Config(
         start_time=seed_metadata.start_time,
         end_time=seed_metadata.end_time,
-        usage=BigQueryUsageConfig(include_top_n_queries=True, top_n_queries=10),
+        usage=BigQueryUsageConfig(
+            include_top_n_queries=True,
+            top_n_queries=10,
+            apply_view_usage_to_tables=True,
+        ),
         file_backed_cache_size=1000,
     )
     usage_extractor = BigQueryUsageExtractor(config, report)
diff 
--git a/metadata-ingestion/tests/unit/test_bigquery_usage.py b/metadata-ingestion/tests/unit/test_bigquery_usage.py index 08ed0df44f..7a55762694 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_usage.py +++ b/metadata-ingestion/tests/unit/test_bigquery_usage.py @@ -1,6 +1,7 @@ import logging import random from datetime import datetime, timedelta, timezone +from typing import cast from unittest.mock import MagicMock, patch import pytest @@ -122,7 +123,7 @@ def query_tables_1_and_2(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Qu def query_view_1(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query: return Query( - text="SELECT * FROM view_1", + text="SELECT * FROM project-1.database_1.view_1", type="SELECT", timestamp=timestamp, actor=actor, @@ -134,6 +135,27 @@ def query_view_1(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query: ) +def query_view_1_and_table_1(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query: + return Query( + text="""SELECT v.id, v.name, v.total, t.name as name1 + FROM + `project-1.database_1.view_1` as v + inner join + `project-1.database_1.table_1` as t + on + v.id=t.id""", + type="SELECT", + timestamp=timestamp, + actor=actor, + fields_accessed=[ + FieldAccess("id", VIEW_1), + FieldAccess("name", VIEW_1), + FieldAccess("total", VIEW_1), + FieldAccess("name", TABLE_1), + ], + ) + + def make_usage_workunit( table: Table, dataset_usage_statistics: DatasetUsageStatisticsClass ) -> MetadataWorkUnit: @@ -238,7 +260,7 @@ def test_usage_counts_single_bucket_resource_project( ] -def test_usage_counts_multiple_buckets_and_resources( +def test_usage_counts_multiple_buckets_and_resources_view_usage( usage_extractor: BigQueryUsageExtractor, config: BigQueryV2Config, ) -> None: @@ -260,6 +282,7 @@ def test_usage_counts_multiple_buckets_and_resources( query_tables_1_and_2(TS_2, ACTOR_2), query_table_2(TS_2, ACTOR_2), query_view_1(TS_2, ACTOR_1), + query_view_1_and_table_1(TS_2, ACTOR_1), ] events = generate_events( queries, @@ -338,20 +361,7 @@ def test_usage_counts_multiple_buckets_and_resources( userEmail=ACTOR_1, ), ], - fieldCounts=[ - DatasetFieldUsageCountsClass( - fieldPath="id", - count=3, - ), - DatasetFieldUsageCountsClass( - fieldPath="name", - count=3, - ), - DatasetFieldUsageCountsClass( - fieldPath="total", - count=3, - ), - ], + fieldCounts=[], ), ), make_usage_workunit( @@ -393,11 +403,12 @@ def test_usage_counts_multiple_buckets_and_resources( eventGranularity=TimeWindowSizeClass( unit=BucketDuration.DAY, multiple=1 ), - totalSqlQueries=4, + totalSqlQueries=5, topSqlQueries=[ query_table_1_a().text, query_tables_1_and_2().text, query_table_1_b().text, + query_view_1_and_table_1().text, ], uniqueUserCount=2, userCounts=[ @@ -408,7 +419,7 @@ def test_usage_counts_multiple_buckets_and_resources( ), DatasetUserUsageCountsClass( user=ACTOR_1_URN, - count=1, + count=2, userEmail=ACTOR_1, ), ], @@ -435,32 +446,17 @@ def test_usage_counts_multiple_buckets_and_resources( eventGranularity=TimeWindowSizeClass( unit=BucketDuration.DAY, multiple=1 ), - totalSqlQueries=1, - topSqlQueries=[ - query_view_1().text, - ], + totalSqlQueries=2, + topSqlQueries=[query_view_1().text, query_view_1_and_table_1().text], uniqueUserCount=1, userCounts=[ DatasetUserUsageCountsClass( user=ACTOR_1_URN, - count=1, + count=2, userEmail=ACTOR_1, ), ], - fieldCounts=[ - DatasetFieldUsageCountsClass( - fieldPath="id", - count=1, - ), - DatasetFieldUsageCountsClass( - fieldPath="name", - count=1, - ), - DatasetFieldUsageCountsClass( - fieldPath="total", - count=1, 
- ), - ], + fieldCounts=[], ), ), make_usage_workunit( @@ -497,6 +493,238 @@ def test_usage_counts_multiple_buckets_and_resources( ), ), ] + assert usage_extractor.report.num_view_query_events == 5 + assert usage_extractor.report.num_view_query_events_failed_sql_parsing == 0 + assert usage_extractor.report.num_view_query_events_failed_table_identification == 0 + + +def test_usage_counts_multiple_buckets_and_resources_no_view_usage( + usage_extractor: BigQueryUsageExtractor, + config: BigQueryV2Config, +) -> None: + config.usage.apply_view_usage_to_tables = True + queries = [ + # TS 1 + query_table_1_a(TS_1, ACTOR_1), + query_table_1_a(TS_1, ACTOR_2), + query_table_1_b(TS_1, ACTOR_1), + query_tables_1_and_2(TS_1, ACTOR_1), + query_tables_1_and_2(TS_1, ACTOR_1), + query_tables_1_and_2(TS_1, ACTOR_1), + query_view_1(TS_1, ACTOR_1), + query_view_1(TS_1, ACTOR_2), + query_view_1(TS_1, ACTOR_2), + # TS 2 + query_table_1_a(TS_2, ACTOR_1), + query_table_1_a(TS_2, ACTOR_2), + query_table_1_b(TS_2, ACTOR_2), + query_tables_1_and_2(TS_2, ACTOR_2), + query_table_2(TS_2, ACTOR_2), + query_view_1(TS_2, ACTOR_1), + query_view_1_and_table_1(TS_2, ACTOR_1), + ] + events = generate_events( + queries, + [PROJECT_1, PROJECT_2], + TABLE_TO_PROJECT, + config=config, + proabability_of_project_mismatch=0.5, + ) + + workunits = usage_extractor._run(events, TABLE_REFS.values()) + assert list(workunits) == [ + # TS 1 + make_usage_workunit( + table=TABLE_1, + dataset_usage_statistics=DatasetUsageStatisticsClass( + timestampMillis=int(TS_1.timestamp() * 1000), + eventGranularity=TimeWindowSizeClass( + unit=BucketDuration.DAY, multiple=1 + ), + totalSqlQueries=9, + topSqlQueries=[ + query_tables_1_and_2().text, + query_view_1().text, + query_table_1_a().text, + query_table_1_b().text, + ], + uniqueUserCount=2, + userCounts=[ + DatasetUserUsageCountsClass( + user=ACTOR_1_URN, + count=6, + userEmail=ACTOR_1, + ), + DatasetUserUsageCountsClass( + user=ACTOR_2_URN, + count=3, + userEmail=ACTOR_2, + ), + ], + fieldCounts=[ + DatasetFieldUsageCountsClass( + fieldPath="name", + count=9, + ), + DatasetFieldUsageCountsClass( + fieldPath="id", + count=8, + ), + DatasetFieldUsageCountsClass( + fieldPath="total", + count=3, + ), + DatasetFieldUsageCountsClass( + fieldPath="age", + count=2, + ), + ], + ), + ), + make_usage_workunit( + table=TABLE_2, + dataset_usage_statistics=DatasetUsageStatisticsClass( + timestampMillis=int(TS_1.timestamp() * 1000), + eventGranularity=TimeWindowSizeClass( + unit=BucketDuration.DAY, multiple=1 + ), + totalSqlQueries=6, + topSqlQueries=[query_tables_1_and_2().text, query_view_1().text], + uniqueUserCount=2, + userCounts=[ + DatasetUserUsageCountsClass( + user=ACTOR_1_URN, + count=4, + userEmail=ACTOR_1, + ), + DatasetUserUsageCountsClass( + user=ACTOR_2_URN, + count=2, + userEmail=ACTOR_2, + ), + ], + fieldCounts=[ + DatasetFieldUsageCountsClass( + fieldPath="id", + count=6, + ), + DatasetFieldUsageCountsClass( + fieldPath="name", + count=3, + ), + DatasetFieldUsageCountsClass( + fieldPath="total", + count=3, + ), + DatasetFieldUsageCountsClass( + fieldPath="value", + count=3, + ), + ], + ), + ), + # TS 2 + make_usage_workunit( + table=TABLE_1, + dataset_usage_statistics=DatasetUsageStatisticsClass( + timestampMillis=int(TS_2.timestamp() * 1000), + eventGranularity=TimeWindowSizeClass( + unit=BucketDuration.DAY, multiple=1 + ), + totalSqlQueries=6, + topSqlQueries=[ + query_table_1_a().text, + query_tables_1_and_2().text, + query_view_1().text, + query_table_1_b().text, + 
query_view_1_and_table_1().text, + ], + uniqueUserCount=2, + userCounts=[ + DatasetUserUsageCountsClass( + user=ACTOR_1_URN, + count=3, + userEmail=ACTOR_1, + ), + DatasetUserUsageCountsClass( + user=ACTOR_2_URN, + count=3, + userEmail=ACTOR_2, + ), + ], + fieldCounts=[ + DatasetFieldUsageCountsClass( + fieldPath="name", + count=6, + ), + DatasetFieldUsageCountsClass( + fieldPath="id", + count=5, + ), + DatasetFieldUsageCountsClass( + fieldPath="age", + count=2, + ), + DatasetFieldUsageCountsClass( + fieldPath="total", + count=2, + ), + ], + ), + ), + make_usage_workunit( + table=TABLE_2, + dataset_usage_statistics=DatasetUsageStatisticsClass( + timestampMillis=int(TS_2.timestamp() * 1000), + eventGranularity=TimeWindowSizeClass( + unit=BucketDuration.DAY, multiple=1 + ), + totalSqlQueries=4, + topSqlQueries=[ + query_tables_1_and_2().text, + query_view_1().text, + query_table_2().text, + query_view_1_and_table_1().text, + ], + uniqueUserCount=2, + userCounts=[ + DatasetUserUsageCountsClass( + user=ACTOR_1_URN, + count=2, + userEmail=ACTOR_1, + ), + DatasetUserUsageCountsClass( + user=ACTOR_2_URN, + count=2, + userEmail=ACTOR_2, + ), + ], + fieldCounts=[ + DatasetFieldUsageCountsClass( + fieldPath="id", + count=4, + ), + DatasetFieldUsageCountsClass( + fieldPath="name", + count=2, + ), + DatasetFieldUsageCountsClass( + fieldPath="total", + count=2, + ), + DatasetFieldUsageCountsClass( + fieldPath="value", + count=2, + ), + DatasetFieldUsageCountsClass( + fieldPath="table_1_id", + count=1, + ), + ], + ), + ), + ] + assert usage_extractor.report.num_view_query_events == 0 def test_usage_counts_no_query_event( @@ -593,6 +821,7 @@ def test_operational_stats( config: BigQueryV2Config, ) -> None: mock.return_value = [] + config.usage.apply_view_usage_to_tables = True config.usage.include_operational_stats = True seed_metadata = generate_data( num_containers=3, @@ -642,9 +871,53 @@ def test_operational_stats( for field in query.fields_accessed if not field.table.is_view() ) + ) + + list( + dict.fromkeys( # Preserve order + BigQueryTableRef.from_string_name( + table_refs[parent.name] + ).to_urn("PROD") + for field in query.fields_accessed + if field.table.is_view() + for parent in cast(View, field.table).parents + ) ), ), ) for query in queries if query.object_modified and query.type in OPERATION_STATEMENT_TYPES.values() ] + + +def test_get_tables_from_query(usage_extractor): + assert usage_extractor.get_tables_from_query( + PROJECT_1, "SELECT * FROM project-1.database_1.view_1" + ) == [ + BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "view_1")) + ] + + assert usage_extractor.get_tables_from_query( + PROJECT_1, "SELECT * FROM database_1.view_1" + ) == [ + BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "view_1")) + ] + + assert sorted( + usage_extractor.get_tables_from_query( + PROJECT_1, + "SELECT v.id, v.name, v.total, t.name as name1 FROM database_1.view_1 as v inner join database_1.table_1 as t on v.id=t.id", + ) + ) == [ + BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "table_1")), + BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "view_1")), + ] + + assert sorted( + usage_extractor.get_tables_from_query( + PROJECT_1, + "CREATE TABLE database_1.new_table AS SELECT v.id, v.name, v.total, t.name as name1 FROM database_1.view_1 as v inner join database_1.table_1 as t on v.id=t.id", + ) + ) == [ + BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "table_1")), + 
BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "view_1")), + ]
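
Usage sketch (illustrative, not part of the diff above): how the new
`apply_view_usage_to_tables` flag selects between the two modes described in the
`_ingest_events` comment. This assumes `BigQueryV2Config` can be constructed with
only the `usage` block, as the performance test above does.

    from datahub.ingestion.source.bigquery_v2.bigquery_config import (
        BigQueryUsageConfig,
        BigQueryV2Config,
    )

    # Default (False): queries that reference at least one view are re-parsed, and
    # usage is attributed to the views/tables named in the SQL; the original
    # audit-log read events on the views' base tables are deleted for those queries.
    view_usage_config = BigQueryV2Config(
        usage=BigQueryUsageConfig(apply_view_usage_to_tables=False)
    )

    # True: audit-log read events are kept as-is, so a view's usage is applied to
    # its base tables and no SQL parsing is attempted.
    table_usage_config = BigQueryV2Config(
        usage=BigQueryUsageConfig(apply_view_usage_to_tables=True)
    )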