diff --git a/ingestion/src/metadata/ingestion/source/pipeline/nifi/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/nifi/metadata.py index 26701e9bcbe..2a6fb20b199 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/nifi/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/nifi/metadata.py @@ -12,13 +12,14 @@ Nifi source to extract metadata """ import traceback -from typing import Iterable, List, Optional +from collections import defaultdict +from typing import Dict, Iterable, List, Optional from pydantic import BaseModel, ValidationError from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.entity.data.pipeline import Task +from metadata.generated.schema.entity.data.pipeline import Pipeline, Task from metadata.generated.schema.entity.services.connections.pipeline.nifiConnection import ( NifiConnection, ) @@ -30,6 +31,9 @@ from metadata.generated.schema.type.basic import ( FullyQualifiedEntityName, SourceUrl, ) +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource +from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus @@ -43,6 +47,7 @@ logger = ingestion_logger() PROCESS_GROUP_FLOW = "processGroupFlow" BREADCRUMB = "breadcrumb" +PARENT_BREADCRUMB = "parentBreadcrumb" class NifiProcessor(BaseModel): @@ -77,6 +82,7 @@ class NifiPipelineDetails(BaseModel): uri: str processors: List[NifiProcessor] connections: List[NifiProcessorConnections] + parent_pipeline_id: Optional[str] = None class NifiSource(PipelineServiceSource): @@ -85,6 +91,11 @@ class NifiSource(PipelineServiceSource): Pipeline metadata from Airflow's metadata db """ + def __init__(self, config: WorkflowSource, metadata: OpenMetadata): + super().__init__(config, metadata) + self.pipeline_parents_mapping: Dict[str, List[str]] = defaultdict(list) + self.process_group_connections: List[NifiProcessorConnections] = [] + @classmethod def create( cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None @@ -220,7 +231,7 @@ class NifiSource(PipelineServiceSource): """Get List of all pipelines""" for process_group in self.connection.list_process_groups(): try: - yield NifiPipelineDetails( + nifi_pipeline_details = NifiPipelineDetails( id_=process_group[PROCESS_GROUP_FLOW].get("id"), name=process_group[PROCESS_GROUP_FLOW][BREADCRUMB][BREADCRUMB].get( "name" @@ -232,7 +243,18 @@ class NifiSource(PipelineServiceSource): connections=self._get_connections_from_process_group( process_group=process_group ), + parent_pipeline_id=process_group[PROCESS_GROUP_FLOW][BREADCRUMB] + .get(PARENT_BREADCRUMB, {}) + .get("id"), ) + if nifi_pipeline_details.parent_pipeline_id: + self.pipeline_parents_mapping[nifi_pipeline_details.id_].append( + nifi_pipeline_details.parent_pipeline_id + ) + self.process_group_connections.extend( + self.get_process_group_connections(process_group) + ) + yield nifi_pipeline_details except (ValueError, KeyError, ValidationError) as err: logger.debug(traceback.format_exc()) logger.warning( @@ -244,5 +266,113 @@ class NifiSource(PipelineServiceSource): f"Wild error encountered when trying to get pipelines from Process Group {process_group} - {err}." ) + def get_process_group_connections( + self, process_group: dict + ) -> List[NifiProcessorConnections]: + """Get all connections for a process group""" + connections_list = ( + process_group.get(PROCESS_GROUP_FLOW).get("flow").get("connections") + ) + connections = [] + + for connection in connections_list: + try: + source = connection.get("component", {}).get("source", {}) + destination = connection.get("component", {}).get("destination", {}) + if ( + source.get("type") == "OUTPUT_PORT" + and destination.get("type") == "INPUT_PORT" + and source.get("groupId") != destination.get("groupId") + ): + connections.append( + NifiProcessorConnections( + id_=connection.get("id"), + source_id=source.get("groupId"), + destination_id=destination.get("groupId"), + ) + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Wild error encountered when trying to get process group connections from \ + {process_group[PROCESS_GROUP_FLOW][BREADCRUMB][BREADCRUMB].get('name')} - {err}." + ) + return connections + + def yield_pipeline_bulk_lineage_details( + self, + ) -> Iterable[Either[AddLineageRequest]]: + """ + Process the pipeline bulk lineage details + """ + # Process the lineage between parent and child pipelines + for pipeline_id, parent_pipeline_ids in self.pipeline_parents_mapping.items(): + to_entity = self.metadata.get_by_name( + entity=Pipeline, + fqn=f"{self.context.get().pipeline_service}.{pipeline_id}", + ) + if not to_entity: + logger.warning( + f"Pipeline {pipeline_id} not found in metadata, skipping lineage" + ) + continue + for parent_pipeline_id in parent_pipeline_ids: + from_entity = self.metadata.get_by_name( + entity=Pipeline, + fqn=f"{self.context.get().pipeline_service}.{parent_pipeline_id}", + ) + if not from_entity: + logger.warning( + f"Parent Pipeline {parent_pipeline_id} not found in metadata, skipping lineage" + ) + continue + yield Either( + right=AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=from_entity.id, type="pipeline" + ), + toEntity=EntityReference(id=to_entity.id, type="pipeline"), + lineageDetails=LineageDetails( + source=LineageSource.PipelineLineage + ), + ) + ) + ) + + # Process the lineage between connected pipelines + for connection in self.process_group_connections: + from_entity = self.metadata.get_by_name( + entity=Pipeline, + fqn=f"{self.context.get().pipeline_service}.{connection.source_id}", + ) + if not from_entity: + logger.warning( + f"Pipeline {connection.source_id} not found in metadata, skipping lineage" + ) + continue + + to_entity = self.metadata.get_by_name( + entity=Pipeline, + fqn=f"{self.context.get().pipeline_service}.{connection.destination_id}", + ) + if not to_entity: + logger.warning( + f"Pipeline {connection.destination_id} not found in metadata, skipping lineage" + ) + continue + + yield Either( + right=AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference(id=from_entity.id, type="pipeline"), + toEntity=EntityReference(id=to_entity.id, type="pipeline"), + lineageDetails=LineageDetails( + source=LineageSource.PipelineLineage + ), + ) + ) + ) + def get_pipeline_name(self, pipeline_details: NifiPipelineDetails) -> str: return pipeline_details.name diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index e2701e9cf9d..23b92d9e116 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -137,6 +137,7 @@ class PipelineServiceTopology(ServiceTopology): nullable=True, ), ], + post_process=["process_pipeline_bulk_lineage"], ) @@ -316,6 +317,23 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC): else: yield lineage + def yield_pipeline_bulk_lineage_details(self) -> Iterable[AddLineageRequest]: + """Method to yield the bulk pipeline lineage details""" + + def process_pipeline_bulk_lineage(self) -> Iterable[AddLineageRequest]: + """Method to process the bulk pipeline lineage""" + if self.source_config.includeLineage: + for lineage in self.yield_pipeline_bulk_lineage_details() or []: + if lineage.right is not None: + yield Either( + right=OMetaLineageRequest( + lineage_request=lineage.right, + override_lineage=self.source_config.overrideLineage, + ) + ) + else: + yield lineage + def _get_table_fqn_from_om(self, table_details: TableDetails) -> Optional[str]: """ Based on partial schema and table names look for matching table object in open metadata. diff --git a/ingestion/tests/unit/topology/pipeline/test_nifi.py b/ingestion/tests/unit/topology/pipeline/test_nifi.py index a9bc0782949..bb0754f8004 100644 --- a/ingestion/tests/unit/topology/pipeline/test_nifi.py +++ b/ingestion/tests/unit/topology/pipeline/test_nifi.py @@ -18,6 +18,7 @@ from unittest import TestCase from unittest.mock import PropertyMock, patch from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.pipeline import Pipeline, Task from metadata.generated.schema.entity.services.pipelineService import ( PipelineConnection, @@ -28,7 +29,11 @@ from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) from metadata.generated.schema.type.basic import FullyQualifiedEntityName +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.api.models import Either +from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.nifi.metadata import ( NifiPipelineDetails, NifiProcessor, @@ -78,6 +83,27 @@ mock_nifi_config = { }, } +EXPECTED_PARENT_NIFI_DETAILS = NifiPipelineDetails( + id_="affe20b6-b5b6-47fb-8dd3-ff53cd4aee4a", + name="Parent NiFi Flow", + uri="/nifi-api/flow/process-groups/affe20b6-b5b6-47fb-8dd3-ff53cd4aee4a", + processors=[ + NifiProcessor( + id_="ec8246b3-d740-4d8e-8571-7059a9f615e7", + name="Wait", + type_="org.apache.nifi.processors.standard.Wait", + uri="/nifi-api/processors/ec8246b3-d740-4d8e-8571-7059a9f615e7", + ), + NifiProcessor( + id_="be1ecb80-3c73-46ec-8e3f-6b90a14f91c7", + name="ValidateJson", + type_="org.apache.nifi.processors.standard.ValidateJson", + uri="/nifi-api/processors/be1ecb80-3c73-46ec-8e3f-6b90a14f91c7", + ), + ], + connections=[], +) + EXPECTED_NIFI_DETAILS = NifiPipelineDetails( id_="d3d6b945-0182-1000-d7e4-d81b8f79f310", @@ -104,6 +130,15 @@ EXPECTED_NIFI_DETAILS = NifiPipelineDetails( destination_id="d3f023ac-0182-1000-8bbe-e2b00347fff8", ) ], + parent_pipeline_id="affe20b6-b5b6-47fb-8dd3-ff53cd4aee4a", +) + +EXPECTED_NIFI_DETAILS_2 = NifiPipelineDetails( + id_="364e6ed1-feab-403c-a0c7-0003a55ea8aa", + name="NiFi Flow 2", + uri="/nifi-api/flow/process-groups/364e6ed1-feab-403c-a0c7-0003a55ea8aa", + processors=[], + connections=[], ) @@ -183,6 +218,79 @@ MOCK_PIPELINE = Pipeline( ), ) +MOCK_PIPELINE_2 = Pipeline( + id="38cd7319-cde9-4d68-b9d3-82d28b20b7bc", + name="364e6ed1-feab-403c-a0c7-0003a55ea8aa", + fullyQualifiedName="nifi_source.364e6ed1-feab-403c-a0c7-0003a55ea8aa", + displayName="NiFi Flow 2", + sourceUrl=( + "https://localhost:8443/nifi-api/flow/" + "process-groups/364e6ed1-feab-403c-a0c7-0003a55ea8aa" + ), + tasks=[], + service=EntityReference( + id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService" + ), +) + + +MOCK_PARENT_PIPELINE = Pipeline( + id="f9d1c7e1-4c0a-4578-a1bd-e9941c88a1c5", + name="affe20b6-b5b6-47fb-8dd3-ff53cd4aee4a", + fullyQualifiedName="nifi_source.affe20b6-b5b6-47fb-8dd3-ff53cd4aee4a", + displayName="Parent NiFi Flow", + sourceUrl=( + "https://localhost:8443/nifi-api/flow/" + "process-groups/d3d6b945-0182-1000-d7e4-d81b8f79f310" + ), + tasks=[ + Task( + name="ec8246b3-d740-4d8e-8571-7059a9f615e7", + displayName="Wait", + sourceUrl=( + "https://localhost:8443/nifi-api/processors/" + "ec8246b3-d740-4d8e-8571-7059a9f615e7" + ), + taskType="org.apache.nifi.processors.standard.Wait", + downstreamTasks=[], + ), + Task( + name="be1ecb80-3c73-46ec-8e3f-6b90a14f91c7", + displayName="ValidateJson", + sourceUrl=( + "https://localhost:8443/nifi-api/processors/" + "be1ecb80-3c73-46ec-8e3f-6b90a14f91c7" + ), + taskType="org.apache.nifi.processors.standard.ValidateJson", + downstreamTasks=["ec8246b3-d740-4d8e-8571-7059a9f615e7"], + ), + ], + service=EntityReference( + id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService" + ), +) + +EXPECTED_PIPELINE_BULK_LINEAGE_DETAILS = [ + Either( + right=AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference(id=MOCK_PARENT_PIPELINE.id, type="pipeline"), + toEntity=EntityReference(id=MOCK_PIPELINE.id, type="pipeline"), + lineageDetails=LineageDetails(source=LineageSource.PipelineLineage), + ), + ) + ), + Either( + right=AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference(id=MOCK_PIPELINE.id, type="pipeline"), + toEntity=EntityReference(id=MOCK_PIPELINE_2.id, type="pipeline"), + lineageDetails=LineageDetails(source=LineageSource.PipelineLineage), + ), + ) + ), +] + class NifiUnitTest(TestCase): """ @@ -205,13 +313,37 @@ class NifiUnitTest(TestCase): config = OpenMetadataWorkflowConfig.model_validate(mock_nifi_config) self.nifi = NifiSource.create( mock_nifi_config["source"], - config.workflowConfig.openMetadataServerConfig, + OpenMetadata(config.workflowConfig.openMetadataServerConfig), ) self.nifi.context.get().__dict__["pipeline"] = MOCK_PIPELINE.name.root self.nifi.context.get().__dict__[ "pipeline_service" ] = MOCK_PIPELINE_SERVICE.name.root + # Mock metadata.get_by_name to return different pipeline entities based on FQN + self.original_get_by_name = self.nifi.metadata.get_by_name + self.nifi.metadata.get_by_name = self._mock_get_by_name + + self.nifi.pipeline_parents_mapping = { + EXPECTED_NIFI_DETAILS.id_: [EXPECTED_NIFI_DETAILS.parent_pipeline_id], + } + self.nifi.process_group_connections = [ + NifiProcessorConnections( + id_="fd99b000-6117-47c1-a075-b09591b04d61", + source_id="d3d6b945-0182-1000-d7e4-d81b8f79f310", + destination_id="364e6ed1-feab-403c-a0c7-0003a55ea8aa", + ) + ] + + def _mock_get_by_name(self, entity, fqn): + """Mock function to return different pipeline entities based on FQN""" + fqn_pipeline_mapping = { + f"{self.nifi.context.get().pipeline_service}.{EXPECTED_PARENT_NIFI_DETAILS.id_}": MOCK_PARENT_PIPELINE, + f"{self.nifi.context.get().pipeline_service}.{EXPECTED_NIFI_DETAILS.id_}": MOCK_PIPELINE, + f"{self.nifi.context.get().pipeline_service}.{EXPECTED_NIFI_DETAILS_2.id_}": MOCK_PIPELINE_2, + } + return fqn_pipeline_mapping.get(fqn, self.original_get_by_name(entity, fqn)) + def test_pipeline_name(self): assert ( self.nifi.get_pipeline_name(EXPECTED_NIFI_DETAILS) @@ -221,3 +353,9 @@ class NifiUnitTest(TestCase): def test_pipelines(self): pipline = list(self.nifi.yield_pipeline(EXPECTED_NIFI_DETAILS))[0].right assert pipline == EXPECTED_CREATED_PIPELINES + + def test_pipeline_bulk_lineage_details(self): + pipeline_bulk_lineage_details = list( + self.nifi.yield_pipeline_bulk_lineage_details() + ) + assert pipeline_bulk_lineage_details == EXPECTED_PIPELINE_BULK_LINEAGE_DETAILS diff --git a/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/pipeline/nifi/index.md b/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/pipeline/nifi/index.md index 41d983e08c2..d8ee25040f4 100644 --- a/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/pipeline/nifi/index.md +++ b/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/pipeline/nifi/index.md @@ -7,8 +7,8 @@ slug: /connectors/pipeline/nifi name="NiFi" stage="PROD" platform="OpenMetadata" -availableFeatures=["Pipelines", "Usage"] -unavailableFeatures=["Pipeline Status", "Owners", "Tags", "Lineage"] +availableFeatures=["Pipelines", "Usage", "Lineage"] +unavailableFeatures=["Pipeline Status", "Owners", "Tags"] / %} In this section, we provide guides and references to use the NiFi connector. diff --git a/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/pipeline/nifi/yaml.md b/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/pipeline/nifi/yaml.md index fde90bafb0b..0dd07de6d97 100644 --- a/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/pipeline/nifi/yaml.md +++ b/openmetadata-docs/content/v1.8.x-SNAPSHOT/connectors/pipeline/nifi/yaml.md @@ -7,8 +7,8 @@ slug: /connectors/pipeline/nifi/yaml name="Nifi" stage="PROD" platform="OpenMetadata" -availableFeatures=["Pipelines", "Usage"] -unavailableFeatures=["Pipeline Status", "Owners", "Tags", "Lineage"] +availableFeatures=["Pipelines", "Usage", "Lineage"] +unavailableFeatures=["Pipeline Status", "Owners", "Tags"] / %} In this section, we provide guides and references to use the NiFi connector.