[WIP] Airlfow integration

This commit is contained in:
Sriharsha Chintalapani 2021-09-24 16:28:44 -07:00
parent a2f2e0bc2d
commit 4518be7ddb
16 changed files with 707 additions and 72 deletions

View File

@ -81,6 +81,16 @@ CREATE TABLE IF NOT EXISTS dashboard_service_entity (
UNIQUE KEY unique_name(name) UNIQUE KEY unique_name(name)
); );
CREATE TABLE IF NOT EXISTS pipeline_service_entity (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL,
serviceType VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.serviceType') NOT NULL,
json JSON NOT NULL,
timestamp BIGINT,
PRIMARY KEY (id),
UNIQUE KEY unique_name(name)
);
-- --
-- Data entities -- Data entities
-- --

View File

@ -44,31 +44,31 @@ public abstract class MessagingServiceRepository {
private static final Logger LOG = LoggerFactory.getLogger(MessagingServiceRepository.class); private static final Logger LOG = LoggerFactory.getLogger(MessagingServiceRepository.class);
@CreateSqlObject @CreateSqlObject
abstract MessagingServiceDAO messagingServiceDOA(); abstract MessagingServiceDAO messagingServiceDAO();
@CreateSqlObject @CreateSqlObject
abstract EntityRelationshipDAO relationshipDAO(); abstract EntityRelationshipDAO relationshipDAO();
@Transaction @Transaction
public List<MessagingService> list(String name) throws IOException { public List<MessagingService> list(String name) throws IOException {
return JsonUtils.readObjects(messagingServiceDOA().list(name), MessagingService.class); return JsonUtils.readObjects(messagingServiceDAO().list(name), MessagingService.class);
} }
@Transaction @Transaction
public MessagingService get(String id) throws IOException { public MessagingService get(String id) throws IOException {
return EntityUtil.validate(id, messagingServiceDOA().findById(id), MessagingService.class); return EntityUtil.validate(id, messagingServiceDAO().findById(id), MessagingService.class);
} }
@Transaction @Transaction
public MessagingService getByName(String name) throws IOException { public MessagingService getByName(String name) throws IOException {
return EntityUtil.validate(name, messagingServiceDOA().findByName(name), MessagingService.class); return EntityUtil.validate(name, messagingServiceDAO().findByName(name), MessagingService.class);
} }
@Transaction @Transaction
public MessagingService create(MessagingService messagingService) throws JsonProcessingException { public MessagingService create(MessagingService messagingService) throws JsonProcessingException {
// Validate fields // Validate fields
validateIngestionSchedule(messagingService.getIngestionSchedule()); validateIngestionSchedule(messagingService.getIngestionSchedule());
messagingServiceDOA().insert(JsonUtils.pojoToJson(messagingService)); messagingServiceDAO().insert(JsonUtils.pojoToJson(messagingService));
return messagingService; return messagingService;
} }
@ -76,17 +76,17 @@ public abstract class MessagingServiceRepository {
Schedule ingestionSchedule) Schedule ingestionSchedule)
throws IOException { throws IOException {
validateIngestionSchedule(ingestionSchedule); validateIngestionSchedule(ingestionSchedule);
MessagingService dbService = EntityUtil.validate(id, messagingServiceDOA().findById(id), MessagingService.class); MessagingService dbService = EntityUtil.validate(id, messagingServiceDAO().findById(id), MessagingService.class);
// Update fields // Update fields
dbService.withDescription(description).withIngestionSchedule(ingestionSchedule) dbService.withDescription(description).withIngestionSchedule(ingestionSchedule)
.withSchemaRegistry(schemaRegistry).withBrokers(brokers); .withSchemaRegistry(schemaRegistry).withBrokers(brokers);
messagingServiceDOA().update(id, JsonUtils.pojoToJson(dbService)); messagingServiceDAO().update(id, JsonUtils.pojoToJson(dbService));
return dbService; return dbService;
} }
@Transaction @Transaction
public void delete(String id) { public void delete(String id) {
if (messagingServiceDOA().delete(id) <= 0) { if (messagingServiceDAO().delete(id) <= 0) {
throw EntityNotFoundException.byMessage(entityNotFound(Entity.MESSAGING_SERVICE, id)); throw EntityNotFoundException.byMessage(entityNotFound(Entity.MESSAGING_SERVICE, id));
} }
relationshipDAO().deleteAll(id); relationshipDAO().deleteAll(id);

View File

@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.joda.time.Period;
import org.joda.time.format.ISOPeriodFormat;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.services.PipelineService;
import org.openmetadata.catalog.exception.EntityNotFoundException;
import org.openmetadata.catalog.type.Schedule;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.JsonUtils;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.CreateSqlObject;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import org.skife.jdbi.v2.sqlobject.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
public abstract class PipelineServiceRepository {
private static final Logger LOG = LoggerFactory.getLogger(PipelineServiceRepository.class);
@CreateSqlObject
abstract PipelineServiceDAO pipelineServiceDAO();
@CreateSqlObject
abstract EntityRelationshipDAO relationshipDAO();
@Transaction
public List<PipelineService> list(String name) throws IOException {
return JsonUtils.readObjects(pipelineServiceDAO().list(name), PipelineService.class);
}
@Transaction
public PipelineService get(String id) throws IOException {
return EntityUtil.validate(id, pipelineServiceDAO().findById(id), PipelineService.class);
}
@Transaction
public PipelineService getByName(String name) throws IOException {
return EntityUtil.validate(name, pipelineServiceDAO().findByName(name), PipelineService.class);
}
@Transaction
public PipelineService create(PipelineService pipelineService) throws JsonProcessingException {
// Validate fields
validateIngestionSchedule(pipelineService.getIngestionSchedule());
pipelineServiceDAO().insert(JsonUtils.pojoToJson(pipelineService));
return pipelineService;
}
public PipelineService update(String id, String description, URI url,
Schedule ingestionSchedule)
throws IOException {
validateIngestionSchedule(ingestionSchedule);
PipelineService pipelineService = EntityUtil.validate(id, pipelineServiceDAO().findById(id), PipelineService.class);
// Update fields
pipelineService.withDescription(description).withIngestionSchedule(ingestionSchedule)
.withUrl(url);
pipelineServiceDAO().update(id, JsonUtils.pojoToJson(pipelineService));
return pipelineService;
}
@Transaction
public void delete(String id) {
if (pipelineServiceDAO().delete(id) <= 0) {
throw EntityNotFoundException.byMessage(entityNotFound(Entity.PIPELINE_SERVICE, id));
}
relationshipDAO().deleteAll(id);
}
private void validateIngestionSchedule(Schedule ingestion) {
if (ingestion == null) {
return;
}
String duration = ingestion.getRepeatFrequency();
// ISO8601 duration format is P{y}Y{m}M{d}DT{h}H{m}M{s}S.
String[] splits = duration.split("T");
if (splits[0].contains("Y") || splits[0].contains("M") ||
(splits.length == 2 && splits[1].contains("S"))) {
throw new IllegalArgumentException("Ingestion repeatFrequency can only contain Days, Hours, and Minutes - " +
"example P{d}DT{h}H{m}M");
}
Period period;
try {
period = ISOPeriodFormat.standard().parsePeriod(duration);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid ingestion repeatFrequency " + duration, e);
}
if (period.toStandardMinutes().getMinutes() < 60) {
throw new IllegalArgumentException("Ingestion repeatFrequency is too short and must be more than 60 minutes");
}
}
public interface PipelineServiceDAO {
@SqlUpdate("INSERT INTO pipeline_service_entity (json) VALUES (:json)")
void insert(@Bind("json") String json);
@SqlUpdate("UPDATE pipeline_service_entity SET json = :json where id = :id")
void update(@Bind("id") String id, @Bind("json") String json);
@SqlQuery("SELECT json FROM pipeline_service_entity WHERE id = :id")
String findById(@Bind("id") String id);
@SqlQuery("SELECT json FROM pipeline_service_entity WHERE name = :name")
String findByName(@Bind("name") String name);
@SqlQuery("SELECT json FROM pipeline_service_entity WHERE (name = :name OR :name is NULL)")
List<String> list(@Bind("name") String name);
@SqlUpdate("DELETE FROM pipeline_service_entity WHERE id = :id")
int delete(@Bind("id") String id);
}
}

View File

@ -17,7 +17,6 @@
package org.openmetadata.catalog.jdbi3; package org.openmetadata.catalog.jdbi3;
import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.data.Chart;
import org.openmetadata.catalog.entity.data.Task; import org.openmetadata.catalog.entity.data.Task;
import org.openmetadata.catalog.entity.services.DashboardService; import org.openmetadata.catalog.entity.services.DashboardService;
import org.openmetadata.catalog.exception.CatalogExceptionMessage; import org.openmetadata.catalog.exception.CatalogExceptionMessage;
@ -25,7 +24,7 @@ import org.openmetadata.catalog.exception.EntityNotFoundException;
import org.openmetadata.catalog.jdbi3.DashboardServiceRepository.DashboardServiceDAO; import org.openmetadata.catalog.jdbi3.DashboardServiceRepository.DashboardServiceDAO;
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO; import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO; import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
import org.openmetadata.catalog.resources.charts.ChartResource; import org.openmetadata.catalog.resources.tasks.TaskResource;
import org.openmetadata.catalog.resources.tasks.TaskResource.TaskList; import org.openmetadata.catalog.resources.tasks.TaskResource.TaskList;
import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.TagLabel; import org.openmetadata.catalog.type.TagLabel;
@ -54,8 +53,9 @@ import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityN
public abstract class TaskRepository { public abstract class TaskRepository {
private static final Logger LOG = LoggerFactory.getLogger(TaskRepository.class); private static final Logger LOG = LoggerFactory.getLogger(TaskRepository.class);
private static final Fields TASK_UPDATE_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner,taskConfig"); private static final Fields TASK_UPDATE_FIELDS = new Fields(TaskResource.FIELD_LIST, "owner," +
private static final Fields TASK_PATCH_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner,service,tags"); "taskConfig,tags");
private static final Fields TASK_PATCH_FIELDS = new Fields(TaskResource.FIELD_LIST, "owner,service,tags");
public static String getFQN(EntityReference service, Task task) { public static String getFQN(EntityReference service, Task task) {
return (service.getName() + "." + task.getName()); return (service.getName() + "." + task.getName());

View File

@ -28,6 +28,7 @@ import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO; import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO; import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO; import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;
import org.openmetadata.catalog.resources.teams.TeamResource; import org.openmetadata.catalog.resources.teams.TeamResource;
import org.openmetadata.catalog.resources.teams.TeamResource.TeamList; import org.openmetadata.catalog.resources.teams.TeamResource.TeamList;
import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.EntityReference;
@ -100,6 +101,9 @@ public abstract class TeamRepository {
@CreateSqlObject @CreateSqlObject
abstract ChartDAO chartDAO(); abstract ChartDAO chartDAO();
@CreateSqlObject
abstract TaskDAO taskDAO();
@Transaction @Transaction
public Team create(Team team, List<UUID> userIds) throws IOException { public Team create(Team team, List<UUID> userIds) throws IOException {
// Query 1 - Validate user IDs // Query 1 - Validate user IDs
@ -257,7 +261,7 @@ public abstract class TeamRepository {
private List<EntityReference> getOwns(String teamId) throws IOException { private List<EntityReference> getOwns(String teamId) throws IOException {
// Compile entities owned by the team // Compile entities owned by the team
return EntityUtil.getEntityReference(relationshipDAO().findTo(teamId, OWNS.ordinal()), tableDAO(), databaseDAO(), return EntityUtil.getEntityReference(relationshipDAO().findTo(teamId, OWNS.ordinal()), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO()); metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO());
} }
private void addUserRelationship(Team team, User user) { private void addUserRelationship(Team team, User user) {

View File

@ -22,6 +22,7 @@ import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO; import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO; import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO; import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;
import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO; import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO;
import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.EntityReference;
@ -76,6 +77,9 @@ public abstract class UsageRepository {
@CreateSqlObject @CreateSqlObject
abstract ChartDAO chartDAO(); abstract ChartDAO chartDAO();
@CreateSqlObject
abstract TaskDAO taskDAO();
@CreateSqlObject @CreateSqlObject
abstract EntityRelationshipDAO relationshipDAO(); abstract EntityRelationshipDAO relationshipDAO();
@ -84,7 +88,7 @@ public abstract class UsageRepository {
@Transaction @Transaction
public EntityUsage get(String entityType, String id, String date, int days) throws IOException { public EntityUsage get(String entityType, String id, String date, int days) throws IOException {
EntityReference ref = getEntityReference(entityType, UUID.fromString(id), tableDAO(), databaseDAO(), EntityReference ref = getEntityReference(entityType, UUID.fromString(id), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO()); metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO());
List<UsageDetails> usageDetails = usageDAO().getUsageById(id, date, days - 1); List<UsageDetails> usageDetails = usageDAO().getUsageById(id, date, days - 1);
return new EntityUsage().withUsage(usageDetails).withEntity(ref); return new EntityUsage().withUsage(usageDetails).withEntity(ref);
} }
@ -92,7 +96,7 @@ public abstract class UsageRepository {
@Transaction @Transaction
public EntityUsage getByName(String entityType, String fqn, String date, int days) throws IOException { public EntityUsage getByName(String entityType, String fqn, String date, int days) throws IOException {
EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fqn, tableDAO(), databaseDAO(), EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fqn, tableDAO(), databaseDAO(),
metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO()); metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO(), taskDAO());
List<UsageDetails> usageDetails = usageDAO().getUsageById(ref.getId().toString(), date, days - 1); List<UsageDetails> usageDetails = usageDAO().getUsageById(ref.getId().toString(), date, days - 1);
return new EntityUsage().withUsage(usageDetails).withEntity(ref); return new EntityUsage().withUsage(usageDetails).withEntity(ref);
} }
@ -101,14 +105,14 @@ public abstract class UsageRepository {
public void create(String entityType, String id, DailyCount usage) throws IOException { public void create(String entityType, String id, DailyCount usage) throws IOException {
// Validate data entity for which usage is being collected // Validate data entity for which usage is being collected
getEntityReference(entityType, UUID.fromString(id), tableDAO(), databaseDAO(), metricsDAO(), getEntityReference(entityType, UUID.fromString(id), tableDAO(), databaseDAO(), metricsDAO(),
dashboardDAO(), reportDAO(), topicDAO(), chartDAO()); dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO());
addUsage(entityType, id, usage); addUsage(entityType, id, usage);
} }
@Transaction @Transaction
public void createByName(String entityType, String fullyQualifiedName, DailyCount usage) throws IOException { public void createByName(String entityType, String fullyQualifiedName, DailyCount usage) throws IOException {
EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fullyQualifiedName, tableDAO(), EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fullyQualifiedName, tableDAO(),
databaseDAO(), metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO()); databaseDAO(), metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO(), taskDAO());
addUsage(entityType, ref.getId().toString(), usage); addUsage(entityType, ref.getId().toString(), usage);
LOG.info("Usage successfully posted by name"); LOG.info("Usage successfully posted by name");
} }

View File

@ -27,6 +27,7 @@ import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO; import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO; import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO; import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;
import org.openmetadata.catalog.resources.teams.UserResource; import org.openmetadata.catalog.resources.teams.UserResource;
import org.openmetadata.catalog.resources.teams.UserResource.UserList; import org.openmetadata.catalog.resources.teams.UserResource.UserList;
import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.EntityReference;
@ -101,6 +102,9 @@ public abstract class UserRepository {
@CreateSqlObject @CreateSqlObject
abstract ChartDAO chartDAO(); abstract ChartDAO chartDAO();
@CreateSqlObject
abstract TaskDAO taskDAO();
@Transaction @Transaction
public UserList listAfter(Fields fields, int limitParam, String after) throws IOException, public UserList listAfter(Fields fields, int limitParam, String after) throws IOException,
ParseException, GeneralSecurityException { ParseException, GeneralSecurityException {
@ -235,12 +239,12 @@ public abstract class UserRepository {
} }
// Populate details in entity reference // Populate details in entity reference
return EntityUtil.getEntityReference(ownedEntities, tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(), return EntityUtil.getEntityReference(ownedEntities, tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(),
reportDAO(), topicDAO(), chartDAO()); reportDAO(), topicDAO(), chartDAO(), taskDAO());
} }
private List<EntityReference> getFollows(User user) throws IOException { private List<EntityReference> getFollows(User user) throws IOException {
return EntityUtil.getEntityReference(relationshipDAO().findTo(user.getId().toString(), FOLLOWS.ordinal()), return EntityUtil.getEntityReference(relationshipDAO().findTo(user.getId().toString(), FOLLOWS.ordinal()),
tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO()); tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO());
} }
private void patchTeams(User original, User updated) throws IOException { private void patchTeams(User original, User updated) throws IOException {

View File

@ -54,6 +54,7 @@ import java.util.UUID;
@Consumes(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "pipelines", repositoryClass = "org.openmetadata.catalog.jdbi3.PipelineRepository") @Collection(name = "pipelines", repositoryClass = "org.openmetadata.catalog.jdbi3.PipelineRepository")
public class PipelineResource { public class PipelineResource {
public static final String COLLECTION_PATH = "/v1/pipelines/"; public static final String COLLECTION_PATH = "/v1/pipelines/";
private final List<String> attributes = RestUtil.getAttributes(Pipeline.class); private final List<String> attributes = RestUtil.getAttributes(Pipeline.class);
private final List<String> relationships = RestUtil.getAttributes(Pipeline.class); private final List<String> relationships = RestUtil.getAttributes(Pipeline.class);

View File

@ -111,7 +111,7 @@ public class MessagingServiceResource {
responses = { responses = {
@ApiResponse(responseCode = "200", description = "Messaging service instance", @ApiResponse(responseCode = "200", description = "Messaging service instance",
content = @Content(mediaType = "application/json", content = @Content(mediaType = "application/json",
schema = @Schema(implementation = Dashboard.class))), schema = @Schema(implementation = MessagingService.class))),
@ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found") @ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found")
}) })
public MessagingService get(@Context UriInfo uriInfo, public MessagingService get(@Context UriInfo uriInfo,
@ -184,17 +184,17 @@ public class MessagingServiceResource {
@DELETE @DELETE
@Path("/{id}") @Path("/{id}")
@Operation(summary = "Delete a database service", tags = "services", @Operation(summary = "Delete a messaging service", tags = "services",
description = "Delete a database services. If databases (and tables) belong the service, it can't be " + description = "Delete a messaing services. If topics belong the service, it can't be " +
"deleted.", "deleted.",
responses = { responses = {
@ApiResponse(responseCode = "200", description = "OK"), @ApiResponse(responseCode = "200", description = "OK"),
@ApiResponse(responseCode = "404", description = "DatabaseService service for instance {id} " + @ApiResponse(responseCode = "404", description = "MessagingService service for instance {id} " +
"is not found") "is not found")
}) })
public Response delete(@Context UriInfo uriInfo, public Response delete(@Context UriInfo uriInfo,
@Context SecurityContext securityContext, @Context SecurityContext securityContext,
@Parameter(description = "Id of the database service", schema = @Schema(type = "string")) @Parameter(description = "Id of the messaging service", schema = @Schema(type = "string"))
@PathParam("id") String id) { @PathParam("id") String id) {
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
dao.delete(id); dao.delete(id);

View File

@ -25,10 +25,14 @@ import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.responses.ApiResponse;
import org.openmetadata.catalog.api.services.CreateMessagingService; import org.openmetadata.catalog.api.services.CreateMessagingService;
import org.openmetadata.catalog.api.services.CreatePipelineService;
import org.openmetadata.catalog.api.services.UpdateMessagingService; import org.openmetadata.catalog.api.services.UpdateMessagingService;
import org.openmetadata.catalog.api.services.UpdatePipelineService;
import org.openmetadata.catalog.entity.data.Dashboard; import org.openmetadata.catalog.entity.data.Dashboard;
import org.openmetadata.catalog.entity.services.MessagingService; import org.openmetadata.catalog.entity.services.MessagingService;
import org.openmetadata.catalog.entity.services.PipelineService;
import org.openmetadata.catalog.jdbi3.MessagingServiceRepository; import org.openmetadata.catalog.jdbi3.MessagingServiceRepository;
import org.openmetadata.catalog.jdbi3.PipelineServiceRepository;
import org.openmetadata.catalog.resources.Collection; import org.openmetadata.catalog.resources.Collection;
import org.openmetadata.catalog.security.CatalogAuthorizer; import org.openmetadata.catalog.security.CatalogAuthorizer;
import org.openmetadata.catalog.security.SecurityUtil; import org.openmetadata.catalog.security.SecurityUtil;
@ -60,61 +64,62 @@ import java.util.UUID;
@Api(value = "Pipeline service collection", tags = "Services -> Pipeline service collection") @Api(value = "Pipeline service collection", tags = "Services -> Pipeline service collection")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "messagingServices", repositoryClass = "org.openmetadata.catalog.jdbi3.MessagingServiceRepository") @Collection(name = "pipelineServices", repositoryClass = "org.openmetadata.catalog.jdbi3.PipelineServiceRepository")
public class PipelineServiceResource { public class PipelineServiceResource {
private final MessagingServiceRepository dao; private final PipelineServiceRepository dao;
private final CatalogAuthorizer authorizer; private final CatalogAuthorizer authorizer;
public static EntityReference addHref(UriInfo uriInfo, EntityReference service) { public static EntityReference addHref(UriInfo uriInfo, EntityReference service) {
return service.withHref(RestUtil.getHref(uriInfo, "v1/services/messagingServices/", service.getId())); return service.withHref(RestUtil.getHref(uriInfo, "v1/services/pipelineServices/", service.getId()));
} }
private static List<MessagingService> addHref(UriInfo uriInfo, List<MessagingService> instances) { private static List<PipelineService> addHref(UriInfo uriInfo, List<PipelineService> instances) {
instances.forEach(i -> addHref(uriInfo, i)); instances.forEach(i -> addHref(uriInfo, i));
return instances; return instances;
} }
private static MessagingService addHref(UriInfo uriInfo, MessagingService dbService) { private static PipelineService addHref(UriInfo uriInfo, PipelineService pipelineService) {
dbService.setHref(RestUtil.getHref(uriInfo, "v1/services/messagingServices/", dbService.getId())); pipelineService.setHref(RestUtil.getHref(uriInfo, "v1/services/pipelineServices/",
return dbService; pipelineService.getId()));
return pipelineService;
} }
@Inject @Inject
public PipelineServiceResource(MessagingServiceRepository dao, CatalogAuthorizer authorizer) { public PipelineServiceResource(PipelineServiceRepository dao, CatalogAuthorizer authorizer) {
Objects.requireNonNull(dao, "MessagingServiceRepository must not be null"); Objects.requireNonNull(dao, "PipelineServiceRepository must not be null");
this.dao = dao; this.dao = dao;
this.authorizer = authorizer; this.authorizer = authorizer;
} }
static class MessagingServiceList extends ResultList<MessagingService> { static class PipelineServiceList extends ResultList<PipelineService> {
MessagingServiceList(List<MessagingService> data) { PipelineServiceList(List<PipelineService> data) {
super(data); super(data);
} }
} }
@GET @GET
@Operation(summary = "List messaging services", tags = "services", @Operation(summary = "List Pipeline services", tags = "services",
description = "Get a list of messaging services.", description = "Get a list of pipeline services.",
responses = { responses = {
@ApiResponse(responseCode = "200", description = "List of messaging service instances", @ApiResponse(responseCode = "200", description = "List of pipeline service instances",
content = @Content(mediaType = "application/json", content = @Content(mediaType = "application/json",
schema = @Schema(implementation = MessagingServiceList.class))) schema = @Schema(implementation = PipelineServiceList.class)))
}) })
public MessagingServiceList list(@Context UriInfo uriInfo, @QueryParam("name") String name) throws IOException { public PipelineServiceList list(@Context UriInfo uriInfo, @QueryParam("name") String name) throws IOException {
return new MessagingServiceList(addHref(uriInfo, dao.list(name))); return new PipelineServiceList(addHref(uriInfo, dao.list(name)));
} }
@GET @GET
@Path("/{id}") @Path("/{id}")
@Operation(summary = "Get a messaging service", tags = "services", @Operation(summary = "Get a pipeline service", tags = "services",
description = "Get a messaging service by `id`.", description = "Get a pipeline service by `id`.",
responses = { responses = {
@ApiResponse(responseCode = "200", description = "Messaging service instance", @ApiResponse(responseCode = "200", description = "Pipeline service instance",
content = @Content(mediaType = "application/json", content = @Content(mediaType = "application/json",
schema = @Schema(implementation = Dashboard.class))), schema = @Schema(implementation = PipelineService.class))),
@ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found") @ApiResponse(responseCode = "404", description = "Pipeline service for instance {id} is not found")
}) })
public MessagingService get(@Context UriInfo uriInfo, public PipelineService get(@Context UriInfo uriInfo,
@Context SecurityContext securityContext, @Context SecurityContext securityContext,
@PathParam("id") String id) throws IOException { @PathParam("id") String id) throws IOException {
return addHref(uriInfo, dao.get(id)); return addHref(uriInfo, dao.get(id));
@ -130,7 +135,7 @@ public class PipelineServiceResource {
schema = @Schema(implementation = Dashboard.class))), schema = @Schema(implementation = Dashboard.class))),
@ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found") @ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found")
}) })
public MessagingService getByName(@Context UriInfo uriInfo, public PipelineService getByName(@Context UriInfo uriInfo,
@Context SecurityContext securityContext, @Context SecurityContext securityContext,
@PathParam("name") String name) throws IOException { @PathParam("name") String name) throws IOException {
return addHref(uriInfo, dao.getByName(name)); return addHref(uriInfo, dao.getByName(name));
@ -147,54 +152,51 @@ public class PipelineServiceResource {
}) })
public Response create(@Context UriInfo uriInfo, public Response create(@Context UriInfo uriInfo,
@Context SecurityContext securityContext, @Context SecurityContext securityContext,
@Valid CreateMessagingService create) throws JsonProcessingException { @Valid CreatePipelineService create) throws JsonProcessingException {
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
MessagingService service = new MessagingService().withId(UUID.randomUUID()) PipelineService service = new PipelineService().withId(UUID.randomUUID())
.withName(create.getName()).withDescription(create.getDescription()) .withName(create.getName()).withDescription(create.getDescription())
.withServiceType(create.getServiceType()) .withServiceType(create.getServiceType())
.withBrokers(create.getBrokers()) .withUrl(create.getUrl())
.withSchemaRegistry(create.getSchemaRegistry())
.withIngestionSchedule(create.getIngestionSchedule()); .withIngestionSchedule(create.getIngestionSchedule());
addHref(uriInfo, dao.create(service)); addHref(uriInfo, dao.create(service));
return Response.created(service.getHref()).entity(service).build(); return Response.created(service.getHref()).entity(service).build();
} }
@PUT @PUT
@Path("/{id}") @Path("/{id}")
@Operation(summary = "Update a messaging service", tags = "services", @Operation(summary = "Update a pipeline service", tags = "services",
description = "Update an existing messaging service identified by `id`.", description = "Update an existing pipeline service identified by `id`.",
responses = { responses = {
@ApiResponse(responseCode = "200", description = "Messaging service instance", @ApiResponse(responseCode = "200", description = "Pipeline service instance",
content = @Content(mediaType = "application/json", content = @Content(mediaType = "application/json",
schema = @Schema(implementation = MessagingService.class))), schema = @Schema(implementation = PipelineService.class))),
@ApiResponse(responseCode = "400", description = "Bad request") @ApiResponse(responseCode = "400", description = "Bad request")
}) })
public Response update(@Context UriInfo uriInfo, public Response update(@Context UriInfo uriInfo,
@Context SecurityContext securityContext, @Context SecurityContext securityContext,
@Parameter(description = "Id of the messaging service", schema = @Schema(type = "string")) @Parameter(description = "Id of the pipeline service", schema = @Schema(type = "string"))
@PathParam("id") String id, @PathParam("id") String id,
@Valid UpdateMessagingService update) throws IOException { @Valid UpdatePipelineService update) throws IOException {
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
MessagingService service = addHref(uriInfo, PipelineService service = addHref(uriInfo,
dao.update(id, update.getDescription(), update.getBrokers(), update.getSchemaRegistry(), dao.update(id, update.getDescription(), update.getUrl(), update.getIngestionSchedule()));
update.getIngestionSchedule()));
return Response.ok(service).build(); return Response.ok(service).build();
} }
@DELETE @DELETE
@Path("/{id}") @Path("/{id}")
@Operation(summary = "Delete a database service", tags = "services", @Operation(summary = "Delete a pipeline service", tags = "services",
description = "Delete a database services. If databases (and tables) belong the service, it can't be " + description = "Delete a pipeline services. If pipelines (and tasks) belong to the service, it can't be " +
"deleted.", "deleted.",
responses = { responses = {
@ApiResponse(responseCode = "200", description = "OK"), @ApiResponse(responseCode = "200", description = "OK"),
@ApiResponse(responseCode = "404", description = "DatabaseService service for instance {id} " + @ApiResponse(responseCode = "404", description = "Pipeline service for instance {id} " +
"is not found") "is not found")
}) })
public Response delete(@Context UriInfo uriInfo, public Response delete(@Context UriInfo uriInfo,
@Context SecurityContext securityContext, @Context SecurityContext securityContext,
@Parameter(description = "Id of the database service", schema = @Schema(type = "string")) @Parameter(description = "Id of the pipeline service", schema = @Schema(type = "string"))
@PathParam("id") String id) { @PathParam("id") String id) {
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
dao.delete(id); dao.delete(id);

View File

@ -123,7 +123,7 @@ public class TaskResource {
} }
} }
static final String FIELDS = "owner,service,followers,tags"; static final String FIELDS = "taskConfig,owner,service,followers,tags";
public static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "") public static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "")
.split(",")); .split(","));

View File

@ -249,14 +249,15 @@ public final class EntityUtil {
TopicDAO topicDAO, ChartDAO chartDAO, TopicDAO topicDAO, ChartDAO chartDAO,
TaskDAO taskDAO) throws IOException { TaskDAO taskDAO) throws IOException {
for (EntityReference ref : list) { for (EntityReference ref : list) {
getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO, topicDAO, chartDAO); getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO, topicDAO, chartDAO, taskDAO);
} }
return list; return list;
} }
public static EntityReference getEntityReference(EntityReference ref, TableDAO tableDAO, DatabaseDAO databaseDAO, public static EntityReference getEntityReference(EntityReference ref, TableDAO tableDAO, DatabaseDAO databaseDAO,
MetricsDAO metricsDAO, DashboardDAO dashboardDAO, MetricsDAO metricsDAO, DashboardDAO dashboardDAO,
ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO) ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO,
TaskDAO taskDAO)
throws IOException { throws IOException {
// Note href to entity reference is not added here // Note href to entity reference is not added here
String entity = ref.getType(); String entity = ref.getType();
@ -282,22 +283,27 @@ public final class EntityUtil {
} else if (entity.equalsIgnoreCase(Entity.CHART)) { } else if (entity.equalsIgnoreCase(Entity.CHART)) {
Chart instance = EntityUtil.validate(id, chartDAO.findById(id), Chart.class); Chart instance = EntityUtil.validate(id, chartDAO.findById(id), Chart.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName()); return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} else if (entity.equalsIgnoreCase(Entity.TASK)) {
Task instance = EntityUtil.validate(id, taskDAO.findById(id), Task.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} }
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(entity)); throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(entity));
} }
public static EntityReference getEntityReference(String entity, UUID id, TableDAO tableDAO, DatabaseDAO databaseDAO, public static EntityReference getEntityReference(String entity, UUID id, TableDAO tableDAO, DatabaseDAO databaseDAO,
MetricsDAO metricsDAO, DashboardDAO dashboardDAO, MetricsDAO metricsDAO, DashboardDAO dashboardDAO,
ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO) ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO,
TaskDAO taskDAO)
throws IOException { throws IOException {
EntityReference ref = new EntityReference().withId(id).withType(entity); EntityReference ref = new EntityReference().withId(id).withType(entity);
return getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO, topicDAO, chartDAO); return getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO,
reportDAO, topicDAO, chartDAO, taskDAO);
} }
public static EntityReference getEntityReferenceByName(String entity, String fqn, TableDAO tableDAO, public static EntityReference getEntityReferenceByName(String entity, String fqn, TableDAO tableDAO,
DatabaseDAO databaseDAO, MetricsDAO metricsDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO, ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO,
DashboardDAO dashboardDAO) DashboardDAO dashboardDAO, TaskDAO taskDAO)
throws IOException { throws IOException {
if (entity.equalsIgnoreCase(Entity.TABLE)) { if (entity.equalsIgnoreCase(Entity.TABLE)) {
Table instance = EntityUtil.validate(fqn, tableDAO.findByFQN(fqn), Table.class); Table instance = EntityUtil.validate(fqn, tableDAO.findByFQN(fqn), Table.class);
@ -327,6 +333,10 @@ public final class EntityUtil {
Dashboard instance = EntityUtil.validate(fqn, dashboardDAO.findByFQN(fqn), Dashboard.class); Dashboard instance = EntityUtil.validate(fqn, dashboardDAO.findByFQN(fqn), Dashboard.class);
return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.DASHBOARD) return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.DASHBOARD)
.withDescription(instance.getDescription()); .withDescription(instance.getDescription());
} else if (entity.equalsIgnoreCase(Entity.TASK)) {
Task instance = EntityUtil.validate(fqn, taskDAO.findByFQN(fqn), Task.class);
return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.TASK)
.withDescription(instance.getDescription());
} }
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entity, fqn)); throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entity, fqn));
} }
@ -362,6 +372,9 @@ public final class EntityUtil {
} else if (clazz.toString().toLowerCase().endsWith(Entity.DASHBOARD.toLowerCase())) { } else if (clazz.toString().toLowerCase().endsWith(Entity.DASHBOARD.toLowerCase())) {
Dashboard instance = (Dashboard) entity; Dashboard instance = (Dashboard) entity;
return getEntityReference(instance); return getEntityReference(instance);
} else if (clazz.toString().toLowerCase().endsWith(Entity.TASK.toLowerCase())) {
Task instance = (Task) entity;
return getEntityReference(instance);
} else if (clazz.toString().toLowerCase().endsWith(Entity.MESSAGING_SERVICE.toLowerCase())) { } else if (clazz.toString().toLowerCase().endsWith(Entity.MESSAGING_SERVICE.toLowerCase())) {
MessagingService instance = (MessagingService) entity; MessagingService instance = (MessagingService) entity;
return getEntityReference(instance); return getEntityReference(instance);

View File

@ -0,0 +1,36 @@
{
"$id": "https://open-metadata.org/schema/api/services/createPipelineService.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Create Pipeline service entity request",
"description": "Create Pipeline service entity request",
"type": "object",
"properties": {
"name": {
"description": "Name that identifies the this entity instance uniquely",
"type": "string",
"minLength": 1,
"maxLength": 64
},
"description": {
"description": "Description of pipeline service entity.",
"type": "string"
},
"serviceType": {
"$ref": "../../entity/services/pipelineService.json#/definitions/pipelineServiceType"
},
"url": {
"description": "Pipeline UI URL",
"type": "string",
"format": "uri"
},
"ingestionSchedule": {
"description": "Schedule for running pipeline ingestion jobs",
"$ref": "../../type/schedule.json"
}
},
"required": [
"name",
"serviceType",
"url"
]
}

View File

@ -0,0 +1,23 @@
{
"$id": "https://open-metadata.org/schema/api/services/updatePipelineService.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Update pipeline service entity request",
"description": "Update pipeline service entity request",
"type": "object",
"properties" : {
"description": {
"description": "Description of Pipeline service entity.",
"type": "string"
},
"url" : {
"description": "Pipeline Service UI URL.",
"type": "string",
"format": "uri"
},
"ingestionSchedule" : {
"description": "Schedule for running metadata ingestion jobs",
"$ref" : "../../type/schedule.json"
}
}
}

View File

@ -41,6 +41,11 @@
"description": "Description of a messaging service instance.", "description": "Description of a messaging service instance.",
"type": "string" "type": "string"
}, },
"url": {
"description": "Pipeline Service Management/UI URL",
"type": "string",
"format": "uri"
},
"ingestionSchedule": { "ingestionSchedule": {
"description": "Schedule for running metadata ingestion jobs.", "description": "Schedule for running metadata ingestion jobs.",
"$ref": "../../type/schedule.json" "$ref": "../../type/schedule.json"

View File

@ -0,0 +1,394 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.resources.services;
import org.apache.http.client.HttpResponseException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.openmetadata.catalog.CatalogApplicationTest;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.api.services.CreatePipelineService;
import org.openmetadata.catalog.api.services.UpdatePipelineService;
import org.openmetadata.catalog.entity.services.PipelineService;
import org.openmetadata.catalog.exception.CatalogExceptionMessage;
import org.openmetadata.catalog.type.Schedule;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.TestUtils;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.CONFLICT;
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
import static org.openmetadata.catalog.util.TestUtils.authHeaders;
public class PipelineServiceResourceTest extends CatalogApplicationTest {
public static URI PIPELINE_SERVICE_URL;
@BeforeAll
public static void setup(TestInfo test) throws URISyntaxException {
PIPELINE_SERVICE_URL = new URI("http://localhost:8080");
}
@Test
public void post_serviceWithLongName_400_badRequest(TestInfo test) {
// Create pipeline with mandatory name field empty
CreatePipelineService create = create(test).withName(TestUtils.LONG_ENTITY_NAME);
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]");
}
@Test
public void post_withoutRequiredFields_400_badRequest(TestInfo test) {
// Create pipeline with mandatory name field null
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createService(create(test).withName(null), adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "[name must not be null]");
// Create pipeline with mandatory name field empty
exception = assertThrows(HttpResponseException.class, () ->
createService(create(test).withName(""), adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]");
// Create pipeline with mandatory serviceType field empty
exception = assertThrows(HttpResponseException.class, () ->
createService(create(test).withServiceType(null), adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "[serviceType must not be null]");
// Create pipeline with mandatory brokers field empty
exception = assertThrows(HttpResponseException.class, () ->
createService(create(test).withUrl(null), adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "[url must not be null]");
}
@Test
public void post_serviceAlreadyExists_409(TestInfo test) throws HttpResponseException {
CreatePipelineService create = create(test);
createService(create, adminAuthHeaders());
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, CONFLICT, CatalogExceptionMessage.ENTITY_ALREADY_EXISTS);
}
@Test
public void post_validService_as_admin_200_ok(TestInfo test) throws HttpResponseException {
// Create pipeline service with different optional fields
Map<String, String> authHeaders = adminAuthHeaders();
createAndCheckService(create(test, 1).withDescription(null), authHeaders);
createAndCheckService(create(test, 2).withDescription("description"), authHeaders);
createAndCheckService(create(test, 3).withIngestionSchedule(null), authHeaders);
}
@Test
public void post_validService_as_non_admin_401(TestInfo test) {
// Create pipeline service with different optional fields
Map<String, String> authHeaders = authHeaders("test@open-metadata.org");
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createAndCheckService(create(test, 1).withDescription(null), authHeaders));
TestUtils.assertResponse(exception, FORBIDDEN,
"Principal: CatalogPrincipal{name='test'} is not admin");
}
@Test
public void post_invalidIngestionSchedule_4xx(TestInfo test) {
// No jdbc connection set
CreatePipelineService create = create(test);
Schedule schedule = create.getIngestionSchedule();
// Invalid format
create.withIngestionSchedule(schedule.withRepeatFrequency("INVALID"));
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "Invalid ingestion repeatFrequency INVALID");
// Duration that contains years, months and seconds are not allowed
create.withIngestionSchedule(schedule.withRepeatFrequency("P1Y"));
exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST,
"Ingestion repeatFrequency can only contain Days, Hours, " +
"and Minutes - example P{d}DT{h}H{m}M");
create.withIngestionSchedule(schedule.withRepeatFrequency("P1M"));
exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST,
"Ingestion repeatFrequency can only contain Days, Hours, " +
"and Minutes - example P{d}DT{h}H{m}M");
create.withIngestionSchedule(schedule.withRepeatFrequency("PT1S"));
exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST,
"Ingestion repeatFrequency can only contain Days, Hours, " +
"and Minutes - example P{d}DT{h}H{m}M");
}
@Test
public void post_validIngestionSchedules_as_admin_200(TestInfo test) throws HttpResponseException {
Schedule schedule = new Schedule().withStartDate(new Date());
schedule.withRepeatFrequency("PT60M"); // Repeat every 60M should be valid
createAndCheckService(create(test, 1).withIngestionSchedule(schedule), adminAuthHeaders());
schedule.withRepeatFrequency("PT1H49M");
createAndCheckService(create(test, 2).withIngestionSchedule(schedule), adminAuthHeaders());
schedule.withRepeatFrequency("P1DT1H49M");
createAndCheckService(create(test, 3).withIngestionSchedule(schedule), adminAuthHeaders());
}
@Test
public void post_ingestionScheduleIsTooShort_4xx(TestInfo test) {
// No jdbc connection set
CreatePipelineService create = create(test);
Schedule schedule = create.getIngestionSchedule();
create.withIngestionSchedule(schedule.withRepeatFrequency("PT1M")); // Repeat every 0 seconds
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createService(create, adminAuthHeaders()));
TestUtils.assertResponseContains(exception, BAD_REQUEST,
"Ingestion repeatFrequency is too short and must be more than 60 minutes");
create.withIngestionSchedule(schedule.withRepeatFrequency("PT59M")); // Repeat every 50 minutes 59 seconds
exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders()));
TestUtils.assertResponse(exception, BAD_REQUEST, "Ingestion repeatFrequency is too short and must " +
"be more than 60 minutes");
}
@Test
public void put_updateNonExistentService_404() {
// Update pipeline description and ingestion service that are null
UpdatePipelineService update = new UpdatePipelineService().withDescription("description1");
HttpResponseException exception = assertThrows(HttpResponseException.class, ()
-> updatePipelineService(TestUtils.NON_EXISTENT_ENTITY.toString(), update, OK, adminAuthHeaders()));
TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound("PipelineService",
TestUtils.NON_EXISTENT_ENTITY));
}
@Test
public void put_updateService_as_admin_2xx(TestInfo test) throws HttpResponseException, URISyntaxException {
PipelineService dbService = createAndCheckService(create(test).withDescription(null).withIngestionSchedule(null)
.withUrl(PIPELINE_SERVICE_URL), adminAuthHeaders());
String id = dbService.getId().toString();
// Update pipeline description and ingestion service that are null
UpdatePipelineService update = new UpdatePipelineService().withDescription("description1");
updateAndCheckService(id, update, OK, adminAuthHeaders());
// Update ingestion schedule
Schedule schedule = new Schedule().withStartDate(new Date()).withRepeatFrequency("P1D");
update.withIngestionSchedule(schedule);
updateAndCheckService(id, update, OK, adminAuthHeaders());
// Update description and ingestion schedule again
update.withDescription("description1").withIngestionSchedule(schedule.withRepeatFrequency("PT1H"));
updateAndCheckService(id, update, OK, adminAuthHeaders());
// update broker list and schema registry
update.withUrl(new URI("http://localhost:9000"));
updateAndCheckService(id, update, OK, adminAuthHeaders());
PipelineService updatedService = getService(dbService.getId(), adminAuthHeaders());
validatePipelineServiceConfig(updatedService, List.of("localhost:0"), new URI("http://localhost:9000"));
}
@Test
public void put_update_as_non_admin_401(TestInfo test) throws HttpResponseException {
Map<String, String> authHeaders = adminAuthHeaders();
PipelineService pipelineService = createAndCheckService(create(test).withDescription(null)
.withIngestionSchedule(null),
authHeaders);
String id = pipelineService.getId().toString();
RestUtil.DATE_TIME_FORMAT.format(new Date());
// Update pipeline description and ingestion service that are null
UpdatePipelineService update = new UpdatePipelineService().withDescription("description1");
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
updateAndCheckService(id, update, OK, authHeaders("test@open-metadata.org")));
TestUtils.assertResponse(exception, FORBIDDEN, "Principal: CatalogPrincipal{name='test'} " +
"is not admin");
}
@Test
public void get_nonExistentService_404_notFound() {
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
getService(TestUtils.NON_EXISTENT_ENTITY, adminAuthHeaders()));
TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound(Entity.PIPELINE_SERVICE,
TestUtils.NON_EXISTENT_ENTITY));
}
@Test
public void get_nonExistentServiceByName_404_notFound() {
HttpResponseException exception = assertThrows(HttpResponseException.class, ()
-> getServiceByName("invalidName", null, adminAuthHeaders()));
TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound(Entity.PIPELINE_SERVICE,
"invalidName"));
}
public static PipelineService createAndCheckService(CreatePipelineService create,
Map<String, String> authHeaders) throws HttpResponseException {
PipelineService service = createService(create, authHeaders);
validateService(service, create.getName(), create.getDescription(), create.getIngestionSchedule());
// GET the newly created service and validate
PipelineService getService = getService(service.getId(), authHeaders);
validateService(getService, create.getName(), create.getDescription(), create.getIngestionSchedule());
// GET the newly created service by name and validate
getService = getServiceByName(service.getName(), null, authHeaders);
validateService(getService, create.getName(), create.getDescription(), create.getIngestionSchedule());
return service;
}
public static PipelineService createService(CreatePipelineService create,
Map<String, String> authHeaders) throws HttpResponseException {
return TestUtils.post(CatalogApplicationTest.getResource("services/pipelineServices"),
create, PipelineService.class, authHeaders);
}
private static void validateService(PipelineService service, String expectedName, String expectedDescription,
Schedule expectedIngestion) {
assertNotNull(service.getId());
assertNotNull(service.getHref());
assertEquals(expectedName, service.getName());
assertEquals(expectedDescription, service.getDescription());
if (expectedIngestion != null) {
assertEquals(expectedIngestion.getStartDate(), service.getIngestionSchedule().getStartDate());
assertEquals(expectedIngestion.getRepeatFrequency(), service.getIngestionSchedule().getRepeatFrequency());
}
}
public static PipelineService getService(UUID id, Map<String, String> authHeaders) throws HttpResponseException {
return getService(id, null, authHeaders);
}
public static PipelineService getService(UUID id, String fields, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = CatalogApplicationTest.getResource("services/pipelineServices/" + id);
target = fields != null ? target.queryParam("fields", fields) : target;
return TestUtils.get(target, PipelineService.class, authHeaders);
}
public static PipelineService getServiceByName(String name, String fields, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = CatalogApplicationTest.getResource("services/pipelineServices/name/" + name);
target = fields != null ? target.queryParam("fields", fields) : target;
return TestUtils.get(target, PipelineService.class, authHeaders);
}
public static String getName(TestInfo test) {
return String.format("pservice_%s", test.getDisplayName());
}
public static String getName(TestInfo test, int index) {
return String.format("pservice_%d_%s", index, test.getDisplayName());
}
@Test
public void delete_ExistentPipelineService_as_admin_200(TestInfo test) throws HttpResponseException {
Map<String, String> authHeaders = adminAuthHeaders();
PipelineService pipelineService = createService(create(test), authHeaders);
deleteService(pipelineService.getId(), pipelineService.getName(), authHeaders);
}
@Test
public void delete_as_user_401(TestInfo test) throws HttpResponseException {
Map<String, String> authHeaders = adminAuthHeaders();
PipelineService pipelineService = createService(create(test), authHeaders);
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
deleteService(pipelineService.getId(), pipelineService.getName(),
authHeaders("test@open-metadata.org")));
TestUtils.assertResponse(exception, FORBIDDEN,
"Principal: CatalogPrincipal{name='test'} is not admin");
}
@Test
public void delete_notExistentPipelineService() {
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
getService(TestUtils.NON_EXISTENT_ENTITY, adminAuthHeaders()));
TestUtils.assertResponse(exception, NOT_FOUND,
CatalogExceptionMessage.entityNotFound(Entity.PIPELINE_SERVICE, TestUtils.NON_EXISTENT_ENTITY));
}
private void deleteService(UUID id, String name, Map<String, String> authHeaders) throws HttpResponseException {
TestUtils.delete(CatalogApplicationTest.getResource("services/pipelineServices/" + id), authHeaders);
// Ensure deleted service does not exist
HttpResponseException exception = assertThrows(HttpResponseException.class, () -> getService(id, authHeaders));
TestUtils.assertResponse(exception, NOT_FOUND,
CatalogExceptionMessage.entityNotFound(Entity.PIPELINE_SERVICE, id));
// Ensure deleted service does not exist when getting by name
exception = assertThrows(HttpResponseException.class, () -> getServiceByName(name, null, authHeaders));
TestUtils.assertResponse(exception, NOT_FOUND,
CatalogExceptionMessage.entityNotFound(Entity.PIPELINE_SERVICE, name));
}
public static CreatePipelineService create(TestInfo test) {
return new CreatePipelineService().withName(getName(test))
.withServiceType(CreatePipelineService.PipelineServiceType.Airflow)
.withUrl(PIPELINE_SERVICE_URL)
.withIngestionSchedule(new Schedule().withStartDate(new Date()).withRepeatFrequency("P1D"));
}
private static CreatePipelineService create(TestInfo test, int index) {
return new CreatePipelineService().withName(getName(test, index))
.withServiceType(CreatePipelineService.PipelineServiceType.Airflow)
.withUrl(PIPELINE_SERVICE_URL)
.withIngestionSchedule(new Schedule().withStartDate(new Date()).withRepeatFrequency("P1D"));
}
public static void updateAndCheckService(String id, UpdatePipelineService update, Status status,
Map<String, String> authHeaders) throws HttpResponseException {
PipelineService service = updatePipelineService(id, update, status, authHeaders);
validateService(service, service.getName(), update.getDescription(), update.getIngestionSchedule());
// GET the newly updated pipeline and validate
PipelineService getService = getService(service.getId(), authHeaders);
validateService(getService, service.getName(), update.getDescription(), update.getIngestionSchedule());
// GET the newly updated pipeline by name and validate
getService = getServiceByName(service.getName(), null, authHeaders);
validateService(getService, service.getName(), update.getDescription(), update.getIngestionSchedule());
}
public static PipelineService updatePipelineService(String id, UpdatePipelineService updated,
Status status, Map<String, String> authHeaders)
throws HttpResponseException {
return TestUtils.put(CatalogApplicationTest.getResource("services/pipelineServices/" + id), updated,
PipelineService.class, status, authHeaders);
}
private static void validatePipelineServiceConfig(PipelineService actualService, List<String> expectedBrokers,
URI expectedUrl)
throws HttpResponseException {
assertEquals(actualService.getUrl(), expectedUrl);
}
}