mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-15 12:00:22 +00:00
Fix #579: Pipeline, Task Entities and PipeineService for Airflow, Prefect
This commit is contained in:
parent
7d883b2c64
commit
169ae80a20
@ -24,14 +24,11 @@ import io.swagger.v3.oas.annotations.Parameter;
|
|||||||
import io.swagger.v3.oas.annotations.media.Content;
|
import io.swagger.v3.oas.annotations.media.Content;
|
||||||
import io.swagger.v3.oas.annotations.media.Schema;
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||||
import org.openmetadata.catalog.api.services.CreateMessagingService;
|
|
||||||
import org.openmetadata.catalog.api.services.CreatePipelineService;
|
import org.openmetadata.catalog.api.services.CreatePipelineService;
|
||||||
import org.openmetadata.catalog.api.services.UpdateMessagingService;
|
|
||||||
import org.openmetadata.catalog.api.services.UpdatePipelineService;
|
import org.openmetadata.catalog.api.services.UpdatePipelineService;
|
||||||
import org.openmetadata.catalog.entity.data.Dashboard;
|
import org.openmetadata.catalog.entity.data.Dashboard;
|
||||||
import org.openmetadata.catalog.entity.services.MessagingService;
|
import org.openmetadata.catalog.entity.services.MessagingService;
|
||||||
import org.openmetadata.catalog.entity.services.PipelineService;
|
import org.openmetadata.catalog.entity.services.PipelineService;
|
||||||
import org.openmetadata.catalog.jdbi3.MessagingServiceRepository;
|
|
||||||
import org.openmetadata.catalog.jdbi3.PipelineServiceRepository;
|
import org.openmetadata.catalog.jdbi3.PipelineServiceRepository;
|
||||||
import org.openmetadata.catalog.resources.Collection;
|
import org.openmetadata.catalog.resources.Collection;
|
||||||
import org.openmetadata.catalog.security.CatalogAuthorizer;
|
import org.openmetadata.catalog.security.CatalogAuthorizer;
|
||||||
|
@ -30,7 +30,6 @@ import org.openmetadata.catalog.api.data.CreateTask;
|
|||||||
import org.openmetadata.catalog.entity.data.Chart;
|
import org.openmetadata.catalog.entity.data.Chart;
|
||||||
import org.openmetadata.catalog.entity.data.Dashboard;
|
import org.openmetadata.catalog.entity.data.Dashboard;
|
||||||
import org.openmetadata.catalog.entity.data.Task;
|
import org.openmetadata.catalog.entity.data.Task;
|
||||||
import org.openmetadata.catalog.jdbi3.ChartRepository;
|
|
||||||
import org.openmetadata.catalog.jdbi3.TaskRepository;
|
import org.openmetadata.catalog.jdbi3.TaskRepository;
|
||||||
import org.openmetadata.catalog.resources.Collection;
|
import org.openmetadata.catalog.resources.Collection;
|
||||||
import org.openmetadata.catalog.security.CatalogAuthorizer;
|
import org.openmetadata.catalog.security.CatalogAuthorizer;
|
||||||
|
@ -468,16 +468,16 @@ public class PipelineResourceTest extends CatalogApplicationTest {
|
|||||||
// Ensure Pipeline ID can't be changed using patch
|
// Ensure Pipeline ID can't be changed using patch
|
||||||
Pipeline pipeline = createPipeline(create(test), adminAuthHeaders());
|
Pipeline pipeline = createPipeline(create(test), adminAuthHeaders());
|
||||||
UUID pipelineId = pipeline.getId();
|
UUID pipelineId = pipeline.getId();
|
||||||
String PipelineJson = JsonUtils.pojoToJson(pipeline);
|
String pipelineJson = JsonUtils.pojoToJson(pipeline);
|
||||||
pipeline.setId(UUID.randomUUID());
|
pipeline.setId(UUID.randomUUID());
|
||||||
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
|
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
|
||||||
patchPipeline(pipelineId, PipelineJson, pipeline, adminAuthHeaders()));
|
patchPipeline(pipelineId, pipelineJson, pipeline, adminAuthHeaders()));
|
||||||
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "id"));
|
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "id"));
|
||||||
|
|
||||||
// ID can't be deleted
|
// ID can't be deleted
|
||||||
pipeline.setId(null);
|
pipeline.setId(null);
|
||||||
exception = assertThrows(HttpResponseException.class, () ->
|
exception = assertThrows(HttpResponseException.class, () ->
|
||||||
patchPipeline(pipelineId, PipelineJson, pipeline, adminAuthHeaders()));
|
patchPipeline(pipelineId, pipelineJson, pipeline, adminAuthHeaders()));
|
||||||
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "id"));
|
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "id"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -566,11 +566,11 @@ public class PipelineResourceTest extends CatalogApplicationTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Make sure in GET operations the returned Pipeline has all the required information passed during creation
|
// Make sure in GET operations the returned Pipeline has all the required information passed during creation
|
||||||
public static Pipeline getAndValidate(UUID PipelineId,
|
public static Pipeline getAndValidate(UUID pipelineId,
|
||||||
CreatePipeline create,
|
CreatePipeline create,
|
||||||
Map<String, String> authHeaders) throws HttpResponseException {
|
Map<String, String> authHeaders) throws HttpResponseException {
|
||||||
// GET the newly created Pipeline by ID and validate
|
// GET the newly created Pipeline by ID and validate
|
||||||
Pipeline pipeline = getPipeline(PipelineId, "service,owner,tasks", authHeaders);
|
Pipeline pipeline = getPipeline(pipelineId, "service,owner,tasks", authHeaders);
|
||||||
validatePipeline(pipeline, create.getDescription(), create.getOwner(), create.getService());
|
validatePipeline(pipeline, create.getDescription(), create.getOwner(), create.getService());
|
||||||
|
|
||||||
// GET the newly created Pipeline by name and validate
|
// GET the newly created Pipeline by name and validate
|
||||||
@ -653,7 +653,7 @@ public class PipelineResourceTest extends CatalogApplicationTest {
|
|||||||
private static Pipeline validatePipeline(Pipeline pipeline, String expectedDescription,
|
private static Pipeline validatePipeline(Pipeline pipeline, String expectedDescription,
|
||||||
EntityReference expectedOwner, EntityReference expectedService,
|
EntityReference expectedOwner, EntityReference expectedService,
|
||||||
List<TagLabel> expectedTags,
|
List<TagLabel> expectedTags,
|
||||||
List<EntityReference> TASKs) throws HttpResponseException {
|
List<EntityReference> tasks) throws HttpResponseException {
|
||||||
assertNotNull(pipeline.getId());
|
assertNotNull(pipeline.getId());
|
||||||
assertNotNull(pipeline.getHref());
|
assertNotNull(pipeline.getHref());
|
||||||
assertEquals(expectedDescription, pipeline.getDescription());
|
assertEquals(expectedDescription, pipeline.getDescription());
|
||||||
@ -672,16 +672,16 @@ public class PipelineResourceTest extends CatalogApplicationTest {
|
|||||||
assertEquals(expectedService.getId(), pipeline.getService().getId());
|
assertEquals(expectedService.getId(), pipeline.getService().getId());
|
||||||
assertEquals(expectedService.getType(), pipeline.getService().getType());
|
assertEquals(expectedService.getType(), pipeline.getService().getType());
|
||||||
}
|
}
|
||||||
validatePipelineTASKs(pipeline, TASKs);
|
validatePipelineTASKs(pipeline, tasks);
|
||||||
validateTags(expectedTags, pipeline.getTags());
|
validateTags(expectedTags, pipeline.getTags());
|
||||||
return pipeline;
|
return pipeline;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void validatePipelineTASKs(Pipeline pipeline, List<EntityReference> Tasks) {
|
private static void validatePipelineTASKs(Pipeline pipeline, List<EntityReference> tasks) {
|
||||||
if (Tasks != null) {
|
if (tasks != null) {
|
||||||
List<UUID> expectedTASKReferences = new ArrayList<>();
|
List<UUID> expectedTASKReferences = new ArrayList<>();
|
||||||
for (EntityReference Task: Tasks) {
|
for (EntityReference task: tasks) {
|
||||||
expectedTASKReferences.add(Task.getId());
|
expectedTASKReferences.add(task.getId());
|
||||||
}
|
}
|
||||||
List<UUID> actualTaskReferences = new ArrayList<>();
|
List<UUID> actualTaskReferences = new ArrayList<>();
|
||||||
for (EntityReference task: pipeline.getTasks()) {
|
for (EntityReference task: pipeline.getTasks()) {
|
||||||
@ -715,7 +715,7 @@ public class PipelineResourceTest extends CatalogApplicationTest {
|
|||||||
EntityReference newOwner, List<TagLabel> tags,
|
EntityReference newOwner, List<TagLabel> tags,
|
||||||
Map<String, String> authHeaders)
|
Map<String, String> authHeaders)
|
||||||
throws JsonProcessingException, HttpResponseException {
|
throws JsonProcessingException, HttpResponseException {
|
||||||
String PipelineJson = JsonUtils.pojoToJson(pipeline);
|
String pipelineJson = JsonUtils.pojoToJson(pipeline);
|
||||||
|
|
||||||
// Update the table attributes
|
// Update the table attributes
|
||||||
pipeline.setDescription(newDescription);
|
pipeline.setDescription(newDescription);
|
||||||
@ -723,14 +723,14 @@ public class PipelineResourceTest extends CatalogApplicationTest {
|
|||||||
pipeline.setTags(tags);
|
pipeline.setTags(tags);
|
||||||
|
|
||||||
// Validate information returned in patch response has the updates
|
// Validate information returned in patch response has the updates
|
||||||
Pipeline updatedPipeline = patchPipeline(PipelineJson, pipeline, authHeaders);
|
Pipeline updatedPipeline = patchPipeline(pipelineJson, pipeline, authHeaders);
|
||||||
validatePipeline(updatedPipeline, pipeline.getDescription(), newOwner, null, tags,
|
validatePipeline(updatedPipeline, pipeline.getDescription(), newOwner, null, tags,
|
||||||
pipeline.getTasks());
|
pipeline.getTasks());
|
||||||
|
|
||||||
// GET the table and Validate information returned
|
// GET the table and Validate information returned
|
||||||
Pipeline getPipeline = getPipeline(pipeline.getId(), "service,owner", authHeaders);
|
Pipeline getPipeline = getPipeline(pipeline.getId(), "service,owner", authHeaders);
|
||||||
validatePipeline(updatedPipeline, pipeline.getDescription(), newOwner, null, tags,
|
validatePipeline(updatedPipeline, getPipeline.getDescription(), newOwner, null, tags,
|
||||||
pipeline.getTasks());
|
getPipeline.getTasks());
|
||||||
return updatedPipeline;
|
return updatedPipeline;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -330,7 +330,7 @@ public class TaskResourceTest extends CatalogApplicationTest {
|
|||||||
|
|
||||||
// Update null description with a new description
|
// Update null description with a new description
|
||||||
Task task = updateAndCheckTask(request.withDescription("newDescription").withDisplayName("newTask")
|
Task task = updateAndCheckTask(request.withDescription("newDescription").withDisplayName("newTask")
|
||||||
, OK, adminAuthHeaders());
|
,OK, adminAuthHeaders());
|
||||||
assertEquals("newDescription", task.getDescription());
|
assertEquals("newDescription", task.getDescription());
|
||||||
assertEquals("newTask", task.getDisplayName());
|
assertEquals("newTask", task.getDisplayName());
|
||||||
}
|
}
|
||||||
@ -394,7 +394,8 @@ public class TaskResourceTest extends CatalogApplicationTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void patch_taskAttributes_200_ok(TestInfo test) throws HttpResponseException, JsonProcessingException, URISyntaxException {
|
public void patch_taskAttributes_200_ok(TestInfo test) throws HttpResponseException, JsonProcessingException,
|
||||||
|
URISyntaxException {
|
||||||
// Create task without description, owner
|
// Create task without description, owner
|
||||||
Task task = createTask(create(test), adminAuthHeaders());
|
Task task = createTask(create(test), adminAuthHeaders());
|
||||||
assertNull(task.getDescription());
|
assertNull(task.getDescription());
|
||||||
|
Loading…
x
Reference in New Issue
Block a user