MINOR: Move external table lineage to post processing (#15633)

This commit is contained in:
Mayur Singal 2024-03-22 11:46:14 +05:30 committed by GitHub
parent f03ae2d6ab
commit 2208662886
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 17 additions and 37 deletions

View File

@ -108,7 +108,11 @@ class DatabaseServiceTopology(ServiceTopology):
# Note how we have `yield_view_lineage` and `yield_stored_procedure_lineage`
# as post_processed. This is because we cannot ensure proper lineage processing
# until we have finished ingesting all the metadata from the source.
post_process=["yield_view_lineage", "yield_procedure_lineage_and_queries"],
post_process=[
"yield_view_lineage",
"yield_procedure_lineage_and_queries",
"yield_external_table_lineage",
],
)
database = TopologyNode(
producer="get_database_names",
@ -170,11 +174,6 @@ class DatabaseServiceTopology(ServiceTopology):
consumer=["database_service", "database", "database_schema"],
use_cache=True,
),
NodeStage(
type_=AddLineageRequest,
processor="yield_external_table_lineage",
nullable=True,
),
NodeStage(
type_=OMetaLifeCycleData,
processor="yield_life_cycle_data",

View File

@ -609,8 +609,3 @@ class DatabricksSource(ExternalTableLineageMixin, CommonDbSourceService, MultiDB
f"Table description error for table [{schema_name}.{table_name}]: {exc}"
)
return description
def get_external_table_location(self):
return self.external_location_map.get(
(self.context.database, self.context.database_schema, self.context.table)
)

View File

@ -14,7 +14,7 @@ External Table Lineage Mixin
import traceback
from abc import ABC, abstractmethod
from typing import Iterable, Tuple
from typing import Iterable
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Table
@ -32,32 +32,23 @@ class ExternalTableLineageMixin(ABC):
This mixin class is for deriving lineage between external table and container source/
"""
@abstractmethod
def get_external_table_location(self):
"""
Get external table location
"""
def yield_external_table_lineage(
self, table_name_and_type: Tuple[str, str]
) -> Iterable[AddLineageRequest]:
def yield_external_table_lineage(self) -> Iterable[AddLineageRequest]:
"""
Yield external table lineage
"""
table_name, _ = table_name_and_type
location = self.get_external_table_location()
if location:
for table_qualified_tuple, location in self.external_location_map.items() or []:
try:
location_entity = self.metadata.es_search_container_by_path(
full_path=location
)
database_name, schema_name, table_name = table_qualified_tuple
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.context.database_service,
database_name=self.context.database,
schema_name=self.context.database_schema,
database_name=database_name,
schema_name=schema_name,
table_name=table_name,
skip_es_search=True,
)

View File

@ -31,6 +31,7 @@ from metadata.generated.schema.entity.data.storedProcedure import StoredProcedur
from metadata.generated.schema.entity.data.table import (
PartitionColumnDetails,
PartitionIntervalTypes,
Table,
TablePartition,
TableType,
)
@ -722,8 +723,3 @@ class SnowflakeSource(
)
else:
yield from super().mark_tables_as_deleted()
def get_external_table_location(self):
return self.external_location_map.get(
(self.context.database, self.context.database_schema, self.context.table)
)

View File

@ -97,6 +97,7 @@ class UnitycatalogSource(
self.service_connection: UnityCatalogConnection = (
self.config.serviceConnection.__root__.config
)
self.external_location_map = {}
self.client = get_connection(self.service_connection)
self.connection_obj = self.client
self.table_constraints = []
@ -290,12 +291,13 @@ class UnitycatalogSource(
Prepare a table request and pass it to the sink
"""
table_name, table_type = table_name_and_type
self.context.storage_location = None
table = self.client.tables.get(self.context.table_data.full_name)
if table.storage_location and not table.storage_location.startswith("dbfs"):
self.context.storage_location = table.storage_location
schema_name = self.context.database_schema
db_name = self.context.database
if table.storage_location and not table.storage_location.startswith("dbfs"):
self.external_location_map[
(db_name, schema_name, table_name)
] = table.storage_location
table_constraints = None
try:
columns = self.get_columns(table.columns)
@ -529,6 +531,3 @@ class UnitycatalogSource(
def close(self):
"""Nothing to close"""
def get_external_table_location(self):
return self.context.storage_location