Fixes 4628 - Delete airflow ingestion pipeline when the database service corresponding to it is hard deleted (#4631)

This commit is contained in:
Suresh Srinivas 2022-05-02 10:27:55 -07:00 committed by GitHub
parent 5751b8d75f
commit 05eb0e60f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 274 additions and 173 deletions

View File

@ -250,7 +250,7 @@ public final class Entity {
String updatedBy, String entityType, UUID entityId, boolean recursive, boolean hardDelete, boolean internal)
throws IOException {
EntityRepository<?> dao = getEntityRepository(entityType);
dao.delete(updatedBy, entityId.toString(), recursive, hardDelete, internal);
dao.delete(updatedBy, entityId.toString(), recursive, hardDelete);
}
public static void restoreEntity(String updatedBy, String entityType, UUID entityId) throws IOException {

View File

@ -14,15 +14,12 @@
package org.openmetadata.catalog.airflow;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import javax.ws.rs.core.Response;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONObject;
import org.openmetadata.catalog.airflow.models.AirflowAuthRequest;
@ -30,71 +27,47 @@ import org.openmetadata.catalog.airflow.models.AirflowAuthResponse;
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.catalog.exception.AirflowException;
import org.openmetadata.catalog.exception.IngestionPipelineDeploymentException;
import org.openmetadata.catalog.exception.PipelineServiceClientException;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.PipelineServiceClient;
@Slf4j
public class AirflowRESTClient {
private final URL airflowURL;
private final String username;
private final String password;
private final HttpClient client;
private static final String AUTH_HEADER = "Authorization";
private static final String AUTH_TOKEN = "Bearer %s";
private static final String CONTENT_HEADER = "Content-Type";
private static final String CONTENT_TYPE = "application/json";
public class AirflowRESTClient extends PipelineServiceClient {
private final String authEndpoint;
private final String deployEndpoint;
public AirflowRESTClient(AirflowConfiguration airflowConfig) {
try {
this.airflowURL = new URL(airflowConfig.getApiEndpoint());
} catch (MalformedURLException e) {
throw new AirflowException(airflowConfig.getApiEndpoint() + " Malformed.");
}
this.username = airflowConfig.getUsername();
this.password = airflowConfig.getPassword();
this.client =
HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.connectTimeout(Duration.ofSeconds(airflowConfig.getTimeout()))
.build();
super(
airflowConfig.getUsername(),
airflowConfig.getPassword(),
airflowConfig.getApiEndpoint(),
airflowConfig.getTimeout());
authEndpoint = String.format("%s/api/v1/security/login", serviceURL);
deployEndpoint = String.format("%s/rest_api/api?api=deploy_dag", serviceURL);
}
private String authenticate() throws InterruptedException, IOException {
String authEndpoint = "%s/api/v1/security/login";
String authUrl = String.format(authEndpoint, airflowURL);
@SneakyThrows
@Override
public String authenticate() throws IOException {
AirflowAuthRequest authRequest =
AirflowAuthRequest.builder().username(this.username).password(this.password).build();
String authPayload = JsonUtils.pojoToJson(authRequest);
HttpRequest request =
HttpRequest.newBuilder(URI.create(authUrl))
.header(CONTENT_HEADER, CONTENT_TYPE)
.POST(HttpRequest.BodyPublishers.ofString(authPayload))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
HttpResponse<String> response = post(authEndpoint, authPayload, false);
if (response.statusCode() == 200) {
AirflowAuthResponse authResponse = JsonUtils.readValue(response.body(), AirflowAuthResponse.class);
return authResponse.getAccessToken();
}
throw new AirflowException("Failed to get access_token. Please check AirflowConfiguration username, password");
throw new PipelineServiceClientException(
"Failed to get access_token. Please check AirflowConfiguration username, password");
}
public String deploy(IngestionPipeline ingestionPipeline) {
@Override
public String deployPipeline(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response;
try {
String token = authenticate();
String authToken = String.format(AUTH_TOKEN, token);
String pipelinePayload = JsonUtils.pojoToJson(ingestionPipeline);
String deployEndPoint = "%s/rest_api/api?api=deploy_dag";
String deployUrl = String.format(deployEndPoint, airflowURL);
HttpRequest request =
HttpRequest.newBuilder(URI.create(deployUrl))
.header(CONTENT_HEADER, CONTENT_TYPE)
.header(AUTH_HEADER, authToken)
.POST(HttpRequest.BodyPublishers.ofString(pipelinePayload))
.build();
response = client.send(request, HttpResponse.BodyHandlers.ofString());
response = post(deployEndpoint, pipelinePayload);
if (response.statusCode() == 200) {
return response.body();
}
@ -102,27 +75,20 @@ public class AirflowRESTClient {
throw IngestionPipelineDeploymentException.byMessage(ingestionPipeline.getName(), e.getMessage());
}
throw new AirflowException(
throw new PipelineServiceClientException(
String.format(
"%s Failed to deploy Ingestion Pipeline due to airflow API returned %s and response %s",
ingestionPipeline.getName(), Response.Status.fromStatusCode(response.statusCode()), response.body()));
}
@Override
public String deletePipeline(String pipelineName) {
try {
String token = authenticate();
String authToken = String.format(AUTH_TOKEN, token);
String triggerEndPoint = "%s/rest_api/api?api=delete_dag&dag_id=%s";
String triggerUrl = String.format(triggerEndPoint, airflowURL, pipelineName);
String deleteEndpoint = "%s/rest_api/api?api=delete_dag&dag_id=%s";
String deleteUrl = String.format(deleteEndpoint, serviceURL, pipelineName);
JSONObject requestPayload = new JSONObject();
requestPayload.put("workflow_name", pipelineName);
HttpRequest request =
HttpRequest.newBuilder(URI.create(triggerUrl))
.header(CONTENT_HEADER, CONTENT_TYPE)
.header(AUTH_HEADER, authToken)
.POST(HttpRequest.BodyPublishers.ofString(requestPayload.toString()))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
HttpResponse<String> response = post(deleteUrl, requestPayload.toString());
return response.body();
} catch (Exception e) {
LOG.error(String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", pipelineName));
@ -130,22 +96,15 @@ public class AirflowRESTClient {
return null;
}
@Override
public String runPipeline(String pipelineName) {
HttpResponse<String> response;
try {
String token = authenticate();
String authToken = String.format(AUTH_TOKEN, token);
String triggerEndPoint = "%s/rest_api/api?api=trigger_dag";
String triggerUrl = String.format(triggerEndPoint, airflowURL);
String triggerUrl = String.format(triggerEndPoint, serviceURL);
JSONObject requestPayload = new JSONObject();
requestPayload.put("workflow_name", pipelineName);
HttpRequest request =
HttpRequest.newBuilder(URI.create(triggerUrl))
.header(CONTENT_HEADER, CONTENT_TYPE)
.header(AUTH_HEADER, authToken)
.POST(HttpRequest.BodyPublishers.ofString(requestPayload.toString()))
.build();
response = client.send(request, HttpResponse.BodyHandlers.ofString());
response = post(triggerUrl, requestPayload.toString());
if (response.statusCode() == 200) {
return response.body();
}
@ -157,21 +116,14 @@ public class AirflowRESTClient {
pipelineName, "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode(response.statusCode()));
}
public IngestionPipeline getStatus(IngestionPipeline ingestionPipeline) {
@Override
public IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response;
try {
String token = authenticate();
String authToken = String.format(AUTH_TOKEN, token);
String statusEndPoint = "%s/rest_api/api?api=dag_status&dag_id=%s";
String statusUrl = String.format(statusEndPoint, airflowURL, ingestionPipeline.getName());
String statusUrl = String.format(statusEndPoint, serviceURL, ingestionPipeline.getName());
JSONObject requestPayload = new JSONObject();
HttpRequest request =
HttpRequest.newBuilder(URI.create(statusUrl))
.header(CONTENT_HEADER, CONTENT_TYPE)
.header(AUTH_HEADER, authToken)
.POST(HttpRequest.BodyPublishers.ofString(requestPayload.toString()))
.build();
response = client.send(request, HttpResponse.BodyHandlers.ofString());
response = post(statusUrl, requestPayload.toString());
if (response.statusCode() == 200) {
List<PipelineStatus> statuses = JsonUtils.readObjects(response.body(), PipelineStatus.class);
ingestionPipeline.setPipelineStatuses(statuses);
@ -181,21 +133,22 @@ public class AirflowRESTClient {
ingestionPipeline.setDeployed(false);
}
} catch (Exception e) {
throw AirflowException.byMessage(ingestionPipeline.getName(), e.getMessage());
throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), e.getMessage());
}
throw AirflowException.byMessage(
throw PipelineServiceClientException.byMessage(
ingestionPipeline.getName(),
"Failed to fetch ingestion pipeline runs",
Response.Status.fromStatusCode(response.statusCode()));
}
public HttpResponse<String> getRESTStatus() {
@Override
public HttpResponse<String> getServiceStatus() {
HttpResponse<String> response;
try {
String token = authenticate();
String authToken = String.format(AUTH_TOKEN, token);
String statusEndPoint = "%s/rest_api/api?api=rest_status";
String statusUrl = String.format(statusEndPoint, airflowURL);
String statusUrl = String.format(statusEndPoint, serviceURL);
HttpRequest request =
HttpRequest.newBuilder(URI.create(statusUrl))
.header(CONTENT_HEADER, CONTENT_TYPE)
@ -207,32 +160,25 @@ public class AirflowRESTClient {
return response;
}
} catch (Exception e) {
throw AirflowException.byMessage("Failed to get REST status.", e.getMessage());
throw PipelineServiceClientException.byMessage("Failed to get REST status.", e.getMessage());
}
throw new AirflowException(String.format("Failed to get REST status due to %s", response.body()));
throw new PipelineServiceClientException(String.format("Failed to get REST status due to %s", response.body()));
}
@Override
public HttpResponse<String> testConnection(TestServiceConnection testServiceConnection) {
HttpResponse<String> response;
try {
String token = authenticate();
String authToken = String.format(AUTH_TOKEN, token);
String statusEndPoint = "%s/rest_api/api?api=test_connection";
String statusUrl = String.format(statusEndPoint, airflowURL);
String statusUrl = String.format(statusEndPoint, serviceURL);
String connectionPayload = JsonUtils.pojoToJson(testServiceConnection);
HttpRequest request =
HttpRequest.newBuilder(URI.create(statusUrl))
.header(CONTENT_HEADER, CONTENT_TYPE)
.header(AUTH_HEADER, authToken)
.POST(HttpRequest.BodyPublishers.ofString(connectionPayload))
.build();
response = client.send(request, HttpResponse.BodyHandlers.ofString());
response = post(statusUrl, connectionPayload);
if (response.statusCode() == 200) {
return response;
}
} catch (Exception e) {
throw AirflowException.byMessage("Failed to test connection.", e.getMessage());
throw PipelineServiceClientException.byMessage("Failed to test connection.", e.getMessage());
}
throw new AirflowException(String.format("Failed to test connection due to %s", response.body()));
throw new PipelineServiceClientException(String.format("Failed to test connection due to %s", response.body()));
}
}

View File

@ -37,7 +37,7 @@ public class IngestionPipelineDeploymentException extends WebServiceException {
Response.Status.BAD_REQUEST, buildMessageByName(name, errorMessage));
}
private static String buildMessageByName(String name, String errorMessage) {
public static String buildMessageByName(String name, String errorMessage) {
return String.format(BY_NAME_MESSAGE, name, errorMessage);
}
}

View File

@ -15,23 +15,23 @@ package org.openmetadata.catalog.exception;
import javax.ws.rs.core.Response;
public class AirflowException extends WebServiceException {
public class PipelineServiceClientException extends WebServiceException {
private static final String BY_NAME_MESSAGE = "Airflow Exception [%s] due to [%s].";
public AirflowException(String message) {
public PipelineServiceClientException(String message) {
super(Response.Status.BAD_REQUEST, message);
}
private AirflowException(Response.Status status, String message) {
private PipelineServiceClientException(Response.Status status, String message) {
super(status, message);
}
public static AirflowException byMessage(String name, String errorMessage, Response.Status status) {
return new AirflowException(status, buildMessageByName(name, errorMessage));
public static PipelineServiceClientException byMessage(String name, String errorMessage, Response.Status status) {
return new PipelineServiceClientException(status, buildMessageByName(name, errorMessage));
}
public static AirflowException byMessage(String name, String errorMessage) {
return new AirflowException(Response.Status.BAD_REQUEST, buildMessageByName(name, errorMessage));
public static PipelineServiceClientException byMessage(String name, String errorMessage) {
return new PipelineServiceClientException(Response.Status.BAD_REQUEST, buildMessageByName(name, errorMessage));
}
private static String buildMessageByName(String name, String errorMessage) {

View File

@ -364,7 +364,9 @@ public abstract class EntityRepository<T> {
}
public final T create(UriInfo uriInfo, T entity) throws IOException {
return withHref(uriInfo, createInternal(entity));
entity = withHref(uriInfo, createInternal(entity));
postCreate(entity);
return entity;
}
@Transaction
@ -384,8 +386,16 @@ public abstract class EntityRepository<T> {
return update(uriInfo, original, updated);
}
@Transaction
public final PutResponse<T> createOrUpdate(UriInfo uriInfo, T updated) throws IOException {
PutResponse<T> response = createOrUpdateInternal(uriInfo, updated);
if (response.getStatus() == Status.CREATED) {
postCreate(response.getEntity());
}
return response;
}
@Transaction
public final PutResponse<T> createOrUpdateInternal(UriInfo uriInfo, T updated) throws IOException {
prepare(updated);
// Check if there is any original, deleted or not
T original = JsonUtils.readValue(dao.findJsonByFqn(getFullyQualifiedName(updated), ALL), entityClass);
@ -395,6 +405,16 @@ public abstract class EntityRepository<T> {
return update(uriInfo, original, updated);
}
protected void postCreate(T entity) {
// Override to perform any operation required after creation.
// For example ingestion pipeline creates a pipeline in AirFlow.
}
protected void postUpdate(T entity) {
// Override to perform any operation required after an entity update.
// For example ingestion pipeline creates a pipeline in AirFlow.
}
@Transaction
public PutResponse<T> update(UriInfo uriInfo, T original, T updated) throws IOException {
// Get all the fields in the original entity that can be updated during PUT operation
@ -470,23 +490,25 @@ public abstract class EntityRepository<T> {
return new PutResponse<>(added > 0 ? Status.CREATED : Status.OK, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED);
}
@Transaction
public final DeleteResponse<T> delete(String updatedBy, String id, boolean recursive, boolean hardDelete)
throws IOException {
return delete(updatedBy, id, recursive, hardDelete, false);
DeleteResponse response = deleteInternal(updatedBy, id, recursive, hardDelete);
postDelete((T) response.getEntity());
return response;
}
protected void postDelete(T entity) {
// Override this method to perform any operation required after deletion.
// For example ingestion pipeline deletes a pipeline in AirFlow.
}
@Transaction
public final DeleteResponse<T> delete(
String updatedBy, String id, boolean recursive, boolean hardDelete, boolean internal) throws IOException {
public final DeleteResponse<T> deleteInternal(String updatedBy, String id, boolean recursive, boolean hardDelete)
throws IOException {
// Validate entity
String json = dao.findJsonById(id, hardDelete ? Include.ALL : Include.NON_DELETED);
String json = dao.findJsonById(id, ALL);
if (json == null) {
if (!internal) {
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entityType, id));
} else {
return null; // Maybe already deleted
}
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entityType, id));
}
T original = JsonUtils.readValue(json, entityClass);

View File

@ -34,10 +34,12 @@ import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.EntityUtil.Fields;
import org.openmetadata.catalog.util.FullyQualifiedName;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.PipelineServiceClient;
public class IngestionPipelineRepository extends EntityRepository<IngestionPipeline> {
private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,source,airflowConfig,loggerLevel";
private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,source,airflowConfig,loggerLevel";
private static PipelineServiceClient pipelineServiceClient;
public IngestionPipelineRepository(CollectionDAO dao) {
super(
@ -114,6 +116,31 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
return getContainer(ingestionPipeline.getId(), Entity.INGESTION_PIPELINE);
}
// Hook invoked by EntityRepository after an ingestion pipeline entity is updated;
// re-deploys the pipeline to the pipeline service (e.g. Airflow) so the service
// picks up the updated configuration. Deployment only happens when forceDeploy
// is set on the pipeline's Airflow config (see deploy()).
@Override
protected void postUpdate(IngestionPipeline entity) {
deploy(entity); // Deploy the ingestion pipeline
}
// Hook invoked by EntityRepository after an ingestion pipeline entity is created;
// deploys the pipeline to the pipeline service when forceDeploy is set on the
// pipeline's Airflow config (see deploy()).
@Override
protected void postCreate(IngestionPipeline entity) {
deploy(entity); // Deploy the ingestion pipeline
}
// Hook invoked by EntityRepository after the entity is deleted from OpenMetadata;
// removes the corresponding DAG from the pipeline service (e.g. Airflow) so no
// orphaned workflows keep running.
// NOTE(review): EntityRepository.delete() calls postDelete for both soft and hard
// deletes — confirm soft delete should also remove the DAG from the service.
@Override
protected void postDelete(IngestionPipeline entity) {
pipelineServiceClient.deletePipeline(entity.getName());
}
// Injects the client used to deploy/delete pipelines on the pipeline service.
// NOTE(review): this assigns a STATIC field from an instance method, so every
// repository instance shares the last client set — confirm this is intentional
// (it appears to be, since the client is configured once at application startup).
public void setPipelineServiceClient(PipelineServiceClient client) {
pipelineServiceClient = client;
}
// Deploy the given pipeline to the pipeline service, but only when the pipeline's
// Airflow config explicitly requests a forced deployment. A null or false
// forceDeploy flag means the pipeline is stored in OpenMetadata without being
// pushed to the service.
private void deploy(IngestionPipeline ingestionPipeline) {
  Boolean forceDeploy = ingestionPipeline.getAirflowConfig().getForceDeploy();
  if (!Boolean.TRUE.equals(forceDeploy)) {
    return; // forceDeploy unset or false — nothing to do
  }
  pipelineServiceClient.deployPipeline(ingestionPipeline);
}
public static class IngestionPipelineEntityInterface extends EntityInterface<IngestionPipeline> {
public IngestionPipelineEntityInterface(IngestionPipeline entity) {
super(Entity.INGESTION_PIPELINE, entity);

View File

@ -78,6 +78,7 @@ public class DatabaseServiceResource extends EntityResource<DatabaseService, Dat
// Adds HATEOAS href links to the service entity and to the entity references it
// carries, so API responses contain navigable URLs.
public DatabaseService addHref(UriInfo uriInfo, DatabaseService service) {
service.setHref(RestUtil.getHref(uriInfo, COLLECTION_PATH, service.getId()));
Entity.withHref(uriInfo, service.getOwner()); // link the owner reference, if present
Entity.withHref(uriInfo, service.getPipelines()); // link the ingestion pipeline references, if present
return service;
}

View File

@ -77,6 +77,7 @@ import org.openmetadata.catalog.type.EntityHistory;
import org.openmetadata.catalog.type.Include;
import org.openmetadata.catalog.util.EntityUtil.Fields;
import org.openmetadata.catalog.util.OpenMetadataClientSecurityUtil;
import org.openmetadata.catalog.util.PipelineServiceClient;
import org.openmetadata.catalog.util.ResultList;
@Slf4j
@ -87,7 +88,7 @@ import org.openmetadata.catalog.util.ResultList;
@Collection(name = "IngestionPipelines")
public class IngestionPipelineResource extends EntityResource<IngestionPipeline, IngestionPipelineRepository> {
public static final String COLLECTION_PATH = "v1/services/ingestionPipelines/";
private AirflowRESTClient airflowRESTClient;
private PipelineServiceClient pipelineServiceClient;
private AirflowConfiguration airflowConfiguration;
@Override
@ -103,7 +104,8 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
public void initialize(CatalogApplicationConfig config) {
this.airflowConfiguration = config.getAirflowConfiguration();
this.airflowRESTClient = new AirflowRESTClient(config.getAirflowConfiguration());
this.pipelineServiceClient = new AirflowRESTClient(config.getAirflowConfiguration());
dao.setPipelineServiceClient(pipelineServiceClient);
}
public static class IngestionPipelineList extends ResultList<IngestionPipeline> {
@ -318,9 +320,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateIngestionPipeline create)
throws IOException {
IngestionPipeline ingestionPipeline = getIngestionPipeline(securityContext, create);
Response response = create(uriInfo, securityContext, ingestionPipeline, ADMIN | BOT);
deploy(ingestionPipeline);
return response;
return create(uriInfo, securityContext, ingestionPipeline, ADMIN | BOT);
}
@PATCH
@ -365,9 +365,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateIngestionPipeline update)
throws IOException {
IngestionPipeline ingestionPipeline = getIngestionPipeline(securityContext, update);
Response response = createOrUpdate(uriInfo, securityContext, ingestionPipeline, ADMIN | BOT | OWNER);
deploy(ingestionPipeline);
return response;
return createOrUpdate(uriInfo, securityContext, ingestionPipeline, ADMIN | BOT | OWNER);
}
@POST
@ -389,7 +387,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
throws IOException {
Fields fields = getFields(FIELD_OWNER);
IngestionPipeline pipeline = dao.get(uriInfo, id, fields);
airflowRESTClient.runPipeline(pipeline.getName());
pipelineServiceClient.runPipeline(pipeline.getName());
return addHref(uriInfo, dao.get(uriInfo, id, fields));
}
@ -409,7 +407,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Valid TestServiceConnection testServiceConnection) {
HttpResponse<String> response = airflowRESTClient.testConnection(testServiceConnection);
HttpResponse<String> response = pipelineServiceClient.testConnection(testServiceConnection);
return Response.status(200, response.body()).build();
}
@ -426,7 +424,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
content = @Content(mediaType = "application/json"))
})
public Response getRESTStatus(@Context UriInfo uriInfo, @Context SecurityContext securityContext) {
HttpResponse<String> response = airflowRESTClient.getRESTStatus();
HttpResponse<String> response = pipelineServiceClient.getServiceStatus();
return Response.status(200, response.body()).build();
}
@ -449,10 +447,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
boolean hardDelete,
@Parameter(description = "Pipeline Id", schema = @Schema(type = "string")) @PathParam("id") String id)
throws IOException {
Response response = delete(uriInfo, securityContext, id, false, hardDelete, ADMIN | BOT);
IngestionPipeline pipeline = (IngestionPipeline) response.getEntity();
airflowRESTClient.deletePipeline(pipeline.getName());
return response;
return delete(uriInfo, securityContext, id, false, hardDelete, ADMIN | BOT);
}
private IngestionPipeline getIngestionPipeline(SecurityContext securityContext, CreateIngestionPipeline create)
@ -476,12 +471,6 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
.withUpdatedAt(System.currentTimeMillis());
}
private void deploy(IngestionPipeline ingestionPipeline) {
if (Boolean.TRUE.equals(ingestionPipeline.getAirflowConfig().getForceDeploy())) {
airflowRESTClient.deploy(ingestionPipeline);
}
}
private Source buildIngestionSource(CreateIngestionPipeline create) throws IOException {
Source source;
String serviceType = create.getService().getType();
@ -525,7 +514,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
private IngestionPipeline addStatus(IngestionPipeline ingestionPipeline) {
try {
ingestionPipeline = airflowRESTClient.getStatus(ingestionPipeline);
ingestionPipeline = pipelineServiceClient.getPipelineStatus(ingestionPipeline);
} catch (Exception e) {
LOG.error("Failed to fetch status for {} due to {}", ingestionPipeline.getName(), e);
}

View File

@ -0,0 +1,88 @@
package org.openmetadata.catalog.util;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.exception.PipelineServiceClientException;
/**
 * Client to make API calls to add, delete, and deploy pipelines on a PipelineService, such as Airflow. Core
 * abstractions are as follows:
 *
 * <ul>
 *   <li>A PipelineService is a service such as AirFlow to which a pipeline can be deployed
 *   <li>A Pipeline is a workflow for performing certain tasks. Example - ingestion pipeline is a workflow that
 *       connects to a database service or other services and collects metadata.
 *   <li>Pipeline uses `Connection` to a service as dependency. A Pipeline might need a connection to a database
 *       service to collect metadata, a connection to OpenMetadata to publish metadata over APIs, etc.
 * </ul>
 */
public abstract class PipelineServiceClient {
  protected final URL serviceURL;
  protected final String username;
  protected final String password;
  protected final HttpClient client;

  protected static final String AUTH_HEADER = "Authorization";
  protected static final String AUTH_TOKEN = "Bearer %s";
  protected static final String CONTENT_HEADER = "Content-Type";
  protected static final String CONTENT_TYPE = "application/json";

  /**
   * @param userName user to authenticate with at the pipeline service
   * @param password password for {@code userName}
   * @param apiEndpoint base URL of the pipeline service REST API
   * @param apiTimeout HTTP connect timeout in seconds
   * @throws PipelineServiceClientException if {@code apiEndpoint} is not a valid URL
   */
  public PipelineServiceClient(String userName, String password, String apiEndpoint, int apiTimeout) {
    try {
      this.serviceURL = new URL(apiEndpoint);
    } catch (MalformedURLException e) {
      throw new PipelineServiceClientException(apiEndpoint + " Malformed.");
    }
    this.username = userName;
    this.password = password;
    this.client =
        HttpClient.newBuilder()
            .version(HttpClient.Version.HTTP_1_1)
            .connectTimeout(Duration.ofSeconds(apiTimeout))
            .build();
  }

  /** POST {@code payload} as JSON to {@code endpoint}, authenticating first. */
  public final HttpResponse<String> post(String endpoint, String payload) throws IOException, InterruptedException {
    return post(endpoint, payload, true);
  }

  /**
   * POST {@code payload} as JSON to {@code endpoint}.
   *
   * @param authenticate when true, calls {@link #authenticate()} and sends the returned token as a Bearer
   *     Authorization header; when false, no Authorization header is sent
   */
  public final HttpResponse<String> post(String endpoint, String payload, boolean authenticate)
      throws IOException, InterruptedException {
    HttpRequest.Builder requestBuilder =
        HttpRequest.newBuilder(URI.create(endpoint))
            .header(CONTENT_HEADER, CONTENT_TYPE)
            .POST(HttpRequest.BodyPublishers.ofString(payload));
    if (authenticate) {
      // Only attach the Authorization header when requested. HttpRequest.Builder.header()
      // throws NullPointerException on a null value, so setting it unconditionally with a
      // null token (as before) made every unauthenticated post — including the login call
      // itself — fail with NPE.
      requestBuilder.header(AUTH_HEADER, String.format(AUTH_TOKEN, authenticate()));
    }
    return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
  }

  /* Authenticate with the service */
  public abstract String authenticate() throws IOException;

  /* Check the status of pipeline service to ensure it is healthy */
  public abstract HttpResponse<String> getServiceStatus();

  /* Test the connection to the service such as database service a pipeline depends on. */
  public abstract HttpResponse<String> testConnection(TestServiceConnection testServiceConnection);

  /* Deploy a pipeline to the pipeline service */
  public abstract String deployPipeline(IngestionPipeline ingestionPipeline);

  /* Run the deployed pipeline at the pipeline service */
  public abstract String runPipeline(String pipelineName);

  /* Stop and delete a pipeline at the pipeline service */
  public abstract String deletePipeline(String pipelineName);

  /* Get the status of a deployed pipeline */
  public abstract IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline);
}

View File

@ -26,6 +26,7 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;
import lombok.Getter;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.common.utils.CommonUtil;
@ -106,9 +107,9 @@ public final class RestUtil {
}
public static class PutResponse<T> {
private T entity;
@Getter private T entity;
private ChangeEvent changeEvent;
private final Response.Status status;
@Getter private final Response.Status status;
private final String changeType;
/**
@ -128,10 +129,6 @@ public final class RestUtil {
this.changeType = changeType;
}
public T getEntity() {
return entity;
}
public Response toResponse() {
ResponseBuilder responseBuilder = Response.status(status).header(CHANGE_CUSTOM_HEADER, changeType);
if (changeType.equals(RestUtil.ENTITY_CREATED)

View File

@ -35,6 +35,7 @@ import org.openmetadata.catalog.api.services.ingestionPipelines.CreateIngestionP
import org.openmetadata.catalog.entity.services.DatabaseService;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.jdbi3.DatabaseServiceRepository.DatabaseServiceEntityInterface;
import org.openmetadata.catalog.jdbi3.IngestionPipelineRepository.IngestionPipelineEntityInterface;
import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline;
import org.openmetadata.catalog.metadataIngestion.FilterPattern;
import org.openmetadata.catalog.metadataIngestion.SourceConfig;
@ -176,15 +177,12 @@ public class DatabaseServiceResourceTest extends EntityResourceTest<DatabaseServ
@Test
void put_addIngestion_as_admin_2xx(TestInfo test) throws IOException {
DatabaseService service = createAndCheckEntity(createRequest(test).withDescription(null), ADMIN_AUTH_HEADERS);
// Create database service without any database connection
CreateDatabaseService create = createRequest(test);
DatabaseService service = createAndCheckEntity(create, ADMIN_AUTH_HEADERS);
EntityReference serviceRef = new DatabaseServiceEntityInterface(service).getEntityReference();
DatabaseConnection oldDatabaseConnection = create.getConnection();
// Update database description and ingestion service that are null
CreateDatabaseService update = createRequest(test).withDescription("description1");
ChangeDescription change = getChangeDescription(service.getVersion());
change.getFieldsAdded().add(new FieldChange().withName("description").withNewValue("description1"));
updateAndCheckEntity(update, OK, ADMIN_AUTH_HEADERS, UpdateType.MINOR_UPDATE, change);
SnowflakeConnection snowflakeConnection =
new SnowflakeConnection()
.withDatabase("test")
@ -192,11 +190,23 @@ public class DatabaseServiceResourceTest extends EntityResourceTest<DatabaseServ
.withPassword("password")
.withUsername("username");
DatabaseConnection databaseConnection = new DatabaseConnection().withConfig(snowflakeConnection);
update.withConnection(databaseConnection);
service = updateEntity(update, OK, ADMIN_AUTH_HEADERS);
// Get the recently updated entity and verify the changes
service = getEntity(service.getId(), ADMIN_AUTH_HEADERS);
validateDatabaseConnection(databaseConnection, service.getConnection(), service.getServiceType());
// Update database connection to a new connection
CreateDatabaseService update = createRequest(test).withConnection(databaseConnection);
ChangeDescription change = getChangeDescription(service.getVersion());
change
.getFieldsUpdated()
.add(
new FieldChange()
.withName("connection")
.withOldValue(oldDatabaseConnection)
.withNewValue(databaseConnection));
service = updateAndCheckEntity(update, OK, ADMIN_AUTH_HEADERS, UpdateType.MINOR_UPDATE, change);
oldDatabaseConnection = service.getConnection();
oldDatabaseConnection.setConfig(
JsonUtils.convertValue(oldDatabaseConnection.getConfig(), SnowflakeConnection.class));
// Update the connection with additional property
ConnectionArguments connectionArguments =
new ConnectionArguments()
.withAdditionalProperty("credentials", "/tmp/creds.json")
@ -206,11 +216,17 @@ public class DatabaseServiceResourceTest extends EntityResourceTest<DatabaseServ
snowflakeConnection.withConnectionArguments(connectionArguments).withConnectionOptions(connectionOptions);
databaseConnection.withConfig(snowflakeConnection);
update.withConnection(databaseConnection);
service = updateEntity(update, OK, ADMIN_AUTH_HEADERS);
// Get the recently updated entity and verify the changes
service = getEntity(service.getId(), ADMIN_AUTH_HEADERS);
validateDatabaseConnection(databaseConnection, service.getConnection(), service.getServiceType());
change = getChangeDescription(service.getVersion());
change
.getFieldsUpdated()
.add(
new FieldChange()
.withName("connection")
.withOldValue(oldDatabaseConnection)
.withNewValue(databaseConnection));
service = updateAndCheckEntity(update, OK, ADMIN_AUTH_HEADERS, UpdateType.MINOR_UPDATE, change);
// Add ingestion pipeline to the database service
IngestionPipelineResourceTest ingestionPipelineResourceTest = new IngestionPipelineResourceTest();
CreateIngestionPipeline createIngestionPipeline =
ingestionPipelineResourceTest.createRequest(test).withService(serviceRef);
@ -221,21 +237,28 @@ public class DatabaseServiceResourceTest extends EntityResourceTest<DatabaseServ
.withIncludeViews(true)
.withSchemaFilterPattern(new FilterPattern().withExcludes(List.of("information_schema.*", "test.*")))
.withTableFilterPattern(new FilterPattern().withIncludes(List.of("sales.*", "users.*")));
SourceConfig sourceConfig = new SourceConfig().withConfig(databaseServiceMetadataPipeline);
createIngestionPipeline.withSourceConfig(sourceConfig);
IngestionPipeline ingestionPipeline =
ingestionPipelineResourceTest.createEntity(createIngestionPipeline, ADMIN_AUTH_HEADERS);
DatabaseService updatedService = getEntity(service.getId(), "pipelines", ADMIN_AUTH_HEADERS);
assertEquals(1, updatedService.getPipelines().size());
EntityReference expectedPipeline = updatedService.getPipelines().get(0);
assertEquals(ingestionPipeline.getId(), expectedPipeline.getId());
assertEquals(ingestionPipeline.getName(), expectedPipeline.getName());
assertEquals(ingestionPipeline.getFullyQualifiedName(), expectedPipeline.getFullyQualifiedName());
assertReference(
new IngestionPipelineEntityInterface(ingestionPipeline).getEntityReference(),
updatedService.getPipelines().get(0));
// TODO remove this
DatabaseConnection expectedDatabaseConnection =
JsonUtils.convertValue(ingestionPipeline.getSource().getServiceConnection(), DatabaseConnection.class);
SnowflakeConnection expectedSnowflake =
JsonUtils.convertValue(expectedDatabaseConnection.getConfig(), SnowflakeConnection.class);
assertEquals(expectedSnowflake, snowflakeConnection);
// Delete the database service and ensure ingestion pipeline is deleted
deleteEntity(updatedService.getId(), true, true, ADMIN_AUTH_HEADERS);
ingestionPipelineResourceTest.assertEntityDeleted(ingestionPipeline.getId(), true);
}
@Override
@ -297,6 +320,12 @@ public class DatabaseServiceResourceTest extends EntityResourceTest<DatabaseServ
Schedule expectedSchedule = (Schedule) expected;
Schedule actualSchedule = JsonUtils.readValue((String) actual, Schedule.class);
assertEquals(expectedSchedule, actualSchedule);
} else if (fieldName.equals("connection")) {
DatabaseConnection expectedConnection = (DatabaseConnection) expected;
DatabaseConnection actualConnection = JsonUtils.readValue((String) actual, DatabaseConnection.class);
actualConnection.setConfig(JsonUtils.convertValue(actualConnection.getConfig(), SnowflakeConnection.class));
// TODO remove this hardcoding
validateDatabaseConnection(expectedConnection, actualConnection, DatabaseServiceType.Snowflake);
} else {
super.assertCommonFieldChange(fieldName, expected, actual);
}

View File

@ -16,13 +16,13 @@ package org.openmetadata.catalog.resources.services.ingestionpipelines;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.openmetadata.catalog.Entity.FIELD_OWNER;
import static org.openmetadata.catalog.util.TestUtils.ADMIN_AUTH_HEADERS;
import static org.openmetadata.catalog.util.TestUtils.UpdateType.MINOR_UPDATE;
import static org.openmetadata.catalog.util.TestUtils.assertListNotNull;
import static org.openmetadata.catalog.util.TestUtils.assertListNull;
import static org.openmetadata.catalog.util.TestUtils.assertResponse;
import static org.openmetadata.catalog.util.TestUtils.assertResponseContains;
import java.io.IOException;
@ -50,6 +50,7 @@ import org.openmetadata.catalog.entity.services.DatabaseService;
import org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.catalog.exception.IngestionPipelineDeploymentException;
import org.openmetadata.catalog.jdbi3.DatabaseServiceRepository.DatabaseServiceEntityInterface;
import org.openmetadata.catalog.jdbi3.IngestionPipelineRepository;
import org.openmetadata.catalog.metadataIngestion.DashboardServiceMetadataPipeline;
@ -205,9 +206,10 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
createRequest(test)
.withService(BIGQUERY_REFERENCE)
.withAirflowConfig(new AirflowConfig().withStartDate("2021-11-21").withForceDeploy(true));
HttpResponseException exception =
assertThrows(HttpResponseException.class, () -> createEntity(create, ADMIN_AUTH_HEADERS));
// TODO check for error
assertResponse(
() -> createEntity(create, ADMIN_AUTH_HEADERS),
BAD_REQUEST,
IngestionPipelineDeploymentException.buildMessageByName(create.getName(), "value"));
}
@Test