Improve Nifi error handling (#7275)

This commit is contained in:
Pere Miquel Brull 2022-09-07 00:24:12 +02:00 committed by GitHub
parent 30c92a3c9a
commit 659d72841e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -11,7 +11,7 @@
"""
Nifi source to extract metadata
"""
import traceback
from typing import Iterable, List, Optional
from pydantic import BaseModel, ValidationError
@ -19,12 +19,7 @@ from pydantic import BaseModel, ValidationError
from metadata.clients.nifi_client import NifiClient
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import (
PipelineStatus,
StatusType,
Task,
TaskStatus,
)
from metadata.generated.schema.entity.data.pipeline import Task
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
@ -118,25 +113,31 @@ class NifiSource(PipelineServiceSource):
def _get_tasks_from_details(
self, pipeline_details: NifiPipelineDetails
) -> List[Task]:
) -> Optional[List[Task]]:
"""
Prepare the list of the related Tasks
that form the Pipeline
"""
return [
Task(
name=processor.id_,
displayName=processor.name,
taskUrl=processor.uri.replace(self.service_connection.hostPort, ""),
taskType=processor.type_,
downstreamTasks=self._get_downstream_tasks_from(
source_id=processor.id_,
connections=pipeline_details.connections,
),
try:
return [
Task(
name=processor.id_,
displayName=processor.name,
taskUrl=processor.uri.replace(self.service_connection.hostPort, ""),
taskType=processor.type_,
downstreamTasks=self._get_downstream_tasks_from(
source_id=processor.id_,
connections=pipeline_details.connections,
),
)
for processor in pipeline_details.processors
]
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Wild error encountered when trying to get tasks from Pipeline Details {pipeline_details} - {err}."
)
for processor in pipeline_details.processors
]
return None
def yield_pipeline(
self, pipeline_details: NifiPipelineDetails
@ -166,6 +167,7 @@ class NifiSource(PipelineServiceSource):
Based on the latest refresh data.
https://github.com/open-metadata/OpenMetadata/issues/6955
"""
logger.info("Pipeline Status is not yet supported on Nifi")
def yield_pipeline_lineage_details(
self, pipeline_details: NifiPipelineDetails
@ -176,6 +178,7 @@ class NifiSource(PipelineServiceSource):
:return: Lineage request
https://github.com/open-metadata/OpenMetadata/issues/6950
"""
logger.info("Lineage is not yet supported on Nifi")
@staticmethod
def _get_connections_from_process_group(
@ -236,9 +239,15 @@ class NifiSource(PipelineServiceSource):
),
)
except (ValueError, KeyError, ValidationError) as err:
logger.warn(
logger.debug(traceback.format_exc())
logger.warning(
f"Cannot create NifiPipelineDetails from {process_group} - {err}"
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Wild error encountered when trying to get pipelines from Process Group {process_group} - {err}."
)
def get_pipeline_name(self, pipeline_details: NifiPipelineDetails) -> str:
"""