feat(openapi-v3): generic entities scroll (#11564)

This commit is contained in:
david-leifker 2024-10-10 18:16:33 -05:00 committed by GitHub
parent 2205920aaa
commit f43720e4ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 492 additions and 67 deletions

View File

@ -13,6 +13,7 @@ import com.linkedin.metadata.utils.SearchUtil;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import io.datahubproject.metadata.context.OperationContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -213,7 +214,7 @@ public class SearchService {
* @return some entities to search
*/
public List<String> getEntitiesToSearch(
@Nonnull OperationContext opContext, @Nonnull List<String> inputEntities, int size) {
@Nonnull OperationContext opContext, @Nonnull Collection<String> inputEntities, int size) {
List<String> nonEmptyEntities;
List<String> lowercaseEntities =
inputEntities.stream().map(String::toLowerCase).collect(Collectors.toList());
@ -247,7 +248,7 @@ public class SearchService {
@Nonnull
public ScrollResult scrollAcrossEntities(
@Nonnull OperationContext opContext,
@Nonnull List<String> entities,
@Nonnull Collection<String> entities,
@Nonnull String input,
@Nullable Filter postFilters,
List<SortCriterion> sortCriteria,

View File

@ -0,0 +1,15 @@
package io.datahubproject.openapi.v3.models;
import java.util.Set;
import javax.annotation.Nullable;
import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;
@Data
@Jacksonized
@Builder
public class GenericEntityAspectsBodyV3 {
@Nullable private Set<String> entities;
@Nullable private Set<String> aspects;
}

View File

@ -97,6 +97,7 @@ public abstract class GenericEntitiesController<
* @param aspectNames the aspect names present
* @param withSystemMetadata whether to include system metadata in the result
* @param scrollId the pagination token
* @param expandEmpty whether to expand an empty aspects list to all aspects
* @return result containing entities/aspects
* @throws URISyntaxException parsing error
*/
@ -105,14 +106,16 @@ public abstract class GenericEntitiesController<
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata,
@Nullable String scrollId)
@Nullable String scrollId,
boolean expandEmpty)
throws URISyntaxException;
protected List<E> buildEntityList(
@Nonnull OperationContext opContext,
List<Urn> urns,
Set<String> aspectNames,
boolean withSystemMetadata)
@Nullable Set<String> aspectNames,
boolean withSystemMetadata,
boolean expandEmpty)
throws URISyntaxException {
LinkedHashMap<Urn, Map<String, Long>> versionMap =
@ -122,7 +125,7 @@ public abstract class GenericEntitiesController<
urn ->
Map.entry(
urn,
aspectNames.stream()
Optional.ofNullable(aspectNames).orElse(Set.of()).stream()
.map(aspectName -> Map.entry(aspectName, 0L))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))
.collect(
@ -133,14 +136,30 @@ public abstract class GenericEntitiesController<
throw new IllegalStateException("Duplicate key");
},
LinkedHashMap::new)),
0L);
return buildEntityVersionedAspectList(opContext, versionMap, withSystemMetadata);
0L,
expandEmpty);
return buildEntityVersionedAspectList(
opContext, urns, versionMap, withSystemMetadata, expandEmpty);
}
/**
* Build a list of entities for an API response
*
* @param opContext the operation context
* @param requestedUrns list of urns requested
* @param fetchUrnAspectVersions the map of urn to aspect name and version to fetch
* @param withSystemMetadata whether to include system metadata in the response entity
* @param expandEmpty whether to expand an empty aspects list to all aspects
* @return entity responses
* @throws URISyntaxException urn parsing error
*/
protected abstract List<E> buildEntityVersionedAspectList(
@Nonnull OperationContext opContext,
LinkedHashMap<Urn, Map<String, Long>> urnAspectVersions,
boolean withSystemMetadata)
Collection<Urn> requestedUrns,
LinkedHashMap<Urn, Map<String, Long>> fetchUrnAspectVersions,
boolean withSystemMetadata,
boolean expandEmpty)
throws URISyntaxException;
protected abstract List<E> buildEntityList(
@ -225,13 +244,17 @@ public abstract class GenericEntitiesController<
authentication.getActor().toUrnStr() + " is unauthorized to " + READ + " entities.");
}
Set<String> mergedAspects =
ImmutableSet.<String>builder().addAll(aspects1).addAll(aspects2).build();
return ResponseEntity.ok(
buildScrollResult(
opContext,
result.getEntities(),
ImmutableSet.<String>builder().addAll(aspects1).addAll(aspects2).build(),
mergedAspects,
withSystemMetadata,
result.getScrollId()));
result.getScrollId(),
true));
}
@Tag(name = "Generic Entities")
@ -269,7 +292,8 @@ public abstract class GenericEntitiesController<
opContext,
List.of(urn),
ImmutableSet.<String>builder().addAll(aspects1).addAll(aspects2).build(),
withSystemMetadata)
withSystemMetadata,
true)
.stream()
.findFirst()
.map(ResponseEntity::ok)
@ -344,13 +368,16 @@ public abstract class GenericEntitiesController<
final List<E> resultList;
if (version == 0) {
resultList = buildEntityList(opContext, List.of(urn), Set.of(aspectName), withSystemMetadata);
resultList =
buildEntityList(opContext, List.of(urn), Set.of(aspectName), withSystemMetadata, true);
} else {
resultList =
buildEntityVersionedAspectList(
opContext,
List.of(urn),
new LinkedHashMap<>(Map.of(urn, Map.of(aspectName, version))),
withSystemMetadata);
withSystemMetadata,
true);
}
return resultList.stream()
@ -395,9 +422,10 @@ public abstract class GenericEntitiesController<
authentication.getActor().toUrnStr() + " is unauthorized to " + EXISTS + " entities.");
}
return exists(opContext, urn, lookupAspectSpec(urn, aspectName).getName(), includeSoftDelete)
? ResponseEntity.noContent().build()
: ResponseEntity.notFound().build();
return lookupAspectSpec(urn, aspectName)
.filter(aspectSpec -> exists(opContext, urn, aspectSpec.getName(), includeSoftDelete))
.map(aspectSpec -> ResponseEntity.noContent().build())
.orElse(ResponseEntity.notFound().build());
}
@Tag(name = "Generic Entities")
@ -443,7 +471,7 @@ public abstract class GenericEntitiesController<
entityService.deleteUrn(opContext, urn);
} else {
aspects.stream()
.map(aspectName -> lookupAspectSpec(urn, aspectName).getName())
.map(aspectName -> lookupAspectSpec(urn, aspectName).get().getName())
.forEach(
aspectName ->
entityService.deleteAspect(opContext, entityUrn, aspectName, Map.of(), true));
@ -515,8 +543,11 @@ public abstract class GenericEntitiesController<
authentication.getActor().toUrnStr() + " is unauthorized to " + DELETE + " entities.");
}
entityService.deleteAspect(
opContext, entityUrn, lookupAspectSpec(urn, aspectName).getName(), Map.of(), true);
lookupAspectSpec(urn, aspectName)
.ifPresent(
aspectSpec ->
entityService.deleteAspect(
opContext, entityUrn, aspectSpec.getName(), Map.of(), true));
}
@Tag(name = "Generic Aspects")
@ -554,7 +585,7 @@ public abstract class GenericEntitiesController<
authentication.getActor().toUrnStr() + " is unauthorized to " + CREATE + " entities.");
}
AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName);
AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName).get();
ChangeMCP upsert =
toUpsertItem(
opContext.getRetrieverContext().get().getAspectRetriever(),
@ -618,7 +649,7 @@ public abstract class GenericEntitiesController<
authentication.getActor().toUrnStr() + " is unauthorized to " + UPDATE + " entities.");
}
AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName);
AspectSpec aspectSpec = lookupAspectSpec(entitySpec, aspectName).get();
RecordTemplate currentValue = entityService.getAspect(opContext, urn, aspectSpec.getName(), 0);
GenericPatchTemplate<? extends RecordTemplate> genericPatchTemplate =
@ -672,15 +703,18 @@ public abstract class GenericEntitiesController<
*
* @param requestedAspectNames requested aspects
* @param <T> map values
* @param expandEmpty whether to expand empty aspect names to all aspect names
* @return updated map
*/
protected <T> LinkedHashMap<Urn, Map<String, T>> resolveAspectNames(
LinkedHashMap<Urn, Map<String, T>> requestedAspectNames, @Nonnull T defaultValue) {
LinkedHashMap<Urn, Map<String, T>> requestedAspectNames,
@Nonnull T defaultValue,
boolean expandEmpty) {
return requestedAspectNames.entrySet().stream()
.map(
entry -> {
final Urn urn = entry.getKey();
if (entry.getValue().isEmpty() || entry.getValue().containsKey("")) {
if (expandEmpty && (entry.getValue().isEmpty() || entry.getValue().containsKey(""))) {
// All aspects specified
Set<String> allNames =
entityRegistry.getEntitySpec(urn.getEntityType()).getAspectSpecs().stream()
@ -694,15 +728,16 @@ public abstract class GenericEntitiesController<
Map.entry(
aspectName, entry.getValue().getOrDefault("", defaultValue)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
} else {
} else if (!entry.getValue().keySet().isEmpty()) {
final Map<String, String> normalizedNames =
entry.getValue().keySet().stream()
.map(
requestAspectName ->
Map.entry(
requestAspectName,
lookupAspectSpec(urn, requestAspectName).getName()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
requestAspectName, lookupAspectSpec(urn, requestAspectName)))
.filter(aspectSpecEntry -> aspectSpecEntry.getValue().isPresent())
.collect(
Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get().getName()));
return Map.entry(
urn,
entry.getValue().entrySet().stream()
@ -712,8 +747,11 @@ public abstract class GenericEntitiesController<
Map.entry(
normalizedNames.get(reqEntry.getKey()), reqEntry.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
} else {
return (Map.Entry<Urn, Map<String, T>>) null;
}
})
.filter(Objects::nonNull)
.collect(
Collectors.toMap(
Map.Entry::getKey,
@ -732,12 +770,12 @@ public abstract class GenericEntitiesController<
Map.entry(
a.getName(),
Pair.of(
toRecordTemplate(lookupAspectSpec(urn, a.getName()), a),
toRecordTemplate(lookupAspectSpec(urn, a.getName()).get(), a),
withSystemMetadata ? a.getSystemMetadata() : null)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
protected AspectSpec lookupAspectSpec(Urn urn, String aspectName) {
protected Optional<AspectSpec> lookupAspectSpec(Urn urn, String aspectName) {
return lookupAspectSpec(entityRegistry.getEntitySpec(urn.getEntityType()), aspectName);
}
@ -777,13 +815,16 @@ public abstract class GenericEntitiesController<
*
* @return
*/
protected static AspectSpec lookupAspectSpec(EntitySpec entitySpec, String aspectName) {
protected static Optional<AspectSpec> lookupAspectSpec(EntitySpec entitySpec, String aspectName) {
if (entitySpec == null) {
return Optional.empty();
}
return entitySpec.getAspectSpec(aspectName) != null
? entitySpec.getAspectSpec(aspectName)
? Optional.of(entitySpec.getAspectSpec(aspectName))
: entitySpec.getAspectSpecs().stream()
.filter(aspec -> aspec.getName().toLowerCase().equals(aspectName))
.findFirst()
.get();
.findFirst();
}
protected static Urn validatedUrn(String urn) throws InvalidUrnException {

View File

@ -46,6 +46,7 @@ import jakarta.servlet.http.HttpServletRequest;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
@ -114,7 +115,8 @@ public class EntityController
opContext,
urns,
new HashSet<>(request.getAspectNames()),
request.isWithSystemMetadata())))
request.isWithSystemMetadata(),
true)))
.build()));
}
@ -124,10 +126,12 @@ public class EntityController
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata,
@Nullable String scrollId)
@Nullable String scrollId,
boolean expandEmpty)
throws URISyntaxException {
return GenericEntityScrollResultV2.builder()
.results(toRecordTemplates(opContext, searchEntities, aspectNames, withSystemMetadata))
.results(
toRecordTemplates(opContext, searchEntities, aspectNames, withSystemMetadata, true))
.scrollId(scrollId)
.build();
}
@ -155,7 +159,7 @@ public class EntityController
while (aspectItr.hasNext()) {
Map.Entry<String, JsonNode> aspect = aspectItr.next();
AspectSpec aspectSpec = lookupAspectSpec(entityUrn, aspect.getKey());
AspectSpec aspectSpec = lookupAspectSpec(entityUrn, aspect.getKey()).get();
if (aspectSpec != null) {
ChangeItemImpl.ChangeItemImplBuilder builder =
@ -192,12 +196,14 @@ public class EntityController
@Override
protected List<GenericEntityV2> buildEntityVersionedAspectList(
@Nonnull OperationContext opContext,
Collection<Urn> requestedUrns,
LinkedHashMap<Urn, Map<String, Long>> urnAspectVersions,
boolean withSystemMetadata)
boolean withSystemMetadata,
boolean expandEmpty)
throws URISyntaxException {
Map<Urn, List<EnvelopedAspect>> aspects =
entityService.getEnvelopedVersionedAspects(
opContext, resolveAspectNames(urnAspectVersions, 0L), true);
opContext, resolveAspectNames(urnAspectVersions, 0L, true), true);
return urnAspectVersions.keySet().stream()
.map(
@ -230,13 +236,15 @@ public class EntityController
@Nonnull OperationContext opContext,
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata)
boolean withSystemMetadata,
boolean expandEmpty)
throws URISyntaxException {
return buildEntityList(
opContext,
searchEntities.stream().map(SearchEntity::getEntity).collect(Collectors.toList()),
aspectNames,
withSystemMetadata);
withSystemMetadata,
true);
}
@Override

