refactor(ingest): simplify stateful ingestion config (#6454)

Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
This commit is contained in:
Harshal Sheth 2022-11-18 00:09:24 -05:00 committed by GitHub
parent b7c03731c4
commit 817406eadb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 35 additions and 195 deletions

View File

@ -65,25 +65,8 @@ Examples:
3. [BaseSQLAlchemyCheckpointState](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/sql_common_state.py#L17)
### 2. Modifying the SourceConfig
The `stateful_ingestion` config param that is mandatory for any source using stateful ingestion needs to be overridden with a custom config that is more specific to the source
and is inherited from `datahub.ingestion.source.state.stale_entity_removal_handler.StatefulStaleMetadataRemovalConfig`. The `StatefulStaleMetadataRemovalConfig` adds the following
additional parameters to the basic stateful ingestion config that is common for all sources. Typical customization involves overriding the `_entity_types` private config member, which helps produce
more accurate documentation specific to the source.
```python
import pydantic
from typing import List
from datahub.ingestion.source.state.stateful_ingestion_base import StatefulIngestionConfig
class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
""" Base specialized config for Stateful Ingestion with stale metadata removal capability. """
# Allows for sources to define(via override) the entity types they support.
_entity_types: List[str] = []
# Whether to enable removal of stale metadata.
remove_stale_metadata: bool = pydantic.Field(
default=True,
description=f"Soft-deletes the entities of type {', '.join(_entity_types)} in the last successful run but missing in the current run with stateful_ingestion enabled.",
)
```
The source's config must inherit from `StatefulIngestionConfigBase`, and should declare a field named `stateful_ingestion` of type `Optional[StatefulStaleMetadataRemovalConfig]`.
Examples:
1. The `KafkaSourceConfig`
@ -94,17 +77,11 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import Stateful
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.configuration.source_common import DatasetSourceConfigBase
class KafkaSourceStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
    """ Kafka custom stateful ingestion config definition (overrides _entity_types of StatefulStaleMetadataRemovalConfig). """
_entity_types: List[str] = pydantic.Field(default=["topic"])
class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
class KafkaSourceConfig(StatefulIngestionConfigBase):
# ...<other config params>...
""" Override the stateful_ingestion config param with the Kafka custom stateful ingestion config in the KafkaSourceConfig. """
stateful_ingestion: Optional[KafkaSourceStatefulIngestionConfig] = None
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
```
2. The [DBTStatefulIngestionConfig](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/dbt.py#L131)

View File

@ -108,16 +108,6 @@ DEFAULT_PLATFORM = "glue"
VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]
class GlueStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
"""
Specialization of StatefulStaleMetadataRemovalConfig to add custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the GlueSourceConfig.
"""
_entity_types: List[str] = Field(default=["table"])
class GlueSourceConfig(
AwsSourceConfig, GlueProfilingConfig, StatefulIngestionConfigBase
):
@ -164,7 +154,7 @@ class GlueSourceConfig(
description="Configs to ingest data profiles from glue table",
)
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[GlueStatefulIngestionConfig] = Field(
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description=""
)
@ -1294,6 +1284,3 @@ class GlueSource(StatefulIngestionSourceBase):
def get_platform_instance_id(self) -> str:
return self.source_config.platform_instance or self.platform
def close(self):
self.prepare_for_commit()

View File

@ -107,6 +107,3 @@ class SagemakerSource(Source):
def get_report(self):
return self.report
def close(self):
pass

View File

@ -1187,6 +1187,3 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
def warn(self, log: logging.Logger, key: str, reason: str) -> None:
self.report.report_warning(key, reason)
log.warning(f"{key} => {reason}")
def close(self) -> None:
self.prepare_for_commit()

View File

@ -611,6 +611,3 @@ class CSVEnricherSource(Source):
def get_report(self):
return self.report
def close(self):
pass

View File

@ -131,16 +131,6 @@ logger = logging.getLogger(__name__)
DBT_PLATFORM = "dbt"
class DBTStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
"""
Specialization of basic StatefulStaleMetadataRemovalConfig to add custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the DBTConfig.
"""
_entity_types: List[str] = pydantic.Field(default=["assertion", "dataset"])
@dataclass
class DBTSourceReport(StaleEntityRemovalSourceReport):
pass
@ -312,7 +302,7 @@ class DBTConfig(StatefulIngestionConfigBase, LineageConfig):
default=False,
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
)
stateful_ingestion: Optional[DBTStatefulIngestionConfig] = pydantic.Field(
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
default=None, description="DBT Stateful Ingestion Config."
)
@ -1891,6 +1881,3 @@ class DBTSource(StatefulIngestionSourceBase):
raise ValueError("DBT project identifier is not found in manifest")
return f"{self.platform}_{project_id}"
def close(self):
self.prepare_for_commit()

View File

@ -331,6 +331,3 @@ class DeltaLakeSource(Source):
def get_report(self) -> SourceReport:
return self.report
def close(self):
pass

View File

@ -482,3 +482,4 @@ class ElasticsearchSource(Source):
def close(self):
if self.client:
self.client.close()
super().close()

View File

@ -410,6 +410,3 @@ class FeastRepositorySource(Source):
def get_report(self) -> SourceReport:
return self.report
def close(self) -> None:
return

View File

@ -332,6 +332,3 @@ class FeastSource(Source):
def get_report(self) -> FeastSourceReport:
return self.report
def close(self):
return

View File

@ -223,6 +223,7 @@ class GenericFileSource(TestableSource):
def close(self):
if self.fp:
self.fp.close()
super().close()
def _iterate_file(self, path: str) -> Iterable[Tuple[int, Any]]:
self.report.current_file_name = path

View File

@ -298,9 +298,6 @@ class IcebergSource(Source):
def get_report(self) -> SourceReport:
return self.report
def close(self) -> None:
pass
def _parse_datatype(type: IcebergTypes.Type, nullable: bool = False) -> Dict[str, Any]:
# Check for complex types: struct, list, map

View File

@ -471,9 +471,6 @@ class AzureADSource(Source):
def get_report(self) -> SourceReport:
return self.report
def close(self) -> None:
pass
def _get_azure_ad_groups(self) -> Iterable[List]:
yield from self._get_azure_ad_data(kind="/groups")

View File

@ -418,9 +418,6 @@ class OktaSource(Source):
def get_report(self):
return self.report
def close(self):
pass
# Instantiates Okta SDK Client.
def _create_okta_client(self):
config = {

View File

@ -51,16 +51,6 @@ from datahub.utilities.registries.domain_registry import DomainRegistry
logger = logging.getLogger(__name__)
class KafkaSourceStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
"""
Specialization of StatefulStaleMetadataRemovalConfig to add custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the KafkaSourceConfig.
"""
_entity_types: List[str] = pydantic.Field(default=["topic"])
class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
env: str = DEFAULT_ENV
# TODO: inline the connection config
@ -74,8 +64,7 @@ class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
default={},
description="Provides the mapping for the `key` and the `value` schemas of a topic to the corresponding schema registry subject name. Each entry of this map has the form `<topic_name>-key`:`<schema_registry_subject_name_for_key_schema>` and `<topic_name>-value`:`<schema_registry_subject_name_for_value_schema>` for the key and the value schemas associated with the topic, respectively. This parameter is mandatory when the [RecordNameStrategy](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#how-the-naming-strategies-work) is used as the subject naming strategy in the kafka schema registry. NOTE: When provided, this overrides the default subject name resolution even when the `TopicNameStrategy` or the `TopicRecordNameStrategy` are used.",
)
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[KafkaSourceStatefulIngestionConfig] = None
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
schema_registry_class: str = pydantic.Field(
default="datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry",
description="The fully qualified implementation class(custom) that implements the KafkaSchemaRegistryBase interface.",
@ -280,6 +269,6 @@ class KafkaSource(StatefulIngestionSourceBase):
return self.report
def close(self) -> None:
self.prepare_for_commit()
if self.consumer:
self.consumer.close()
super().close()

View File

@ -411,8 +411,8 @@ class LDAPSource(Source):
return self.report
def close(self) -> None:
"""Closes the Source."""
self.ldap_client.unbind()
super().close()
def parse_from_attrs(attrs: Dict[str, Any], filter_key: str) -> List[str]:

View File

@ -1690,6 +1690,3 @@ class LookMLSource(Source):
def get_report(self):
return self.reporter
def close(self):
pass

View File

@ -169,6 +169,7 @@ class MetabaseSource(Source):
key="metabase-session",
reason=f"Unable to logout for user {self.config.username}",
)
super().close()
def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
try:

View File

@ -403,6 +403,3 @@ class BusinessGlossaryFileSource(Source):
def get_report(self):
return self.report
def close(self):
pass

View File

@ -204,6 +204,3 @@ class LineageFileSource(Source):
def get_report(self):
return self.report
def close(self):
pass

View File

@ -433,3 +433,4 @@ class MongoDBSource(Source):
def close(self):
self.mongo_client.close()
super().close()

View File

@ -326,9 +326,6 @@ class APISource(Source, ABC):
def get_report(self):
return self.report
def close(self):
pass
class OpenApiSource(APISource):
def __init__(self, config: OpenApiConfig, ctx: PipelineContext):

View File

@ -570,6 +570,3 @@ class PowerBiReportServerDashboardSource(Source):
def get_report(self) -> SourceReport:
return self.report
def close(self):
pass

View File

@ -552,5 +552,5 @@ class PulsarSource(StatefulIngestionSourceBase):
return self.report
def close(self):
self.prepare_for_commit()
super().close()
self.session.close()

View File

@ -796,6 +796,3 @@ class RedashSource(Source):
def get_report(self) -> SourceReport:
return self.report
def close(self):
pass

View File

@ -805,6 +805,3 @@ class S3Source(Source):
def get_report(self):
return self.report
def close(self):
pass

View File

@ -1014,6 +1014,3 @@ class SnowflakeV2Source(
# Stateful Ingestion Overrides.
def get_platform_instance_id(self) -> str:
return self.config.get_account()
def close(self):
self.prepare_for_commit()

View File

@ -194,3 +194,4 @@ class AthenaSource(SQLAlchemySource):
def close(self):
if self.cursor:
self.cursor.close()
super().close()

View File

@ -790,7 +790,7 @@ class PrestoOnHiveSource(SQLAlchemySource):
def close(self) -> None:
if self._alchemy_client.connection is not None:
self._alchemy_client.connection.close()
self.prepare_for_commit()
super().close()
def get_schema_fields_for_column(
self,

View File

@ -221,18 +221,6 @@ class SQLSourceReport(StaleEntityRemovalSourceReport):
self.query_combiner = query_combiner_report
class SQLAlchemyStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
"""
Specialization of StatefulStaleMetadataRemovalConfig to add custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the SQLAlchemyConfig.
"""
_entity_types: List[str] = pydantic.Field(
default=["assertion", "container", "table", "view"]
)
class SQLAlchemyConfig(StatefulIngestionConfigBase):
options: dict = {}
# Although the 'table_pattern' enables you to skip everything from certain schemas,
@ -269,7 +257,7 @@ class SQLAlchemyConfig(StatefulIngestionConfigBase):
profiling: GEProfilingConfig = GEProfilingConfig()
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[SQLAlchemyStatefulIngestionConfig] = None
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
@pydantic.root_validator(pre=True)
def view_pattern_is_table_pattern_unless_specified(
@ -1387,6 +1375,3 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
def get_report(self):
return self.report
def close(self):
self.prepare_for_commit()

View File

@ -29,10 +29,9 @@ class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
Base specialized config for Stateful Ingestion with stale metadata removal capability.
"""
_entity_types: List[str] = []
remove_stale_metadata: bool = pydantic.Field(
default=True,
description=f"Soft-deletes the entities of type {', '.join(_entity_types)} in the last successful run but missing in the current run with stateful_ingestion enabled.",
description="Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
)
fail_safe_threshold: float = pydantic.Field(
default=20.0,

View File

@ -57,12 +57,15 @@ class StatefulIngestionConfig(ConfigModel):
default=False,
description="The type of the ingestion state provider registered with datahub.",
)
# fmt: off
# 16MB
max_checkpoint_state_size: pydantic.PositiveInt = Field(default=2**24, description="The maximum size of the checkpoint state in bytes. Default is 16MB") # 16MB
# fmt: on
max_checkpoint_state_size: pydantic.PositiveInt = Field(
default=2**24, # 16 MB
description="The maximum size of the checkpoint state in bytes. Default is 16MB",
hidden_from_schema=True,
)
state_provider: Optional[DynamicTypedStateProviderConfig] = Field(
default=None, description="The ingestion state provider configuration."
default=None,
description="The ingestion state provider configuration.",
hidden_from_schema=True,
)
ignore_old_state: bool = Field(
default=False,
@ -323,3 +326,7 @@ class StatefulIngestionSourceBase(Source):
def prepare_for_commit(self) -> None:
"""NOTE: Sources should call this method from their close method."""
self._prepare_checkpoint_states_for_commit()
def close(self) -> None:
self.prepare_for_commit()
super().close()

View File

@ -115,16 +115,6 @@ logger: logging.Logger = logging.getLogger(__name__)
REPLACE_SLASH_CHAR = "|"
class TableauStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
"""
Specialization of StatefulStaleMetadataRemovalConfig to add custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the TableauConfig.
"""
_entity_types: List[str] = Field(default=["dataset", "chart", "dashboard"])
class TableauConnectionConfig(ConfigModel):
connect_uri: str = Field(description="Tableau host URL.")
username: Optional[str] = Field(
@ -246,7 +236,7 @@ class TableauConfig(
description="[experimental] Extract usage statistics for dashboards and charts.",
)
stateful_ingestion: Optional[TableauStatefulIngestionConfig] = Field(
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description=""
)
@ -322,8 +312,8 @@ class TableauSource(StatefulIngestionSourceBase):
def close(self) -> None:
if self.server is not None:
self.prepare_for_commit()
self.server.auth.sign_out()
super().close()
def _populate_usage_stat_registry(self):
if self.server is None:

View File

@ -1,4 +1,4 @@
from typing import Dict, List, Optional
from typing import Dict, Optional
import pydantic
from pydantic import Field
@ -12,16 +12,6 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
)
class UnityCatalogStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
"""
Specialization of StatefulStaleMetadataRemovalConfig to add custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the UnityCatalogSourceConfig.
"""
_entity_types: List[str] = Field(default=["dataset", "container"])
class UnityCatalogSourceConfig(StatefulIngestionConfigBase):
token: str = pydantic.Field(description="Databricks personal access token")
workspace_url: str = pydantic.Field(description="Databricks workspace url")
@ -64,6 +54,6 @@ class UnityCatalogSourceConfig(StatefulIngestionConfigBase):
description="Option to enable/disable lineage generation. Currently we have to call a rest call per column to get column level lineage due to the Databrick api which can slow down ingestion. ",
)
stateful_ingestion: Optional[UnityCatalogStatefulIngestionConfig] = pydantic.Field(
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
default=None, description="Unity Catalog Stateful Ingestion Config."
)

View File

@ -593,6 +593,3 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
description=column.comment,
)
]
def close(self) -> None:
self.prepare_for_commit()

View File

@ -257,6 +257,3 @@ class ClickHouseUsageSource(Source):
def get_report(self) -> SourceReport:
return self.report
def close(self) -> None:
pass

View File

@ -411,6 +411,3 @@ class RedshiftUsageSource(Source):
def get_report(self) -> RedshiftUsageSourceReport:
return self.report
def close(self) -> None:
pass

View File

@ -462,6 +462,3 @@ class SnowflakeUsageSource(StatefulIngestionSourceBase):
def get_report(self):
return self.report
def close(self):
self.prepare_for_commit()

View File

@ -287,6 +287,3 @@ class TrinoUsageSource(Source):
def get_report(self) -> SourceReport:
return self.report
def close(self) -> None:
pass

View File

@ -15,16 +15,6 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
from datahub.utilities import config_clean
class PulsarSourceStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
"""
Specialization of the StatefulStaleMetadataRemovalConfig to add custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the PulsarSourceConfig.
"""
_entity_types: List[str] = Field(default=["topic"])
def _is_valid_hostname(hostname: str) -> bool:
"""
Loosely ascii hostname validation. A hostname is considered valid when the total length does not exceed 253
@ -96,7 +86,7 @@ class PulsarSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
default_factory=dict, description="Domain patterns"
)
stateful_ingestion: Optional[PulsarSourceStatefulIngestionConfig] = Field(
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description="see Stateful Ingestion"
)