mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-18 14:16:48 +00:00
feat(openapi-v3): support async and createIfNotExists params on aspect (#11609)
This commit is contained in:
parent
62c39cdcd3
commit
09d70bc02c
@ -1357,6 +1357,7 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
|||||||
return IngestResult.builder()
|
return IngestResult.builder()
|
||||||
.urn(item.getUrn())
|
.urn(item.getUrn())
|
||||||
.request(item)
|
.request(item)
|
||||||
|
.result(result)
|
||||||
.publishedMCL(result.getMclFuture() != null)
|
.publishedMCL(result.getMclFuture() != null)
|
||||||
.sqlCommitted(true)
|
.sqlCommitted(true)
|
||||||
.isUpdate(result.getOldValue() != null)
|
.isUpdate(result.getOldValue() != null)
|
||||||
|
@ -170,6 +170,9 @@ public abstract class GenericEntitiesController<
|
|||||||
@Nonnull UpdateAspectResult updateAspectResult,
|
@Nonnull UpdateAspectResult updateAspectResult,
|
||||||
boolean withSystemMetadata);
|
boolean withSystemMetadata);
|
||||||
|
|
||||||
|
protected abstract E buildGenericEntity(
|
||||||
|
@Nonnull String aspectName, @Nonnull IngestResult ingestResult, boolean withSystemMetadata);
|
||||||
|
|
||||||
protected abstract AspectsBatch toMCPBatch(
|
protected abstract AspectsBatch toMCPBatch(
|
||||||
@Nonnull OperationContext opContext, String entityArrayList, Actor actor)
|
@Nonnull OperationContext opContext, String entityArrayList, Actor actor)
|
||||||
throws JsonProcessingException, InvalidUrnException;
|
throws JsonProcessingException, InvalidUrnException;
|
||||||
@ -560,8 +563,11 @@ public abstract class GenericEntitiesController<
|
|||||||
@PathVariable("entityName") String entityName,
|
@PathVariable("entityName") String entityName,
|
||||||
@PathVariable("entityUrn") String entityUrn,
|
@PathVariable("entityUrn") String entityUrn,
|
||||||
@PathVariable("aspectName") String aspectName,
|
@PathVariable("aspectName") String aspectName,
|
||||||
|
@RequestParam(value = "async", required = false, defaultValue = "false") Boolean async,
|
||||||
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
|
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
|
||||||
Boolean withSystemMetadata,
|
Boolean withSystemMetadata,
|
||||||
|
@RequestParam(value = "createIfEntityNotExists", required = false, defaultValue = "false")
|
||||||
|
Boolean createIfEntityNotExists,
|
||||||
@RequestParam(value = "createIfNotExists", required = false, defaultValue = "true")
|
@RequestParam(value = "createIfNotExists", required = false, defaultValue = "true")
|
||||||
Boolean createIfNotExists,
|
Boolean createIfNotExists,
|
||||||
@RequestBody @Nonnull String jsonAspect)
|
@RequestBody @Nonnull String jsonAspect)
|
||||||
@ -591,24 +597,38 @@ public abstract class GenericEntitiesController<
|
|||||||
opContext.getRetrieverContext().get().getAspectRetriever(),
|
opContext.getRetrieverContext().get().getAspectRetriever(),
|
||||||
urn,
|
urn,
|
||||||
aspectSpec,
|
aspectSpec,
|
||||||
|
createIfEntityNotExists,
|
||||||
createIfNotExists,
|
createIfNotExists,
|
||||||
jsonAspect,
|
jsonAspect,
|
||||||
authentication.getActor());
|
authentication.getActor());
|
||||||
|
|
||||||
List<UpdateAspectResult> results =
|
Set<IngestResult> results =
|
||||||
entityService.ingestAspects(
|
entityService.ingestProposal(
|
||||||
opContext,
|
opContext,
|
||||||
AspectsBatchImpl.builder()
|
AspectsBatchImpl.builder()
|
||||||
.retrieverContext(opContext.getRetrieverContext().get())
|
.retrieverContext(opContext.getRetrieverContext().get())
|
||||||
.items(List.of(upsert))
|
.items(List.of(upsert))
|
||||||
.build(),
|
.build(),
|
||||||
true,
|
async);
|
||||||
true);
|
|
||||||
|
|
||||||
return ResponseEntity.of(
|
if (!async) {
|
||||||
results.stream()
|
return ResponseEntity.of(
|
||||||
.findFirst()
|
results.stream()
|
||||||
.map(result -> buildGenericEntity(aspectName, result, withSystemMetadata)));
|
.filter(item -> aspectName.equals(item.getRequest().getAspectName()))
|
||||||
|
.findFirst()
|
||||||
|
.map(
|
||||||
|
result ->
|
||||||
|
buildGenericEntity(aspectName, result.getResult(), withSystemMetadata)));
|
||||||
|
} else {
|
||||||
|
return results.stream()
|
||||||
|
.filter(item -> aspectName.equals(item.getRequest().getAspectName()))
|
||||||
|
.map(
|
||||||
|
result ->
|
||||||
|
ResponseEntity.accepted()
|
||||||
|
.body(buildGenericEntity(aspectName, result, withSystemMetadata)))
|
||||||
|
.findFirst()
|
||||||
|
.orElse(ResponseEntity.accepted().build());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Tag(name = "Generic Aspects")
|
@Tag(name = "Generic Aspects")
|
||||||
@ -789,6 +809,7 @@ public abstract class GenericEntitiesController<
|
|||||||
@Nonnull AspectRetriever aspectRetriever,
|
@Nonnull AspectRetriever aspectRetriever,
|
||||||
Urn entityUrn,
|
Urn entityUrn,
|
||||||
AspectSpec aspectSpec,
|
AspectSpec aspectSpec,
|
||||||
|
Boolean createIfEntityNotExists,
|
||||||
Boolean createIfNotExists,
|
Boolean createIfNotExists,
|
||||||
String jsonAspect,
|
String jsonAspect,
|
||||||
Actor actor)
|
Actor actor)
|
||||||
|
@ -232,6 +232,20 @@ public class EntityController
|
|||||||
withSystemMetadata ? updateAspectResult.getNewSystemMetadata() : null)));
|
withSystemMetadata ? updateAspectResult.getNewSystemMetadata() : null)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected GenericEntityV2 buildGenericEntity(
|
||||||
|
@Nonnull String aspectName, @Nonnull IngestResult ingestResult, boolean withSystemMetadata) {
|
||||||
|
return GenericEntityV2.builder()
|
||||||
|
.urn(ingestResult.getUrn().toString())
|
||||||
|
.build(
|
||||||
|
objectMapper,
|
||||||
|
Map.of(
|
||||||
|
aspectName,
|
||||||
|
Pair.of(
|
||||||
|
ingestResult.getRequest().getRecordTemplate(),
|
||||||
|
withSystemMetadata ? ingestResult.getRequest().getSystemMetadata() : null)));
|
||||||
|
}
|
||||||
|
|
||||||
private List<GenericEntityV2> toRecordTemplates(
|
private List<GenericEntityV2> toRecordTemplates(
|
||||||
@Nonnull OperationContext opContext,
|
@Nonnull OperationContext opContext,
|
||||||
SearchEntityArray searchEntities,
|
SearchEntityArray searchEntities,
|
||||||
@ -278,14 +292,25 @@ public class EntityController
|
|||||||
@Nonnull AspectRetriever aspectRetriever,
|
@Nonnull AspectRetriever aspectRetriever,
|
||||||
Urn entityUrn,
|
Urn entityUrn,
|
||||||
AspectSpec aspectSpec,
|
AspectSpec aspectSpec,
|
||||||
|
Boolean createIfEntityNotExists,
|
||||||
Boolean createIfNotExists,
|
Boolean createIfNotExists,
|
||||||
String jsonAspect,
|
String jsonAspect,
|
||||||
Actor actor)
|
Actor actor)
|
||||||
throws URISyntaxException {
|
throws URISyntaxException {
|
||||||
|
|
||||||
|
final ChangeType changeType;
|
||||||
|
if (Boolean.TRUE.equals(createIfEntityNotExists)) {
|
||||||
|
changeType = ChangeType.CREATE_ENTITY;
|
||||||
|
} else if (Boolean.TRUE.equals(createIfNotExists)) {
|
||||||
|
changeType = ChangeType.CREATE;
|
||||||
|
} else {
|
||||||
|
changeType = ChangeType.UPSERT;
|
||||||
|
}
|
||||||
|
|
||||||
return ChangeItemImpl.builder()
|
return ChangeItemImpl.builder()
|
||||||
.urn(entityUrn)
|
.urn(entityUrn)
|
||||||
.aspectName(aspectSpec.getName())
|
.aspectName(aspectSpec.getName())
|
||||||
.changeType(Boolean.TRUE.equals(createIfNotExists) ? ChangeType.CREATE : ChangeType.UPSERT)
|
.changeType(changeType)
|
||||||
.auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr()))
|
.auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr()))
|
||||||
.recordTemplate(
|
.recordTemplate(
|
||||||
GenericRecordUtils.deserializeAspect(
|
GenericRecordUtils.deserializeAspect(
|
||||||
|
@ -1100,6 +1100,28 @@ public class OpenAPIV3Generator {
|
|||||||
new Operation()
|
new Operation()
|
||||||
.summary(String.format("Create aspect %s on %s ", aspect, upperFirstEntity))
|
.summary(String.format("Create aspect %s on %s ", aspect, upperFirstEntity))
|
||||||
.tags(tags)
|
.tags(tags)
|
||||||
|
.parameters(
|
||||||
|
List.of(
|
||||||
|
new Parameter()
|
||||||
|
.in(NAME_QUERY)
|
||||||
|
.name("async")
|
||||||
|
.description("Use async ingestion for high throughput.")
|
||||||
|
.schema(new Schema().type(TYPE_BOOLEAN)._default(false)),
|
||||||
|
new Parameter()
|
||||||
|
.in(NAME_QUERY)
|
||||||
|
.name(NAME_SYSTEM_METADATA)
|
||||||
|
.description("Include systemMetadata with response.")
|
||||||
|
.schema(new Schema().type(TYPE_BOOLEAN)._default(false)),
|
||||||
|
new Parameter()
|
||||||
|
.in(NAME_QUERY)
|
||||||
|
.name("createIfEntityNotExists")
|
||||||
|
.description("Only create the aspect if the Entity doesn't exist.")
|
||||||
|
.schema(new Schema().type(TYPE_BOOLEAN)._default(false)),
|
||||||
|
new Parameter()
|
||||||
|
.in(NAME_QUERY)
|
||||||
|
.name("createIfNotExists")
|
||||||
|
.description("Only create the aspect if the Aspect doesn't exist.")
|
||||||
|
.schema(new Schema().type(TYPE_BOOLEAN)._default(true))))
|
||||||
.requestBody(requestBody)
|
.requestBody(requestBody)
|
||||||
.responses(new ApiResponses().addApiResponse("201", successPostResponse));
|
.responses(new ApiResponses().addApiResponse("201", successPostResponse));
|
||||||
// Patch Operation
|
// Patch Operation
|
||||||
|
@ -328,6 +328,24 @@ public class EntityController
|
|||||||
.build()));
|
.build()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected GenericEntityV3 buildGenericEntity(
|
||||||
|
@Nonnull String aspectName, @Nonnull IngestResult ingestResult, boolean withSystemMetadata) {
|
||||||
|
return GenericEntityV3.builder()
|
||||||
|
.build(
|
||||||
|
objectMapper,
|
||||||
|
ingestResult.getUrn(),
|
||||||
|
Map.of(
|
||||||
|
aspectName,
|
||||||
|
AspectItem.builder()
|
||||||
|
.aspect(ingestResult.getRequest().getRecordTemplate())
|
||||||
|
.systemMetadata(
|
||||||
|
withSystemMetadata ? ingestResult.getRequest().getSystemMetadata() : null)
|
||||||
|
.auditStamp(
|
||||||
|
withSystemMetadata ? ingestResult.getRequest().getAuditStamp() : null)
|
||||||
|
.build()));
|
||||||
|
}
|
||||||
|
|
||||||
private List<GenericEntityV3> toRecordTemplates(
|
private List<GenericEntityV3> toRecordTemplates(
|
||||||
@Nonnull OperationContext opContext,
|
@Nonnull OperationContext opContext,
|
||||||
SearchEntityArray searchEntities,
|
SearchEntityArray searchEntities,
|
||||||
@ -472,16 +490,27 @@ public class EntityController
|
|||||||
@Nonnull AspectRetriever aspectRetriever,
|
@Nonnull AspectRetriever aspectRetriever,
|
||||||
Urn entityUrn,
|
Urn entityUrn,
|
||||||
AspectSpec aspectSpec,
|
AspectSpec aspectSpec,
|
||||||
|
Boolean createIfEntityNotExists,
|
||||||
Boolean createIfNotExists,
|
Boolean createIfNotExists,
|
||||||
String jsonAspect,
|
String jsonAspect,
|
||||||
Actor actor)
|
Actor actor)
|
||||||
throws JsonProcessingException {
|
throws JsonProcessingException {
|
||||||
JsonNode jsonNode = objectMapper.readTree(jsonAspect);
|
JsonNode jsonNode = objectMapper.readTree(jsonAspect);
|
||||||
String aspectJson = jsonNode.get("value").toString();
|
String aspectJson = jsonNode.get("value").toString();
|
||||||
|
|
||||||
|
final ChangeType changeType;
|
||||||
|
if (Boolean.TRUE.equals(createIfEntityNotExists)) {
|
||||||
|
changeType = ChangeType.CREATE_ENTITY;
|
||||||
|
} else if (Boolean.TRUE.equals(createIfNotExists)) {
|
||||||
|
changeType = ChangeType.CREATE;
|
||||||
|
} else {
|
||||||
|
changeType = ChangeType.UPSERT;
|
||||||
|
}
|
||||||
|
|
||||||
return ChangeItemImpl.builder()
|
return ChangeItemImpl.builder()
|
||||||
.urn(entityUrn)
|
.urn(entityUrn)
|
||||||
.aspectName(aspectSpec.getName())
|
.aspectName(aspectSpec.getName())
|
||||||
.changeType(Boolean.TRUE.equals(createIfNotExists) ? ChangeType.CREATE : ChangeType.UPSERT)
|
.changeType(changeType)
|
||||||
.auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr()))
|
.auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr()))
|
||||||
.recordTemplate(
|
.recordTemplate(
|
||||||
GenericRecordUtils.deserializeAspect(
|
GenericRecordUtils.deserializeAspect(
|
||||||
|
@ -2,6 +2,7 @@ package com.linkedin.metadata.entity;
|
|||||||
|
|
||||||
import com.linkedin.common.urn.Urn;
|
import com.linkedin.common.urn.Urn;
|
||||||
import com.linkedin.metadata.aspect.batch.BatchItem;
|
import com.linkedin.metadata.aspect.batch.BatchItem;
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import lombok.Builder;
|
import lombok.Builder;
|
||||||
import lombok.Value;
|
import lombok.Value;
|
||||||
|
|
||||||
@ -10,6 +11,7 @@ import lombok.Value;
|
|||||||
public class IngestResult {
|
public class IngestResult {
|
||||||
Urn urn;
|
Urn urn;
|
||||||
BatchItem request;
|
BatchItem request;
|
||||||
|
@Nullable UpdateAspectResult result;
|
||||||
boolean publishedMCL;
|
boolean publishedMCL;
|
||||||
boolean processedMCL;
|
boolean processedMCL;
|
||||||
boolean publishedMCP;
|
boolean publishedMCP;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user