Fixes #19692: Implemented Nifi Pipeline Lineage (#21802)

* feat: implemented nifi pipeline lineage

* test: implemented tests for nifi pipeline lineage

* fix: yield_pipeline_bulk_lineage_details output type hinting

* fix: component check in connections

---------

Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
This commit is contained in:
Keshav Mohta 2025-06-18 13:01:04 +05:30 committed by GitHub
parent 563e689553
commit 7c0eeef049
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 294 additions and 8 deletions

View File

@ -12,13 +12,14 @@
Nifi source to extract metadata
"""
import traceback
from typing import Iterable, List, Optional
from collections import defaultdict
from typing import Dict, Iterable, List, Optional
from pydantic import BaseModel, ValidationError
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import Task
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
from metadata.generated.schema.entity.services.connections.pipeline.nifiConnection import (
NifiConnection,
)
@ -30,6 +31,9 @@ from metadata.generated.schema.type.basic import (
FullyQualifiedEntityName,
SourceUrl,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
@ -43,6 +47,7 @@ logger = ingestion_logger()
PROCESS_GROUP_FLOW = "processGroupFlow"
BREADCRUMB = "breadcrumb"
PARENT_BREADCRUMB = "parentBreadcrumb"
class NifiProcessor(BaseModel):
@ -77,6 +82,7 @@ class NifiPipelineDetails(BaseModel):
uri: str
processors: List[NifiProcessor]
connections: List[NifiProcessorConnections]
parent_pipeline_id: Optional[str] = None
class NifiSource(PipelineServiceSource):
@ -85,6 +91,11 @@ class NifiSource(PipelineServiceSource):
Pipeline metadata from the NiFi API
"""
def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
    """Initialize the NiFi source and the lineage bookkeeping structures.

    Both structures below are populated while listing pipelines and are
    consumed later by yield_pipeline_bulk_lineage_details.
    """
    super().__init__(config, metadata)
    # child process-group id -> list of parent process-group ids
    self.pipeline_parents_mapping: Dict[str, List[str]] = defaultdict(list)
    # cross-process-group port connections discovered while listing pipelines
    self.process_group_connections: List[NifiProcessorConnections] = []
