From 7d2527bbeb7f11e2264cb51215d980c154e4cb5e Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 1 Sep 2022 06:39:12 +0200 Subject: [PATCH] Fix #6961 - Fallback to another IP service (#7019) Fix #6961 - Fallback to another IP service (#7019) --- .../catalog/airflow/AirflowRESTClient.java | 3 +- .../catalog/util/PipelineServiceClient.java | 23 +++++++++++-- .../configuration/airflowConfiguration.json | 4 +++ .../MockPipelineServiceClient.java | 7 ++-- .../PipelineServiceClientTest.java | 2 +- conf/openmetadata.yaml | 1 + ingestion/Dockerfile | 2 +- ingestion/Dockerfile_local | 2 +- .../api/routes/ip.py | 34 ++++++++++++++++++- 9 files changed, 67 insertions(+), 11 deletions(-) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java index 4887d4966fc..b7771b14093 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java @@ -41,6 +41,7 @@ public class AirflowRESTClient extends PipelineServiceClient { airflowConfig.getUsername(), airflowConfig.getPassword(), airflowConfig.getApiEndpoint(), + airflowConfig.getHostIp(), airflowConfig.getTimeout()); } @@ -231,7 +232,7 @@ public class AirflowRESTClient extends PipelineServiceClient { } @Override - public Map getHostIp() { + public Map requestGetHostIp() { HttpResponse response; try { response = getRequestAuthenticatedForJsonContent("%s/%s/ip", serviceURL, API_ENDPOINT); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java index 937b6cc4315..9aa936a44d6 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/PipelineServiceClient.java @@ -36,6 +36,7 @@ public abstract class PipelineServiceClient { protected final URL serviceURL; protected final String username; protected final String password; + protected final String hostIp; protected final HttpClient client; protected static final String AUTH_HEADER = "Authorization"; protected static final String CONTENT_HEADER = "Content-Type"; @@ -53,7 +54,7 @@ public abstract class PipelineServiceClient { SERVER_VERSION = rawServerVersion; } - public PipelineServiceClient(String userName, String password, String apiEndpoint, int apiTimeout) { + public PipelineServiceClient(String userName, String password, String apiEndpoint, String hostIp, int apiTimeout) { try { this.serviceURL = new URL(apiEndpoint); } catch (MalformedURLException e) { @@ -61,6 +62,7 @@ public abstract class PipelineServiceClient { } this.username = userName; this.password = password; + this.hostIp = hostIp; this.client = HttpClient.newBuilder() .version(HttpClient.Version.HTTP_1_1) @@ -115,6 +117,18 @@ public abstract class PipelineServiceClient { return getVersionFromString(clientVersion).equals(getVersionFromString(SERVER_VERSION)); } + public final Map getHostIp() { + try { + if (this.hostIp == null || this.hostIp.isEmpty()) { + return requestGetHostIp(); + } else { + return Map.of("ip", this.hostIp); + } + } catch (Exception e) { + throw PipelineServiceClientException.byMessage("Failed to get Pipeline Service host IP.", e.getMessage()); + } + } + /* Check the status of pipeline service to ensure it is healthy */ public abstract Response getServiceStatus(); @@ -142,6 +156,9 @@ public abstract class PipelineServiceClient { /* Get the all last run logs of a deployed pipeline */ public abstract HttpResponse killIngestion(IngestionPipeline ingestionPipeline); - /* Get the Pipeline Service host IP to whitelist in source systems */ - public abstract Map getHostIp(); + /* + Get the Pipeline Service host IP to whitelist in source systems + Should return a map in the shape {"ip": "111.11.11.1"} + */ + public abstract Map requestGetHostIp(); } diff --git a/catalog-rest-service/src/main/resources/json/schema/configuration/airflowConfiguration.json b/catalog-rest-service/src/main/resources/json/schema/configuration/airflowConfiguration.json index 3ccdd7ff9a6..9ad5daccc35 100644 --- a/catalog-rest-service/src/main/resources/json/schema/configuration/airflowConfiguration.json +++ b/catalog-rest-service/src/main/resources/json/schema/configuration/airflowConfiguration.json @@ -10,6 +10,10 @@ "description": "API host endpoint for Airflow", "type": "string" }, + "hostIp": { + "description": "Airflow host IP that will be used to connect to the sources.", + "type": "string" + }, "username": { "description": "Username for Login", "type": "string" diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/MockPipelineServiceClient.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/MockPipelineServiceClient.java index c6df5682b2a..4419bece85d 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/MockPipelineServiceClient.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/MockPipelineServiceClient.java @@ -9,8 +9,9 @@ import org.openmetadata.catalog.util.PipelineServiceClient; public class MockPipelineServiceClient extends PipelineServiceClient { - public MockPipelineServiceClient(String userName, String password, String apiEndpoint, int apiTimeout) { - super(userName, password, apiEndpoint, apiTimeout); + public MockPipelineServiceClient( + String userName, String password, String apiEndpoint, String hostIp, int apiTimeout) { + super(userName, password, apiEndpoint, hostIp, apiTimeout); } @Override @@ -59,7 +60,7 @@ public class MockPipelineServiceClient extends PipelineServiceClient { } @Override - public Map getHostIp() { + public Map requestGetHostIp() { return null; } } diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/PipelineServiceClientTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/PipelineServiceClientTest.java index 0a343d405e1..8396cdeccb0 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/PipelineServiceClientTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/pipelineService/PipelineServiceClientTest.java @@ -9,7 +9,7 @@ import org.openmetadata.catalog.exception.PipelineServiceVersionException; public class PipelineServiceClientTest { MockPipelineServiceClient mockPipelineServiceClient = - new MockPipelineServiceClient("user", "password", "https://endpoint.com", 10); + new MockPipelineServiceClient("user", "password", "https://endpoint.com", "111.11.11.1", 10); @Test public void testGetVersionFromString() { diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml index c113aa90c5f..fd25caa3692 100644 --- a/conf/openmetadata.yaml +++ b/conf/openmetadata.yaml @@ -173,6 +173,7 @@ eventHandlerConfiguration: airflowConfiguration: apiEndpoint: ${AIRFLOW_HOST:-http://localhost:8080} + hostIp: ${AIRFLOW_HOST_IP:-""} username: ${AIRFLOW_USERNAME:-admin} password: ${AIRFLOW_PASSWORD:-admin} metadataApiEndpoint: ${SERVER_HOST_API_URL:-http://localhost:8585/api} diff --git a/ingestion/Dockerfile b/ingestion/Dockerfile index 2f091c3ed48..ebdf21f41ad 100644 --- a/ingestion/Dockerfile +++ b/ingestion/Dockerfile @@ -1,7 +1,7 @@ FROM python:3.9-slim as base ENV AIRFLOW_HOME=/airflow RUN apt-get update && \ - apt-get install -y build-essential freetds-bin freetds-dev gcc libevent-dev libffi-dev libpq-dev librdkafka-dev libsasl2-dev libsasl2-modules libssl-dev libxml2 netcat openjdk-11-jre openssl postgresql postgresql-contrib python3.9-dev tdsodbc unixodbc unixodbc-dev wget --no-install-recommends && \ + apt-get install -y build-essential freetds-bin freetds-dev gcc libevent-dev libffi-dev libpq-dev librdkafka-dev libsasl2-dev libsasl2-modules libssl-dev libxml2 netcat openjdk-11-jre openssl postgresql postgresql-contrib python3.9-dev tdsodbc unixodbc unixodbc-dev wget vim --no-install-recommends && \ rm -rf /var/lib/apt/lists/* # Manually fix security vulnerability from curl diff --git a/ingestion/Dockerfile_local b/ingestion/Dockerfile_local index 91e44147b23..7a9be7dfb6f 100644 --- a/ingestion/Dockerfile_local +++ b/ingestion/Dockerfile_local @@ -3,7 +3,7 @@ ENV AIRFLOW_HOME=/airflow RUN apt-get update && \ apt-get install -y build-essential freetds-bin freetds-dev gcc libevent-dev libffi-dev libpq-dev librdkafka-dev \ libsasl2-dev libsasl2-modules libssl-dev libxml2 netcat openjdk-11-jre openssl postgresql postgresql-contrib \ - python3.9-dev tdsodbc unixodbc unixodbc-dev wget --no-install-recommends && \ + python3.9-dev tdsodbc unixodbc unixodbc-dev wget vim --no-install-recommends && \ rm -rf /var/lib/apt/lists/* # Manually fix security vulnerability from curl diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/ip.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/ip.py index 8a09b924004..547937f9ac8 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/ip.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/ip.py @@ -12,9 +12,12 @@ IP endpoint """ import traceback +from typing import Optional import requests from openmetadata_managed_apis.utils.logger import routes_logger +from requests.exceptions import ConnectionError +from urllib3.exceptions import NewConnectionError try: from importlib.metadata import version @@ -29,6 +32,25 @@ from openmetadata_managed_apis.api.response import ApiResponse logger = routes_logger() +IP_SERVICES = ["https://api.ipify.org", "https://api.my-ip.io/ip"] + + +def _get_ip_safely(url: str) -> Optional[str]: + """ + Safely retrieve the public IP + :param url: Service giving us the IP + :return: Host IP + """ + try: + host_ip = requests.get(url) + return host_ip.text + except (NewConnectionError, ConnectionError, ValueError) as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Could not extract IP info from {url} due to {err}. Retrying..." + ) + return None + @blueprint.route("/ip", methods=["GET"]) @csrf.exempt @@ -40,7 +62,17 @@ def get_host_ip(): """ try: - return ApiResponse.success({"ip": requests.get("https://api.ipify.org").text}) + + for ip_service in IP_SERVICES: + host_ip = _get_ip_safely(ip_service) + if host_ip: + return ApiResponse.success({"ip": host_ip}) + + return ApiResponse.error( + status=ApiResponse.STATUS_SERVER_ERROR, + error=f"Could not extract the host IP from neither {IP_SERVICES}. Verify connectivity.", + ) + except Exception as exc: msg = f"Internal error obtaining host IP due to [{exc}] " logger.debug(traceback.format_exc())