fix(ingest): remove original_table_name logic in sql source (#8130)

Harshal Sheth authored on 2023-05-31 15:58:09 -07:00; committed by GitHub
parent 4ade1311f1
commit 60dd9ef187
3 changed files with 11 additions and 89 deletions
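
The two base-class hooks deleted below, standardize_schema_table_names and normalise_dataset_name, are identity functions in SQLAlchemySource (their removed bodies just return their inputs), and the original_table_name bookkeeping only mattered when those hooks actually rewrote a name. A minimal standalone sketch of the deleted behaviour, with the methods reduced to free functions (hypothetical names and layout, not the actual source):

from typing import Dict, Tuple


def standardize_schema_table_names(schema: str, entity: str) -> Tuple[str, str]:
    # Removed base-class hook: a pass-through. Per the deleted comment, only
    # dialects like BigQuery ever needed to override it.
    return schema, entity


def normalise_dataset_name(dataset_name: str) -> str:
    # Removed base-class hook: also a pass-through.
    return dataset_name


def original_table_name_props(dataset_name: str, table: str) -> Dict[str, str]:
    # Removed bookkeeping: treat the last dotted component of the dataset name
    # as the "normalised" table and record the original only when they differ
    # (the real code also required a non-empty properties dict).
    normalised_table = dataset_name.split(".")[-1]
    return {"original_table_name": table} if normalised_table != table else {}


# With both hooks returning their inputs unchanged, the dataset name ends in
# the untouched table name, so the property is presumably never emitted and
# DatasetPropertiesClass can take name=table directly.
print(original_table_name_props("public.orders", "orders"))  # -> {}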

View File

@@ -3,7 +3,12 @@ from dataclasses import dataclass, field
 from typing import Dict, Iterable, List, Optional, Tuple, Type, Union, ValuesView
 
 import bson
+import bson.dbref
+import bson.int64
+import bson.objectid
+import bson.timestamp
 import pymongo
+import pymongo.collection
 from packaging import version
 from pydantic import PositiveInt, validator
 from pydantic.fields import Field
@@ -199,7 +204,7 @@ def construct_schema_pymongo(
 @platform_name("MongoDB")
 @config_class(MongoDBConfig)
 @support_status(SupportStatus.CERTIFIED)
-@capability(SourceCapability.LINEAGE_COARSE, "Enabled by default")
+@capability(SourceCapability.SCHEMA_METADATA, "Enabled by default")
 @dataclass
 class MongoDBSource(Source):
     """

View File

@@ -517,13 +517,6 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
             profile_requests, profiler, platform=self.platform
         )
 
-    def standardize_schema_table_names(
-        self, schema: str, entity: str
-    ) -> Tuple[str, str]:
-        # Some SQLAlchemy dialects need a standardization step to clean the schema
-        # and table names. See BigQuery for an example of when this is useful.
-        return schema, entity
-
     def get_identifier(
         self, *, schema: str, entity: str, inspector: Inspector, **kwargs: Any
     ) -> str:
@@ -572,9 +565,6 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
             fk_dict["name"], foreign_fields, source_fields, foreign_dataset
         )
 
-    def normalise_dataset_name(self, dataset_name: str) -> str:
-        return dataset_name
-
     def loop_tables(  # noqa: C901
         self,
         inspector: Inspector,
@@ -584,15 +574,10 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
         tables_seen: Set[str] = set()
         try:
             for table in inspector.get_table_names(schema):
-                schema, table = self.standardize_schema_table_names(
-                    schema=schema, entity=table
-                )
-
                 dataset_name = self.get_identifier(
                     schema=schema, entity=table, inspector=inspector
                 )
-                dataset_name = self.normalise_dataset_name(dataset_name)
 
                 if dataset_name not in tables_seen:
                     tables_seen.add(dataset_name)
                 else:
@@ -650,18 +635,8 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
                     inspector, schema, table
                 )
 
-                # Tablename might be different from the real table if we ran some normalisation ont it.
-                # Getting normalized table name from the dataset_name
-                # Table is the last item in the dataset name
-                normalised_table = table
-                splits = dataset_name.split(".")
-                if splits:
-                    normalised_table = splits[-1]
-                if properties and normalised_table != table:
-                    properties["original_table_name"] = table
-
                 dataset_properties = DatasetPropertiesClass(
-                    name=normalised_table,
+                    name=table,
                     description=description,
                     customProperties=properties,
                 )
@@ -866,14 +841,9 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
     ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
         try:
             for view in inspector.get_view_names(schema):
-                schema, view = self.standardize_schema_table_names(
-                    schema=schema, entity=view
-                )
                 dataset_name = self.get_identifier(
                     schema=schema, entity=view, inspector=inspector
                 )
-                dataset_name = self.normalise_dataset_name(dataset_name)
-
                 self.report.report_entity_scanned(dataset_name, ent_type="view")
 
                 if not sql_config.view_pattern.allowed(dataset_name):
@@ -1068,9 +1038,6 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
                 logger.debug("Source does not support generating profile candidates.")
 
         for table in inspector.get_table_names(schema):
-            schema, table = self.standardize_schema_table_names(
-                schema=schema, entity=table
-            )
            dataset_name = self.get_identifier(
                schema=schema, entity=table, inspector=inspector
            )
@@ -1081,8 +1048,6 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
                    self.report.report_dropped(f"profile of {dataset_name}")
                continue
 
-            dataset_name = self.normalise_dataset_name(dataset_name)
-
            if dataset_name not in tables_seen:
                tables_seen.add(dataset_name)
            else:
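
For dialects that still need to adjust naming, the extension point that survives this change is get_identifier, kept as context in the hunks above. A hedged sketch of overriding it in a hypothetical subclass; the import paths are assumptions, and this is illustration rather than part of the commit:

from typing import Any

from sqlalchemy.engine.reflection import Inspector

# Assumed import path for the class shown in this diff.
from datahub.ingestion.source.sql.sql_common import SQLAlchemySource


class LowercasingSQLAlchemySource(SQLAlchemySource):
    """Hypothetical dialect that lower-cases rendered dataset names."""

    def get_identifier(
        self, *, schema: str, entity: str, inspector: Inspector, **kwargs: Any
    ) -> str:
        # Normalise the final rendered name in one place, instead of mutating
        # schema/table before the call as the removed hooks used to allow.
        dataset_name = super().get_identifier(
            schema=schema, entity=entity, inspector=inspector, **kwargs
        )
        return dataset_name.lower()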

View File

@@ -265,13 +265,9 @@ class VerticaSource(SQLAlchemySource):
     ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
         try:
             for view in inspector.get_view_names(schema):
-                schema, view = self.standardize_schema_table_names(
-                    schema=schema, entity=view
-                )
                 dataset_name = self.get_identifier(
                     schema=schema, entity=view, inspector=inspector
                 )
-                dataset_name = self.normalise_dataset_name(dataset_name)
 
                 self.report.report_entity_scanned(dataset_name, ent_type="view")
@@ -396,13 +392,9 @@ class VerticaSource(SQLAlchemySource):
         try:
             # table_tags = self.get_extra_tags(inspector, schema, "projection")
             for projection in inspector.get_projection_names(schema):  # type: ignore
-                schema, projection = self.standardize_schema_table_names(
-                    schema=schema, entity=projection
-                )
                 dataset_name = self.get_identifier(
                     schema=schema, entity=projection, inspector=inspector
                 )
-                dataset_name = self.normalise_dataset_name(dataset_name)
                 if dataset_name not in projections_seen:
                     projections_seen.add(dataset_name)
                 else:
@@ -479,18 +471,8 @@ class VerticaSource(SQLAlchemySource):
                     inspector, schema, projection
                 )
 
-                # Tablename might be different from the real table if we ran some normalisation ont it.
-                # Getting normalized table name from the dataset_name
-                # Table is the last item in the dataset name
-                normalised_table = projection
-                splits = dataset_name.split(".")
-                if splits:
-                    normalised_table = splits[-1]
-                if properties and normalised_table != projection:
-                    properties["original_table_name"] = projection
-
                 dataset_properties = DatasetPropertiesClass(
-                    name=normalised_table,
+                    name=projection,
                     description=description,
                     customProperties=properties,
                 )
@@ -618,14 +600,10 @@ class VerticaSource(SQLAlchemySource):
         models_seen: Set[str] = set()
         try:
             for models in inspector.get_models_names(schema):  # type: ignore
-                schema, models = self.standardize_schema_table_names(
-                    schema=schema, entity=models
-                )
                 dataset_name = self.get_identifier(
                     schema="Entities", entity=models, inspector=inspector
                 )
-                dataset_name = self.normalise_dataset_name(dataset_name)
 
                 if dataset_name not in models_seen:
                     models_seen.add(dataset_name)
                 else:
@@ -687,22 +665,12 @@ class VerticaSource(SQLAlchemySource):
                 description, properties, location = self.get_model_properties(
                     inspector, schema, table
                 )
 
-                # Tablename might be different from the real table if we ran some normalisation ont it.
-                # Getting normalized table name from the dataset_name
-                # Table is the last item in the dataset name
-                normalised_table = table
-                splits = dataset_name.split(".")
-                if splits:
-                    normalised_table = splits[-1]
-                if properties and normalised_table != table:
-                    properties["original_table_name"] = table
-
                 dataset_properties = DatasetPropertiesClass(
-                    name=normalised_table,
+                    name=table,
                     description=description,
                     customProperties=properties,
                 )
 
                 dataset_snapshot.aspects.append(dataset_properties)
                 schema_fields = self.get_schema_fields(dataset_name, columns)
@@ -796,14 +764,10 @@ class VerticaSource(SQLAlchemySource):
         oauth_seen: Set[str] = set()
         try:
             for oauth in inspector.get_Oauth_names(schema):  # type: ignore
-                schema, oauth = self.standardize_schema_table_names(
-                    schema=schema, entity=oauth
-                )
                 dataset_name = self.get_identifier(
                     schema=schema, entity=oauth, inspector=inspector
                 )
-                dataset_name = self.normalise_dataset_name(dataset_name)
 
                 if dataset_name not in oauth_seen:
                     oauth_seen.add(dataset_name)
                 else:
@@ -861,17 +825,9 @@ class VerticaSource(SQLAlchemySource):
                 description, properties, location_urn = self.get_oauth_properties(
                     inspector, schema, oauth
                 )
 
-                # Tablename might be different from the real table if we ran some normalisation ont it.
-                # Getting normalized table name from the dataset_name
-                # Table is the last item in the dataset name
-                normalised_table = oauth
-                splits = dataset_name.split(".")
-                if splits:
-                    normalised_table = splits[-1]
-                if properties and normalised_table != oauth:
-                    properties["original_table_name"] = oauth
-
                 dataset_properties = DatasetPropertiesClass(
-                    name=normalised_table,
+                    name=oauth,
                     description=description,
                     customProperties=properties,
                 )
@@ -966,9 +922,6 @@ class VerticaSource(SQLAlchemySource):
             profile_candidates = None  # Default value if profile candidates not available.
         yield from super().loop_profiler_requests(inspector, schema, sql_config)
         for projection in inspector.get_projection_names(schema):  # type: ignore
-            schema, projection = self.standardize_schema_table_names(
-                schema=schema, entity=projection
-            )
            dataset_name = self.get_identifier(
                schema=schema, entity=projection, inspector=inspector
            )
@@ -979,7 +932,6 @@ class VerticaSource(SQLAlchemySource):
                if self.config.profiling.report_dropped_profiles:
                    self.report.report_dropped(f"profile of {dataset_name}")
                continue
-            dataset_name = self.normalise_dataset_name(dataset_name)
            if dataset_name not in tables_seen:
                tables_seen.add(dataset_name)
            else: