From fb087c5e35b630b93011f50e249fb8eb64fe522f Mon Sep 17 00:00:00 2001
From: Tamas Nemeth
Date: Thu, 25 May 2023 00:35:28 +0200
Subject: [PATCH] fix(ingest/redshift): fixing schema filter (#8119)

---
 .../ingestion/source/redshift/config.py       |  28 +++++
 .../ingestion/source/redshift/redshift.py     | 104 +++++++++++-------
 .../source/redshift/redshift_schema.py        |   6 +-
 3 files changed, 94 insertions(+), 44 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py
index f9b9fb73de..93850607e5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py
@@ -1,3 +1,4 @@
+import logging
 from enum import Enum
 from typing import Any, Dict, List, Optional
 
@@ -5,6 +6,7 @@ from pydantic import root_validator
 from pydantic.fields import Field
 
 from datahub.configuration import ConfigModel
+from datahub.configuration.common import AllowDenyPattern
 from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated
 from datahub.configuration.source_common import DatasetLineageProviderConfigBase
 from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
@@ -16,6 +18,8 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
 )
 from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
 
+logger = logging.getLogger(__name__)
+
 
 # The lineage modes are documented in the Redshift source's docstring.
 class LineageMode(Enum):
@@ -123,6 +127,11 @@ class RedshiftConfig(
     )
     extra_client_options: Dict[str, Any] = {}
 
+    match_fully_qualified_names: bool = Field(
+        default=False,
+        description="Whether `schema_pattern` is matched against the fully qualified schema name `<database>.<schema>`.",
+    )
+
     @root_validator(pre=True)
     def check_email_is_set_on_usage(cls, values):
         if values.get("include_usage_statistics"):
@@ -137,3 +146,22 @@ class RedshiftConfig(
             "database_alias"
         ), "either database or database_alias must be set"
         return values
+
+    @root_validator(pre=False)
+    def backward_compatibility_configs_set(cls, values: Dict) -> Dict:
+        match_fully_qualified_names = values.get("match_fully_qualified_names")
+
+        schema_pattern: Optional[AllowDenyPattern] = values.get("schema_pattern")
+
+        if (
+            schema_pattern is not None
+            and schema_pattern != AllowDenyPattern.allow_all()
+            and match_fully_qualified_names is not None
+            and not match_fully_qualified_names
+        ):
+            logger.warning(
+                "Please update `schema_pattern` to match against the fully qualified schema name `<database>.<schema>` and set config `match_fully_qualified_names: True`. "
+                "The current default `match_fully_qualified_names: False` is kept only to maintain backward compatibility. "
+                "The config option `match_fully_qualified_names` will be deprecated in the future, and the default behavior will assume `match_fully_qualified_names: True`."
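Seen from a recipe author's side, the new option switches what `schema_pattern` is matched against. The following is a minimal sketch, not part of the patch: the connection values are placeholders, and the exact set of required constructor fields comes from the inherited SQLAlchemy config rather than this diff.

```python
from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.source.redshift.config import RedshiftConfig

# Legacy mode: the pattern matches bare schema names. Building this config
# triggers the deprecation warning from backward_compatibility_configs_set.
legacy = RedshiftConfig(
    host_port="my-cluster.example.com:5439",  # placeholder connection details
    database="dev",
    schema_pattern=AllowDenyPattern(allow=["public"]),
)

# Opt-in mode: patterns are written against "<database>.<schema>" and no
# warning is logged.
qualified = RedshiftConfig(
    host_port="my-cluster.example.com:5439",  # placeholder connection details
    database="dev",
    schema_pattern=AllowDenyPattern(allow=[r"dev\.public"]),
    match_fully_qualified_names=True,
)
```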
+            )
+        return values
diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py
index 815e0619dc..2423785776 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py
@@ -9,6 +9,7 @@ import psycopg2  # noqa: F401
 import pydantic
 import redshift_connector
 
+from datahub.configuration.pattern_utils import is_schema_allowed
 from datahub.emitter.mce_builder import (
     make_data_platform_urn,
     make_dataset_urn_with_platform_instance,
@@ -416,10 +417,17 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
         for schema in RedshiftDataDictionary.get_schemas(
             conn=connection, database=database
         ):
-            logger.info(f"Processing schema: {database}.{schema.name}")
-            if not self.config.schema_pattern.allowed(schema.name):
+            if not is_schema_allowed(
+                self.config.schema_pattern,
+                schema.name,
+                database,
+                self.config.match_fully_qualified_names,
+            ):
                 self.report.report_dropped(f"{database}.{schema.name}")
                 continue
+
+            logger.info(f"Processing schema: {database}.{schema.name}")
+
             self.db_schemas[database][schema.name] = schema
             yield from self.process_schema(connection, database, schema)
@@ -756,48 +764,60 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
     def cache_tables_and_views(self, connection, database):
         tables, views = RedshiftDataDictionary.get_tables_and_views(conn=connection)
         for schema in tables:
-            if self.config.schema_pattern.allowed(f"{database}.{schema}"):
-                logging.debug(
-                    f"Not caching tables for schema {database}.{schema} which is not allowed by schema_pattern"
+            if not is_schema_allowed(
+                self.config.schema_pattern,
+                schema,
+                database,
+                self.config.match_fully_qualified_names,
+            ):
+                logger.debug(
+                    f"Not caching table for schema {database}.{schema} which is not allowed by schema_pattern"
                 )
-                self.db_tables[database][schema] = []
-                for table in tables[schema]:
-                    if self.config.table_pattern.allowed(
-                        f"{database}.{schema}.{table.name}"
-                    ):
-                        self.db_tables[database][schema].append(table)
-                        self.report.table_cached[f"{database}.{schema}"] = (
-                            self.report.table_cached.get(f"{database}.{schema}", 0) + 1
-                        )
-                    else:
-                        logging.debug(
-                            f"Table {database}.{schema}.{table.name} is filtered by table_pattern"
-                        )
-                        self.report.table_filtered[f"{database}.{schema}"] = (
-                            self.report.table_filtered.get(f"{database}.{schema}", 0)
-                            + 1
-                        )
+                continue
+
+            self.db_tables[database][schema] = []
+            for table in tables[schema]:
+                if self.config.table_pattern.allowed(
+                    f"{database}.{schema}.{table.name}"
+                ):
+                    self.db_tables[database][schema].append(table)
+                    self.report.table_cached[f"{database}.{schema}"] = (
+                        self.report.table_cached.get(f"{database}.{schema}", 0) + 1
+                    )
+                else:
+                    logger.debug(
+                        f"Table {database}.{schema}.{table.name} is filtered by table_pattern"
+                    )
+                    self.report.table_filtered[f"{database}.{schema}"] = (
+                        self.report.table_filtered.get(f"{database}.{schema}", 0) + 1
+                    )
+
         for schema in views:
-            logging.debug(
-                f"Not caching views for schema {database}.{schema} which is not allowed by schema_pattern"
-            )
-            if self.config.schema_pattern.allowed(f"{database}.{schema}"):
-                self.db_views[database][schema] = []
-                for view in views[schema]:
-                    if self.config.view_pattern.allowed(
-                        f"{database}.{schema}.{view.name}"
-                    ):
-                        self.db_views[database][schema].append(view)
-                        self.report.view_cached[f"{database}.{schema}"] = (
-                            self.report.view_cached.get(f"{database}.{schema}", 0) + 1
-                        )
-                    else:
-                        logging.debug(
-                            f"View {database}.{schema}.{table.name} is filtered by view_pattern"
-                        )
-                        self.report.view_filtered[f"{database}.{schema}"] = (
-                            self.report.view_filtered.get(f"{database}.{schema}", 0) + 1
-                        )
+            if not is_schema_allowed(
+                self.config.schema_pattern,
+                schema,
+                database,
+                self.config.match_fully_qualified_names,
+            ):
+                logger.debug(
+                    f"Not caching views for schema {database}.{schema} which is not allowed by schema_pattern"
+                )
+                continue
+
+            self.db_views[database][schema] = []
+            for view in views[schema]:
+                if self.config.view_pattern.allowed(f"{database}.{schema}.{view.name}"):
+                    self.db_views[database][schema].append(view)
+                    self.report.view_cached[f"{database}.{schema}"] = (
+                        self.report.view_cached.get(f"{database}.{schema}", 0) + 1
+                    )
+                else:
+                    logger.debug(
+                        f"View {database}.{schema}.{view.name} is filtered by view_pattern"
+                    )
+                    self.report.view_filtered[f"{database}.{schema}"] = (
+                        self.report.view_filtered.get(f"{database}.{schema}", 0) + 1
+                    )
 
     def get_all_tables(
         self,
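Besides introducing the flag, the hunks above repair the old filter's inconsistency: `get_workunits` matched the bare schema name while `cache_tables_and_views` matched `{database}.{schema}`, and the caching branches logged "Not caching ..." on the allowed path rather than the denied one. Both call sites now delegate to `is_schema_allowed` from `datahub.configuration.pattern_utils`, whose body is not shown in this diff. A minimal sketch of the semantics implied by the call sites, assuming the helper only switches which string the pattern is applied to:

```python
from datahub.configuration.common import AllowDenyPattern


def is_schema_allowed(
    schema_pattern: AllowDenyPattern,
    schema_name: str,
    db_name: str,
    match_fully_qualified_schema_name: bool,
) -> bool:
    # With the new flag set, patterns are applied to "<database>.<schema>";
    # otherwise the legacy behavior of matching only the schema name is kept.
    if match_fully_qualified_schema_name:
        return schema_pattern.allowed(f"{db_name}.{schema_name}")
    return schema_pattern.allowed(schema_name)
```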
diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py
index 42f10d6c59..4a13d17d2c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py
@@ -242,9 +242,11 @@ class RedshiftDataDictionary:
             )
         )
         for schema_key, schema_tables in tables.items():
-            logger.info(f"In schema: {schema_key} discovered {len(tables)} tables")
+            logger.info(
+                f"In schema: {schema_key} discovered {len(schema_tables)} tables"
+            )
         for schema_key, schema_views in views.items():
-            logger.info(f"In schema: {schema_key} discovered {len(views)} views")
+            logger.info(f"In schema: {schema_key} discovered {len(schema_views)} views")
 
         return tables, views
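The final hunk fixes a counting bug in the discovery logs: `len(tables)` and `len(views)` measure the number of schemas in the outer mapping, not the number of tables or views inside the schema being reported. A small illustration with made-up data:

```python
# Hypothetical discovery result, keyed by schema name.
tables = {
    "public": ["orders", "users", "events"],
    "analytics": ["daily_rollup"],
}

for schema_key, schema_tables in tables.items():
    # Before the fix: len(tables) == 2 for every schema (the schema count).
    # After the fix: len(schema_tables) is 3 for "public", 1 for "analytics".
    print(f"In schema: {schema_key} discovered {len(schema_tables)} tables")
```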