Fix #862: Task PUT api is not updating the URL and start and end times (#863)

* Fix #862: Task PUT api is not updating the URL and start and end times

* Fix #862: Pipeline/Task PUT api is not updating the URL and start and end times

* Fix #862: Pipeline/Task PUT api is not updating the URL and start and end times
This commit is contained in:
Sriharsha Chintalapani 2021-10-19 14:19:20 -07:00 committed by GitHub
parent 67a257a384
commit d35e2574fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 65 additions and 12 deletions

View File

@ -21,7 +21,6 @@ import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.data.Pipeline;
import org.openmetadata.catalog.entity.data.Task;
import org.openmetadata.catalog.entity.services.PipelineService;
import org.openmetadata.catalog.exception.CatalogExceptionMessage;
import org.openmetadata.catalog.exception.EntityNotFoundException;
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
@ -207,6 +206,9 @@ public abstract class PipelineRepository {
private Pipeline setFields(Pipeline pipeline, Fields fields) throws IOException {
pipeline.setDisplayName(pipeline.getDisplayName());
pipeline.setPipelineUrl(pipeline.getPipelineUrl());
pipeline.setStartDate(pipeline.getStartDate());
pipeline.setConcurrency(pipeline.getConcurrency());
pipeline.setOwner(fields.contains("owner") ? getOwner(pipeline) : null);
pipeline.setService(fields.contains("service") ? getService(pipeline) : null);
pipeline.setFollowers(fields.contains("followers") ? getFollowers(pipeline) : null);

View File

@ -58,7 +58,7 @@ import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityN
public abstract class TaskRepository {
private static final Logger LOG = LoggerFactory.getLogger(TaskRepository.class);
private static final Fields TASK_UPDATE_FIELDS = new Fields(TaskResource.FIELD_LIST, "owner," +
"taskConfig,tags");
"taskConfig,tags,downstreamTasks");
private static final Fields TASK_PATCH_FIELDS = new Fields(TaskResource.FIELD_LIST, "owner,service,tags");
public static String getFQN(Task task) {
@ -248,9 +248,14 @@ public abstract class TaskRepository {
}
private Task setFields(Task task, Fields fields) throws IOException {
task.setTaskUrl(task.getTaskUrl());
task.setTaskSQL(task.getTaskSQL());
task.setStartDate(task.getStartDate());
task.setEndDate(task.getEndDate());
task.setOwner(fields.contains("owner") ? getOwner(task) : null);
task.setService(fields.contains("service") ? getService(task) : null);
task.setTags(fields.contains("tags") ? getTags(task.getFullyQualifiedName()) : null);
task.setDownstreamTasks(fields.contains("downstreamTasks") ? task.getDownstreamTasks() : null);
return task;
}

View File

@ -274,6 +274,7 @@ public class PipelineResource {
.withDisplayName(create.getDisplayName())
.withDescription(create.getDescription()).withService(create.getService()).withTasks(create.getTasks())
.withPipelineUrl(create.getPipelineUrl()).withTags(create.getTags())
.withConcurrency(create.getConcurrency()).withStartDate(create.getStartDate())
.withOwner(create.getOwner())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());

View File

@ -119,7 +119,7 @@ public class TaskResource {
}
}
static final String FIELDS = "taskConfig,owner,service,tags";
static final String FIELDS = "downstreamTasks,taskConfig,owner,service,tags";
public static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "")
.split(","));

View File

