From 81cbf93df595f912f38d4fb60ec7d4ecd06b053f Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Mon, 5 Dec 2022 20:35:55 +0530 Subject: [PATCH] Fix #9139: Fix schema filter being added twice to status (#9148) --- .../source/database/common_db_source.py | 19 ++--------------- .../source/database/database_service.py | 21 ++++++++++++++++++- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 8a576bc0a8e..f078154d2f4 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -27,7 +27,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( ) from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Table, TablePartition, TableType from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, @@ -54,7 +53,7 @@ from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandl from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource from metadata.utils import fqn from metadata.utils.connections import get_connection, test_connection -from metadata.utils.filters import filter_by_schema, filter_by_table +from metadata.utils.filters import filter_by_table from metadata.utils.helpers import calculate_execution_time_generator from metadata.utils.logger import ingestion_logger @@ -147,21 +146,7 @@ class CommonDbSourceService( """ return schema names """ - for schema_name in self.get_raw_database_schema_names(): - schema_fqn = 
fqn.build( - self.metadata, - entity_type=DatabaseSchema, - service_name=self.context.database_service.name.__root__, - database_name=self.context.database.name.__root__, - schema_name=schema_name, - ) - if filter_by_schema( - self.source_config.schemaFilterPattern, - schema_fqn if self.source_config.useFqnForFiltering else schema_name, - ): - self.status.filter(schema_fqn, "Schema Filtered Out") - continue - yield schema_name + yield from self._get_filtered_schema_names() def yield_database_schema( self, schema_name: str diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index deb3e1e0ade..6bb5cf22d61 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -74,6 +74,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.dbt_source import DBTMixin from metadata.utils import fqn from metadata.utils.dbt_config import get_dbt_details +from metadata.utils.filters import filter_by_schema from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -525,6 +526,24 @@ class DatabaseServiceSource( for schema in schema_list: yield from self.delete_schema_tables(schema.fullyQualifiedName.__root__) + def _get_filtered_schema_names(self, add_to_status: bool = True) -> Iterable[str]: + for schema_name in self.get_raw_database_schema_names(): + schema_fqn = fqn.build( + self.metadata, + entity_type=DatabaseSchema, + service_name=self.context.database_service.name.__root__, + database_name=self.context.database.name.__root__, + schema_name=schema_name, + ) + if filter_by_schema( + self.source_config.schemaFilterPattern, + schema_fqn if self.source_config.useFqnForFiltering else schema_name, + ): + if add_to_status: + self.status.filter(schema_fqn, "Schema Filtered Out") + continue + yield schema_name + def 
mark_tables_as_deleted(self): """ Use the current inspector to mark tables as deleted @@ -539,7 +558,7 @@ class DatabaseServiceSource( # If markAllDeletedTables is False (Default), Only delete tables which are deleted from the datasource else: - schema_names_list = self.get_database_schema_names() + schema_names_list = self._get_filtered_schema_names(add_to_status=False) for schema_name in schema_names_list: schema_fqn = fqn.build( self.metadata,