From 2aef457785d49c3c78fb1fb44cbb96bc745a2784 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 12 Jul 2024 09:44:21 +0200 Subject: [PATCH] FIX #16481 - Truncate ingestion pipeline status (#16997) * FIX #16481 - Truncate ingestion pipeline status * FIX #16481 - Truncate ingestion pipeline status * FIX #16481 - Truncate ingestion pipeline status --- .../src/metadata/ingestion/api/status.py | 31 ++- ingestion/src/metadata/workflow/base.py | 2 +- .../ometa/test_ometa_ingestion_pipeline.py | 214 ++++++++++++++++++ .../integration/postgres/test_data_quality.py | 20 +- 4 files changed, 251 insertions(+), 16 deletions(-) create mode 100644 ingestion/tests/integration/ometa/test_ometa_ingestion_pipeline.py diff --git a/ingestion/src/metadata/ingestion/api/status.py b/ingestion/src/metadata/ingestion/api/status.py index 278b744be1d..de7a647f1f5 100644 --- a/ingestion/src/metadata/ingestion/api/status.py +++ b/ingestion/src/metadata/ingestion/api/status.py @@ -13,9 +13,9 @@ Status output utilities """ import pprint import time -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional -from pydantic import BaseModel, Field +from pydantic import AfterValidator, BaseModel, Field from typing_extensions import Annotated from metadata.generated.schema.entity.services.ingestionPipelines.status import ( @@ -27,6 +27,22 @@ from metadata.utils.logger import get_log_name, ingestion_logger logger = ingestion_logger() +MAX_STACK_TRACE_LENGTH = 1_000_000 +TruncatedStr = Annotated[ + Optional[str], AfterValidator(lambda v: v[:MAX_STACK_TRACE_LENGTH] if v else None) +] + + +class TruncatedStackTraceError(StackTraceError): + """ + Update StackTraceError to limit the payload size, + since some connectors can make it explode + """ + + error: TruncatedStr + stackTrace: TruncatedStr = None + + class Status(BaseModel): """ Class to handle status @@ -40,7 +56,7 @@ class Status(BaseModel): updated_records: Annotated[List[Any], Field(default_factory=list)] warnings: Annotated[List[Any], Field(default_factory=list)] filtered: Annotated[List[Dict[str, str]], Field(default_factory=list)] - failures: Annotated[List[StackTraceError], Field(default_factory=list)] + failures: Annotated[List[TruncatedStackTraceError], Field(default_factory=list)] def scanned(self, record: Any) -> None: """ @@ -74,7 +90,14 @@ class Status(BaseModel): """ logger.warning(error.error) logger.debug(error.stackTrace) - self.failures.append(error) + # Truncate StackTrace to avoid payload explosion + self.failures.append( + TruncatedStackTraceError( + name=error.name, + error=error.error, + stackTrace=error.stackTrace, + ) + ) def fail_all(self, failures: List[StackTraceError]) -> None: """ diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py index dbde3bbe1c7..b0bbe0d24e2 100644 --- a/ingestion/src/metadata/workflow/base.py +++ b/ingestion/src/metadata/workflow/base.py @@ -107,7 +107,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin): self.post_init() @property - def ingestion_pipeline(self): + def ingestion_pipeline(self) -> Optional[IngestionPipeline]: """Get or create the Ingestion Pipeline from the configuration""" if not self._ingestion_pipeline and self.config.ingestionPipelineFQN: self._ingestion_pipeline = self.get_or_create_ingestion_pipeline() diff --git a/ingestion/tests/integration/ometa/test_ometa_ingestion_pipeline.py b/ingestion/tests/integration/ometa/test_ometa_ingestion_pipeline.py new file mode 100644 index 00000000000..1fd8472e021 --- /dev/null +++ b/ingestion/tests/integration/ometa/test_ometa_ingestion_pipeline.py @@ -0,0 +1,214 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test how we create and update status in Ingestion Pipelines +""" +import json +from unittest import TestCase + +import pytest + +from _openmetadata_testutils.ometa import int_admin_ometa +from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( + BasicAuth, +) +from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( + MysqlConnection, +) +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + IngestionPipeline, + PipelineState, + PipelineStatus, +) +from metadata.generated.schema.entity.services.ingestionPipelines.status import ( + IngestionStatus, + StackTraceError, + StepSummary, +) +from metadata.ingestion.api.status import TruncatedStackTraceError +from metadata.workflow.metadata import MetadataWorkflow + +from ..integration_base import ( + METADATA_INGESTION_CONFIG_TEMPLATE, + generate_name, + get_create_service, +) + + +class OMetaTableTest(TestCase): + """ + Run this integration test with the local API available + Install the ingestion package before running the tests + """ + + metadata = int_admin_ometa() + service_name = generate_name() + + @classmethod + def setUpClass(cls) -> None: + """ + Prepare ingredients + """ + # Create the service entity + create_service = get_create_service( + entity=DatabaseService, name=cls.service_name + ) + cls.service_entity = cls.metadata.create_or_update(data=create_service) + + workflow_config = json.loads( + METADATA_INGESTION_CONFIG_TEMPLATE.format( + type="mysql", + service_name=cls.service_name, + service_config=MysqlConnection( + username="openmetadata_user", + authType=BasicAuth( + password="openmetadata_password", + ), + hostPort="localhost:3306", + ).model_dump_json(), + source_config={}, + ) + ) + workflow_config["ingestionPipelineFQN"] = f"{cls.service_name}.ingestion" + cls.workflow: MetadataWorkflow = MetadataWorkflow.create(workflow_config) + + # Since we won't run the full workflow, let's create the service first + # which is needed to create the ingestion + cls.metadata.get_service_or_create( + entity=DatabaseService, config=cls.workflow.config.source + ) + + @classmethod + def tearDownClass(cls) -> None: + """ + Clean up + """ + + service_id = str( + cls.metadata.get_by_name( + entity=DatabaseService, fqn=cls.service_name + ).id.root + ) + + cls.metadata.delete( + entity=DatabaseService, + entity_id=service_id, + recursive=True, + hard_delete=True, + ) + + def test_create_ingestion_pipeline(self) -> None: + """We can create an ingestion pipeline""" + + ingestion_pipeline: IngestionPipeline = self.workflow.ingestion_pipeline + assert ingestion_pipeline is not None + assert ingestion_pipeline.name.root == "ingestion" + + def test_add_status(self) -> None: + """We can add status to the ingestion pipeline""" + + ingestion_pipeline: IngestionPipeline = self.workflow.ingestion_pipeline + assert ingestion_pipeline is not None + + # We can send a status to the ingestion pipeline + ingestion_status = IngestionStatus( + [ + StepSummary( + name="source", + failures=[ + StackTraceError( + name="error", + error="error", + stackTrace="stackTrace", + ) + ], + ) + ] + ) + + pipeline_status: PipelineStatus = self.workflow._new_pipeline_status( + PipelineState.success + ) + pipeline_status.status = ingestion_status + + # Gets properly created + self.metadata.create_or_update_pipeline_status( + ingestion_pipeline.fullyQualifiedName.root, pipeline_status + ) + + real_pipeline_status: PipelineStatus = self.metadata.get_pipeline_status( + ingestion_pipeline.fullyQualifiedName.root, self.workflow.run_id + ) + assert real_pipeline_status.pipelineState == PipelineState.success + + # If the status has too long names/errors it will fail + too_long_status = IngestionStatus( + [ + StepSummary( + name="source", + failures=[ + StackTraceError( + name="error", + error="error" * 20_000_000, + stackTrace="stackTrace", + ) + ], + ) + ] + ) + + pipeline_status: PipelineStatus = self.workflow._new_pipeline_status( + PipelineState.success + ) + pipeline_status.status = too_long_status + + # We get a bad request error + with pytest.raises(Exception) as exc: + self.metadata.create_or_update_pipeline_status( + ingestion_pipeline.fullyQualifiedName.root, pipeline_status + ) + + assert ( + "String value length (20049662) exceeds the maximum allowed" + in str(exc.value) + ) or ("Connection aborted." in str(exc.value)) + + # If we truncate the status it all runs good + truncated_long_status = IngestionStatus( + [ + StepSummary( + name="source", + failures=[ + TruncatedStackTraceError( + name="error", + error="error" * 20_000_000, + stackTrace="stackTrace", + ) + ], + ) + ] + ) + + pipeline_status: PipelineStatus = self.workflow._new_pipeline_status( + PipelineState.success + ) + pipeline_status.status = truncated_long_status + + res = self.metadata.create_or_update_pipeline_status( + ingestion_pipeline.fullyQualifiedName.root, pipeline_status + ) + + assert ( + res["entityFullyQualifiedName"] + == ingestion_pipeline.fullyQualifiedName.root + ) diff --git a/ingestion/tests/integration/postgres/test_data_quality.py b/ingestion/tests/integration/postgres/test_data_quality.py index 34996344eac..d51b42c2663 100644 --- a/ingestion/tests/integration/postgres/test_data_quality.py +++ b/ingestion/tests/integration/postgres/test_data_quality.py @@ -4,9 +4,6 @@ from typing import List import pytest from metadata.generated.schema.entity.services.databaseService import DatabaseService -from metadata.generated.schema.entity.services.ingestionPipelines.status import ( - StackTraceError, -) from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( TestSuiteConfigType, TestSuitePipeline, @@ -24,6 +21,7 @@ from metadata.generated.schema.tests.basic import TestCaseStatus from metadata.generated.schema.tests.testCase import TestCase from metadata.generated.schema.tests.testSuite import TestSuite from metadata.generated.schema.type.basic import ComponentConfig +from metadata.ingestion.api.status import TruncatedStackTraceError from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.workflow.data_quality import TestSuiteWorkflow @@ -90,9 +88,9 @@ def run_data_quality_workflow( loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config ), ) - test_suite_procesor = TestSuiteWorkflow.create(workflow_config) - test_suite_procesor.execute() - test_suite_procesor.raise_from_status() + test_suite_processor = TestSuiteWorkflow.create(workflow_config) + test_suite_processor.execute() + test_suite_processor.raise_from_status() yield test_suite: TestSuite = metadata.get_by_name( TestSuite, "MyTestSuite", nullable=True @@ -169,15 +167,15 @@ def test_incompatible_column_type(ingest_metadata, metadata: OpenMetadata, db_se "openMetadataServerConfig": metadata.config.model_dump(), }, } - test_suite_procesor = TestSuiteWorkflow.create(workflow_config) - test_suite_procesor.execute() - assert test_suite_procesor.steps[0].get_status().failures == [ - StackTraceError( + test_suite_processor = TestSuiteWorkflow.create(workflow_config) + test_suite_processor.execute() + assert test_suite_processor.steps[0].get_status().failures == [ + TruncatedStackTraceError( name="Incompatible Column for Test Case", error="Test case incompatible_column_type of type columnValueMaxToBeBetween is not compatible with column first_name of type VARCHAR", ) ], "Test case incompatible_column_type should fail" assert ( f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer.customer_id.compatible_test" - in test_suite_procesor.steps[1].get_status().records + in test_suite_processor.steps[1].get_status().records ), "Test case compatible_test should pass"