From f39f57ddcd44f0b2079f9363469f1d2e0112bc3b Mon Sep 17 00:00:00 2001 From: Keshav Mohta <68001229+keshavmohta09@users.noreply.github.com> Date: Tue, 19 Aug 2025 13:55:02 +0530 Subject: [PATCH] Fix #22340: Execution Time Support for NiFi Connector (#22981) * feat: added nifi execution history * doc: added pipeline status in available features --- .../source/pipeline/nifi/metadata.py | 67 +++++++++++++++++-- .../v1.9.x/connectors/pipeline/nifi/index.md | 4 +- .../v1.9.x/connectors/pipeline/nifi/yaml.md | 4 +- 3 files changed, 66 insertions(+), 9 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/nifi/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/nifi/metadata.py index 2a6fb20b199..17a29b63192 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/nifi/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/nifi/metadata.py @@ -11,18 +11,29 @@ """ Nifi source to extract metadata """ +import math import traceback from collections import defaultdict +from datetime import datetime from typing import Dict, Iterable, List, Optional from pydantic import BaseModel, ValidationError from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.entity.data.pipeline import Pipeline, Task +from metadata.generated.schema.entity.data.pipeline import ( + Pipeline, + PipelineStatus, + StatusType, + Task, + TaskStatus, +) from metadata.generated.schema.entity.services.connections.pipeline.nifiConnection import ( NifiConnection, ) +from metadata.generated.schema.entity.services.ingestionPipelines.status import ( + StackTraceError, +) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) @@ -39,6 +50,7 @@ from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource +from metadata.utils import fqn from metadata.utils.helpers import clean_uri from metadata.utils.logger import ingestion_logger @@ -49,6 +61,13 @@ PROCESS_GROUP_FLOW = "processGroupFlow" BREADCRUMB = "breadcrumb" PARENT_BREADCRUMB = "parentBreadcrumb" +STATUS_MAP = { + "Running": StatusType.Successful.value, + "Stopped": StatusType.Pending.value, + "Invalid": StatusType.Failed.value, + "Disabled": StatusType.Skipped.value, +} + class NifiProcessor(BaseModel): """ @@ -59,6 +78,7 @@ class NifiProcessor(BaseModel): name: Optional[str] = None type_: str uri: str + run_status: Optional[str] = None class NifiProcessorConnections(BaseModel): @@ -129,7 +149,7 @@ class NifiSource(PipelineServiceSource): try: return [ Task( - name=processor.id_, + name=str(processor.id_), displayName=processor.name, sourceUrl=SourceUrl( f"{clean_uri(self.service_connection.hostPort)}{processor.uri}" @@ -173,10 +193,46 @@ class NifiSource(PipelineServiceSource): self, pipeline_details: NifiPipelineDetails ) -> Iterable[Either[OMetaPipelineStatus]]: """ - Method to get task & pipeline status. - Based on the latest refresh data. - https://github.com/open-metadata/OpenMetadata/issues/6955 + Method to get task & pipeline status with execution history. """ + try: + + pipeline_fqn = fqn.build( + metadata=self.metadata, + entity_type=Pipeline, + service_name=self.context.get().pipeline_service, + pipeline_name=self.context.get().pipeline, + ) + + for task in pipeline_details.processors: + task_status = TaskStatus( + name=str(task.id_), + executionStatus=STATUS_MAP.get(task.run_status, StatusType.Pending), + ) + + pipeline_status = PipelineStatus( + executionStatus=task_status.executionStatus, + taskStatus=[task_status], + timestamp=math.floor( + (datetime.now().timestamp()) * 1000 # timestamp in milliseconds + ), + ) + + yield Either( + right=OMetaPipelineStatus( + pipeline_fqn=pipeline_fqn, + pipeline_status=pipeline_status, + ) + ) + + except Exception as exc: + yield Either( + left=StackTraceError( + name=pipeline_details.name, + error=f"Wild error ingesting pipeline status {pipeline_details} - {exc}", + stackTrace=traceback.format_exc(), + ) + ) def yield_pipeline_lineage_details( self, pipeline_details: NifiPipelineDetails @@ -223,6 +279,7 @@ class NifiSource(PipelineServiceSource): uri=processor.get("uri"), name=processor["component"].get("name"), type_=processor["component"].get("type"), + run_status=processor.get("status", {}).get("runStatus"), ) for processor in processor_list ] diff --git a/openmetadata-docs/content/v1.9.x/connectors/pipeline/nifi/index.md b/openmetadata-docs/content/v1.9.x/connectors/pipeline/nifi/index.md index 8d43c0a99e8..47baeddea13 100644 --- a/openmetadata-docs/content/v1.9.x/connectors/pipeline/nifi/index.md +++ b/openmetadata-docs/content/v1.9.x/connectors/pipeline/nifi/index.md @@ -8,8 +8,8 @@ slug: /connectors/pipeline/nifi name="NiFi" stage="PROD" platform="OpenMetadata" -availableFeatures=["Pipelines", "Usage", "Lineage"] -unavailableFeatures=["Pipeline Status", "Owners", "Tags"] +availableFeatures=["Pipelines", "Usage", "Lineage", "Pipeline Status"] +unavailableFeatures=["Owners", "Tags"] / %} In this section, we provide guides and references to use the NiFi connector. diff --git a/openmetadata-docs/content/v1.9.x/connectors/pipeline/nifi/yaml.md b/openmetadata-docs/content/v1.9.x/connectors/pipeline/nifi/yaml.md index d9998fd684c..cbde79bb9bb 100644 --- a/openmetadata-docs/content/v1.9.x/connectors/pipeline/nifi/yaml.md +++ b/openmetadata-docs/content/v1.9.x/connectors/pipeline/nifi/yaml.md @@ -8,8 +8,8 @@ slug: /connectors/pipeline/nifi/yaml name="Nifi" stage="PROD" platform="OpenMetadata" -availableFeatures=["Pipelines", "Usage", "Lineage"] -unavailableFeatures=["Pipeline Status", "Owners", "Tags"] +availableFeatures=["Pipelines", "Usage", "Lineage", "Pipeline Status"] +unavailableFeatures=["Owners", "Tags"] / %} In this section, we provide guides and references to use the NiFi connector.