mirror of
https://github.com/datahub-project/datahub.git
synced 2026-01-03 21:35:39 +00:00
refactor(API): Add "Filter" support for Assertion Run Events, Dataset Profiles, Dataset Operations (#4869)
This commit is contained in:
parent
15438f62f1
commit
25b89b318c
@ -1,12 +1,13 @@
|
||||
package com.linkedin.datahub.graphql.resolvers.assertion;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.linkedin.datahub.graphql.QueryContext;
|
||||
import com.linkedin.datahub.graphql.generated.Assertion;
|
||||
import com.linkedin.datahub.graphql.generated.AssertionResultType;
|
||||
import com.linkedin.datahub.graphql.generated.AssertionRunEvent;
|
||||
import com.linkedin.datahub.graphql.generated.AssertionRunEventsResult;
|
||||
import com.linkedin.datahub.graphql.generated.AssertionRunStatus;
|
||||
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
|
||||
import com.linkedin.datahub.graphql.generated.FilterInput;
|
||||
import com.linkedin.datahub.graphql.types.dataset.mappers.AssertionRunEventMapper;
|
||||
import com.linkedin.entity.client.EntityClient;
|
||||
import com.linkedin.metadata.Constants;
|
||||
@ -19,11 +20,14 @@ import com.linkedin.metadata.query.filter.Filter;
|
||||
import com.linkedin.r2.RemoteInvocationException;
|
||||
import graphql.schema.DataFetcher;
|
||||
import graphql.schema.DataFetchingEnvironment;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
|
||||
|
||||
|
||||
/**
|
||||
* GraphQL Resolver used for fetching AssertionRunEvents.
|
||||
@ -47,6 +51,9 @@ public class AssertionRunEventResolver implements DataFetcher<CompletableFuture<
|
||||
final Long maybeStartTimeMillis = environment.getArgumentOrDefault("startTimeMillis", null);
|
||||
final Long maybeEndTimeMillis = environment.getArgumentOrDefault("endTimeMillis", null);
|
||||
final Integer maybeLimit = environment.getArgumentOrDefault("limit", null);
|
||||
final FilterInput maybeFilters = environment.getArgument("filter") != null
|
||||
? bindArgument(environment.getArgument("filter"), FilterInput.class)
|
||||
: null;
|
||||
|
||||
try {
|
||||
// Step 1: Fetch aspects from GMS
|
||||
@ -58,7 +65,7 @@ public class AssertionRunEventResolver implements DataFetcher<CompletableFuture<
|
||||
maybeEndTimeMillis,
|
||||
maybeLimit,
|
||||
false,
|
||||
buildStatusFilter(maybeStatus),
|
||||
buildFilter(maybeFilters, maybeStatus),
|
||||
context.getAuthentication());
|
||||
|
||||
// Step 2: Bind profiles into GraphQL strong types.
|
||||
@ -87,16 +94,19 @@ public class AssertionRunEventResolver implements DataFetcher<CompletableFuture<
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private Filter buildStatusFilter(@Nullable final String status) {
|
||||
if (status == null) {
|
||||
private Filter buildFilter(@Nullable FilterInput filtersInput, @Nullable final String status) {
|
||||
if (filtersInput == null && status == null) {
|
||||
return null;
|
||||
}
|
||||
return new Filter().setOr(new ConjunctiveCriterionArray(ImmutableList.of(
|
||||
new ConjunctiveCriterion().setAnd(new CriterionArray(ImmutableList.of(
|
||||
new Criterion()
|
||||
.setField("status")
|
||||
.setValue(status)
|
||||
)))
|
||||
)));
|
||||
List<FacetFilterInput> facetFilters = new ArrayList<>();
|
||||
if (status != null) {
|
||||
facetFilters.add(new FacetFilterInput("status", status));
|
||||
}
|
||||
if (filtersInput != null) {
|
||||
facetFilters.addAll(filtersInput.getAnd());
|
||||
}
|
||||
return new Filter().setOr(new ConjunctiveCriterionArray(new ConjunctiveCriterion().setAnd(new CriterionArray(facetFilters.stream()
|
||||
.map(filter -> new Criterion().setField(filter.getField()).setValue(filter.getValue()))
|
||||
.collect(Collectors.toList())))));
|
||||
}
|
||||
}
|
||||
|
||||
@ -4,11 +4,17 @@ import com.datahub.authorization.ResourceSpec;
|
||||
import com.linkedin.datahub.graphql.QueryContext;
|
||||
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
|
||||
import com.linkedin.datahub.graphql.generated.Entity;
|
||||
import com.linkedin.datahub.graphql.generated.FilterInput;
|
||||
import com.linkedin.datahub.graphql.generated.TimeSeriesAspect;
|
||||
import com.linkedin.entity.client.EntityClient;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import com.linkedin.metadata.aspect.EnvelopedAspect;
|
||||
import com.linkedin.metadata.authorization.PoliciesConfig;
|
||||
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
|
||||
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
|
||||
import com.linkedin.metadata.query.filter.Criterion;
|
||||
import com.linkedin.metadata.query.filter.CriterionArray;
|
||||
import com.linkedin.metadata.query.filter.Filter;
|
||||
import com.linkedin.r2.RemoteInvocationException;
|
||||
import graphql.schema.DataFetcher;
|
||||
import graphql.schema.DataFetchingEnvironment;
|
||||
@ -18,6 +24,10 @@ import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nullable;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
|
||||
|
||||
|
||||
/**
|
||||
@ -33,6 +43,7 @@ import java.util.stream.Collectors;
|
||||
* be invoked for each {@link EnvelopedAspect} received from the GMS getTimeSeriesAspectValues API.
|
||||
*
|
||||
*/
|
||||
@Slf4j
|
||||
public class TimeSeriesAspectResolver implements DataFetcher<CompletableFuture<List<TimeSeriesAspect>>> {
|
||||
|
||||
private final EntityClient _client;
|
||||
@ -77,12 +88,15 @@ public class TimeSeriesAspectResolver implements DataFetcher<CompletableFuture<L
|
||||
final Long maybeEndTimeMillis = environment.getArgumentOrDefault("endTimeMillis", null);
|
||||
// Max number of aspects to return.
|
||||
final Integer maybeLimit = environment.getArgumentOrDefault("limit", null);
|
||||
final FilterInput maybeFilters = environment.getArgument("filter") != null
|
||||
? bindArgument(environment.getArgument("filter"), FilterInput.class)
|
||||
: null;
|
||||
|
||||
try {
|
||||
// Step 1: Get aspects.
|
||||
List<EnvelopedAspect> aspects =
|
||||
_client.getTimeseriesAspectValues(urn, _entityName, _aspectName, maybeStartTimeMillis, maybeEndTimeMillis,
|
||||
maybeLimit, null, null, context.getAuthentication());
|
||||
maybeLimit, null, buildFilters(maybeFilters), context.getAuthentication());
|
||||
|
||||
// Step 2: Bind profiles into GraphQL strong types.
|
||||
return aspects.stream().map(_aspectMapper::apply).collect(Collectors.toList());
|
||||
@ -91,4 +105,13 @@ public class TimeSeriesAspectResolver implements DataFetcher<CompletableFuture<L
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private Filter buildFilters(@Nullable FilterInput maybeFilters) {
|
||||
if (maybeFilters == null) {
|
||||
return null;
|
||||
}
|
||||
return new Filter().setOr(new ConjunctiveCriterionArray(new ConjunctiveCriterion().setAnd(new CriterionArray(maybeFilters.getAnd().stream()
|
||||
.map(filter -> new Criterion().setField(filter.getField()).setValue(filter.getValue()))
|
||||
.collect(Collectors.toList())))));
|
||||
}
|
||||
}
|
||||
|
||||
@ -791,12 +791,12 @@ type Dataset implements EntityWithRelationships & Entity {
|
||||
Profile Stats resource that retrieves the events in a previous unit of time in descending order
|
||||
If no start or end time are provided, the most recent events will be returned
|
||||
"""
|
||||
datasetProfiles(startTimeMillis: Long, endTimeMillis: Long, limit: Int): [DatasetProfile!]
|
||||
datasetProfiles(startTimeMillis: Long, endTimeMillis: Long, filter: FilterInput, limit: Int): [DatasetProfile!]
|
||||
|
||||
"""
|
||||
Operational events for an entity.
|
||||
"""
|
||||
operations(startTimeMillis: Long, endTimeMillis: Long, limit: Int): [Operation!]
|
||||
operations(startTimeMillis: Long, endTimeMillis: Long, filter: FilterInput, limit: Int): [Operation!]
|
||||
|
||||
"""
|
||||
Assertions associated with the Dataset
|
||||
@ -5168,7 +5168,7 @@ type Assertion implements EntityWithRelationships & Entity {
|
||||
Lifecycle events detailing individual runs of this assertion. If startTimeMillis & endTimeMillis are not provided, the most
|
||||
recent events will be returned.
|
||||
"""
|
||||
runEvents(status: AssertionRunStatus, startTimeMillis: Long, endTimeMillis: Long, limit: Int): AssertionRunEventsResult
|
||||
runEvents(status: AssertionRunStatus, startTimeMillis: Long, endTimeMillis: Long, filter: FilterInput, limit: Int): AssertionRunEventsResult
|
||||
|
||||
"""
|
||||
Edges extending from this entity
|
||||
|
||||
@ -566,3 +566,13 @@ type BrowsePath {
|
||||
"""
|
||||
path: [String!]!
|
||||
}
|
||||
|
||||
"""
|
||||
A set of filter criteria
|
||||
"""
|
||||
input FilterInput {
|
||||
"""
|
||||
A list of conjunctive filters
|
||||
"""
|
||||
and: [FacetFilterInput!]!
|
||||
}
|
||||
|
||||
@ -96,6 +96,11 @@ record SchemaField {
|
||||
*/
|
||||
isPartOfKey: boolean = false
|
||||
|
||||
/**
|
||||
* For Datasets which are partitioned, this determines the partitioning key.
|
||||
*/
|
||||
isPartitioningKey: optional boolean
|
||||
|
||||
/**
|
||||
* For schema fields that have other properties that are not modeled explicitly,
|
||||
* use this field to serialize those properties into a JSON string
|
||||
|
||||
@ -14,6 +14,7 @@ record PartitionSpec {
|
||||
/**
|
||||
* String representation of the partition
|
||||
*/
|
||||
@TimeseriesField = {}
|
||||
partition: string
|
||||
|
||||
/**
|
||||
|
||||
@ -885,7 +885,8 @@
|
||||
"addToFilters" : true,
|
||||
"fieldName" : "glossaryTerms",
|
||||
"fieldType" : "URN",
|
||||
"filterNameOverride" : "Glossary Term"
|
||||
"filterNameOverride" : "Glossary Term",
|
||||
"hasValuesFieldName" : "hasGlossaryTerms"
|
||||
}
|
||||
} ]
|
||||
}, "com.linkedin.common.GlossaryTermUrn", {
|
||||
@ -2515,6 +2516,11 @@
|
||||
"type" : "boolean",
|
||||
"doc" : "For schema fields that are part of complex keys, set this field to true\nWe do this to easily distinguish between value and key fields",
|
||||
"default" : false
|
||||
}, {
|
||||
"name" : "isPartitioningKey",
|
||||
"type" : "boolean",
|
||||
"doc" : "For Datasets which are partitioned, this determines the partitioning key.",
|
||||
"optional" : true
|
||||
}, {
|
||||
"name" : "jsonProps",
|
||||
"type" : "string",
|
||||
|
||||
@ -866,7 +866,8 @@
|
||||
"addToFilters" : true,
|
||||
"fieldName" : "glossaryTerms",
|
||||
"fieldType" : "URN",
|
||||
"filterNameOverride" : "Glossary Term"
|
||||
"filterNameOverride" : "Glossary Term",
|
||||
"hasValuesFieldName" : "hasGlossaryTerms"
|
||||
}
|
||||
} ]
|
||||
}, "com.linkedin.common.GlossaryTermUrn", {
|
||||
@ -2908,6 +2909,11 @@
|
||||
"type" : "boolean",
|
||||
"doc" : "For schema fields that are part of complex keys, set this field to true\nWe do this to easily distinguish between value and key fields",
|
||||
"default" : false
|
||||
}, {
|
||||
"name" : "isPartitioningKey",
|
||||
"type" : "boolean",
|
||||
"doc" : "For Datasets which are partitioned, this determines the partitioning key.",
|
||||
"optional" : true
|
||||
}, {
|
||||
"name" : "jsonProps",
|
||||
"type" : "string",
|
||||
|
||||
@ -645,7 +645,8 @@
|
||||
"addToFilters" : true,
|
||||
"fieldName" : "glossaryTerms",
|
||||
"fieldType" : "URN",
|
||||
"filterNameOverride" : "Glossary Term"
|
||||
"filterNameOverride" : "Glossary Term",
|
||||
"hasValuesFieldName" : "hasGlossaryTerms"
|
||||
}
|
||||
} ]
|
||||
}, "com.linkedin.common.GlossaryTermUrn", {
|
||||
@ -2262,6 +2263,11 @@
|
||||
"type" : "boolean",
|
||||
"doc" : "For schema fields that are part of complex keys, set this field to true\nWe do this to easily distinguish between value and key fields",
|
||||
"default" : false
|
||||
}, {
|
||||
"name" : "isPartitioningKey",
|
||||
"type" : "boolean",
|
||||
"doc" : "For Datasets which are partitioned, this determines the partitioning key.",
|
||||
"optional" : true
|
||||
}, {
|
||||
"name" : "jsonProps",
|
||||
"type" : "string",
|
||||
|
||||
@ -866,7 +866,8 @@
|
||||
"addToFilters" : true,
|
||||
"fieldName" : "glossaryTerms",
|
||||
"fieldType" : "URN",
|
||||
"filterNameOverride" : "Glossary Term"
|
||||
"filterNameOverride" : "Glossary Term",
|
||||
"hasValuesFieldName" : "hasGlossaryTerms"
|
||||
}
|
||||
} ]
|
||||
}, "com.linkedin.common.GlossaryTermUrn", {
|
||||
@ -2908,6 +2909,11 @@
|
||||
"type" : "boolean",
|
||||
"doc" : "For schema fields that are part of complex keys, set this field to true\nWe do this to easily distinguish between value and key fields",
|
||||
"default" : false
|
||||
}, {
|
||||
"name" : "isPartitioningKey",
|
||||
"type" : "boolean",
|
||||
"doc" : "For Datasets which are partitioned, this determines the partitioning key.",
|
||||
"optional" : true
|
||||
}, {
|
||||
"name" : "jsonProps",
|
||||
"type" : "string",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user