From dddfdcb7b512bdf1b81a5d594d54121178f8c0f6 Mon Sep 17 00:00:00 2001 From: KC31 <45307507+kunalchoudhary31@users.noreply.github.com> Date: Tue, 18 Mar 2025 11:43:20 +0530 Subject: [PATCH] Fixes ISSUE-13953: Converted Nifi Client from requests module to OM REST client (#20039) * ISSUE-13953 Converted Nifi Client from requests module to OM REST client * pyformat * lint --------- Co-authored-by: kc Co-authored-by: ulixius9 --- .../src/metadata/examples/workflows/nifi.yaml | 51 +++++++ .../src/metadata/ingestion/ometa/client.py | 20 +++ .../ingestion/source/pipeline/nifi/client.py | 136 +++++++++--------- .../source/pipeline/nifi/connection.py | 19 +-- 4 files changed, 137 insertions(+), 89 deletions(-) create mode 100644 ingestion/src/metadata/examples/workflows/nifi.yaml diff --git a/ingestion/src/metadata/examples/workflows/nifi.yaml b/ingestion/src/metadata/examples/workflows/nifi.yaml new file mode 100644 index 00000000000..ef519a7216a --- /dev/null +++ b/ingestion/src/metadata/examples/workflows/nifi.yaml @@ -0,0 +1,51 @@ +source: + type: nifi + serviceName: nifi_source + serviceConnection: + config: + type: Nifi + nifiConfig: + username: XXXX + password: XXXX + verifySSL: false + ## client certificate authentication + # certificateAuthorityPath: path/to/CA + # clientCertificatePath: path/to/clientCertificate + # clientkeyPath: path/to/clientKey + hostPort: https://localhost:8443 + sourceConfig: + config: + type: PipelineMetadata + # lineageInformation: + # dbServiceNames: [] + # storageServiceNames: [] + # markDeletedPipelines: True + # includeTags: True + # includeLineage: true + # includeUnDeployedPipelines: true + # pipelineFilterPattern: + # includes: + # - pipeline1 + # - pipeline2 + # excludes: + # - pipeline3 + # - pipeline4 +sink: + type: metadata-rest + config: {} +workflowConfig: + loggerLevel: INFO # DEBUG, INFO, WARNING or ERROR + openMetadataServerConfig: + hostPort: "http://localhost:8585/api" + authProvider: openmetadata + securityConfig: + jwtToken: XXXX + ## Store the service Connection information + storeServiceConnection: true # false + ## Secrets Manager Configuration + # secretsManagerProvider: aws, azure or noop + # secretsManagerLoader: airflow or env + ## If SSL, fill the following + # verifySSL: validate # or ignore + # sslConfig: + # caCertificate: /local/path/to/certificate diff --git a/ingestion/src/metadata/ingestion/ometa/client.py b/ingestion/src/metadata/ingestion/ometa/client.py index 589b09866c7..3ef8b4e9bfd 100644 --- a/ingestion/src/metadata/ingestion/ometa/client.py +++ b/ingestion/src/metadata/ingestion/ometa/client.py @@ -14,6 +14,7 @@ Python API REST wrapper and helpers import time import traceback from datetime import datetime, timezone +from json import JSONDecodeError from typing import Any, Callable, Dict, List, Optional, Union import requests @@ -117,6 +118,8 @@ class ClientConfig(ConfigModel): verify: Optional[Union[bool, str]] = None cookies: Optional[Any] = None ttl_cache: int = 60 + timeout: Optional[int] = None + cert: Optional[Union[str, tuple]] = None # pylint: disable=too-many-instance-attributes @@ -140,6 +143,8 @@ class REST: self._auth_token_mode = self.config.auth_token_mode self._verify = self.config.verify self._cookies = self.config.cookies + self._cert = self.config.cert + self._timeout = self.config.timeout self._limits_reached = TTLCache(config.ttl_cache) @@ -177,6 +182,7 @@ class REST: self.config.expires_in = ( datetime.now(timezone.utc).timestamp() + expiry - 120 ) + if self.config.auth_header: headers[self.config.auth_header] = ( f"{self._auth_token_mode} {self.config.access_token}" @@ -189,6 +195,7 @@ class REST: # the value will be set to that value. # Example: "Proxy-Authorization": "%(Authorization)s" # This will result in the Authorization value being set for the Proxy-Authorization Extra Header + # Any header which is comming as extra header from client will overwrite the header with same name in headers if self.config.extra_headers: extra_headers: Dict[str, str] = self.config.extra_headers extra_headers = {k: (v % headers) for k, v in extra_headers.items()} @@ -210,6 +217,12 @@ class REST: if json: opts["json"] = json + if self._cert: + opts["cert"] = self._cert + + if self._timeout: + opts["timeout"] = self._timeout + total_retries = self._retry if self._retry > 0 else 0 retry = total_retries while retry >= 0: @@ -250,6 +263,12 @@ class REST: if resp.text != "": try: return resp.json() + except JSONDecodeError as json_decode_error: + logger.error( + f"Json decoding error while returning response {resp} in json format - {json_decode_error}." + f"The Response still returned to be handled by client..." + ) + return resp except Exception as exc: logger.debug(traceback.format_exc()) logger.warning( @@ -308,6 +327,7 @@ class REST: Parameters: path (str): data (): + json (): Returns: Response diff --git a/ingestion/src/metadata/ingestion/source/pipeline/nifi/client.py b/ingestion/src/metadata/ingestion/source/pipeline/nifi/client.py index 981a58a92af..794b080d6f0 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/nifi/client.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/nifi/client.py @@ -12,11 +12,19 @@ Client to interact with Nifi apis """ import traceback -from typing import Any, Iterable, List, Optional - -import requests -from requests import HTTPError +from typing import Dict, Iterable, List +from metadata.generated.schema.entity.services.connections.pipeline.nifi.basicAuth import ( + NifiBasicAuth, +) +from metadata.generated.schema.entity.services.connections.pipeline.nifi.clientCertificateAuth import ( + NifiClientCertificateAuth, +) +from metadata.generated.schema.entity.services.connections.pipeline.nifiConnection import ( + NifiConnection, +) +from metadata.ingestion.ometa.client import REST, ClientConfig, HTTPError +from metadata.utils.constants import AUTHORIZATION_HEADER, NO_ACCESS_TOKEN from metadata.utils.helpers import clean_uri from metadata.utils.logger import ingestion_logger @@ -26,6 +34,8 @@ IDENTIFIER = "identifier" PROCESS_GROUPS_STARTER = "/process-groups/" RESOURCES = "resources" REQUESTS_TIMEOUT = 60 * 5 +CONTENT_HEADER = {"Content-type": "application/x-www-form-urlencoded"} +NIFI_API_BASE_ENDPOINT = "/nifi-api" class NifiClient: @@ -33,39 +43,43 @@ class NifiClient: Wrapper on top of Nifi REST API """ - # pylint: disable=too-many-arguments - def __init__( - self, - host_port: str, - username: Optional[str] = None, - password: Optional[str] = None, - ca_file_path: Optional[str] = None, - client_cert_path: Optional[str] = None, - client_key_path: Optional[str] = None, - verify: bool = False, - ): - self._token = None - self._resources = None + client: REST - self.content_headers = {"Content-Type": "application/x-www-form-urlencoded"} - self.api_endpoint = clean_uri(host_port) + "/nifi-api" - self.username = username - self.password = password + def __init__(self, connection: NifiConnection): + self.connection = connection + self._token, self._resources, self.data = None, None, None + self.api_endpoint = clean_uri(self.connection.hostPort) + NIFI_API_BASE_ENDPOINT - if all(setting for setting in [self.username, self.password]): - self.data = {"username": self.username, "password": self.password} - self.verify = verify - self.headers = { - "Authorization": f"Bearer {self.token}", - **self.content_headers, + client_config = ClientConfig( + api_version="", + timeout=REQUESTS_TIMEOUT, + base_url=self.api_endpoint, + extra_headers=CONTENT_HEADER, + ) + + if isinstance(self.connection.nifiConfig, NifiBasicAuth): + self.verify = self.connection.nifiConfig.verifySSL + self.data = { + "username": self.connection.nifiConfig.username, + "password": self.connection.nifiConfig.password.get_secret_value() + if self.connection.nifiConfig.password + else None, } - self.client_cert = None - else: - self.data = None - self.verify = ca_file_path if ca_file_path else False - self.client_cert = (client_cert_path, client_key_path) - self.headers = self.content_headers - access = self.get("access") + client_config.verify = self.connection.nifiConfig.verifySSL + client_config.auth_header = AUTHORIZATION_HEADER + client_config.access_token = self.token + + self.client = REST(client_config) + elif isinstance(self.connection.nifiConfig, NifiClientCertificateAuth): + ca_path = self.connection.nifiConfig.certificateAuthorityPath + cc_path = self.connection.nifiConfig.clientCertificatePath + ck_path = self.connection.nifiConfig.clientkeyPath + + client_config.verify = ca_path if ca_path else False + client_config.cert = (cc_path, ck_path) + + self.client = REST(client_config) + access = self.client.get("access") logger.debug(access) @property @@ -76,13 +90,19 @@ class NifiClient: """ if not self._token: try: - res = requests.post( - f"{self.api_endpoint}/access/token", - verify=self.verify, - headers=self.content_headers, - data=self.data, - timeout=REQUESTS_TIMEOUT, + client = REST( + ClientConfig( + base_url=self.api_endpoint, + verify=self.verify, + extra_headers=CONTENT_HEADER, + timeout=REQUESTS_TIMEOUT, + api_version="", + auth_token_mode=None, + auth_token=lambda: (NO_ACCESS_TOKEN, 0), + ) ) + + res = client.post("access/token", data=self.data) self._token = res.text if res.status_code not in (200, 201): @@ -100,6 +120,7 @@ class NifiClient: except Exception as err: logger.error(f"Fetching token failed due to - {err}") + logger.debug(traceback.format_exc()) raise err return self._token @@ -110,7 +131,7 @@ class NifiClient: This can be expensive. Only query it once. """ if not self._resources: - self._resources = self.get(RESOURCES) # API endpoint + self._resources = self.client.get(RESOURCES) # API endpoint # Get the first `resources` key from the dict try: @@ -118,33 +139,6 @@ class NifiClient: except AttributeError: return [] - def get(self, path: str) -> Optional[Any]: - """ - GET call wrapper - """ - try: - res = requests.get( - f"{self.api_endpoint}/{path}", - verify=self.verify, - headers=self.headers, - timeout=REQUESTS_TIMEOUT, - cert=self.client_cert, - ) - - return res.json() - - except HTTPError as err: - logger.warning(f"Connection error calling the Nifi API - {err}") - raise err - - except ValueError as err: - logger.warning(f"Cannot pick up the JSON from API response - {err}") - raise err - - except Exception as err: - logger.warning(f"Unknown error calling Nifi API - {err}") - raise err - def _get_process_group_ids(self) -> List[str]: return [ elem.get(IDENTIFIER).replace(PROCESS_GROUPS_STARTER, "") @@ -152,10 +146,10 @@ class NifiClient: if elem.get(IDENTIFIER).startswith(PROCESS_GROUPS_STARTER) ] - def get_process_group(self, id_: str) -> dict: - return self.get(f"flow/process-groups/{id_}") + def get_process_group(self, id_: str) -> Dict: + return self.client.get(f"flow/process-groups/{id_}") - def list_process_groups(self) -> Iterable[dict]: + def list_process_groups(self) -> Iterable[Dict]: """ This will call the API endpoints one at a time. diff --git a/ingestion/src/metadata/ingestion/source/pipeline/nifi/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/nifi/connection.py index 680a8cff54c..0a1949c1909 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/nifi/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/nifi/connection.py @@ -17,9 +17,6 @@ from typing import Optional from metadata.generated.schema.entity.automations.workflow import ( Workflow as AutomationWorkflow, ) -from metadata.generated.schema.entity.services.connections.pipeline.nifi.basicAuth import ( - NifiBasicAuth, -) from metadata.generated.schema.entity.services.connections.pipeline.nifiConnection import ( NifiConnection, ) @@ -36,22 +33,8 @@ def get_connection(connection: NifiConnection) -> NifiClient: """ Create connection """ - if isinstance(connection.nifiConfig, NifiBasicAuth): - return NifiClient( - host_port=connection.hostPort, - username=connection.nifiConfig.username, - password=connection.nifiConfig.password.get_secret_value() - if connection.nifiConfig.password - else None, - verify=connection.nifiConfig.verifySSL, - ) - return NifiClient( - host_port=connection.hostPort, - ca_file_path=connection.nifiConfig.certificateAuthorityPath, - client_cert_path=connection.nifiConfig.clientCertificatePath, - client_key_path=connection.nifiConfig.clientkeyPath, - ) + return NifiClient(connection=connection) def test_connection(