feat(entity-client): batch entity-client ingestProposals (#11787)

This commit is contained in:
david-leifker 2024-11-05 09:42:21 -06:00 committed by GitHub
parent 26529f2b05
commit 5c5812804b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 407 additions and 202 deletions

View File

@ -13,6 +13,7 @@ import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.metadata.models.registry.EmptyEntityRegistry;
@ -213,11 +214,13 @@ public class AuthModule extends AbstractModule {
return new SystemRestliEntityClient(
buildRestliClient(),
new ExponentialBackoff(configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)),
configs.getInt(ENTITY_CLIENT_NUM_RETRIES),
configurationProvider.getCache().getClient().getEntityClient(),
Math.max(1, configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE)),
Math.max(1, configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY)));
EntityClientConfig.builder()
    .backoffPolicy(new ExponentialBackoff(configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)))
    .retryCount(configs.getInt(ENTITY_CLIENT_NUM_RETRIES))
    .batchGetV2Size(configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE))
    // Read the concurrency from configuration, as the previous constructor call did, rather
    // than pinning it to a literal; EntityClientConfig's getter clamps the value to >= 1.
    .batchGetV2Concurrency(configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY))
    .build(),
configurationProvider.getCache().getClient().getEntityClient());
}
@Provides

View File

@ -464,5 +464,7 @@ public class Constants {
public static final String MDC_ENTITY_TYPE = "entityType";
public static final String MDC_CHANGE_TYPE = "changeType";
public static final String RESTLI_SUCCESS = "success";
private Constants() {}
}

View File

@ -4,6 +4,7 @@ import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.kafka.MCLKafkaListenerRegistrar;
import com.linkedin.metadata.kafka.hook.UpdateIndicesHook;
@ -36,6 +37,12 @@ public class MCLMAESpringTest extends AbstractTestNGSpringContextTests {
@Autowired private UpdateIndicesService updateIndicesService;
// Disables Java `assert` checking for PathSpecBasedSchemaAnnotationVisitor through its class
// loader; the setting only affects the class if it has not been initialized yet.
// NOTE(review): presumably an assertion inside that class trips under this test context — confirm.
static {
PathSpecBasedSchemaAnnotationVisitor.class
.getClassLoader()
.setClassAssertionStatus(PathSpecBasedSchemaAnnotationVisitor.class.getName(), false);
}
@Test
public void testHooks() {
MCLKafkaListenerRegistrar registrar =

View File

@ -5,6 +5,7 @@ import static org.mockito.Mockito.when;
import com.datahub.authentication.Authentication;
import com.datahub.metadata.ingestion.IngestionScheduler;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.gms.factory.plugins.SpringStandardPluginConfiguration;
import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
@ -58,6 +59,11 @@ public class MCLSpringCommonTestConfiguration {
@MockBean public IngestionScheduler ingestionScheduler;
/** Supplies an {@link EntityClientConfig} populated only with the builder's default values. */
@Bean
public EntityClientConfig entityClientConfig() {
  final EntityClientConfig defaults = EntityClientConfig.builder().build();
  return defaults;
}
@MockBean(name = "systemEntityClient")
public SystemEntityClient systemEntityClient;

View File

@ -1,5 +1,6 @@
package com.linkedin.metadata.kafka;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
@ -39,16 +40,25 @@ public class MceConsumerApplicationTestConfiguration {
@Bean
@Primary
public SystemEntityClient systemEntityClient(
@Qualifier("configurationProvider") final ConfigurationProvider configurationProvider) {
@Qualifier("configurationProvider") final ConfigurationProvider configurationProvider,
final EntityClientConfig entityClientConfig) {
String selfUri = restTemplate.getRootUri();
final Client restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(selfUri), null);
return new SystemRestliEntityClient(
restClient,
new ExponentialBackoff(1),
1,
configurationProvider.getCache().getClient().getEntityClient(),
1,
2);
entityClientConfig,
configurationProvider.getCache().getClient().getEntityClient());
}
/**
 * Primary {@link EntityClientConfig} for this test configuration: a single retry with an
 * exponential back-off of 1, batch gets of size 1, and a batch-get concurrency of 2.
 */
@Bean
@Primary
public EntityClientConfig entityClientConfig() {
  final ExponentialBackoff backoff = new ExponentialBackoff(1);
  final int retries = 1;
  final int getBatchSize = 1;
  final int getConcurrency = 2;
  return EntityClientConfig.builder()
      .batchGetV2Concurrency(getConcurrency)
      .batchGetV2Size(getBatchSize)
      .retryCount(retries)
      .backoffPolicy(backoff)
      .build();
}
@MockBean public Database ebeanServer;

View File

@ -457,10 +457,19 @@ entityClient:
java:
get:
batchSize: ${ENTITY_CLIENT_JAVA_GET_BATCH_SIZE:375} # matches EbeanAspectDao batch size
ingest:
batchSize: ${ENTITY_CLIENT_JAVA_INGEST_BATCH_SIZE:375}
restli:
get:
batchSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE:100} # limited to prevent exceeding restli URI size limit
batchConcurrency: ${ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY:2} # parallel threads
batchQueueSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_QUEUE_SIZE:500}
batchThreadKeepAlive: ${ENTITY_CLIENT_RESTLI_GET_BATCH_THREAD_KEEP_ALIVE:60}
ingest:
batchSize: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_SIZE:50} # limited to prevent exceeding restli timeouts
batchConcurrency: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_CONCURRENCY:2} # parallel threads
batchQueueSize: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_QUEUE_SIZE:500}
batchThreadKeepAlive: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_THREAD_KEEP_ALIVE:60}
usageClient:
retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2}

