From 2936c0e4e357e9ec9ef06349642418cfb662265b Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Wed, 20 Jul 2022 12:44:50 +0200 Subject: [PATCH] Remove retry in ES query (#6191) Remove retry in ES query (#6191) --- .../ingestion/ometa/mixins/es_mixin.py | 40 ++----------------- ingestion/src/metadata/utils/fqn.py | 3 -- .../integration/ometa/test_ometa_es_api.py | 28 +++++++++++-- 3 files changed, 28 insertions(+), 43 deletions(-) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index 8e5299eb2a9..a37f0c4d4c0 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -13,7 +13,6 @@ Mixin class containing Lineage specific methods To be used by OpenMetadata class """ -import time from typing import Generic, List, Optional, Type, TypeVar from pydantic import BaseModel @@ -60,44 +59,12 @@ class ESMixin(Generic[T]): return None - def _search_es_entity_retry( - self, entity_type: Type[T], query_string: str, retries: int = 3 - ) -> Optional[List[T]]: - """ - Run the ES query `retries` times if the results are None. - - It might be because the index has not yet been updated. - - :param entity_type: Entity to look for - :param query_string: Query to run - :param retries: Times to retry - :return: List of Entities or None - """ - times = max(1, retries) # Try at least once - while times: - entity_list = self._search_es_entity( - entity_type=entity_type, query_string=query_string - ) - if entity_list: - return entity_list - - logger.debug( - f"Could not find any entities for ES query {query_string}. Will retry in 1 second..." - ) - - times -= 1 - if times: # Only wait if we have another iteration coming - time.sleep(1) - - return None - def es_search_from_fqn( self, entity_type: Type[T], fqn_search_string: str, from_count: int = 0, size: int = 10, - retries: int = 3, ) -> Optional[List[T]]: """ Given a service_name and some filters, search for entities using ES @@ -106,7 +73,6 @@ class ESMixin(Generic[T]): :param fqn_search_string: string used to search by FQN. E.g., service.*.schema.table :param from_count: Records to expect :param size: Number of records - :param retries: Number of retries for the ES query :return: List of entities """ query_string = self.fqdn_search.format( @@ -117,9 +83,11 @@ class ESMixin(Generic[T]): ) try: - return self._search_es_entity_retry( - entity_type=entity_type, query_string=query_string, retries=retries + entity_list = self._search_es_entity( + entity_type=entity_type, query_string=query_string ) + if entity_list: + return entity_list except KeyError: logger.warning( diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index ababab1b2b2..35e0b897612 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -124,7 +124,6 @@ def _( database_name: Optional[str], schema_name: Optional[str], table_name: str, - retries: int = 3, fetch_multiple_entities: bool = False, ) -> Union[Optional[str], Optional[List[str]]]: """ @@ -134,7 +133,6 @@ def _( :param database_name: DB name or None :param schema_name: Schema name or None :param table_name: Table name - :param retries: ES Search retries :return: """ if not service_name or not table_name: @@ -151,7 +149,6 @@ def _( es_result = metadata.es_search_from_fqn( entity_type=Table, fqn_search_string=fqn_search_string, - retries=retries, ) entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result( entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities diff --git a/ingestion/tests/integration/ometa/test_ometa_es_api.py b/ingestion/tests/integration/ometa/test_ometa_es_api.py index 69d8ac86192..7befb0dc30f 100644 --- a/ingestion/tests/integration/ometa/test_ometa_es_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_es_api.py @@ -11,6 +11,8 @@ """ OMeta ES Mixin integration tests. The API needs to be up """ +import logging +import time from unittest import TestCase from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest @@ -62,6 +64,25 @@ class OMetaESTest(TestCase): ) service_type = "databaseService" + @classmethod + def check_es_index(cls) -> None: + """ + Wait until the index has been updated with the test table. + """ + logging.info("Checking ES index status...") + tries = 0 + + res = None + while not res and tries <= 5: # Kill in 5 seconds + + res = cls.metadata.es_search_from_fqn( + entity_type=Table, + fqn_search_string="test-service-es.test-db-es.test-schema-es.test-es", + ) + if not res: + tries += 1 + time.sleep(1) + @classmethod def setUpClass(cls) -> None: """ @@ -99,6 +120,9 @@ class OMetaESTest(TestCase): cls.entity = cls.metadata.create_or_update(create) + # Leave some time for indexes to get updated, otherwise this happens too fast + cls.check_es_index() + @classmethod def tearDownClass(cls) -> None: """ @@ -131,7 +155,6 @@ class OMetaESTest(TestCase): entity_type=Table, fqn_search_string=fqn_search_string, size=100, - retries=10, ) # We get the created table back @@ -149,7 +172,6 @@ class OMetaESTest(TestCase): entity_type=Table, fqn_search_string=fqn_search_string, size=100, - retries=10, ) self.assertIsNotNone(res) @@ -166,7 +188,6 @@ class OMetaESTest(TestCase): entity_type=Table, fqn_search_string=fqn_search_string, size=100, - retries=10, ) self.assertIsNotNone(res) @@ -179,7 +200,6 @@ class OMetaESTest(TestCase): res = self.metadata.es_search_from_fqn( entity_type=Table, fqn_search_string="random", - retries=1, ) self.assertIsNone(res)