feat(model): dashboard usage model, is_null condition added (#5397)

This commit is contained in:
Mayuri Nehate 2022-07-15 15:37:06 +05:30 committed by GitHub
parent d24b91df42
commit 2c48329810
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1488 additions and 11 deletions

View File

@ -30,6 +30,7 @@ import com.linkedin.datahub.graphql.generated.CorpUser;
import com.linkedin.datahub.graphql.generated.CorpUserInfo;
import com.linkedin.datahub.graphql.generated.Dashboard;
import com.linkedin.datahub.graphql.generated.DashboardInfo;
import com.linkedin.datahub.graphql.generated.DashboardUserUsageCounts;
import com.linkedin.datahub.graphql.generated.DataFlow;
import com.linkedin.datahub.graphql.generated.DataJob;
import com.linkedin.datahub.graphql.generated.DataJobInputOutput;
@ -81,6 +82,7 @@ import com.linkedin.datahub.graphql.resolvers.browse.BrowseResolver;
import com.linkedin.datahub.graphql.resolvers.config.AppConfigResolver;
import com.linkedin.datahub.graphql.resolvers.container.ContainerEntitiesResolver;
import com.linkedin.datahub.graphql.resolvers.container.ParentContainersResolver;
import com.linkedin.datahub.graphql.resolvers.dashboard.DashboardUsageStatsResolver;
import com.linkedin.datahub.graphql.resolvers.dataset.DatasetHealthResolver;
import com.linkedin.datahub.graphql.resolvers.deprecation.UpdateDeprecationResolver;
import com.linkedin.datahub.graphql.resolvers.domain.CreateDomainResolver;
@ -1015,6 +1017,7 @@ public class GmsGraphQLEngine {
})
)
.dataFetcher("parentContainers", new ParentContainersResolver(entityClient))
.dataFetcher("usageStats", new DashboardUsageStatsResolver(timeseriesAspectService))
);
builder.type("DashboardInfo", typeWiring -> typeWiring
.dataFetcher("charts", new LoadableTypeBatchResolver<>(chartType,
@ -1022,6 +1025,11 @@ public class GmsGraphQLEngine {
.map(Chart::getUrn)
.collect(Collectors.toList())))
);
builder.type("DashboardUserUsageCounts", typeWiring -> typeWiring
.dataFetcher("user", new LoadableTypeResolver<>(
corpUserType,
(env) -> ((DashboardUserUsageCounts) env.getSource()).getUser().getUrn()))
);
}
/**

View File

@ -0,0 +1,350 @@
package com.linkedin.datahub.graphql.resolvers.dashboard;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.StringArray;
import com.linkedin.datahub.graphql.generated.CorpUser;
import com.linkedin.datahub.graphql.generated.DashboardUsageAggregation;
import com.linkedin.datahub.graphql.generated.DashboardUsageAggregationMetrics;
import com.linkedin.datahub.graphql.generated.DashboardUsageMetrics;
import com.linkedin.datahub.graphql.generated.DashboardUsageQueryResult;
import com.linkedin.datahub.graphql.generated.DashboardUsageQueryResultAggregations;
import com.linkedin.datahub.graphql.generated.DashboardUserUsageCounts;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.WindowDuration;
import com.linkedin.datahub.graphql.types.dashboard.mappers.DashboardUsageMetricMapper;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.EnvelopedAspect;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.timeseries.AggregationSpec;
import com.linkedin.timeseries.AggregationType;
import com.linkedin.timeseries.CalendarInterval;
import com.linkedin.timeseries.GenericTable;
import com.linkedin.timeseries.GroupingBucket;
import com.linkedin.timeseries.GroupingBucketType;
import com.linkedin.timeseries.TimeWindowSize;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
/**
* Resolver used for resolving the usage statistics of a Dashboard.
* <p>
* Returns daily as well as absolute usage metrics of Dashboard
*/
@Slf4j
public class DashboardUsageStatsResolver implements DataFetcher<CompletableFuture<DashboardUsageQueryResult>> {
private static final String ES_FIELD_URN = "urn";
private static final String ES_FIELD_TIMESTAMP = "timestampMillis";
private static final String ES_FIELD_EVENT_GRANULARITY = "eventGranularity";
private static final String ES_NULL_VALUE = "NULL";
private final TimeseriesAspectService timeseriesAspectService;
public DashboardUsageStatsResolver(TimeseriesAspectService timeseriesAspectService) {
this.timeseriesAspectService = timeseriesAspectService;
}
@Override
public CompletableFuture<DashboardUsageQueryResult> get(DataFetchingEnvironment environment) throws Exception {
final String dashboardUrn = ((Entity) environment.getSource()).getUrn();
final Long maybeStartTimeMillis = environment.getArgumentOrDefault("startTimeMillis", null);
final Long maybeEndTimeMillis = environment.getArgumentOrDefault("endTimeMillis", null);
// Max number of aspects to return for absolute dashboard usage.
final Integer maybeLimit = environment.getArgumentOrDefault("limit", null);
return CompletableFuture.supplyAsync(() -> {
DashboardUsageQueryResult usageQueryResult = new DashboardUsageQueryResult();
// Time Bucket Stats
Filter bucketStatsFilter = createBucketUsageStatsFilter(dashboardUrn, maybeStartTimeMillis, maybeEndTimeMillis);
List<DashboardUsageAggregation> dailyUsageBuckets = getBuckets(bucketStatsFilter, dashboardUrn);
DashboardUsageQueryResultAggregations aggregations = getAggregations(bucketStatsFilter, dailyUsageBuckets);
usageQueryResult.setBuckets(dailyUsageBuckets);
usageQueryResult.setAggregations(aggregations);
// Absolute usage metrics
List<DashboardUsageMetrics> dashboardUsageMetrics =
getDashboardUsageMetrics(dashboardUrn, maybeStartTimeMillis, maybeEndTimeMillis, maybeLimit);
usageQueryResult.setMetrics(dashboardUsageMetrics);
return usageQueryResult;
});
}
private List<DashboardUsageMetrics> getDashboardUsageMetrics(String dashboardUrn, Long maybeStartTimeMillis,
Long maybeEndTimeMillis, Integer maybeLimit) {
List<DashboardUsageMetrics> dashboardUsageMetrics;
try {
Filter filter = new Filter();
final ArrayList<Criterion> criteria = new ArrayList<>();
// Add filter for absence of eventGranularity - only consider absolute stats
Criterion excludeTimeBucketsCriterion =
new Criterion().setField(ES_FIELD_EVENT_GRANULARITY).setCondition(Condition.IS_NULL).setValue("");
criteria.add(excludeTimeBucketsCriterion);
filter.setOr(new ConjunctiveCriterionArray(
ImmutableList.of(new ConjunctiveCriterion().setAnd(new CriterionArray(criteria)))));
List<EnvelopedAspect> aspects =
timeseriesAspectService.getAspectValues(Urn.createFromString(dashboardUrn), Constants.DASHBOARD_ENTITY_NAME,
Constants.DASHBOARD_USAGE_STATISTICS_ASPECT_NAME, maybeStartTimeMillis, maybeEndTimeMillis, maybeLimit,
null, filter);
dashboardUsageMetrics = aspects.stream().map(DashboardUsageMetricMapper::map).collect(Collectors.toList());
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid resource", e);
}
return dashboardUsageMetrics;
}
private DashboardUsageQueryResultAggregations getAggregations(Filter bucketStatsFilter,
final List<DashboardUsageAggregation> dailyUsageBuckets) {
List<DashboardUserUsageCounts> userUsageCounts = getUserUsageCounts(bucketStatsFilter);
DashboardUsageQueryResultAggregations aggregations = new DashboardUsageQueryResultAggregations();
aggregations.setUsers(userUsageCounts);
aggregations.setUniqueUserCount(userUsageCounts.size());
// Compute total viewsCount and executionsCount for queries time range from the buckets itself.
// We want to avoid issuing an additional query with a sum aggregation.
Integer totalViewsCount = null;
Integer totalExecutionsCount = null;
for (DashboardUsageAggregation bucket : dailyUsageBuckets) {
if (bucket.getMetrics().getExecutionsCount() != null) {
if (totalExecutionsCount == null) {
totalExecutionsCount = 0;
}
totalExecutionsCount += bucket.getMetrics().getExecutionsCount();
}
if (bucket.getMetrics().getViewsCount() != null) {
if (totalViewsCount == null) {
totalViewsCount = 0;
}
totalViewsCount += bucket.getMetrics().getViewsCount();
}
}
aggregations.setExecutionsCount(totalExecutionsCount);
aggregations.setViewsCount(totalViewsCount);
return aggregations;
}
private List<DashboardUsageAggregation> getBuckets(Filter bucketStatsFilter, String dashboardUrn) {
AggregationSpec usersCountAggregation =
new AggregationSpec().setAggregationType(AggregationType.SUM).setFieldPath("uniqueUserCount");
AggregationSpec viewsCountAggregation =
new AggregationSpec().setAggregationType(AggregationType.SUM).setFieldPath("viewsCount");
AggregationSpec executionsCountAggregation =
new AggregationSpec().setAggregationType(AggregationType.SUM).setFieldPath("executionsCount");
AggregationSpec usersCountCardinalityAggregation =
new AggregationSpec().setAggregationType(AggregationType.CARDINALITY).setFieldPath("uniqueUserCount");
AggregationSpec viewsCountCardinalityAggregation =
new AggregationSpec().setAggregationType(AggregationType.CARDINALITY).setFieldPath("viewsCount");
AggregationSpec executionsCountCardinalityAggregation =
new AggregationSpec().setAggregationType(AggregationType.CARDINALITY).setFieldPath("executionsCount");
AggregationSpec[] aggregationSpecs =
new AggregationSpec[]{usersCountAggregation, viewsCountAggregation, executionsCountAggregation,
usersCountCardinalityAggregation, viewsCountCardinalityAggregation, executionsCountCardinalityAggregation};
GenericTable dailyStats = timeseriesAspectService.getAggregatedStats(Constants.DASHBOARD_ENTITY_NAME,
Constants.DASHBOARD_USAGE_STATISTICS_ASPECT_NAME, aggregationSpecs, bucketStatsFilter,
createUsageGroupingBuckets(CalendarInterval.DAY));
List<DashboardUsageAggregation> buckets = new ArrayList<>();
StringArray columnNames = dailyStats.getColumnNames();
Integer idxTimestampMillis = columnNames.indexOf("timestampMillis");
Integer idxUserCountSum = columnNames.indexOf("sum_uniqueUserCount");
Integer idxViewsCountSum = columnNames.indexOf("sum_viewsCount");
Integer idxExecutionsCountSum = columnNames.indexOf("sum_executionsCount");
Integer idxUserCountCardinality = columnNames.indexOf("cardinality_uniqueUserCount");
Integer idxViewsCountCardinality = columnNames.indexOf("cardinality_viewsCount");
Integer idxExecutionsCountCardinality = columnNames.indexOf("cardinality_executionsCount");
for (StringArray row : dailyStats.getRows()) {
DashboardUsageAggregation usageAggregation = new DashboardUsageAggregation();
usageAggregation.setBucket(Long.valueOf(row.get(idxTimestampMillis)));
usageAggregation.setDuration(WindowDuration.DAY);
usageAggregation.setResource(dashboardUrn);
DashboardUsageAggregationMetrics usageAggregationMetrics = new DashboardUsageAggregationMetrics();
// Note: Currently SUM AggregationType returns 0 (zero) value even if all values in timeseries field being aggregated
// are NULL (missing). For example sum of execution counts come up as 0 if all values in executions count timeseries
// are NULL. To overcome this, we extract CARDINALITY for the same timeseries field. Cardinality of 0 identifies
// above scenario. For such scenario, we set sum as NULL.
if (!row.get(idxUserCountSum).equals(ES_NULL_VALUE) && !row.get(idxUserCountCardinality).equals(ES_NULL_VALUE)) {
try {
if (Integer.valueOf(row.get(idxUserCountCardinality)) != 0) {
usageAggregationMetrics.setUniqueUserCount(Integer.valueOf(row.get(idxUserCountSum)));
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Failed to convert uniqueUserCount from ES to int", e);
}
}
if (!row.get(idxViewsCountSum).equals(ES_NULL_VALUE) && !row.get(idxViewsCountCardinality)
.equals(ES_NULL_VALUE)) {
try {
if (Integer.valueOf(row.get(idxViewsCountCardinality)) != 0) {
usageAggregationMetrics.setViewsCount(Integer.valueOf(row.get(idxViewsCountSum)));
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Failed to convert viewsCount from ES to int", e);
}
}
if (!row.get(idxExecutionsCountSum).equals(ES_NULL_VALUE) && !row.get(idxExecutionsCountCardinality)
.equals(ES_NULL_VALUE)) {
try {
if (Integer.valueOf(row.get(idxExecutionsCountCardinality)) != 0) {
usageAggregationMetrics.setExecutionsCount(Integer.valueOf(row.get(idxExecutionsCountSum)));
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Failed to convert executionsCount from ES to object", e);
}
}
usageAggregation.setMetrics(usageAggregationMetrics);
buckets.add(usageAggregation);
}
return buckets;
}
private List<DashboardUserUsageCounts> getUserUsageCounts(Filter filter) {
// Sum aggregation on userCounts.count
AggregationSpec sumUsageCountsCountAggSpec =
new AggregationSpec().setAggregationType(AggregationType.SUM).setFieldPath("userCounts.usageCount");
AggregationSpec sumViewCountsCountAggSpec =
new AggregationSpec().setAggregationType(AggregationType.SUM).setFieldPath("userCounts.viewsCount");
AggregationSpec sumExecutionCountsCountAggSpec =
new AggregationSpec().setAggregationType(AggregationType.SUM).setFieldPath("userCounts.executionsCount");
AggregationSpec usageCountsCardinalityAggSpec =
new AggregationSpec().setAggregationType(AggregationType.CARDINALITY).setFieldPath("userCounts.usageCount");
AggregationSpec viewCountsCardinalityAggSpec =
new AggregationSpec().setAggregationType(AggregationType.CARDINALITY).setFieldPath("userCounts.viewsCount");
AggregationSpec executionCountsCardinalityAggSpec =
new AggregationSpec().setAggregationType(AggregationType.CARDINALITY)
.setFieldPath("userCounts.executionsCount");
AggregationSpec[] aggregationSpecs =
new AggregationSpec[]{sumUsageCountsCountAggSpec, sumViewCountsCountAggSpec, sumExecutionCountsCountAggSpec,
usageCountsCardinalityAggSpec, viewCountsCardinalityAggSpec, executionCountsCardinalityAggSpec};
// String grouping bucket on userCounts.user
GroupingBucket userGroupingBucket =
new GroupingBucket().setKey("userCounts.user").setType(GroupingBucketType.STRING_GROUPING_BUCKET);
GroupingBucket[] groupingBuckets = new GroupingBucket[]{userGroupingBucket};
// Query backend
GenericTable result = timeseriesAspectService.getAggregatedStats(Constants.DASHBOARD_ENTITY_NAME,
Constants.DASHBOARD_USAGE_STATISTICS_ASPECT_NAME, aggregationSpecs, filter, groupingBuckets);
StringArray columnNames = result.getColumnNames();
Integer idxUser = columnNames.indexOf("userCounts.user");
Integer idxUsageCountSum = columnNames.indexOf("sum_userCounts.usageCount");
Integer idxViewsCountSum = columnNames.indexOf("sum_userCounts.viewsCount");
Integer idxExecutionsCountSum = columnNames.indexOf("sum_userCounts.executionsCount");
Integer idxUsageCountCardinality = columnNames.indexOf("cardinality_userCounts.usageCount");
Integer idxViewsCountCardinality = columnNames.indexOf("cardinality_userCounts.viewsCount");
Integer idxExecutionsCountCardinality = columnNames.indexOf("cardinality_userCounts.executionsCount");
// Process response
List<DashboardUserUsageCounts> userUsageCounts = new ArrayList<>();
for (StringArray row : result.getRows()) {
DashboardUserUsageCounts userUsageCount = new DashboardUserUsageCounts();
CorpUser partialUser = new CorpUser();
partialUser.setUrn(row.get(idxUser));
userUsageCount.setUser(partialUser);
// Note: Currently SUM AggregationType returns 0 (zero) value even if all values in timeseries field being aggregated
// are NULL (missing). For example sum of execution counts come up as 0 if all values in executions count timeseries
// are NULL. To overcome this, we extract CARDINALITY for the same timeseries field. Cardinality of 0 identifies
// above scenario. For such scenario, we set sum as NULL.
if (!row.get(idxUsageCountSum).equals(ES_NULL_VALUE) && !row.get(idxUsageCountCardinality)
.equals(ES_NULL_VALUE)) {
try {
if (Integer.valueOf(row.get(idxUsageCountCardinality)) != 0) {
userUsageCount.setUsageCount(Integer.valueOf(row.get(idxUsageCountSum)));
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Failed to convert user usage count from ES to int", e);
}
}
if (!row.get(idxViewsCountSum).equals(ES_NULL_VALUE) && row.get(idxViewsCountCardinality).equals(ES_NULL_VALUE)) {
try {
if (Integer.valueOf(row.get(idxViewsCountCardinality)) != 0) {
userUsageCount.setViewsCount(Integer.valueOf(row.get(idxViewsCountSum)));
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Failed to convert user views count from ES to int", e);
}
}
if (!row.get(idxExecutionsCountSum).equals(ES_NULL_VALUE) && !row.get(idxExecutionsCountCardinality)
.equals(ES_NULL_VALUE)) {
try {
if (Integer.valueOf(row.get(idxExecutionsCountCardinality)) != 0) {
userUsageCount.setExecutionsCount(Integer.valueOf(row.get(idxExecutionsCountSum)));
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Failed to convert user executions count from ES to int", e);
}
}
userUsageCounts.add(userUsageCount);
}
return userUsageCounts;
}
private GroupingBucket[] createUsageGroupingBuckets(CalendarInterval calenderInterval) {
GroupingBucket timestampBucket = new GroupingBucket();
timestampBucket.setKey(ES_FIELD_TIMESTAMP)
.setType(GroupingBucketType.DATE_GROUPING_BUCKET)
.setTimeWindowSize(new TimeWindowSize().setMultiple(1).setUnit(calenderInterval));
return new GroupingBucket[]{timestampBucket};
}
private Filter createBucketUsageStatsFilter(String dashboardUrn, Long startTime, Long endTime) {
Filter filter = new Filter();
final ArrayList<Criterion> criteria = new ArrayList<>();
// Add filter for urn == dashboardUrn
Criterion dashboardUrnCriterion =
new Criterion().setField(ES_FIELD_URN).setCondition(Condition.EQUAL).setValue(dashboardUrn);
criteria.add(dashboardUrnCriterion);
if (startTime != null) {
// Add filter for start time
Criterion startTimeCriterion = new Criterion().setField(ES_FIELD_TIMESTAMP)
.setCondition(Condition.GREATER_THAN_OR_EQUAL_TO)
.setValue(Long.toString(startTime));
criteria.add(startTimeCriterion);
}
if (endTime != null) {
// Add filter for end time
Criterion endTimeCriterion = new Criterion().setField(ES_FIELD_TIMESTAMP)
.setCondition(Condition.LESS_THAN_OR_EQUAL_TO)
.setValue(Long.toString(endTime));
criteria.add(endTimeCriterion);
}
// Add filter for presence of eventGranularity - only consider bucket stats and not absolute stats
// since unit is mandatory, we assume if eventGranularity contains unit, then it is not null
Criterion onlyTimeBucketsCriterion =
new Criterion().setField(ES_FIELD_EVENT_GRANULARITY).setCondition(Condition.CONTAIN).setValue("unit");
criteria.add(onlyTimeBucketsCriterion);
filter.setOr(new ConjunctiveCriterionArray(
ImmutableList.of(new ConjunctiveCriterion().setAnd(new CriterionArray(criteria)))));
return filter;
}
}

View File

@ -0,0 +1,34 @@
package com.linkedin.datahub.graphql.types.dashboard.mappers;
import com.linkedin.datahub.graphql.generated.DashboardUsageMetrics;
import com.linkedin.datahub.graphql.types.mappers.TimeSeriesAspectMapper;
import com.linkedin.metadata.aspect.EnvelopedAspect;
import com.linkedin.metadata.utils.GenericRecordUtils;
import javax.annotation.Nonnull;
public class DashboardUsageMetricMapper implements TimeSeriesAspectMapper<DashboardUsageMetrics> {
public static final DashboardUsageMetricMapper INSTANCE = new DashboardUsageMetricMapper();
public static DashboardUsageMetrics map(@Nonnull final EnvelopedAspect envelopedAspect) {
return INSTANCE.apply(envelopedAspect);
}
@Override
public DashboardUsageMetrics apply(EnvelopedAspect envelopedAspect) {
com.linkedin.dashboard.DashboardUsageStatistics gmsDashboardUsageStatistics =
GenericRecordUtils.deserializeAspect(envelopedAspect.getAspect().getValue(),
envelopedAspect.getAspect().getContentType(), com.linkedin.dashboard.DashboardUsageStatistics.class);
final com.linkedin.datahub.graphql.generated.DashboardUsageMetrics dashboardUsageMetrics =
new com.linkedin.datahub.graphql.generated.DashboardUsageMetrics();
dashboardUsageMetrics.setLastViewed(gmsDashboardUsageStatistics.getLastViewedAt());
dashboardUsageMetrics.setViewsCount(gmsDashboardUsageStatistics.getViewsCount());
dashboardUsageMetrics.setExecutionsCount(gmsDashboardUsageStatistics.getExecutionsCount());
dashboardUsageMetrics.setFavoritesCount(gmsDashboardUsageStatistics.getFavoritesCount());
dashboardUsageMetrics.setTimestampMillis(gmsDashboardUsageStatistics.getTimestampMillis());
return dashboardUsageMetrics;
}
}

View File

@ -4121,6 +4121,12 @@ type Dashboard implements EntityWithRelationships & Entity {
"""
lineage(input: LineageInput!): EntityLineageResult
"""
Experimental (Subject to breaking change) -- Statistics about how this Dashboard is used
"""
usageStats(startTimeMillis: Long, endTimeMillis: Long, limit: Int): DashboardUsageQueryResult
"""
Deprecated, use properties field instead
Additional read only information about the dashboard
@ -5247,6 +5253,158 @@ type FieldUsageCounts {
count: Int
}
"""
Information about individual user usage of a Dashboard
"""
type DashboardUserUsageCounts {
"""
The user of the Dashboard
"""
user: CorpUser
"""
number of times dashboard has been viewed by the user
"""
viewsCount: Int
"""
number of dashboard executions by the user
"""
executionsCount: Int
"""
Normalized numeric metric representing user's dashboard usage
Higher value represents more usage
"""
usageCount: Int
}
"""
The result of a dashboard usage query
"""
type DashboardUsageQueryResult {
"""
A set of relevant time windows for use in displaying usage statistics
"""
buckets: [DashboardUsageAggregation]
"""
A set of rolled up aggregations about the dashboard usage
"""
aggregations: DashboardUsageQueryResultAggregations
"""
A set of absolute dashboard usage metrics
"""
metrics: [DashboardUsageMetrics!]
}
"""
A set of rolled up aggregations about the Dashboard usage
"""
type DashboardUsageQueryResultAggregations {
"""
The count of unique Dashboard users within the queried time range
"""
uniqueUserCount: Int
"""
The specific per user usage counts within the queried time range
"""
users: [DashboardUserUsageCounts]
"""
The total number of dashboard views within the queried time range
"""
viewsCount: Int
"""
The total number of dashboard executions within the queried time range
"""
executionsCount: Int
}
"""
A set of absolute dashboard usage metrics
"""
type DashboardUsageMetrics implements TimeSeriesAspect {
"""
The time at which the metrics were reported
"""
timestampMillis: Long!
"""
The total number of times dashboard has been favorited
FIXME: Qualifies as Popularity Metric rather than Usage Metric?
"""
favoritesCount: Int
"""
The total number of dashboard views
"""
viewsCount: Int
"""
The total number of dashboard execution
"""
executionsCount: Int
"""
The time when this dashboard was last viewed
"""
lastViewed: Long
}
"""
An aggregation of Dashboard usage statistics
"""
type DashboardUsageAggregation {
"""
The time window start time
"""
bucket: Long
"""
The time window span
"""
duration: WindowDuration
"""
The resource urn associated with the usage information, eg a Dashboard urn
"""
resource: String
"""
The rolled up usage metrics
"""
metrics: DashboardUsageAggregationMetrics
}
"""
Rolled up metrics about Dashboard usage over time
"""
type DashboardUsageAggregationMetrics {
"""
The unique number of dashboard users within the time range
"""
uniqueUserCount: Int
"""
The total number of dashboard views within the time range
"""
viewsCount: Int
"""
The total number of dashboard executions within the time range
"""
executionsCount: Int
}
"""
The duration of a fixed window of time
"""
@ -5273,7 +5431,7 @@ enum WindowDuration {
}
"""
A time range used in fetching Dataset Usage statistics
A time range used in fetching Usage statistics
"""
enum TimeRange {
"""

View File

@ -0,0 +1,164 @@
# Imports for urn construction utility methods
from datetime import datetime
from typing import List
from datahub.emitter.mce_builder import make_dashboard_urn, make_user_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
# Imports for metadata model classes
from datahub.metadata.schema_classes import (
CalendarIntervalClass,
ChangeTypeClass,
DashboardUsageStatisticsClass,
DashboardUserUsageCountsClass,
TimeWindowSizeClass,
)
# Create rest emitter
rest_emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
usage_day_1_user_counts: List[DashboardUserUsageCountsClass] = [
DashboardUserUsageCountsClass(
user=make_user_urn("user1"), executionsCount=3, usageCount=3
),
DashboardUserUsageCountsClass(
user=make_user_urn("user2"), executionsCount=2, usageCount=2
),
]
usage_day_1: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dashboard",
changeType=ChangeTypeClass.UPSERT,
entityUrn=make_dashboard_urn("looker", "dashboards.999999"),
aspectName="dashboardUsageStatistics",
aspect=DashboardUsageStatisticsClass(
timestampMillis=round(
datetime.strptime("2022-02-09", "%Y-%m-%d").timestamp() * 1000
),
eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
uniqueUserCount=2,
executionsCount=5,
userCounts=usage_day_1_user_counts,
),
)
absolute_usage_as_of_day_1: MetadataChangeProposalWrapper = (
MetadataChangeProposalWrapper(
entityType="dashboard",
changeType=ChangeTypeClass.UPSERT,
entityUrn=make_dashboard_urn("looker", "dashboards.999999"),
aspectName="dashboardUsageStatistics",
aspect=DashboardUsageStatisticsClass(
timestampMillis=round(
datetime.strptime("2022-02-09", "%Y-%m-%d").timestamp() * 1000
),
favoritesCount=100,
viewsCount=25,
lastViewedAt=round(
datetime.strptime(
"2022-02-09 04:45:30", "%Y-%m-%d %H:%M:%S"
).timestamp()
* 1000
),
),
)
)
rest_emitter.emit(usage_day_1)
rest_emitter.emit(absolute_usage_as_of_day_1)
usage_day_2_user_counts: List[DashboardUserUsageCountsClass] = [
DashboardUserUsageCountsClass(
user=make_user_urn("user1"), executionsCount=4, usageCount=4
),
DashboardUserUsageCountsClass(
user=make_user_urn("user2"), executionsCount=6, usageCount=6
),
]
usage_day_2: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dashboard",
changeType=ChangeTypeClass.UPSERT,
entityUrn=make_dashboard_urn("looker", "dashboards.999999"),
aspectName="dashboardUsageStatistics",
aspect=DashboardUsageStatisticsClass(
timestampMillis=round(
datetime.strptime("2022-02-10", "%Y-%m-%d").timestamp() * 1000
),
eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
uniqueUserCount=2,
executionsCount=10,
userCounts=usage_day_2_user_counts,
),
)
absolute_usage_as_of_day_2: MetadataChangeProposalWrapper = (
MetadataChangeProposalWrapper(
entityType="dashboard",
changeType=ChangeTypeClass.UPSERT,
entityUrn=make_dashboard_urn("looker", "dashboards.999999"),
aspectName="dashboardUsageStatistics",
aspect=DashboardUsageStatisticsClass(
timestampMillis=round(
datetime.strptime("2022-02-10", "%Y-%m-%d").timestamp() * 1000
),
favoritesCount=100,
viewsCount=27,
lastViewedAt=round(
datetime.strptime(
"2022-02-10 10:45:30", "%Y-%m-%d %H:%M:%S"
).timestamp()
* 1000
),
),
)
)
rest_emitter.emit(usage_day_2)
rest_emitter.emit(absolute_usage_as_of_day_2)
usage_day_3_user_counts: List[DashboardUserUsageCountsClass] = [
DashboardUserUsageCountsClass(
user=make_user_urn("user1"), executionsCount=2, usageCount=2
),
]
usage_day_3: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dashboard",
changeType=ChangeTypeClass.UPSERT,
entityUrn=make_dashboard_urn("looker", "dashboards.999999"),
aspectName="dashboardUsageStatistics",
aspect=DashboardUsageStatisticsClass(
timestampMillis=round(
datetime.strptime("2022-02-11", "%Y-%m-%d").timestamp() * 1000
),
eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
uniqueUserCount=1,
executionsCount=2,
userCounts=usage_day_3_user_counts,
),
)
absolute_usage_as_of_day_3: MetadataChangeProposalWrapper = (
MetadataChangeProposalWrapper(
entityType="dashboard",
changeType=ChangeTypeClass.UPSERT,
entityUrn=make_dashboard_urn("looker", "dashboards.999999"),
aspectName="dashboardUsageStatistics",
aspect=DashboardUsageStatisticsClass(
timestampMillis=round(
datetime.strptime("2022-02-11", "%Y-%m-%d").timestamp() * 1000
),
favoritesCount=102,
viewsCount=30,
lastViewedAt=round(
datetime.strptime(
"2022-02-11 02:45:30", "%Y-%m-%d %H:%M:%S"
).timestamp()
* 1000
),
),
)
)
rest_emitter.emit(usage_day_3)
rest_emitter.emit(absolute_usage_as_of_day_3)

View File

@ -63,16 +63,48 @@ from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
BrowsePathsClass,
CalendarIntervalClass,
ChangeTypeClass,
ChartInfoClass,
ChartTypeClass,
DashboardInfoClass,
DashboardUsageStatisticsClass,
DashboardUserUsageCountsClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
TimeWindowSizeClass,
)
logger = logging.getLogger(__name__)
usage_queries: Dict[str, Dict] = {
# Query - group by dashboard and date, find unique users, dashboard runs count
"counts_per_day_per_dashboard": {
"model": "system__activity",
"view": "history",
"fields": [
"history.dashboard_id",
"history.created_date",
"history.dashboard_user",
"history.dashboard_run_count",
],
"filters": {},
},
# Query - group by user, dashboard and date, find runs count
"counts_per_day_per_user_per_dashboard": {
"model": "system__activity",
"view": "history",
"fields": [
"history.created_date",
"history.dashboard_id",
"history.dashboard_run_count",
"user.id",
],
"filters": {},
},
}
class TransportOptionsConfig(ConfigModel):
timeout: int
@ -158,6 +190,15 @@ class LookerDashboardSourceConfig(LookerAPIConfig, LookerCommonConfig):
None,
description="Optional URL to use when constructing external URLs to Looker if the `base_url` is not the correct one to use. For example, `https://looker-public.company.com`. If not provided, the external base URL will default to `base_url`.",
)
extract_usage_history: bool = Field(
False,
description="Experimental (Subject to breaking change) -- Whether to ingest usage statistics for dashboards. Setting this to True will query looker system activity explores to fetch historical dashboard usage.",
)
# TODO - stateful ingestion to autodetect usage history interval
extract_usage_history_for_interval: str = Field(
"1 day ago",
description="Experimental (Subject to breaking change) -- Used only if extract_usage_history is set to True. Interval to extract looker dashboard usage history for . https://docs.looker.com/reference/filter-expressions#date_and_time",
)
@validator("external_base_url", pre=True, always=True)
def external_url_defaults_to_api_config_base_url(
@ -308,6 +349,9 @@ class LookerDashboard:
last_updated_by: Optional[LookerUser] = None
deleted_at: Optional[datetime.datetime] = None
deleted_by: Optional[LookerUser] = None
favorite_count: Optional[int] = None
view_count: Optional[int] = None
last_viewed_at: Optional[datetime.datetime] = None
def url(self, base_url):
# If the base_url contains a port number (like https://company.looker.com:19999) remove the port number
@ -447,7 +491,7 @@ class LookerDashboardSource(Source):
def _get_looker_dashboard_element( # noqa: C901
self, element: DashboardElement
) -> Optional[LookerDashboardElement]:
# Dashboard elements can use raw queries against explores
# Dashboard elements can use raw usage_queries against explores
explores: List[str]
fields: List[str]
@ -697,12 +741,14 @@ class LookerDashboardSource(Source):
def _make_dashboard_and_chart_mces(
self, looker_dashboard: LookerDashboard
) -> List[MetadataChangeEvent]:
) -> Iterable[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
chart_mces = [
self._make_chart_mce(element, looker_dashboard)
for element in looker_dashboard.dashboard_elements
if element.type == "vis"
]
for chart_mce in chart_mces:
yield chart_mce
dashboard_urn = builder.make_dashboard_urn(
self.source_config.platform_name, looker_dashboard.get_urn_dashboard_id()
@ -734,8 +780,27 @@ class LookerDashboardSource(Source):
dashboard_snapshot.aspects.append(Status(removed=looker_dashboard.is_deleted))
dashboard_mce = MetadataChangeEvent(proposedSnapshot=dashboard_snapshot)
yield dashboard_mce
return chart_mces + [dashboard_mce]
if self.source_config.extract_usage_history:
# Emit snapshot values of dashboard usage - do this always ?
dashboard_usage_mcp = MetadataChangeProposalWrapper(
entityType="dashboard",
entityUrn=dashboard_urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="dashboardUsageStatistics",
aspect=DashboardUsageStatisticsClass(
timestampMillis=round(datetime.datetime.now().timestamp() * 1000),
favoritesCount=looker_dashboard.favorite_count,
viewsCount=looker_dashboard.view_count,
lastViewedAt=round(
looker_dashboard.last_viewed_at.timestamp() * 1000
)
if looker_dashboard.last_viewed_at
else None,
),
)
yield dashboard_usage_mcp
def get_ownership(
self, looker_dashboard: LookerDashboard
@ -859,6 +924,9 @@ class LookerDashboardSource(Source):
last_updated_by=self._get_looker_user(dashboard.last_updater_id),
deleted_at=dashboard.deleted_at,
deleted_by=self._get_looker_user(dashboard.deleter_id),
favorite_count=dashboard.favorite_count,
view_count=dashboard.view_count,
last_viewed_at=dashboard.last_viewed_at,
)
return looker_dashboard
@ -908,6 +976,12 @@ class LookerDashboardSource(Source):
"deleted_at",
"deleter_id",
]
if self.source_config.extract_usage_history:
fields += [
"favorite_count",
"view_count",
"last_viewed_at",
]
dashboard_object: Dashboard = self.client.dashboard(
dashboard_id=dashboard_id,
fields=",".join(fields),
@ -939,10 +1013,117 @@ class LookerDashboardSource(Source):
# for mce in mces:
workunits = [
MetadataWorkUnit(id=f"looker-{mce.proposedSnapshot.urn}", mce=mce)
if isinstance(mce, MetadataChangeEvent)
else MetadataWorkUnit(
id=f"looker-{mce.aspectName}-{mce.entityUrn}", mcp=mce
)
for mce in mces
]
return workunits, dashboard_id, start_time, datetime.datetime.now()
def extract_usage_history_from_system_activity(
self, dashboard_ids: List[str]
) -> Iterable[MetadataChangeProposalWrapper]:
# key tuple (dashboard_id, date)
dashboard_usages: Dict[tuple, DashboardUsageStatisticsClass] = dict()
common_filters = {
"history.dashboard_id": ",".join(dashboard_ids),
"history.created_date": self.source_config.extract_usage_history_for_interval,
}
for query in usage_queries.values():
query["filters"].update(common_filters)
self._populate_dashboard_counts(dashboard_usages)
self._populate_userwise_runs_counts(dashboard_usages)
for key, val in dashboard_usages.items():
yield MetadataChangeProposalWrapper(
entityType="dashboard",
entityUrn=builder.make_dashboard_urn(
self.source_config.platform_name,
f"dashboards.{key[0]}", # in sync with LookerDashboard.get_urn_dashboard_id
),
changeType=ChangeTypeClass.UPSERT,
aspectName="dashboardUsageStatistics",
aspect=val,
)
def _populate_userwise_runs_counts(self, dashboard_usages):
userwise_count_rows = LookerUtil.run_inline_query(
self.client, usage_queries["counts_per_day_per_user_per_dashboard"]
)
for row in userwise_count_rows:
user: Optional[LookerUser] = (
self.user_registry.get_by_id(
row["user.id"],
self.source_config.transport_options.get_transport_options()
if self.source_config.transport_options is not None
else None,
)
if row["user.id"] is not None
else None
)
if user is None:
logger.warning(
f"Unable to resolve user with id {row['user.id']}, skipping"
)
continue
user_urn: Optional[str] = user._get_urn(
self.source_config.strip_user_ids_from_email
)
if user_urn is None:
logger.warning(
f"Unable to resolve urn for user with id {row['user.id']}, skipping"
)
continue
user_usage: DashboardUserUsageCountsClass = DashboardUserUsageCountsClass(
user=user_urn,
executionsCount=row["history.dashboard_run_count"],
usageCount=row["history.dashboard_run_count"],
)
usage_mcp_prev = dashboard_usages.get(
(row["history.dashboard_id"], row["history.created_date"])
)
if usage_mcp_prev is None:
# Unreachable
logger.warning(
f"User counts found but no users for {row['history.dashboard_id']} on date {row['history.created_date']}"
)
continue
if usage_mcp_prev.userCounts is None:
usage_mcp_prev.userCounts = [user_usage]
else:
usage_mcp_prev.userCounts.append(user_usage)
def _populate_dashboard_counts(self, dashboard_usages):
count_rows = LookerUtil.run_inline_query(
self.client,
usage_queries["counts_per_day_per_dashboard"],
)
for row in count_rows:
dashboard_usages[
(row["history.dashboard_id"], row["history.created_date"])
] = DashboardUsageStatisticsClass(
timestampMillis=round(
datetime.datetime.strptime(row["history.created_date"], "%Y-%m-%d")
.replace(tzinfo=datetime.timezone.utc)
.timestamp()
* 1000
),
eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
uniqueUserCount=row["history.dashboard_user"],
executionsCount=row["history.dashboard_run_count"],
)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
dashboards = self.client.all_dashboards(
fields="id",
@ -1029,5 +1210,17 @@ class LookerDashboardSource(Source):
self.reporter.report_workunit(workunit)
yield workunit
if self.source_config.extract_usage_history and dashboard_ids is not None:
usage_mcps = self.extract_usage_history_from_system_activity(
dashboard_ids # type:ignore
)
for usage_mcp in usage_mcps:
workunit = MetadataWorkUnit(
id=f"looker-{usage_mcp.aspectName}-{usage_mcp.entityUrn}-{usage_mcp.aspect.timestampMillis}", # type:ignore
mcp=usage_mcp,
)
self.reporter.report_workunit(workunit)
yield workunit
def get_report(self) -> SourceReport:
return self.reporter

View File

@ -1,3 +1,4 @@
import json
import logging
import re
from dataclasses import dataclass
@ -8,6 +9,7 @@ import pydantic
from looker_sdk.error import SDKError
from looker_sdk.rtl.transport import TransportOptions
from looker_sdk.sdk.api31.methods import Looker31SDK
from looker_sdk.sdk.api31.models import WriteQuery
from pydantic import BaseModel, Field
from pydantic.class_validators import validator
@ -457,6 +459,42 @@ class LookerUtil:
"""Returns a display name that corresponds to the Looker conventions"""
return name.replace("_", " ").title() if name else name
@staticmethod
def create_query_request(q: dict, limit: Optional[str] = None) -> WriteQuery:
return WriteQuery(
model=q["model"],
view=q["view"],
fields=q.get("fields"),
filters=q.get("filters"),
filter_expression=q.get("filter_expressions"),
sorts=q.get("sorts"),
limit=q.get("limit") or limit,
column_limit=q.get("column_limit"),
vis_config={"type": "looker_column"},
filter_config=q.get("filter_config"),
query_timezone="UTC",
)
@staticmethod
def run_inline_query(client: Looker31SDK, q: dict) -> List:
response_sql = client.run_inline_query(
result_format="sql",
body=LookerUtil.create_query_request(q),
)
logger.debug("=================Query=================")
logger.debug(response_sql)
response_json = client.run_inline_query(
result_format="json",
body=LookerUtil.create_query_request(q),
)
logger.debug("=================Response=================")
data = json.loads(response_json)
logger.debug(f"length {len(data)}")
return data
@dataclass
class LookerExplore:

View File

@ -0,0 +1,387 @@
[
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": {
"urn": "urn:li:dashboard:(looker,dashboards.1)",
"aspects": [
{
"com.linkedin.pegasus2avro.dashboard.DashboardInfo": {
"customProperties": {},
"externalUrl": null,
"title": "foo",
"description": "lorem ipsum",
"charts": [],
"datasets": [],
"lastModified": {
"created": {
"time": 1586847600000,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"lastModified": {
"time": 1586847600000,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"deleted": null
},
"dashboardUrl": "https://looker.company.com/dashboards/1",
"access": null,
"lastRefreshed": null
}
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(looker,dashboards.1)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dashboardUsageStatistics",
"aspect": {
"value": "{\"timestampMillis\": 1586847600000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"viewsCount\": 25, \"favoritesCount\": 5, \"lastViewedAt\": 1586847600000}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
"paths": [
"/prod/looker/lkml_samples/explores/data.my_view"
]
}
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"looker.explore.label": "My Explore View",
"looker.explore.file": "test_source_file.lkml"
},
"externalUrl": "https://looker.company.com/explore/data/my_view",
"name": "My Explore View",
"qualifiedName": null,
"description": "lorem ipsum",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.underlying_view,PROD)",
"type": "VIEW"
}
],
"fineGrainedLineages": null
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "my_view",
"platform": "urn:li:dataPlatform:looker",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": ""
}
},
"fields": [
{
"fieldPath": "dim1",
"jsonPath": null,
"nullable": false,
"description": "",
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension",
"context": null
}
]
},
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
}
],
"primaryKeys": [],
"foreignKeysSpecs": null,
"foreignKeys": null
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,data.explore.my_view,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"explore\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {
"urn": "urn:li:tag:Dimension",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:datahub",
"type": "DATAOWNER",
"source": null
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
}
}
},
{
"com.linkedin.pegasus2avro.tag.TagProperties": {
"name": "Dimension",
"description": "A tag that is applied to all dimension fields.",
"colorHex": null
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {
"urn": "urn:li:tag:Temporal",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:datahub",
"type": "DATAOWNER",
"source": null
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
}
}
},
{
"com.linkedin.pegasus2avro.tag.TagProperties": {
"name": "Temporal",
"description": "A tag that is applied to all time-based (temporal) fields such as timestamps or durations.",
"colorHex": null
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": {
"urn": "urn:li:tag:Measure",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:datahub",
"type": "DATAOWNER",
"source": null
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
}
}
},
{
"com.linkedin.pegasus2avro.tag.TagProperties": {
"name": "Measure",
"description": "A tag that is applied to all measures (metrics). Measures are typically the columns that you aggregate on",
"colorHex": null
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(looker,dashboards.11)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dashboardUsageStatistics",
"aspect": {
"value": "{\"timestampMillis\": 1656979200000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"executionsCount\": 14, \"uniqueUserCount\": 1, \"userCounts\": [{\"user\": \"urn:li:corpuser:test@looker.com\", \"executionsCount\": 14, \"usageCount\": 14}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(looker,dashboards.12)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dashboardUsageStatistics",
"aspect": {
"value": "{\"timestampMillis\": 1656979200000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"executionsCount\": 14, \"uniqueUserCount\": 1, \"userCounts\": [{\"user\": \"urn:li:corpuser:test@looker.com\", \"executionsCount\": 14, \"usageCount\": 14}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(looker,dashboards.37)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dashboardUsageStatistics",
"aspect": {
"value": "{\"timestampMillis\": 1656979200000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"executionsCount\": 5, \"uniqueUserCount\": 1, \"userCounts\": [{\"user\": \"urn:li:corpuser:test@looker.com\", \"executionsCount\": 5, \"usageCount\": 5}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
}
]

View File

@ -1,3 +1,4 @@
import json
import time
from datetime import datetime
from unittest import mock
@ -11,9 +12,12 @@ from looker_sdk.sdk.api31.models import (
LookmlModelExploreFieldset,
LookmlModelExploreJoins,
Query,
User,
WriteQuery,
)
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.looker import usage_queries
from tests.test_helpers import mce_helpers
FROZEN_TIME = "2020-04-14 07:00:00"
@ -286,6 +290,69 @@ def setup_mock_explore(mocked_client):
)
def setup_mock_user(mocked_client):
mocked_client.user.return_value = User(id=1, email="test@looker.com")
def side_effect_query_inline(result_format: str, body: WriteQuery) -> str:
query_type = None
if result_format == "sql":
return "" # Placeholder for sql text
for query_name, query_template in usage_queries.items():
if body.fields == query_template["fields"]:
query_type = query_name
if query_type == "counts_per_day_per_dashboard":
return json.dumps(
[
{
"history.dashboard_id": "11",
"history.created_date": "2022-07-05",
"history.dashboard_user": 1,
"history.dashboard_run_count": 14,
},
{
"history.dashboard_id": "12",
"history.created_date": "2022-07-05",
"history.dashboard_user": 1,
"history.dashboard_run_count": 14,
},
{
"history.dashboard_id": "37",
"history.created_date": "2022-07-05",
"history.dashboard_user": 1,
"history.dashboard_run_count": 5,
},
]
)
if query_type == "counts_per_day_per_user_per_dashboard":
return json.dumps(
[
{
"history.created_date": "2022-07-05",
"history.dashboard_id": "11",
"user.id": 1,
"history.dashboard_run_count": 14,
},
{
"history.created_date": "2022-07-05",
"history.dashboard_id": "12",
"user.id": 1,
"history.dashboard_run_count": 14,
},
{
"history.created_date": "2022-07-05",
"history.dashboard_id": "37",
"user.id": 1,
"history.dashboard_run_count": 5,
},
]
)
raise Exception("Unknown Query")
@freeze_time(FROZEN_TIME)
def test_looker_ingest_allow_pattern(pytestconfig, tmp_path, mock_time):
mocked_client = mock.MagicMock()
@ -355,3 +422,68 @@ def test_looker_ingest_allow_pattern(pytestconfig, tmp_path, mock_time):
output_path=tmp_path / "looker_mces.json",
golden_path=f"{test_resources_dir}/{mce_out_file}",
)
@freeze_time(FROZEN_TIME)
def test_looker_ingest_usage_history(pytestconfig, tmp_path, mock_time):
mocked_client = mock.MagicMock()
with mock.patch("looker_sdk.init31") as mock_sdk:
mock_sdk.return_value = mocked_client
mocked_client.all_dashboards.return_value = [Dashboard(id="1")]
mocked_client.dashboard.return_value = Dashboard(
id="1",
title="foo",
created_at=datetime.utcfromtimestamp(time.time()),
updated_at=datetime.utcfromtimestamp(time.time()),
description="lorem ipsum",
favorite_count=5,
view_count=25,
last_viewed_at=datetime.utcfromtimestamp(time.time()),
dashboard_elements=[
DashboardElement(
id="2",
type="",
subtitle_text="Some text",
query=Query(
model="data",
view="my_view",
dynamic_fields='[{"table_calculation":"calc","label":"foobar","expression":"offset(${my_table.value},1)","value_format":null,"value_format_name":"eur","_kind_hint":"measure","_type_hint":"number"}]',
),
)
],
)
mocked_client.run_inline_query.side_effect = side_effect_query_inline
setup_mock_explore(mocked_client)
setup_mock_user(mocked_client)
test_resources_dir = pytestconfig.rootpath / "tests/integration/looker"
pipeline = Pipeline.create(
{
"run_id": "looker-test",
"source": {
"type": "looker",
"config": {
"base_url": "https://looker.company.com",
"client_id": "foo",
"client_secret": "bar",
"extract_usage_history": True,
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/looker_mces.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status()
mce_out_file = "looker_mces_usage_history.json"
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "looker_mces.json",
golden_path=f"{test_resources_dir}/{mce_out_file}",
)

View File

@ -3,7 +3,7 @@ import logging
import os
import pytest
from boto3 import Session
from boto3.session import Session
from moto import mock_s3
from pydantic import ValidationError

View File

@ -60,7 +60,8 @@ public class ESUtils {
log.warn("Received query Filter with a deprecated field 'criteria'. Use 'or' instead.");
final BoolQueryBuilder andQueryBuilder = new BoolQueryBuilder();
filter.getCriteria().forEach(criterion -> {
if (!criterion.getValue().trim().isEmpty() || criterion.hasValues()) {
if (!criterion.getValue().trim().isEmpty() || criterion.hasValues()
|| criterion.getCondition() == Condition.IS_NULL) {
andQueryBuilder.must(getQueryBuilderFromCriterion(criterion));
}
});
@ -73,7 +74,8 @@ public class ESUtils {
public static BoolQueryBuilder buildConjunctiveFilterQuery(@Nonnull ConjunctiveCriterion conjunctiveCriterion) {
final BoolQueryBuilder andQueryBuilder = new BoolQueryBuilder();
conjunctiveCriterion.getAnd().forEach(criterion -> {
if (!criterion.getValue().trim().isEmpty() || criterion.hasValues()) {
if (!criterion.getValue().trim().isEmpty() || criterion.hasValues()
|| criterion.getCondition() == Condition.IS_NULL) {
andQueryBuilder.must(getQueryBuilderFromCriterion(criterion));
}
});
@ -120,6 +122,8 @@ public class ESUtils {
Arrays.stream(criterion.getValue().trim().split("\\s*,\\s*"))
.forEach(elem -> filters.should(QueryBuilders.matchQuery(criterion.getField(), elem)));
return filters;
} else if (condition == Condition.IS_NULL) {
return QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery(criterion.getField()));
} else if (condition == Condition.GREATER_THAN) {
return QueryBuilders.rangeQuery(criterion.getField()).gt(criterion.getValue().trim());
} else if (condition == Condition.GREATER_THAN_OR_EQUAL_TO) {

View File

@ -3,7 +3,7 @@ namespace com.linkedin.dashboard
import com.linkedin.timeseries.TimeseriesAspectBase
/**
* Stats corresponding to dashboard's usage.
* Experimental (Subject to breaking change) -- Stats corresponding to dashboard's usage.
*
* If this aspect represents the latest snapshot of the statistics about a Dashboard, the eventGranularity field should be null.
* If this aspect represents a bucketed window of usage statistics (e.g. over a day), then the eventGranularity field should be set accordingly.

View File

@ -20,6 +20,11 @@ enum Condition {
*/
EQUAL
/**
* Represent the relation: field is null, e.g. platform is null
*/
IS_NULL
/**
* Represent the relation greater than, e.g. ownerCount > 5
*/

View File

@ -56,7 +56,7 @@
"type" : "enum",
"name" : "Condition",
"doc" : "The matching condition in a filter criterion",
"symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "IN", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ],
"symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "IS_NULL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "IN", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ],
"symbolDocs" : {
"CONTAIN" : "Represent the relation: String field contains value, e.g. name contains Profile",
"END_WITH" : "Represent the relation: String field ends with value, e.g. name ends with Event",
@ -64,6 +64,7 @@
"GREATER_THAN" : "Represent the relation greater than, e.g. ownerCount > 5",
"GREATER_THAN_OR_EQUAL_TO" : "Represent the relation greater than or equal to, e.g. ownerCount >= 5",
"IN" : "Represent the relation: String field is one of the array values to, e.g. name in [\"Profile\", \"Event\"]",
"IS_NULL" : "Represent the relation: field is null, e.g. platform is null",
"LESS_THAN" : "Represent the relation less than, e.g. ownerCount < 3",
"LESS_THAN_OR_EQUAL_TO" : "Represent the relation less than or equal to, e.g. ownerCount <= 3",
"START_WITH" : "Represent the relation: String field starts with value, e.g. name starts with PageView"

View File

@ -146,7 +146,7 @@
"type" : "enum",
"name" : "Condition",
"doc" : "The matching condition in a filter criterion",
"symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "IN", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ],
"symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "IS_NULL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "IN", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ],
"symbolDocs" : {
"CONTAIN" : "Represent the relation: String field contains value, e.g. name contains Profile",
"END_WITH" : "Represent the relation: String field ends with value, e.g. name ends with Event",
@ -154,6 +154,7 @@
"GREATER_THAN" : "Represent the relation greater than, e.g. ownerCount > 5",
"GREATER_THAN_OR_EQUAL_TO" : "Represent the relation greater than or equal to, e.g. ownerCount >= 5",
"IN" : "Represent the relation: String field is one of the array values to, e.g. name in [\"Profile\", \"Event\"]",
"IS_NULL" : "Represent the relation: field is null, e.g. platform is null",
"LESS_THAN" : "Represent the relation less than, e.g. ownerCount < 3",
"LESS_THAN_OR_EQUAL_TO" : "Represent the relation less than or equal to, e.g. ownerCount <= 3",
"START_WITH" : "Represent the relation: String field starts with value, e.g. name starts with PageView"

View File

@ -5157,7 +5157,7 @@
"name" : "Condition",
"namespace" : "com.linkedin.metadata.query.filter",
"doc" : "The matching condition in a filter criterion",
"symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "IN", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ],
"symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "IS_NULL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "IN", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ],
"symbolDocs" : {
"CONTAIN" : "Represent the relation: String field contains value, e.g. name contains Profile",
"END_WITH" : "Represent the relation: String field ends with value, e.g. name ends with Event",
@ -5165,6 +5165,7 @@
"GREATER_THAN" : "Represent the relation greater than, e.g. ownerCount > 5",
"GREATER_THAN_OR_EQUAL_TO" : "Represent the relation greater than or equal to, e.g. ownerCount >= 5",
"IN" : "Represent the relation: String field is one of the array values to, e.g. name in [\"Profile\", \"Event\"]",
"IS_NULL" : "Represent the relation: field is null, e.g. platform is null",
"LESS_THAN" : "Represent the relation less than, e.g. ownerCount < 3",
"LESS_THAN_OR_EQUAL_TO" : "Represent the relation less than or equal to, e.g. ownerCount <= 3",
"START_WITH" : "Represent the relation: String field starts with value, e.g. name starts with PageView"

View File

@ -102,6 +102,7 @@ public class Constants {
public static final String DASHBOARD_KEY_ASPECT_NAME = "dashboardKey";
public static final String DASHBOARD_INFO_ASPECT_NAME = "dashboardInfo";
public static final String EDITABLE_DASHBOARD_PROPERTIES_ASPECT_NAME = "editableDashboardProperties";
public static final String DASHBOARD_USAGE_STATISTICS_ASPECT_NAME = "dashboardUsageStatistics";
// Notebook
public static final String NOTEBOOK_KEY_ASPECT_NAME = "notebookKey";