#13190 - Improve URL handling for Airflow REST Client (#14403)

* #13190 - Improve URL handling for Airflow REST Client

* Fmt

* Format
This commit is contained in:
Pere Miquel Brull 2023-12-19 12:08:17 +01:00 committed by GitHub
parent 0e92a975e3
commit fac1896fdb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -28,6 +28,7 @@ import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.utils.URIBuilder;
import org.json.JSONObject;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
@ -57,7 +58,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
protected final String password;
protected final HttpClient client;
protected final URL serviceURL;
private static final String API_ENDPOINT = "api/v1/openmetadata";
private static final List<String> API_ENDPOINT_SEGMENTS = List.of("api", "v1", "openmetadata");
private static final String DAG_ID = "dag_id";
public AirflowRESTClient(PipelineServiceClientConfiguration config) throws KeyStoreException {
@ -119,8 +120,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
IngestionPipeline ingestionPipeline, ServiceEntityInterface service) {
HttpResponse<String> response;
try {
String deployEndpoint = "%s/%s/deploy";
String deployUrl = String.format(deployEndpoint, serviceURL, API_ENDPOINT);
String deployUrl = buildURI("deploy").build().toString();
String pipelinePayload = JsonUtils.pojoToJson(ingestionPipeline);
response = post(deployUrl, pipelinePayload);
if (response.statusCode() == 200) {
@ -147,10 +147,9 @@ public class AirflowRESTClient extends PipelineServiceClient {
String pipelineName = ingestionPipeline.getName();
HttpResponse<String> response;
try {
String deleteEndpoint = "%s/%s/delete?dag_id=%s";
response =
deleteRequestAuthenticatedForJsonContent(
deleteEndpoint, serviceURL, API_ENDPOINT, pipelineName);
URIBuilder uri = buildURI("delete");
uri.addParameter(DAG_ID, pipelineName);
response = deleteRequestAuthenticatedForJsonContent(uri.build().toString());
if (response.statusCode() == 200) {
return new PipelineServiceClientResponse().withCode(200).withPlatform(this.getPlatform());
}
@ -171,8 +170,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
String pipelineName = ingestionPipeline.getName();
HttpResponse<String> response;
try {
String triggerEndPoint = "%s/%s/trigger";
String triggerUrl = String.format(triggerEndPoint, serviceURL, API_ENDPOINT);
String triggerUrl = buildURI("trigger").build().toString();
JSONObject requestPayload = new JSONObject();
requestPayload.put(DAG_ID, pipelineName);
response = post(triggerUrl, requestPayload.toString());
@ -196,14 +194,12 @@ public class AirflowRESTClient extends PipelineServiceClient {
public PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response;
try {
String toggleEndPoint;
String toggleUrl;
JSONObject requestPayload = new JSONObject();
requestPayload.put(DAG_ID, ingestionPipeline.getName());
// If the pipeline is currently enabled, disable it
if (ingestionPipeline.getEnabled().equals(Boolean.TRUE)) {
toggleEndPoint = "%s/%s/disable";
toggleUrl = String.format(toggleEndPoint, serviceURL, API_ENDPOINT);
toggleUrl = buildURI("disable").build().toString();
response = post(toggleUrl, requestPayload.toString());
if (response.statusCode() == 200) {
ingestionPipeline.setEnabled(false);
@ -220,8 +216,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
}
// otherwise, enable it back
} else {
toggleEndPoint = "%s/%s/enable";
toggleUrl = String.format(toggleEndPoint, serviceURL, API_ENDPOINT);
toggleUrl = buildURI("enable").build().toString();
response = post(toggleUrl, requestPayload.toString());
if (response.statusCode() == 200) {
ingestionPipeline.setEnabled(true);
@ -250,10 +245,10 @@ public class AirflowRESTClient extends PipelineServiceClient {
public List<PipelineStatus> getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response;
try {
String statusEndPoint = "%s/%s/status?dag_id=%s&only_queued=true";
response =
getRequestAuthenticatedForJsonContent(
statusEndPoint, serviceURL, API_ENDPOINT, ingestionPipeline.getName());
URIBuilder uri = buildURI("status");
uri.addParameter(DAG_ID, ingestionPipeline.getName());
uri.addParameter("only_queued", "true");
response = getRequestAuthenticatedForJsonContent(uri.build().toString());
if (response.statusCode() == 200) {
return JsonUtils.readObjects(response.body(), PipelineStatus.class);
}
@ -276,8 +271,8 @@ public class AirflowRESTClient extends PipelineServiceClient {
public PipelineServiceClientResponse getServiceStatusInternal() {
HttpResponse<String> response;
try {
response =
getRequestAuthenticatedForJsonContent("%s/%s/health-auth", serviceURL, API_ENDPOINT);
String healthUrl = buildURI("health-auth").build().toString();
response = getRequestAuthenticatedForJsonContent(healthUrl);
// We can reach the APIs and get the status back from Airflow
if (response.statusCode() == 200) {
@ -321,8 +316,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
public PipelineServiceClientResponse runAutomationsWorkflow(Workflow workflow) {
HttpResponse<String> response;
try {
String automationsEndpoint = "%s/%s/run_automation";
String automationsUrl = String.format(automationsEndpoint, serviceURL, API_ENDPOINT);
String automationsUrl = buildURI("run_automation").build().toString();
String workflowPayload = JsonUtils.pojoToJson(workflow);
response = post(automationsUrl, workflowPayload);
if (response.statusCode() == 200) {
@ -362,9 +356,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
HttpResponse<String> response;
String workflowPayload = JsonUtils.pojoToJson(request);
try {
String automationsEndpoint = "%s/%s/%s";
String automationsUrl =
String.format(automationsEndpoint, serviceURL, API_ENDPOINT, endpoint);
String automationsUrl = buildURI(endpoint).build().toString();
response = post(automationsUrl, workflowPayload);
if (response.statusCode() == 200) {
return new PipelineServiceClientResponse()
@ -387,8 +379,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response;
try {
String killEndPoint = "%s/%s/kill";
String killUrl = String.format(killEndPoint, serviceURL, API_ENDPOINT);
String killUrl = buildURI("kill").build().toString();
JSONObject requestPayload = new JSONObject();
requestPayload.put(DAG_ID, ingestionPipeline.getName());
response = post(killUrl, requestPayload.toString());
@ -410,7 +401,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
public Map<String, String> requestGetHostIp() {
HttpResponse<String> response;
try {
response = getRequestAuthenticatedForJsonContent("%s/%s/ip", serviceURL, API_ENDPOINT);
response = getRequestAuthenticatedForJsonContent(buildURI("ip").build().toString());
if (response.statusCode() == 200) {
return JsonUtils.readValue(response.body(), new TypeReference<>() {});
}
@ -428,15 +419,15 @@ public class AirflowRESTClient extends PipelineServiceClient {
HttpResponse<String> response;
String taskId = TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString());
// Init empty after query param
String afterParam = "";
URIBuilder uri = buildURI("last_dag_logs");
if (after != null) {
afterParam = String.format("&after=%s", after);
uri.addParameter("after", after);
}
uri.addParameter(DAG_ID, ingestionPipeline.getName());
uri.addParameter("task_id", taskId);
try {
response =
getRequestAuthenticatedForJsonContent(
"%s/%s/last_dag_logs?dag_id=%s&task_id=%s%s",
serviceURL, API_ENDPOINT, ingestionPipeline.getName(), taskId, afterParam);
response = getRequestAuthenticatedForJsonContent(uri.build().toString());
if (response.statusCode() == 200) {
return JsonUtils.readValue(response.body(), new TypeReference<>() {});
}
@ -448,25 +439,30 @@ public class AirflowRESTClient extends PipelineServiceClient {
String.format("Failed to get last ingestion logs due to %s", response.body()));
}
private HttpResponse<String> getRequestAuthenticatedForJsonContent(
String stringUrlFormat, Object... stringReplacement)
private URIBuilder buildURI(String path) {
try {
List<String> pathInternal = new ArrayList<>(API_ENDPOINT_SEGMENTS);
pathInternal.add(path);
return new URIBuilder(String.valueOf(serviceURL)).setPathSegments(pathInternal);
} catch (Exception e) {
throw PipelineServiceClientException.byMessage(
String.format("Failed to built request URI for path [%s].", path), e.getMessage());
}
}
private HttpResponse<String> getRequestAuthenticatedForJsonContent(String url)
throws IOException, InterruptedException {
HttpRequest request =
authenticatedRequestBuilder(stringUrlFormat, stringReplacement).GET().build();
HttpRequest request = authenticatedRequestBuilder(url).GET().build();
return client.send(request, HttpResponse.BodyHandlers.ofString());
}
private HttpResponse<String> deleteRequestAuthenticatedForJsonContent(
String stringUrlFormat, Object... stringReplacement)
private HttpResponse<String> deleteRequestAuthenticatedForJsonContent(String url)
throws IOException, InterruptedException {
HttpRequest request =
authenticatedRequestBuilder(stringUrlFormat, stringReplacement).DELETE().build();
HttpRequest request = authenticatedRequestBuilder(url).DELETE().build();
return client.send(request, HttpResponse.BodyHandlers.ofString());
}
private HttpRequest.Builder authenticatedRequestBuilder(
String stringUrlFormat, Object... stringReplacement) {
String url = String.format(stringUrlFormat, stringReplacement);
private HttpRequest.Builder authenticatedRequestBuilder(String url) {
return HttpRequest.newBuilder(URI.create(url))
.header(CONTENT_HEADER, CONTENT_TYPE)
.header(AUTH_HEADER, getBasicAuthenticationHeader(username, password));