Mirror of https://github.com/datahub-project/datahub.git (synced 2026-01-07 15:27:05 +00:00)
fix(ingest): Ensure mongodb source doesn't croak on large documents. (#3456)
This commit is contained in:
  parent 561c04bcf8
  commit 8d8cdc44e8
```diff
@@ -8,7 +8,7 @@ from typing import Dict, Iterable, List, Optional, Tuple, Type, Union, ValuesView
 import bson
 import pymongo
 from mypy_extensions import TypedDict
-from pydantic import PositiveInt
+from pydantic import PositiveInt, validator
 from pymongo.mongo_client import MongoClient

 from datahub.configuration.common import AllowDenyPattern, ConfigModel
```
```diff
@@ -56,11 +56,20 @@ class MongoDBConfig(ConfigModel):
     schemaSamplingSize: Optional[PositiveInt] = 1000
     useRandomSampling: bool = True
     maxSchemaSize: Optional[PositiveInt] = 300
+    # mongodb only supports 16MB as max size for documents. However, if we try to retrieve a larger document it
+    # errors out with "16793600" as the maximum size supported.
+    maxDocumentSize: Optional[PositiveInt] = 16793600
     env: str = DEFAULT_ENV

     database_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
     collection_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()

+    @validator("maxDocumentSize")
+    def check_max_doc_size_filter_is_valid(cls, doc_size_filter_value):
+        if doc_size_filter_value > 16793600:
+            raise ValueError("maxDocumentSize must be a positive value <= 16793600.")
+        return doc_size_filter_value
+

 @dataclass
 class MongoDBSourceReport(SourceReport):
```
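For context, a minimal self-contained sketch of how a pydantic validator like the one added above behaves when the config is built. `ConfigSketch` is a hypothetical stand-in for `MongoDBConfig`, and this assumes pydantic v1 semantics (matching the `validator` import in the diff):

```python
from pydantic import BaseModel, PositiveInt, ValidationError, validator


class ConfigSketch(BaseModel):
    # hypothetical stand-in for MongoDBConfig, reduced to the new field
    maxDocumentSize: PositiveInt = 16793600

    @validator("maxDocumentSize")
    def check_max_doc_size_filter_is_valid(cls, doc_size_filter_value):
        # reject anything above the server-imposed ceiling
        if doc_size_filter_value > 16793600:
            raise ValueError("maxDocumentSize must be a positive value <= 16793600.")
        return doc_size_filter_value


print(ConfigSketch(maxDocumentSize=1_000_000).maxDocumentSize)  # accepted: 1000000
try:
    ConfigSketch(maxDocumentSize=32_000_000)  # above the cap
except ValidationError as err:
    print(err)  # raised at construction time, before any ingestion runs
```

Because the check runs at construction time, a bad recipe fails fast instead of surfacing mid-ingestion as an opaque server error.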
```diff
@@ -285,6 +294,7 @@ def construct_schema_pymongo(
     collection: pymongo.collection.Collection,
     delimiter: str,
     use_random_sampling: bool,
+    max_document_size: int,
     sample_size: Optional[int] = None,
 ) -> Dict[Tuple[str, ...], SchemaDescription]:
     """
```
```diff
@@ -302,21 +312,27 @@ def construct_schema_pymongo(
     sample_size:
         number of items in the collection to sample
         (reads entire collection if not provided)
+    max_document_size:
+        maximum size of the document that will be considered for generating the schema.
     """

     if sample_size:
-        if use_random_sampling:
-            # get sample documents in collection
-            documents = collection.aggregate(
-                [{"$sample": {"size": sample_size}}], allowDiskUse=True
-            )
-        else:
-            documents = collection.aggregate(
-                [{"$limit": sample_size}], allowDiskUse=True
-            )
+        doc_size_field = "temporary_doc_size_field"
+        # create a temporary field to store the size of the document. filter on it and then remove it.
+        aggregations = [
+            {"$addFields": {doc_size_field: {"$bsonSize": "$$ROOT"}}},
+            {"$match": {doc_size_field: {"$lt": max_document_size}}},
+            {"$project": {doc_size_field: 0}},
+        ]
+        if use_random_sampling:
+            # get sample documents in collection
+            aggregations.append({"$sample": {"size": sample_size}})
+            documents = collection.aggregate(
+                aggregations,
+                allowDiskUse=True,
+            )
+        else:
+            aggregations.append({"$limit": sample_size})
+            documents = collection.aggregate(aggregations, allowDiskUse=True)
     else:
         # if sample_size is not provided, just take all items in the collection
         documents = collection.find({})

     return construct_schema(list(documents), delimiter)
```
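To make the new filtering step concrete, here is a standalone sketch of the same pipeline run directly through pymongo. The connection string and database/collection names are hypothetical, and the `$bsonSize` operator requires MongoDB 4.4+:

```python
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # hypothetical connection string
collection = client["testdb"]["testcollection"]    # hypothetical database/collection

max_document_size = 16793600
doc_size_field = "temporary_doc_size_field"
pipeline = [
    # annotate every document with its BSON size, computed server-side
    {"$addFields": {doc_size_field: {"$bsonSize": "$$ROOT"}}},
    # drop documents at or above the cap before they are sent to the client
    {"$match": {doc_size_field: {"$lt": max_document_size}}},
    # strip the helper field so it doesn't leak into the inferred schema
    {"$project": {doc_size_field: 0}},
    # then sample as before
    {"$sample": {"size": 1000}},
]
for doc in collection.aggregate(pipeline, allowDiskUse=True):
    print(doc["_id"])
```

Filtering server-side is the point of the fix: oversized documents never cross the wire, so the driver never hits the 16793600-byte retrieval error.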
```diff
@@ -440,11 +456,12 @@ class MongoDBSource(Source):
             dataset_snapshot.aspects.append(dataset_properties)

             if self.config.enableSchemaInference:
+                assert self.config.maxDocumentSize is not None
                 collection_schema = construct_schema_pymongo(
                     database[collection_name],
                     delimiter=".",
                     use_random_sampling=self.config.useRandomSampling,
+                    max_document_size=self.config.maxDocumentSize,
                     sample_size=self.config.schemaSamplingSize,
                 )
```
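Finally, a hypothetical usage sketch of the new knob from a user's perspective. The field names come from the diff above; the import path and the `connect_uri` key are assumptions about datahub's module layout at the time of this commit:

```python
# Assumed import path; verify against your datahub version.
from datahub.ingestion.source.mongodb import MongoDBConfig

config = MongoDBConfig.parse_obj(  # pydantic v1 constructor-from-dict
    {
        "connect_uri": "mongodb://localhost:27017",  # hypothetical URI
        "enableSchemaInference": True,
        "useRandomSampling": True,
        "schemaSamplingSize": 1000,
        "maxDocumentSize": 1_000_000,  # tighten the cap: skip docs over ~1MB
    }
)
print(config.maxDocumentSize)
```

Setting `maxDocumentSize` above 16793600 would be rejected at construction time by the validator shown earlier.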