refactor(profiler): add PerfTimer class and fix typos (#3497)

This commit is contained in:
Harshal Sheth 2021-11-02 18:04:52 -07:00 committed by GitHub
parent 61ece39cc1
commit 6d36b0a00c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 91 additions and 46 deletions

View File

@ -5,7 +5,6 @@ import itertools
import logging import logging
import os import os
import threading import threading
import time
import unittest.mock import unittest.mock
import uuid import uuid
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union
@ -40,6 +39,7 @@ from datahub.metadata.schema_classes import (
ValueFrequencyClass, ValueFrequencyClass,
) )
from datahub.utilities.groupby import groupby_unsorted from datahub.utilities.groupby import groupby_unsorted
from datahub.utilities.perf_timer import PerfTimer
logger: logging.Logger = logging.getLogger(__name__) logger: logging.Logger = logging.getLogger(__name__)
@ -53,15 +53,15 @@ logger: logging.Logger = logging.getLogger(__name__)
# engine parameter when the SqlAlchemyDatasource is actually set up, and then remove # engine parameter when the SqlAlchemyDatasource is actually set up, and then remove
# it from the cached config object to avoid those same copying mechanisms. # it from the cached config object to avoid those same copying mechanisms.
# #
# We need to wrap this mechanism with a lock, since having multiple methods # We need to wrap this mechanism with a lock, since having multiple threads
# simultaneously patching the same methods will cause concurrency issues. # simultaneously patching the same methods will cause concurrency issues.
_properly_init_datasource_lock = threading.Lock() _datasource_connection_injection_lock = threading.Lock()
@contextlib.contextmanager @contextlib.contextmanager
def _properly_init_datasource(conn): def _inject_connection_into_datasource(conn: Connection) -> Iterator[None]:
with _properly_init_datasource_lock: with _datasource_connection_injection_lock:
underlying_datasource_init = SqlAlchemyDatasource.__init__ underlying_datasource_init = SqlAlchemyDatasource.__init__
def sqlalchemy_datasource_init( def sqlalchemy_datasource_init(
@ -90,8 +90,11 @@ class GEProfilingConfig(ConfigModel):
enabled: bool = False enabled: bool = False
limit: Optional[int] = None limit: Optional[int] = None
offset: Optional[int] = None offset: Optional[int] = None
# These settings will override the ones below.
turn_off_expensive_profiling_metrics: bool = False turn_off_expensive_profiling_metrics: bool = False
profile_table_level_only: bool = False profile_table_level_only: bool = False
include_field_null_count: bool = True include_field_null_count: bool = True
include_field_min_value: bool = True include_field_min_value: bool = True
include_field_max_value: bool = True include_field_max_value: bool = True
@ -102,6 +105,7 @@ class GEProfilingConfig(ConfigModel):
include_field_distinct_value_frequencies: bool = True include_field_distinct_value_frequencies: bool = True
include_field_histogram: bool = True include_field_histogram: bool = True
include_field_sample_values: bool = True include_field_sample_values: bool = True
allow_deny_patterns: AllowDenyPattern = AllowDenyPattern.allow_all() allow_deny_patterns: AllowDenyPattern = AllowDenyPattern.allow_all()
max_number_of_fields_to_profile: Optional[pydantic.PositiveInt] = None max_number_of_fields_to_profile: Optional[pydantic.PositiveInt] = None
@ -131,7 +135,7 @@ class GEProfilingConfig(ConfigModel):
"include_field_histogram", "include_field_histogram",
"include_field_sample_values", "include_field_sample_values",
] ]
# Supress all field-level metrics # Suppress all field-level metrics
for field_level_metric in all_field_level_metrics: for field_level_metric in all_field_level_metrics:
values[field_level_metric] = False values[field_level_metric] = False
assert ( assert (
@ -149,7 +153,7 @@ class GEProfilingConfig(ConfigModel):
for expensive_field_metric in expensive_field_level_metrics: for expensive_field_metric in expensive_field_level_metrics:
values[expensive_field_metric] = False values[expensive_field_metric] = False
if max_num_fields_to_profile is None: if max_num_fields_to_profile is None:
# We currently profile upto 10 non-filtered columns in this mode by default. # We currently profile up to 10 non-filtered columns in this mode by default.
values[max_num_fields_to_profile_key] = 10 values[max_num_fields_to_profile_key] = 10
return values return values
@ -320,7 +324,7 @@ class DatahubGEProfiler:
"url": conn.engine.url, "url": conn.engine.url,
}, },
) )
with _properly_init_datasource(conn): with _inject_connection_into_datasource(conn):
# Using the add_datasource method ensures that the datasource is added to # Using the add_datasource method ensures that the datasource is added to
# GE-internal cache, which avoids problems when calling GE methods later on. # GE-internal cache, which avoids problems when calling GE methods later on.
assert data_context.add_datasource( assert data_context.add_datasource(
@ -335,30 +339,28 @@ class DatahubGEProfiler:
def generate_profiles( def generate_profiles(
self, requests: List[GEProfilerRequest], max_workers: int self, requests: List[GEProfilerRequest], max_workers: int
) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]: ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
start_time = time.perf_counter() with PerfTimer() as timer:
max_workers = min(max_workers, len(requests))
logger.info(
f"Will profile {len(requests)} table(s) with {max_workers} worker(s) - this may take a while"
)
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as async_executor:
async_profiles = [
async_executor.submit(
self.generate_profile_from_request,
request,
)
for request in requests
]
max_workers = min(max_workers, len(requests)) for async_profile in concurrent.futures.as_completed(async_profiles):
logger.info( yield async_profile.result()
f"Will profile {len(requests)} table(s) with {max_workers} worker(s) - this may take a while"
)
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as async_executor:
async_profiles = [
async_executor.submit(
self.generate_profile_from_request,
request,
)
for request in requests
]
for async_profile in concurrent.futures.as_completed(async_profiles): logger.info(
yield async_profile.result() f"Profiling {len(requests)} table(s) finished in {(timer.elapsed_seconds()):.3f} seconds"
)
end_time = time.perf_counter()
logger.info(
f"Profiling {len(requests)} table(s) finished in {(end_time - start_time):.3f} seconds"
)
def generate_profile_from_request( def generate_profile_from_request(
self, request: GEProfilerRequest self, request: GEProfilerRequest
@ -406,23 +408,22 @@ class DatahubGEProfiler:
pretty_name: str, pretty_name: str,
) -> ExpectationSuiteValidationResult: ) -> ExpectationSuiteValidationResult:
try: try:
start_time = time.perf_counter() with PerfTimer() as timer:
profile_results = ge_context.data_context.profile_data_asset( profile_results = ge_context.data_context.profile_data_asset(
ge_context.datasource_name, ge_context.datasource_name,
profiler=DatahubConfigurableProfiler, profiler=DatahubConfigurableProfiler,
profiler_configuration={ profiler_configuration={
"config": self.config, "config": self.config,
"dataset_name": pretty_name, "dataset_name": pretty_name,
"report": self.report, "report": self.report,
}, },
batch_kwargs={ batch_kwargs={
"datasource": ge_context.datasource_name, "datasource": ge_context.datasource_name,
**batch_kwargs, **batch_kwargs,
}, },
) )
end_time = time.perf_counter()
logger.info( logger.info(
f"Profiling for {pretty_name} took {(end_time - start_time):.3f} seconds." f"Profiling for {pretty_name} took {(timer.elapsed_seconds()):.3f} seconds."
) )
assert profile_results["success"] assert profile_results["success"]

View File

@ -0,0 +1,44 @@
import time
from contextlib import AbstractContextManager
from typing import Any, Optional
class PerfTimer(AbstractContextManager):
    """
    A context manager that gives easy access to elapsed time for
    performance measurement.

    Usage:
        with PerfTimer() as timer:
            do_work()
        logger.info("took %.3f seconds", timer.elapsed_seconds())

    The timer may also be driven manually via start()/finish(), and
    elapsed_seconds() can be read while the timer is still running.
    """

    # perf_counter() timestamps; both are None until start()/finish() run.
    start_time: Optional[float] = None
    end_time: Optional[float] = None

    def start(self) -> None:
        """Start (or restart) the timer and clear any previous end time."""
        self.start_time = time.perf_counter()
        self.end_time = None

    def finish(self) -> None:
        """Stop the timer, freezing the elapsed time.

        Raises:
            RuntimeError: if the timer was never started.
        """
        # Raise explicitly rather than assert: asserts are stripped under -O,
        # which would let a never-started timer produce a nonsense reading.
        if self.start_time is None:
            raise RuntimeError("PerfTimer.finish() called before start()")
        self.end_time = time.perf_counter()

    def __enter__(self) -> "PerfTimer":
        self.start()
        return self

    def __exit__(
        self,
        exc_type: Any,
        exc: Any,
        traceback: Any,
    ) -> Optional[bool]:
        # Stop the clock even if the body raised; returning None does not
        # suppress the exception.
        self.finish()
        return None

    def elapsed_seconds(self) -> float:
        """
        Returns the elapsed time in seconds.

        While the timer is running this is a live reading; after finish()
        (or context-manager exit) it is the frozen duration.

        Raises:
            RuntimeError: if the timer was never started.
        """
        if self.start_time is None:
            raise RuntimeError(
                "PerfTimer.elapsed_seconds() called before start()"
            )
        if self.end_time is None:
            return time.perf_counter() - self.start_time
        return self.end_time - self.start_time