View File

@ -1,8 +1,11 @@
package com.linkedin.gms.factory.entityclient;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -14,4 +17,30 @@ public class EntityClientConfigFactory {
@Qualifier("configurationProvider") final ConfigurationProvider configurationProvider) {
return configurationProvider.getCache().getClient().getEntityClient();
}
/**
 * Assembles the {@link EntityClientConfig} bean from the {@code entityClient.*} properties.
 *
 * <p>Only {@code retryInterval} and {@code numRetries} declare inline fallbacks (2 and 3); every
 * {@code entityClient.restli.*} property must be resolvable from the environment's property
 * sources.
 */
@Bean
public EntityClientConfig entityClientConfig(
    final @Value("${entityClient.retryInterval:2}") int retryInterval,
    final @Value("${entityClient.numRetries:3}") int numRetries,
    final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size,
    final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency,
    final @Value("${entityClient.restli.get.batchQueueSize}") int batchGetV2QueueSize,
    final @Value("${entityClient.restli.get.batchThreadKeepAlive}") int batchGetV2KeepAlive,
    final @Value("${entityClient.restli.ingest.batchSize}") int batchIngestSize,
    final @Value("${entityClient.restli.ingest.batchConcurrency}") int batchIngestConcurrency,
    final @Value("${entityClient.restli.ingest.batchQueueSize}") int batchIngestQueueSize,
    final @Value("${entityClient.restli.ingest.batchThreadKeepAlive}") int batchIngestKeepAlive) {
  final ExponentialBackoff retryBackoff = new ExponentialBackoff(retryInterval);
  return EntityClientConfig.builder()
      // batch ingest settings
      .batchIngestKeepAlive(batchIngestKeepAlive)
      .batchIngestQueueSize(batchIngestQueueSize)
      .batchIngestConcurrency(batchIngestConcurrency)
      .batchIngestSize(batchIngestSize)
      // batch get settings
      .batchGetV2KeepAlive(batchGetV2KeepAlive)
      .batchGetV2QueueSize(batchGetV2QueueSize)
      .batchGetV2Concurrency(batchGetV2Concurrency)
      .batchGetV2Size(batchGetV2Size)
      // retry settings
      .retryCount(numRetries)
      .backoffPolicy(retryBackoff)
      .build();
}
}

View File

@ -1,6 +1,7 @@
package com.linkedin.gms.factory.entityclient;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.client.SystemJavaEntityClient;
@ -16,7 +17,6 @@ import com.linkedin.metadata.service.RollbackService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import javax.inject.Singleton;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -39,7 +39,7 @@ public class JavaEntityClientFactory {
final @Qualifier("relationshipSearchService") LineageSearchService _lineageSearchService,
final @Qualifier("kafkaEventProducer") EventProducer _eventProducer,
final RollbackService rollbackService,
final @Value("${entityClient.restli.get.batchSize:375}") int batchGetV2Size) {
final EntityClientConfig entityClientConfig) {
return new JavaEntityClient(
_entityService,
_deleteEntityService,
@ -50,7 +50,7 @@ public class JavaEntityClientFactory {
_timeseriesAspectService,
rollbackService,
_eventProducer,
batchGetV2Size);
entityClientConfig.getBatchGetV2Size());
}
@Bean("systemEntityClient")
@ -67,7 +67,7 @@ public class JavaEntityClientFactory {
final @Qualifier("kafkaEventProducer") EventProducer _eventProducer,
final RollbackService rollbackService,
final EntityClientCacheConfig entityClientCacheConfig,
final @Value("${entityClient.restli.get.batchSize:375}") int batchGetV2Size) {
final EntityClientConfig entityClientConfig) {
return new SystemJavaEntityClient(
_entityService,
_deleteEntityService,
@ -79,6 +79,6 @@ public class JavaEntityClientFactory {
rollbackService,
_eventProducer,
entityClientCacheConfig,
batchGetV2Size);
entityClientConfig.getBatchGetV2Size());
}
}

