metadata-models 54.0.1 -> 58.0.1 (#1610)

Changes include:
Remove all keys that can be moved back to respective GMS
Add support for <, <=, >, >= conditions for the filter API
Update Conditions model for <, <=, >, >= conditions

MP_VERSION=metadata-models:58.0.1
MP_VERSION=wherehows-samza:1.0.56
This commit is contained in:
Jyoti Wadhwani 2020-03-26 21:53:29 -07:00 committed by GitHub
parent c2e4761753
commit 5fb3c97315
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 404 additions and 207 deletions

View File

@ -1038,12 +1038,16 @@
"name" : "Condition",
"namespace" : "com.linkedin.metadata.query",
"doc" : "The matching condition in a filter criterion",
"symbols" : [ "EQUAL", "START_WITH", "END_WITH", "CONTAIN" ],
"symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ],
"symbolDocs" : {
"EQUAL" : "Represent the relation: field = value, e.g. platform = hdfs",
"START_WITH" : "Represent the relation: String field starts with value, e.g. name starts with PageView",
"CONTAIN" : "Represent the relation: String field contains value, e.g. name contains Profile",
"END_WITH" : "Represent the relation: String field ends with value, e.g. name ends with Event",
"CONTAIN" : "Represent the relation: String field contains value, e.g. name contains Profile"
"EQUAL" : "Represent the relation: field = value, e.g. platform = hdfs",
"GREATER_THAN" : "Represent the relation greater than, e.g. ownerCount > 5",
"GREATER_THAN_OR_EQUAL_TO" : "Represent the relation greater than or equal to, e.g. ownerCount >= 5",
"LESS_THAN" : "Represent the relation less than, e.g. ownerCount < 3",
"LESS_THAN_OR_EQUAL_TO" : "Represent the relation less than or equal to, e.g. ownerCount <= 3",
"START_WITH" : "Represent the relation: String field starts with value, e.g. name starts with PageView"
}
}, {
"type" : "record",

View File

@ -172,12 +172,16 @@
"name" : "Condition",
"namespace" : "com.linkedin.metadata.query",
"doc" : "The matching condition in a filter criterion",
"symbols" : [ "EQUAL", "START_WITH", "END_WITH", "CONTAIN" ],
"symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ],
"symbolDocs" : {
"EQUAL" : "Represent the relation: field = value, e.g. platform = hdfs",
"START_WITH" : "Represent the relation: String field starts with value, e.g. name starts with PageView",
"CONTAIN" : "Represent the relation: String field contains value, e.g. name contains Profile",
"END_WITH" : "Represent the relation: String field ends with value, e.g. name ends with Event",
"CONTAIN" : "Represent the relation: String field contains value, e.g. name contains Profile"
"EQUAL" : "Represent the relation: field = value, e.g. platform = hdfs",
"GREATER_THAN" : "Represent the relation greater than, e.g. ownerCount > 5",
"GREATER_THAN_OR_EQUAL_TO" : "Represent the relation greater than or equal to, e.g. ownerCount >= 5",
"LESS_THAN" : "Represent the relation less than, e.g. ownerCount < 3",
"LESS_THAN_OR_EQUAL_TO" : "Represent the relation less than or equal to, e.g. ownerCount <= 3",
"START_WITH" : "Represent the relation: String field starts with value, e.g. name starts with PageView"
}
}, {
"type" : "record",

View File

@ -220,12 +220,16 @@
"name" : "Condition",
"namespace" : "com.linkedin.metadata.query",
"doc" : "The matching condition in a filter criterion",
"symbols" : [ "EQUAL", "START_WITH", "END_WITH", "CONTAIN" ],
"symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ],
"symbolDocs" : {
"EQUAL" : "Represent the relation: field = value, e.g. platform = hdfs",
"START_WITH" : "Represent the relation: String field starts with value, e.g. name starts with PageView",
"CONTAIN" : "Represent the relation: String field contains value, e.g. name contains Profile",
"END_WITH" : "Represent the relation: String field ends with value, e.g. name ends with Event",
"CONTAIN" : "Represent the relation: String field contains value, e.g. name contains Profile"
"EQUAL" : "Represent the relation: field = value, e.g. platform = hdfs",
"GREATER_THAN" : "Represent the relation greater than, e.g. ownerCount > 5",
"GREATER_THAN_OR_EQUAL_TO" : "Represent the relation greater than or equal to, e.g. ownerCount >= 5",
"LESS_THAN" : "Represent the relation less than, e.g. ownerCount < 3",
"LESS_THAN_OR_EQUAL_TO" : "Represent the relation less than or equal to, e.g. ownerCount <= 3",
"START_WITH" : "Represent the relation: String field starts with value, e.g. name starts with PageView"
}
}, {
"type" : "record",

View File

@ -19,6 +19,7 @@ dependencies {
testCompile externalDependency.parseqTest
testCompile externalDependency.mockito
testCompile externalDependency.testng
}
// Generate IDLs

View File

@ -83,7 +83,7 @@ public class ImmutableLocalDAO<ASPECT_UNION extends UnionTemplate, URN extends U
@Override
@Nonnull
public <ASPECT extends RecordTemplate> Optional<RecordTemplate> add(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
public <ASPECT extends RecordTemplate> RecordTemplate add(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull Function<Optional<RecordTemplate>, RecordTemplate> updateLambda, @Nonnull AuditStamp auditStamp,
int maxTransactionRetry) {
throw new UnsupportedOperationException("Not supported by immutable DAO");

View File

@ -45,6 +45,9 @@ import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import static com.linkedin.metadata.dao.utils.SearchUtils.*;
/**
* A search DAO for Elasticsearch backend.
*/
@ -119,14 +122,44 @@ public class ESSearchDAO<DOCUMENT extends RecordTemplate> extends BaseSearchDAO<
@Override
@Nonnull
public SearchResult<DOCUMENT> filter(@Nullable Filter filters, @Nullable SortCriterion sortCriterion, int from, int size) {
final Map<String, String> requestMap = SearchUtils.getRequestMap(filters);
final SearchRequest searchRequest = ESUtils.getFilteredSearchQuery(requestMap, sortCriterion, from, size);
public SearchResult<DOCUMENT> filter(@Nullable Filter filters, @Nullable SortCriterion sortCriterion,
int from, int size) {
final SearchRequest searchRequest = getFilteredSearchQuery(filters, sortCriterion, from, size);
return executeAndExtract(searchRequest, from, size);
}
/**
* Returns a {@link SearchRequest} given filters to be applied to search query and sort criterion to be applied to search results
*
* @param filters {@link Filter} list of conditions with fields and values
* @param sortCriterion {@link SortCriterion} to be applied to the search results
* @param from index to start the search from
* @param size the number of search hits to return
* @return {@link SearchRequest} that contains the filtered query
*/
@Nonnull
SearchRequest getFilteredSearchQuery(@Nullable Filter filters, @Nullable SortCriterion sortCriterion,
int from, int size) {
final BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
if (filters != null) {
filters.getCriteria().forEach(criterion -> {
if (!criterion.getValue().trim().isEmpty()) {
boolQueryBuilder.filter(getQueryBuilderFromCriterion(criterion));
}
});
}
final SearchRequest searchRequest = new SearchRequest(_config.getIndexName());
final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.from(from).size(size);
ESUtils.buildSortOrder(searchSourceBuilder, sortCriterion);
searchRequest.source(searchSourceBuilder);
return searchRequest;
}
/**
* Constructs the search query based on the query request
*

View File

@ -5,7 +5,6 @@ import java.util.Arrays;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -48,32 +47,6 @@ public class ESUtils {
return boolFilter;
}
/**
* Returns a {@link SearchRequest} given filters to be applied to search query and sort criterion to be applied to search results
*
* @param requestMap search request map with fields and values
* @param sortCriterion {@link SortCriterion} to be applied to the search results
* @param from index to start the search from
* @param size the number of search hits to return
* @return {@link SearchRequest} that contains the filtered query
*/
@Nonnull
public static SearchRequest getFilteredSearchQuery(@Nonnull Map<String, String> requestMap, @Nullable SortCriterion sortCriterion, int from, int size) {
final BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
for (Map.Entry<String, String> entry : requestMap.entrySet()) {
if (!entry.getValue().trim().isEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery(entry.getKey(), entry.getValue().trim().split("\\s*,\\s*")));
}
}
final SearchRequest searchRequest = new SearchRequest();
final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.from(from).size(size);
buildSortOrder(searchSourceBuilder, sortCriterion);
searchRequest.source(searchSourceBuilder);
return searchRequest;
}
/**
* Populates source field of search query with the sort order as per the criterion provided
*

View File

@ -1,5 +1,6 @@
package com.linkedin.metadata.dao.utils;
import com.linkedin.metadata.query.Condition;
import com.linkedin.metadata.query.Criterion;
import com.linkedin.metadata.query.CriterionArray;
import com.linkedin.metadata.query.Filter;
@ -13,6 +14,8 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@Slf4j
@ -44,6 +47,30 @@ public class SearchUtils {
return requestParams.getCriteria().stream().collect(Collectors.toMap(Criterion::getField, Criterion::getValue));
}
/**
* Builds search query using criterion
*
* @param criterion {@link Criterion} single criterion which contains field, value and a comparison operator
* @return QueryBuilder
*/
@Nonnull
public static QueryBuilder getQueryBuilderFromCriterion(@Nonnull Criterion criterion) {
final Condition condition = criterion.getCondition();
if (condition == Condition.EQUAL) {
return QueryBuilders.termsQuery(criterion.getField(), criterion.getValue().trim().split("\\s*,\\s*"));
} else if (condition == Condition.GREATER_THAN) {
return QueryBuilders.rangeQuery(criterion.getField()).gt(criterion.getValue().trim());
} else if (condition == Condition.GREATER_THAN_OR_EQUAL_TO) {
return QueryBuilders.rangeQuery(criterion.getField()).gte(criterion.getValue().trim());
} else if (condition == Condition.LESS_THAN) {
return QueryBuilders.rangeQuery(criterion.getField()).lt(criterion.getValue().trim());
} else if (condition == Condition.LESS_THAN_OR_EQUAL_TO) {
return QueryBuilders.rangeQuery(criterion.getField()).lte(criterion.getValue().trim());
}
throw new IllegalArgumentException("Unsupported condition: " + condition);
}
/**
* Converts a requestMap to a filter
*

View File

@ -2,12 +2,8 @@ package com.linkedin.metadata.dao;
import com.google.common.collect.ImmutableMap;
import com.linkedin.metadata.dao.utils.ESUtils;
import com.linkedin.metadata.query.SortCriterion;
import com.linkedin.metadata.query.SortOrder;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.testng.annotations.Test;
@ -39,26 +35,6 @@ public class ESUtilsTest {
assertEquals(queryBuilder.toString(), loadJsonFromResource("filterQuery/ComplexFilterQuery.json"));
}
@Test
public void testGetFilteredSearchQuery() throws IOException {
int from = 0;
int size = 10;
Map<String, String> requestMap = ImmutableMap.of("key1", "value1, value2 ", "key2", "value3", "key3", " ");
SortCriterion sortCriterion = new SortCriterion().setOrder(SortOrder.ASCENDING).setField("urn");
// Test 1: sort order provided
SearchRequest searchRequest = ESUtils.getFilteredSearchQuery(requestMap, sortCriterion, from, size);
assertEquals(searchRequest.source().toString(), loadJsonFromResource("SortByUrnTermsFilterQuery.json"));
// Test 2: no sort order provided, default is used.
searchRequest = ESUtils.getFilteredSearchQuery(requestMap, null, from, size);
assertEquals(searchRequest.source().toString(), loadJsonFromResource("DefaultSortTermsFilterQuery.json"));
// Test 3: empty request map provided
searchRequest = ESUtils.getFilteredSearchQuery(Collections.emptyMap(), sortCriterion, from, size);
assertEquals(searchRequest.source().toString(), loadJsonFromResource("EmptyFilterQuery.json"));
}
@Test
public void testEscapeReservedCharacters() {
assertEquals(escapeReservedCharacters("foobar"), "foobar");

View File

@ -20,13 +20,13 @@ import static org.testng.Assert.*;
public class BrowseDAOTest {
private BaseBrowseConfig _browseConfig;
private RestHighLevelClient _mockClient;
private ESBrowseDAO _mockBrowseDAO;
private ESBrowseDAO _browseDAO;
@BeforeMethod
public void setup() {
_browseConfig = new TestBrowseConfig();
_mockClient = mock(RestHighLevelClient.class);
_mockBrowseDAO = new ESBrowseDAO(_mockClient, _browseConfig);
_browseDAO = new ESBrowseDAO(_mockClient, _browseConfig);
}
@Test
@ -79,7 +79,7 @@ public class BrowseDAOTest {
when(mockSearchHits.getHits()).thenReturn(new SearchHit[0]);
when(mockSearchResponse.getHits()).thenReturn(mockSearchHits);
when(_mockClient.search(any())).thenReturn(mockSearchResponse);
assertEquals(_mockBrowseDAO.getBrowsePaths(dummyUrn).size(), 0);
assertEquals(_browseDAO.getBrowsePaths(dummyUrn).size(), 0);
// Test the case of single search hit & browsePaths field doesn't exist
when(mockSourceMap.containsKey(_browseConfig.getBrowsePathFieldName())).thenReturn(false);
@ -87,7 +87,7 @@ public class BrowseDAOTest {
when(mockSearchHits.getHits()).thenReturn(new SearchHit[]{mockSearchHit});
when(mockSearchResponse.getHits()).thenReturn(mockSearchHits);
when(_mockClient.search(any())).thenReturn(mockSearchResponse);
assertEquals(_mockBrowseDAO.getBrowsePaths(dummyUrn).size(), 0);
assertEquals(_browseDAO.getBrowsePaths(dummyUrn).size(), 0);
// Test the case of single search hit & browsePaths field exists
when(mockSourceMap.containsKey(_browseConfig.getBrowsePathFieldName())).thenReturn(true);
@ -96,7 +96,7 @@ public class BrowseDAOTest {
when(mockSearchHits.getHits()).thenReturn(new SearchHit[]{mockSearchHit});
when(mockSearchResponse.getHits()).thenReturn(mockSearchHits);
when(_mockClient.search(any())).thenReturn(mockSearchResponse);
assertEquals(_mockBrowseDAO.getBrowsePaths(dummyUrn).size(), 1);
assertEquals(_mockBrowseDAO.getBrowsePaths(dummyUrn).get(0), "foo");
assertEquals(_browseDAO.getBrowsePaths(dummyUrn).size(), 1);
assertEquals(_browseDAO.getBrowsePaths(dummyUrn).get(0), "foo");
}
}

View File

@ -1,24 +1,35 @@
package com.linkedin.metadata.dao.search;
import com.google.common.collect.ImmutableMap;
import com.linkedin.common.UrnArray;
import com.linkedin.data.DataList;
import com.linkedin.data.DataMap;
import com.linkedin.data.template.StringArray;
import com.linkedin.metadata.dao.utils.SearchUtils;
import com.linkedin.metadata.query.AggregationMetadataArray;
import com.linkedin.metadata.query.Condition;
import com.linkedin.metadata.query.Criterion;
import com.linkedin.metadata.query.CriterionArray;
import com.linkedin.metadata.query.Filter;
import com.linkedin.metadata.query.SearchResultMetadata;
import com.linkedin.metadata.query.SortCriterion;
import com.linkedin.metadata.query.SortOrder;
import com.linkedin.testing.EntityDocument;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static com.linkedin.metadata.utils.TestUtils.*;
import static com.linkedin.testing.TestUtils.*;
import static org.testng.Assert.*;
import static org.mockito.Mockito.*;
@ -26,13 +37,15 @@ import static org.mockito.Mockito.*;
public class ESSearchDAOTest {
private ESSearchDAO<EntityDocument> _mockSearchDAO;
private ESAutoCompleteQueryForHighCardinalityFields _baseESAutoCompleteQuery;
private ESSearchDAO<EntityDocument> _searchDAO;
private ESAutoCompleteQueryForHighCardinalityFields _esAutoCompleteQuery;
private TestSearchConfig _testSearchConfig;
@BeforeMethod
public void setup() throws Exception {
_mockSearchDAO = new ESSearchDAO(null, EntityDocument.class, new TestSearchConfig());
_baseESAutoCompleteQuery = new ESAutoCompleteQueryForHighCardinalityFields(new TestSearchConfig());
_testSearchConfig = new TestSearchConfig();
_searchDAO = new ESSearchDAO(null, EntityDocument.class, _testSearchConfig);
_esAutoCompleteQuery = new ESAutoCompleteQueryForHighCardinalityFields(_testSearchConfig);
}
@Test
@ -84,7 +97,7 @@ public class ESSearchDAOTest {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.getHits()).thenReturn(searchHits);
StringArray res = _baseESAutoCompleteQuery.getSuggestionList(searchResponse, "name", "test", 2);
StringArray res = _esAutoCompleteQuery.getSuggestionList(searchResponse, "name", "test", 2);
assertEquals(res.size(), 2);
}
@ -96,7 +109,7 @@ public class ESSearchDAOTest {
when(searchHits1.getTotalHits()).thenReturn(10L);
SearchResponse searchResponse1 = mock(SearchResponse.class);
when(searchResponse1.getHits()).thenReturn(searchHits1);
assertEquals(_mockSearchDAO.extractSearchResultMetadata(searchResponse1), getDefaultSearchResultMetadata());
assertEquals(_searchDAO.extractSearchResultMetadata(searchResponse1), getDefaultSearchResultMetadata());
// Test: urn field exists in search document
SearchHits searchHits2 = mock(SearchHits.class);
@ -106,7 +119,7 @@ public class ESSearchDAOTest {
SearchResponse searchResponse2 = mock(SearchResponse.class);
when(searchResponse2.getHits()).thenReturn(searchHits2);
UrnArray urns = new UrnArray(Arrays.asList(makeUrn(1), makeUrn(2)));
assertEquals(_mockSearchDAO.extractSearchResultMetadata(searchResponse2), getDefaultSearchResultMetadata().setUrns(urns));
assertEquals(_searchDAO.extractSearchResultMetadata(searchResponse2), getDefaultSearchResultMetadata().setUrns(urns));
// Test: urn field does not exist in one search document, exists in another
SearchHits searchHits3 = mock(SearchHits.class);
@ -116,7 +129,7 @@ public class ESSearchDAOTest {
when(searchHits3.getHits()).thenReturn(new SearchHit[]{hit3, hit4});
SearchResponse searchResponse3 = mock(SearchResponse.class);
when(searchResponse3.getHits()).thenReturn(searchHits3);
assertThrows(RuntimeException.class, () -> _mockSearchDAO.extractSearchResultMetadata(searchResponse3));
assertThrows(RuntimeException.class, () -> _searchDAO.extractSearchResultMetadata(searchResponse3));
}
@Test
@ -129,7 +142,58 @@ public class ESSearchDAOTest {
DataMap dataMap = new DataMap();
dataMap.put("field1", "val1");
dataMap.put("field3", new DataList(arrayList));
assertEquals(_mockSearchDAO.buildDocumentsDataMap(sourceData), dataMap);
assertEquals(_searchDAO.buildDocumentsDataMap(sourceData), dataMap);
}
@Test
public void testFilteredQueryWithTermsFilter() throws IOException {
int from = 0;
int size = 10;
Filter filter = SearchUtils.getFilter(ImmutableMap.of("key1", "value1, value2 ", "key2", "value3", "key3", " "));
SortCriterion sortCriterion = new SortCriterion().setOrder(SortOrder.ASCENDING).setField("urn");
// Test 1: sort order provided
SearchRequest searchRequest = _searchDAO.getFilteredSearchQuery(filter, sortCriterion, from, size);
assertEquals(searchRequest.source().toString(), loadJsonFromResource("SortByUrnTermsFilterQuery.json"));
assertEquals(searchRequest.indices(), new String[] {_testSearchConfig.getIndexName()});
// Test 2: no sort order provided, default is used.
searchRequest = _searchDAO.getFilteredSearchQuery(filter, null, from, size);
assertEquals(searchRequest.source().toString(), loadJsonFromResource("DefaultSortTermsFilterQuery.json"));
assertEquals(searchRequest.indices(), new String[] {_testSearchConfig.getIndexName()});
// Test 3: empty request map provided
searchRequest = _searchDAO.getFilteredSearchQuery(SearchUtils.getFilter(Collections.emptyMap()), sortCriterion, from, size);
assertEquals(searchRequest.source().toString(), loadJsonFromResource("EmptyFilterQuery.json"));
assertEquals(searchRequest.indices(), new String[] {_testSearchConfig.getIndexName()});
}
@Test
public void testFilteredQueryWithRangeFilter() throws IOException {
int from = 0;
int size = 10;
final Filter filter1 = new Filter().setCriteria(new CriterionArray(Arrays.asList(
new Criterion().setField("field_gt").setValue("100").setCondition(Condition.GREATER_THAN),
new Criterion().setField("field_gte").setValue("200").setCondition(Condition.GREATER_THAN_OR_EQUAL_TO),
new Criterion().setField("field_lt").setValue("300").setCondition(Condition.LESS_THAN),
new Criterion().setField("field_lte").setValue("400").setCondition(Condition.LESS_THAN_OR_EQUAL_TO)
)));
SortCriterion sortCriterion = new SortCriterion().setOrder(SortOrder.ASCENDING).setField("urn");
SearchRequest searchRequest = _searchDAO.getFilteredSearchQuery(filter1, sortCriterion, from, size);
assertEquals(searchRequest.source().toString(), loadJsonFromResource("RangeFilterQuery.json"));
assertEquals(searchRequest.indices(), new String[] {_testSearchConfig.getIndexName()});
}
@Test
public void testFilteredQueryUnsupportedCondition() throws IOException {
int from = 0;
int size = 10;
final Filter filter2 = new Filter().setCriteria(new CriterionArray(Arrays.asList(
new Criterion().setField("field_contain").setValue("value_contain").setCondition(Condition.CONTAIN)
)));
SortCriterion sortCriterion = new SortCriterion().setOrder(SortOrder.ASCENDING).setField("urn");
assertThrows(IllegalArgumentException.class, () -> _searchDAO.getFilteredSearchQuery(filter2, sortCriterion, from, size));
}
private static SearchHit makeSearchHit(int id) {

View File

@ -0,0 +1,64 @@
{
"from" : 0,
"size" : 10,
"query" : {
"bool" : {
"filter" : [
{
"range" : {
"field_gt" : {
"from" : "100",
"to" : null,
"include_lower" : false,
"include_upper" : true,
"boost" : 1.0
}
}
},
{
"range" : {
"field_gte" : {
"from" : "200",
"to" : null,
"include_lower" : true,
"include_upper" : true,
"boost" : 1.0
}
}
},
{
"range" : {
"field_lt" : {
"from" : null,
"to" : "300",
"include_lower" : true,
"include_upper" : false,
"boost" : 1.0
}
}
},
{
"range" : {
"field_lte" : {
"from" : null,
"to" : "400",
"include_lower" : true,
"include_upper" : true,
"boost" : 1.0
}
}
}
],
"disable_coord" : false,
"adjust_pure_negative" : true,
"boost" : 1.0
}
},
"sort" : [
{
"urn" : {
"order" : "asc"
}
}
]
}

View File

@ -140,10 +140,10 @@ public abstract class BaseLocalDAO<ASPECT_UNION extends UnionTemplate, URN exten
* @param urn the URN for the entity the aspect is attached to
* @param auditStamp the audit stamp for the operation
* @param updateLambda a lambda expression that takes the previous version of aspect and returns the new version
* @return {@link RecordTemplate} of the new value of aspect, empty if the transaction fails
* @return {@link RecordTemplate} of the new value of aspect
*/
@Nonnull
public <ASPECT extends RecordTemplate> Optional<RecordTemplate> add(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
public <ASPECT extends RecordTemplate> RecordTemplate add(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull Function<Optional<RecordTemplate>, RecordTemplate> updateLambda, @Nonnull AuditStamp auditStamp,
int maxTransactionRetry) {
@ -151,7 +151,7 @@ public abstract class BaseLocalDAO<ASPECT_UNION extends UnionTemplate, URN exten
final EqualityTester<ASPECT> equalityTester = getEqualityTester(aspectClass);
AddResult result = runInTransactionWithRetry(() -> {
final AddResult result = runInTransactionWithRetry(() -> {
// 1. Compute newValue based on oldValue
AspectEntry latest = getLatest(urn, aspectClass);
final ASPECT oldValue = latest == null ? null : (ASPECT) latest.getAspect();
@ -178,17 +178,16 @@ public abstract class BaseLocalDAO<ASPECT_UNION extends UnionTemplate, URN exten
}, maxTransactionRetry);
// 5. Produce MAE after a successful update
if (result != null) {
_producer.produceMetadataAuditEvent(urn, result.getOldValue(), result.getNewValue());
}
return Optional.ofNullable(result).map(r -> result.getNewValue());
_producer.produceMetadataAuditEvent(urn, result.getOldValue(), result.getNewValue());
return result.getNewValue();
}
/**
* Similar to {@link #add(Urn, Class, Function, AuditStamp, int)} but uses the default maximum transaction retry.
*/
@Nonnull
public <ASPECT extends RecordTemplate> Optional<RecordTemplate> add(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
public <ASPECT extends RecordTemplate> RecordTemplate add(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull Function<Optional<RecordTemplate>, RecordTemplate> updateLambda, @Nonnull AuditStamp auditStamp) {
return add(urn, aspectClass, updateLambda, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY);
}
@ -197,7 +196,7 @@ public abstract class BaseLocalDAO<ASPECT_UNION extends UnionTemplate, URN exten
* Similar to {@link #add(Urn, Class, Function, AuditStamp)} but takes the new value directly.
*/
@Nonnull
public <ASPECT extends RecordTemplate> Optional<RecordTemplate> add(@Nonnull URN urn, @Nonnull ASPECT newValue,
public <ASPECT extends RecordTemplate> RecordTemplate add(@Nonnull URN urn, @Nonnull ASPECT newValue,
@Nonnull AuditStamp auditStamp) {
return add(urn, newValue.getClass(), ignored -> newValue, auditStamp);
}
@ -241,6 +240,7 @@ public abstract class BaseLocalDAO<ASPECT_UNION extends UnionTemplate, URN exten
* @param <T> type for the result object
* @return the result object from a successfully committed transaction
*/
@Nonnull
protected abstract <T> T runInTransactionWithRetry(@Nonnull Supplier<T> block, int maxTransactionRetry);
/**

View File

@ -4,15 +4,23 @@
"namespace": "com.linkedin.metadata.query",
"doc": "The matching condition in a filter criterion",
"symbols": [
"EQUAL",
"START_WITH",
"CONTAIN",
"END_WITH",
"CONTAIN"
"EQUAL",
"GREATER_THAN",
"GREATER_THAN_OR_EQUAL_TO",
"LESS_THAN",
"LESS_THAN_OR_EQUAL_TO",
"START_WITH"
],
"symbolDocs": {
"EQUAL": "Represent the relation: field = value, e.g. platform = hdfs",
"START_WITH": "Represent the relation: String field starts with value, e.g. name starts with PageView",
"CONTAIN": "Represent the relation: String field contains value, e.g. name contains Profile",
"END_WITH": "Represent the relation: String field ends with value, e.g. name ends with Event",
"CONTAIN": "Represent the relation: String field contains value, e.g. name contains Profile"
"EQUAL": "Represent the relation: field = value, e.g. platform = hdfs",
"GREATER_THAN": "Represent the relation greater than, e.g. ownerCount > 5",
"GREATER_THAN_OR_EQUAL_TO": "Represent the relation greater than or equal to, e.g. ownerCount >= 5",
"LESS_THAN": "Represent the relation less than, e.g. ownerCount < 3",
"LESS_THAN_OR_EQUAL_TO": "Represent the relation less than or equal to, e.g. ownerCount <= 3",
"START_WITH": "Represent the relation: String field starts with value, e.g. name starts with PageView"
}
}

View File

@ -0,0 +1,16 @@
{
"type": "record",
"name": "SnapshotKey",
"namespace": "com.linkedin.metadata.snapshot",
"doc": "The Key for a metadata Snapshot.",
"fields": [
{
"name": "aspectVersions",
"doc": "A list of metadata aspects in the Snapshot and their versions",
"type": {
"type": "array",
"items": "com.linkedin.metadata.aspect.AspectVersion"
}
}
]
}

View File

@ -113,19 +113,19 @@ public class EventUtils {
return renameSchemaNamespace(original, ORIGINAL_MCE_AVRO_SCHEMA, RENAMED_MCE_AVRO_SCHEMA);
}
/**
* Converts a Pegasus Failed MCE into the equivalent Avro model as a {@link GenericRecord}.
*
* @param failedMetadataChangeEvent the Pegasus {@link FailedMetadataChangeEvent} model
* @return the Avro model with com.linkedin.pegasus2avro.mxe namesapce
* @throws IOException if the conversion fails
*/
@Nonnull
public static GenericRecord pegasusToAvroFailedMCE(@Nonnull FailedMetadataChangeEvent failedMetadataChangeEvent) throws IOException {
GenericRecord original =
DataTranslator.dataMapToGenericRecord(failedMetadataChangeEvent.data(), failedMetadataChangeEvent.schema(), ORIGINAL_FAILED_MCE_AVRO_SCHEMA);
return renameSchemaNamespace(original, ORIGINAL_FAILED_MCE_AVRO_SCHEMA, RENAMED_FAILED_MCE_AVRO_SCHEMA);
}
/**
* Converts a Pegasus Failed MCE into the equivalent Avro model as a {@link GenericRecord}.
*
* @param failedMetadataChangeEvent the Pegasus {@link FailedMetadataChangeEvent} model
* @return the Avro model with com.linkedin.pegasus2avro.mxe namesapce
* @throws IOException if the conversion fails
*/
@Nonnull
public static GenericRecord pegasusToAvroFailedMCE(@Nonnull FailedMetadataChangeEvent failedMetadataChangeEvent) throws IOException {
GenericRecord original =
DataTranslator.dataMapToGenericRecord(failedMetadataChangeEvent.data(), failedMetadataChangeEvent.schema(), ORIGINAL_FAILED_MCE_AVRO_SCHEMA);
return renameSchemaNamespace(original, ORIGINAL_FAILED_MCE_AVRO_SCHEMA, RENAMED_FAILED_MCE_AVRO_SCHEMA);
}
/**
* Converts original MXE into a renamed namespace

View File

@ -22,71 +22,71 @@ import static org.testng.Assert.*;
public class EventUtilsTests {
@Test
public void testAvroToPegasusMAE() throws IOException {
GenericRecord record = genericRecordFromResource("test-avro2pegasus-mae.json",
com.linkedin.pegasus2avro.mxe.MetadataAuditEvent.SCHEMA$);
@Test
public void testAvroToPegasusMAE() throws IOException {
GenericRecord record = genericRecordFromResource("test-avro2pegasus-mae.json",
com.linkedin.pegasus2avro.mxe.MetadataAuditEvent.SCHEMA$);
MetadataAuditEvent mae = EventUtils.avroToPegasusMAE(record);
MetadataAuditEvent mae = EventUtils.avroToPegasusMAE(record);
assertEquals(
mae.getNewSnapshot().getDatasetSnapshot().getAspects().get(0).getOwnership().getOwners().get(0).getOwner(),
new CorpuserUrn("foobar"));
}
assertEquals(
mae.getNewSnapshot().getDatasetSnapshot().getAspects().get(0).getOwnership().getOwners().get(0).getOwner(),
new CorpuserUrn("foobar"));
}
@Test
public void testAvroToPegasusMCE() throws IOException {
GenericRecord record = genericRecordFromResource("test-avro2pegasus-mce.json",
com.linkedin.pegasus2avro.mxe.MetadataChangeEvent.SCHEMA$);
@Test
public void testAvroToPegasusMCE() throws IOException {
GenericRecord record = genericRecordFromResource("test-avro2pegasus-mce.json",
com.linkedin.pegasus2avro.mxe.MetadataChangeEvent.SCHEMA$);
MetadataChangeEvent mce = EventUtils.avroToPegasusMCE(record);
MetadataChangeEvent mce = EventUtils.avroToPegasusMCE(record);
assertEquals(
mce.getProposedSnapshot().getDatasetSnapshot().getAspects().get(0).getOwnership().getOwners().get(0).getOwner(),
new CorpuserUrn("foobar"));
}
assertEquals(
mce.getProposedSnapshot().getDatasetSnapshot().getAspects().get(0).getOwnership().getOwners().get(0).getOwner(),
new CorpuserUrn("foobar"));
}
@Test
public void testPegasusToAvroMAE() throws IOException {
MetadataAuditEvent event = recordTemplateFromResource("test-pegasus2avro-mae.json", MetadataAuditEvent.class);
@Test
public void testPegasusToAvroMAE() throws IOException {
MetadataAuditEvent event = recordTemplateFromResource("test-pegasus2avro-mae.json", MetadataAuditEvent.class);
GenericRecord record = EventUtils.pegasusToAvroMAE(event);
GenericRecord record = EventUtils.pegasusToAvroMAE(event);
assertEquals(record.getSchema(), com.linkedin.pegasus2avro.mxe.MetadataAuditEvent.SCHEMA$);
assertNotNull(record.get("newSnapshot"));
}
assertEquals(record.getSchema(), com.linkedin.pegasus2avro.mxe.MetadataAuditEvent.SCHEMA$);
assertNotNull(record.get("newSnapshot"));
}
@Test
public void testPegasusToAvroMCE() throws IOException {
MetadataChangeEvent event = recordTemplateFromResource("test-pegasus2avro-mce.json", MetadataChangeEvent.class);
@Test
public void testPegasusToAvroMCE() throws IOException {
MetadataChangeEvent event = recordTemplateFromResource("test-pegasus2avro-mce.json", MetadataChangeEvent.class);
GenericRecord record = EventUtils.pegasusToAvroMCE(event);
GenericRecord record = EventUtils.pegasusToAvroMCE(event);
assertEquals(record.getSchema(), com.linkedin.pegasus2avro.mxe.MetadataChangeEvent.SCHEMA$);
assertNotNull(record.get("proposedSnapshot"));
}
assertEquals(record.getSchema(), com.linkedin.pegasus2avro.mxe.MetadataChangeEvent.SCHEMA$);
assertNotNull(record.get("proposedSnapshot"));
}
@Test
public void testPegasusToAvroFailedMCE() throws IOException {
FailedMetadataChangeEvent event = recordTemplateFromResource("test-pegasus2avro-fmce.json", FailedMetadataChangeEvent.class);
@Test
public void testPegasusToAvroFailedMCE() throws IOException {
FailedMetadataChangeEvent event = recordTemplateFromResource("test-pegasus2avro-fmce.json", FailedMetadataChangeEvent.class);
GenericRecord record = EventUtils.pegasusToAvroFailedMCE(event);
GenericRecord record = EventUtils.pegasusToAvroFailedMCE(event);
assertEquals(record.getSchema(), com.linkedin.pegasus2avro.mxe.FailedMetadataChangeEvent.SCHEMA$);
assertNotNull(record.get("error"));
assertNotNull(record.get("metadataChangeEvent"));
}
assertEquals(record.getSchema(), com.linkedin.pegasus2avro.mxe.FailedMetadataChangeEvent.SCHEMA$);
assertNotNull(record.get("error"));
assertNotNull(record.get("metadataChangeEvent"));
}
private GenericRecord genericRecordFromResource(String resourcePath, Schema schema) throws IOException {
InputStream is = getClass().getClassLoader().getResourceAsStream(resourcePath);
JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, is);
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
return reader.read(null, decoder);
}
private GenericRecord genericRecordFromResource(String resourcePath, Schema schema) throws IOException {
InputStream is = getClass().getClassLoader().getResourceAsStream(resourcePath);
JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, is);
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
return reader.read(null, decoder);
}
private <T extends RecordTemplate> T recordTemplateFromResource(String resourcePath,
Class<? extends RecordTemplate> clazz) throws IOException {
String json = loadJsonFromResource(resourcePath);
return (T) RecordUtils.toRecordTemplate(clazz, json);
}
private <T extends RecordTemplate> T recordTemplateFromResource(String resourcePath,
Class<? extends RecordTemplate> clazz) throws IOException {
String json = loadJsonFromResource(resourcePath);
return (T) RecordUtils.toRecordTemplate(clazz, json);
}
}

View File

@ -1,26 +1,26 @@
{
"metadataChangeEvent": {
"proposedSnapshot": {
"com.linkedin.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:foo,bar,baz)",
"aspects": [
{
"com.linkedin.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:foobar",
"type": "DEVELOPER"
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:foobar"
}
}
}
]
"metadataChangeEvent": {
"proposedSnapshot": {
"com.linkedin.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:foo,bar,baz)",
"aspects": [
{
"com.linkedin.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:foobar",
"type": "DEVELOPER"
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:foobar"
}
}
}
},
"error": "Test"
}
}
]
}
}
},
"error": "Test"
}

View File

@ -15,4 +15,5 @@ dependencies {
testCompile project(':metadata-testing:metadata-test-utils')
testCompile externalDependency.parseqTest
testCompile externalDependency.mockito
}
testCompile externalDependency.testng
}

View File

@ -125,6 +125,22 @@ public abstract class BaseVersionedAspectResource<URN extends Urn, ASPECT_UNION
});
}
/**
* Similar to {@link #create(Class, Function)} but returns {@link CreateKVResponse} containing latest version and created aspect
*/
@RestMethod.Create
@ReturnEntity
@Nonnull
public Task<CreateKVResponse<Long, ASPECT>> createAndGet(@Nonnull Class<ASPECT> aspectClass,
@Nonnull Function<Optional<RecordTemplate>, RecordTemplate> createLambda) {
return RestliUtils.toTask(() -> {
final URN urn = getUrn(getContext().getPathKeys());
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
final ASPECT newValue = (ASPECT) getLocalDAO().add(urn, aspectClass, createLambda, auditStamp);
return new CreateKVResponse<>(LATEST_VERSION, newValue);
});
}
/**
* Creates using the provided default value only if the aspect is not set already
*
@ -134,13 +150,7 @@ public abstract class BaseVersionedAspectResource<URN extends Urn, ASPECT_UNION
@RestMethod.Create
@ReturnEntity
@Nonnull
public Task<Optional<CreateKVResponse<Long, ASPECT>>> createIfAbsent(@Nonnull ASPECT defaultValue) {
return RestliUtils.toTask(() -> {
final URN urn = getUrn(getContext().getPathKeys());
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
final Optional<ASPECT> newValue = (Optional<ASPECT>) getLocalDAO().add(urn, (Class<ASPECT>) defaultValue.getClass(),
ignored -> ignored.orElse(defaultValue), auditStamp);
return newValue.map(val -> new CreateKVResponse<>(LATEST_VERSION, val));
});
public Task<CreateKVResponse<Long, ASPECT>> createIfAbsent(@Nonnull ASPECT defaultValue) {
return createAndGet((Class<ASPECT>) defaultValue.getClass(), ignored -> ignored.orElse(defaultValue));
}
}

View File

@ -121,6 +121,19 @@ public class BaseVersionedAspectResourceTest extends BaseEngineTest {
verifyNoMoreInteractions(_mockLocalDAO);
}
@Test
public void testCreateResponseViaLambda() {
AspectFoo foo = new AspectFoo().setValue("foo");
Function<Optional<RecordTemplate>, RecordTemplate> createLambda = (prev) -> foo;
when(_mockLocalDAO.add(eq(ENTITY_URN), eq(AspectFoo.class), eq(createLambda), any())).thenReturn(foo);
CreateKVResponse<Long, AspectFoo> response = runAndWait(_resource.createAndGet(AspectFoo.class, createLambda));
assertEquals(response.getStatus().getCode(), 201);
assertEquals(response.getEntity(), foo);
assertEquals(response.getId(), Long.valueOf(LATEST_VERSION));
}
@Test
public void testCreateIfAbsentWithoutExistingValue() {
AspectFoo defaultValue = new AspectFoo().setValue("foo");
@ -128,15 +141,14 @@ public class BaseVersionedAspectResourceTest extends BaseEngineTest {
Object[] args = invocation.getArguments();
assertTrue(args[2] instanceof Function);
Function<Optional<RecordTemplate>, RecordTemplate> lambda = (Function<Optional<RecordTemplate>, RecordTemplate>) args[2];
return Optional.of(lambda.apply(Optional.empty()));
return lambda.apply(Optional.empty());
});
Optional<CreateKVResponse<Long, AspectFoo>> response = runAndWait(_resource.createIfAbsent(defaultValue));
CreateKVResponse<Long, AspectFoo> response = runAndWait(_resource.createIfAbsent(defaultValue));
assertTrue(response.isPresent());
assertEquals(response.get().getStatus().getCode(), 201);
assertEquals(response.get().getEntity(), defaultValue);
assertEquals(response.get().getId(), Long.valueOf(LATEST_VERSION));
assertEquals(response.getStatus().getCode(), 201);
assertEquals(response.getEntity(), defaultValue);
assertEquals(response.getId(), Long.valueOf(LATEST_VERSION));
}
@Test
@ -147,14 +159,13 @@ public class BaseVersionedAspectResourceTest extends BaseEngineTest {
Object[] args = invocation.getArguments();
assertTrue(args[2] instanceof Function);
Function<Optional<RecordTemplate>, RecordTemplate> lambda = (Function<Optional<RecordTemplate>, RecordTemplate>) args[2];
return Optional.of(lambda.apply(Optional.of(oldVal)));
return lambda.apply(Optional.of(oldVal));
});
Optional<CreateKVResponse<Long, AspectFoo>> response = runAndWait(_resource.createIfAbsent(defaultValue));
CreateKVResponse<Long, AspectFoo> response = runAndWait(_resource.createIfAbsent(defaultValue));
assertTrue(response.isPresent());
assertEquals(response.get().getStatus().getCode(), 201);
assertEquals(response.get().getEntity(), oldVal);
assertEquals(response.get().getId(), Long.valueOf(LATEST_VERSION));
assertEquals(response.getStatus().getCode(), 201);
assertEquals(response.getEntity(), oldVal);
assertEquals(response.getId(), Long.valueOf(LATEST_VERSION));
}
}

View File

@ -5,6 +5,7 @@
"doc": "For unit tests",
"ref": [
"AspectFoo",
"AspectBar",
"AspectFooEvolved"
]
}