fix(openapiv3): v3 scroll response fix (#10654)

Co-authored-by: Kevin Chun <kevin1chun@gmail.com>
This commit is contained in:
david-leifker 2024-06-07 13:37:49 -05:00 committed by GitHub
parent 54a2d2a23e
commit 1b6763fdf7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 1071 additions and 626 deletions

View File

@ -9,6 +9,7 @@ dependencies {
implementation project(':metadata-service:auth-impl')
implementation project(':metadata-service:factories')
implementation project(':metadata-service:openapi-servlet')
implementation project(':metadata-service:openapi-servlet:models')
implementation project(':metadata-models')
implementation externalDependency.springBoot

View File

@ -9,6 +9,7 @@ dependencies {
implementation project(':metadata-service:auth-impl')
implementation project(':metadata-service:factories')
implementation project(':metadata-service:openapi-servlet')
implementation project(':metadata-service:openapi-servlet:models')
implementation project(':metadata-models')
implementation externalDependency.servletApi

View File

@ -6,6 +6,14 @@ dependencies {
implementation project(':entity-registry')
implementation project(':metadata-operation-context')
implementation project(':metadata-auth:auth-api')
implementation project(':metadata-service:auth-impl')
implementation project(':metadata-io')
implementation externalDependency.springWeb
implementation(externalDependency.springDocUI) {
exclude group: 'org.springframework.boot'
}
implementation externalDependency.swaggerAnnotations
implementation externalDependency.jacksonDataBind
implementation externalDependency.httpClient

View File

@ -0,0 +1,641 @@
package io.datahubproject.openapi.controller;
import static com.linkedin.metadata.authorization.ApiOperation.CREATE;
import static com.linkedin.metadata.authorization.ApiOperation.DELETE;
import static com.linkedin.metadata.authorization.ApiOperation.EXISTS;
import static com.linkedin.metadata.authorization.ApiOperation.READ;
import static com.linkedin.metadata.authorization.ApiOperation.UPDATE;
import com.datahub.authentication.Actor;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.datahub.authorization.AuthUtil;
import com.datahub.authorization.AuthorizerChain;
import com.datahub.util.RecordUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.ByteString;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
import com.linkedin.metadata.aspect.patch.template.common.GenericPatchTemplate;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.metadata.entity.UpdateAspectResult;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.query.filter.SortOrder;
import com.linkedin.metadata.search.ScrollResult;
import com.linkedin.metadata.search.SearchEntityArray;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.SearchUtil;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.RequestContext;
import io.datahubproject.openapi.exception.UnauthorizedException;
import io.datahubproject.openapi.models.GenericEntity;
import io.datahubproject.openapi.models.GenericEntityScrollResult;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PatchMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
public abstract class GenericEntitiesController<
E extends GenericEntity, S extends GenericEntityScrollResult<E>> {
protected static final SearchFlags DEFAULT_SEARCH_FLAGS =
new SearchFlags().setFulltext(false).setSkipAggregates(true).setSkipHighlighting(true);
@Autowired protected EntityRegistry entityRegistry;
@Autowired protected SearchService searchService;
@Autowired protected EntityService<?> entityService;
@Autowired protected AuthorizerChain authorizationChain;
@Autowired protected ObjectMapper objectMapper;
@Qualifier("systemOperationContext")
@Autowired
protected OperationContext systemOperationContext;
/**
* Returns scroll result entities
*
* @param searchEntities the entities to contain in the result
* @param aspectNames the aspect names present
* @param withSystemMetadata whether to include system metadata in the result
* @param scrollId the pagination token
* @return result containing entities/aspects
* @throws URISyntaxException parsing error
*/
protected abstract S buildScrollResult(
@Nonnull OperationContext opContext,
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata,
@Nullable String scrollId)
throws URISyntaxException;
protected abstract List<E> buildEntityList(
@Nonnull OperationContext opContext,
List<Urn> urns,
Set<String> aspectNames,
boolean withSystemMetadata)
throws URISyntaxException;
protected abstract List<E> buildEntityList(
Set<IngestResult> ingestResults, boolean withSystemMetadata);
protected abstract E buildGenericEntity(
@Nonnull String aspectName,
@Nonnull UpdateAspectResult updateAspectResult,
boolean withSystemMetadata);
protected abstract AspectsBatch toMCPBatch(
@Nonnull OperationContext opContext, String entityArrayList, Actor actor)
throws JsonProcessingException, URISyntaxException;
@Tag(name = "Generic Entities", description = "API for interacting with generic entities.")
@GetMapping(value = "/{entityName}", produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Scroll entities")
public ResponseEntity<S> getEntities(
@PathVariable("entityName") String entityName,
@RequestParam(value = "aspectNames", defaultValue = "") Set<String> aspectNames,
@RequestParam(value = "count", defaultValue = "10") Integer count,
@RequestParam(value = "query", defaultValue = "*") String query,
@RequestParam(value = "scrollId", required = false) String scrollId,
@RequestParam(value = "sort", required = false, defaultValue = "urn") String sortField,
@RequestParam(value = "sortOrder", required = false, defaultValue = "ASCENDING")
String sortOrder,
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
Boolean withSystemMetadata,
@RequestParam(value = "skipCache", required = false, defaultValue = "false")
Boolean skipCache)
throws URISyntaxException {
EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityType(authentication, authorizationChain, READ, entityName)) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + READ + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("getEntities", entityName),
authorizationChain,
authentication,
true);
// TODO: support additional and multiple sort params
SortCriterion sortCriterion = SearchUtil.sortBy(sortField, SortOrder.valueOf(sortOrder));
ScrollResult result =
searchService.scrollAcrossEntities(
opContext
.withSearchFlags(flags -> DEFAULT_SEARCH_FLAGS)
.withSearchFlags(flags -> flags.setSkipCache(skipCache)),
List.of(entitySpec.getName()),
query,
null,
sortCriterion,
scrollId,
null,
count);
if (!AuthUtil.isAPIAuthorizedResult(authentication, authorizationChain, result)) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + READ + " entities.");
}
return ResponseEntity.ok(
buildScrollResult(
opContext,
result.getEntities(),
aspectNames,
withSystemMetadata,
result.getScrollId()));
}
@Tag(name = "Generic Entities")
@GetMapping(
value = "/{entityName}/{entityUrn:urn:li:.+}",
produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<E> getEntity(
@PathVariable("entityName") String entityName,
@PathVariable("entityUrn") String entityUrn,
@RequestParam(value = "aspectNames", defaultValue = "") Set<String> aspectNames,
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
Boolean withSystemMetadata)
throws URISyntaxException {
Urn urn = UrnUtils.getUrn(entityUrn);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, READ, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + READ + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("getEntity", entityName),
authorizationChain,
authentication,
true);
return ResponseEntity.of(
buildEntityList(opContext, List.of(urn), aspectNames, withSystemMetadata).stream()
.findFirst());
}
@Tag(name = "Generic Entities")
@RequestMapping(
value = "/{entityName}/{entityUrn}",
method = {RequestMethod.HEAD})
@Operation(summary = "Entity exists")
public ResponseEntity<Object> headEntity(
@PathVariable("entityName") String entityName, @PathVariable("entityUrn") String entityUrn) {
Urn urn = UrnUtils.getUrn(entityUrn);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, EXISTS, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + EXISTS + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("headEntity", entityName),
authorizationChain,
authentication,
true);
return exists(opContext, urn, null)
? ResponseEntity.noContent().build()
: ResponseEntity.notFound().build();
}
@Tag(name = "Generic Aspects", description = "API for generic aspects.")
@GetMapping(
value = "/{entityName}/{entityUrn}/{aspectName}",
produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Get an entity's generic aspect.")
public ResponseEntity<Object> getAspect(
@PathVariable("entityName") String entityName,
@PathVariable("entityUrn") String entityUrn,
@PathVariable("aspectName") String aspectName,
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
Boolean withSystemMetadata)
throws URISyntaxException {
Urn urn = UrnUtils.getUrn(entityUrn);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, READ, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + READ + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("getAspect", entityName),
authorizationChain,
authentication,
true);
return ResponseEntity.of(
buildEntityList(opContext, List.of(urn), Set.of(aspectName), withSystemMetadata).stream()
.findFirst()
.flatMap(
e ->
e.getAspects().entrySet().stream()
.filter(
entry ->
entry.getKey().equals(lookupAspectSpec(urn, aspectName).getName()))
.map(Map.Entry::getValue)
.findFirst()));
}
@Tag(name = "Generic Aspects")
@RequestMapping(
value = "/{entityName}/{entityUrn}/{aspectName}",
method = {RequestMethod.HEAD})
@Operation(summary = "Whether an entity aspect exists.")
public ResponseEntity<Object> headAspect(
@PathVariable("entityName") String entityName,
@PathVariable("entityUrn") String entityUrn,
@PathVariable("aspectName") String aspectName) {
Urn urn = UrnUtils.getUrn(entityUrn);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, EXISTS, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + EXISTS + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("headAspect", entityName),
authorizationChain,
authentication,
true);
return exists(opContext, urn, lookupAspectSpec(urn, aspectName).getName())
? ResponseEntity.noContent().build()
: ResponseEntity.notFound().build();
}
@Tag(name = "Generic Entities")
@DeleteMapping(value = "/{entityName}/{entityUrn}")
@Operation(summary = "Delete an entity")
public void deleteEntity(
@PathVariable("entityName") String entityName, @PathVariable("entityUrn") String entityUrn) {
EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName);
Urn urn = UrnUtils.getUrn(entityUrn);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, DELETE, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + DELETE + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("deleteEntity", entityName),
authorizationChain,
authentication,
true);
entityService.deleteAspect(opContext, entityUrn, entitySpec.getKeyAspectName(), Map.of(), true);
}
@Tag(name = "Generic Entities")
@PostMapping(value = "/{entityName}", produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Create a batch of entities.")
public ResponseEntity<List<E>> createEntity(
@PathVariable("entityName") String entityName,
@RequestParam(value = "async", required = false, defaultValue = "true") Boolean async,
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
Boolean withSystemMetadata,
@RequestBody @Nonnull String jsonEntityList)
throws URISyntaxException, JsonProcessingException {
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityType(
authentication, authorizationChain, CREATE, entityName)) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + CREATE + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("createEntity", entityName),
authorizationChain,
authentication,
true);
AspectsBatch batch = toMCPBatch(opContext, jsonEntityList, authentication.getActor());
Set<IngestResult> results = entityService.ingestProposal(opContext, batch, async);
if (!async) {
return ResponseEntity.ok(buildEntityList(results, withSystemMetadata));
} else {
return ResponseEntity.accepted().body(buildEntityList(results, withSystemMetadata));
}
}
@Tag(name = "Generic Aspects")
@DeleteMapping(value = "/{entityName}/{entityUrn}/{aspectName}")
@Operation(summary = "Delete an entity aspect.")
public void deleteAspect(
@PathVariable("entityName") String entityName,
@PathVariable("entityUrn") String entityUrn,
@PathVariable("aspectName") String aspectName) {
Urn urn = UrnUtils.getUrn(entityUrn);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, DELETE, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + DELETE + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("deleteAspect", entityName),
authorizationChain,
authentication,
true);
entityService.deleteAspect(
opContext, entityUrn, lookupAspectSpec(urn, aspectName).getName(), Map.of(), true);
}
@Tag(name = "Generic Aspects")
@PostMapping(
value = "/{entityName}/{entityUrn}/{aspectName}",
produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Create an entity aspect.")
public ResponseEntity<E> createAspect(
@PathVariable("entityName") String entityName,
@PathVariable("entityUrn") String entityUrn,
@PathVariable("aspectName") String aspectName,
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
Boolean withSystemMetadata,
@RequestParam(value = "createIfNotExists", required = false, defaultValue = "false")
Boolean createIfNotExists,
@RequestBody @Nonnull String jsonAspect)
throws URISyntaxException {
Urn urn = UrnUtils.getUrn(entityUrn);
EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, CREATE, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + CREATE + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("createAspect", entityName),
authorizationChain,
authentication,
true);
AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName);
ChangeMCP upsert =
toUpsertItem(
opContext.getRetrieverContext().get().getAspectRetriever(),
urn,
aspectSpec,
createIfNotExists,
jsonAspect,
authentication.getActor());
List<UpdateAspectResult> results =
entityService.ingestAspects(
opContext,
AspectsBatchImpl.builder()
.retrieverContext(opContext.getRetrieverContext().get())
.items(List.of(upsert))
.build(),
true,
true);
return ResponseEntity.of(
results.stream()
.findFirst()
.map(result -> buildGenericEntity(aspectName, result, withSystemMetadata)));
}
@Tag(name = "Generic Aspects")
@PatchMapping(
value = "/{entityName}/{entityUrn}/{aspectName}",
consumes = "application/json-patch+json",
produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Patch an entity aspect. (Experimental)")
public ResponseEntity<E> patchAspect(
@PathVariable("entityName") String entityName,
@PathVariable("entityUrn") String entityUrn,
@PathVariable("aspectName") String aspectName,
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
Boolean withSystemMetadata,
@RequestBody @Nonnull GenericJsonPatch patch)
throws URISyntaxException,
NoSuchMethodException,
InvocationTargetException,
InstantiationException,
IllegalAccessException {
Urn urn = UrnUtils.getUrn(entityUrn);
EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, UPDATE, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + UPDATE + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("patchAspect", entityName),
authorizationChain,
authentication,
true);
AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName);
RecordTemplate currentValue = entityService.getAspect(opContext, urn, aspectSpec.getName(), 0);
GenericPatchTemplate<? extends RecordTemplate> genericPatchTemplate =
GenericPatchTemplate.builder()
.genericJsonPatch(patch)
.templateType(aspectSpec.getDataTemplateClass())
.templateDefault(
aspectSpec.getDataTemplateClass().getDeclaredConstructor().newInstance())
.build();
ChangeMCP upsert =
toUpsertItem(
opContext.getRetrieverContext().get().getAspectRetriever(),
UrnUtils.getUrn(entityUrn),
aspectSpec,
currentValue,
genericPatchTemplate,
authentication.getActor());
List<UpdateAspectResult> results =
entityService.ingestAspects(
opContext,
AspectsBatchImpl.builder()
.retrieverContext(opContext.getRetrieverContext().get())
.items(List.of(upsert))
.build(),
true,
true);
return ResponseEntity.of(
results.stream()
.findFirst()
.map(result -> buildGenericEntity(aspectSpec.getName(), result, withSystemMetadata)));
}
protected Boolean exists(@Nonnull OperationContext opContext, Urn urn, @Nullable String aspect) {
return aspect == null
? entityService.exists(opContext, urn, true)
: entityService.exists(opContext, urn, aspect, true);
}
protected Set<AspectSpec> resolveAspectNames(Set<Urn> urns, Set<String> requestedAspectNames) {
if (requestedAspectNames.isEmpty()) {
return urns.stream()
.flatMap(u -> entityRegistry.getEntitySpec(u.getEntityType()).getAspectSpecs().stream())
.collect(Collectors.toSet());
} else {
// ensure key is always present
return Stream.concat(
urns.stream()
.flatMap(
urn ->
requestedAspectNames.stream()
.map(aspectName -> lookupAspectSpec(urn, aspectName))),
urns.stream()
.map(u -> entityRegistry.getEntitySpec(u.getEntityType()).getKeyAspectSpec()))
.collect(Collectors.toSet());
}
}
protected Map<String, Pair<RecordTemplate, SystemMetadata>> toAspectMap(
Urn urn, List<EnvelopedAspect> aspects, boolean withSystemMetadata) {
return aspects.stream()
.map(
a ->
Map.entry(
a.getName(),
Pair.of(
toRecordTemplate(lookupAspectSpec(urn, a.getName()), a),
withSystemMetadata ? a.getSystemMetadata() : null)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
protected AspectSpec lookupAspectSpec(Urn urn, String aspectName) {
return lookupAspectSpec(entityRegistry.getEntitySpec(urn.getEntityType()), aspectName);
}
protected RecordTemplate toRecordTemplate(
AspectSpec aspectSpec, EnvelopedAspect envelopedAspect) {
return RecordUtils.toRecordTemplate(
aspectSpec.getDataTemplateClass(), envelopedAspect.getValue().data());
}
protected ChangeMCP toUpsertItem(
@Nonnull AspectRetriever aspectRetriever,
Urn entityUrn,
AspectSpec aspectSpec,
Boolean createIfNotExists,
String jsonAspect,
Actor actor)
throws URISyntaxException {
return ChangeItemImpl.builder()
.urn(entityUrn)
.aspectName(aspectSpec.getName())
.changeType(Boolean.TRUE.equals(createIfNotExists) ? ChangeType.CREATE : ChangeType.UPSERT)
.auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr()))
.recordTemplate(
GenericRecordUtils.deserializeAspect(
ByteString.copyString(jsonAspect, StandardCharsets.UTF_8),
GenericRecordUtils.JSON,
aspectSpec))
.build(aspectRetriever);
}
protected ChangeMCP toUpsertItem(
@Nonnull AspectRetriever aspectRetriever,
@Nonnull Urn urn,
@Nonnull AspectSpec aspectSpec,
@Nullable RecordTemplate currentValue,
@Nonnull GenericPatchTemplate<? extends RecordTemplate> genericPatchTemplate,
@Nonnull Actor actor) {
return ChangeItemImpl.fromPatch(
urn,
aspectSpec,
currentValue,
genericPatchTemplate,
AuditStampUtils.createAuditStamp(actor.toUrnStr()),
aspectRetriever);
}
/**
* Case-insensitive fallback
*
* @return
*/
protected static AspectSpec lookupAspectSpec(EntitySpec entitySpec, String aspectName) {
return entitySpec.getAspectSpec(aspectName) != null
? entitySpec.getAspectSpec(aspectName)
: entitySpec.getAspectSpecs().stream()
.filter(aspec -> aspec.getName().toLowerCase().equals(aspectName))
.findFirst()
.get();
}
}