@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
@ -220,7 +231,7 @@ class NifiSource(PipelineServiceSource):
"""Get List of all pipelines"""
for process_group in self.connection.list_process_groups():
try:
yield NifiPipelineDetails(
nifi_pipeline_details = NifiPipelineDetails(
id_=process_group[PROCESS_GROUP_FLOW].get("id"),
name=process_group[PROCESS_GROUP_FLOW][BREADCRUMB][BREADCRUMB].get(
"name"
@ -232,7 +243,18 @@ class NifiSource(PipelineServiceSource):
connections=self._get_connections_from_process_group(
process_group=process_group
),
parent_pipeline_id=process_group[PROCESS_GROUP_FLOW][BREADCRUMB]
.get(PARENT_BREADCRUMB, {})
.get("id"),
)
if nifi_pipeline_details.parent_pipeline_id:
self.pipeline_parents_mapping[nifi_pipeline_details.id_].append(
nifi_pipeline_details.parent_pipeline_id
)
self.process_group_connections.extend(
self.get_process_group_connections(process_group)
)
yield nifi_pipeline_details
except (ValueError, KeyError, ValidationError) as err:
logger.debug(traceback.format_exc())
logger.warning(
@ -244,5 +266,113 @@ class NifiSource(PipelineServiceSource):
f"Wild error encountered when trying to get pipelines from Process Group {process_group} - {err}."
)
def get_process_group_connections(
    self, process_group: dict
) -> List[NifiProcessorConnections]:
    """Get all cross-process-group connections for a process group.

    Only connections running from an OUTPUT_PORT of one process group into
    an INPUT_PORT of a *different* process group are kept; these are later
    turned into pipeline-to-pipeline lineage edges.

    Args:
        process_group: raw process-group payload from the NiFi API.

    Returns:
        Connections between distinct process groups (possibly empty).
    """
    # Defensive lookups: any level of the payload may be missing or None.
    # The original chained bare .get() calls, which raised AttributeError /
    # TypeError outside the try block below (and outside the caller's
    # ValueError/KeyError/ValidationError handler) on a partial payload.
    connections_list = (
        process_group.get(PROCESS_GROUP_FLOW, {}).get("flow", {}).get("connections")
        or []
    )
    connections = []
    for connection in connections_list:
        try:
            source = connection.get("component", {}).get("source", {})
            destination = connection.get("component", {}).get("destination", {})
            if (
                source.get("type") == "OUTPUT_PORT"
                and destination.get("type") == "INPUT_PORT"
                and source.get("groupId") != destination.get("groupId")
            ):
                connections.append(
                    NifiProcessorConnections(
                        id_=connection.get("id"),
                        source_id=source.get("groupId"),
                        destination_id=destination.get("groupId"),
                    )
                )
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Wild error encountered when trying to get process group connections from \
{process_group[PROCESS_GROUP_FLOW][BREADCRUMB][BREADCRUMB].get('name')} - {err}."
            )
    return connections
def _get_pipeline_entity(
    self, pipeline_id: str, label: str = "Pipeline"
) -> Optional[Pipeline]:
    """Fetch a Pipeline entity by process-group id; warn and return None when missing."""
    entity = self.metadata.get_by_name(
        entity=Pipeline,
        fqn=f"{self.context.get().pipeline_service}.{pipeline_id}",
    )
    if not entity:
        # `label` keeps the original warning wording ("Pipeline"/"Parent Pipeline")
        logger.warning(f"{label} {pipeline_id} not found in metadata, skipping lineage")
    return entity

@staticmethod
def _pipeline_lineage_request(
    from_entity: Pipeline, to_entity: Pipeline
) -> Either[AddLineageRequest]:
    """Build a pipeline -> pipeline lineage edge request."""
    return Either(
        right=AddLineageRequest(
            edge=EntitiesEdge(
                fromEntity=EntityReference(id=from_entity.id, type="pipeline"),
                toEntity=EntityReference(id=to_entity.id, type="pipeline"),
                lineageDetails=LineageDetails(source=LineageSource.PipelineLineage),
            )
        )
    )

def yield_pipeline_bulk_lineage_details(
    self,
) -> Iterable[Either[AddLineageRequest]]:
    """Process the pipeline bulk lineage details.

    Emits two families of edges, in this order:
    1. parent process group -> child process group (nested groups), and
    2. source group -> destination group for cross-group port connections.
    Missing entities are warned about and skipped.
    """
    # Lineage between parent and child pipelines
    for pipeline_id, parent_pipeline_ids in self.pipeline_parents_mapping.items():
        to_entity = self._get_pipeline_entity(pipeline_id)
        if not to_entity:
            continue
        for parent_pipeline_id in parent_pipeline_ids:
            from_entity = self._get_pipeline_entity(
                parent_pipeline_id, label="Parent Pipeline"
            )
            if not from_entity:
                continue
            yield self._pipeline_lineage_request(from_entity, to_entity)

    # Lineage between pipelines connected via output/input ports
    for connection in self.process_group_connections:
        from_entity = self._get_pipeline_entity(connection.source_id)
        if not from_entity:
            continue
        to_entity = self._get_pipeline_entity(connection.destination_id)
        if not to_entity:
            continue
        yield self._pipeline_lineage_request(from_entity, to_entity)
def get_pipeline_name(self, pipeline_details: NifiPipelineDetails) -> str:
    """Return the display name of the NiFi process group."""
    return pipeline_details.name

View File

@ -137,6 +137,7 @@ class PipelineServiceTopology(ServiceTopology):
nullable=True,
),
],
post_process=["process_pipeline_bulk_lineage"],
)
@ -316,6 +317,23 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
else:
yield lineage
def yield_pipeline_bulk_lineage_details(
    self,
) -> Optional[Iterable[Either[AddLineageRequest]]]:
    """Method to yield the bulk pipeline lineage details.

    Default implementation is a no-op that returns None; sources supporting
    pipeline-to-pipeline lineage override it and yield Either-wrapped
    AddLineageRequest objects. The caller (process_pipeline_bulk_lineage)
    guards the iteration with ``or []``, so returning None is safe.
    """
