diff --git a/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index 0fdf6376bc..5b2a96dea6 100644 --- a/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -24,6 +24,7 @@ import io.ebean.EbeanServer; import io.ebean.EbeanServerFactory; import io.ebean.ExpressionList; import io.ebean.PagedList; +import io.ebean.Query; import io.ebean.Transaction; import io.ebean.config.ServerConfig; import io.ebean.datasource.DataSourceConfig; @@ -33,12 +34,12 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.IntStream; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.persistence.RollbackException; @@ -606,10 +607,10 @@ public class EbeanLocalDAO object = indexValue.getDouble(); return new GMAIndexPair(EbeanMetadataIndex.DOUBLE_COLUMN, object); } else if (indexValue.isFloat()) { - object = indexValue.getFloat(); + object = (indexValue.getFloat()).doubleValue(); return new GMAIndexPair(EbeanMetadataIndex.DOUBLE_COLUMN, object); } else if (indexValue.isInt()) { - object = indexValue.getInt(); + object = Long.valueOf(indexValue.getInt()); return new GMAIndexPair(EbeanMetadataIndex.LONG_COLUMN, object); } else if (indexValue.isLong()) { object = indexValue.getLong(); @@ -622,11 +623,56 @@ public class EbeanLocalDAO } } + /** + * Sets the values of parameters in metadata index query based on its position, values obtained from + * {@link IndexCriterionArray} and last urn + * + * @param indexCriterionArray {@link IndexCriterionArray} whose values will be used to set parameters in metadata + * index query based on its position + * @param indexQuery {@link Query} whose ordered parameters need to be set, based on it's position + * @param lastUrn String representation of the urn whose value is used to set the last urn parameter in index query + */ + private static void setParameters(@Nonnull IndexCriterionArray indexCriterionArray, @Nonnull Query indexQuery, + @Nonnull String lastUrn) { + indexQuery.setParameter(1, lastUrn); + int pos = 2; + for (IndexCriterion criterion : indexCriterionArray) { + indexQuery.setParameter(pos++, criterion.getAspect()); + if (criterion.hasPathParams()) { + indexQuery.setParameter(pos++, criterion.getPathParams().getPath()); + indexQuery.setParameter(pos++, getGMAIndexPair(criterion.getPathParams().getValue()).value); + } + } + } + + /** + * Constructs SQL query that contains positioned parameters (with `?`), based on whether {@link IndexCriterion} of + * a given condition has field `pathParams` + * + * @param indexCriterionArray {@link IndexCriterionArray} used to construct the SQL query + * @return String representation of SQL query + */ + @Nonnull + private static String constructSQLQuery(@Nonnull IndexCriterionArray indexCriterionArray) { + String selectClause = "SELECT DISTINCT(t0.urn) FROM metadata_index t0"; + selectClause += IntStream.range(1, indexCriterionArray.size()).mapToObj(i -> " INNER JOIN metadata_index " + "t" + + i + " ON t0.urn = " + "t" + i + ".urn").collect(Collectors.joining("")); + final StringBuilder whereClause = new StringBuilder("WHERE t0.urn > ?"); + IntStream.range(0, indexCriterionArray.size()).forEach(i -> { + final IndexCriterion criterion = indexCriterionArray.get(i); + whereClause.append(" AND t").append(i).append(".aspect = ?"); + if (criterion.hasPathParams()) { + whereClause.append(" AND t").append(i).append(".path = ?").append(" AND t").append(i).append(".") + .append(getGMAIndexPair(criterion.getPathParams().getValue()).valueType).append(" = ?"); + } + }); + return selectClause + " " + whereClause; + } + /** * Returns list of urns from strongly consistent secondary index that satisfy the given filter conditions. * Results are sorted in increasing alphabetical order of urn. - * NOTE: Currently this works for only one filter condition - * TODO: Extend the support for multiple filter conditions + * NOTE: Currently this works for upto 10 filter conditions. * * @param indexFilter {@link IndexFilter} containing filter conditions to be applied * @param lastUrn last urn of the previous fetched page. This eliminates the need to use offset which @@ -644,32 +690,21 @@ public class EbeanLocalDAO if (indexCriterionArray.size() == 0) { throw new UnsupportedOperationException("Empty Index Filter is not supported by EbeanLocalDAO"); } - if (indexCriterionArray.size() > 1) { - throw new UnsupportedOperationException("Currently only one filter condition is supported by EbeanLocalDAO"); + if (indexCriterionArray.size() > 10) { + throw new UnsupportedOperationException("Currently more than 10 filter conditions is not supported by EbeanLocalDAO"); } + final Query query = _server.findNative(EbeanMetadataIndex.class, constructSQLQuery(indexCriterionArray)); + setParameters(indexCriterionArray, query, lastUrn == null ? "" : lastUrn.toString()); - final IndexCriterion criterion = indexCriterionArray.get(0); - ExpressionList expressionList = _server.find(EbeanMetadataIndex.class) - .setDistinct(true) - .select(EbeanMetadataIndex.URN_COLUMN) - .where() - .gt(EbeanMetadataIndex.URN_COLUMN, lastUrn == null ? "" : lastUrn.toString()) - .eq(EbeanMetadataIndex.ASPECT_COLUMN, criterion.getAspect()); - if (criterion.hasPathParams()) { - final GMAIndexPair gmaIndexPair = getGMAIndexPair(criterion.getPathParams().getValue()); - expressionList = expressionList - .eq(EbeanMetadataIndex.PATH_COLUMN, criterion.getPathParams().getPath()) - .eq(gmaIndexPair.valueType, gmaIndexPair.value); - } - final PagedList pagedList = expressionList + final PagedList pagedList = query .orderBy() .asc(EbeanMetadataIndex.URN_COLUMN) .setMaxRows(pageSize) .findPagedList(); + final List urns = pagedList.getList() .stream() .map(EbeanLocalDAO::extractUrn) - .filter(Objects::nonNull) .collect(Collectors.toList()); return toListResult(urns, null, pagedList, null); } diff --git a/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java b/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java index 151b2c5c80..f48ac3844c 100644 --- a/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java +++ b/metadata-dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java @@ -52,6 +52,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.IntStream; import javax.persistence.RollbackException; import org.mockito.InOrder; import org.testng.annotations.BeforeClass; @@ -560,6 +561,88 @@ public class EbeanLocalDAOTest { assertEquals(results.getValues(), new ArrayList<>()); } + private static IndexCriterionArray makeIndexCriterionArray(int size) { + List criterionArrays = new ArrayList<>(); + IntStream.range(0, size).forEach(i -> criterionArrays.add(new IndexCriterion().setAspect("aspect" + i))); + return new IndexCriterionArray(criterionArrays); + } + + @Test + void testListUrnsFromIndexManyFilters() { + EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server); + dao.enableLocalSecondaryIndex(true); + FooUrn urn1 = makeFooUrn(1); + FooUrn urn2 = makeFooUrn(2); + FooUrn urn3 = makeFooUrn(3); + String aspect1 = "aspect1" + System.currentTimeMillis(); + String aspect2 = "aspect2" + System.currentTimeMillis(); + + addIndex(urn1, aspect1, "/path1", true); // boolean + addIndex(urn1, aspect1, "/path2", 1.534e2); // double + addIndex(urn1, aspect1, "/path3", 123.4f); // float + addIndex(urn1, aspect2, "/path4", 123); // int + addIndex(urn1, aspect2, "/path5", 1234L); // long + addIndex(urn1, aspect2, "/path6", "val"); // string + + addIndex(urn2, aspect1, "/path1", true); // boolean + addIndex(urn2, aspect1, "/path2", 1.534e2); // double + + addIndex(urn3, aspect1, "/path1", true); // boolean + addIndex(urn3, aspect1, "/path2", 1.534e2); // double + addIndex(urn3, aspect1, "/path3", 123.4f); // float + addIndex(urn3, aspect2, "/path4", 123); // int + addIndex(urn3, aspect2, "/path5", 1234L); // long + addIndex(urn3, aspect2, "/path6", "val"); // string + + IndexValue indexValue1 = new IndexValue(); + indexValue1.setBoolean(true); + IndexCriterion criterion1 = new IndexCriterion().setAspect(aspect1).setPathParams(new IndexPathParams().setPath("/path1").setValue(indexValue1)); + IndexValue indexValue2 = new IndexValue(); + indexValue2.setDouble(1.534e2); + IndexCriterion criterion2 = new IndexCriterion().setAspect(aspect1).setPathParams(new IndexPathParams().setPath("/path2").setValue(indexValue2)); + IndexValue indexValue3 = new IndexValue(); + indexValue3.setFloat(123.4f); + IndexCriterion criterion3 = new IndexCriterion().setAspect(aspect1).setPathParams(new IndexPathParams().setPath("/path3").setValue(indexValue3)); + IndexValue indexValue4 = new IndexValue(); + indexValue4.setInt(123); + IndexCriterion criterion4 = new IndexCriterion().setAspect(aspect2).setPathParams(new IndexPathParams().setPath("/path4").setValue(indexValue4)); + IndexValue indexValue5 = new IndexValue(); + indexValue5.setLong(1234L); + IndexCriterion criterion5 = new IndexCriterion().setAspect(aspect2).setPathParams(new IndexPathParams().setPath("/path5").setValue(indexValue5)); + IndexValue indexValue6 = new IndexValue(); + indexValue6.setString("val"); + IndexCriterion criterion6 = new IndexCriterion().setAspect(aspect2).setPathParams(new IndexPathParams().setPath("/path6").setValue(indexValue6)); + + // 1. with two filter conditions + IndexCriterionArray indexCriterionArray1 = new IndexCriterionArray(Arrays.asList(criterion1, criterion2)); + final IndexFilter indexFilter1 = new IndexFilter().setCriteria(indexCriterionArray1); + ListResult urns1 = dao.listUrns(indexFilter1, null, 3); + + assertEquals(urns1.getValues(), Arrays.asList(urn1, urn2, urn3)); + assertEquals(urns1.getTotalCount(), 3); + assertEquals(urns1.getTotalPageCount(), 1); + assertEquals(urns1.getPageSize(), 3); + assertFalse(urns1.isHavingMore()); + + // 2. with two filter conditions, check if LIMIT is working as desired i.e. totalCount is more than the page size + ListResult urns2 = dao.listUrns(indexFilter1, null, 2); + assertEquals(urns2.getValues(), Arrays.asList(urn1, urn2)); + assertEquals(urns2.getTotalCount(), 3); + assertEquals(urns2.getTotalPageCount(), 2); + assertEquals(urns2.getPageSize(), 2); + assertTrue(urns2.isHavingMore()); + + // 3. with six filter conditions covering all different data types that value can take + IndexCriterionArray indexCriterionArray3 = new IndexCriterionArray(Arrays.asList(criterion1, criterion2, criterion3, criterion4, criterion5, criterion6)); + final IndexFilter indexFilter3 = new IndexFilter().setCriteria(indexCriterionArray3); + ListResult urns3 = dao.listUrns(indexFilter3, urn1, 5); + assertEquals(urns3.getValues(), Collections.singletonList(urn3)); + assertEquals(urns3.getTotalCount(), 1); + assertEquals(urns3.getTotalPageCount(), 1); + assertEquals(urns3.getPageSize(), 5); + assertFalse(urns3.isHavingMore()); + } + @Test public void testListUrns() { EbeanLocalDAO dao = new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server); @@ -954,16 +1037,18 @@ public class EbeanLocalDAOTest { assertEquals(dVal, gmaIndexPair.value); // 3. IndexValue pair corresponds to float float fVal = 0.0001f; + double doubleVal = fVal; indexValue.setFloat(fVal); gmaIndexPair = EbeanLocalDAO.getGMAIndexPair(indexValue); assertEquals(EbeanMetadataIndex.DOUBLE_COLUMN, gmaIndexPair.valueType); - assertEquals(fVal, gmaIndexPair.value); + assertEquals(doubleVal, gmaIndexPair.value); // 4. IndexValue pair corresponds to int int iVal = 100; + long longVal = iVal; indexValue.setInt(iVal); gmaIndexPair = EbeanLocalDAO.getGMAIndexPair(indexValue); assertEquals(EbeanMetadataIndex.LONG_COLUMN, gmaIndexPair.valueType); - assertEquals(iVal, gmaIndexPair.value); + assertEquals(longVal, gmaIndexPair.value); // 5. IndexValue pair corresponds to long long lVal = 1L; indexValue.setLong(lVal); @@ -984,14 +1069,15 @@ public class EbeanLocalDAOTest { FooUrn urn1 = makeFooUrn(1); FooUrn urn2 = makeFooUrn(2); FooUrn urn3 = makeFooUrn(3); - addIndex(urn1, "aspect1", "/path1", "val1"); - addIndex(urn1, "aspect1", "/path2", "val2"); - addIndex(urn1, "aspect1", "/path3", "val3"); - addIndex(urn2, "aspect1", "/path1", "val1"); - addIndex(urn3, "aspect1", "/path1", "val1"); + String aspect = "aspect" + System.currentTimeMillis(); + addIndex(urn1, aspect, "/path1", "val1"); + addIndex(urn1, aspect, "/path2", "val2"); + addIndex(urn1, aspect, "/path3", "val3"); + addIndex(urn2, aspect, "/path1", "val1"); + addIndex(urn3, aspect, "/path1", "val1"); // 1. local secondary index is not enabled, should throw exception - IndexCriterion indexCriterion = new IndexCriterion().setAspect("aspect1"); + IndexCriterion indexCriterion = new IndexCriterion().setAspect(aspect); final IndexFilter indexFilter1 = new IndexFilter().setCriteria(new IndexCriterionArray(indexCriterion)); dao.enableLocalSecondaryIndex(false); @@ -1005,15 +1091,12 @@ public class EbeanLocalDAOTest { assertThrows(UnsupportedOperationException.class, () -> dao.listUrns(indexFilter2, null, 2)); - // 3. index criterion array contains more than 1 criterion, should throw an exception - IndexCriterion indexCriterion1 = new IndexCriterion().setAspect("aspect1"); - IndexCriterion indexCriterion2 = new IndexCriterion().setAspect("aspect2"); - final IndexFilter indexFilter3 = new IndexFilter().setCriteria(new IndexCriterionArray(indexCriterion1, indexCriterion2)); - + // 3. index criterion array contains more than 10 criterion, should throw an exception + final IndexFilter indexFilter3 = new IndexFilter().setCriteria(makeIndexCriterionArray(11)); assertThrows(UnsupportedOperationException.class, () -> dao.listUrns(indexFilter3, null, 2)); - // 4. only aspect is provided in Index Filter - indexCriterion = new IndexCriterion().setAspect("aspect1"); + // 3. only aspect and not path or value is provided in Index Filter + indexCriterion = new IndexCriterion().setAspect(aspect); final IndexFilter indexFilter4 = new IndexFilter().setCriteria(new IndexCriterionArray(indexCriterion)); ListResult urns = dao.listUrns(indexFilter4, null, 2); @@ -1025,7 +1108,7 @@ public class EbeanLocalDAOTest { IndexValue indexValue = new IndexValue(); indexValue.setString("val1"); IndexPathParams indexPathParams = new IndexPathParams().setPath("/path1").setValue(indexValue); - indexCriterion = new IndexCriterion().setAspect("aspect1").setPathParams(indexPathParams); + indexCriterion = new IndexCriterion().setAspect(aspect).setPathParams(indexPathParams); final IndexFilter indexFilter5 = new IndexFilter().setCriteria(new IndexCriterionArray(indexCriterion)); urns = dao.listUrns(indexFilter5, urn1, 2); @@ -1035,7 +1118,7 @@ public class EbeanLocalDAOTest { // 6. aspect with correct path but incorrect value indexValue.setString("valX"); indexPathParams = new IndexPathParams().setPath("/path1").setValue(indexValue); - indexCriterion = new IndexCriterion().setAspect("aspect1").setPathParams(indexPathParams); + indexCriterion = new IndexCriterion().setAspect(aspect).setPathParams(indexPathParams); final IndexFilter indexFilter6 = new IndexFilter().setCriteria(new IndexCriterionArray(indexCriterion)); urns = dao.listUrns(indexFilter6, urn1, 2); @@ -1091,12 +1174,26 @@ public class EbeanLocalDAOTest { .findList(); } - private void addIndex(Urn urn, String aspectName, String pathName, String sVal) { + private void addIndex(Urn urn, String aspectName, String pathName, Object val) { EbeanMetadataIndex index = new EbeanMetadataIndex(); index.setUrn(urn.toString()) .setAspect(aspectName) - .setPath(pathName) - .setStringVal(sVal); + .setPath(pathName); + if (val instanceof String) { + index.setStringVal(val.toString()); + } else if (val instanceof Boolean) { + index.setStringVal(String.valueOf(val)); + } else if (val instanceof Double) { + index.setDoubleVal((Double) val); + } else if (val instanceof Float) { + index.setDoubleVal(((Float) val).doubleValue()); + } else if (val instanceof Integer) { + index.setLongVal(Long.valueOf((Integer) val)); + } else if (val instanceof Long) { + index.setLongVal((Long) val); + } else { + return; + } _server.save(index); }