Fixes ISSUE-13953: Converted Nifi Client from requests module to OM REST client (#20039)

* ISSUE-13953 Converted Nifi Client from requests module to OM REST client

* pyformat

* lint

---------

Co-authored-by: kc <kc@kcs-MacBook-Pro.local>
Co-authored-by: ulixius9 <mayursingal9@gmail.com>
This commit is contained in:
KC31 2025-03-18 11:43:20 +05:30 committed by GitHub
parent 92f8fa4082
commit dddfdcb7b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 137 additions and 89 deletions

View File

@ -0,0 +1,51 @@
# Example ingestion workflow: reads pipeline metadata from a NiFi service
# and sends it to OpenMetadata through the metadata-rest sink.
source:
  type: nifi
  serviceName: nifi_source
  serviceConnection:
    config:
      type: Nifi
      # nifiConfig holds the authentication settings. The active keys below are
      # the username/password (basic auth) form; the commented keys are the
      # client-certificate form.
      nifiConfig:
        username: XXXX
        password: XXXX
        verifySSL: false
        ## client certificate authentication
        # certificateAuthorityPath: path/to/CA
        # clientCertificatePath: path/to/clientCertificate
        # clientkeyPath: path/to/clientKey
      # hostPort belongs to the connection itself, not to nifiConfig.
      hostPort: https://localhost:8443
  sourceConfig:
    config:
      type: PipelineMetadata
      # lineageInformation:
      #   dbServiceNames: []
      #   storageServiceNames: []
      # markDeletedPipelines: True
      # includeTags: True
      # includeLineage: true
      # includeUnDeployedPipelines: true
      # pipelineFilterPattern:
      #   includes:
      #     - pipeline1
      #     - pipeline2
      #   excludes:
      #     - pipeline3
      #     - pipeline4
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  loggerLevel: INFO # DEBUG, INFO, WARNING or ERROR
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: XXXX
    ## Store the service Connection information
    storeServiceConnection: true # false
    ## Secrets Manager Configuration
    # secretsManagerProvider: aws, azure or noop
    # secretsManagerLoader: airflow or env
    ## If SSL, fill the following
    # verifySSL: validate # or ignore
    # sslConfig:
    #   caCertificate: /local/path/to/certificate

View File

@ -14,6 +14,7 @@ Python API REST wrapper and helpers
import time import time
import traceback import traceback
from datetime import datetime, timezone from datetime import datetime, timezone
from json import JSONDecodeError
from typing import Any, Callable, Dict, List, Optional, Union from typing import Any, Callable, Dict, List, Optional, Union
import requests import requests
@ -117,6 +118,8 @@ class ClientConfig(ConfigModel):
verify: Optional[Union[bool, str]] = None verify: Optional[Union[bool, str]] = None
cookies: Optional[Any] = None cookies: Optional[Any] = None
ttl_cache: int = 60 ttl_cache: int = 60
timeout: Optional[int] = None
cert: Optional[Union[str, tuple]] = None
# pylint: disable=too-many-instance-attributes # pylint: disable=too-many-instance-attributes
@ -140,6 +143,8 @@ class REST:
self._auth_token_mode = self.config.auth_token_mode self._auth_token_mode = self.config.auth_token_mode
self._verify = self.config.verify self._verify = self.config.verify
self._cookies = self.config.cookies self._cookies = self.config.cookies
self._cert = self.config.cert
self._timeout = self.config.timeout
self._limits_reached = TTLCache(config.ttl_cache) self._limits_reached = TTLCache(config.ttl_cache)
@ -177,6 +182,7 @@ class REST:
self.config.expires_in = ( self.config.expires_in = (
datetime.now(timezone.utc).timestamp() + expiry - 120 datetime.now(timezone.utc).timestamp() + expiry - 120
) )
if self.config.auth_header: if self.config.auth_header:
headers[self.config.auth_header] = ( headers[self.config.auth_header] = (
f"{self._auth_token_mode} {self.config.access_token}" f"{self._auth_token_mode} {self.config.access_token}"
@ -189,6 +195,7 @@ class REST:
# the value will be set to that value. # the value will be set to that value.
# Example: "Proxy-Authorization": "%(Authorization)s" # Example: "Proxy-Authorization": "%(Authorization)s"
# This will result in the Authorization value being set for the Proxy-Authorization Extra Header # This will result in the Authorization value being set for the Proxy-Authorization Extra Header
# Any header passed as an extra header by the client will overwrite the header with the same name in headers
if self.config.extra_headers: if self.config.extra_headers:
extra_headers: Dict[str, str] = self.config.extra_headers extra_headers: Dict[str, str] = self.config.extra_headers
extra_headers = {k: (v % headers) for k, v in extra_headers.items()} extra_headers = {k: (v % headers) for k, v in extra_headers.items()}
@ -210,6 +217,12 @@ class REST:
if json: if json:
opts["json"] = json opts["json"] = json
if self._cert:
opts["cert"] = self._cert
if self._timeout:
opts["timeout"] = self._timeout
total_retries = self._retry if self._retry > 0 else 0 total_retries = self._retry if self._retry > 0 else 0
retry = total_retries retry = total_retries
while retry >= 0: while retry >= 0:
@ -250,6 +263,12 @@ class REST:
if resp.text != "": if resp.text != "":
try: try:
return resp.json() return resp.json()
except JSONDecodeError as json_decode_error:
logger.error(
f"Json decoding error while returning response {resp} in json format - {json_decode_error}."
f"The Response still returned to be handled by client..."
)
return resp
except Exception as exc: except Exception as exc:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.warning( logger.warning(
@ -308,6 +327,7 @@ class REST:
Parameters: Parameters:
path (str): path (str):
data (): data ():
json ():
Returns: Returns:
Response Response

View File

@ -12,11 +12,19 @@
Client to interact with Nifi apis Client to interact with Nifi apis
""" """
import traceback import traceback
from typing import Any, Iterable, List, Optional from typing import Dict, Iterable, List
import requests
from requests import HTTPError
from metadata.generated.schema.entity.services.connections.pipeline.nifi.basicAuth import (
NifiBasicAuth,
)
from metadata.generated.schema.entity.services.connections.pipeline.nifi.clientCertificateAuth import (
NifiClientCertificateAuth,
)
from metadata.generated.schema.entity.services.connections.pipeline.nifiConnection import (
NifiConnection,
)
from metadata.ingestion.ometa.client import REST, ClientConfig, HTTPError
from metadata.utils.constants import AUTHORIZATION_HEADER, NO_ACCESS_TOKEN
from metadata.utils.helpers import clean_uri from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
@ -26,6 +34,8 @@ IDENTIFIER = "identifier"
PROCESS_GROUPS_STARTER = "/process-groups/" PROCESS_GROUPS_STARTER = "/process-groups/"
RESOURCES = "resources" RESOURCES = "resources"
REQUESTS_TIMEOUT = 60 * 5 REQUESTS_TIMEOUT = 60 * 5
CONTENT_HEADER = {"Content-type": "application/x-www-form-urlencoded"}
NIFI_API_BASE_ENDPOINT = "/nifi-api"
class NifiClient: class NifiClient:
@ -33,39 +43,43 @@ class NifiClient:
Wrapper on top of Nifi REST API Wrapper on top of Nifi REST API
""" """
# pylint: disable=too-many-arguments client: REST
def __init__(
self,
host_port: str,
username: Optional[str] = None,
password: Optional[str] = None,
ca_file_path: Optional[str] = None,
client_cert_path: Optional[str] = None,
client_key_path: Optional[str] = None,
verify: bool = False,
):
self._token = None
self._resources = None
self.content_headers = {"Content-Type": "application/x-www-form-urlencoded"} def __init__(self, connection: NifiConnection):
self.api_endpoint = clean_uri(host_port) + "/nifi-api" self.connection = connection
self.username = username self._token, self._resources, self.data = None, None, None
self.password = password self.api_endpoint = clean_uri(self.connection.hostPort) + NIFI_API_BASE_ENDPOINT
if all(setting for setting in [self.username, self.password]): client_config = ClientConfig(
self.data = {"username": self.username, "password": self.password} api_version="",
self.verify = verify timeout=REQUESTS_TIMEOUT,
self.headers = { base_url=self.api_endpoint,
"Authorization": f"Bearer {self.token}", extra_headers=CONTENT_HEADER,
**self.content_headers, )
if isinstance(self.connection.nifiConfig, NifiBasicAuth):
self.verify = self.connection.nifiConfig.verifySSL
self.data = {
"username": self.connection.nifiConfig.username,
"password": self.connection.nifiConfig.password.get_secret_value()
if self.connection.nifiConfig.password
else None,
} }
self.client_cert = None client_config.verify = self.connection.nifiConfig.verifySSL
else: client_config.auth_header = AUTHORIZATION_HEADER
self.data = None client_config.access_token = self.token
self.verify = ca_file_path if ca_file_path else False
self.client_cert = (client_cert_path, client_key_path) self.client = REST(client_config)
self.headers = self.content_headers elif isinstance(self.connection.nifiConfig, NifiClientCertificateAuth):
access = self.get("access") ca_path = self.connection.nifiConfig.certificateAuthorityPath
cc_path = self.connection.nifiConfig.clientCertificatePath
ck_path = self.connection.nifiConfig.clientkeyPath
client_config.verify = ca_path if ca_path else False
client_config.cert = (cc_path, ck_path)
self.client = REST(client_config)
access = self.client.get("access")
logger.debug(access) logger.debug(access)
@property @property
@ -76,13 +90,19 @@ class NifiClient:
""" """
if not self._token: if not self._token:
try: try:
res = requests.post( client = REST(
f"{self.api_endpoint}/access/token", ClientConfig(
verify=self.verify, base_url=self.api_endpoint,
headers=self.content_headers, verify=self.verify,
data=self.data, extra_headers=CONTENT_HEADER,
timeout=REQUESTS_TIMEOUT, timeout=REQUESTS_TIMEOUT,
api_version="",
auth_token_mode=None,
auth_token=lambda: (NO_ACCESS_TOKEN, 0),
)
) )
res = client.post("access/token", data=self.data)
self._token = res.text self._token = res.text
if res.status_code not in (200, 201): if res.status_code not in (200, 201):
@ -100,6 +120,7 @@ class NifiClient:
except Exception as err: except Exception as err:
logger.error(f"Fetching token failed due to - {err}") logger.error(f"Fetching token failed due to - {err}")
logger.debug(traceback.format_exc())
raise err raise err
return self._token return self._token
@ -110,7 +131,7 @@ class NifiClient:
This can be expensive. Only query it once. This can be expensive. Only query it once.
""" """
if not self._resources: if not self._resources:
self._resources = self.get(RESOURCES) # API endpoint self._resources = self.client.get(RESOURCES) # API endpoint
# Get the first `resources` key from the dict # Get the first `resources` key from the dict
try: try:
@ -118,33 +139,6 @@ class NifiClient:
except AttributeError: except AttributeError:
return [] return []
def get(self, path: str) -> Optional[Any]:
"""
GET call wrapper
"""
try:
res = requests.get(
f"{self.api_endpoint}/{path}",
verify=self.verify,
headers=self.headers,
timeout=REQUESTS_TIMEOUT,
cert=self.client_cert,
)
return res.json()
except HTTPError as err:
logger.warning(f"Connection error calling the Nifi API - {err}")
raise err
except ValueError as err:
logger.warning(f"Cannot pick up the JSON from API response - {err}")
raise err
except Exception as err:
logger.warning(f"Unknown error calling Nifi API - {err}")
raise err
def _get_process_group_ids(self) -> List[str]: def _get_process_group_ids(self) -> List[str]:
return [ return [
elem.get(IDENTIFIER).replace(PROCESS_GROUPS_STARTER, "") elem.get(IDENTIFIER).replace(PROCESS_GROUPS_STARTER, "")
@ -152,10 +146,10 @@ class NifiClient:
if elem.get(IDENTIFIER).startswith(PROCESS_GROUPS_STARTER) if elem.get(IDENTIFIER).startswith(PROCESS_GROUPS_STARTER)
] ]
def get_process_group(self, id_: str) -> dict: def get_process_group(self, id_: str) -> Dict:
return self.get(f"flow/process-groups/{id_}") return self.client.get(f"flow/process-groups/{id_}")
def list_process_groups(self) -> Iterable[dict]: def list_process_groups(self) -> Iterable[Dict]:
""" """
This will call the API endpoints This will call the API endpoints
one at a time. one at a time.

View File

@ -17,9 +17,6 @@ from typing import Optional
from metadata.generated.schema.entity.automations.workflow import ( from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow, Workflow as AutomationWorkflow,
) )
from metadata.generated.schema.entity.services.connections.pipeline.nifi.basicAuth import (
NifiBasicAuth,
)
from metadata.generated.schema.entity.services.connections.pipeline.nifiConnection import ( from metadata.generated.schema.entity.services.connections.pipeline.nifiConnection import (
NifiConnection, NifiConnection,
) )
@ -36,22 +33,8 @@ def get_connection(connection: NifiConnection) -> NifiClient:
""" """
Create connection Create connection
""" """
if isinstance(connection.nifiConfig, NifiBasicAuth):
return NifiClient(
host_port=connection.hostPort,
username=connection.nifiConfig.username,
password=connection.nifiConfig.password.get_secret_value()
if connection.nifiConfig.password
else None,
verify=connection.nifiConfig.verifySSL,
)
return NifiClient( return NifiClient(connection=connection)
host_port=connection.hostPort,
ca_file_path=connection.nifiConfig.certificateAuthorityPath,
client_cert_path=connection.nifiConfig.clientCertificatePath,
client_key_path=connection.nifiConfig.clientkeyPath,
)
def test_connection( def test_connection(