GEN-1166 - Improve Ingestion Workflow Error Summary (#18280)

* GEN-1166 - Improve Ingestion Workflow Error Summary

* fix test

* docs

* comments
Pere Miquel Brull 2024-10-16 18:15:50 +02:00 committed by GitHub
parent 89b6c1c1cd
commit 7012e73d75
8 changed files with 83 additions and 169 deletions

View File

@ -14,10 +14,6 @@ Generic Workflow entrypoint to execute Applications
from abc import ABC, abstractmethod
from typing import List, Optional
from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
@ -25,13 +21,11 @@ from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.application import (
OpenMetadataApplicationConfig,
)
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
from metadata.ingestion.api.step import Step, Summary
from metadata.ingestion.api.step import Step
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.importer import import_from_module
from metadata.utils.logger import ingestion_logger
from metadata.workflow.base import BaseWorkflow
from metadata.workflow.workflow_status_mixin import SUCCESS_THRESHOLD_VALUE
logger = ingestion_logger()
@ -92,15 +86,9 @@ class ApplicationWorkflow(BaseWorkflow, ABC):
# Applications are associated to the OpenMetadata Service
self.service_type: ServiceType = ServiceType.Metadata
metadata_config: OpenMetadataConnection = (
self.config.workflowConfig.openMetadataServerConfig
)
log_level: LogLevels = self.config.workflowConfig.loggerLevel
super().__init__(
config=self.config,
log_level=log_level,
metadata_config=metadata_config,
workflow_config=self.workflow_config,
service_type=self.service_type,
)
@ -134,26 +122,8 @@ class ApplicationWorkflow(BaseWorkflow, ABC):
"""Workflow-specific logic to execute safely"""
self.runner.run()
def calculate_success(self) -> float:
return self.runner.get_status().calculate_success()
def get_failures(self) -> List[StackTraceError]:
return self.workflow_steps()[0].get_status().failures
def workflow_steps(self) -> List[Step]:
return [self.runner]
def raise_from_status_internal(self, raise_warnings=False):
"""Check failed status in the runner"""
if (
self.runner.get_status().failures
and self.calculate_success() < SUCCESS_THRESHOLD_VALUE
):
raise WorkflowExecutionError(
f"{self.runner.name} reported errors: {Summary.from_step(self.runner)}"
)
if raise_warnings and self.runner.get_status().warnings:
raise WorkflowExecutionError(
f"{self.runner.name} reported warning: {Summary.from_step(self.runner)}"
)

View File

@ -16,8 +16,10 @@ import traceback
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
from statistics import mean
from typing import Any, Dict, List, Optional, TypeVar, Union
from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import (
CreateIngestionPipelineRequest,
)
@ -32,10 +34,13 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
from metadata.generated.schema.metadataIngestion.workflow import (
LogLevels,
WorkflowConfig,
)
from metadata.generated.schema.tests.testSuite import ServiceType
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.step import Step, Summary
from metadata.ingestion.ometa.client_utils import create_ometa_client
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.timer.repeated_timer import RepeatedTimer
@ -49,10 +54,7 @@ from metadata.utils.execution_time_tracker import ExecutionTimeTracker
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger, set_loggers_level
from metadata.workflow.workflow_output_handler import WorkflowOutputHandler
from metadata.workflow.workflow_status_mixin import (
SUCCESS_THRESHOLD_VALUE,
WorkflowStatusMixin,
)
from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin
logger = ingestion_logger()
@ -82,8 +84,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
def __init__(
self,
config: Union[Any, Dict],
log_level: LogLevels,
metadata_config: OpenMetadataConnection,
workflow_config: WorkflowConfig,
service_type: ServiceType,
output_handler: WorkflowOutputHandler = WorkflowOutputHandler(),
):
@ -92,19 +93,22 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
"""
self.output_handler = output_handler
self.config = config
self.workflow_config = workflow_config
self.service_type = service_type
self._timer: Optional[RepeatedTimer] = None
self._ingestion_pipeline: Optional[IngestionPipeline] = None
self._start_ts = datetime_to_ts(datetime.now())
self._execution_time_tracker = ExecutionTimeTracker(
log_level == LogLevels.DEBUG
self.workflow_config.loggerLevel == LogLevels.DEBUG
)
set_loggers_level(log_level.value)
set_loggers_level(self.workflow_config.loggerLevel.value)
# We create the ometa client at the workflow level and pass it to the steps
self.metadata_config = metadata_config
self.metadata = create_ometa_client(metadata_config)
self.metadata = create_ometa_client(
self.workflow_config.openMetadataServerConfig
)
self.set_ingestion_pipeline_status(state=PipelineState.running)
self.post_init()
@ -157,9 +161,22 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
def execute_internal(self) -> None:
"""Workflow-specific logic to execute safely"""
@abstractmethod
def calculate_success(self) -> float:
"""Get the success % of the internal execution"""
def calculate_success(self) -> Optional[float]:
"""
Get the success % of the internal execution.
Since we'll use this to get a single success % from multiple steps, we'll take
the average success % across all the steps. This way, we can have a proper
workflow status.
E.g., if we have no errors on the source but a bunch of them on the sink,
we still want the flow to be marked as a failure or partial success.
"""
if not self.workflow_steps():
logger.warning("No steps to calculate success")
return None
return mean(
[step.get_status().calculate_success() for step in self.workflow_steps()]
)
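As a rough illustration of the aggregation above (hypothetical per-step numbers, not taken from this change): the workflow-level figure is simply the arithmetic mean of each step's success percentage.

```python
from statistics import mean
from typing import List

# Hypothetical per-step success percentages: source, processor, sink
step_success: List[float] = [100.0, 100.0, 80.0]

workflow_success = mean(step_success)
print(f"Workflow Success %: {round(workflow_success, 2)}")  # -> 93.33
```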
@abstractmethod
def get_failures(self) -> List[StackTraceError]:
@ -169,9 +186,22 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
def workflow_steps(self) -> List[Step]:
"""Steps to report status from"""
@abstractmethod
def raise_from_status_internal(self, raise_warnings=False) -> None:
"""Based on the internal workflow status, raise a WorkflowExecutionError"""
for step in self.workflow_steps():
if (
step.get_status().failures
and step.get_status().calculate_success()
< self.workflow_config.successThreshold
):
raise WorkflowExecutionError(
f"{step.name} reported errors: {Summary.from_step(step)}"
)
if raise_warnings and step.status.warnings:
raise WorkflowExecutionError(
f"{step.name} reported warning: {Summary.from_step(step)}"
)
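Read as a truth table with made-up numbers (this is not the real Step API, just a sketch of the condition above): a step only triggers a WorkflowExecutionError when it both recorded failures and dropped below the configured threshold.

```python
# Sketch of the raise condition above; step objects are replaced by plain values.
SUCCESS_THRESHOLD = 90  # workflowConfig.successThreshold default

def step_should_raise(success_pct: float, has_failures: bool) -> bool:
    return has_failures and success_pct < SUCCESS_THRESHOLD

print(step_should_raise(92.0, has_failures=True))    # False: failures, but above the threshold
print(step_should_raise(89.0, has_failures=True))    # True: below the threshold -> error raised
print(step_should_raise(100.0, has_failures=False))  # False: nothing to report
```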
def execute(self) -> None:
"""
@ -186,7 +216,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
try:
self.execute_internal()
if SUCCESS_THRESHOLD_VALUE <= self.calculate_success() < 100:
if self.workflow_config.successThreshold <= self.calculate_success() < 100:
pipeline_state = PipelineState.partialSuccess
# Any unhandled exception breaking the workflow should update the status
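Combining the mean success with the threshold, the resulting pipeline state roughly follows the sketch below. Only the partialSuccess branch appears in the hunk above; the success and failed branches are assumptions for illustration.

```python
# Assumed mapping from success % to pipeline state; only partialSuccess is shown in the diff.
def resolve_pipeline_state(success_pct: float, threshold: int = 90) -> str:
    if success_pct >= 100:
        return "success"
    if threshold <= success_pct < 100:
        return "partialSuccess"
    return "failed"

print(resolve_pipeline_state(100.0))  # success
print(resolve_pipeline_state(93.33))  # partialSuccess
print(resolve_pipeline_state(75.0))   # failed
```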

View File

@ -24,9 +24,6 @@ from abc import ABC, abstractmethod
from typing import List, Tuple, Type, cast
from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.serviceConnection import (
ServiceConnection,
)
@ -38,7 +35,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.step import Step, Summary
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage
from metadata.ingestion.models.custom_types import ServiceWithConnectionType
from metadata.profiler.api.models import ProfilerProcessorConfig
@ -55,14 +52,15 @@ from metadata.utils.importer import (
)
from metadata.utils.logger import ingestion_logger
from metadata.workflow.base import BaseWorkflow, InvalidWorkflowJSONException
from metadata.workflow.workflow_status_mixin import SUCCESS_THRESHOLD_VALUE
logger = ingestion_logger()
class IngestionWorkflow(BaseWorkflow, ABC):
"""
Base Ingestion Workflow implementation
Base Ingestion Workflow implementation. This is used for all
workflows except the Application one, which inherits directly
from BaseWorkflow.
"""
config: OpenMetadataWorkflowConfig
@ -79,14 +77,9 @@ class IngestionWorkflow(BaseWorkflow, ABC):
self.config.source.type
)
metadata_config: OpenMetadataConnection = (
self.config.workflowConfig.openMetadataServerConfig
)
super().__init__(
config=config,
log_level=config.workflowConfig.loggerLevel,
metadata_config=metadata_config,
workflow_config=config.workflowConfig,
service_type=self.service_type,
)
@ -137,37 +130,12 @@ class IngestionWorkflow(BaseWorkflow, ABC):
if bulk_sink:
bulk_sink.run()
def calculate_success(self) -> float:
return self.source.get_status().calculate_success()
def get_failures(self) -> List[StackTraceError]:
return self.source.get_status().failures
def workflow_steps(self) -> List[Step]:
return [self.source] + list(self.steps)
def raise_from_status_internal(self, raise_warnings=False):
"""
Check the status of all steps
"""
if (
self.source.get_status().failures
and self.calculate_success() < SUCCESS_THRESHOLD_VALUE
):
raise WorkflowExecutionError(
f"{self.source.name} reported errors: {Summary.from_step(self.source)}"
)
for step in self.steps:
if step.status.failures:
raise WorkflowExecutionError(
f"{step.name} reported errors: {Summary.from_step(step)}"
)
if raise_warnings and step.status.warnings:
raise WorkflowExecutionError(
f"{step.name} reported warnings: {Summary.from_step(step)}"
)
def _retrieve_service_connection_if_needed(self, service_type: ServiceType) -> None:
"""
We override the current `serviceConnection` source config object if source workflow service already exists

View File

@ -14,6 +14,7 @@ Module handles the output messages from different workflows
"""
import time
from statistics import mean
from typing import Any, Dict, List, Optional, Type, Union
from pydantic import BaseModel
@ -114,16 +115,15 @@ class WorkflowOutputHandler:
self._print_summary(steps)
def _print_summary(self, steps: List[Step]):
def _print_summary(self, steps: List[Step]) -> None:
failures: List[Failure] = []
total_records: int = 0
total_errors: int = 0
if not steps:
log_ansi_encoded_string(message="No steps to process.")
return
for step in steps:
step_summary = Summary.from_step(step)
total_records += step_summary.records or 0
total_errors += step_summary.errors or 0
failures.append(
Failure(name=step.name, failures=step.get_status().failures)
)
@ -141,15 +141,18 @@ class WorkflowOutputHandler:
log_ansi_encoded_string(message=f"Filtered: {step_summary.filtered}")
log_ansi_encoded_string(message=f"Errors: {step_summary.errors}")
log_ansi_encoded_string(
message=f"Success %: {step.get_status().calculate_success()}"
)
self._print_failures_if_apply(failures)
total_success = max(total_records, 1)
# If nothing is processed, we'll have a success of 100%
success_pct = mean([step.get_status().calculate_success() for step in steps])
log_ansi_encoded_string(
color=ANSI.BRIGHT_CYAN,
bold=True,
message="Success %: "
+ f"{round(total_success * 100 / (total_success + total_errors), 2)}",
message="Workflow Success %: " + f"{round(success_pct, 2)}",
)
def _print_debug_summary(self, steps: List[Step]):

View File

@ -37,8 +37,6 @@ from metadata.utils.logger import ometa_logger
logger = ometa_logger()
SUCCESS_THRESHOLD_VALUE = 90
class WorkflowResultStatus(Enum):
SUCCESS = 0

View File

@ -26,9 +26,6 @@ from metadata.generated.schema.entity.data.table import (
Table,
TableProfilerConfig,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseService,
DatabaseServiceType,
@ -36,6 +33,7 @@ from metadata.generated.schema.entity.services.databaseService import (
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.profiler.api.models import ProfilerProcessorConfig
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
@ -122,7 +120,7 @@ def test_init_workflow(mocked_method, mocked_orm): # pylint: disable=unused-arg
mocked_method.assert_called()
assert isinstance(workflow.source.source_config, DatabaseServiceProfilerPipeline)
assert isinstance(workflow.metadata_config, OpenMetadataConnection)
assert isinstance(workflow.workflow_config, WorkflowConfig)
profiler_processor_step = workflow.steps[0]
assert isinstance(profiler_processor_step.profiler_config, ProfilerProcessorConfig)

View File

@ -113,75 +113,14 @@ We believe this update will bring greater consistency and clarity to our version
# Backward Incompatible Changes
## 1.5.0
## 1.6.0
### Multi Owners
OpenMetadata allows a single user or a team to be tagged as the owner of any data asset. In Release 1.5.0, we allow users to tag multiple individual owners or a single team. This allows organizations to assign ownership to multiple individuals without necessarily needing to create a team around them, as was previously required.
### Ingestion Workflow Status
This is a backward incompatible change; if you are using the APIs, please make sure the owner field is now changed to “owners”.
We are updating how we compute the success percentage. Previously, partial success only took into account the results
of the Source (e.g., the tables we were able to properly retrieve from Snowflake, Redshift, etc.). This meant that we had
an error threshold where, as long as at least 90% of the tables were successfully ingested, we would still consider the
workflow successful. However, any errors when sending the information to OpenMetadata were considered a failure.
### Import/Export Format
To support the multi-owner format, we have now changed how we export and import the CSV file in glossary, services, database, schema, table, etc. The new format will be
user:userName;team:TeamName
If you are importing an older file, please make sure to make this change.
### Pydantic V2
The core of OpenMetadata are the JSON Schemas that define the metadata standard. These schemas are automatically translated into Java, Typescript, and Python code with Pydantic classes.
In this release, we have [migrated](https://docs.pydantic.dev/latest/migration/) the codebase from Pydantic V1 to Pydantic V2.
### Deployment Related Changes (OSS only)
`./bootstrap/bootstrap_storage.sh` **removed**
The OpenMetadata community has built rolling upgrades for the database schema and the data to make upgrades easier. This tool is now called `./bootstrap/openmetadata-ops.sh` and has been part of our releases since 1.3. The `bootstrap_storage.sh` script doesn't support the new native schemas in OpenMetadata. Hence, we have removed this tool from this release.
While upgrading, please refer to our Upgrade Notes in the documentation. Always follow the best practices provided there.
### Database Connection Pooling
OpenMetadata uses Jdbi to handle database-related operations such as read/write/delete. In this release, we introduced additional configs to help with connection pooling, allowing the efficient use of a database with low resources.
If your cluster is running at a large scale, please update the defaults to scale up the connections efficiently.
For the new configuration, please refer to the [documentation](https://docs.open-metadata.org/latest/deployment/database-connection-pooling).
### Data Insights
The Data Insights application is meant to give you a quick glance at your data's state and allow you to take action based on the information you receive. To continue pursuing this objective, the application was completely refactored to allow customizability.
Part of this refactor was making Data Insights an internal application, no longer relying on an external pipeline. This means triggering Data Insights from the Python SDK will no longer be possible.
With this change, you will need to run a backfill on Data Insights for the last couple of days, since the Data Assets data has changed.
### UI Changes
#### New Explore Page
The Explore page displays hierarchically organized data assets by grouping them into `services > database > schema > tables/stored procedures`. This helps users organically find the data asset they are looking for based on a known database or schema they were using. This is a new feature and changes the way the Explore page was built in previous releases.
#### Connector Schema Changes
In the latest release, several updates and enhancements have been made to the JSON schema across various connectors. These changes aim to improve security, configurability, and expand integration capabilities. Here's a detailed breakdown of the updates:
- **KafkaConnect**: Added `schemaRegistryTopicSuffixName` to enhance topic configuration flexibility for schema registries.
- **GCS Datalake**: Introduced `bucketNames` field, allowing users to specify targeted storage buckets within the Google Cloud Storage environment.
- **OpenLineage**: Added `saslConfig` to enhance security by enabling SASL (Simple Authentication and Security Layer) configuration.
- **Salesforce**: Added `sslConfig` to strengthen the security layer for Salesforce connections by supporting SSL.
- **DeltaLake**: Updated the schema by moving `metastoreConnection` to a newly created `metastoreConfig.json` file. Additionally, introduced `configSource` to better define source configurations, with new support for `metastoreConfig.json` and `storageConfig.json`.
- **Iceberg RestCatalog**: Removed `clientId` and `clientSecret` as mandatory fields, making the schema more flexible for different authentication methods.
- **DBT Cloud Pipelines**: Added as a new connector to support cloud-native data transformation workflows using DBT.
- **Looker**: Expanded support to include connections using GitLab integration, offering more flexible and secure version control.
- **Tableau**: Enhanced support by adding capabilities for connecting with `TableauPublishedDatasource` and `TableauEmbeddedDatasource`, providing more granular control over data visualization and reporting.
### Include DDL
During the Database Metadata ingestion, we can optionally pick up the DDL for both tables and views. The view DDLs are then used to generate the View Lineage.
To reduce the processing time for out-of-the-box workflows, DDL inclusion is now disabled by default, whereas before it was enabled, which potentially led to long-running workflows.
### Secrets Manager
Starting with release 1.5.0, the JWT Token for the bots will be sent to the Secrets Manager if you have configured one. It will no longer appear in your `dag_generated_configs` in Airflow.
### Python SDK
The `metadata insight` command has been removed. Since the Data Insights application was moved to be an internal system application instead of relying on external pipelines, the SDK command to run the pipeline was removed.
Now, we're changing this behavior to consider the success rate of all the steps involved in the workflow. The UI will
then show more `Partial Success` statuses rather than `Failed`, properly reflecting the real state of the workflow.
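As an illustrative example of the new computation (numbers invented for this example): if the source reads every table but the sink only manages to push 85% of the records to OpenMetadata, the workflow success is now the average of both steps rather than the source alone.

```python
from statistics import mean

# Illustrative per-step success percentages for a two-step workflow
source_success = 100.0  # all tables read from the source
sink_success = 85.0     # some records failed when pushed to OpenMetadata

workflow_success = mean([source_success, sink_success])  # 92.5
# With the default successThreshold of 90, this run shows as Partial Success instead of Failed.
```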

View File

@ -163,6 +163,14 @@
"$ref": "#/definitions/logLevels",
"default": "INFO"
},
"successThreshold": {
"title": "Success Threshold",
"description": "The percentage of successfully processed records that must be achieved for the pipeline to be considered successful. Otherwise, the pipeline will be marked as failed.",
"type": "integer",
"default": 90,
"minimum": 0,
"maximum": 100
},
"openMetadataServerConfig": {
"$ref": "../entity/services/connections/metadata/openMetadataConnection.json"
},
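A minimal sketch of how the new field could look in the `workflowConfig` section of a workflow configuration; the server host and auth values below are placeholders, not part of this change.

```python
# Illustrative workflowConfig payload matching the schema above; all values are placeholders.
workflow_config = {
    "loggerLevel": "INFO",
    "successThreshold": 90,  # >= 90% of records processed counts as (partial) success
    "openMetadataServerConfig": {
        "hostPort": "http://localhost:8585/api",
        "authProvider": "openmetadata",
    },
}
```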