From 09bbcea0a8b378926438e543f4c06a20d0478bc4 Mon Sep 17 00:00:00 2001 From: Kevin Hu <6051736+kevinhu@users.noreply.github.com> Date: Sun, 27 Jun 2021 23:40:17 -0700 Subject: [PATCH] feat(ingest): add non-random sampling for mongo (#2778) --- metadata-ingestion/README.md | 2 ++ .../src/datahub/ingestion/source/mongodb.py | 16 ++++++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index 280443a482..d4158f3581 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -574,6 +574,7 @@ Extracts: - List of collections in each database and infers schemas for each collection By default, schema inference samples 1,000 documents from each collection. Setting `schemaSamplingSize: null` will scan the entire collection. +Setting `useRandomSampling: False` makes schema inference read the first `schemaSamplingSize` documents instead of a random sample, which may be faster for large collections; it has no effect when `schemaSamplingSize: null`, since the entire collection is scanned anyway. Note that `schemaSamplingSize` has no effect if `enableSchemaInference: False` is set. 
@@ -593,6 +594,7 @@ source: collection_pattern: {} enableSchemaInference: True schemaSamplingSize: 1000 + useRandomSampling: True # whether to randomly sample docs for schema or just use the first ones, True by default # database_pattern/collection_pattern are similar to schema_pattern/table_pattern from above ``` diff --git a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py index 3585d0d603..b64e744a13 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py @@ -54,6 +54,7 @@ class MongoDBConfig(ConfigModel): options: dict = {} enableSchemaInference: bool = True schemaSamplingSize: Optional[PositiveInt] = 1000 + useRandomSampling: bool = True env: str = DEFAULT_ENV database_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() @@ -282,6 +283,7 @@ def construct_schema( def construct_schema_pymongo( collection: pymongo.collection.Collection, delimiter: str, + use_random_sampling: bool, sample_size: Optional[int] = None, ) -> Dict[Tuple[str, ...], SchemaDescription]: """ @@ -302,10 +304,15 @@ def construct_schema_pymongo( """ if sample_size: - # get sample documents in collection - documents = collection.aggregate( - [{"$sample": {"size": sample_size}}], allowDiskUse=True - ) + if use_random_sampling: + # get sample documents in collection + documents = collection.aggregate( + [{"$sample": {"size": sample_size}}], allowDiskUse=True + ) + else: + documents = collection.aggregate( + [{"$limit": sample_size}], allowDiskUse=True + ) else: # if sample_size is not provided, just take all items in the collection documents = collection.find({}) @@ -434,6 +441,7 @@ class MongoDBSource(Source): collection_schema = construct_schema_pymongo( database[collection_name], delimiter=".", + use_random_sampling=self.config.useRandomSampling, sample_size=self.config.schemaSamplingSize, )