diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 577569773d7..4f35b856789 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -27,7 +27,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.table import Table, TablePartition, TableType from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, @@ -39,11 +38,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.lineage.parser import LineageParser -from metadata.ingestion.lineage.sql_lineage import ( - get_lineage_by_query, - get_lineage_via_table_entity, -) from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn @@ -71,7 +65,6 @@ class TableNameAndType(BaseModel): type_: TableType = TableType.Regular -# pylint: disable=too-many-public-methods class CommonDbSourceService( DatabaseServiceSource, SqlColumnHandlerMixin, SqlAlchemySource, ABC ): @@ -102,7 +95,6 @@ class CommonDbSourceService( self._connection = None # Lazy init as well self.table_constraints = None self.database_source_state = set() - self.context.table_views = [] super().__init__() def set_inspector(self, database_name: str) -> None: @@ -375,15 +367,6 @@ class CommonDbSourceService( table_request.tableType = TableType.Partitioned.value table_request.tablePartition = partition_details - if table_type == TableType.View or view_definition: - table_view = { - "table_name": table_name, - "table_type": table_type, - "schema_name": schema_name, - "db_name": db_name, - } - self.context.table_views.append(table_view) - yield table_request self.register_record(table_request=table_request) @@ -392,58 +375,6 @@ class CommonDbSourceService( logger.warning(f"Unexpected exception to yield table [{table_name}]: {exc}") self.status.failures.append(f"{self.config.serviceName}.{table_name}") - def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: - logger.info("Processing Lineage for Views") - for view in self.context.table_views: - table_name = view.get("table_name") - table_type = view.get("table_type") - schema_name = view.get("schema_name") - db_name = view.get("db_name") - table_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=self.context.database_service.name.__root__, - database_name=db_name, - schema_name=schema_name, - table_name=table_name, - ) - table_entity = self.metadata.get_by_name( - entity=Table, - fqn=table_fqn, - ) - view_definition = self.get_view_definition( - table_type=table_type, - table_name=table_name, - schema_name=schema_name, - inspector=self.inspector, - ) - - try: - lineage_parser = LineageParser(view_definition) - if lineage_parser.source_tables and lineage_parser.target_tables: - yield from get_lineage_by_query( - self.metadata, - query=view_definition, - service_name=self.context.database_service.name.__root__, - database_name=db_name, - schema_name=schema_name, - ) or [] - - else: - yield from get_lineage_via_table_entity( - self.metadata, - table_entity=table_entity, - service_name=self.context.database_service.name.__root__, - database_name=db_name, - schema_name=schema_name, - query=view_definition, - ) or [] - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning( - f"Could not parse query [{view_definition}] ingesting lineage failed: {exc}" - ) - def test_connection(self) -> None: """ Used a timed-bound function to test that the engine diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 0ee18c98d1a..4777043c6b2 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -23,7 +23,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( ) from metadata.generated.schema.api.data.createLocation import CreateLocationRequest from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.services.createStorageService import ( CreateStorageServiceRequest, ) @@ -119,9 +118,6 @@ class DatabaseServiceTopology(ServiceTopology): ), ], children=["database"], - post_process=[ - "yield_view_lineage", - ], ) database = TopologyNode( producer="get_database_names", @@ -302,13 +298,6 @@ class DatabaseServiceSource( if self.source_config.includeTags: yield from self.yield_tag(schema_name) or [] - @abstractmethod - def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: - """ - From topology. - Parses view definition to get lineage information - """ - @abstractmethod def yield_table( self, table_name_and_type: Tuple[str, TableType] diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py index 6f938f6af86..4aca9df6ad6 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py @@ -20,7 +20,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import ( Column, @@ -567,9 +566,6 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public- return cols - def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: - yield from [] - def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]: pass diff --git a/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py index f9a7a57bb3c..8b53fdf5f19 100644 --- a/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py @@ -23,7 +23,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Column, Table, TableType from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import ( @@ -395,9 +394,6 @@ class DeltalakeSource(DatabaseServiceSource): return parsed_columns - def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: - yield from [] - def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]: pass diff --git a/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py b/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py index 091cdd30c33..4d63f866374 100644 --- a/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py @@ -22,7 +22,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.table import Column, Table, TableType from metadata.generated.schema.entity.services.connections.database.domoDatabaseConnection import ( DomoDatabaseConnection, @@ -190,9 +189,6 @@ class DomodatabaseSource(DatabaseServiceSource): def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]: pass - def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: - yield from [] - def standardize_table_name( # pylint: disable=unused-argument self, schema: str, table: str ) -> str: diff --git a/ingestion/src/metadata/ingestion/source/database/dynamodb/metadata.py b/ingestion/src/metadata/ingestion/source/database/dynamodb/metadata.py index 04a8213912d..9c7fc990976 100644 --- a/ingestion/src/metadata/ingestion/source/database/dynamodb/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dynamodb/metadata.py @@ -20,7 +20,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.table import Column, Table, TableType from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import ( DynamoDBConnection, @@ -214,9 +213,6 @@ class DynamodbSource(DatabaseServiceSource): logger.warning(f"Unexpected exception to yield table [{table_name}]: {exc}") self.status.failures.append(f"{self.config.serviceName}.{table_name}") - def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: - yield from [] - def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]: pass diff --git a/ingestion/src/metadata/ingestion/source/database/glue/metadata.py b/ingestion/src/metadata/ingestion/source/database/glue/metadata.py index cc231400ded..d29fc85b289 100755 --- a/ingestion/src/metadata/ingestion/source/database/glue/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/glue/metadata.py @@ -20,7 +20,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( ) from metadata.generated.schema.api.data.createLocation import CreateLocationRequest from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.location import Location, LocationType @@ -373,9 +372,6 @@ class GlueSource(DatabaseServiceSource): def standardize_table_name(self, _: str, table: str) -> str: return table[:128] - def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: - yield from [] - def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]: pass diff --git a/ingestion/src/metadata/ingestion/source/database/lineage_source.py b/ingestion/src/metadata/ingestion/source/database/lineage_source.py index 71f410a41cb..c0a41827f35 100644 --- a/ingestion/src/metadata/ingestion/source/database/lineage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/lineage_source.py @@ -17,8 +17,14 @@ from abc import ABC from typing import Iterable, Iterator, Optional from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.table import Table, TableType from metadata.generated.schema.type.tableQuery import TableQuery -from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query +from metadata.ingestion.lineage.parser import LineageParser +from metadata.ingestion.lineage.sql_lineage import ( + get_lineage_by_query, + get_lineage_via_table_entity, +) from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.database.query_parser_source import QueryParserSource from metadata.utils.logger import ingestion_logger @@ -88,11 +94,39 @@ class LineageSource(QueryParserSource, ABC): logger.debug(traceback.format_exc()) logger.error(f"Source usage processing error: {exc}") - def next_record(self) -> Iterable[AddLineageRequest]: + def get_database_fqns(self): """ - Based on the query logs, prepare the lineage - and send it to the sink + List all service databases """ + return [ + database.fullyQualifiedName.__root__ + for database in self.metadata.list_all_entities( + entity=Database, + params={"service": self.config.serviceName}, + ) + ] + + def get_all_tables_from_api(self) -> Iterable[Table]: + """ + Get all the views from the API with their viewDefinition + """ + for database_fqn in self.get_database_fqns(): + logger.info(f"Getting view lineage from {database_fqn}") + yield from self.metadata.list_all_entities( + entity=Table, + fields=[ + "viewDefinition", + ], + params={"service": self.config.serviceName, "database": database_fqn}, + ) + + def yield_table_query(self): + """ + Basic generator getting lineage from + table queries that we pick up from the + service database + """ + logger.info("Processing query history lineage...") for table_query in self.get_table_query(): lineages = get_lineage_by_query( @@ -105,3 +139,54 @@ class LineageSource(QueryParserSource, ABC): for lineage_request in lineages or []: yield lineage_request + + def yield_view_ddl(self): + """ + Obtain the tables' viewDefinition to pick + up the lineage from its DDL + """ + logger.info("Processing view lineage...") + for table_entity in self.get_all_tables_from_api(): + # We only pick up views whose viewDefinition is informed + if ( + table_entity.tableType in {TableType.View, TableType.MaterializedView} + and table_entity.viewDefinition + and table_entity.viewDefinition.__root__ + ): + + try: + + lineage_parser = LineageParser(table_entity.viewDefinition.__root__) + if lineage_parser.source_tables and lineage_parser.target_tables: + yield from get_lineage_by_query( + self.metadata, + query=table_entity.viewDefinition.__root__, + service_name=self.config.serviceName, + database_name=table_entity.database.name, + schema_name=table_entity.databaseSchema.name, + ) or [] + + else: + yield from get_lineage_via_table_entity( + self.metadata, + table_entity=table_entity, + service_name=self.config.serviceName, + database_name=table_entity.database.name, + schema_name=table_entity.databaseSchema.name, + query=table_entity.viewDefinition.__root__, + ) or [] + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Could not parse query [{table_entity.viewDefinition.__root__}]." + f" Ingesting lineage failed due to: {exc}" + ) + + def next_record(self) -> Iterable[AddLineageRequest]: + """ + Based on the query logs, prepare the lineage + and send it to the sink + """ + yield from self.yield_table_query() + yield from self.yield_view_ddl() diff --git a/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py b/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py index 0cc7384a298..689f0f5c466 100644 --- a/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py @@ -19,7 +19,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.table import ( Column, Constraint, @@ -247,9 +246,6 @@ class SalesforceSource(DatabaseServiceSource): return "INT" return "VARCHAR" - def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: - yield from [] - def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]: pass