feat(ingest/mongodb): support stateful ingestion (#9118)

This commit is contained in:
Tony Ouyang 2023-11-01 00:13:17 -07:00 committed by GitHub
parent 55f14530a3
commit 876de214c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 4269 additions and 4123 deletions

View File

@ -15,7 +15,12 @@ from datahub.configuration.source_common import (
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
@ -25,14 +30,21 @@ from datahub.ingestion.api.decorators import (
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.schema_inference.object import (
SchemaDescription,
construct_schema,
)
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
StatefulIngestionConfigBase,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayTypeClass,
BooleanTypeClass,
@ -48,7 +60,10 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import (
TimeTypeClass,
UnionTypeClass,
)
from datahub.metadata.schema_classes import DatasetPropertiesClass
from datahub.metadata.schema_classes import (
DataPlatformInstanceClass,
DatasetPropertiesClass,
)
logger = logging.getLogger(__name__)
@ -59,7 +74,9 @@ logger = logging.getLogger(__name__)
DENY_DATABASE_LIST = set(["admin", "config", "local"])
class MongoDBConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
class MongoDBConfig(
PlatformInstanceConfigMixin, EnvConfigMixin, StatefulIngestionConfigBase
):
# See the MongoDB authentication docs for details and examples.
# https://pymongo.readthedocs.io/en/stable/examples/authentication.html
connect_uri: str = Field(
@ -99,6 +116,8 @@ class MongoDBConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
default=AllowDenyPattern.allow_all(),
description="regex patterns for collections to filter in ingestion.",
)
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
@validator("maxDocumentSize")
def check_max_doc_size_filter_is_valid(cls, doc_size_filter_value):
@ -108,7 +127,7 @@ class MongoDBConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
@dataclass
class MongoDBSourceReport(SourceReport):
class MongoDBSourceReport(StaleEntityRemovalSourceReport):
filtered: List[str] = field(default_factory=list)
def report_dropped(self, name: str) -> None:
@ -129,6 +148,7 @@ PYMONGO_TYPE_TO_MONGO_TYPE = {
bson.timestamp.Timestamp: "timestamp",
bson.dbref.DBRef: "dbref",
bson.objectid.ObjectId: "oid",
bson.Decimal128: "numberDecimal",
"mixed": "mixed",
}
@ -145,6 +165,7 @@ _field_type_mapping: Dict[Union[Type, str], Type] = {
bson.timestamp.Timestamp: TimeTypeClass,
bson.dbref.DBRef: BytesTypeClass,
bson.objectid.ObjectId: BytesTypeClass,
bson.Decimal128: NumberTypeClass,
dict: RecordTypeClass,
"mixed": UnionTypeClass,
}
@ -206,7 +227,7 @@ def construct_schema_pymongo(
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.SCHEMA_METADATA, "Enabled by default")
@dataclass
class MongoDBSource(Source):
class MongoDBSource(StatefulIngestionSourceBase):
"""
This plugin extracts the following:
@ -227,7 +248,7 @@ class MongoDBSource(Source):
mongo_client: MongoClient
def __init__(self, ctx: PipelineContext, config: MongoDBConfig):
super().__init__(ctx)
super().__init__(config, ctx)
self.config = config
self.report = MongoDBSourceReport()
@ -254,6 +275,14 @@ class MongoDBSource(Source):
config = MongoDBConfig.parse_obj(config_dict)
return cls(ctx, config)
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
]
def get_pymongo_type_string(
self, field_type: Union[Type, str], collection_name: str
) -> str:
@ -332,16 +361,18 @@ class MongoDBSource(Source):
platform_instance=self.config.platform_instance,
)
dataset_snapshot = DatasetSnapshot(
urn=dataset_urn,
aspects=[],
)
if self.config.platform_instance:
data_platform_instance = DataPlatformInstanceClass(
platform=make_data_platform_urn(platform),
instance=make_dataplatform_instance_urn(
platform, self.config.platform_instance
),
)
dataset_properties = DatasetPropertiesClass(
tags=[],
customProperties={},
)
dataset_snapshot.aspects.append(dataset_properties)
if self.config.enableSchemaInference:
assert self.config.maxDocumentSize is not None
@ -412,13 +443,20 @@ class MongoDBSource(Source):
fields=canonical_schema,
)
dataset_snapshot.aspects.append(schema_metadata)
# TODO: use list_indexes() or index_information() to get index information
# See https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.list_indexes.
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
yield MetadataWorkUnit(id=dataset_name, mce=mce)
yield from [
mcp.as_workunit()
for mcp in MetadataChangeProposalWrapper.construct_many(
entityUrn=dataset_urn,
aspects=[
schema_metadata,
dataset_properties,
data_platform_instance,
],
)
]
def is_server_version_gte_4_4(self) -> bool:
try: