diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index f06ab8eebbd..ba98347151e 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -108,7 +108,11 @@ class DatabaseServiceTopology(ServiceTopology): # Note how we have `yield_view_lineage` and `yield_stored_procedure_lineage` # as post_processed. This is because we cannot ensure proper lineage processing # until we have finished ingesting all the metadata from the source. - post_process=["yield_view_lineage", "yield_procedure_lineage_and_queries"], + post_process=[ + "yield_view_lineage", + "yield_procedure_lineage_and_queries", + "yield_external_table_lineage", + ], ) database = TopologyNode( producer="get_database_names", @@ -170,11 +174,6 @@ class DatabaseServiceTopology(ServiceTopology): consumer=["database_service", "database", "database_schema"], use_cache=True, ), - NodeStage( - type_=AddLineageRequest, - processor="yield_external_table_lineage", - nullable=True, - ), NodeStage( type_=OMetaLifeCycleData, processor="yield_life_cycle_data", diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py b/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py index d800b90357d..fea5ea8f718 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py @@ -609,8 +609,3 @@ class DatabricksSource(ExternalTableLineageMixin, CommonDbSourceService, MultiDB f"Table description error for table [{schema_name}.{table_name}]: {exc}" ) return description - - def get_external_table_location(self): - return self.external_location_map.get( - (self.context.database, self.context.database_schema, self.context.table) - ) diff --git a/ingestion/src/metadata/ingestion/source/database/external_table_lineage_mixin.py b/ingestion/src/metadata/ingestion/source/database/external_table_lineage_mixin.py index f82219786b9..dc07c4357d6 100644 --- a/ingestion/src/metadata/ingestion/source/database/external_table_lineage_mixin.py +++ b/ingestion/src/metadata/ingestion/source/database/external_table_lineage_mixin.py @@ -14,7 +14,7 @@ External Table Lineage Mixin import traceback from abc import ABC, abstractmethod -from typing import Iterable, Tuple +from typing import Iterable from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.table import Table @@ -32,32 +32,23 @@ class ExternalTableLineageMixin(ABC): This mixin class is for deriving lineage between external table and container source/ """ - @abstractmethod - def get_external_table_location(self): - """ - Get external table location - """ - - def yield_external_table_lineage( - self, table_name_and_type: Tuple[str, str] - ) -> Iterable[AddLineageRequest]: + def yield_external_table_lineage(self) -> Iterable[AddLineageRequest]: """ Yield external table lineage """ - table_name, _ = table_name_and_type - location = self.get_external_table_location() - if location: + for table_qualified_tuple, location in self.external_location_map.items() or []: try: location_entity = self.metadata.es_search_container_by_path( full_path=location ) + database_name, schema_name, table_name = table_qualified_tuple table_fqn = fqn.build( self.metadata, entity_type=Table, service_name=self.context.database_service, - database_name=self.context.database, - schema_name=self.context.database_schema, + database_name=database_name, + schema_name=schema_name, table_name=table_name, skip_es_search=True, ) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index 265aadc9a34..bf1fdd6666d 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -31,6 +31,7 @@ from metadata.generated.schema.entity.data.storedProcedure import StoredProcedur from metadata.generated.schema.entity.data.table import ( PartitionColumnDetails, PartitionIntervalTypes, + Table, TablePartition, TableType, ) @@ -722,8 +723,3 @@ class SnowflakeSource( ) else: yield from super().mark_tables_as_deleted() - - def get_external_table_location(self): - return self.external_location_map.get( - (self.context.database, self.context.database_schema, self.context.table) - ) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index 785391248be..1f6ab613a4f 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -97,6 +97,7 @@ class UnitycatalogSource( self.service_connection: UnityCatalogConnection = ( self.config.serviceConnection.__root__.config ) + self.external_location_map = {} self.client = get_connection(self.service_connection) self.connection_obj = self.client self.table_constraints = [] @@ -290,12 +291,13 @@ class UnitycatalogSource( Prepare a table request and pass it to the sink """ table_name, table_type = table_name_and_type - self.context.storage_location = None table = self.client.tables.get(self.context.table_data.full_name) - if table.storage_location and not table.storage_location.startswith("dbfs"): - self.context.storage_location = table.storage_location schema_name = self.context.database_schema db_name = self.context.database + if table.storage_location and not table.storage_location.startswith("dbfs"): + self.external_location_map[ + (db_name, schema_name, table_name) + ] = table.storage_location table_constraints = None try: columns = self.get_columns(table.columns) @@ -529,6 +531,3 @@ class UnitycatalogSource( def close(self): """Nothing to close""" - - def get_external_table_location(self): - return self.context.storage_location