feat(datajob): Datajob graphql query (#2242)

Fredrik Sannholm 2021-04-08 23:40:16 +03:00 committed by GitHub
parent 172d914ceb
commit fd0923c445
8 changed files with 660 additions and 12 deletions

View File

@@ -427,3 +427,142 @@ Sample Response:
}
}
```
### Query DataFlow
Request:
```
{
dataFlow(urn: "urn:li:dataFlow:(airflow,flow1,foo)") {
urn
type
orchestrator
flowId
info {
name
description
project
}
ownership {
owners {
owner {
username
urn
info {
displayName
email
fullName
manager {
urn
}
}
editableInfo {
aboutMe
skills
}
}
type
source {
url
}
}
lastModified {
actor
}
}
}
}
```
Sample Response:
```
{
"data": {
"dataFlow": {
"urn": "urn:li:dataFlow:(airflow,flow1,foo)",
"type": "DATA_FLOW",
"orchestrator": "airflow",
"flowId": "flow1",
"info": {
"name": "flow1",
"description": "My own workflow",
"project": "X"
},
"ownership": {
"owners": [
{
"owner": {
"username": "test-user",
"urn": "urn:li:corpuser:test-user",
"info": null,
"editableInfo": null
},
"type": "DEVELOPER",
"source": null
}
],
"lastModified": {
"actor": "urn:li:corpuser:datahub"
}
}
}
}
}
```
### Query DataJob
Request:
```
{
dataJob(urn: "urn:li:dataJob:(urn:li:dataFlow:(airflow,flow1,foo),task1)") {
urn
type
jobId
dataFlow {
urn
flowId
}
inputOutput {
inputDatasets {
urn
name
}
outputDatasets {
urn
name
}
}
}
}
```
Sample Response:
```
{
"data": {
"dataJob": {
"urn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,flow1,foo),task1)",
"type": "DATA_JOB",
"jobId": "task1",
"dataFlow": {
"urn": "urn:li:dataFlow:(airflow,flow1,foo)",
"flowId": "flow1"
},
"inputOutput": {
"inputDatasets": [
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:redis,stuff,PROD)",
"name": "stuff"
}
],
"outputDatasets": []
}
}
}
}
```
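These queries can also be issued programmatically. Below is a minimal sketch using Java 11's HttpClient; the host, port, and `/api/v2/graphql` endpoint path are assumptions, so point it at your DataHub frontend's GraphQL endpoint and add whatever authentication your deployment requires.
```
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DataJobQueryExample {
    public static void main(String[] args) throws Exception {
        // The dataJob query from above, trimmed to a couple of fields.
        String query = "{ dataJob(urn: \"urn:li:dataJob:(urn:li:dataFlow:(airflow,flow1,foo),task1)\") { urn jobId } }";
        // GraphQL over HTTP: POST a JSON document with a top-level "query" field.
        String body = "{\"query\": \"" + query.replace("\"", "\\\"") + "\"}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9002/api/v2/graphql")) // assumed endpoint
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // raw JSON, shaped like the sample response above
    }
}
```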

View File

@@ -11,6 +11,9 @@ import com.linkedin.ml.client.MLModels;
import com.linkedin.restli.client.Client;
import com.linkedin.tag.client.Tags;
import com.linkedin.util.Configuration;
import com.linkedin.datajob.client.DataFlows;
import com.linkedin.datajob.client.DataJobs;
/**
* Provides access to clients for use in fetching data from downstream GMS services.
@@ -42,6 +45,8 @@ public class GmsClientFactory {
private static MLModels _mlModels;
private static Lineages _lineages;
private static Tags _tags;
private static DataFlows _dataFlows;
private static DataJobs _dataJobs;
private GmsClientFactory() { }
@@ -112,6 +117,28 @@ public class GmsClientFactory {
return _mlModels;
}
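// The data flow/job client getters below follow the factory's existing lazy-init idiom:
// double-checked locking, so each downstream GMS client is constructed at most once.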
public static DataFlows getDataFlowsClient() {
if (_dataFlows == null) {
synchronized (GmsClientFactory.class) {
if (_dataFlows == null) {
_dataFlows = new DataFlows(REST_CLIENT);
}
}
}
return _dataFlows;
}
public static DataJobs getDataJobsClient() {
if (_dataJobs == null) {
synchronized (GmsClientFactory.class) {
if (_dataJobs == null) {
_dataJobs = new DataJobs(REST_CLIENT);
}
}
}
return _dataJobs;
}
public static Lineages getLineagesClient() {
if (_lineages == null) {
synchronized (GmsClientFactory.class) {

View File

@@ -4,6 +4,8 @@ import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.graphql.generated.Chart;
import com.linkedin.datahub.graphql.generated.ChartInfo;
import com.linkedin.datahub.graphql.generated.DashboardInfo;
import com.linkedin.datahub.graphql.generated.DataJob;
import com.linkedin.datahub.graphql.generated.DataJobInputOutput;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.RelatedDataset;
import com.linkedin.datahub.graphql.resolvers.load.LoadableTypeBatchResolver;
@@ -32,6 +34,9 @@ import com.linkedin.datahub.graphql.resolvers.type.EntityInterfaceTypeResolver;
import com.linkedin.datahub.graphql.resolvers.type.PlatformSchemaUnionTypeResolver;
import com.linkedin.datahub.graphql.types.tag.TagType;
import com.linkedin.datahub.graphql.types.mlmodel.MLModelType;
import com.linkedin.datahub.graphql.types.dataflow.DataFlowType;
import com.linkedin.datahub.graphql.types.datajob.DataJobType;
import graphql.schema.idl.RuntimeWiring;
import org.apache.commons.io.IOUtils;
@@ -67,6 +72,8 @@ public class GmsGraphQLEngine {
public static final DownstreamLineageType DOWNSTREAM_LINEAGE_TYPE = new DownstreamLineageType(GmsClientFactory.getLineagesClient());
public static final TagType TAG_TYPE = new TagType(GmsClientFactory.getTagsClient());
public static final MLModelType ML_MODEL_TYPE = new MLModelType(GmsClientFactory.getMLModelsClient());
public static final DataFlowType DATA_FLOW_TYPE = new DataFlowType(GmsClientFactory.getDataFlowsClient());
public static final DataJobType DATA_JOB_TYPE = new DataJobType(GmsClientFactory.getDataJobsClient());
/**
* Configures the graph objects that can be fetched by primary key.
@@ -79,7 +86,9 @@ public class GmsGraphQLEngine {
CHART_TYPE,
DASHBOARD_TYPE,
TAG_TYPE,
ML_MODEL_TYPE
ML_MODEL_TYPE,
DATA_FLOW_TYPE,
DATA_JOB_TYPE
);
/**
@@ -133,7 +142,7 @@
configureTypeResolvers(builder);
configureTypeExtensions(builder);
configureTagAssociationResolver(builder);
configureMlModelResolvers(builder);
configureDataJobResolvers(builder);
}
public static GraphQLEngine.Builder builder() {
@@ -184,10 +193,14 @@
new LoadableTypeResolver<>(
TAG_TYPE,
(env) -> env.getArgument(URN_FIELD_NAME))))
.dataFetcher("mlModel", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
ML_MODEL_TYPE,
(env) -> env.getArgument(URN_FIELD_NAME))))
.dataFetcher("dataFlow", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
DATA_FLOW_TYPE,
(env) -> env.getArgument(URN_FIELD_NAME))))
.dataFetcher("dataJob", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
DATA_JOB_TYPE,
(env) -> env.getArgument(URN_FIELD_NAME))))
);
}
@@ -315,15 +328,31 @@
}
/**
* Configures resolvers responsible for resolving the {@link com.linkedin.datahub.graphql.generated.MLModel} type.
* Configures resolvers responsible for resolving the {@link com.linkedin.datahub.graphql.generated.DataJob} type.
*/
private static void configureMlModelResolvers(final RuntimeWiring.Builder builder) {
private static void configureDataJobResolvers(final RuntimeWiring.Builder builder) {
builder
.type("Owner", typeWiring -> typeWiring
.dataFetcher("owner", new AuthenticatedResolver<>(
.type("DataJob", typeWiring -> typeWiring
.dataFetcher("dataFlow", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(
CORP_USER_TYPE,
(env) -> ((Owner) env.getSource()).getOwner().getUrn()))
DATA_FLOW_TYPE,
(env) -> ((DataJob) env.getSource()).getDataFlow().getUrn()))
)
)
.type("DataJobInputOutput", typeWiring -> typeWiring
.dataFetcher("inputDatasets", new AuthenticatedResolver<>(
new LoadableTypeBatchResolver<>(
DATASET_TYPE,
(env) -> ((DataJobInputOutput) env.getSource()).getInputDatasets().stream()
.map(Dataset::getUrn)
.collect(Collectors.toList())))
)
.dataFetcher("outputDatasets", new AuthenticatedResolver<>(
new LoadableTypeBatchResolver<>(
DATASET_TYPE,
(env) -> ((DataJobInputOutput) env.getSource()).getOutputDatasets().stream()
.map(Dataset::getUrn)
.collect(Collectors.toList())))
)
);
}

View File

@@ -0,0 +1,105 @@
package com.linkedin.datahub.graphql.types.dataflow;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.net.URISyntaxException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.DataFlowUrn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.AutoCompleteResults;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.DataFlow;
import com.linkedin.datahub.graphql.generated.SearchResults;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
import com.linkedin.datahub.graphql.types.SearchableEntityType;
import com.linkedin.datahub.graphql.types.mappers.AutoCompleteResultsMapper;
import com.linkedin.datahub.graphql.types.mappers.SearchResultsMapper;
import com.linkedin.datahub.graphql.types.dataflow.mappers.DataFlowMapper;
import com.linkedin.metadata.query.AutoCompleteResult;
import com.linkedin.datajob.client.DataFlows;
import com.linkedin.restli.common.CollectionResponse;
public class DataFlowType implements SearchableEntityType<DataFlow> {
private static final Set<String> FACET_FIELDS = ImmutableSet.of("orchestrator", "cluster");
private static final String DEFAULT_AUTO_COMPLETE_FIELD = "flowId";
private final DataFlows _dataFlowsClient;
public DataFlowType(final DataFlows dataFlowsClient) {
_dataFlowsClient = dataFlowsClient;
}
@Override
public EntityType type() {
return EntityType.DATA_FLOW;
}
@Override
public Class<DataFlow> objectClass() {
return DataFlow.class;
}
@Override
public List<DataFlow> batchLoad(final List<String> urns, final QueryContext context) throws Exception {
final List<DataFlowUrn> dataFlowUrns = urns.stream()
.map(this::getDataFlowUrn)
.collect(Collectors.toList());
try {
final Map<DataFlowUrn, com.linkedin.datajob.DataFlow> dataFlowMap = _dataFlowsClient.batchGet(dataFlowUrns
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toSet()));
final List<com.linkedin.datajob.DataFlow> gmsResults = dataFlowUrns.stream()
.map(flowUrn -> dataFlowMap.getOrDefault(flowUrn, null)).collect(Collectors.toList());
return gmsResults.stream()
.map(gmsDataFlow -> gmsDataFlow == null ? null : DataFlowMapper.map(gmsDataFlow))
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException("Failed to batch load DataFlows", e);
}
}
@Override
public SearchResults search(@Nonnull String query,
@Nullable List<FacetFilterInput> filters,
int start,
int count,
@Nonnull final QueryContext context) throws Exception {
final Map<String, String> facetFilters = ResolverUtils.buildFacetFilters(filters, FACET_FIELDS);
final CollectionResponse<com.linkedin.datajob.DataFlow> searchResult = _dataFlowsClient.search(query, facetFilters, start, count);
return SearchResultsMapper.map(searchResult, DataFlowMapper::map);
}
@Override
public AutoCompleteResults autoComplete(@Nonnull String query,
@Nullable String field,
@Nullable List<FacetFilterInput> filters,
int limit,
@Nonnull final QueryContext context) throws Exception {
final Map<String, String> facetFilters = ResolverUtils.buildFacetFilters(filters, FACET_FIELDS);
field = field != null ? field : DEFAULT_AUTO_COMPLETE_FIELD;
final AutoCompleteResult result = _dataFlowsClient.autoComplete(query, field, facetFilters, limit);
return AutoCompleteResultsMapper.map(result);
}
private DataFlowUrn getDataFlowUrn(String urnStr) {
try {
return DataFlowUrn.createFromString(urnStr);
} catch (URISyntaxException e) {
throw new RuntimeException(String.format("Failed to retrieve dataflow with urn %s, invalid urn", urnStr), e);
}
}
}
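A note on batchLoad above: batchGet returns an unordered map, so the results are re-aligned to the caller's urn ordering with nulls for any misses, which is the per-key contract batch loaders expect. A self-contained sketch of that re-alignment pattern (urns and values here are made up):
```
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BatchOrderingSketch {
    public static void main(String[] args) {
        List<String> requested = Arrays.asList("urn:li:a", "urn:li:b", "urn:li:c");
        // The backend hands back an unordered map that may be missing entries ("urn:li:b" here).
        Map<String, String> fetched = Map.of("urn:li:a", "A", "urn:li:c", "C");
        // Re-align to the request order, null-filling misses, exactly as batchLoad does.
        List<String> results = requested.stream()
                .map(urn -> fetched.getOrDefault(urn, null))
                .collect(Collectors.toList());
        System.out.println(results); // prints [A, null, C]
    }
}
```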

View File

@@ -0,0 +1,44 @@
package com.linkedin.datahub.graphql.types.dataflow.mappers;
import com.linkedin.datahub.graphql.generated.DataFlow;
import com.linkedin.datahub.graphql.generated.DataFlowInfo;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import com.linkedin.datahub.graphql.types.common.mappers.OwnershipMapper;
import javax.annotation.Nonnull;
public class DataFlowMapper implements ModelMapper<com.linkedin.datajob.DataFlow, DataFlow> {
public static final DataFlowMapper INSTANCE = new DataFlowMapper();
public static DataFlow map(@Nonnull final com.linkedin.datajob.DataFlow dataflow) {
return INSTANCE.apply(dataflow);
}
@Override
public DataFlow apply(@Nonnull final com.linkedin.datajob.DataFlow dataflow) {
final DataFlow result = new DataFlow();
result.setUrn(dataflow.getUrn().toString());
result.setType(EntityType.DATA_FLOW);
result.setOrchestrator(dataflow.getOrchestrator());
result.setFlowId(dataflow.getFlowId());
result.setCluster(dataflow.getCluster());
if (dataflow.hasInfo()) {
result.setInfo(mapDataFlowInfo(dataflow.getInfo()));
}
if (dataflow.hasOwnership()) {
result.setOwnership(OwnershipMapper.map(dataflow.getOwnership()));
}
return result;
}
private DataFlowInfo mapDataFlowInfo(final com.linkedin.datajob.DataFlowInfo info) {
final DataFlowInfo result = new DataFlowInfo();
result.setName(info.getName());
result.setDescription(info.getDescription());
result.setProject(info.getProject());
return result;
}
}

View File

@@ -0,0 +1,105 @@
package com.linkedin.datahub.graphql.types.datajob;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.net.URISyntaxException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.AutoCompleteResults;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.DataJob;
import com.linkedin.datahub.graphql.generated.SearchResults;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
import com.linkedin.datahub.graphql.types.SearchableEntityType;
import com.linkedin.datahub.graphql.types.mappers.AutoCompleteResultsMapper;
import com.linkedin.datahub.graphql.types.mappers.SearchResultsMapper;
import com.linkedin.datahub.graphql.types.datajob.mappers.DataJobMapper;
import com.linkedin.metadata.query.AutoCompleteResult;
import com.linkedin.datajob.client.DataJobs;
import com.linkedin.restli.common.CollectionResponse;
public class DataJobType implements SearchableEntityType<DataJob> {
private static final Set<String> FACET_FIELDS = ImmutableSet.of("flow");
private static final String DEFAULT_AUTO_COMPLETE_FIELD = "jobId";
private final DataJobs _dataJobsClient;
public DataJobType(final DataJobs dataJobsClient) {
_dataJobsClient = dataJobsClient;
}
@Override
public EntityType type() {
return EntityType.DATA_JOB;
}
@Override
public Class<DataJob> objectClass() {
return DataJob.class;
}
@Override
public List<DataJob> batchLoad(final List<String> urns, final QueryContext context) throws Exception {
final List<DataJobUrn> dataJobUrns = urns.stream()
.map(this::getDataJobUrn)
.collect(Collectors.toList());
try {
final Map<DataJobUrn, com.linkedin.datajob.DataJob> dataJobMap = _dataJobsClient.batchGet(dataJobUrns
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toSet()));
final List<com.linkedin.datajob.DataJob> gmsResults = dataJobUrns.stream()
.map(jobUrn -> dataJobMap.getOrDefault(jobUrn, null)).collect(Collectors.toList());
return gmsResults.stream()
.map(gmsDataJob -> gmsDataJob == null ? null : DataJobMapper.map(gmsDataJob))
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException("Failed to batch load DataJobs", e);
}
}
@Override
public SearchResults search(@Nonnull String query,
@Nullable List<FacetFilterInput> filters,
int start,
int count,
@Nonnull final QueryContext context) throws Exception {
final Map<String, String> facetFilters = ResolverUtils.buildFacetFilters(filters, FACET_FIELDS);
final CollectionResponse<com.linkedin.datajob.DataJob> searchResult = _dataJobsClient.search(query, facetFilters, start, count);
return SearchResultsMapper.map(searchResult, DataJobMapper::map);
}
@Override
public AutoCompleteResults autoComplete(@Nonnull String query,
@Nullable String field,
@Nullable List<FacetFilterInput> filters,
int limit,
@Nonnull final QueryContext context) throws Exception {
final Map<String, String> facetFilters = ResolverUtils.buildFacetFilters(filters, FACET_FIELDS);
field = field != null ? field : DEFAULT_AUTO_COMPLETE_FIELD;
final AutoCompleteResult result = _dataJobsClient.autoComplete(query, field, facetFilters, limit);
return AutoCompleteResultsMapper.map(result);
}
private DataJobUrn getDataJobUrn(String urnStr) {
try {
return DataJobUrn.createFromString(urnStr);
} catch (URISyntaxException e) {
throw new RuntimeException(String.format("Failed to retrieve datajob with urn %s, invalid urn", urnStr), e);
}
}
}

View File

@@ -0,0 +1,65 @@
package com.linkedin.datahub.graphql.types.datajob.mappers;
import com.linkedin.datahub.graphql.generated.DataJob;
import com.linkedin.datahub.graphql.generated.DataJobInfo;
import com.linkedin.datahub.graphql.generated.DataJobInputOutput;
import com.linkedin.datahub.graphql.generated.DataFlow;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import com.linkedin.datahub.graphql.types.common.mappers.OwnershipMapper;
import javax.annotation.Nonnull;
import java.util.stream.Collectors;
public class DataJobMapper implements ModelMapper<com.linkedin.datajob.DataJob, DataJob> {
public static final DataJobMapper INSTANCE = new DataJobMapper();
public static DataJob map(@Nonnull final com.linkedin.datajob.DataJob dataJob) {
return INSTANCE.apply(dataJob);
}
@Override
public DataJob apply(@Nonnull final com.linkedin.datajob.DataJob dataJob) {
final DataJob result = new DataJob();
result.setUrn(dataJob.getUrn().toString());
result.setType(EntityType.DATA_JOB);
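// Only the urn is set here; the "dataFlow" dataFetcher wired in GmsGraphQLEngine hydrates the full DataFlow on demand.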
result.setDataFlow(new DataFlow.Builder().setUrn(dataJob.getDataFlow().toString()).build());
result.setJobId(dataJob.getJobId());
if (dataJob.hasInfo()) {
result.setInfo(mapDataJobInfo(dataJob.getInfo()));
}
if (dataJob.hasInputOutput()) {
result.setInputOutput(mapDataJobInputOutput(dataJob.getInputOutput()));
}
if (dataJob.hasOwnership()) {
result.setOwnership(OwnershipMapper.map(dataJob.getOwnership()));
}
return result;
}
private DataJobInfo mapDataJobInfo(final com.linkedin.datajob.DataJobInfo info) {
final DataJobInfo result = new DataJobInfo();
result.setName(info.getName());
result.setDescription(info.getDescription());
return result;
}
private DataJobInputOutput mapDataJobInputOutput(final com.linkedin.datajob.DataJobInputOutput inputOutput) {
final DataJobInputOutput result = new DataJobInputOutput();
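// Each urn becomes a Dataset stub; the inputDatasets/outputDatasets batch resolvers in GmsGraphQLEngine hydrate the full entities.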
result.setInputDatasets(inputOutput.getInputDatasets().stream().map(urn -> {
final Dataset dataset = new Dataset();
dataset.setUrn(urn.toString());
return dataset;
}).collect(Collectors.toList()));
result.setOutputDatasets(inputOutput.getOutputDatasets().stream().map(urn -> {
final Dataset dataset = new Dataset();
dataset.setUrn(urn.toString());
return dataset;
}).collect(Collectors.toList()));
return result;
}
}

View File

@@ -53,6 +53,14 @@ enum EntityType {
The MLModel Entity
"""
MLMODEL
"""
The DataFlow Entity
"""
DATA_FLOW
"""
The DataJob Entity
"""
DATA_JOB
}
type Query {
@@ -85,6 +93,16 @@ type Query {
"""
mlModel(urn: String!): MLModel
"""
Fetch a DataFlow by primary key
"""
dataFlow(urn: String!): DataFlow
"""
Fetch a DataJob by primary key
"""
dataJob(urn: String!): DataJob
"""
Search DataHub entities
"""
@@ -1939,3 +1957,119 @@ type Deprecation {
"""
actor: String
}
type DataFlow implements Entity {
"""
The unique flow URN
"""
urn: String!
"""
GMS Entity Type
"""
type: EntityType!
"""
Workflow orchestrator, e.g. Azkaban or Airflow
"""
orchestrator: String!
"""
Id of the flow
"""
flowId: String!
"""
Cluster of the flow
"""
cluster: String!
"""
Information about a data processing flow
"""
info: DataFlowInfo
"""
Ownership metadata of the flow
"""
ownership: Ownership
}
type DataFlowInfo {
"""
Name of the flow
"""
name: String!
"""
Description of the flow
"""
description: String
"""
Optional project/namespace associated with the flow
"""
project: String
}
type DataJob implements Entity {
"""
The unique job URN
"""
urn: String!
"""
GMS Entity Type
"""
type: EntityType!
"""
The associated data flow
"""
dataFlow: DataFlow!
"""
Id of the job
"""
jobId: String!
"""
Ownership metadata of the job
"""
ownership: Ownership
"""
Information about the inputs and outputs of a data processing job
"""
inputOutput: DataJobInputOutput
"""
Information about a data processing job
"""
info: DataJobInfo
}
type DataJobInfo {
"""
Job name
"""
name: String!
"""
Job description
"""
description: String
}
type DataJobInputOutput {
"""
Input datasets consumed by the data job during processing
"""
inputDatasets: [Dataset!]
"""
Output datasets produced by the data job during processing
"""
outputDatasets: [Dataset!]
}