Fix(search): fix datajob and dataflow search mappings (#2418)

This commit is contained in:
Fredrik Sannholm 2021-04-21 22:04:20 +03:00 committed by GitHub
parent ffe49f061a
commit e88a671959
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 292 additions and 68 deletions

View File

@ -11,15 +11,6 @@
"analyzer": "custom_browse_slash",
"fielddata": true
},
"access": {
"type": "keyword",
"fields": {
"ngram": {
"type": "text",
"analyzer": "delimit_edgengram"
}
}
},
"description": {
"type": "text",
"fields": {
@ -38,10 +29,17 @@
},
"analyzer": "comma_pattern"
},
"queryType": {
"type": "keyword"
"orchestrator": {
"type": "keyword",
"fields": {
"ngram": {
"type": "text",
"analyzer": "custom_ngram"
}
},
"normalizer": "custom_normalizer"
},
"title": {
"name": {
"type": "keyword",
"fields": {
"delimited": {
@ -55,7 +53,7 @@
},
"normalizer": "custom_normalizer"
},
"tool": {
"project": {
"type": "keyword",
"fields": {
"ngram": {
@ -64,8 +62,26 @@
}
}
},
"type": {
"type": "keyword"
"cluster": {
"type": "keyword",
"fields": {
"ngram": {
"type": "text",
"analyzer": "delimit_edgengram"
}
}
},
"flowId": {
"type": "keyword",
"fields": {
"ngram": {
"type": "text",
"analyzer": "delimit_edgengram"
}
}
},
"hasOwners": {
"type": "boolean"
},
"urn": {
"type": "keyword",

View File

@ -40,6 +40,13 @@
"type": "custom",
"tokenizer": "comma_tokenizer"
},
"custom_ngram": {
"filter": [
"lowercase"
],
"type": "custom",
"tokenizer": "custom_ngram"
},
"comma_pattern_ngram": {
"filter": [
"lowercase",
@ -91,6 +98,11 @@
"comma_tokenizer": {
"pattern": ",",
"type": "pattern"
},
"custom_ngram": {
"type": "ngram",
"min_gram": "3",
"max_gram": "20"
}
}
}

View File

@ -11,15 +11,6 @@
"analyzer": "custom_browse_slash",
"fielddata": true
},
"access": {
"type": "keyword",
"fields": {
"ngram": {
"type": "text",
"analyzer": "delimit_edgengram"
}
}
},
"description": {
"type": "text",
"fields": {
@ -38,10 +29,7 @@
},
"analyzer": "comma_pattern"
},
"queryType": {
"type": "keyword"
},
"title": {
"name": {
"type": "keyword",
"fields": {
"delimited": {
@ -55,7 +43,16 @@
},
"normalizer": "custom_normalizer"
},
"tool": {
"hasOwners": {
"type": "boolean"
},
"numInputDatasets": {
"type": "long"
},
"numOutputDatasets": {
"type": "long"
},
"jobId": {
"type": "keyword",
"fields": {
"ngram": {
@ -64,12 +61,26 @@
}
}
},
"type": {
"type": "keyword"
"dataFlow": {
"type": "keyword",
"fields": {
"ngram": {
"type": "text",
"analyzer": "delimit_edgengram"
}
}
},
"urn": {
"type": "keyword",
"normalizer": "custom_normalizer"
},
"inputs": {
"type": "keyword",
"normalizer": "custom_normalizer"
},
"outputs": {
"type": "keyword",
"normalizer": "custom_normalizer"
}
}
}

View File

@ -14,7 +14,6 @@ import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DataFlowIndexBuilder extends BaseIndexBuilder<DataFlowDocument> {
public DataFlowIndexBuilder() {
@ -23,27 +22,23 @@ public class DataFlowIndexBuilder extends BaseIndexBuilder<DataFlowDocument> {
@Nonnull
private static String buildBrowsePath(@Nonnull DataFlowUrn urn) {
return ("/" + urn.getOrchestratorEntity() + "/" + urn.getClusterEntity() + "/" + urn.getFlowIdEntity()).toLowerCase();
return ("/" + urn.getOrchestratorEntity() + "/" + urn.getClusterEntity() + "/" + urn.getFlowIdEntity())
.toLowerCase();
}
@Nonnull
private static DataFlowDocument setUrnDerivedFields(@Nonnull DataFlowUrn urn) {
return new DataFlowDocument()
.setUrn(urn)
.setOrchestrator(urn.getOrchestratorEntity())
.setFlowId(urn.getFlowIdEntity())
.setCluster(urn.getClusterEntity())
return new DataFlowDocument().setUrn(urn).setOrchestrator(urn.getOrchestratorEntity())
.setFlowId(urn.getFlowIdEntity()).setCluster(urn.getClusterEntity())
.setBrowsePaths(new StringArray(Collections.singletonList(buildBrowsePath(urn))));
}
@Nonnull
private DataFlowDocument getDocumentToUpdateFromAspect(@Nonnull DataFlowUrn urn,
@Nonnull DataFlowInfo info) {
final DataFlowDocument document = setUrnDerivedFields(urn);
private DataFlowDocument getDocumentToUpdateFromAspect(@Nonnull DataFlowUrn urn, @Nonnull DataFlowInfo info) {
final DataFlowDocument document = new DataFlowDocument().setUrn(urn);
document.setName(info.getName());
if (info.getDescription() != null) {
document.setDescription(info.getDescription());
document.setDescription(info.getDescription());
}
if (info.getProject() != null) {
document.setProject(info.getProject());
@ -52,16 +47,15 @@ public class DataFlowIndexBuilder extends BaseIndexBuilder<DataFlowDocument> {
}
@Nonnull
private DataFlowDocument getDocumentToUpdateFromAspect(@Nonnull DataFlowUrn urn,
@Nonnull Ownership ownership) {
return setUrnDerivedFields(urn)
.setOwners(BuilderUtils.getCorpUserOwners(ownership));
private DataFlowDocument getDocumentToUpdateFromAspect(@Nonnull DataFlowUrn urn, @Nonnull Ownership ownership) {
final StringArray owners = BuilderUtils.getCorpUserOwners(ownership);
return new DataFlowDocument().setUrn(urn).setHasOwners(!owners.isEmpty()).setOwners(owners);
}
@Nonnull
private List<DataFlowDocument> getDocumentsToUpdateFromSnapshotType(@Nonnull DataFlowSnapshot snapshot) {
DataFlowUrn urn = snapshot.getUrn();
return snapshot.getAspects().stream().map(aspect -> {
final List<DataFlowDocument> documents = snapshot.getAspects().stream().map(aspect -> {
if (aspect.isDataFlowInfo()) {
return getDocumentToUpdateFromAspect(urn, aspect.getDataFlowInfo());
} else if (aspect.isOwnership()) {
@ -69,6 +63,8 @@ public class DataFlowIndexBuilder extends BaseIndexBuilder<DataFlowDocument> {
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());
documents.add(setUrnDerivedFields(urn));
return documents;
}
@Nonnull

View File

@ -3,6 +3,7 @@ package com.linkedin.metadata.builders.search;
import com.linkedin.common.Ownership;
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.datajob.DataJobInfo;
import com.linkedin.datajob.DataJobInputOutput;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringArray;
import com.linkedin.metadata.search.DataJobDocument;
@ -14,7 +15,6 @@ import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DataJobIndexBuilder extends BaseIndexBuilder<DataJobDocument> {
public DataJobIndexBuilder() {
@ -23,48 +23,60 @@ public class DataJobIndexBuilder extends BaseIndexBuilder<DataJobDocument> {
@Nonnull
private static String buildBrowsePath(@Nonnull DataJobUrn urn) {
return ("/" + urn.getFlowEntity().getFlowIdEntity() + "/" + urn.getJobIdEntity()).toLowerCase();
return ("/" + urn.getFlowEntity().getFlowIdEntity() + "/" + urn.getJobIdEntity()).toLowerCase();
}
@Nonnull
private static DataJobDocument setUrnDerivedFields(@Nonnull DataJobUrn urn) {
return new DataJobDocument()
.setUrn(urn)
.setDataFlow(urn.getFlowEntity().getFlowIdEntity())
return new DataJobDocument().setUrn(urn).setDataFlow(urn.getFlowEntity().getFlowIdEntity())
.setJobId(urn.getJobIdEntity())
.setBrowsePaths(new StringArray(Collections.singletonList(buildBrowsePath(urn))));
}
@Nonnull
private DataJobDocument getDocumentToUpdateFromAspect(@Nonnull DataJobUrn urn,
@Nonnull DataJobInfo info) {
final DataJobDocument document = setUrnDerivedFields(urn);
private DataJobDocument getDocumentToUpdateFromAspect(@Nonnull DataJobUrn urn, @Nonnull DataJobInfo info) {
final DataJobDocument document = new DataJobDocument().setUrn(urn);
document.setName(info.getName());
if (info.getDescription() != null) {
document.setDescription(info.getDescription());
document.setDescription(info.getDescription());
}
return document;
}
@Nonnull
private DataJobDocument getDocumentToUpdateFromAspect(@Nonnull DataJobUrn urn,
@Nonnull Ownership ownership) {
return setUrnDerivedFields(urn)
.setOwners(BuilderUtils.getCorpUserOwners(ownership)); // TODO: should be optional?
@Nonnull DataJobInputOutput inputOutput) {
final DataJobDocument document = new DataJobDocument().setUrn(urn);
if (inputOutput.getInputDatasets() != null) {
document.setInputs(inputOutput.getInputDatasets()).setNumInputDatasets(inputOutput.getInputDatasets().size());
}
if (inputOutput.getOutputDatasets() != null) {
document.setOutputs(inputOutput.getOutputDatasets()).setNumOutputDatasets(inputOutput.getInputDatasets().size());
}
return document;
}
@Nonnull
// Builds a partial document carrying only ownership-derived fields: the
// corp-user owner list and a hasOwners flag (true iff that list is non-empty).
private DataJobDocument getDocumentToUpdateFromAspect(@Nonnull DataJobUrn urn, @Nonnull Ownership ownership) {
final StringArray owners = BuilderUtils.getCorpUserOwners(ownership);
return new DataJobDocument().setUrn(urn).setHasOwners(!owners.isEmpty()).setOwners(owners);
}
@Nonnull
private List<DataJobDocument> getDocumentsToUpdateFromSnapshotType(@Nonnull DataJobSnapshot snapshot) {
DataJobUrn urn = snapshot.getUrn();
return snapshot.getAspects().stream().map(aspect -> {
final List<DataJobDocument> documents = snapshot.getAspects().stream().map(aspect -> {
if (aspect.isDataJobInfo()) {
return getDocumentToUpdateFromAspect(urn, aspect.getDataJobInfo());
} else if (aspect.isDataJobInputOutput()) {
return getDocumentToUpdateFromAspect(urn, aspect.getDataJobInputOutput());
} else if (aspect.isOwnership()) {
return getDocumentToUpdateFromAspect(urn, aspect.getOwnership());
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());
documents.add(setUrnDerivedFields(urn));
return documents;
}
@Nonnull

View File

@ -0,0 +1,29 @@
package com.linkedin.metadata.builders.common;
import com.linkedin.metadata.aspect.DataFlowAspect;
import com.linkedin.datajob.DataFlowInfo;
import static com.linkedin.metadata.testing.Owners.*;
import static com.linkedin.metadata.testing.Urns.*;
/**
 * Static factory helpers that build {@code DataFlowAspect} fixtures for tests.
 */
public class DataFlowTestUtils {

  private DataFlowTestUtils() {
    // Static helper holder; never instantiated.
  }

  /** Returns an aspect wrapping an Ownership record owned by "fooUser". */
  public static DataFlowAspect makeOwnershipAspect() {
    final DataFlowAspect ownershipAspect = new DataFlowAspect();
    ownershipAspect.setOwnership(makeOwnership("fooUser"));
    return ownershipAspect;
  }

  /** Returns an aspect wrapping a DataFlowInfo with name, description and project populated. */
  public static DataFlowAspect makeDataFlowInfoAspect() {
    final DataFlowInfo info = new DataFlowInfo();
    info.setName("Flow number 1");
    info.setDescription("A description");
    info.setProject("Lost cause");
    final DataFlowAspect infoAspect = new DataFlowAspect();
    infoAspect.setDataFlowInfo(info);
    return infoAspect;
  }
}

View File

@ -1,13 +1,13 @@
package com.linkedin.metadata.builders.common;
import com.linkedin.metadata.aspect.DataJobAspect;
import com.linkedin.datajob.DataJobInputOutput;
import com.linkedin.datajob.DataJobInfo;
import com.linkedin.common.DatasetUrnArray;
import com.linkedin.common.urn.DatasetUrn;
import static com.linkedin.metadata.testing.Owners.*;
import static com.linkedin.metadata.testing.Urns.*;
import static com.linkedin.metadata.utils.AuditStamps.*;
public class DataJobTestUtils {
@ -21,8 +21,28 @@ public class DataJobTestUtils {
DatasetUrn output1 = makeDatasetUrn("output1");
DatasetUrn output2 = makeDatasetUrn("output2");
return new DataJobInputOutput()
.setInputDatasets(new DatasetUrnArray(input1, input2))
return new DataJobInputOutput().setInputDatasets(new DatasetUrnArray(input1, input2))
.setOutputDatasets(new DatasetUrnArray(output1, output2));
}
}
public static DataJobAspect makeDataJobInputOutputAspect() {
DataJobAspect aspect = new DataJobAspect();
aspect.setDataJobInputOutput(makeDataJobInputOutput());
return aspect;
}
public static DataJobAspect makeOwnershipAspect() {
DataJobAspect aspect = new DataJobAspect();
aspect.setOwnership(makeOwnership("fooUser"));
return aspect;
}
public static DataJobAspect makeDataJobInfoAspect() {
DataJobInfo dataJobInfo = new DataJobInfo();
dataJobInfo.setName("You had one Job");
dataJobInfo.setDescription("A Job for one");
DataJobAspect aspect = new DataJobAspect();
aspect.setDataJobInfo(dataJobInfo);
return aspect;
}
}

View File

@ -0,0 +1,44 @@
package com.linkedin.metadata.builders.search;
import com.linkedin.common.urn.DataFlowUrn;
import com.linkedin.datajob.DataFlowInfo;
import com.linkedin.metadata.aspect.DataFlowAspectArray;
import com.linkedin.metadata.search.DataFlowDocument;
import com.linkedin.metadata.snapshot.DataFlowSnapshot;
import java.util.List;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
import static com.linkedin.metadata.builders.common.DataFlowTestUtils.*;
/**
 * Verifies that {@code DataFlowIndexBuilder} emits one partial document per
 * aspect plus a trailing urn-derived document.
 */
public class DataFlowIndexBuilderTest {

  @Test
  public void testGetDocumentsToUpdateFromDataFlowSnapshot() {
    DataFlowUrn dataFlowUrn = new DataFlowUrn("airflow", "flow1", "main");
    // FIX: dropped an unused local `DataFlowInfo dataFlowInfo` that was never read.
    DataFlowAspectArray dataFlowAspectArray = new DataFlowAspectArray();
    dataFlowAspectArray.add(makeDataFlowInfoAspect());
    dataFlowAspectArray.add(makeOwnershipAspect());
    DataFlowSnapshot dataFlowSnapshot = new DataFlowSnapshot().setUrn(dataFlowUrn).setAspects(dataFlowAspectArray);

    List<DataFlowDocument> actualDocs = new DataFlowIndexBuilder().getDocumentsToUpdate(dataFlowSnapshot);

    // One document per aspect (info, ownership) + one urn-derived document.
    assertEquals(actualDocs.size(), 3);

    // Doc 0: DataFlowInfo-derived fields.
    assertEquals(actualDocs.get(0).getName(), "Flow number 1");
    assertEquals(actualDocs.get(0).getDescription(), "A description");
    assertEquals(actualDocs.get(0).getProject(), "Lost cause");
    assertEquals(actualDocs.get(0).getUrn(), dataFlowUrn);

    // Doc 1: Ownership-derived fields.
    assertEquals(actualDocs.get(1).getOwners().size(), 1);
    assertEquals(actualDocs.get(1).getOwners().get(0), "fooUser");
    // FIX: hasHasOwners() only checks field presence; assert the actual flag value.
    assertTrue(actualDocs.get(1).isHasOwners());
    assertEquals(actualDocs.get(1).getUrn(), dataFlowUrn);

    // Doc 2: urn-derived fields, always appended last.
    assertEquals(actualDocs.get(2).getOrchestrator(), "airflow");
    assertEquals(actualDocs.get(2).getFlowId(), "flow1");
    assertEquals(actualDocs.get(2).getCluster(), "main");
    assertEquals(actualDocs.get(2).getUrn(), dataFlowUrn);
  }
}

View File

@ -0,0 +1,50 @@
package com.linkedin.metadata.builders.search;
import com.linkedin.common.urn.DataFlowUrn;
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.metadata.aspect.DataJobAspectArray;
import com.linkedin.metadata.search.DataJobDocument;
import com.linkedin.metadata.snapshot.DataJobSnapshot;
import java.util.List;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
import static com.linkedin.metadata.builders.common.DataJobTestUtils.*;
/**
 * Verifies that {@code DataJobIndexBuilder} emits one partial document per
 * aspect plus a trailing urn-derived document.
 */
public class DataJobIndexBuilderTest {

  @Test
  public void testGetDocumentsToUpdateFromDataJobSnapshot() {
    DataFlowUrn dataFlowUrn = new DataFlowUrn("airflow", "flow1", "main");
    DataJobUrn dataJobUrn = new DataJobUrn(dataFlowUrn, "you_had_one_job");
    DataJobAspectArray dataJobAspectArray = new DataJobAspectArray();
    dataJobAspectArray.add(makeDataJobInputOutputAspect());
    dataJobAspectArray.add(makeDataJobInfoAspect());
    dataJobAspectArray.add(makeOwnershipAspect());
    DataJobSnapshot dataJobSnapshot = new DataJobSnapshot().setUrn(dataJobUrn).setAspects(dataJobAspectArray);

    List<DataJobDocument> actualDocs = new DataJobIndexBuilder().getDocumentsToUpdate(dataJobSnapshot);

    // One document per aspect (input/output, info, ownership) + one urn-derived document.
    assertEquals(actualDocs.size(), 4);

    // Doc 0: lineage-derived fields.
    assertEquals(actualDocs.get(0).getInputs().get(0).getDatasetNameEntity(), "input1");
    assertEquals(actualDocs.get(0).getInputs().get(1).getDatasetNameEntity(), "input2");
    assertEquals(actualDocs.get(0).getOutputs().get(0).getDatasetNameEntity(), "output1");
    assertEquals(actualDocs.get(0).getOutputs().get(1).getDatasetNameEntity(), "output2");
    // FIX: replaced the deprecated `new Long(2)` boxing constructor with Long.valueOf.
    assertEquals(actualDocs.get(0).getNumInputDatasets(), Long.valueOf(2));
    assertEquals(actualDocs.get(0).getNumOutputDatasets(), Long.valueOf(2));
    assertEquals(actualDocs.get(0).getUrn(), dataJobUrn);

    // Doc 1: DataJobInfo-derived fields.
    assertEquals(actualDocs.get(1).getName(), "You had one Job");
    assertEquals(actualDocs.get(1).getDescription(), "A Job for one");
    assertEquals(actualDocs.get(1).getUrn(), dataJobUrn);

    // Doc 2: Ownership-derived fields.
    assertEquals(actualDocs.get(2).getOwners().size(), 1);
    assertEquals(actualDocs.get(2).getOwners().get(0), "fooUser");
    // FIX: hasHasOwners() only checks field presence; assert the actual flag value.
    assertTrue(actualDocs.get(2).isHasOwners());
    assertEquals(actualDocs.get(2).getUrn(), dataJobUrn);

    // Doc 3: urn-derived fields, always appended last.
    assertEquals(actualDocs.get(3).getJobId(), "you_had_one_job");
    assertEquals(actualDocs.get(3).getDataFlow(), "flow1");
    assertEquals(actualDocs.get(3).getUrn(), dataJobUrn);
  }
}

View File

@ -6,6 +6,8 @@ import com.linkedin.metadata.builders.search.ChartIndexBuilder;
import com.linkedin.metadata.builders.search.CorpGroupIndexBuilder;
import com.linkedin.metadata.builders.search.CorpUserInfoIndexBuilder;
import com.linkedin.metadata.builders.search.DashboardIndexBuilder;
import com.linkedin.metadata.builders.search.DataFlowIndexBuilder;
import com.linkedin.metadata.builders.search.DataJobIndexBuilder;
import com.linkedin.metadata.builders.search.DataProcessIndexBuilder;
import com.linkedin.metadata.builders.search.DatasetIndexBuilder;
import com.linkedin.metadata.builders.search.MLModelIndexBuilder;
@ -22,7 +24,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Configurations for search index builders
*/
@ -50,6 +51,8 @@ public class IndexBuildersConfig {
builders.add(new CorpUserInfoIndexBuilder());
builders.add(new ChartIndexBuilder());
builders.add(new DatasetIndexBuilder());
builders.add(new DataFlowIndexBuilder());
builders.add(new DataJobIndexBuilder());
builders.add(new DataProcessIndexBuilder());
builders.add(new DashboardIndexBuilder());
builders.add(new MLModelIndexBuilder());

View File

@ -48,4 +48,8 @@ record DataFlowDocument includes BaseDocument {
*/
owners: optional array[string]
/**
* Flag to indicate if the flow has non empty corp users as owners or not.
*/
hasOwners: optional boolean
}

View File

@ -2,6 +2,8 @@ namespace com.linkedin.metadata.search
import com.linkedin.common.AccessLevel
import com.linkedin.common.DataJobUrn
import com.linkedin.common.DatasetUrn
/**
* Data model for DataJob entity search
@ -38,4 +40,29 @@ record DataJobDocument includes BaseDocument {
*/
owners: optional array[string]
/**
* Flag to indicate if the job has non empty corp users as owners or not.
*/
hasOwners: optional boolean
/**
* Lineage information represented by the number of immediate input datasets of this job.
*/
numInputDatasets: optional long
/**
* Lineage information represented by the number of immediate output datasets of this job.
*/
numOutputDatasets: optional long
/**
* List of inputs for this job
*/
inputs: optional array[DatasetUrn]
/**
* List of outputs for this job
*/
outputs: optional array[DatasetUrn]
}