Mirror of https://github.com/datahub-project/datahub.git, synced 2025-08-15 04:37:03 +00:00

feat(usageclient): updates for usageclient (#9255)

This commit is contained in:
parent c348f841c6
commit b03515fbc7
@@ -82,6 +82,7 @@ project.ext.externalDependency = [
   'commonsLang': 'commons-lang:commons-lang:2.6',
   'commonsText': 'org.apache.commons:commons-text:1.10.0',
   'commonsCollections': 'commons-collections:commons-collections:3.2.2',
+  'caffeine': 'com.github.ben-manes.caffeine:caffeine:3.1.8',
   'datastaxOssNativeProtocol': 'com.datastax.oss:native-protocol:1.5.1',
   'datastaxOssCore': 'com.datastax.oss:java-driver-core:4.14.1',
   'datastaxOssQueryBuilder': 'com.datastax.oss:java-driver-query-builder:4.14.1',
@@ -332,7 +332,8 @@ entityClient:
 
 usageClient:
   retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2}
-  numRetries: ${USAGE_CLIENT_NUM_RETRIES:3}
+  numRetries: ${USAGE_CLIENT_NUM_RETRIES:0}
+  timeoutMs: ${USAGE_CLIENT_TIMEOUT_MS:3000}
 
 cache:
   primary:
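Aside: Spring resolves each ${NAME:default} placeholder above at startup, preferring the environment variable and falling back to the literal after the colon. A minimal illustrative sketch (not part of this commit):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class UsageClientSettingsSketch {
  // With no USAGE_CLIENT_NUM_RETRIES set, this resolves to the new default
  // of 0, i.e. fail on the first error instead of retrying.
  @Value("${USAGE_CLIENT_NUM_RETRIES:0}")
  private int numRetries;

  // Setting USAGE_CLIENT_TIMEOUT_MS=5000 in the environment overrides 3000.
  @Value("${USAGE_CLIENT_TIMEOUT_MS:3000}")
  private long timeoutMs;
}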
@@ -5,6 +5,7 @@ import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.metadata.spring.YamlPropertySourceFactory;
 import com.linkedin.metadata.restli.DefaultRestliClientFactory;
 import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
+import com.linkedin.r2.transport.http.client.HttpClientFactory;
 import com.linkedin.restli.client.Client;
 import com.linkedin.usage.UsageClient;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -14,6 +15,9 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.PropertySource;
 
+import java.util.HashMap;
+import java.util.Map;
+
 
 @Configuration
 @PropertySource(value = "classpath:/application.yml", factory = YamlPropertySourceFactory.class)
@@ -34,16 +38,22 @@ public class UsageClientFactory {
   @Value("${usageClient.retryInterval:2}")
   private int retryInterval;
 
-  @Value("${usageClient.numRetries:3}")
+  @Value("${usageClient.numRetries:0}")
   private int numRetries;
 
+  @Value("${usageClient.timeoutMs:3000}")
+  private long timeoutMs;
+
   @Autowired
   @Qualifier("configurationProvider")
   private ConfigurationProvider configurationProvider;
 
   @Bean("usageClient")
   public UsageClient getUsageClient(@Qualifier("systemAuthentication") final Authentication systemAuthentication) {
-    Client restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
+    Map<String, String> params = new HashMap<>();
+    params.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, String.valueOf(timeoutMs));
+
+    Client restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol, params);
     return new UsageClient(restClient, new ExponentialBackoff(retryInterval), numRetries, systemAuthentication,
         configurationProvider.getCache().getClient().getUsageClient());
   }
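The params map above flows through DefaultRestliClientFactory into Rest.li's HttpClientFactory, whose HTTP_REQUEST_TIMEOUT property caps each request in milliseconds. A rough standalone sketch of the mechanism, assuming a plain Rest.li setup (buildClient is a hypothetical helper, not DataHub code):

import com.linkedin.r2.transport.common.Client;
import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
import com.linkedin.restli.client.RestClient;

import java.util.HashMap;
import java.util.Map;

public class TimeoutClientSketch {
  public static RestClient buildClient(String uriPrefix, long timeoutMs) {
    // Transport properties are string-valued; this one bounds the whole
    // request round trip in milliseconds.
    Map<String, String> properties = new HashMap<>();
    properties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, String.valueOf(timeoutMs));

    Client r2Client = new TransportClientAdapter(
        new HttpClientFactory.Builder().build().getClient(properties));
    return new RestClient(r2Client, uriPrefix); // e.g. "http://localhost:8080/"
  }
}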
@@ -9,6 +9,7 @@ dependencies {
   api project(':metadata-utils')
   implementation project(':metadata-service:configuration')
 
+  implementation externalDependency.caffeine
   implementation externalDependency.slf4jApi
   compileOnly externalDependency.lombok
   annotationProcessor externalDependency.lombok
@@ -14,8 +14,8 @@ import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
@@ -63,15 +63,15 @@ public class ClientCache<K, V, C extends ClientCacheConfig> {
 
   public ClientCache<K, V, C> build(Class<?> metricClazz) {
     // loads data from entity client
-    CacheLoader<K, V> loader = new CacheLoader<>() {
+    CacheLoader<K, V> loader = new CacheLoader<K, V>() {
       @Override
       public V load(@NonNull K key) {
-        return loadAll(List.of(key)).get(key);
+        return loadAll(Set.of(key)).get(key);
       }
 
       @Override
       @NonNull
-      public Map<K, V> loadAll(@NonNull Iterable<? extends K> keys) {
+      public Map<K, V> loadAll(@NonNull Set<? extends K> keys) {
         return loadFunction.apply(keys);
       }
     };
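This tracks the caffeine 3.1.8 dependency added above: Caffeine 3.x declares the bulk-load hook as loadAll(Set<? extends K>) where 2.x used Iterable<? extends K>. A self-contained sketch of the same pattern, with illustrative types and values:

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class BulkLoaderSketch {
  public static void main(String[] args) {
    // Single-key loads delegate to the bulk hook, as in the diff above.
    CacheLoader<String, Integer> loader = new CacheLoader<String, Integer>() {
      @Override
      public Integer load(String key) {
        return loadAll(Set.of(key)).get(key);
      }

      @Override
      public Map<String, Integer> loadAll(Set<? extends String> keys) {
        Map<String, Integer> values = new HashMap<>();
        for (String key : keys) {
          values.put(key, key.length()); // stand-in for a real batch fetch
        }
        return values;
      }
    };

    LoadingCache<String, Integer> cache =
        Caffeine.newBuilder().maximumSize(100).build(loader);
    System.out.println(cache.getAll(Set.of("usage", "client"))); // e.g. {client=6, usage=5}
  }
}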
@@ -81,16 +81,14 @@ public class EntityClientCache {
   public EntityClientCache build(Class<?> metricClazz) {
     // estimate size
     Weigher<Key, EnvelopedAspect> weighByEstimatedSize = (key, value) ->
-        value.getValue().data().values().parallelStream()
-            .mapToInt(o -> o.toString().getBytes().length)
-            .sum();
+        value.getValue().data().toString().getBytes().length;
 
     // batch loads data from entity client (restli or java)
     Function<Iterable<? extends Key>, Map<Key, EnvelopedAspect>> loader = (Iterable<? extends Key> keys) -> {
       Map<String, Set<Key>> keysByEntity = StreamSupport.stream(keys.spliterator(), true)
           .collect(Collectors.groupingBy(Key::getEntityName, Collectors.toSet()));
 
-      Map<Key, EnvelopedAspect> results = keysByEntity.entrySet().parallelStream()
+      Map<Key, EnvelopedAspect> results = keysByEntity.entrySet().stream()
           .flatMap(entry -> {
             Set<Urn> urns = entry.getValue().stream()
                 .map(Key::getUrn)
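Both weighers in this commit replace a parallel per-field byte sum with a single pass over the value's rendered form. A standalone sketch of a byte-weighted Caffeine cache (names and limits illustrative):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;

public class ByteWeigherSketch {
  public static void main(String[] args) {
    // Weigh each entry by the byte length of its rendered value, mirroring
    // the simplified weighByEstimatedSize above.
    Weigher<String, String> byEstimatedSize =
        (key, value) -> value.getBytes().length;

    Cache<String, String> cache = Caffeine.newBuilder()
        .maximumWeight(10_000) // approximate total bytes retained
        .weigher(byEstimatedSize)
        .build();

    cache.put("urn:li:dataset:example", "{\"totalSqlQueries\":42}");
    System.out.println(cache.estimatedSize()); // 1 entry, weighing 22 bytes
  }
}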
@@ -9,6 +9,7 @@ import com.linkedin.metadata.config.cache.client.UsageClientCacheConfig;
 import com.linkedin.parseq.retry.backoff.BackoffPolicy;
 import com.linkedin.r2.RemoteInvocationException;
 import com.linkedin.restli.client.Client;
+
 import java.net.URISyntaxException;
 import javax.annotation.Nonnull;
 
@@ -51,7 +52,9 @@ public class UsageClient extends BaseClient {
   private UsageQueryResult getUsageStats(@Nonnull String resource, @Nonnull UsageTimeRange range,
       @Nonnull Authentication authentication)
       throws RemoteInvocationException, URISyntaxException {
-    final UsageStatsDoQueryRangeRequestBuilder requestBuilder = USAGE_STATS_REQUEST_BUILDERS.actionQueryRange()
+
+    final UsageStatsDoQueryRangeRequestBuilder requestBuilder = USAGE_STATS_REQUEST_BUILDERS
+        .actionQueryRange()
         .resourceParam(resource)
         .durationParam(WindowDuration.DAY)
         .rangeFromEndParam(range);
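UsageClient extends BaseClient and is constructed by the factory above with ExponentialBackoff(retryInterval) and numRetries (default now 0), so a failed request is presumably retried with growing pauses up to numRetries times. A generic sketch of that shape, using a hypothetical helper rather than the actual BaseClient code:

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class RetrySketch {
  // Hypothetical stand-in: try the call once, then up to numRetries more
  // times, doubling the wait between attempts. numRetries = 0 fails fast.
  public static <T> T withRetries(Callable<T> call, int numRetries, long intervalSeconds)
      throws Exception {
    for (int attempt = 0; ; attempt++) {
      try {
        return call.call();
      } catch (Exception e) {
        if (attempt >= numRetries) {
          throw e;
        }
        TimeUnit.SECONDS.sleep(intervalSeconds << attempt); // exponential backoff
      }
    }
  }
}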
@@ -42,9 +42,7 @@ public class UsageClientCache {
   public UsageClientCache build() {
     // estimate size
     Weigher<Key, UsageQueryResult> weighByEstimatedSize = (key, value) ->
-        value.data().values().parallelStream()
-            .mapToInt(o -> o.toString().getBytes().length)
-            .sum();
+        value.data().toString().getBytes().length;
 
     // batch loads data from usage client
     Function<Iterable<? extends Key>, Map<Key, UsageQueryResult>> loader = (Iterable<? extends Key> keys) ->
@@ -1,316 +0,0 @@
-package com.linkedin.metadata.resources.usage;
-
-import com.linkedin.common.WindowDuration;
-import com.linkedin.common.urn.Urn;
-import com.linkedin.data.template.SetMode;
-import com.linkedin.data.template.StringArray;
-import com.linkedin.data.template.StringArrayArray;
-import com.linkedin.metadata.query.Condition;
-import com.linkedin.metadata.query.Criterion;
-import com.linkedin.metadata.query.CriterionArray;
-import com.linkedin.metadata.query.Filter;
-import com.linkedin.metadata.timeseries.elastic.ElasticSearchTimeseriesAspectService;
-import com.linkedin.metadata.usage.UsageService;
-import com.linkedin.timeseries.AggregationSpec;
-import com.linkedin.timeseries.AggregationType;
-import com.linkedin.timeseries.CalendarInterval;
-import com.linkedin.timeseries.DateGroupingBucket;
-import com.linkedin.timeseries.GenericTable;
-import com.linkedin.metadata.restli.RestliUtils;
-import com.linkedin.parseq.Task;
-import com.linkedin.restli.server.annotations.Action;
-import com.linkedin.restli.server.annotations.ActionParam;
-import com.linkedin.restli.server.annotations.RestLiSimpleResource;
-import com.linkedin.restli.server.resources.SimpleResourceTemplate;
-import com.linkedin.timeseries.GroupingBucket;
-import com.linkedin.timeseries.MetricAggregation;
-import com.linkedin.timeseries.StringGroupingBucket;
-import com.linkedin.usage.FieldUsageCounts;
-import com.linkedin.usage.FieldUsageCountsArray;
-import com.linkedin.usage.UsageAggregation;
-import com.linkedin.usage.UsageAggregationArray;
-import com.linkedin.usage.UsageAggregationMetrics;
-import com.linkedin.usage.UsageQueryResult;
-import com.linkedin.usage.UsageQueryResultAggregations;
-import com.linkedin.usage.UsageTimeRange;
-import com.linkedin.usage.UserUsageCounts;
-import com.linkedin.usage.UserUsageCountsArray;
-import com.linkedin.util.Pair;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-import javax.inject.Inject;
-import javax.inject.Named;
-import java.time.Instant;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-
-/**
- * Rest.li entry point: /usageStats
- */
-@RestLiSimpleResource(name = "usageStats", namespace = "com.linkedin.usage")
-public class UsageStats extends SimpleResourceTemplate<UsageAggregation> {
-  private static final String ACTION_BATCH_INGEST = "batchIngest";
-  private static final String PARAM_BUCKETS = "buckets";
-
-  private static final String ACTION_QUERY = "query";
-  private static final String PARAM_RESOURCE = "resource";
-  private static final String PARAM_DURATION = "duration";
-  private static final String PARAM_START_TIME = "startTime";
-  private static final String PARAM_END_TIME = "endTime";
-  private static final String PARAM_MAX_BUCKETS = "maxBuckets";
-
-  private static final String ACTION_QUERY_RANGE = "queryRange";
-  private static final String PARAM_RANGE = "rangeFromEnd";
-  private static final String USAGE_STATS_ENTITY_NAME = "dataset";
-  private static final String USAGE_STATS_ASPECT_NAME = "datasetUsageStatistics";
-  private static final String ES_FIELD_TIMESTAMP = "timestampMillis";
-  private final Logger _logger = LoggerFactory.getLogger(UsageStats.class.getName());
-  @Inject
-  @Named("usageService")
-  private UsageService _usageService;
-  @Inject
-  @Named("elasticSearchTimeseriesAspectService")
-  private ElasticSearchTimeseriesAspectService _elasticSearchTimeseriesAspectService;
-
-  @Action(name = ACTION_BATCH_INGEST)
-  @Nonnull
-  public Task<Void> batchIngest(@ActionParam(PARAM_BUCKETS) @Nonnull UsageAggregation[] buckets) {
-    _logger.info("Ingesting {} usage stats aggregations", buckets.length);
-    return RestliUtils.toTask(() -> {
-      for (UsageAggregation agg : buckets) {
-        this.ingest(agg);
-      }
-      return null;
-    });
-  }
-
-  private CalendarInterval windowToInterval(@Nonnull WindowDuration duration) {
-    switch (duration) {
-      case HOUR:
-        return CalendarInterval.HOUR;
-      case DAY:
-        return CalendarInterval.DAY;
-      case WEEK:
-        return CalendarInterval.WEEK;
-      case MONTH:
-        return CalendarInterval.MONTH;
-      case YEAR:
-        return CalendarInterval.YEAR;
-      default:
-        throw new IllegalArgumentException("Unsupported duration value" + duration);
-    }
-  }
-
-  private UsageAggregationArray getBuckets(@Nonnull String resource, @Nonnull WindowDuration duration, Long startTime, Long endTime) {
-    // Populate the filter
-    Filter filter = new Filter();
-    ArrayList<Criterion> criteria = new ArrayList<>();
-    Criterion hasUrnCriterion = new Criterion().setField("urn").setCondition(Condition.EQUAL).setValue(resource);
-    criteria.add(hasUrnCriterion);
-    if (startTime != null) {
-      Criterion startTimeCriterion = new Criterion().setField(ES_FIELD_TIMESTAMP)
-          .setCondition(Condition.GREATER_THAN_OR_EQUAL_TO)
-          .setValue(startTime.toString());
-      criteria.add(startTimeCriterion);
-    }
-    if (endTime != null) {
-      Criterion endTimeCriterion = new Criterion().setField(ES_FIELD_TIMESTAMP)
-          .setCondition(Condition.LESS_THAN_OR_EQUAL_TO)
-          .setValue(endTime.toString());
-      criteria.add(endTimeCriterion);
-    }
-    filter.setCriteria(new CriterionArray(criteria));
-    // Populate the aggregation specs
-    ArrayList<AggregationSpec> aggregationSpecs = new ArrayList<>();
-    aggregationSpecs.add(new AggregationSpec().setAggregationType(AggregationType.LATEST).setMemberName("uniqueUserCount"));
-    aggregationSpecs.add(new AggregationSpec().setAggregationType(AggregationType.LATEST).setMemberName("totalSqlQueries"));
-    aggregationSpecs.add(new AggregationSpec().setAggregationType(AggregationType.LATEST).setMemberName("topSqlQueries"));
-    /*
-    aggregationSpecs.add(new AggregationSpec().setAggregationType(AggregationType.SUM).setMemberName("totalSqlQueries"));
-    aggregationSpecs.add(new AggregationSpec().setAggregationType(AggregationType.SUM).setMemberName("userCounts.count"));
-    aggregationSpecs.add(new AggregationSpec().setAggregationType(AggregationType.SUM).setMemberName("fieldCounts.count"));
-    */
-
-    // Populate the Grouping buckets
-    ArrayList<GroupingBucket> groupingBuckets = new ArrayList<>();
-    // ts bucket
-    GroupingBucket timestampBucket = new GroupingBucket();
-    timestampBucket.setDateGroupingBucket(
-        new DateGroupingBucket().setKey(ES_FIELD_TIMESTAMP).setGranularity(windowToInterval(duration)));
-    groupingBuckets.add(timestampBucket);
-    /*
-    // user counts bucket
-    GroupingBucket userGroupsBucket = new GroupingBucket();
-    userGroupsBucket.setStringGroupingBucket( new StringGroupingBucket().setKey("userCounts.user") );
-    groupingBuckets.add(userGroupsBucket);
-    // field counts bucket
-    GroupingBucket fieldCountGroupBucket = new GroupingBucket();
-    fieldCountGroupBucket.setStringGroupingBucket(new StringGroupingBucket().setKey("fieldCounts.fieldName"));
-    groupingBuckets.add(fieldCountGroupBucket);
-    */
-
-    GenericTable result =
-        _elasticSearchTimeseriesAspectService.getAggregatedStats(USAGE_STATS_ENTITY_NAME, USAGE_STATS_ASPECT_NAME,
-            (AggregationSpec[]) aggregationSpecs.toArray(), filter, (GroupingBucket[]) groupingBuckets.toArray());
-    UsageAggregationArray buckets = new UsageAggregationArray();
-    for(StringArray row: result.getRows()) {
-      UsageAggregation usageAggregation = new UsageAggregation();
-      usageAggregation.setBucket(Long.valueOf(row.get(0)));
-      usageAggregation.setDuration(duration);
-      try {
-        usageAggregation.setResource(new Urn(resource));
-      } catch (URISyntaxException e) {
-        throw new IllegalArgumentException("Invalid resource" + e);
-      }
-      UsageAggregationMetrics usageAggregationMetrics = new UsageAggregationMetrics();
-      usageAggregationMetrics.setUniqueUserCount(Integer.valueOf(row.get(1)));
-      usageAggregationMetrics.setTotalSqlQueries(Integer.valueOf(row.get(2)));
-      //usageAggregationMetrics.setTopSqlQueries(row.get(3));
-      usageAggregation.setMetrics(usageAggregationMetrics);
-    }
-    return buckets;
-  }
-
-  private UsageQueryResultAggregations getAggregations(String resource, WindowDuration duration, Long startTime, Long endTime) {
-    // TODO: make the aggregation computation logic reusable
-    UsageQueryResultAggregations aggregations = new UsageQueryResultAggregations();
-
-    /*
-    // Compute aggregations for users and unique user count.
-    {
-      Map<Pair<Urn, String>, Integer> userAgg = new HashMap<>();
-      buckets.forEach((bucket) -> {
-        Optional.ofNullable(bucket.getMetrics().getUsers()).ifPresent(usersUsageCounts -> {
-          usersUsageCounts.forEach((userCount -> {
-            Pair<Urn, String> key = new Pair<>(userCount.getUser(), userCount.getUserEmail());
-            int count = userAgg.getOrDefault(key, 0);
-            count += userCount.getCount();
-            userAgg.put(key, count);
-          }));
-        });
-      });
-
-      if (!userAgg.isEmpty()) {
-        UserUsageCountsArray users = new UserUsageCountsArray();
-        users.addAll(userAgg.entrySet()
-            .stream()
-            .map((mapping) -> new UserUsageCounts().setUser(mapping.getKey().getFirst(), SetMode.REMOVE_IF_NULL)
-                .setUserEmail(mapping.getKey().getSecond(), SetMode.REMOVE_IF_NULL)
-                .setCount(mapping.getValue()))
-            .collect(Collectors.toList()));
-        aggregations.setUsers(users);
-        aggregations.setUniqueUserCount(userAgg.size());
-      }
-    }
-
-    // Compute aggregation for total query count.
-    {
-      Integer totalQueryCount = null;
-
-      for (UsageAggregation bucket : buckets) {
-        if (bucket.getMetrics().getTotalSqlQueries() != null) {
-          if (totalQueryCount == null) {
-            totalQueryCount = 0;
-          }
-          totalQueryCount += bucket.getMetrics().getTotalSqlQueries();
-        }
-      }
-
-      if (totalQueryCount != null) {
-        aggregations.setTotalSqlQueries(totalQueryCount);
-      }
-    }
-
-    // Compute aggregations for field usage counts.
-    {
-      Map<String, Integer> fieldAgg = new HashMap<>();
-      buckets.forEach((bucket) -> {
-        Optional.ofNullable(bucket.getMetrics().getFields()).ifPresent(fieldUsageCounts -> {
-          fieldUsageCounts.forEach((fieldCount -> {
-            String key = fieldCount.getFieldName();
-            int count = fieldAgg.getOrDefault(key, 0);
-            count += fieldCount.getCount();
-            fieldAgg.put(key, count);
-          }));
-        });
-      });
-
-      if (!fieldAgg.isEmpty()) {
-        FieldUsageCountsArray fields = new FieldUsageCountsArray();
-        fields.addAll(fieldAgg.entrySet()
-            .stream()
-            .map((mapping) -> new FieldUsageCounts().setFieldName(mapping.getKey()).setCount(mapping.getValue()))
-            .collect(Collectors.toList()));
-        aggregations.setFields(fields);
-      }
-    }
-    */
-    return aggregations;
-  }
-
-  @Action(name = ACTION_QUERY)
-  @Nonnull
-  public Task<UsageQueryResult> query(@ActionParam(PARAM_RESOURCE) @Nonnull String resource,
-      @ActionParam(PARAM_DURATION) @Nonnull WindowDuration duration,
-      @ActionParam(PARAM_START_TIME) @com.linkedin.restli.server.annotations.Optional Long startTime,
-      @ActionParam(PARAM_END_TIME) @com.linkedin.restli.server.annotations.Optional Long endTime,
-      @ActionParam(PARAM_MAX_BUCKETS) @com.linkedin.restli.server.annotations.Optional Integer maxBuckets) {
-    _logger.info("Attempting to query usage stats");
-    return RestliUtils.toTask(() -> {
-      UsageAggregationArray buckets = getBuckets(resource, duration, startTime, endTime);
-      UsageQueryResultAggregations aggregations = getAggregations(resource, duration, startTime, endTime);
-      return new UsageQueryResult().setBuckets(buckets).setAggregations(aggregations);
-    });
-  }
-
-
-  @Action(name = ACTION_QUERY_RANGE)
-  @Nonnull
-  public Task<UsageQueryResult> queryRange(@ActionParam(PARAM_RESOURCE) @Nonnull String resource,
-      @ActionParam(PARAM_DURATION) @Nonnull WindowDuration duration, @ActionParam(PARAM_RANGE) UsageTimeRange range) {
-    final long now = Instant.now().toEpochMilli();
-    return this.query(resource, duration, convertRangeToStartTime(range, now), now, null);
-  }
-
-  private void ingest(@Nonnull UsageAggregation bucket) {
-    // TODO attempt to resolve users into emails
-    _usageService.upsertDocument(bucket);
-  }
-
-  @Nonnull
-  Long convertRangeToStartTime(@Nonnull UsageTimeRange range, long currentEpochMillis) {
-    // TRICKY: since start_time must be before the bucket's start, we actually
-    // need to subtract extra from the current time to ensure that we get precisely
-    // what we're looking for. Note that start_time and end_time are both inclusive,
-    // so we must also do an off-by-one adjustment.
-    final long oneHourMillis = 60 * 60 * 1000;
-    final long oneDayMillis = 24 * oneHourMillis;
-
-    if (range == UsageTimeRange.HOUR) {
-      return currentEpochMillis - (2 * oneHourMillis + 1);
-    } else if (range == UsageTimeRange.DAY) {
-      return currentEpochMillis - (2 * oneDayMillis + 1);
-    } else if (range == UsageTimeRange.WEEK) {
-      return currentEpochMillis - (8 * oneDayMillis + 1);
-    } else if (range == UsageTimeRange.MONTH) {
-      // Assuming month is last 30 days.
-      return currentEpochMillis - (31 * oneDayMillis + 1);
-    } else if (range == UsageTimeRange.QUARTER) {
-      // Assuming a quarter is 91 days.
-      return currentEpochMillis - (92 * oneDayMillis + 1);
-    } else if (range == UsageTimeRange.YEAR) {
-      return currentEpochMillis - (366 * oneDayMillis + 1);
-    } else if (range == UsageTimeRange.ALL) {
-      return 0L;
-    } else {
-      throw new IllegalArgumentException("invalid UsageTimeRange enum state: " + range.name());
-    }
-  }
-}
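The off-by-one reasoning in the deleted convertRangeToStartTime is easiest to see with concrete numbers: both bounds are inclusive, so the window reaches back one extra bucket plus one millisecond. Illustrative arithmetic with a fixed clock:

public class RangeStartSketch {
  public static void main(String[] args) {
    final long oneHourMillis = 60 * 60 * 1000;
    final long oneDayMillis = 24 * oneHourMillis;
    final long now = 1_700_000_000_000L; // any fixed epoch-millis instant

    // WEEK looks back 8 days + 1 ms rather than 7, so a full trailing week
    // is covered regardless of how buckets align with "now".
    long weekStart = now - (8 * oneDayMillis + 1);
    System.out.println(now - weekStart); // 691200001 = 8 * 86_400_000 + 1
  }
}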
@@ -44,18 +44,34 @@ public class DefaultRestliClientFactory {
   @Nonnull
   public static RestClient getRestLiClient(@Nonnull String restLiServerHost, int restLiServerPort, boolean useSSL,
       @Nullable String sslProtocol) {
+    return getRestLiClient(restLiServerHost, restLiServerPort, useSSL, sslProtocol, null);
+  }
+
+  @Nonnull
+  public static RestClient getRestLiClient(@Nonnull String restLiServerHost, int restLiServerPort, boolean useSSL,
+      @Nullable String sslProtocol, @Nullable Map<String, String> params) {
     return getRestLiClient(
         URI.create(String.format("%s://%s:%s", useSSL ? "https" : "http", restLiServerHost, restLiServerPort)),
-        sslProtocol);
+        sslProtocol,
+        params);
   }
 
   @Nonnull
   public static RestClient getRestLiClient(@Nonnull URI gmsUri, @Nullable String sslProtocol) {
+    return getRestLiClient(gmsUri, sslProtocol, null);
+  }
+
+  @Nonnull
+  public static RestClient getRestLiClient(@Nonnull URI gmsUri, @Nullable String sslProtocol,
+      @Nullable Map<String, String> inputParams) {
     if (StringUtils.isBlank(gmsUri.getHost()) || gmsUri.getPort() <= 0) {
       throw new InvalidParameterException("Invalid restli server host name or port!");
    }
 
+    Map<String, Object> params = new HashMap<>();
+    if (inputParams != null) {
+      params.putAll(inputParams);
+    }
+
     if ("https".equals(gmsUri.getScheme())) {
       try {
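Existing callers keep the old two- and four-argument overloads; callers that need transport properties pass a map, as UsageClientFactory now does. A usage sketch of the new overload (values illustrative):

import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
import com.linkedin.restli.client.RestClient;

import java.util.Map;

public class FactoryUsageSketch {
  public static void main(String[] args) {
    Map<String, String> params =
        Map.of(HttpClientFactory.HTTP_REQUEST_TIMEOUT, "3000");
    // Passing null instead of params (or using the old overload) keeps the
    // previous behavior.
    RestClient client = DefaultRestliClientFactory.getRestLiClient(
        "localhost", 8080, false, null, params);
  }
}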