73 lines
2.3 KiB
Python
Raw Permalink Normal View History

import logging
import os
from unittest.mock import patch
import humanfriendly
import psutil
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig
from datahub.ingestion.source.unity.source import UnityCatalogSource
from datahub.utilities.perf_timer import PerfTimer
from tests.performance.data_generation import (
NormalDistribution,
generate_data,
generate_queries,
)
from tests.performance.databricks.unity_proxy_mock import UnityCatalogApiProxyMock
from tests.performance.helpers import workunit_sink
def run_test():
seed_metadata = generate_data(
num_containers=[1, 100, 5000],
num_tables=50000,
num_views=10000,
columns_per_table=NormalDistribution(100, 50),
parents_per_view=NormalDistribution(5, 5),
view_definition_length=NormalDistribution(1000, 300),
)
queries = generate_queries(
seed_metadata,
num_selects=100000,
num_operations=100000,
num_unique_queries=10000,
num_users=1000,
)
proxy_mock = UnityCatalogApiProxyMock(
seed_metadata, queries=queries, num_service_principals=10000
)
print("Data generated")
config = UnityCatalogSourceConfig(
token="",
workspace_url="http://localhost:1234",
include_usage_statistics=True,
include_hive_metastore=False,
)
ctx = PipelineContext(run_id="test")
with patch(
"datahub.ingestion.source.unity.source.UnityCatalogApiProxy",
lambda *args, **kwargs: proxy_mock,
):
source: UnityCatalogSource = UnityCatalogSource(ctx, config)
pre_mem_usage = psutil.Process(os.getpid()).memory_info().rss
print(f"Test data size: {humanfriendly.format_size(pre_mem_usage)}")
with PerfTimer() as timer:
workunits = source.get_workunits()
num_workunits, peak_memory_usage = workunit_sink(workunits)
print(f"Workunits Generated: {num_workunits}")
print(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds")
print(
f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}"
)
print(source.report.as_string())
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
run_test()