# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base workflow definition.
"""
import traceback
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, List, Optional, TypeVar, Union

from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import (
    CreateIngestionPipelineRequest,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    AirflowConfig,
    IngestionPipeline,
    PipelineState,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
    StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
from metadata.generated.schema.tests.testSuite import ServiceType
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.step import Step
from metadata.ingestion.ometa.client_utils import create_ometa_client
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.timer.repeated_timer import RepeatedTimer
from metadata.utils import fqn
from metadata.utils.class_helper import (
    get_pipeline_type_from_source_config,
    get_reference_type_from_service_type,
    get_service_class_from_service_type,
)
from metadata.utils.execution_time_tracker import ExecutionTimeTracker
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger, set_loggers_level
from metadata.workflow.workflow_output_handler import WorkflowOutputHandler
from metadata.workflow.workflow_status_mixin import (
    SUCCESS_THRESHOLD_VALUE,
    WorkflowStatusMixin,
)

logger = ingestion_logger()

# Type of service linked to the Ingestion Pipeline
T = TypeVar("T")
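
# Seconds between status reports logged by the RepeatedTimer thread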
REPORTS_INTERVAL_SECONDS = 60


class InvalidWorkflowJSONException(Exception):
    """
    Raised when we cannot properly parse the workflow
    """


class BaseWorkflow(ABC, WorkflowStatusMixin):
    """
    Base workflow implementation
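
    A minimal usage sketch (`MetadataWorkflow` stands in for any concrete
    subclass; the exact class and config contents depend on your deployment):

        workflow = MetadataWorkflow.create(config_dict)
        workflow.execute()
        workflow.raise_from_status()
        workflow.print_status()
        workflow.stop()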
    """

    config: Union[Any, Dict]
    _run_id: Optional[str] = None
    metadata: OpenMetadata
    metadata_config: OpenMetadataConnection
    service_type: ServiceType

    def __init__(
        self,
        config: Union[Any, Dict],
        log_level: LogLevels,
        metadata_config: OpenMetadataConnection,
        service_type: ServiceType,
        output_handler: WorkflowOutputHandler = WorkflowOutputHandler(),
    ):
        """
        Disabling pylint to wait for workflow reimplementation as a topology
        """
        self.output_handler = output_handler
        self.config = config
        self.service_type = service_type
        self._timer: Optional[RepeatedTimer] = None
        self._ingestion_pipeline: Optional[IngestionPipeline] = None
        self._start_ts = datetime_to_ts(datetime.now())
        self._execution_time_tracker = ExecutionTimeTracker(
            log_level == LogLevels.DEBUG
        )

        set_loggers_level(log_level.value)

        # We create the ometa client at the workflow level and pass it to the steps
        self.metadata_config = metadata_config
        self.metadata = create_ometa_client(metadata_config)
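        # Flag the Ingestion Pipeline as RUNNING as soon as the workflow starts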
        self.set_ingestion_pipeline_status(state=PipelineState.running)
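
        # Let each concrete workflow hook in once the base internals are ready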
        self.post_init()

    @property
    def ingestion_pipeline(self) -> Optional[IngestionPipeline]:
        """Get or create the Ingestion Pipeline from the configuration"""
        if not self._ingestion_pipeline and self.config.ingestionPipelineFQN:
            self._ingestion_pipeline = self.get_or_create_ingestion_pipeline()

        return self._ingestion_pipeline

    def stop(self) -> None:
        """
        Main stopping logic
        """
        # Stop the timer first. This runs in a separate thread and if not properly closed
        # it can hang the workflow
        self.timer.stop()
        self.metadata.close()
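
        # Give every step the chance to release its own resources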
        for step in self.workflow_steps():
            try:
                step.close()
            except Exception as exc:
                logger.warning(f"Error trying to close the step {step} due to [{exc}]")

    @property
    def timer(self) -> RepeatedTimer:
        """
        Status timer: It will print the source & sink status every `interval` seconds.
        """
        if not self._timer:
            self._timer = RepeatedTimer(
                REPORTS_INTERVAL_SECONDS, self._report_ingestion_status
            )

        return self._timer

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict):
        """Single function to execute to create a Workflow instance"""

    @abstractmethod
    def post_init(self) -> None:
        """Method to execute after we have initialized all the internals"""

    @abstractmethod
    def execute_internal(self) -> None:
        """Workflow-specific logic to execute safely"""

    @abstractmethod
    def calculate_success(self) -> float:
        """Get the success % of the internal execution"""

    @abstractmethod
    def get_failures(self) -> List[StackTraceError]:
        """Get the failures to flag whether the workflow succeeded or not"""

    @abstractmethod
    def workflow_steps(self) -> List[Step]:
        """Steps to report status from"""

    @abstractmethod
    def raise_from_status_internal(self, raise_warnings=False) -> None:
        """Based on the internal workflow status, raise a WorkflowExecutionError"""

    def execute(self) -> None:
        """
        Main entrypoint:
        1. Start the logging timer. It will be closed at `stop`
        2. Execute the workflow
        3. Validate the pipeline status
        4. Update the pipeline status at the end
        """
        pipeline_state = PipelineState.success
        self.timer.trigger()
        try:
            self.execute_internal()
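
            # A success percentage above the threshold but below 100 means some
            # records failed: flag the run as a partial success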
            if SUCCESS_THRESHOLD_VALUE <= self.calculate_success() < 100:
                pipeline_state = PipelineState.partialSuccess

        # Any unhandled exception breaking the workflow should update the status
        except Exception as err:
            pipeline_state = PipelineState.failed
            raise err

        # Force resource closing. Required for killing the threading
        finally:
            ingestion_status = self.build_ingestion_status()
            self.set_ingestion_pipeline_status(pipeline_state, ingestion_status)
            self.stop()

    @property
    def run_id(self) -> str:
        """
        If the config does not provide a run id, we'll
        generate and assign one here.
        """
        if not self._run_id:
            if self.config.pipelineRunId:
                self._run_id = str(self.config.pipelineRunId.root)
            else:
                self._run_id = str(uuid.uuid4())

        return self._run_id

    def get_or_create_ingestion_pipeline(self) -> Optional[IngestionPipeline]:
        """
        If we get the `ingestionPipelineFqn` from the `workflowConfig`, it means we want to
        keep track of the status.
        - During the UI deployment, the IngestionPipeline is already created from the UI.
        - For external deployments, we might need to create the Ingestion Pipeline the first
          time the YAML is executed.
        If the Ingestion Pipeline is not created, create it now to update the status.

        Note that during the very first run, the service might not even be created yet. In that case,
        we won't be able to flag the RUNNING status. We'll wait until the metadata ingestion
        workflow has prepared the necessary components, and we will update the SUCCESS/FAILED
        status at the end of the flow.
        """
        try:
            maybe_pipeline: Optional[IngestionPipeline] = self.metadata.get_by_name(
                entity=IngestionPipeline, fqn=self.config.ingestionPipelineFQN
            )

            if maybe_pipeline:
                return maybe_pipeline

            # Get the name from <service>.<name> or, for test suites, <tableFQN>.testSuite
            *_, pipeline_name = fqn.split(self.config.ingestionPipelineFQN)
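
            # The pipeline needs a reference to its owning service, which might
            # not exist yet on the very first run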
            service = self._get_ingestion_pipeline_service()

            if service is not None:
                return self.metadata.create_or_update(
                    CreateIngestionPipelineRequest(
                        name=pipeline_name,
                        service=EntityReference(
                            id=service.id,
                            type=get_reference_type_from_service_type(
                                self.service_type
                            ),
                        ),
                        pipelineType=get_pipeline_type_from_source_config(
                            self.config.source.sourceConfig
                        ),
                        sourceConfig=self.config.source.sourceConfig,
                        airflowConfig=AirflowConfig(),
                    )
                )

            return maybe_pipeline

        except Exception as exc:
            logger.error(
                f"Error trying to get or create the Ingestion Pipeline due to [{exc}]"
            )
            return None

    def _get_ingestion_pipeline_service(self) -> Optional[T]:
        """
        Ingestion Pipelines are linked to either an EntityService (DatabaseService, MessagingService,...)
        or a Test Suite.

        Depending on the Source Config Type, we'll need to GET one or the other to create
        the Ingestion Pipeline
        """
        return self.metadata.get_by_name(
            entity=get_service_class_from_service_type(self.service_type),
            fqn=self.config.source.serviceName,
        )

    def _report_ingestion_status(self):
        """
        Log the status of each workflow step at INFO level
        """
        try:
            for step in self.workflow_steps():
                logger.info(
                    f"{step.name}: Processed {len(step.status.records)} records,"
                    f" updated {len(step.status.updated_records)} records,"
                    f" filtered {len(step.status.filtered)} records,"
                    f" found {len(step.status.failures)} errors"
                )

        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(f"Wild exception reporting status - {exc}")

    def _is_debug_enabled(self) -> bool:
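        """Whether the workflow config explicitly requests DEBUG logging"""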
        return (
            hasattr(self, "config")
            and hasattr(self.config, "workflowConfig")
            and hasattr(self.config.workflowConfig, "loggerLevel")
            and self.config.workflowConfig.loggerLevel is LogLevels.DEBUG
        )

    def print_status(self):
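        """Print the workflow summary through the configured output handler"""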
        start_time = self.workflow_steps()[0].get_status().source_start_time

        self.output_handler.print_status(
            self.result_status(),
            self.workflow_steps(),
            start_time,
            self._is_debug_enabled(),
        )