def process_pipeline_bulk_lineage(self) -> Iterable[Either[OMetaLineageRequest]]:
    """Method to process the bulk pipeline lineage.

    Wraps each successful (right) lineage request from
    yield_pipeline_bulk_lineage_details into an OMetaLineageRequest carrying
    the overrideLineage setting; error (left) results pass through unchanged.
    Does nothing when lineage ingestion is disabled in the source config.
    """
    if self.source_config.includeLineage:
        # `or []` guards against sources whose default implementation returns None
        for lineage in self.yield_pipeline_bulk_lineage_details() or []:
            if lineage.right is not None:
                yield Either(
                    right=OMetaLineageRequest(
                        lineage_request=lineage.right,
                        override_lineage=self.source_config.overrideLineage,
                    )
                )
            else:
                yield lineage
def _get_table_fqn_from_om(self, table_details: TableDetails) -> Optional[str]:
"""
Based on partial schema and table names look for matching table object in open metadata.

View File

@ -18,6 +18,7 @@ from unittest import TestCase
from unittest.mock import PropertyMock, patch
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
@ -28,7 +29,11 @@ from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.nifi.metadata import (
NifiPipelineDetails,
NifiProcessor,
@ -78,6 +83,27 @@ mock_nifi_config = {
},
}
# Parsed details of the parent process-group fixture; EXPECTED_NIFI_DETAILS
# points at this group via parent_pipeline_id to exercise parent/child lineage.
EXPECTED_PARENT_NIFI_DETAILS = NifiPipelineDetails(
    id_="affe20b6-b5b6-47fb-8dd3-ff53cd4aee4a",
    name="Parent NiFi Flow",
    uri="/nifi-api/flow/process-groups/affe20b6-b5b6-47fb-8dd3-ff53cd4aee4a",
    processors=[
        NifiProcessor(
            id_="ec8246b3-d740-4d8e-8571-7059a9f615e7",
            name="Wait",
            type_="org.apache.nifi.processors.standard.Wait",
            uri="/nifi-api/processors/ec8246b3-d740-4d8e-8571-7059a9f615e7",
        ),
        NifiProcessor(
            id_="be1ecb80-3c73-46ec-8e3f-6b90a14f91c7",
            name="ValidateJson",
            type_="org.apache.nifi.processors.standard.ValidateJson",
            uri="/nifi-api/processors/be1ecb80-3c73-46ec-8e3f-6b90a14f91c7",
        ),
    ],
    connections=[],
)
EXPECTED_NIFI_DETAILS = NifiPipelineDetails(
id_="d3d6b945-0182-1000-d7e4-d81b8f79f310",
@ -104,6 +130,15 @@ EXPECTED_NIFI_DETAILS = NifiPipelineDetails(
destination_id="d3f023ac-0182-1000-8bbe-e2b00347fff8",
)
],
parent_pipeline_id="affe20b6-b5b6-47fb-8dd3-ff53cd4aee4a",
)
# Second top-level process group: destination side of the cross-group
# port connection used in the bulk lineage test.
EXPECTED_NIFI_DETAILS_2 = NifiPipelineDetails(
    id_="364e6ed1-feab-403c-a0c7-0003a55ea8aa",
    name="NiFi Flow 2",
    uri="/nifi-api/flow/process-groups/364e6ed1-feab-403c-a0c7-0003a55ea8aa",
    processors=[],
    connections=[],
)
@ -183,6 +218,79 @@ MOCK_PIPELINE = Pipeline(
),
)
# OpenMetadata Pipeline entity that _mock_get_by_name returns for
# EXPECTED_NIFI_DETAILS_2's FQN (destination of the connection edge).
MOCK_PIPELINE_2 = Pipeline(
    id="38cd7319-cde9-4d68-b9d3-82d28b20b7bc",
    name="364e6ed1-feab-403c-a0c7-0003a55ea8aa",
    fullyQualifiedName="nifi_source.364e6ed1-feab-403c-a0c7-0003a55ea8aa",
    displayName="NiFi Flow 2",
    sourceUrl=(
        "https://localhost:8443/nifi-api/flow/"
        "process-groups/364e6ed1-feab-403c-a0c7-0003a55ea8aa"
    ),
    tasks=[],
    service=EntityReference(
        id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
    ),
)

# Pipeline entity for the parent process group (source of the
# parent -> child lineage edge).
# NOTE(review): sourceUrl references process group
# d3d6b945-0182-1000-d7e4-d81b8f79f310 (the child group) rather than this
# entity's own id affe20b6-... — looks like a fixture copy-paste; confirm.
MOCK_PARENT_PIPELINE = Pipeline(
    id="f9d1c7e1-4c0a-4578-a1bd-e9941c88a1c5",
    name="affe20b6-b5b6-47fb-8dd3-ff53cd4aee4a",
    fullyQualifiedName="nifi_source.affe20b6-b5b6-47fb-8dd3-ff53cd4aee4a",
    displayName="Parent NiFi Flow",
    sourceUrl=(
        "https://localhost:8443/nifi-api/flow/"
        "process-groups/d3d6b945-0182-1000-d7e4-d81b8f79f310"
    ),
    tasks=[
        Task(
            name="ec8246b3-d740-4d8e-8571-7059a9f615e7",
            displayName="Wait",
            sourceUrl=(
                "https://localhost:8443/nifi-api/processors/"
                "ec8246b3-d740-4d8e-8571-7059a9f615e7"
            ),
            taskType="org.apache.nifi.processors.standard.Wait",
            downstreamTasks=[],
        ),
        Task(
            name="be1ecb80-3c73-46ec-8e3f-6b90a14f91c7",
            displayName="ValidateJson",
            sourceUrl=(
                "https://localhost:8443/nifi-api/processors/"
                "be1ecb80-3c73-46ec-8e3f-6b90a14f91c7"
            ),
            taskType="org.apache.nifi.processors.standard.ValidateJson",
            downstreamTasks=["ec8246b3-d740-4d8e-8571-7059a9f615e7"],
        ),
    ],
    service=EntityReference(
        id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
    ),
)
# Expected bulk-lineage output: the parent -> child edge first, then the
# cross-group connection edge — matching the yield order of
# NifiSource.yield_pipeline_bulk_lineage_details.
EXPECTED_PIPELINE_BULK_LINEAGE_DETAILS = [
    Either(
        right=AddLineageRequest(
            edge=EntitiesEdge(
                fromEntity=EntityReference(id=MOCK_PARENT_PIPELINE.id, type="pipeline"),
                toEntity=EntityReference(id=MOCK_PIPELINE.id, type="pipeline"),
                lineageDetails=LineageDetails(source=LineageSource.PipelineLineage),
            ),
        )
    ),
    Either(
        right=AddLineageRequest(
            edge=EntitiesEdge(
                fromEntity=EntityReference(id=MOCK_PIPELINE.id, type="pipeline"),
                toEntity=EntityReference(id=MOCK_PIPELINE_2.id, type="pipeline"),
                lineageDetails=LineageDetails(source=LineageSource.PipelineLineage),
            ),
        )
    ),
]
class NifiUnitTest(TestCase):
"""
@ -205,13 +313,37 @@ class NifiUnitTest(TestCase):
config = OpenMetadataWorkflowConfig.model_validate(mock_nifi_config)
self.nifi = NifiSource.create(
mock_nifi_config["source"],
config.workflowConfig.openMetadataServerConfig,
OpenMetadata(config.workflowConfig.openMetadataServerConfig),
)
self.nifi.context.get().__dict__["pipeline"] = MOCK_PIPELINE.name.root
self.nifi.context.get().__dict__[
"pipeline_service"
] = MOCK_PIPELINE_SERVICE.name.root
# Mock metadata.get_by_name to return different pipeline entities based on FQN
self.original_get_by_name = self.nifi.metadata.get_by_name
self.nifi.metadata.get_by_name = self._mock_get_by_name
self.nifi.pipeline_parents_mapping = {
EXPECTED_NIFI_DETAILS.id_: [EXPECTED_NIFI_DETAILS.parent_pipeline_id],
}
self.nifi.process_group_connections = [
NifiProcessorConnections(
id_="fd99b000-6117-47c1-a075-b09591b04d61",
source_id="d3d6b945-0182-1000-d7e4-d81b8f79f310",
destination_id="364e6ed1-feab-403c-a0c7-0003a55ea8aa",
)
]
def _mock_get_by_name(self, entity, fqn):
    """Mock for metadata.get_by_name: return fixture pipelines for known FQNs.

    Falls back to the original get_by_name only when the FQN is not a test
    fixture. The previous version passed the fallback as dict.get's default,
    which evaluated it EAGERLY — i.e. it called the real lookup on every
    invocation, even when the FQN was in the fixture map.
    """
    service = self.nifi.context.get().pipeline_service
    fqn_pipeline_mapping = {
        f"{service}.{EXPECTED_PARENT_NIFI_DETAILS.id_}": MOCK_PARENT_PIPELINE,
        f"{service}.{EXPECTED_NIFI_DETAILS.id_}": MOCK_PIPELINE,
        f"{service}.{EXPECTED_NIFI_DETAILS_2.id_}": MOCK_PIPELINE_2,
    }
    if fqn in fqn_pipeline_mapping:
        return fqn_pipeline_mapping[fqn]
    return self.original_get_by_name(entity, fqn)
