diff --git a/ingestion/src/metadata/ingestion/source/pipeline/nifi.py b/ingestion/src/metadata/ingestion/source/pipeline/nifi.py index 1c264b8b469..bf0c27afb3b 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/nifi.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/nifi.py @@ -11,7 +11,7 @@ """ Nifi source to extract metadata """ - +import traceback from typing import Iterable, List, Optional from pydantic import BaseModel, ValidationError @@ -19,12 +19,7 @@ from pydantic import BaseModel, ValidationError from metadata.clients.nifi_client import NifiClient from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.entity.data.pipeline import ( - PipelineStatus, - StatusType, - Task, - TaskStatus, -) +from metadata.generated.schema.entity.data.pipeline import Task from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) @@ -118,25 +113,31 @@ class NifiSource(PipelineServiceSource): def _get_tasks_from_details( self, pipeline_details: NifiPipelineDetails - ) -> List[Task]: + ) -> Optional[List[Task]]: """ Prepare the list of the related Tasks that form the Pipeline """ - - return [ - Task( - name=processor.id_, - displayName=processor.name, - taskUrl=processor.uri.replace(self.service_connection.hostPort, ""), - taskType=processor.type_, - downstreamTasks=self._get_downstream_tasks_from( - source_id=processor.id_, - connections=pipeline_details.connections, - ), + try: + return [ + Task( + name=processor.id_, + displayName=processor.name, + taskUrl=processor.uri.replace(self.service_connection.hostPort, ""), + taskType=processor.type_, + downstreamTasks=self._get_downstream_tasks_from( + source_id=processor.id_, + connections=pipeline_details.connections, + ), + ) + for processor in pipeline_details.processors + ] + except Exception as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Wild error encountered when trying to get tasks from Pipeline Details {pipeline_details} - {err}." ) - for processor in pipeline_details.processors - ] + return None def yield_pipeline( self, pipeline_details: NifiPipelineDetails @@ -166,6 +167,7 @@ class NifiSource(PipelineServiceSource): Based on the latest refresh data. https://github.com/open-metadata/OpenMetadata/issues/6955 """ + logger.info("Pipeline Status is not yet supported on Nifi") def yield_pipeline_lineage_details( self, pipeline_details: NifiPipelineDetails @@ -176,6 +178,7 @@ class NifiSource(PipelineServiceSource): :return: Lineage request https://github.com/open-metadata/OpenMetadata/issues/6950 """ + logger.info("Lineage is not yet supported on Nifi") @staticmethod def _get_connections_from_process_group( @@ -236,9 +239,15 @@ class NifiSource(PipelineServiceSource): ), ) except (ValueError, KeyError, ValidationError) as err: - logger.warn( + logger.debug(traceback.format_exc()) + logger.warning( f"Cannot create NifiPipelineDetails from {process_group} - {err}" ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Wild error encountered when trying to get pipelines from Process Group {process_group} - {err}." + ) def get_pipeline_name(self, pipeline_details: NifiPipelineDetails) -> str: """