Fixes #1358 - Add change events to PipelineService and StorageService entity

This commit is contained in:
sureshms 2021-11-24 14:26:23 -08:00
parent ec6c6f48c4
commit c8cc96f6f5
5 changed files with 75 additions and 175 deletions

View File

@ -35,7 +35,6 @@ import java.io.IOException;
import java.net.URI;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.UUID;

View File

@ -201,12 +201,8 @@ public class PipelineServiceRepository extends EntityRepository<PipelineService>
@Override
public void entitySpecificUpdate() throws IOException {
updatePipelineUrl();
updateIngestionSchedule();
}
private void updatePipelineUrl() throws JsonProcessingException {
recordChange("pipelineUrl", original.getEntity().getPipelineUrl(), updated.getEntity().getPipelineUrl());
updateIngestionSchedule();
}
private void updateIngestionSchedule() throws JsonProcessingException {

View File

@ -234,9 +234,7 @@ public class MessagingServiceResourceTest extends EntityResourceTest<MessagingSe
change.getFieldsUpdated().add(new FieldChange().withName("schemaRegistry")
.withOldValue(SCHEMA_REGISTRY_URL).withNewValue(updatedSchemaRegistry));
service = updateAndCheckEntity(update, OK, adminAuthHeaders(), UpdateType.MINOR_UPDATE, change);
// MessagingService updatedService = getService(service.getId(), adminAuthHeaders());
// validateMessagingServiceConfig(updatedService, List.of("localhost:0"), new URI("http://localhost:9000"));
updateAndCheckEntity(update, OK, adminAuthHeaders(), UpdateType.MINOR_UPDATE, change);
}
@Test
@ -353,12 +351,6 @@ public class MessagingServiceResourceTest extends EntityResourceTest<MessagingSe
.withIngestionSchedule(new Schedule().withStartDate(new Date()).withRepeatFrequency("P1D"));
}
private static void validateMessagingServiceConfig(MessagingService actualService, List<String> expectedBrokers,
URI expectedSchemaRegistry) {
assertTrue(actualService.getBrokers().containsAll(expectedBrokers));
assertEquals(actualService.getSchemaRegistry(), expectedSchemaRegistry);
}
@Override
public Object createRequest(TestInfo test, int index, String description, String displayName, EntityReference owner)
throws URISyntaxException {
@ -376,6 +368,8 @@ public class MessagingServiceResourceTest extends EntityResourceTest<MessagingSe
assertEquals(expectedIngestion.getStartDate(), service.getIngestionSchedule().getStartDate());
assertEquals(expectedIngestion.getRepeatFrequency(), service.getIngestionSchedule().getRepeatFrequency());
}
assertTrue(createRequest.getBrokers().containsAll(service.getBrokers()));
assertEquals(createRequest.getSchemaRegistry(), service.getSchemaRegistry());
}
@Override

View File

