fix(ingest/druid) Handling gracefully if no table returned in a schema (#8203)

This commit is contained in:
Tamas Nemeth 2023-06-15 15:10:21 +02:00 committed by GitHub
parent 6ce0a248fd
commit a3ceee46d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,6 +1,8 @@
# This import verifies that the dependencies are available. # This import verifies that the dependencies are available.
import pydruid # noqa: F401 import pydruid # noqa: F401
from pydantic.fields import Field from pydantic.fields import Field
from pydruid.db.sqlalchemy import DruidDialect
from sqlalchemy.exc import ResourceClosedError
from datahub.configuration.common import AllowDenyPattern from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.api.decorators import ( from datahub.ingestion.api.decorators import (
@ -14,12 +16,25 @@ from datahub.ingestion.api.decorators import (
from datahub.ingestion.source.sql.sql_common import SQLAlchemySource from datahub.ingestion.source.sql.sql_common import SQLAlchemySource
from datahub.ingestion.source.sql.sql_config import BasicSQLAlchemyConfig from datahub.ingestion.source.sql.sql_config import BasicSQLAlchemyConfig
get_table_names_source = DruidDialect.get_table_names
def get_table_names(self, connection, schema=None, **kwargs):
try:
return get_table_names_source(self, connection, schema=schema, **kwargs)
# Druid throws ResourceClosedError when there is no table in the schema
except ResourceClosedError:
return []
DruidDialect.get_table_names = get_table_names
class DruidConfig(BasicSQLAlchemyConfig): class DruidConfig(BasicSQLAlchemyConfig):
# defaults # defaults
scheme = "druid" scheme = "druid"
schema_pattern: AllowDenyPattern = Field( schema_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern(deny=["^(lookup|sys).*"]), default=AllowDenyPattern(deny=["^(lookup|sysgit|view).*"]),
description="regex patterns for schemas to filter in ingestion.", description="regex patterns for schemas to filter in ingestion.",
) )