Fix #579: Pipeline, Task Entities and PipeineService for Airflow, Prefect

This commit is contained in:
Sriharsha Chintalapani 2021-09-25 00:05:23 -07:00
parent 0a0d5fceae
commit 7d883b2c64

View File

@ -387,11 +387,11 @@ public class PipelineResourceTest extends CatalogApplicationTest {
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null);
createAndCheckPipeline(request, adminAuthHeaders());
Pipeline Pipeline = updateAndCheckPipeline(request
Pipeline pipeline = updateAndCheckPipeline(request
.withDescription("newDescription").withTasks(TASK_REFERENCES),
OK, adminAuthHeaders());
validatePipelineTASKs(Pipeline, TASK_REFERENCES);
assertEquals("newDescription", Pipeline.getDescription());
validatePipelineTASKs(pipeline, TASK_REFERENCES);
assertEquals("newDescription", pipeline.getDescription());
}
@Test
@ -399,16 +399,16 @@ public class PipelineResourceTest extends CatalogApplicationTest {
CreatePipeline request = create(test).withService(AIRFLOW_REFERENCE).withDescription(null);
createAndCheckPipeline(request, adminAuthHeaders());
Pipeline Pipeline = updateAndCheckPipeline(request
Pipeline pipeline = updateAndCheckPipeline(request
.withDescription("newDescription").withTasks(TASK_REFERENCES),
OK, adminAuthHeaders());
validatePipelineTASKs(Pipeline, TASK_REFERENCES);
validatePipelineTASKs(pipeline, TASK_REFERENCES);
// remove a TASK
TASK_REFERENCES.remove(0);
Pipeline = updateAndCheckPipeline(request
pipeline = updateAndCheckPipeline(request
.withDescription("newDescription").withTasks(TASK_REFERENCES),
OK, adminAuthHeaders());
validatePipelineTASKs(Pipeline, TASK_REFERENCES);
validatePipelineTASKs(pipeline, TASK_REFERENCES);
}
@Test
@ -423,97 +423,97 @@ public class PipelineResourceTest extends CatalogApplicationTest {
public void get_PipelineWithDifferentFields_200_OK(TestInfo test) throws HttpResponseException {
CreatePipeline create = create(test).withDescription("description").withOwner(USER_OWNER1)
.withService(AIRFLOW_REFERENCE);
Pipeline Pipeline = createAndCheckPipeline(create, adminAuthHeaders());
validateGetWithDifferentFields(Pipeline, false);
Pipeline pipeline = createAndCheckPipeline(create, adminAuthHeaders());
validateGetWithDifferentFields(pipeline, false);
}
@Test
public void get_PipelineByNameWithDifferentFields_200_OK(TestInfo test) throws HttpResponseException {
CreatePipeline create = create(test).withDescription("description").withOwner(USER_OWNER1)
.withService(AIRFLOW_REFERENCE);
Pipeline Pipeline = createAndCheckPipeline(create, adminAuthHeaders());
validateGetWithDifferentFields(Pipeline, true);
Pipeline pipeline = createAndCheckPipeline(create, adminAuthHeaders());
validateGetWithDifferentFields(pipeline, true);
}
@Test
public void patch_PipelineAttributes_200_ok(TestInfo test) throws HttpResponseException, JsonProcessingException {
// Create Pipeline without description, owner
Pipeline Pipeline = createPipeline(create(test), adminAuthHeaders());
assertNull(Pipeline.getDescription());
assertNull(Pipeline.getOwner());
assertNotNull(Pipeline.getService());
Pipeline pipeline = createPipeline(create(test), adminAuthHeaders());
assertNull(pipeline.getDescription());
assertNull(pipeline.getOwner());
assertNotNull(pipeline.getService());
Pipeline = getPipeline(Pipeline.getId(), "service,owner", adminAuthHeaders());
Pipeline.getService().setHref(null); // href is readonly and not patchable
List<TagLabel> PipelineTags = singletonList(TIER_1);
pipeline = getPipeline(pipeline.getId(), "service,owner", adminAuthHeaders());
pipeline.getService().setHref(null); // href is readonly and not patchable
List<TagLabel> pipelineTags = singletonList(TIER_1);
// Add description, owner when previously they were null
Pipeline = patchPipelineAttributesAndCheck(Pipeline, "description",
TEAM_OWNER1, PipelineTags, adminAuthHeaders());
Pipeline.setOwner(TEAM_OWNER1); // Get rid of href and name returned in the response for owner
Pipeline.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service
PipelineTags = singletonList(USER_ADDRESS_TAG_LABEL);
pipeline = patchPipelineAttributesAndCheck(pipeline, "description",
TEAM_OWNER1, pipelineTags, adminAuthHeaders());
pipeline.setOwner(TEAM_OWNER1); // Get rid of href and name returned in the response for owner
pipeline.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service
pipelineTags = singletonList(USER_ADDRESS_TAG_LABEL);
// Replace description, tier, owner
Pipeline = patchPipelineAttributesAndCheck(Pipeline, "description1",
USER_OWNER1, PipelineTags, adminAuthHeaders());
Pipeline.setOwner(USER_OWNER1); // Get rid of href and name returned in the response for owner
Pipeline.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service
pipeline = patchPipelineAttributesAndCheck(pipeline, "description1",
USER_OWNER1, pipelineTags, adminAuthHeaders());
pipeline.setOwner(USER_OWNER1); // Get rid of href and name returned in the response for owner
pipeline.setService(AIRFLOW_REFERENCE); // Get rid of href and name returned in the response for service
// Remove description, tier, owner
patchPipelineAttributesAndCheck(Pipeline, null, null, PipelineTags, adminAuthHeaders());
patchPipelineAttributesAndCheck(pipeline, null, null, pipelineTags, adminAuthHeaders());
}
@Test
public void patch_PipelineIDChange_400(TestInfo test) throws HttpResponseException, JsonProcessingException {
// Ensure Pipeline ID can't be changed using patch
Pipeline Pipeline = createPipeline(create(test), adminAuthHeaders());
UUID PipelineId = Pipeline.getId();
String PipelineJson = JsonUtils.pojoToJson(Pipeline);
Pipeline.setId(UUID.randomUUID());
Pipeline pipeline = createPipeline(create(test), adminAuthHeaders());
UUID pipelineId = pipeline.getId();
String PipelineJson = JsonUtils.pojoToJson(pipeline);
pipeline.setId(UUID.randomUUID());
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
patchPipeline(PipelineId, PipelineJson, Pipeline, adminAuthHeaders()));
patchPipeline(pipelineId, PipelineJson, pipeline, adminAuthHeaders()));
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "id"));
// ID can't be deleted
Pipeline.setId(null);
pipeline.setId(null);
exception = assertThrows(HttpResponseException.class, () ->
patchPipeline(PipelineId, PipelineJson, Pipeline, adminAuthHeaders()));
patchPipeline(pipelineId, PipelineJson, pipeline, adminAuthHeaders()));
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "id"));
}
@Test
public void patch_PipelineNameChange_400(TestInfo test) throws HttpResponseException, JsonProcessingException {
// Ensure Pipeline name can't be changed using patch
Pipeline Pipeline = createPipeline(create(test), adminAuthHeaders());
String PipelineJson = JsonUtils.pojoToJson(Pipeline);
Pipeline.setName("newName");
Pipeline pipeline = createPipeline(create(test), adminAuthHeaders());
String pipelineJson = JsonUtils.pojoToJson(pipeline);
pipeline.setName("newName");
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
patchPipeline(PipelineJson, Pipeline, adminAuthHeaders()));
patchPipeline(pipelineJson, pipeline, adminAuthHeaders()));
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "name"));
// Name can't be removed
Pipeline.setName(null);
pipeline.setName(null);
exception = assertThrows(HttpResponseException.class, () ->
patchPipeline(PipelineJson, Pipeline, adminAuthHeaders()));
patchPipeline(pipelineJson, pipeline, adminAuthHeaders()));
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "name"));
}
@Test
public void patch_PipelineRemoveService_400(TestInfo test) throws HttpResponseException, JsonProcessingException {
// Ensure service corresponding to Pipeline can't be changed by patch operation
Pipeline Pipeline = createPipeline(create(test), adminAuthHeaders());
Pipeline.getService().setHref(null); // Remove href from returned response as it is read-only field
Pipeline pipeline = createPipeline(create(test), adminAuthHeaders());
pipeline.getService().setHref(null); // Remove href from returned response as it is read-only field
String PipelineJson = JsonUtils.pojoToJson(Pipeline);
Pipeline.setService(PREFECT_REFERENCE);
String pipelineJson = JsonUtils.pojoToJson(pipeline);
pipeline.setService(PREFECT_REFERENCE);
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
patchPipeline(PipelineJson, Pipeline, adminAuthHeaders()));
patchPipeline(pipelineJson, pipeline, adminAuthHeaders()));
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "service"));
// Service relationship can't be removed
Pipeline.setService(null);
pipeline.setService(null);
exception = assertThrows(HttpResponseException.class, () ->
patchPipeline(PipelineJson, Pipeline, adminAuthHeaders()));
patchPipeline(pipelineJson, pipeline, adminAuthHeaders()));
assertResponse(exception, BAD_REQUEST, readOnlyAttribute(Entity.PIPELINE, "service"));
}
@ -522,8 +522,8 @@ public class PipelineResourceTest extends CatalogApplicationTest {
@Test
public void delete_emptyPipeline_200_ok(TestInfo test) throws HttpResponseException {
Pipeline Pipeline = createPipeline(create(test), adminAuthHeaders());
deletePipeline(Pipeline.getId(), adminAuthHeaders());
Pipeline pipeline = createPipeline(create(test), adminAuthHeaders());
deletePipeline(pipeline.getId(), adminAuthHeaders());
}
@Test
@ -540,19 +540,19 @@ public class PipelineResourceTest extends CatalogApplicationTest {
public static Pipeline createAndCheckPipeline(CreatePipeline create,
Map<String, String> authHeaders) throws HttpResponseException {
Pipeline Pipeline = createPipeline(create, authHeaders);
validatePipeline(Pipeline, create.getDisplayName(),
Pipeline pipeline = createPipeline(create, authHeaders);
validatePipeline(pipeline, create.getDisplayName(),
create.getDescription(), create.getOwner(), create.getService());
return getAndValidate(Pipeline.getId(), create, authHeaders);
return getAndValidate(pipeline.getId(), create, authHeaders);
}
public static Pipeline createAndCheckPipeline(CreatePipeline create, List<EntityReference> tasks,
Map<String, String> authHeaders) throws HttpResponseException {
create.withTasks(tasks);
Pipeline Pipeline = createPipeline(create, authHeaders);
validatePipeline(Pipeline, create.getDescription(), create.getOwner(), create.getService(), create.getTags(),
Pipeline pipeline = createPipeline(create, authHeaders);
validatePipeline(pipeline, create.getDescription(), create.getOwner(), create.getService(), create.getTags(),
tasks);
return getAndValidate(Pipeline.getId(), create, authHeaders);
return getAndValidate(pipeline.getId(), create, authHeaders);
}
public static Pipeline updateAndCheckPipeline(CreatePipeline create,
@ -570,13 +570,13 @@ public class PipelineResourceTest extends CatalogApplicationTest {
CreatePipeline create,
Map<String, String> authHeaders) throws HttpResponseException {
// GET the newly created Pipeline by ID and validate
Pipeline Pipeline = getPipeline(PipelineId, "service,owner,tasks", authHeaders);
validatePipeline(Pipeline, create.getDescription(), create.getOwner(), create.getService());
Pipeline pipeline = getPipeline(PipelineId, "service,owner,tasks", authHeaders);
validatePipeline(pipeline, create.getDescription(), create.getOwner(), create.getService());
// GET the newly created Pipeline by name and validate
String fqn = Pipeline.getFullyQualifiedName();
Pipeline = getPipelineByName(fqn, "service,owner,tasks", authHeaders);
return validatePipeline(Pipeline, create.getDescription(), create.getOwner(), create.getService());
String fqn = pipeline.getFullyQualifiedName();
pipeline = getPipelineByName(fqn, "service,owner,tasks", authHeaders);
return validatePipeline(pipeline, create.getDescription(), create.getOwner(), create.getService());
}
public static Pipeline updatePipeline(CreatePipeline create,
@ -592,103 +592,103 @@ public class PipelineResourceTest extends CatalogApplicationTest {
}
/** Validate returned fields GET .../pipelines/{id}?fields="..." or GET .../pipelines/name/{fqn}?fields="..." */
private void validateGetWithDifferentFields(Pipeline Pipeline, boolean byName) throws HttpResponseException {
private void validateGetWithDifferentFields(Pipeline pipeline, boolean byName) throws HttpResponseException {
// .../Pipelines?fields=owner
String fields = "owner";
Pipeline = byName ? getPipelineByName(Pipeline.getFullyQualifiedName(), fields, adminAuthHeaders()) :
getPipeline(Pipeline.getId(), fields, adminAuthHeaders());
assertNotNull(Pipeline.getOwner());
assertNull(Pipeline.getService());
assertNull(Pipeline.getTasks());
pipeline = byName ? getPipelineByName(pipeline.getFullyQualifiedName(), fields, adminAuthHeaders()) :
getPipeline(pipeline.getId(), fields, adminAuthHeaders());
assertNotNull(pipeline.getOwner());
assertNull(pipeline.getService());
assertNull(pipeline.getTasks());
// .../Pipelines?fields=owner,service
fields = "owner,service";
Pipeline = byName ? getPipelineByName(Pipeline.getFullyQualifiedName(), fields, adminAuthHeaders()) :
getPipeline(Pipeline.getId(), fields, adminAuthHeaders());
assertNotNull(Pipeline.getOwner());
assertNotNull(Pipeline.getService());
assertNull(Pipeline.getTasks());
pipeline = byName ? getPipelineByName(pipeline.getFullyQualifiedName(), fields, adminAuthHeaders()) :
getPipeline(pipeline.getId(), fields, adminAuthHeaders());
assertNotNull(pipeline.getOwner());
assertNotNull(pipeline.getService());
assertNull(pipeline.getTasks());
// .../Pipelines?fields=owner,service,tables
fields = "owner,service,tasks";
Pipeline = byName ? getPipelineByName(Pipeline.getFullyQualifiedName(), fields, adminAuthHeaders()) :
getPipeline(Pipeline.getId(), fields, adminAuthHeaders());
assertNotNull(Pipeline.getOwner());
assertNotNull(Pipeline.getService());
assertNotNull(Pipeline.getTasks());
TestUtils.validateEntityReference(Pipeline.getTasks());
pipeline = byName ? getPipelineByName(pipeline.getFullyQualifiedName(), fields, adminAuthHeaders()) :
getPipeline(pipeline.getId(), fields, adminAuthHeaders());
assertNotNull(pipeline.getOwner());
assertNotNull(pipeline.getService());
assertNotNull(pipeline.getTasks());
TestUtils.validateEntityReference(pipeline.getTasks());
}
private static Pipeline validatePipeline(Pipeline Pipeline, String expectedDisplayName,
private static Pipeline validatePipeline(Pipeline pipeline, String expectedDisplayName,
String expectedDescription,
EntityReference expectedOwner, EntityReference expectedService) {
Pipeline newPipeline = validatePipeline(Pipeline, expectedDescription, expectedOwner, expectedService);
Pipeline newPipeline = validatePipeline(pipeline, expectedDescription, expectedOwner, expectedService);
assertEquals(expectedDisplayName, newPipeline.getDisplayName());
return newPipeline;
}
private static Pipeline validatePipeline(Pipeline Pipeline, String expectedDescription,
private static Pipeline validatePipeline(Pipeline pipeline, String expectedDescription,
EntityReference expectedOwner, EntityReference expectedService) {
assertNotNull(Pipeline.getId());
assertNotNull(Pipeline.getHref());
assertEquals(expectedDescription, Pipeline.getDescription());
assertNotNull(pipeline.getId());
assertNotNull(pipeline.getHref());
assertEquals(expectedDescription, pipeline.getDescription());
// Validate owner
if (expectedOwner != null) {
TestUtils.validateEntityReference(Pipeline.getOwner());
assertEquals(expectedOwner.getId(), Pipeline.getOwner().getId());
assertEquals(expectedOwner.getType(), Pipeline.getOwner().getType());
assertNotNull(Pipeline.getOwner().getHref());
TestUtils.validateEntityReference(pipeline.getOwner());
assertEquals(expectedOwner.getId(), pipeline.getOwner().getId());
assertEquals(expectedOwner.getType(), pipeline.getOwner().getType());
assertNotNull(pipeline.getOwner().getHref());
}
// Validate service
if (expectedService != null) {
TestUtils.validateEntityReference(Pipeline.getService());
assertEquals(expectedService.getId(), Pipeline.getService().getId());
assertEquals(expectedService.getType(), Pipeline.getService().getType());
TestUtils.validateEntityReference(pipeline.getService());
assertEquals(expectedService.getId(), pipeline.getService().getId());
assertEquals(expectedService.getType(), pipeline.getService().getType());
}
return Pipeline;
return pipeline;
}
private static Pipeline validatePipeline(Pipeline Pipeline, String expectedDescription,
private static Pipeline validatePipeline(Pipeline pipeline, String expectedDescription,
EntityReference expectedOwner, EntityReference expectedService,
List<TagLabel> expectedTags,
List<EntityReference> TASKs) throws HttpResponseException {
assertNotNull(Pipeline.getId());
assertNotNull(Pipeline.getHref());
assertEquals(expectedDescription, Pipeline.getDescription());
assertNotNull(pipeline.getId());
assertNotNull(pipeline.getHref());
assertEquals(expectedDescription, pipeline.getDescription());
// Validate owner
if (expectedOwner != null) {
TestUtils.validateEntityReference(Pipeline.getOwner());
assertEquals(expectedOwner.getId(), Pipeline.getOwner().getId());
assertEquals(expectedOwner.getType(), Pipeline.getOwner().getType());
assertNotNull(Pipeline.getOwner().getHref());
TestUtils.validateEntityReference(pipeline.getOwner());
assertEquals(expectedOwner.getId(), pipeline.getOwner().getId());
assertEquals(expectedOwner.getType(), pipeline.getOwner().getType());
assertNotNull(pipeline.getOwner().getHref());
}
// Validate service
if (expectedService != null) {
TestUtils.validateEntityReference(Pipeline.getService());
assertEquals(expectedService.getId(), Pipeline.getService().getId());
assertEquals(expectedService.getType(), Pipeline.getService().getType());
TestUtils.validateEntityReference(pipeline.getService());
assertEquals(expectedService.getId(), pipeline.getService().getId());
assertEquals(expectedService.getType(), pipeline.getService().getType());
}
validatePipelineTASKs(Pipeline, TASKs);
validateTags(expectedTags, Pipeline.getTags());
return Pipeline;
validatePipelineTASKs(pipeline, TASKs);
validateTags(expectedTags, pipeline.getTags());
return pipeline;
}
private static void validatePipelineTASKs(Pipeline Pipeline, List<EntityReference> TASKs) {
if (TASKs != null) {
private static void validatePipelineTASKs(Pipeline pipeline, List<EntityReference> Tasks) {
if (Tasks != null) {
List<UUID> expectedTASKReferences = new ArrayList<>();
for (EntityReference TASK: TASKs) {
expectedTASKReferences.add(TASK.getId());
for (EntityReference Task: Tasks) {
expectedTASKReferences.add(Task.getId());
}
List<UUID> actualTASKReferences = new ArrayList<>();
for (EntityReference TASK: Pipeline.getTasks()) {
TestUtils.validateEntityReference(TASK);
actualTASKReferences.add(TASK.getId());
List<UUID> actualTaskReferences = new ArrayList<>();
for (EntityReference task: pipeline.getTasks()) {
TestUtils.validateEntityReference(task);
actualTaskReferences.add(task.getId());
}
assertTrue(actualTASKReferences.containsAll(expectedTASKReferences));
assertTrue(actualTaskReferences.containsAll(expectedTASKReferences));
}
}
@ -711,35 +711,35 @@ public class PipelineResourceTest extends CatalogApplicationTest {
assertTrue(updatedExpectedList.containsAll(actualList));
}
private Pipeline patchPipelineAttributesAndCheck(Pipeline Pipeline, String newDescription,
private Pipeline patchPipelineAttributesAndCheck(Pipeline pipeline, String newDescription,
EntityReference newOwner, List<TagLabel> tags,
Map<String, String> authHeaders)
throws JsonProcessingException, HttpResponseException {
String PipelineJson = JsonUtils.pojoToJson(Pipeline);
String PipelineJson = JsonUtils.pojoToJson(pipeline);
// Update the table attributes
Pipeline.setDescription(newDescription);
Pipeline.setOwner(newOwner);
Pipeline.setTags(tags);
pipeline.setDescription(newDescription);
pipeline.setOwner(newOwner);
pipeline.setTags(tags);
// Validate information returned in patch response has the updates
Pipeline updatedPipeline = patchPipeline(PipelineJson, Pipeline, authHeaders);
validatePipeline(updatedPipeline, Pipeline.getDescription(), newOwner, null, tags,
Pipeline.getTasks());
Pipeline updatedPipeline = patchPipeline(PipelineJson, pipeline, authHeaders);
validatePipeline(updatedPipeline, pipeline.getDescription(), newOwner, null, tags,
pipeline.getTasks());
// GET the table and Validate information returned
Pipeline getPipeline = getPipeline(Pipeline.getId(), "service,owner", authHeaders);
validatePipeline(updatedPipeline, Pipeline.getDescription(), newOwner, null, tags,
Pipeline.getTasks());
Pipeline getPipeline = getPipeline(pipeline.getId(), "service,owner", authHeaders);
validatePipeline(updatedPipeline, pipeline.getDescription(), newOwner, null, tags,
pipeline.getTasks());
return updatedPipeline;
}
private Pipeline patchPipeline(UUID PipelineId, String originalJson, Pipeline updatedPipeline,
private Pipeline patchPipeline(UUID pipelineId, String originalJson, Pipeline updatedPipeline,
Map<String, String> authHeaders)
throws JsonProcessingException, HttpResponseException {
String updatePipelineJson = JsonUtils.pojoToJson(updatedPipeline);
JsonPatch patch = JsonSchemaUtil.getJsonPatch(originalJson, updatePipelineJson);
return TestUtils.patch(getResource("pipelines/" + PipelineId), patch, Pipeline.class, authHeaders);
return TestUtils.patch(getResource("pipelines/" + pipelineId), patch, Pipeline.class, authHeaders);
}
private Pipeline patchPipeline(String originalJson,