@ -18,6 +18,7 @@ package org.openmetadata.catalog.resources.pipelines;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.http.client.HttpResponseException;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@ -50,8 +51,10 @@ import org.slf4j.LoggerFactory;
import javax.json.JsonPatch;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -365,6 +368,23 @@ public class PipelineResourceTest extends CatalogApplicationTest {
assertEquals("description", db.getDescription());
}
@Test
public void put_PipelineUrlUpdate_200(TestInfo test) throws HttpResponseException, URISyntaxException {
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription("description");
createAndCheckPipeline(request, adminAuthHeaders());
URI pipelineURI = new URI("https://airflow.open-metadata.org/tree?dag_id=airflow_redshift_usage");
Integer pipelineConcurrency = 110;
Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate();
// Updating description is ignored when backend already has description
Pipeline pipeline = updatePipeline(request.withPipelineUrl(pipelineURI)
.withConcurrency(pipelineConcurrency)
.withStartDate(startDate), OK, adminAuthHeaders());
assertEquals(pipelineURI, pipeline.getPipelineUrl());
assertEquals(startDate, pipeline.getStartDate());
assertEquals(pipelineConcurrency, pipeline.getConcurrency());
}
@Test
public void put_PipelineUpdateOwner_200(TestInfo test) throws HttpResponseException {
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription("");

View File

@ -18,6 +18,7 @@ package org.openmetadata.catalog.resources.tasks;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.http.client.HttpResponseException;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@ -49,6 +50,7 @@ import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -349,6 +351,27 @@ public class TaskResourceTest extends CatalogApplicationTest {
assertEquals("description", task.getDescription());
}
@Test
public void put_taskUrlUpdate_200(TestInfo test) throws HttpResponseException, URISyntaxException {
URI taskURI = new URI("http://localhost:8080/task_id=1");
String taskSQL = "select * from test;";
Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate();
Date endDate = new DateTime("2021-12-13T20:20:39+00:00").toDate();
CreateTask request = create(test).withService(AIRFLOW_REFERENCE)
.withDescription("description").withTaskUrl(taskURI);
createAndCheckTask(request, adminAuthHeaders());
// Updating description is ignored when backend already has description
Task task = updateTask(request.withTaskUrl(taskURI).withTaskSQL(taskSQL)
.withTaskType("test").withStartDate(startDate).withEndDate(endDate),
OK, adminAuthHeaders());
assertEquals(taskURI, task.getTaskUrl());
assertEquals(taskSQL, task.getTaskSQL());
assertEquals("test", task.getTaskType());
assertEquals(startDate, task.getStartDate());
assertEquals(endDate, task.getEndDate());
}
@Test
public void put_taskUpdateOwner_200(TestInfo test) throws HttpResponseException, URISyntaxException {
CreateTask request = create(test).withService(AIRFLOW_REFERENCE).withDescription("");
@ -436,7 +459,7 @@ public class TaskResourceTest extends CatalogApplicationTest {
Task task = createTask(create, authHeaders);
assertEquals(0.1, task.getVersion());
validateTask(task, task.getDisplayName(), create.getDescription(), create.getOwner(), create.getService(),
create.getTags(), updatedBy);
create.getTags(), create.getTaskUrl(), updatedBy);
return getAndValidate(task.getId(), create, authHeaders, updatedBy);
}
@ -446,7 +469,7 @@ public class TaskResourceTest extends CatalogApplicationTest {
String updatedBy = TestUtils.getPrincipal(authHeaders);
Task updatedTask = updateTask(create, status, authHeaders);
validateTask(updatedTask, create.getDescription(), create.getOwner(), create.getService(), create.getTags(),
updatedBy);
create.getTaskUrl(), updatedBy);
if (before == null) {
assertEquals(0.1, updatedTask.getVersion()); // First version created
} else {
@ -465,13 +488,13 @@ public class TaskResourceTest extends CatalogApplicationTest {
// GET the newly created task by ID and validate
Task task = getTask(taskId, "service,owner", authHeaders);
validateTask(task, create.getDescription(), create.getOwner(), create.getService(), create.getTags(),
expectedUpdatedBy);
create.getTaskUrl(), expectedUpdatedBy);
// GET the newly created task by name and validate
String fqn = task.getFullyQualifiedName();
task = getTaskByName(fqn, "service,owner", authHeaders);
return validateTask(task, create.getDescription(), create.getOwner(), create.getService(), create.getTags(),
expectedUpdatedBy);
create.getTaskUrl(), expectedUpdatedBy);
}
public static Task updateTask(CreateTask create,
@ -513,22 +536,23 @@ public class TaskResourceTest extends CatalogApplicationTest {
private static Task validateTask(Task task, String expectedDisplayName, String expectedDescription,
EntityReference expectedOwner, EntityReference expectedService,
List<TagLabel> expectedTags, String expectedUpdatedBy)
List<TagLabel> expectedTags, URI expectedTaskUrl, String expectedUpdatedBy)
throws HttpResponseException {
Task newTask = validateTask(task, expectedDescription, expectedOwner, expectedService, expectedTags,
expectedUpdatedBy);
expectedTaskUrl, expectedUpdatedBy);
assertEquals(expectedDisplayName, newTask.getDisplayName());
return task;
}
private static Task validateTask(Task task, String expectedDescription, EntityReference expectedOwner,
EntityReference expectedService, List<TagLabel> expectedTags,
String expectedUpdatedBy)
URI expectedTaskUrl, String expectedUpdatedBy)
throws HttpResponseException {
assertNotNull(task.getId());
assertNotNull(task.getHref());
assertEquals(expectedDescription, task.getDescription());
assertEquals(expectedUpdatedBy, task.getUpdatedBy());
assertEquals(expectedTaskUrl, task.getTaskUrl());
// Validate owner
if (expectedOwner != null) {
@ -561,12 +585,13 @@ public class TaskResourceTest extends CatalogApplicationTest {
// Validate information returned in patch response has the updates
Task updateTask = patchTask(taskJson, before, authHeaders);
validateTask(updateTask, before.getDescription(), newOwner, null, tags, updatedBy);
validateTask(updateTask, before.getDescription(), newOwner, null, tags, before.getTaskUrl(),
updatedBy);
TestUtils.validateUpdate(before.getVersion(), updateTask.getVersion(), updateType);
// GET the task and Validate information returned
Task getTask = getTask(before.getId(), "service,owner,tags", authHeaders);
validateTask(getTask, before.getDescription(), newOwner, null, tags, updatedBy);
validateTask(getTask, before.getDescription(), newOwner, null, tags, before.getTaskUrl(), updatedBy);
return updateTask;
}