fix(ingest) Athena: db filter was not applied (#4127)

* Fix for db filter on Athena

* Black formatting

* Addressing pr comments

* Remove unneeded imports
This commit is contained in:
Tamas Nemeth 2022-02-11 22:42:41 +01:00 committed by GitHub
parent 7fcc71ed81
commit d24c52828b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,4 +1,7 @@
from typing import Optional import typing
from typing import List, Optional
from sqlalchemy.engine.reflection import Inspector
from datahub.ingestion.source.sql.sql_common import ( from datahub.ingestion.source.sql.sql_common import (
SQLAlchemyConfig, SQLAlchemyConfig,
@ -40,3 +43,11 @@ class AthenaSource(SQLAlchemySource):
def create(cls, config_dict, ctx): def create(cls, config_dict, ctx):
config = AthenaConfig.parse_obj(config_dict) config = AthenaConfig.parse_obj(config_dict)
return cls(config, ctx) return cls(config, ctx)
# It seems like database/schema filter in the connection string does not work and this to work around that
def get_schema_names(self, inspector: Inspector) -> List[str]:
athena_config = typing.cast(AthenaConfig, self.config)
schemas = inspector.get_schema_names()
if athena_config.database:
return [schema for schema in schemas if schema == athena_config.database]
return schemas