diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py index 49a1c121385..e5aab5f490a 100644 --- a/ingestion/src/metadata/workflow/base.py +++ b/ingestion/src/metadata/workflow/base.py @@ -22,6 +22,7 @@ To be extended by any other workflow: import traceback import uuid from abc import ABC, abstractmethod +from datetime import datetime from typing import Optional, Tuple, TypeVar, cast from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import ( @@ -57,6 +58,7 @@ from metadata.utils.class_helper import ( get_service_class_from_service_type, get_service_type_from_source_type, ) +from metadata.utils.helpers import datetime_to_ts from metadata.utils.logger import ingestion_logger, set_loggers_level from metadata.workflow.workflow_output_handler import get_ingestion_status_timer from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin @@ -97,6 +99,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin): self.config = config self._timer: Optional[RepeatedTimer] = None self._ingestion_pipeline: Optional[IngestionPipeline] = None + self._start_ts = datetime_to_ts(datetime.now()) set_loggers_level(config.workflowConfig.loggerLevel.value) diff --git a/ingestion/src/metadata/workflow/workflow_status_mixin.py b/ingestion/src/metadata/workflow/workflow_status_mixin.py index 05373d26ffd..5928957a5cc 100644 --- a/ingestion/src/metadata/workflow/workflow_status_mixin.py +++ b/ingestion/src/metadata/workflow/workflow_status_mixin.py @@ -17,6 +17,7 @@ from typing import Optional, Tuple from metadata.config.common import WorkflowExecutionError from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + IngestionPipeline, PipelineState, PipelineStatus, ) @@ -26,7 +27,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.ingestion.api.step import Step from metadata.ingestion.api.steps import Source from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils.helpers import datetime_to_ts SUCCESS_THRESHOLD_VALUE = 90 @@ -42,6 +42,8 @@ class WorkflowStatusMixin: config: OpenMetadataWorkflowConfig _run_id: Optional[str] = None metadata: OpenMetadata + _start_ts: int + ingestion_pipeline: Optional[IngestionPipeline] # All workflows require a source as a first step source: Source @@ -84,6 +86,15 @@ class WorkflowStatusMixin: f"{step.__class__.__name__} reported warnings", step.get_status() ) + def _new_pipeline_status(self, state: PipelineState) -> PipelineStatus: + """Create new Pipeline Status""" + return PipelineStatus( + runId=self.run_id, + pipelineState=state, + startDate=self._start_ts, + timestamp=self._start_ts, + ) + def set_ingestion_pipeline_status( self, state: PipelineState, @@ -95,16 +106,13 @@ class WorkflowStatusMixin: # if we don't have a related Ingestion Pipeline FQN, no status is set. if self.config.ingestionPipelineFQN and self.ingestion_pipeline: if state in (PipelineState.queued, PipelineState.running): - pipeline_status = PipelineStatus( - runId=self.run_id, - pipelineState=state, - startDate=datetime_to_ts(datetime.now()), - timestamp=datetime_to_ts(datetime.now()), - ) + pipeline_status = self._new_pipeline_status(state) else: pipeline_status = self.metadata.get_pipeline_status( self.config.ingestionPipelineFQN, self.run_id ) + if not pipeline_status: + pipeline_status = self._new_pipeline_status(state) # if workflow is ended then update the end date in status pipeline_status.endDate = datetime.now().timestamp() * 1000 pipeline_status.pipelineState = state diff --git a/openmetadata-docs/content/v1.2.x/deployment/ingestion/index.md b/openmetadata-docs/content/v1.2.x/deployment/ingestion/index.md index 0b429944933..7f8fb01e7f0 100644 --- a/openmetadata-docs/content/v1.2.x/deployment/ingestion/index.md +++ b/openmetadata-docs/content/v1.2.x/deployment/ingestion/index.md @@ -44,12 +44,25 @@ component that can be run from - literally - anywhere. In order to install it, you just need to get it from [PyPI](https://pypi.org/project/openmetadata-ingestion/). We will show further examples later, but a piece of code is the best showcase for its simplicity. In order to run -a full ingestion process, you just need to execute a single function: +a full ingestion process, you just need to execute a single function. For example, if we wanted to run the ingestion +from within a simple YAML script: ```python from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.workflow_output_handler import print_status +# Specify your YAML configuration +CONFIG = """ +source: + ... +workflowConfig: + openMetadataServerConfig: + hostPort: 'http://localhost:8585/api' + authProvider: openmetadata + securityConfig: + jwtToken: ... +""" + def run(): workflow_config = yaml.safe_load(CONFIG) workflow = MetadataWorkflow.create(workflow_config) @@ -57,10 +70,15 @@ def run(): workflow.raise_from_status() print_status(workflow) workflow.stop() + + +if __name__ == "__main__": + run() ``` Where this function runs is completely up to you, and you can adapt it to what makes the most sense within your -organization and engineering context. +organization and engineering context. Below you'll see some examples of different orchestrators you can leverage +to execute the ingestion process. ### 2. Ingestion Configuration