Fix #6961 - Fallback to another IP service (#7019)

Fix #6961  - Fallback to another IP service (#7019)
This commit is contained in:
Pere Miquel Brull 2022-09-01 06:39:12 +02:00 committed by GitHub
parent 1b685fe1ca
commit 7d2527bbeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 67 additions and 11 deletions

View File

@ -41,6 +41,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
airflowConfig.getUsername(),
airflowConfig.getPassword(),
airflowConfig.getApiEndpoint(),
airflowConfig.getHostIp(),
airflowConfig.getTimeout());
}
@ -231,7 +232,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
}
@Override
public Map<String, String> getHostIp() {
public Map<String, String> requestGetHostIp() {
HttpResponse<String> response;
try {
response = getRequestAuthenticatedForJsonContent("%s/%s/ip", serviceURL, API_ENDPOINT);

View File

@ -36,6 +36,7 @@ public abstract class PipelineServiceClient {
protected final URL serviceURL;
protected final String username;
protected final String password;
protected final String hostIp;
protected final HttpClient client;
protected static final String AUTH_HEADER = "Authorization";
protected static final String CONTENT_HEADER = "Content-Type";
@ -53,7 +54,7 @@ public abstract class PipelineServiceClient {
SERVER_VERSION = rawServerVersion;
}
public PipelineServiceClient(String userName, String password, String apiEndpoint, int apiTimeout) {
public PipelineServiceClient(String userName, String password, String apiEndpoint, String hostIp, int apiTimeout) {
try {
this.serviceURL = new URL(apiEndpoint);
} catch (MalformedURLException e) {
@ -61,6 +62,7 @@ public abstract class PipelineServiceClient {
}
this.username = userName;
this.password = password;
this.hostIp = hostIp;
this.client =
HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
@ -115,6 +117,18 @@ public abstract class PipelineServiceClient {
return getVersionFromString(clientVersion).equals(getVersionFromString(SERVER_VERSION));
}
public final Map<String, String> getHostIp() {
try {
if (this.hostIp == null || this.hostIp.isEmpty()) {
return requestGetHostIp();
} else {
return Map.of("ip", this.hostIp);
}
} catch (Exception e) {
throw PipelineServiceClientException.byMessage("Failed to get Pipeline Service host IP.", e.getMessage());
}
}
/* Check the status of pipeline service to ensure it is healthy */
public abstract Response getServiceStatus();
@ -142,6 +156,9 @@ public abstract class PipelineServiceClient {
/* Get the all last run logs of a deployed pipeline */
public abstract HttpResponse<String> killIngestion(IngestionPipeline ingestionPipeline);
/* Get the Pipeline Service host IP to whitelist in source systems */
public abstract Map<String, String> getHostIp();
/*
Get the Pipeline Service host IP to whitelist in source systems
Should return a map in the shape {"ip": "111.11.11.1"}
*/
public abstract Map<String, String> requestGetHostIp();
}

View File

@ -10,6 +10,10 @@
"description": "API host endpoint for Airflow",
"type": "string"
},
"hostIp": {
"description": "Airflow host IP that will be used to connect to the sources.",
"type": "string"
},
"username": {
"description": "Username for Login",
"type": "string"

View File

@ -9,8 +9,9 @@ import org.openmetadata.catalog.util.PipelineServiceClient;
public class MockPipelineServiceClient extends PipelineServiceClient {
public MockPipelineServiceClient(String userName, String password, String apiEndpoint, int apiTimeout) {
super(userName, password, apiEndpoint, apiTimeout);
public MockPipelineServiceClient(
String userName, String password, String apiEndpoint, String hostIp, int apiTimeout) {
super(userName, password, apiEndpoint, hostIp, apiTimeout);
}
@Override
@ -59,7 +60,7 @@ public class MockPipelineServiceClient extends PipelineServiceClient {
}
@Override
public Map<String, String> getHostIp() {
public Map<String, String> requestGetHostIp() {
return null;
}
}

View File

@ -9,7 +9,7 @@ import org.openmetadata.catalog.exception.PipelineServiceVersionException;
public class PipelineServiceClientTest {
MockPipelineServiceClient mockPipelineServiceClient =
new MockPipelineServiceClient("user", "password", "https://endpoint.com", 10);
new MockPipelineServiceClient("user", "password", "https://endpoint.com", "111.11.11.1", 10);
@Test
public void testGetVersionFromString() {

View File

@ -173,6 +173,7 @@ eventHandlerConfiguration:
airflowConfiguration:
apiEndpoint: ${AIRFLOW_HOST:-http://localhost:8080}
hostIp: ${AIRFLOW_HOST_IP:-""}
username: ${AIRFLOW_USERNAME:-admin}
password: ${AIRFLOW_PASSWORD:-admin}
metadataApiEndpoint: ${SERVER_HOST_API_URL:-http://localhost:8585/api}

View File

@ -1,7 +1,7 @@
FROM python:3.9-slim as base
ENV AIRFLOW_HOME=/airflow
RUN apt-get update && \
apt-get install -y build-essential freetds-bin freetds-dev gcc libevent-dev libffi-dev libpq-dev librdkafka-dev libsasl2-dev libsasl2-modules libssl-dev libxml2 netcat openjdk-11-jre openssl postgresql postgresql-contrib python3.9-dev tdsodbc unixodbc unixodbc-dev wget --no-install-recommends && \
apt-get install -y build-essential freetds-bin freetds-dev gcc libevent-dev libffi-dev libpq-dev librdkafka-dev libsasl2-dev libsasl2-modules libssl-dev libxml2 netcat openjdk-11-jre openssl postgresql postgresql-contrib python3.9-dev tdsodbc unixodbc unixodbc-dev wget vim --no-install-recommends && \
rm -rf /var/lib/apt/lists/*
# Manually fix security vulnerability from curl

View File

@ -3,7 +3,7 @@ ENV AIRFLOW_HOME=/airflow
RUN apt-get update && \
apt-get install -y build-essential freetds-bin freetds-dev gcc libevent-dev libffi-dev libpq-dev librdkafka-dev \
libsasl2-dev libsasl2-modules libssl-dev libxml2 netcat openjdk-11-jre openssl postgresql postgresql-contrib \
python3.9-dev tdsodbc unixodbc unixodbc-dev wget --no-install-recommends && \
python3.9-dev tdsodbc unixodbc unixodbc-dev wget vim --no-install-recommends && \
rm -rf /var/lib/apt/lists/*
# Manually fix security vulnerability from curl

View File

@ -12,9 +12,12 @@
IP endpoint
"""
import traceback
from typing import Optional
import requests
from openmetadata_managed_apis.utils.logger import routes_logger
from requests.exceptions import ConnectionError
from urllib3.exceptions import NewConnectionError
try:
from importlib.metadata import version
@ -29,6 +32,25 @@ from openmetadata_managed_apis.api.response import ApiResponse
logger = routes_logger()
IP_SERVICES = ["https://api.ipify.org", "https://api.my-ip.io/ip"]
def _get_ip_safely(url: str) -> Optional[str]:
"""
Safely retrieve the public IP
:param url: Service giving us the IP
:return: Host IP
"""
try:
host_ip = requests.get(url)
return host_ip.text
except (NewConnectionError, ConnectionError, ValueError) as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Could not extract IP info from {url} due to {err}. Retrying..."
)
return None
@blueprint.route("/ip", methods=["GET"])
@csrf.exempt
@ -40,7 +62,17 @@ def get_host_ip():
"""
try:
return ApiResponse.success({"ip": requests.get("https://api.ipify.org").text})
for ip_service in IP_SERVICES:
host_ip = _get_ip_safely(ip_service)
if host_ip:
return ApiResponse.success({"ip": host_ip})
return ApiResponse.error(
status=ApiResponse.STATUS_SERVER_ERROR,
error=f"Could not extract the host IP from neither {IP_SERVICES}. Verify connectivity.",
)
except Exception as exc:
msg = f"Internal error obtaining host IP due to [{exc}] "
logger.debug(traceback.format_exc())