fix(ingest/unity): Use assigned metastore if not metastore listed in unity catalog (#7446)

This commit is contained in:
Tamas Nemeth 2023-02-28 08:06:28 +01:00 committed by GitHub
parent d54e3b81cf
commit 62e33e03a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 3 deletions

View File

@ -134,6 +134,13 @@ class UnityCatalogApiProxy:
self._unity_catalog_api.list_metastores()
return True
def assigned_metastore(self) -> Optional[Metastore]:
response: dict = self._unity_catalog_api.get_metastore_summary()
if response.get("metastore_id") is None:
logger.info("Not found assigned metastore")
return None
return self._create_metastore(response)
def metastores(self) -> Iterable[Metastore]:
response: dict = self._unity_catalog_api.list_metastores()
if response.get("metastores") is None:

View File

@ -1,6 +1,6 @@
import logging
import re
from typing import Iterable, List, Optional
from typing import Dict, Iterable, List, Optional
from datahub.emitter.mce_builder import (
make_data_platform_urn,
@ -172,7 +172,14 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
yield from self.process_metastores()
def process_metastores(self) -> Iterable[MetadataWorkUnit]:
metastores: Dict[str, Metastore] = {}
assigned_metastore = self.unity_catalog_api_proxy.assigned_metastore()
if assigned_metastore:
metastores[assigned_metastore.metastore_id] = assigned_metastore
for metastore in self.unity_catalog_api_proxy.metastores():
metastores[metastore.metastore_id] = metastore
for metastore in metastores.values():
if not self.config.metastore_id_pattern.allowed(metastore.metastore_id):
self.report.metastores.dropped(metastore.metastore_id)
continue

View File

@ -49,7 +49,24 @@ def register_mock_data(unity_catalog_api_instance):
}
]
}
unity_catalog_api_instance.get_metastore_summary.return_value = {
"name": "acryl metastore",
"storage_root": "s3://db-9063-5b7e3b6d3736-s3-root-bucket/metastore/2c983545-d403-4f87-9063-5b7e3b6d3736",
"default_data_access_config_id": "9a9bacc4-cd82-409f-b55c-8ad1b2bde8da",
"storage_root_credential_id": "9a9bacc4-cd82-409f-b55c-8ad1b2bde8da",
"storage_root_credential_name": "2c983545-d403-4f87-9063-5b7e3b6d3736-data-access-config-1666185153576",
"delta_sharing_scope": "INTERNAL",
"owner": "abc@acryl.io",
"privilege_model_version": "1.0",
"region": "us-west-1",
"metastore_id": "2c983545-d403-4f87-9063-5b7e3b6d3736",
"created_at": 1666185153375,
"created_by": "abc@acryl.io",
"updated_at": 1666185154797,
"updated_by": "abc@acryl.io",
"cloud": "aws",
"global_metastore_id": "aws:us-west-1:2c983545-d403-4f87-9063-5b7e3b6d3736",
}
unity_catalog_api_instance.list_catalogs.return_value = {
"catalogs": [
{
@ -132,7 +149,6 @@ def register_mock_data(unity_catalog_api_instance):
},
]
}
unity_catalog_api_instance.list_tables.return_value = {
"tables": [
{