mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-04 15:50:14 +00:00
68 lines
2.4 KiB
Python
68 lines
2.4 KiB
Python
import functools
|
|
import logging
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from unittest import mock
|
|
|
|
import humanfriendly
|
|
import psutil
|
|
|
|
from datahub.ingestion.api.common import PipelineContext
|
|
from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
|
|
from datahub.ingestion.source.snowflake.snowflake_v2 import SnowflakeV2Source
|
|
from datahub.utilities.perf_timer import PerfTimer
|
|
from tests.integration.snowflake.common import default_query_results
|
|
from tests.performance.helpers import workunit_sink
|
|
|
|
|
|
def run_test():
|
|
with mock.patch("snowflake.connector.connect") as mock_connect:
|
|
sf_connection = mock.MagicMock()
|
|
sf_cursor = mock.MagicMock()
|
|
mock_connect.return_value = sf_connection
|
|
sf_connection.cursor.return_value = sf_cursor
|
|
|
|
sf_cursor.execute.side_effect = functools.partial(
|
|
default_query_results,
|
|
num_tables=30000,
|
|
num_views=10000,
|
|
num_cols=30,
|
|
num_ops=30,
|
|
num_usages=500,
|
|
)
|
|
|
|
config = SnowflakeV2Config(
|
|
account_id="ABC12345.ap-south-1.aws",
|
|
username="TST_USR",
|
|
password="TST_PWD",
|
|
include_technical_schema=False,
|
|
include_table_lineage=True,
|
|
include_usage_stats=True,
|
|
include_operational_stats=True,
|
|
start_time=datetime(2022, 6, 6, 0, 0, 0, 0).replace(tzinfo=timezone.utc),
|
|
end_time=datetime(2022, 6, 7, 7, 17, 0, 0).replace(tzinfo=timezone.utc),
|
|
format_sql_queries=True,
|
|
)
|
|
ctx = PipelineContext(run_id="test")
|
|
source = SnowflakeV2Source(ctx, config)
|
|
|
|
pre_mem_usage = psutil.Process(os.getpid()).memory_info().rss
|
|
logging.info(f"Test data size: {humanfriendly.format_size(pre_mem_usage)}")
|
|
|
|
with PerfTimer() as timer:
|
|
workunits = source.get_workunits()
|
|
num_workunits, peak_memory_usage = workunit_sink(workunits)
|
|
logging.info(f"Workunits Generated: {num_workunits}")
|
|
logging.info(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds")
|
|
logging.info(source.get_report().as_string())
|
|
|
|
logging.info(
|
|
f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}"
|
|
)
|
|
logging.info(source.report.aspects)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logging.basicConfig(level=logging.INFO)
|
|
run_test()
|