Moved view lineage to post process (#6585)

Co-authored-by: Onkar Ravgan <onkarravgan@Onkars-MacBook-Pro.local>
This commit is contained in:
Onkar Ravgan 2022-08-04 23:33:17 +05:30 committed by GitHub
parent b37bca5471
commit 3d93aaad53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 92 additions and 87 deletions

View File

@ -90,9 +90,10 @@ class TopologyRunnerMixin(Generic[C]):
if node.post_process:
logger.debug(f"Post processing node {node}")
node_post_process = getattr(self, node.post_process)
for entity_request in node_post_process():
yield entity_request
for process in node.post_process:
node_post_process = getattr(self, process)
for entity_request in node_post_process():
yield entity_request
def next_record(self) -> Iterable[Entity]:
"""

View File

@ -66,7 +66,7 @@ class TopologyNode(BaseModel):
children: Optional[List[str]] = None # nodes to call execute next
post_process: Optional[
str
List[str]
] = None # Method to be run after the node has been fully processed

View File

@ -52,6 +52,7 @@ from metadata.ingestion.source.database.database_service import (
)
from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandlerMixin
from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource
from metadata.utils import fqn
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_schema, filter_by_table
from metadata.utils.logger import ingestion_logger
@ -90,6 +91,7 @@ class CommonDbSourceService(
self.data_models = {}
self.table_constraints = None
self.database_source_state = set()
self.context.table_views = []
super().__init__()
def set_inspector(self, database_name: str) -> None:
@ -276,6 +278,13 @@ class CommonDbSourceService(
inspector=self.inspector,
)
view_definition = self.get_view_definition(
table_type=table_type,
table_name=table_name,
schema_name=schema_name,
inspector=self.inspector,
)
table_request = CreateTableRequest(
name=table_name,
tableType=table_type,
@ -285,12 +294,7 @@ class CommonDbSourceService(
inspector=self.inspector,
),
columns=columns,
viewDefinition=self.get_view_definition(
table_type=table_type,
table_name=table_name,
schema_name=schema_name,
inspector=self.inspector,
),
viewDefinition=view_definition,
tableConstraints=table_constraints if table_constraints else None,
databaseSchema=EntityReference(
id=self.context.database_schema.id,
@ -301,6 +305,15 @@ class CommonDbSourceService(
), # Pick tags from context info, if any
)
if table_type == TableType.View or view_definition:
table_view = {
"table_name": table_name,
"table_type": table_type,
"schema_name": schema_name,
"db_name": db_name,
}
self.context.table_views.append(table_view)
yield table_request
self.register_record(table_request=table_request)
@ -312,54 +325,64 @@ class CommonDbSourceService(
"{}.{}".format(self.config.serviceName, table_name)
)
def yield_view_lineage(
self, table_name_and_type: Tuple[str, str]
) -> Optional[Iterable[AddLineageRequest]]:
table_name, table_type = table_name_and_type
table_entity: Table = self.context.table
schema_name = self.context.database_schema.name.__root__
db_name = self.context.database.name.__root__
view_definition = self.get_view_definition(
table_type=table_type,
table_name=table_name,
schema_name=schema_name,
inspector=self.inspector,
)
if table_type != TableType.View or not view_definition:
return
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
logger.info(f"Processing Lineage for Views")
for view in self.context.table_views:
table_name = view.get("table_name")
table_type = view.get("table_type")
schema_name = view.get("schema_name")
db_name = view.get("db_name")
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.context.database_service.name.__root__,
database_name=db_name,
schema_name=schema_name,
table_name=table_name,
)
table_entity = self.metadata.get_by_name(
entity=Table,
fqn=table_fqn,
)
view_definition = self.get_view_definition(
table_type=table_type,
table_name=table_name,
schema_name=schema_name,
inspector=self.inspector,
)
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner
# Reverting changes after import is done
DictConfigurator.configure = configure
# Reverting changes after import is done
DictConfigurator.configure = configure
try:
result = LineageRunner(view_definition)
if result.source_tables and result.target_tables:
yield from get_lineage_by_query(
self.metadata,
query=view_definition,
service_name=self.context.database_service.name.__root__,
database_name=db_name,
schema_name=schema_name,
) or []
try:
result = LineageRunner(view_definition)
if result.source_tables and result.target_tables:
yield from get_lineage_by_query(
self.metadata,
query=view_definition,
service_name=self.context.database_service.name.__root__,
database_name=db_name,
schema_name=schema_name,
) or []
else:
yield from get_lineage_via_table_entity(
self.metadata,
table_entity=table_entity,
service_name=self.context.database_service.name.__root__,
database_name=db_name,
schema_name=schema_name,
query=view_definition,
) or []
except Exception:
logger.debug(traceback.format_exc())
logger.debug(f"Query : {view_definition}")
logger.warning("Could not parse query: Ingesting lineage failed")
else:
yield from get_lineage_via_table_entity(
self.metadata,
table_entity=table_entity,
service_name=self.context.database_service.name.__root__,
database_name=db_name,
schema_name=schema_name,
query=view_definition,
) or []
except Exception:
logger.debug(traceback.format_exc())
logger.debug(f"Query : {view_definition}")
logger.warning("Could not parse query: Ingesting lineage failed")
def test_connection(self) -> None:
"""

View File

@ -120,7 +120,7 @@ class DatabaseServiceTopology(ServiceTopology):
),
],
children=["database"],
post_process="create_dbt_lineage",
post_process=["create_dbt_lineage", "yield_view_lineage"],
)
database = TopologyNode(
producer="get_database_names",
@ -133,7 +133,7 @@ class DatabaseServiceTopology(ServiceTopology):
)
],
children=["databaseSchema"],
post_process="mark_tables_as_deleted",
post_process=["mark_tables_as_deleted"],
)
databaseSchema = TopologyNode(
producer="get_database_schema_names",
@ -171,13 +171,6 @@ class DatabaseServiceTopology(ServiceTopology):
consumer=["storage_service"],
nullable=True,
),
NodeStage(
type_=AddLineageRequest,
context="view_lineage",
processor="yield_view_lineage",
ack_sink=False,
nullable=True,
),
NodeStage(
type_=DataModelLink,
processor="yield_datamodel",
@ -317,9 +310,7 @@ class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC):
yield from self.yield_tag(schema_name) or []
@abstractmethod
def yield_view_lineage(
self, table_name_and_type: Tuple[str, TableType]
) -> Optional[Iterable[AddLineageRequest]]:
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
"""
From topology.
Parses view definition to get lineage information

View File

@ -322,10 +322,8 @@ class DatalakeSource(DatabaseServiceSource):
logger.debug(traceback.format_exc())
logger.error(err)
def yield_view_lineage(
self, table_name_and_type: Tuple[str, str]
) -> Optional[Iterable[AddLineageRequest]]:
pass
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
yield from []
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndCategory]:
pass

View File

@ -351,10 +351,8 @@ class DeltalakeSource(DatabaseServiceSource):
return parsed_columns
def yield_view_lineage(
self, table_name_and_type: Tuple[str, str]
) -> Optional[Iterable[AddLineageRequest]]:
pass
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
yield from []
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndCategory]:
pass

View File

@ -189,10 +189,8 @@ class DynamodbSource(DatabaseServiceSource):
"{}.{}".format(self.config.serviceName, table_name)
)
def yield_view_lineage(
self, table_name_and_type: Tuple[str, str]
) -> Optional[Iterable[AddLineageRequest]]:
pass
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
yield from []
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndCategory]:
pass

View File

@ -341,10 +341,8 @@ class GlueSource(DatabaseServiceSource):
def standardize_table_name(self, schema: str, table: str) -> str:
return table[:128]
def yield_view_lineage(
self, table_name_and_type: Tuple[str, str]
) -> Optional[Iterable[AddLineageRequest]]:
pass
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
yield from []
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndCategory]:
pass

View File

@ -219,10 +219,8 @@ class SalesforceSource(DatabaseServiceSource):
return "INT"
return "VARCHAR"
def yield_view_lineage(
self, table_name_and_type: Tuple[str, str]
) -> Optional[Iterable[AddLineageRequest]]:
pass
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
yield from []
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndCategory]:
pass