mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-07 13:07:22 +00:00
parent
0190456f3d
commit
2936c0e4e3
@ -13,7 +13,6 @@ Mixin class containing Lineage specific methods
|
||||
|
||||
To be used by OpenMetadata class
|
||||
"""
|
||||
import time
|
||||
from typing import Generic, List, Optional, Type, TypeVar
|
||||
|
||||
from pydantic import BaseModel
|
||||
@ -60,44 +59,12 @@ class ESMixin(Generic[T]):
|
||||
|
||||
return None
|
||||
|
||||
def _search_es_entity_retry(
|
||||
self, entity_type: Type[T], query_string: str, retries: int = 3
|
||||
) -> Optional[List[T]]:
|
||||
"""
|
||||
Run the ES query `retries` times if the results are None.
|
||||
|
||||
It might be because the index has not yet been updated.
|
||||
|
||||
:param entity_type: Entity to look for
|
||||
:param query_string: Query to run
|
||||
:param retries: Times to retry
|
||||
:return: List of Entities or None
|
||||
"""
|
||||
times = max(1, retries) # Try at least once
|
||||
while times:
|
||||
entity_list = self._search_es_entity(
|
||||
entity_type=entity_type, query_string=query_string
|
||||
)
|
||||
if entity_list:
|
||||
return entity_list
|
||||
|
||||
logger.debug(
|
||||
f"Could not find any entities for ES query {query_string}. Will retry in 1 second..."
|
||||
)
|
||||
|
||||
times -= 1
|
||||
if times: # Only wait if we have another iteration coming
|
||||
time.sleep(1)
|
||||
|
||||
return None
|
||||
|
||||
def es_search_from_fqn(
|
||||
self,
|
||||
entity_type: Type[T],
|
||||
fqn_search_string: str,
|
||||
from_count: int = 0,
|
||||
size: int = 10,
|
||||
retries: int = 3,
|
||||
) -> Optional[List[T]]:
|
||||
"""
|
||||
Given a service_name and some filters, search for entities using ES
|
||||
@ -106,7 +73,6 @@ class ESMixin(Generic[T]):
|
||||
:param fqn_search_string: string used to search by FQN. E.g., service.*.schema.table
|
||||
:param from_count: Records to expect
|
||||
:param size: Number of records
|
||||
:param retries: Number of retries for the ES query
|
||||
:return: List of entities
|
||||
"""
|
||||
query_string = self.fqdn_search.format(
|
||||
@ -117,9 +83,11 @@ class ESMixin(Generic[T]):
|
||||
)
|
||||
|
||||
try:
|
||||
return self._search_es_entity_retry(
|
||||
entity_type=entity_type, query_string=query_string, retries=retries
|
||||
entity_list = self._search_es_entity(
|
||||
entity_type=entity_type, query_string=query_string
|
||||
)
|
||||
if entity_list:
|
||||
return entity_list
|
||||
|
||||
except KeyError:
|
||||
logger.warning(
|
||||
|
||||
@ -124,7 +124,6 @@ def _(
|
||||
database_name: Optional[str],
|
||||
schema_name: Optional[str],
|
||||
table_name: str,
|
||||
retries: int = 3,
|
||||
fetch_multiple_entities: bool = False,
|
||||
) -> Union[Optional[str], Optional[List[str]]]:
|
||||
"""
|
||||
@ -134,7 +133,6 @@ def _(
|
||||
:param database_name: DB name or None
|
||||
:param schema_name: Schema name or None
|
||||
:param table_name: Table name
|
||||
:param retries: ES Search retries
|
||||
:return:
|
||||
"""
|
||||
if not service_name or not table_name:
|
||||
@ -151,7 +149,6 @@ def _(
|
||||
es_result = metadata.es_search_from_fqn(
|
||||
entity_type=Table,
|
||||
fqn_search_string=fqn_search_string,
|
||||
retries=retries,
|
||||
)
|
||||
entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result(
|
||||
entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities
|
||||
|
||||
@ -11,6 +11,8 @@
|
||||
"""
|
||||
OMeta ES Mixin integration tests. The API needs to be up
|
||||
"""
|
||||
import logging
|
||||
import time
|
||||
from unittest import TestCase
|
||||
|
||||
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
|
||||
@ -62,6 +64,25 @@ class OMetaESTest(TestCase):
|
||||
)
|
||||
service_type = "databaseService"
|
||||
|
||||
@classmethod
|
||||
def check_es_index(cls) -> None:
|
||||
"""
|
||||
Wait until the index has been updated with the test table.
|
||||
"""
|
||||
logging.info("Checking ES index status...")
|
||||
tries = 0
|
||||
|
||||
res = None
|
||||
while not res and tries <= 5: # Kill in 5 seconds
|
||||
|
||||
res = cls.metadata.es_search_from_fqn(
|
||||
entity_type=Table,
|
||||
fqn_search_string="test-service-es.test-db-es.test-schema-es.test-es",
|
||||
)
|
||||
if not res:
|
||||
tries += 1
|
||||
time.sleep(1)
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls) -> None:
|
||||
"""
|
||||
@ -99,6 +120,9 @@ class OMetaESTest(TestCase):
|
||||
|
||||
cls.entity = cls.metadata.create_or_update(create)
|
||||
|
||||
# Leave some time for indexes to get updated, otherwise this happens too fast
|
||||
cls.check_es_index()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls) -> None:
|
||||
"""
|
||||
@ -131,7 +155,6 @@ class OMetaESTest(TestCase):
|
||||
entity_type=Table,
|
||||
fqn_search_string=fqn_search_string,
|
||||
size=100,
|
||||
retries=10,
|
||||
)
|
||||
|
||||
# We get the created table back
|
||||
@ -149,7 +172,6 @@ class OMetaESTest(TestCase):
|
||||
entity_type=Table,
|
||||
fqn_search_string=fqn_search_string,
|
||||
size=100,
|
||||
retries=10,
|
||||
)
|
||||
|
||||
self.assertIsNotNone(res)
|
||||
@ -166,7 +188,6 @@ class OMetaESTest(TestCase):
|
||||
entity_type=Table,
|
||||
fqn_search_string=fqn_search_string,
|
||||
size=100,
|
||||
retries=10,
|
||||
)
|
||||
|
||||
self.assertIsNotNone(res)
|
||||
@ -179,7 +200,6 @@ class OMetaESTest(TestCase):
|
||||
res = self.metadata.es_search_from_fqn(
|
||||
entity_type=Table,
|
||||
fqn_search_string="random",
|
||||
retries=1,
|
||||
)
|
||||
|
||||
self.assertIsNone(res)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user