fix(ingest): avoid requiring sqlalchemy for dynamodb classification (#10213)

This commit is contained in:
Harshal Sheth 2024-04-08 15:13:25 -07:00 committed by GitHub
parent 6c66e955ba
commit 29bf0e96c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 7 additions and 8 deletions

View File

@@ -15,7 +15,6 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.glossary.classifier import ClassificationConfig, Classifier
 from datahub.ingestion.glossary.classifier_registry import classifier_registry
 from datahub.ingestion.source.common.data_reader import DataReader
-from datahub.ingestion.source.sql.sqlalchemy_data_reader import SAMPLE_SIZE_MULTIPLIER
 from datahub.metadata.com.linkedin.pegasus2avro.common import (
     AuditStamp,
     GlossaryTermAssociation,
@@ -26,6 +25,9 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata
 from datahub.utilities.lossy_collections import LossyDict, LossyList
 from datahub.utilities.perf_timer import PerfTimer

+SAMPLE_SIZE_MULTIPLIER = 1.2
+
 logger: logging.Logger = logging.getLogger(__name__)
@@ -289,7 +291,7 @@ def classification_workunit_processor(
     classification_handler: ClassificationHandler,
     data_reader: Optional[DataReader],
     table_id: List[str],
-    data_reader_kwargs: dict = {},
+    data_reader_kwargs: Optional[dict] = None,
 ) -> Iterable[MetadataWorkUnit]:
     """
     Classification handling for a particular table.
@@ -317,7 +319,7 @@ def classification_workunit_processor(
                     table_id,
                     classification_handler.config.classification.sample_size
                     * SAMPLE_SIZE_MULTIPLIER,
-                    **data_reader_kwargs,
+                    **(data_reader_kwargs or {}),
                 )
                 if data_reader
                 else dict()

View File

@@ -38,6 +38,7 @@ from datahub.ingestion.api.source import (
 )
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.glossary.classification_mixin import (
+    SAMPLE_SIZE_MULTIPLIER,
     ClassificationHandler,
     classification_workunit_processor,
 )
@@ -77,7 +78,6 @@ from datahub.ingestion.source.sql.sql_utils import (
     gen_schema_container,
     get_domain_wu,
 )
-from datahub.ingestion.source.sql.sqlalchemy_data_reader import SAMPLE_SIZE_MULTIPLIER
 from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
 from datahub.ingestion.source.state.redundant_run_skip_handler import (
     RedundantLineageRunSkipHandler,

View File

@@ -48,6 +48,7 @@ from datahub.ingestion.api.source import (
 )
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.glossary.classification_mixin import (
+    SAMPLE_SIZE_MULTIPLIER,
     ClassificationHandler,
     ClassificationReportMixin,
 )
@@ -68,7 +69,6 @@ from datahub.ingestion.source.sql.sql_utils import (
     schema_requires_v2,
 )
 from datahub.ingestion.source.sql.sqlalchemy_data_reader import (
-    SAMPLE_SIZE_MULTIPLIER,
     SqlAlchemyTableDataReader,
 )
 from datahub.ingestion.source.state.stale_entity_removal_handler import (

View File

@@ -72,6 +72,3 @@ class SqlAlchemyTableDataReader(DataReader):
     def close(self) -> None:
         self.connection.close()
-
-
-SAMPLE_SIZE_MULTIPLIER = 1.2