mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-06 04:26:57 +00:00
GEN-1493 - Fix paginate_es in opensearch (#17858)
* GEN-1493 - Fix opensearch pagination * GEN-1494 - Add CI for py-tests with Postgres and Opensearch * GEN-1494 - Add CI for py-tests with Postgres and Opensearch
This commit is contained in:
parent
49323ed5bc
commit
6a1cd0ef8b
2
.github/workflows/py-tests-postgres.yml
vendored
2
.github/workflows/py-tests-postgres.yml
vendored
@ -9,7 +9,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
name: py-tests
|
||||
name: py-tests-postgres
|
||||
on:
|
||||
workflow_dispatch:
|
||||
push:
|
||||
|
||||
@ -63,7 +63,6 @@ services:
|
||||
retries: 10
|
||||
volumes:
|
||||
- es-data:/usr/share/elasticsearch/data
|
||||
|
||||
|
||||
execute-migrate-all:
|
||||
build:
|
||||
|
||||
@ -331,19 +331,9 @@ class ESMixin(Generic[T]):
|
||||
else:
|
||||
break
|
||||
|
||||
# Get the data
|
||||
for hit in response.hits.hits:
|
||||
try:
|
||||
yield self.get_by_name(
|
||||
entity=entity,
|
||||
fqn=hit.source["fullyQualifiedName"],
|
||||
fields=fields,
|
||||
nullable=False, # Raise an error if we don't find the Entity
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
f"Error while getting {hit.source['fullyQualifiedName']} - {exc}"
|
||||
)
|
||||
yield from self._yield_hits_from_api(
|
||||
response=response, entity=entity, fields=fields
|
||||
)
|
||||
|
||||
# Get next page
|
||||
last_hit = response.hits.hits[-1] if response.hits.hits else None
|
||||
@ -362,3 +352,20 @@ class ESMixin(Generic[T]):
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(f"Error while getting ES response: {exc}")
|
||||
return None
|
||||
|
||||
def _yield_hits_from_api(
|
||||
self, response: ESResponse, entity: Type[T], fields: Optional[List[str]]
|
||||
) -> Iterator[T]:
|
||||
"""Get the data from the API based on ES responses"""
|
||||
for hit in response.hits.hits:
|
||||
try:
|
||||
yield self.get_by_name(
|
||||
entity=entity,
|
||||
fqn=hit.source["fullyQualifiedName"],
|
||||
fields=fields,
|
||||
nullable=False, # Raise an error if we don't find the Entity
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
f"Error while getting {hit.source['fullyQualifiedName']} - {exc}"
|
||||
)
|
||||
|
||||
@ -300,7 +300,6 @@ class OMetaESTest(TestCase):
|
||||
res = self.metadata.es_get_queries_with_lineage(self.service.name.root)
|
||||
self.assertIn(self.checksum, res)
|
||||
|
||||
@pytest.skip("This never finished with Opensearch", allow_module_level=True)
|
||||
def test_paginate_no_filter(self):
|
||||
"""We can paginate all the data"""
|
||||
# Since the test can run in parallel with other tables being there, we just
|
||||
@ -309,7 +308,6 @@ class OMetaESTest(TestCase):
|
||||
assert asset
|
||||
break
|
||||
|
||||
@pytest.skip("This never finished with Opensearch", allow_module_level=True)
|
||||
def test_paginate_with_errors(self):
|
||||
"""We don't want to stop the ES yields just because a single Entity has an error"""
|
||||
# 1. First, prepare some tables
|
||||
|
||||
@ -385,6 +385,10 @@ public class OpenSearchClient implements SearchClient {
|
||||
}
|
||||
}
|
||||
|
||||
if (!nullOrEmpty(request.getSearchAfter())) {
|
||||
searchSourceBuilder.searchAfter(request.getSearchAfter());
|
||||
}
|
||||
|
||||
/* For backward-compatibility we continue supporting the deleted argument, this should be removed in future versions */
|
||||
if (request
|
||||
.getIndex()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user