View File

@ -1,12 +1,12 @@
package com.linkedin.gms.factory.entityclient;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import com.linkedin.restli.client.Client;
import java.net.URI;
import javax.inject.Singleton;
@ -28,10 +28,7 @@ public class RestliEntityClientFactory {
@Value("${datahub.gms.useSSL}") boolean gmsUseSSL,
@Value("${datahub.gms.uri}") String gmsUri,
@Value("${datahub.gms.sslContext.protocol}") String gmsSslProtocol,
@Value("${entityClient.retryInterval:2}") int retryInterval,
@Value("${entityClient.numRetries:3}") int numRetries,
final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size,
final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency) {
final EntityClientConfig entityClientConfig) {
final Client restClient;
if (gmsUri != null) {
restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(gmsUri), gmsSslProtocol);
@ -39,12 +36,7 @@ public class RestliEntityClientFactory {
restClient =
DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
}
return new RestliEntityClient(
restClient,
new ExponentialBackoff(retryInterval),
numRetries,
batchGetV2Size,
batchGetV2Concurrency);
return new RestliEntityClient(restClient, entityClientConfig);
}
@Bean("systemEntityClient")
@ -55,11 +47,8 @@ public class RestliEntityClientFactory {
@Value("${datahub.gms.useSSL}") boolean gmsUseSSL,
@Value("${datahub.gms.uri}") String gmsUri,
@Value("${datahub.gms.sslContext.protocol}") String gmsSslProtocol,
@Value("${entityClient.retryInterval:2}") int retryInterval,
@Value("${entityClient.numRetries:3}") int numRetries,
final EntityClientCacheConfig entityClientCacheConfig,
final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size,
final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency) {
final EntityClientConfig entityClientConfig) {
final Client restClient;
if (gmsUri != null) {
@ -68,12 +57,6 @@ public class RestliEntityClientFactory {
restClient =
DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
}
return new SystemRestliEntityClient(
restClient,
new ExponentialBackoff(retryInterval),
numRetries,
entityClientCacheConfig,
batchGetV2Size,
batchGetV2Concurrency);
return new SystemRestliEntityClient(restClient, entityClientConfig, entityClientCacheConfig);
}
}

View File

