# mirror of https://github.com/datahub-project/datahub.git
# synced 2025-07-07 09:11:47 +00:00
# (scrape metadata: 1144 lines, 39 KiB, Python)
import logging
|
|
import random
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Iterable
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
from freezegun import freeze_time
|
|
|
|
from datahub.configuration.time_window_config import BucketDuration
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
|
from datahub.ingestion.api.workunit import MetadataWorkUnit
|
|
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
|
|
AuditEvent,
|
|
BigqueryTableIdentifier,
|
|
BigQueryTableRef,
|
|
QueryEvent,
|
|
ReadEvent,
|
|
)
|
|
from datahub.ingestion.source.bigquery_v2.bigquery_config import (
|
|
BigQueryUsageConfig,
|
|
BigQueryV2Config,
|
|
)
|
|
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
|
|
from datahub.ingestion.source.bigquery_v2.common import BigQueryIdentifierBuilder
|
|
from datahub.ingestion.source.bigquery_v2.usage import (
|
|
OPERATION_STATEMENT_TYPES,
|
|
BigQueryUsageExtractor,
|
|
)
|
|
from datahub.metadata.schema_classes import (
|
|
DatasetFieldUsageCountsClass,
|
|
DatasetUsageStatisticsClass,
|
|
DatasetUserUsageCountsClass,
|
|
OperationClass,
|
|
TimeWindowSizeClass,
|
|
)
|
|
from datahub.sql_parsing.schema_resolver import SchemaResolver
|
|
from datahub.testing.compare_metadata_json import diff_metadata_json
|
|
from tests.performance.bigquery.bigquery_events import generate_events, ref_from_table
|
|
from tests.performance.data_generation import generate_data, generate_queries
|
|
from tests.performance.data_model import Container, FieldAccess, Query, Table, View
|
|
|
|
# Two BigQuery projects so tests can exercise cross-project resolution.
PROJECT_1 = "project-1"
PROJECT_2 = "project-2"
# (email, corpuser URN) pairs for the two test actors.
ACTOR_1, ACTOR_1_URN = "a@acryl.io", "urn:li:corpuser:a"
ACTOR_2, ACTOR_2_URN = "b@acryl.io", "urn:li:corpuser:b"
DATABASE_1 = Container("database_1")
DATABASE_2 = Container("database_2")
TABLE_1 = Table("table_1", DATABASE_1, columns=["id", "name", "age"], upstreams=[])
TABLE_2 = Table(
    "table_2", DATABASE_1, columns=["id", "table_1_id", "value"], upstreams=[]
)
# VIEW_1 reads from both tables; used to test view-usage attribution.
VIEW_1 = View(
    name="view_1",
    container=DATABASE_1,
    columns=["id", "name", "total"],
    definition="VIEW DEFINITION 1",
    upstreams=[TABLE_1, TABLE_2],
)
ALL_TABLES = [TABLE_1, TABLE_2, VIEW_1]

# Resources are deliberately split across the two projects so audit events
# can be generated with mismatched project ids.
TABLE_TO_PROJECT = {
    TABLE_1.name: PROJECT_1,
    TABLE_2.name: PROJECT_2,
    VIEW_1.name: PROJECT_1,
}
# Canonical string table refs keyed by table name.
TABLE_REFS = {
    table.name: str(ref_from_table(table, TABLE_TO_PROJECT)) for table in ALL_TABLES
}

