73 lines
2.3 KiB
Python

import logging
import os
from unittest.mock import patch
import humanfriendly
import psutil
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig
from datahub.ingestion.source.unity.source import UnityCatalogSource
from datahub.utilities.perf_timer import PerfTimer
from tests.performance.data_generation import (
NormalDistribution,
generate_data,
generate_queries,
)
from tests.performance.databricks.unity_proxy_mock import UnityCatalogApiProxyMock
from tests.performance.helpers import workunit_sink
def run_test():
seed_metadata = generate_data(
num_containers=[1, 100, 5000],
num_tables=50000,
num_views=10000,
columns_per_table=NormalDistribution(100, 50),
parents_per_view=NormalDistribution(5, 5),
view_definition_length=NormalDistribution(1000, 300),
)
queries = generate_queries(
seed_metadata,
num_selects=100000,
num_operations=100000,
num_unique_queries=10000,
num_users=1000,
)
proxy_mock = UnityCatalogApiProxyMock(
seed_metadata, queries=queries, num_service_principals=10000
)
print("Data generated")
config = UnityCatalogSourceConfig(
token="",
workspace_url="http://localhost:1234",
include_usage_statistics=True,
include_hive_metastore=False,
)
ctx = PipelineContext(run_id="test")
with patch(
"datahub.ingestion.source.unity.source.UnityCatalogApiProxy",
lambda *args, **kwargs: proxy_mock,
):
source: UnityCatalogSource = UnityCatalogSource(ctx, config)
pre_mem_usage = psutil.Process(os.getpid()).memory_info().rss
print(f"Test data size: {humanfriendly.format_size(pre_mem_usage)}")
with PerfTimer() as timer:
workunits = source.get_workunits()
num_workunits, peak_memory_usage = workunit_sink(workunits)
print(f"Workunits Generated: {num_workunits}")
print(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds")
print(
f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}"
)
print(source.report.as_string())
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
run_test()