diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index 280443a482..d4158f3581 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -574,6 +574,7 @@ Extracts: - List of collections in each database and infers schemas for each collection By default, schema inference samples 1,000 documents from each collection. Setting `schemaSamplingSize: null` will scan the entire collection. +Moreover, setting `useRandomSampling: False` will sample the first documents found without random selection, which may be faster for large collections. Note that `schemaSamplingSize` has no effect if `enableSchemaInference: False` is set. @@ -593,6 +594,7 @@ source: collection_pattern: {} enableSchemaInference: True schemaSamplingSize: 1000 + useRandomSampling: True # whether to randomly sample docs for schema or just use the first ones, True by default # database_pattern/collection_pattern are similar to schema_pattern/table_pattern from above ``` diff --git a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py index 3585d0d603..b64e744a13 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py @@ -54,6 +54,7 @@ class MongoDBConfig(ConfigModel): options: dict = {} enableSchemaInference: bool = True schemaSamplingSize: Optional[PositiveInt] = 1000 + useRandomSampling: bool = True env: str = DEFAULT_ENV database_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() @@ -282,6 +283,7 @@ def construct_schema( def construct_schema_pymongo( collection: pymongo.collection.Collection, delimiter: str, + use_random_sampling: bool, sample_size: Optional[int] = None, ) -> Dict[Tuple[str, ...], SchemaDescription]: """ @@ -302,10 +304,15 @@ def construct_schema_pymongo( """ if sample_size: - # get sample documents in collection - documents = collection.aggregate( - [{"$sample": {"size": sample_size}}], allowDiskUse=True - ) + if use_random_sampling: + # get sample documents in collection + documents = collection.aggregate( + [{"$sample": {"size": sample_size}}], allowDiskUse=True + ) + else: + documents = collection.aggregate( + [{"$limit": sample_size}], allowDiskUse=True + ) else: # if sample_size is not provided, just take all items in the collection documents = collection.find({}) @@ -434,6 +441,7 @@ class MongoDBSource(Source): collection_schema = construct_schema_pymongo( database[collection_name], delimiter=".", + use_random_sampling=self.config.useRandomSampling, sample_size=self.config.schemaSamplingSize, )