2023-04-14 19:54:22 -04:00
import logging
2023-04-07 12:18:26 -07:00
import random
from datetime import datetime , timedelta , timezone
2023-12-05 12:33:00 -05:00
from typing import Iterable
2023-04-07 12:18:26 -07:00
from unittest . mock import MagicMock , patch
import pytest
from freezegun import freeze_time
from datahub . configuration . time_window_config import BucketDuration
from datahub . emitter . mcp import MetadataChangeProposalWrapper
from datahub . ingestion . api . workunit import MetadataWorkUnit
2023-04-14 19:54:22 -04:00
from datahub . ingestion . source . bigquery_v2 . bigquery_audit import (
AuditEvent ,
BigqueryTableIdentifier ,
BigQueryTableRef ,
QueryEvent ,
ReadEvent ,
)
2023-04-07 12:18:26 -07:00
from datahub . ingestion . source . bigquery_v2 . bigquery_config import (
BigQueryUsageConfig ,
BigQueryV2Config ,
)
from datahub . ingestion . source . bigquery_v2 . bigquery_report import BigQueryV2Report
2024-08-26 11:21:00 +05:30
from datahub . ingestion . source . bigquery_v2 . common import BigQueryIdentifierBuilder
2023-04-07 12:18:26 -07:00
from datahub . ingestion . source . bigquery_v2 . usage import (
OPERATION_STATEMENT_TYPES ,
BigQueryUsageExtractor ,
)
from datahub . metadata . schema_classes import (
DatasetFieldUsageCountsClass ,
DatasetUsageStatisticsClass ,
DatasetUserUsageCountsClass ,
OperationClass ,
TimeWindowSizeClass ,
)
2024-02-09 16:27:45 -05:00
from datahub . sql_parsing . schema_resolver import SchemaResolver
2023-07-11 10:39:47 -04:00
from datahub . testing . compare_metadata_json import diff_metadata_json
2023-10-04 10:23:13 -04:00
from tests . performance . bigquery . bigquery_events import generate_events , ref_from_table
2023-04-07 12:18:26 -07:00
from tests . performance . data_generation import generate_data , generate_queries
from tests . performance . data_model import Container , FieldAccess , Query , Table , View
# Shared test fixtures: two projects, two tables and one view (all in
# database_1), plus the time window the fake audit events fall into.
PROJECT_1 = "project-1"
PROJECT_2 = "project-2"

ACTOR_1, ACTOR_1_URN = "a@acryl.io", "urn:li:corpuser:a"
ACTOR_2, ACTOR_2_URN = "b@acryl.io", "urn:li:corpuser:b"

DATABASE_1 = Container("database_1")
DATABASE_2 = Container("database_2")

TABLE_1 = Table("table_1", DATABASE_1, columns=["id", "name", "age"], upstreams=[])
TABLE_2 = Table(
    "table_2", DATABASE_1, columns=["id", "table_1_id", "value"], upstreams=[]
)
VIEW_1 = View(
    name="view_1",
    container=DATABASE_1,
    columns=["id", "name", "total"],
    definition="VIEW DEFINITION 1",
    # The view reads from both tables; used by the view-usage attribution tests.
    upstreams=[TABLE_1, TABLE_2],
)
ALL_TABLES = [TABLE_1, TABLE_2, VIEW_1]

# table name -> project it lives in (TABLE_2 deliberately in a second project).
TABLE_TO_PROJECT = {
    TABLE_1.name: PROJECT_1,
    TABLE_2.name: PROJECT_2,
    VIEW_1.name: PROJECT_1,
}
# table name -> stringified BigQueryTableRef, as the extractor expects them.
TABLE_REFS = {
    table.name: str(ref_from_table(table, TABLE_TO_PROJECT)) for table in ALL_TABLES
}

