Fixes #3021 - Restore a soft-deleted container entity along with the children (#3022)

This commit is contained in:
Suresh Srinivas 2022-02-28 18:42:25 -08:00 committed by GitHub
parent c49af971a7
commit 4c256e2845
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 133 additions and 64 deletions

View File

@ -139,6 +139,15 @@ public final class Entity {
return dao.findEntityReferenceByName(fqn); return dao.findEntityReferenceByName(fqn);
} }
public static EntityReference getEntityReferenceByName(@NonNull String entity, @NonNull String fqn, Include include)
throws IOException {
EntityDAO<?> dao = DAO_MAP.get(entity);
if (dao == null) {
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(entity));
}
return dao.findEntityReferenceByName(fqn, include);
}
public static <T> EntityReference getEntityReference(T entity) { public static <T> EntityReference getEntityReference(T entity) {
String entityType = getEntityTypeFromObject(entity); String entityType = getEntityTypeFromObject(entity);
@ -224,15 +233,24 @@ public final class Entity {
return entityRepository; return entityRepository;
} }
public static void deleteEntity(String updatedBy, String entity, UUID entityId, boolean recursive) public static void deleteEntity(String updatedBy, String entityType, UUID entityId, boolean recursive)
throws IOException, ParseException { throws IOException, ParseException {
EntityRepository<?> dao = ENTITY_REPOSITORY_MAP.get(entity); EntityRepository<?> dao = ENTITY_REPOSITORY_MAP.get(entityType);
if (dao == null) { if (dao == null) {
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(entity)); throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(entityType));
} }
dao.delete(updatedBy, entityId.toString(), recursive); dao.delete(updatedBy, entityId.toString(), recursive);
} }
public static void restoreEntity(String updatedBy, String entityType, UUID entityId)
throws IOException, ParseException {
EntityRepository<?> dao = ENTITY_REPOSITORY_MAP.get(entityType);
if (dao == null) {
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(entityType));
}
dao.restoreEntity(updatedBy, entityType, entityId);
}
public static <T> EntityRepository<T> getEntityRepositoryForClass(@NonNull Class<T> clazz) { public static <T> EntityRepository<T> getEntityRepositoryForClass(@NonNull Class<T> clazz) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
EntityRepository<T> entityRepository = (EntityRepository<T>) CLASS_ENTITY_REPOSITORY_MAP.get(clazz); EntityRepository<T> entityRepository = (EntityRepository<T>) CLASS_ENTITY_REPOSITORY_MAP.get(clazz);

View File

@ -502,7 +502,7 @@ public interface CollectionDAO {
void softDeleteAll(@Bind("id") String id, @Bind("entity") String entity); void softDeleteAll(@Bind("id") String id, @Bind("entity") String entity);
@SqlUpdate("UPDATE entity_relationship SET deleted = false WHERE toId = :id OR fromId = :id") @SqlUpdate("UPDATE entity_relationship SET deleted = false WHERE toId = :id OR fromId = :id")
void recoverSoftDeleteAll(@Bind("id") String id); int recoverSoftDeleteAll(@Bind("id") String id);
} }
interface FeedDAO { interface FeedDAO {

View File

@ -195,6 +195,14 @@ public interface EntityDAO<T> {
return getEntityReference(findEntityByName(fqn)); return getEntityReference(findEntityByName(fqn));
} }
default EntityReference findEntityReferenceById(UUID id, Include include) throws IOException {
return getEntityReference(findEntityById(id, include));
}
default EntityReference findEntityReferenceByName(String fqn, Include include) throws IOException {
return getEntityReference(findEntityByName(fqn, include));
}
default String findJsonById(String id, Include include) { default String findJsonById(String id, Include include) {
return findById(getTableName(), id, toBoolean(include)); return findById(getTableName(), id, toBoolean(include));
} }

View File

@ -374,16 +374,21 @@ public abstract class EntityRepository<T> {
public final PutResponse<T> createOrUpdate(UriInfo uriInfo, T updated, boolean allowEdits) public final PutResponse<T> createOrUpdate(UriInfo uriInfo, T updated, boolean allowEdits)
throws IOException, ParseException { throws IOException, ParseException {
prepare(updated); prepare(updated);
EntityInterface<T> updatedInterface = getEntityInterface(updated);
// Check if there is any original, deleted or not // Check if there is any original, deleted or not
T original = JsonUtils.readValue(dao.findJsonByFqn(getFullyQualifiedName(updated), Include.ALL), entityClass); T original = JsonUtils.readValue(dao.findJsonByFqn(getFullyQualifiedName(updated), Include.ALL), entityClass);
if (original == null) { if (original == null) {
return new PutResponse<>(Status.CREATED, withHref(uriInfo, createNewEntity(updated)), RestUtil.ENTITY_CREATED); return new PutResponse<>(Status.CREATED, withHref(uriInfo, createNewEntity(updated)), RestUtil.ENTITY_CREATED);
} }
// Get all the fields in the original entity that can be updated during PUT operation // Get all the fields in the original entity that can be updated during PUT operation
setFields(original, putFields); setFields(original, putFields);
// Recover relationships if original was deleted before setFields
recoverDeletedRelationships(original); // If the entity state is soft-deleted, recursively undelete the entity and it's children
EntityInterface<T> origInterface = getEntityInterface(original);
if (origInterface.isDeleted()) {
restoreEntity(updatedInterface.getUpdatedBy(), entityType, origInterface.getId());
}
// Update the attributes and relationships of an entity // Update the attributes and relationships of an entity
EntityUpdater entityUpdater = getUpdater(original, updated, Operation.PUT); EntityUpdater entityUpdater = getUpdater(original, updated, Operation.PUT);
@ -622,13 +627,29 @@ public abstract class EntityRepository<T> {
return RestUtil.getHref(uriInfo, collectionPath, id); return RestUtil.getHref(uriInfo, collectionPath, id);
} }
private void recoverDeletedRelationships(T original) { public void restoreEntity(String updatedBy, String entityType, UUID id) throws IOException, ParseException {
// If original is deleted, we need to recover the relationships before setting the fields // If an entity being restored contains other **deleted** children entities, restore them
// or we won't find the related services List<EntityReference> contains =
EntityInterface<T> originalRef = getEntityInterface(original); daoCollection
if (Boolean.TRUE.equals(originalRef.isDeleted())) { .relationshipDAO()
daoCollection.relationshipDAO().recoverSoftDeleteAll(originalRef.getId().toString()); .findTo(id.toString(), entityType, Relationship.CONTAINS.ordinal(), toBoolean(Include.DELETED));
if (!contains.isEmpty()) {
// Restore all the contained entities
for (EntityReference entityReference : contains) {
LOG.info("Recursively restoring {} {}", entityReference.getType(), entityReference.getId());
Entity.restoreEntity(updatedBy, entityReference.getType(), entityReference.getId());
}
} }
// Restore all the relationships from and to the entity as not deleted
daoCollection.relationshipDAO().recoverSoftDeleteAll(id.toString());
// Finally set entity deleted flag to false
T entity = dao.findEntityById(id, DELETED);
EntityInterface<T> entityInterface = getEntityInterface(entity);
entityInterface.setDeleted(false);
dao.update(entityInterface.getId(), JsonUtils.pojoToJson(entity));
} }
/** Builder method for EntityHandler */ /** Builder method for EntityHandler */
@ -758,7 +779,7 @@ public abstract class EntityRepository<T> {
Relationship.CONTAINS.ordinal(), Relationship.CONTAINS.ordinal(),
// FIXME: containerEntityName should be a property of the entity decorated. // FIXME: containerEntityName should be a property of the entity decorated.
containerEntityType, containerEntityType,
toBoolean(isDeleted)); null);
if (refs.isEmpty()) { if (refs.isEmpty()) {
throw new UnhandledServerException(CatalogExceptionMessage.entityTypeNotFound(containerEntityType)); throw new UnhandledServerException(CatalogExceptionMessage.entityTypeNotFound(containerEntityType));
} else if (refs.size() > 1) { } else if (refs.size() > 1) {
@ -900,6 +921,7 @@ public abstract class EntityRepository<T> {
protected final Operation operation; protected final Operation operation;
protected final ChangeDescription changeDescription = new ChangeDescription(); protected final ChangeDescription changeDescription = new ChangeDescription();
protected boolean majorVersionChange = false; protected boolean majorVersionChange = false;
private boolean entityRestored = false;
public EntityUpdater(T original, T updated, Operation operation) { public EntityUpdater(T original, T updated, Operation operation) {
this.original = getEntityInterface(original); this.original = getEntityInterface(original);
@ -955,6 +977,7 @@ public abstract class EntityRepository<T> {
if (Boolean.TRUE.equals(original.isDeleted())) { if (Boolean.TRUE.equals(original.isDeleted())) {
updated.setDeleted(false); updated.setDeleted(false);
recordChange("deleted", true, false); recordChange("deleted", true, false);
entityRestored = true;
} }
} else { } else {
recordChange("deleted", original.isDeleted(), updated.isDeleted()); recordChange("deleted", original.isDeleted(), updated.isDeleted());
@ -1035,6 +1058,10 @@ public abstract class EntityRepository<T> {
|| !changeDescription.getFieldsDeleted().isEmpty(); || !changeDescription.getFieldsDeleted().isEmpty();
} }
public boolean isEntityRestored() {
return entityRestored;
}
public final <K> boolean recordChange(String field, K orig, K updated) throws JsonProcessingException { public final <K> boolean recordChange(String field, K orig, K updated) throws JsonProcessingException {
return recordChange(field, orig, updated, false, objectMatch); return recordChange(field, orig, updated, false, objectMatch);
} }

View File

@ -256,7 +256,7 @@ public final class EntityUtil {
// TODO: add more validation for field name and array fields // TODO: add more validation for field name and array fields
return Entity.getEntityReferenceByName(entityType, fqn); return Entity.getEntityReferenceByName(entityType, fqn, ALL);
} }
public static UsageDetails getLatestUsage(UsageDAO usageDAO, UUID entityId) { public static UsageDetails getLatestUsage(UsageDAO usageDAO, UUID entityId) {

View File

@ -424,7 +424,7 @@ public abstract class EntityResourceTest<T, K> extends CatalogApplicationTest {
// Get container entity based on create request that has CONTAINS relationship to the entity created with this // Get container entity based on create request that has CONTAINS relationship to the entity created with this
// request has . For table, it is database. For database, it is databaseService. See Relationship.CONTAINS for // request has . For table, it is database. For database, it is databaseService. See Relationship.CONTAINS for
// details. // details.
public EntityReference getContainer(K createRequest) { public EntityReference getContainer() {
return null; return null;
} }
@ -611,11 +611,16 @@ public abstract class EntityResourceTest<T, K> extends CatalogApplicationTest {
/** At the end of test for an entity, delete the parent container to test recursive delete functionality */ /** At the end of test for an entity, delete the parent container to test recursive delete functionality */
private void delete_recursiveTest() throws IOException { private void delete_recursiveTest() throws IOException {
// Finally, delete the container that contains the entities created for this test // Finally, delete the container that contains the entities created for this test
EntityReference container = getContainer(createRequest("deleteRecursive", "", "", null)); EntityReference container = getContainer();
if (container != null) { if (container != null) {
ResultList<T> listBeforeDeletion = listEntities(null, 1000, null, null, ADMIN_AUTH_HEADERS); // List both deleted and non deleted entities
Map<String, String> queryParams = new HashMap<>();
queryParams.put("include", "all");
ResultList<T> listBeforeDeletion = listEntities(queryParams, 1000, null, null, ADMIN_AUTH_HEADERS);
// Delete non-empty container entity and ensure deletion is not allowed // Delete non-empty container entity and ensure deletion is not allowed
EntityResourceTest<?, ?> containerTest = ENTITY_RESOURCE_TEST_MAP.get(container.getType()); EntityResourceTest<Object, Object> containerTest =
(EntityResourceTest<Object, Object>) ENTITY_RESOURCE_TEST_MAP.get(container.getType());
assertResponse( assertResponse(
() -> containerTest.deleteEntity(container.getId(), ADMIN_AUTH_HEADERS), () -> containerTest.deleteEntity(container.getId(), ADMIN_AUTH_HEADERS),
BAD_REQUEST, BAD_REQUEST,
@ -624,12 +629,26 @@ public abstract class EntityResourceTest<T, K> extends CatalogApplicationTest {
// Now delete the container with recursive flag on // Now delete the container with recursive flag on
containerTest.deleteEntity(container.getId(), true, ADMIN_AUTH_HEADERS); containerTest.deleteEntity(container.getId(), true, ADMIN_AUTH_HEADERS);
// Make sure entities contained are deleted and the new list operation returns 0 entities // Make sure entities that belonged to the container are deleted and the new list operation returns less entities
ResultList<T> listAfterDeletion = listEntities(null, 1000, null, null, ADMIN_AUTH_HEADERS); ResultList<T> listAfterDeletion = listEntities(null, 1000, null, null, ADMIN_AUTH_HEADERS);
listAfterDeletion listAfterDeletion
.getData() .getData()
.forEach(e -> assertNotEquals(getEntityInterface(e).getContainer().getId(), container.getId())); .forEach(e -> assertNotEquals(getEntityInterface(e).getContainer().getId(), container.getId()));
assertTrue(listAfterDeletion.getData().size() < listBeforeDeletion.getData().size()); assertTrue(listAfterDeletion.getData().size() < listBeforeDeletion.getData().size());
// Restore the soft-deleted container by PUT operation and make sure it is restored
String containerName = container.getName();
if (containerTest.getContainer() != null) {
// Find container name by removing parentContainer fqn from container fqn
// Example: remove "service" from "service.database" to get "database" container name for table
String parentOfContainer = containerTest.getContainer().getName();
containerName = container.getName().replace(parentOfContainer + ".", "");
}
Object request = containerTest.createRequest(containerName, "", "", null);
containerTest.updateEntity(request, Status.OK, ADMIN_AUTH_HEADERS);
ResultList<T> listAfterRestore = listEntities(null, 1000, null, null, ADMIN_AUTH_HEADERS);
assertEquals(listBeforeDeletion.getData().size(), listAfterRestore.getData().size());
} }
} }

View File

@ -130,13 +130,13 @@ public class ChartResourceTest extends EntityResourceTest<Chart, CreateChart> {
.withDescription(description) .withDescription(description)
.withDisplayName(displayName) .withDisplayName(displayName)
.withOwner(owner) .withOwner(owner)
.withService(SUPERSET_REFERENCE) .withService(getContainer())
.withChartType(ChartType.Area); .withChartType(ChartType.Area);
} }
@Override @Override
public EntityReference getContainer(CreateChart createRequest) { public EntityReference getContainer() {
return createRequest.getService(); return SUPERSET_REFERENCE;
} }
@Override @Override

View File

@ -234,7 +234,7 @@ public class DashboardResourceTest extends EntityResourceTest<Dashboard, CreateD
public CreateDashboard createRequest(String name, String description, String displayName, EntityReference owner) { public CreateDashboard createRequest(String name, String description, String displayName, EntityReference owner) {
return new CreateDashboard() return new CreateDashboard()
.withName(name) .withName(name)
.withService(SUPERSET_REFERENCE) .withService(getContainer())
.withCharts(CHART_REFERENCES) .withCharts(CHART_REFERENCES)
.withDescription(description) .withDescription(description)
.withDisplayName(displayName) .withDisplayName(displayName)
@ -242,8 +242,8 @@ public class DashboardResourceTest extends EntityResourceTest<Dashboard, CreateD
} }
@Override @Override
public EntityReference getContainer(CreateDashboard createRequest) { public EntityReference getContainer() {
return createRequest.getService(); return SUPERSET_REFERENCE;
} }
@Override @Override

View File

@ -161,12 +161,12 @@ public class DatabaseResourceTest extends EntityResourceTest<Database, CreateDat
.withName(name) .withName(name)
.withDescription(description) .withDescription(description)
.withOwner(owner) .withOwner(owner)
.withService(SNOWFLAKE_REFERENCE); .withService(getContainer());
} }
@Override @Override
public EntityReference getContainer(CreateDatabase createRequest) { public EntityReference getContainer() {
return createRequest.getService(); return SNOWFLAKE_REFERENCE;
} }
@Override @Override

View File

@ -1791,7 +1791,7 @@ public class TableResourceTest extends EntityResourceTest<Table, CreateTable> {
new TableConstraint().withConstraintType(ConstraintType.UNIQUE).withColumns(List.of(COLUMNS.get(0).getName())); new TableConstraint().withConstraintType(ConstraintType.UNIQUE).withColumns(List.of(COLUMNS.get(0).getName()));
return new CreateTable() return new CreateTable()
.withName(name) .withName(name)
.withDatabase(DATABASE_REFERENCE) .withDatabase(getContainer())
.withColumns(COLUMNS) .withColumns(COLUMNS)
.withTableConstraints(List.of(constraint)) .withTableConstraints(List.of(constraint))
.withDescription(description) .withDescription(description)
@ -1809,8 +1809,8 @@ public class TableResourceTest extends EntityResourceTest<Table, CreateTable> {
} }
@Override @Override
public EntityReference getContainer(CreateTable createRequest) { public EntityReference getContainer() {
return Entity.getEntityReference(DATABASE); // TODO clean this up return DATABASE_REFERENCE;
} }
@Override @Override

View File

@ -66,11 +66,6 @@ public class GlossaryResourceTest extends EntityResourceTest<Glossary, CreateGlo
.withOwner(owner); .withOwner(owner);
} }
@Override
public EntityReference getContainer(CreateGlossary createRequest) {
return null;
}
@Override @Override
public void validateCreatedEntity( public void validateCreatedEntity(
Glossary createdEntity, CreateGlossary createRequest, Map<String, String> authHeaders) Glossary createdEntity, CreateGlossary createRequest, Map<String, String> authHeaders)

View File

@ -214,11 +214,6 @@ public class GlossaryTermResourceTest extends EntityResourceTest<GlossaryTerm, C
.withReviewers(List.of(USER_OWNER1)); .withReviewers(List.of(USER_OWNER1));
} }
@Override
public EntityReference getContainer(CreateGlossaryTerm createRequest) {
return null;
}
@Override @Override
public void validateCreatedEntity(GlossaryTerm entity, CreateGlossaryTerm request, Map<String, String> authHeaders) public void validateCreatedEntity(GlossaryTerm entity, CreateGlossaryTerm request, Map<String, String> authHeaders)
throws HttpResponseException { throws HttpResponseException {

View File

@ -72,14 +72,14 @@ public class LocationResourceTest extends EntityResourceTest<Location, CreateLoc
public CreateLocation createRequest(String name, String description, String displayName, EntityReference owner) { public CreateLocation createRequest(String name, String description, String displayName, EntityReference owner) {
return new CreateLocation() return new CreateLocation()
.withName(name) .withName(name)
.withService(AWS_STORAGE_SERVICE_REFERENCE) .withService(getContainer())
.withDescription(description) .withDescription(description)
.withOwner(owner); .withOwner(owner);
} }
@Override @Override
public EntityReference getContainer(CreateLocation createRequest) { public EntityReference getContainer() {
return createRequest.getService(); return AWS_STORAGE_SERVICE_REFERENCE;
} }
@Override @Override

View File

@ -129,7 +129,7 @@ public class AirflowPipelineResourceTest extends EntityOperationsResourceTest<Ai
return new CreateAirflowPipeline() return new CreateAirflowPipeline()
.withName(name) .withName(name)
.withPipelineType(PipelineType.METADATA) .withPipelineType(PipelineType.METADATA)
.withService(BIGQUERY_REFERENCE) .withService(getContainer())
.withPipelineConfig(INGESTION_CONFIG) .withPipelineConfig(INGESTION_CONFIG)
.withStartDate("2021-11-21") .withStartDate("2021-11-21")
.withDescription(description) .withDescription(description)
@ -138,8 +138,8 @@ public class AirflowPipelineResourceTest extends EntityOperationsResourceTest<Ai
} }
@Override @Override
public EntityReference getContainer(CreateAirflowPipeline createRequest) { public EntityReference getContainer() {
return createRequest.getService(); return BIGQUERY_REFERENCE;
} }
@Override @Override

View File

@ -101,7 +101,7 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline, CreatePip
public CreatePipeline createRequest(String name, String description, String displayName, EntityReference owner) { public CreatePipeline createRequest(String name, String description, String displayName, EntityReference owner) {
return new CreatePipeline() return new CreatePipeline()
.withName(name) .withName(name)
.withService(AIRFLOW_REFERENCE) .withService(getContainer())
.withDescription(description) .withDescription(description)
.withDisplayName(displayName) .withDisplayName(displayName)
.withOwner(owner) .withOwner(owner)
@ -109,8 +109,8 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline, CreatePip
} }
@Override @Override
public EntityReference getContainer(CreatePipeline createRequest) { public EntityReference getContainer() {
return createRequest.getService(); return AIRFLOW_REFERENCE;
} }
@Override @Override

View File

@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.HttpResponseException; import org.apache.http.client.HttpResponseException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.TestInfo;
import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.Entity;
@ -51,9 +50,17 @@ import org.openmetadata.catalog.util.TestUtils.UpdateType;
@Slf4j @Slf4j
public class MessagingServiceResourceTest extends EntityResourceTest<MessagingService, CreateMessagingService> { public class MessagingServiceResourceTest extends EntityResourceTest<MessagingService, CreateMessagingService> {
public static List<String> KAFKA_BROKERS; public static List<String> KAFKA_BROKERS = List.of("192.168.1.1:0");
public static URI SCHEMA_REGISTRY_URL; public static URI SCHEMA_REGISTRY_URL;
static {
try {
SCHEMA_REGISTRY_URL = new URI("http://localhost:0");
} catch (URISyntaxException e) {
e.printStackTrace();
}
}
public MessagingServiceResourceTest() { public MessagingServiceResourceTest() {
super( super(
Entity.MESSAGING_SERVICE, Entity.MESSAGING_SERVICE,
@ -69,12 +76,6 @@ public class MessagingServiceResourceTest extends EntityResourceTest<MessagingSe
supportsPatch = false; supportsPatch = false;
} }
@BeforeAll
public static void setup() throws URISyntaxException {
KAFKA_BROKERS = List.of("192.168.1.1:0");
SCHEMA_REGISTRY_URL = new URI("http://localhost:0");
}
@Test @Test
void post_withoutRequiredFields_400_badRequest(TestInfo test) { void post_withoutRequiredFields_400_badRequest(TestInfo test) {
// Create messaging with mandatory serviceType field empty // Create messaging with mandatory serviceType field empty

View File

@ -48,9 +48,16 @@ import org.openmetadata.catalog.util.TestUtils.UpdateType;
@Slf4j @Slf4j
public class PipelineServiceResourceTest extends EntityResourceTest<PipelineService, CreatePipelineService> { public class PipelineServiceResourceTest extends EntityResourceTest<PipelineService, CreatePipelineService> {
public static URI PIPELINE_SERVICE_URL; public static URI PIPELINE_SERVICE_URL;
static {
try {
PIPELINE_SERVICE_URL = new URI("http://localhost:8080");
} catch (URISyntaxException e) {
e.printStackTrace();
}
}
public PipelineServiceResourceTest() { public PipelineServiceResourceTest() {
super( super(
Entity.PIPELINE_SERVICE, Entity.PIPELINE_SERVICE,
@ -70,7 +77,6 @@ public class PipelineServiceResourceTest extends EntityResourceTest<PipelineServ
@Override @Override
public void setup(TestInfo test) throws URISyntaxException, IOException { public void setup(TestInfo test) throws URISyntaxException, IOException {
super.setup(test); super.setup(test);
PIPELINE_SERVICE_URL = new URI("http://localhost:8080");
} }
@Test @Test

View File

@ -258,15 +258,15 @@ public class TopicResourceTest extends EntityResourceTest<Topic, CreateTopic> {
public CreateTopic createRequest(String name, String description, String displayName, EntityReference owner) { public CreateTopic createRequest(String name, String description, String displayName, EntityReference owner) {
return new CreateTopic() return new CreateTopic()
.withName(name) .withName(name)
.withService(KAFKA_REFERENCE) .withService(getContainer())
.withPartitions(1) .withPartitions(1)
.withDescription(description) .withDescription(description)
.withOwner(owner); .withOwner(owner);
} }
@Override @Override
public EntityReference getContainer(CreateTopic createRequest) { public EntityReference getContainer() {
return createRequest.getService(); return KAFKA_REFERENCE;
} }
@Override @Override