Fix #22340: Execution Time Support for NiFi Connector (#22981)

* feat: added nifi execution history

* doc: added pipeline status in available features
This commit is contained in:
Keshav Mohta 2025-08-19 13:55:02 +05:30 committed by GitHub
parent 5c4e3f365b
commit f39f57ddcd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 66 additions and 9 deletions

View File

@ -11,18 +11,29 @@
""" """
Nifi source to extract metadata Nifi source to extract metadata
""" """
import math
import traceback import traceback
from collections import defaultdict from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, List, Optional from typing import Dict, Iterable, List, Optional
from pydantic import BaseModel, ValidationError from pydantic import BaseModel, ValidationError
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task from metadata.generated.schema.entity.data.pipeline import (
Pipeline,
PipelineStatus,
StatusType,
Task,
TaskStatus,
)
from metadata.generated.schema.entity.services.connections.pipeline.nifiConnection import ( from metadata.generated.schema.entity.services.connections.pipeline.nifiConnection import (
NifiConnection, NifiConnection,
) )
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
@ -39,6 +50,7 @@ from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils import fqn
from metadata.utils.helpers import clean_uri from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
@ -49,6 +61,13 @@ PROCESS_GROUP_FLOW = "processGroupFlow"
BREADCRUMB = "breadcrumb" BREADCRUMB = "breadcrumb"
PARENT_BREADCRUMB = "parentBreadcrumb" PARENT_BREADCRUMB = "parentBreadcrumb"
STATUS_MAP = {
"Running": StatusType.Successful.value,
"Stopped": StatusType.Pending.value,
"Invalid": StatusType.Failed.value,
"Disabled": StatusType.Skipped.value,
}
class NifiProcessor(BaseModel): class NifiProcessor(BaseModel):
""" """
@ -59,6 +78,7 @@ class NifiProcessor(BaseModel):
name: Optional[str] = None name: Optional[str] = None
type_: str type_: str
uri: str uri: str
run_status: Optional[str] = None
class NifiProcessorConnections(BaseModel): class NifiProcessorConnections(BaseModel):
@ -129,7 +149,7 @@ class NifiSource(PipelineServiceSource):
try: try:
return [ return [
Task( Task(
name=processor.id_, name=str(processor.id_),
displayName=processor.name, displayName=processor.name,
sourceUrl=SourceUrl( sourceUrl=SourceUrl(
f"{clean_uri(self.service_connection.hostPort)}{processor.uri}" f"{clean_uri(self.service_connection.hostPort)}{processor.uri}"
@ -173,10 +193,46 @@ class NifiSource(PipelineServiceSource):
self, pipeline_details: NifiPipelineDetails self, pipeline_details: NifiPipelineDetails
) -> Iterable[Either[OMetaPipelineStatus]]: ) -> Iterable[Either[OMetaPipelineStatus]]:
""" """
Method to get task & pipeline status. Method to get task & pipeline status with execution history.
Based on the latest refresh data.
https://github.com/open-metadata/OpenMetadata/issues/6955
""" """
try:
pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self.context.get().pipeline_service,
pipeline_name=self.context.get().pipeline,
)
for task in pipeline_details.processors:
task_status = TaskStatus(
name=str(task.id_),
executionStatus=STATUS_MAP.get(task.run_status, StatusType.Pending),
)
pipeline_status = PipelineStatus(
executionStatus=task_status.executionStatus,
taskStatus=[task_status],
timestamp=math.floor(
(datetime.now().timestamp()) * 1000 # timestamp in milliseconds
),
)
yield Either(
right=OMetaPipelineStatus(
pipeline_fqn=pipeline_fqn,
pipeline_status=pipeline_status,
)
)
except Exception as exc:
yield Either(
left=StackTraceError(
name=pipeline_details.name,
error=f"Wild error ingesting pipeline status {pipeline_details} - {exc}",
stackTrace=traceback.format_exc(),
)
)
def yield_pipeline_lineage_details( def yield_pipeline_lineage_details(
self, pipeline_details: NifiPipelineDetails self, pipeline_details: NifiPipelineDetails
@ -223,6 +279,7 @@ class NifiSource(PipelineServiceSource):
uri=processor.get("uri"), uri=processor.get("uri"),
name=processor["component"].get("name"), name=processor["component"].get("name"),
type_=processor["component"].get("type"), type_=processor["component"].get("type"),
run_status=processor.get("status", {}).get("runStatus"),
) )
for processor in processor_list for processor in processor_list
] ]

View File

@ -8,8 +8,8 @@ slug: /connectors/pipeline/nifi
name="NiFi" name="NiFi"
stage="PROD" stage="PROD"
platform="OpenMetadata" platform="OpenMetadata"
availableFeatures=["Pipelines", "Usage", "Lineage"] availableFeatures=["Pipelines", "Usage", "Lineage", "Pipeline Status"]
unavailableFeatures=["Pipeline Status", "Owners", "Tags"] unavailableFeatures=["Owners", "Tags"]
/ %} / %}
In this section, we provide guides and references to use the NiFi connector. In this section, we provide guides and references to use the NiFi connector.

View File

@ -8,8 +8,8 @@ slug: /connectors/pipeline/nifi/yaml
name="Nifi" name="Nifi"
stage="PROD" stage="PROD"
platform="OpenMetadata" platform="OpenMetadata"
availableFeatures=["Pipelines", "Usage", "Lineage"] availableFeatures=["Pipelines", "Usage", "Lineage", "Pipeline Status"]
unavailableFeatures=["Pipeline Status", "Owners", "Tags"] unavailableFeatures=["Owners", "Tags"]
/ %} / %}
In this section, we provide guides and references to use the NiFi connector. In this section, we provide guides and references to use the NiFi connector.