Fix for wrong containers on Athena (#4167)

This commit is contained in:
Tamas Nemeth 2022-02-17 10:34:02 +01:00 committed by GitHub
parent 2e7f3ae6f3
commit 585aad1aac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 3 deletions

View File

@ -6,6 +6,8 @@ from pyathena.common import BaseCursor
from pyathena.model import AthenaTableMetadata
from sqlalchemy.engine.reflection import Inspector
from datahub.emitter.mcp_builder import DatabaseKey, gen_containers
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.sql_common import (
SQLAlchemyConfig,
SQLAlchemySource,
@ -95,6 +97,32 @@ class AthenaSource(SQLAlchemySource):
return [schema for schema in schemas if schema == athena_config.database]
return schemas
def gen_database_containers(
self, database: str
) -> typing.Iterable[MetadataWorkUnit]:
# In Athena the schema is the database and database is not existing
return []
def gen_schema_key(self, db_name: str, schema: str) -> DatabaseKey:
return DatabaseKey(
platform=self.platform, instance=self.config.env, database=schema
)
def gen_schema_containers(
self, schema: str, db_name: str
) -> typing.Iterable[MetadataWorkUnit]:
database_container_key = self.gen_database_key(database=schema)
container_workunits = gen_containers(
database_container_key,
schema,
["Database"],
)
for wu in container_workunits:
self.report.report_workunit(wu)
yield wu
def close(self):
if self.cursor:
self.cursor.close()

View File

@ -585,7 +585,9 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
) -> Iterable[MetadataWorkUnit]:
schema_container_key = self.gen_schema_key(db_name, schema)
database_container_key = self.gen_database_key(database=db_name)
database_container_key: Optional[PlatformKey] = None
if db_name is not None:
database_container_key = self.gen_database_key(database=db_name)
container_workunits = gen_containers(
schema_container_key,
@ -625,8 +627,7 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
self.report.report_dropped(f"{schema}.*")
continue
if db_name:
yield from self.gen_schema_containers(schema, db_name)
yield from self.gen_schema_containers(schema, db_name)
if sql_config.include_tables:
yield from self.loop_tables(inspector, schema, sql_config)