"""Unit tests for the BigQuery v2 usage extractor.

Covers usage-statistics aggregation (bucketing, top queries, user and column
counts), operational stats, and table extraction from query text.
"""

import logging
import random
from datetime import datetime, timedelta, timezone
from typing import Iterable
from unittest.mock import MagicMock, patch

import pytest
from freezegun import freeze_time

from datahub.configuration.time_window_config import BucketDuration
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
    AuditEvent,
    BigqueryTableIdentifier,
    BigQueryTableRef,
    QueryEvent,
    ReadEvent,
)
from datahub.ingestion.source.bigquery_v2.bigquery_config import (
    BigQueryUsageConfig,
    BigQueryV2Config,
)
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
from datahub.ingestion.source.bigquery_v2.common import BigQueryIdentifierBuilder
from datahub.ingestion.source.bigquery_v2.usage import (
    OPERATION_STATEMENT_TYPES,
    BigQueryUsageExtractor,
)
from datahub.metadata.schema_classes import (
    DatasetFieldUsageCountsClass,
    DatasetUsageStatisticsClass,
    DatasetUserUsageCountsClass,
    OperationClass,
    TimeWindowSizeClass,
)
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.testing.compare_metadata_json import diff_metadata_json
from tests.performance.bigquery.bigquery_events import generate_events, ref_from_table
from tests.performance.data_generation import generate_data, generate_queries
from tests.performance.data_model import Container, FieldAccess, Query, Table, View

PROJECT_1 = "project-1"
PROJECT_2 = "project-2"

ACTOR_1, ACTOR_1_URN = "a@acryl.io", "urn:li:corpuser:a"
ACTOR_2, ACTOR_2_URN = "b@acryl.io", "urn:li:corpuser:b"

DATABASE_1 = Container("database_1")
DATABASE_2 = Container("database_2")

TABLE_1 = Table("table_1", DATABASE_1, columns=["id", "name", "age"], upstreams=[])
TABLE_2 = Table(
    "table_2", DATABASE_1, columns=["id", "table_1_id", "value"], upstreams=[]
)
VIEW_1 = View(
    name="view_1",
    container=DATABASE_1,
    columns=["id", "name", "total"],
    definition="VIEW DEFINITION 1",
    upstreams=[TABLE_1, TABLE_2],
)
ALL_TABLES = [TABLE_1, TABLE_2, VIEW_1]

TABLE_TO_PROJECT = {
    TABLE_1.name: PROJECT_1,
    TABLE_2.name: PROJECT_2,
    VIEW_1.name: PROJECT_1,
}
TABLE_REFS = {
    table.name: str(ref_from_table(table, TABLE_TO_PROJECT)) for table in ALL_TABLES
}

FROZEN_TIME = datetime(year=2023, month=2, day=1, tzinfo=timezone.utc)
TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)
TS_2 = datetime(year=2023, month=1, day=2, tzinfo=timezone.utc)


def query_table_1_a(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    return Query(
        text="SELECT * FROM table_1",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=[
            FieldAccess("id", TABLE_1),
            FieldAccess("name", TABLE_1),
            FieldAccess("age", TABLE_1),
        ],
    )


def query_table_1_b(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    return Query(
        text="SELECT name FROM table_1",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=[FieldAccess("name", TABLE_1)],
    )


def query_table_2(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    return Query(
        text="SELECT * FROM table_2",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=[
            FieldAccess("id", TABLE_2),
            FieldAccess("table_1_id", TABLE_2),
            FieldAccess("value", TABLE_2),
        ],
    )


def query_tables_1_and_2(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    return Query(
        text="SELECT t1.id, t1.name, t2.id, t2.value FROM table_1 t1 JOIN table_2 t2 ON table_1.id = table_2.table_1_id",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=[
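            # This join query reads from both tables, so it is counted against
            # table_1 and table_2 in the aggregated usage stats asserted below.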
FieldAccess("id", TABLE_1), FieldAccess("name", TABLE_1), FieldAccess("id", TABLE_2), FieldAccess("value", TABLE_2), ], ) def query_view_1(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query: return Query( text="SELECT * FROM project-1.database_1.view_1", type="SELECT", timestamp=timestamp, actor=actor, fields_accessed=[ FieldAccess("id", VIEW_1), FieldAccess("name", VIEW_1), FieldAccess("total", VIEW_1), ], ) def query_view_1_and_table_1(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query: return Query( text="""SELECT v.id, v.name, v.total, t.name as name1 FROM `project-1.database_1.view_1` as v inner join `project-1.database_1.table_1` as t on v.id=t.id""", type="SELECT", timestamp=timestamp, actor=actor, fields_accessed=[ FieldAccess("id", VIEW_1), FieldAccess("name", VIEW_1), FieldAccess("total", VIEW_1), FieldAccess("name", TABLE_1), ], ) def make_usage_workunit( table: Table, dataset_usage_statistics: DatasetUsageStatisticsClass, identifiers: BigQueryIdentifierBuilder, ) -> MetadataWorkUnit: resource = BigQueryTableRef.from_string_name(TABLE_REFS[table.name]) return MetadataChangeProposalWrapper( entityUrn=identifiers.gen_dataset_urn_from_raw_ref(resource), aspectName=dataset_usage_statistics.get_aspect_name(), aspect=dataset_usage_statistics, ).as_workunit() def make_operational_workunit( resource_urn: str, operation: OperationClass ) -> MetadataWorkUnit: return MetadataChangeProposalWrapper( entityUrn=resource_urn, aspectName=operation.get_aspect_name(), aspect=operation, ).as_workunit() @pytest.fixture def config() -> BigQueryV2Config: return BigQueryV2Config( file_backed_cache_size=1, start_time=TS_1, end_time=TS_2 + timedelta(minutes=1), usage=BigQueryUsageConfig( include_top_n_queries=True, top_n_queries=30, bucket_duration=BucketDuration.DAY, include_operational_stats=False, ), ) @pytest.fixture def usage_extractor(config: BigQueryV2Config) -> BigQueryUsageExtractor: report = BigQueryV2Report() return BigQueryUsageExtractor( config, report, schema_resolver=SchemaResolver(platform="bigquery"), identifiers=BigQueryIdentifierBuilder(config, report), ) def make_zero_usage_workunit( table: Table, time: datetime, identifiers: BigQueryIdentifierBuilder, bucket_duration: BucketDuration = BucketDuration.DAY, ) -> MetadataWorkUnit: return make_usage_workunit( table=table, dataset_usage_statistics=DatasetUsageStatisticsClass( timestampMillis=int(time.timestamp() * 1000), eventGranularity=TimeWindowSizeClass(unit=bucket_duration, multiple=1), totalSqlQueries=0, uniqueUserCount=0, topSqlQueries=[], userCounts=[], fieldCounts=[], ), identifiers=identifiers, ) def compare_workunits( output: Iterable[MetadataWorkUnit], expected: Iterable[MetadataWorkUnit] ) -> None: assert not diff_metadata_json( [wu.metadata.to_obj() for wu in output], [wu.metadata.to_obj() for wu in expected], ) def test_usage_counts_single_bucket_resource_project( usage_extractor: BigQueryUsageExtractor, config: BigQueryV2Config, ) -> None: queries = [ query_table_1_a(TS_1, ACTOR_1), query_table_1_a(TS_1, ACTOR_1), query_table_1_a(TS_1, ACTOR_2), query_table_1_b(TS_1, ACTOR_1), query_table_1_b(TS_1, ACTOR_2), ] events = generate_events( queries, [PROJECT_1, PROJECT_2], TABLE_TO_PROJECT, config=config, proabability_of_project_mismatch=0.5, ) workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values()) expected = [ make_usage_workunit( table=TABLE_1, dataset_usage_statistics=DatasetUsageStatisticsClass( timestampMillis=int(TS_1.timestamp() * 1000), eventGranularity=TimeWindowSizeClass( 
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=len(queries),
                topSqlQueries=[query_table_1_a().text, query_table_1_b().text],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=3,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=5,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=3,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_zero_usage_workunit(TABLE_2, TS_1, usage_extractor.identifiers),
        make_zero_usage_workunit(VIEW_1, TS_1, usage_extractor.identifiers),
    ]
    compare_workunits(workunits, expected)


def test_usage_counts_multiple_buckets_and_resources_view_usage(
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    queries = [
        # TS 1
        query_table_1_a(TS_1, ACTOR_1),
        query_table_1_a(TS_1, ACTOR_2),
        query_table_1_b(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_view_1(TS_1, ACTOR_1),
        query_view_1(TS_1, ACTOR_2),
        query_view_1(TS_1, ACTOR_2),
        # TS 2
        query_table_1_a(TS_2, ACTOR_1),
        query_table_1_a(TS_2, ACTOR_2),
        query_table_1_b(TS_2, ACTOR_2),
        query_tables_1_and_2(TS_2, ACTOR_2),
        query_table_2(TS_2, ACTOR_2),
        query_view_1(TS_2, ACTOR_1),
        query_view_1_and_table_1(TS_2, ACTOR_1),
    ]
    events = generate_events(
        queries,
        [PROJECT_1, PROJECT_2],
        TABLE_TO_PROJECT,
        config=config,
        proabability_of_project_mismatch=0.5,
    )

    workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values())
    expected = [
        # TS 1
        make_usage_workunit(
            table=TABLE_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=6,
                topSqlQueries=[
                    query_tables_1_and_2().text,
                    query_table_1_a().text,
                    query_table_1_b().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=5,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=1,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=6,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=5,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=2,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=VIEW_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=3,
                topSqlQueries=[
                    query_view_1().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=1,
                        userEmail=ACTOR_1,
                    ),
                ],
                fieldCounts=[],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=TABLE_2,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=3,
                topSqlQueries=[
                    query_tables_1_and_2().text,
                ],
                uniqueUserCount=1,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=3,
                        userEmail=ACTOR_1,
                    )
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="value",
                        count=3,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        # TS 2
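        # A second daily bucket: one workunit per (resource, day) follows.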
        make_usage_workunit(
            table=TABLE_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=5,
                topSqlQueries=[
                    query_table_1_a().text,
                    query_tables_1_and_2().text,
                    query_table_1_b().text,
                    query_view_1_and_table_1().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=3,
                        userEmail=ACTOR_2,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=2,
                        userEmail=ACTOR_1,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=4,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=2,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=VIEW_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=2,
                topSqlQueries=[query_view_1().text, query_view_1_and_table_1().text],
                uniqueUserCount=1,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=2,
                        userEmail=ACTOR_1,
                    ),
                ],
                fieldCounts=[],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=TABLE_2,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=2,
                topSqlQueries=[query_tables_1_and_2().text, query_table_2().text],
                uniqueUserCount=1,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    )
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="value",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="table_1_id",
                        count=1,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
    ]
    compare_workunits(workunits, expected)
    assert usage_extractor.report.num_view_query_events == 5
    assert usage_extractor.report.num_view_query_events_failed_sql_parsing == 0
    assert usage_extractor.report.num_view_query_events_failed_table_identification == 0


def test_usage_counts_multiple_buckets_and_resources_no_view_usage(
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    config.usage.apply_view_usage_to_tables = True
    queries = [
        # TS 1
        query_table_1_a(TS_1, ACTOR_1),
        query_table_1_a(TS_1, ACTOR_2),
        query_table_1_b(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_view_1(TS_1, ACTOR_1),
        query_view_1(TS_1, ACTOR_2),
        query_view_1(TS_1, ACTOR_2),
        # TS 2
        query_table_1_a(TS_2, ACTOR_1),
        query_table_1_a(TS_2, ACTOR_2),
        query_table_1_b(TS_2, ACTOR_2),
        query_tables_1_and_2(TS_2, ACTOR_2),
        query_table_2(TS_2, ACTOR_2),
        query_view_1(TS_2, ACTOR_1),
        query_view_1_and_table_1(TS_2, ACTOR_1),
    ]
    events = generate_events(
        queries,
        [PROJECT_1, PROJECT_2],
        TABLE_TO_PROJECT,
        config=config,
        proabability_of_project_mismatch=0.5,
    )

    workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values())
    expected = [
        # TS 1
        make_usage_workunit(
            table=TABLE_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=9,
                topSqlQueries=[
                    query_tables_1_and_2().text,
                    query_view_1().text,
                    query_table_1_a().text,
                    query_table_1_b().text,
                ],
                uniqueUserCount=2,
                userCounts=[
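                    # Per-user counts are listed highest-count first in the
                    # expected output.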
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=6,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=3,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=9,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=8,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="total",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=2,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=TABLE_2,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=6,
                topSqlQueries=[query_tables_1_and_2().text, query_view_1().text],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=4,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=6,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="total",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="value",
                        count=3,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        # TS 2
        make_usage_workunit(
            table=TABLE_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=6,
                topSqlQueries=[
                    query_table_1_a().text,
                    query_tables_1_and_2().text,
                    query_view_1().text,
                    query_table_1_b().text,
                    query_view_1_and_table_1().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=3,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=3,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=6,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=5,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="total",
                        count=2,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=TABLE_2,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=4,
                topSqlQueries=[
                    query_tables_1_and_2().text,
                    query_view_1().text,
                    query_table_2().text,
                    query_view_1_and_table_1().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=2,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=4,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="total",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="value",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="table_1_id",
                        count=1,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_zero_usage_workunit(VIEW_1, TS_1, usage_extractor.identifiers),
        # TS_2 not included as only 1 minute of it was ingested
    ]
    compare_workunits(workunits, expected)
    assert usage_extractor.report.num_view_query_events == 0


def test_usage_counts_no_query_event(
    caplog: pytest.LogCaptureFixture,
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    with caplog.at_level(logging.WARNING):
        ref = BigQueryTableRef(BigqueryTableIdentifier("project", "dataset", "table"))
        event = AuditEvent.create(
            ReadEvent(
                jobName="job_name",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=["id", "name", "total"],
                readReason="JOB",
                payload=None,
            )
        )

        workunits = usage_extractor._get_workunits_internal([event], [str(ref)])
        expected = [
            MetadataChangeProposalWrapper(
                entityUrn=usage_extractor.identifiers.gen_dataset_urn_from_raw_ref(ref),
                aspect=DatasetUsageStatisticsClass(
                    timestampMillis=int(TS_1.timestamp() * 1000),
                    eventGranularity=TimeWindowSizeClass(
                        unit=BucketDuration.DAY, multiple=1
                    ),
                    totalSqlQueries=0,
                    uniqueUserCount=0,
                    topSqlQueries=[],
                    userCounts=[],
                    fieldCounts=[],
                ),
            ).as_workunit()
        ]
        compare_workunits(workunits, expected)
        assert not caplog.records


def test_usage_counts_no_columns(
    caplog: pytest.LogCaptureFixture,
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    job_name = "job_name"
    ref = BigQueryTableRef(
        BigqueryTableIdentifier(PROJECT_1, DATABASE_1.name, TABLE_1.name)
    )
    events = [
        AuditEvent.create(
            ReadEvent(
                jobName=job_name,
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=[],
                readReason="JOB",
                payload=None,
            ),
        ),
        AuditEvent.create(
            QueryEvent(
                job_name=job_name,
                timestamp=TS_1,
                actor_email=ACTOR_1,
                query="SELECT * FROM table_1",
                statementType="SELECT",
                project_id=PROJECT_1,
                destinationTable=None,
                referencedTables=[ref],
                referencedViews=[],
                payload=None,
            )
        ),
    ]
    caplog.clear()
    with caplog.at_level(logging.WARNING):
        workunits = usage_extractor._get_workunits_internal(
            events, [TABLE_REFS[TABLE_1.name]]
        )
        expected = [
            make_usage_workunit(
                table=TABLE_1,
                dataset_usage_statistics=DatasetUsageStatisticsClass(
                    timestampMillis=int(TS_1.timestamp() * 1000),
                    eventGranularity=TimeWindowSizeClass(
                        unit=BucketDuration.DAY, multiple=1
                    ),
                    totalSqlQueries=1,
                    topSqlQueries=["SELECT * FROM table_1"],
                    uniqueUserCount=1,
                    userCounts=[
                        DatasetUserUsageCountsClass(
                            user=ACTOR_1_URN,
                            count=1,
                            userEmail=ACTOR_1,
                        ),
                    ],
                    fieldCounts=[],
                ),
                identifiers=usage_extractor.identifiers,
            )
        ]
        compare_workunits(workunits, expected)
        assert not caplog.records


def test_usage_counts_no_columns_and_top_n_limit_hit(
    caplog: pytest.LogCaptureFixture,
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    config.usage.top_n_queries = 1
    job_name = "job_name"
    ref = BigQueryTableRef(
        BigqueryTableIdentifier(PROJECT_1, DATABASE_1.name, TABLE_1.name)
    )
    events = [
        AuditEvent.create(
            ReadEvent(
                jobName=job_name,
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=[],
                readReason="JOB",
                payload=None,
            ),
        ),
        AuditEvent.create(
            ReadEvent(
                jobName="job_name_2",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=[],
                readReason="JOB",
                payload=None,
            ),
        ),
        AuditEvent.create(
            ReadEvent(
                jobName="job_name_3",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=[],
                readReason="JOB",
                payload=None,
            ),
        ),
        AuditEvent.create(
            QueryEvent(
                job_name=job_name,
                timestamp=TS_1,
                actor_email=ACTOR_1,
                query="SELECT * FROM table_1",
                statementType="SELECT",
                project_id=PROJECT_1,
                destinationTable=None,
                referencedTables=[ref],
                referencedViews=[],
                payload=None,
            )
        ),
        AuditEvent.create(
            QueryEvent(
                job_name="job_name_2",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                query="SELECT * FROM table_1",
                statementType="SELECT",
                project_id=PROJECT_1,
                destinationTable=None,
                referencedTables=[ref],
                referencedViews=[],
                payload=None,
            )
        ),
        AuditEvent.create(
            QueryEvent(
                job_name="job_name_3",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                query="SELECT my_column FROM table_1",
                statementType="SELECT",
                project_id=PROJECT_1,
                destinationTable=None,
                referencedTables=[ref],
                referencedViews=[],
                payload=None,
            )
        ),
    ]
    caplog.clear()
    with caplog.at_level(logging.WARNING):
        workunits = usage_extractor._get_workunits_internal(
            events, [TABLE_REFS[TABLE_1.name]]
        )
        expected = [
            make_usage_workunit(
                table=TABLE_1,
                dataset_usage_statistics=DatasetUsageStatisticsClass(
                    timestampMillis=int(TS_1.timestamp() * 1000),
                    eventGranularity=TimeWindowSizeClass(
                        unit=BucketDuration.DAY, multiple=1
                    ),
                    totalSqlQueries=3,
                    topSqlQueries=["SELECT * FROM table_1"],
                    uniqueUserCount=1,
                    userCounts=[
                        DatasetUserUsageCountsClass(
                            user=ACTOR_1_URN,
                            count=3,
                            userEmail=ACTOR_1,
                        ),
                    ],
                    fieldCounts=[],
                ),
                identifiers=usage_extractor.identifiers,
            )
        ]
        compare_workunits(workunits, expected)
        assert not caplog.records


@freeze_time(FROZEN_TIME)
@patch.object(BigQueryUsageExtractor, "_generate_usage_workunits")
def test_operational_stats(
    mock: MagicMock,
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    mock.return_value = []
    config.usage.apply_view_usage_to_tables = True
    config.usage.include_operational_stats = True
    seed_metadata = generate_data(
        num_containers=3,
        num_tables=5,
        num_views=2,
        time_range=timedelta(days=1),
    )
    all_tables = seed_metadata.tables + seed_metadata.views

    num_projects = 2
    projects = [f"project-{i}" for i in range(num_projects)]
    table_to_project = {table.name: random.choice(projects) for table in all_tables}
    table_refs = {
        table.name: str(ref_from_table(table, table_to_project))
        for table in all_tables
    }

    queries = list(
        generate_queries(
            seed_metadata,
            num_selects=10,
            num_operations=20,
            num_unique_queries=10,
            num_users=3,
        )
    )
    events = generate_events(queries, projects, table_to_project, config=config)
    workunits = usage_extractor._get_workunits_internal(events, table_refs.values())
    expected = [
        make_operational_workunit(
            usage_extractor.identifiers.gen_dataset_urn_from_raw_ref(
                BigQueryTableRef.from_string_name(
                    table_refs[query.object_modified.name]
                )
            ),
            OperationClass(
                timestampMillis=int(FROZEN_TIME.timestamp() * 1000),
                lastUpdatedTimestamp=int(query.timestamp.timestamp() * 1000),
                actor=f"urn:li:corpuser:{query.actor.split('@')[0]}",
                operationType=(
                    query.type
                    if query.type in OPERATION_STATEMENT_TYPES.values()
                    else "CUSTOM"
                ),
                customOperationType=(
                    None
                    if query.type in OPERATION_STATEMENT_TYPES.values()
                    else query.type
                ),
                affectedDatasets=list(
                    dict.fromkeys(  # Preserve order
                        usage_extractor.identifiers.gen_dataset_urn_from_raw_ref(
                            BigQueryTableRef.from_string_name(
                                table_refs[field.table.name]
                            )
                        )
                        for field in query.fields_accessed
                        if not field.table.is_view()
                    )
                )
                + list(
                    dict.fromkeys(  # Preserve order
                        usage_extractor.identifiers.gen_dataset_urn_from_raw_ref(
                            BigQueryTableRef.from_string_name(table_refs[parent.name])
                        )
                        for field in query.fields_accessed
                        if field.table.is_view()
                        for parent in field.table.upstreams
                    )
                ),
            ),
        )
        for query in queries
        if query.object_modified and query.type in OPERATION_STATEMENT_TYPES.values()
    ]
    compare_workunits(
        [
            wu
            for wu in workunits
            if isinstance(wu.metadata, MetadataChangeProposalWrapper)
            and isinstance(wu.metadata.aspect, OperationClass)
        ],
        expected,
    )


def test_get_tables_from_query(usage_extractor):
    assert usage_extractor.get_tables_from_query(
        "SELECT * FROM project-1.database_1.view_1", default_project=PROJECT_1
    ) == [
        BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "view_1"))
    ]

    assert usage_extractor.get_tables_from_query(
        "SELECT * FROM database_1.view_1", default_project=PROJECT_1
    ) == [
        BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "view_1"))
    ]

    assert sorted(
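        # Sort for a deterministic comparison; the parser's table ordering is
        # not asserted here.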
        usage_extractor.get_tables_from_query(
            "SELECT v.id, v.name, v.total, t.name as name1 FROM database_1.view_1 as v inner join database_1.table_1 as t on v.id=t.id",
            default_project=PROJECT_1,
        )
    ) == [
        BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "table_1")),
        BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "view_1")),
    ]

    assert sorted(
        usage_extractor.get_tables_from_query(
            "CREATE TABLE database_1.new_table AS SELECT v.id, v.name, v.total, t.name as name1 FROM database_1.view_1 as v inner join database_1.table_1 as t on v.id=t.id",
            default_project=PROJECT_1,
        )
    ) == [
        BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "table_1")),
        BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "view_1")),
    ]