Fix status handling for new services (#13707)

This commit is contained in:
Pere Miquel Brull 2023-10-25 20:48:14 +02:00 committed by GitHub
parent b89f5eaad0
commit c6297b9cdf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 38 additions and 9 deletions

View File

@ -22,6 +22,7 @@ To be extended by any other workflow:
import traceback
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional, Tuple, TypeVar, cast
from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import (
@ -57,6 +58,7 @@ from metadata.utils.class_helper import (
get_service_class_from_service_type,
get_service_type_from_source_type,
)
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger, set_loggers_level
from metadata.workflow.workflow_output_handler import get_ingestion_status_timer
from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin
@ -97,6 +99,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
self.config = config
self._timer: Optional[RepeatedTimer] = None
self._ingestion_pipeline: Optional[IngestionPipeline] = None
self._start_ts = datetime_to_ts(datetime.now())
set_loggers_level(config.workflowConfig.loggerLevel.value)

View File

@ -17,6 +17,7 @@ from typing import Optional, Tuple
from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
PipelineState,
PipelineStatus,
)
@ -26,7 +27,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import Source
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.helpers import datetime_to_ts
SUCCESS_THRESHOLD_VALUE = 90
@ -42,6 +42,8 @@ class WorkflowStatusMixin:
config: OpenMetadataWorkflowConfig
_run_id: Optional[str] = None
metadata: OpenMetadata
_start_ts: int
ingestion_pipeline: Optional[IngestionPipeline]
# All workflows require a source as a first step
source: Source
@ -84,6 +86,15 @@ class WorkflowStatusMixin:
f"{step.__class__.__name__} reported warnings", step.get_status()
)
def _new_pipeline_status(self, state: PipelineState) -> PipelineStatus:
"""Create new Pipeline Status"""
return PipelineStatus(
runId=self.run_id,
pipelineState=state,
startDate=self._start_ts,
timestamp=self._start_ts,
)
def set_ingestion_pipeline_status(
self,
state: PipelineState,
@ -95,16 +106,13 @@ class WorkflowStatusMixin:
# if we don't have a related Ingestion Pipeline FQN, no status is set.
if self.config.ingestionPipelineFQN and self.ingestion_pipeline:
if state in (PipelineState.queued, PipelineState.running):
pipeline_status = PipelineStatus(
runId=self.run_id,
pipelineState=state,
startDate=datetime_to_ts(datetime.now()),
timestamp=datetime_to_ts(datetime.now()),
)
pipeline_status = self._new_pipeline_status(state)
else:
pipeline_status = self.metadata.get_pipeline_status(
self.config.ingestionPipelineFQN, self.run_id
)
if not pipeline_status:
pipeline_status = self._new_pipeline_status(state)
# if workflow is ended then update the end date in status
pipeline_status.endDate = datetime.now().timestamp() * 1000
pipeline_status.pipelineState = state

View File

@ -44,12 +44,25 @@ component that can be run from - literally - anywhere.
In order to install it, you just need to get it from [PyPI](https://pypi.org/project/openmetadata-ingestion/).
We will show further examples later, but a piece of code is the best showcase for its simplicity. In order to run
a full ingestion process, you just need to execute a single function:
a full ingestion process, you just need to execute a single function. For example, if we wanted to run the ingestion
from within a simple YAML script:
```python
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status
# Specify your YAML configuration
CONFIG = """
source:
...
workflowConfig:
openMetadataServerConfig:
hostPort: 'http://localhost:8585/api'
authProvider: openmetadata
securityConfig:
jwtToken: ...
"""
def run():
workflow_config = yaml.safe_load(CONFIG)
workflow = MetadataWorkflow.create(workflow_config)
@ -57,10 +70,15 @@ def run():
workflow.raise_from_status()
print_status(workflow)
workflow.stop()
if __name__ == "__main__":
run()
```
Where this function runs is completely up to you, and you can adapt it to what makes the most sense within your
organization and engineering context.
organization and engineering context. Below you'll see some examples of different orchestrators you can leverage
to execute the ingestion process.
### 2. Ingestion Configuration