mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-24 16:38:19 +00:00
fix(ingest/glue): Add additional checks and logging when specifying catalog_id (#12168)
This commit is contained in:
parent
f4b33b59d1
commit
756b199506
@ -52,6 +52,7 @@ from datahub.ingestion.api.decorators import (
|
||||
platform_name,
|
||||
support_status,
|
||||
)
|
||||
from datahub.ingestion.api.report import EntityFilterReport
|
||||
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
|
||||
from datahub.ingestion.api.workunit import MetadataWorkUnit
|
||||
from datahub.ingestion.source.aws import s3_util
|
||||
@ -115,7 +116,6 @@ from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_col
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
DEFAULT_PLATFORM = "glue"
|
||||
VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]
|
||||
|
||||
@ -220,6 +220,7 @@ class GlueSourceConfig(
|
||||
class GlueSourceReport(StaleEntityRemovalSourceReport):
|
||||
tables_scanned = 0
|
||||
filtered: List[str] = dataclass_field(default_factory=list)
|
||||
databases: EntityFilterReport = EntityFilterReport.field(type="database")
|
||||
|
||||
num_job_script_location_missing: int = 0
|
||||
num_job_script_location_invalid: int = 0
|
||||
@ -668,6 +669,7 @@ class GlueSource(StatefulIngestionSourceBase):
|
||||
return MetadataWorkUnit(id=f'{job_name}-{node["Id"]}', mce=mce)
|
||||
|
||||
def get_all_databases(self) -> Iterable[Mapping[str, Any]]:
|
||||
logger.debug("Getting all databases")
|
||||
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetDatabases.html
|
||||
paginator = self.glue_client.get_paginator("get_databases")
|
||||
|
||||
@ -684,10 +686,18 @@ class GlueSource(StatefulIngestionSourceBase):
|
||||
pattern += "[?!TargetDatabase]"
|
||||
|
||||
for database in paginator_response.search(pattern):
|
||||
if self.source_config.database_pattern.allowed(database["Name"]):
|
||||
if (not self.source_config.database_pattern.allowed(database["Name"])) or (
|
||||
self.source_config.catalog_id
|
||||
and database.get("CatalogId")
|
||||
and database.get("CatalogId") != self.source_config.catalog_id
|
||||
):
|
||||
self.report.databases.dropped(database["Name"])
|
||||
else:
|
||||
self.report.databases.processed(database["Name"])
|
||||
yield database
|
||||
|
||||
def get_tables_from_database(self, database: Mapping[str, Any]) -> Iterable[Dict]:
|
||||
logger.debug(f"Getting tables from database {database['Name']}")
|
||||
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetTables.html
|
||||
paginator = self.glue_client.get_paginator("get_tables")
|
||||
database_name = database["Name"]
|
||||
|
||||
@ -124,7 +124,7 @@
|
||||
"CreateTime": "June 01, 2021 at 14:55:13"
|
||||
},
|
||||
"name": "empty-database",
|
||||
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database",
|
||||
"qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database",
|
||||
"env": "PROD"
|
||||
}
|
||||
}
|
||||
|
||||
@ -124,7 +124,7 @@
|
||||
"CreateTime": "June 01, 2021 at 14:55:13"
|
||||
},
|
||||
"name": "empty-database",
|
||||
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database",
|
||||
"qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database",
|
||||
"env": "PROD"
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,7 +129,7 @@
|
||||
"CreateTime": "June 01, 2021 at 14:55:13"
|
||||
},
|
||||
"name": "empty-database",
|
||||
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database",
|
||||
"qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database",
|
||||
"env": "PROD"
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,8 +35,8 @@ from tests.test_helpers.state_helpers import (
|
||||
validate_all_providers_have_committed_successfully,
|
||||
)
|
||||
from tests.unit.glue.test_glue_source_stubs import (
|
||||
databases_1,
|
||||
databases_2,
|
||||
empty_database,
|
||||
flights_database,
|
||||
get_bucket_tagging,
|
||||
get_databases_delta_response,
|
||||
get_databases_response,
|
||||
@ -64,6 +64,7 @@ from tests.unit.glue.test_glue_source_stubs import (
|
||||
tables_2,
|
||||
tables_profiling_1,
|
||||
target_database_tables,
|
||||
test_database,
|
||||
)
|
||||
|
||||
FROZEN_TIME = "2020-04-14 07:00:00"
|
||||
@ -310,6 +311,40 @@ def test_config_without_platform():
|
||||
assert source.platform == "glue"
|
||||
|
||||
|
||||
def test_get_databases_filters_by_catalog():
|
||||
def format_databases(databases):
|
||||
return set(d["Name"] for d in databases)
|
||||
|
||||
all_catalogs_source: GlueSource = GlueSource(
|
||||
config=GlueSourceConfig(aws_region="us-west-2"),
|
||||
ctx=PipelineContext(run_id="glue-source-test"),
|
||||
)
|
||||
with Stubber(all_catalogs_source.glue_client) as glue_stubber:
|
||||
glue_stubber.add_response("get_databases", get_databases_response, {})
|
||||
|
||||
expected = [flights_database, test_database, empty_database]
|
||||
actual = all_catalogs_source.get_all_databases()
|
||||
assert format_databases(actual) == format_databases(expected)
|
||||
assert all_catalogs_source.report.databases.dropped_entities.as_obj() == []
|
||||
|
||||
catalog_id = "123412341234"
|
||||
single_catalog_source: GlueSource = GlueSource(
|
||||
config=GlueSourceConfig(catalog_id=catalog_id, aws_region="us-west-2"),
|
||||
ctx=PipelineContext(run_id="glue-source-test"),
|
||||
)
|
||||
with Stubber(single_catalog_source.glue_client) as glue_stubber:
|
||||
glue_stubber.add_response(
|
||||
"get_databases", get_databases_response, {"CatalogId": catalog_id}
|
||||
)
|
||||
|
||||
expected = [flights_database, test_database]
|
||||
actual = single_catalog_source.get_all_databases()
|
||||
assert format_databases(actual) == format_databases(expected)
|
||||
assert single_catalog_source.report.databases.dropped_entities.as_obj() == [
|
||||
"empty-database"
|
||||
]
|
||||
|
||||
|
||||
@freeze_time(FROZEN_TIME)
|
||||
def test_glue_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
|
||||
deleted_actor_golden_mcs = "{}/glue_deleted_actor_mces_golden.json".format(
|
||||
@ -357,8 +392,8 @@ def test_glue_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
|
||||
tables_on_first_call = tables_1
|
||||
tables_on_second_call = tables_2
|
||||
mock_get_all_databases_and_tables.side_effect = [
|
||||
(databases_1, tables_on_first_call),
|
||||
(databases_2, tables_on_second_call),
|
||||
([flights_database], tables_on_first_call),
|
||||
([test_database], tables_on_second_call),
|
||||
]
|
||||
|
||||
pipeline_run1 = run_and_get_pipeline(pipeline_config_dict)
|
||||
|
||||
@ -88,12 +88,14 @@ get_databases_response = {
|
||||
"Permissions": ["ALL"],
|
||||
}
|
||||
],
|
||||
"CatalogId": "123412341234",
|
||||
"CatalogId": "000000000000",
|
||||
},
|
||||
]
|
||||
}
|
||||
databases_1 = [{"Name": "flights-database", "CatalogId": "123412341234"}]
|
||||
databases_2 = [{"Name": "test-database", "CatalogId": "123412341234"}]
|
||||
flights_database = {"Name": "flights-database", "CatalogId": "123412341234"}
|
||||
test_database = {"Name": "test-database", "CatalogId": "123412341234"}
|
||||
empty_database = {"Name": "empty-database", "CatalogId": "000000000000"}
|
||||
|
||||
tables_1 = [
|
||||
{
|
||||
"Name": "avro",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user