diff --git a/ingestion/tests/integration/integration_base.py b/ingestion/tests/integration/integration_base.py
index d54c9e19fff..6cb82487792 100644
--- a/ingestion/tests/integration/integration_base.py
+++ b/ingestion/tests/integration/integration_base.py
@@ -99,9 +99,25 @@ from metadata.generated.schema.type.basic import (
     Markdown,
     TestCaseEntityName,
 )
+from metadata.generated.schema.type.tagLabel import (
+    LabelType,
+    State,
+    TagFQN,
+    TagLabel,
+    TagSource,
+)
 from metadata.ingestion.ometa.ometa_api import C, T
 from metadata.utils.dispatch import class_register
 
+# Reusable "Tier.Tier1" tag label that integration tests attach to entities.
+TIER1_TAG: TagLabel = TagLabel(
+    tagFQN=TagFQN("Tier.Tier1"),
+    name="Tier1",
+    source=TagSource.Classification,
+    labelType=LabelType.Automated,
+    state=State.Suggested,
+)
+
 COLUMNS = [
     Column(name="id", dataType=DataType.BIGINT),
     Column(name="another", dataType=DataType.BIGINT),
diff --git a/ingestion/tests/integration/ometa/test_ometa_es_api.py b/ingestion/tests/integration/ometa/test_ometa_es_api.py
index a66d57c0a9a..23eee87c10a 100644
--- a/ingestion/tests/integration/ometa/test_ometa_es_api.py
+++ b/ingestion/tests/integration/ometa/test_ometa_es_api.py
@@ -14,6 +14,7 @@ OMeta ES Mixin integration tests. 
The API needs to be up
 import logging
 import time
 import uuid
+from copy import deepcopy
 from unittest import TestCase
 from unittest.mock import patch
 
@@ -52,7 +53,7 @@ from metadata.generated.schema.type.basic import EntityName, SqlQuery
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.utils import fqn
 
-from ..integration_base import get_create_entity
+from ..integration_base import TIER1_TAG, get_create_entity
 
 
 class OMetaESTest(TestCase):
@@ -363,3 +364,43 @@ class OMetaESTest(TestCase):
             )
         )
         assert len(assets) == 10
+
+    def test_paginate_with_filters(self):
+        """We can paginate only tier 1 tables.
+
+        Creates ten tables, tags every other one with TIER1_TAG, then checks
+        that an ES search filtered on the Tier1 tag and on this test's service
+        name returns exactly the five tagged tables.
+        """
+        import json  # local import keeps this test self-contained
+
+        # Create ten tables; tag the even-indexed ones (five in total) as Tier1.
+        for idx in range(10):
+            table = self.metadata.create_or_update(
+                data=get_create_entity(
+                    entity=Table,
+                    name=EntityName(f"filtered_{idx}"),
+                    reference=self.create_schema_entity.fullyQualifiedName,
+                )
+            )
+            if idx % 2 == 0:
+                dest = deepcopy(table)
+                dest.tags = [TIER1_TAG]
+                self.metadata.patch(entity=Table, source=table, destination=dest)
+
+        # Serialize the filter with json.dumps so the interpolated service name is
+        # escaped correctly; compact separators reproduce the hand-written layout.
+        tier_term = {"term": {"tier.tagFQN": "Tier.Tier1"}}
+        service_term = {
+            "term": {"service.displayName.keyword": self.service_entity.name.root}
+        }
+        inner_bool = {"bool": {"must": [tier_term, service_term]}}
+        query_filter = json.dumps(
+            {"query": {"bool": {"must": [inner_bool]}}},
+            separators=(",", ":"),
+        )
+        # Only the five tagged tables are expected back from the filtered search.
+        assets = list(
+            self.metadata.paginate_es(entity=Table, query_filter=query_filter, size=2)
+        )
+        assert len(assets) == 5