@ -2,8 +2,8 @@ package com.linkedin.common.client;
import com.datahub.authentication.Authentication;
import com.linkedin.common.callback.FutureCallback;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.restli.client.AbstractRequestBuilder;
import com.linkedin.restli.client.Client;
@ -19,17 +19,15 @@ import org.apache.http.HttpHeaders;
@Slf4j
public abstract class BaseClient implements AutoCloseable {
protected final Client _client;
protected final BackoffPolicy _backoffPolicy;
protected final int _retryCount;
protected final Client client;
protected final EntityClientConfig entityClientConfig;
protected static final Set<String> NON_RETRYABLE =
Set.of("com.linkedin.data.template.RequiredFieldNotPresentException");
protected BaseClient(@Nonnull Client restliClient, BackoffPolicy backoffPolicy, int retryCount) {
_client = Objects.requireNonNull(restliClient);
_backoffPolicy = backoffPolicy;
_retryCount = retryCount;
protected BaseClient(@Nonnull Client restliClient, EntityClientConfig entityClientConfig) {
client = Objects.requireNonNull(restliClient);
this.entityClientConfig = entityClientConfig;
}
protected <T> Response<T> sendClientRequest(
@ -52,9 +50,9 @@ public abstract class BaseClient implements AutoCloseable {
int attemptCount = 0;
while (attemptCount < _retryCount + 1) {
while (attemptCount < entityClientConfig.getRetryCount() + 1) {
try {
return _client.sendRequest(requestBuilder.build()).getResponse();
return client.sendRequest(requestBuilder.build()).getResponse();
} catch (Throwable ex) {
MetricUtils.counter(
BaseClient.class,
@ -66,12 +64,13 @@ public abstract class BaseClient implements AutoCloseable {
|| (ex.getCause() != null
&& NON_RETRYABLE.contains(ex.getCause().getClass().getCanonicalName()));
if (attemptCount == _retryCount || skipRetry) {
if (attemptCount == entityClientConfig.getRetryCount() || skipRetry) {
throw ex;
} else {
attemptCount = attemptCount + 1;
try {
Thread.sleep(_backoffPolicy.nextBackoff(attemptCount, ex) * 1000);
Thread.sleep(
entityClientConfig.getBackoffPolicy().nextBackoff(attemptCount, ex) * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@ -84,6 +83,6 @@ public abstract class BaseClient implements AutoCloseable {
@Override
public void close() {
_client.shutdown(new FutureCallback<>());
client.shutdown(new FutureCallback<>());
}
}

View File

@ -0,0 +1,46 @@
package com.linkedin.entity.client;
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import javax.annotation.Nonnull;
import lombok.Builder;
import lombok.Value;
/**
 * Immutable retry and batching settings for the Rest.li entity client.
 *
 * <p>The hand-written getters below replace the Lombok-generated ones and clamp each value to the
 * smallest number a {@link java.util.concurrent.ThreadPoolExecutor} or a batch partition accepts,
 * so a zero or negative configuration value cannot fail client construction.
 */
@Builder(toBuilder = true)
@Value
public class EntityClientConfig {
  /** Policy consulted for the delay between retry attempts. */
  @Nonnull @Builder.Default BackoffPolicy backoffPolicy = new ExponentialBackoff(2);

  /** Number of retries after the initial attempt. */
  @Builder.Default int retryCount = 3;

  /** Maximum number of URNs per batch-get request. */
  @Builder.Default int batchGetV2Size = 5;

  /** Number of threads in the batch-get pool. */
  @Builder.Default int batchGetV2Concurrency = 1;

  /** Capacity of the batch-get pool's work queue. */
  @Builder.Default int batchGetV2QueueSize = 100;

  /** Keep-alive, in seconds, for the batch-get pool's threads. */
  @Builder.Default int batchGetV2KeepAlive = 60;

  /** Maximum number of proposals per batch-ingest request. */
  @Builder.Default int batchIngestSize = 5;

  /** Number of threads in the batch-ingest pool. */
  @Builder.Default int batchIngestConcurrency = 1;

  /** Capacity of the batch-ingest pool's work queue. */
  @Builder.Default int batchIngestQueueSize = 100;

  /** Keep-alive, in seconds, for the batch-ingest pool's threads. */
  @Builder.Default int batchIngestKeepAlive = 60;

  /** @return the batch-get size, never less than 1 */
  public int getBatchGetV2Size() {
    return Math.max(1, batchGetV2Size);
  }

  /** @return the batch-get thread count, never less than 1 */
  public int getBatchGetV2Concurrency() {
    return Math.max(1, batchGetV2Concurrency);
  }

  /** @return the batch-get queue capacity, never less than 1 */
  public int getBatchGetV2QueueSize() {
    return Math.max(1, batchGetV2QueueSize);
  }

  /**
   * @return the batch-get keep-alive in seconds, never negative (a negative keep-alive makes the
   *     ThreadPoolExecutor constructor throw IllegalArgumentException)
   */
  public int getBatchGetV2KeepAlive() {
    return Math.max(0, batchGetV2KeepAlive);
  }

  /** @return the batch-ingest size, never less than 1 */
  public int getBatchIngestSize() {
    return Math.max(1, batchIngestSize);
  }

  /** @return the batch-ingest thread count, never less than 1 */
  public int getBatchIngestConcurrency() {
    return Math.max(1, batchIngestConcurrency);
  }

  /** @return the batch-ingest queue capacity, never less than 1 */
  public int getBatchIngestQueueSize() {
    return Math.max(1, batchIngestQueueSize);
  }

  /**
   * @return the batch-ingest keep-alive in seconds, never negative (a negative keep-alive makes
   *     the ThreadPoolExecutor constructor throw IllegalArgumentException)
   */
  public int getBatchIngestKeepAlive() {
    return Math.max(0, batchIngestKeepAlive);
  }
}

View File

@ -1,5 +1,6 @@
package com.linkedin.entity.client;
import static com.linkedin.metadata.Constants.RESTLI_SUCCESS;
import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion;
import com.datahub.plugins.auth.authorization.Authorizer;
@ -74,7 +75,6 @@ import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.MetadataChangeProposalArray;
import com.linkedin.mxe.PlatformEvent;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
import com.linkedin.platform.PlatformDoProducePlatformEventRequestBuilder;
import com.linkedin.platform.PlatformRequestBuilders;
import com.linkedin.r2.RemoteInvocationException;
@ -83,17 +83,21 @@ import com.linkedin.restli.client.RestLiResponseException;
import com.linkedin.restli.common.HttpStatus;
import io.datahubproject.metadata.context.OperationContext;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -120,18 +124,32 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
new PlatformRequestBuilders();
private static final RunsRequestBuilders RUNS_REQUEST_BUILDERS = new RunsRequestBuilders();
private final int batchGetV2Size;
private final int batchGetV2Concurrency;
private final ExecutorService batchGetV2Pool;
private final ExecutorService batchIngestPool;
public RestliEntityClient(
@Nonnull final Client restliClient,
@Nonnull final BackoffPolicy backoffPolicy,
int retryCount,
int batchGetV2Size,
int batchGetV2Concurrency) {
super(restliClient, backoffPolicy, retryCount);
this.batchGetV2Size = Math.max(1, batchGetV2Size);
this.batchGetV2Concurrency = batchGetV2Concurrency;
@Nonnull final Client restliClient, EntityClientConfig entityClientConfig) {
super(restliClient, entityClientConfig);
this.batchGetV2Pool =
new ThreadPoolExecutor(
entityClientConfig.getBatchGetV2Concurrency(), // core threads
entityClientConfig.getBatchGetV2Concurrency(), // max threads
entityClientConfig.getBatchGetV2KeepAlive(),
TimeUnit.SECONDS, // thread keep-alive time
new ArrayBlockingQueue<>(
entityClientConfig.getBatchGetV2QueueSize()), // fixed size queue
new ThreadPoolExecutor.AbortPolicy() // optional - this is the default
);
this.batchIngestPool =
new ThreadPoolExecutor(
entityClientConfig.getBatchIngestConcurrency(), // core threads
entityClientConfig.getBatchIngestConcurrency(), // max threads
entityClientConfig.getBatchIngestKeepAlive(),
TimeUnit.SECONDS, // thread keep-alive time
new ArrayBlockingQueue<>(
entityClientConfig.getBatchIngestQueueSize()), // fixed size queue
new ThreadPoolExecutor.AbortPolicy() // optional - this is the default
);
}
@Override
@ -229,54 +247,50 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
throws RemoteInvocationException, URISyntaxException {
Map<Urn, EntityResponse> responseMap = new HashMap<>();
ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, batchGetV2Concurrency));
try {
Iterable<List<Urn>> iterable = () -> Iterators.partition(urns.iterator(), batchGetV2Size);
List<Future<Map<Urn, EntityResponse>>> futures =
StreamSupport.stream(iterable.spliterator(), false)
.map(
batch ->
executor.submit(
() -> {
try {
log.debug("Executing batchGetV2 with batch size: {}", batch.size());
final EntitiesV2BatchGetRequestBuilder requestBuilder =
ENTITIES_V2_REQUEST_BUILDERS
.batchGet()
.aspectsParam(aspectNames)
.ids(
batch.stream()
.map(Urn::toString)
.collect(Collectors.toList()));
Iterable<List<Urn>> iterable =
() -> Iterators.partition(urns.iterator(), entityClientConfig.getBatchGetV2Size());
List<Future<Map<Urn, EntityResponse>>> futures =
StreamSupport.stream(iterable.spliterator(), false)
.map(
batch ->
batchGetV2Pool.submit(
() -> {
try {
log.debug("Executing batchGetV2 with batch size: {}", batch.size());
final EntitiesV2BatchGetRequestBuilder requestBuilder =
ENTITIES_V2_REQUEST_BUILDERS
.batchGet()
.aspectsParam(aspectNames)
.ids(
batch.stream()
.map(Urn::toString)
.collect(Collectors.toList()));
return sendClientRequest(
requestBuilder, opContext.getSessionAuthentication())
.getEntity()
.getResults()
.entrySet()
.stream()
.collect(
Collectors.toMap(
entry -> UrnUtils.getUrn(entry.getKey()),
entry -> entry.getValue().getEntity()));
} catch (RemoteInvocationException e) {
throw new RuntimeException(e);
}
}))
.collect(Collectors.toList());
return sendClientRequest(
requestBuilder, opContext.getSessionAuthentication())
.getEntity()
.getResults()
.entrySet()
.stream()
.collect(
Collectors.toMap(
entry -> UrnUtils.getUrn(entry.getKey()),
entry -> entry.getValue().getEntity()));
} catch (RemoteInvocationException e) {
throw new RuntimeException(e);
}
}))
.collect(Collectors.toList());
futures.forEach(
result -> {
try {
responseMap.putAll(result.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
} finally {
executor.shutdown();
}
futures.forEach(
result -> {
try {
responseMap.putAll(result.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
return responseMap;
}
@ -298,62 +312,56 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
@Nullable final Set<String> aspectNames) {
Map<Urn, EntityResponse> responseMap = new HashMap<>();
ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, batchGetV2Concurrency));
try {
Iterable<List<VersionedUrn>> iterable =
() -> Iterators.partition(versionedUrns.iterator(), batchGetV2Size);
List<Future<Map<Urn, EntityResponse>>> futures =
StreamSupport.stream(iterable.spliterator(), false)
.map(
batch ->
executor.submit(
() -> {
try {
log.debug(
"Executing batchGetVersionedV2 with batch size: {}",
batch.size());
final EntitiesVersionedV2BatchGetRequestBuilder requestBuilder =
ENTITIES_VERSIONED_V2_REQUEST_BUILDERS
.batchGet()
.aspectsParam(aspectNames)
.entityTypeParam(entityName)
.ids(
batch.stream()
.map(
versionedUrn ->
com.linkedin.common.urn.VersionedUrn.of(
versionedUrn.getUrn().toString(),
versionedUrn.getVersionStamp()))
.collect(Collectors.toSet()));
Iterable<List<VersionedUrn>> iterable =
() -> Iterators.partition(versionedUrns.iterator(), entityClientConfig.getBatchGetV2Size());
List<Future<Map<Urn, EntityResponse>>> futures =
StreamSupport.stream(iterable.spliterator(), false)
.map(
batch ->
batchGetV2Pool.submit(
() -> {
try {
log.debug(
"Executing batchGetVersionedV2 with batch size: {}", batch.size());
final EntitiesVersionedV2BatchGetRequestBuilder requestBuilder =
ENTITIES_VERSIONED_V2_REQUEST_BUILDERS
.batchGet()
.aspectsParam(aspectNames)
.entityTypeParam(entityName)
.ids(
batch.stream()
.map(
versionedUrn ->
com.linkedin.common.urn.VersionedUrn.of(
versionedUrn.getUrn().toString(),
versionedUrn.getVersionStamp()))
.collect(Collectors.toSet()));
return sendClientRequest(
requestBuilder, opContext.getSessionAuthentication())
.getEntity()
.getResults()
.entrySet()
.stream()
.collect(
Collectors.toMap(
entry -> UrnUtils.getUrn(entry.getKey().getUrn()),
entry -> entry.getValue().getEntity()));
} catch (RemoteInvocationException e) {
throw new RuntimeException(e);
}
}))
.collect(Collectors.toList());
return sendClientRequest(
requestBuilder, opContext.getSessionAuthentication())
.getEntity()
.getResults()
.entrySet()
.stream()
.collect(
Collectors.toMap(
entry -> UrnUtils.getUrn(entry.getKey().getUrn()),
entry -> entry.getValue().getEntity()));
} catch (RemoteInvocationException e) {
throw new RuntimeException(e);
}
}))
.collect(Collectors.toList());
futures.forEach(
result -> {
try {
responseMap.putAll(result.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
} finally {
executor.shutdown();
}
futures.forEach(
result -> {
try {
responseMap.putAll(result.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
return responseMap;
}
@ -1075,29 +1083,68 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
@Nonnull Collection<MetadataChangeProposal> metadataChangeProposals,
boolean async)
throws RemoteInvocationException {
final AspectsDoIngestProposalBatchRequestBuilder requestBuilder =
ASPECTS_REQUEST_BUILDERS
.actionIngestProposalBatch()
.proposalsParam(new MetadataChangeProposalArray(metadataChangeProposals))
.asyncParam(String.valueOf(async));
String result =
sendClientRequest(requestBuilder, opContext.getSessionAuthentication()).getEntity();
return metadataChangeProposals.stream()
.map(
proposal -> {
if ("success".equals(result)) {
if (proposal.getEntityUrn() != null) {
return proposal.getEntityUrn().toString();
} else {
EntitySpec entitySpec =
opContext.getEntityRegistry().getEntitySpec(proposal.getEntityType());
return EntityKeyUtils.getUrnFromProposal(proposal, entitySpec.getKeyAspectSpec())
.toString();
}
}
return null;
})
.collect(Collectors.toList());
List<String> response = new ArrayList<>();

// Partition the proposals into chunks of at most getBatchIngestSize() and submit one request
// per chunk to the shared ingest pool.
Iterable<List<MetadataChangeProposal>> iterable =
    () ->
        Iterators.partition(
            metadataChangeProposals.iterator(), entityClientConfig.getBatchIngestSize());
List<Future<List<String>>> futures =
    StreamSupport.stream(iterable.spliterator(), false)
        .map(
            batch ->
                batchIngestPool.submit(
                    () -> {
                      try {
                        log.debug(
                            "Executing batchIngestProposals with batch size: {}", batch.size());
                        final AspectsDoIngestProposalBatchRequestBuilder requestBuilder =
                            ASPECTS_REQUEST_BUILDERS
                                .actionIngestProposalBatch()
                                // Send only this chunk. Passing the full collection here would
                                // re-submit every proposal once per chunk.
                                .proposalsParam(new MetadataChangeProposalArray(batch))
                                .asyncParam(String.valueOf(async));
                        String result =
                            sendClientRequest(
                                    requestBuilder, opContext.getSessionAuthentication())
                                .getEntity();

                        if (RESTLI_SUCCESS.equals(result)) {
                          // Report the URN of each proposal in the chunk, deriving it from the
                          // key aspect when the proposal carries no explicit entity URN.
                          return batch.stream()
                              .map(
                                  mcp -> {
                                    if (mcp.getEntityUrn() != null) {
                                      return mcp.getEntityUrn().toString();
                                    } else {
                                      EntitySpec entitySpec =
                                          opContext
                                              .getEntityRegistry()
                                              .getEntitySpec(mcp.getEntityType());
                                      return EntityKeyUtils.getUrnFromProposal(
                                              mcp, entitySpec.getKeyAspectSpec())
                                          .toString();
                                    }
                                  })
                              .collect(Collectors.toList());
                        }
                        // A non-success reply contributes no URNs for this chunk.
                        return Collections.<String>emptyList();
                      } catch (RemoteInvocationException e) {
                        throw new RuntimeException(e);
                      }
                    }))
        .collect(Collectors.toList());

// Collect results in submission order so the returned URNs follow the input order.
futures.forEach(
    result -> {
      try {
        response.addAll(result.get());
      } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
    });

return response;
}
@Override

View File

@ -5,7 +5,6 @@ import com.google.common.cache.CacheBuilder;
import com.linkedin.common.urn.Urn;
import com.linkedin.entity.EntityResponse;
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.restli.client.Client;
import io.datahubproject.metadata.context.OperationContext;
@ -24,12 +23,9 @@ public class SystemRestliEntityClient extends RestliEntityClient implements Syst
public SystemRestliEntityClient(
@Nonnull final Client restliClient,
@Nonnull final BackoffPolicy backoffPolicy,
int retryCount,
EntityClientCacheConfig cacheConfig,
int batchGetV2Size,
int batchGetV2Concurrency) {
super(restliClient, backoffPolicy, retryCount, batchGetV2Size, batchGetV2Concurrency);
@Nonnull EntityClientConfig clientConfig,
EntityClientCacheConfig cacheConfig) {
super(restliClient, clientConfig);
this.operationContextMap = CacheBuilder.newBuilder().maximumSize(500).build();
this.entityClientCache = buildEntityClientCache(SystemRestliEntityClient.class, cacheConfig);
}

View File

@ -5,6 +5,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.linkedin.common.EntityRelationships;
import com.linkedin.common.WindowDuration;
import com.linkedin.common.client.BaseClient;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.metadata.config.cache.client.UsageClientCacheConfig;
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
import com.linkedin.r2.RemoteInvocationException;
@ -26,7 +27,9 @@ public class RestliUsageClient extends BaseClient implements UsageClient {
@Nonnull final BackoffPolicy backoffPolicy,
int retryCount,
UsageClientCacheConfig cacheConfig) {
super(restliClient, backoffPolicy, retryCount);
super(
restliClient,
EntityClientConfig.builder().backoffPolicy(backoffPolicy).retryCount(retryCount).build());
this.operationContextMap = Caffeine.newBuilder().maximumSize(500).build();
this.usageClientCache =
UsageClientCache.builder()

View File

@ -13,6 +13,7 @@ import com.datahub.authentication.Authentication;
import com.linkedin.data.template.RequiredFieldNotPresentException;
import com.linkedin.entity.AspectsDoIngestProposalRequestBuilder;
import com.linkedin.entity.AspectsRequestBuilders;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
@ -37,7 +38,14 @@ public class BaseClientTest {
when(mockRestliClient.sendRequest(any(ActionRequest.class))).thenReturn(mockFuture);
RestliEntityClient testClient =
new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 0, 10, 2);
new RestliEntityClient(
mockRestliClient,
EntityClientConfig.builder()
.backoffPolicy(new ExponentialBackoff(1))
.retryCount(0)
.batchGetV2Size(10)
.batchGetV2Concurrency(2)
.build());
testClient.sendClientRequest(testRequestBuilder, AUTH);
// Expected 1 actual try and 0 retries
verify(mockRestliClient).sendRequest(any(ActionRequest.class));
@ -56,7 +64,14 @@ public class BaseClientTest {
.thenReturn(mockFuture);
RestliEntityClient testClient =
new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10, 2);
new RestliEntityClient(
mockRestliClient,
EntityClientConfig.builder()
.backoffPolicy(new ExponentialBackoff(1))
.retryCount(1)
.batchGetV2Size(10)
.batchGetV2Concurrency(2)
.build());
testClient.sendClientRequest(testRequestBuilder, AUTH);
// Expected 1 actual try and 1 retries
verify(mockRestliClient, times(2)).sendRequest(any(ActionRequest.class));
@ -73,7 +88,14 @@ public class BaseClientTest {
.thenThrow(new RuntimeException(new RequiredFieldNotPresentException("value")));
RestliEntityClient testClient =
new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10, 2);
new RestliEntityClient(
mockRestliClient,
EntityClientConfig.builder()
.backoffPolicy(new ExponentialBackoff(1))
.retryCount(1)
.batchGetV2Size(10)
.batchGetV2Concurrency(2)
.build());
assertThrows(
RuntimeException.class, () -> testClient.sendClientRequest(testRequestBuilder, AUTH));
}

View File

@ -47,7 +47,14 @@ public class SystemRestliEntityClientTest {
SystemRestliEntityClient noCacheTest =
new SystemRestliEntityClient(
mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1, 2);
mockRestliClient,
EntityClientConfig.builder()
.backoffPolicy(new ConstantBackoff(0))
.retryCount(0)
.batchGetV2Size(1)
.batchGetV2Concurrency(2)
.build(),
noCacheConfig);
com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true);
com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false);
@ -86,7 +93,14 @@ public class SystemRestliEntityClientTest {
SystemRestliEntityClient cacheTest =
new SystemRestliEntityClient(
mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1, 2);
mockRestliClient,
EntityClientConfig.builder()
.backoffPolicy(new ConstantBackoff(0))
.retryCount(0)
.batchGetV2Size(1)
.batchGetV2Concurrency(2)
.build(),
cacheConfig);
mockResponse(mockRestliClient, responseStatusTrue);
assertEquals(
@ -121,7 +135,14 @@ public class SystemRestliEntityClientTest {
SystemRestliEntityClient noCacheTest =
new SystemRestliEntityClient(
mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1, 2);
mockRestliClient,
EntityClientConfig.builder()
.backoffPolicy(new ConstantBackoff(0))
.retryCount(0)
.batchGetV2Size(1)
.batchGetV2Concurrency(2)
.build(),
noCacheConfig);
com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true);
com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false);
@ -160,7 +181,14 @@ public class SystemRestliEntityClientTest {
SystemRestliEntityClient cacheTest =
new SystemRestliEntityClient(
mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1, 2);
mockRestliClient,
EntityClientConfig.builder()
.backoffPolicy(new ConstantBackoff(0))
.retryCount(0)
.batchGetV2Size(1)
.batchGetV2Concurrency(2)
.build(),
cacheConfig);
mockResponse(mockRestliClient, responseStatusTrue);
assertEquals(
@ -195,7 +223,14 @@ public class SystemRestliEntityClientTest {
SystemRestliEntityClient noCacheTest =
new SystemRestliEntityClient(
mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1, 2);
mockRestliClient,
EntityClientConfig.builder()
.backoffPolicy(new ConstantBackoff(0))
.retryCount(0)
.batchGetV2Size(1)
.batchGetV2Concurrency(2)
.build(),
noCacheConfig);
com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true);
com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false);
@ -238,7 +273,14 @@ public class SystemRestliEntityClientTest {
SystemRestliEntityClient cacheTest =
new SystemRestliEntityClient(
mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1, 2);
mockRestliClient,
EntityClientConfig.builder()
.backoffPolicy(new ConstantBackoff(0))
.retryCount(0)
.batchGetV2Size(1)
.batchGetV2Concurrency(2)
.build(),
cacheConfig);
mockResponse(mockRestliClient, responseStatusTrue);
assertEquals(

View File

@ -4,6 +4,7 @@ import static com.datahub.authorization.AuthUtil.isAPIAuthorized;
import static com.datahub.authorization.AuthUtil.isAPIAuthorizedEntityUrns;
import static com.datahub.authorization.AuthUtil.isAPIAuthorizedUrns;
import static com.datahub.authorization.AuthUtil.isAPIOperationsAuthorized;
import static com.linkedin.metadata.Constants.RESTLI_SUCCESS;
import static com.linkedin.metadata.authorization.ApiGroup.COUNTS;
import static com.linkedin.metadata.authorization.ApiGroup.ENTITY;
import static com.linkedin.metadata.authorization.ApiGroup.TIMESERIES;
@ -330,7 +331,7 @@ public class AspectResource extends CollectionResourceTaskTemplate<String, Versi
}
// TODO: We don't actually use this return value anywhere. Maybe we should just stop returning it altogether?
return "success";
return RESTLI_SUCCESS;
} catch (ValidationException e) {
throw new RestLiServiceException(HttpStatus.S_422_UNPROCESSABLE_ENTITY, e.getMessage());
}