feat(ingest/bigquery): usage for views (#8046)

Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
Co-authored-by: Andrew Sikowitz <andrew.sikowitz@acryl.io>
Authored by Mayuri Nehate on 2023-05-24 22:18:58 +05:30; committed by GitHub
parent 1f67463778
commit b3d80e57e8
8 changed files with 503 additions and 52 deletions

View File

@@ -261,6 +261,8 @@ class QueryEvent:
     default_dataset: Optional[str] = None
     numAffectedRows: Optional[int] = None
 
+    query_on_view: bool = False
+
     @staticmethod
     def get_missing_key_entry(entry: AuditLogEntry) -> Optional[str]:
         return get_first_missing_key(
@@ -344,6 +346,8 @@ class QueryEvent:
                 BigQueryTableRef.from_spec_obj(spec).get_sanitized_table_ref()
                 for spec in raw_ref_views
             ]
+            query_event.query_on_view = True
+
         # payload
         query_event.payload = entry.payload if debug_include_full_payloads else None
         if not query_event.job_name:
@@ -420,6 +424,8 @@ class QueryEvent:
                 BigQueryTableRef.from_string_name(spec).get_sanitized_table_ref()
                 for spec in raw_ref_views
             ]
+            query_event.query_on_view = True
+
         # payload
         query_event.payload = payload if debug_include_full_payloads else None
@@ -487,6 +493,8 @@ class QueryEvent:
                 BigQueryTableRef.from_string_name(spec).get_sanitized_table_ref()
                 for spec in raw_ref_views
             ]
+            query_event.query_on_view = True
+
         # payload
         query_event.payload = payload if debug_include_full_payloads else None
@@ -519,6 +527,8 @@ class ReadEvent:
     payload: Any
 
+    from_query: bool = False
+
     # We really should use composition here since the query isn't actually
     # part of the read event, but this solution is just simpler.
     # query: Optional["QueryEvent"] = None  # populated via join
@@ -582,6 +592,27 @@ class ReadEvent:
         )
         return readEvent
 
+    @classmethod
+    def from_query_event(
+        cls,
+        read_resource: BigQueryTableRef,
+        query_event: QueryEvent,
+        debug_include_full_payloads: bool = False,
+    ) -> "ReadEvent":
+        readEvent = ReadEvent(
+            actor_email=query_event.actor_email,
+            timestamp=query_event.timestamp,
+            resource=read_resource,
+            fieldsRead=[],
+            readReason="JOB",
+            jobName=query_event.job_name,
+            payload=query_event.payload if debug_include_full_payloads else None,
+            from_query=True,
+        )
+        return readEvent
+
     @classmethod
     def from_exported_bigquery_audit_metadata(
         cls, row: BigQueryAuditMetadata, debug_include_full_payloads: bool = False
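
Because BigQuery's audit log only records read events against a view's base tables, the new ReadEvent.from_query_event constructor above synthesizes a read event for the view itself from the query event's metadata. A minimal sketch of the idea, using simplified stand-in dataclasses rather than the real DataHub types (TableRef, QueryEvt, and ReadEvt here are hypothetical stand-ins):

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional


@dataclass(frozen=True)
class TableRef:
    # Stand-in for BigQueryTableRef: a fully qualified project.dataset.table.
    project: str
    dataset: str
    table: str


@dataclass
class QueryEvt:
    actor_email: str
    timestamp: datetime
    query: str
    job_name: Optional[str] = None
    query_on_view: bool = False


@dataclass
class ReadEvt:
    actor_email: str
    timestamp: datetime
    resource: TableRef
    fields_read: List[str] = field(default_factory=list)
    read_reason: str = "JOB"
    job_name: Optional[str] = None
    from_query: bool = False  # True when synthesized from a query event


def read_event_from_query(resource: TableRef, q: QueryEvt) -> ReadEvt:
    # Mirror of from_query_event: copy actor, timestamp, and job name from
    # the query event; fields_read stays empty because this read was never
    # observed in the audit log, so no field-level detail exists.
    return ReadEvt(
        actor_email=q.actor_email,
        timestamp=q.timestamp,
        resource=resource,
        job_name=q.job_name,
        from_query=True,
    )


if __name__ == "__main__":
    q = QueryEvt(
        actor_email="alice@example.com",
        timestamp=datetime.now(timezone.utc),
        query="SELECT * FROM project-1.database_1.view_1",
        job_name="projects/project-1/jobs/job-123",
        query_on_view=True,
    )
    print(read_event_from_query(TableRef("project-1", "database_1", "view_1"), q))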

View File

@@ -28,6 +28,11 @@ class BigQueryUsageConfig(BaseUsageConfig):
         description="Correction to pad start_time and end_time with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.",
     )
 
+    apply_view_usage_to_tables: bool = Field(
+        default=False,
+        description="Whether to apply view's usage to its base tables. If set to False, uses sql parser and applies usage to views / tables mentioned in the query. If set to True, usage is applied to base tables only.",
+    )
+
 
 class BigQueryV2Config(
     BigQueryBaseConfig,
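
The new apply_view_usage_to_tables flag defaults to False, so BigQuery view usage is now attributed to the views themselves via SQL parsing; setting it to True restores the old behavior of folding view usage into the base tables. A sketch of how the flag might be set when constructing the config in code, assuming the remaining fields can be left at their defaults (the surrounding values are illustrative, not required):

from datahub.ingestion.source.bigquery_v2.bigquery_config import (
    BigQueryUsageConfig,
    BigQueryV2Config,
)

config = BigQueryV2Config(
    usage=BigQueryUsageConfig(
        include_top_n_queries=True,
        top_n_queries=10,
        # False (default): parse SQL and credit the views/tables named in the query.
        # True: credit only the base tables, as before this change.
        apply_view_usage_to_tables=False,
    ),
)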

View File

@@ -68,6 +68,9 @@ class BigQueryV2Report(ProfilingSqlReport):
     total_query_log_entries: int = 0
     num_read_events: int = 0
     num_query_events: int = 0
+    num_view_query_events: int = 0
+    num_view_query_events_failed_sql_parsing: int = 0
+    num_view_query_events_failed_table_identification: int = 0
     num_filtered_read_events: int = 0
     num_filtered_query_events: int = 0
     num_usage_query_hash_collisions: int = 0

View File

@@ -36,6 +36,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
     AuditEvent,
     AuditLogEntry,
     BigQueryAuditMetadata,
+    BigqueryTableIdentifier,
     BigQueryTableRef,
     QueryEvent,
     ReadEvent,
@@ -53,6 +54,7 @@ from datahub.ingestion.source.usage.usage_common import (
     make_usage_workunit,
 )
 from datahub.metadata.schema_classes import OperationClass, OperationTypeClass
+from datahub.utilities.bigquery_sql_parser import BigQuerySQLParser
 from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedDict
 from datahub.utilities.perf_timer import PerfTimer
@@ -142,7 +144,7 @@ def bigquery_audit_metadata_query_template(
     AND
     (
         (
-            JSON_EXTRACT_SCALAR(protopayload_auditlog.methodName) IN
+            protopayload_auditlog.methodName IN
             (
                 "google.cloud.bigquery.v2.JobService.Query",
                 "google.cloud.bigquery.v2.JobService.InsertJob"
@@ -184,6 +186,7 @@ class BigQueryUsageState(Closeable):
                     e.timestamp, config.bucket_duration
                 ),
                 "user": lambda e: e.actor_email,
+                "from_query": lambda e: int(e.from_query),
             },
             cache_max_size=config.file_backed_cache_size,
             # Evict entire cache to reduce db calls.
@@ -198,6 +201,7 @@ class BigQueryUsageState(Closeable):
             extra_columns={
                 "query": lambda e: e.query,
                 "is_read": lambda e: int(e.statementType in READ_STATEMENT_TYPES),
+                "on_view": lambda e: int(e.query_on_view),
             },
             cache_max_size=config.file_backed_cache_size,
             cache_eviction_batch_size=max(int(config.file_backed_cache_size * 0.9), 1),
@@ -328,6 +332,20 @@ class BigQueryUsageState(Closeable):
             column_freq=json.loads(row["column_freq"] or "[]"),
         )
 
+    def delete_original_read_events_for_view_query_events(self) -> None:
+        self.read_events.sql_query(
+            """
+                DELETE FROM
+                    read_events
+                WHERE
+                    read_events.from_query = 0 AND
+                    read_events.name in (
+                        SELECT q.key FROM query_events q WHERE q.on_view = 1
+                    )
+            """,
+            refs=[self.query_events],
+        )
+
     def report_disk_usage(self, report: BigQueryV2Report) -> None:
         report.usage_state_size = str(
             {
@@ -342,7 +360,7 @@
 class BigQueryUsageExtractor:
     """
     This plugin extracts the following:
-    * Statistics on queries issued and tables and columns accessed (excludes views)
+    * Statistics on queries issued and tables and columns accessed
     * Aggregation of these statistics into buckets, by day or hour granularity
 
     :::note
@@ -389,6 +407,26 @@ class BigQueryUsageExtractor:
             logger.error("Error processing usage", exc_info=True)
             self.report.report_warning("usage-ingestion", str(e))
 
+    def generate_read_events_from_query(
+        self, query_event_on_view: QueryEvent
+    ) -> Iterable[AuditEvent]:
+        try:
+            tables = self.get_tables_from_query(
+                query_event_on_view.project_id,
+                query_event_on_view.query,
+            )
+            assert tables is not None and len(tables) != 0
+            for table in tables:
+                yield AuditEvent.create(
+                    ReadEvent.from_query_event(table, query_event_on_view)
+                )
+        except Exception as ex:
+            logger.debug(
+                f"Generating read events failed for this query on view: {query_event_on_view.query}. "
+                f"Usage won't be added. The error was {ex}."
+            )
+            self.report.num_view_query_events_failed_sql_parsing += 1
+
     def _ingest_events(
         self,
         events: Iterable[AuditEvent],
@@ -397,8 +435,33 @@ class BigQueryUsageExtractor:
     ) -> None:
         """Read log and store events in usage_state."""
         num_aggregated = 0
+        num_generated = 0
+
         for audit_event in events:
             try:
+                # Note for view usage:
+                # If a Query Event references a view, the BigQuery audit logs do not
+                # contain a Read Event for the view itself, only for its base tables.
+                # To extract usage for views, we parse the SQL query to find the
+                # BigQuery tables and views read in the query and generate Read Events
+                # for them in our code (`from_query`=True). For such Query Events, we
+                # delete the original Read Events coming from the BigQuery audit logs
+                # and keep only the generated ones.
+
+                # Caveats of the SQL parsing approach used here:
+                # 1. If query parsing fails, usage for such a query is not counted.
+                # 2. Due to limitations of query parsing, field-level usage is not available.
+                # To limit the impact, we use query parsing only for queries that reference
+                # at least one view. For all other queries, field-level usage is available
+                # through the BigQuery audit logs.
+                if (
+                    audit_event.query_event
+                    and audit_event.query_event.query_on_view
+                    and not self.config.usage.apply_view_usage_to_tables
+                ):
+                    query_event = audit_event.query_event
+                    self.report.num_view_query_events += 1
+
+                    for new_event in self.generate_read_events_from_query(query_event):
+                        num_generated += self._store_usage_event(
+                            new_event, usage_state, table_refs
+                        )
+
                 num_aggregated += self._store_usage_event(
                     audit_event, usage_state, table_refs
                 )
@@ -409,6 +472,10 @@ class BigQueryUsageExtractor:
                 self._report_error("store-event", e)
         logger.info(f"Total number of events aggregated = {num_aggregated}.")
 
+        if self.report.num_view_query_events > 0:
+            logger.info(f"Total number of read events generated = {num_generated}.")
+            usage_state.delete_original_read_events_for_view_query_events()
+
     def _generate_operational_workunits(
         self, usage_state: BigQueryUsageState, table_refs: Collection[str]
     ) -> Iterable[MetadataWorkUnit]:
@@ -903,6 +970,56 @@ class BigQueryUsageExtractor:
                 f"log-parse-{project_id}", e, group="usage-log-parse"
             )
 
+    def get_tables_from_query(
+        self, default_project: str, query: str
+    ) -> Optional[List[BigQueryTableRef]]:
+        """
+        This method attempts to parse bigquery objects read in the query
+        """
+        if not query:
+            return None
+
+        parsed_tables = set()
+        try:
+            parser = BigQuerySQLParser(
+                query,
+                self.config.sql_parser_use_external_process,
+                use_raw_names=self.config.lineage_sql_parser_use_raw_names,
+            )
+            tables = parser.get_tables()
+        except Exception as ex:
+            logger.debug(
+                f"Sql parsing failed on this query on view: {query}. "
+                f"Usage won't be added. The error was {ex}."
+            )
+            return None
+
+        for table in tables:
+            parts = table.split(".")
+            if len(parts) == 2:
+                parsed_tables.add(
+                    BigQueryTableRef(
+                        BigqueryTableIdentifier(
+                            project_id=default_project, dataset=parts[0], table=parts[1]
+                        )
+                    ).get_sanitized_table_ref()
+                )
+            elif len(parts) == 3:
+                parsed_tables.add(
+                    BigQueryTableRef(
+                        BigqueryTableIdentifier(
+                            project_id=parts[0], dataset=parts[1], table=parts[2]
+                        )
+                    ).get_sanitized_table_ref()
+                )
+            else:
+                logger.debug(
+                    f"Invalid table identifier {table} when parsing query on view {query}"
+                )
+                self.report.num_view_query_events_failed_table_identification += 1
+
+        return list(parsed_tables)
+
     def _report_error(
         self, label: str, e: Exception, group: Optional[str] = None
     ) -> None:
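
The overall flow in this file is: flag query events whose referenced views are non-empty, parse their SQL with BigQuerySQLParser, synthesize read events for each resolved object, and finally delete the audit-log read events those same queries produced against base tables. A condensed sketch of the identifier-resolution step in get_tables_from_query, which qualifies two-part dataset.table names with the default project and accepts three-part project.dataset.table names (resolve_table_names is a hypothetical helper using plain tuples instead of the real BigQueryTableRef):

from typing import List, Set, Tuple

TableRef = Tuple[str, str, str]  # (project, dataset, table) stand-in


def resolve_table_names(default_project: str, names: List[str]) -> List[TableRef]:
    resolved: Set[TableRef] = set()
    for name in names:
        parts = name.split(".")
        if len(parts) == 2:
            # dataset.table: qualify with the query's default project.
            resolved.add((default_project, parts[0], parts[1]))
        elif len(parts) == 3:
            # project.dataset.table: already fully qualified.
            resolved.add((parts[0], parts[1], parts[2]))
        else:
            # The real code logs this, increments
            # num_view_query_events_failed_table_identification, and moves on.
            continue
    return sorted(resolved)


assert resolve_table_names("project-1", ["database_1.view_1"]) == [
    ("project-1", "database_1", "view_1")
]
assert resolve_table_names(
    "project-1", ["project-2.database_1.table_1", "database_1.view_1"]
) == [
    ("project-1", "database_1", "view_1"),
    ("project-2", "database_1", "table_1"),
]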

View File

@@ -15,5 +15,5 @@ class SnowflakeUsageConfig(BaseUsageConfig):
     )
     apply_view_usage_to_tables: bool = pydantic.Field(
         default=False,
-        description="Allow/deny patterns for views in snowflake dataset names.",
+        description="Whether to apply view's usage to its base tables. If set to True, usage is applied to base tables only.",
     )

View File

@@ -2,7 +2,7 @@ import dataclasses
 import random
 import uuid
 from collections import defaultdict
-from typing import Dict, Iterable, List
+from typing import Dict, Iterable, List, cast
 
 from typing_extensions import get_args
@@ -15,7 +15,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
 )
 from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
 from datahub.ingestion.source.bigquery_v2.usage import OPERATION_STATEMENT_TYPES
-from tests.performance.data_model import Query, StatementType, Table
+from tests.performance.data_model import Query, StatementType, Table, View
 
 # https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.TableDataRead.Reason
 READ_REASONS = [
@@ -55,6 +55,14 @@ def generate_events(
             else random.choice(projects)
         )
         job_name = str(uuid.uuid4())
+
+        referencedViews = list(
+            dict.fromkeys(
+                ref_from_table(field.table, table_to_project)
+                for field in query.fields_accessed
+                if field.table.is_view()
+            )
+        )
         yield AuditEvent.create(
             QueryEvent(
                 job_name=job_name,
@@ -72,24 +80,34 @@ def generate_events(
                         for field in query.fields_accessed
                         if not field.table.is_view()
                     )
-                ),
-                referencedViews=list(
-                    dict.fromkeys(
-                        ref_from_table(field.table, table_to_project)
-                        for field in query.fields_accessed
-                        if field.table.is_view()
-                    )
-                ),
+                )
+                + list(
+                    dict.fromkeys(  # Preserve order
+                        ref_from_table(parent, table_to_project)
+                        for field in query.fields_accessed
+                        if field.table.is_view()
+                        for parent in cast(View, field.table).parents
+                    )
+                ),
+                referencedViews=referencedViews,
                 payload=dataclasses.asdict(query)
                 if config.debug_include_full_payloads
                 else None,
+                query_on_view=True if referencedViews else False,
             )
         )
 
-        table_accesses = defaultdict(list)
+        table_accesses = defaultdict(set)
         for field in query.fields_accessed:
-            table_accesses[ref_from_table(field.table, table_to_project)].append(
-                field.column
-            )
+            if not field.table.is_view():
+                table_accesses[ref_from_table(field.table, table_to_project)].add(
+                    field.column
+                )
+            else:
+                # assuming that same fields are accessed in parent tables
+                for parent in cast(View, field.table).parents:
+                    table_accesses[ref_from_table(parent, table_to_project)].add(
+                        field.column
+                    )
 
         for ref, columns in table_accesses.items():
             yield AuditEvent.create(
@@ -98,7 +116,7 @@ def generate_events(
                     timestamp=query.timestamp,
                     actor_email=query.actor,
                     resource=ref,
-                    fieldsRead=columns,
+                    fieldsRead=list(columns),
                     readReason=random.choice(READ_REASONS),
                     payload=dataclasses.asdict(query)
                     if config.debug_include_full_payloads

View File

@@ -37,7 +37,11 @@ def run_test():
     config = BigQueryV2Config(
         start_time=seed_metadata.start_time,
         end_time=seed_metadata.end_time,
-        usage=BigQueryUsageConfig(include_top_n_queries=True, top_n_queries=10),
+        usage=BigQueryUsageConfig(
+            include_top_n_queries=True,
+            top_n_queries=10,
+            apply_view_usage_to_tables=True,
+        ),
         file_backed_cache_size=1000,
     )
     usage_extractor = BigQueryUsageExtractor(config, report)

View File

@@ -1,6 +1,7 @@
 import logging
 import random
 from datetime import datetime, timedelta, timezone
+from typing import cast
 from unittest.mock import MagicMock, patch
 
 import pytest
@@ -122,7 +123,7 @@ def query_tables_1_and_2(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Qu
 def query_view_1(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
     return Query(
-        text="SELECT * FROM view_1",
+        text="SELECT * FROM project-1.database_1.view_1",
         type="SELECT",
         timestamp=timestamp,
         actor=actor,
@@ -134,6 +135,27 @@ def query_view_1(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
     )
 
+def query_view_1_and_table_1(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
+    return Query(
+        text="""SELECT v.id, v.name, v.total, t.name as name1
+                FROM
+                    `project-1.database_1.view_1` as v
+                inner join
+                    `project-1.database_1.table_1` as t
+                on
+                    v.id=t.id""",
+        type="SELECT",
+        timestamp=timestamp,
+        actor=actor,
+        fields_accessed=[
+            FieldAccess("id", VIEW_1),
+            FieldAccess("name", VIEW_1),
+            FieldAccess("total", VIEW_1),
+            FieldAccess("name", TABLE_1),
+        ],
+    )
+
 def make_usage_workunit(
     table: Table, dataset_usage_statistics: DatasetUsageStatisticsClass
 ) -> MetadataWorkUnit:
@@ -238,7 +260,7 @@ def test_usage_counts_single_bucket_resource_project(
     ]
 
-def test_usage_counts_multiple_buckets_and_resources(
+def test_usage_counts_multiple_buckets_and_resources_view_usage(
     usage_extractor: BigQueryUsageExtractor,
     config: BigQueryV2Config,
 ) -> None:
@@ -260,6 +282,7 @@ def test_usage_counts_multiple_buckets_and_resources_view_usage(
         query_tables_1_and_2(TS_2, ACTOR_2),
         query_table_2(TS_2, ACTOR_2),
         query_view_1(TS_2, ACTOR_1),
+        query_view_1_and_table_1(TS_2, ACTOR_1),
     ]
     events = generate_events(
         queries,
@@ -338,20 +361,7 @@ def test_usage_counts_multiple_buckets_and_resources_view_usage(
                         userEmail=ACTOR_1,
                     ),
                 ],
-                fieldCounts=[
-                    DatasetFieldUsageCountsClass(
-                        fieldPath="id",
-                        count=3,
-                    ),
-                    DatasetFieldUsageCountsClass(
-                        fieldPath="name",
-                        count=3,
-                    ),
-                    DatasetFieldUsageCountsClass(
-                        fieldPath="total",
-                        count=3,
-                    ),
-                ],
+                fieldCounts=[],
             ),
         ),
         make_usage_workunit(
@@ -393,11 +403,12 @@ def test_usage_counts_multiple_buckets_and_resources_view_usage(
                 eventGranularity=TimeWindowSizeClass(
                     unit=BucketDuration.DAY, multiple=1
                 ),
-                totalSqlQueries=4,
+                totalSqlQueries=5,
                 topSqlQueries=[
                     query_table_1_a().text,
                     query_tables_1_and_2().text,
                     query_table_1_b().text,
+                    query_view_1_and_table_1().text,
                 ],
                 uniqueUserCount=2,
                 userCounts=[
@@ -408,7 +419,7 @@ def test_usage_counts_multiple_buckets_and_resources_view_usage(
                     ),
                     DatasetUserUsageCountsClass(
                         user=ACTOR_1_URN,
-                        count=1,
+                        count=2,
                         userEmail=ACTOR_1,
                     ),
                 ],
@@ -435,32 +446,17 @@ def test_usage_counts_multiple_buckets_and_resources_view_usage(
                 eventGranularity=TimeWindowSizeClass(
                     unit=BucketDuration.DAY, multiple=1
                 ),
-                totalSqlQueries=1,
-                topSqlQueries=[
-                    query_view_1().text,
-                ],
+                totalSqlQueries=2,
+                topSqlQueries=[query_view_1().text, query_view_1_and_table_1().text],
                 uniqueUserCount=1,
                 userCounts=[
                     DatasetUserUsageCountsClass(
                         user=ACTOR_1_URN,
-                        count=1,
+                        count=2,
                         userEmail=ACTOR_1,
                     ),
                 ],
-                fieldCounts=[
-                    DatasetFieldUsageCountsClass(
-                        fieldPath="id",
-                        count=1,
-                    ),
-                    DatasetFieldUsageCountsClass(
-                        fieldPath="name",
-                        count=1,
-                    ),
-                    DatasetFieldUsageCountsClass(
-                        fieldPath="total",
-                        count=1,
-                    ),
-                ],
+                fieldCounts=[],
             ),
         ),
         make_usage_workunit(
@@ -497,6 +493,238 @@ def test_usage_counts_multiple_buckets_and_resources_view_usage(
             ),
         ),
     ]
+    assert usage_extractor.report.num_view_query_events == 5
+    assert usage_extractor.report.num_view_query_events_failed_sql_parsing == 0
+    assert usage_extractor.report.num_view_query_events_failed_table_identification == 0
+
+
+def test_usage_counts_multiple_buckets_and_resources_no_view_usage(
+    usage_extractor: BigQueryUsageExtractor,
+    config: BigQueryV2Config,
+) -> None:
+    config.usage.apply_view_usage_to_tables = True
+    queries = [
+        # TS 1
+        query_table_1_a(TS_1, ACTOR_1),
+        query_table_1_a(TS_1, ACTOR_2),
+        query_table_1_b(TS_1, ACTOR_1),
+        query_tables_1_and_2(TS_1, ACTOR_1),
+        query_tables_1_and_2(TS_1, ACTOR_1),
+        query_tables_1_and_2(TS_1, ACTOR_1),
+        query_view_1(TS_1, ACTOR_1),
+        query_view_1(TS_1, ACTOR_2),
+        query_view_1(TS_1, ACTOR_2),
+        # TS 2
+        query_table_1_a(TS_2, ACTOR_1),
+        query_table_1_a(TS_2, ACTOR_2),
+        query_table_1_b(TS_2, ACTOR_2),
+        query_tables_1_and_2(TS_2, ACTOR_2),
+        query_table_2(TS_2, ACTOR_2),
+        query_view_1(TS_2, ACTOR_1),
+        query_view_1_and_table_1(TS_2, ACTOR_1),
+    ]
+    events = generate_events(
+        queries,
+        [PROJECT_1, PROJECT_2],
+        TABLE_TO_PROJECT,
+        config=config,
+        proabability_of_project_mismatch=0.5,
+    )
+
+    workunits = usage_extractor._run(events, TABLE_REFS.values())
+    assert list(workunits) == [
+        # TS 1
+        make_usage_workunit(
+            table=TABLE_1,
+            dataset_usage_statistics=DatasetUsageStatisticsClass(
+                timestampMillis=int(TS_1.timestamp() * 1000),
+                eventGranularity=TimeWindowSizeClass(
+                    unit=BucketDuration.DAY, multiple=1
+                ),
+                totalSqlQueries=9,
+                topSqlQueries=[
+                    query_tables_1_and_2().text,
+                    query_view_1().text,
+                    query_table_1_a().text,
+                    query_table_1_b().text,
+                ],
+                uniqueUserCount=2,
+                userCounts=[
+                    DatasetUserUsageCountsClass(
+                        user=ACTOR_1_URN,
+                        count=6,
+                        userEmail=ACTOR_1,
+                    ),
+                    DatasetUserUsageCountsClass(
+                        user=ACTOR_2_URN,
+                        count=3,
+                        userEmail=ACTOR_2,
+                    ),
+                ],
+                fieldCounts=[
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="name",
+                        count=9,
+                    ),
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="id",
+                        count=8,
+                    ),
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="total",
+                        count=3,
+                    ),
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="age",
+                        count=2,
+                    ),
+                ],
+            ),
+        ),
+        make_usage_workunit(
+            table=TABLE_2,
+            dataset_usage_statistics=DatasetUsageStatisticsClass(
+                timestampMillis=int(TS_1.timestamp() * 1000),
+                eventGranularity=TimeWindowSizeClass(
+                    unit=BucketDuration.DAY, multiple=1
+                ),
+                totalSqlQueries=6,
+                topSqlQueries=[query_tables_1_and_2().text, query_view_1().text],
+                uniqueUserCount=2,
+                userCounts=[
+                    DatasetUserUsageCountsClass(
+                        user=ACTOR_1_URN,
+                        count=4,
+                        userEmail=ACTOR_1,
+                    ),
+                    DatasetUserUsageCountsClass(
+                        user=ACTOR_2_URN,
+                        count=2,
+                        userEmail=ACTOR_2,
+                    ),
+                ],
+                fieldCounts=[
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="id",
+                        count=6,
+                    ),
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="name",
+                        count=3,
+                    ),
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="total",
+                        count=3,
+                    ),
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="value",
+                        count=3,
+                    ),
+                ],
+            ),
+        ),
+        # TS 2
+        make_usage_workunit(
+            table=TABLE_1,
+            dataset_usage_statistics=DatasetUsageStatisticsClass(
+                timestampMillis=int(TS_2.timestamp() * 1000),
+                eventGranularity=TimeWindowSizeClass(
+                    unit=BucketDuration.DAY, multiple=1
+                ),
+                totalSqlQueries=6,
+                topSqlQueries=[
+                    query_table_1_a().text,
+                    query_tables_1_and_2().text,
+                    query_view_1().text,
+                    query_table_1_b().text,
+                    query_view_1_and_table_1().text,
+                ],
+                uniqueUserCount=2,
+                userCounts=[
+                    DatasetUserUsageCountsClass(
+                        user=ACTOR_1_URN,
+                        count=3,
+                        userEmail=ACTOR_1,
+                    ),
+                    DatasetUserUsageCountsClass(
+                        user=ACTOR_2_URN,
+                        count=3,
+                        userEmail=ACTOR_2,
+                    ),
+                ],
+                fieldCounts=[
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="name",
+                        count=6,
+                    ),
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="id",
+                        count=5,
+                    ),
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="age",
+                        count=2,
+                    ),
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="total",
+                        count=2,
+                    ),
+                ],
+            ),
+        ),
+        make_usage_workunit(
+            table=TABLE_2,
+            dataset_usage_statistics=DatasetUsageStatisticsClass(
+                timestampMillis=int(TS_2.timestamp() * 1000),
+                eventGranularity=TimeWindowSizeClass(
+                    unit=BucketDuration.DAY, multiple=1
+                ),
+                totalSqlQueries=4,
+                topSqlQueries=[
+                    query_tables_1_and_2().text,
+                    query_view_1().text,
+                    query_table_2().text,
+                    query_view_1_and_table_1().text,
+                ],
+                uniqueUserCount=2,
+                userCounts=[
+                    DatasetUserUsageCountsClass(
+                        user=ACTOR_1_URN,
+                        count=2,
+                        userEmail=ACTOR_1,
+                    ),
+                    DatasetUserUsageCountsClass(
+                        user=ACTOR_2_URN,
+                        count=2,
+                        userEmail=ACTOR_2,
+                    ),
+                ],
+                fieldCounts=[
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="id",
+                        count=4,
+                    ),
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="name",
+                        count=2,
+                    ),
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="total",
+                        count=2,
+                    ),
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="value",
+                        count=2,
+                    ),
+                    DatasetFieldUsageCountsClass(
+                        fieldPath="table_1_id",
+                        count=1,
+                    ),
+                ],
+            ),
+        ),
+    ]
+    assert usage_extractor.report.num_view_query_events == 0
+
 
 def test_usage_counts_no_query_event(
@@ -593,6 +821,7 @@ def test_operational_stats(
     config: BigQueryV2Config,
 ) -> None:
     mock.return_value = []
+    config.usage.apply_view_usage_to_tables = True
     config.usage.include_operational_stats = True
     seed_metadata = generate_data(
         num_containers=3,
@@ -642,9 +871,53 @@ def test_operational_stats(
                         for field in query.fields_accessed
                         if not field.table.is_view()
                     )
-                ),
+                )
+                + list(
+                    dict.fromkeys(  # Preserve order
+                        BigQueryTableRef.from_string_name(
+                            table_refs[parent.name]
+                        ).to_urn("PROD")
+                        for field in query.fields_accessed
+                        if field.table.is_view()
+                        for parent in cast(View, field.table).parents
+                    )
+                ),
             ),
         )
         for query in queries
         if query.object_modified and query.type in OPERATION_STATEMENT_TYPES.values()
     ]
+
+
+def test_get_tables_from_query(usage_extractor):
+    assert usage_extractor.get_tables_from_query(
+        PROJECT_1, "SELECT * FROM project-1.database_1.view_1"
+    ) == [
+        BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "view_1"))
+    ]
+
+    assert usage_extractor.get_tables_from_query(
+        PROJECT_1, "SELECT * FROM database_1.view_1"
+    ) == [
+        BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "view_1"))
+    ]
+
+    assert sorted(
+        usage_extractor.get_tables_from_query(
+            PROJECT_1,
+            "SELECT v.id, v.name, v.total, t.name as name1 FROM database_1.view_1 as v inner join database_1.table_1 as t on v.id=t.id",
+        )
+    ) == [
+        BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "table_1")),
+        BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "view_1")),
+    ]
+
+    assert sorted(
+        usage_extractor.get_tables_from_query(
+            PROJECT_1,
+            "CREATE TABLE database_1.new_table AS SELECT v.id, v.name, v.total, t.name as name1 FROM database_1.view_1 as v inner join database_1.table_1 as t on v.id=t.id",
+        )
+    ) == [
+        BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "table_1")),
+        BigQueryTableRef(BigqueryTableIdentifier("project-1", "database_1", "view_1")),
+    ]