@ -28,18 +28,19 @@ import org.openmetadata.catalog.exception.CatalogExceptionMessage;
import org.openmetadata.catalog.jdbi3.PipelineServiceRepository.PipelineServiceEntityInterface;
import org.openmetadata.catalog.resources.EntityResourceTest;
import org.openmetadata.catalog.resources.services.pipeline.PipelineServiceResource.PipelineServiceList;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.FieldChange;
import org.openmetadata.catalog.type.Schedule;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.TestUtils;
import org.openmetadata.catalog.util.TestUtils.UpdateType;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -49,10 +50,10 @@ import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
import static org.openmetadata.catalog.util.TestUtils.authHeaders;
import static org.openmetadata.catalog.util.TestUtils.getPrincipal;
public class PipelineServiceResourceTest extends EntityResourceTest<PipelineService> {
@ -76,15 +77,25 @@ public class PipelineServiceResourceTest extends EntityResourceTest<PipelineServ
}
@Override
public void validateCreatedEntity(PipelineService createdEntity, Object request, Map<String, String> authHeaders)
public void validateCreatedEntity(PipelineService service, Object request, Map<String, String> authHeaders)
throws HttpResponseException {
CreatePipelineService createRequest = (CreatePipelineService) request;
validateCommonEntityFields(getEntityInterface(service), createRequest.getDescription(), getPrincipal(authHeaders),
null);
assertEquals(createRequest.getName(), service.getName());
Schedule expectedIngestion = createRequest.getIngestionSchedule();
if (expectedIngestion != null) {
assertEquals(expectedIngestion.getStartDate(), service.getIngestionSchedule().getStartDate());
assertEquals(expectedIngestion.getRepeatFrequency(), service.getIngestionSchedule().getRepeatFrequency());
}
assertEquals(createRequest.getPipelineUrl(), service.getPipelineUrl());
}
@Override
public void validateUpdatedEntity(PipelineService updatedEntity, Object request, Map<String, String> authHeaders)
throws HttpResponseException {
validateCreatedEntity(updatedEntity, request, authHeaders);
}
@Override
@ -145,12 +156,12 @@ public class PipelineServiceResourceTest extends EntityResourceTest<PipelineServ
}
@Test
public void post_validService_as_admin_200_ok(TestInfo test) throws HttpResponseException {
public void post_validService_as_admin_200_ok(TestInfo test) throws IOException {
// Create pipeline service with different optional fields
Map<String, String> authHeaders = adminAuthHeaders();
createAndCheckService(create(test, 1).withDescription(null), authHeaders);
createAndCheckService(create(test, 2).withDescription("description"), authHeaders);
createAndCheckService(create(test, 3).withIngestionSchedule(null), authHeaders);
createAndCheckEntity(create(test, 1).withDescription(null), authHeaders);
createAndCheckEntity(create(test, 2).withDescription("description"), authHeaders);
createAndCheckEntity(create(test, 3).withIngestionSchedule(null), authHeaders);
}
@Test
@ -159,7 +170,7 @@ public class PipelineServiceResourceTest extends EntityResourceTest<PipelineServ
Map<String, String> authHeaders = authHeaders("test@open-metadata.org");
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createAndCheckService(create(test, 1).withDescription(null), authHeaders));
createAndCheckEntity(create(test, 1).withDescription(null), authHeaders));
TestUtils.assertResponse(exception, FORBIDDEN,
"Principal: CatalogPrincipal{name='test'} is not admin");
}
@ -197,16 +208,16 @@ public class PipelineServiceResourceTest extends EntityResourceTest<PipelineServ
}
@Test
public void post_validIngestionSchedules_as_admin_200(TestInfo test) throws HttpResponseException {
public void post_validIngestionSchedules_as_admin_200(TestInfo test) throws IOException {
Schedule schedule = new Schedule().withStartDate(new Date());
schedule.withRepeatFrequency("PT60M"); // Repeat every 60M should be valid
createAndCheckService(create(test, 1).withIngestionSchedule(schedule), adminAuthHeaders());
createAndCheckEntity(create(test, 1).withIngestionSchedule(schedule), adminAuthHeaders());
schedule.withRepeatFrequency("PT1H49M");
createAndCheckService(create(test, 2).withIngestionSchedule(schedule), adminAuthHeaders());
createAndCheckEntity(create(test, 2).withIngestionSchedule(schedule), adminAuthHeaders());
schedule.withRepeatFrequency("P1DT1H49M");
createAndCheckService(create(test, 3).withIngestionSchedule(schedule), adminAuthHeaders());
createAndCheckEntity(create(test, 3).withIngestionSchedule(schedule), adminAuthHeaders());
}
@Test
@ -227,40 +238,44 @@ public class PipelineServiceResourceTest extends EntityResourceTest<PipelineServ
}
@Test
public void put_updateService_as_admin_2xx(TestInfo test) throws HttpResponseException, URISyntaxException {
PipelineService dbService = createAndCheckService(create(test).withDescription(null).withIngestionSchedule(null)
public void put_updateService_as_admin_2xx(TestInfo test) throws IOException, URISyntaxException {
PipelineService service = createAndCheckEntity(create(test).withDescription(null).withIngestionSchedule(null)
.withPipelineUrl(PIPELINE_SERVICE_URL), adminAuthHeaders());
// Update pipeline description and ingestion service that are null
CreatePipelineService update = create(test).withDescription("description1");
updateAndCheckService(update, OK, adminAuthHeaders());
Schedule schedule = update.getIngestionSchedule();
ChangeDescription change = getChangeDescription(service.getVersion());
change.getFieldsAdded().add(new FieldChange().withName("description").withNewValue("description1"));
change.getFieldsAdded().add(new FieldChange().withName("ingestionSchedule").withNewValue(schedule));
service = updateAndCheckEntity(update, OK, adminAuthHeaders(), UpdateType.MINOR_UPDATE, change);
// Update ingestion schedule
Schedule schedule = new Schedule().withStartDate(new Date()).withRepeatFrequency("P1D");
update.withIngestionSchedule(schedule);
updateAndCheckService(update, OK, adminAuthHeaders());
// Update ingestion schedule again
Schedule schedule1 = new Schedule().withStartDate(new Date()).withRepeatFrequency("PT1H");
update.withIngestionSchedule(schedule1);
change = getChangeDescription(service.getVersion());
change.getFieldsUpdated().add(new FieldChange().withName("ingestionSchedule").withOldValue(schedule)
.withNewValue(schedule1));
service = updateAndCheckEntity(update, OK, adminAuthHeaders(), UpdateType.MINOR_UPDATE, change);
// Update ingestion schedule again
update.withIngestionSchedule(schedule.withRepeatFrequency("PT1H"));
updateAndCheckService(update, OK, adminAuthHeaders());
// update broker list and schema registry
update.withPipelineUrl(new URI("http://localhost:9000"));
updateAndCheckService(update, OK, adminAuthHeaders());
PipelineService updatedService = getService(dbService.getId(), adminAuthHeaders());
validatePipelineServiceConfig(updatedService, List.of("localhost:0"), new URI("http://localhost:9000"));
// update pipeline Url
URI pipelineUrl = new URI("http://localhost:9000");
update.withPipelineUrl(pipelineUrl);
change = getChangeDescription(service.getVersion());
change.getFieldsUpdated().add(new FieldChange().withName("pipelineUrl")
.withOldValue(PIPELINE_SERVICE_URL).withNewValue(pipelineUrl));
updateAndCheckEntity(update, OK, adminAuthHeaders(), UpdateType.MINOR_UPDATE, change);
}
@Test
public void put_update_as_non_admin_401(TestInfo test) throws HttpResponseException {
public void put_update_as_non_admin_401(TestInfo test) throws IOException {
Map<String, String> authHeaders = adminAuthHeaders();
PipelineService pipelineService = createAndCheckService(create(test).withDescription(null)
.withIngestionSchedule(null),
authHeaders);
createAndCheckEntity(create(test).withDescription(null) .withIngestionSchedule(null), authHeaders);
// Update pipeline description and ingestion service that are null
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
updateAndCheckService(create(test), OK, authHeaders("test@open-metadata.org")));
updateAndCheckEntity(create(test), OK, authHeaders("test@open-metadata.org"),
UpdateType.NO_CHANGE, null));
TestUtils.assertResponse(exception, FORBIDDEN, "Principal: CatalogPrincipal{name='test'} " +
"is not admin");
}
@ -281,43 +296,12 @@ public class PipelineServiceResourceTest extends EntityResourceTest<PipelineServ
"invalidName"));
}
public static PipelineService createAndCheckService(CreatePipelineService create,
Map<String, String> authHeaders) throws HttpResponseException {
String updatedBy = TestUtils.getPrincipal(authHeaders);
PipelineService service = createService(create, authHeaders);
assertEquals(0.1, service.getVersion());
validateService(service, create.getName(), create.getDescription(), create.getIngestionSchedule(), updatedBy);
// GET the newly created service and validate
PipelineService getService = getService(service.getId(), authHeaders);
validateService(getService, create.getName(), create.getDescription(), create.getIngestionSchedule(), updatedBy);
// GET the newly created service by name and validate
getService = getServiceByName(service.getName(), null, authHeaders);
validateService(getService, create.getName(), create.getDescription(), create.getIngestionSchedule(), updatedBy);
return service;
}
public static PipelineService createService(CreatePipelineService create,
Map<String, String> authHeaders) throws HttpResponseException {
return TestUtils.post(CatalogApplicationTest.getResource("services/pipelineServices"),
create, PipelineService.class, authHeaders);
}
private static void validateService(PipelineService service, String expectedName, String expectedDescription,
Schedule expectedIngestion, String expectedUpdatedBy) {
assertNotNull(service.getId());
assertNotNull(service.getHref());
assertEquals(expectedName, service.getName());
assertEquals(expectedDescription, service.getDescription());
assertEquals(expectedUpdatedBy, service.getUpdatedBy());
if (expectedIngestion != null) {
assertEquals(expectedIngestion.getStartDate(), service.getIngestionSchedule().getStartDate());
assertEquals(expectedIngestion.getRepeatFrequency(), service.getIngestionSchedule().getRepeatFrequency());
}
}
public static PipelineService getService(UUID id, Map<String, String> authHeaders) throws HttpResponseException {
return getService(id, null, authHeaders);
}
@ -397,31 +381,4 @@ public class PipelineServiceResourceTest extends EntityResourceTest<PipelineServ
.withPipelineUrl(PIPELINE_SERVICE_URL)
.withIngestionSchedule(new Schedule().withStartDate(new Date()).withRepeatFrequency("P1D"));
}
public static void updateAndCheckService(CreatePipelineService update, Status status,
Map<String, String> authHeaders) throws HttpResponseException {
String updatedBy = TestUtils.getPrincipal(authHeaders);
PipelineService service = updatePipelineService(update, status, authHeaders);
validateService(service, service.getName(), update.getDescription(), update.getIngestionSchedule(), updatedBy);
// GET the newly updated pipeline and validate
PipelineService getService = getService(service.getId(), authHeaders);
validateService(getService, service.getName(), update.getDescription(), update.getIngestionSchedule(), updatedBy);
// GET the newly updated pipeline by name and validate
getService = getServiceByName(service.getName(), null, authHeaders);
validateService(getService, service.getName(), update.getDescription(), update.getIngestionSchedule(), updatedBy);
}
public static PipelineService updatePipelineService(CreatePipelineService updated,
Status status, Map<String, String> authHeaders)
throws HttpResponseException {
return TestUtils.put(CatalogApplicationTest.getResource("services/pipelineServices"), updated,
PipelineService.class, status, authHeaders);
}
private static void validatePipelineServiceConfig(PipelineService actualService, List<String> expectedBrokers,
URI expectedUrl) {
assertEquals(actualService.getPipelineUrl(), expectedUrl);
}
}

