diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py index f4629acf149..3850bcb679e 100644 --- a/ingestion/src/metadata/ingestion/api/topology_runner.py +++ b/ingestion/src/metadata/ingestion/api/topology_runner.py @@ -90,9 +90,10 @@ class TopologyRunnerMixin(Generic[C]): if node.post_process: logger.debug(f"Post processing node {node}") - node_post_process = getattr(self, node.post_process) - for entity_request in node_post_process(): - yield entity_request + for process in node.post_process: + node_post_process = getattr(self, process) + for entity_request in node_post_process(): + yield entity_request def next_record(self) -> Iterable[Entity]: """ diff --git a/ingestion/src/metadata/ingestion/models/topology.py b/ingestion/src/metadata/ingestion/models/topology.py index cb69b02422b..da9c1c7cbe5 100644 --- a/ingestion/src/metadata/ingestion/models/topology.py +++ b/ingestion/src/metadata/ingestion/models/topology.py @@ -66,7 +66,7 @@ class TopologyNode(BaseModel): children: Optional[List[str]] = None # nodes to call execute next post_process: Optional[ - str + List[str] ] = None # Method to be run after the node has been fully processed diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index eeaa3b01f05..90b384f2273 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -52,6 +52,7 @@ from metadata.ingestion.source.database.database_service import ( ) from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandlerMixin from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource +from metadata.utils import fqn from metadata.utils.connections import get_connection, test_connection from metadata.utils.filters import filter_by_schema, filter_by_table from metadata.utils.logger import ingestion_logger @@ -90,6 +91,7 @@ class CommonDbSourceService( self.data_models = {} self.table_constraints = None self.database_source_state = set() + self.context.table_views = [] super().__init__() def set_inspector(self, database_name: str) -> None: @@ -276,6 +278,13 @@ class CommonDbSourceService( inspector=self.inspector, ) + view_definition = self.get_view_definition( + table_type=table_type, + table_name=table_name, + schema_name=schema_name, + inspector=self.inspector, + ) + table_request = CreateTableRequest( name=table_name, tableType=table_type, @@ -285,12 +294,7 @@ class CommonDbSourceService( inspector=self.inspector, ), columns=columns, - viewDefinition=self.get_view_definition( - table_type=table_type, - table_name=table_name, - schema_name=schema_name, - inspector=self.inspector, - ), + viewDefinition=view_definition, tableConstraints=table_constraints if table_constraints else None, databaseSchema=EntityReference( id=self.context.database_schema.id, @@ -301,6 +305,15 @@ class CommonDbSourceService( ), # Pick tags from context info, if any ) + if table_type == TableType.View or view_definition: + table_view = { + "table_name": table_name, + "table_type": table_type, + "schema_name": schema_name, + "db_name": db_name, + } + self.context.table_views.append(table_view) + yield table_request self.register_record(table_request=table_request) @@ -312,54 +325,64 @@ class CommonDbSourceService( "{}.{}".format(self.config.serviceName, table_name) ) - def yield_view_lineage( - self, table_name_and_type: Tuple[str, str] - ) -> Optional[Iterable[AddLineageRequest]]: - table_name, table_type = table_name_and_type - table_entity: Table = self.context.table - schema_name = self.context.database_schema.name.__root__ - db_name = self.context.database.name.__root__ - view_definition = self.get_view_definition( - table_type=table_type, - table_name=table_name, - schema_name=schema_name, - inspector=self.inspector, - ) - if table_type != TableType.View or not view_definition: - return - # Prevent sqllineage from modifying the logger config - # Disable the DictConfigurator.configure method while importing LineageRunner - configure = DictConfigurator.configure - DictConfigurator.configure = lambda _: None - from sqllineage.runner import LineageRunner + def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: + logger.info(f"Processing Lineage for Views") + for view in self.context.table_views: + table_name = view.get("table_name") + table_type = view.get("table_type") + schema_name = view.get("schema_name") + db_name = view.get("db_name") + table_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=self.context.database_service.name.__root__, + database_name=db_name, + schema_name=schema_name, + table_name=table_name, + ) + table_entity = self.metadata.get_by_name( + entity=Table, + fqn=table_fqn, + ) + view_definition = self.get_view_definition( + table_type=table_type, + table_name=table_name, + schema_name=schema_name, + inspector=self.inspector, + ) + # Prevent sqllineage from modifying the logger config + # Disable the DictConfigurator.configure method while importing LineageRunner + configure = DictConfigurator.configure + DictConfigurator.configure = lambda _: None + from sqllineage.runner import LineageRunner - # Reverting changes after import is done - DictConfigurator.configure = configure + # Reverting changes after import is done + DictConfigurator.configure = configure - try: - result = LineageRunner(view_definition) - if result.source_tables and result.target_tables: - yield from get_lineage_by_query( - self.metadata, - query=view_definition, - service_name=self.context.database_service.name.__root__, - database_name=db_name, - schema_name=schema_name, - ) or [] + try: + result = LineageRunner(view_definition) + if result.source_tables and result.target_tables: + yield from get_lineage_by_query( + self.metadata, + query=view_definition, + service_name=self.context.database_service.name.__root__, + database_name=db_name, + schema_name=schema_name, + ) or [] - else: - yield from get_lineage_via_table_entity( - self.metadata, - table_entity=table_entity, - service_name=self.context.database_service.name.__root__, - database_name=db_name, - schema_name=schema_name, - query=view_definition, - ) or [] - except Exception: - logger.debug(traceback.format_exc()) - logger.debug(f"Query : {view_definition}") - logger.warning("Could not parse query: Ingesting lineage failed") + else: + yield from get_lineage_via_table_entity( + self.metadata, + table_entity=table_entity, + service_name=self.context.database_service.name.__root__, + database_name=db_name, + schema_name=schema_name, + query=view_definition, + ) or [] + except Exception: + logger.debug(traceback.format_exc()) + logger.debug(f"Query : {view_definition}") + logger.warning("Could not parse query: Ingesting lineage failed") def test_connection(self) -> None: """ diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index d1468489072..614185e54a7 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -120,7 +120,7 @@ class DatabaseServiceTopology(ServiceTopology): ), ], children=["database"], - post_process="create_dbt_lineage", + post_process=["create_dbt_lineage", "yield_view_lineage"], ) database = TopologyNode( producer="get_database_names", @@ -133,7 +133,7 @@ class DatabaseServiceTopology(ServiceTopology): ) ], children=["databaseSchema"], - post_process="mark_tables_as_deleted", + post_process=["mark_tables_as_deleted"], ) databaseSchema = TopologyNode( producer="get_database_schema_names", @@ -171,13 +171,6 @@ class DatabaseServiceTopology(ServiceTopology): consumer=["storage_service"], nullable=True, ), - NodeStage( - type_=AddLineageRequest, - context="view_lineage", - processor="yield_view_lineage", - ack_sink=False, - nullable=True, - ), NodeStage( type_=DataModelLink, processor="yield_datamodel", @@ -317,9 +310,7 @@ class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC): yield from self.yield_tag(schema_name) or [] @abstractmethod - def yield_view_lineage( - self, table_name_and_type: Tuple[str, TableType] - ) -> Optional[Iterable[AddLineageRequest]]: + def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: """ From topology. Parses view definition to get lineage information diff --git a/ingestion/src/metadata/ingestion/source/database/datalake.py b/ingestion/src/metadata/ingestion/source/database/datalake.py index 819d75f6990..1aed2ad3556 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake.py @@ -322,10 +322,8 @@ class DatalakeSource(DatabaseServiceSource): logger.debug(traceback.format_exc()) logger.error(err) - def yield_view_lineage( - self, table_name_and_type: Tuple[str, str] - ) -> Optional[Iterable[AddLineageRequest]]: - pass + def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: + yield from [] def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndCategory]: pass diff --git a/ingestion/src/metadata/ingestion/source/database/deltalake.py b/ingestion/src/metadata/ingestion/source/database/deltalake.py index 652213e2ab8..d78479b0496 100644 --- a/ingestion/src/metadata/ingestion/source/database/deltalake.py +++ b/ingestion/src/metadata/ingestion/source/database/deltalake.py @@ -351,10 +351,8 @@ class DeltalakeSource(DatabaseServiceSource): return parsed_columns - def yield_view_lineage( - self, table_name_and_type: Tuple[str, str] - ) -> Optional[Iterable[AddLineageRequest]]: - pass + def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: + yield from [] def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndCategory]: pass diff --git a/ingestion/src/metadata/ingestion/source/database/dynamodb.py b/ingestion/src/metadata/ingestion/source/database/dynamodb.py index f4f80b3b461..98bf535a441 100644 --- a/ingestion/src/metadata/ingestion/source/database/dynamodb.py +++ b/ingestion/src/metadata/ingestion/source/database/dynamodb.py @@ -189,10 +189,8 @@ class DynamodbSource(DatabaseServiceSource): "{}.{}".format(self.config.serviceName, table_name) ) - def yield_view_lineage( - self, table_name_and_type: Tuple[str, str] - ) -> Optional[Iterable[AddLineageRequest]]: - pass + def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: + yield from [] def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndCategory]: pass diff --git a/ingestion/src/metadata/ingestion/source/database/glue.py b/ingestion/src/metadata/ingestion/source/database/glue.py index cda60b3a9f7..5eee8abc5ac 100755 --- a/ingestion/src/metadata/ingestion/source/database/glue.py +++ b/ingestion/src/metadata/ingestion/source/database/glue.py @@ -341,10 +341,8 @@ class GlueSource(DatabaseServiceSource): def standardize_table_name(self, schema: str, table: str) -> str: return table[:128] - def yield_view_lineage( - self, table_name_and_type: Tuple[str, str] - ) -> Optional[Iterable[AddLineageRequest]]: - pass + def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: + yield from [] def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndCategory]: pass diff --git a/ingestion/src/metadata/ingestion/source/database/salesforce.py b/ingestion/src/metadata/ingestion/source/database/salesforce.py index 727b66437ab..85d86fc7416 100644 --- a/ingestion/src/metadata/ingestion/source/database/salesforce.py +++ b/ingestion/src/metadata/ingestion/source/database/salesforce.py @@ -219,10 +219,8 @@ class SalesforceSource(DatabaseServiceSource): return "INT" return "VARCHAR" - def yield_view_lineage( - self, table_name_and_type: Tuple[str, str] - ) -> Optional[Iterable[AddLineageRequest]]: - pass + def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: + yield from [] def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndCategory]: pass