Fix #579: Address review comments

This commit is contained in:
Sriharsha Chintalapani 2021-09-25 09:20:14 -07:00
parent ad07e6bd57
commit e997c395a3
11 changed files with 70 additions and 142 deletions

View File

@ -236,10 +236,7 @@ public abstract class ChartRepository {
}
public EntityReference getOwner(Chart chart) throws IOException {
if (chart == null) {
return null;
}
return EntityUtil.populateOwner(chart.getId(), relationshipDAO(), userDAO(), teamDAO());
return chart != null ? EntityUtil.populateOwner(chart.getId(), relationshipDAO(), userDAO(), teamDAO()) : null;
}
private void setOwner(Chart chart, EntityReference owner) {

View File

@ -17,14 +17,13 @@
package org.openmetadata.catalog.jdbi3;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.joda.time.Period;
import org.joda.time.format.ISOPeriodFormat;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.services.DashboardService;
import org.openmetadata.catalog.exception.EntityNotFoundException;
import org.openmetadata.catalog.type.Schedule;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.Utils;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.CreateSqlObject;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
@ -67,7 +66,7 @@ public abstract class DashboardServiceRepository {
@Transaction
public DashboardService create(DashboardService dashboardService) throws JsonProcessingException {
// Validate fields
validateIngestionSchedule(dashboardService.getIngestionSchedule());
Utils.validateIngestionSchedule(dashboardService.getIngestionSchedule());
dashboardServiceDAO().insert(JsonUtils.pojoToJson(dashboardService));
return dashboardService;
}
@ -75,7 +74,7 @@ public abstract class DashboardServiceRepository {
public DashboardService update(String id, String description, URI dashboardUrl, String username, String password,
Schedule ingestionSchedule)
throws IOException {
validateIngestionSchedule(ingestionSchedule);
Utils.validateIngestionSchedule(ingestionSchedule);
DashboardService dashboardService = EntityUtil.validate(id, dashboardServiceDAO().findById(id),
DashboardService.class);
// Update fields
@ -93,31 +92,6 @@ public abstract class DashboardServiceRepository {
relationshipDAO().deleteAll(id);
}
private void validateIngestionSchedule(Schedule ingestion) {
if (ingestion == null) {
return;
}
String duration = ingestion.getRepeatFrequency();
// ISO8601 duration format is P{y}Y{m}M{d}DT{h}H{m}M{s}S.
String[] splits = duration.split("T");
if (splits[0].contains("Y") || splits[0].contains("M") ||
(splits.length == 2 && splits[1].contains("S"))) {
throw new IllegalArgumentException("Ingestion repeatFrequency can only contain Days, Hours, and Minutes - " +
"example P{d}DT{h}H{m}M");
}
Period period;
try {
period = ISOPeriodFormat.standard().parsePeriod(duration);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid ingestion repeatFrequency " + duration, e);
}
if (period.toStandardMinutes().getMinutes() < 60) {
throw new IllegalArgumentException("Ingestion repeatFrequency is too short and must be more than 60 minutes");
}
}
public interface DashboardServiceDAO {
@SqlUpdate("INSERT INTO dashboard_service_entity (json) VALUES (:json)")
void insert(@Bind("json") String json);

View File

@ -244,10 +244,8 @@ public abstract class DatabaseRepository {
}
public EntityReference getOwner(Database database) throws IOException {
if (database == null) {
return null;
}
return EntityUtil.populateOwner(database.getId(), relationshipDAO(), userDAO(), teamDAO());
return database != null ? EntityUtil.populateOwner(database.getId(), relationshipDAO(), userDAO(), teamDAO())
: null;
}
private void setOwner(Database database, EntityReference owner) {

View File

@ -26,6 +26,7 @@ import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.JsonUtils;
import org.joda.time.Period;
import org.joda.time.format.ISOPeriodFormat;
import org.openmetadata.catalog.util.Utils;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.CreateSqlObject;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
@ -67,14 +68,14 @@ public abstract class DatabaseServiceRepository {
@Transaction
public DatabaseService create(DatabaseService databaseService) throws JsonProcessingException {
// Validate fields
validateIngestionSchedule(databaseService.getIngestionSchedule());
Utils.validateIngestionSchedule(databaseService.getIngestionSchedule());
dbServiceDAO().insert(JsonUtils.pojoToJson(databaseService));
return databaseService;
}
public DatabaseService update(String id, String description, JdbcInfo jdbc, Schedule ingestionSchedule)
throws IOException {
validateIngestionSchedule(ingestionSchedule);
Utils.validateIngestionSchedule(ingestionSchedule);
DatabaseService dbService = EntityUtil.validate(id, dbServiceDAO().findById(id), DatabaseService.class);
// Update fields
dbService.withDescription(description).withJdbc((jdbc)).withIngestionSchedule(ingestionSchedule);
@ -90,31 +91,6 @@ public abstract class DatabaseServiceRepository {
relationshipDAO().deleteAll(id);
}
private void validateIngestionSchedule(Schedule ingestion) {
if (ingestion == null) {
return;
}
String duration = ingestion.getRepeatFrequency();
// ISO8601 duration format is P{y}Y{m}M{d}DT{h}H{m}M{s}S.
String[] splits = duration.split("T");
if (splits[0].contains("Y") || splits[0].contains("M") ||
(splits.length == 2 && splits[1].contains("S"))) {
throw new IllegalArgumentException("Ingestion repeatFrequency can only contain Days, Hours, and Minutes - " +
"example P{d}DT{h}H{m}M");
}
Period period;
try {
period = ISOPeriodFormat.standard().parsePeriod(duration);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid ingestion repeatFrequency " + duration, e);
}
if (period.toStandardMinutes().getMinutes() < 60) {
throw new IllegalArgumentException("Ingestion repeatFrequency is too short and must be more than 60 minutes");
}
}
public interface DatabaseServiceDAO {
@SqlUpdate("INSERT INTO dbservice_entity (json) VALUES (:json)")
void insert(@Bind("json") String json);

View File

@ -17,14 +17,13 @@
package org.openmetadata.catalog.jdbi3;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.joda.time.Period;
import org.joda.time.format.ISOPeriodFormat;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.services.MessagingService;
import org.openmetadata.catalog.exception.EntityNotFoundException;
import org.openmetadata.catalog.type.Schedule;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.Utils;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.CreateSqlObject;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
@ -67,7 +66,7 @@ public abstract class MessagingServiceRepository {
@Transaction
public MessagingService create(MessagingService messagingService) throws JsonProcessingException {
// Validate fields
validateIngestionSchedule(messagingService.getIngestionSchedule());
Utils.validateIngestionSchedule(messagingService.getIngestionSchedule());
messagingServiceDAO().insert(JsonUtils.pojoToJson(messagingService));
return messagingService;
}
@ -75,7 +74,7 @@ public abstract class MessagingServiceRepository {
public MessagingService update(String id, String description, List<String> brokers, URI schemaRegistry,
Schedule ingestionSchedule)
throws IOException {
validateIngestionSchedule(ingestionSchedule);
Utils.validateIngestionSchedule(ingestionSchedule);
MessagingService dbService = EntityUtil.validate(id, messagingServiceDAO().findById(id), MessagingService.class);
// Update fields
dbService.withDescription(description).withIngestionSchedule(ingestionSchedule)
@ -92,31 +91,6 @@ public abstract class MessagingServiceRepository {
relationshipDAO().deleteAll(id);
}
private void validateIngestionSchedule(Schedule ingestion) {
if (ingestion == null) {
return;
}
String duration = ingestion.getRepeatFrequency();
// ISO8601 duration format is P{y}Y{m}M{d}DT{h}H{m}M{s}S.
String[] splits = duration.split("T");
if (splits[0].contains("Y") || splits[0].contains("M") ||
(splits.length == 2 && splits[1].contains("S"))) {
throw new IllegalArgumentException("Ingestion repeatFrequency can only contain Days, Hours, and Minutes - " +
"example P{d}DT{h}H{m}M");
}
Period period;
try {
period = ISOPeriodFormat.standard().parsePeriod(duration);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid ingestion repeatFrequency " + duration, e);
}
if (period.toStandardMinutes().getMinutes() < 60) {
throw new IllegalArgumentException("Ingestion repeatFrequency is too short and must be more than 60 minutes");
}
}
public interface MessagingServiceDAO {
@SqlUpdate("INSERT INTO messaging_service_entity (json) VALUES (:json)")
void insert(@Bind("json") String json);

View File

@ -17,14 +17,13 @@
package org.openmetadata.catalog.jdbi3;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.joda.time.Period;
import org.joda.time.format.ISOPeriodFormat;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.services.PipelineService;
import org.openmetadata.catalog.exception.EntityNotFoundException;
import org.openmetadata.catalog.type.Schedule;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.Utils;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.CreateSqlObject;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
@ -67,15 +66,16 @@ public abstract class PipelineServiceRepository {
@Transaction
public PipelineService create(PipelineService pipelineService) throws JsonProcessingException {
// Validate fields
validateIngestionSchedule(pipelineService.getIngestionSchedule());
Utils.validateIngestionSchedule(pipelineService.getIngestionSchedule());
pipelineServiceDAO().insert(JsonUtils.pojoToJson(pipelineService));
return pipelineService;
}
@Transaction
public PipelineService update(String id, String description, URI url,
Schedule ingestionSchedule)
throws IOException {
validateIngestionSchedule(ingestionSchedule);
Utils.validateIngestionSchedule(ingestionSchedule);
PipelineService pipelineService = EntityUtil.validate(id, pipelineServiceDAO().findById(id), PipelineService.class);
// Update fields
pipelineService.withDescription(description).withIngestionSchedule(ingestionSchedule)
@ -92,30 +92,7 @@ public abstract class PipelineServiceRepository {
relationshipDAO().deleteAll(id);
}
private void validateIngestionSchedule(Schedule ingestion) {
if (ingestion == null) {
return;
}
String duration = ingestion.getRepeatFrequency();
// ISO8601 duration format is P{y}Y{m}M{d}DT{h}H{m}M{s}S.
String[] splits = duration.split("T");
if (splits[0].contains("Y") || splits[0].contains("M") ||
(splits.length == 2 && splits[1].contains("S"))) {
throw new IllegalArgumentException("Ingestion repeatFrequency can only contain Days, Hours, and Minutes - " +
"example P{d}DT{h}H{m}M");
}
Period period;
try {
period = ISOPeriodFormat.standard().parsePeriod(duration);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid ingestion repeatFrequency " + duration, e);
}
if (period.toStandardMinutes().getMinutes() < 60) {
throw new IllegalArgumentException("Ingestion repeatFrequency is too short and must be more than 60 minutes");
}
}
public interface PipelineServiceDAO {
@SqlUpdate("INSERT INTO pipeline_service_entity (json) VALUES (:json)")

View File

@ -237,10 +237,7 @@ public abstract class TaskRepository {
}
public EntityReference getOwner(Task task) throws IOException {
if (task == null) {
return null;
}
return EntityUtil.populateOwner(task.getId(), relationshipDAO(), userDAO(), teamDAO());
return task != null ? EntityUtil.populateOwner(task.getId(), relationshipDAO(), userDAO(), teamDAO()) : null;
}
private void setOwner(Task task, EntityReference owner) {

View File

@ -237,10 +237,7 @@ public abstract class TopicRepository {
}
public EntityReference getOwner(Topic topic) throws IOException {
if (topic == null) {
return null;
}
return EntityUtil.populateOwner(topic.getId(), relationshipDAO(), userDAO(), teamDAO());
return topic != null ? EntityUtil.populateOwner(topic.getId(), relationshipDAO(), userDAO(), teamDAO()) : null;
}
private void setOwner(Topic topic, EntityReference owner) {

View File

@ -75,7 +75,7 @@ import java.util.UUID;
@Api(value = "Pipelines collection", tags = "Pipelines collection")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "pipeliness", repositoryClass = "org.openmetadata.catalog.jdbi3.PipelineRepository")
@Collection(name = "pipelines", repositoryClass = "org.openmetadata.catalog.jdbi3.PipelineRepository")
public class PipelineResource {
public static final String PIPELINE_COLLECTION_PATH = "v1/pipelines/";
private final List<String> attributes = RestUtil.getAttributes(Pipeline.class);
@ -144,7 +144,7 @@ public class PipelineResource {
schema = @Schema(type = "string", example = FIELDS))
@QueryParam("fields") String fieldsParam,
@Parameter(description = "Filter pipelines by service name",
schema = @Schema(type = "string", example = "superset"))
schema = @Schema(type = "string", example = "airflow"))
@QueryParam("service") String serviceParam,
@Parameter(description = "Limit the number pipelines returned. (1 to 1000000, " +
"default = 10)")
@ -175,9 +175,9 @@ public class PipelineResource {
@GET
@Path("/{id}")
@Operation(summary = "Get a pipeline", tags = "pipelines",
description = "Get a pipelines by `id`.",
description = "Get a pipeline by `id`.",
responses = {
@ApiResponse(responseCode = "200", description = "The pipelines",
@ApiResponse(responseCode = "200", description = "The pipeline",
content = @Content(mediaType = "application/json",
schema = @Schema(implementation = Pipeline.class))),
@ApiResponse(responseCode = "404", description = "Pipeline for instance {id} is not found")

View File

@ -27,7 +27,6 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse;
import org.openmetadata.catalog.api.services.CreatePipelineService;
import org.openmetadata.catalog.api.services.UpdatePipelineService;
import org.openmetadata.catalog.entity.data.Dashboard;
import org.openmetadata.catalog.entity.services.MessagingService;
import org.openmetadata.catalog.entity.services.PipelineService;
import org.openmetadata.catalog.jdbi3.PipelineServiceRepository;
import org.openmetadata.catalog.resources.Collection;
@ -95,7 +94,7 @@ public class PipelineServiceResource {
}
@GET
@Operation(summary = "List Pipeline services", tags = "services",
@Operation(summary = "List pipeline services", tags = "services",
description = "Get a list of pipeline services.",
responses = {
@ApiResponse(responseCode = "200", description = "List of pipeline service instances",
@ -124,13 +123,13 @@ public class PipelineServiceResource {
@GET
@Path("/name/{name}")
@Operation(summary = "Get messaging service by name", tags = "services",
description = "Get a messaging service by the service `name`.",
@Operation(summary = "Get pipeline service by name", tags = "services",
description = "Get a pipeline service by the service `name`.",
responses = {
@ApiResponse(responseCode = "200", description = "Messaging service instance",
@ApiResponse(responseCode = "200", description = "Pipeline service instance",
content = @Content(mediaType = "application/json",
schema = @Schema(implementation = Dashboard.class))),
@ApiResponse(responseCode = "404", description = "Messaging service for instance {id} is not found")
@ApiResponse(responseCode = "404", description = "Pipeline service for instance {id} is not found")
})
public PipelineService getByName(@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@ -139,12 +138,12 @@ public class PipelineServiceResource {
}
@POST
@Operation(summary = "Create a messaging service", tags = "services",
description = "Create a new messaging service.",
@Operation(summary = "Create a pipeline service", tags = "services",
description = "Create a new pipeline service.",
responses = {
@ApiResponse(responseCode = "200", description = "Messaging service instance",
@ApiResponse(responseCode = "200", description = "Pipeline service instance",
content = @Content(mediaType = "application/json",
schema = @Schema(implementation = MessagingService.class))),
schema = @Schema(implementation = PipelineService.class))),
@ApiResponse(responseCode = "400", description = "Bad request")
})
public Response create(@Context UriInfo uriInfo,

View File

@ -0,0 +1,39 @@
package org.openmetadata.catalog.util;
import org.joda.time.Period;
import org.joda.time.format.ISOPeriodFormat;
import org.openmetadata.catalog.type.Schedule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class Utils {
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
private Utils() {}
public static void validateIngestionSchedule(Schedule ingestion) {
if (ingestion == null) {
return;
}
String duration = ingestion.getRepeatFrequency();
// ISO8601 duration format is P{y}Y{m}M{d}DT{h}H{m}M{s}S.
String[] splits = duration.split("T");
if (splits[0].contains("Y") || splits[0].contains("M") ||
(splits.length == 2 && splits[1].contains("S"))) {
throw new IllegalArgumentException("Ingestion repeatFrequency can only contain Days, Hours, and Minutes - " +
"example P{d}DT{h}H{m}M");
}
Period period;
try {
period = ISOPeriodFormat.standard().parsePeriod(duration);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid ingestion repeatFrequency " + duration, e);
}
if (period.toStandardMinutes().getMinutes() < 60) {
throw new IllegalArgumentException("Ingestion repeatFrequency is too short and must be more than 60 minutes");
}
}
}