diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index 2441873fd6..8807cff6fe 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -5,7 +5,6 @@ import itertools import logging import os import threading -import time import unittest.mock import uuid from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union @@ -40,6 +39,7 @@ from datahub.metadata.schema_classes import ( ValueFrequencyClass, ) from datahub.utilities.groupby import groupby_unsorted +from datahub.utilities.perf_timer import PerfTimer logger: logging.Logger = logging.getLogger(__name__) @@ -53,15 +53,15 @@ logger: logging.Logger = logging.getLogger(__name__) # engine parameter when the SqlAlchemyDatasource is actually set up, and then remove # it from the cached config object to avoid those same copying mechanisms. # -# We need to wrap this mechanism with a lock, since having multiple methods +# We need to wrap this mechanism with a lock, since having multiple threads # simultaneously patching the same methods will cause concurrency issues. -_properly_init_datasource_lock = threading.Lock() +_datasource_connection_injection_lock = threading.Lock() @contextlib.contextmanager -def _properly_init_datasource(conn): - with _properly_init_datasource_lock: +def _inject_connection_into_datasource(conn: Connection) -> Iterator[None]: + with _datasource_connection_injection_lock: underlying_datasource_init = SqlAlchemyDatasource.__init__ def sqlalchemy_datasource_init( @@ -90,8 +90,11 @@ class GEProfilingConfig(ConfigModel): enabled: bool = False limit: Optional[int] = None offset: Optional[int] = None + + # These settings will override the ones below. 
turn_off_expensive_profiling_metrics: bool = False profile_table_level_only: bool = False + include_field_null_count: bool = True include_field_min_value: bool = True include_field_max_value: bool = True @@ -102,6 +105,7 @@ class GEProfilingConfig(ConfigModel): include_field_distinct_value_frequencies: bool = True include_field_histogram: bool = True include_field_sample_values: bool = True + allow_deny_patterns: AllowDenyPattern = AllowDenyPattern.allow_all() max_number_of_fields_to_profile: Optional[pydantic.PositiveInt] = None @@ -131,7 +135,7 @@ class GEProfilingConfig(ConfigModel): "include_field_histogram", "include_field_sample_values", ] - # Supress all field-level metrics + # Suppress all field-level metrics for field_level_metric in all_field_level_metrics: values[field_level_metric] = False assert ( @@ -149,7 +153,7 @@ class GEProfilingConfig(ConfigModel): for expensive_field_metric in expensive_field_level_metrics: values[expensive_field_metric] = False if max_num_fields_to_profile is None: - # We currently profile upto 10 non-filtered columns in this mode by default. + # We currently profile up to 10 non-filtered columns in this mode by default. values[max_num_fields_to_profile_key] = 10 return values @@ -320,7 +324,7 @@ class DatahubGEProfiler: "url": conn.engine.url, }, ) - with _properly_init_datasource(conn): + with _inject_connection_into_datasource(conn): # Using the add_datasource method ensures that the datasource is added to # GE-internal cache, which avoids problems when calling GE methods later on. 
def generate_profiles(
    self, requests: List[GEProfilerRequest], max_workers: int
) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
    """
    Profile every request on a thread pool and yield results as they complete.

    Each request is submitted to ``self.generate_profile_from_request``; the
    value returned by that call is yielded as-is, in completion order rather
    than in the order of ``requests``.

    :param requests: the profiling requests to run. An empty list yields nothing.
    :param max_workers: upper bound on pool size; the pool never uses more
        workers than there are requests.
    """
    if not requests:
        # min(max_workers, 0) would be 0, and ThreadPoolExecutor raises
        # ValueError for max_workers <= 0. Nothing to do, so stop here.
        logger.info("No tables to profile - skipping")
        return

    with PerfTimer() as timer:
        max_workers = min(max_workers, len(requests))
        logger.info(
            f"Will profile {len(requests)} table(s) with {max_workers} worker(s) - this may take a while"
        )
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers
        ) as async_executor:
            async_profiles = [
                async_executor.submit(
                    self.generate_profile_from_request,
                    request,
                )
                for request in requests
            ]

            # result() re-raises any exception raised inside the worker.
            for async_profile in concurrent.futures.as_completed(async_profiles):
                yield async_profile.result()

        # Still inside the timer's `with`, so elapsed_seconds() reports the
        # running time up to this point.
        logger.info(
            f"Profiling {len(requests)} table(s) finished in {(timer.elapsed_seconds()):.3f} seconds"
        )
class PerfTimer(AbstractContextManager):
    """
    A context manager that gives easy access to elapsed time for performance measurement.

    Usage::

        with PerfTimer() as timer:
            do_work()
        print(timer.elapsed_seconds())

    ``elapsed_seconds`` may also be called inside the ``with`` block, in which
    case it reports the time elapsed so far.
    """

    # Both values are time.perf_counter() readings; None means "not yet recorded".
    start_time: Optional[float] = None
    end_time: Optional[float] = None

    def start(self) -> None:
        """Record the start time and clear any previously recorded end time."""
        self.start_time = time.perf_counter()
        self.end_time = None

    def finish(self) -> None:
        """
        Record the end time.

        Raises RuntimeError if the timer was never started.
        """
        # Explicit raise rather than `assert`, so the check survives `python -O`.
        if self.start_time is None:
            raise RuntimeError("PerfTimer.finish() called before start()")
        self.end_time = time.perf_counter()

    def __enter__(self) -> "PerfTimer":
        self.start()
        return self

    def __exit__(
        self,
        exc_type: Any,
        exc: Any,
        traceback: Any,
    ) -> Optional[bool]:
        self.finish()
        # Returning None lets any exception from the with-body propagate.
        return None

    def elapsed_seconds(self) -> float:
        """
        Returns the elapsed time in seconds.

        While the timer is running this is the time since start(); once
        finished it is the fixed interval between start() and finish().
        Raises RuntimeError if the timer was never started.
        """
        if self.start_time is None:
            raise RuntimeError("PerfTimer.elapsed_seconds() called before start()")
        if self.end_time is None:
            return time.perf_counter() - self.start_time
        return self.end_time - self.start_time