From f571f67edafa1b93860bb554ae3adff394a6c221 Mon Sep 17 00:00:00 2001 From: Teddy Date: Mon, 4 Dec 2023 17:59:09 +0100 Subject: [PATCH] fix: DI skip on failure when listing entities (#14226) --- .../producer/web_analytics_producer.py | 1 + .../integration/ometa/test_ometa_table_api.py | 90 +++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/ingestion/src/metadata/data_insight/producer/web_analytics_producer.py b/ingestion/src/metadata/data_insight/producer/web_analytics_producer.py index d41f6099498..068b009228d 100644 --- a/ingestion/src/metadata/data_insight/producer/web_analytics_producer.py +++ b/ingestion/src/metadata/data_insight/producer/web_analytics_producer.py @@ -65,6 +65,7 @@ class WebAnalyticsProducer(ProducerInterface): after=after, limit=limit, fields=fields, + skip_on_failure=True, ) # type: ignore self._cache_events(after, events) return events diff --git a/ingestion/tests/integration/ometa/test_ometa_table_api.py b/ingestion/tests/integration/ometa/test_ometa_table_api.py index 47375f0c6f6..34b18e9db8b 100644 --- a/ingestion/tests/integration/ometa/test_ometa_table_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_table_api.py @@ -17,6 +17,10 @@ from copy import deepcopy from datetime import datetime, timezone from typing import List from unittest import TestCase +from unittest.mock import patch + +import pytest +from pydantic import ValidationError from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( @@ -66,8 +70,55 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig impor from metadata.generated.schema.type.basic import FullyQualifiedEntityName, SqlQuery from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.usageRequest import UsageRequest +from metadata.ingestion.ometa.client import REST from metadata.ingestion.ometa.ometa_api import OpenMetadata +BAD_RESPONSE = { + "data": [ + { + "id": "cb149dd4-f4c2-485e-acd3-74b7dca1015e", + "name": "my.fake.good.tableOne", + "columns": [ + { + "name": "col1", + "dataType": "BIGINT", + } + ], + }, + { + "id": "5d76676c-8e94-4e7e-97b8-294f4c16d0aa", + "name": "my.fake.good.tableTwo", + "columns": [ + { + "name": "col1", + "dataType": "BIGINT", + } + ], + }, + { + "id": "f063ff4e-99a3-4d42-8678-c484c2556e8d", + "name": "my.fake.bad.tableOne", + "columns": [ + { + "name": "col1", + "dataType": "BIGINT", + } + ], + "tags": [ + { + "tagFQN": "myTaghasMoreThanOneHundredAndTwentyCharactersAndItShouldBreakPydanticModelValidation.myTaghasMoreThanOneHundredAndTwentyCharactersAndItShouldBreakPydanticModelValidation", + "source": "Classification", + "labelType": "Manual", + "state": "Confirmed", + } + ], + }, + ], + "paging": { + "total": 3, + }, +} + class OMetaTableTest(TestCase): """ @@ -546,3 +597,42 @@ class OMetaTableTest(TestCase): entity=Table, fqn=table.fullyQualifiedName, fields=["tableProfilerConfig"] ) assert stored.tableProfilerConfig.profileSample == 50.0 + + def test_list_w_skip_on_failure(self): + """ + We can list all our Tables even when some of them are broken + """ + + # first validate that exception is raised when skip_on_failure is False + with patch.object(REST, "get", return_value=BAD_RESPONSE): + with pytest.raises(ValidationError): + self.metadata.list_entities(entity=Table) + + with patch.object(REST, "get", return_value=BAD_RESPONSE): + res = self.metadata.list_entities(entity=Table, skip_on_failure=True) + + # We should have 2 tables, the 3rd one is broken and should be skipped + assert len(res.entities) == 2 + + def test_list_all_w_skip_on_failure(self): + """ + Validate generator utility to fetch all tables even when some of them are broken + """ + # first validate that exception is raised when skip_on_failure is False + with patch.object(REST, "get", return_value=BAD_RESPONSE): + with pytest.raises(ValidationError): + res = self.metadata.list_all_entities( + entity=Table, + limit=1, # paginate in batches of pairs + ) + list(res) + + with patch.object(REST, "get", return_value=BAD_RESPONSE): + res = self.metadata.list_all_entities( + entity=Table, + limit=1, + skip_on_failure=True, # paginate in batches of pairs + ) + + # We should have 2 tables, the 3rd one is broken and should be skipped + assert len(list(res)) == 2