FROZEN_TIME = datetime(year=2023, month=2, day=1, tzinfo=timezone.utc)
TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)  # first day bucket
TS_2 = datetime(year=2023, month=1, day=2, tzinfo=timezone.utc)  # second day bucket
def query_table_1_a(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    """Full SELECT against table_1, touching every column (id, name, age)."""
    accessed_columns = [FieldAccess(col, TABLE_1) for col in ("id", "name", "age")]
    return Query(
        text="SELECT * FROM table_1",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=accessed_columns,
    )
def query_table_1_b(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    """Narrow SELECT against table_1, touching only the name column."""
    sql = "SELECT name FROM table_1"
    name_access = FieldAccess("name", TABLE_1)
    return Query(
        text=sql,
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=[name_access],
    )
def query_table_2(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    """Full SELECT against table_2, touching every column."""
    accessed_columns = [
        FieldAccess(col, TABLE_2) for col in ("id", "table_1_id", "value")
    ]
    return Query(
        text="SELECT * FROM table_2",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=accessed_columns,
    )
def query_tables_1_and_2(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    """JOIN of table_1 and table_2 touching two columns on each side."""
    accessed = [FieldAccess(col, TABLE_1) for col in ("id", "name")]
    accessed += [FieldAccess(col, TABLE_2) for col in ("id", "value")]
    return Query(
        text="SELECT t1.id, t1.name, t2.id, t2.value FROM table_1 t1 JOIN table_2 t2 ON table_1.id = table_2.table_1_id",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=accessed,
    )
def query_view_1(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    """Full SELECT against view_1 using its fully-qualified name."""
    accessed_columns = [FieldAccess(col, VIEW_1) for col in ("id", "name", "total")]
    return Query(
        text="SELECT * FROM project-1.database_1.view_1",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=accessed_columns,
    )
2023-05-24 22:18:58 +05:30
def query_view_1_and_table_1(timestamp: datetime = TS_1, actor: str = ACTOR_1) -> Query:
    """JOIN of view_1 with table_1 via fully-qualified backticked names.

    Exercises the extractor's SQL parsing of multi-line queries that mix a
    view and one of its upstream tables.
    """
    return Query(
        # NOTE(review): multi-line literal reconstructed from a garbled
        # extraction; internal whitespace of the SQL may differ from the
        # original source — confirm against upstream before relying on
        # exact-text assertions.
        text="""SELECT v.id, v.name, v.total, t.name as name1
                FROM
                    `project-1.database_1.view_1` as v
                inner join
                    `project-1.database_1.table_1` as t
                on
                    v.id=t.id""",
        type="SELECT",
        timestamp=timestamp,
        actor=actor,
        fields_accessed=[
            FieldAccess("id", VIEW_1),
            FieldAccess("name", VIEW_1),
            FieldAccess("total", VIEW_1),
            FieldAccess("name", TABLE_1),
        ],
    )
2023-04-07 12:18:26 -07:00
def make_usage_workunit(
    table: Table,
    dataset_usage_statistics: DatasetUsageStatisticsClass,
    identifiers: BigQueryIdentifierBuilder,
) -> MetadataWorkUnit:
    """Wrap a usage-statistics aspect in an MCP workunit for *table*."""
    table_ref = BigQueryTableRef.from_string_name(TABLE_REFS[table.name])
    proposal = MetadataChangeProposalWrapper(
        entityUrn=identifiers.gen_dataset_urn_from_raw_ref(table_ref),
        aspectName=dataset_usage_statistics.get_aspect_name(),
        aspect=dataset_usage_statistics,
    )
    return proposal.as_workunit()
def make_operational_workunit(
    resource_urn: str, operation: OperationClass
) -> MetadataWorkUnit:
    """Wrap an Operation aspect in an MCP workunit for the given dataset urn."""
    proposal = MetadataChangeProposalWrapper(
        entityUrn=resource_urn,
        aspectName=operation.get_aspect_name(),
        aspect=operation,
    )
    return proposal.as_workunit()
@pytest.fixture
def config() -> BigQueryV2Config:
    """Config covering [TS_1, TS_2 + 1 min) with daily usage buckets.

    The extra minute past TS_2 means the second day's bucket is only
    partially ingested, which some tests rely on.
    """
    return BigQueryV2Config(
        file_backed_cache_size=1,  # force cache eviction paths to be exercised
        start_time=TS_1,
        end_time=TS_2 + timedelta(minutes=1),
        usage=BigQueryUsageConfig(
            include_top_n_queries=True,
            top_n_queries=30,
            bucket_duration=BucketDuration.DAY,
            include_operational_stats=False,
        ),
    )
@pytest.fixture
def usage_extractor(config: BigQueryV2Config) -> BigQueryUsageExtractor:
    """Extractor under test, wired to a fresh report and schema resolver."""
    report = BigQueryV2Report()
    return BigQueryUsageExtractor(
        config,
        report,
        schema_resolver=SchemaResolver(platform="bigquery"),
        identifiers=BigQueryIdentifierBuilder(config, report),
    )
def make_zero_usage_workunit(
    table: Table,
    time: datetime,
    identifiers: BigQueryIdentifierBuilder,
    bucket_duration: BucketDuration = BucketDuration.DAY,
) -> MetadataWorkUnit:
    """Workunit asserting that *table* saw no usage at all in one bucket."""
    empty_stats = DatasetUsageStatisticsClass(
        timestampMillis=int(time.timestamp() * 1000),
        eventGranularity=TimeWindowSizeClass(unit=bucket_duration, multiple=1),
        totalSqlQueries=0,
        uniqueUserCount=0,
        topSqlQueries=[],
        userCounts=[],
        fieldCounts=[],
    )
    return make_usage_workunit(
        table=table,
        dataset_usage_statistics=empty_stats,
        identifiers=identifiers,
    )
def compare_workunits(
    output: Iterable[MetadataWorkUnit], expected: Iterable[MetadataWorkUnit]
) -> None:
    """Assert the two workunit streams carry semantically identical metadata."""
    actual_objs = [wu.metadata.to_obj() for wu in output]
    expected_objs = [wu.metadata.to_obj() for wu in expected]
    assert not diff_metadata_json(actual_objs, expected_objs)
2023-04-07 12:18:26 -07:00
def test_usage_counts_single_bucket_resource_project(
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """All queries fall in one day bucket; usage aggregates per table.

    Events are generated with a 50% chance of the audit log reporting the
    wrong project, checking that refs are still matched to known tables.
    Unqueried tables must still get an explicit zero-usage aspect.
    """
    queries = [
        query_table_1_a(TS_1, ACTOR_1),
        query_table_1_a(TS_1, ACTOR_1),
        query_table_1_a(TS_1, ACTOR_2),
        query_table_1_b(TS_1, ACTOR_1),
        query_table_1_b(TS_1, ACTOR_2),
    ]
    events = generate_events(
        queries,
        [PROJECT_1, PROJECT_2],
        TABLE_TO_PROJECT,
        config=config,
        proabability_of_project_mismatch=0.5,  # [sic] — generator's param name
    )

    workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values())
    expected = [
        make_usage_workunit(
            table=TABLE_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=len(queries),
                topSqlQueries=[query_table_1_a().text, query_table_1_b().text],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=3,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    ),
                ],
                # name: 3 (query a) + 2 (query b); age/id: 3 each (query a only)
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=5,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=3,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_zero_usage_workunit(TABLE_2, TS_1, usage_extractor.identifiers),
        make_zero_usage_workunit(VIEW_1, TS_1, usage_extractor.identifiers),
    ]
    compare_workunits(workunits, expected)
2023-04-07 12:18:26 -07:00
2023-05-24 22:18:58 +05:30
def test_usage_counts_multiple_buckets_and_resources_view_usage(
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """Usage over two day-buckets, with views reported as their own resource.

    apply_view_usage_to_tables is left at its default here, so view queries
    count against the view itself, not its upstream tables.
    """
    queries = [
        # TS 1
        query_table_1_a(TS_1, ACTOR_1),
        query_table_1_a(TS_1, ACTOR_2),
        query_table_1_b(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_view_1(TS_1, ACTOR_1),
        query_view_1(TS_1, ACTOR_2),
        query_view_1(TS_1, ACTOR_2),
        # TS 2
        query_table_1_a(TS_2, ACTOR_1),
        query_table_1_a(TS_2, ACTOR_2),
        query_table_1_b(TS_2, ACTOR_2),
        query_tables_1_and_2(TS_2, ACTOR_2),
        query_table_2(TS_2, ACTOR_2),
        query_view_1(TS_2, ACTOR_1),
        query_view_1_and_table_1(TS_2, ACTOR_1),
    ]
    events = generate_events(
        queries,
        [PROJECT_1, PROJECT_2],
        TABLE_TO_PROJECT,
        config=config,
        proabability_of_project_mismatch=0.5,  # [sic] — generator's param name
    )

    workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values())
    expected = [
        # TS 1
        make_usage_workunit(
            table=TABLE_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=6,
                topSqlQueries=[
                    query_tables_1_and_2().text,
                    query_table_1_a().text,
                    query_table_1_b().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=5,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=1,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=6,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=5,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=2,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=VIEW_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=3,
                topSqlQueries=[
                    query_view_1().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=1,
                        userEmail=ACTOR_1,
                    ),
                ],
                # View column accesses are not attributed as field usage here.
                fieldCounts=[],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=TABLE_2,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=3,
                topSqlQueries=[
                    query_tables_1_and_2().text,
                ],
                uniqueUserCount=1,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=3,
                        userEmail=ACTOR_1,
                    )
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="value",
                        count=3,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        # TS 2
        make_usage_workunit(
            table=TABLE_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                # Includes the view+table join, which touches TABLE_1 directly.
                totalSqlQueries=5,
                topSqlQueries=[
                    query_table_1_a().text,
                    query_tables_1_and_2().text,
                    query_table_1_b().text,
                    query_view_1_and_table_1().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=3,
                        userEmail=ACTOR_2,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=2,
                        userEmail=ACTOR_1,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=4,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=2,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=VIEW_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=2,
                topSqlQueries=[query_view_1().text, query_view_1_and_table_1().text],
                uniqueUserCount=1,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=2,
                        userEmail=ACTOR_1,
                    ),
                ],
                fieldCounts=[],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=TABLE_2,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=2,
                topSqlQueries=[query_tables_1_and_2().text, query_table_2().text],
                uniqueUserCount=1,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    )
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="value",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="table_1_id",
                        count=1,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
    ]
    compare_workunits(workunits, expected)

    # 5 view queries total (3 at TS_1, 2 at TS_2); all parsed successfully.
    assert usage_extractor.report.num_view_query_events == 5
    assert usage_extractor.report.num_view_query_events_failed_sql_parsing == 0
    assert usage_extractor.report.num_view_query_events_failed_table_identification == 0
def test_usage_counts_multiple_buckets_and_resources_no_view_usage(
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """Same queries as the view-usage test, but view usage is applied to the
    view's upstream tables (apply_view_usage_to_tables=True), so the view
    itself only gets a zero-usage aspect.
    """
    config.usage.apply_view_usage_to_tables = True
    queries = [
        # TS 1
        query_table_1_a(TS_1, ACTOR_1),
        query_table_1_a(TS_1, ACTOR_2),
        query_table_1_b(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_tables_1_and_2(TS_1, ACTOR_1),
        query_view_1(TS_1, ACTOR_1),
        query_view_1(TS_1, ACTOR_2),
        query_view_1(TS_1, ACTOR_2),
        # TS 2
        query_table_1_a(TS_2, ACTOR_1),
        query_table_1_a(TS_2, ACTOR_2),
        query_table_1_b(TS_2, ACTOR_2),
        query_tables_1_and_2(TS_2, ACTOR_2),
        query_table_2(TS_2, ACTOR_2),
        query_view_1(TS_2, ACTOR_1),
        query_view_1_and_table_1(TS_2, ACTOR_1),
    ]
    events = generate_events(
        queries,
        [PROJECT_1, PROJECT_2],
        TABLE_TO_PROJECT,
        config=config,
        proabability_of_project_mismatch=0.5,  # [sic] — generator's param name
    )

    workunits = usage_extractor._get_workunits_internal(events, TABLE_REFS.values())
    expected = [
        # TS 1
        make_usage_workunit(
            table=TABLE_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                # View queries now count against TABLE_1 as well: 6 + 3 = 9.
                totalSqlQueries=9,
                topSqlQueries=[
                    query_tables_1_and_2().text,
                    query_view_1().text,
                    query_table_1_a().text,
                    query_table_1_b().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=6,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=3,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=9,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=8,
                    ),
                    # "total" exists only on the view; attributed to upstreams.
                    DatasetFieldUsageCountsClass(
                        fieldPath="total",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=2,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=TABLE_2,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_1.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=6,
                topSqlQueries=[query_tables_1_and_2().text, query_view_1().text],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=4,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=6,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="total",
                        count=3,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="value",
                        count=3,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        # TS 2
        make_usage_workunit(
            table=TABLE_1,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=6,
                topSqlQueries=[
                    query_table_1_a().text,
                    query_tables_1_and_2().text,
                    query_view_1().text,
                    query_table_1_b().text,
                    query_view_1_and_table_1().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=3,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=3,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=6,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=5,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="age",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="total",
                        count=2,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_usage_workunit(
            table=TABLE_2,
            dataset_usage_statistics=DatasetUsageStatisticsClass(
                timestampMillis=int(TS_2.timestamp() * 1000),
                eventGranularity=TimeWindowSizeClass(
                    unit=BucketDuration.DAY, multiple=1
                ),
                totalSqlQueries=4,
                topSqlQueries=[
                    query_tables_1_and_2().text,
                    query_view_1().text,
                    query_table_2().text,
                    query_view_1_and_table_1().text,
                ],
                uniqueUserCount=2,
                userCounts=[
                    DatasetUserUsageCountsClass(
                        user=ACTOR_1_URN,
                        count=2,
                        userEmail=ACTOR_1,
                    ),
                    DatasetUserUsageCountsClass(
                        user=ACTOR_2_URN,
                        count=2,
                        userEmail=ACTOR_2,
                    ),
                ],
                fieldCounts=[
                    DatasetFieldUsageCountsClass(
                        fieldPath="id",
                        count=4,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="name",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="total",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="value",
                        count=2,
                    ),
                    DatasetFieldUsageCountsClass(
                        fieldPath="table_1_id",
                        count=1,
                    ),
                ],
            ),
            identifiers=usage_extractor.identifiers,
        ),
        make_zero_usage_workunit(VIEW_1, TS_1, usage_extractor.identifiers),
        # TS_2 not included as only 1 minute of it was ingested
    ]
    compare_workunits(workunits, expected)

    # With apply_view_usage_to_tables, no events are tracked as view queries.
    assert usage_extractor.report.num_view_query_events == 0
2023-04-07 12:18:26 -07:00
2023-04-14 19:54:22 -04:00
def test_usage_counts_no_query_event(
    caplog: pytest.LogCaptureFixture,
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """A ReadEvent with no matching QueryEvent still produces an (empty)
    usage aspect for the referenced table, without logging warnings.
    """
    with caplog.at_level(logging.WARNING):
        ref = BigQueryTableRef(BigqueryTableIdentifier("project", "dataset", "table"))
        event = AuditEvent.create(
            ReadEvent(
                jobName="job_name",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=["id", "name", "total"],
                readReason="JOB",
                payload=None,
            )
        )
        workunits = usage_extractor._get_workunits_internal([event], [str(ref)])
        expected = [
            MetadataChangeProposalWrapper(
                entityUrn=usage_extractor.identifiers.gen_dataset_urn_from_raw_ref(ref),
                aspect=DatasetUsageStatisticsClass(
                    timestampMillis=int(TS_1.timestamp() * 1000),
                    eventGranularity=TimeWindowSizeClass(
                        unit=BucketDuration.DAY, multiple=1
                    ),
                    # No query text was available, so everything stays empty.
                    totalSqlQueries=0,
                    uniqueUserCount=0,
                    topSqlQueries=[],
                    userCounts=[],
                    fieldCounts=[],
                ),
            ).as_workunit()
        ]
        compare_workunits(workunits, expected)
        assert not caplog.records
def test_usage_counts_no_columns(
    caplog: pytest.LogCaptureFixture,
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """A read with an empty fieldsRead list yields usage with no field
    counts, and does so without logging warnings.
    """
    job_name = "job_name"
    ref = BigQueryTableRef(
        BigqueryTableIdentifier(PROJECT_1, DATABASE_1.name, TABLE_1.name)
    )
    events = [
        AuditEvent.create(
            ReadEvent(
                jobName=job_name,
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=[],
                readReason="JOB",
                payload=None,
            ),
        ),
        # Matching QueryEvent (same job_name) supplies the query text.
        AuditEvent.create(
            QueryEvent(
                job_name=job_name,
                timestamp=TS_1,
                actor_email=ACTOR_1,
                query="SELECT * FROM table_1",
                statementType="SELECT",
                project_id=PROJECT_1,
                destinationTable=None,
                referencedTables=[ref],
                referencedViews=[],
                payload=None,
            )
        ),
    ]
    caplog.clear()
    with caplog.at_level(logging.WARNING):
        workunits = usage_extractor._get_workunits_internal(
            events, [TABLE_REFS[TABLE_1.name]]
        )
        expected = [
            make_usage_workunit(
                table=TABLE_1,
                dataset_usage_statistics=DatasetUsageStatisticsClass(
                    timestampMillis=int(TS_1.timestamp() * 1000),
                    eventGranularity=TimeWindowSizeClass(
                        unit=BucketDuration.DAY, multiple=1
                    ),
                    totalSqlQueries=1,
                    topSqlQueries=["SELECT * FROM table_1"],
                    uniqueUserCount=1,
                    userCounts=[
                        DatasetUserUsageCountsClass(
                            user=ACTOR_1_URN,
                            count=1,
                            userEmail=ACTOR_1,
                        ),
                    ],
                    fieldCounts=[],
                ),
                identifiers=usage_extractor.identifiers,
            )
        ]
        compare_workunits(workunits, expected)
        assert not caplog.records
2024-04-09 11:54:02 +02:00
def test_usage_counts_no_columns_and_top_n_limit_hit(
    caplog: pytest.LogCaptureFixture,
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """With top_n_queries=1, only the most frequent query text is reported,
    while total/user counts still cover all three jobs.
    """
    config.usage.top_n_queries = 1
    job_name = "job_name"
    ref = BigQueryTableRef(
        BigqueryTableIdentifier(PROJECT_1, DATABASE_1.name, TABLE_1.name)
    )
    events = [
        # Three ReadEvents (distinct jobs), each with no columns read.
        AuditEvent.create(
            ReadEvent(
                jobName=job_name,
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=[],
                readReason="JOB",
                payload=None,
            ),
        ),
        AuditEvent.create(
            ReadEvent(
                jobName="job_name_2",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=[],
                readReason="JOB",
                payload=None,
            ),
        ),
        AuditEvent.create(
            ReadEvent(
                jobName="job_name_3",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                resource=ref,
                fieldsRead=[],
                readReason="JOB",
                payload=None,
            ),
        ),
        # Matching QueryEvents: two share the same text, one differs, so
        # "SELECT * FROM table_1" wins the single top-queries slot.
        AuditEvent.create(
            QueryEvent(
                job_name=job_name,
                timestamp=TS_1,
                actor_email=ACTOR_1,
                query="SELECT * FROM table_1",
                statementType="SELECT",
                project_id=PROJECT_1,
                destinationTable=None,
                referencedTables=[ref],
                referencedViews=[],
                payload=None,
            )
        ),
        AuditEvent.create(
            QueryEvent(
                job_name="job_name_2",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                query="SELECT * FROM table_1",
                statementType="SELECT",
                project_id=PROJECT_1,
                destinationTable=None,
                referencedTables=[ref],
                referencedViews=[],
                payload=None,
            )
        ),
        AuditEvent.create(
            QueryEvent(
                job_name="job_name_3",
                timestamp=TS_1,
                actor_email=ACTOR_1,
                query="SELECT my_column FROM table_1",
                statementType="SELECT",
                project_id=PROJECT_1,
                destinationTable=None,
                referencedTables=[ref],
                referencedViews=[],
                payload=None,
            )
        ),
    ]
    caplog.clear()
    with caplog.at_level(logging.WARNING):
        workunits = usage_extractor._get_workunits_internal(
            events, [TABLE_REFS[TABLE_1.name]]
        )
        expected = [
            make_usage_workunit(
                table=TABLE_1,
                dataset_usage_statistics=DatasetUsageStatisticsClass(
                    timestampMillis=int(TS_1.timestamp() * 1000),
                    eventGranularity=TimeWindowSizeClass(
                        unit=BucketDuration.DAY, multiple=1
                    ),
                    totalSqlQueries=3,
                    topSqlQueries=["SELECT * FROM table_1"],
                    uniqueUserCount=1,
                    userCounts=[
                        DatasetUserUsageCountsClass(
                            user=ACTOR_1_URN,
                            count=3,
                            userEmail=ACTOR_1,
                        ),
                    ],
                    fieldCounts=[],
                ),
                identifiers=usage_extractor.identifiers,
            )
        ]
        compare_workunits(workunits, expected)
        assert not caplog.records
2023-04-07 12:18:26 -07:00
@freeze_time(FROZEN_TIME)
@patch.object(BigQueryUsageExtractor, "_generate_usage_workunits")
def test_operational_stats(
    mock: MagicMock,
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """Operation aspects are emitted for every mutating statement type over
    randomly generated data; usage workunits are mocked out so only the
    operational workunits are compared.
    """
    mock.return_value = []
    config.usage.apply_view_usage_to_tables = True
    config.usage.include_operational_stats = True
    seed_metadata = generate_data(
        num_containers=3,
        num_tables=5,
        num_views=2,
        time_range=timedelta(days=1),
    )
    all_tables = seed_metadata.tables + seed_metadata.views

    num_projects = 2
    projects = [f"project-{i}" for i in range(num_projects)]
    table_to_project = {table.name: random.choice(projects) for table in all_tables}
    table_refs = {
        table.name: str(ref_from_table(table, table_to_project)) for table in all_tables
    }

    queries = list(
        generate_queries(
            seed_metadata,
            num_selects=10,
            num_operations=20,
            num_unique_queries=10,
            num_users=3,
        )
    )
    events = generate_events(queries, projects, table_to_project, config=config)

    workunits = usage_extractor._get_workunits_internal(events, table_refs.values())
    expected = [
        make_operational_workunit(
            usage_extractor.identifiers.gen_dataset_urn_from_raw_ref(
                BigQueryTableRef.from_string_name(
                    table_refs[query.object_modified.name]
                )
            ),
            OperationClass(
                timestampMillis=int(FROZEN_TIME.timestamp() * 1000),
                lastUpdatedTimestamp=int(query.timestamp.timestamp() * 1000),
                actor=f"urn:li:corpuser:{query.actor.split('@')[0]}",
                # Known statement types map to themselves; anything else is
                # reported as CUSTOM with the raw type preserved alongside.
                operationType=(
                    query.type
                    if query.type in OPERATION_STATEMENT_TYPES.values()
                    else "CUSTOM"
                ),
                customOperationType=(
                    None
                    if query.type in OPERATION_STATEMENT_TYPES.values()
                    else query.type
                ),
                # Directly-read tables first, then the upstream tables of any
                # views read (apply_view_usage_to_tables is on).
                affectedDatasets=list(
                    dict.fromkeys(  # Preserve order
                        usage_extractor.identifiers.gen_dataset_urn_from_raw_ref(
                            BigQueryTableRef.from_string_name(
                                table_refs[field.table.name]
                            )
                        )
                        for field in query.fields_accessed
                        if not field.table.is_view()
                    )
                )
                + list(
                    dict.fromkeys(  # Preserve order
                        usage_extractor.identifiers.gen_dataset_urn_from_raw_ref(
                            BigQueryTableRef.from_string_name(table_refs[parent.name])
                        )
                        for field in query.fields_accessed
                        if field.table.is_view()
                        for parent in field.table.upstreams
                    )
                ),
            ),
        )
        for query in queries
        if query.object_modified and query.type in OPERATION_STATEMENT_TYPES.values()
    ]
    compare_workunits(
        [
            wu
            for wu in workunits
            if isinstance(wu.metadata, MetadataChangeProposalWrapper)
            and isinstance(wu.metadata.aspect, OperationClass)
        ],
        expected,
    )
2023-05-24 22:18:58 +05:30
def test_get_tables_from_query(usage_extractor):
    """SQL parsing resolves table refs, qualifying names with the default project."""
    table_1_ref = BigQueryTableRef(
        BigqueryTableIdentifier("project-1", "database_1", "table_1")
    )
    view_1_ref = BigQueryTableRef(
        BigqueryTableIdentifier("project-1", "database_1", "view_1")
    )

    # Fully-qualified name resolves as-is.
    assert usage_extractor.get_tables_from_query(
        "SELECT * FROM project-1.database_1.view_1", default_project=PROJECT_1
    ) == [view_1_ref]

    # Unqualified name picks up the default project.
    assert usage_extractor.get_tables_from_query(
        "SELECT * FROM database_1.view_1", default_project=PROJECT_1
    ) == [view_1_ref]

    # Joins surface every referenced relation.
    join_sql = "SELECT v.id, v.name, v.total, t.name as name1 FROM database_1.view_1 as v inner join database_1.table_1 as t on v.id=t.id"
    assert sorted(
        usage_extractor.get_tables_from_query(join_sql, default_project=PROJECT_1)
    ) == [table_1_ref, view_1_ref]

    # CTAS reports the source relations of the inner select.
    ctas_sql = "CREATE TABLE database_1.new_table AS " + join_sql
    assert sorted(
        usage_extractor.get_tables_from_query(ctas_sql, default_project=PROJECT_1)
    ) == [table_1_ref, view_1_ref]