fix(ingest/redshift): fix schema filter (#8119)

Tamas Nemeth 2023-05-25 00:35:28 +02:00 committed by GitHub
parent 84270bcac8
commit fb087c5e35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 94 additions and 44 deletions

View File

@@ -1,3 +1,4 @@
+import logging
 from enum import Enum
 from typing import Any, Dict, List, Optional
@@ -5,6 +6,7 @@ from pydantic import root_validator
 from pydantic.fields import Field
 
 from datahub.configuration import ConfigModel
 from datahub.configuration.common import AllowDenyPattern
+from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated
 from datahub.configuration.source_common import DatasetLineageProviderConfigBase
 from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
@@ -16,6 +18,8 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
 )
 from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
 
+logger = logging.getLogger(__name__)
+
 # The lineage modes are documented in the Redshift source's docstring.
 class LineageMode(Enum):
@@ -123,6 +127,11 @@ class RedshiftConfig(
     )
 
     extra_client_options: Dict[str, Any] = {}
 
+    match_fully_qualified_names: bool = Field(
+        default=False,
+        description="Whether `schema_pattern` is matched against fully qualified schema name `<database>.<schema>`.",
+    )
+
     @root_validator(pre=True)
     def check_email_is_set_on_usage(cls, values):
         if values.get("include_usage_statistics"):
@@ -137,3 +146,22 @@ class RedshiftConfig(
                 "database_alias"
             ), "either database or database_alias must be set"
         return values
+
+    @root_validator(pre=False)
+    def backward_compatibility_configs_set(cls, values: Dict) -> Dict:
+        match_fully_qualified_names = values.get("match_fully_qualified_names")
+        schema_pattern: Optional[AllowDenyPattern] = values.get("schema_pattern")
+        if (
+            schema_pattern is not None
+            and schema_pattern != AllowDenyPattern.allow_all()
+            and match_fully_qualified_names is not None
+            and not match_fully_qualified_names
+        ):
+            logger.warning(
+                "Please update `schema_pattern` to match against the fully qualified schema name `<database_name>.<schema_name>` and set `match_fully_qualified_names: True`. "
+                "The current default `match_fully_qualified_names: False` is only kept for backward compatibility. "
+                "The config option `match_fully_qualified_names` will be deprecated in the future, and the default behavior will then assume `match_fully_qualified_names: True`."
+            )
+        return values
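Note: the validator above only warns; nothing changes until `match_fully_qualified_names` is enabled. As a rough sketch of the semantics the new flag implies (an assumption inferred from this diff, not a copy of the real helper in datahub.configuration.pattern_utils):

    from datahub.configuration.common import AllowDenyPattern

    def is_schema_allowed_sketch(
        schema_pattern: AllowDenyPattern,
        schema_name: str,
        db_name: str,
        match_fully_qualified_names: bool,
    ) -> bool:
        # With the flag on, the pattern is tested against "<database>.<schema>";
        # with it off (the backward-compatible default), against "<schema>" alone.
        if match_fully_qualified_names:
            return schema_pattern.allowed(f"{db_name}.{schema_name}")
        return schema_pattern.allowed(schema_name)

A hypothetical recipe fragment opting into the new behavior (shown as a Python dict; all values are placeholders except the field names taken from this diff):

    redshift_config = {
        "host_port": "example.redshift.amazonaws.com:5439",  # placeholder
        "database": "dev",
        "match_fully_qualified_names": True,
        "schema_pattern": {"allow": ["dev\\.public"]},
    }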

View File

@@ -9,6 +9,7 @@ import psycopg2  # noqa: F401
 import pydantic
 import redshift_connector
 
+from datahub.configuration.pattern_utils import is_schema_allowed
 from datahub.emitter.mce_builder import (
     make_data_platform_urn,
     make_dataset_urn_with_platform_instance,
@@ -416,10 +417,17 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
         for schema in RedshiftDataDictionary.get_schemas(
             conn=connection, database=database
         ):
-            logger.info(f"Processing schema: {database}.{schema.name}")
-            if not self.config.schema_pattern.allowed(schema.name):
+            if not is_schema_allowed(
+                self.config.schema_pattern,
+                schema.name,
+                database,
+                self.config.match_fully_qualified_names,
+            ):
                 self.report.report_dropped(f"{database}.{schema.name}")
                 continue
+
+            logger.info(f"Processing schema: {database}.{schema.name}")
+
             self.db_schemas[database][schema.name] = schema
             yield from self.process_schema(connection, database, schema)
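For intuition on why the pattern's input matters: AllowDenyPattern matches its regexes against whatever string it is handed, so a pattern written for bare schema names stops matching once the database prefix is included (hypothetical values for illustration):

    from datahub.configuration.common import AllowDenyPattern

    pattern = AllowDenyPattern(allow=["public"])
    print(pattern.allowed("public"))      # True: bare schema name, the old input
    print(pattern.allowed("dev.public"))  # False: the fully qualified input no
                                          # longer matches, hence the migration
                                          # warning in the config validator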
@@ -756,48 +764,60 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
     def cache_tables_and_views(self, connection, database):
         tables, views = RedshiftDataDictionary.get_tables_and_views(conn=connection)
         for schema in tables:
-            if self.config.schema_pattern.allowed(f"{database}.{schema}"):
-                logging.debug(
-                    f"Not caching tables for schema {database}.{schema} which is not allowed by schema_pattern"
+            if not is_schema_allowed(
+                self.config.schema_pattern,
+                schema,
+                database,
+                self.config.match_fully_qualified_names,
+            ):
+                logger.debug(
+                    f"Not caching table for schema {database}.{schema} which is not allowed by schema_pattern"
                 )
-                self.db_tables[database][schema] = []
-                for table in tables[schema]:
-                    if self.config.table_pattern.allowed(
-                        f"{database}.{schema}.{table.name}"
-                    ):
-                        self.db_tables[database][schema].append(table)
-                        self.report.table_cached[f"{database}.{schema}"] = (
-                            self.report.table_cached.get(f"{database}.{schema}", 0) + 1
-                        )
-                    else:
-                        logging.debug(
-                            f"Table {database}.{schema}.{table.name} is filtered by table_pattern"
-                        )
-                        self.report.table_filtered[f"{database}.{schema}"] = (
-                            self.report.table_filtered.get(f"{database}.{schema}", 0)
-                            + 1
-                        )
+                continue
+
+            self.db_tables[database][schema] = []
+            for table in tables[schema]:
+                if self.config.table_pattern.allowed(
+                    f"{database}.{schema}.{table.name}"
+                ):
+                    self.db_tables[database][schema].append(table)
+                    self.report.table_cached[f"{database}.{schema}"] = (
+                        self.report.table_cached.get(f"{database}.{schema}", 0) + 1
+                    )
+                else:
+                    logger.debug(
+                        f"Table {database}.{schema}.{table.name} is filtered by table_pattern"
+                    )
+                    self.report.table_filtered[f"{database}.{schema}"] = (
+                        self.report.table_filtered.get(f"{database}.{schema}", 0) + 1
+                    )
         for schema in views:
-            logging.debug(
-                f"Not caching views for schema {database}.{schema} which is not allowed by schema_pattern"
-            )
-            if self.config.schema_pattern.allowed(f"{database}.{schema}"):
-                self.db_views[database][schema] = []
-                for view in views[schema]:
-                    if self.config.view_pattern.allowed(
-                        f"{database}.{schema}.{view.name}"
-                    ):
-                        self.db_views[database][schema].append(view)
-                        self.report.view_cached[f"{database}.{schema}"] = (
-                            self.report.view_cached.get(f"{database}.{schema}", 0) + 1
-                        )
-                    else:
-                        logging.debug(
-                            f"View {database}.{schema}.{table.name} is filtered by view_pattern"
-                        )
-                        self.report.view_filtered[f"{database}.{schema}"] = (
-                            self.report.view_filtered.get(f"{database}.{schema}", 0) + 1
-                        )
+            if not is_schema_allowed(
+                self.config.schema_pattern,
+                schema,
+                database,
+                self.config.match_fully_qualified_names,
+            ):
+                logger.debug(
+                    f"Not caching views for schema {database}.{schema} which is not allowed by schema_pattern"
+                )
+                continue
+
+            self.db_views[database][schema] = []
+            for view in views[schema]:
+                if self.config.view_pattern.allowed(f"{database}.{schema}.{view.name}"):
+                    self.db_views[database][schema].append(view)
+                    self.report.view_cached[f"{database}.{schema}"] = (
+                        self.report.view_cached.get(f"{database}.{schema}", 0) + 1
+                    )
+                else:
+                    logger.debug(
+                        f"View {database}.{schema}.{view.name} is filtered by view_pattern"
+                    )
+                    self.report.view_filtered[f"{database}.{schema}"] = (
+                        self.report.view_filtered.get(f"{database}.{schema}", 0) + 1
+                    )
def get_all_tables(
self,

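An aside on the bookkeeping above: the report counters are plain dicts updated with `.get(key, 0) + 1`; a collections.Counter would express the same increment more compactly (a design note with placeholder names, not part of this change):

    from collections import Counter

    table_cached: Counter = Counter()
    database, schema = "dev", "public"  # placeholder names

    # Equivalent to: report.table_cached.get(f"{database}.{schema}", 0) + 1
    table_cached[f"{database}.{schema}"] += 1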
View File

@@ -242,9 +242,11 @@ class RedshiftDataDictionary:
                 )
             )
 
         for schema_key, schema_tables in tables.items():
-            logger.info(f"In schema: {schema_key} discovered {len(tables)} tables")
+            logger.info(
+                f"In schema: {schema_key} discovered {len(schema_tables)} tables"
+            )
         for schema_key, schema_views in views.items():
-            logger.info(f"In schema: {schema_key} discovered {len(views)} views")
+            logger.info(f"In schema: {schema_key} discovered {len(schema_views)} views")
 
         return tables, views
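The logging fix above corrects a wrong-variable bug: `len(tables)` counted schemas, not the tables within each schema, so every log line reported the same number. A toy illustration with made-up schema and table names:

    tables = {"public": ["orders", "users"], "analytics": ["daily_kpis"]}
    for schema_key, schema_tables in tables.items():
        # len(schema_tables) counts the tables in this schema; len(tables)
        # would print "2" (the number of schemas) on every iteration.
        print(f"In schema: {schema_key} discovered {len(schema_tables)} tables")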