# FROZEN_TIME is "now" for freeze_time tests; TS_1/TS_2 are two consecutive
# daily usage buckets.
FROZEN_TIME = datetime(year=2023, month=2, day=1, tzinfo=timezone.utc)
TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)
TS_2 = datetime(year=2023, month=1, day=2, tzinfo=timezone.utc)
def query_table_1_a(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    """Full-table SELECT over TABLE_1 touching all three of its columns."""
    accessed = [FieldAccess(column, TABLE_1) for column in ("id", "name", "age")]
    return Query(
        text="SELECT * FROM table_1",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=accessed,
    )
def query_table_1_b(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    """Single-column SELECT over TABLE_1 (reads only "name")."""
    name_only = [FieldAccess("name", TABLE_1)]
    return Query(
        text="SELECT name FROM table_1",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=name_only,
    )
def query_table_2(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    """Full-table SELECT over TABLE_2 touching all three of its columns."""
    accessed = [
        FieldAccess(column, TABLE_2) for column in ("id", "table_1_id", "value")
    ]
    return Query(
        text="SELECT * FROM table_2",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=accessed,
    )
def query_tables_1_and_2(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    """Join of TABLE_1 and TABLE_2, reading two columns from each."""
    accessed = [FieldAccess(column, TABLE_1) for column in ("id", "name")]
    accessed += [FieldAccess(column, TABLE_2) for column in ("id", "value")]
    return Query(
        text="SELECT t1.id, t1.name, t2.id, t2.value FROM table_1 t1 JOIN table_2 t2 ON table_1.id = table_2.table_1_id",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=accessed,
    )
def query_view_1(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    """Full SELECT over VIEW_1, fully qualified with its project and dataset."""
    accessed = [FieldAccess(column, VIEW_1) for column in ("id", "name", "total")]
    return Query(
        text="SELECT * FROM project-1.database_1.view_1",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=accessed,
    )
def query_view_1_and_table_1(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    """Join of VIEW_1 with TABLE_1: reads all view columns plus TABLE_1.name."""
    return Query(
        # NOTE(review): the internal indentation of this multi-line SQL literal
        # was lost in extraction — confirm exact whitespace against the
        # original file if byte-for-byte query text matters.
        text="""SELECT v.id, v.name, v.total, t.name as name1
FROM
`project-1.database_1.view_1` as v
inner join
`project-1.database_1.table_1` as t
on
v.id=t.id""",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=[
            FieldAccess("id", VIEW_1),
            FieldAccess("name", VIEW_1),
            FieldAccess("total", VIEW_1),
            FieldAccess("name", TABLE_1),
        ],
    )
def make_usage_workunit(
    table: Table,
    dataset_usage_statistics: DatasetUsageStatisticsClass,
    identifiers: BigQueryIdentifierBuilder,
) -> MetadataWorkUnit:
    """Wrap a usage-statistics aspect for ``table`` in a MetadataWorkUnit."""
    table_ref = BigQueryTableRef.from_string_name(TABLE_REFS[table.name])
    dataset_urn = identifiers.gen_dataset_urn_from_raw_ref(table_ref)
    mcp = MetadataChangeProposalWrapper(
        entityUrn=dataset_urn,
        aspectName=dataset_usage_statistics.get_aspect_name(),
        aspect=dataset_usage_statistics,
    )
    return mcp.as_workunit()
def make_operational_workunit(
    resource_urn: str, operation: OperationClass
) -> MetadataWorkUnit:
    """Wrap an Operation aspect for ``resource_urn`` in a MetadataWorkUnit."""
    mcp = MetadataChangeProposalWrapper(
        entityUrn=resource_urn,
        aspectName=operation.get_aspect_name(),
        aspect=operation,
    )
    return mcp.as_workunit()
@pytest.fixture
def config() -> BigQueryV2Config:
    """Usage config covering [TS_1, TS_2 + 1 minute) with daily buckets.

    The one-minute overlap into TS_2's bucket means that bucket is only
    partially ingested; file_backed_cache_size=1 presumably exercises the
    smallest-capacity file-backed cache path — confirm against
    BigQueryV2Config's semantics.
    """
    return BigQueryV2Config(
        file_backed_cache_size=1,
        start_time=TS_1,
        end_time=TS_2 + timedelta(minutes=1),
        usage=BigQueryUsageConfig(
            include_top_n_queries=True,
            top_n_queries=30,
            bucket_duration=BucketDuration.DAY,
            include_operational_stats=False,
        ),
    )
@pytest.fixture
def usage_extractor(config: BigQueryV2Config) -> BigQueryUsageExtractor:
    """Build a BigQueryUsageExtractor wired to a fresh report and resolver."""
    report = BigQueryV2Report()
    return BigQueryUsageExtractor(
        config,
        report,
        schema_resolver=SchemaResolver(platform="bigquery"),
        identifiers=BigQueryIdentifierBuilder(config, report),
    )
def make_zero_usage_workunit(
    table: Table,
    time: datetime,
    identifiers: BigQueryIdentifierBuilder,
    bucket_duration: BucketDuration = BucketDuration.DAY,
) -> MetadataWorkUnit:
    """Build the all-zeros usage statistics workunit emitted for a resource
    with no activity in the bucket starting at ``time``."""
    return make_usage_workunit(
        table=table,
        dataset_usage_statistics=DatasetUsageStatisticsClass(
            timestampMillis=int(time.timestamp() * 1000),
            eventGranularity=TimeWindowSizeClass(unit=bucket_duration, multiple=1),
            totalSqlQueries=0,
            uniqueUserCount=0,
            topSqlQueries=[],
            userCounts=[],
            fieldCounts=[],
        ),
        identifiers=identifiers,
    )
def compare_workunits(
    output: Iterable[MetadataWorkUnit], expected: Iterable[MetadataWorkUnit]
) -> None:
    """Assert that two workunit streams carry semantically identical metadata.

    Serializes each workunit's metadata to a plain object and diffs the two
    lists with diff_metadata_json.

    Raises:
        AssertionError: with the diff details when the streams differ.
    """
    diffs = diff_metadata_json(
        [wu.metadata.to_obj() for wu in output],
        [wu.metadata.to_obj() for wu in expected],
    )
    # Include the computed diff in the failure message; a bare
    # `assert not diff_metadata_json(...)` gave no context on mismatch.
    assert not diffs, f"Metadata workunits differ: {diffs}"
def test_usage_counts_single_bucket_resource_project(
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """All queries land in a single daily bucket: usage is aggregated onto
    TABLE_1 despite randomly mismatched audit-event projects, and untouched
    resources still emit zero-usage rollups."""
    queries = [
        query_table_1_a(TS_1, ACTOR_1),
        query_table_1_a(TS_1, ACTOR_1),
        query_table_1_a(TS_1, ACTOR_2),
        query_table_1_b(TS_1, ACTOR_1),
        query_table_1_b(TS_1, ACTOR_2),
    ]
    events = generate_events(
        queries,
        [PROJECT_1, PROJECT_2],
        TABLE_TO_PROJECT,
        config=config,
        # [sic] keyword spelling must match generate_events' parameter name.
        proabability_of_project_mismatch=0.5,
    )

    workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values())
    expected = [
        make_usage_workunit(
            table=TABLE_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=len(queries),
                topSqlQueries=[query_table_1_a().text, query_table_1_b().text],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=3,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    ),
                ],
                # "name" is read by all 5 queries; "age"/"id" only by the
                # 3 SELECT * queries.
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=5,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=3,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        # Resources with no activity still emit zero-usage statistics.
        make_zero_usage_workunit(TABLE_2, TS_1, usage_extractor.identifiers),
        make_zero_usage_workunit(VIEW_1, TS_1, usage_extractor.identifiers),
    ]
    compare_workunits(workunits, expected)
def test_usage_counts_multiple_buckets_and_resources_view_usage(
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """Queries spread over two daily buckets and three resources, with view
    usage attributed to the view itself (apply_view_usage_to_tables is left
    at its default)."""
    queries = [
        # TS 1
        query_table_1_a(TS_1, ACTOR_1),
        query_table_1_a(TS_1, ACTOR_2),
        query_table_1_b(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_view_1(TS_1, ACTOR_1),
        query_view_1(TS_1, ACTOR_2),
        query_view_1(TS_1, ACTOR_2),
        # TS 2
        query_table_1_a(TS_2, ACTOR_1),
        query_table_1_a(TS_2, ACTOR_2),
        query_table_1_b(TS_2, ACTOR_2),
        query_tables_1_and_2(TS_2, ACTOR_2),
        query_table_2(TS_2, ACTOR_2),
        query_view_1(TS_2, ACTOR_1),
        query_view_1_and_table_1(TS_2, ACTOR_1),
    ]
    events = generate_events(
        queries,
        [PROJECT_1, PROJECT_2],
        TABLE_TO_PROJECT,
        config=config,
        # [sic] keyword spelling must match generate_events' parameter name.
        proabability_of_project_mismatch=0.5,
    )

    workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values())
    expected = [
        # TS 1
        make_usage_workunit(
            table=TABLE_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=6,
                topSqlQueries=[
                    query_tables_1_and_2().text,
                    query_table_1_a().text,
                    query_table_1_b().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=5,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=1,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=6,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=5,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=2,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=VIEW_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=3,
                topSqlQueries=[
                    query_view_1().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=1,
                        userEmail=ACTOR_1,
                    ),
                ],
                # View reads come from parsed SQL, so no column-level counts.
                fieldCounts=[],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=TABLE_2,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=3,
                topSqlQueries=[
                    query_tables_1_and_2().text,
                ],
                uniqueUserCount=1,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=3,
                        userEmail=ACTOR_1,
                    )
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="value",
                        count=3,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        # TS 2
        make_usage_workunit(
            table=TABLE_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=5,
                topSqlQueries=[
                    query_table_1_a().text,
                    query_tables_1_and_2().text,
                    query_table_1_b().text,
                    query_view_1_and_table_1().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=3,
                        userEmail=ACTOR_2,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=2,
                        userEmail=ACTOR_1,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=4,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=2,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=VIEW_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=2,
                topSqlQueries=[query_view_1().text, query_view_1_and_table_1().text],
                uniqueUserCount=1,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=2,
                        userEmail=ACTOR_1,
                    ),
                ],
                fieldCounts=[],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=TABLE_2,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=2,
                topSqlQueries=[query_tables_1_and_2().text, query_table_2().text],
                uniqueUserCount=1,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    )
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="value",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="table_1_id",
                        count=1,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
    ]
    compare_workunits(workunits, expected)
    # 5 view queries total (3 at TS_1, 2 at TS_2), all parsed successfully.
    assert usage_extractor.report.num_view_query_events == 5
    assert usage_extractor.report.num_view_query_events_failed_sql_parsing == 0
    assert usage_extractor.report.num_view_query_events_failed_table_identification == 0
def test_usage_counts_multiple_buckets_and_resources_no_view_usage(
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """Same query mix as the view-usage test, but with
    apply_view_usage_to_tables=True: view reads are attributed to the view's
    upstream tables and the view itself only gets a zero-usage rollup."""
    config.usage.apply_view_usage_to_tables = True
    queries = [
        # TS 1
        query_table_1_a(TS_1, ACTOR_1),
        query_table_1_a(TS_1, ACTOR_2),
        query_table_1_b(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_view_1(TS_1, ACTOR_1),
        query_view_1(TS_1, ACTOR_2),
        query_view_1(TS_1, ACTOR_2),
        # TS 2
        query_table_1_a(TS_2, ACTOR_1),
        query_table_1_a(TS_2, ACTOR_2),
        query_table_1_b(TS_2, ACTOR_2),
        query_tables_1_and_2(TS_2, ACTOR_2),
        query_table_2(TS_2, ACTOR_2),
        query_view_1(TS_2, ACTOR_1),
        query_view_1_and_table_1(TS_2, ACTOR_1),
    ]
    events = generate_events(
        queries,
        [PROJECT_1, PROJECT_2],
        TABLE_TO_PROJECT,
        config=config,
        # [sic] keyword spelling must match generate_events' parameter name.
        proabability_of_project_mismatch=0.5,
    )

    workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values())
    expected = [
        # TS 1
        make_usage_workunit(
            table=TABLE_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                # 6 direct queries + 3 view queries redirected to TABLE_1.
                totalSqlQueries=9,
                topSqlQueries=[
                    query_tables_1_and_2().text,
                    query_view_1().text,
                    query_table_1_a().text,
                    query_table_1_b().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=6,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=3,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=9,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=8,
                    ),
                    # "total" is a view column surfaced via redirection.
                    DatasetFieldUsageCountsClass(
                        fieldPath="total",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=2,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=TABLE_2,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=6,
                topSqlQueries=[query_tables_1_and_2().text, query_view_1().text],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=4,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=6,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="total",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="value",
                        count=3,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        # TS 2
        make_usage_workunit(
            table=TABLE_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=6,
                topSqlQueries=[
                    query_table_1_a().text,
                    query_tables_1_and_2().text,
                    query_view_1().text,
                    query_table_1_b().text,
                    query_view_1_and_table_1().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=3,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=3,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=6,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=5,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="total",
                        count=2,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=TABLE_2,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=4,
                topSqlQueries=[
                    query_tables_1_and_2().text,
                    query_view_1().text,
                    query_table_2().text,
                    query_view_1_and_table_1().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=2,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=4,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="total",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="value",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="table_1_id",
                        count=1,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_zero_usage_workunit(VIEW_1, TS_1, usage_extractor.identifiers),
        # TS_2 not included as only 1 minute of it was ingested
    ]
    compare_workunits(workunits, expected)
    # With view usage redirected to tables, no view query events are counted.
    assert usage_extractor.report.num_view_query_events == 0
def test_usage_counts_no_query_event(
    caplog: pytest.LogCaptureFixture,
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """A ReadEvent with no matching QueryEvent still produces a (zero-query)
    usage statistics aspect and must not log any warnings."""
    with caplog.at_level(logging.WARNING):
        ref = BigQueryTableRef(BigqueryTableIdentifier("project", "dataset", "table"))
        event = AuditEvent.create(
            ReadEvent(
                jobName="job_name",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=["id", "name", "total"],
                readReason="JOB",
                payload=None,
            )
        )
        workunits = usage_extractor._get_workunits_internal([event], [str(ref)])
        expected = [
            MetadataChangeProposalWrapper(
                entityUrn=usage_extractor.identifiers.gen_dataset_urn_from_raw_ref(ref),
                aspect=DatasetUsageStatisticsClass(
                    timestampMillis=int(TS_1.timestamp() * 1000),
                    eventGranularity=TimeWindowSizeClass(
                        unit=BucketDuration.DAY, multiple=1
                    ),
                    totalSqlQueries=0,
                    uniqueUserCount=0,
                    topSqlQueries=[],
                    userCounts=[],
                    fieldCounts=[],
                ),
            ).as_workunit()
        ]
        compare_workunits(workunits, expected)
        # The orphan read must be handled silently, without warnings.
        assert not caplog.records
def test_usage_counts_no_columns(
    caplog: pytest.LogCaptureFixture,
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """A read/query pair whose ReadEvent reports no columns yields usage
    statistics with empty fieldCounts and no warnings."""
    job_name = "job_name"
    ref = BigQueryTableRef(
        BigqueryTableIdentifier(PROJECT_1, DATABASE_1.name, TABLE_1.name)
    )
    events = [
        AuditEvent.create(
            ReadEvent(
                jobName=job_name,
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=[],
                readReason="JOB",
                payload=None,
            ),
        ),
        # Matching QueryEvent, correlated by job_name.
        AuditEvent.create(
            QueryEvent(
                job_name=job_name,
                timestamp=TS_1,
                actor_email=ACTOR_1,
                query="SELECT * FROM table_1",
                statementType="SELECT",
                project_id=PROJECT_1,
                destinationTable=None,
                referencedTables=[ref],
                referencedViews=[],
                payload=None,
            )
        ),
    ]
    caplog.clear()
    with caplog.at_level(logging.WARNING):
        workunits = usage_extractor._get_workunits_internal(
            events, [TABLE_REFS[TABLE_1.name]]
        )
        expected = [
            make_usage_workunit(
                table=TABLE_1,
                dataset_usage_statistics=DatasetUsageStatisticsClass(
                    timestampMillis=int(TS_1.timestamp() * 1000),
                    eventGranularity=TimeWindowSizeClass(
                        unit=BucketDuration.DAY, multiple=1
                    ),
                    totalSqlQueries=1,
                    topSqlQueries=["SELECT * FROM table_1"],
                    uniqueUserCount=1,
                    userCounts=[
                        DatasetUserUsageCountsClass(
                            user=ACTOR_1_URN,
                            count=1,
                            userEmail=ACTOR_1,
                        ),
                    ],
                    fieldCounts=[],
                ),
                identifiers=usage_extractor.identifiers,
            )
        ]
        compare_workunits(workunits, expected)
        assert not caplog.records
def test_usage_counts_no_columns_and_top_n_limit_hit(
    caplog: pytest.LogCaptureFixture,
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """With top_n_queries=1, only the most frequent query text survives in
    topSqlQueries even though three queries ran."""
    config.usage.top_n_queries = 1

    job_name = "job_name"
    ref = BigQueryTableRef(
        BigqueryTableIdentifier(PROJECT_1, DATABASE_1.name, TABLE_1.name)
    )
    # Three read/query pairs, correlated by job name; two share the same
    # query text so it wins the top-1 slot.
    events = [
        AuditEvent.create(
            ReadEvent(
                jobName=job_name,
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=[],
                readReason="JOB",
                payload=None,
            ),
        ),
        AuditEvent.create(
            ReadEvent(
                jobName="job_name_2",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=[],
                readReason="JOB",
                payload=None,
            ),
        ),
        AuditEvent.create(
            ReadEvent(
                jobName="job_name_3",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=[],
                readReason="JOB",
                payload=None,
            ),
        ),
        AuditEvent.create(
            QueryEvent(
                job_name=job_name,
                timestamp=TS_1,
                actor_email=ACTOR_1,
                query="SELECT * FROM table_1",
                statementType="SELECT",
                project_id=PROJECT_1,
                destinationTable=None,
                referencedTables=[ref],
                referencedViews=[],
                payload=None,
            )
        ),
        AuditEvent.create(
            QueryEvent(
                job_name="job_name_2",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                query="SELECT * FROM table_1",
                statementType="SELECT",
                project_id=PROJECT_1,
                destinationTable=None,
                referencedTables=[ref],
                referencedViews=[],
                payload=None,
            )
        ),
        AuditEvent.create(
            QueryEvent(
                job_name="job_name_3",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                query="SELECT my_column FROM table_1",
                statementType="SELECT",
                project_id=PROJECT_1,
                destinationTable=None,
                referencedTables=[ref],
                referencedViews=[],
                payload=None,
            )
        ),
    ]
    caplog.clear()
    with caplog.at_level(logging.WARNING):
        workunits = usage_extractor._get_workunits_internal(
            events, [TABLE_REFS[TABLE_1.name]]
        )
        expected = [
            make_usage_workunit(
                table=TABLE_1,
                dataset_usage_statistics=DatasetUsageStatisticsClass(
                    timestampMillis=int(TS_1.timestamp() * 1000),
                    eventGranularity=TimeWindowSizeClass(
                        unit=BucketDuration.DAY, multiple=1
                    ),
                    totalSqlQueries=3,
                    # top_n_queries=1 keeps only the most frequent text.
                    topSqlQueries=["SELECT * FROM table_1"],
                    uniqueUserCount=1,
                    userCounts=[
                        DatasetUserUsageCountsClass(
                            user=ACTOR_1_URN,
                            count=3,
                            userEmail=ACTOR_1,
                        ),
                    ],
                    fieldCounts=[],
                ),
                identifiers=usage_extractor.identifiers,
            )
        ]
        compare_workunits(workunits, expected)
        assert not caplog.records
@freeze_time(FROZEN_TIME)
@patch.object(BigQueryUsageExtractor, "_generate_usage_workunits")
def test_operational_stats(
    mock: MagicMock,
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """Operation aspects are emitted for every modifying query, with view
    reads mapped onto the view's upstream tables.

    Usage-statistics generation is patched out so only Operation workunits
    are compared.
    """
    mock.return_value = []
    config.usage.apply_view_usage_to_tables = True
    config.usage.include_operational_stats = True
    seed_metadata = generate_data(
        num_containers=3,
        num_tables=5,
        num_views=2,
        time_range=timedelta(days=1),
    )
    all_tables = seed_metadata.tables + seed_metadata.views

    num_projects = 2
    projects = [f"project-{i}" for i in range(num_projects)]
    # NOTE(review): random project assignment is unseeded here; presumably
    # the assertions below are invariant to it — confirm if flakiness appears.
    table_to_project = {table.name: random.choice(projects) for table in all_tables}
    table_refs = {
        table.name: str(ref_from_table(table, table_to_project)) for table in all_tables
    }

    queries = list(
        generate_queries(
            seed_metadata,
            num_selects=10,
            num_operations=20,
            num_unique_queries=10,
            num_users=3,
        )
    )

    events = generate_events(queries, projects, table_to_project, config=config)
    workunits = usage_extractor._get_workunits_internal(events, table_refs.values())
    expected = [
        make_operational_workunit(
            usage_extractor.identifiers.gen_dataset_urn_from_raw_ref(
                BigQueryTableRef.from_string_name(
                    table_refs[query.object_modified.name]
                )
            ),
            OperationClass(
                timestampMillis=int(FROZEN_TIME.timestamp() * 1000),
                lastUpdatedTimestamp=int(query.timestamp.timestamp() * 1000),
                actor=f"urn:li:corpuser:{query.actor.split('@')[0]}",
                # Known statement types map directly; anything else is CUSTOM
                # with the raw type preserved in customOperationType.
                operationType=(
                    query.type
                    if query.type in OPERATION_STATEMENT_TYPES.values()
                    else "CUSTOM"
                ),
                customOperationType=(
                    None
                    if query.type in OPERATION_STATEMENT_TYPES.values()
                    else query.type
                ),
                affectedDatasets=list(
                    dict.fromkeys(  # Preserve order
                        usage_extractor.identifiers.gen_dataset_urn_from_raw_ref(
                            BigQueryTableRef.from_string_name(
                                table_refs[field.table.name]
                            )
                        )
                        for field in query.fields_accessed
                        if not field.table.is_view()
                    )
                )
                # Views contribute their upstream tables instead of themselves.
                + list(
                    dict.fromkeys(  # Preserve order
                        usage_extractor.identifiers.gen_dataset_urn_from_raw_ref(
                            BigQueryTableRef.from_string_name(table_refs[parent.name])
                        )
                        for field in query.fields_accessed
                        if field.table.is_view()
                        for parent in field.table.upstreams
                    )
                ),
            ),
        )
        for query in queries
        if query.object_modified and query.type in OPERATION_STATEMENT_TYPES.values()
    ]
    compare_workunits(
        [
            wu
            for wu in workunits
            if isinstance(wu.metadata, MetadataChangeProposalWrapper)
            and isinstance(wu.metadata.aspect, OperationClass)
        ],
        expected,
    )
def test_get_tables_from_query(usage_extractor):
    """get_tables_from_query resolves referenced tables from SQL text,
    qualifying partially-specified names with the default project."""
    table_1_ref = BigQueryTableRef(
        BigqueryTableIdentifier("project-1", "database_1", "table_1")
    )
    view_1_ref = BigQueryTableRef(
        BigqueryTableIdentifier("project-1", "database_1", "view_1")
    )

    fully_qualified = usage_extractor.get_tables_from_query(
        "SELECT * FROM project-1.database_1.view_1", default_project=PROJECT_1
    )
    assert fully_qualified == [view_1_ref]

    project_omitted = usage_extractor.get_tables_from_query(
        "SELECT * FROM database_1.view_1", default_project=PROJECT_1
    )
    assert project_omitted == [view_1_ref]

    join_refs = usage_extractor.get_tables_from_query(
        "SELECT v.id, v.name, v.total, t.name as name1 FROM database_1.view_1 as v inner join database_1.table_1 as t on v.id=t.id",
        default_project=PROJECT_1,
    )
    assert sorted(join_refs) == [table_1_ref, view_1_ref]

    ctas_refs = usage_extractor.get_tables_from_query(
        "CREATE TABLE database_1.new_table AS SELECT v.id, v.name, v.total, t.name as name1 FROM database_1.view_1 as v inner join database_1.table_1 as t on v.id=t.id",
        default_project=PROJECT_1,
    )
    assert sorted(ctas_refs) == [table_1_ref, view_1_ref]