View File

@ -15,9 +15,9 @@ import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.StorageServiceType;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.TestUtils;
import org.openmetadata.catalog.util.TestUtils.UpdateType;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;
@ -29,10 +29,10 @@ import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
import static org.openmetadata.catalog.util.TestUtils.authHeaders;
import static org.openmetadata.catalog.util.TestUtils.getPrincipal;
public class StorageServiceResourceTest extends EntityResourceTest<StorageService> {
public StorageServiceResourceTest() {
@ -69,11 +69,11 @@ public class StorageServiceResourceTest extends EntityResourceTest<StorageServic
}
@Test
public void post_validService_as_admin_200_ok(TestInfo test) throws HttpResponseException {
public void post_validService_as_admin_200_ok(TestInfo test) throws IOException {
// Create storage service with different optional fields
Map<String, String> authHeaders = adminAuthHeaders();
createAndCheckService(create(test, 1).withDescription(null), authHeaders);
createAndCheckService(create(test, 2).withDescription("description"), authHeaders);
createAndCheckEntity(create(test, 1).withDescription(null), authHeaders);
createAndCheckEntity(create(test, 2).withDescription("description"), authHeaders);
}
@Test
@ -82,33 +82,27 @@ public class StorageServiceResourceTest extends EntityResourceTest<StorageServic
Map<String, String> authHeaders = authHeaders("test@open-metadata.org");
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
createAndCheckService(create(test, 1).withDescription(null), authHeaders));
createAndCheckEntity(create(test, 1).withDescription(null), authHeaders));
TestUtils.assertResponse(exception, FORBIDDEN,
"Principal: CatalogPrincipal{name='test'} is not admin");
}
@Test
public void put_updateStorageService_as_admin_2xx(TestInfo test) throws HttpResponseException {
StorageService dbService = createAndCheckService(create(test).withDescription(null), adminAuthHeaders());
String id = dbService.getId().toString();
public void put_updateStorageService_as_admin_2xx(TestInfo test) throws IOException {
createAndCheckEntity(create(test).withDescription(null), adminAuthHeaders());
// Update storage description and ingestion service that are null
CreateStorageService update = create(test).withDescription("description1");
updateAndCheckService(update, OK, adminAuthHeaders());
// Update description and ingestion schedule again
update.withDescription("description1");
updateAndCheckService(update, OK, adminAuthHeaders());
// TODO add more tests for different fields
}
@Test
public void put_update_as_non_admin_401(TestInfo test) throws HttpResponseException {
public void put_update_as_non_admin_401(TestInfo test) throws IOException {
Map<String, String> authHeaders = adminAuthHeaders();
StorageService dbService = createAndCheckService(create(test).withDescription(null), authHeaders);
createAndCheckEntity(create(test).withDescription(null), authHeaders);
// Update storage description and ingestion service that are null
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
updateAndCheckService(create(test), OK, authHeaders("test@open-metadata.org")));
updateAndCheckEntity(create(test), OK, authHeaders("test@open-metadata.org"),
UpdateType.NO_CHANGE, null));
TestUtils.assertResponse(exception, FORBIDDEN, "Principal: CatalogPrincipal{name='test'} " +
"is not admin");
}
@ -178,28 +172,6 @@ public class StorageServiceResourceTest extends EntityResourceTest<StorageServic
create, StorageService.class, authHeaders);
}
public static StorageService createAndCheckService(CreateStorageService create,
Map<String, String> authHeaders) throws HttpResponseException {
StorageService service = createService(create, authHeaders);
validateService(service, create.getName(), create.getDescription());
// GET the newly created service and validate
StorageService getService = getService(service.getId(), authHeaders);
validateService(getService, create.getName(), create.getDescription());
// GET the newly created service by name and validate
getService = getServiceByName(service.getName(), null, authHeaders);
validateService(getService, create.getName(), create.getDescription());
return service;
}
private static void validateService(StorageService service, String expectedName, String expectedDescription) {
assertNotNull(service.getId());
assertNotNull(service.getHref());
assertEquals(expectedName, service.getName());
assertEquals(expectedDescription, service.getDescription());
}
public static StorageService getService(UUID id, Map<String, String> authHeaders) throws HttpResponseException {
return getService(id, null, authHeaders);
}
@ -218,27 +190,6 @@ public class StorageServiceResourceTest extends EntityResourceTest<StorageServic
return TestUtils.get(target, StorageService.class, authHeaders);
}
public static StorageService updateStorageService(CreateStorageService updated,
Response.Status status, Map<String, String> authHeaders)
throws HttpResponseException {
return TestUtils.put(CatalogApplicationTest.getResource("services/storageServices"), updated,
StorageService.class, status, authHeaders);
}
public static void updateAndCheckService(CreateStorageService update, Response.Status status,
Map<String, String> authHeaders) throws HttpResponseException {
StorageService service = updateStorageService(update, status, authHeaders);
validateService(service, service.getName(), update.getDescription());
// GET the newly updated storage and validate
StorageService getService = getService(service.getId(), authHeaders);
validateService(getService, service.getName(), update.getDescription());
// GET the newly updated storage by name and validate
getService = getServiceByName(service.getName(), null, authHeaders);
validateService(getService, service.getName(), update.getDescription());
}
private void deleteService(UUID id, String name, Map<String, String> authHeaders) throws HttpResponseException {
TestUtils.delete(CatalogApplicationTest.getResource("services/storageServices/" + id), authHeaders);
@ -260,13 +211,16 @@ public class StorageServiceResourceTest extends EntityResourceTest<StorageServic
}
@Override
public void validateCreatedEntity(StorageService createdEntity, Object request, Map<String, String> authHeaders) throws HttpResponseException {
public void validateCreatedEntity(StorageService service, Object request, Map<String, String> authHeaders) throws HttpResponseException {
CreateStorageService createRequest = (CreateStorageService) request;
validateCommonEntityFields(getEntityInterface(service), createRequest.getDescription(),
getPrincipal(authHeaders), null);
assertEquals(createRequest.getName(), service.getName());
}
@Override
public void validateUpdatedEntity(StorageService updatedEntity, Object request, Map<String, String> authHeaders) throws HttpResponseException {
public void validateUpdatedEntity(StorageService service, Object request, Map<String, String> authHeaders) throws HttpResponseException {
validateCreatedEntity(service, request, authHeaders);
}
@Override
@ -281,6 +235,6 @@ public class StorageServiceResourceTest extends EntityResourceTest<StorageServic
@Override
public void assertFieldChange(String fieldName, Object expected, Object actual) throws IOException {
super.assertCommonFieldChange(fieldName, expected, actual);
}
}