View File

@ -60,6 +60,8 @@ public class OpenAPIV3Generator {
private static final String ENTITY_REQUEST_SUFFIX = "Entity" + REQUEST_SUFFIX;
private static final String ENTITY_RESPONSE_SUFFIX = "Entity" + RESPONSE_SUFFIX;
private static final String NAME_SKIP_CACHE = "skipCache";
private static final String ASPECTS = "Aspects";
private static final String ENTITIES = "Entities";
public static OpenAPI generateOpenApiSpec(EntityRegistry entityRegistry) {
final Set<String> aspectNames = entityRegistry.getAspectSpecs().keySet();
@ -75,10 +77,19 @@ public class OpenAPIV3Generator {
info.setTitle("Entity API");
info.setDescription("This is a service for DataHub Entities.");
info.setVersion("v3");
// Components
final Components components = new Components();
// Cross-entity components
components.addSchemas(
ENTITIES + ENTITY_REQUEST_SUFFIX, buildEntitiesRequestSchema(entityRegistry, aspectNames));
components.addSchemas(
ENTITIES + ENTITY_RESPONSE_SUFFIX, buildEntitySchema(entityRegistry, aspectNames, true));
components.addSchemas(
"Scroll" + ENTITIES + ENTITY_RESPONSE_SUFFIX, buildEntitiesScrollSchema());
// --> Aspect components
components.addSchemas("SortOrder", new Schema()._enum(List.of("ASCENDING", "DESCENDING")));
components.addSchemas("AspectPatch", buildAspectPatchSchema());
components.addSchemas(
"BatchGetRequestBody",
@ -94,6 +105,10 @@ public class OpenAPIV3Generator {
.description("System headers for the operation.")
.nullable(true)))
.nullable(true));
// --> Aspect components
components.addSchemas(
ASPECTS + ASPECT_RESPONSE_SUFFIX, buildAspectsRefResponseSchema(entityRegistry));
entityRegistry
.getAspectSpecs()
.values()
@ -108,6 +123,7 @@ public class OpenAPIV3Generator {
upperAspectName + ASPECT_RESPONSE_SUFFIX,
buildAspectRefResponseSchema(upperAspectName));
});
// --> Entity components
entityRegistry.getEntitySpecs().values().stream()
.filter(e -> aspectNames.contains(e.getKeyAspectName()))
@ -124,18 +140,34 @@ public class OpenAPIV3Generator {
"BatchGet" + entityName + ENTITY_REQUEST_SUFFIX,
buildEntityBatchGetRequestSchema(e, aspectNames));
});
components.addSchemas("SortOrder", new Schema()._enum(List.of("ASCENDING", "DESCENDING")));
// TODO: Correct handling of SystemMetadata and AuditStamp
components.addSchemas(
"SystemMetadata", new Schema().type(TYPE_OBJECT).additionalProperties(true));
components.addSchemas("AuditStamp", new Schema().type(TYPE_OBJECT).additionalProperties(true));
// Parameters
// --> Entity Parameters
entityRegistry.getEntitySpecs().values().stream()
.filter(e -> definitionNames.contains(e.getKeyAspectName()))
.forEach(
e -> {
final String parameterName = toUpperFirst(e.getName()) + "Aspects";
final String parameterName = toUpperFirst(e.getName()) + ASPECTS;
components.addParameters(
parameterName + MODEL_VERSION, buildParameterSchema(e, definitionNames));
});
addExtraParameters(components);
// Path
final Paths paths = new Paths();
// --> Cross-entity Paths
paths.addPathItem("/v3/entity/scroll", buildGenericListEntitiesPath());
// --> Entity Paths
entityRegistry.getEntitySpecs().values().stream()
.filter(e -> definitionNames.contains(e.getName()))
.sorted(Comparator.comparing(EntitySpec::getName))
@ -151,6 +183,8 @@ public class OpenAPIV3Generator {
String.format("/v3/entity/%s/{urn}", e.getName().toLowerCase()),
buildSingleEntityPath(e));
});
// --> Aspect Paths
entityRegistry.getEntitySpecs().values().stream()
.filter(e -> definitionNames.contains(e.getName()))
.sorted(Comparator.comparing(EntitySpec::getName))
@ -168,16 +202,12 @@ public class OpenAPIV3Generator {
buildSingleEntityAspectPath(
e, a.getName(), a.getPegasusSchema().getName())));
});
// TODO: Correct handling of SystemMetadata and AuditStamp
components.addSchemas(
"SystemMetadata", new Schema().type(TYPE_OBJECT).additionalProperties(true));
components.addSchemas("AuditStamp", new Schema().type(TYPE_OBJECT).additionalProperties(true));
return new OpenAPI().openapi("3.0.1").info(info).paths(paths).components(components);
}
private static PathItem buildSingleEntityPath(final EntitySpec entity) {
final String upperFirst = toUpperFirst(entity.getName());
final String aspectParameterName = upperFirst + "Aspects";
final String aspectParameterName = upperFirst + ASPECTS;
final PathItem result = new PathItem();
// Get Operation
@ -280,7 +310,7 @@ public class OpenAPIV3Generator {
private static PathItem buildListEntityPath(final EntitySpec entity) {
final String upperFirst = toUpperFirst(entity.getName());
final String aspectParameterName = upperFirst + "Aspects";
final String aspectParameterName = upperFirst + ASPECTS;
final PathItem result = new PathItem();
final List<Parameter> parameters =
List.of(
@ -327,7 +357,8 @@ public class OpenAPIV3Generator {
.summary(String.format("Scroll/List %s.", upperFirst))
.parameters(parameters)
.tags(List.of(entity.getName() + " Entity"))
.description("Scroll indexed entities. Will not include soft deleted entities.")
.description(
"Scroll indexed entities. Will not include soft deleted entities by default.")
.responses(new ApiResponses().addApiResponse("200", successApiResponse)));
// Post Operation
@ -452,6 +483,74 @@ public class OpenAPIV3Generator {
return result;
}
private static PathItem buildGenericListEntitiesPath() {
final PathItem result = new PathItem();
final List<Parameter> parameters =
List.of(
new Parameter()
.in(NAME_QUERY)
.name(NAME_SYSTEM_METADATA)
.description("Include systemMetadata with response.")
.schema(new Schema().type(TYPE_BOOLEAN)._default(false)),
new Parameter()
.in(NAME_QUERY)
.name(NAME_INCLUDE_SOFT_DELETE)
.description("Include soft-deleted aspects with response.")
.schema(new Schema().type(TYPE_BOOLEAN)._default(false)),
new Parameter()
.in(NAME_QUERY)
.name(NAME_SKIP_CACHE)
.description("Skip cache when listing entities.")
.schema(new Schema().type(TYPE_BOOLEAN)._default(false)),
new Parameter().$ref("#/components/parameters/PaginationCount" + MODEL_VERSION),
new Parameter().$ref("#/components/parameters/ScrollId" + MODEL_VERSION),
new Parameter().$ref("#/components/parameters/SortBy" + MODEL_VERSION),
new Parameter().$ref("#/components/parameters/SortOrder" + MODEL_VERSION),
new Parameter().$ref("#/components/parameters/ScrollQuery" + MODEL_VERSION));
final ApiResponse successApiResponse =
new ApiResponse()
.description("Success")
.content(
new Content()
.addMediaType(
"application/json",
new MediaType()
.schema(
new Schema()
.$ref(
String.format(
"#/components/schemas/Scroll%s%s",
ENTITIES, ENTITY_RESPONSE_SUFFIX)))));
final RequestBody requestBody =
new RequestBody()
.description(
"Scroll entities and aspects. If the `aspects` list is not specified then NO aspects will be returned. If the `aspects` list is emtpy, all aspects will be returned.")
.required(false)
.content(
new Content()
.addMediaType(
"application/json",
new MediaType()
.schema(
new Schema()
.$ref(
String.format(
"#/components/schemas/%s%s",
ENTITIES, ENTITY_REQUEST_SUFFIX)))));
result.setPost(
new Operation()
.summary(String.format("Scroll/List %s.", ENTITIES))
.parameters(parameters)
.tags(List.of("Generic Entities"))
.description("Scroll indexed entities. Will not include soft deleted entities.")
.requestBody(requestBody)
.responses(new ApiResponses().addApiResponse("200", successApiResponse)));
return result;
}
private static void addExtraParameters(final Components components) {
components.addParameters(
"ScrollId" + MODEL_VERSION,
@ -499,7 +598,8 @@ public class OpenAPIV3Generator {
new Parameter()
.in(NAME_QUERY)
.name(NAME_QUERY)
.description("Structured search query.")
.description(
"Structured search query. See Elasticsearch documentation on `query_string` syntax.")
.example("*")
.schema(new Schema().type(TYPE_STRING)._default("*")));
}
@ -528,6 +628,7 @@ public class OpenAPIV3Generator {
.name("aspects")
.explode(true)
.description("Aspects to include.")
.required(false)
.example(aspectNames)
.schema(schema);
}
@ -582,6 +683,43 @@ public class OpenAPIV3Generator {
}
}
/**
* Generate schema for cross-entity scroll/list response
*
* @param entityRegistry entity registry
* @return schema
*/
private static Schema buildAspectsRefResponseSchema(final EntityRegistry entityRegistry) {
final Schema result =
new Schema<>()
.type(TYPE_OBJECT)
.description(ASPECT_DESCRIPTION)
.required(List.of(PROPERTY_VALUE));
entityRegistry
.getAspectSpecs()
.values()
.forEach(
aspect ->
result.addProperty(
PROPERTY_VALUE, new Schema<>().$ref(PATH_DEFINITIONS + aspect.getName())));
result.addProperty(
NAME_SYSTEM_METADATA,
new Schema<>()
.type(TYPE_OBJECT)
.anyOf(List.of(new Schema().$ref(PATH_DEFINITIONS + "SystemMetadata")))
.description("System metadata for the aspect.")
.nullable(true));
result.addProperty(
NAME_AUDIT_STAMP,
new Schema<>()
.type(TYPE_OBJECT)
.anyOf(List.of(new Schema().$ref(PATH_DEFINITIONS + "AuditStamp")))
.description("Audit stamp for the aspect.")
.nullable(true));
return result;
}
private static Schema buildAspectRefResponseSchema(final String aspectName) {
final Schema result =
new Schema<>()
@ -612,7 +750,8 @@ public class OpenAPIV3Generator {
.type(TYPE_OBJECT)
.description(ASPECT_DESCRIPTION)
.required(List.of(PROPERTY_VALUE))
.addProperty(PROPERTY_VALUE, new Schema<>().$ref(PATH_DEFINITIONS + aspectName));
.addProperty(
PROPERTY_VALUE, new Schema<>().$ref(PATH_DEFINITIONS + toUpperFirst(aspectName)));
result.addProperty(
NAME_SYSTEM_METADATA,
new Schema<>()
@ -657,6 +796,111 @@ public class OpenAPIV3Generator {
.properties(properties);
}
/**
* Generate cross-entity schema
*
* @param entityRegistry entity registry
* @param withSystemMetadata include system metadata
* @return schema
*/
private static Schema buildEntitySchema(
final EntityRegistry entityRegistry,
final Set<String> aspectNames,
final boolean withSystemMetadata) {
final Map<String, Schema> properties =
entityRegistry.getAspectSpecs().entrySet().stream()
.filter(a -> aspectNames.contains(a.getValue().getName()))
.collect(
Collectors.toMap(
Map.Entry::getKey,
a ->
buildAspectRef(
a.getValue().getPegasusSchema().getName(), withSystemMetadata)));
properties.put(
PROPERTY_URN, new Schema<>().type(TYPE_STRING).description("Unique id for " + ENTITIES));
return new Schema<>()
.type(TYPE_OBJECT)
.description(ENTITIES + " object.")
.required(List.of(PROPERTY_URN))
.properties(properties);
}
/**
* Generate cross-entity schema
*
* @param entityRegistry entity registry
* @param definitionNames include aspects
* @return schema
*/
private static Schema buildEntitiesRequestSchema(
final EntityRegistry entityRegistry, final Set<String> definitionNames) {
final Set<String> keyAspects = new HashSet<>();
final List<String> entityNames =
entityRegistry.getEntitySpecs().values().stream()
.peek(entitySpec -> keyAspects.add(entitySpec.getKeyAspectName()))
.map(EntitySpec::getName)
.sorted()
.toList();
Schema entitiesSchema =
new Schema().type(TYPE_ARRAY).items(new Schema().type(TYPE_STRING)._enum(entityNames));
final List<String> aspectNames =
entityRegistry.getAspectSpecs().values().stream()
.filter(aspectSpec -> !aspectSpec.isTimeseries())
.map(AspectSpec::getName)
.filter(definitionNames::contains) // Only if aspect is defined
.distinct()
.sorted()
.collect(Collectors.toList());
Schema aspectsSchema =
new Schema().type(TYPE_ARRAY).items(new Schema().type(TYPE_STRING)._enum(aspectNames));
return new Schema<>()
.type(TYPE_OBJECT)
.description(ENTITIES + " request object.")
.example(
Map.of(
"entities", entityNames.stream().filter(n -> !n.startsWith("dataHub")).toList(),
"aspects",
aspectNames.stream()
.filter(n -> !n.startsWith("dataHub") && !keyAspects.contains(n))
.toList()))
.properties(
Map.of(
"entities", entitiesSchema,
"aspects", aspectsSchema));
}
/**
* Generate schema for cross-entity scroll/list response
*
* @return schema
*/
private static Schema buildEntitiesScrollSchema() {
return new Schema<>()
.type(TYPE_OBJECT)
.description("Scroll across (list) " + ENTITIES + " objects.")
.required(List.of("entities"))
.addProperty(
NAME_SCROLL_ID,
new Schema<>().type(TYPE_STRING).description("Scroll id for pagination."))
.addProperty(
"entities",
new Schema<>()
.type(TYPE_ARRAY)
.description(ENTITIES + " object.")
.items(
new Schema<>()
.$ref(
String.format(
"#/components/schemas/%s%s", ENTITIES, ENTITY_RESPONSE_SUFFIX))));
}
private static Schema buildEntityScrollSchema(final EntitySpec entity) {
return new Schema<>()
.type(TYPE_OBJECT)

View File

@ -25,10 +25,15 @@ import com.linkedin.metadata.entity.UpdateAspectResult;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.query.filter.SortOrder;
import com.linkedin.metadata.search.ScrollResult;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchEntityArray;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.SearchUtil;
import com.linkedin.mxe.SystemMetadata;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.RequestContext;
@ -37,6 +42,7 @@ import io.datahubproject.openapi.exception.InvalidUrnException;
import io.datahubproject.openapi.exception.UnauthorizedException;
import io.datahubproject.openapi.v3.models.AspectItem;
import io.datahubproject.openapi.v3.models.GenericAspectV3;
import io.datahubproject.openapi.v3.models.GenericEntityAspectsBodyV3;
import io.datahubproject.openapi.v3.models.GenericEntityScrollResultV3;
import io.datahubproject.openapi.v3.models.GenericEntityV3;
import io.swagger.v3.oas.annotations.Hidden;
@ -45,6 +51,9 @@ import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletRequest;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@ -60,6 +69,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@ -109,7 +119,101 @@ public class EntityController
}
return ResponseEntity.of(
Optional.of(buildEntityVersionedAspectList(opContext, requestMap, withSystemMetadata)));
Optional.of(
buildEntityVersionedAspectList(
opContext, requestMap.keySet(), requestMap, withSystemMetadata, true)));
}
@Tag(name = "Generic Entities", description = "API for interacting with generic entities.")
@PostMapping(value = "/scroll", produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Scroll entities")
public ResponseEntity<GenericEntityScrollResultV3> scrollEntities(
HttpServletRequest request,
@RequestParam(value = "count", defaultValue = "10") Integer count,
@RequestParam(value = "query", defaultValue = "*") String query,
@RequestParam(value = "scrollId", required = false) String scrollId,
@RequestParam(value = "sort", required = false, defaultValue = "urn") String sortField,
@RequestParam(value = "sortCriteria", required = false) List<String> sortFields,
@RequestParam(value = "sortOrder", required = false, defaultValue = "ASCENDING")
String sortOrder,
@RequestParam(value = "systemMetadata", required = false, defaultValue = "false")
Boolean withSystemMetadata,
@RequestParam(value = "skipCache", required = false, defaultValue = "false")
Boolean skipCache,
@RequestParam(value = "includeSoftDelete", required = false, defaultValue = "false")
Boolean includeSoftDelete,
@RequestBody @Nonnull GenericEntityAspectsBodyV3 entityAspectsBody)
throws URISyntaxException {
final Collection<String> resolvedEntityNames;
if (entityAspectsBody.getEntities() != null) {
resolvedEntityNames =
entityAspectsBody.getEntities().stream()
.map(entityName -> entityRegistry.getEntitySpec(entityName))
.map(EntitySpec::getName)
.toList();
} else {
resolvedEntityNames =
entityRegistry.getEntitySpecs().values().stream().map(EntitySpec::getName).toList();
}
Authentication authentication = AuthenticationContext.getAuthentication();
OperationContext opContext =
OperationContext.asSession(
systemOperationContext,
RequestContext.builder()
.buildOpenapi(
authentication.getActor().toUrnStr(),
request,
"scrollEntities",
resolvedEntityNames),
authorizationChain,
authentication,
true);
if (!AuthUtil.isAPIAuthorizedEntityType(opContext, READ, resolvedEntityNames)) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + READ + " entities.");
}
List<SortCriterion> sortCriteria;
if (!CollectionUtils.isEmpty(sortFields)) {
sortCriteria = new ArrayList<>();
sortFields.forEach(
field -> sortCriteria.add(SearchUtil.sortBy(field, SortOrder.valueOf(sortOrder))));
} else {
sortCriteria =
Collections.singletonList(SearchUtil.sortBy(sortField, SortOrder.valueOf(sortOrder)));
}
ScrollResult result =
searchService.scrollAcrossEntities(
opContext
.withSearchFlags(flags -> DEFAULT_SEARCH_FLAGS)
.withSearchFlags(flags -> flags.setSkipCache(skipCache))
.withSearchFlags(flags -> flags.setIncludeSoftDeleted(includeSoftDelete)),
resolvedEntityNames,
query,
null,
sortCriteria,
scrollId,
null,
count);
if (!AuthUtil.isAPIAuthorizedResult(opContext, result)) {
throw new UnauthorizedException(
authentication.getActor().toUrnStr() + " is unauthorized to " + READ + " entities.");
}
return ResponseEntity.ok(
buildScrollResult(
opContext,
result.getEntities(),
entityAspectsBody.getAspects(),
withSystemMetadata,
result.getScrollId(),
entityAspectsBody.getAspects() != null));
}
@Override
@ -118,10 +222,13 @@ public class EntityController
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata,
@Nullable String scrollId)
@Nullable String scrollId,
boolean expandEmpty)
throws URISyntaxException {
return GenericEntityScrollResultV3.builder()
.entities(toRecordTemplates(opContext, searchEntities, aspectNames, withSystemMetadata))
.entities(
toRecordTemplates(
opContext, searchEntities, aspectNames, withSystemMetadata, expandEmpty))
.scrollId(scrollId)
.build();
}
@ -129,15 +236,16 @@ public class EntityController
@Override
protected List<GenericEntityV3> buildEntityVersionedAspectList(
@Nonnull OperationContext opContext,
Collection<Urn> requestedUrns,
LinkedHashMap<Urn, Map<String, Long>> urnAspectVersions,
boolean withSystemMetadata)
boolean withSystemMetadata,
boolean expandEmpty)
throws URISyntaxException {
if (urnAspectVersions.isEmpty()) {
return List.of();
} else {
if (!urnAspectVersions.isEmpty()) {
Map<Urn, List<EnvelopedAspect>> aspects =
entityService.getEnvelopedVersionedAspects(
opContext, resolveAspectNames(urnAspectVersions, 0L), false);
opContext, resolveAspectNames(urnAspectVersions, 0L, expandEmpty), false);
return urnAspectVersions.keySet().stream()
.filter(urn -> aspects.containsKey(urn) && !aspects.get(urn).isEmpty())
@ -147,7 +255,13 @@ public class EntityController
.build(
objectMapper, u, toAspectItemMap(u, aspects.get(u), withSystemMetadata)))
.collect(Collectors.toList());
} else if (!expandEmpty) {
return requestedUrns.stream()
.map(u -> GenericEntityV3.builder().build(objectMapper, u, Collections.emptyMap()))
.collect(Collectors.toList());
}
return List.of();
}
private Map<String, AspectItem> toAspectItemMap(
@ -158,7 +272,7 @@ public class EntityController
Map.entry(
a.getName(),
AspectItem.builder()
.aspect(toRecordTemplate(lookupAspectSpec(urn, a.getName()), a))
.aspect(toRecordTemplate(lookupAspectSpec(urn, a.getName()).get(), a))
.systemMetadata(withSystemMetadata ? a.getSystemMetadata() : null)
.auditStamp(withSystemMetadata ? a.getCreated() : null)
.build()))
@ -218,13 +332,15 @@ public class EntityController
@Nonnull OperationContext opContext,
SearchEntityArray searchEntities,
Set<String> aspectNames,
boolean withSystemMetadata)
boolean withSystemMetadata,
boolean expandEmpty)
throws URISyntaxException {
return buildEntityList(
opContext,
searchEntities.stream().map(SearchEntity::getEntity).collect(Collectors.toList()),
aspectNames,
withSystemMetadata);
withSystemMetadata,
expandEmpty);
}
private LinkedHashMap<Urn, Map<String, Long>> toEntityVersionRequest(
@ -250,7 +366,7 @@ public class EntityController
continue;
}
AspectSpec aspectSpec = lookupAspectSpec(entityUrn, aspect.getKey());
AspectSpec aspectSpec = lookupAspectSpec(entityUrn, aspect.getKey()).orElse(null);
if (aspectSpec != null) {
@ -307,7 +423,7 @@ public class EntityController
continue;
}
AspectSpec aspectSpec = lookupAspectSpec(entityUrn, aspect.getKey());
AspectSpec aspectSpec = lookupAspectSpec(entityUrn, aspect.getKey()).orElse(null);
if (aspectSpec != null) {