refactor(operation_config): change logging on is_profiling_enabled (#12416)

Commit: bc09b28a64 (parent: 469e56fab1)
Author: cccs-cat001
Date: 2025-01-30 00:56:35 -05:00 (committed by GitHub)
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
3 changed files with 16 additions and 15 deletions

File: metadata-ingestion/setup.py

@@ -251,6 +251,7 @@ iceberg_common = {
     # Iceberg Python SDK
     # Kept at 0.4.0 due to higher versions requiring pydantic>2, as soon as we are fine with it, bump this dependency
     "pyiceberg>=0.4.0",
+    *cachetools_lib,
 }
 
 mssql_common = {
@@ -407,13 +408,14 @@ plugins: Dict[str, Set[str]] = {
     # UnsupportedProductError
     # https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/release-notes.html#rn-7-14-0
     # https://github.com/elastic/elasticsearch-py/issues/1639#issuecomment-883587433
-    "elasticsearch": {"elasticsearch==7.13.4"},
+    "elasticsearch": {"elasticsearch==7.13.4", *cachetools_lib},
     "cassandra": {
         "cassandra-driver>=3.28.0",
         # We were seeing an error like this `numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject`
         # with numpy 2.0. This likely indicates a mismatch between scikit-learn and numpy versions.
         # https://stackoverflow.com/questions/40845304/runtimewarning-numpy-dtype-size-changed-may-indicate-binary-incompatibility
         "numpy<2",
+        *cachetools_lib,
     },
     "feast": {
         "feast>=0.34.0,<1",
@@ -425,7 +427,7 @@ plugins: Dict[str, Set[str]] = {
         "numpy<2",
     },
     "grafana": {"requests"},
-    "glue": aws_common,
+    "glue": aws_common | cachetools_lib,
     # hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported
     "hana": sql_common
     | {
@@ -482,11 +484,11 @@ plugins: Dict[str, Set[str]] = {
     | classification_lib
     | {"db-dtypes"}  # Pandas extension data types
     | cachetools_lib,
-    "s3": {*s3_base, *data_lake_profiling},
+    "s3": {*s3_base, *data_lake_profiling, *cachetools_lib},
     "gcs": {*s3_base, *data_lake_profiling},
-    "abs": {*abs_base, *data_lake_profiling},
+    "abs": {*abs_base, *data_lake_profiling, *cachetools_lib},
     "sagemaker": aws_common,
-    "salesforce": {"simple-salesforce"},
+    "salesforce": {"simple-salesforce", *cachetools_lib},
     "snowflake": snowflake_common | usage_common | sqlglot_lib,
     "snowflake-summary": snowflake_common | usage_common | sqlglot_lib,
     "snowflake-queries": snowflake_common | usage_common | sqlglot_lib,

File: metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py

@@ -2,8 +2,6 @@ import logging
 from abc import abstractmethod
 from typing import Any, Dict, Optional
 
-import cachetools
-import cachetools.keys
 import pydantic
 from pydantic import Field
 from sqlalchemy.engine import URL
@@ -29,7 +27,6 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionConfigBase,
 )
 from datahub.ingestion.source_config.operation_config import is_profiling_enabled
-from datahub.utilities.cachetools_keys import self_methodkey
 
 logger: logging.Logger = logging.getLogger(__name__)
@@ -118,13 +115,6 @@ class SQLCommonConfig(
     # Custom Stateful Ingestion settings
     stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
 
-    # TRICKY: The operation_config is time-dependent. Because we don't want to change
-    # whether or not we're running profiling mid-ingestion, we cache the result of this method.
-    # TODO: This decorator should be moved to the is_profiling_enabled(operation_config) method.
-    @cachetools.cached(
-        cache=cachetools.LRUCache(maxsize=1),
-        key=self_methodkey,
-    )
     def is_profiling_enabled(self) -> bool:
         return self.profiling.enabled and is_profiling_enabled(
             self.profiling.operation_config
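
For reference, the key= helper that the removed decorator relied on comes from datahub.utilities.cachetools_keys. A plausible minimal sketch, assuming it keys the cache on the identity of self so the pydantic config object never has to be hashable (an assumption, not the verbatim source):

from typing import Any

import cachetools.keys


def self_methodkey(self: Any, *args: Any, **kwargs: Any) -> Any:
    # Assumed behavior: derive the cache key from the object's identity
    # rather than its value, since mutable pydantic models may not hash.
    return cachetools.keys.hashkey(id(self), *args, **kwargs)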

File: metadata-ingestion/src/datahub/ingestion/source_config/operation_config.py

@@ -2,10 +2,12 @@ import datetime
 import logging
 from typing import Any, Dict, Optional
 
+import cachetools
 import pydantic
 from pydantic.fields import Field
 
 from datahub.configuration.common import ConfigModel
+from datahub.utilities.cachetools_keys import self_methodkey
 
 logger = logging.getLogger(__name__)
@@ -62,6 +64,13 @@ class OperationConfig(ConfigModel):
         return profile_date_of_month
 
 
+# TRICKY: The operation_config is time-dependent. Because we don't want to change
+# whether or not we're running profiling mid-ingestion, we cache the result of this method.
+# An additional benefit is that we only print the log lines on the first call.
+@cachetools.cached(
+    cache=cachetools.LRUCache(maxsize=1),
+    key=self_methodkey,
+)
 def is_profiling_enabled(operation_config: OperationConfig) -> bool:
     if operation_config.lower_freq_profile_enabled is False:
         return True
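
The net effect of moving the decorator: every caller of is_profiling_enabled(operation_config) now shares one cached answer per config object, so the schedule check runs (and its log lines print) only on the first call. A self-contained sketch of the same caching pattern, using a hypothetical stand-in function rather than the real one:

import cachetools
import cachetools.keys


@cachetools.cached(
    cache=cachetools.LRUCache(maxsize=1),
    # Key on the identity of the first argument, mirroring self_methodkey.
    key=lambda config: cachetools.keys.hashkey(id(config)),
)
def check_schedule(config: object) -> bool:
    print("evaluating schedule")  # stands in for the real log lines
    return True


cfg = object()
check_schedule(cfg)  # prints "evaluating schedule"
check_schedule(cfg)  # served from the cache; nothing printed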