FIX #16481 - Truncate ingestion pipeline status (#16997)

Pere Miquel Brull 2024-07-12 09:44:21 +02:00 committed by GitHub
parent 4dca66d739
commit 2aef457785
4 changed files with 251 additions and 16 deletions
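The change set below centers on one idea: rather than rejecting oversized payloads after the fact, the status utilities declare a pydantic Annotated string type whose AfterValidator trims any value past MAX_STACK_TRACE_LENGTH characters. A minimal self-contained sketch of that pattern (with an illustrative 10-character cap instead of the real 1_000_000; ShortStr, Payload, and MAX_LEN are made-up names):

from typing import Optional
from pydantic import AfterValidator, BaseModel
from typing_extensions import Annotated

MAX_LEN = 10  # illustrative cap; the diff uses MAX_STACK_TRACE_LENGTH = 1_000_000

# Runs after regular string validation; None passes through untouched
ShortStr = Annotated[Optional[str], AfterValidator(lambda v: v[:MAX_LEN] if v else None)]

class Payload(BaseModel):
    error: ShortStr = None

assert Payload(error="x" * 50).error == "x" * 10  # silently truncated
assert Payload(error=None).error is None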


@@ -13,9 +13,9 @@ Status output utilities
"""
import pprint
import time
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional

-from pydantic import BaseModel, Field
+from pydantic import AfterValidator, BaseModel, Field
from typing_extensions import Annotated

from metadata.generated.schema.entity.services.ingestionPipelines.status import (
@@ -27,6 +27,22 @@ from metadata.utils.logger import get_log_name, ingestion_logger
logger = ingestion_logger()

+MAX_STACK_TRACE_LENGTH = 1_000_000
+
+TruncatedStr = Annotated[
+    Optional[str], AfterValidator(lambda v: v[:MAX_STACK_TRACE_LENGTH] if v else None)
+]
+
+
+class TruncatedStackTraceError(StackTraceError):
+    """
+    Update StackTraceError to limit the payload size,
+    since some connectors can make it explode
+    """
+
+    error: TruncatedStr
+    stackTrace: TruncatedStr = None


class Status(BaseModel):
    """
    Class to handle status
@@ -40,7 +56,7 @@ class Status(BaseModel):
    updated_records: Annotated[List[Any], Field(default_factory=list)]
    warnings: Annotated[List[Any], Field(default_factory=list)]
    filtered: Annotated[List[Dict[str, str]], Field(default_factory=list)]
-    failures: Annotated[List[StackTraceError], Field(default_factory=list)]
+    failures: Annotated[List[TruncatedStackTraceError], Field(default_factory=list)]

    def scanned(self, record: Any) -> None:
        """
@@ -74,7 +90,14 @@
        """
        logger.warning(error.error)
        logger.debug(error.stackTrace)
-        self.failures.append(error)
+        # Truncate StackTrace to avoid payload explosion
+        self.failures.append(
+            TruncatedStackTraceError(
+                name=error.name,
+                error=error.error,
+                stackTrace=error.stackTrace,
+            )
+        )

    def fail_all(self, failures: List[StackTraceError]) -> None:
        """


@@ -107,7 +107,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
        self.post_init()

    @property
-    def ingestion_pipeline(self):
+    def ingestion_pipeline(self) -> Optional[IngestionPipeline]:
        """Get or create the Ingestion Pipeline from the configuration"""
        if not self._ingestion_pipeline and self.config.ingestionPipelineFQN:
            self._ingestion_pipeline = self.get_or_create_ingestion_pipeline()
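The sharper return annotation documents that the property yields None whenever no ingestionPipelineFQN is configured, so call sites should guard for it. A hypothetical caller:

# Sketch of a guarded call site; workflow is any BaseWorkflow instance
pipeline = workflow.ingestion_pipeline
if pipeline is not None:
    print(pipeline.fullyQualifiedName.root)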


@@ -0,0 +1,214 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test how we create and update status in Ingestion Pipelines
"""
import json
from unittest import TestCase

import pytest

from _openmetadata_testutils.ometa import int_admin_ometa
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
    BasicAuth,
)
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
    MysqlConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    IngestionPipeline,
    PipelineState,
    PipelineStatus,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
    IngestionStatus,
    StackTraceError,
    StepSummary,
)
from metadata.ingestion.api.status import TruncatedStackTraceError
from metadata.workflow.metadata import MetadataWorkflow

from ..integration_base import (
    METADATA_INGESTION_CONFIG_TEMPLATE,
    generate_name,
    get_create_service,
)


class OMetaTableTest(TestCase):
    """
    Run this integration test with the local API available
    Install the ingestion package before running the tests
    """

    metadata = int_admin_ometa()
    service_name = generate_name()

    @classmethod
    def setUpClass(cls) -> None:
        """
        Prepare ingredients
        """
        # Create the service entity
        create_service = get_create_service(
            entity=DatabaseService, name=cls.service_name
        )
        cls.service_entity = cls.metadata.create_or_update(data=create_service)

        workflow_config = json.loads(
            METADATA_INGESTION_CONFIG_TEMPLATE.format(
                type="mysql",
                service_name=cls.service_name,
                service_config=MysqlConnection(
                    username="openmetadata_user",
                    authType=BasicAuth(
                        password="openmetadata_password",
                    ),
                    hostPort="localhost:3306",
                ).model_dump_json(),
                source_config={},
            )
        )
        workflow_config["ingestionPipelineFQN"] = f"{cls.service_name}.ingestion"
        cls.workflow: MetadataWorkflow = MetadataWorkflow.create(workflow_config)

        # Since we won't run the full workflow, let's create the service first
        # which is needed to create the ingestion
        cls.metadata.get_service_or_create(
            entity=DatabaseService, config=cls.workflow.config.source
        )

    @classmethod
    def tearDownClass(cls) -> None:
        """
        Clean up
        """
        service_id = str(
            cls.metadata.get_by_name(
                entity=DatabaseService, fqn=cls.service_name
            ).id.root
        )

        cls.metadata.delete(
            entity=DatabaseService,
            entity_id=service_id,
            recursive=True,
            hard_delete=True,
        )

    def test_create_ingestion_pipeline(self) -> None:
        """We can create an ingestion pipeline"""
        ingestion_pipeline: IngestionPipeline = self.workflow.ingestion_pipeline
        assert ingestion_pipeline is not None
        assert ingestion_pipeline.name.root == "ingestion"

    def test_add_status(self) -> None:
        """We can add status to the ingestion pipeline"""
        ingestion_pipeline: IngestionPipeline = self.workflow.ingestion_pipeline
        assert ingestion_pipeline is not None

        # We can send a status to the ingestion pipeline
        ingestion_status = IngestionStatus(
            [
                StepSummary(
                    name="source",
                    failures=[
                        StackTraceError(
                            name="error",
                            error="error",
                            stackTrace="stackTrace",
                        )
                    ],
                )
            ]
        )
        pipeline_status: PipelineStatus = self.workflow._new_pipeline_status(
            PipelineState.success
        )
        pipeline_status.status = ingestion_status

        # Gets properly created
        self.metadata.create_or_update_pipeline_status(
            ingestion_pipeline.fullyQualifiedName.root, pipeline_status
        )
        real_pipeline_status: PipelineStatus = self.metadata.get_pipeline_status(
            ingestion_pipeline.fullyQualifiedName.root, self.workflow.run_id
        )
        assert real_pipeline_status.pipelineState == PipelineState.success

        # If the status has too long names/errors it will fail
        too_long_status = IngestionStatus(
            [
                StepSummary(
                    name="source",
                    failures=[
                        StackTraceError(
                            name="error",
                            error="error" * 20_000_000,
                            stackTrace="stackTrace",
                        )
                    ],
                )
            ]
        )
        pipeline_status: PipelineStatus = self.workflow._new_pipeline_status(
            PipelineState.success
        )
        pipeline_status.status = too_long_status

        # We get a bad request error
        with pytest.raises(Exception) as exc:
            self.metadata.create_or_update_pipeline_status(
                ingestion_pipeline.fullyQualifiedName.root, pipeline_status
            )
        assert (
            "String value length (20049662) exceeds the maximum allowed"
            in str(exc.value)
        ) or ("Connection aborted." in str(exc.value))

        # If we truncate the status it all runs good
        truncated_long_status = IngestionStatus(
            [
                StepSummary(
                    name="source",
                    failures=[
                        TruncatedStackTraceError(
                            name="error",
                            error="error" * 20_000_000,
                            stackTrace="stackTrace",
                        )
                    ],
                )
            ]
        )
        pipeline_status: PipelineStatus = self.workflow._new_pipeline_status(
            PipelineState.success
        )
        pipeline_status.status = truncated_long_status
        res = self.metadata.create_or_update_pipeline_status(
            ingestion_pipeline.fullyQualifiedName.root, pipeline_status
        )
        assert (
            res["entityFullyQualifiedName"]
            == ingestion_pipeline.fullyQualifiedName.root
        )
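For scale: "error" * 20_000_000 builds a 100,000,000-character string (5 characters times 20,000,000), far beyond the client-side cap of 1_000_000, and the "String value length (20049662) exceeds the maximum allowed" message suggests the server's JSON parser gives up at roughly the 20-million-character mark. A back-of-the-envelope check:

# Sizes exercised by the test above
assert len("error" * 20_000_000) == 100_000_000  # raw, rejected payload
assert len(("error" * 20_000_000)[:1_000_000]) == 1_000_000  # after truncation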


@@ -4,9 +4,6 @@ from typing import List
import pytest

from metadata.generated.schema.entity.services.databaseService import DatabaseService
-from metadata.generated.schema.entity.services.ingestionPipelines.status import (
-    StackTraceError,
-)
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
    TestSuiteConfigType,
    TestSuitePipeline,
@@ -24,6 +21,7 @@ from metadata.generated.schema.tests.basic import TestCaseStatus
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type.basic import ComponentConfig
+from metadata.ingestion.api.status import TruncatedStackTraceError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.data_quality import TestSuiteWorkflow
@@ -90,9 +88,9 @@ def run_data_quality_workflow(
            loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config
        ),
    )
-    test_suite_procesor = TestSuiteWorkflow.create(workflow_config)
-    test_suite_procesor.execute()
-    test_suite_procesor.raise_from_status()
+    test_suite_processor = TestSuiteWorkflow.create(workflow_config)
+    test_suite_processor.execute()
+    test_suite_processor.raise_from_status()
    yield
    test_suite: TestSuite = metadata.get_by_name(
        TestSuite, "MyTestSuite", nullable=True
@@ -169,15 +167,15 @@ def test_incompatible_column_type(ingest_metadata, metadata: OpenMetadata, db_se
            "openMetadataServerConfig": metadata.config.model_dump(),
        },
    }
-    test_suite_procesor = TestSuiteWorkflow.create(workflow_config)
-    test_suite_procesor.execute()
-    assert test_suite_procesor.steps[0].get_status().failures == [
-        StackTraceError(
+    test_suite_processor = TestSuiteWorkflow.create(workflow_config)
+    test_suite_processor.execute()
+    assert test_suite_processor.steps[0].get_status().failures == [
+        TruncatedStackTraceError(
            name="Incompatible Column for Test Case",
            error="Test case incompatible_column_type of type columnValueMaxToBeBetween is not compatible with column first_name of type VARCHAR",
        )
    ], "Test case incompatible_column_type should fail"
    assert (
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer.customer_id.compatible_test"
-        in test_suite_procesor.steps[1].get_status().records
+        in test_suite_processor.steps[1].get_status().records
    ), "Test case compatible_test should pass"
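The updated equality assertion works because pydantic models compare field by field between instances of the same class, which is why the expected value had to switch from StackTraceError to TruncatedStackTraceError once the workflow started producing the latter. A sketch with made-up values:

from metadata.ingestion.api.status import TruncatedStackTraceError

a = TruncatedStackTraceError(name="n", error="e")
b = TruncatedStackTraceError(name="n", error="e")
assert a == b  # same class, same field values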