def test_pipeline_name(self):
assert (
self.nifi.get_pipeline_name(EXPECTED_NIFI_DETAILS)
@ -221,3 +353,9 @@ class NifiUnitTest(TestCase):
def test_pipelines(self):
    """The first yielded pipeline request matches the expected fixture."""
    pipeline_request = next(
        iter(self.nifi.yield_pipeline(EXPECTED_NIFI_DETAILS))
    ).right
    assert pipeline_request == EXPECTED_CREATED_PIPELINES
def test_pipeline_bulk_lineage_details(self):
    """Bulk lineage yields the parent->child edge then the connection edge."""
    lineage_results = [
        either for either in self.nifi.yield_pipeline_bulk_lineage_details()
    ]
    assert lineage_results == EXPECTED_PIPELINE_BULK_LINEAGE_DETAILS

View File

@ -7,8 +7,8 @@ slug: /connectors/pipeline/nifi
name="NiFi"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Pipelines", "Usage"]
unavailableFeatures=["Pipeline Status", "Owners", "Tags", "Lineage"]
availableFeatures=["Pipelines", "Usage", "Lineage"]
unavailableFeatures=["Pipeline Status", "Owners", "Tags"]
/ %}
In this section, we provide guides and references to use the NiFi connector.

View File

@ -7,8 +7,8 @@ slug: /connectors/pipeline/nifi/yaml
name="Nifi"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Pipelines", "Usage"]
unavailableFeatures=["Pipeline Status", "Owners", "Tags", "Lineage"]
availableFeatures=["Pipelines", "Usage", "Lineage"]
unavailableFeatures=["Pipeline Status", "Owners", "Tags"]
/ %}
In this section, we provide guides and references to use the NiFi connector.