MINOR: Refactor output_handlers to a WorkflowOutputHandler class (#17149)

* Refactor output_handlers to a WorkflowOutputHandler class

* Add old methods as deprecated to avoid breaking changes

* Extract WorkflowInitErrorHandler from workflow_output_handler

* Fix static checks

* Fix tests

* Fix tests

* Update code based on comments from PR

* Update comment
IceS2 authored 2024-07-29 09:20:34 +02:00, committed by GitHub
parent 20754ab28a
commit c522f14178
50 changed files with 607 additions and 487 deletions
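Taken together, the diffs below amount to the following migration for callers. This is a sketch assembled from the changes in this commit; the YAML placeholder and variable names are illustrative, and the old module-level functions remain available (deprecated) until release 1.6.

```python
import yaml

from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    PipelineType,
)
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler

# Before this PR (now deprecated):
#   from metadata.workflow.workflow_output_handler import print_init_error, print_status
#   print_init_error(exc, config_dict, WorkflowType.INGEST)
#   print_status(workflow)

CONFIG = """
<your YAML configuration>
"""  # placeholder

config_dict = yaml.safe_load(CONFIG)

try:
    workflow = MetadataWorkflow.create(config_dict)
except Exception as exc:
    # Init errors are handled by the extracted WorkflowInitErrorHandler class,
    # keyed on PipelineType instead of the deprecated WorkflowType enum.
    WorkflowInitErrorHandler.print_init_error(exc, config_dict, PipelineType.metadata)
    raise

workflow.execute()
workflow.raise_from_status()
workflow.print_status()  # replaces the module-level print_status(workflow)
workflow.stop()
```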

View File

@ -14,8 +14,6 @@ from datetime import timedelta
import yaml
from airflow import DAG
from metadata.workflow.workflow_output_handler import print_status
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
@ -63,7 +61,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -20,8 +20,6 @@ from datetime import timedelta
import yaml
from airflow import DAG
from metadata.workflow.workflow_output_handler import print_status
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
@ -72,7 +70,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -14,8 +14,6 @@ from datetime import timedelta
import yaml
from airflow import DAG
from metadata.workflow.workflow_output_handler import print_status
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
@ -62,7 +60,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -14,8 +14,6 @@ from datetime import timedelta
from airflow import DAG
from metadata.workflow.workflow_output_handler import print_status
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
@ -88,7 +86,7 @@ def metadata_ingestion_workflow():
workflow = UsageWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -25,7 +25,6 @@ from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.usage import UsageWorkflow
from metadata.workflow.workflow_output_handler import print_status
WORKFLOW_MAP = {
PipelineType.metadata.value: MetadataWorkflow,
@ -109,7 +108,7 @@ def main():
workflow = workflow_class.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -171,8 +171,17 @@ ignore = [
"src/metadata/readers/*",
"src/metadata/timer/*",
"src/metadata/utils/*",
"src/metadata/workflow/*",
"src/metadata/workflow/base.py",
"src/metadata/workflow/application.py",
"src/metadata/workflow/data_insight.py",
"src/metadata/workflow/data_quality.py",
"src/metadata/workflow/ingestion.py",
"src/metadata/workflow/metadata.py",
"src/metadata/workflow/profiler.py",
"src/metadata/workflow/usage.py",
"src/metadata/workflow/workflow_status_mixin.py",
]
reportDeprecated = false
reportMissingTypeStubs = false
reportAny = false

View File

@ -19,7 +19,6 @@ from pathlib import Path
from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger
from metadata.workflow.application import ApplicationWorkflow
from metadata.workflow.application_output_handler import print_status
logger = cli_logger()
@ -42,5 +41,5 @@ def run_app(config_path: Path) -> None:
workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()

View File

@ -17,13 +17,12 @@ import traceback
from pathlib import Path
from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.utils.logger import cli_logger
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.workflow_output_handler import (
WorkflowType,
print_init_error,
print_status,
)
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler
logger = cli_logger()
@ -42,10 +41,12 @@ def run_test(config_path: Path) -> None:
workflow = TestSuiteWorkflow.create(workflow_config_dict)
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, workflow_config_dict, WorkflowType.TEST)
WorkflowInitErrorHandler.print_init_error(
exc, workflow_config_dict, PipelineType.TestSuite
)
sys.exit(1)
workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()

View File

@ -17,13 +17,12 @@ import traceback
from pathlib import Path
from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.utils.logger import cli_logger
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import (
WorkflowType,
print_init_error,
print_status,
)
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler
logger = cli_logger()
@ -42,10 +41,12 @@ def run_ingest(config_path: Path) -> None:
logger.debug(f"Using config: {workflow.config}")
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, config_dict, WorkflowType.INGEST)
WorkflowInitErrorHandler.print_init_error(
exc, config_dict, PipelineType.metadata
)
sys.exit(1)
workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()

View File

@ -17,13 +17,12 @@ import traceback
from pathlib import Path
from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.utils.logger import cli_logger
from metadata.workflow.data_insight import DataInsightWorkflow
from metadata.workflow.workflow_output_handler import (
WorkflowType,
print_init_error,
print_status,
)
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler
logger = cli_logger()
@ -42,10 +41,12 @@ def run_insight(config_path: Path) -> None:
logger.debug(f"Using config: {workflow.config}")
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, config_dict, WorkflowType.INSIGHT)
WorkflowInitErrorHandler.print_init_error(
exc, config_dict, PipelineType.dataInsight
)
sys.exit(1)
workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()

View File

@ -21,11 +21,14 @@ from pydantic import BaseModel
from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import UTF_8
from metadata.utils.logger import cli_logger
from metadata.workflow.workflow_output_handler import WorkflowType, print_init_error
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler
logger = cli_logger()
@ -53,7 +56,9 @@ def run_lineage(config_path: Path) -> None:
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, config_dict, WorkflowType.INGEST)
WorkflowInitErrorHandler.print_init_error(
exc, config_dict, PipelineType.lineage
)
sys.exit(1)
if workflow.filePath:

View File

@ -17,13 +17,12 @@ import traceback
from pathlib import Path
from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.utils.logger import cli_logger
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import (
WorkflowType,
print_init_error,
print_status,
)
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler
logger = cli_logger()
@ -42,10 +41,12 @@ def run_profiler(config_path: Path) -> None:
workflow = ProfilerWorkflow.create(workflow_config_dict)
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, workflow_config_dict, WorkflowType.PROFILE)
WorkflowInitErrorHandler.print_init_error(
exc, workflow_config_dict, PipelineType.profiler
)
sys.exit(1)
workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()

View File

@ -17,13 +17,12 @@ import traceback
from pathlib import Path
from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.utils.logger import cli_logger
from metadata.workflow.usage import UsageWorkflow
from metadata.workflow.workflow_output_handler import (
WorkflowType,
print_init_error,
print_status,
)
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler
logger = cli_logger()
@ -42,10 +41,10 @@ def run_usage(config_path: Path) -> None:
logger.debug(f"Using config: {workflow.config}")
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, config_dict, WorkflowType.INGEST)
WorkflowInitErrorHandler.print_init_error(exc, config_dict, PipelineType.usage)
sys.exit(1)
workflow.execute()
workflow.stop()
print_status(workflow)
workflow.print_status()
workflow.raise_from_status()

View File

@ -19,7 +19,7 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
OpenMetadataConnection,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.ometa_api import C, OpenMetadata, T
from metadata.utils import fqn
from metadata.utils.logger import ometa_logger
@ -28,7 +28,7 @@ logger = ometa_logger()
def create_ometa_client(
metadata_config: OpenMetadataConnection,
) -> OpenMetadata:
) -> OpenMetadata[T, C]: # pyright: ignore[reportInvalidTypeVarUse]
"""Create an OpenMetadata client
Args:
@ -38,7 +38,7 @@ def create_ometa_client(
OpenMetadata: an OM client
"""
try:
metadata = OpenMetadata(metadata_config)
metadata = OpenMetadata[T, C](metadata_config)
metadata.health_check()
return metadata
except Exception as exc:

View File

@ -62,6 +62,7 @@ from metadata.generated.schema.metadataIngestion.storageServiceMetadataPipeline
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuitePipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import SourceConfig
SERVICE_TYPE_REF = {
ServiceType.Database.value: "databaseService",
@ -102,14 +103,14 @@ def _clean(source_type: str):
return source_type
def get_pipeline_type_from_source_config(source_config_type) -> PipelineType:
def get_pipeline_type_from_source_config(source_config: SourceConfig) -> PipelineType:
"""From the YAML serviceType, get the Ingestion Pipeline Type"""
pipeline_type = SOURCE_CONFIG_TYPE_INGESTION.get(
source_config_type.__class__.__name__
source_config.config.__class__.__name__
)
if not pipeline_type:
raise ValueError(
f"Cannot find Pipeline Type for SourceConfig {source_config_type}"
f"Cannot find Pipeline Type for SourceConfig {source_config.config}"
)
return pipeline_type
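In short, callers now pass the whole `SourceConfig` wrapper rather than its inner config. A minimal call sketch, assuming an `OpenMetadataWorkflowConfig` instance named `workflow_config` and that this helper lives in `metadata.utils.class_helper` (as the `base.py` imports later in this diff suggest):

```python
from metadata.utils.class_helper import get_pipeline_type_from_source_config

# Before: the helper received the inner config object
# pipeline_type = get_pipeline_type_from_source_config(workflow_config.source.sourceConfig.config)

# After: it receives the SourceConfig wrapper and inspects .config itself
pipeline_type = get_pipeline_type_from_source_config(workflow_config.source.sourceConfig)
```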

View File

@ -17,7 +17,7 @@ import threading
from copy import deepcopy
from functools import wraps
from time import perf_counter
from typing import List, Optional
from typing import Dict, List, Optional
from pydantic import BaseModel
@ -81,7 +81,7 @@ class ExecutionTimeTrackerState(metaclass=Singleton):
def __init__(self):
"""Initializes the state and the lock."""
self.state = {}
self.state: Dict[str, float] = {}
self.lock = threading.Lock()
def add(self, context: ExecutionTimeTrackerContext, elapsed: float):

View File

@ -8,27 +8,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module handles the output messages for applications
Generic Workflow entrypoint to execute Applications
"""
import time
from metadata.utils.helpers import pretty_print_time_duration
from metadata.utils.logger import ANSI, log_ansi_encoded_string
from metadata.workflow.output_handler import print_workflow_summary
from metadata.utils.deprecation import deprecated
from metadata.workflow.base import BaseWorkflow
def print_status(workflow: "ApplicationWorkflow") -> None:
"""
Print the workflow results
"""
print_workflow_summary(workflow)
if workflow.runner.get_status().source_start_time:
log_ansi_encoded_string(
color=ANSI.BRIGHT_CYAN,
bold=True,
message="Workflow finished in time: "
f"{pretty_print_time_duration(time.time()-workflow.runner.get_status().source_start_time)}",
)
@deprecated(message="Use 'workflow.print_status()' instead.", release="1.6")
def print_status(workflow: BaseWorkflow):
workflow.print_status()
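The shim above keeps `print_status` importable while routing to the new method. The `@deprecated` decorator from `metadata.utils.deprecation` is not shown in this commit; a minimal, hypothetical sketch of how such a decorator could behave (building a `DeprecationWarning` from the `message` and `release` arguments) is:

```python
import functools
import warnings
from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


def deprecated(message: str, release: str) -> Callable[[F], F]:
    """Hypothetical sketch: warn that a callable will be removed in the given release."""

    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            warnings.warn(
                f"{func.__name__} is deprecated and will be removed in release {release}. {message}",
                DeprecationWarning,
                stacklevel=2,
            )
            return func(*args, **kwargs)

        return wrapper  # type: ignore[return-value]

    return decorator
```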

View File

@ -12,6 +12,7 @@
Base workflow definition.
"""
import traceback
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
@ -47,7 +48,7 @@ from metadata.utils.class_helper import (
from metadata.utils.execution_time_tracker import ExecutionTimeTracker
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger, set_loggers_level
from metadata.workflow.output_handler import report_ingestion_status
from metadata.workflow.workflow_output_handler import WorkflowOutputHandler
from metadata.workflow.workflow_status_mixin import (
SUCCESS_THRESHOLD_VALUE,
WorkflowStatusMixin,
@ -84,10 +85,12 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
log_level: LogLevels,
metadata_config: OpenMetadataConnection,
service_type: ServiceType,
output_handler: WorkflowOutputHandler = WorkflowOutputHandler(),
):
"""
Disabling pylint to wait for workflow reimplementation as a topology
"""
self.output_handler = output_handler
self.config = config
self.service_type = service_type
self._timer: Optional[RepeatedTimer] = None
@ -136,7 +139,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
"""
if not self._timer:
self._timer = RepeatedTimer(
REPORTS_INTERVAL_SECONDS, report_ingestion_status, logger, self
REPORTS_INTERVAL_SECONDS, self._report_ingestion_status
)
return self._timer
@ -249,7 +252,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
),
),
pipelineType=get_pipeline_type_from_source_config(
self.config.source.sourceConfig.config
self.config.source.sourceConfig
),
sourceConfig=self.config.source.sourceConfig,
airflowConfig=AirflowConfig(),
@ -277,3 +280,37 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
entity=get_service_class_from_service_type(self.service_type),
fqn=self.config.source.serviceName,
)
def _report_ingestion_status(self):
"""
Log the workflow status of each step at INFO level
"""
try:
for step in self.workflow_steps():
logger.info(
f"{step.name}: Processed {len(step.status.records)} records,"
f" filtered {len(step.status.filtered)} records,"
f" found {len(step.status.failures)} errors"
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Wild exception reporting status - {exc}")
def _is_debug_enabled(self) -> bool:
return (
hasattr(self, "config")
and hasattr(self.config, "workflowConfig")
and hasattr(self.config.workflowConfig, "loggerLevel")
and self.config.workflowConfig.loggerLevel is LogLevels.DEBUG
)
def print_status(self):
start_time = self.workflow_steps()[0].get_status().source_start_time
self.output_handler.print_status(
self.result_status(),
self.workflow_steps(),
start_time,
self._is_debug_enabled(),
)
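Because the handler is injected through the constructor (`output_handler: WorkflowOutputHandler = WorkflowOutputHandler()`), output formatting becomes pluggable. A hedged sketch of a custom subclass, where the `JsonOutputHandler` name and its JSON output are illustrative and not part of this commit:

```python
import json
import time
from typing import Any, List, Optional

from metadata.ingestion.api.step import Step, Summary
from metadata.workflow.workflow_output_handler import WorkflowOutputHandler
from metadata.workflow.workflow_status_mixin import WorkflowResultStatus


class JsonOutputHandler(WorkflowOutputHandler):
    """Illustrative handler that emits the workflow summary as a single JSON line."""

    def print_status(
        self,
        result_status: WorkflowResultStatus,
        steps: List[Step],
        start_time: Optional[Any] = None,
        debug: bool = False,
    ):
        payload = {
            "status": result_status.name,
            "duration_seconds": (time.time() - start_time) if start_time else None,
            "steps": [
                {"name": step.name, "records": Summary.from_step(step).records}
                for step in steps
            ],
        }
        print(json.dumps(payload))
```

A workflow could then be constructed with `output_handler=JsonOutputHandler()` to swap the tabulated console report for machine-readable output.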

View File

@ -10,41 +10,21 @@
# limitations under the License.
"""
Common Output Handling methods
Module that handles the legacy WorkflowType until deprecation
"""
import traceback
from enum import Enum
from logging import Logger
from pathlib import Path
from typing import Dict, List
from typing import Optional
from pydantic import BaseModel
from tabulate import tabulate
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
from metadata.ingestion.api.step import Summary
from metadata.ingestion.lineage.models import QueryParsingFailures
from metadata.utils.execution_time_tracker import ExecutionTimeTracker
from metadata.utils.helpers import pretty_print_time_duration
from metadata.utils.logger import ANSI, log_ansi_encoded_string
WORKFLOW_FAILURE_MESSAGE = "Workflow finished with failures"
WORKFLOW_WARNING_MESSAGE = "Workflow finished with warnings"
WORKFLOW_SUCCESS_MESSAGE = "Workflow finished successfully"
class Failure(BaseModel):
"""
Auxiliary class to print the error per status
"""
name: str
failures: List[StackTraceError]
from metadata.utils.deprecation import deprecated
@deprecated(
message="Use 'PipelineType' in 'metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline'",
release="1.6",
)
class WorkflowType(Enum):
"""
Workflow type enums based on the `metadata` CLI commands
@ -59,190 +39,31 @@ class WorkflowType(Enum):
APP = "application"
EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows"
# TODO: Delete this method after the removal of WorkflowType in release 1.6
# Remember to remove it where it is being used
def workflow_type_to_pipeline_type(
workflow_type: WorkflowType, source_type_name: Optional[str]
) -> PipelineType:
"""Helper Function to Map between the Deprecated WorkflowType to PipelineType."""
URLS = {
WorkflowType.INGEST: "https://docs.open-metadata.org/connectors/ingestion/workflows/metadata",
WorkflowType.PROFILE: "https://docs.open-metadata.org/how-to-guides/data-quality-observability/profiler/workflow",
WorkflowType.TEST: "https://docs.open-metadata.org/connectors/ingestion/workflows/data-quality",
WorkflowType.LINEAGE: "https://docs.open-metadata.org/connectors/ingestion/workflows/lineage",
WorkflowType.USAGE: "https://docs.open-metadata.org/connectors/ingestion/workflows/usage",
}
def _fix_ingest_type() -> PipelineType:
"""Helper Function to Map between the Deprecated WorkflowType.INGESTION and the
correct PipelineType."""
if source_type_name:
if source_type_name.endswith("lineage"):
return PipelineType.lineage
if source_type_name.endswith("usage"):
return PipelineType.usage
return PipelineType.metadata
DEFAULT_EXAMPLE_FILE = {
WorkflowType.INGEST: "bigquery",
WorkflowType.PROFILE: "bigquery_profiler",
WorkflowType.TEST: "test_suite",
WorkflowType.LINEAGE: "bigquery_lineage",
WorkflowType.USAGE: "bigquery_usage",
}
def print_more_info(workflow_type: WorkflowType) -> None:
"""
Print more information message
"""
log_ansi_encoded_string(
message=f"\nFor more information, please visit: {URLS[workflow_type]}"
"\nOr join us in Slack: https://slack.open-metadata.org/"
)
def print_error_msg(msg: str) -> None:
"""
Print message with error style
"""
log_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}")
def get_failures(failure: Failure) -> List[Dict[str, str]]:
return [
{
"From": failure.name,
"Entity Name": f.name,
"Message": f.error,
"Stack Trace": f.stackTrace,
}
for f in failure.failures
]
def print_failures_if_apply(failures: List[Failure]) -> None:
# take only the ones that contain failures
failures = [f for f in failures if f.failures]
if failures:
# create a list of dictionaries' list
all_data = [get_failures(failure) for failure in failures]
# create a single of dictionaries
data = [f for fs in all_data for f in fs]
# create a dictionary with a key and a list of values from the list
error_table = {k: [dic[k] for dic in data] for k in data[0]}
if len(list(error_table.items())[0][1]) > 100:
log_ansi_encoded_string(
bold=True, message="Showing only the first 100 failures:"
)
# truncate list if number of values are over 100
error_table = {k: v[:100] for k, v in error_table.items()}
else:
log_ansi_encoded_string(bold=True, message="List of failures:")
log_ansi_encoded_string(
message=f"\n{tabulate(error_table, headers='keys', tablefmt='grid')}"
)
def is_debug_enabled(workflow) -> bool:
return (
hasattr(workflow, "config")
and hasattr(workflow.config, "workflowConfig")
and hasattr(workflow.config.workflowConfig, "loggerLevel")
and workflow.config.workflowConfig.loggerLevel is LogLevels.DEBUG
)
def print_execution_time_summary():
"""Log the ExecutionTimeTracker Summary."""
tracker = ExecutionTimeTracker()
summary_table = {
"Context": [],
"Execution Time Aggregate": [],
map_ = {
WorkflowType.INGEST: _fix_ingest_type(),
WorkflowType.PROFILE: PipelineType.profiler,
WorkflowType.TEST: PipelineType.TestSuite,
WorkflowType.LINEAGE: PipelineType.lineage,
WorkflowType.USAGE: PipelineType.usage,
WorkflowType.INSIGHT: PipelineType.dataInsight,
WorkflowType.APP: PipelineType.application,
}
for key in sorted(tracker.state.state.keys()):
summary_table["Context"].append(key)
summary_table["Execution Time Aggregate"].append(
pretty_print_time_duration(tracker.state.state[key])
)
log_ansi_encoded_string(bold=True, message="Execution Time Summary")
log_ansi_encoded_string(message=f"\n{tabulate(summary_table, tablefmt='grid')}")
def print_query_parsing_issues():
"""Log the QueryParsingFailures Summary."""
query_failures = QueryParsingFailures()
summary_table = {
"Query": [],
"Error": [],
}
for failure in query_failures:
summary_table["Query"].append(failure.query)
summary_table["Error"].append(failure.error)
if summary_table["Query"]:
log_ansi_encoded_string(bold=True, message="Query Parsing Error Summary")
log_ansi_encoded_string(
message=f"\n{tabulate(summary_table, tablefmt='grid', headers=summary_table.keys())}"
)
def print_workflow_summary(workflow: "BaseWorkflow") -> None:
"""
Args:
workflow: the workflow status to be printed
Returns:
Print Workflow status when the workflow logger level is DEBUG
"""
if is_debug_enabled(workflow):
print_workflow_status_debug(workflow)
print_execution_time_summary()
print_query_parsing_issues()
failures = []
total_records = 0
total_errors = 0
for step in workflow.workflow_steps():
step_summary = Summary.from_step(step)
total_records += step_summary.records
total_errors += step_summary.errors
failures.append(Failure(name=step.name, failures=step.get_status().failures))
log_ansi_encoded_string(bold=True, message=f"Workflow {step.name} Summary:")
log_ansi_encoded_string(message=f"Processed records: {step_summary.records}")
log_ansi_encoded_string(
message=f"Updated records: {step_summary.updated_records}"
)
log_ansi_encoded_string(message=f"Warnings: {step_summary.warnings}")
if step_summary.filtered:
log_ansi_encoded_string(message=f"Filtered: {step_summary.filtered}")
log_ansi_encoded_string(message=f"Errors: {step_summary.errors}")
print_failures_if_apply(failures)
total_success = max(total_records, 1)
log_ansi_encoded_string(
color=ANSI.BRIGHT_CYAN,
bold=True,
message=f"Success %: "
f"{round(total_success * 100 / (total_success + total_errors), 2)}",
)
def print_workflow_status_debug(workflow: "BaseWorkflow") -> None:
"""Print the statuses from each workflow step"""
log_ansi_encoded_string(bold=True, message="Statuses detailed info:")
for step in workflow.workflow_steps():
log_ansi_encoded_string(bold=True, message=f"{step.name} Status:")
log_ansi_encoded_string(message=step.get_status().as_string())
def report_ingestion_status(logger: Logger, workflow: "BaseWorkflow") -> None:
"""
Given a logger, use it to INFO the workflow status
"""
try:
for step in workflow.workflow_steps():
logger.info(
f"{step.name}: Processed {len(step.status.records)} records,"
f" filtered {len(step.status.filtered)} records,"
f" found {len(step.status.failures)} errors"
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Wild exception reporting status - {exc}")
return map_.get(workflow_type, PipelineType.metadata)

View File

@ -0,0 +1,154 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module handles the init error messages from different workflows
"""
import traceback
from pathlib import Path
from typing import Any, Dict, Optional, Type, Union
from metadata.config.common import ConfigurationError
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.ingestion.api.parser import (
InvalidWorkflowException,
ParsingConfigurationError,
)
from metadata.utils.constants import UTF_8
from metadata.utils.logger import ANSI, log_ansi_encoded_string
EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows"
URLS: Dict[PipelineType, str] = {
PipelineType.metadata: "https://docs.open-metadata.org/connectors/ingestion/workflows/metadata",
PipelineType.profiler: "https://docs.open-metadata.org/connectors/ingestion/workflows/profiler",
PipelineType.TestSuite: "https://docs.open-metadata.org/connectors/ingestion/workflows/data-quality",
PipelineType.lineage: "https://docs.open-metadata.org/connectors/ingestion/workflows/lineage",
PipelineType.usage: "https://docs.open-metadata.org/connectors/ingestion/workflows/usage",
PipelineType.dbt: "https://docs.open-metadata.org/connectors/ingestion/workflows/dbt",
}
DEFAULT_EXAMPLE_FILE: Dict[PipelineType, str] = {
PipelineType.metadata: "bigquery",
PipelineType.profiler: "bigquery_profiler",
PipelineType.TestSuite: "test_suite",
PipelineType.lineage: "bigquery_lineage",
PipelineType.usage: "bigquery_usage",
}
class WorkflowInitErrorHandler:
"""Resonsible for handling the InitError flow, when a Workflow errors before even initializing."""
@staticmethod
def print_init_error(
exc: Union[Exception, Type[Exception]],
config: Dict[str, Any],
pipeline_type: PipelineType = PipelineType.metadata,
):
"""
Print a workflow initialization error
"""
source_type_name = WorkflowInitErrorHandler._get_source_type_name(config)
if isinstance(
exc,
(ParsingConfigurationError, ConfigurationError, InvalidWorkflowException),
):
WorkflowInitErrorHandler._print_error_msg(
f"Error loading {pipeline_type.name} configuration: {exc}"
)
WorkflowInitErrorHandler._print_file_example(
source_type_name, pipeline_type
)
else:
WorkflowInitErrorHandler._print_error_msg(
f"\nError initializing {pipeline_type.name}: {exc}"
)
WorkflowInitErrorHandler._print_error_msg(traceback.format_exc())
WorkflowInitErrorHandler._print_more_info(pipeline_type)
@staticmethod
def _get_source_type_name(config: Dict[str, Any]) -> Optional[str]:
"""Returns the Source Type Name based on the Configuration passed."""
source_type_name = None
if (
config
and config.get("source", None) is not None
and config["source"].get("type", None) is not None
):
source_type_name = config["source"].get("type")
source_type_name = source_type_name.replace("-", "-")
return source_type_name
@staticmethod
def _print_file_example(
source_type_name: Optional[str], pipeline_type: PipelineType
):
"""
Print an example file for a given configuration
"""
if source_type_name is not None:
example_file = WorkflowInitErrorHandler._calculate_example_file(
source_type_name, pipeline_type
)
example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml"
if not example_path.exists():
example_file = DEFAULT_EXAMPLE_FILE[pipeline_type]
example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml"
log_ansi_encoded_string(
message=f"\nMake sure you are following the following format e.g. '{example_file}':"
)
log_ansi_encoded_string(message="------------")
with open(example_path, encoding=UTF_8) as file:
log_ansi_encoded_string(message=file.read())
log_ansi_encoded_string(message="------------")
@staticmethod
def _calculate_example_file(
source_type_name: str, pipeline_type: PipelineType
) -> str:
"""
Calculates the example file name depending on the source type name and pipeline_type
"""
if pipeline_type == PipelineType.usage:
return f"{source_type_name}_usage"
if pipeline_type == PipelineType.lineage:
return f"{source_type_name}_lineage"
if pipeline_type == PipelineType.profiler:
return f"{source_type_name}_profiler"
if pipeline_type == PipelineType.TestSuite:
return DEFAULT_EXAMPLE_FILE[pipeline_type]
return source_type_name
@staticmethod
def _print_more_info(pipeline_type: PipelineType) -> None:
"""
Print more information message
"""
log_ansi_encoded_string(
message=f"\nFor more information, please visit: {URLS[pipeline_type]}"
+ "\nOr join us in Slack: https://slack.open-metadata.org/"
)
@staticmethod
def _print_error_msg(msg: str) -> None:
"""
Print message with error style
"""
log_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}")

View File

@ -14,127 +14,218 @@ Module handles the output messages from different workflows
"""
import time
import traceback
from typing import Type, Union
from typing import Any, Dict, List, Optional, Type, Union
from metadata.config.common import ConfigurationError
from metadata.ingestion.api.parser import (
InvalidWorkflowException,
ParsingConfigurationError,
)
from metadata.utils.constants import UTF_8
from pydantic import BaseModel
from tabulate import tabulate
from metadata.ingestion.api.status import TruncatedStackTraceError
from metadata.ingestion.api.step import Step, Summary
from metadata.ingestion.lineage.models import QueryParsingFailures
from metadata.utils.deprecation import deprecated
from metadata.utils.execution_time_tracker import ExecutionTimeTracker
from metadata.utils.helpers import pretty_print_time_duration
from metadata.utils.logger import ANSI, log_ansi_encoded_string
from metadata.workflow.output_handler import (
DEFAULT_EXAMPLE_FILE,
EXAMPLES_WORKFLOW_PATH,
WORKFLOW_FAILURE_MESSAGE,
WorkflowType,
print_error_msg,
print_more_info,
print_workflow_summary,
workflow_type_to_pipeline_type,
)
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler
from metadata.workflow.workflow_status_mixin import WorkflowResultStatus
WORKFLOW_FAILURE_MESSAGE = "Workflow finished with failures"
WORKFLOW_WARNING_MESSAGE = "Workflow finished with warnings"
WORKFLOW_SUCCESS_MESSAGE = "Workflow finished successfully"
def calculate_ingestion_type(source_type_name: str) -> WorkflowType:
class Failure(BaseModel):
"""
Calculates the ingestion type depending on the source type name
Auxiliary class to print the error per status
"""
if source_type_name.endswith("lineage"):
return WorkflowType.LINEAGE
if source_type_name.endswith("usage"):
return WorkflowType.USAGE
return WorkflowType.INGEST
name: str
failures: List[TruncatedStackTraceError]
def calculate_example_file(source_type_name: str, workflow_type: WorkflowType) -> str:
"""
Calculates the ingestion type depending on the source type name and workflow_type
"""
if workflow_type == WorkflowType.USAGE:
return f"{source_type_name}_usage"
if workflow_type == WorkflowType.LINEAGE:
return f"{source_type_name}_lineage"
if workflow_type == WorkflowType.PROFILE:
return f"{source_type_name}_profiler"
if workflow_type == WorkflowType.TEST:
return DEFAULT_EXAMPLE_FILE[workflow_type]
return source_type_name
def print_file_example(source_type_name: str, workflow_type: WorkflowType):
"""
Print an example file for a given configuration
"""
if source_type_name is not None:
example_file = calculate_example_file(source_type_name, workflow_type)
example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml"
if not example_path.exists():
example_file = DEFAULT_EXAMPLE_FILE[workflow_type]
example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml"
log_ansi_encoded_string(
message=f"\nMake sure you are following the following format e.g. '{example_file}':"
)
log_ansi_encoded_string(message="------------")
with open(example_path, encoding=UTF_8) as file:
log_ansi_encoded_string(message=file.read())
log_ansi_encoded_string(message="------------")
@deprecated(message="Use 'workflow.print_status()' instead.", release="1.6")
def print_status(
workflow: "BaseWorkflow", # pyright: ignore[reportUndefinedVariable,reportUnknownParameterType]
):
workflow.print_status() # pyright: ignore[reportUnknownMemberType]
@deprecated(
message=(
"Use 'WorkflowInitErrorHandler.print_init_error(exc, config, workflow_type)'"
" from 'metadata.workflow.workflow_init_error_handler'"
),
release="1.6",
)
def print_init_error(
exc: Union[Exception, Type[Exception]],
config: dict,
config: Dict[str, Any],
workflow_type: WorkflowType = WorkflowType.INGEST,
) -> None:
"""
Print a workflow initialization error
"""
source_type_name = None
if (
):
# pylint: disable=W0212
source_type_name = WorkflowInitErrorHandler._get_source_type_name( # pyright: ignore[reportPrivateUsage]
config
and config.get("source", None) is not None
and config["source"].get("type", None) is not None
)
WorkflowInitErrorHandler.print_init_error(
exc, config, workflow_type_to_pipeline_type(workflow_type, source_type_name)
)
class WorkflowOutputHandler:
"""Responsible for dealing with the Workflow Outputs"""
def print_status(
self,
result_status: WorkflowResultStatus,
steps: List[Step],
start_time: Optional[Any] = None,
debug: bool = False,
):
source_type_name = config["source"].get("type")
source_type_name = source_type_name.replace("-", "-")
workflow_type = (
calculate_ingestion_type(source_type_name)
if workflow_type == WorkflowType.INGEST
else workflow_type
)
"""
Print the workflow results
"""
self.print_summary(steps, debug)
if isinstance(
exc, (ParsingConfigurationError, ConfigurationError, InvalidWorkflowException)
):
print_error_msg(f"Error loading {workflow_type.name} configuration: {exc}")
print_file_example(source_type_name, workflow_type)
print_more_info(workflow_type)
else:
print_error_msg(f"\nError initializing {workflow_type.name}: {exc}")
print_error_msg(traceback.format_exc())
print_more_info(workflow_type)
if start_time:
log_ansi_encoded_string(
color=ANSI.BRIGHT_CYAN,
bold=True,
message="Workflow finished in time: "
+ f"{pretty_print_time_duration(time.time() - start_time)}",
)
if result_status == WorkflowResultStatus.FAILURE:
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED,
bold=True,
message=WORKFLOW_FAILURE_MESSAGE,
)
def print_status(workflow: "IngestionWorkflow") -> None:
"""
Print the workflow results
"""
def print_summary(self, steps: List[Step], debug: bool = False):
"""Prints the summary information for a Workflow Execution."""
if debug:
self._print_debug_summary(steps)
self._print_execution_time_summary()
self._print_query_parsing_issues()
print_workflow_summary(workflow)
self._print_summary(steps)
# Get the time to execute the first step
first_step = workflow.workflow_steps()[0]
if first_step.get_status().source_start_time:
def _print_summary(self, steps: List[Step]):
failures: List[Failure] = []
total_records: int = 0
total_errors: int = 0
for step in steps:
step_summary = Summary.from_step(step)
total_records += step_summary.records or 0
total_errors += step_summary.errors or 0
failures.append(
Failure(name=step.name, failures=step.get_status().failures)
)
log_ansi_encoded_string(bold=True, message=f"Workflow {step.name} Summary:")
log_ansi_encoded_string(
message=f"Processed records: {step_summary.records}"
)
log_ansi_encoded_string(
message=f"Updated records: {step_summary.updated_records}"
)
log_ansi_encoded_string(message=f"Warnings: {step_summary.warnings}")
if step_summary.filtered:
log_ansi_encoded_string(message=f"Filtered: {step_summary.filtered}")
log_ansi_encoded_string(message=f"Errors: {step_summary.errors}")
self._print_failures_if_apply(failures)
total_success = max(total_records, 1)
log_ansi_encoded_string(
color=ANSI.BRIGHT_CYAN,
bold=True,
message="Workflow finished in time: "
f"{pretty_print_time_duration(time.time()-first_step.get_status().source_start_time)}",
message="Success %: "
+ f"{round(total_success * 100 / (total_success + total_errors), 2)}",
)
if workflow.result_status() == 1:
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED,
bold=True,
message=WORKFLOW_FAILURE_MESSAGE,
)
def _print_debug_summary(self, steps: List[Step]):
log_ansi_encoded_string(bold=True, message="Statuses detailed info:")
for step in steps:
log_ansi_encoded_string(bold=True, message=f"{step.name} Status:")
log_ansi_encoded_string(message=step.get_status().as_string())
def _print_execution_time_summary(self):
"""Log the ExecutionTimeTracker Summary."""
tracker = ExecutionTimeTracker()
summary_table: Dict[str, List[Union[str, float]]] = {
"Context": [],
"Execution Time Aggregate": [],
}
for key in sorted(tracker.state.state.keys()):
summary_table["Context"].append(key)
summary_table["Execution Time Aggregate"].append(
pretty_print_time_duration(tracker.state.state[key])
)
log_ansi_encoded_string(bold=True, message="Execution Time Summary")
log_ansi_encoded_string(message=f"\n{tabulate(summary_table, tablefmt='grid')}")
def _print_query_parsing_issues(self):
"""Log the QueryParsingFailures Summary."""
query_failures = QueryParsingFailures()
summary_table: Dict[str, List[Optional[str]]] = {
"Query": [],
"Error": [],
}
for failure in query_failures:
summary_table["Query"].append(failure.query)
summary_table["Error"].append(failure.error)
if summary_table["Query"]:
log_ansi_encoded_string(bold=True, message="Query Parsing Error Summary")
log_ansi_encoded_string(
message=f"\n{tabulate(summary_table, tablefmt='grid', headers=list(summary_table.keys()))}"
)
def _get_failures(self, failure: Failure) -> List[Dict[str, Optional[str]]]:
return [
{
"From": failure.name,
"Entity Name": f.name,
"Message": f.error,
"Stack Trace": f.stackTrace,
}
for f in failure.failures
]
def _print_failures_if_apply(self, failures: List[Failure]) -> None:
# take only the ones that contain failures
failures = [f for f in failures if f.failures]
if failures:
# create a list of dictionaries' list
all_data = [self._get_failures(failure) for failure in failures]
# create a single of dictionaries
data = [f for fs in all_data for f in fs]
# create a dictionary with a key and a list of values from the list
error_table = {k: [dic[k] for dic in data] for k in data[0]}
if len(list(error_table.items())[0][1]) > 100:
log_ansi_encoded_string(
bold=True, message="Showing only the first 100 failures:"
)
# truncate list if number of values are over 100
error_table = {k: v[:100] for k, v in error_table.items()}
else:
log_ansi_encoded_string(bold=True, message="List of failures:")
log_ansi_encoded_string(
message=f"\n{tabulate(error_table, headers='keys', tablefmt='grid')}"
)

View File

@ -14,6 +14,7 @@ Add methods to the workflows for updating the IngestionPipeline status
import traceback
import uuid
from datetime import datetime
from enum import Enum
from typing import Optional, Tuple
from metadata.config.common import WorkflowExecutionError
@ -39,6 +40,11 @@ logger = ometa_logger()
SUCCESS_THRESHOLD_VALUE = 90
class WorkflowResultStatus(Enum):
SUCCESS = 0
FAILURE = 1
class WorkflowStatusMixin:
"""
Helper methods to manage IngestionPipeline status
@ -127,13 +133,13 @@ class WorkflowStatusMixin:
self.set_ingestion_pipeline_status(PipelineState.failed)
raise err
def result_status(self) -> int:
def result_status(self) -> WorkflowResultStatus:
"""
Returns WorkflowResultStatus.FAILURE if the source status failed, WorkflowResultStatus.SUCCESS otherwise.
"""
if self.get_failures():
return 1
return 0
return WorkflowResultStatus.FAILURE
return WorkflowResultStatus.SUCCESS
def build_ingestion_status(self) -> Optional[IngestionStatus]:
"""

View File

@ -38,7 +38,6 @@ from metadata.utils.time_utils import (
get_end_of_day_timestamp_mill,
)
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
Base = declarative_base()
@ -149,7 +148,7 @@ class TestGreatExpectationIntegration(TestCase):
ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
print_status(ingestion_workflow)
ingestion_workflow.print_status()
ingestion_workflow.stop()
@classmethod

View File

@ -61,7 +61,6 @@ from metadata.utils.time_utils import (
)
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent
BIGQUERY_CONFIG_FILE = "cli_e2e/database/bigquery/bigquery.yaml"
@ -143,7 +142,7 @@ class TestBigquerySystem(TestCase):
ingestion_workflow = MetadataWorkflow.create(cls.config)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
print_status(ingestion_workflow)
ingestion_workflow.print_status()
ingestion_workflow.stop()
# get table fqn
@ -168,7 +167,7 @@ class TestBigquerySystem(TestCase):
profiler_workflow = ProfilerWorkflow.create(config)
profiler_workflow.execute()
profiler_workflow.raise_from_status()
print_status(profiler_workflow)
profiler_workflow.print_status()
profiler_workflow.stop()
# get latest profile metrics

View File

@ -47,7 +47,6 @@ from metadata.utils.time_utils import (
)
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent
REDSHIFT_CONFIG_FILE = "cli_e2e/database/redshift/redshift.yaml"
@ -110,7 +109,7 @@ class TestRedshiftSystem(TestCase):
ingestion_workflow = MetadataWorkflow.create(cls.config)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
print_status(ingestion_workflow)
ingestion_workflow.print_status()
ingestion_workflow.stop()
# get table fqn
@ -135,7 +134,7 @@ class TestRedshiftSystem(TestCase):
profiler_workflow = ProfilerWorkflow.create(config)
profiler_workflow.execute()
profiler_workflow.raise_from_status()
print_status(profiler_workflow)
profiler_workflow.print_status()
profiler_workflow.stop()
# get latest profile metrics

View File

@ -23,12 +23,12 @@ How to use this test
We will need to perform at least one `DELETE`, `INSERT`, `UPDATE` on any table from the schema.
query example:
```
INSERT INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES
INSERT INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES
(1, 'FOO'),
(2, 'BAR'),
(3, 'BAZZ')
INSERT OVERWRITE INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES
INSERT OVERWRITE INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES
(4, 'FOOBAR'),
(5, 'FOOBAZZ'),
(6, 'BARBAZZ')
@ -66,7 +66,6 @@ from metadata.utils.time_utils import (
)
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent
SNOWFLAKE_CONFIG_FILE = "cli_e2e/database/snowflake/snowflake.yaml"
@ -134,7 +133,7 @@ class TestSnowflakeystem(TestCase):
ingestion_workflow = MetadataWorkflow.create(cls.config)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
print_status(ingestion_workflow)
ingestion_workflow.print_status()
ingestion_workflow.stop()
# get table fqn
@ -159,7 +158,7 @@ class TestSnowflakeystem(TestCase):
profiler_workflow = ProfilerWorkflow.create(config)
profiler_workflow.execute()
profiler_workflow.raise_from_status()
print_status(profiler_workflow)
profiler_workflow.print_status()
profiler_workflow.stop()
# get latest profile metrics

View File

@ -41,7 +41,7 @@ from metadata.utils.time_utils import (
)
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
from metadata.workflow.workflow_output_handler import WorkflowResultStatus
SERVICE_NAME = Path(__file__).stem
REGION = "us-west-1"
@ -139,7 +139,7 @@ class DatalakeProfilerTestE2E(TestCase):
ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
print_status(ingestion_workflow)
ingestion_workflow.print_status()
ingestion_workflow.stop()
def test_datalake_profiler_workflow(self):
@ -159,7 +159,7 @@ class DatalakeProfilerTestE2E(TestCase):
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == 0
assert status == WorkflowResultStatus.SUCCESS
table_profile = self.metadata.get_profile_data(
f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv"',
@ -207,7 +207,7 @@ class DatalakeProfilerTestE2E(TestCase):
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == 0
assert status == WorkflowResultStatus.SUCCESS
table = self.metadata.get_by_name(
entity=Table,
@ -252,7 +252,7 @@ class DatalakeProfilerTestE2E(TestCase):
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == 0
assert status == WorkflowResultStatus.SUCCESS
table = self.metadata.get_by_name(
entity=Table,
@ -298,7 +298,7 @@ class DatalakeProfilerTestE2E(TestCase):
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == 0
assert status == WorkflowResultStatus.SUCCESS
table = self.metadata.get_by_name(
entity=Table,
@ -355,7 +355,7 @@ class DatalakeProfilerTestE2E(TestCase):
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == 0
assert status == WorkflowResultStatus.SUCCESS
table = self.metadata.get_by_name(
entity=Table,

View File

@ -44,7 +44,7 @@ from metadata.utils.time_utils import (
)
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
from metadata.workflow.workflow_output_handler import WorkflowResultStatus
logging.basicConfig(level=logging.WARN)
logger = logging.getLogger(__name__)
@ -185,7 +185,7 @@ class ProfilerWorkflowTest(TestCase):
ingestion_workflow = MetadataWorkflow.create(ingestion_config)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
print_status(ingestion_workflow)
ingestion_workflow.print_status()
ingestion_workflow.stop()
@classmethod
@ -253,7 +253,7 @@ class ProfilerWorkflowTest(TestCase):
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == 0
assert status == WorkflowResultStatus.SUCCESS
table = self.metadata.get_by_name(
entity=Table,
@ -278,7 +278,7 @@ class ProfilerWorkflowTest(TestCase):
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == 0
assert status == WorkflowResultStatus.SUCCESS
table = self.metadata.get_by_name(
entity=Table,
@ -309,7 +309,7 @@ class ProfilerWorkflowTest(TestCase):
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
print_status(profiler_workflow)
profiler_workflow.print_status()
profiler_workflow.stop()
table = self.metadata.get_by_name(
@ -361,7 +361,7 @@ class ProfilerWorkflowTest(TestCase):
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
print_status(profiler_workflow)
profiler_workflow.print_status()
profiler_workflow.stop()
table = self.metadata.get_by_name(
@ -400,7 +400,7 @@ class ProfilerWorkflowTest(TestCase):
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
print_status(profiler_workflow)
profiler_workflow.print_status()
profiler_workflow.stop()
table = self.metadata.get_by_name(
@ -448,7 +448,7 @@ class ProfilerWorkflowTest(TestCase):
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
print_status(profiler_workflow)
profiler_workflow.print_status()
profiler_workflow.stop()
table = self.metadata.get_by_name(
@ -488,7 +488,7 @@ class ProfilerWorkflowTest(TestCase):
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
print_status(profiler_workflow)
profiler_workflow.print_status()
profiler_workflow.stop()
table = self.metadata.get_by_name(
@ -535,7 +535,7 @@ class ProfilerWorkflowTest(TestCase):
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
print_status(profiler_workflow)
profiler_workflow.print_status()
profiler_workflow.stop()
table = self.metadata.get_by_name(
@ -575,7 +575,7 @@ class ProfilerWorkflowTest(TestCase):
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
print_status(profiler_workflow)
profiler_workflow.print_status()
profiler_workflow.stop()
table = self.metadata.get_by_name(
@ -633,7 +633,7 @@ class ProfilerWorkflowTest(TestCase):
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == 0
assert status == WorkflowResultStatus.SUCCESS
table = self.metadata.get_by_name(
entity=Table,
@ -726,7 +726,7 @@ class ProfilerWorkflowTest(TestCase):
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == 0
assert status == WorkflowResultStatus.SUCCESS
table = self.metadata.get_by_name(
entity=Table,

View File

@ -45,7 +45,7 @@ from metadata.utils.test_utils import accumulate_errors
from metadata.utils.time_utils import get_end_of_day_timestamp_mill
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
from metadata.workflow.workflow_output_handler import WorkflowResultStatus
SERVICE_NAME = Path(__file__).stem
@ -144,7 +144,7 @@ class NoSQLProfiler(TestCase):
)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
print_status(ingestion_workflow)
ingestion_workflow.print_status()
ingestion_workflow.stop()
@classmethod
@ -176,7 +176,7 @@ class NoSQLProfiler(TestCase):
profiler_workflow.execute()
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == 0
assert status == WorkflowResultStatus.SUCCESS
def test_simple(self):
workflow_config = deepcopy(self.ingestion_config)

View File

@ -20,7 +20,6 @@ from minio import Minio
from _openmetadata_testutils.ometa import OM_JWT, int_admin_ometa
from metadata.generated.schema.entity.services.storageService import StorageService
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
from ..containers import MinioContainerConfigs, get_minio_container
@ -112,7 +111,7 @@ def ingest_s3_storage(minio, metadata, service_name, create_data):
workflow = MetadataWorkflow.create(yaml.safe_load(config))
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
yield

View File

@ -0,0 +1,27 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate the deprecated functions still work.
"""
from metadata.workflow.workflow_output_handler import print_init_error, print_status
from .test_base_workflow import SimpleWorkflow, config
# TODO: remove after the print_status and print_init_error functions are removed in Release 1.6
class TestDeprecatedSimpleWorkflow:
def test_workflow_print_status(self):
workflow = SimpleWorkflow(config=config)
workflow.execute()
print_status(workflow)
def test_workflow_print_init_error(self):
print_init_error(Exception(), config.model_dump())

View File

@ -34,7 +34,6 @@ from metadata.generated.schema.metadataIngestion.applicationPipeline import (
ApplicationPipeline,
)
from metadata.workflow.application import ApplicationWorkflow
from metadata.workflow.application_output_handler import print_status
def application_workflow(workflow_config: OpenMetadataApplicationConfig):
@ -54,7 +53,7 @@ def application_workflow(workflow_config: OpenMetadataApplicationConfig):
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -36,7 +36,6 @@ from metadata.generated.schema.metadataIngestion.application import (
from metadata.generated.schema.type.basic import Timestamp, Uuid
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.workflow.workflow_output_handler import print_status
# pylint: disable=ungrouped-imports
try:
@ -207,7 +206,7 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -42,7 +42,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.data_insight import DataInsightWorkflow
from metadata.workflow.workflow_output_handler import print_status
def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig):
@ -63,7 +62,7 @@ def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig):
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -28,7 +28,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
WorkflowConfig,
)
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
@ -48,7 +47,7 @@ def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -28,7 +28,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
WorkflowConfig,
)
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.workflow_output_handler import print_status
def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
@ -48,7 +47,7 @@ def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -32,7 +32,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Stage,
)
from metadata.workflow.usage import UsageWorkflow
from metadata.workflow.workflow_output_handler import print_status
def usage_workflow(workflow_config: OpenMetadataWorkflowConfig):
@ -52,7 +51,7 @@ def usage_workflow(workflow_config: OpenMetadataWorkflowConfig):
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -15,7 +15,7 @@ def run():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
```

View File

@ -327,7 +327,7 @@ we just need the following few lines of Python code:
```python
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
config = """
<your YAML configuration>
@ -337,7 +337,7 @@ workflow_config = yaml.safe_load(config)
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
```

View File

@ -65,7 +65,7 @@ from airflow.utils.dates import days_ago
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
default_args = {
"owner": "user_name",
@ -108,7 +108,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -54,7 +54,7 @@ except ModuleNotFoundError:
from metadata.config.common import load_config_file
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
from airflow.utils.dates import days_ago
@ -76,7 +76,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
with DAG(
@ -225,7 +225,7 @@ default_args = {
def metadata_ingestion_workflow():
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
import yaml
config = """
...
@ -235,7 +235,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -163,7 +163,7 @@ import yaml
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
CONFIG = f"""
source:
@ -182,7 +182,7 @@ def run():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
@ -286,7 +286,7 @@ except ModuleNotFoundError:
from metadata.config.common import load_config_file
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
# Import the hook
from airflow.hooks.base import BaseHook
@ -326,7 +326,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
with DAG(
@ -363,7 +363,7 @@ except ModuleNotFoundError:
from metadata.config.common import load_config_file
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
config = """
source:
@ -388,7 +388,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
with DAG(

View File

@ -52,7 +52,7 @@ from airflow.utils.dates import days_ago
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
default_args = {
"owner": "user_name",
@ -73,7 +73,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -38,7 +38,7 @@ import yaml
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
CONFIG = f"""
source:
@ -57,7 +57,7 @@ def run():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()

View File

@ -27,7 +27,7 @@ ingestion from within a simple Python script:
```python
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
# Specify your YAML configuration
CONFIG = """
@ -46,7 +46,7 @@ def run():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
@ -357,7 +357,7 @@ import yaml
```python {% srNumber=1 %}
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
```
@ -396,7 +396,7 @@ def run():
workflow = MetadataWorkflow.create(CONFIG)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
```
@ -468,7 +468,7 @@ import yaml
```python {% srNumber=1 %}
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
```
@ -503,7 +503,7 @@ def run():
workflow = MetadataWorkflow.create(CONFIG)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
```
@ -574,7 +574,7 @@ import yaml
```python {% srNumber=1 %}
from metadata.workflow.usage import UsageWorkflow
from metadata.workflow.workflow_output_handler import print_status
```
@ -617,7 +617,7 @@ def run():
workflow = UsageWorkflow.create(CONFIG)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
```
@ -690,7 +690,7 @@ import yaml
```python {% srNumber=1 %}
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
```
@ -727,7 +727,7 @@ def run():
workflow = ProfilerWorkflow.create(CONFIG)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
```
@ -804,7 +804,7 @@ import yaml
```python {% srNumber=1 %}
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.workflow_output_handler import print_status
```
@ -840,7 +840,7 @@ def run():
workflow = TestSuiteWorkflow.create(CONFIG)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
```
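Since the metadata, usage, profiler, and data quality workflows above all follow the same create/execute/raise_from_status/print_status/stop sequence, a single helper can run any of them. A small sketch follows; `run_workflow` and the usage comments are illustrative assumptions, not an OpenMetadata API:

```python
# Sketch of one helper covering any workflow class; run_workflow is an
# illustrative name, not part of the library.
import yaml

from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow


def run_workflow(workflow_class, config_yaml: str) -> None:
    workflow = workflow_class.create(yaml.safe_load(config_yaml))
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


# Example usage, assuming CONFIG holds the YAML for the chosen workflow type:
# run_workflow(MetadataWorkflow, CONFIG)
# run_workflow(ProfilerWorkflow, CONFIG)
```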

View File

@ -54,7 +54,7 @@ except ModuleNotFoundError:
from airflow.utils.dates import days_ago
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
default_args = {
"retries": 3,
@ -71,7 +71,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
with DAG(
@ -392,7 +392,7 @@ default_args = {
def metadata_ingestion_workflow():
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
import yaml
@ -403,7 +403,7 @@ YAML config
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
with DAG(

View File

@ -230,12 +230,12 @@ We have reorganized the structure of the `Workflow` classes, which requires upda
The `Workflow` class that you import can then be called as follows:
```python
from metadata.workflow.workflow_output_handler import print_status
workflow = workflow_class.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow) # This method has been updated. Before it was `workflow.print_status()`
workflow.print_status() # This method has been updated. Before it was `print_status(workflow)`
workflow.stop()
```
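Older scripts that still do `from metadata.workflow.workflow_output_handler import print_status` can be bridged by a thin deprecated wrapper that delegates to the new method. A rough sketch of such a wrapper is shown below; it is an illustration, not necessarily the shipped implementation:

```python
# Rough sketch of a deprecated module-level helper delegating to the new
# Workflow.print_status() method; the warning text is an assumption.
import warnings


def print_status(workflow) -> None:
    """Deprecated: call workflow.print_status() instead."""
    warnings.warn(
        "print_status(workflow) is deprecated; use workflow.print_status()",
        DeprecationWarning,
        stacklevel=2,
    )
    workflow.print_status()
```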

View File

@ -70,7 +70,7 @@ import yaml
from datetime import timedelta
from airflow import DAG
from metadata.workflow.data_insight import DataInsightWorkflow
from metadata.workflow.workflow_output_handler import print_status
try:
from airflow.operators.python import PythonOperator
@ -98,7 +98,7 @@ def metadata_ingestion_workflow():
workflow = DataInsightWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
with DAG(

View File

@ -50,7 +50,7 @@ except ModuleNotFoundError:
from metadata.config.common import load_config_file
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
from airflow.utils.dates import days_ago
default_args = {
@ -71,7 +71,7 @@ def metadata_ingestion_workflow():
workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
with DAG(

View File

@ -169,7 +169,7 @@ If you'd rather have a Python script taking care of the execution, you can use:
```python
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
# Specify your YAML configuration
CONFIG = """
@ -188,7 +188,7 @@ def run():
workflow = ProfilerWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.print_status()
workflow.stop()
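# A minimal standalone sketch of wiring the above into a script, assuming the
# YAML config path arrives as the first CLI argument; the argument handling
# and file loading are illustrations, not part of the documented change.
import sys

import yaml

from metadata.workflow.profiler import ProfilerWorkflow


def main(config_path: str) -> None:
    with open(config_path, encoding="utf-8") as config_file:
        workflow_config = yaml.safe_load(config_file)
    workflow = ProfilerWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


if __name__ == "__main__":
    main(sys.argv[1])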