From 6cebf37b6cfa05797bcfe0a665b2cb9c1b2294d1 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 26 Aug 2022 21:50:01 +0200 Subject: [PATCH] Fix #6906 - Add Nifi connector (#6956) * Add Nifi connector * Mock token * Format * pass verify param --- .../connections/pipeline/nifiConnection.json | 54 + .../entity/services/pipelineService.json | 8 +- ingestion/setup.py | 1 + .../metadata/clients/connection_clients.py | 6 + ingestion/src/metadata/clients/nifi_client.py | 128 ++ .../ingestion/source/pipeline/nifi.py | 247 ++++ ingestion/src/metadata/utils/connections.py | 30 +- .../datasets/nifi_process_group.json | 1082 +++++++++++++++++ .../resources/datasets/nifi_resources.json | 168 +++ .../tests/unit/topology/pipeline/test_nifi.py | 190 +++ 10 files changed, 1912 insertions(+), 2 deletions(-) create mode 100644 catalog-rest-service/src/main/resources/json/schema/entity/services/connections/pipeline/nifiConnection.json create mode 100644 ingestion/src/metadata/clients/nifi_client.py create mode 100644 ingestion/src/metadata/ingestion/source/pipeline/nifi.py create mode 100644 ingestion/tests/unit/resources/datasets/nifi_process_group.json create mode 100644 ingestion/tests/unit/resources/datasets/nifi_resources.json create mode 100644 ingestion/tests/unit/topology/pipeline/test_nifi.py diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/pipeline/nifiConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/pipeline/nifiConnection.json new file mode 100644 index 00000000000..41ecabf162b --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/pipeline/nifiConnection.json @@ -0,0 +1,54 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/nifiConnection.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "NifiConnection", + "description": "Nifi Metadata Pipeline Connection Config", + 
"type": "object", + "javaType": "org.openmetadata.catalog.services.connections.pipeline.NifiConnection", + "definitions": { + "NifiType": { + "description": "Service type.", + "type": "string", + "enum": ["Nifi"], + "default": "Nifi" + } + }, + "properties": { + "type": { + "title": "Service Type", + "description": "Service Type", + "$ref": "#/definitions/NifiType", + "default": "Nifi" + }, + "hostPort": { + "expose": true, + "title": "Host And Port", + "description": "Pipeline Service Management/UI URI.", + "type": "string", + "format": "uri" + }, + "username": { + "title": "Username", + "description": "Nifi user to authenticate to the API.", + "type": "string" + }, + "password": { + "title": "Password", + "description": "Nifi password to authenticate to the API.", + "type": "string", + "format": "password" + }, + "verifySSL": { + "title": "Verify SSL", + "description": "Boolean marking if we need to verify the SSL certs for Nifi. False by default.", + "type": "boolean", + "default": false + }, + "supportsMetadataExtraction": { + "title": "Supports Metadata Extraction", + "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" + } + }, + "additionalProperties": false, + "required": ["hostPort", "username", "password"] +} diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json index 67fb3bf2859..d71c4768312 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json @@ -14,7 +14,7 @@ "description": "Type of pipeline service - Airflow or Prefect.", "type": "string", "javaInterfaces": ["org.openmetadata.catalog.EnumInterface"], - "enum": ["Airflow", "Glue", "Airbyte", "Fivetran", "Dagster"], + "enum": ["Airflow", "Glue", "Airbyte", "Fivetran", "Dagster", "Nifi"], "javaEnums": 
[ { "name": "Airflow" @@ -30,6 +30,9 @@ }, { "name": "Dagster" + }, + { + "name": "Nifi" } ] }, @@ -58,6 +61,9 @@ }, { "$ref": "./connections/pipeline/dagsterConnection.json" + }, + { + "$ref": "./connections/pipeline/nifiConnection.json" } ] } diff --git a/ingestion/setup.py b/ingestion/setup.py index 21363f1c94e..a485bc21bbf 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -130,6 +130,7 @@ plugins: Dict[str, Set[str]] = { "deltalake": {"delta-spark~=2.0.0"}, "great-expectations": {"great-expectations~=0.15.0"}, "pinotdb": {"pinotdb~=0.3.11"}, + "nifi": {}, } dev = { "datamodel-code-generator==0.12.0", diff --git a/ingestion/src/metadata/clients/connection_clients.py b/ingestion/src/metadata/clients/connection_clients.py index dd04befb47e..5f9a5ee99d8 100644 --- a/ingestion/src/metadata/clients/connection_clients.py +++ b/ingestion/src/metadata/clients/connection_clients.py @@ -127,3 +127,9 @@ class FivetranClient: class DagsterClient: def __init__(self, client) -> None: self.client = client + + +@dataclass +class NifiClientWrapper: + def __init__(self, client) -> None: + self.client = client diff --git a/ingestion/src/metadata/clients/nifi_client.py b/ingestion/src/metadata/clients/nifi_client.py new file mode 100644 index 00000000000..28f4f3fd812 --- /dev/null +++ b/ingestion/src/metadata/clients/nifi_client.py @@ -0,0 +1,128 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +Client to interact with Nifi apis +""" +import traceback +from typing import Any, Iterable, List + +import requests +from requests import HTTPError + +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + +IDENTIFIER = "identifier" +PROCESS_GROUPS_STARTER = "/process-groups/" +RESOURCES = "resources" + + +class NifiClient: + """ + Wrapper on top of Nifi REST API + """ + + def __init__( + self, host_port: str, username: str, password: str, verify: bool = False + ): + self._token = None + self._resources = None + + self.username = username + self.password = password + self.verify = verify + self.api_endpoint = host_port + "/nifi-api" + + self.content_headers = {"Content-Type": "application/x-www-form-urlencoded"} + self.headers = {"Authorization": f"Bearer {self.token}", **self.content_headers} + + @property + def token(self) -> str: + """ + Get the token on the fly if it + has not been initialized yet + """ + if not self._token: + try: + res = requests.post( + f"{self.api_endpoint}/access/token", + verify=self.verify, + headers=self.content_headers, + data=f"username={self.username}&password={self.password}", + ) + self._token = res.text + + except HTTPError as err: + logger.error( + f"Connection error retrieving the Bearer Token to access Nifi - {err}" + ) + raise err + + except ValueError as err: + logger.error(f"Cannot pick up the token from token response - {err}") + raise err + + return self._token + + @property + def resources(self) -> List[dict]: + """ + This can be expensive. Only query it once. 
+ """ + if not self._resources: + self._resources = self.get(RESOURCES) # API endpoint + + # Get the first `resources` key from the dict + return self._resources.get(RESOURCES) # Dict key + + def get(self, path: str) -> Any: + """ + GET call wrapper + """ + try: + res = requests.get( + f"{self.api_endpoint}/{path}", + verify=self.verify, + headers=self.headers, + ) + + return res.json() + + except HTTPError as err: + logger.warning(f"Connection error calling the Nifi API - {err}") + logger.debug(traceback.format_exc()) + + except ValueError as err: + logger.warning(f"Cannot pick up the JSON from API response - {err}") + logger.debug(traceback.format_exc()) + + except Exception as err: + logger.warning(f"Unknown error calling Nifi API - {err}") + logger.debug(traceback.format_exc()) + + def _get_process_group_ids(self) -> List[str]: + return [ + elem.get(IDENTIFIER).replace(PROCESS_GROUPS_STARTER, "") + for elem in self.resources + if elem.get(IDENTIFIER).startswith(PROCESS_GROUPS_STARTER) + ] + + def get_process_group(self, id_: str) -> dict: + return self.get(f"flow/process-groups/{id_}") + + def list_process_groups(self) -> Iterable[dict]: + """ + This will call the API endpoints + one at a time. + """ + for id_ in self._get_process_group_ids(): + yield self.get_process_group(id_=id_) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/nifi.py b/ingestion/src/metadata/ingestion/source/pipeline/nifi.py new file mode 100644 index 00000000000..1c264b8b469 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/nifi.py @@ -0,0 +1,247 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Nifi source to extract metadata +""" + +from typing import Iterable, List, Optional + +from pydantic import BaseModel, ValidationError + +from metadata.clients.nifi_client import NifiClient +from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.pipeline import ( + PipelineStatus, + StatusType, + Task, + TaskStatus, +) +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.connections.pipeline.nifiConnection import ( + NifiConnection, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus +from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +PROCESS_GROUP_FLOW = "processGroupFlow" +BREADCRUMB = "breadcrumb" + + +class NifiProcessor(BaseModel): + """ + Processor (task) description to be ingested + """ + + id_: str + name: Optional[str] + type_: str + uri: str + + +class NifiProcessorConnections(BaseModel): + """ + Describes connections between components + connections.[components].source|destination + """ + + 
id_: str + source_id: str + destination_id: str + + +class NifiPipelineDetails(BaseModel): + """ + Defines the necessary Nifi information + """ + + id_: str + name: Optional[str] + uri: str + processors: List[NifiProcessor] + connections: List[NifiProcessorConnections] + + +class NifiSource(PipelineServiceSource): + """ + Implements the necessary methods to extract + Pipeline metadata from the Nifi REST API + """ + + def __init__( + self, + config: WorkflowSource, + metadata_config: OpenMetadataConnection, + ): + super().__init__(config, metadata_config) + self.client: NifiClient = self.connection.client + + @classmethod + def create(cls, config_dict, metadata_config: OpenMetadataConnection): + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: NifiConnection = config.serviceConnection.__root__.config + if not isinstance(connection, NifiConnection): + raise InvalidSourceException( + f"Expected NifiConnection, but got {connection}" + ) + return cls(config, metadata_config) + + @staticmethod + def _get_downstream_tasks_from( + source_id: str, connections: List[NifiProcessorConnections] + ) -> List[str]: + """ + Fetch all tasks downstream from the source + """ + return [ + conn.destination_id for conn in connections if conn.source_id == source_id + ] + + def _get_tasks_from_details( + self, pipeline_details: NifiPipelineDetails + ) -> List[Task]: + """ + Prepare the list of the related Tasks + that form the Pipeline + """ + + return [ + Task( + name=processor.id_, + displayName=processor.name, + taskUrl=processor.uri.replace(self.service_connection.hostPort, ""), + taskType=processor.type_, + downstreamTasks=self._get_downstream_tasks_from( + source_id=processor.id_, + connections=pipeline_details.connections, + ), + ) + for processor in pipeline_details.processors + ] + + def yield_pipeline( + self, pipeline_details: NifiPipelineDetails + ) -> Iterable[CreatePipelineRequest]: + """ + Convert a Nifi process group into a Pipeline Entity + :param
pipeline_details: pipeline_details object from Nifi + :return: Create Pipeline request with tasks + """ + yield CreatePipelineRequest( + name=pipeline_details.id_, + displayName=pipeline_details.name, + pipelineUrl=pipeline_details.uri.replace( + self.service_connection.hostPort, "" + ), + tasks=self._get_tasks_from_details(pipeline_details), + service=EntityReference( + id=self.context.pipeline_service.id.__root__, type="pipelineService" + ), + ) + + def yield_pipeline_status( + self, pipeline_details: NifiPipelineDetails + ) -> Optional[OMetaPipelineStatus]: + """ + Method to get task & pipeline status. + Based on the latest refresh data. + https://github.com/open-metadata/OpenMetadata/issues/6955 + """ + + def yield_pipeline_lineage_details( + self, pipeline_details: NifiPipelineDetails + ) -> Optional[Iterable[AddLineageRequest]]: + """ + Parse all the stream available in the connection and create a lineage between them + :param pipeline_details: pipeline_details object from Nifi + :return: Lineage request + https://github.com/open-metadata/OpenMetadata/issues/6950 + """ + + @staticmethod + def _get_connections_from_process_group( + process_group: dict, + ) -> List[NifiProcessorConnections]: + """ + Parse the process_group dictionary to pick up the Connections + """ + connections_list = ( + process_group.get(PROCESS_GROUP_FLOW).get("flow").get("connections") + ) + + return [ + NifiProcessorConnections( + id_=connection.get("id"), + source_id=connection["component"]["source"]["id"], + destination_id=connection["component"]["destination"]["id"], + ) + for connection in connections_list + ] + + @staticmethod + def _get_processors_from_process_group(process_group: dict) -> List[NifiProcessor]: + """ + Parse the process_group dictionary to pick up the Processors + """ + processor_list = ( + process_group.get(PROCESS_GROUP_FLOW).get("flow").get("processors") + ) + + return [ + NifiProcessor( + id_=processor.get("id"), + uri=processor.get("uri"), + 
name=processor["component"].get("name"), + type_=processor["component"].get("type"), + ) + for processor in processor_list + ] + + def get_pipelines_list(self) -> Iterable[NifiPipelineDetails]: + """ + Get List of all pipelines + """ + for process_group in self.client.list_process_groups(): + try: + yield NifiPipelineDetails( + id_=process_group[PROCESS_GROUP_FLOW].get("id"), + name=process_group[PROCESS_GROUP_FLOW][BREADCRUMB][BREADCRUMB].get( + "name" + ), + uri=process_group[PROCESS_GROUP_FLOW].get("uri"), + processors=self._get_processors_from_process_group( + process_group=process_group + ), + connections=self._get_connections_from_process_group( + process_group=process_group + ), + ) + except (ValueError, KeyError, ValidationError) as err: + logger.warn( + f"Cannot create NifiPipelineDetails from {process_group} - {err}" + ) + + def get_pipeline_name(self, pipeline_details: NifiPipelineDetails) -> str: + """ + Get Pipeline Name + """ + return pipeline_details.name diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py index e78f30ebef7..98b5ff927d6 100644 --- a/ingestion/src/metadata/utils/connections.py +++ b/ingestion/src/metadata/utils/connections.py @@ -42,12 +42,14 @@ from metadata.clients.connection_clients import ( MetabaseClient, MlflowClientWrapper, ModeClient, + NifiClientWrapper, PowerBiClient, RedashClient, SalesforceClient, SupersetClient, TableauClient, ) +from metadata.clients.nifi_client import NifiClient from metadata.generated.schema.entity.services.connections.connectionBasicType import ( ConnectionArguments, ) @@ -122,6 +124,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.fivetranConn from metadata.generated.schema.entity.services.connections.pipeline.glueConnection import ( GlueConnection as GluePipelineConnection, ) +from metadata.generated.schema.entity.services.connections.pipeline.nifiConnection import ( + NifiConnection, +) from 
metadata.orm_profiler.orm.functions.conn_test import ConnTestFn from metadata.utils.credentials import set_google_credentials from metadata.utils.source_connections import get_connection_args, get_connection_url @@ -287,7 +292,7 @@ def _(connection: DeltaLakeConnection, verbose: bool = False) -> DeltaLakeClient from delta import configure_spark_with_delta_pip builder = ( - pyspark.sql.SparkSession.builder.appName(connection.appName) + pyspark.sql.SparkSession.builder.appName("random") .enableHiveSupport() .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config( @@ -843,6 +848,29 @@ def _(connection: MlflowClientWrapper) -> None: ) +@get_connection.register +def _(connection: NifiConnection, verbose: bool = False): + + return NifiClientWrapper( + NifiClient( + host_port=connection.hostPort, + username=connection.username, + password=connection.password.get_secret_value(), + verify=connection.verifySSL, + ) + ) + + +@test_connection.register +def _(connection: NifiClientWrapper) -> None: + try: + connection.client.resources + except Exception as err: + raise SourceConnectionException( + f"Unknown error connecting with {connection} - {err}." 
+ ) + + @get_connection.register def _(_: BackendConnection, verbose: bool = False): """ diff --git a/ingestion/tests/unit/resources/datasets/nifi_process_group.json b/ingestion/tests/unit/resources/datasets/nifi_process_group.json new file mode 100644 index 00000000000..d692c7291d1 --- /dev/null +++ b/ingestion/tests/unit/resources/datasets/nifi_process_group.json @@ -0,0 +1,1082 @@ +{ + "permissions": { + "canRead": true, + "canWrite": true + }, + "processGroupFlow": { + "id": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "uri": "https://localhost:8443/nifi-api/flow/process-groups/d3d6b945-0182-1000-d7e4-d81b8f79f310", + "breadcrumb": { + "id": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "permissions": { + "canRead": true, + "canWrite": true + }, + "breadcrumb": { + "id": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "name": "NiFi Flow" + } + }, + "flow": { + "processGroups": [], + "remoteProcessGroups": [], + "processors": [ + { + "revision": { + "version": 0 + }, + "id": "d3f023ac-0182-1000-8bbe-e2b00347fff8", + "uri": "https://localhost:8443/nifi-api/processors/d3f023ac-0182-1000-8bbe-e2b00347fff8", + "position": { + "x": 728, + "y": 672 + }, + "permissions": { + "canRead": true, + "canWrite": true + }, + "bulletins": [], + "component": { + "id": "d3f023ac-0182-1000-8bbe-e2b00347fff8", + "versionedComponentId": "a3a99f87-7c9d-3969-b27f-b9c986fc5a37", + "parentGroupId": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "position": { + "x": 728, + "y": 672 + }, + "name": "FetchFile", + "type": "org.apache.nifi.processors.standard.FetchFile", + "bundle": { + "group": "org.apache.nifi", + "artifact": "nifi-standard-nar", + "version": "1.17.0" + }, + "state": "STOPPED", + "style": {}, + "relationships": [ + { + "name": "failure", + "description": "Any FlowFile that could not be fetched from the file system for any reason other than insufficient permissions or the file not existing will be transferred to this Relationship.", + "autoTerminate": false, + "retry": false + }, + { + 
"name": "not.found", + "description": "Any FlowFile that could not be fetched from the file system because the file could not be found will be transferred to this Relationship.", + "autoTerminate": false, + "retry": false + }, + { + "name": "permission.denied", + "description": "Any FlowFile that could not be fetched from the file system due to the user running NiFi not having sufficient permissions will be transferred to this Relationship.", + "autoTerminate": false, + "retry": false + }, + { + "name": "success", + "description": "Any FlowFile that is successfully fetched from the file system will be transferred to this Relationship.", + "autoTerminate": false, + "retry": false + } + ], + "supportsParallelProcessing": true, + "supportsEventDriven": false, + "supportsBatching": false, + "supportsSensitiveDynamicProperties": false, + "persistsState": false, + "restricted": true, + "deprecated": false, + "executionNodeRestricted": false, + "multipleVersionsAvailable": false, + "inputRequirement": "INPUT_REQUIRED", + "config": { + "properties": { + "File to Fetch": "${absolute.path}/${filename}", + "Completion Strategy": "None", + "Move Destination Directory": null, + "Move Conflict Strategy": "Rename", + "Log level when file not found": "ERROR", + "Log level when permission denied": "ERROR" + }, + "descriptors": { + "File to Fetch": { + "name": "File to Fetch", + "displayName": "File to Fetch", + "description": "The fully-qualified filename of the file to fetch from the file system", + "defaultValue": "${absolute.path}/${filename}", + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": true, + "expressionLanguageScope": "Variable Registry and FlowFile Attributes", + "dependencies": [] + }, + "Completion Strategy": { + "name": "Completion Strategy", + "displayName": "Completion Strategy", + "description": "Specifies what to do with the original file on the file system once it has been pulled into NiFi", + "defaultValue": "None", + 
"allowableValues": [ + { + "allowableValue": { + "displayName": "None", + "value": "None", + "description": "Leave the file as-is" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "Move File", + "value": "Move File", + "description": "Moves the file to the directory specified by the property" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "Delete File", + "value": "Delete File", + "description": "Deletes the original file from the file system" + }, + "canRead": true + } + ], + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "Move Destination Directory": { + "name": "Move Destination Directory", + "displayName": "Move Destination Directory", + "description": "The directory to the move the original file to once it has been fetched from the file system. This property is ignored unless the Completion Strategy is set to \"Move File\". If the directory does not exist, it will be created.", + "required": false, + "sensitive": false, + "dynamic": false, + "supportsEl": true, + "expressionLanguageScope": "Variable Registry and FlowFile Attributes", + "dependencies": [] + }, + "Move Conflict Strategy": { + "name": "Move Conflict Strategy", + "displayName": "Move Conflict Strategy", + "description": "If Completion Strategy is set to Move File and a file already exists in the destination directory with the same name, this property specifies how that naming conflict should be resolved", + "defaultValue": "Rename", + "allowableValues": [ + { + "allowableValue": { + "displayName": "Rename", + "value": "Rename", + "description": "The existing destination file should remain intact. 
The newly ingested file should be moved to the destination directory but be renamed to a random filename" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "Replace File", + "value": "Replace File", + "description": "The newly ingested file should replace the existing file in the Destination Directory" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "Keep Existing", + "value": "Keep Existing", + "description": "The existing file should in the Destination Directory should stay intact and the newly ingested file should be deleted" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "Fail", + "value": "Fail", + "description": "The existing destination file should remain intact and the incoming FlowFile should be routed to failure" + }, + "canRead": true + } + ], + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "Log level when file not found": { + "name": "Log level when file not found", + "displayName": "Log level when file not found", + "description": "Log level to use in case the file does not exist when the processor is triggered", + "defaultValue": "ERROR", + "allowableValues": [ + { + "allowableValue": { + "displayName": "TRACE", + "value": "TRACE" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "DEBUG", + "value": "DEBUG" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "INFO", + "value": "INFO" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "WARN", + "value": "WARN" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "ERROR", + "value": "ERROR" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "FATAL", + "value": "FATAL" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "NONE", + "value": "NONE" + }, + "canRead": true + } + ], + "required": true, + "sensitive": 
false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "Log level when permission denied": { + "name": "Log level when permission denied", + "displayName": "Log level when permission denied", + "description": "Log level to use in case user pmbrull does not have sufficient permissions to read the file", + "defaultValue": "ERROR", + "allowableValues": [ + { + "allowableValue": { + "displayName": "TRACE", + "value": "TRACE" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "DEBUG", + "value": "DEBUG" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "INFO", + "value": "INFO" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "WARN", + "value": "WARN" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "ERROR", + "value": "ERROR" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "FATAL", + "value": "FATAL" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "NONE", + "value": "NONE" + }, + "canRead": true + } + ], + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + } + }, + "schedulingPeriod": "0 sec", + "schedulingStrategy": "TIMER_DRIVEN", + "executionNode": "ALL", + "penaltyDuration": "30 sec", + "yieldDuration": "1 sec", + "bulletinLevel": "WARN", + "runDurationMillis": 0, + "concurrentlySchedulableTaskCount": 1, + "comments": "", + "lossTolerant": false, + "defaultConcurrentTasks": { + "TIMER_DRIVEN": "1", + "EVENT_DRIVEN": "0", + "CRON_DRIVEN": "1" + }, + "defaultSchedulingPeriod": { + "TIMER_DRIVEN": "0 sec", + "CRON_DRIVEN": "* * * * * ?" 
+ }, + "retryCount": 10, + "retriedRelationships": [], + "backoffMechanism": "PENALIZE_FLOWFILE", + "maxBackoffPeriod": "10 mins" + }, + "validationErrors": [ + "'Relationship success' is invalid because Relationship 'success' is not connected to any component and is not auto-terminated", + "'Relationship not.found' is invalid because Relationship 'not.found' is not connected to any component and is not auto-terminated", + "'Relationship permission.denied' is invalid because Relationship 'permission.denied' is not connected to any component and is not auto-terminated", + "'Relationship failure' is invalid because Relationship 'failure' is not connected to any component and is not auto-terminated" + ], + "validationStatus": "INVALID", + "extensionMissing": false + }, + "inputRequirement": "INPUT_REQUIRED", + "status": { + "groupId": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "id": "d3f023ac-0182-1000-8bbe-e2b00347fff8", + "name": "FetchFile", + "runStatus": "Invalid", + "statsLastRefreshed": "18:21:10 CEST", + "aggregateSnapshot": { + "id": "d3f023ac-0182-1000-8bbe-e2b00347fff8", + "groupId": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "name": "FetchFile", + "type": "FetchFile", + "runStatus": "Invalid", + "executionNode": "ALL", + "bytesRead": 0, + "bytesWritten": 0, + "read": "0 bytes", + "written": "0 bytes", + "flowFilesIn": 0, + "bytesIn": 0, + "input": "0 (0 bytes)", + "flowFilesOut": 0, + "bytesOut": 0, + "output": "0 (0 bytes)", + "taskCount": 0, + "tasksDurationNanos": 0, + "tasks": "0", + "tasksDuration": "00:00:00.000", + "activeThreadCount": 0, + "terminatedThreadCount": 0 + } + }, + "operatePermissions": { + "canRead": true, + "canWrite": true + } + }, + { + "revision": { + "clientId": "d59f9e9b-0182-1000-aa02-4b6360e52482", + "version": 1 + }, + "id": "d3f1304d-0182-1000-f0f5-9a6927976941", + "uri": "https://localhost:8443/nifi-api/processors/d3f1304d-0182-1000-f0f5-9a6927976941", + "position": { + "x": 728, + "y": 312 + }, + "permissions": { + "canRead": 
true, + "canWrite": true + }, + "bulletins": [], + "component": { + "id": "d3f1304d-0182-1000-f0f5-9a6927976941", + "versionedComponentId": "0c069383-5171-30e7-a96f-1d642688b79e", + "parentGroupId": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "position": { + "x": 728, + "y": 312 + }, + "name": "ListFile", + "type": "org.apache.nifi.processors.standard.ListFile", + "bundle": { + "group": "org.apache.nifi", + "artifact": "nifi-standard-nar", + "version": "1.17.0" + }, + "state": "STOPPED", + "style": {}, + "relationships": [ + { + "name": "success", + "description": "All FlowFiles that are received are routed to success", + "autoTerminate": false, + "retry": false + } + ], + "supportsParallelProcessing": false, + "supportsEventDriven": false, + "supportsBatching": false, + "supportsSensitiveDynamicProperties": false, + "persistsState": true, + "restricted": false, + "deprecated": false, + "executionNodeRestricted": false, + "multipleVersionsAvailable": false, + "inputRequirement": "INPUT_FORBIDDEN", + "config": { + "properties": { + "Input Directory": "/Users/pmbrull/dev/hive/employee", + "listing-strategy": "timestamps", + "Recurse Subdirectories": "true", + "record-writer": null, + "Input Directory Location": "Local", + "File Filter": "[^\\.].*", + "Path Filter": null, + "Include File Attributes": "true", + "Minimum File Age": "0 sec", + "Maximum File Age": null, + "Minimum File Size": "0 B", + "Maximum File Size": null, + "Ignore Hidden Files": "true", + "target-system-timestamp-precision": "auto-detect", + "et-state-cache": null, + "et-time-window": "3 hours", + "et-initial-listing-target": "all", + "et-node-identifier": "${hostname()}", + "track-performance": "false", + "max-performance-metrics": "100000", + "max-operation-time": "10 secs", + "max-listing-time": "3 mins" + }, + "descriptors": { + "Input Directory": { + "name": "Input Directory", + "displayName": "Input Directory", + "description": "The input directory from which files to pull files", + "required": 
true, + "sensitive": false, + "dynamic": false, + "supportsEl": true, + "expressionLanguageScope": "Variable Registry Only", + "dependencies": [] + }, + "listing-strategy": { + "name": "listing-strategy", + "displayName": "Listing Strategy", + "description": "Specify how to determine new/updated entities. See each strategy descriptions for detail.", + "defaultValue": "timestamps", + "allowableValues": [ + { + "allowableValue": { + "displayName": "Tracking Timestamps", + "value": "timestamps", + "description": "This strategy tracks the latest timestamp of listed entity to determine new/updated entities. Since it only tracks few timestamps, it can manage listing state efficiently. However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy. For example, such situation can happen in a file system if a file with old timestamp is copied or moved into the target directory without its last modified timestamp being updated. Also may miss files when multiple subdirectories are being written at the same time while listing is running." + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "Tracking Entities", + "value": "entities", + "description": "This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities. This strategy can pick entities having old timestamp that can be missed with 'Tracking Timestamps'. Works even when multiple subdirectories are being written at the same time while listing is running. However additional DistributedMapCache controller service is required and more JVM heap memory is used. See the description of 'Entity Tracking Time Window' property for further details on how it works." + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "No Tracking", + "value": "none", + "description": "This strategy lists an entity without any tracking. 
The same entity will be listed each time on executing this processor. It is recommended to change the default run schedule value. Any property that related to the persisting state will be disregarded." + }, + "canRead": true + } + ], + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "Recurse Subdirectories": { + "name": "Recurse Subdirectories", + "displayName": "Recurse Subdirectories", + "description": "Indicates whether to list files from subdirectories of the directory", + "defaultValue": "true", + "allowableValues": [ + { + "allowableValue": { + "displayName": "true", + "value": "true" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "false", + "value": "false" + }, + "canRead": true + } + ], + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "record-writer": { + "name": "record-writer", + "displayName": "Record Writer", + "description": "Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.", + "allowableValues": [], + "required": false, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "identifiesControllerService": "org.apache.nifi.serialization.RecordSetWriterFactory", + "identifiesControllerServiceBundle": { + "group": "org.apache.nifi", + "artifact": "nifi-standard-services-api-nar", + "version": "1.17.0" + }, + "dependencies": [] + }, + "Input Directory Location": { + "name": "Input Directory Location", + "displayName": "Input Directory Location", + "description": "Specifies where the Input Directory is located. 
This is used to determine whether state should be stored locally or across the cluster.", + "defaultValue": "Local", + "allowableValues": [ + { + "allowableValue": { + "displayName": "Local", + "value": "Local", + "description": "Input Directory is located on a local disk. State will be stored locally on each node in the cluster." + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "Remote", + "value": "Remote", + "description": "Input Directory is located on a remote system. State will be stored across the cluster so that the listing can be performed on Primary Node Only and another node can pick up where the last node left off, if the Primary Node changes" + }, + "canRead": true + } + ], + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "File Filter": { + "name": "File Filter", + "displayName": "File Filter", + "description": "Only files whose names match the given regular expression will be picked up", + "defaultValue": "[^\\.].*", + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "Path Filter": { + "name": "Path Filter", + "displayName": "Path Filter", + "description": "When Recurse Subdirectories is true, then only subdirectories whose path matches the given regular expression will be scanned", + "required": false, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "Include File Attributes": { + "name": "Include File Attributes", + "displayName": "Include File Attributes", + "description": "Whether or not to include information such as the file's Last Modified Time and Owner as FlowFile Attributes. Depending on the File System being used, gathering this information can be expensive and as a result should be disabled. 
This is especially true of remote file shares.", + "defaultValue": "true", + "allowableValues": [ + { + "allowableValue": { + "displayName": "true", + "value": "true" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "false", + "value": "false" + }, + "canRead": true + } + ], + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "Minimum File Age": { + "name": "Minimum File Age", + "displayName": "Minimum File Age", + "description": "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored", + "defaultValue": "0 sec", + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "Maximum File Age": { + "name": "Maximum File Age", + "displayName": "Maximum File Age", + "description": "The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored", + "required": false, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "Minimum File Size": { + "name": "Minimum File Size", + "displayName": "Minimum File Size", + "description": "The minimum size that a file must be in order to be pulled", + "defaultValue": "0 B", + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "Maximum File Size": { + "name": "Maximum File Size", + "displayName": "Maximum File Size", + "description": "The maximum size that a file can be in order to be pulled", + "required": false, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + 
"dependencies": [] + }, + "Ignore Hidden Files": { + "name": "Ignore Hidden Files", + "displayName": "Ignore Hidden Files", + "description": "Indicates whether or not hidden files should be ignored", + "defaultValue": "true", + "allowableValues": [ + { + "allowableValue": { + "displayName": "true", + "value": "true" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "false", + "value": "false" + }, + "canRead": true + } + ], + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "target-system-timestamp-precision": { + "name": "target-system-timestamp-precision", + "displayName": "Target System Timestamp Precision", + "description": "Specify timestamp precision at the target system. Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.", + "defaultValue": "auto-detect", + "allowableValues": [ + { + "allowableValue": { + "displayName": "Auto Detect", + "value": "auto-detect", + "description": "Automatically detect time unit deterministically based on candidate entries timestamp. Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp. E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', then its precision is determined as 'seconds'." + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "Milliseconds", + "value": "millis", + "description": "This option provides the minimum latency for an entry from being available to being listed if target system supports millis, if not, use other options." + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "Seconds", + "value": "seconds", + "description": "For a target system that does not have millis precision, but has in seconds." 
+ }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "Minutes", + "value": "minutes", + "description": "For a target system that only supports precision in minutes." + }, + "canRead": true + } + ], + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "et-state-cache": { + "name": "et-state-cache", + "displayName": "Entity Tracking State Cache", + "description": "Listed entities are stored in the specified cache storage so that this processor can resume listing across NiFi restart or in case of primary node change. 'Tracking Entities' strategy require tracking information of all listed entities within the last 'Tracking Time Window'. To support large number of entities, the strategy uses DistributedMapCache instead of managed state. Cache key format is 'ListedEntities::{processorId}(::{nodeId})'. If it tracks per node listed entities, then the optional '::{nodeId}' part is added to manage state separately. E.g. cluster wide cache key = 'ListedEntities::8dda2321-0164-1000-50fa-3042fe7d6a7b', per node cache key = 'ListedEntities::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3' The stored cache content is Gzipped JSON string. The cache key will be deleted when target listing configuration is changed. Used by 'Tracking Entities' strategy.", + "allowableValues": [], + "required": false, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "identifiesControllerService": "org.apache.nifi.distributed.cache.client.DistributedMapCacheClient", + "identifiesControllerServiceBundle": { + "group": "org.apache.nifi", + "artifact": "nifi-standard-services-api-nar", + "version": "1.17.0" + }, + "dependencies": [] + }, + "et-time-window": { + "name": "et-time-window", + "displayName": "Entity Tracking Time Window", + "description": "Specify how long this processor should track already-listed entities. 
'Tracking Entities' strategy can pick any entity whose timestamp is inside the specified time window. For example, if set to '30 minutes', any entity having timestamp in recent 30 minutes will be the listing target when this processor runs. A listed entity is considered 'new/updated' and a FlowFile is emitted if one of following condition meets: 1. does not exist in the already-listed entities, 2. has newer timestamp than the cached entity, 3. has different size than the cached entity. If a cached entity's timestamp becomes older than specified time window, that entity will be removed from the cached already-listed entities. Used by 'Tracking Entities' strategy.", + "defaultValue": "3 hours", + "required": false, + "sensitive": false, + "dynamic": false, + "supportsEl": true, + "expressionLanguageScope": "Variable Registry Only", + "dependencies": [] + }, + "et-initial-listing-target": { + "name": "et-initial-listing-target", + "displayName": "Entity Tracking Initial Listing Target", + "description": "Specify how initial listing should be handled. Used by 'Tracking Entities' strategy.", + "defaultValue": "all", + "allowableValues": [ + { + "allowableValue": { + "displayName": "Tracking Time Window", + "value": "window", + "description": "Ignore entities having timestamp older than the specified 'Tracking Time Window' at the initial listing activity." + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "All Available", + "value": "all", + "description": "Regardless of entities timestamp, all existing entities will be listed at the initial listing activity." 
+ }, + "canRead": true + } + ], + "required": false, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "et-node-identifier": { + "name": "et-node-identifier", + "displayName": "Entity Tracking Node Identifier", + "description": "The configured value will be appended to the cache key so that listing state can be tracked per NiFi node rather than cluster wide when tracking state is scoped to LOCAL. Used by 'Tracking Entities' strategy.", + "defaultValue": "${hostname()}", + "required": false, + "sensitive": false, + "dynamic": false, + "supportsEl": true, + "expressionLanguageScope": "Variable Registry Only", + "dependencies": [] + }, + "track-performance": { + "name": "track-performance", + "displayName": "Track Performance", + "description": "Whether or not the Processor should track the performance of disk access operations. If true, all accesses to disk will be recorded, including the file being accessed, the information being obtained, and how long it takes. This is then logged periodically at a DEBUG level. 
While the amount of data will be capped, this option may still consume a significant amount of heap (controlled by the 'Maximum Number of Files to Track' property), but it can be very useful for troubleshooting purposes if performance is poor is degraded.", + "defaultValue": "false", + "allowableValues": [ + { + "allowableValue": { + "displayName": "true", + "value": "true" + }, + "canRead": true + }, + { + "allowableValue": { + "displayName": "false", + "value": "false" + }, + "canRead": true + } + ], + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": false, + "expressionLanguageScope": "Not Supported", + "dependencies": [] + }, + "max-performance-metrics": { + "name": "max-performance-metrics", + "displayName": "Maximum Number of Files to Track", + "description": "If the 'Track Performance' property is set to 'true', this property indicates the maximum number of files whose performance metrics should be held onto. A smaller value for this property will result in less heap utilization, while a larger value may provide more accurate insights into how the disk access operations are performing", + "defaultValue": "100000", + "required": true, + "sensitive": false, + "dynamic": false, + "supportsEl": true, + "expressionLanguageScope": "Variable Registry Only", + "dependencies": [] + }, + "max-operation-time": { + "name": "max-operation-time", + "displayName": "Max Disk Operation Time", + "description": "The maximum amount of time that any single disk operation is expected to take. 
If any disk operation takes longer than this amount of time, a warning bulletin will be generated for each operation that exceeds this amount of time.", + "defaultValue": "10 secs", + "required": false, + "sensitive": false, + "dynamic": false, + "supportsEl": true, + "expressionLanguageScope": "Variable Registry Only", + "dependencies": [] + }, + "max-listing-time": { + "name": "max-listing-time", + "displayName": "Max Directory Listing Time", + "description": "The maximum amount of time that listing any single directory is expected to take. If the listing for the directory specified by the 'Input Directory' property, or the listing of any subdirectory (if 'Recurse' is set to true) takes longer than this amount of time, a warning bulletin will be generated for each directory listing that exceeds this amount of time.", + "defaultValue": "3 mins", + "required": false, + "sensitive": false, + "dynamic": false, + "supportsEl": true, + "expressionLanguageScope": "Variable Registry Only", + "dependencies": [] + } + }, + "schedulingPeriod": "60 sec", + "schedulingStrategy": "TIMER_DRIVEN", + "executionNode": "ALL", + "penaltyDuration": "30 sec", + "yieldDuration": "1 sec", + "bulletinLevel": "WARN", + "runDurationMillis": 0, + "concurrentlySchedulableTaskCount": 1, + "comments": "", + "lossTolerant": false, + "defaultConcurrentTasks": { + "TIMER_DRIVEN": "1", + "EVENT_DRIVEN": "0", + "CRON_DRIVEN": "1" + }, + "defaultSchedulingPeriod": { + "TIMER_DRIVEN": "0 sec", + "CRON_DRIVEN": "* * * * * ?" 
+ }, + "retryCount": 10, + "retriedRelationships": [], + "backoffMechanism": "PENALIZE_FLOWFILE", + "maxBackoffPeriod": "10 mins" + }, + "validationStatus": "VALID", + "extensionMissing": false + }, + "inputRequirement": "INPUT_FORBIDDEN", + "status": { + "groupId": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "id": "d3f1304d-0182-1000-f0f5-9a6927976941", + "name": "ListFile", + "runStatus": "Stopped", + "statsLastRefreshed": "18:21:10 CEST", + "aggregateSnapshot": { + "id": "d3f1304d-0182-1000-f0f5-9a6927976941", + "groupId": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "name": "ListFile", + "type": "ListFile", + "runStatus": "Stopped", + "executionNode": "ALL", + "bytesRead": 0, + "bytesWritten": 0, + "read": "0 bytes", + "written": "0 bytes", + "flowFilesIn": 0, + "bytesIn": 0, + "input": "0 (0 bytes)", + "flowFilesOut": 0, + "bytesOut": 0, + "output": "0 (0 bytes)", + "taskCount": 0, + "tasksDurationNanos": 0, + "tasks": "0", + "tasksDuration": "00:00:00.000", + "activeThreadCount": 0, + "terminatedThreadCount": 0 + } + }, + "operatePermissions": { + "canRead": true, + "canWrite": true + } + } + ], + "inputPorts": [], + "outputPorts": [], + "connections": [ + { + "revision": { + "version": 0 + }, + "id": "d3f17ef8-0182-1000-61da-c996721cf425", + "uri": "https://localhost:8443/nifi-api/connections/d3f17ef8-0182-1000-61da-c996721cf425", + "permissions": { + "canRead": true, + "canWrite": true + }, + "component": { + "id": "d3f17ef8-0182-1000-61da-c996721cf425", + "versionedComponentId": "06668b4e-6d79-3bcb-b698-d5d5c90a9414", + "parentGroupId": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "source": { + "id": "d3f1304d-0182-1000-f0f5-9a6927976941", + "versionedComponentId": "0c069383-5171-30e7-a96f-1d642688b79e", + "type": "PROCESSOR", + "groupId": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "name": "ListFile", + "running": false, + "comments": "" + }, + "destination": { + "id": "d3f023ac-0182-1000-8bbe-e2b00347fff8", + "versionedComponentId": 
"a3a99f87-7c9d-3969-b27f-b9c986fc5a37", + "type": "PROCESSOR", + "groupId": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "name": "FetchFile", + "running": false, + "comments": "" + }, + "name": "", + "labelIndex": 1, + "zIndex": 0, + "selectedRelationships": [ + "success" + ], + "availableRelationships": [ + "success" + ], + "backPressureObjectThreshold": 10000, + "backPressureDataSizeThreshold": "1 GB", + "flowFileExpiration": "0 sec", + "prioritizers": [], + "bends": [], + "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE", + "loadBalancePartitionAttribute": "", + "loadBalanceCompression": "DO_NOT_COMPRESS", + "loadBalanceStatus": "LOAD_BALANCE_NOT_CONFIGURED" + }, + "status": { + "id": "d3f17ef8-0182-1000-61da-c996721cf425", + "groupId": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "name": "success", + "statsLastRefreshed": "18:21:10 CEST", + "sourceId": "d3f1304d-0182-1000-f0f5-9a6927976941", + "sourceName": "ListFile", + "destinationId": "d3f023ac-0182-1000-8bbe-e2b00347fff8", + "destinationName": "FetchFile", + "aggregateSnapshot": { + "id": "d3f17ef8-0182-1000-61da-c996721cf425", + "groupId": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "name": "success", + "sourceName": "ListFile", + "destinationName": "FetchFile", + "flowFilesIn": 0, + "bytesIn": 0, + "input": "0 (0 bytes)", + "flowFilesOut": 0, + "bytesOut": 0, + "output": "0 (0 bytes)", + "flowFilesQueued": 2, + "bytesQueued": 0, + "queued": "2 (0 bytes)", + "queuedSize": "0 bytes", + "queuedCount": "2", + "percentUseCount": 0, + "percentUseBytes": 0, + "flowFileAvailability": "FLOWFILE_AVAILABLE" + } + }, + "bends": [], + "labelIndex": 1, + "zIndex": 0, + "sourceId": "d3f1304d-0182-1000-f0f5-9a6927976941", + "sourceGroupId": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "sourceType": "PROCESSOR", + "destinationId": "d3f023ac-0182-1000-8bbe-e2b00347fff8", + "destinationGroupId": "d3d6b945-0182-1000-d7e4-d81b8f79f310", + "destinationType": "PROCESSOR" + } + ], + "labels": [], + "funnels": [] + }, + "lastRefreshed": 
"18:21:10 CEST" + } +} \ No newline at end of file diff --git a/ingestion/tests/unit/resources/datasets/nifi_resources.json b/ingestion/tests/unit/resources/datasets/nifi_resources.json new file mode 100644 index 00000000000..ee77a20cdc6 --- /dev/null +++ b/ingestion/tests/unit/resources/datasets/nifi_resources.json @@ -0,0 +1,168 @@ +{ + "resources": [ + { + "identifier": "/flow", + "name": "NiFi Flow" + }, + { + "identifier": "/system", + "name": "System" + }, + { + "identifier": "/controller", + "name": "Controller" + }, + { + "identifier": "/counters", + "name": "Counters" + }, + { + "identifier": "/provenance", + "name": "Provenance" + }, + { + "identifier": "/policies", + "name": "Access Policies" + }, + { + "identifier": "/tenants", + "name": "Tenant" + }, + { + "identifier": "/proxy", + "name": "Proxy User Requests" + }, + { + "identifier": "/resources", + "name": "NiFi Resources" + }, + { + "identifier": "/site-to-site", + "name": "Site to Site" + }, + { + "identifier": "/parameter-contexts", + "name": "Parameter Contexts" + }, + { + "identifier": "/restricted-components", + "name": "Restricted Components" + }, + { + "identifier": "/restricted-components/read-filesystem", + "name": "read filesystem" + }, + { + "identifier": "/restricted-components/write-filesystem", + "name": "write filesystem" + }, + { + "identifier": "/restricted-components/read-distributed-filesystem", + "name": "read distributed filesystem" + }, + { + "identifier": "/restricted-components/write-distributed-filesystem", + "name": "write distributed filesystem" + }, + { + "identifier": "/restricted-components/execute-code", + "name": "execute code" + }, + { + "identifier": "/restricted-components/access-keytab", + "name": "access keytab" + }, + { + "identifier": "/restricted-components/access-ticket-cache", + "name": "access ticket cache" + }, + { + "identifier": "/restricted-components/access-environment-credentials", + "name": "access environment credentials" + }, + { + "identifier": 
"/restricted-components/export-nifi-details", + "name": "export nifi details" + }, + { + "identifier": "/process-groups/d3d6b945-0182-1000-d7e4-d81b8f79f310", + "name": "NiFi Flow" + }, + { + "identifier": "/data/process-groups/d3d6b945-0182-1000-d7e4-d81b8f79f310", + "name": "Data for NiFi Flow" + }, + { + "identifier": "/provenance-data/process-groups/d3d6b945-0182-1000-d7e4-d81b8f79f310", + "name": "Provenance data for NiFi Flow" + }, + { + "identifier": "/policies/process-groups/d3d6b945-0182-1000-d7e4-d81b8f79f310", + "name": "Policies for NiFi Flow" + }, + { + "identifier": "/processors/d3f023ac-0182-1000-8bbe-e2b00347fff8", + "name": "FetchFile" + }, + { + "identifier": "/data/processors/d3f023ac-0182-1000-8bbe-e2b00347fff8", + "name": "Data for FetchFile" + }, + { + "identifier": "/provenance-data/processors/d3f023ac-0182-1000-8bbe-e2b00347fff8", + "name": "Provenance data for FetchFile" + }, + { + "identifier": "/policies/processors/d3f023ac-0182-1000-8bbe-e2b00347fff8", + "name": "Policies for FetchFile" + }, + { + "identifier": "/operation/processors/d3f023ac-0182-1000-8bbe-e2b00347fff8", + "name": "Operations forFetchFile" + }, + { + "identifier": "/processors/d3f1304d-0182-1000-f0f5-9a6927976941", + "name": "ListFile" + }, + { + "identifier": "/data/processors/d3f1304d-0182-1000-f0f5-9a6927976941", + "name": "Data for ListFile" + }, + { + "identifier": "/provenance-data/processors/d3f1304d-0182-1000-f0f5-9a6927976941", + "name": "Provenance data for ListFile" + }, + { + "identifier": "/policies/processors/d3f1304d-0182-1000-f0f5-9a6927976941", + "name": "Policies for ListFile" + }, + { + "identifier": "/operation/processors/d3f1304d-0182-1000-f0f5-9a6927976941", + "name": "Operations forListFile" + }, + { + "identifier": "/controller-services/d3ec0f6b-0182-1000-b643-bec7e3d326d2", + "name": "DBCPConnectionPool" + }, + { + "identifier": "/policies/controller-services/d3ec0f6b-0182-1000-b643-bec7e3d326d2", + "name": "Policies for DBCPConnectionPool" + }, 
+ { + "identifier": "/operation/controller-services/d3ec0f6b-0182-1000-b643-bec7e3d326d2", + "name": "Operations forDBCPConnectionPool" + }, + { + "identifier": "/controller-services/d3ec0f6b-0182-1000-b643-bec7e3d326d2", + "name": "DBCPConnectionPool" + }, + { + "identifier": "/policies/controller-services/d3ec0f6b-0182-1000-b643-bec7e3d326d2", + "name": "Policies for DBCPConnectionPool" + }, + { + "identifier": "/operation/controller-services/d3ec0f6b-0182-1000-b643-bec7e3d326d2", + "name": "Operations forDBCPConnectionPool" + } + ] +} \ No newline at end of file diff --git a/ingestion/tests/unit/topology/pipeline/test_nifi.py b/ingestion/tests/unit/topology/pipeline/test_nifi.py new file mode 100644 index 00000000000..74319547391 --- /dev/null +++ b/ingestion/tests/unit/topology/pipeline/test_nifi.py @@ -0,0 +1,190 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""
+Unit tests for the NiFi pipeline source, driven through the ingestion topology.
+
+The expected entities in this module are written out by hand and compared with
+what ``NifiSource`` produces when it is given ``EXPECTED_NIFI_DETAILS``.
+"""
+import json
+from pathlib import Path
+from unittest import TestCase
+from unittest.mock import PropertyMock, patch
+
+from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
+from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
+from metadata.generated.schema.entity.services.pipelineService import (
+    PipelineConnection,
+    PipelineService,
+    PipelineServiceType,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.ingestion.source.pipeline.nifi import (
+    NifiPipelineDetails,
+    NifiProcessor,
+    NifiProcessorConnections,
+    NifiSource,
+)
+
+# Process-group payload fixture; ``test_pipeline_name`` reads the expected
+# pipeline name from its breadcrumb.
+mock_file_path = (
+    Path(__file__).parent.parent.parent / "resources/datasets/nifi_process_group.json"
+)
+with open(mock_file_path, encoding="utf-8") as file:
+    mock_data: dict = json.load(file)
+
+# Resources listing fixture. It has to be read from its own path: opening
+# ``mock_file_path`` here would silently load the process-group payload again.
+resources_mock_file_path = (
+    Path(__file__).parent.parent.parent / "resources/datasets/nifi_resources.json"
+)
+with open(resources_mock_file_path, encoding="utf-8") as file:
+    resources_mock_data: dict = json.load(file)
+
+# Workflow configuration used to build the ``NifiSource`` under test.
+mock_nifi_config = {
+    "source": {
+        "type": "nifi",
+        "serviceName": "nifi_source",
+        "serviceConnection": {
+            "config": {
+                "type": "Nifi",
+                "hostPort": "https://localhost:8443",
+                "username": "username",
+                "password": "password",
+                "verifySSL": False,
+            }
+        },
+        "sourceConfig": {"config": {"type": "PipelineMetadata"}},
+    },
+    "sink": {"type": "metadata-rest", "config": {}},
+    "workflowConfig": {
+        "openMetadataServerConfig": {
+            "hostPort": "http://localhost:8585/api",
+            "authProvider": "no-auth",
+        }
+    },
+}
+
+
+# Pipeline description handed to the methods under test: one process group
+# with two processors and a single connection going from ListFile to FetchFile.
+EXPECTED_NIFI_DETAILS = NifiPipelineDetails(
+    id_="d3d6b945-0182-1000-d7e4-d81b8f79f310",
+    name="NiFi Flow",
+    uri="/nifi-api/flow/process-groups/d3d6b945-0182-1000-d7e4-d81b8f79f310",
+    processors=[
+        NifiProcessor(
+            id_="d3f023ac-0182-1000-8bbe-e2b00347fff8",
+            name="FetchFile",
+            type_="org.apache.nifi.processors.standard.FetchFile",
+            uri="/nifi-api/processors/d3f023ac-0182-1000-8bbe-e2b00347fff8",
+        ),
+        NifiProcessor(
+            id_="d3f1304d-0182-1000-f0f5-9a6927976941",
+            name="ListFile",
+            type_="org.apache.nifi.processors.standard.ListFile",
+            uri="/nifi-api/processors/d3f1304d-0182-1000-f0f5-9a6927976941",
+        ),
+    ],
+    connections=[
+        NifiProcessorConnections(
+            id_="d3f17ef8-0182-1000-61da-c996721cf425",
+            source_id="d3f1304d-0182-1000-f0f5-9a6927976941",
+            destination_id="d3f023ac-0182-1000-8bbe-e2b00347fff8",
+        )
+    ],
+)
+
+
+# Request that ``yield_pipeline`` is expected to emit for
+# ``EXPECTED_NIFI_DETAILS``. The ListFile -> FetchFile connection shows up as
+# FetchFile being listed in ListFile's ``downstreamTasks``.
+EXPECTED_CREATED_PIPELINES = CreatePipelineRequest(
+    name="d3d6b945-0182-1000-d7e4-d81b8f79f310",
+    displayName="NiFi Flow",
+    pipelineUrl="/nifi-api/flow/process-groups/d3d6b945-0182-1000-d7e4-d81b8f79f310",
+    tasks=[
+        Task(
+            name="d3f023ac-0182-1000-8bbe-e2b00347fff8",
+            displayName="FetchFile",
+            taskUrl="/nifi-api/processors/d3f023ac-0182-1000-8bbe-e2b00347fff8",
+            taskType="org.apache.nifi.processors.standard.FetchFile",
+            downstreamTasks=[],
+        ),
+        Task(
+            name="d3f1304d-0182-1000-f0f5-9a6927976941",
+            displayName="ListFile",
+            taskUrl="/nifi-api/processors/d3f1304d-0182-1000-f0f5-9a6927976941",
+            taskType="org.apache.nifi.processors.standard.ListFile",
+            downstreamTasks=["d3f023ac-0182-1000-8bbe-e2b00347fff8"],
+        ),
+    ],
+    service=EntityReference(
+        id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
+    ),
+)
+
+# Service entity injected into the source context by ``NifiUnitTest.__init__``.
+# Its id is the one referenced by ``EXPECTED_CREATED_PIPELINES.service``.
+MOCK_PIPELINE_SERVICE = PipelineService(
+    id="85811038-099a-11ed-861d-0242ac120002",
+    name="nifi_source",
+    connection=PipelineConnection(),
+    serviceType=PipelineServiceType.Nifi,
+)
+
+# Pipeline entity injected into the source context by ``NifiUnitTest.__init__``.
+MOCK_PIPELINE = Pipeline(
+    id="2aaa012e-099a-11ed-861d-0242ac120002",
+    name="d3d6b945-0182-1000-d7e4-d81b8f79f310",
+    fullyQualifiedName="nifi_source.d3d6b945-0182-1000-d7e4-d81b8f79f310",
+    displayName="NiFi Flow",
+    pipelineUrl="/nifi-api/flow/process-groups/d3d6b945-0182-1000-d7e4-d81b8f79f310",
+    tasks=[
+        Task(
+            name="d3f023ac-0182-1000-8bbe-e2b00347fff8",
+            displayName="FetchFile",
+            taskUrl="/nifi-api/processors/d3f023ac-0182-1000-8bbe-e2b00347fff8",
+            taskType="org.apache.nifi.processors.standard.FetchFile",
+            downstreamTasks=[],
+        ),
+        Task(
+            name="d3f1304d-0182-1000-f0f5-9a6927976941",
+            displayName="ListFile",
+            taskUrl="/nifi-api/processors/d3f1304d-0182-1000-f0f5-9a6927976941",
+            taskType="org.apache.nifi.processors.standard.ListFile",
+            downstreamTasks=["d3f023ac-0182-1000-8bbe-e2b00347fff8"],
+        ),
+    ],
+    service=EntityReference(
+        id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
+    ),
+)
+
+
+class NifiUnitTest(TestCase):
+    """Checks the pipeline name and the create-pipeline request built by ``NifiSource``."""
+
+    # Decorators apply bottom-up, so the ``NifiClient.token`` mock is the first
+    # injected argument and the ``test_connection`` mock the second. Both
+    # patches are only active while ``__init__`` runs.
+    @patch("metadata.ingestion.source.pipeline.pipeline_service.test_connection")
+    @patch(
+        "metadata.ingestion.source.pipeline.nifi.NifiClient.token",
+        new_callable=PropertyMock,
+    )
+    def __init__(self, methodName: str, nifi_token_prop, test_connection) -> None:
+        """
+        Build the ``NifiSource`` under test while the patches are in place.
+
+        Args:
+            methodName: name of the test method, forwarded to ``TestCase``.
+            nifi_token_prop: ``PropertyMock`` standing in for ``NifiClient.token``.
+            test_connection: mock replacing the pipeline service ``test_connection``.
+        """
+        super().__init__(methodName)
+        test_connection.return_value = False
+
+        # NOTE(review): reading the patched property yields
+        # ``nifi_token_prop.return_value`` (a mock), not the string "token";
+        # this line only configures a nested ``.token()`` call on that mock.
+        # Presumably ``nifi_token_prop.return_value = "token"`` was intended;
+        # confirm against how ``NifiClient`` consumes ``token`` before changing.
+        nifi_token_prop.return_value.token.return_value = "token"
+
+        config = OpenMetadataWorkflowConfig.parse_obj(mock_nifi_config)
+        self.nifi = NifiSource.create(
+            mock_nifi_config["source"],
+            config.workflowConfig.openMetadataServerConfig,
+        )
+        # Seed the context entries directly with the mocked entities.
+        self.nifi.context.__dict__["pipeline"] = MOCK_PIPELINE
+        self.nifi.context.__dict__["pipeline_service"] = MOCK_PIPELINE_SERVICE
+
+    def test_pipeline_name(self):
+        """The pipeline name matches the breadcrumb name in the process-group fixture."""
+        self.assertEqual(
+            self.nifi.get_pipeline_name(EXPECTED_NIFI_DETAILS),
+            mock_data["processGroupFlow"]["breadcrumb"]["breadcrumb"]["name"],
+        )
+
+    def test_pipelines(self):
+        """The first request yielded for the details equals the expected request."""
+        pipeline = list(self.nifi.yield_pipeline(EXPECTED_NIFI_DETAILS))[0]
+        self.assertEqual(pipeline, EXPECTED_CREATED_PIPELINES)