View File

@ -0,0 +1,7 @@
package io.datahubproject.openapi.models;
import java.util.Map;
public interface GenericEntity {
Map<String, Object> getAspects();
}

View File

@ -0,0 +1,3 @@
package io.datahubproject.openapi.models;
public interface GenericEntityScrollResult<T extends GenericEntity> {}

View File

@ -1,4 +1,4 @@
package io.datahubproject.openapi.v2.models;
package io.datahubproject.openapi.models;
import java.util.List;
import lombok.Builder;

View File

@ -16,5 +16,5 @@ import lombok.Value;
public class BatchGetUrnResponse implements Serializable {
@JsonProperty("entities")
@Schema(description = "List of entity responses")
List<GenericEntity> entities;
List<GenericEntityV2> entities;
}

View File

@ -0,0 +1,15 @@
package io.datahubproject.openapi.v2.models;
import io.datahubproject.openapi.models.GenericEntity;
import io.datahubproject.openapi.models.GenericEntityScrollResult;
import java.util.List;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class GenericEntityScrollResultV2<T extends GenericEntity>
implements GenericEntityScrollResult<T> {
private String scrollId;
private List<T> results;
}

View File

@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
import io.datahubproject.openapi.models.GenericEntity;
import io.swagger.v3.oas.annotations.media.Schema;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -23,7 +24,7 @@ import lombok.NoArgsConstructor;
@JsonInclude(JsonInclude.Include.NON_NULL)
@NoArgsConstructor(force = true, access = AccessLevel.PRIVATE)
@AllArgsConstructor
public class GenericEntity {
public class GenericEntityV2 implements GenericEntity {
@JsonProperty("urn")
@Schema(description = "Urn of the entity")
private String urn;
@ -32,9 +33,9 @@ public class GenericEntity {
@Schema(description = "Map of aspect name to aspect")
private Map<String, Object> aspects;
public static class GenericEntityBuilder {
public static class GenericEntityV2Builder {
public GenericEntity build(
public GenericEntityV2 build(
ObjectMapper objectMapper, Map<String, Pair<RecordTemplate, SystemMetadata>> aspects) {
Map<String, Object> jsonObjectMap =
aspects.entrySet().stream()
@ -63,7 +64,7 @@ public class GenericEntity {
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return new GenericEntity(urn, jsonObjectMap);
return new GenericEntityV2(urn, jsonObjectMap);
}
}
}

View File

@ -0,0 +1,15 @@
package io.datahubproject.openapi.v3.models;
import io.datahubproject.openapi.models.GenericEntity;
import io.datahubproject.openapi.models.GenericEntityScrollResult;
import java.util.List;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class GenericEntityScrollResultV3<T extends GenericEntity>
implements GenericEntityScrollResult<T> {
private String scrollId;
private List<T> entities;
}

View File

@ -0,0 +1,77 @@
package io.datahubproject.openapi.v3.models;
import com.datahub.util.RecordUtils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
import io.datahubproject.openapi.models.GenericEntity;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
@EqualsAndHashCode(callSuper = true)
@Data
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
@AllArgsConstructor
public class GenericEntityV3 extends LinkedHashMap<String, Object> implements GenericEntity {
public GenericEntityV3(Map<? extends String, ?> m) {
super(m);
}
@Override
public Map<String, Object> getAspects() {
return this;
}
public static class GenericEntityV3Builder {
public GenericEntityV3 build(
ObjectMapper objectMapper,
@Nonnull Urn urn,
Map<String, Pair<RecordTemplate, SystemMetadata>> aspects) {
Map<String, Object> jsonObjectMap =
aspects.entrySet().stream()
.map(
e -> {
try {
Map<String, Object> valueMap =
Map.of(
"value",
objectMapper.readTree(
RecordUtils.toJsonString(e.getValue().getFirst())
.getBytes(StandardCharsets.UTF_8)));
if (e.getValue().getSecond() != null) {
return Map.entry(
e.getKey(),
Map.of(
"systemMetadata", e.getValue().getSecond(),
"value", valueMap.get("value")));
} else {
return Map.entry(e.getKey(), Map.of("value", valueMap.get("value")));
}
} catch (IOException ex) {
throw new RuntimeException(ex);
}
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
GenericEntityV3 genericEntityV3 = new GenericEntityV3();
genericEntityV3.put("urn", urn.toString());
genericEntityV3.putAll(jsonObjectMap);
return genericEntityV3;
}
}
}

View File

@ -1,63 +1,42 @@
package io.datahubproject.openapi.v2.controller;
import static com.linkedin.metadata.authorization.ApiOperation.CREATE;
import static com.linkedin.metadata.authorization.ApiOperation.DELETE;
import static com.linkedin.metadata.authorization.ApiOperation.EXISTS;
import static com.linkedin.metadata.authorization.ApiOperation.READ;
import static com.linkedin.metadata.authorization.ApiOperation.UPDATE;
import com.datahub.authentication.Actor;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.datahub.authorization.AuthUtil;
import com.datahub.authorization.AuthorizerChain;
import com.datahub.util.RecordUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.ByteString;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
import com.linkedin.metadata.aspect.patch.template.common.GenericPatchTemplate;
import com.linkedin.metadata.entity.EntityApiUtils;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.metadata.entity.UpdateAspectResult;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.query.filter.SortOrder;
import com.linkedin.metadata.search.ScrollResult;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchEntityArray;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.SearchUtil;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.RequestContext;
import io.datahubproject.openapi.controller.GenericEntitiesController;
import io.datahubproject.openapi.exception.UnauthorizedException;
import io.datahubproject.openapi.v2.models.BatchGetUrnRequest;
import io.datahubproject.openapi.v2.models.BatchGetUrnResponse;
import io.datahubproject.openapi.v2.models.GenericEntity;
import io.datahubproject.openapi.v2.models.GenericScrollResult;
import io.datahubproject.openapi.v2.models.GenericEntityScrollResultV2;
import io.datahubproject.openapi.v2.models.GenericEntityV2;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -69,100 +48,38 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PatchMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequiredArgsConstructor
@RequestMapping("/v2/entity")
@Slf4j
public class EntityController {
private static final SearchFlags DEFAULT_SEARCH_FLAGS =
new SearchFlags().setFulltext(false).setSkipAggregates(true).setSkipHighlighting(true);
@Autowired private EntityRegistry entityRegistry;
@Autowired private SearchService searchService;
@Autowired private EntityService<?> entityService;
@Autowired private AuthorizerChain authorizationChain;
@Autowired private ObjectMapper objectMapper;
public class EntityController
extends GenericEntitiesController<
GenericEntityV2, GenericEntityScrollResultV2<GenericEntityV2>> {
@Qualifier("systemOperationContext")
@Autowired
private OperationContext systemOperationContext;
@Tag(name = "Generic Entities", description = "API for interacting with generic entities.")
@GetMapping(value = "/{entityName}", produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Scroll entities")
public ResponseEntity<GenericScrollResult<GenericEntity>> getEntities(
@PathVariable("entityName") String entityName,
@RequestParam(value = "aspectNames", defaultValue = "") Set<String> aspectNames,
@RequestParam(value = "count", defaultValue = "10") Integer count,
@RequestParam(value = "query", defaultValue = "*") String query,
@RequestParam(value = "scrollId", required = false) String scrollId,
@RequestParam(value = "sort", required = false, defaultValue = "urn") String sortField,
@RequestParam(value = "sortOrder", required = false, defaultValue = "ASCENDING")
String sortOrder,
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
Boolean withSystemMetadata)
@Override
public GenericEntityScrollResultV2<GenericEntityV2> buildScrollResult(
@Nonnull OperationContext opContext,
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata,
@Nullable String scrollId)
throws URISyntaxException {
EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityType(authentication, authorizationChain, READ, entityName)) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + READ + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("getEntities", entityName),
authorizationChain,
authentication,
true);
// TODO: support additional and multiple sort params
SortCriterion sortCriterion = SearchUtil.sortBy(sortField, SortOrder.valueOf(sortOrder));
ScrollResult result =
searchService.scrollAcrossEntities(
opContext.withSearchFlags(flags -> DEFAULT_SEARCH_FLAGS),
List.of(entitySpec.getName()),
query,
null,
sortCriterion,
scrollId,
null,
count);
if (!AuthUtil.isAPIAuthorizedResult(authentication, authorizationChain, result)) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + READ + " entities.");
}
return ResponseEntity.ok(
GenericScrollResult.<GenericEntity>builder()
.results(
toRecordTemplates(opContext, result.getEntities(), aspectNames, withSystemMetadata))
.scrollId(result.getScrollId())
.build());
return GenericEntityScrollResultV2.<GenericEntityV2>builder()
.results(toRecordTemplates(opContext, searchEntities, aspectNames, withSystemMetadata))
.scrollId(scrollId)
.build();
}
@Tag(name = "Generic Entities")
@ -192,7 +109,7 @@ public class EntityController {
BatchGetUrnResponse.builder()
.entities(
new ArrayList<>(
toRecordTemplates(
buildEntityList(
opContext,
urns,
new HashSet<>(request.getAspectNames()),
@ -200,506 +117,10 @@ public class EntityController {
.build()));
}
@Tag(name = "Generic Entities")
@GetMapping(
value = "/{entityName}/{entityUrn:urn:li:.+}",
produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<GenericEntity> getEntity(
@PathVariable("entityName") String entityName,
@PathVariable("entityUrn") String entityUrn,
@RequestParam(value = "aspectNames", defaultValue = "") Set<String> aspectNames,
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
Boolean withSystemMetadata)
throws URISyntaxException {
Urn urn = UrnUtils.getUrn(entityUrn);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, READ, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + READ + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("getEntity", entityName),
authorizationChain,
authentication,
true);
return ResponseEntity.of(
toRecordTemplates(opContext, List.of(urn), aspectNames, withSystemMetadata).stream()
.findFirst());
}
@Tag(name = "Generic Entities")
@RequestMapping(
value = "/{entityName}/{entityUrn}",
method = {RequestMethod.HEAD})
@Operation(summary = "Entity exists")
public ResponseEntity<Object> headEntity(
@PathVariable("entityName") String entityName, @PathVariable("entityUrn") String entityUrn) {
Urn urn = UrnUtils.getUrn(entityUrn);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, EXISTS, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + EXISTS + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("headEntity", entityName),
authorizationChain,
authentication,
true);
return exists(opContext, urn, null)
? ResponseEntity.noContent().build()
: ResponseEntity.notFound().build();
}
@Tag(name = "Generic Aspects", description = "API for generic aspects.")
@GetMapping(
value = "/{entityName}/{entityUrn}/{aspectName}",
produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Get an entity's generic aspect.")
public ResponseEntity<Object> getAspect(
@PathVariable("entityName") String entityName,
@PathVariable("entityUrn") String entityUrn,
@PathVariable("aspectName") String aspectName,
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
Boolean withSystemMetadata)
throws URISyntaxException {
Urn urn = UrnUtils.getUrn(entityUrn);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, READ, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + READ + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("getAspect", entityName),
authorizationChain,
authentication,
true);
return ResponseEntity.of(
toRecordTemplates(opContext, List.of(urn), Set.of(aspectName), withSystemMetadata).stream()
.findFirst()
.flatMap(
e ->
e.getAspects().entrySet().stream()
.filter(
entry ->
entry.getKey().equals(lookupAspectSpec(urn, aspectName).getName()))
.map(Map.Entry::getValue)
.findFirst()));
}
@Tag(name = "Generic Aspects")
@RequestMapping(
value = "/{entityName}/{entityUrn}/{aspectName}",
method = {RequestMethod.HEAD})
@Operation(summary = "Whether an entity aspect exists.")
public ResponseEntity<Object> headAspect(
@PathVariable("entityName") String entityName,
@PathVariable("entityUrn") String entityUrn,
@PathVariable("aspectName") String aspectName) {
Urn urn = UrnUtils.getUrn(entityUrn);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, EXISTS, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + EXISTS + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("headAspect", entityName),
authorizationChain,
authentication,
true);
return exists(opContext, urn, lookupAspectSpec(urn, aspectName).getName())
? ResponseEntity.noContent().build()
: ResponseEntity.notFound().build();
}
@Tag(name = "Generic Entities")
@DeleteMapping(value = "/{entityName}/{entityUrn}")
@Operation(summary = "Delete an entity")
public void deleteEntity(
@PathVariable("entityName") String entityName, @PathVariable("entityUrn") String entityUrn) {
EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName);
Urn urn = UrnUtils.getUrn(entityUrn);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, DELETE, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + DELETE + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("deleteEntity", entityName),
authorizationChain,
authentication,
true);
entityService.deleteAspect(opContext, entityUrn, entitySpec.getKeyAspectName(), Map.of(), true);
}
@Tag(name = "Generic Entities")
@PostMapping(value = "/{entityName}", produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Create a batch of entities.")
public ResponseEntity<List<GenericEntity>> createEntity(
@PathVariable("entityName") String entityName,
@RequestParam(value = "async", required = false, defaultValue = "true") Boolean async,
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
Boolean withSystemMetadata,
@RequestBody @Nonnull String jsonEntityList)
throws URISyntaxException, JsonProcessingException {
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityType(
authentication, authorizationChain, CREATE, entityName)) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + CREATE + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("createEntity", entityName),
authorizationChain,
authentication,
true);
AspectsBatch batch = toMCPBatch(opContext, jsonEntityList, authentication.getActor());
Set<IngestResult> results = entityService.ingestProposal(opContext, batch, async);
if (!async) {
return ResponseEntity.ok(toEntityListResponse(results, withSystemMetadata));
} else {
return ResponseEntity.accepted().body(toEntityListResponse(results, withSystemMetadata));
}
}
@Tag(name = "Generic Aspects")
@DeleteMapping(value = "/{entityName}/{entityUrn}/{aspectName}")
@Operation(summary = "Delete an entity aspect.")
public void deleteAspect(
@PathVariable("entityName") String entityName,
@PathVariable("entityUrn") String entityUrn,
@PathVariable("aspectName") String aspectName) {
Urn urn = UrnUtils.getUrn(entityUrn);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, DELETE, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + DELETE + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("deleteAspect", entityName),
authorizationChain,
authentication,
true);
entityService.deleteAspect(
opContext, entityUrn, lookupAspectSpec(urn, aspectName).getName(), Map.of(), true);
}
@Tag(name = "Generic Aspects")
@PostMapping(
value = "/{entityName}/{entityUrn}/{aspectName}",
produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Create an entity aspect.")
public ResponseEntity<GenericEntity> createAspect(
@PathVariable("entityName") String entityName,
@PathVariable("entityUrn") String entityUrn,
@PathVariable("aspectName") String aspectName,
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
Boolean withSystemMetadata,
@RequestParam(value = "createIfNotExists", required = false, defaultValue = "false")
Boolean createIfNotExists,
@RequestBody @Nonnull String jsonAspect)
throws URISyntaxException {
Urn urn = UrnUtils.getUrn(entityUrn);
EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, CREATE, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + CREATE + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("createAspect", entityName),
authorizationChain,
authentication,
true);
AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName);
ChangeMCP upsert =
toUpsertItem(
opContext.getRetrieverContext().get().getAspectRetriever(),
urn,
aspectSpec,
createIfNotExists,
jsonAspect,
authentication.getActor());
List<UpdateAspectResult> results =
entityService.ingestAspects(
opContext,
AspectsBatchImpl.builder()
.retrieverContext(opContext.getRetrieverContext().get())
.items(List.of(upsert))
.build(),
true,
true);
return ResponseEntity.of(
results.stream()
.findFirst()
.map(
result ->
GenericEntity.builder()
.urn(result.getUrn().toString())
.build(
objectMapper,
Map.of(
aspectName,
Pair.of(
result.getNewValue(),
withSystemMetadata ? result.getNewSystemMetadata() : null)))));
}
@Tag(name = "Generic Aspects")
@PatchMapping(
value = "/{entityName}/{entityUrn}/{aspectName}",
consumes = "application/json-patch+json",
produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Patch an entity aspect. (Experimental)")
public ResponseEntity<GenericEntity> patchAspect(
@PathVariable("entityName") String entityName,
@PathVariable("entityUrn") String entityUrn,
@PathVariable("aspectName") String aspectName,
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
Boolean withSystemMetadata,
@RequestBody @Nonnull GenericJsonPatch patch)
throws URISyntaxException,
NoSuchMethodException,
InvocationTargetException,
InstantiationException,
IllegalAccessException {
Urn urn = UrnUtils.getUrn(entityUrn);
EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName);
Authentication authentication = AuthenticationContext.getAuthentication();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
authentication, authorizationChain, UPDATE, List.of(urn))) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + UPDATE + " entities.");
}
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder().buildOpenapi("patchAspect", entityName),
authorizationChain,
authentication,
true);
AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName);
RecordTemplate currentValue = entityService.getAspect(opContext, urn, aspectSpec.getName(), 0);
GenericPatchTemplate<? extends RecordTemplate> genericPatchTemplate =
GenericPatchTemplate.builder()
.genericJsonPatch(patch)
.templateType(aspectSpec.getDataTemplateClass())
.templateDefault(
aspectSpec.getDataTemplateClass().getDeclaredConstructor().newInstance())
.build();
ChangeMCP upsert =
toUpsertItem(
opContext.getRetrieverContext().get().getAspectRetriever(),
UrnUtils.getUrn(entityUrn),
aspectSpec,
currentValue,
genericPatchTemplate,
authentication.getActor());
List<UpdateAspectResult> results =
entityService.ingestAspects(
opContext,
AspectsBatchImpl.builder()
.retrieverContext(opContext.getRetrieverContext().get())
.items(List.of(upsert))
.build(),
true,
true);
return ResponseEntity.of(
results.stream()
.findFirst()
.map(
result ->
GenericEntity.builder()
.urn(result.getUrn().toString())
.build(
objectMapper,
Map.of(
aspectSpec.getName(),
Pair.of(
result.getNewValue(),
withSystemMetadata ? result.getNewSystemMetadata() : null)))));
}
private List<GenericEntity> toRecordTemplates(
@Nonnull OperationContext opContext,
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata)
throws URISyntaxException {
return toRecordTemplates(
opContext,
searchEntities.stream().map(SearchEntity::getEntity).collect(Collectors.toList()),
aspectNames,
withSystemMetadata);
}
private Boolean exists(@Nonnull OperationContext opContext, Urn urn, @Nullable String aspect) {
return aspect == null
? entityService.exists(opContext, urn, true)
: entityService.exists(opContext, urn, aspect, true);
}
private List<GenericEntity> toRecordTemplates(
@Nonnull OperationContext opContext,
List<Urn> urns,
Set<String> aspectNames,
boolean withSystemMetadata)
throws URISyntaxException {
if (urns.isEmpty()) {
return List.of();
} else {
Set<Urn> urnsSet = new HashSet<>(urns);
Map<Urn, List<EnvelopedAspect>> aspects =
entityService.getLatestEnvelopedAspects(
opContext,
urnsSet,
resolveAspectNames(urnsSet, aspectNames).stream()
.map(AspectSpec::getName)
.collect(Collectors.toSet()));
return urns.stream()
.map(
u ->
GenericEntity.builder()
.urn(u.toString())
.build(
objectMapper,
toAspectMap(u, aspects.getOrDefault(u, List.of()), withSystemMetadata)))
.collect(Collectors.toList());
}
}
private Set<AspectSpec> resolveAspectNames(Set<Urn> urns, Set<String> requestedAspectNames) {
if (requestedAspectNames.isEmpty()) {
return urns.stream()
.flatMap(u -> entityRegistry.getEntitySpec(u.getEntityType()).getAspectSpecs().stream())
.collect(Collectors.toSet());
} else {
// ensure key is always present
return Stream.concat(
urns.stream()
.flatMap(
urn ->
requestedAspectNames.stream()
.map(aspectName -> lookupAspectSpec(urn, aspectName))),
urns.stream()
.map(u -> entityRegistry.getEntitySpec(u.getEntityType()).getKeyAspectSpec()))
.collect(Collectors.toSet());
}
}
private Map<String, Pair<RecordTemplate, SystemMetadata>> toAspectMap(
Urn urn, List<EnvelopedAspect> aspects, boolean withSystemMetadata) {
return aspects.stream()
.map(
a ->
Map.entry(
a.getName(),
Pair.of(
toRecordTemplate(lookupAspectSpec(urn, a.getName()), a),
withSystemMetadata ? a.getSystemMetadata() : null)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private AspectSpec lookupAspectSpec(Urn urn, String aspectName) {
return lookupAspectSpec(entityRegistry.getEntitySpec(urn.getEntityType()), aspectName);
}
private RecordTemplate toRecordTemplate(AspectSpec aspectSpec, EnvelopedAspect envelopedAspect) {
return RecordUtils.toRecordTemplate(
aspectSpec.getDataTemplateClass(), envelopedAspect.getValue().data());
}
private ChangeMCP toUpsertItem(
@Nonnull AspectRetriever aspectRetriever,
Urn entityUrn,
AspectSpec aspectSpec,
Boolean createIfNotExists,
String jsonAspect,
Actor actor)
throws URISyntaxException {
return ChangeItemImpl.builder()
.urn(entityUrn)
.aspectName(aspectSpec.getName())
.changeType(Boolean.TRUE.equals(createIfNotExists) ? ChangeType.CREATE : ChangeType.UPSERT)
.auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr()))
.recordTemplate(
GenericRecordUtils.deserializeAspect(
ByteString.copyString(jsonAspect, StandardCharsets.UTF_8),
GenericRecordUtils.JSON,
aspectSpec))
.build(aspectRetriever);
}
private ChangeMCP toUpsertItem(
@Nonnull AspectRetriever aspectRetriever,
@Nonnull Urn urn,
@Nonnull AspectSpec aspectSpec,
@Nullable RecordTemplate currentValue,
@Nonnull GenericPatchTemplate<? extends RecordTemplate> genericPatchTemplate,
@Nonnull Actor actor) {
return ChangeItemImpl.fromPatch(
urn,
aspectSpec,
currentValue,
genericPatchTemplate,
AuditStampUtils.createAuditStamp(actor.toUrnStr()),
aspectRetriever);
}
private AspectsBatch toMCPBatch(
@Override
protected AspectsBatch toMCPBatch(
@Nonnull OperationContext opContext, String entityArrayList, Actor actor)
throws JsonProcessingException, URISyntaxException {
throws JsonProcessingException {
JsonNode entities = objectMapper.readTree(entityArrayList);
List<BatchItem> items = new LinkedList<>();
@ -707,8 +128,14 @@ public class EntityController {
Iterator<JsonNode> entityItr = entities.iterator();
while (entityItr.hasNext()) {
JsonNode entity = entityItr.next();
if (!entity.has("urn")) {
throw new IllegalArgumentException("Missing `urn` field");
}
Urn entityUrn = UrnUtils.getUrn(entity.get("urn").asText());
if (!entity.has("aspects")) {
throw new IllegalArgumentException("Missing `aspects` field");
}
Iterator<Map.Entry<String, JsonNode>> aspectItr = entity.get("aspects").fields();
while (aspectItr.hasNext()) {
Map.Entry<String, JsonNode> aspect = aspectItr.next();
@ -747,9 +174,71 @@ public class EntityController {
.build();
}
public List<GenericEntity> toEntityListResponse(
@Override
protected List<GenericEntityV2> buildEntityList(
@Nonnull OperationContext opContext,
List<Urn> urns,
Set<String> aspectNames,
boolean withSystemMetadata)
throws URISyntaxException {
if (urns.isEmpty()) {
return List.of();
} else {
Set<Urn> urnsSet = new HashSet<>(urns);
Map<Urn, List<EnvelopedAspect>> aspects =
entityService.getLatestEnvelopedAspects(
opContext,
urnsSet,
resolveAspectNames(urnsSet, aspectNames).stream()
.map(AspectSpec::getName)
.collect(Collectors.toSet()));
return urns.stream()
.map(
u ->
GenericEntityV2.builder()
.urn(u.toString())
.build(
objectMapper,
toAspectMap(u, aspects.getOrDefault(u, List.of()), withSystemMetadata)))
.collect(Collectors.toList());
}
}
@Override
protected GenericEntityV2 buildGenericEntity(
@Nonnull String aspectName,
@Nonnull UpdateAspectResult updateAspectResult,
boolean withSystemMetadata) {
return GenericEntityV2.builder()
.urn(updateAspectResult.getUrn().toString())
.build(
objectMapper,
Map.of(
aspectName,
Pair.of(
updateAspectResult.getNewValue(),
withSystemMetadata ? updateAspectResult.getNewSystemMetadata() : null)));
}
private List<GenericEntityV2> toRecordTemplates(
@Nonnull OperationContext opContext,
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata)
throws URISyntaxException {
return buildEntityList(
opContext,
searchEntities.stream().map(SearchEntity::getEntity).collect(Collectors.toList()),
aspectNames,
withSystemMetadata);
}
@Override
protected List<GenericEntityV2> buildEntityList(
Set<IngestResult> ingestResults, boolean withSystemMetadata) {
List<GenericEntity> responseList = new LinkedList<>();
List<GenericEntityV2> responseList = new LinkedList<>();
Map<Urn, List<IngestResult>> entityMap =
ingestResults.stream().collect(Collectors.groupingBy(IngestResult::getUrn));
@ -765,24 +254,10 @@ public class EntityController {
withSystemMetadata ? ingest.getRequest().getSystemMetadata() : null)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
responseList.add(
GenericEntity.builder()
GenericEntityV2.builder()
.urn(urnAspects.getKey().toString())
.build(objectMapper, aspectsMap));
}
return responseList;
}
/**
* Case-insensitive fallback
*
* @return
*/
private static AspectSpec lookupAspectSpec(EntitySpec entitySpec, String aspectName) {
return entitySpec.getAspectSpec(aspectName) != null
? entitySpec.getAspectSpec(aspectName)
: entitySpec.getAspectSpecs().stream()
.filter(aspec -> aspec.getName().toLowerCase().equals(aspectName))
.findFirst()
.get();
}
}

View File

@ -18,8 +18,8 @@ import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.query.filter.RelationshipFilter;
import com.linkedin.metadata.search.utils.QueryUtils;
import io.datahubproject.openapi.exception.UnauthorizedException;
import io.datahubproject.openapi.models.GenericScrollResult;
import io.datahubproject.openapi.v2.models.GenericRelationship;
import io.datahubproject.openapi.v2.models.GenericScrollResult;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.util.Arrays;

View File

@ -19,7 +19,7 @@ import com.linkedin.metadata.utils.SearchUtil;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.RequestContext;
import io.datahubproject.openapi.exception.UnauthorizedException;
import io.datahubproject.openapi.v2.models.GenericScrollResult;
import io.datahubproject.openapi.models.GenericScrollResult;
import io.datahubproject.openapi.v2.models.GenericTimeseriesAspect;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.net.URISyntaxException;

View File

@ -1,5 +1,43 @@
package io.datahubproject.openapi.v3.controller;
import com.datahub.authentication.Actor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.ByteString;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.entity.EntityApiUtils;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.metadata.entity.UpdateAspectResult;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchEntityArray;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.openapi.controller.GenericEntitiesController;
import io.datahubproject.openapi.v3.models.GenericEntityScrollResultV3;
import io.datahubproject.openapi.v3.models.GenericEntityV3;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
@ -9,4 +47,167 @@ import org.springframework.web.bind.annotation.RestController;
@RequiredArgsConstructor
@RequestMapping("/v3/entity")
@Slf4j
public class EntityController extends io.datahubproject.openapi.v2.controller.EntityController {}
public class EntityController
extends GenericEntitiesController<
GenericEntityV3, GenericEntityScrollResultV3<GenericEntityV3>> {
@Override
public GenericEntityScrollResultV3<GenericEntityV3> buildScrollResult(
@Nonnull OperationContext opContext,
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata,
@Nullable String scrollId)
throws URISyntaxException {
return GenericEntityScrollResultV3.<GenericEntityV3>builder()
.entities(toRecordTemplates(opContext, searchEntities, aspectNames, withSystemMetadata))
.scrollId(scrollId)
.build();
}
@Override
protected List<GenericEntityV3> buildEntityList(
@Nonnull OperationContext opContext,
List<Urn> urns,
Set<String> aspectNames,
boolean withSystemMetadata)
throws URISyntaxException {
if (urns.isEmpty()) {
return List.of();
} else {
Set<Urn> urnsSet = new HashSet<>(urns);
Map<Urn, List<EnvelopedAspect>> aspects =
entityService.getLatestEnvelopedAspects(
opContext,
urnsSet,
resolveAspectNames(urnsSet, aspectNames).stream()
.map(AspectSpec::getName)
.collect(Collectors.toSet()));
return urns.stream()
.map(
u ->
GenericEntityV3.builder()
.build(
objectMapper,
u,
toAspectMap(u, aspects.getOrDefault(u, List.of()), withSystemMetadata)))
.collect(Collectors.toList());
}
}
@Override
protected List<GenericEntityV3> buildEntityList(
Set<IngestResult> ingestResults, boolean withSystemMetadata) {
List<GenericEntityV3> responseList = new LinkedList<>();
Map<Urn, List<IngestResult>> entityMap =
ingestResults.stream().collect(Collectors.groupingBy(IngestResult::getUrn));
for (Map.Entry<Urn, List<IngestResult>> urnAspects : entityMap.entrySet()) {
Map<String, Pair<RecordTemplate, SystemMetadata>> aspectsMap =
urnAspects.getValue().stream()
.map(
ingest ->
Map.entry(
ingest.getRequest().getAspectName(),
Pair.of(
ingest.getRequest().getRecordTemplate(),
withSystemMetadata ? ingest.getRequest().getSystemMetadata() : null)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
responseList.add(
GenericEntityV3.builder().build(objectMapper, urnAspects.getKey(), aspectsMap));
}
return responseList;
}
@Override
protected GenericEntityV3 buildGenericEntity(
@Nonnull String aspectName,
@Nonnull UpdateAspectResult updateAspectResult,
boolean withSystemMetadata) {
return GenericEntityV3.builder()
.build(
objectMapper,
updateAspectResult.getUrn(),
Map.of(
aspectName,
Pair.of(
updateAspectResult.getNewValue(),
withSystemMetadata ? updateAspectResult.getNewSystemMetadata() : null)));
}
private List<GenericEntityV3> toRecordTemplates(
@Nonnull OperationContext opContext,
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata)
throws URISyntaxException {
return buildEntityList(
opContext,
searchEntities.stream().map(SearchEntity::getEntity).collect(Collectors.toList()),
aspectNames,
withSystemMetadata);
}
@Override
protected AspectsBatch toMCPBatch(
@Nonnull OperationContext opContext, String entityArrayList, Actor actor)
throws JsonProcessingException {
JsonNode entities = objectMapper.readTree(entityArrayList);
List<BatchItem> items = new LinkedList<>();
if (entities.isArray()) {
Iterator<JsonNode> entityItr = entities.iterator();
while (entityItr.hasNext()) {
JsonNode entity = entityItr.next();
if (!entity.has("urn")) {
throw new IllegalArgumentException("Missing `urn` field");
}
Urn entityUrn = UrnUtils.getUrn(entity.get("urn").asText());
Iterator<Map.Entry<String, JsonNode>> aspectItr = entity.fields();
while (aspectItr.hasNext()) {
Map.Entry<String, JsonNode> aspect = aspectItr.next();
if ("urn".equals(aspect.getKey())) {
continue;
}
AspectSpec aspectSpec = lookupAspectSpec(entityUrn, aspect.getKey());
if (aspectSpec != null) {
SystemMetadata systemMetadata = null;
if (aspect.getValue().has("systemMetadata")) {
systemMetadata =
EntityApiUtils.parseSystemMetadata(
objectMapper.writeValueAsString(aspect.getValue().get("systemMetadata")));
((ObjectNode) aspect.getValue()).remove("systemMetadata");
}
ChangeItemImpl.ChangeItemImplBuilder builder =
ChangeItemImpl.builder()
.urn(entityUrn)
.aspectName(aspectSpec.getName())
.auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr()))
.systemMetadata(systemMetadata)
.recordTemplate(
GenericRecordUtils.deserializeAspect(
ByteString.copyString(
objectMapper.writeValueAsString(aspect.getValue()),
StandardCharsets.UTF_8),
GenericRecordUtils.JSON,
aspectSpec));
items.add(builder.build(opContext.getRetrieverContext().get().getAspectRetriever()));
}
}
}
}
return AspectsBatchImpl.builder()
.items(items)
.retrieverContext(opContext.getRetrieverContext().get())
.build();
}
}