feat(ingest): add non-random sampling for mongo (#2778)

This commit is contained in:
Kevin Hu 2021-06-27 23:40:17 -07:00 committed by GitHub
parent c05459b446
commit 09bbcea0a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 4 deletions

View File

@ -574,6 +574,7 @@ Extracts:
- List of collections in each database and infers schemas for each collection
By default, schema inference samples 1,000 documents from each collection. Setting `schemaSamplingSize: null` will scan the entire collection.
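Moreover, setting `useRandomSampling: False` will take the first `schemaSamplingSize` documents returned for the collection instead of selecting them at random, which may be faster for large collections. `useRandomSampling` only matters when `schemaSamplingSize` is set; with `schemaSamplingSize: null` the entire collection is scanned regardless.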
Note that `schemaSamplingSize` has no effect if `enableSchemaInference: False` is set.
@ -593,6 +594,7 @@ source:
collection_pattern: {}
enableSchemaInference: True
schemaSamplingSize: 1000
useRandomSampling: True # whether to randomly sample docs for schema or just use the first ones, True by default
# database_pattern/collection_pattern are similar to schema_pattern/table_pattern from above
```

View File

@ -54,6 +54,7 @@ class MongoDBConfig(ConfigModel):
options: dict = {}
enableSchemaInference: bool = True
schemaSamplingSize: Optional[PositiveInt] = 1000
useRandomSampling: bool = True
env: str = DEFAULT_ENV
database_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
@ -282,6 +283,7 @@ def construct_schema(
def construct_schema_pymongo(
collection: pymongo.collection.Collection,
delimiter: str,
use_random_sampling: bool,
sample_size: Optional[int] = None,
) -> Dict[Tuple[str, ...], SchemaDescription]:
"""
@ -302,10 +304,15 @@ def construct_schema_pymongo(
"""
if sample_size:
# get sample documents in collection
documents = collection.aggregate(
[{"$sample": {"size": sample_size}}], allowDiskUse=True
)
if use_random_sampling:
# get sample documents in collection
documents = collection.aggregate(
[{"$sample": {"size": sample_size}}], allowDiskUse=True
)
else:
documents = collection.aggregate(
[{"$limit": sample_size}], allowDiskUse=True
)
else:
# if sample_size is not provided, just take all items in the collection
documents = collection.find({})
@ -434,6 +441,7 @@ class MongoDBSource(Source):
collection_schema = construct_schema_pymongo(
database[collection_name],
delimiter=".",
use_random_sampling=self.config.useRandomSampling,
sample_size=self.config.schemaSamplingSize,
)