fix(ingest/deltalake) Deltalake ingestor doesn't delete metadata if the table is deleted. (#14763)

This commit is contained in:
alplatonov 2025-10-13 15:51:17 +02:00 committed by GitHub
parent 82338628b6
commit 9fb82a73ad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -13,8 +13,9 @@ from datahub.configuration.source_common import (
) )
from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig
from datahub.ingestion.source.aws.s3_util import is_s3_uri from datahub.ingestion.source.aws.s3_util import is_s3_uri
from datahub.ingestion.source.state.stateful_ingestion_base import ( from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulIngestionConfigBase, StatefulIngestionConfigBase,
StatefulStaleMetadataRemovalConfig,
) )
# hide annoying debug errors from py4j # hide annoying debug errors from py4j
@ -39,9 +40,7 @@ class S3(ConfigModel):
class DeltaLakeSourceConfig( class DeltaLakeSourceConfig(
PlatformInstanceConfigMixin, PlatformInstanceConfigMixin, EnvConfigMixin, StatefulIngestionConfigBase
EnvConfigMixin,
StatefulIngestionConfigBase,
): ):
base_path: str = Field( base_path: str = Field(
description="Path to table (s3 or local file system). If path is not a delta table path " description="Path to table (s3 or local file system). If path is not a delta table path "
@ -80,6 +79,11 @@ class DeltaLakeSourceConfig(
s3: Optional[S3] = Field(None) s3: Optional[S3] = Field(None)
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None,
description="Stateful Ingestion Config with stale metadata removal",
)
@cached_property @cached_property
def is_s3(self): def is_s3(self):
return is_s3_uri(self.base_path or "") return is_s3_uri(self.base_path or "")