Fix #9139: Fix shcema filter being added twice to status (#9148)

This commit is contained in:
Mayur Singal 2022-12-05 20:35:55 +05:30 committed by GitHub
parent 9d93bb2693
commit 81cbf93df5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 18 deletions

View File

@ -27,7 +27,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Table, TablePartition, TableType
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
@ -54,7 +53,7 @@ from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandl
from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource
from metadata.utils import fqn
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_schema, filter_by_table
from metadata.utils.filters import filter_by_table
from metadata.utils.helpers import calculate_execution_time_generator
from metadata.utils.logger import ingestion_logger
@ -147,21 +146,7 @@ class CommonDbSourceService(
"""
return schema names
"""
for schema_name in self.get_raw_database_schema_names():
schema_fqn = fqn.build(
self.metadata,
entity_type=DatabaseSchema,
service_name=self.context.database_service.name.__root__,
database_name=self.context.database.name.__root__,
schema_name=schema_name,
)
if filter_by_schema(
self.source_config.schemaFilterPattern,
schema_fqn if self.source_config.useFqnForFiltering else schema_name,
):
self.status.filter(schema_fqn, "Schema Filtered Out")
continue
yield schema_name
yield from self._get_filtered_schema_names()
def yield_database_schema(
self, schema_name: str

View File

@ -74,6 +74,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.dbt_source import DBTMixin
from metadata.utils import fqn
from metadata.utils.dbt_config import get_dbt_details
from metadata.utils.filters import filter_by_schema
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -525,6 +526,24 @@ class DatabaseServiceSource(
for schema in schema_list:
yield from self.delete_schema_tables(schema.fullyQualifiedName.__root__)
def _get_filtered_schema_names(self, add_to_status: bool = True) -> Iterable[str]:
for schema_name in self.get_raw_database_schema_names():
schema_fqn = fqn.build(
self.metadata,
entity_type=DatabaseSchema,
service_name=self.context.database_service.name.__root__,
database_name=self.context.database.name.__root__,
schema_name=schema_name,
)
if filter_by_schema(
self.source_config.schemaFilterPattern,
schema_fqn if self.source_config.useFqnForFiltering else schema_name,
):
if add_to_status:
self.status.filter(schema_fqn, "Schema Filtered Out")
continue
yield schema_name
def mark_tables_as_deleted(self):
"""
Use the current inspector to mark tables as deleted
@ -539,7 +558,7 @@ class DatabaseServiceSource(
# If markAllDeletedTables is False (Default), Only delete tables which are deleted from the datasource
else:
schema_names_list = self.get_database_schema_names()
schema_names_list = self._get_filtered_schema_names(add_to_status=False)
for schema_name in schema_names_list:
schema_fqn = fqn.build(
self.metadata,