mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-27 18:07:57 +00:00
fix(misc): misc fixes for OSS release (#10493)
This commit is contained in:
parent
d217a6f885
commit
b8b7928dd4
@ -62,6 +62,7 @@ public class AuthModule extends AbstractModule {
|
||||
private static final String PAC4J_SESSIONSTORE_PROVIDER_CONF = "pac4j.sessionStore.provider";
|
||||
private static final String ENTITY_CLIENT_RETRY_INTERVAL = "entityClient.retryInterval";
|
||||
private static final String ENTITY_CLIENT_NUM_RETRIES = "entityClient.numRetries";
|
||||
private static final String ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE = "entityClient.restli.get.batchSize";
|
||||
private static final String GET_SSO_SETTINGS_ENDPOINT = "auth/getSsoSettings";
|
||||
|
||||
private final com.typesafe.config.Config _configs;
|
||||
@ -201,11 +202,13 @@ public class AuthModule extends AbstractModule {
|
||||
protected SystemEntityClient provideEntityClient(
|
||||
@Named("systemOperationContext") final OperationContext systemOperationContext,
|
||||
final ConfigurationProvider configurationProvider) {
|
||||
|
||||
return new SystemRestliEntityClient(
|
||||
buildRestliClient(),
|
||||
new ExponentialBackoff(_configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)),
|
||||
_configs.getInt(ENTITY_CLIENT_NUM_RETRIES),
|
||||
configurationProvider.getCache().getClient().getEntityClient());
|
||||
configurationProvider.getCache().getClient().getEntityClient(),
|
||||
Math.max(1, _configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE)));
|
||||
}
|
||||
|
||||
@Provides
|
||||
|
||||
@ -288,4 +288,6 @@ systemClientSecret=${?DATAHUB_SYSTEM_CLIENT_SECRET}
|
||||
entityClient.retryInterval = 2
|
||||
entityClient.retryInterval = ${?ENTITY_CLIENT_RETRY_INTERVAL}
|
||||
entityClient.numRetries = 3
|
||||
entityClient.numRetries = ${?ENTITY_CLIENT_NUM_RETRIES}
|
||||
entityClient.numRetries = ${?ENTITY_CLIENT_NUM_RETRIES}
|
||||
entityClient.restli.get.batchSize = 100
|
||||
entityClient.restli.get.batchSize = ${?ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE}
|
||||
@ -125,14 +125,6 @@ public abstract class AbstractMCLStep implements UpgradeStep {
|
||||
}
|
||||
});
|
||||
|
||||
entityService
|
||||
.streamRestoreIndices(opContext, args, x -> context.report().addLine((String) x))
|
||||
.forEach(
|
||||
result -> {
|
||||
context.report().addLine("Rows migrated: " + result.rowsMigrated);
|
||||
context.report().addLine("Rows ignored: " + result.ignored);
|
||||
});
|
||||
|
||||
BootstrapStep.setUpgradeResult(opContext, getUpgradeIdUrn(), entityService);
|
||||
context.report().addLine("State updated: " + getUpgradeIdUrn());
|
||||
|
||||
|
||||
@ -7,6 +7,7 @@ import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
|
||||
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
|
||||
import com.linkedin.mxe.SystemMetadata;
|
||||
import com.linkedin.util.Pair;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -15,6 +16,7 @@ import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import javax.annotation.Nonnull;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
/**
|
||||
* A batch of aspects in the context of either an MCP or MCL write path to a data store. The item is
|
||||
@ -191,5 +193,23 @@ public interface AspectsBatch {
|
||||
Pair::getValue, Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
|
||||
}
|
||||
|
||||
String toAbbreviatedString(int maxWidth);
|
||||
default String toAbbreviatedString(int maxWidth) {
|
||||
return toAbbreviatedString(getItems(), maxWidth);
|
||||
}
|
||||
|
||||
static String toAbbreviatedString(Collection<? extends BatchItem> items, int maxWidth) {
|
||||
List<String> itemsAbbreviated = new ArrayList<String>();
|
||||
items.forEach(
|
||||
item -> {
|
||||
if (item instanceof ChangeMCP) {
|
||||
itemsAbbreviated.add(((ChangeMCP) item).toAbbreviatedString());
|
||||
} else {
|
||||
itemsAbbreviated.add(item.toString());
|
||||
}
|
||||
});
|
||||
return "AspectsBatchImpl{"
|
||||
+ "items="
|
||||
+ StringUtils.abbreviate(itemsAbbreviated.toString(), maxWidth)
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
|
||||
@ -4,8 +4,10 @@ import com.linkedin.data.DataMap;
|
||||
import com.linkedin.data.template.RecordTemplate;
|
||||
import com.linkedin.metadata.aspect.SystemAspect;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.Optional;
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
/**
|
||||
* A proposal to write data to the primary datastore which includes system metadata and other
|
||||
@ -47,4 +49,24 @@ public interface ChangeMCP extends MCPItem {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
default String toAbbreviatedString() {
|
||||
return "ChangeMCP{"
|
||||
+ "changeType="
|
||||
+ getChangeType()
|
||||
+ ", urn="
|
||||
+ getUrn()
|
||||
+ ", aspectName='"
|
||||
+ getAspectName()
|
||||
+ '\''
|
||||
+ ", recordTemplate="
|
||||
+ Optional.ofNullable(getRecordTemplate())
|
||||
.map(template -> StringUtils.abbreviate(template.toString(), 256))
|
||||
.orElse("")
|
||||
+ ", systemMetadata="
|
||||
+ Optional.ofNullable(getSystemMetadata())
|
||||
.map(systemMetadata -> StringUtils.abbreviate(systemMetadata.toString(), 128))
|
||||
.orElse("")
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
|
||||
@ -463,7 +463,7 @@ public class RecordUtils {
|
||||
METHOD_CACHE.putIfAbsent(record.getClass(), getMethodsFromRecordTemplate(record));
|
||||
try {
|
||||
return METHOD_CACHE.get(record.getClass()).get(fieldName).invoke(record);
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
} catch (NullPointerException | IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(
|
||||
String.format(
|
||||
"Failed to execute method for class [%s], field [%s]",
|
||||
|
||||
@ -11,7 +11,6 @@ import com.linkedin.metadata.aspect.batch.ChangeMCP;
|
||||
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
|
||||
import com.linkedin.mxe.MetadataChangeProposal;
|
||||
import com.linkedin.util.Pair;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
@ -23,7 +22,6 @@ import javax.annotation.Nonnull;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
@Slf4j
|
||||
@Getter
|
||||
@ -156,20 +154,4 @@ public class AspectsBatchImpl implements AspectsBatch {
|
||||
public String toString() {
|
||||
return "AspectsBatchImpl{" + "items=" + items + '}';
|
||||
}
|
||||
|
||||
public String toAbbreviatedString(int maxWidth) {
|
||||
List<String> itemsAbbreviated = new ArrayList<String>();
|
||||
items.forEach(
|
||||
item -> {
|
||||
if (item instanceof ChangeItemImpl) {
|
||||
itemsAbbreviated.add(((ChangeItemImpl) item).toAbbreviatedString());
|
||||
} else {
|
||||
itemsAbbreviated.add(item.toString());
|
||||
}
|
||||
});
|
||||
return "AspectsBatchImpl{"
|
||||
+ "items="
|
||||
+ StringUtils.abbreviate(itemsAbbreviated.toString(), maxWidth)
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
|
||||
@ -31,7 +31,6 @@ import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
@Slf4j
|
||||
@Getter
|
||||
@ -250,20 +249,4 @@ public class ChangeItemImpl implements ChangeMCP {
|
||||
+ systemMetadata
|
||||
+ '}';
|
||||
}
|
||||
|
||||
public String toAbbreviatedString() {
|
||||
return "ChangeItemImpl{"
|
||||
+ "changeType="
|
||||
+ changeType
|
||||
+ ", urn="
|
||||
+ urn
|
||||
+ ", aspectName='"
|
||||
+ aspectName
|
||||
+ '\''
|
||||
+ ", recordTemplate="
|
||||
+ StringUtils.abbreviate(recordTemplate.toString(), 256)
|
||||
+ ", systemMetadata="
|
||||
+ StringUtils.abbreviate(systemMetadata.toString(), 128)
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
|
||||
@ -7,6 +7,7 @@ import com.datahub.plugins.auth.authorization.Authorizer;
|
||||
import com.datahub.util.RecordUtils;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Iterators;
|
||||
import com.linkedin.aspect.GetTimeseriesAspectValuesResponse;
|
||||
import com.linkedin.common.AuditStamp;
|
||||
import com.linkedin.common.VersionedUrn;
|
||||
@ -59,6 +60,8 @@ import io.opentelemetry.extension.annotations.WithSpan;
|
||||
import java.net.URISyntaxException;
|
||||
import java.time.Clock;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
@ -90,6 +93,7 @@ public class JavaEntityClient implements EntityClient {
|
||||
private final TimeseriesAspectService timeseriesAspectService;
|
||||
private final RollbackService rollbackService;
|
||||
private final EventProducer eventProducer;
|
||||
private final int batchGetV2Size;
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
@ -121,7 +125,22 @@ public class JavaEntityClient implements EntityClient {
|
||||
throws RemoteInvocationException, URISyntaxException {
|
||||
final Set<String> projectedAspects =
|
||||
aspectNames == null ? opContext.getEntityAspectNames(entityName) : aspectNames;
|
||||
return entityService.getEntitiesV2(opContext, entityName, urns, projectedAspects);
|
||||
|
||||
Map<Urn, EntityResponse> responseMap = new HashMap<>();
|
||||
|
||||
Iterators.partition(urns.iterator(), Math.max(1, batchGetV2Size))
|
||||
.forEachRemaining(
|
||||
batch -> {
|
||||
try {
|
||||
responseMap.putAll(
|
||||
entityService.getEntitiesV2(
|
||||
opContext, entityName, new HashSet<>(batch), projectedAspects));
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
return responseMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -130,11 +149,25 @@ public class JavaEntityClient implements EntityClient {
|
||||
@Nonnull OperationContext opContext,
|
||||
@Nonnull String entityName,
|
||||
@Nonnull final Set<VersionedUrn> versionedUrns,
|
||||
@Nullable final Set<String> aspectNames)
|
||||
throws RemoteInvocationException, URISyntaxException {
|
||||
@Nullable final Set<String> aspectNames) {
|
||||
final Set<String> projectedAspects =
|
||||
aspectNames == null ? opContext.getEntityAspectNames(entityName) : aspectNames;
|
||||
return entityService.getEntitiesVersionedV2(opContext, versionedUrns, projectedAspects);
|
||||
|
||||
Map<Urn, EntityResponse> responseMap = new HashMap<>();
|
||||
|
||||
Iterators.partition(versionedUrns.iterator(), Math.max(1, batchGetV2Size))
|
||||
.forEachRemaining(
|
||||
batch -> {
|
||||
try {
|
||||
responseMap.putAll(
|
||||
entityService.getEntitiesVersionedV2(
|
||||
opContext, new HashSet<>(batch), projectedAspects));
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
return responseMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -42,7 +42,8 @@ public class SystemJavaEntityClient extends JavaEntityClient implements SystemEn
|
||||
TimeseriesAspectService timeseriesAspectService,
|
||||
RollbackService rollbackService,
|
||||
EventProducer eventProducer,
|
||||
EntityClientCacheConfig cacheConfig) {
|
||||
EntityClientCacheConfig cacheConfig,
|
||||
int batchGetV2Size) {
|
||||
super(
|
||||
entityService,
|
||||
deleteEntityService,
|
||||
@ -52,7 +53,8 @@ public class SystemJavaEntityClient extends JavaEntityClient implements SystemEn
|
||||
lineageSearchService,
|
||||
timeseriesAspectService,
|
||||
rollbackService,
|
||||
eventProducer);
|
||||
eventProducer,
|
||||
batchGetV2Size);
|
||||
this.operationContextMap = CacheBuilder.newBuilder().maximumSize(500).build();
|
||||
this.entityClientCache = buildEntityClientCache(SystemJavaEntityClient.class, cacheConfig);
|
||||
}
|
||||
|
||||
@ -666,14 +666,8 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
log.info("Ingesting aspects batch to database: {}", aspectsBatch.toAbbreviatedString(2048));
|
||||
Timer.Context ingestToLocalDBTimer =
|
||||
MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time();
|
||||
List<UpdateAspectResult> ingestResults =
|
||||
ingestAspectsToLocalDB(opContext, aspectsBatch, overwrite);
|
||||
long took = ingestToLocalDBTimer.stop();
|
||||
log.info(
|
||||
"Ingestion of aspects batch to database took {} ms", TimeUnit.NANOSECONDS.toMillis(took));
|
||||
|
||||
List<UpdateAspectResult> mclResults = emitMCL(opContext, ingestResults, emitMCL);
|
||||
return mclResults;
|
||||
@ -778,7 +772,17 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
throw new ValidationException(exceptions.toString());
|
||||
}
|
||||
|
||||
// No changes, return
|
||||
if (changeMCPs.isEmpty()) {
|
||||
return Collections.<UpdateAspectResult>emptyList();
|
||||
}
|
||||
|
||||
// Database Upsert results
|
||||
log.info(
|
||||
"Ingesting aspects batch to database: {}",
|
||||
AspectsBatch.toAbbreviatedString(changeMCPs, 2048));
|
||||
Timer.Context ingestToLocalDBTimer =
|
||||
MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time();
|
||||
List<UpdateAspectResult> upsertResults =
|
||||
changeMCPs.stream()
|
||||
.map(
|
||||
@ -827,6 +831,10 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
|
||||
if (tx != null) {
|
||||
tx.commitAndContinue();
|
||||
}
|
||||
long took = ingestToLocalDBTimer.stop();
|
||||
log.info(
|
||||
"Ingestion of aspects batch to database took {} ms",
|
||||
TimeUnit.NANOSECONDS.toMillis(took));
|
||||
|
||||
// Retention optimization and tx
|
||||
if (retentionService != null) {
|
||||
|
||||
@ -70,7 +70,8 @@ public class JavaEntityClientTest {
|
||||
_lineageSearchService,
|
||||
_timeseriesAspectService,
|
||||
rollbackService,
|
||||
_eventProducer);
|
||||
_eventProducer,
|
||||
1);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@ -315,6 +315,7 @@ public class SampleDataFixtureConfiguration {
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null);
|
||||
null,
|
||||
1);
|
||||
}
|
||||
}
|
||||
|
||||
@ -250,6 +250,7 @@ public class SearchLineageFixtureConfiguration {
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null);
|
||||
null,
|
||||
1);
|
||||
}
|
||||
}
|
||||
|
||||
@ -46,7 +46,8 @@ public class MceConsumerApplicationTestConfiguration {
|
||||
restClient,
|
||||
new ExponentialBackoff(1),
|
||||
1,
|
||||
configurationProvider.getCache().getClient().getEntityClient());
|
||||
configurationProvider.getCache().getClient().getEntityClient(),
|
||||
1);
|
||||
}
|
||||
|
||||
@MockBean public Database ebeanServer;
|
||||
|
||||
@ -380,6 +380,12 @@ views:
|
||||
entityClient:
|
||||
retryInterval: ${ENTITY_CLIENT_RETRY_INTERVAL:2}
|
||||
numRetries: ${ENTITY_CLIENT_NUM_RETRIES:3}
|
||||
java:
|
||||
get:
|
||||
batchSize: ${ENTITY_CLIENT_JAVA_GET_BATCH_SIZE:375} # matches EbeanAspectDao batch size
|
||||
restli:
|
||||
get:
|
||||
batchSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE:100} # limited to prevent exceeding restli URI size limit
|
||||
|
||||
usageClient:
|
||||
retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2}
|
||||
|
||||
@ -16,6 +16,7 @@ import com.linkedin.metadata.service.RollbackService;
|
||||
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
|
||||
import javax.inject.Singleton;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
@ -37,7 +38,8 @@ public class JavaEntityClientFactory {
|
||||
final @Qualifier("timeseriesAspectService") TimeseriesAspectService _timeseriesAspectService,
|
||||
final @Qualifier("relationshipSearchService") LineageSearchService _lineageSearchService,
|
||||
final @Qualifier("kafkaEventProducer") EventProducer _eventProducer,
|
||||
final RollbackService rollbackService) {
|
||||
final RollbackService rollbackService,
|
||||
final @Value("${entityClient.restli.get.batchSize:375}") int batchGetV2Size) {
|
||||
return new JavaEntityClient(
|
||||
_entityService,
|
||||
_deleteEntityService,
|
||||
@ -47,7 +49,8 @@ public class JavaEntityClientFactory {
|
||||
_lineageSearchService,
|
||||
_timeseriesAspectService,
|
||||
rollbackService,
|
||||
_eventProducer);
|
||||
_eventProducer,
|
||||
batchGetV2Size);
|
||||
}
|
||||
|
||||
@Bean("systemEntityClient")
|
||||
@ -63,7 +66,8 @@ public class JavaEntityClientFactory {
|
||||
final @Qualifier("relationshipSearchService") LineageSearchService _lineageSearchService,
|
||||
final @Qualifier("kafkaEventProducer") EventProducer _eventProducer,
|
||||
final RollbackService rollbackService,
|
||||
final EntityClientCacheConfig entityClientCacheConfig) {
|
||||
final EntityClientCacheConfig entityClientCacheConfig,
|
||||
final @Value("${entityClient.restli.get.batchSize:375}") int batchGetV2Size) {
|
||||
return new SystemJavaEntityClient(
|
||||
_entityService,
|
||||
_deleteEntityService,
|
||||
@ -74,6 +78,7 @@ public class JavaEntityClientFactory {
|
||||
_timeseriesAspectService,
|
||||
rollbackService,
|
||||
_eventProducer,
|
||||
entityClientCacheConfig);
|
||||
entityClientCacheConfig,
|
||||
batchGetV2Size);
|
||||
}
|
||||
}
|
||||
|
||||
@ -29,7 +29,8 @@ public class RestliEntityClientFactory {
|
||||
@Value("${datahub.gms.uri}") String gmsUri,
|
||||
@Value("${datahub.gms.sslContext.protocol}") String gmsSslProtocol,
|
||||
@Value("${entityClient.retryInterval:2}") int retryInterval,
|
||||
@Value("${entityClient.numRetries:3}") int numRetries) {
|
||||
@Value("${entityClient.numRetries:3}") int numRetries,
|
||||
final @Value("${entityClient.restli.get.batchSize:150}") int batchGetV2Size) {
|
||||
final Client restClient;
|
||||
if (gmsUri != null) {
|
||||
restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(gmsUri), gmsSslProtocol);
|
||||
@ -37,7 +38,8 @@ public class RestliEntityClientFactory {
|
||||
restClient =
|
||||
DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
|
||||
}
|
||||
return new RestliEntityClient(restClient, new ExponentialBackoff(retryInterval), numRetries);
|
||||
return new RestliEntityClient(
|
||||
restClient, new ExponentialBackoff(retryInterval), numRetries, batchGetV2Size);
|
||||
}
|
||||
|
||||
@Bean("systemEntityClient")
|
||||
@ -50,7 +52,8 @@ public class RestliEntityClientFactory {
|
||||
@Value("${datahub.gms.sslContext.protocol}") String gmsSslProtocol,
|
||||
@Value("${entityClient.retryInterval:2}") int retryInterval,
|
||||
@Value("${entityClient.numRetries:3}") int numRetries,
|
||||
final EntityClientCacheConfig entityClientCacheConfig) {
|
||||
final EntityClientCacheConfig entityClientCacheConfig,
|
||||
final @Value("${entityClient.restli.get.batchSize:150}") int batchGetV2Size) {
|
||||
|
||||
final Client restClient;
|
||||
if (gmsUri != null) {
|
||||
@ -60,6 +63,10 @@ public class RestliEntityClientFactory {
|
||||
DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
|
||||
}
|
||||
return new SystemRestliEntityClient(
|
||||
restClient, new ExponentialBackoff(retryInterval), numRetries, entityClientCacheConfig);
|
||||
restClient,
|
||||
new ExponentialBackoff(retryInterval),
|
||||
numRetries,
|
||||
entityClientCacheConfig,
|
||||
batchGetV2Size);
|
||||
}
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ package com.linkedin.entity.client;
|
||||
import com.datahub.plugins.auth.authorization.Authorizer;
|
||||
import com.datahub.util.RecordUtils;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterators;
|
||||
import com.linkedin.common.VersionedUrn;
|
||||
import com.linkedin.common.client.BaseClient;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
@ -108,11 +109,15 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
|
||||
new PlatformRequestBuilders();
|
||||
private static final RunsRequestBuilders RUNS_REQUEST_BUILDERS = new RunsRequestBuilders();
|
||||
|
||||
private final int batchGetV2Size;
|
||||
|
||||
public RestliEntityClient(
|
||||
@Nonnull final Client restliClient,
|
||||
@Nonnull final BackoffPolicy backoffPolicy,
|
||||
int retryCount) {
|
||||
int retryCount,
|
||||
int batchGetV2Size) {
|
||||
super(restliClient, backoffPolicy, retryCount);
|
||||
this.batchGetV2Size = Math.max(1, batchGetV2Size);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -195,10 +200,10 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
|
||||
/**
|
||||
* Batch get a set of aspects for multiple entities.
|
||||
*
|
||||
* @param opContext operation's context
|
||||
* @param entityName the entity type to fetch
|
||||
* @param urns the urns of the entities to batch get
|
||||
* @param aspectNames the aspect names to batch get
|
||||
* @param authentication the authentication to include in the request to the Metadata Service
|
||||
* @throws RemoteInvocationException when unable to execute request
|
||||
*/
|
||||
@Override
|
||||
@ -210,29 +215,43 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
|
||||
@Nullable final Set<String> aspectNames)
|
||||
throws RemoteInvocationException, URISyntaxException {
|
||||
|
||||
final EntitiesV2BatchGetRequestBuilder requestBuilder =
|
||||
ENTITIES_V2_REQUEST_BUILDERS
|
||||
.batchGet()
|
||||
.aspectsParam(aspectNames)
|
||||
.ids(urns.stream().map(Urn::toString).collect(Collectors.toList()));
|
||||
Map<Urn, EntityResponse> responseMap = new HashMap<>();
|
||||
|
||||
return sendClientRequest(requestBuilder, opContext.getSessionAuthentication())
|
||||
.getEntity()
|
||||
.getResults()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.collect(
|
||||
Collectors.toMap(
|
||||
entry -> {
|
||||
try {
|
||||
return Urn.createFromString(entry.getKey());
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(
|
||||
String.format(
|
||||
"Failed to bind urn string with value %s into urn", entry.getKey()));
|
||||
}
|
||||
},
|
||||
entry -> entry.getValue().getEntity()));
|
||||
Iterators.partition(urns.iterator(), batchGetV2Size)
|
||||
.forEachRemaining(
|
||||
batch -> {
|
||||
try {
|
||||
final EntitiesV2BatchGetRequestBuilder requestBuilder =
|
||||
ENTITIES_V2_REQUEST_BUILDERS
|
||||
.batchGet()
|
||||
.aspectsParam(aspectNames)
|
||||
.ids(batch.stream().map(Urn::toString).collect(Collectors.toList()));
|
||||
|
||||
responseMap.putAll(
|
||||
sendClientRequest(requestBuilder, opContext.getSessionAuthentication())
|
||||
.getEntity()
|
||||
.getResults()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.collect(
|
||||
Collectors.toMap(
|
||||
entry -> {
|
||||
try {
|
||||
return Urn.createFromString(entry.getKey());
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(
|
||||
String.format(
|
||||
"Failed to bind urn string with value %s into urn",
|
||||
entry.getKey()));
|
||||
}
|
||||
},
|
||||
entry -> entry.getValue().getEntity())));
|
||||
} catch (RemoteInvocationException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
return responseMap;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -250,31 +269,44 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
|
||||
@Nonnull OperationContext opContext,
|
||||
@Nonnull String entityName,
|
||||
@Nonnull final Set<VersionedUrn> versionedUrns,
|
||||
@Nullable final Set<String> aspectNames)
|
||||
throws RemoteInvocationException, URISyntaxException {
|
||||
@Nullable final Set<String> aspectNames) {
|
||||
|
||||
final EntitiesVersionedV2BatchGetRequestBuilder requestBuilder =
|
||||
ENTITIES_VERSIONED_V2_REQUEST_BUILDERS
|
||||
.batchGet()
|
||||
.aspectsParam(aspectNames)
|
||||
.entityTypeParam(entityName)
|
||||
.ids(
|
||||
versionedUrns.stream()
|
||||
.map(
|
||||
versionedUrn ->
|
||||
com.linkedin.common.urn.VersionedUrn.of(
|
||||
versionedUrn.getUrn().toString(), versionedUrn.getVersionStamp()))
|
||||
.collect(Collectors.toSet()));
|
||||
Map<Urn, EntityResponse> responseMap = new HashMap<>();
|
||||
|
||||
return sendClientRequest(requestBuilder, opContext.getSessionAuthentication())
|
||||
.getEntity()
|
||||
.getResults()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.collect(
|
||||
Collectors.toMap(
|
||||
entry -> UrnUtils.getUrn(entry.getKey().getUrn()),
|
||||
entry -> entry.getValue().getEntity()));
|
||||
Iterators.partition(versionedUrns.iterator(), batchGetV2Size)
|
||||
.forEachRemaining(
|
||||
batch -> {
|
||||
final EntitiesVersionedV2BatchGetRequestBuilder requestBuilder =
|
||||
ENTITIES_VERSIONED_V2_REQUEST_BUILDERS
|
||||
.batchGet()
|
||||
.aspectsParam(aspectNames)
|
||||
.entityTypeParam(entityName)
|
||||
.ids(
|
||||
batch.stream()
|
||||
.map(
|
||||
versionedUrn ->
|
||||
com.linkedin.common.urn.VersionedUrn.of(
|
||||
versionedUrn.getUrn().toString(),
|
||||
versionedUrn.getVersionStamp()))
|
||||
.collect(Collectors.toSet()));
|
||||
|
||||
try {
|
||||
responseMap.putAll(
|
||||
sendClientRequest(requestBuilder, opContext.getSessionAuthentication())
|
||||
.getEntity()
|
||||
.getResults()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.collect(
|
||||
Collectors.toMap(
|
||||
entry -> UrnUtils.getUrn(entry.getKey().getUrn()),
|
||||
entry -> entry.getValue().getEntity())));
|
||||
} catch (RemoteInvocationException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
return responseMap;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -26,8 +26,9 @@ public class SystemRestliEntityClient extends RestliEntityClient implements Syst
|
||||
@Nonnull final Client restliClient,
|
||||
@Nonnull final BackoffPolicy backoffPolicy,
|
||||
int retryCount,
|
||||
EntityClientCacheConfig cacheConfig) {
|
||||
super(restliClient, backoffPolicy, retryCount);
|
||||
EntityClientCacheConfig cacheConfig,
|
||||
int batchGetV2Size) {
|
||||
super(restliClient, backoffPolicy, retryCount, batchGetV2Size);
|
||||
this.operationContextMap = CacheBuilder.newBuilder().maximumSize(500).build();
|
||||
this.entityClientCache = buildEntityClientCache(SystemRestliEntityClient.class, cacheConfig);
|
||||
}
|
||||
|
||||
@ -37,7 +37,7 @@ public class BaseClientTest {
|
||||
when(mockRestliClient.sendRequest(any(ActionRequest.class))).thenReturn(mockFuture);
|
||||
|
||||
RestliEntityClient testClient =
|
||||
new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 0);
|
||||
new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 0, 10);
|
||||
testClient.sendClientRequest(testRequestBuilder, AUTH);
|
||||
// Expected 1 actual try and 0 retries
|
||||
verify(mockRestliClient).sendRequest(any(ActionRequest.class));
|
||||
@ -56,7 +56,7 @@ public class BaseClientTest {
|
||||
.thenReturn(mockFuture);
|
||||
|
||||
RestliEntityClient testClient =
|
||||
new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1);
|
||||
new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10);
|
||||
testClient.sendClientRequest(testRequestBuilder, AUTH);
|
||||
// Expected 1 actual try and 1 retries
|
||||
verify(mockRestliClient, times(2)).sendRequest(any(ActionRequest.class));
|
||||
@ -73,7 +73,7 @@ public class BaseClientTest {
|
||||
.thenThrow(new RuntimeException(new RequiredFieldNotPresentException("value")));
|
||||
|
||||
RestliEntityClient testClient =
|
||||
new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1);
|
||||
new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10);
|
||||
assertThrows(
|
||||
RuntimeException.class, () -> testClient.sendClientRequest(testRequestBuilder, AUTH));
|
||||
}
|
||||
|
||||
@ -45,7 +45,7 @@ public class SystemRestliEntityClientTest {
|
||||
noCacheConfig.setEnabled(true);
|
||||
|
||||
SystemRestliEntityClient noCacheTest =
|
||||
new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig);
|
||||
new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1);
|
||||
|
||||
com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true);
|
||||
com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false);
|
||||
@ -83,7 +83,7 @@ public class SystemRestliEntityClientTest {
|
||||
Map.of(TEST_URN.getEntityType(), Map.of(Constants.STATUS_ASPECT_NAME, 60)));
|
||||
|
||||
SystemRestliEntityClient cacheTest =
|
||||
new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, cacheConfig);
|
||||
new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1);
|
||||
|
||||
mockResponse(mockRestliClient, responseStatusTrue);
|
||||
assertEquals(
|
||||
@ -117,7 +117,7 @@ public class SystemRestliEntityClientTest {
|
||||
noCacheConfig.setEnabled(true);
|
||||
|
||||
SystemRestliEntityClient noCacheTest =
|
||||
new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig);
|
||||
new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1);
|
||||
|
||||
com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true);
|
||||
com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false);
|
||||
@ -155,7 +155,7 @@ public class SystemRestliEntityClientTest {
|
||||
Map.of(TEST_URN.getEntityType(), Map.of(Constants.STATUS_ASPECT_NAME, 60)));
|
||||
|
||||
SystemRestliEntityClient cacheTest =
|
||||
new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, cacheConfig);
|
||||
new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1);
|
||||
|
||||
mockResponse(mockRestliClient, responseStatusTrue);
|
||||
assertEquals(
|
||||
|
||||
@ -36,8 +36,7 @@ public class BaseService {
|
||||
@Nonnull OperationContext opContext,
|
||||
@Nonnull Set<Urn> entityUrns,
|
||||
@Nonnull GlobalTags defaultValue) {
|
||||
|
||||
if (entityUrns.size() <= 0) {
|
||||
if (entityUrns.isEmpty()) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@ -75,8 +74,7 @@ public class BaseService {
|
||||
@Nonnull OperationContext opContext,
|
||||
@Nonnull Set<Urn> entityUrns,
|
||||
@Nonnull EditableSchemaMetadata defaultValue) {
|
||||
|
||||
if (entityUrns.size() <= 0) {
|
||||
if (entityUrns.isEmpty()) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@ -114,8 +112,7 @@ public class BaseService {
|
||||
@Nonnull OperationContext opContext,
|
||||
@Nonnull Set<Urn> entityUrns,
|
||||
@Nonnull Ownership defaultValue) {
|
||||
|
||||
if (entityUrns.size() <= 0) {
|
||||
if (entityUrns.isEmpty()) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@ -153,8 +150,7 @@ public class BaseService {
|
||||
@Nonnull OperationContext opContext,
|
||||
@Nonnull Set<Urn> entityUrns,
|
||||
@Nonnull GlossaryTerms defaultValue) {
|
||||
|
||||
if (entityUrns.size() <= 0) {
|
||||
if (entityUrns.isEmpty()) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@ -192,8 +188,7 @@ public class BaseService {
|
||||
@Nonnull OperationContext opContext,
|
||||
@Nonnull Set<Urn> entityUrns,
|
||||
@Nonnull Domains defaultValue) {
|
||||
|
||||
if (entityUrns.size() <= 0) {
|
||||
if (entityUrns.isEmpty()) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user