From 7012e73d7575b7efcdaebde9b1095d44065aaf3c Mon Sep 17 00:00:00 2001
From: Pere Miquel Brull
Date: Wed, 16 Oct 2024 18:15:50 +0200
Subject: [PATCH] GEN-1166 - Improve Ingestion Workflow Error Summary (#18280)

* GEN-1166 - Improve Ingestion Workflow Error Summary
* fix test
* docs
* comments
---
 .../src/metadata/workflow/application.py      | 34 +-------
 ingestion/src/metadata/workflow/base.py       | 64 +++++++++++----
 ingestion/src/metadata/workflow/ingestion.py  | 42 ++--------
 .../workflow/workflow_output_handler.py       | 19 +++--
 .../workflow/workflow_status_mixin.py         |  2 -
 .../tests/unit/profiler/test_workflow.py      |  6 +-
 .../upgrade/upgrade-prerequisites.md          | 77 ++-----------------
 .../schema/metadataIngestion/workflow.json    |  8 ++
 8 files changed, 83 insertions(+), 169 deletions(-)

diff --git a/ingestion/src/metadata/workflow/application.py b/ingestion/src/metadata/workflow/application.py
index 15bf98e132a..17ff0654070 100644
--- a/ingestion/src/metadata/workflow/application.py
+++ b/ingestion/src/metadata/workflow/application.py
@@ -14,10 +14,6 @@ Generic Workflow entrypoint to execute Applications
 from abc import ABC, abstractmethod
 from typing import List, Optional
 
-from metadata.config.common import WorkflowExecutionError
-from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
-    OpenMetadataConnection,
-)
 from metadata.generated.schema.entity.services.ingestionPipelines.status import (
     StackTraceError,
 )
@@ -25,13 +21,11 @@ from metadata.generated.schema.entity.services.serviceType import ServiceType
 from metadata.generated.schema.metadataIngestion.application import (
     OpenMetadataApplicationConfig,
 )
-from metadata.generated.schema.metadataIngestion.workflow import LogLevels
-from metadata.ingestion.api.step import Step, Summary
+from metadata.ingestion.api.step import Step
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.utils.importer import import_from_module
 from metadata.utils.logger import ingestion_logger
 from metadata.workflow.base import BaseWorkflow
-from metadata.workflow.workflow_status_mixin import SUCCESS_THRESHOLD_VALUE
 
 logger = ingestion_logger()
 
@@ -92,15 +86,9 @@ class ApplicationWorkflow(BaseWorkflow, ABC):
         # Applications are associated to the OpenMetadata Service
         self.service_type: ServiceType = ServiceType.Metadata
 
-        metadata_config: OpenMetadataConnection = (
-            self.config.workflowConfig.openMetadataServerConfig
-        )
-        log_level: LogLevels = self.config.workflowConfig.loggerLevel
-
         super().__init__(
             config=self.config,
-            log_level=log_level,
-            metadata_config=metadata_config,
+            workflow_config=self.workflow_config,
             service_type=self.service_type,
         )
 
@@ -134,26 +122,8 @@ class ApplicationWorkflow(BaseWorkflow, ABC):
         """Workflow-specific logic to execute safely"""
         self.runner.run()
 
-    def calculate_success(self) -> float:
-        return self.runner.get_status().calculate_success()
-
     def get_failures(self) -> List[StackTraceError]:
         return self.workflow_steps()[0].get_status().failures
 
     def workflow_steps(self) -> List[Step]:
         return [self.runner]
-
-    def raise_from_status_internal(self, raise_warnings=False):
-        """Check failed status in the runner"""
-        if (
-            self.runner.get_status().failures
-            and self.calculate_success() < SUCCESS_THRESHOLD_VALUE
-        ):
-            raise WorkflowExecutionError(
-                f"{self.runner.name} reported errors: {Summary.from_step(self.runner)}"
-            )
-
-        if raise_warnings and self.runner.get_status().warnings:
-            raise WorkflowExecutionError(
-                f"{self.runner.name} reported warning: {Summary.from_step(self.runner)}"
-            )
diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py
index 87bca9565a2..e777d6731ec 100644
--- a/ingestion/src/metadata/workflow/base.py
+++ b/ingestion/src/metadata/workflow/base.py
@@ -16,8 +16,10 @@ import traceback
 import uuid
 from abc import ABC, abstractmethod
 from datetime import datetime
+from statistics import mean
 from typing import Any, Dict, List, Optional, TypeVar, Union
 
+from metadata.config.common import WorkflowExecutionError
 from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import (
     CreateIngestionPipelineRequest,
 )
@@ -32,10 +34,13 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel
 from metadata.generated.schema.entity.services.ingestionPipelines.status import (
     StackTraceError,
 )
-from metadata.generated.schema.metadataIngestion.workflow import LogLevels
+from metadata.generated.schema.metadataIngestion.workflow import (
+    LogLevels,
+    WorkflowConfig,
+)
 from metadata.generated.schema.tests.testSuite import ServiceType
 from metadata.generated.schema.type.entityReference import EntityReference
-from metadata.ingestion.api.step import Step
+from metadata.ingestion.api.step import Step, Summary
 from metadata.ingestion.ometa.client_utils import create_ometa_client
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.timer.repeated_timer import RepeatedTimer
@@ -49,10 +54,7 @@ from metadata.utils.execution_time_tracker import ExecutionTimeTracker
 from metadata.utils.helpers import datetime_to_ts
 from metadata.utils.logger import ingestion_logger, set_loggers_level
 from metadata.workflow.workflow_output_handler import WorkflowOutputHandler
-from metadata.workflow.workflow_status_mixin import (
-    SUCCESS_THRESHOLD_VALUE,
-    WorkflowStatusMixin,
-)
+from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin
 
 logger = ingestion_logger()
 
@@ -82,8 +84,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
     def __init__(
         self,
         config: Union[Any, Dict],
-        log_level: LogLevels,
-        metadata_config: OpenMetadataConnection,
+        workflow_config: WorkflowConfig,
         service_type: ServiceType,
         output_handler: WorkflowOutputHandler = WorkflowOutputHandler(),
     ):
@@ -92,19 +93,22 @@
         """
         self.output_handler = output_handler
         self.config = config
+        self.workflow_config = workflow_config
         self.service_type = service_type
         self._timer: Optional[RepeatedTimer] = None
         self._ingestion_pipeline: Optional[IngestionPipeline] = None
         self._start_ts = datetime_to_ts(datetime.now())
+
         self._execution_time_tracker = ExecutionTimeTracker(
-            log_level == LogLevels.DEBUG
+            self.workflow_config.loggerLevel == LogLevels.DEBUG
         )
 
-        set_loggers_level(log_level.value)
+        set_loggers_level(self.workflow_config.loggerLevel.value)
 
         # We create the ometa client at the workflow level and pass it to the steps
-        self.metadata_config = metadata_config
-        self.metadata = create_ometa_client(metadata_config)
+        self.metadata = create_ometa_client(
+            self.workflow_config.openMetadataServerConfig
+        )
 
         self.set_ingestion_pipeline_status(state=PipelineState.running)
 
         self.post_init()
@@ -157,9 +161,22 @@
     def execute_internal(self) -> None:
         """Workflow-specific logic to execute safely"""
 
-    @abstractmethod
-    def calculate_success(self) -> float:
-        """Get the success % of the internal execution"""
+    def calculate_success(self) -> Optional[float]:
+        """
+        Get the success % of the internal execution.
+        Since we'll use this to get a single success % from multiple steps, we'll compute
+        the mean success % across all the steps. This way, we can have a proper
+        workflow status.
+        E.g., if we have no errors on the source but a bunch of them on the sink,
+        we still want the flow to be marked as a failure or partial success.
+        """
+        if not self.workflow_steps():
+            logger.warning("No steps to calculate success")
+            return None
+
+        return mean(
+            [step.get_status().calculate_success() for step in self.workflow_steps()]
+        )
 
     @abstractmethod
     def get_failures(self) -> List[StackTraceError]:
@@ -169,9 +186,22 @@
     def workflow_steps(self) -> List[Step]:
         """Steps to report status from"""
 
-    @abstractmethod
     def raise_from_status_internal(self, raise_warnings=False) -> None:
         """Based on the internal workflow status, raise a WorkflowExecutionError"""
+        for step in self.workflow_steps():
+            if (
+                step.get_status().failures
+                and step.get_status().calculate_success()
+                < self.workflow_config.successThreshold
+            ):
+                raise WorkflowExecutionError(
+                    f"{step.name} reported errors: {Summary.from_step(step)}"
+                )
+
+            if raise_warnings and step.status.warnings:
+                raise WorkflowExecutionError(
+                    f"{step.name} reported warnings: {Summary.from_step(step)}"
+                )
 
     def execute(self) -> None:
         """
@@ -186,7 +216,7 @@
         try:
             self.execute_internal()
 
-            if SUCCESS_THRESHOLD_VALUE <= self.calculate_success() < 100:
+            if self.workflow_config.successThreshold <= self.calculate_success() < 100:
                 pipeline_state = PipelineState.partialSuccess
 
         # Any unhandled exception breaking the workflow should update the status
diff --git a/ingestion/src/metadata/workflow/ingestion.py b/ingestion/src/metadata/workflow/ingestion.py
index cfa78c1259e..1e28f501317 100644
--- a/ingestion/src/metadata/workflow/ingestion.py
+++ b/ingestion/src/metadata/workflow/ingestion.py
@@ -24,9 +24,6 @@ from abc import ABC, abstractmethod
 from typing import List, Tuple, Type, cast
 
 from metadata.config.common import WorkflowExecutionError
-from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
-    OpenMetadataConnection,
-)
 from metadata.generated.schema.entity.services.connections.serviceConnection import (
     ServiceConnection,
 )
@@ -38,7 +35,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataWorkflowConfig,
 )
 from metadata.ingestion.api.parser import parse_workflow_config_gracefully
-from metadata.ingestion.api.step import Step, Summary
+from metadata.ingestion.api.step import Step
 from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage
 from metadata.ingestion.models.custom_types import ServiceWithConnectionType
 from metadata.profiler.api.models import ProfilerProcessorConfig
@@ -55,14 +52,15 @@ from metadata.utils.importer import (
 )
 from metadata.utils.logger import ingestion_logger
 from metadata.workflow.base import BaseWorkflow, InvalidWorkflowJSONException
-from metadata.workflow.workflow_status_mixin import SUCCESS_THRESHOLD_VALUE
 
 logger = ingestion_logger()
 
 
 class IngestionWorkflow(BaseWorkflow, ABC):
     """
-    Base Ingestion Workflow implementation
+    Base Ingestion Workflow implementation. This is used for all
+    workflows except the Application one, which inherits directly
+    from BaseWorkflow.
     """
 
     config: OpenMetadataWorkflowConfig
@@ -79,14 +77,9 @@
             self.config.source.type
         )
 
-        metadata_config: OpenMetadataConnection = (
-            self.config.workflowConfig.openMetadataServerConfig
-        )
-
         super().__init__(
             config=config,
-            log_level=config.workflowConfig.loggerLevel,
-            metadata_config=metadata_config,
+            workflow_config=config.workflowConfig,
             service_type=self.service_type,
         )
 
@@ -137,37 +130,12 @@
         if bulk_sink:
             bulk_sink.run()
 
-    def calculate_success(self) -> float:
-        return self.source.get_status().calculate_success()
-
     def get_failures(self) -> List[StackTraceError]:
         return self.source.get_status().failures
 
     def workflow_steps(self) -> List[Step]:
         return [self.source] + list(self.steps)
 
-    def raise_from_status_internal(self, raise_warnings=False):
-        """
-        Check the status of all steps
-        """
-        if (
-            self.source.get_status().failures
-            and self.calculate_success() < SUCCESS_THRESHOLD_VALUE
-        ):
-            raise WorkflowExecutionError(
-                f"{self.source.name} reported errors: {Summary.from_step(self.source)}"
-            )
-
-        for step in self.steps:
-            if step.status.failures:
-                raise WorkflowExecutionError(
-                    f"{step.name} reported errors: {Summary.from_step(step)}"
-                )
-            if raise_warnings and step.status.warnings:
-                raise WorkflowExecutionError(
-                    f"{step.name} reported warnings: {Summary.from_step(step)}"
-                )
-
     def _retrieve_service_connection_if_needed(self, service_type: ServiceType) -> None:
         """
         We override the current `serviceConnection` source config object if source workflow service already exists
diff --git a/ingestion/src/metadata/workflow/workflow_output_handler.py b/ingestion/src/metadata/workflow/workflow_output_handler.py
index d1a2070e2a7..06df1f0dd54 100644
--- a/ingestion/src/metadata/workflow/workflow_output_handler.py
+++ b/ingestion/src/metadata/workflow/workflow_output_handler.py
@@ -14,6 +14,7 @@ Module handles the output messages from different workflows
 """
 
 import time
+from statistics import mean
 from typing import Any, Dict, List, Optional, Type, Union
 
 from pydantic import BaseModel
@@ -114,16 +115,15 @@
 
         self._print_summary(steps)
 
-    def _print_summary(self, steps: List[Step]):
+    def _print_summary(self, steps: List[Step]) -> None:
         failures: List[Failure] = []
-        total_records: int = 0
-        total_errors: int = 0
+        if not steps:
+            log_ansi_encoded_string(message="No steps to process.")
+            return
 
         for step in steps:
             step_summary = Summary.from_step(step)
-            total_records += step_summary.records or 0
-            total_errors += step_summary.errors or 0
 
             failures.append(
                 Failure(name=step.name, failures=step.get_status().failures)
             )
@@ -141,15 +141,18 @@
             log_ansi_encoded_string(message=f"Filtered: {step_summary.filtered}")
             log_ansi_encoded_string(message=f"Errors: {step_summary.errors}")
+            log_ansi_encoded_string(
+                message=f"Success %: {step.get_status().calculate_success()}"
+            )
 
         self._print_failures_if_apply(failures)
 
-        total_success = max(total_records, 1)
+        # If nothing is processed, we'll have a success of 100%
+        success_pct = mean([step.get_status().calculate_success() for step in steps])
         log_ansi_encoded_string(
             color=ANSI.BRIGHT_CYAN,
             bold=True,
-            message="Success %: "
-            + f"{round(total_success * 100 / (total_success + total_errors), 2)}",
+            message="Workflow Success %: " + f"{round(success_pct, 2)}",
         )
 
     def _print_debug_summary(self, steps: List[Step]):
diff --git a/ingestion/src/metadata/workflow/workflow_status_mixin.py b/ingestion/src/metadata/workflow/workflow_status_mixin.py
index e648ed00d43..fe8d99715c2 100644
--- a/ingestion/src/metadata/workflow/workflow_status_mixin.py
+++ b/ingestion/src/metadata/workflow/workflow_status_mixin.py
@@ -37,8 +37,6 @@ from metadata.utils.logger import ometa_logger
 
 logger = ometa_logger()
 
-SUCCESS_THRESHOLD_VALUE = 90
-
 
 class WorkflowResultStatus(Enum):
     SUCCESS = 0
diff --git a/ingestion/tests/unit/profiler/test_workflow.py b/ingestion/tests/unit/profiler/test_workflow.py
index 8cc358c6f9d..aeeb3df971d 100644
--- a/ingestion/tests/unit/profiler/test_workflow.py
+++ b/ingestion/tests/unit/profiler/test_workflow.py
@@ -26,9 +26,6 @@ from metadata.generated.schema.entity.data.table import (
     Table,
     TableProfilerConfig,
 )
-from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
-    OpenMetadataConnection,
-)
 from metadata.generated.schema.entity.services.databaseService import (
     DatabaseService,
     DatabaseServiceType,
 )
@@ -36,6 +33,7 @@ from metadata.generated.schema.entity.services.databaseService import (
 from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
     DatabaseServiceProfilerPipeline,
 )
+from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.profiler.api.models import ProfilerProcessorConfig
 from metadata.profiler.interface.sqlalchemy.profiler_interface import (
@@ -122,7 +120,7 @@ def test_init_workflow(mocked_method, mocked_orm):  # pylint: disable=unused-arg
     mocked_method.assert_called()
     assert isinstance(workflow.source.source_config, DatabaseServiceProfilerPipeline)
 
-    assert isinstance(workflow.metadata_config, OpenMetadataConnection)
+    assert isinstance(workflow.workflow_config, WorkflowConfig)
 
     profiler_processor_step = workflow.steps[0]
     assert isinstance(profiler_processor_step.profiler_config, ProfilerProcessorConfig)
diff --git a/openmetadata-docs/content/partials/v1.6/deployment/upgrade/upgrade-prerequisites.md b/openmetadata-docs/content/partials/v1.6/deployment/upgrade/upgrade-prerequisites.md
index 09a6f8e63ed..24bc4ebc30d 100644
--- a/openmetadata-docs/content/partials/v1.6/deployment/upgrade/upgrade-prerequisites.md
+++ b/openmetadata-docs/content/partials/v1.6/deployment/upgrade/upgrade-prerequisites.md
@@ -113,75 +113,14 @@ We believe this update will bring greater consistency and clarity to our version
 
 # Backward Incompatible Changes
 
-## 1.5.0
+## 1.6.0
 
-### Multi Owners
-OpenMetadata allows a single user or a team to be tagged as owners for any data assets. In Release 1.5.0, we allow users to tag multiple individual owners or a single team. This will allow organizations to add ownership to multiple individuals without necessarily needing to create a team around them like previously.
+### Ingestion Workflow Status
 
-This is a backward incompatible change, if you are using APIs, please make sure the owner field is now changed to “owners”
+We are updating how we compute the success percentage. Previously, we only took into account the results of the Source
+(e.g., the tables we were able to properly retrieve from Snowflake, Redshift, etc.) for partial success. This meant that
+we had an error threshold where, if at least 90% of the tables were successfully ingested, we would still consider the
+workflow successful. However, any errors when sending the information to OpenMetadata were considered a failure.
+
+Now, we're changing this behavior to consider the success rate of all the steps involved in the workflow. The UI will
+then show more `Partial Success` statuses rather than `Failed`, properly reflecting the real state of the workflow.
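+
+To make the new computation concrete, here is a minimal sketch (with made-up numbers) of how the workflow success is
+now derived from the per-step success percentages and compared against the threshold, simplified from the new
+`BaseWorkflow.calculate_success`:
+
+```python
+from statistics import mean
+
+# Illustrative success percentages reported by each step of a workflow
+step_success = {"source": 100.0, "sink": 85.0}
+
+# The workflow success is the mean across all the steps
+overall = mean(step_success.values())  # 92.5
+
+# With the default threshold of 90:
+#   90 <= 92.5 < 100 -> the run is marked as Partial Success instead of Failed
+```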
-### Import/Export Format
-To support the multi-owner format, we have now changed how we export and import the CSV file in glossary, services, database, schema, table, etc. The new format will be
-user:userName;team:TeamName
-
-If you are importing an older file, please make sure to make this change.
-
-### Pydantic V2
-The core of OpenMetadata are the JSON Schemas that define the metadata standard. These schemas are automatically translated into Java, Typescript, and Python code with Pydantic classes.
-
-In this release, we have [migrated](https://docs.pydantic.dev/latest/migration/) the codebase from Pydantic V1 to Pydantic V2.
-
-### Deployment Related Changes (OSS only)
-
-`./bootstrap/bootstrap_storage.sh` **removed**
-
-OpenMetadata community has built rolling upgrades to database schema and the data to make upgrades easier. This tool is now called as ./bootstrap/openmetadata-ops.sh and has been part of our releases since 1.3. The `bootstrap_storage.sh` doesn’t support new native schemas in OpenMetadata. Hence, we have deleted this tool from this release.
-
-While upgrading, please refer to our Upgrade Notes in the documentation. Always follow the best practices provided there.
-
-### Database Connection Pooling
-
-OpenMetadata uses Jdbi to handle database-related operations such as read/write/delete. In this release, we introduced additional configs to help with connection pooling, allowing the efficient use of a database with low resources.
-
-Please update the defaults if your cluster is running at a large scale to scale up the connections efficiently.
-
-For the new configuration, please refer to the [doc](https://docs.open-metadata.org/latest/deployment/database-connection-pooling) here
-
-### Data Insights
-
-The Data Insights application is meant to give you a quick glance at your data's state and allow you to take action based on the information you receive. To continue pursuing this objective, the application was completely refactored to allow customizability.
-
-Part of this refactor was making Data Insights an internal application, no longer relying on an external pipeline. This means triggering Data Insights from the Python SDK will no longer be possible.
-
-With this change you will need to run a backfill on the Data Insights for the last couple of days since the Data Assets data changed.
-
-### UI Changes
-
-#### New Explore Page
-
-Explore page displays hierarchically organized data assets by grouping them into `services > database > schema > tables/stored procedures`. This helps users organically find the data asset they are looking for based on a known database or schema they were using. This is a new feature and changes the way the Explore page was built in previous releases.
-
-#### Connector Schema Changes
-
-In the latest release, several updates and enhancements have been made to the JSON schema across various connectors. These changes aim to improve security, configurability, and expand integration capabilities. Here's a detailed breakdown of the updates:
-
-- **KafkaConnect**: Added `schemaRegistryTopicSuffixName` to enhance topic configuration flexibility for schema registries.
-- **GCS Datalake**: Introduced `bucketNames` field, allowing users to specify targeted storage buckets within the Google Cloud Storage environment.
-- **OpenLineage**: Added `saslConfig` to enhance security by enabling SASL (Simple Authentication and Security Layer) configuration.
-- **Salesforce**: Added sslConfig to strengthen the security layer for Salesforce connections by supporting SSL.
-- **DeltaLake**: Updated schema by moving metastoreConnection to a newly created `metastoreConfig.json` file. Additionally, introduced `configSource` to better define source configurations, with new support for `metastoreConfig.json` and `storageConfig.json`.
-- **Iceberg RestCatalog**: Removed clientId and `clientSecret` as mandatory fields, making the schema more flexible for different authentication methods.
-- **DBT Cloud Pipelines**: Added as a new connector to support cloud-native data transformation workflows using DBT.
-- **Looker**: Expanded support to include connections using GitLab integration, offering more flexible and secure version control.
-- **Tableau**: Enhanced support by adding capabilities for connecting with `TableauPublishedDatasource` and `TableauEmbeddedDatasource`, providing more granular control over data visualization and reporting.
-
-### Include DDL
-During the Database Metadata ingestion, we can optionally pick up the DDL for both tables and views. During the metadata ingestion, we use the view DDLs to generate the View Lineage.
-
-To reduce the processing time for out-of-the-box workflows, we are disabling the include DDL by default, whereas before, it was enabled, which potentially led to long-running workflows.
-
-### Secrets Manager
-Starting with the release 1.5.0, the JWT Token for the bots will be sent to the Secrets Manager if you configured one. It won't appear anymore in your dag_generated_configs in Airflow.
-
-### Python SDK
-The `metadata insight` command has been removed. Since Data Insights application was moved to be an internal system application instead of relying on external pipelines the SDK command to run the pipeline was removed.
diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json
index 1a1b17787e6..1703f4b5b80 100644
--- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json
+++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json
@@ -163,6 +163,14 @@
       "$ref": "#/definitions/logLevels",
       "default": "INFO"
     },
+    "successThreshold": {
+      "title": "Success Threshold",
+      "description": "The percentage of successfully processed records that must be achieved for the pipeline to be considered successful. Otherwise, the pipeline will be marked as failed.",
+      "type": "integer",
+      "default": 90,
+      "minimum": 0,
+      "maximum": 100
+    },
     "openMetadataServerConfig": {
       "$ref": "../entity/services/connections/metadata/openMetadataConnection.json"
     },
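
The `successThreshold` field above is surfaced to ingestion configs through the generated `WorkflowConfig` Pydantic model that the Python changes in this patch pass around. A minimal sketch of setting the threshold when building a workflow config (the `hostPort` value is a placeholder, and a real config also needs valid auth settings):

```python
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig

# A minimal sketch with illustrative values only, not a production config
workflow_config = WorkflowConfig.model_validate(
    {
        "loggerLevel": "INFO",
        # Mark the run as failed if the overall success percentage drops below 75
        "successThreshold": 75,
        "openMetadataServerConfig": {"hostPort": "http://localhost:8585/api"},
    }
)

assert workflow_config.successThreshold == 75
```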