mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-10 00:11:15 +00:00
feat(graphql): extend entity client to support aspect methods directly via java (#3489)
This commit is contained in:
parent
4a0ed0710d
commit
c4c74a3f45
@ -72,6 +72,8 @@ public class GmsClientFactory {
|
|||||||
return _entities;
|
return _entities;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Deprecated- please use EntityClient from now on for all aspect related calls
|
||||||
|
@Deprecated
|
||||||
public static AspectClient getAspectsClient() {
|
public static AspectClient getAspectsClient() {
|
||||||
if (_aspects == null) {
|
if (_aspects == null) {
|
||||||
synchronized (GmsClientFactory.class) {
|
synchronized (GmsClientFactory.class) {
|
||||||
|
|||||||
@ -235,7 +235,7 @@ public class GmsGraphQLEngine {
|
|||||||
GmsClientFactory.getRelationshipsClient()
|
GmsClientFactory.getRelationshipsClient()
|
||||||
);
|
);
|
||||||
this.glossaryTermType = new GlossaryTermType(entityClient);
|
this.glossaryTermType = new GlossaryTermType(entityClient);
|
||||||
this.aspectType = new AspectType(GmsClientFactory.getAspectsClient());
|
this.aspectType = new AspectType(entityClient);
|
||||||
this.usageType = new UsageType(GmsClientFactory.getUsageClient());
|
this.usageType = new UsageType(GmsClientFactory.getUsageClient());
|
||||||
|
|
||||||
// Init Lists
|
// Init Lists
|
||||||
|
|||||||
@ -3,7 +3,7 @@ package com.linkedin.datahub.graphql.types.aspect;
|
|||||||
import com.linkedin.datahub.graphql.VersionedAspectKey;
|
import com.linkedin.datahub.graphql.VersionedAspectKey;
|
||||||
import com.linkedin.datahub.graphql.QueryContext;
|
import com.linkedin.datahub.graphql.QueryContext;
|
||||||
import com.linkedin.datahub.graphql.generated.Aspect;
|
import com.linkedin.datahub.graphql.generated.Aspect;
|
||||||
import com.linkedin.entity.client.AspectClient;
|
import com.linkedin.entity.client.EntityClient;
|
||||||
import com.linkedin.metadata.aspect.VersionedAspect;
|
import com.linkedin.metadata.aspect.VersionedAspect;
|
||||||
import com.linkedin.r2.RemoteInvocationException;
|
import com.linkedin.r2.RemoteInvocationException;
|
||||||
import com.linkedin.restli.client.RestLiResponseException;
|
import com.linkedin.restli.client.RestLiResponseException;
|
||||||
@ -14,10 +14,10 @@ import javax.annotation.Nonnull;
|
|||||||
|
|
||||||
|
|
||||||
public class AspectType {
|
public class AspectType {
|
||||||
private final AspectClient _aspectClient;
|
private final EntityClient _entityClient;
|
||||||
|
|
||||||
public AspectType(final AspectClient aspectClient) {
|
public AspectType(final EntityClient entityClient) {
|
||||||
_aspectClient = aspectClient;
|
_entityClient = entityClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -30,7 +30,7 @@ public class AspectType {
|
|||||||
try {
|
try {
|
||||||
return keys.stream().map(key -> {
|
return keys.stream().map(key -> {
|
||||||
try {
|
try {
|
||||||
VersionedAspect entity = _aspectClient.getAspect(key.getUrn(), key.getAspectName(), key.getVersion(), context.getActor());
|
VersionedAspect entity = _entityClient.getAspect(key.getUrn(), key.getAspectName(), key.getVersion(), context.getActor());
|
||||||
return DataFetcherResult.<Aspect>newResult().data(AspectMapper.map(entity)).build();
|
return DataFetcherResult.<Aspect>newResult().data(AspectMapper.map(entity)).build();
|
||||||
} catch (RemoteInvocationException e) {
|
} catch (RemoteInvocationException e) {
|
||||||
if (e instanceof RestLiResponseException) {
|
if (e instanceof RestLiResponseException) {
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import com.linkedin.entity.client.JavaEntityClient;
|
|||||||
import com.linkedin.metadata.entity.EntityService;
|
import com.linkedin.metadata.entity.EntityService;
|
||||||
import com.linkedin.metadata.search.EntitySearchService;
|
import com.linkedin.metadata.search.EntitySearchService;
|
||||||
import com.linkedin.metadata.search.SearchService;
|
import com.linkedin.metadata.search.SearchService;
|
||||||
|
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
@ -24,8 +25,12 @@ public class JavaEntityClientFactory {
|
|||||||
@Qualifier("entitySearchService")
|
@Qualifier("entitySearchService")
|
||||||
private EntitySearchService _entitySearchService;
|
private EntitySearchService _entitySearchService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier("timeseriesAspectService")
|
||||||
|
private TimeseriesAspectService _timeseriesAspectService;
|
||||||
|
|
||||||
@Bean("javaEntityClient")
|
@Bean("javaEntityClient")
|
||||||
public JavaEntityClient getJavaEntityClient() {
|
public JavaEntityClient getJavaEntityClient() {
|
||||||
return new JavaEntityClient(_entityService, _entitySearchService, _searchService);
|
return new JavaEntityClient(_entityService, _entitySearchService, _searchService, _timeseriesAspectService);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,8 +1,11 @@
|
|||||||
package com.linkedin.entity.client;
|
package com.linkedin.entity.client;
|
||||||
|
|
||||||
import com.linkedin.common.urn.Urn;
|
import com.linkedin.common.urn.Urn;
|
||||||
|
import com.linkedin.data.template.RecordTemplate;
|
||||||
import com.linkedin.data.template.StringArray;
|
import com.linkedin.data.template.StringArray;
|
||||||
import com.linkedin.entity.Entity;
|
import com.linkedin.entity.Entity;
|
||||||
|
import com.linkedin.metadata.aspect.EnvelopedAspect;
|
||||||
|
import com.linkedin.metadata.aspect.VersionedAspect;
|
||||||
import com.linkedin.metadata.browse.BrowseResult;
|
import com.linkedin.metadata.browse.BrowseResult;
|
||||||
import com.linkedin.metadata.query.AutoCompleteResult;
|
import com.linkedin.metadata.query.AutoCompleteResult;
|
||||||
import com.linkedin.metadata.query.filter.Filter;
|
import com.linkedin.metadata.query.filter.Filter;
|
||||||
@ -10,10 +13,12 @@ import com.linkedin.metadata.query.ListResult;
|
|||||||
import com.linkedin.metadata.query.filter.SortCriterion;
|
import com.linkedin.metadata.query.filter.SortCriterion;
|
||||||
import com.linkedin.metadata.search.SearchResult;
|
import com.linkedin.metadata.search.SearchResult;
|
||||||
import com.linkedin.metadata.query.ListUrnsResult;
|
import com.linkedin.metadata.query.ListUrnsResult;
|
||||||
|
import com.linkedin.mxe.MetadataChangeProposal;
|
||||||
import com.linkedin.mxe.SystemMetadata;
|
import com.linkedin.mxe.SystemMetadata;
|
||||||
import com.linkedin.r2.RemoteInvocationException;
|
import com.linkedin.r2.RemoteInvocationException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
@ -218,4 +223,39 @@ public interface EntityClient {
|
|||||||
int count,
|
int count,
|
||||||
@Nonnull String actor)
|
@Nonnull String actor)
|
||||||
throws RemoteInvocationException;
|
throws RemoteInvocationException;
|
||||||
|
|
||||||
|
public VersionedAspect getAspect(
|
||||||
|
@Nonnull String urn,
|
||||||
|
@Nonnull String aspect,
|
||||||
|
@Nonnull Long version,
|
||||||
|
@Nonnull String actor)
|
||||||
|
throws RemoteInvocationException;
|
||||||
|
|
||||||
|
public VersionedAspect getAspectOrNull(
|
||||||
|
@Nonnull String urn,
|
||||||
|
@Nonnull String aspect,
|
||||||
|
@Nonnull Long version,
|
||||||
|
@Nonnull String actor) throws RemoteInvocationException;
|
||||||
|
|
||||||
|
public List<EnvelopedAspect> getTimeseriesAspectValues(
|
||||||
|
@Nonnull String urn,
|
||||||
|
@Nonnull String entity,
|
||||||
|
@Nonnull String aspect,
|
||||||
|
@Nullable Long startTimeMillis,
|
||||||
|
@Nullable Long endTimeMillis,
|
||||||
|
@Nullable Integer limit,
|
||||||
|
@Nullable String actor
|
||||||
|
) throws RemoteInvocationException;
|
||||||
|
|
||||||
|
public String ingestProposal(
|
||||||
|
@Nonnull final MetadataChangeProposal metadataChangeProposal,
|
||||||
|
@Nonnull final String actor
|
||||||
|
) throws RemoteInvocationException;
|
||||||
|
|
||||||
|
public <T extends RecordTemplate> Optional<T> getVersionedAspect(
|
||||||
|
@Nonnull String urn,
|
||||||
|
@Nonnull String aspect,
|
||||||
|
@Nonnull Long version,
|
||||||
|
@Nonnull String actor,
|
||||||
|
@Nonnull Class<T> aspectClass) throws RemoteInvocationException;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,48 +1,67 @@
|
|||||||
package com.linkedin.entity.client;
|
package com.linkedin.entity.client;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import com.linkedin.aspect.GetTimeseriesAspectValuesResponse;
|
||||||
import com.linkedin.common.AuditStamp;
|
import com.linkedin.common.AuditStamp;
|
||||||
import com.linkedin.common.urn.Urn;
|
import com.linkedin.common.urn.Urn;
|
||||||
|
import com.linkedin.data.DataMap;
|
||||||
|
import com.linkedin.data.template.RecordTemplate;
|
||||||
import com.linkedin.data.template.StringArray;
|
import com.linkedin.data.template.StringArray;
|
||||||
import com.linkedin.entity.Entity;
|
import com.linkedin.entity.Entity;
|
||||||
|
import com.linkedin.metadata.Constants;
|
||||||
|
import com.linkedin.metadata.aspect.EnvelopedAspect;
|
||||||
|
import com.linkedin.metadata.aspect.EnvelopedAspectArray;
|
||||||
|
import com.linkedin.metadata.aspect.VersionedAspect;
|
||||||
import com.linkedin.metadata.browse.BrowseResult;
|
import com.linkedin.metadata.browse.BrowseResult;
|
||||||
|
import com.linkedin.metadata.dao.utils.RecordUtils;
|
||||||
import com.linkedin.metadata.entity.EntityService;
|
import com.linkedin.metadata.entity.EntityService;
|
||||||
import com.linkedin.metadata.query.AutoCompleteResult;
|
import com.linkedin.metadata.query.AutoCompleteResult;
|
||||||
import com.linkedin.metadata.query.filter.Filter;
|
import com.linkedin.metadata.query.filter.Filter;
|
||||||
import com.linkedin.metadata.query.filter.SortCriterion;
|
import com.linkedin.metadata.query.filter.SortCriterion;
|
||||||
import com.linkedin.metadata.query.ListResult;
|
import com.linkedin.metadata.query.ListResult;
|
||||||
import com.linkedin.metadata.query.ListUrnsResult;
|
import com.linkedin.metadata.query.ListUrnsResult;
|
||||||
|
import com.linkedin.metadata.resources.entity.AspectUtils;
|
||||||
import com.linkedin.metadata.resources.entity.EntityResource;
|
import com.linkedin.metadata.resources.entity.EntityResource;
|
||||||
import com.linkedin.metadata.search.EntitySearchService;
|
import com.linkedin.metadata.search.EntitySearchService;
|
||||||
import com.linkedin.metadata.search.SearchResult;
|
import com.linkedin.metadata.search.SearchResult;
|
||||||
import com.linkedin.metadata.search.SearchService;
|
import com.linkedin.metadata.search.SearchService;
|
||||||
|
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
|
||||||
|
import com.linkedin.mxe.MetadataChangeProposal;
|
||||||
import com.linkedin.mxe.SystemMetadata;
|
import com.linkedin.mxe.SystemMetadata;
|
||||||
import com.linkedin.r2.RemoteInvocationException;
|
import com.linkedin.r2.RemoteInvocationException;
|
||||||
import java.time.Clock;
|
import java.time.Clock;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
import static com.linkedin.metadata.search.utils.QueryUtils.*;
|
import static com.linkedin.metadata.search.utils.QueryUtils.*;
|
||||||
|
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
public class JavaEntityClient implements EntityClient {
|
public class JavaEntityClient implements EntityClient {
|
||||||
|
|
||||||
|
private final Clock _clock = Clock.systemUTC();
|
||||||
|
|
||||||
private EntityService _entityService;
|
private EntityService _entityService;
|
||||||
private EntitySearchService _entitySearchService;
|
private EntitySearchService _entitySearchService;
|
||||||
private SearchService _searchService;
|
private SearchService _searchService;
|
||||||
|
private TimeseriesAspectService _timeseriesAspectService;
|
||||||
|
|
||||||
public JavaEntityClient(@Nonnull final EntityService entityService, @Nonnull final EntitySearchService entitySearchService, @Nonnull final
|
public JavaEntityClient(@Nonnull final EntityService entityService, @Nonnull final EntitySearchService entitySearchService, @Nonnull final
|
||||||
SearchService searchService) {
|
SearchService searchService, @Nonnull final TimeseriesAspectService timeseriesAspectService) {
|
||||||
_entityService = entityService;
|
_entityService = entityService;
|
||||||
_entitySearchService = entitySearchService;
|
_entitySearchService = entitySearchService;
|
||||||
_searchService = searchService;
|
_searchService = searchService;
|
||||||
|
_timeseriesAspectService = timeseriesAspectService;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@ -285,4 +304,69 @@ public class JavaEntityClient implements EntityClient {
|
|||||||
int start, int count, @Nonnull String actor) throws RemoteInvocationException {
|
int start, int count, @Nonnull String actor) throws RemoteInvocationException {
|
||||||
return _entitySearchService.filter(entity, filter, sortCriterion, start, count);
|
return _entitySearchService.filter(entity, filter, sortCriterion, start, count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
@Override
|
||||||
|
public VersionedAspect getAspect(@Nonnull String urn, @Nonnull String aspect, @Nonnull Long version,
|
||||||
|
@Nonnull String actor) throws RemoteInvocationException {
|
||||||
|
return _entityService.getVersionedAspect(Urn.createFromString(urn), aspect, version);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
@Override
|
||||||
|
public VersionedAspect getAspectOrNull(@Nonnull String urn, @Nonnull String aspect, @Nonnull Long version,
|
||||||
|
@Nonnull String actor) throws RemoteInvocationException {
|
||||||
|
return _entityService.getVersionedAspect(Urn.createFromString(urn), aspect, version);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
@Override
|
||||||
|
public List<EnvelopedAspect> getTimeseriesAspectValues(@Nonnull String urn, @Nonnull String entity,
|
||||||
|
@Nonnull String aspect, @Nullable Long startTimeMillis, @Nullable Long endTimeMillis, @Nullable Integer limit,
|
||||||
|
@Nullable String actor) throws RemoteInvocationException {
|
||||||
|
GetTimeseriesAspectValuesResponse response = new GetTimeseriesAspectValuesResponse();
|
||||||
|
response.setEntityName(entity);
|
||||||
|
response.setAspectName(aspect);
|
||||||
|
if (startTimeMillis != null) {
|
||||||
|
response.setStartTimeMillis(startTimeMillis);
|
||||||
|
}
|
||||||
|
if (endTimeMillis != null) {
|
||||||
|
response.setEndTimeMillis(endTimeMillis);
|
||||||
|
}
|
||||||
|
response.setLimit(limit);
|
||||||
|
response.setValues(new EnvelopedAspectArray(
|
||||||
|
_timeseriesAspectService.getAspectValues(Urn.createFromString(urn), entity, aspect, startTimeMillis, endTimeMillis,
|
||||||
|
limit)));
|
||||||
|
return response.getValues();
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Factor out ingest logic into a util that can be accessed by the java client and the resource
|
||||||
|
@SneakyThrows
|
||||||
|
@Override
|
||||||
|
public String ingestProposal(@Nonnull MetadataChangeProposal metadataChangeProposal,
|
||||||
|
@Nonnull String actor) throws RemoteInvocationException {
|
||||||
|
final AuditStamp auditStamp =
|
||||||
|
new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(Constants.UNKNOWN_ACTOR));
|
||||||
|
final List<MetadataChangeProposal> additionalChanges =
|
||||||
|
AspectUtils.getAdditionalChanges(metadataChangeProposal, _entityService);
|
||||||
|
|
||||||
|
Urn urn = _entityService.ingestProposal(metadataChangeProposal, auditStamp);
|
||||||
|
additionalChanges.forEach(proposal -> _entityService.ingestProposal(proposal, auditStamp));
|
||||||
|
return urn.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
@Override
|
||||||
|
public <T extends RecordTemplate> Optional<T> getVersionedAspect(@Nonnull String urn, @Nonnull String aspect,
|
||||||
|
@Nonnull Long version, @Nonnull String actor, @Nonnull Class<T> aspectClass) throws RemoteInvocationException {
|
||||||
|
VersionedAspect entity = _entityService.getVersionedAspect(Urn.createFromString(urn), aspect, version);
|
||||||
|
if (entity.hasAspect()) {
|
||||||
|
DataMap rawAspect = ((DataMap) entity.data().get("aspect"));
|
||||||
|
if (rawAspect.containsKey(aspectClass.getCanonicalName())) {
|
||||||
|
DataMap aspectDataMap = rawAspect.getDataMap(aspectClass.getCanonicalName());
|
||||||
|
return Optional.of(RecordUtils.toRecordTemplate(aspectClass, aspectDataMap));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,7 +2,13 @@ package com.linkedin.entity.client;
|
|||||||
|
|
||||||
import com.linkedin.common.client.BaseClient;
|
import com.linkedin.common.client.BaseClient;
|
||||||
import com.linkedin.common.urn.Urn;
|
import com.linkedin.common.urn.Urn;
|
||||||
|
import com.linkedin.data.DataMap;
|
||||||
|
import com.linkedin.data.template.RecordTemplate;
|
||||||
import com.linkedin.data.template.StringArray;
|
import com.linkedin.data.template.StringArray;
|
||||||
|
import com.linkedin.entity.AspectsDoGetTimeseriesAspectValuesRequestBuilder;
|
||||||
|
import com.linkedin.entity.AspectsDoIngestProposalRequestBuilder;
|
||||||
|
import com.linkedin.entity.AspectsGetRequestBuilder;
|
||||||
|
import com.linkedin.entity.AspectsRequestBuilders;
|
||||||
import com.linkedin.entity.EntitiesBatchGetRequestBuilder;
|
import com.linkedin.entity.EntitiesBatchGetRequestBuilder;
|
||||||
import com.linkedin.entity.EntitiesDoAutocompleteRequestBuilder;
|
import com.linkedin.entity.EntitiesDoAutocompleteRequestBuilder;
|
||||||
import com.linkedin.entity.EntitiesDoBatchGetTotalEntityCountRequestBuilder;
|
import com.linkedin.entity.EntitiesDoBatchGetTotalEntityCountRequestBuilder;
|
||||||
@ -21,33 +27,42 @@ import com.linkedin.entity.EntitiesDoSetWritableRequestBuilder;
|
|||||||
import com.linkedin.entity.EntitiesRequestBuilders;
|
import com.linkedin.entity.EntitiesRequestBuilders;
|
||||||
import com.linkedin.entity.Entity;
|
import com.linkedin.entity.Entity;
|
||||||
import com.linkedin.entity.EntityArray;
|
import com.linkedin.entity.EntityArray;
|
||||||
|
import com.linkedin.metadata.aspect.EnvelopedAspect;
|
||||||
|
import com.linkedin.metadata.aspect.VersionedAspect;
|
||||||
import com.linkedin.metadata.browse.BrowseResult;
|
import com.linkedin.metadata.browse.BrowseResult;
|
||||||
|
import com.linkedin.metadata.dao.utils.RecordUtils;
|
||||||
import com.linkedin.metadata.query.AutoCompleteResult;
|
import com.linkedin.metadata.query.AutoCompleteResult;
|
||||||
import com.linkedin.metadata.query.ListResult;
|
import com.linkedin.metadata.query.ListResult;
|
||||||
import com.linkedin.metadata.query.filter.Filter;
|
import com.linkedin.metadata.query.filter.Filter;
|
||||||
import com.linkedin.metadata.query.filter.SortCriterion;
|
import com.linkedin.metadata.query.filter.SortCriterion;
|
||||||
import com.linkedin.metadata.search.SearchResult;
|
import com.linkedin.metadata.search.SearchResult;
|
||||||
import com.linkedin.metadata.query.ListUrnsResult;
|
import com.linkedin.metadata.query.ListUrnsResult;
|
||||||
|
import com.linkedin.mxe.MetadataChangeProposal;
|
||||||
import com.linkedin.mxe.SystemMetadata;
|
import com.linkedin.mxe.SystemMetadata;
|
||||||
import com.linkedin.r2.RemoteInvocationException;
|
import com.linkedin.r2.RemoteInvocationException;
|
||||||
import com.linkedin.restli.client.Client;
|
import com.linkedin.restli.client.Client;
|
||||||
|
import com.linkedin.restli.client.RestLiResponseException;
|
||||||
|
import com.linkedin.restli.common.HttpStatus;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
import static com.linkedin.metadata.search.utils.QueryUtils.newFilter;
|
import static com.linkedin.metadata.search.utils.QueryUtils.newFilter;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
public class RestliEntityClient extends BaseClient implements EntityClient {
|
public class RestliEntityClient extends BaseClient implements EntityClient {
|
||||||
|
|
||||||
private static final EntitiesRequestBuilders ENTITIES_REQUEST_BUILDERS = new EntitiesRequestBuilders();
|
private static final EntitiesRequestBuilders ENTITIES_REQUEST_BUILDERS = new EntitiesRequestBuilders();
|
||||||
|
private static final AspectsRequestBuilders ASPECTS_REQUEST_BUILDERS = new AspectsRequestBuilders();
|
||||||
|
|
||||||
public RestliEntityClient(@Nonnull final Client restliClient) {
|
public RestliEntityClient(@Nonnull final Client restliClient) {
|
||||||
super(restliClient);
|
super(restliClient);
|
||||||
@ -399,4 +414,143 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
|
|||||||
}
|
}
|
||||||
return sendClientRequest(requestBuilder, actor).getEntity();
|
return sendClientRequest(requestBuilder, actor).getEntity();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets aspect at version for an entity
|
||||||
|
*
|
||||||
|
* @param urn urn for the entity
|
||||||
|
* @return list of paths given urn
|
||||||
|
* @throws RemoteInvocationException on remote request error.
|
||||||
|
*/
|
||||||
|
@Nonnull
|
||||||
|
public VersionedAspect getAspect(
|
||||||
|
@Nonnull String urn,
|
||||||
|
@Nonnull String aspect,
|
||||||
|
@Nonnull Long version,
|
||||||
|
@Nonnull String actor)
|
||||||
|
throws RemoteInvocationException {
|
||||||
|
|
||||||
|
AspectsGetRequestBuilder requestBuilder =
|
||||||
|
ASPECTS_REQUEST_BUILDERS.get().id(urn).aspectParam(aspect).versionParam(version);
|
||||||
|
|
||||||
|
return sendClientRequest(requestBuilder, actor).getEntity();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets aspect at version for an entity, or null if one doesn't exist.
|
||||||
|
*
|
||||||
|
* @param urn urn for the entity
|
||||||
|
* @return list of paths given urn
|
||||||
|
* @throws RemoteInvocationException on remote request error.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
public VersionedAspect getAspectOrNull(
|
||||||
|
@Nonnull String urn,
|
||||||
|
@Nonnull String aspect,
|
||||||
|
@Nonnull Long version,
|
||||||
|
@Nonnull String actor)
|
||||||
|
throws RemoteInvocationException {
|
||||||
|
|
||||||
|
AspectsGetRequestBuilder requestBuilder =
|
||||||
|
ASPECTS_REQUEST_BUILDERS.get().id(urn).aspectParam(aspect).versionParam(version);
|
||||||
|
try {
|
||||||
|
return sendClientRequest(requestBuilder, actor).getEntity();
|
||||||
|
} catch (RestLiResponseException e) {
|
||||||
|
if (e.getStatus() == HttpStatus.S_404_NOT_FOUND.getCode()) {
|
||||||
|
// Then the aspect was not found. Return null.
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve instances of a particular aspect.
|
||||||
|
*
|
||||||
|
* @param urn urn for the entity.
|
||||||
|
* @param entity the name of the entity.
|
||||||
|
* @param aspect the name of the aspect.
|
||||||
|
* @param startTimeMillis the earliest desired event time of the aspect value in milliseconds.
|
||||||
|
* @param endTimeMillis the latest desired event time of the aspect value in milliseconds.
|
||||||
|
* @param limit the maximum number of desired aspect values.
|
||||||
|
* @param actor the actor associated with the request [internal]
|
||||||
|
* @return the list of EnvelopedAspect values satisfying the input parameters.
|
||||||
|
* @throws RemoteInvocationException on remote request error.
|
||||||
|
*/
|
||||||
|
@Nonnull
|
||||||
|
public List<EnvelopedAspect> getTimeseriesAspectValues(
|
||||||
|
@Nonnull String urn,
|
||||||
|
@Nonnull String entity,
|
||||||
|
@Nonnull String aspect,
|
||||||
|
@Nullable Long startTimeMillis,
|
||||||
|
@Nullable Long endTimeMillis,
|
||||||
|
@Nullable Integer limit,
|
||||||
|
@Nullable String actor
|
||||||
|
)
|
||||||
|
throws RemoteInvocationException {
|
||||||
|
|
||||||
|
AspectsDoGetTimeseriesAspectValuesRequestBuilder requestBuilder =
|
||||||
|
ASPECTS_REQUEST_BUILDERS.actionGetTimeseriesAspectValues()
|
||||||
|
.urnParam(urn)
|
||||||
|
.entityParam(entity)
|
||||||
|
.aspectParam(aspect);
|
||||||
|
|
||||||
|
if (startTimeMillis != null) {
|
||||||
|
requestBuilder.startTimeMillisParam(startTimeMillis);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (endTimeMillis != null) {
|
||||||
|
requestBuilder.endTimeMillisParam(endTimeMillis);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (limit != null) {
|
||||||
|
requestBuilder.limitParam(limit);
|
||||||
|
}
|
||||||
|
return sendClientRequest(requestBuilder, actor).getEntity().getValues();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ingest a MetadataChangeProposal event.
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public String ingestProposal(@Nonnull final MetadataChangeProposal metadataChangeProposal, @Nonnull final String actor)
|
||||||
|
throws RemoteInvocationException {
|
||||||
|
final AspectsDoIngestProposalRequestBuilder requestBuilder = ASPECTS_REQUEST_BUILDERS.actionIngestProposal()
|
||||||
|
.proposalParam(metadataChangeProposal);
|
||||||
|
return sendClientRequest(requestBuilder, actor).getEntity();
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T extends RecordTemplate> Optional<T> getVersionedAspect(
|
||||||
|
@Nonnull String urn,
|
||||||
|
@Nonnull String aspect,
|
||||||
|
@Nonnull Long version,
|
||||||
|
@Nonnull String actor,
|
||||||
|
@Nonnull Class<T> aspectClass)
|
||||||
|
throws RemoteInvocationException {
|
||||||
|
|
||||||
|
AspectsGetRequestBuilder requestBuilder =
|
||||||
|
ASPECTS_REQUEST_BUILDERS.get().id(urn).aspectParam(aspect).versionParam(version);
|
||||||
|
|
||||||
|
try {
|
||||||
|
VersionedAspect entity = sendClientRequest(requestBuilder, actor).getEntity();
|
||||||
|
if (entity.hasAspect()) {
|
||||||
|
DataMap rawAspect = ((DataMap) entity.data().get("aspect"));
|
||||||
|
if (rawAspect.containsKey(aspectClass.getCanonicalName())) {
|
||||||
|
DataMap aspectDataMap = rawAspect.getDataMap(aspectClass.getCanonicalName());
|
||||||
|
return Optional.of(RecordUtils.toRecordTemplate(aspectClass, aspectDataMap));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (RestLiResponseException e) {
|
||||||
|
if (e.getStatus() == 404) {
|
||||||
|
log.debug("Could not find aspect {} for entity {}", aspect, urn);
|
||||||
|
return Optional.empty();
|
||||||
|
} else {
|
||||||
|
// re-throw other exceptions
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,12 +1,10 @@
|
|||||||
package com.linkedin.metadata.resources.entity;
|
package com.linkedin.metadata.resources.entity;
|
||||||
|
|
||||||
import com.codahale.metrics.MetricRegistry;
|
import com.codahale.metrics.MetricRegistry;
|
||||||
import com.google.common.collect.ImmutableSet;
|
|
||||||
import com.linkedin.aspect.GetTimeseriesAspectValuesResponse;
|
import com.linkedin.aspect.GetTimeseriesAspectValuesResponse;
|
||||||
import com.linkedin.common.AuditStamp;
|
import com.linkedin.common.AuditStamp;
|
||||||
import com.linkedin.common.urn.Urn;
|
import com.linkedin.common.urn.Urn;
|
||||||
import com.linkedin.data.template.RecordTemplate;
|
|
||||||
import com.linkedin.events.metadata.ChangeType;
|
|
||||||
import com.linkedin.metadata.Constants;
|
import com.linkedin.metadata.Constants;
|
||||||
import com.linkedin.metadata.aspect.EnvelopedAspectArray;
|
import com.linkedin.metadata.aspect.EnvelopedAspectArray;
|
||||||
import com.linkedin.metadata.aspect.VersionedAspect;
|
import com.linkedin.metadata.aspect.VersionedAspect;
|
||||||
@ -14,9 +12,6 @@ import com.linkedin.metadata.entity.EntityService;
|
|||||||
import com.linkedin.metadata.entity.ValidationException;
|
import com.linkedin.metadata.entity.ValidationException;
|
||||||
import com.linkedin.metadata.restli.RestliUtil;
|
import com.linkedin.metadata.restli.RestliUtil;
|
||||||
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
|
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
|
||||||
import com.linkedin.metadata.utils.EntityKeyUtils;
|
|
||||||
import com.linkedin.metadata.utils.GenericAspectUtils;
|
|
||||||
import com.linkedin.mxe.GenericAspect;
|
|
||||||
import com.linkedin.mxe.MetadataChangeProposal;
|
import com.linkedin.mxe.MetadataChangeProposal;
|
||||||
import com.linkedin.parseq.Task;
|
import com.linkedin.parseq.Task;
|
||||||
import com.linkedin.restli.common.HttpStatus;
|
import com.linkedin.restli.common.HttpStatus;
|
||||||
@ -32,11 +27,7 @@ import com.linkedin.restli.server.resources.CollectionResourceTaskTemplate;
|
|||||||
import io.opentelemetry.extension.annotations.WithSpan;
|
import io.opentelemetry.extension.annotations.WithSpan;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.time.Clock;
|
import java.time.Clock;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
@ -135,7 +126,8 @@ public class AspectResource extends CollectionResourceTaskTemplate<String, Versi
|
|||||||
// TODO: Use the actor present in the IC.
|
// TODO: Use the actor present in the IC.
|
||||||
final AuditStamp auditStamp =
|
final AuditStamp auditStamp =
|
||||||
new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(Constants.UNKNOWN_ACTOR));
|
new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(Constants.UNKNOWN_ACTOR));
|
||||||
final List<MetadataChangeProposal> additionalChanges = getAdditionalChanges(metadataChangeProposal);
|
final List<MetadataChangeProposal> additionalChanges =
|
||||||
|
AspectUtils.getAdditionalChanges(metadataChangeProposal, _entityService);
|
||||||
|
|
||||||
return RestliUtil.toTask(() -> {
|
return RestliUtil.toTask(() -> {
|
||||||
log.debug("Proposal: {}", metadataChangeProposal);
|
log.debug("Proposal: {}", metadataChangeProposal);
|
||||||
@ -149,35 +141,4 @@ public class AspectResource extends CollectionResourceTaskTemplate<String, Versi
|
|||||||
}, MetricRegistry.name(this.getClass(), "ingestProposal"));
|
}, MetricRegistry.name(this.getClass(), "ingestProposal"));
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<MetadataChangeProposal> getAdditionalChanges(@Nonnull MetadataChangeProposal metadataChangeProposal) {
|
|
||||||
// No additional changes for delete operation
|
|
||||||
if (metadataChangeProposal.getChangeType() == ChangeType.DELETE) {
|
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
|
|
||||||
final List<MetadataChangeProposal> additionalChanges = new ArrayList<>();
|
|
||||||
|
|
||||||
final Urn urn = EntityKeyUtils.getUrnFromProposal(metadataChangeProposal,
|
|
||||||
_entityService.getKeyAspectSpec(metadataChangeProposal.getEntityType()));
|
|
||||||
|
|
||||||
return _entityService.generateDefaultAspectsIfMissing(urn, ImmutableSet.of(metadataChangeProposal.getAspectName()))
|
|
||||||
.stream()
|
|
||||||
.map(entry -> getProposalFromAspect(entry.getKey(), entry.getValue(), metadataChangeProposal))
|
|
||||||
.filter(Objects::nonNull)
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
private MetadataChangeProposal getProposalFromAspect(String aspectName, RecordTemplate aspect,
|
|
||||||
MetadataChangeProposal original) {
|
|
||||||
try {
|
|
||||||
MetadataChangeProposal proposal = original.copy();
|
|
||||||
GenericAspect genericAspect = GenericAspectUtils.serializeAspect(aspect);
|
|
||||||
proposal.setAspect(genericAspect);
|
|
||||||
proposal.setAspectName(aspectName);
|
|
||||||
return proposal;
|
|
||||||
} catch (CloneNotSupportedException e) {
|
|
||||||
log.error("Issue while generating additional proposals corresponding to the input proposal", e);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,56 @@
|
|||||||
|
package com.linkedin.metadata.resources.entity;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import com.linkedin.common.urn.Urn;
|
||||||
|
import com.linkedin.data.template.RecordTemplate;
|
||||||
|
import com.linkedin.events.metadata.ChangeType;
|
||||||
|
import com.linkedin.metadata.entity.EntityService;
|
||||||
|
import com.linkedin.metadata.utils.EntityKeyUtils;
|
||||||
|
import com.linkedin.metadata.utils.GenericAspectUtils;
|
||||||
|
import com.linkedin.mxe.GenericAspect;
|
||||||
|
import com.linkedin.mxe.MetadataChangeProposal;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import javax.annotation.Nonnull;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class AspectUtils {
|
||||||
|
|
||||||
|
private AspectUtils() { }
|
||||||
|
|
||||||
|
public static List<MetadataChangeProposal> getAdditionalChanges(
|
||||||
|
@Nonnull MetadataChangeProposal metadataChangeProposal,
|
||||||
|
@Nonnull EntityService entityService
|
||||||
|
) {
|
||||||
|
// No additional changes for delete operation
|
||||||
|
if (metadataChangeProposal.getChangeType() == ChangeType.DELETE) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
final Urn urn = EntityKeyUtils.getUrnFromProposal(metadataChangeProposal,
|
||||||
|
entityService.getKeyAspectSpec(metadataChangeProposal.getEntityType()));
|
||||||
|
|
||||||
|
return entityService.generateDefaultAspectsIfMissing(urn, ImmutableSet.of(metadataChangeProposal.getAspectName()))
|
||||||
|
.stream()
|
||||||
|
.map(entry -> getProposalFromAspect(entry.getKey(), entry.getValue(), metadataChangeProposal))
|
||||||
|
.filter(Objects::nonNull)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MetadataChangeProposal getProposalFromAspect(String aspectName, RecordTemplate aspect,
|
||||||
|
MetadataChangeProposal original) {
|
||||||
|
try {
|
||||||
|
MetadataChangeProposal proposal = original.copy();
|
||||||
|
GenericAspect genericAspect = GenericAspectUtils.serializeAspect(aspect);
|
||||||
|
proposal.setAspect(genericAspect);
|
||||||
|
proposal.setAspectName(aspectName);
|
||||||
|
return proposal;
|
||||||
|
} catch (CloneNotSupportedException e) {
|
||||||
|
log.error("Issue while generating additional proposals corresponding to the input proposal", e);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user