Mirror of https://github.com/open-metadata/OpenMetadata.git, synced 2025-12-24 22:18:41 +00:00
* DQ BaseWorkflow
* Test suite runner
* test Suite workflow
* Refactor DQ for BaseWorkflow
* Lint
* Fix source
* Fix source
* Fix source
* Fix source
* Fix test
* Prepare docs
* Clean sink
* Clean legacy classes
* typo
* ProcessorStatus

parent 2df27c4662
commit f5e10c4a5f
@@ -21,8 +21,8 @@ from metadata.utils.logger import cli_logger
from metadata.workflow.data_insight import DataInsightWorkflow
from metadata.workflow.workflow_output_handler import (
    WorkflowType,
    print_data_insight_status,
    print_init_error,
    print_status,
)

logger = cli_logger()
@@ -48,5 +48,5 @@ def run_insight(config_path: str) -> None:

    workflow.execute()
    workflow.stop()
    print_data_insight_status(workflow)
    print_status(workflow)
    workflow.raise_from_status()

@@ -20,7 +20,6 @@ from datetime import datetime, timezone
from typing import Callable, Iterable, Optional

from metadata.generated.schema.analytics.reportData import ReportData
from metadata.ingestion.api.processor import ProcessorStatus
from metadata.ingestion.api.status import Status
from metadata.ingestion.ometa.ometa_api import OpenMetadata

@@ -68,5 +67,5 @@ class DataProcessor(abc.ABC):
        raise NotImplementedError

    @abc.abstractmethod
    def get_status(self) -> ProcessorStatus:
    def get_status(self) -> Status:
        raise NotImplementedError

@@ -41,7 +41,6 @@ from metadata.generated.schema.entity.data import (
    topic,
)
from metadata.generated.schema.entity.teams.user import User
from metadata.ingestion.api.processor import ProcessorStatus
from metadata.ingestion.api.status import Status
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.helpers import get_entity_tier_from_tags
@@ -370,5 +369,5 @@ class WebAnalyticUserActivityReportDataProcessor(DataProcessor):
        """Refine data"""
        self._refined_data = self.refine_user_event.send(entity)

    def get_status(self) -> ProcessorStatus:
    def get_status(self) -> Status:
        return self.processor_status

@@ -1,86 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Abstract Processor definition to build a Workflow
"""
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Any, Generic, List, Optional

from pydantic import Field

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.ingestion.api.closeable import Closeable
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.models import StackTraceError
from metadata.ingestion.api.status import Status
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class ProcessorStatus(Status):
    records: List[str] = Field(default_factory=list)

    def processed(self, record: Any):
        self.records.append(record)

    # disabling pylint until we remove this
    def warning(self, info: Any) -> None:  # pylint: disable=W0221
        self.warnings.append(info)


class ProfilerProcessorStatus(Status):
    entity: Optional[str] = None

    def scanned(self, record: Any) -> None:
        self.records.append(record)

    def failed_profiler(self, error: str, stack_trace: Optional[str] = None) -> None:
        self.failed(
            StackTraceError(
                name=self.entity if self.entity else "",
                error=error,
                stack_trace=stack_trace,
            )
        )


@dataclass
class Processor(Closeable, Generic[Entity], metaclass=ABCMeta):
    """
    Processor class
    """

    status: ProcessorStatus

    def __init__(self):
        self.status = ProcessorStatus()

    @classmethod
    @abstractmethod
    def create(
        cls, config_dict: dict, metadata_config: OpenMetadataConnection, **kwargs
    ) -> "Processor":
        pass

    @abstractmethod
    def process(self, *args, **kwargs) -> Entity:
        pass

    def get_status(self) -> ProcessorStatus:
        return self.status

    @abstractmethod
    def close(self) -> None:
        pass
@@ -1,65 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Abstract Sink definition to build a Workflow
"""
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Any, Generic, List

from pydantic import Field

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.ingestion.api.closeable import Closeable
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.status import Status


class SinkStatus(Status):
    records: List[str] = Field(default_factory=list)

    def records_written(self, record: str) -> None:
        self.records.append(record)

    # Disable pylint until this is removed
    def warning(self, info: Any) -> None:  # pylint: disable=W0221
        self.warnings.append(info)


@dataclass  # type: ignore[misc]
class Sink(Closeable, Generic[Entity], metaclass=ABCMeta):
    """All Sinks must inherit this base class."""

    status: SinkStatus

    def __init__(self):
        self.status = SinkStatus()

    @classmethod
    @abstractmethod
    def create(
        cls, config_dict: dict, metadata_config: OpenMetadataConnection
    ) -> "Sink":
        pass

    @abstractmethod
    def write_record(self, record: Entity) -> None:
        # must call callback when done.
        pass

    def get_status(self) -> SinkStatus:
        return self.status

    @abstractmethod
    def close(self) -> None:
        pass
@@ -1,52 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Abstract Stage definition to build a Workflow
"""
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Generic

from metadata.ingestion.api.closeable import Closeable
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.status import Status


class StageStatus(Status):
    pass


@dataclass  # type: ignore[misc]
class Stage(Closeable, Generic[Entity], metaclass=ABCMeta):
    """
    Stage class
    """

    status: StageStatus

    def __init__(self):
        self.status = StageStatus()

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, metadata_config: dict) -> "Stage":
        pass

    @abstractmethod
    def stage_record(self, record: Entity):
        pass

    def get_status(self) -> StageStatus:
        return self.status

    @abstractmethod
    def close(self) -> None:
        pass
@@ -15,7 +15,7 @@ supporting sqlalchemy abstraction layer
"""

from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union

from sqlalchemy import Column
from typing_extensions import Self
@@ -32,7 +32,8 @@ from metadata.generated.schema.entity.services.databaseService import DatabaseCo
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
    DatabaseServiceProfilerPipeline,
)
from metadata.ingestion.api.processor import ProfilerProcessorStatus
from metadata.ingestion.api.models import StackTraceError
from metadata.ingestion.api.status import Status
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.profiler.api.models import ProfileSampleConfig, TableConfig
@@ -42,6 +43,24 @@ from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.partition import get_partition_details


class ProfilerProcessorStatus(Status):
    """Keep track of the entity being processed"""

    entity: Optional[str] = None

    def scanned(self, record: Any) -> None:
        self.records.append(record)

    def failed_profiler(self, error: str, stack_trace: Optional[str] = None) -> None:
        self.failed(
            StackTraceError(
                name=self.entity if self.entity else "",
                error=error,
                stack_trace=stack_trace,
            )
        )


class ProfilerInterface(ABC):
    """Protocol interface for the profiler processor"""

@@ -23,10 +23,8 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
)
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.workflow import Sink as WorkflowSink
from metadata.ingestion.api.processor import Processor
from metadata.ingestion.api.stage import Stage
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import BulkSink, Sink, Source
from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage
from metadata.utils.class_helper import get_service_type_from_source_type
from metadata.utils.logger import utils_logger

ingestion/src/metadata/workflow/README.md (new file, 135 lines)
@@ -0,0 +1,135 @@
# Base Workflow

The goal of the `BaseWorkflow` is to define a unique class that controls the logic flow of executions. This means:
- Having a consensus on how our executions are organized (steps)
- Centralizing all the exception management in a single place. We don't want individual - and wrongly uncaught - exceptions
  to blow up the full execution.
- Centralizing the `Status` handling: how we report processed assets & failures and send them back to the `IngestionPipeline`.

## Steps

Each `Workflow` can be built by using `Steps` as lego pieces. Each of these pieces - steps - is a generic abstraction
of the operations we expect to happen inside it. Currently, the `BaseWorkflow` accepts any number of sequential `Steps`,
each of them taking care of a specific part of the business logic.



We mainly have four types of steps, grouped into iterative steps and return steps:

1. `IterStep`s are in charge of starting the workflow. They read data from the external world and `yield` the elements
   that need to be processed down the pipeline.
2. `ReturnStep`s accept one input, which they further process, and return one output.
3. `StageStep`s accept one input and write - stage - it somewhere. They are expected to be used together with the `BulkStep`.
4. `BulkStep`s iterate over an input - produced by the `StageStep` - and return nothing.

These names might be explanatory, but they are harder to picture. Therefore, we have specific classes based on these steps
that make it easier to discuss the Workflow structure:

1. `IterStep` -> `Source`
2. `ReturnStep` -> `Processor` & `Sink`
3. `StageStep` -> `Stage`
4. `BulkStep` -> `BulkSink`

When developing each of these steps, we just need to implement its execution method (either `_iter` or `_run`): in
the `IterStep` the method is expected to `yield` results, while the rest `return` them.

We'll explain specific examples of these `Step`s in the next section.
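
As a rough sketch of what that looks like in practice - hypothetical classes, and assuming that `Either`/`StackTraceError` live in `metadata.ingestion.api.models` and can be built with `right=`/`left=` keyword arguments, as hinted by the `run` method shown further below - a toy `Source` and `Sink` could be:

```python
from typing import Iterable

from metadata.ingestion.api.models import Either, StackTraceError  # assumed paths
from metadata.ingestion.api.steps import Sink, Source


class NumberSource(Source):
    """Toy IterStep: yields a handful of integers wrapped in an Either."""

    def _iter(self) -> Iterable[Either]:
        for number in range(5):
            if number == 3:
                # Errors are data: they travel on the `left` side of the Either
                yield Either(
                    left=StackTraceError(name=str(number), error="unlucky number")
                )
            else:
                yield Either(right=number)


class PrintSink(Sink):
    """Toy ReturnStep: consumes one element and returns it wrapped in an Either."""

    def _run(self, record: int) -> Either:
        print(f"received {record}")
        return Either(right=record)
```

Note that this sketch omits the other members a real step must implement (such as `create` and `close`); it only illustrates the `_iter`/`_run` contract.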

## Workflows

Now that we have our pieces, we can define the `Workflow` structures. While the `Steps` could be joined together
somewhat arbitrarily, there are specific recipes that we follow depending on our goals.

Each `Workflow` can then be built by defining its steps (starting with a `Source`, adding `Processor`s, etc.) and
registering the steps in the `BaseWorkflow.set_steps` method.
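
Putting the toy steps from above together, the wiring could look roughly like this - treat the `BaseWorkflow` import path and the `self.source`/`self.steps` attribute names as assumptions for illustration, not as the documented contract:

```python
from metadata.workflow.base import BaseWorkflow  # assumed module path


class ToyWorkflow(BaseWorkflow):
    """Hypothetical workflow wiring: NumberSource -> PrintSink."""

    def set_steps(self):
        # The Source drives the pipeline; every element it yields is passed
        # sequentially through the remaining steps.
        self.source = NumberSource()
        self.steps = (PrintSink(),)
```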

The `BaseWorkflow` will take care of common logic, such as initializing the `metadata` object and the `timer` logger, and
sending the status to the `IngestionPipeline` when needed.

A couple of examples:

### Metadata Ingestion

Here we have two steps:
- `Source`: lists the metadata of the origin (Dashboards, Tables, Pipelines,...) and translates it to the OpenMetadata
  standard.
- `REST Sink`: picks up the Create Requests for the above entities and sends them to the OpenMetadata server.

What does the workflow do here? It groups the steps together and streamlines the execution. The workflow itself is the one
that knows how to get each of the elements produced by the `Source` and pass them to the `Sink`.
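
Conceptually - ignoring status handling, timers and reporting - the loop the workflow runs for a `Source` -> `Sink` pair boils down to something like this simplified sketch:

```python
def execute(source, steps) -> None:
    """Simplified view of the execution loop (not the real BaseWorkflow code)."""
    for record in source.run():
        if record is None:
            # Failed elements were already tracked in the step's Status
            continue
        for step in steps:
            record = step.run(record)
```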

### Profiler Ingestion

In this case we have 4 steps:
- `Source`: picks up from the OpenMetadata API the tables that need to be profiled.
- `Profiler Processor`: executes the metrics and gathers the results for each table.
- `PII Processor`: takes the profiler results and adds any classification that needs to be applied to the tables, using NLP models.
- `REST Sink`: sends the results to the OpenMetadata API.

Here again, the `Workflow` class will move the elements from `Source` -> `Profiler Processor` -> `PII Processor` -> `REST Sink`.

## Status & Exceptions

While the `Workflow` controls the execution flow, the most important part is the status handling & exception management.

### Status

Each `Step` has its own `Status`, storing what has been processed and what has failed. The overall `Workflow` status is based
on the statuses of the individual steps.
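
In a hedged sketch, aggregating at the workflow level is then just a matter of walking the steps and collecting their statuses (the hypothetical helper below relies only on the `get_status()`/`failures` accessors visible elsewhere in this change):

```python
def collect_failures(workflow_steps) -> list:
    """Hypothetical helper: gather every failure reported by the steps' Status."""
    failures = []
    for step in workflow_steps:
        failures.extend(step.get_status().failures)
    return failures
```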

### Exceptions

To ensure that all the exceptions are caught, each `Step` executes its `run` method inside a `try/catch` block. It will
only blow things up if we encounter a `WorkflowFatalError`. Any other exception will just be logged.

However, how do we want to handle exceptions that can happen in every different component? By treating exceptions as data.

Each `Step` will `yield` or `return` an `Either` object, meaning that processing a single element can either be `right` -
and contain the expected result - or `left` - and contain the raised exception.

This consensus helps us ensure that we keep notes of the logged exceptions in the `Status` of each `Step`, so that
all the errors can be properly logged at the end of the execution.
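
As an illustration, a `Processor`-style step might report a broken record like this (hypothetical class; the `Either`/`StackTraceError` construction is assumed to mirror the `run` method shown right after):

```python
import traceback

from metadata.ingestion.api.models import Either, StackTraceError  # assumed paths
from metadata.ingestion.api.steps import Processor


class ValidateProcessor(Processor):
    """Toy ReturnStep: validates a record and wraps the outcome in an Either."""

    def _run(self, record: dict) -> Either:
        try:
            if "name" not in record:
                raise ValueError("record is missing its name")
            return Either(right=record)
        except Exception as exc:
            # The exception becomes data: it lands on the `left` side and,
            # through the Status, gets reported at the end of the execution.
            return Either(
                left=StackTraceError(
                    name=str(record), error=str(exc), stack_trace=traceback.format_exc()
                )
            )
```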

For example, this is the `run` method of the `IterStep`:

```python
def run(self) -> Iterable[Optional[Entity]]:
    """
    Run the step and handle the status and exceptions

    Note that we are overwriting the default run implementation
    in order to create a generator with `yield`.
    """
    try:
        for result in self._iter():
            if result.left is not None:
                self.status.failed(result.left)
                yield None

            if result.right is not None:
                self.status.scanned(result.right)
                yield result.right
    except WorkflowFatalError as err:
        logger.error(f"Fatal error running step [{self}]: [{err}]")
        raise err
    except Exception as exc:
        error = f"Encountered exception running step [{self}]: [{exc}]"
        logger.warning(error)
        self.status.failed(
            StackTraceError(
                name="Unhandled", error=error, stack_trace=traceback.format_exc()
            )
        )
```

By tracking `Unhandled` exceptions, we then know which pieces of the code need to be treated more carefully to control
scenarios that we might not be aware of.

Each `Step` then controls its own `Status` and exceptions (wrapped in the `Either`), and only pushes the actual `right`
response down the workflow.

> OBS: We can think of this `Workflow` execution as a `flatMap` implementation.



Note how, in theory, we can keep building the steps together.
@@ -18,7 +18,7 @@ import traceback
from enum import Enum
from logging import Logger
from pathlib import Path
from typing import Dict, List, Optional, Type, Union
from typing import Dict, List, Type, Union

from pydantic import BaseModel
from tabulate import tabulate
@@ -230,55 +230,6 @@ def print_status(workflow: "BaseWorkflow") -> None:
    )


def print_test_suite_status(workflow) -> None:
    """
    Print the test suite workflow results
    """
    print_workflow_summary_legacy(
        workflow, processor=True, processor_status=workflow.status
    )

    if workflow.result_status() == 1:
        log_ansi_encoded_string(
            color=ANSI.BRIGHT_RED, bold=True, message=WORKFLOW_FAILURE_MESSAGE
        )
    else:
        log_ansi_encoded_string(
            color=ANSI.GREEN, bold=True, message=WORKFLOW_SUCCESS_MESSAGE
        )


def print_data_insight_status(workflow) -> None:
    """
    Print the test suite workflow results
    Args:
        workflow (DataInsightWorkflow): workflow object
    """
    # TODO: fixme
    print_workflow_summary_legacy(
        workflow,
        processor=True,
        processor_status=workflow.source.get_status(),
    )

    if workflow.source.get_status().source_start_time:
        log_ansi_encoded_string(
            message=f"Workflow finished in time {pretty_print_time_duration(time.time()-workflow.source.get_status().source_start_time)} ",  # pylint: disable=line-too-long
        )

    if workflow.result_status() == 1:
        log_ansi_encoded_string(message=WORKFLOW_FAILURE_MESSAGE)
    elif workflow.source.get_status().warnings or (
        hasattr(workflow, "sink") and workflow.sink.get_status().warnings
    ):
        log_ansi_encoded_string(message=WORKFLOW_WARNING_MESSAGE)
    else:
        log_ansi_encoded_string(message=WORKFLOW_SUCCESS_MESSAGE)
        log_ansi_encoded_string(
            color=ANSI.GREEN, bold=True, message=WORKFLOW_SUCCESS_MESSAGE
        )


def is_debug_enabled(workflow) -> bool:
    return (
        hasattr(workflow, "config")
@@ -357,118 +308,6 @@ def print_workflow_status_debug(workflow: "BaseWorkflow") -> None:
        log_ansi_encoded_string(message=step.get_status().as_string())


def get_source_status(workflow, source_status: Status) -> Optional[Status]:
    if hasattr(workflow, "source"):
        return source_status if source_status else workflow.source.get_status()
    return source_status


def get_processor_status(workflow, processor_status: Status) -> Optional[Status]:
    if hasattr(workflow, "processor"):
        return processor_status if processor_status else workflow.processor.get_status()
    return processor_status


def print_workflow_summary_legacy(
    workflow,
    source: bool = False,
    stage: bool = False,
    bulk_sink: bool = False,
    processor: bool = False,
    source_status: Status = None,
    processor_status: Status = None,
):
    """
    To be removed. All workflows should use the new `print_workflow_summary`
    after making the transition to the BaseWorkflow steps with common Status.
    """
    source_status = get_source_status(workflow, source_status)
    processor_status = get_processor_status(workflow, processor_status)
    if is_debug_enabled(workflow):
        print_workflow_status_debug_legacy(
            workflow,
            bulk_sink,
            stage,
            source_status,
            processor_status,
        )
    summary = Summary()
    failures = []
    if source_status and source:
        summary += get_summary(source_status)
        failures.append(Failure(name="Source", failures=source_status.failures))
    if hasattr(workflow, "stage") and stage:
        summary += get_summary(workflow.stage.get_status())
        failures.append(
            Failure(name="Stage", failures=workflow.stage.get_status().failures)
        )
    if hasattr(workflow, "sink"):
        summary += get_summary(workflow.sink.get_status())
        failures.append(
            Failure(name="Sink", failures=workflow.sink.get_status().failures)
        )
    if hasattr(workflow, "bulk_sink") and bulk_sink:
        summary += get_summary(workflow.bulk_sink.get_status())
        failures.append(
            Failure(name="Bulk Sink", failures=workflow.bulk_sink.get_status().failures)
        )
    if processor_status and processor:
        summary += get_summary(processor_status)
        failures.append(Failure(name="Processor", failures=processor_status.failures))

    print_failures_if_apply(failures)

    log_ansi_encoded_string(bold=True, message="Workflow Summary:")
    log_ansi_encoded_string(message=f"Total processed records: {summary.records}")
    log_ansi_encoded_string(message=f"Total warnings: {summary.warnings}")
    log_ansi_encoded_string(message=f"Total filtered: {summary.filtered}")
    log_ansi_encoded_string(message=f"Total errors: {summary.errors}")

    total_success = max(summary.records, 1)
    log_ansi_encoded_string(
        color=ANSI.BRIGHT_CYAN,
        bold=True,
        message=f"Success %: "
        f"{round(total_success * 100 / (total_success + summary.errors), 2)}",
    )


def print_workflow_status_debug_legacy(
    workflow,
    bulk_sink: bool = False,
    stage: bool = False,
    source_status: Status = None,
    processor_status: Status = None,
) -> None:
    """
    Args:
        workflow: the workflow status to be printed
        bulk_sink: if bull_sink status must be printed
        stage: if stage status must be printed
        source_status: source status to be printed
        processor_status: processor status to be printed

    Returns:
        Print Workflow status when the workflow logger level is DEBUG
    """
    log_ansi_encoded_string(bold=True, message="Statuses detailed info:")
    if source_status:
        log_ansi_encoded_string(bold=True, message="Source Status:")
        log_ansi_encoded_string(message=source_status.as_string())
    if hasattr(workflow, "stage") and stage:
        log_ansi_encoded_string(bold=True, message="Stage Status:")
        log_ansi_encoded_string(message=workflow.stage.get_status().as_string())
    if hasattr(workflow, "sink"):
        log_ansi_encoded_string(bold=True, message="Sink Status:")
        log_ansi_encoded_string(message=workflow.sink.get_status().as_string())
    if hasattr(workflow, "bulk_sink") and bulk_sink:
        log_ansi_encoded_string(bold=True, message="Bulk Sink Status:")
        log_ansi_encoded_string(message=workflow.bulk_sink.get_status().as_string())
    if processor_status:
        log_ansi_encoded_string(bold=True, message="Processor Status:")
        log_ansi_encoded_string(message=processor_status.as_string())


def get_summary(status: Status) -> Summary:
    records = len(status.records)
    warnings = len(status.warnings)

@@ -18,7 +18,6 @@ from unittest import TestCase

import pytest

from metadata.ingestion.api.sink import SinkStatus
from metadata.ingestion.api.status import Status

from .e2e_types import E2EType
@@ -95,17 +94,17 @@ class CliDashboardBase(TestCase):
        raise NotImplementedError()

    @abstractmethod
    def assert_not_including(self, source_status: Status, sink_status: SinkStatus):
    def assert_not_including(self, source_status: Status, sink_status: Status):
        raise NotImplementedError()

    @abstractmethod
    def assert_for_vanilla_ingestion(
        self, source_status: Status, sink_status: SinkStatus
        self, source_status: Status, sink_status: Status
    ) -> None:
        raise NotImplementedError()

    @abstractmethod
    def assert_filtered_mix(self, source_status: Status, sink_status: SinkStatus):
    def assert_filtered_mix(self, source_status: Status, sink_status: Status):
        raise NotImplementedError()

    @staticmethod

@@ -19,7 +19,6 @@ from unittest import TestCase

import pytest

from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.api.sink import SinkStatus
from metadata.ingestion.api.status import Status

from .e2e_types import E2EType
@@ -243,54 +242,54 @@ class CliDBBase(TestCase):

    @abstractmethod
    def assert_for_vanilla_ingestion(
        self, source_status: Status, sink_status: SinkStatus
        self, source_status: Status, sink_status: Status
    ) -> None:
        raise NotImplementedError()

    @abstractmethod
    def assert_for_table_with_profiler(
        self, source_status: Status, sink_status: SinkStatus
        self, source_status: Status, sink_status: Status
    ):
        raise NotImplementedError()

    @abstractmethod
    def assert_for_table_with_profiler_time_partition(
        self, source_status: Status, sink_status: SinkStatus
        self, source_status: Status, sink_status: Status
    ):
        raise NotImplementedError()

    @abstractmethod
    def assert_for_delete_table_is_marked_as_deleted(
        self, source_status: Status, sink_status: SinkStatus
        self, source_status: Status, sink_status: Status
    ):
        raise NotImplementedError()

    @abstractmethod
    def assert_filtered_schemas_includes(
        self, source_status: Status, sink_status: SinkStatus
        self, source_status: Status, sink_status: Status
    ):
        raise NotImplementedError()

    @abstractmethod
    def assert_filtered_schemas_excludes(
        self, source_status: Status, sink_status: SinkStatus
        self, source_status: Status, sink_status: Status
    ):
        raise NotImplementedError()

    @abstractmethod
    def assert_filtered_tables_includes(
        self, source_status: Status, sink_status: SinkStatus
        self, source_status: Status, sink_status: Status
    ):
        raise NotImplementedError()

    @abstractmethod
    def assert_filtered_tables_excludes(
        self, source_status: Status, sink_status: SinkStatus
        self, source_status: Status, sink_status: Status
    ):
        raise NotImplementedError()

    @abstractmethod
    def assert_filtered_mix(self, source_status: Status, sink_status: SinkStatus):
    def assert_filtered_mix(self, source_status: Status, sink_status: Status):
        raise NotImplementedError()

    @staticmethod

@@ -20,7 +20,6 @@ import pytest

from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.tests.testDefinition import TestDefinition, TestPlatform
from metadata.ingestion.api.sink import SinkStatus
from metadata.ingestion.api.status import Status

from .test_cli import CliBase
@@ -107,12 +106,12 @@ class CliDBTBase(TestCase):

    @abstractmethod
    def assert_for_vanilla_ingestion(
        self, source_status: Status, sink_status: SinkStatus
        self, source_status: Status, sink_status: Status
    ) -> None:
        raise NotImplementedError()

    @abstractmethod
    def assert_for_dbt_ingestion(
        self, source_status: Status, sink_status: SinkStatus
        self, source_status: Status, sink_status: Status
    ) -> None:
        raise NotImplementedError()

@@ -16,7 +16,6 @@ from typing import List

import pytest

from metadata.ingestion.api.sink import SinkStatus
from metadata.ingestion.api.status import Status

from .base.e2e_types import E2EType
@@ -79,7 +78,7 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods):
        return "snowflake"

    def assert_for_vanilla_ingestion(
        self, source_status: Status, sink_status: SinkStatus
        self, source_status: Status, sink_status: Status
    ) -> None:
        self.assertTrue(len(source_status.failures) == 0)
        self.assertTrue(len(source_status.warnings) == 0)

Binary file not shown. (new image, 5.0 KiB)
Binary file not shown. (new image, 26 KiB)