Implemented data process search feature (#1706)

* implement search feature

* add test for DataProcessIndexBuilder; refactor code based on feedback

* update based on PR feedback

* Update DataProcessDocument.pdl

Fixed a wording typo.

* add not null check for data process info
This commit is contained in:
Liangjun Jiang 2020-06-29 12:20:22 -05:00 committed by GitHub
parent 2dc11a51f4
commit 5d078aa617
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 327 additions and 104 deletions

View File

@ -1,7 +1,6 @@
{
"settings": {
"index": {
"max_ngram_diff": "50",
"analysis": {
"filter": {
"autocomplete_filter": {
@ -140,6 +139,7 @@
}
},
"mappings": {
"doc": {
"properties": {
"browsePaths": {
"type": "text",
@ -166,26 +166,7 @@
"type": "boolean"
},
"name": {
"type": "keyword",
"fields": {
"data_process_pattern_ngram": {
"type": "text",
"analyzer": "data_process_pattern_ngram"
},
"delimited": {
"type": "text",
"analyzer": "delimit"
},
"ngram": {
"type": "text",
"analyzer": "custom_ngram"
},
"pattern": {
"type": "text",
"analyzer": "data_process_pattern"
}
},
"normalizer": "my_normalizer"
"type": "keyword"
},
"num_inputs": {
"type": "long"
@ -227,4 +208,5 @@
}
}
}
}
}

View File

@ -10,7 +10,7 @@
"type" : "com.linkedin.dataprocess.DataProcessKey",
"params" : "com.linkedin.restli.common.EmptyRecord"
},
"supports" : [ "batch_get", "get" ],
"supports" : [ "batch_get", "get", "get_all" ],
"methods" : [ {
"method" : "get",
"parameters" : [ {
@ -25,6 +25,22 @@
"type" : "{ \"type\" : \"array\", \"items\" : \"string\" }",
"default" : "[]"
} ]
}, {
"method" : "get_all",
"parameters" : [ {
"name" : "aspects",
"type" : "{ \"type\" : \"array\", \"items\" : \"string\" }",
"default" : "[]"
}, {
"name" : "filter",
"type" : "com.linkedin.metadata.query.Filter",
"optional" : true
}, {
"name" : "sort",
"type" : "com.linkedin.metadata.query.SortCriterion",
"optional" : true
} ],
"pagingSupported" : true
} ],
"finders" : [ {
"name" : "search",
@ -50,6 +66,22 @@
"pagingSupported" : true
} ],
"actions" : [ {
"name" : "autocomplete",
"parameters" : [ {
"name" : "query",
"type" : "string"
}, {
"name" : "field",
"type" : "string"
}, {
"name" : "filter",
"type" : "com.linkedin.metadata.query.Filter"
}, {
"name" : "limit",
"type" : "int"
} ],
"returns" : "com.linkedin.metadata.query.AutoCompleteResult"
}, {
"name" : "backfill",
"parameters" : [ {
"name" : "urn",

View File

@ -204,7 +204,7 @@
"fields" : [ {
"name" : "orchestrator",
"type" : "string",
"doc" : "Standardized orchestration platform urn where process is running. It can be Azure Data Factory or Sqoop script",
"doc" : "Standardized orchestration platform where process is running. It can be Azure Data Factory or Sqoop script",
"validate" : {
"strlen" : {
"max" : 500,
@ -276,6 +276,23 @@
},
"doc" : "List of aggregations showing the number of documents falling into each bucket. e.g, for platform aggregation, the bucket can be hive, kafka, etc"
} ]
}, {
"type" : "record",
"name" : "AutoCompleteResult",
"namespace" : "com.linkedin.metadata.query",
"doc" : "The model for the auto complete result",
"fields" : [ {
"name" : "query",
"type" : "string",
"doc" : "The original chars typed by user"
}, {
"name" : "suggestions",
"type" : {
"type" : "array",
"items" : "string"
},
"doc" : "A list of typeahead suggestions"
} ]
}, {
"type" : "enum",
"name" : "Condition",
@ -406,7 +423,7 @@
"type" : "com.linkedin.dataprocess.DataProcessKey",
"params" : "com.linkedin.restli.common.EmptyRecord"
},
"supports" : [ "batch_get", "get" ],
"supports" : [ "batch_get", "get", "get_all" ],
"methods" : [ {
"method" : "get",
"parameters" : [ {
@ -421,6 +438,22 @@
"type" : "{ \"type\" : \"array\", \"items\" : \"string\" }",
"default" : "[]"
} ]
}, {
"method" : "get_all",
"parameters" : [ {
"name" : "aspects",
"type" : "{ \"type\" : \"array\", \"items\" : \"string\" }",
"default" : "[]"
}, {
"name" : "filter",
"type" : "com.linkedin.metadata.query.Filter",
"optional" : true
}, {
"name" : "sort",
"type" : "com.linkedin.metadata.query.SortCriterion",
"optional" : true
} ],
"pagingSupported" : true
} ],
"finders" : [ {
"name" : "search",
@ -446,6 +479,22 @@
"pagingSupported" : true
} ],
"actions" : [ {
"name" : "autocomplete",
"parameters" : [ {
"name" : "query",
"type" : "string"
}, {
"name" : "field",
"type" : "string"
}, {
"name" : "filter",
"type" : "com.linkedin.metadata.query.Filter"
}, {
"name" : "limit",
"type" : "int"
} ],
"returns" : "com.linkedin.metadata.query.AutoCompleteResult"
}, {
"name" : "backfill",
"parameters" : [ {
"name" : "urn",

View File

@ -3,9 +3,12 @@ package com.linkedin.dataprocess.client;
import com.linkedin.common.urn.DataProcessUrn;
import com.linkedin.dataprocess.DataProcess;
import com.linkedin.dataprocess.DataProcessInfoRequestBuilders;
import com.linkedin.dataprocess.DataProcessesDoAutocompleteRequestBuilder;
import com.linkedin.dataprocess.DataProcessesFindBySearchRequestBuilder;
import com.linkedin.dataprocess.DataProcessesRequestBuilders;
import com.linkedin.dataprocess.DataProcessKey;
import com.linkedin.dataprocess.DataProcessInfo;
import com.linkedin.metadata.configs.DataProcessSearchConfig;
import com.linkedin.metadata.query.AutoCompleteResult;
import com.linkedin.metadata.query.SortCriterion;
import com.linkedin.metadata.restli.BaseClient;
@ -25,11 +28,15 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static com.linkedin.metadata.dao.utils.QueryUtils.newFilter;
public class DataProcesses extends BaseClient implements SearchableClient<DataProcess> {
private static final DataProcessesRequestBuilders PROCESSES_REQUEST_BUILDERS = new DataProcessesRequestBuilders();
private static final DataProcessInfoRequestBuilders PROCESS_INFO_REQUEST_BUILDERS = new DataProcessInfoRequestBuilders();
private static final DataProcessesRequestBuilders DATA_PROCESSES_REQUEST_BUILDERS = new DataProcessesRequestBuilders();
private static final DataProcessInfoRequestBuilders DATA_PROCESS_INFO_REQUEST_BUILDERS = new DataProcessInfoRequestBuilders();
private static final DataProcessSearchConfig DATA_PROCESS_SEARCH_CONFIG = new DataProcessSearchConfig();
protected DataProcesses(@Nonnull Client restliClient) {
super(restliClient);
@ -38,7 +45,7 @@ public class DataProcesses extends BaseClient implements SearchableClient<DataPr
@Nonnull
public DataProcess get(@Nonnull DataProcessUrn urn)
throws RemoteInvocationException {
GetRequest<DataProcess> getRequest = PROCESSES_REQUEST_BUILDERS.get()
GetRequest<DataProcess> getRequest = DATA_PROCESSES_REQUEST_BUILDERS.get()
.id(new ComplexResourceKey<>(toDataProcessKey(urn), new EmptyRecord()))
.build();
@ -49,7 +56,7 @@ public class DataProcesses extends BaseClient implements SearchableClient<DataPr
public Map<DataProcessUrn, DataProcess> batchGet(@Nonnull Set<DataProcessUrn> urns)
throws RemoteInvocationException {
BatchGetEntityRequest<ComplexResourceKey<DataProcessKey, EmptyRecord>, DataProcess> batchGetRequest
= PROCESSES_REQUEST_BUILDERS.batchGet()
= DATA_PROCESSES_REQUEST_BUILDERS.batchGet()
.ids(urns.stream().map(this::getKeyFromUrn).collect(Collectors.toSet()))
.build();
@ -62,7 +69,7 @@ public class DataProcesses extends BaseClient implements SearchableClient<DataPr
public void createDataProcessInfo(@Nonnull DataProcessUrn dataProcessUrn,
@Nonnull DataProcessInfo dataProcessInfo) throws RemoteInvocationException {
CreateIdRequest<Long, DataProcessInfo> request = PROCESS_INFO_REQUEST_BUILDERS.create()
CreateIdRequest<Long, DataProcessInfo> request = DATA_PROCESS_INFO_REQUEST_BUILDERS.create()
.dataprocessKey(new ComplexResourceKey<>(toDataProcessKey(dataProcessUrn), new EmptyRecord()))
.input(dataProcessInfo)
.build();
@ -93,19 +100,35 @@ public class DataProcesses extends BaseClient implements SearchableClient<DataPr
@Override
public CollectionResponse<DataProcess> search(@Nonnull String input, @Nullable Map<String, String> requestFilters,
@Nullable SortCriterion sortCriterion, int start, int count) throws RemoteInvocationException {
throw new UnsupportedOperationException(
String.format("%s doesn't support search feature yet,",
this.getClass().getName())
);
DataProcessesFindBySearchRequestBuilder requestBuilder = DATA_PROCESSES_REQUEST_BUILDERS
.findBySearch()
.inputParam(input)
.sortParam(sortCriterion)
.paginate(start, count);
if (requestFilters != null) {
requestBuilder.filterParam(newFilter(requestFilters));
}
return _client.sendRequest(requestBuilder.build()).getResponse().getEntity();
}
@Nonnull
public CollectionResponse<DataProcess> search(@Nonnull String input, int start, int count)
throws RemoteInvocationException {
return search(input, null, null, start, count);
}
@Nonnull
@Override
public AutoCompleteResult autocomplete(@Nonnull String query, @Nullable String field, @Nullable Map<String, String> requestFilters, int limit)
public AutoCompleteResult autocomplete(@Nonnull String query, @Nullable String field, @Nonnull Map<String, String> requestFilters, int limit)
throws RemoteInvocationException {
throw new UnsupportedOperationException(
String.format("%s doesn't support auto completion feature yet,",
this.getClass().getName())
);
final String autocompleteField = (field != null) ? field : DATA_PROCESS_SEARCH_CONFIG.getDefaultAutocompleteField();
DataProcessesDoAutocompleteRequestBuilder requestBuilder = DATA_PROCESSES_REQUEST_BUILDERS
.actionAutocomplete()
.queryParam(query)
.fieldParam(autocompleteField)
.filterParam(newFilter(requestFilters))
.limitParam(limit);
return _client.sendRequest(requestBuilder.build()).getResponse().getEntity();
}
}

View File

@ -8,6 +8,7 @@ import com.linkedin.metadata.aspect.DataProcessAspect;
import com.linkedin.metadata.dao.BaseLocalDAO;
import com.linkedin.metadata.dao.BaseSearchDAO;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.query.AutoCompleteResult;
import com.linkedin.metadata.query.Filter;
import com.linkedin.metadata.query.SearchResultMetadata;
import com.linkedin.metadata.query.SortCriterion;
@ -151,6 +152,15 @@ public class DataProcesses extends BaseSearchableEntityResource<
return super.batchGet(keys, aspectNames);
}
/**
 * Rest.li GET_ALL endpoint: returns one page of {@link DataProcess} entities,
 * optionally filtered and sorted, by delegating to the base searchable resource.
 *
 * @param pagingContext start/count paging info supplied by the framework
 * @param aspectNames aspect names to hydrate on each entity; defaults to all ("[]")
 * @param filter optional filter criteria applied to the search index
 * @param sortCriterion optional sort order for the result page
 */
@RestMethod.GetAll
@Nonnull
public Task<List<DataProcess>> getAll(@PagingContextParam @Nonnull PagingContext pagingContext,
@QueryParam(PARAM_ASPECTS) @Optional("[]") @Nonnull String[] aspectNames,
@QueryParam(PARAM_FILTER) @Optional @Nullable Filter filter,
@QueryParam(PARAM_SORT) @Optional @Nullable SortCriterion sortCriterion) {
// All paging/filter/sort handling lives in the shared base resource.
return super.getAll(pagingContext, aspectNames, filter, sortCriterion);
}
@Finder(FINDER_SEARCH)
@Override
@Nonnull
@ -162,6 +172,15 @@ public class DataProcesses extends BaseSearchableEntityResource<
return super.search(input, aspectNames, filter, sortCriterion, pagingContext);
}
/**
 * Rest.li action "autocomplete": returns typeahead suggestions for a partial query,
 * delegating to the base searchable resource.
 *
 * @param query the characters typed so far
 * @param field document field to autocomplete against; base class picks a default when null
 * @param filter optional filter narrowing the candidate documents
 * @param limit maximum number of suggestions to return
 */
@Action(name = ACTION_AUTOCOMPLETE)
@Override
@Nonnull
public Task<AutoCompleteResult> autocomplete(@ActionParam(PARAM_QUERY) @Nonnull String query,
@ActionParam(PARAM_FIELD) @Nullable String field, @ActionParam(PARAM_FILTER) @Nullable Filter filter,
@ActionParam(PARAM_LIMIT) int limit) {
// Suggestion lookup is implemented once in the shared base resource.
return super.autocomplete(query, field, filter, limit);
}
@Action(name = ACTION_INGEST)
@Override
@Nonnull

View File

@ -1,8 +1,15 @@
package com.linkedin.metadata.builders.search;
import com.linkedin.common.Ownership;
import com.linkedin.common.Status;
import com.linkedin.common.urn.DataProcessUrn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringArray;
import com.linkedin.dataprocess.DataProcessInfo;
import com.linkedin.metadata.search.DataProcessDocument;
import com.linkedin.metadata.snapshot.DataProcessSnapshot;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nonnull;
@ -17,13 +24,70 @@ public class DataProcessIndexBuilder extends BaseIndexBuilder<DataProcessDocumen
super(Collections.singletonList(DataProcessSnapshot.class), DataProcessDocument.class);
}
/**
 * Derives the browse path for a data process from its URN:
 * "/&lt;origin&gt;/&lt;orchestrator&gt;/&lt;name&gt;", with any '.' expanded into
 * a path separator and the whole path lower-cased.
 */
@Nonnull
private static String buildBrowsePath(@Nonnull DataProcessUrn urn) {
  final String rawPath = "/" + urn.getOriginEntity()
      + "/" + urn.getOrchestrator()
      + "/" + urn.getNameEntity();
  // NOTE(review): default-locale toLowerCase — confirm locale-sensitive casing is acceptable here.
  return rawPath.replace('.', '/').toLowerCase();
}
/**
 * Seeds a document with every field derivable from the URN alone (urn, name,
 * orchestrator, browse paths); aspect-specific builders layer their fields on top.
 */
@Nonnull
private static DataProcessDocument setUrnDerivedFields(@Nonnull DataProcessUrn urn) {
  final DataProcessDocument document = new DataProcessDocument();
  document.setUrn(urn);
  document.setName(urn.getNameEntity());
  document.setOrchestrator(urn.getOrchestrator());
  document.setBrowsePaths(new StringArray(Collections.singletonList(buildBrowsePath(urn))));
  return document;
}
/**
 * Builds the partial document for an {@link Ownership} aspect: owner ldaps plus a
 * hasOwners flag so searches can filter on ownership presence.
 */
@Nonnull
private DataProcessDocument getDocumentToUpdateFromAspect(@Nonnull DataProcessUrn urn, @Nonnull Ownership ownership) {
  final StringArray ownerLdaps = BuilderUtils.getCorpUserOwners(ownership);
  return setUrnDerivedFields(urn)
      .setOwners(ownerLdaps)
      .setHasOwners(!ownerLdaps.isEmpty());
}
/**
 * Builds the partial document for a {@link DataProcessInfo} aspect. Inputs and
 * outputs are optional on the aspect, so each is indexed (together with its count)
 * only when present.
 */
@Nonnull
private DataProcessDocument getDocumentToUpdateFromAspect(@Nonnull DataProcessUrn urn,
    @Nonnull DataProcessInfo dataProcessInfo) {
  final DataProcessDocument document = setUrnDerivedFields(urn);
  if (dataProcessInfo.getInputs() != null) {
    document.setInputs(dataProcessInfo.getInputs());
    document.setNumInputDatasets(dataProcessInfo.getInputs().size());
  }
  if (dataProcessInfo.getOutputs() != null) {
    document.setOutputs(dataProcessInfo.getOutputs());
    document.setNumOutputDatasets(dataProcessInfo.getOutputs().size());
  }
  return document;
}
/**
 * Builds the partial document for a {@link Status} aspect: soft-deleted processes
 * stay in the index but carry the removed flag.
 */
@Nonnull
private DataProcessDocument getDocumentToUpdateFromAspect(@Nonnull DataProcessUrn urn, @Nonnull Status status) {
  final DataProcessDocument document = setUrnDerivedFields(urn);
  document.setRemoved(status.isRemoved());
  return document;
}
/**
 * Maps each aspect of the snapshot to its own partial document. Aspects this
 * builder does not handle produce null and are dropped from the result.
 */
@Nonnull
private List<DataProcessDocument> getDocumentsToUpdateFromSnapshotType(@Nonnull DataProcessSnapshot dataProcessSnapshot) {
  final DataProcessUrn urn = dataProcessSnapshot.getUrn();
  return dataProcessSnapshot.getAspects()
      .stream()
      .map(aspect -> {
        if (aspect.isOwnership()) {
          return getDocumentToUpdateFromAspect(urn, aspect.getOwnership());
        }
        if (aspect.isDataProcessInfo()) {
          return getDocumentToUpdateFromAspect(urn, aspect.getDataProcessInfo());
        }
        // Unrecognized aspect type — nothing to index for it.
        return null;
      })
      .filter(Objects::nonNull)
      .collect(Collectors.toList());
}
@Nullable
@Override
public List<DataProcessDocument> getDocumentsToUpdate(@Nonnull RecordTemplate snapshot) {
throw new UnsupportedOperationException(
String.format("%s doesn't support this feature yet,",
this.getClass().getName())
);
public List<DataProcessDocument> getDocumentsToUpdate(@Nonnull RecordTemplate genericSnapshot) {
if (genericSnapshot instanceof DataProcessSnapshot) {
return getDocumentsToUpdateFromSnapshotType((DataProcessSnapshot) genericSnapshot);
}
return Collections.emptyList();
}
@Nonnull

View File

@ -20,7 +20,7 @@ import static org.testng.Assert.*;
public class CorpGroupIndexBuilderTest {
@Test
public void testGetDocumentsToUpdateFromDatasetSnapshot() {
public void testGetDocumentsToUpdateFromCorpGroupSnapshot() {
CorpGroupUrn corpGroupUrn = new CorpGroupUrn("foo");
CorpGroupSnapshot corpGroupSnapshot = new CorpGroupSnapshot().setUrn(corpGroupUrn).setAspects(new CorpGroupAspectArray());
String groupName = "bar";

View File

@ -18,7 +18,7 @@ import static org.testng.Assert.*;
public class CorpUserInfoIndexBuilderTest {
@Test
public void testGetDocumentsToUpdateFromDatasetSnapshot() {
public void testGetDocumentsToUpdateFromCorpUserSnapshot() {
String testerLdap = "fooBar";
CorpuserUrn corpuserUrn = new CorpuserUrn(testerLdap);
CorpUserSnapshot corpUserSnapshot =

View File

@ -0,0 +1,48 @@
package com.linkedin.metadata.builders.search;
import com.linkedin.common.DatasetUrnArray;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DataProcessUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.dataprocess.DataProcessInfo;
import com.linkedin.metadata.aspect.DataProcessAspect;
import com.linkedin.metadata.aspect.DataProcessAspectArray;
import com.linkedin.metadata.search.DataProcessDocument;
import com.linkedin.metadata.snapshot.DataProcessSnapshot;
import java.util.List;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
 * Unit test for {@code DataProcessIndexBuilder}: verifies that a snapshot carrying a
 * single DataProcessInfo aspect yields exactly one search document with the
 * URN-derived fields and the input/output datasets (and their counts) indexed.
 */
public class DataProcessIndexBuilderTest {

  @Test
  public void testGetDocumentsToUpdateFromDataProcessSnapshot() {
    // Snapshot fixture: one DataProcessInfo aspect with one input and one output dataset.
    DataProcessUrn dataProcessUrn = new DataProcessUrn("Azure Data Factory", "ADFJob1", FabricType.PROD);
    DataProcessInfo dataProcessInfo = new DataProcessInfo();
    DatasetUrn inputDatasetUrn = new DatasetUrn(new DataPlatformUrn("HIVE"), "SampleInputDataset", FabricType.DEV);
    DatasetUrnArray inputs = new DatasetUrnArray();
    inputs.add(inputDatasetUrn);
    dataProcessInfo.setInputs(inputs);
    DatasetUrn outputDatasetUrn = new DatasetUrn(new DataPlatformUrn("HIVE"), "SampleOutputDataset", FabricType.DEV);
    DatasetUrnArray outputs = new DatasetUrnArray();
    outputs.add(outputDatasetUrn);
    dataProcessInfo.setOutputs(outputs);
    DataProcessAspect dataProcessAspect = new DataProcessAspect();
    dataProcessAspect.setDataProcessInfo(dataProcessInfo);
    DataProcessAspectArray dataProcessAspectArray = new DataProcessAspectArray();
    dataProcessAspectArray.add(dataProcessAspect);
    DataProcessSnapshot dataProcessSnapshot =
        new DataProcessSnapshot().setUrn(dataProcessUrn).setAspects(dataProcessAspectArray);

    List<DataProcessDocument> actualDocs = new DataProcessIndexBuilder().getDocumentsToUpdate(dataProcessSnapshot);

    // One aspect in the snapshot -> exactly one document to update.
    assertEquals(actualDocs.size(), 1);
    DataProcessDocument actualDoc = actualDocs.get(0);

    // URN-derived fields are populated from the urn itself.
    assertEquals(actualDoc.getUrn(), dataProcessUrn);
    assertEquals(actualDoc.getName(), "ADFJob1");
    assertEquals(actualDoc.getOrchestrator(), "Azure Data Factory");

    // Input/output datasets and their counts are indexed verbatim.
    assertEquals(actualDoc.getInputs().size(), 1);
    assertEquals(actualDoc.getInputs().get(0), inputDatasetUrn);
    assertEquals(actualDoc.getNumInputDatasets().intValue(), 1);
    assertEquals(actualDoc.getOutputs().size(), 1);
    assertEquals(actualDoc.getOutputs().get(0), outputDatasetUrn);
    assertEquals(actualDoc.getNumOutputDatasets().intValue(), 1);
  }
}

View File

@ -2,6 +2,7 @@ namespace com.linkedin.metadata.search
import com.linkedin.common.DataProcessUrn
import com.linkedin.common.DatasetUrn
import com.linkedin.common.FabricType
/**
* Data model for data process entity search
@ -23,6 +24,11 @@ record DataProcessDocument includes BaseDocument {
*/
orchestrator: optional string
/**
* Fabric type where data process belongs to or where it was generated
*/
origin: optional FabricType
/**
* LDAP usernames of corp users who are the owners of this process
*/