diff --git a/datahub-web-react/src/app/lineage/LineageEntityEdge.tsx b/datahub-web-react/src/app/lineage/LineageEntityEdge.tsx index 27919a8b0e..1505ce71f7 100644 --- a/datahub-web-react/src/app/lineage/LineageEntityEdge.tsx +++ b/datahub-web-react/src/app/lineage/LineageEntityEdge.tsx @@ -1,7 +1,9 @@ import React from 'react'; import { Tooltip } from 'antd'; +import { ClockCircleOutlined, EyeOutlined } from '@ant-design/icons'; import dayjs from 'dayjs'; import LocalizedFormat from 'dayjs/plugin/localizedFormat'; +import styled from 'styled-components'; import { Group } from '@vx/group'; import { curveBasis } from '@vx/curve'; import { LinePath } from '@vx/shape'; @@ -10,6 +12,18 @@ import { ANTD_GRAY } from '../entity/shared/constants'; dayjs.extend(LocalizedFormat); +const EdgeTimestamp = styled.div``; + +const StyledClockCircleOutlined = styled(ClockCircleOutlined)` + margin-right: 4px; + font-size: 14px; +`; + +const StyledEyeOutlined = styled(EyeOutlined)` + margin-right: 4px; + font-size: 14px; +`; + type Props = { edge: VizEdge; key: string; @@ -18,24 +32,31 @@ type Props = { export default function LineageEntityEdge({ edge, key, isHighlighted }: Props) { const createdOnTimestamp = edge?.createdOn; - const updatedOnTimestamp = - createdOnTimestamp && edge?.updatedOn && edge?.updatedOn > createdOnTimestamp - ? edge?.updatedOn - : createdOnTimestamp; - const createdOn: string = createdOnTimestamp ? dayjs(createdOnTimestamp).format('ll') : 'unknown'; - const updatedOn: string = updatedOnTimestamp ? dayjs(updatedOnTimestamp).format('ll') : 'unknown'; + const updatedOnTimestamp = edge?.updatedOn; + const createdOn = createdOnTimestamp ? dayjs(createdOnTimestamp).format('ll') : undefined; + const updatedOn = updatedOnTimestamp ? dayjs(updatedOnTimestamp).format('ll') : undefined; + const hasTimestamps = createdOn || updatedOn; const isManual = edge?.isManual; return ( <> - Created: {createdOn} -
- Last Observed: {updatedOn} - + (hasTimestamps && ( + <> + {createdOn && ( + + Created {isManual && 'manually '}on {createdOn} + + )} + {updatedOn && !isManual && ( + + Last observed on {updatedOn} + + )} + + )) || + undefined } > diff --git a/datahub-web-react/src/app/lineage/LineageEntityNode.tsx b/datahub-web-react/src/app/lineage/LineageEntityNode.tsx index 1032d16460..5b39d6aaf1 100644 --- a/datahub-web-react/src/app/lineage/LineageEntityNode.tsx +++ b/datahub-web-react/src/app/lineage/LineageEntityNode.tsx @@ -16,6 +16,7 @@ import { centerX, centerY, iconHeight, iconWidth, iconX, iconY, textX, width } f import LineageEntityColumns from './LineageEntityColumns'; import { convertInputFieldsToSchemaFields } from './utils/columnLineageUtils'; import ManageLineageMenu from './manage/ManageLineageMenu'; +import { useGetLineageTimeParams } from './utils/useGetLineageTimeParams'; const CLICK_DELAY_THRESHOLD = 1000; const DRAG_DISTANCE_THRESHOLD = 20; @@ -62,6 +63,7 @@ export default function LineageEntityNode({ }) { const { direction } = node; const { expandTitles, collapsedColumnsNodes, showColumns, refetchCenterNode } = useContext(LineageExplorerContext); + const { startTimeMillis, endTimeMillis } = useGetLineageTimeParams(); const [hasExpanded, setHasExpanded] = useState(false); const [isExpanding, setIsExpanding] = useState(false); const [expandHover, setExpandHover] = useState(false); @@ -76,7 +78,13 @@ export default function LineageEntityNode({ } else { // update non-center node using onExpandClick in useEffect below getAsyncEntityLineage({ - variables: { urn: node.data.urn, separateSiblings: isHideSiblingMode, showColumns }, + variables: { + urn: node.data.urn, + separateSiblings: isHideSiblingMode, + showColumns, + startTimeMillis, + endTimeMillis, + }, }); setTimeout(() => setHasExpanded(false), 0); } @@ -160,7 +168,13 @@ export default function LineageEntityNode({ if (node.data.urn && node.data.type) { // getAsyncEntity(node.data.urn, node.data.type); 
getAsyncEntityLineage({ - variables: { urn: node.data.urn, separateSiblings: isHideSiblingMode, showColumns }, + variables: { + urn: node.data.urn, + separateSiblings: isHideSiblingMode, + showColumns, + startTimeMillis, + endTimeMillis, + }, }); } }} diff --git a/datahub-web-react/src/graphql/lineage.graphql b/datahub-web-react/src/graphql/lineage.graphql index b18a2d513d..82f396c96f 100644 --- a/datahub-web-react/src/graphql/lineage.graphql +++ b/datahub-web-react/src/graphql/lineage.graphql @@ -247,48 +247,52 @@ fragment lineageFields on EntityWithRelationships { } } +fragment lineageRelationshipFields on LineageRelationship { + type + createdOn + createdActor { + urn + type + ... on CorpUser { + username + info { + displayName + } + properties { + displayName + } + editableProperties { + displayName + } + } + } + updatedOn + updatedActor { + urn + type + ... on CorpUser { + username + info { + displayName + } + properties { + displayName + } + editableProperties { + displayName + } + } + } + isManual +} + fragment fullLineageResults on EntityLineageResult { start count total filtered relationships { - type - createdOn - createdActor { - urn - type - ... on CorpUser { - username - info { - displayName - } - properties { - displayName - } - editableProperties { - displayName - } - } - } - updatedOn - updatedActor { - urn - type - ... on CorpUser { - username - info { - displayName - } - properties { - displayName - } - editableProperties { - displayName - } - } - } - isManual + ...lineageRelationshipFields entity { ...lineageFields ... 
on Dataset { @@ -311,7 +315,7 @@ fragment leafLineageResults on EntityLineageResult { total filtered relationships { - type + ...lineageRelationshipFields entity { urn type diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java index b084c110a8..79590117fd 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java @@ -2,6 +2,7 @@ package com.linkedin.metadata.graph.elastic; import com.codahale.metrics.Timer; import com.datahub.util.exception.ESQueryException; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.linkedin.common.UrnArray; @@ -69,16 +70,16 @@ public class ESGraphQueryDAO { private static final int MAX_ELASTIC_RESULT = 10000; private static final int BATCH_SIZE = 1000; private static final int TIMEOUT_SECS = 10; - private static final String SOURCE = "source"; - private static final String DESTINATION = "destination"; - private static final String RELATIONSHIP_TYPE = "relationshipType"; - private static final String SEARCH_EXECUTIONS_METRIC = "num_elasticSearch_reads"; - private static final String CREATED_ON = "createdOn"; - private static final String CREATED_ACTOR = "createdActor"; - private static final String UPDATED_ON = "updatedOn"; - private static final String UPDATED_ACTOR = "updatedActor"; - private static final String PROPERTIES = "properties"; - private static final String UI = "UI"; + static final String SOURCE = "source"; + static final String DESTINATION = "destination"; + static final String RELATIONSHIP_TYPE = "relationshipType"; + static final String SEARCH_EXECUTIONS_METRIC = "num_elasticSearch_reads"; + static final String CREATED_ON = "createdOn"; + static final String CREATED_ACTOR 
= "createdActor"; + static final String UPDATED_ON = "updatedOn"; + static final String UPDATED_ACTOR = "updatedActor"; + static final String PROPERTIES = "properties"; + static final String UI = "UI"; @Nonnull public static void addFilterToQueryBuilder(@Nonnull Filter filter, String node, BoolQueryBuilder rootQuery) { @@ -435,26 +436,12 @@ public class ESGraphQueryDAO { return relationship; } - BoolQueryBuilder getOutGoingEdgeQuery(List urns, List outgoingEdges, GraphFilters graphFilters) { - BoolQueryBuilder outgoingEdgeQuery = QueryBuilders.boolQuery(); - outgoingEdgeQuery.must(buildUrnFilters(urns, SOURCE)); - outgoingEdgeQuery.must(buildEdgeFilters(outgoingEdges)); - outgoingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE)); - outgoingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION)); - return outgoingEdgeQuery; - } - - BoolQueryBuilder getIncomingEdgeQuery(List urns, List incomingEdges, GraphFilters graphFilters) { - BoolQueryBuilder incomingEdgeQuery = QueryBuilders.boolQuery(); - incomingEdgeQuery.must(buildUrnFilters(urns, DESTINATION)); - incomingEdgeQuery.must(buildEdgeFilters(incomingEdges)); - incomingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE)); - incomingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION)); - return incomingEdgeQuery; - } - // Get search query for given list of edges and source urns - public QueryBuilder getQueryForLineage(List urns, List lineageEdges, GraphFilters graphFilters, + @VisibleForTesting + static QueryBuilder getQueryForLineage( + @Nonnull List urns, + @Nonnull List lineageEdges, + @Nonnull GraphFilters graphFilters, @Nullable Long startTimeMillis, @Nullable Long endTimeMillis) { BoolQueryBuilder query = QueryBuilders.boolQuery(); @@ -476,63 +463,57 @@ public class ESGraphQueryDAO { query.should(getIncomingEdgeQuery(urns, incomingEdges, graphFilters)); } - // Add time 
range filters - if (startTimeMillis != null) { - query.must(buildStartTimeFilter(startTimeMillis)); - } - if (endTimeMillis != null) { - query.must(buildEndTimeFilter(endTimeMillis)); + /* + * Optional - Add edge filtering based on time windows. Both bounds are required; otherwise no time filter is applied. + */ + if (startTimeMillis != null && endTimeMillis != null) { + query.must(TimeFilterUtils.getEdgeTimeFilterQuery(startTimeMillis, endTimeMillis)); + } else { + log.debug( + "Empty time filter range provided: start time {}, end time: {}. Skipping application of time filters", + startTimeMillis, + endTimeMillis); } return query; } - public QueryBuilder buildEntityTypesFilter(List entityTypes, String prefix) { + private static BoolQueryBuilder getOutGoingEdgeQuery( + @Nonnull List urns, + @Nonnull List outgoingEdges, + @Nonnull GraphFilters graphFilters) { + BoolQueryBuilder outgoingEdgeQuery = QueryBuilders.boolQuery(); + outgoingEdgeQuery.must(buildUrnFilters(urns, SOURCE)); + outgoingEdgeQuery.must(buildEdgeFilters(outgoingEdges)); + outgoingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE)); + outgoingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION)); + return outgoingEdgeQuery; + } + + private static BoolQueryBuilder getIncomingEdgeQuery( + @Nonnull List urns, @Nonnull List incomingEdges, + @Nonnull GraphFilters graphFilters) { + BoolQueryBuilder incomingEdgeQuery = QueryBuilders.boolQuery(); + incomingEdgeQuery.must(buildUrnFilters(urns, DESTINATION)); + incomingEdgeQuery.must(buildEdgeFilters(incomingEdges)); + incomingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE)); + incomingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION)); + return incomingEdgeQuery; + } + + private static QueryBuilder buildEntityTypesFilter(@Nonnull List entityTypes, @Nonnull String prefix) { return QueryBuilders.termsQuery(prefix + ".entityType", 
entityTypes.stream().map(Object::toString).collect(Collectors.toList())); } - public QueryBuilder buildUrnFilters(List urns, String prefix) { + private static QueryBuilder buildUrnFilters(@Nonnull List urns, @Nonnull String prefix) { return QueryBuilders.termsQuery(prefix + ".urn", urns.stream().map(Object::toString).collect(Collectors.toList())); } - public QueryBuilder buildEdgeFilters(List edgeInfos) { + private static QueryBuilder buildEdgeFilters(@Nonnull List edgeInfos) { return QueryBuilders.termsQuery("relationshipType", edgeInfos.stream().map(EdgeInfo::getType).distinct().collect(Collectors.toList())); } - public QueryBuilder buildExistenceFilter() { - final BoolQueryBuilder boolExistenceBuilder = QueryBuilders.boolQuery(); - boolExistenceBuilder.mustNot(QueryBuilders.existsQuery(CREATED_ON)); - boolExistenceBuilder.mustNot(QueryBuilders.existsQuery(UPDATED_ON)); - return boolExistenceBuilder; - } - - public QueryBuilder buildManualLineageFilter() { - return QueryBuilders.termQuery(String.format("%s.%s", PROPERTIES, SOURCE), UI); - } - - public QueryBuilder buildStartTimeFilter(@Nonnull final Long startTimeMillis) { - final BoolQueryBuilder startTimeQuery = QueryBuilders.boolQuery(); - startTimeQuery.should(QueryBuilders.rangeQuery(UPDATED_ON).gte(startTimeMillis)); - // Secondary check in case we only have createdOn - startTimeQuery.should(QueryBuilders.rangeQuery(CREATED_ON).gte(startTimeMillis)); - // If both createdOn and updatedOn are not present, then we should include the edge - startTimeQuery.should(buildExistenceFilter()); - // If the edge is a manual lineage edge, then we should include the edge - startTimeQuery.should(buildManualLineageFilter()); - return startTimeQuery; - } - - public QueryBuilder buildEndTimeFilter(@Nonnull final Long endTimeMillis) { - final BoolQueryBuilder endTimeQuery = QueryBuilders.boolQuery(); - endTimeQuery.should(QueryBuilders.rangeQuery(CREATED_ON).lte(endTimeMillis)); - // If both createdOn and updatedOn are not 
present, then we should include the edge - endTimeQuery.should(buildExistenceFilter()); - // If the edge is a manual lineage edge, then we should include the edge - endTimeQuery.should(buildManualLineageFilter()); - return endTimeQuery; - } - @Value public static class LineageResponse { int total; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/TimeFilterUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/TimeFilterUtils.java new file mode 100644 index 0000000000..66422c5997 --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/TimeFilterUtils.java @@ -0,0 +1,145 @@ +package com.linkedin.metadata.graph.elastic; + +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; + +import static com.linkedin.metadata.graph.elastic.ESGraphQueryDAO.*; + +@Slf4j +public class TimeFilterUtils { + + /** + * In order to filter for edges that fall into a specific filter window, we perform a range-overlap query. + * Note that both a start time and an end time must be provided in order to add the filters. + * + * A range overlap query compares 2 time windows for ANY overlap, i.e. it tests for a non-empty intersection. + * Each window is characterized by 2 points in time: a start time (e.g. created time of the edge) and an end time + * (e.g. last updated time of an edge). + * + * @param startTimeMillis the start of the time filter window + * @param endTimeMillis the end of the time filter window + */ + public static QueryBuilder getEdgeTimeFilterQuery(final long startTimeMillis, final long endTimeMillis) { + log.debug("Adding edge time filters for start time: {}, end time: {}", startTimeMillis, endTimeMillis); + /* + * One of the following must be true in order for the edge to be returned (should = OR) + * + * 1. 
The start and end time window should overlap with the createdOn updatedOn window. + * 2. The createdOn and updatedOn window does not exist on the edge at all (support legacy cases) + * 3. Special lineage case: The edge is marked as a "manual" edge, meaning that the time filters should NOT be applied. + */ + BoolQueryBuilder timeFilterQuery = QueryBuilders.boolQuery(); + timeFilterQuery.should(buildTimeWindowFilter(startTimeMillis, endTimeMillis)); + timeFilterQuery.should(buildTimestampsMissingFilter()); + timeFilterQuery.should(buildManualLineageFilter()); + return timeFilterQuery; + } + + /** + * Builds a filter that compares 2 windows on a timeline and returns true for any overlap. This logic + * is a bit tricky so change with caution. + * + * The first window comes from start time and end time provided by the user. + * The second window comes from the createdOn and updatedOn timestamps present on graph edges. + * + * Also accounts for the case where createdOn or updatedOn is MISSING, and in such cases performs + * a point overlap instead of a range overlap. + * + * Range Examples: + * + * start time -> end time |-----| + * createdOn -> updatedOn |-----| + * + * = true + * + * start time -> end time |------| + * createdOn -> updatedOn |--| + * + * = true + * + * start time -> end time |-----| + * createdOn -> updatedOn |-----| + * + * = true + * + * start time -> end time |-----| + * createdOn -> updatedOn |-----| + * + * = false + * + * + * Point Examples: + * + * start time -> end time |-----| + * updatedOn | + * + * = true + * + * start time -> end time |-----| + * updatedOn | + * + * = false + * + * and same for createdOn. + * + * Assumptions are that startTimeMillis is always before or equal to endTimeMillis, + * and createdOn is always before or equal to updatedOn. 
+ * + * @param startTimeMillis the start time of the window in milliseconds + * @param endTimeMillis the end time of the window in milliseconds + * + * @return Query Builder with time window filters appended. + */ + private static QueryBuilder buildTimeWindowFilter(final long startTimeMillis, final long endTimeMillis) { + final BoolQueryBuilder timeWindowQuery = QueryBuilders.boolQuery(); + + /* + * To perform comparison: + * + * If either createdOn or updatedOn time point falls into the startTime->endTime window, + * the edge should be included. We also verify that the field actually exists (non-null). + * NOTE(review): an edge with createdOn < startTime and updatedOn > endTime matches neither point check + * below and is not matched by this filter even though the two windows overlap - confirm this is intended. + */ + + // Build filter comparing createdOn time to startTime->endTime window. + BoolQueryBuilder createdOnFilter = QueryBuilders.boolQuery(); + createdOnFilter.must(QueryBuilders.existsQuery(CREATED_ON)); + createdOnFilter.must(QueryBuilders.rangeQuery(CREATED_ON).gte(startTimeMillis).lte(endTimeMillis)); + + // Build filter comparing updatedOn time to startTime->endTime window. + BoolQueryBuilder updatedOnFilter = QueryBuilders.boolQuery(); + updatedOnFilter.must(QueryBuilders.existsQuery(UPDATED_ON)); + updatedOnFilter.must(QueryBuilders.rangeQuery(UPDATED_ON).gte(startTimeMillis).lte(endTimeMillis)); + + // Now - OR the 2 point comparison conditions together. + timeWindowQuery.should(createdOnFilter); + timeWindowQuery.should(updatedOnFilter); + return timeWindowQuery; + } + + private static QueryBuilder buildTimestampsMissingFilter() { + // If both createdOn and updatedOn do NOT EXIST (either are null or 0), then + // return the edge. + final BoolQueryBuilder boolExistenceBuilder = QueryBuilders.boolQuery(); + boolExistenceBuilder.must(buildNotExistsFilter(CREATED_ON)); + boolExistenceBuilder.must(buildNotExistsFilter(UPDATED_ON)); + return boolExistenceBuilder; + } + + private static QueryBuilder buildNotExistsFilter(String fieldName) { + // This filter returns 'true' if the field DOES NOT EXIST or it exists but is equal to 0. 
+ final BoolQueryBuilder notExistsFilter = QueryBuilders.boolQuery(); + notExistsFilter.should(QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery(fieldName))); + notExistsFilter.should(QueryBuilders.boolQuery().must(QueryBuilders.termQuery(fieldName, 0L))); + return notExistsFilter; + } + + private static QueryBuilder buildManualLineageFilter() { + return QueryBuilders.termQuery(String.format("%s.%s", PROPERTIES, SOURCE), UI); + } + + private TimeFilterUtils() { } +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java index 1b3b7aa707..fabf26b489 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java @@ -264,6 +264,7 @@ public class ESUtils { public static String toKeywordField(@Nonnull final String filterField, @Nonnull final boolean skipKeywordSuffix) { return skipKeywordSuffix || "urn".equals(filterField) + || "runId".equals(filterField) || filterField.contains(".") ? 
filterField : filterField + ESUtils.KEYWORD_SUFFIX; } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAOTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAOTest.java new file mode 100644 index 0000000000..a51ffea0e9 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAOTest.java @@ -0,0 +1,46 @@ +package com.linkedin.metadata.graph.elastic; + +import com.google.common.collect.ImmutableList; +import com.google.common.io.Resources; +import com.linkedin.common.urn.Urn; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.graph.GraphFilters; +import com.linkedin.metadata.models.registry.LineageRegistry; +import com.linkedin.metadata.query.filter.RelationshipDirection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.elasticsearch.index.query.QueryBuilder; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ESGraphQueryDAOTest { + + private static final String TEST_QUERY_FILE = "elasticsearch/sample_filters/lineage_query_filters_1.json"; + + @Test + private static void testGetQueryForLineageFullArguments() throws Exception { + + URL url = Resources.getResource(TEST_QUERY_FILE); + String expectedQuery = Resources.toString(url, StandardCharsets.UTF_8); + + List urns = new ArrayList<>(); + List edgeInfos = new ArrayList<>(ImmutableList.of( + new LineageRegistry.EdgeInfo("DownstreamOf", RelationshipDirection.INCOMING, Constants.DATASET_ENTITY_NAME) + )); + GraphFilters graphFilters = new GraphFilters(ImmutableList.of(Constants.DATASET_ENTITY_NAME)); + Long startTime = 0L; + Long endTime = 1L; + + QueryBuilder builder = ESGraphQueryDAO.getQueryForLineage( + urns, + edgeInfos, + graphFilters, + startTime, + endTime + ); + + Assert.assertEquals(builder.toString(), expectedQuery); + } +} diff --git 
a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/TimeFilterUtilsTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/TimeFilterUtilsTest.java new file mode 100644 index 0000000000..988a7ccc70 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/TimeFilterUtilsTest.java @@ -0,0 +1,22 @@ +package com.linkedin.metadata.graph.elastic; + +import com.google.common.io.Resources; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import org.elasticsearch.index.query.QueryBuilder; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TimeFilterUtilsTest { + + private static final String TEST_QUERY_FILE = "elasticsearch/sample_filters/lineage_time_query_filters_1.json"; + @Test + private static void testGetEdgeTimeFilterQuery() throws Exception { + URL url = Resources.getResource(TEST_QUERY_FILE); + String expectedQuery = Resources.toString(url, StandardCharsets.UTF_8); + long startTime = 1L; + long endTime = 2L; + QueryBuilder result = TimeFilterUtils.getEdgeTimeFilterQuery(startTime, endTime); + Assert.assertEquals(result.toString(), expectedQuery); + } +} diff --git a/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_1.json b/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_1.json new file mode 100644 index 0000000000..eb84638f0c --- /dev/null +++ b/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_query_filters_1.json @@ -0,0 +1,206 @@ +{ + "bool" : { + "must" : [ + { + "bool" : { + "should" : [ + { + "bool" : { + "should" : [ + { + "bool" : { + "must" : [ + { + "exists" : { + "field" : "createdOn", + "boost" : 1.0 + } + }, + { + "range" : { + "createdOn" : { + "from" : 0, + "to" : 1, + "include_lower" : true, + "include_upper" : true, + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "must" : [ + { + "exists" : { + "field" : 
"updatedOn", + "boost" : 1.0 + } + }, + { + "range" : { + "updatedOn" : { + "from" : 0, + "to" : 1, + "include_lower" : true, + "include_upper" : true, + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "must" : [ + { + "bool" : { + "should" : [ + { + "bool" : { + "must_not" : [ + { + "exists" : { + "field" : "createdOn", + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "must" : [ + { + "term" : { + "createdOn" : { + "value" : 0, + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "should" : [ + { + "bool" : { + "must_not" : [ + { + "exists" : { + "field" : "updatedOn", + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "must" : [ + { + "term" : { + "updatedOn" : { + "value" : 0, + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "term" : { + "properties.source" : { + "value" : "UI", + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "should" : [ + { + "bool" : { + "must" : [ + { + "terms" : { + "destination.urn" : [ ], + "boost" : 1.0 + } + }, + { + "terms" : { + "relationshipType" : [ + "DownstreamOf" + ], + "boost" : 1.0 + } + }, + { + "terms" : { + "source.entityType" : [ + "dataset" + ], + "boost" : 1.0 + } + }, + { + "terms" : { + "destination.entityType" : [ + "dataset" + ], + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } +} \ No newline at end of file diff --git 
a/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_time_query_filters_1.json b/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_time_query_filters_1.json new file mode 100644 index 0000000000..327f1d4ff9 --- /dev/null +++ b/metadata-io/src/test/resources/elasticsearch/sample_filters/lineage_time_query_filters_1.json @@ -0,0 +1,158 @@ +{ + "bool" : { + "should" : [ + { + "bool" : { + "should" : [ + { + "bool" : { + "must" : [ + { + "exists" : { + "field" : "createdOn", + "boost" : 1.0 + } + }, + { + "range" : { + "createdOn" : { + "from" : 1, + "to" : 2, + "include_lower" : true, + "include_upper" : true, + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "must" : [ + { + "exists" : { + "field" : "updatedOn", + "boost" : 1.0 + } + }, + { + "range" : { + "updatedOn" : { + "from" : 1, + "to" : 2, + "include_lower" : true, + "include_upper" : true, + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "must" : [ + { + "bool" : { + "should" : [ + { + "bool" : { + "must_not" : [ + { + "exists" : { + "field" : "createdOn", + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "must" : [ + { + "term" : { + "createdOn" : { + "value" : 0, + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "should" : [ + { + "bool" : { + "must_not" : [ + { + "exists" : { + "field" : "updatedOn", + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "bool" : { + "must" : [ + { + "term" : { + "updatedOn" : { + "value" : 0, + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + } + 
], + "adjust_pure_negative" : true, + "boost" : 1.0 + } + }, + { + "term" : { + "properties.source" : { + "value" : "UI", + "boost" : 1.0 + } + } + } + ], + "adjust_pure_negative" : true, + "boost" : 1.0 + } +} \ No newline at end of file diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/GraphIndexUtils.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/GraphIndexUtils.java index d270a3c837..6ab7a4b766 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/GraphIndexUtils.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/GraphIndexUtils.java @@ -138,10 +138,14 @@ public class GraphIndexUtils { systemMetadata = event.hasPreviousSystemMetadata() ? event.getPreviousSystemMetadata() : null; } - if (createdOn == null && systemMetadata != null) { + if ((createdOn == null || createdOn == 0) && systemMetadata != null) { createdOn = systemMetadata.getLastObserved(); + } + + if ((updatedOn == null || updatedOn == 0) && systemMetadata != null) { updatedOn = systemMetadata.getLastObserved(); } + if (createdActor == null && event.hasCreated()) { createdActor = event.getCreated().getActor(); updatedActor = event.getCreated().getActor();