diff --git a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py
index fae4ef7400..53e9094ee8 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py
@@ -3,7 +3,12 @@ from dataclasses import dataclass, field
 from typing import Dict, Iterable, List, Optional, Tuple, Type, Union, ValuesView

 import bson
+import bson.dbref
+import bson.int64
+import bson.objectid
+import bson.timestamp
 import pymongo
+import pymongo.collection
 from packaging import version
 from pydantic import PositiveInt, validator
 from pydantic.fields import Field
@@ -199,7 +204,7 @@ def construct_schema_pymongo(
 @platform_name("MongoDB")
 @config_class(MongoDBConfig)
 @support_status(SupportStatus.CERTIFIED)
-@capability(SourceCapability.LINEAGE_COARSE, "Enabled by default")
+@capability(SourceCapability.SCHEMA_METADATA, "Enabled by default")
 @dataclass
 class MongoDBSource(Source):
     """
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
index 288b4bf1e7..fb659d9548 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -517,13 +517,6 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
                 profile_requests, profiler, platform=self.platform
             )

-    def standardize_schema_table_names(
-        self, schema: str, entity: str
-    ) -> Tuple[str, str]:
-        # Some SQLAlchemy dialects need a standardization step to clean the schema
-        # and table names. See BigQuery for an example of when this is useful.
-        return schema, entity
-
     def get_identifier(
         self, *, schema: str, entity: str, inspector: Inspector, **kwargs: Any
     ) -> str:
@@ -572,9 +565,6 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
             fk_dict["name"], foreign_fields, source_fields, foreign_dataset
         )

-    def normalise_dataset_name(self, dataset_name: str) -> str:
-        return dataset_name
-
     def loop_tables(  # noqa: C901
         self,
         inspector: Inspector,
@@ -584,15 +574,10 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
         tables_seen: Set[str] = set()
         try:
             for table in inspector.get_table_names(schema):
-                schema, table = self.standardize_schema_table_names(
-                    schema=schema, entity=table
-                )
                 dataset_name = self.get_identifier(
                     schema=schema, entity=table, inspector=inspector
                 )

-                dataset_name = self.normalise_dataset_name(dataset_name)
-
                 if dataset_name not in tables_seen:
                     tables_seen.add(dataset_name)
                 else:
@@ -650,18 +635,8 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
             inspector, schema, table
         )

-        # Tablename might be different from the real table if we ran some normalisation ont it.
-        # Getting normalized table name from the dataset_name
-        # Table is the last item in the dataset name
-        normalised_table = table
-        splits = dataset_name.split(".")
-        if splits:
-            normalised_table = splits[-1]
-        if properties and normalised_table != table:
-            properties["original_table_name"] = table
-
         dataset_properties = DatasetPropertiesClass(
-            name=normalised_table,
+            name=table,
             description=description,
             customProperties=properties,
         )
@@ -866,14 +841,9 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
     ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
         try:
             for view in inspector.get_view_names(schema):
-                schema, view = self.standardize_schema_table_names(
-                    schema=schema, entity=view
-                )
                 dataset_name = self.get_identifier(
                     schema=schema, entity=view, inspector=inspector
                 )
-                dataset_name = self.normalise_dataset_name(dataset_name)
-
                 self.report.report_entity_scanned(dataset_name, ent_type="view")

                 if not sql_config.view_pattern.allowed(dataset_name):
@@ -1068,9 +1038,6 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
                 logger.debug("Source does not support generating profile candidates.")

         for table in inspector.get_table_names(schema):
-            schema, table = self.standardize_schema_table_names(
-                schema=schema, entity=table
-            )
             dataset_name = self.get_identifier(
                 schema=schema, entity=table, inspector=inspector
             )
@@ -1081,8 +1048,6 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
                     self.report.report_dropped(f"profile of {dataset_name}")
                 continue

-            dataset_name = self.normalise_dataset_name(dataset_name)
-
             if dataset_name not in tables_seen:
                 tables_seen.add(dataset_name)
             else:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py b/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py
index 148b865ca4..ac08593d97 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py
@@ -265,13 +265,9 @@ class VerticaSource(SQLAlchemySource):
     ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
         try:
             for view in inspector.get_view_names(schema):
-                schema, view = self.standardize_schema_table_names(
-                    schema=schema, entity=view
-                )
                 dataset_name = self.get_identifier(
                     schema=schema, entity=view, inspector=inspector
                 )
-                dataset_name = self.normalise_dataset_name(dataset_name)

                 self.report.report_entity_scanned(dataset_name, ent_type="view")

@@ -396,13 +392,9 @@ class VerticaSource(SQLAlchemySource):
         try:
             # table_tags = self.get_extra_tags(inspector, schema, "projection")
             for projection in inspector.get_projection_names(schema):  # type: ignore
-                schema, projection = self.standardize_schema_table_names(
-                    schema=schema, entity=projection
-                )
                 dataset_name = self.get_identifier(
                     schema=schema, entity=projection, inspector=inspector
                 )
-                dataset_name = self.normalise_dataset_name(dataset_name)
                 if dataset_name not in projections_seen:
                     projections_seen.add(dataset_name)
                 else:
@@ -479,18 +471,8 @@ class VerticaSource(SQLAlchemySource):
             inspector, schema, projection
         )

-        # Tablename might be different from the real table if we ran some normalisation ont it.
-        # Getting normalized table name from the dataset_name
-        # Table is the last item in the dataset name
-        normalised_table = projection
-        splits = dataset_name.split(".")
-        if splits:
-            normalised_table = splits[-1]
-        if properties and normalised_table != projection:
-            properties["original_table_name"] = projection
-
         dataset_properties = DatasetPropertiesClass(
-            name=normalised_table,
+            name=projection,
             description=description,
             customProperties=properties,
         )
@@ -618,14 +600,10 @@ class VerticaSource(SQLAlchemySource):
         models_seen: Set[str] = set()
         try:
             for models in inspector.get_models_names(schema):  # type: ignore
-                schema, models = self.standardize_schema_table_names(
-                    schema=schema, entity=models
-                )
                 dataset_name = self.get_identifier(
                     schema="Entities", entity=models, inspector=inspector
                 )
-                dataset_name = self.normalise_dataset_name(dataset_name)

                 if dataset_name not in models_seen:
                     models_seen.add(dataset_name)
                 else:
@@ -687,22 +665,12 @@ class VerticaSource(SQLAlchemySource):
         description, properties, location = self.get_model_properties(
             inspector, schema, table
         )
-        # Tablename might be different from the real table if we ran some normalisation ont it.
-        # Getting normalized table name from the dataset_name
-        # Table is the last item in the dataset name
-        normalised_table = table
-        splits = dataset_name.split(".")
-        if splits:
-            normalised_table = splits[-1]
-        if properties and normalised_table != table:
-            properties["original_table_name"] = table

         dataset_properties = DatasetPropertiesClass(
-            name=normalised_table,
+            name=table,
             description=description,
             customProperties=properties,
         )
-
         dataset_snapshot.aspects.append(dataset_properties)

         schema_fields = self.get_schema_fields(dataset_name, columns)
@@ -796,14 +764,10 @@ class VerticaSource(SQLAlchemySource):
         oauth_seen: Set[str] = set()
         try:
             for oauth in inspector.get_Oauth_names(schema):  # type: ignore
-                schema, oauth = self.standardize_schema_table_names(
-                    schema=schema, entity=oauth
-                )
                 dataset_name = self.get_identifier(
                     schema=schema, entity=oauth, inspector=inspector
                 )
-                dataset_name = self.normalise_dataset_name(dataset_name)

                 if dataset_name not in oauth_seen:
                     oauth_seen.add(dataset_name)
                 else:
@@ -861,17 +825,9 @@ class VerticaSource(SQLAlchemySource):
         description, properties, location_urn = self.get_oauth_properties(
             inspector, schema, oauth
         )
-        # Tablename might be different from the real table if we ran some normalisation ont it.
-        # Getting normalized table name from the dataset_name
-        # Table is the last item in the dataset name
-        normalised_table = oauth
-        splits = dataset_name.split(".")
-        if splits:
-            normalised_table = splits[-1]
-        if properties and normalised_table != oauth:
-            properties["original_table_name"] = oauth
+
         dataset_properties = DatasetPropertiesClass(
-            name=normalised_table,
+            name=oauth,
             description=description,
             customProperties=properties,
         )
@@ -966,9 +922,6 @@ class VerticaSource(SQLAlchemySource):
             profile_candidates = None  # Default value if profile candidates not available.
         yield from super().loop_profiler_requests(inspector, schema, sql_config)
         for projection in inspector.get_projection_names(schema):  # type: ignore
-            schema, projection = self.standardize_schema_table_names(
-                schema=schema, entity=projection
-            )
             dataset_name = self.get_identifier(
                 schema=schema, entity=projection, inspector=inspector
             )
@@ -979,7 +932,6 @@ class VerticaSource(SQLAlchemySource):
                 if self.config.profiling.report_dropped_profiles:
                     self.report.report_dropped(f"profile of {dataset_name}")
                 continue
-            dataset_name = self.normalise_dataset_name(dataset_name)
             if dataset_name not in tables_seen:
                 tables_seen.add(dataset_name)
             else:
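
Note for dialect sources built on SQLAlchemySource: with standardize_schema_table_names() and normalise_dataset_name() removed, any dialect-specific name cleanup now has to live in the get_identifier() override (whose signature is shown in the hunk above). The sketch below is illustrative only: the subclass name and the quote-stripping rule are hypothetical, not part of this patch.

from typing import Any

from sqlalchemy.engine.reflection import Inspector

from datahub.ingestion.source.sql.sql_common import SQLAlchemySource


class MyDialectSource(SQLAlchemySource):  # hypothetical subclass for illustration
    def get_identifier(
        self, *, schema: str, entity: str, inspector: Inspector, **kwargs: Any
    ) -> str:
        # Cleanup that previously would have gone through the removed
        # standardize_schema_table_names()/normalise_dataset_name() hooks.
        entity = entity.strip('"')
        # Delegate the actual identifier formatting to the base class.
        return super().get_identifier(
            schema=schema, entity=entity, inspector=inspector, **kwargs
        )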