Fix #5566 - Add Pipeline Service host IP endpoint (#6898)

Fix #5566 - Add Pipeline Service host IP endpoint (#6898)
This commit is contained in:
Pere Miquel Brull 2022-08-24 16:27:31 +02:00 committed by GitHub
parent 8cec6a6d08
commit 667c837ae0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 133 additions and 3 deletions

View File

@ -211,7 +211,7 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
"There are pending migrations to be run on the database."
+ " Please backup your data and run `./bootstrap/bootstrap_storage.sh migrate-all`."
+ " You can find more information on upgrading OpenMetadata at"
+ " https://docs.open-metadata.org/install/upgrade-openmetadata");
+ " https://docs.open-metadata.org/deployment/upgrade");
}
}

View File

@ -230,6 +230,21 @@ public class AirflowRESTClient extends PipelineServiceClient {
String.format("Failed to kill running workflows due to %s", response.body()));
}
@Override
public Map<String, String> getHostIp() {
HttpResponse<String> response;
try {
response = getRequestAuthenticatedForJsonContent("%s/%s/ip", serviceURL, API_ENDPOINT);
if (response.statusCode() == 200) {
return JsonUtils.readValue(response.body(), new TypeReference<>() {});
}
} catch (Exception e) {
throw PipelineServiceClientException.byMessage("Failed to get Pipeline Service host IP.", e.getMessage());
}
throw new PipelineServiceClientException(
String.format("Failed to get Pipeline Service host IP due to %s", response.body()));
}
@Override
public Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response;

View File

@ -520,6 +520,24 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
return pipelineServiceClient.getServiceStatus();
}
@GET
@Path("/ip")
@Operation(
operationId = "checkAirflowHostIp",
summary = "Check the Airflow REST host IP",
tags = "IngestionPipelines",
description = "Check the Airflow REST host IP",
responses = {
@ApiResponse(
responseCode = "200",
description = "Pipeline Service host IP",
content = @Content(mediaType = "application/json"))
})
public Response getHostIp(@Context UriInfo uriInfo, @Context SecurityContext securityContext) {
Map<String, String> hostIp = pipelineServiceClient.getHostIp();
return Response.ok(hostIp, MediaType.APPLICATION_JSON_TYPE).build();
}
@DELETE
@Path("/{id}")
@Operation(

View File

@ -141,4 +141,7 @@ public abstract class PipelineServiceClient {
/* Get the all last run logs of a deployed pipeline */
public abstract HttpResponse<String> killIngestion(IngestionPipeline ingestionPipeline);
/* Get the Pipeline Service host IP to whitelist in source systems */
public abstract Map<String, String> getHostIp();
}

View File

@ -57,4 +57,9 @@ public class MockPipelineServiceClient extends PipelineServiceClient {
public HttpResponse<String> killIngestion(IngestionPipeline ingestionPipeline) {
return null;
}
@Override
public Map<String, String> getHostIp() {
return null;
}
}

View File

@ -0,0 +1,45 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
IP endpoint
"""
import traceback
import requests
try:
from importlib.metadata import version
except ImportError:
from importlib_metadata import version
from airflow.api_connexion import security
from airflow.security import permissions
from airflow.www.app import csrf
from openmetadata_managed_apis.api.app import blueprint
from openmetadata_managed_apis.api.response import ApiResponse
@blueprint.route("/ip", methods=["GET"])
@csrf.exempt
@security.requires_access([(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)])
def get_host_ip():
"""
/ip endpoint to check Airflow host IP. Users will need to whitelist
this IP to access their source systems.
"""
try:
return ApiResponse.success({"ip": requests.get("https://api.ipify.org").text})
except Exception as err:
return ApiResponse.error(
status=ApiResponse.STATUS_SERVER_ERROR,
error=f"Internal error obtaining host IP - {err} - {traceback.format_exc()}",
)

View File

@ -100,6 +100,14 @@ export const checkAirflowStatus = (): Promise<AxiosResponse> => {
return APIClient.get('/services/ingestionPipelines/status');
};
export const getPipelineServiceHostIp = async () => {
const response = await APIClient.get<{ ip?: string }>(
'/services/ingestionPipelines/ip'
);
return response.data;
};
export const getIngestionPipelineLogById = (
id: string
): Promise<AxiosResponse> => {

View File

@ -13,13 +13,16 @@
import { FontAwesomeIcon } from '@fortawesome/react-fontawesome';
import Form, { FormProps } from '@rjsf/core';
import { AxiosError } from 'axios';
import classNames from 'classnames';
import { isEmpty } from 'lodash';
import { LoadingState } from 'Models';
import React, { FunctionComponent, useState } from 'react';
import React, { FunctionComponent, useEffect, useState } from 'react';
import { getPipelineServiceHostIp } from '../../../axiosAPIs/ingestionPipelineAPI';
import { ConfigData } from '../../../interface/service.interface';
import { formatFormDataForRender } from '../../../utils/JSONSchemaFormUtils';
import SVGIcons, { Icons } from '../../../utils/SvgUtils';
import { showErrorToast } from '../../../utils/ToastUtils';
import { Button } from '../../buttons/Button/Button';
import { ArrayFieldTemplate } from '../../JSONSchemaTemplate/ArrayFieldTemplate';
import { ObjectFieldTemplate } from '../../JSONSchemaTemplate/ObjectFieldTemplate';
@ -55,6 +58,22 @@ const FormBuilder: FunctionComponent<Props> = ({
const [connectionTestingState, setConnectionTestingState] =
useState<LoadingState>('initial');
const [hostIp, setHostIp] = useState<string>('[fetching]');
const fetchHostIp = async () => {
try {
const data = await getPipelineServiceHostIp();
setHostIp(data?.ip || '[unknown]');
} catch (error) {
setHostIp('[error - unknown]');
showErrorToast(error as AxiosError);
}
};
useEffect(() => {
fetchHostIp();
}, []);
const handleCancel = () => {
setLocalFormData(formatFormDataForRender(formData));
if (onCancel) {
@ -108,10 +127,22 @@ const FormBuilder: FunctionComponent<Props> = ({
case 'initial':
default:
return 'Test your connections before creating service';
return 'Test your connections before creating the service';
}
};
const getPipelineServiceHostIpForm = () => {
return (
<div className="tw-flex">
<SVGIcons alt="info-badge" icon={Icons.ARROW_RIGHT_PRIMARY} />
<span className="tw-ml-2">
OpenMetadata will connect to your resource from the IP {hostIp}. Make
sure to allow inbound traffic in your network security settings.
</span>
</div>
);
};
return (
<Form
ArrayFieldTemplate={ArrayFieldTemplate}
@ -136,6 +167,11 @@ const FormBuilder: FunctionComponent<Props> = ({
No Connection Configs available.
</div>
)}
{!isEmpty(schema) && onTestConnection && (
<div className="tw-flex tw-justify-between tw-bg-white tw-border tw-border-main tw-shadow tw-rounded tw-p-3 tw-mt-4">
<div className="tw-self-center">{getPipelineServiceHostIpForm()}</div>
</div>
)}
{!isEmpty(schema) && onTestConnection && (
<div className="tw-flex tw-justify-between tw-bg-white tw-border tw-border-main tw-shadow tw-rounded tw-p-3 tw-mt-4">
<div className="tw-self-center">{getConnectionTestingMessage()}</div>