From 4518be7ddb3aa28b155e4fb9fc84b1b493d36bd3 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Fri, 24 Sep 2021 16:28:44 -0700 Subject: [PATCH] [WIP] Airlfow integration --- .../mysql/v001__create_db_connection_info.sql | 10 + .../jdbi3/MessagingServiceRepository.java | 16 +- .../jdbi3/PipelineServiceRepository.java | 139 ++++++ .../catalog/jdbi3/TaskRepository.java | 8 +- .../catalog/jdbi3/TeamRepository.java | 6 +- .../catalog/jdbi3/UsageRepository.java | 12 +- .../catalog/jdbi3/UserRepository.java | 8 +- .../resources/pipelines/PipelineResource.java | 1 + .../messaging/MessagingServiceResource.java | 10 +- .../pipeline/PipelineServiceResource.java | 86 ++-- .../catalog/resources/tasks/TaskResource.java | 2 +- .../openmetadata/catalog/util/EntityUtil.java | 23 +- .../api/services/createPipelineService.json | 36 ++ .../api/services/updatePipelineService.json | 23 + .../entity/services/pipelineService.json | 5 + .../services/PipelineServiceResourceTest.java | 394 ++++++++++++++++++ 16 files changed, 707 insertions(+), 72 deletions(-) create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineServiceRepository.java create mode 100644 catalog-rest-service/src/main/resources/json/schema/api/services/createPipelineService.json create mode 100644 catalog-rest-service/src/main/resources/json/schema/api/services/updatePipelineService.json create mode 100644 catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/PipelineServiceResourceTest.java diff --git a/bootstrap/sql/mysql/v001__create_db_connection_info.sql b/bootstrap/sql/mysql/v001__create_db_connection_info.sql index 7b9efec6a91..fa1fae23d4e 100644 --- a/bootstrap/sql/mysql/v001__create_db_connection_info.sql +++ b/bootstrap/sql/mysql/v001__create_db_connection_info.sql @@ -81,6 +81,16 @@ CREATE TABLE IF NOT EXISTS dashboard_service_entity ( UNIQUE KEY unique_name(name) ); +CREATE TABLE IF NOT EXISTS pipeline_service_entity ( + id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL, + name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL, + serviceType VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.serviceType') NOT NULL, + json JSON NOT NULL, + timestamp BIGINT, + PRIMARY KEY (id), + UNIQUE KEY unique_name(name) +); + -- -- Data entities -- diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/MessagingServiceRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/MessagingServiceRepository.java index 4f521da0a9b..08a753983e9 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/MessagingServiceRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/MessagingServiceRepository.java @@ -44,31 +44,31 @@ public abstract class MessagingServiceRepository { private static final Logger LOG = LoggerFactory.getLogger(MessagingServiceRepository.class); @CreateSqlObject - abstract MessagingServiceDAO messagingServiceDOA(); + abstract MessagingServiceDAO messagingServiceDAO(); @CreateSqlObject abstract EntityRelationshipDAO relationshipDAO(); @Transaction public List list(String name) throws IOException { - return JsonUtils.readObjects(messagingServiceDOA().list(name), MessagingService.class); + return JsonUtils.readObjects(messagingServiceDAO().list(name), MessagingService.class); } @Transaction public MessagingService get(String id) throws IOException { - return EntityUtil.validate(id, messagingServiceDOA().findById(id), MessagingService.class); + return EntityUtil.validate(id, messagingServiceDAO().findById(id), MessagingService.class); } @Transaction public MessagingService getByName(String name) throws IOException { - return EntityUtil.validate(name, messagingServiceDOA().findByName(name), MessagingService.class); + return EntityUtil.validate(name, messagingServiceDAO().findByName(name), MessagingService.class); } @Transaction public MessagingService create(MessagingService messagingService) throws JsonProcessingException { // Validate fields validateIngestionSchedule(messagingService.getIngestionSchedule()); - messagingServiceDOA().insert(JsonUtils.pojoToJson(messagingService)); + messagingServiceDAO().insert(JsonUtils.pojoToJson(messagingService)); return messagingService; } @@ -76,17 +76,17 @@ public abstract class MessagingServiceRepository { Schedule ingestionSchedule) throws IOException { validateIngestionSchedule(ingestionSchedule); - MessagingService dbService = EntityUtil.validate(id, messagingServiceDOA().findById(id), MessagingService.class); + MessagingService dbService = EntityUtil.validate(id, messagingServiceDAO().findById(id), MessagingService.class); // Update fields dbService.withDescription(description).withIngestionSchedule(ingestionSchedule) .withSchemaRegistry(schemaRegistry).withBrokers(brokers); - messagingServiceDOA().update(id, JsonUtils.pojoToJson(dbService)); + messagingServiceDAO().update(id, JsonUtils.pojoToJson(dbService)); return dbService; } @Transaction public void delete(String id) { - if (messagingServiceDOA().delete(id) <= 0) { + if (messagingServiceDAO().delete(id) <= 0) { throw EntityNotFoundException.byMessage(entityNotFound(Entity.MESSAGING_SERVICE, id)); } relationshipDAO().deleteAll(id); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineServiceRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineServiceRepository.java new file mode 100644 index 00000000000..e7beefede1a --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/PipelineServiceRepository.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.jdbi3; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.joda.time.Period; +import org.joda.time.format.ISOPeriodFormat; +import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.entity.services.PipelineService; +import org.openmetadata.catalog.exception.EntityNotFoundException; +import org.openmetadata.catalog.type.Schedule; +import org.openmetadata.catalog.util.EntityUtil; +import org.openmetadata.catalog.util.JsonUtils; +import org.skife.jdbi.v2.sqlobject.Bind; +import org.skife.jdbi.v2.sqlobject.CreateSqlObject; +import org.skife.jdbi.v2.sqlobject.SqlQuery; +import org.skife.jdbi.v2.sqlobject.SqlUpdate; +import org.skife.jdbi.v2.sqlobject.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.List; + +import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound; + + +public abstract class PipelineServiceRepository { + private static final Logger LOG = LoggerFactory.getLogger(PipelineServiceRepository.class); + + @CreateSqlObject + abstract PipelineServiceDAO pipelineServiceDAO(); + + @CreateSqlObject + abstract EntityRelationshipDAO relationshipDAO(); + + @Transaction + public List list(String name) throws IOException { + return JsonUtils.readObjects(pipelineServiceDAO().list(name), PipelineService.class); + } + + @Transaction + public PipelineService get(String id) throws IOException { + return EntityUtil.validate(id, pipelineServiceDAO().findById(id), PipelineService.class); + } + + @Transaction + public PipelineService getByName(String name) throws IOException { + return EntityUtil.validate(name, pipelineServiceDAO().findByName(name), PipelineService.class); + } + + @Transaction + public PipelineService create(PipelineService pipelineService) throws JsonProcessingException { + // Validate fields + validateIngestionSchedule(pipelineService.getIngestionSchedule()); + pipelineServiceDAO().insert(JsonUtils.pojoToJson(pipelineService)); + return pipelineService; + } + + public PipelineService update(String id, String description, URI url, + Schedule ingestionSchedule) + throws IOException { + validateIngestionSchedule(ingestionSchedule); + PipelineService pipelineService = EntityUtil.validate(id, pipelineServiceDAO().findById(id), PipelineService.class); + // Update fields + pipelineService.withDescription(description).withIngestionSchedule(ingestionSchedule) + .withUrl(url); + pipelineServiceDAO().update(id, JsonUtils.pojoToJson(pipelineService)); + return pipelineService; + } + + @Transaction + public void delete(String id) { + if (pipelineServiceDAO().delete(id) <= 0) { + throw EntityNotFoundException.byMessage(entityNotFound(Entity.PIPELINE_SERVICE, id)); + } + relationshipDAO().deleteAll(id); + } + + private void validateIngestionSchedule(Schedule ingestion) { + if (ingestion == null) { + return; + } + String duration = ingestion.getRepeatFrequency(); + + // ISO8601 duration format is P{y}Y{m}M{d}DT{h}H{m}M{s}S. + String[] splits = duration.split("T"); + if (splits[0].contains("Y") || splits[0].contains("M") || + (splits.length == 2 && splits[1].contains("S"))) { + throw new IllegalArgumentException("Ingestion repeatFrequency can only contain Days, Hours, and Minutes - " + + "example P{d}DT{h}H{m}M"); + } + + Period period; + try { + period = ISOPeriodFormat.standard().parsePeriod(duration); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid ingestion repeatFrequency " + duration, e); + } + if (period.toStandardMinutes().getMinutes() < 60) { + throw new IllegalArgumentException("Ingestion repeatFrequency is too short and must be more than 60 minutes"); + } + } + + public interface PipelineServiceDAO { + @SqlUpdate("INSERT INTO pipeline_service_entity (json) VALUES (:json)") + void insert(@Bind("json") String json); + + @SqlUpdate("UPDATE pipeline_service_entity SET json = :json where id = :id") + void update(@Bind("id") String id, @Bind("json") String json); + + @SqlQuery("SELECT json FROM pipeline_service_entity WHERE id = :id") + String findById(@Bind("id") String id); + + @SqlQuery("SELECT json FROM pipeline_service_entity WHERE name = :name") + String findByName(@Bind("name") String name); + + @SqlQuery("SELECT json FROM pipeline_service_entity WHERE (name = :name OR :name is NULL)") + List list(@Bind("name") String name); + + @SqlUpdate("DELETE FROM pipeline_service_entity WHERE id = :id") + int delete(@Bind("id") String id); + } +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TaskRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TaskRepository.java index f03a08b2661..2459f3334eb 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TaskRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TaskRepository.java @@ -17,7 +17,6 @@ package org.openmetadata.catalog.jdbi3; import org.openmetadata.catalog.Entity; -import org.openmetadata.catalog.entity.data.Chart; import org.openmetadata.catalog.entity.data.Task; import org.openmetadata.catalog.entity.services.DashboardService; import org.openmetadata.catalog.exception.CatalogExceptionMessage; @@ -25,7 +24,7 @@ import org.openmetadata.catalog.exception.EntityNotFoundException; import org.openmetadata.catalog.jdbi3.DashboardServiceRepository.DashboardServiceDAO; import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO; import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO; -import org.openmetadata.catalog.resources.charts.ChartResource; +import org.openmetadata.catalog.resources.tasks.TaskResource; import org.openmetadata.catalog.resources.tasks.TaskResource.TaskList; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.TagLabel; @@ -54,8 +53,9 @@ import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityN public abstract class TaskRepository { private static final Logger LOG = LoggerFactory.getLogger(TaskRepository.class); - private static final Fields TASK_UPDATE_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner,taskConfig"); - private static final Fields TASK_PATCH_FIELDS = new Fields(ChartResource.FIELD_LIST, "owner,service,tags"); + private static final Fields TASK_UPDATE_FIELDS = new Fields(TaskResource.FIELD_LIST, "owner," + + "taskConfig,tags"); + private static final Fields TASK_PATCH_FIELDS = new Fields(TaskResource.FIELD_LIST, "owner,service,tags"); public static String getFQN(EntityReference service, Task task) { return (service.getName() + "." + task.getName()); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamRepository.java index 4ab15dd1615..62bf835138c 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamRepository.java @@ -28,6 +28,7 @@ import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO; import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO; import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO; import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO; +import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO; import org.openmetadata.catalog.resources.teams.TeamResource; import org.openmetadata.catalog.resources.teams.TeamResource.TeamList; import org.openmetadata.catalog.type.EntityReference; @@ -100,6 +101,9 @@ public abstract class TeamRepository { @CreateSqlObject abstract ChartDAO chartDAO(); + @CreateSqlObject + abstract TaskDAO taskDAO(); + @Transaction public Team create(Team team, List userIds) throws IOException { // Query 1 - Validate user IDs @@ -257,7 +261,7 @@ public abstract class TeamRepository { private List getOwns(String teamId) throws IOException { // Compile entities owned by the team return EntityUtil.getEntityReference(relationshipDAO().findTo(teamId, OWNS.ordinal()), tableDAO(), databaseDAO(), - metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO()); + metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO()); } private void addUserRelationship(Team team, User user) { diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UsageRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UsageRepository.java index 36061b58638..10e48b92129 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UsageRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UsageRepository.java @@ -22,6 +22,7 @@ import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO; import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO; import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO; import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO; +import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO; import org.openmetadata.catalog.type.EntityReference; @@ -76,6 +77,9 @@ public abstract class UsageRepository { @CreateSqlObject abstract ChartDAO chartDAO(); + @CreateSqlObject + abstract TaskDAO taskDAO(); + @CreateSqlObject abstract EntityRelationshipDAO relationshipDAO(); @@ -84,7 +88,7 @@ public abstract class UsageRepository { @Transaction public EntityUsage get(String entityType, String id, String date, int days) throws IOException { EntityReference ref = getEntityReference(entityType, UUID.fromString(id), tableDAO(), databaseDAO(), - metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO()); + metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO()); List usageDetails = usageDAO().getUsageById(id, date, days - 1); return new EntityUsage().withUsage(usageDetails).withEntity(ref); } @@ -92,7 +96,7 @@ public abstract class UsageRepository { @Transaction public EntityUsage getByName(String entityType, String fqn, String date, int days) throws IOException { EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fqn, tableDAO(), databaseDAO(), - metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO()); + metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO(), taskDAO()); List usageDetails = usageDAO().getUsageById(ref.getId().toString(), date, days - 1); return new EntityUsage().withUsage(usageDetails).withEntity(ref); } @@ -101,14 +105,14 @@ public abstract class UsageRepository { public void create(String entityType, String id, DailyCount usage) throws IOException { // Validate data entity for which usage is being collected getEntityReference(entityType, UUID.fromString(id), tableDAO(), databaseDAO(), metricsDAO(), - dashboardDAO(), reportDAO(), topicDAO(), chartDAO()); + dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO()); addUsage(entityType, id, usage); } @Transaction public void createByName(String entityType, String fullyQualifiedName, DailyCount usage) throws IOException { EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fullyQualifiedName, tableDAO(), - databaseDAO(), metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO()); + databaseDAO(), metricsDAO(), reportDAO(), topicDAO(), chartDAO(), dashboardDAO(), taskDAO()); addUsage(entityType, ref.getId().toString(), usage); LOG.info("Usage successfully posted by name"); } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserRepository.java index cfe31e49f74..a7c2cc0f8c4 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserRepository.java @@ -27,6 +27,7 @@ import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO; import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO; import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO; import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO; +import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO; import org.openmetadata.catalog.resources.teams.UserResource; import org.openmetadata.catalog.resources.teams.UserResource.UserList; import org.openmetadata.catalog.type.EntityReference; @@ -101,6 +102,9 @@ public abstract class UserRepository { @CreateSqlObject abstract ChartDAO chartDAO(); + @CreateSqlObject + abstract TaskDAO taskDAO(); + @Transaction public UserList listAfter(Fields fields, int limitParam, String after) throws IOException, ParseException, GeneralSecurityException { @@ -235,12 +239,12 @@ public abstract class UserRepository { } // Populate details in entity reference return EntityUtil.getEntityReference(ownedEntities, tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(), - reportDAO(), topicDAO(), chartDAO()); + reportDAO(), topicDAO(), chartDAO(), taskDAO()); } private List getFollows(User user) throws IOException { return EntityUtil.getEntityReference(relationshipDAO().findTo(user.getId().toString(), FOLLOWS.ordinal()), - tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO()); + tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO(), topicDAO(), chartDAO(), taskDAO()); } private void patchTeams(User original, User updated) throws IOException { diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/pipelines/PipelineResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/pipelines/PipelineResource.java index 17916a9e346..f62c9d75e7a 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/pipelines/PipelineResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/pipelines/PipelineResource.java @@ -54,6 +54,7 @@ import java.util.UUID; @Consumes(MediaType.APPLICATION_JSON) @Collection(name = "pipelines", repositoryClass = "org.openmetadata.catalog.jdbi3.PipelineRepository") public class PipelineResource { + public static final String COLLECTION_PATH = "/v1/pipelines/"; private final List attributes = RestUtil.getAttributes(Pipeline.class); private final List relationships = RestUtil.getAttributes(Pipeline.class); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/messaging/MessagingServiceResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/messaging/MessagingServiceResource.java index d9a1792c7e0..2b97ded5877 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/messaging/MessagingServiceResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/messaging/MessagingServiceResource.java @@ -111,7 +111,7 @@ public class MessagingServiceResource { responses = { @ApiResponse(responseCode = "200", description = "Messaging service instance", content = @Content(mediaType = "application/json", - schema = @Schema(implementation = Dashboard.class))), + schema = @Schema(implementation = MessagingService.class))), @ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found") }) public MessagingService get(@Context UriInfo uriInfo, @@ -184,17 +184,17 @@ public class MessagingServiceResource { @DELETE @Path("/{id}") - @Operation(summary = "Delete a database service", tags = "services", - description = "Delete a database services. If databases (and tables) belong the service, it can't be " + + @Operation(summary = "Delete a messaging service", tags = "services", + description = "Delete a messaing services. If topics belong the service, it can't be " + "deleted.", responses = { @ApiResponse(responseCode = "200", description = "OK"), - @ApiResponse(responseCode = "404", description = "DatabaseService service for instance {id} " + + @ApiResponse(responseCode = "404", description = "MessagingService service for instance {id} " + "is not found") }) public Response delete(@Context UriInfo uriInfo, @Context SecurityContext securityContext, - @Parameter(description = "Id of the database service", schema = @Schema(type = "string")) + @Parameter(description = "Id of the messaging service", schema = @Schema(type = "string")) @PathParam("id") String id) { SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); dao.delete(id); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/pipeline/PipelineServiceResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/pipeline/PipelineServiceResource.java index c7f35a96cc2..0b6c88d8e66 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/pipeline/PipelineServiceResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/pipeline/PipelineServiceResource.java @@ -25,10 +25,14 @@ import io.swagger.v3.oas.annotations.media.Content; import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.responses.ApiResponse; import org.openmetadata.catalog.api.services.CreateMessagingService; +import org.openmetadata.catalog.api.services.CreatePipelineService; import org.openmetadata.catalog.api.services.UpdateMessagingService; +import org.openmetadata.catalog.api.services.UpdatePipelineService; import org.openmetadata.catalog.entity.data.Dashboard; import org.openmetadata.catalog.entity.services.MessagingService; +import org.openmetadata.catalog.entity.services.PipelineService; import org.openmetadata.catalog.jdbi3.MessagingServiceRepository; +import org.openmetadata.catalog.jdbi3.PipelineServiceRepository; import org.openmetadata.catalog.resources.Collection; import org.openmetadata.catalog.security.CatalogAuthorizer; import org.openmetadata.catalog.security.SecurityUtil; @@ -60,61 +64,62 @@ import java.util.UUID; @Api(value = "Pipeline service collection", tags = "Services -> Pipeline service collection") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -@Collection(name = "messagingServices", repositoryClass = "org.openmetadata.catalog.jdbi3.MessagingServiceRepository") +@Collection(name = "pipelineServices", repositoryClass = "org.openmetadata.catalog.jdbi3.PipelineServiceRepository") public class PipelineServiceResource { - private final MessagingServiceRepository dao; + private final PipelineServiceRepository dao; private final CatalogAuthorizer authorizer; public static EntityReference addHref(UriInfo uriInfo, EntityReference service) { - return service.withHref(RestUtil.getHref(uriInfo, "v1/services/messagingServices/", service.getId())); + return service.withHref(RestUtil.getHref(uriInfo, "v1/services/pipelineServices/", service.getId())); } - private static List addHref(UriInfo uriInfo, List instances) { + private static List addHref(UriInfo uriInfo, List instances) { instances.forEach(i -> addHref(uriInfo, i)); return instances; } - private static MessagingService addHref(UriInfo uriInfo, MessagingService dbService) { - dbService.setHref(RestUtil.getHref(uriInfo, "v1/services/messagingServices/", dbService.getId())); - return dbService; + private static PipelineService addHref(UriInfo uriInfo, PipelineService pipelineService) { + pipelineService.setHref(RestUtil.getHref(uriInfo, "v1/services/pipelineServices/", + pipelineService.getId())); + return pipelineService; } @Inject - public PipelineServiceResource(MessagingServiceRepository dao, CatalogAuthorizer authorizer) { - Objects.requireNonNull(dao, "MessagingServiceRepository must not be null"); + public PipelineServiceResource(PipelineServiceRepository dao, CatalogAuthorizer authorizer) { + Objects.requireNonNull(dao, "PipelineServiceRepository must not be null"); this.dao = dao; this.authorizer = authorizer; } - static class MessagingServiceList extends ResultList { - MessagingServiceList(List data) { + static class PipelineServiceList extends ResultList { + PipelineServiceList(List data) { super(data); } } @GET - @Operation(summary = "List messaging services", tags = "services", - description = "Get a list of messaging services.", + @Operation(summary = "List Pipeline services", tags = "services", + description = "Get a list of pipeline services.", responses = { - @ApiResponse(responseCode = "200", description = "List of messaging service instances", + @ApiResponse(responseCode = "200", description = "List of pipeline service instances", content = @Content(mediaType = "application/json", - schema = @Schema(implementation = MessagingServiceList.class))) + schema = @Schema(implementation = PipelineServiceList.class))) }) - public MessagingServiceList list(@Context UriInfo uriInfo, @QueryParam("name") String name) throws IOException { - return new MessagingServiceList(addHref(uriInfo, dao.list(name))); + public PipelineServiceList list(@Context UriInfo uriInfo, @QueryParam("name") String name) throws IOException { + return new PipelineServiceList(addHref(uriInfo, dao.list(name))); } @GET @Path("/{id}") - @Operation(summary = "Get a messaging service", tags = "services", - description = "Get a messaging service by `id`.", + @Operation(summary = "Get a pipeline service", tags = "services", + description = "Get a pipeline service by `id`.", responses = { - @ApiResponse(responseCode = "200", description = "Messaging service instance", + @ApiResponse(responseCode = "200", description = "Pipeline service instance", content = @Content(mediaType = "application/json", - schema = @Schema(implementation = Dashboard.class))), - @ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found") + schema = @Schema(implementation = PipelineService.class))), + @ApiResponse(responseCode = "404", description = "Pipeline service for instance {id} is not found") }) - public MessagingService get(@Context UriInfo uriInfo, + public PipelineService get(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam("id") String id) throws IOException { return addHref(uriInfo, dao.get(id)); @@ -130,7 +135,7 @@ public class PipelineServiceResource { schema = @Schema(implementation = Dashboard.class))), @ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found") }) - public MessagingService getByName(@Context UriInfo uriInfo, + public PipelineService getByName(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam("name") String name) throws IOException { return addHref(uriInfo, dao.getByName(name)); @@ -147,54 +152,51 @@ public class PipelineServiceResource { }) public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext, - @Valid CreateMessagingService create) throws JsonProcessingException { + @Valid CreatePipelineService create) throws JsonProcessingException { SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); - MessagingService service = new MessagingService().withId(UUID.randomUUID()) + PipelineService service = new PipelineService().withId(UUID.randomUUID()) .withName(create.getName()).withDescription(create.getDescription()) .withServiceType(create.getServiceType()) - .withBrokers(create.getBrokers()) - .withSchemaRegistry(create.getSchemaRegistry()) + .withUrl(create.getUrl()) .withIngestionSchedule(create.getIngestionSchedule()); - addHref(uriInfo, dao.create(service)); return Response.created(service.getHref()).entity(service).build(); } @PUT @Path("/{id}") - @Operation(summary = "Update a messaging service", tags = "services", - description = "Update an existing messaging service identified by `id`.", + @Operation(summary = "Update a pipeline service", tags = "services", + description = "Update an existing pipeline service identified by `id`.", responses = { - @ApiResponse(responseCode = "200", description = "Messaging service instance", + @ApiResponse(responseCode = "200", description = "Pipeline service instance", content = @Content(mediaType = "application/json", - schema = @Schema(implementation = MessagingService.class))), + schema = @Schema(implementation = PipelineService.class))), @ApiResponse(responseCode = "400", description = "Bad request") }) public Response update(@Context UriInfo uriInfo, @Context SecurityContext securityContext, - @Parameter(description = "Id of the messaging service", schema = @Schema(type = "string")) + @Parameter(description = "Id of the pipeline service", schema = @Schema(type = "string")) @PathParam("id") String id, - @Valid UpdateMessagingService update) throws IOException { + @Valid UpdatePipelineService update) throws IOException { SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); - MessagingService service = addHref(uriInfo, - dao.update(id, update.getDescription(), update.getBrokers(), update.getSchemaRegistry(), - update.getIngestionSchedule())); + PipelineService service = addHref(uriInfo, + dao.update(id, update.getDescription(), update.getUrl(), update.getIngestionSchedule())); return Response.ok(service).build(); } @DELETE @Path("/{id}") - @Operation(summary = "Delete a database service", tags = "services", - description = "Delete a database services. If databases (and tables) belong the service, it can't be " + + @Operation(summary = "Delete a pipeline service", tags = "services", + description = "Delete a pipeline services. If pipelines (and tasks) belong to the service, it can't be " + "deleted.", responses = { @ApiResponse(responseCode = "200", description = "OK"), - @ApiResponse(responseCode = "404", description = "DatabaseService service for instance {id} " + + @ApiResponse(responseCode = "404", description = "Pipeline service for instance {id} " + "is not found") }) public Response delete(@Context UriInfo uriInfo, @Context SecurityContext securityContext, - @Parameter(description = "Id of the database service", schema = @Schema(type = "string")) + @Parameter(description = "Id of the pipeline service", schema = @Schema(type = "string")) @PathParam("id") String id) { SecurityUtil.checkAdminOrBotRole(authorizer, securityContext); dao.delete(id); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/tasks/TaskResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/tasks/TaskResource.java index f4ac3ae84cd..d9bd221e0de 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/tasks/TaskResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/tasks/TaskResource.java @@ -123,7 +123,7 @@ public class TaskResource { } } - static final String FIELDS = "owner,service,followers,tags"; + static final String FIELDS = "taskConfig,owner,service,followers,tags"; public static final List FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "") .split(",")); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java index dbdb155fb0a..16825a946f8 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java @@ -249,14 +249,15 @@ public final class EntityUtil { TopicDAO topicDAO, ChartDAO chartDAO, TaskDAO taskDAO) throws IOException { for (EntityReference ref : list) { - getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO, topicDAO, chartDAO); + getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO, topicDAO, chartDAO, taskDAO); } return list; } public static EntityReference getEntityReference(EntityReference ref, TableDAO tableDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO, DashboardDAO dashboardDAO, - ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO) + ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO, + TaskDAO taskDAO) throws IOException { // Note href to entity reference is not added here String entity = ref.getType(); @@ -282,22 +283,27 @@ public final class EntityUtil { } else if (entity.equalsIgnoreCase(Entity.CHART)) { Chart instance = EntityUtil.validate(id, chartDAO.findById(id), Chart.class); return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName()); + } else if (entity.equalsIgnoreCase(Entity.TASK)) { + Task instance = EntityUtil.validate(id, taskDAO.findById(id), Task.class); + return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName()); } throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(entity)); } public static EntityReference getEntityReference(String entity, UUID id, TableDAO tableDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO, DashboardDAO dashboardDAO, - ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO) + ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO, + TaskDAO taskDAO) throws IOException { EntityReference ref = new EntityReference().withId(id).withType(entity); - return getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO, topicDAO, chartDAO); + return getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, + reportDAO, topicDAO, chartDAO, taskDAO); } public static EntityReference getEntityReferenceByName(String entity, String fqn, TableDAO tableDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO, ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO, - DashboardDAO dashboardDAO) + DashboardDAO dashboardDAO, TaskDAO taskDAO) throws IOException { if (entity.equalsIgnoreCase(Entity.TABLE)) { Table instance = EntityUtil.validate(fqn, tableDAO.findByFQN(fqn), Table.class); @@ -327,6 +333,10 @@ public final class EntityUtil { Dashboard instance = EntityUtil.validate(fqn, dashboardDAO.findByFQN(fqn), Dashboard.class); return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.DASHBOARD) .withDescription(instance.getDescription()); + } else if (entity.equalsIgnoreCase(Entity.TASK)) { + Task instance = EntityUtil.validate(fqn, taskDAO.findByFQN(fqn), Task.class); + return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.TASK) + .withDescription(instance.getDescription()); } throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entity, fqn)); } @@ -362,6 +372,9 @@ public final class EntityUtil { } else if (clazz.toString().toLowerCase().endsWith(Entity.DASHBOARD.toLowerCase())) { Dashboard instance = (Dashboard) entity; return getEntityReference(instance); + } else if (clazz.toString().toLowerCase().endsWith(Entity.TASK.toLowerCase())) { + Task instance = (Task) entity; + return getEntityReference(instance); } else if (clazz.toString().toLowerCase().endsWith(Entity.MESSAGING_SERVICE.toLowerCase())) { MessagingService instance = (MessagingService) entity; return getEntityReference(instance); diff --git a/catalog-rest-service/src/main/resources/json/schema/api/services/createPipelineService.json b/catalog-rest-service/src/main/resources/json/schema/api/services/createPipelineService.json new file mode 100644 index 00000000000..d3e50c4df52 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/api/services/createPipelineService.json @@ -0,0 +1,36 @@ +{ + "$id": "https://open-metadata.org/schema/api/services/createPipelineService.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Create Pipeline service entity request", + "description": "Create Pipeline service entity request", + "type": "object", + "properties": { + "name": { + "description": "Name that identifies the this entity instance uniquely", + "type": "string", + "minLength": 1, + "maxLength": 64 + }, + "description": { + "description": "Description of pipeline service entity.", + "type": "string" + }, + "serviceType": { + "$ref": "../../entity/services/pipelineService.json#/definitions/pipelineServiceType" + }, + "url": { + "description": "Pipeline UI URL", + "type": "string", + "format": "uri" + }, + "ingestionSchedule": { + "description": "Schedule for running pipeline ingestion jobs", + "$ref": "../../type/schedule.json" + } + }, + "required": [ + "name", + "serviceType", + "url" + ] +} diff --git a/catalog-rest-service/src/main/resources/json/schema/api/services/updatePipelineService.json b/catalog-rest-service/src/main/resources/json/schema/api/services/updatePipelineService.json new file mode 100644 index 00000000000..8d0518c4db3 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/api/services/updatePipelineService.json @@ -0,0 +1,23 @@ +{ + "$id": "https://open-metadata.org/schema/api/services/updatePipelineService.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Update pipeline service entity request", + "description": "Update pipeline service entity request", + "type": "object", + + "properties" : { + "description": { + "description": "Description of Pipeline service entity.", + "type": "string" + }, + "url" : { + "description": "Pipeline Service UI URL.", + "type": "string", + "format": "uri" + }, + "ingestionSchedule" : { + "description": "Schedule for running metadata ingestion jobs", + "$ref" : "../../type/schedule.json" + } + } +} diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json index bb4e5c7a379..d5e5775a853 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json @@ -41,6 +41,11 @@ "description": "Description of a messaging service instance.", "type": "string" }, + "url": { + "description": "Pipeline Service Management/UI URL", + "type": "string", + "format": "uri" + }, "ingestionSchedule": { "description": "Schedule for running metadata ingestion jobs.", "$ref": "../../type/schedule.json" diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/PipelineServiceResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/PipelineServiceResourceTest.java new file mode 100644 index 00000000000..3179cc08715 --- /dev/null +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/PipelineServiceResourceTest.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.resources.services; + +import org.apache.http.client.HttpResponseException; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.openmetadata.catalog.CatalogApplicationTest; +import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.api.services.CreatePipelineService; +import org.openmetadata.catalog.api.services.UpdatePipelineService; +import org.openmetadata.catalog.entity.services.PipelineService; +import org.openmetadata.catalog.exception.CatalogExceptionMessage; +import org.openmetadata.catalog.type.Schedule; +import org.openmetadata.catalog.util.RestUtil; +import org.openmetadata.catalog.util.TestUtils; + +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response.Status; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static javax.ws.rs.core.Response.Status.BAD_REQUEST; +import static javax.ws.rs.core.Response.Status.CONFLICT; +import static javax.ws.rs.core.Response.Status.FORBIDDEN; +import static javax.ws.rs.core.Response.Status.NOT_FOUND; +import static javax.ws.rs.core.Response.Status.OK; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders; +import static org.openmetadata.catalog.util.TestUtils.authHeaders; + +public class PipelineServiceResourceTest extends CatalogApplicationTest { + + public static URI PIPELINE_SERVICE_URL; + + @BeforeAll + public static void setup(TestInfo test) throws URISyntaxException { + PIPELINE_SERVICE_URL = new URI("http://localhost:8080"); + } + + @Test + public void post_serviceWithLongName_400_badRequest(TestInfo test) { + // Create pipeline with mandatory name field empty + CreatePipelineService create = create(test).withName(TestUtils.LONG_ENTITY_NAME); + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]"); + } + + @Test + public void post_withoutRequiredFields_400_badRequest(TestInfo test) { + // Create pipeline with mandatory name field null + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + createService(create(test).withName(null), adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "[name must not be null]"); + + // Create pipeline with mandatory name field empty + exception = assertThrows(HttpResponseException.class, () -> + createService(create(test).withName(""), adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "[name size must be between 1 and 64]"); + + // Create pipeline with mandatory serviceType field empty + exception = assertThrows(HttpResponseException.class, () -> + createService(create(test).withServiceType(null), adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "[serviceType must not be null]"); + + // Create pipeline with mandatory brokers field empty + exception = assertThrows(HttpResponseException.class, () -> + createService(create(test).withUrl(null), adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "[url must not be null]"); + } + + @Test + public void post_serviceAlreadyExists_409(TestInfo test) throws HttpResponseException { + CreatePipelineService create = create(test); + createService(create, adminAuthHeaders()); + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, CONFLICT, CatalogExceptionMessage.ENTITY_ALREADY_EXISTS); + } + + @Test + public void post_validService_as_admin_200_ok(TestInfo test) throws HttpResponseException { + // Create pipeline service with different optional fields + Map authHeaders = adminAuthHeaders(); + createAndCheckService(create(test, 1).withDescription(null), authHeaders); + createAndCheckService(create(test, 2).withDescription("description"), authHeaders); + createAndCheckService(create(test, 3).withIngestionSchedule(null), authHeaders); + } + + @Test + public void post_validService_as_non_admin_401(TestInfo test) { + // Create pipeline service with different optional fields + Map authHeaders = authHeaders("test@open-metadata.org"); + + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + createAndCheckService(create(test, 1).withDescription(null), authHeaders)); + TestUtils.assertResponse(exception, FORBIDDEN, + "Principal: CatalogPrincipal{name='test'} is not admin"); + } + + @Test + public void post_invalidIngestionSchedule_4xx(TestInfo test) { + // No jdbc connection set + CreatePipelineService create = create(test); + Schedule schedule = create.getIngestionSchedule(); + + // Invalid format + create.withIngestionSchedule(schedule.withRepeatFrequency("INVALID")); + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "Invalid ingestion repeatFrequency INVALID"); + + // Duration that contains years, months and seconds are not allowed + create.withIngestionSchedule(schedule.withRepeatFrequency("P1Y")); + exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, + "Ingestion repeatFrequency can only contain Days, Hours, " + + "and Minutes - example P{d}DT{h}H{m}M"); + + create.withIngestionSchedule(schedule.withRepeatFrequency("P1M")); + exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, + "Ingestion repeatFrequency can only contain Days, Hours, " + + "and Minutes - example P{d}DT{h}H{m}M"); + + create.withIngestionSchedule(schedule.withRepeatFrequency("PT1S")); + exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, + "Ingestion repeatFrequency can only contain Days, Hours, " + + "and Minutes - example P{d}DT{h}H{m}M"); + } + + @Test + public void post_validIngestionSchedules_as_admin_200(TestInfo test) throws HttpResponseException { + Schedule schedule = new Schedule().withStartDate(new Date()); + schedule.withRepeatFrequency("PT60M"); // Repeat every 60M should be valid + createAndCheckService(create(test, 1).withIngestionSchedule(schedule), adminAuthHeaders()); + + schedule.withRepeatFrequency("PT1H49M"); + createAndCheckService(create(test, 2).withIngestionSchedule(schedule), adminAuthHeaders()); + + schedule.withRepeatFrequency("P1DT1H49M"); + createAndCheckService(create(test, 3).withIngestionSchedule(schedule), adminAuthHeaders()); + } + + @Test + public void post_ingestionScheduleIsTooShort_4xx(TestInfo test) { + // No jdbc connection set + CreatePipelineService create = create(test); + Schedule schedule = create.getIngestionSchedule(); + create.withIngestionSchedule(schedule.withRepeatFrequency("PT1M")); // Repeat every 0 seconds + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + createService(create, adminAuthHeaders())); + TestUtils.assertResponseContains(exception, BAD_REQUEST, + "Ingestion repeatFrequency is too short and must be more than 60 minutes"); + + create.withIngestionSchedule(schedule.withRepeatFrequency("PT59M")); // Repeat every 50 minutes 59 seconds + exception = assertThrows(HttpResponseException.class, () -> createService(create, adminAuthHeaders())); + TestUtils.assertResponse(exception, BAD_REQUEST, "Ingestion repeatFrequency is too short and must " + + "be more than 60 minutes"); + } + + @Test + public void put_updateNonExistentService_404() { + // Update pipeline description and ingestion service that are null + UpdatePipelineService update = new UpdatePipelineService().withDescription("description1"); + HttpResponseException exception = assertThrows(HttpResponseException.class, () + -> updatePipelineService(TestUtils.NON_EXISTENT_ENTITY.toString(), update, OK, adminAuthHeaders())); + TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound("PipelineService", + TestUtils.NON_EXISTENT_ENTITY)); + } + + @Test + public void put_updateService_as_admin_2xx(TestInfo test) throws HttpResponseException, URISyntaxException { + PipelineService dbService = createAndCheckService(create(test).withDescription(null).withIngestionSchedule(null) + .withUrl(PIPELINE_SERVICE_URL), adminAuthHeaders()); + String id = dbService.getId().toString(); + + // Update pipeline description and ingestion service that are null + UpdatePipelineService update = new UpdatePipelineService().withDescription("description1"); + updateAndCheckService(id, update, OK, adminAuthHeaders()); + // Update ingestion schedule + Schedule schedule = new Schedule().withStartDate(new Date()).withRepeatFrequency("P1D"); + update.withIngestionSchedule(schedule); + updateAndCheckService(id, update, OK, adminAuthHeaders()); + + // Update description and ingestion schedule again + update.withDescription("description1").withIngestionSchedule(schedule.withRepeatFrequency("PT1H")); + updateAndCheckService(id, update, OK, adminAuthHeaders()); + + // update broker list and schema registry + update.withUrl(new URI("http://localhost:9000")); + updateAndCheckService(id, update, OK, adminAuthHeaders()); + PipelineService updatedService = getService(dbService.getId(), adminAuthHeaders()); + validatePipelineServiceConfig(updatedService, List.of("localhost:0"), new URI("http://localhost:9000")); + } + + @Test + public void put_update_as_non_admin_401(TestInfo test) throws HttpResponseException { + Map authHeaders = adminAuthHeaders(); + PipelineService pipelineService = createAndCheckService(create(test).withDescription(null) + .withIngestionSchedule(null), + authHeaders); + String id = pipelineService.getId().toString(); + RestUtil.DATE_TIME_FORMAT.format(new Date()); + + // Update pipeline description and ingestion service that are null + UpdatePipelineService update = new UpdatePipelineService().withDescription("description1"); + + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + updateAndCheckService(id, update, OK, authHeaders("test@open-metadata.org"))); + TestUtils.assertResponse(exception, FORBIDDEN, "Principal: CatalogPrincipal{name='test'} " + + "is not admin"); + } + + @Test + public void get_nonExistentService_404_notFound() { + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + getService(TestUtils.NON_EXISTENT_ENTITY, adminAuthHeaders())); + TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound(Entity.PIPELINE_SERVICE, + TestUtils.NON_EXISTENT_ENTITY)); + } + + @Test + public void get_nonExistentServiceByName_404_notFound() { + HttpResponseException exception = assertThrows(HttpResponseException.class, () + -> getServiceByName("invalidName", null, adminAuthHeaders())); + TestUtils.assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound(Entity.PIPELINE_SERVICE, + "invalidName")); + } + + public static PipelineService createAndCheckService(CreatePipelineService create, + Map authHeaders) throws HttpResponseException { + PipelineService service = createService(create, authHeaders); + validateService(service, create.getName(), create.getDescription(), create.getIngestionSchedule()); + + // GET the newly created service and validate + PipelineService getService = getService(service.getId(), authHeaders); + validateService(getService, create.getName(), create.getDescription(), create.getIngestionSchedule()); + + // GET the newly created service by name and validate + getService = getServiceByName(service.getName(), null, authHeaders); + validateService(getService, create.getName(), create.getDescription(), create.getIngestionSchedule()); + return service; + } + + public static PipelineService createService(CreatePipelineService create, + Map authHeaders) throws HttpResponseException { + return TestUtils.post(CatalogApplicationTest.getResource("services/pipelineServices"), + create, PipelineService.class, authHeaders); + } + + private static void validateService(PipelineService service, String expectedName, String expectedDescription, + Schedule expectedIngestion) { + assertNotNull(service.getId()); + assertNotNull(service.getHref()); + assertEquals(expectedName, service.getName()); + assertEquals(expectedDescription, service.getDescription()); + + if (expectedIngestion != null) { + assertEquals(expectedIngestion.getStartDate(), service.getIngestionSchedule().getStartDate()); + assertEquals(expectedIngestion.getRepeatFrequency(), service.getIngestionSchedule().getRepeatFrequency()); + } + } + + public static PipelineService getService(UUID id, Map authHeaders) throws HttpResponseException { + return getService(id, null, authHeaders); + } + + public static PipelineService getService(UUID id, String fields, Map authHeaders) + throws HttpResponseException { + WebTarget target = CatalogApplicationTest.getResource("services/pipelineServices/" + id); + target = fields != null ? target.queryParam("fields", fields) : target; + return TestUtils.get(target, PipelineService.class, authHeaders); + } + + public static PipelineService getServiceByName(String name, String fields, Map authHeaders) + throws HttpResponseException { + WebTarget target = CatalogApplicationTest.getResource("services/pipelineServices/name/" + name); + target = fields != null ? target.queryParam("fields", fields) : target; + return TestUtils.get(target, PipelineService.class, authHeaders); + } + + public static String getName(TestInfo test) { + return String.format("pservice_%s", test.getDisplayName()); + } + + public static String getName(TestInfo test, int index) { + return String.format("pservice_%d_%s", index, test.getDisplayName()); + } + + @Test + public void delete_ExistentPipelineService_as_admin_200(TestInfo test) throws HttpResponseException { + Map authHeaders = adminAuthHeaders(); + PipelineService pipelineService = createService(create(test), authHeaders); + deleteService(pipelineService.getId(), pipelineService.getName(), authHeaders); + } + + @Test + public void delete_as_user_401(TestInfo test) throws HttpResponseException { + Map authHeaders = adminAuthHeaders(); + PipelineService pipelineService = createService(create(test), authHeaders); + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + deleteService(pipelineService.getId(), pipelineService.getName(), + authHeaders("test@open-metadata.org"))); + TestUtils.assertResponse(exception, FORBIDDEN, + "Principal: CatalogPrincipal{name='test'} is not admin"); + } + + @Test + public void delete_notExistentPipelineService() { + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> + getService(TestUtils.NON_EXISTENT_ENTITY, adminAuthHeaders())); + TestUtils.assertResponse(exception, NOT_FOUND, + CatalogExceptionMessage.entityNotFound(Entity.PIPELINE_SERVICE, TestUtils.NON_EXISTENT_ENTITY)); + } + + private void deleteService(UUID id, String name, Map authHeaders) throws HttpResponseException { + TestUtils.delete(CatalogApplicationTest.getResource("services/pipelineServices/" + id), authHeaders); + + // Ensure deleted service does not exist + HttpResponseException exception = assertThrows(HttpResponseException.class, () -> getService(id, authHeaders)); + TestUtils.assertResponse(exception, NOT_FOUND, + CatalogExceptionMessage.entityNotFound(Entity.PIPELINE_SERVICE, id)); + + // Ensure deleted service does not exist when getting by name + exception = assertThrows(HttpResponseException.class, () -> getServiceByName(name, null, authHeaders)); + TestUtils.assertResponse(exception, NOT_FOUND, + CatalogExceptionMessage.entityNotFound(Entity.PIPELINE_SERVICE, name)); + } + + public static CreatePipelineService create(TestInfo test) { + return new CreatePipelineService().withName(getName(test)) + .withServiceType(CreatePipelineService.PipelineServiceType.Airflow) + .withUrl(PIPELINE_SERVICE_URL) + .withIngestionSchedule(new Schedule().withStartDate(new Date()).withRepeatFrequency("P1D")); + } + + private static CreatePipelineService create(TestInfo test, int index) { + return new CreatePipelineService().withName(getName(test, index)) + .withServiceType(CreatePipelineService.PipelineServiceType.Airflow) + .withUrl(PIPELINE_SERVICE_URL) + .withIngestionSchedule(new Schedule().withStartDate(new Date()).withRepeatFrequency("P1D")); + } + + public static void updateAndCheckService(String id, UpdatePipelineService update, Status status, + Map authHeaders) throws HttpResponseException { + PipelineService service = updatePipelineService(id, update, status, authHeaders); + validateService(service, service.getName(), update.getDescription(), update.getIngestionSchedule()); + + // GET the newly updated pipeline and validate + PipelineService getService = getService(service.getId(), authHeaders); + validateService(getService, service.getName(), update.getDescription(), update.getIngestionSchedule()); + + // GET the newly updated pipeline by name and validate + getService = getServiceByName(service.getName(), null, authHeaders); + validateService(getService, service.getName(), update.getDescription(), update.getIngestionSchedule()); + } + + public static PipelineService updatePipelineService(String id, UpdatePipelineService updated, + Status status, Map authHeaders) + throws HttpResponseException { + return TestUtils.put(CatalogApplicationTest.getResource("services/pipelineServices/" + id), updated, + PipelineService.class, status, authHeaders); + } + + private static void validatePipelineServiceConfig(PipelineService actualService, List expectedBrokers, + URI expectedUrl) + throws HttpResponseException { + assertEquals(actualService.getUrl(), expectedUrl); + } +}