2023-03-29 17:13:43 -04:00
|
|
|
import logging
|
|
|
|
import os
|
|
|
|
import random
|
|
|
|
from datetime import timedelta
|
|
|
|
|
|
|
|
import humanfriendly
|
|
|
|
import psutil
|
|
|
|
|
|
|
|
from datahub.ingestion.source.bigquery_v2.bigquery_config import (
|
|
|
|
BigQueryUsageConfig,
|
|
|
|
BigQueryV2Config,
|
|
|
|
)
|
2023-04-14 21:09:43 -04:00
|
|
|
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
|
2024-08-26 11:21:00 +05:30
|
|
|
from datahub.ingestion.source.bigquery_v2.common import BigQueryIdentifierBuilder
|
2023-03-29 17:13:43 -04:00
|
|
|
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
|
2024-02-09 16:27:45 -05:00
|
|
|
from datahub.sql_parsing.schema_resolver import SchemaResolver
|
2023-03-29 17:13:43 -04:00
|
|
|
from datahub.utilities.perf_timer import PerfTimer
|
2023-10-04 10:23:13 -04:00
|
|
|
from tests.performance.bigquery.bigquery_events import generate_events, ref_from_table
|
2023-04-14 21:09:43 -04:00
|
|
|
from tests.performance.data_generation import (
|
|
|
|
NormalDistribution,
|
|
|
|
generate_data,
|
|
|
|
generate_queries,
|
|
|
|
)
|
2023-10-04 10:23:13 -04:00
|
|
|
from tests.performance.helpers import workunit_sink
|
2023-03-29 17:13:43 -04:00
|
|
|
|
|
|
|
|
2023-04-14 21:09:43 -04:00
|
|
|
def run_test():
    """Benchmark the BigQuery usage extractor against generated data.

    Generates seed metadata, queries and events, feeds the events through
    ``BigQueryUsageExtractor._get_workunits_internal`` and prints the number
    of workunits, the elapsed time, the memory delta, the usage state size
    and the number of query hash collisions recorded on the report.
    """
    report = BigQueryV2Report()
    report.set_ingestion_stage("All", "Seed Data Generation")
    seed_metadata = generate_data(
        num_containers=2000,
        num_tables=20000,
        num_views=2000,
        time_range=timedelta(days=7),
    )
    all_tables = seed_metadata.all_tables

    config = BigQueryV2Config(
        start_time=seed_metadata.start_time,
        end_time=seed_metadata.end_time,
        usage=BigQueryUsageConfig(
            include_top_n_queries=True,
            top_n_queries=10,
            apply_view_usage_to_tables=True,
        ),
        file_backed_cache_size=1000,
    )
    usage_extractor = BigQueryUsageExtractor(
        config,
        report,
        schema_resolver=SchemaResolver(platform="bigquery"),
        identifiers=BigQueryIdentifierBuilder(config, report),
    )
    report.set_ingestion_stage("All", "Event Generation")

    # Assign every generated table to one of the synthetic projects at random.
    num_projects = 100
    projects = [f"project-{i}" for i in range(num_projects)]
    table_to_project = {table.name: random.choice(projects) for table in all_tables}
    table_refs = {str(ref_from_table(table, table_to_project)) for table in all_tables}

    queries = list(
        generate_queries(
            seed_metadata,
            num_selects=240_000,
            num_operations=800_000,
            num_unique_queries=50_000,
            num_users=2000,
            query_length=NormalDistribution(2000, 500),
        )
    )
    # Events are generated from the queries in timestamp order.
    queries.sort(key=lambda q: q.timestamp)
    events = list(generate_events(queries, projects, table_to_project, config=config))
    print(f"Events generated: {len(events)}")

    # Resident set size once all test data is in memory; subtracted from the
    # peak below so the reported figure excludes the generated test data.
    pre_mem_usage = psutil.Process(os.getpid()).memory_info().rss
    print(f"Test data size: {humanfriendly.format_size(pre_mem_usage)}")

    report.set_ingestion_stage("All", "Event Ingestion")
    # NOTE(review): the timed region is assumed to cover both creating and
    # draining the workunits — confirm.
    with PerfTimer() as timer:
        workunits = usage_extractor._get_workunits_internal(events, table_refs)
        num_workunits, peak_memory_usage = workunit_sink(workunits)
    report.set_ingestion_stage("All", "Done")
    print(f"Workunits Generated: {num_workunits}")
    print(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds")

    print(
        f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}"
    )
    print(f"Disk Used: {report.processing_perf.usage_state_size}")
    print(f"Hash collisions: {report.num_usage_query_hash_collisions}")
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Configure root logging at INFO level before running the benchmark
    # when this file is executed as a script.
    logging.basicConfig(level=logging.INFO)
    run_test()
|