mirror of
https://github.com/datahub-project/datahub.git
synced 2025-09-02 13:53:06 +00:00
fix(lineage): Fixing Timeline Lineage Filters (#7435)
This commit is contained in:
parent
77d072b522
commit
d54e3b81cf
@ -1,7 +1,9 @@
|
||||
import React from 'react';
|
||||
import { Tooltip } from 'antd';
|
||||
import { ClockCircleOutlined, EyeOutlined } from '@ant-design/icons';
|
||||
import dayjs from 'dayjs';
|
||||
import LocalizedFormat from 'dayjs/plugin/localizedFormat';
|
||||
import styled from 'styled-components';
|
||||
import { Group } from '@vx/group';
|
||||
import { curveBasis } from '@vx/curve';
|
||||
import { LinePath } from '@vx/shape';
|
||||
@ -10,6 +12,18 @@ import { ANTD_GRAY } from '../entity/shared/constants';
|
||||
|
||||
dayjs.extend(LocalizedFormat);
|
||||
|
||||
const EdgeTimestamp = styled.div``;
|
||||
|
||||
const StyledClockCircleOutlined = styled(ClockCircleOutlined)`
|
||||
margin-right: 4px;
|
||||
font-size: 14px;
|
||||
`;
|
||||
|
||||
const StyledEyeOutlined = styled(EyeOutlined)`
|
||||
margin-right: 4px;
|
||||
font-size: 14px;
|
||||
`;
|
||||
|
||||
type Props = {
|
||||
edge: VizEdge;
|
||||
key: string;
|
||||
@ -18,24 +32,31 @@ type Props = {
|
||||
|
||||
export default function LineageEntityEdge({ edge, key, isHighlighted }: Props) {
|
||||
const createdOnTimestamp = edge?.createdOn;
|
||||
const updatedOnTimestamp =
|
||||
createdOnTimestamp && edge?.updatedOn && edge?.updatedOn > createdOnTimestamp
|
||||
? edge?.updatedOn
|
||||
: createdOnTimestamp;
|
||||
const createdOn: string = createdOnTimestamp ? dayjs(createdOnTimestamp).format('ll') : 'unknown';
|
||||
const updatedOn: string = updatedOnTimestamp ? dayjs(updatedOnTimestamp).format('ll') : 'unknown';
|
||||
const updatedOnTimestamp = edge?.updatedOn;
|
||||
const createdOn = createdOnTimestamp ? dayjs(createdOnTimestamp).format('ll') : undefined;
|
||||
const updatedOn = updatedOnTimestamp ? dayjs(updatedOnTimestamp).format('ll') : undefined;
|
||||
const hasTimestamps = createdOn || updatedOn;
|
||||
const isManual = edge?.isManual;
|
||||
|
||||
return (
|
||||
<>
|
||||
<Tooltip
|
||||
arrowPointAtCenter
|
||||
title={
|
||||
(hasTimestamps && (
|
||||
<>
|
||||
Created: {createdOn}
|
||||
<br />
|
||||
Last Observed: {updatedOn}
|
||||
{createdOn && (
|
||||
<EdgeTimestamp>
|
||||
<StyledClockCircleOutlined /> Created {isManual && 'manually '}on {createdOn}
|
||||
</EdgeTimestamp>
|
||||
)}
|
||||
{updatedOn && !isManual && (
|
||||
<EdgeTimestamp>
|
||||
<StyledEyeOutlined /> Last observed on {updatedOn}
|
||||
</EdgeTimestamp>
|
||||
)}
|
||||
</>
|
||||
)) ||
|
||||
undefined
|
||||
}
|
||||
>
|
||||
<Group key={key}>
|
||||
|
@ -16,6 +16,7 @@ import { centerX, centerY, iconHeight, iconWidth, iconX, iconY, textX, width } f
|
||||
import LineageEntityColumns from './LineageEntityColumns';
|
||||
import { convertInputFieldsToSchemaFields } from './utils/columnLineageUtils';
|
||||
import ManageLineageMenu from './manage/ManageLineageMenu';
|
||||
import { useGetLineageTimeParams } from './utils/useGetLineageTimeParams';
|
||||
|
||||
const CLICK_DELAY_THRESHOLD = 1000;
|
||||
const DRAG_DISTANCE_THRESHOLD = 20;
|
||||
@ -62,6 +63,7 @@ export default function LineageEntityNode({
|
||||
}) {
|
||||
const { direction } = node;
|
||||
const { expandTitles, collapsedColumnsNodes, showColumns, refetchCenterNode } = useContext(LineageExplorerContext);
|
||||
const { startTimeMillis, endTimeMillis } = useGetLineageTimeParams();
|
||||
const [hasExpanded, setHasExpanded] = useState(false);
|
||||
const [isExpanding, setIsExpanding] = useState(false);
|
||||
const [expandHover, setExpandHover] = useState(false);
|
||||
@ -76,7 +78,13 @@ export default function LineageEntityNode({
|
||||
} else {
|
||||
// update non-center node using onExpandClick in useEffect below
|
||||
getAsyncEntityLineage({
|
||||
variables: { urn: node.data.urn, separateSiblings: isHideSiblingMode, showColumns },
|
||||
variables: {
|
||||
urn: node.data.urn,
|
||||
separateSiblings: isHideSiblingMode,
|
||||
showColumns,
|
||||
startTimeMillis,
|
||||
endTimeMillis,
|
||||
},
|
||||
});
|
||||
setTimeout(() => setHasExpanded(false), 0);
|
||||
}
|
||||
@ -160,7 +168,13 @@ export default function LineageEntityNode({
|
||||
if (node.data.urn && node.data.type) {
|
||||
// getAsyncEntity(node.data.urn, node.data.type);
|
||||
getAsyncEntityLineage({
|
||||
variables: { urn: node.data.urn, separateSiblings: isHideSiblingMode, showColumns },
|
||||
variables: {
|
||||
urn: node.data.urn,
|
||||
separateSiblings: isHideSiblingMode,
|
||||
showColumns,
|
||||
startTimeMillis,
|
||||
endTimeMillis,
|
||||
},
|
||||
});
|
||||
}
|
||||
}}
|
||||
|
@ -247,12 +247,7 @@ fragment lineageFields on EntityWithRelationships {
|
||||
}
|
||||
}
|
||||
|
||||
fragment fullLineageResults on EntityLineageResult {
|
||||
start
|
||||
count
|
||||
total
|
||||
filtered
|
||||
relationships {
|
||||
fragment lineageRelationshipFields on LineageRelationship {
|
||||
type
|
||||
createdOn
|
||||
createdActor {
|
||||
@ -289,6 +284,15 @@ fragment fullLineageResults on EntityLineageResult {
|
||||
}
|
||||
}
|
||||
isManual
|
||||
}
|
||||
|
||||
fragment fullLineageResults on EntityLineageResult {
|
||||
start
|
||||
count
|
||||
total
|
||||
filtered
|
||||
relationships {
|
||||
...lineageRelationshipFields
|
||||
entity {
|
||||
...lineageFields
|
||||
... on Dataset {
|
||||
@ -311,7 +315,7 @@ fragment leafLineageResults on EntityLineageResult {
|
||||
total
|
||||
filtered
|
||||
relationships {
|
||||
type
|
||||
...lineageRelationshipFields
|
||||
entity {
|
||||
urn
|
||||
type
|
||||
|
@ -2,6 +2,7 @@ package com.linkedin.metadata.graph.elastic;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.datahub.util.exception.ESQueryException;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.linkedin.common.UrnArray;
|
||||
@ -69,16 +70,16 @@ public class ESGraphQueryDAO {
|
||||
private static final int MAX_ELASTIC_RESULT = 10000;
|
||||
private static final int BATCH_SIZE = 1000;
|
||||
private static final int TIMEOUT_SECS = 10;
|
||||
private static final String SOURCE = "source";
|
||||
private static final String DESTINATION = "destination";
|
||||
private static final String RELATIONSHIP_TYPE = "relationshipType";
|
||||
private static final String SEARCH_EXECUTIONS_METRIC = "num_elasticSearch_reads";
|
||||
private static final String CREATED_ON = "createdOn";
|
||||
private static final String CREATED_ACTOR = "createdActor";
|
||||
private static final String UPDATED_ON = "updatedOn";
|
||||
private static final String UPDATED_ACTOR = "updatedActor";
|
||||
private static final String PROPERTIES = "properties";
|
||||
private static final String UI = "UI";
|
||||
static final String SOURCE = "source";
|
||||
static final String DESTINATION = "destination";
|
||||
static final String RELATIONSHIP_TYPE = "relationshipType";
|
||||
static final String SEARCH_EXECUTIONS_METRIC = "num_elasticSearch_reads";
|
||||
static final String CREATED_ON = "createdOn";
|
||||
static final String CREATED_ACTOR = "createdActor";
|
||||
static final String UPDATED_ON = "updatedOn";
|
||||
static final String UPDATED_ACTOR = "updatedActor";
|
||||
static final String PROPERTIES = "properties";
|
||||
static final String UI = "UI";
|
||||
|
||||
@Nonnull
|
||||
public static void addFilterToQueryBuilder(@Nonnull Filter filter, String node, BoolQueryBuilder rootQuery) {
|
||||
@ -435,26 +436,12 @@ public class ESGraphQueryDAO {
|
||||
return relationship;
|
||||
}
|
||||
|
||||
BoolQueryBuilder getOutGoingEdgeQuery(List<Urn> urns, List<EdgeInfo> outgoingEdges, GraphFilters graphFilters) {
|
||||
BoolQueryBuilder outgoingEdgeQuery = QueryBuilders.boolQuery();
|
||||
outgoingEdgeQuery.must(buildUrnFilters(urns, SOURCE));
|
||||
outgoingEdgeQuery.must(buildEdgeFilters(outgoingEdges));
|
||||
outgoingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE));
|
||||
outgoingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION));
|
||||
return outgoingEdgeQuery;
|
||||
}
|
||||
|
||||
BoolQueryBuilder getIncomingEdgeQuery(List<Urn> urns, List<EdgeInfo> incomingEdges, GraphFilters graphFilters) {
|
||||
BoolQueryBuilder incomingEdgeQuery = QueryBuilders.boolQuery();
|
||||
incomingEdgeQuery.must(buildUrnFilters(urns, DESTINATION));
|
||||
incomingEdgeQuery.must(buildEdgeFilters(incomingEdges));
|
||||
incomingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE));
|
||||
incomingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION));
|
||||
return incomingEdgeQuery;
|
||||
}
|
||||
|
||||
// Get search query for given list of edges and source urns
|
||||
public QueryBuilder getQueryForLineage(List<Urn> urns, List<EdgeInfo> lineageEdges, GraphFilters graphFilters,
|
||||
@VisibleForTesting
|
||||
static QueryBuilder getQueryForLineage(
|
||||
@Nonnull List<Urn> urns,
|
||||
@Nonnull List<EdgeInfo> lineageEdges,
|
||||
@Nonnull GraphFilters graphFilters,
|
||||
@Nullable Long startTimeMillis,
|
||||
@Nullable Long endTimeMillis) {
|
||||
BoolQueryBuilder query = QueryBuilders.boolQuery();
|
||||
@ -476,63 +463,57 @@ public class ESGraphQueryDAO {
|
||||
query.should(getIncomingEdgeQuery(urns, incomingEdges, graphFilters));
|
||||
}
|
||||
|
||||
// Add time range filters
|
||||
if (startTimeMillis != null) {
|
||||
query.must(buildStartTimeFilter(startTimeMillis));
|
||||
}
|
||||
if (endTimeMillis != null) {
|
||||
query.must(buildEndTimeFilter(endTimeMillis));
|
||||
/*
|
||||
* Optional - Add edge filtering based on time windows.
|
||||
*/
|
||||
if (startTimeMillis != null && endTimeMillis != null) {
|
||||
query.must(TimeFilterUtils.getEdgeTimeFilterQuery(startTimeMillis, endTimeMillis));
|
||||
} else {
|
||||
log.debug(String.format(
|
||||
"Empty time filter range provided: start time %s, end time: %s. Skipping application of time filters",
|
||||
startTimeMillis,
|
||||
endTimeMillis));
|
||||
}
|
||||
|
||||
return query;
|
||||
}
|
||||
|
||||
public QueryBuilder buildEntityTypesFilter(List<String> entityTypes, String prefix) {
|
||||
private static BoolQueryBuilder getOutGoingEdgeQuery(
|
||||
@Nonnull List<Urn> urns,
|
||||
@Nonnull List<EdgeInfo> outgoingEdges,
|
||||
@Nonnull GraphFilters graphFilters) {
|
||||
BoolQueryBuilder outgoingEdgeQuery = QueryBuilders.boolQuery();
|
||||
outgoingEdgeQuery.must(buildUrnFilters(urns, SOURCE));
|
||||
outgoingEdgeQuery.must(buildEdgeFilters(outgoingEdges));
|
||||
outgoingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE));
|
||||
outgoingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION));
|
||||
return outgoingEdgeQuery;
|
||||
}
|
||||
|
||||
private static BoolQueryBuilder getIncomingEdgeQuery(
|
||||
@Nonnull List<Urn> urns, List<EdgeInfo> incomingEdges,
|
||||
@Nonnull GraphFilters graphFilters) {
|
||||
BoolQueryBuilder incomingEdgeQuery = QueryBuilders.boolQuery();
|
||||
incomingEdgeQuery.must(buildUrnFilters(urns, DESTINATION));
|
||||
incomingEdgeQuery.must(buildEdgeFilters(incomingEdges));
|
||||
incomingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE));
|
||||
incomingEdgeQuery.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION));
|
||||
return incomingEdgeQuery;
|
||||
}
|
||||
|
||||
private static QueryBuilder buildEntityTypesFilter(@Nonnull List<String> entityTypes, @Nonnull String prefix) {
|
||||
return QueryBuilders.termsQuery(prefix + ".entityType", entityTypes.stream().map(Object::toString).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
public QueryBuilder buildUrnFilters(List<Urn> urns, String prefix) {
|
||||
private static QueryBuilder buildUrnFilters(@Nonnull List<Urn> urns, @Nonnull String prefix) {
|
||||
return QueryBuilders.termsQuery(prefix + ".urn", urns.stream().map(Object::toString).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
public QueryBuilder buildEdgeFilters(List<EdgeInfo> edgeInfos) {
|
||||
private static QueryBuilder buildEdgeFilters(@Nonnull List<EdgeInfo> edgeInfos) {
|
||||
return QueryBuilders.termsQuery("relationshipType",
|
||||
edgeInfos.stream().map(EdgeInfo::getType).distinct().collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
public QueryBuilder buildExistenceFilter() {
|
||||
final BoolQueryBuilder boolExistenceBuilder = QueryBuilders.boolQuery();
|
||||
boolExistenceBuilder.mustNot(QueryBuilders.existsQuery(CREATED_ON));
|
||||
boolExistenceBuilder.mustNot(QueryBuilders.existsQuery(UPDATED_ON));
|
||||
return boolExistenceBuilder;
|
||||
}
|
||||
|
||||
public QueryBuilder buildManualLineageFilter() {
|
||||
return QueryBuilders.termQuery(String.format("%s.%s", PROPERTIES, SOURCE), UI);
|
||||
}
|
||||
|
||||
public QueryBuilder buildStartTimeFilter(@Nonnull final Long startTimeMillis) {
|
||||
final BoolQueryBuilder startTimeQuery = QueryBuilders.boolQuery();
|
||||
startTimeQuery.should(QueryBuilders.rangeQuery(UPDATED_ON).gte(startTimeMillis));
|
||||
// Secondary check in case we only have createdOn
|
||||
startTimeQuery.should(QueryBuilders.rangeQuery(CREATED_ON).gte(startTimeMillis));
|
||||
// If both createdOn and updatedOn are not present, then we should include the edge
|
||||
startTimeQuery.should(buildExistenceFilter());
|
||||
// If the edge is a manual lineage edge, then we should include the edge
|
||||
startTimeQuery.should(buildManualLineageFilter());
|
||||
return startTimeQuery;
|
||||
}
|
||||
|
||||
public QueryBuilder buildEndTimeFilter(@Nonnull final Long endTimeMillis) {
|
||||
final BoolQueryBuilder endTimeQuery = QueryBuilders.boolQuery();
|
||||
endTimeQuery.should(QueryBuilders.rangeQuery(CREATED_ON).lte(endTimeMillis));
|
||||
// If both createdOn and updatedOn are not present, then we should include the edge
|
||||
endTimeQuery.should(buildExistenceFilter());
|
||||
// If the edge is a manual lineage edge, then we should include the edge
|
||||
endTimeQuery.should(buildManualLineageFilter());
|
||||
return endTimeQuery;
|
||||
}
|
||||
|
||||
@Value
|
||||
public static class LineageResponse {
|
||||
int total;
|
||||
|
@ -0,0 +1,145 @@
|
||||
package com.linkedin.metadata.graph.elastic;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
|
||||
import static com.linkedin.metadata.graph.elastic.ESGraphQueryDAO.*;
|
||||
|
||||
@Slf4j
|
||||
public class TimeFilterUtils {
|
||||
|
||||
/**
|
||||
* In order to filter for edges that fall into a specific filter window, we perform a range-overlap query.
|
||||
* Note that both a start time and an end time must be provided in order to add the filters.
|
||||
*
|
||||
* A range overlap query compares 2 time windows for ANY overlap. This essentially equates to a union operation.
|
||||
* Each window is characterized by 2 points in time: a start time (e.g. created time of the edge) and an end time
|
||||
* (e.g. last updated time of an edge).
|
||||
*
|
||||
* @param startTimeMillis the start of the time filter window
|
||||
* @param endTimeMillis the end of the time filter window
|
||||
*/
|
||||
public static QueryBuilder getEdgeTimeFilterQuery(final long startTimeMillis, final long endTimeMillis) {
|
||||
log.debug(String.format("Adding edge time filters for start time: %s, end time: %s", startTimeMillis, endTimeMillis));
|
||||
/*
|
||||
* One of the following must be true in order for the edge to be returned (should = OR)
|
||||
*
|
||||
* 1. The start and end time window should overlap with the createdOn updatedOn window.
|
||||
* 2. The createdOn and updatedOn window does not exist on the edge at all (support legacy cases)
|
||||
* 3. Special lineage case: The edge is marked as a "manual" edge, meaning that the time filters should NOT be applied.
|
||||
*/
|
||||
BoolQueryBuilder timeFilterQuery = QueryBuilders.boolQuery();
|
||||
timeFilterQuery.should(buildTimeWindowFilter(startTimeMillis, endTimeMillis));
|
||||
timeFilterQuery.should(buildTimestampsMissingFilter());
|
||||
timeFilterQuery.should(buildManualLineageFilter());
|
||||
return timeFilterQuery;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a filter that compares 2 windows on a timeline and returns true for any overlap. This logic
|
||||
* is a bit tricky so change with caution.
|
||||
*
|
||||
* The first window comes from start time and end time provided by the user.
|
||||
* The second window comes from the createdOn and updatedOn timestamps present on graph edges.
|
||||
*
|
||||
* Also accounts for the case where createdOn or updatedOn is MISSING, and in such cases performs
|
||||
* a point overlap instead of a range overlap.
|
||||
*
|
||||
* Range Examples:
|
||||
*
|
||||
* start time -> end time |-----|
|
||||
* createdOn -> updatedOn |-----|
|
||||
*
|
||||
* = true
|
||||
*
|
||||
* start time -> end time |------|
|
||||
* createdOn -> updatedOn |--|
|
||||
*
|
||||
* = true
|
||||
*
|
||||
* start time -> end time |-----|
|
||||
* createdOn -> updatedOn |-----|
|
||||
*
|
||||
* = true
|
||||
*
|
||||
* start time -> end time |-----|
|
||||
* createdOn -> updatedOn |-----|
|
||||
*
|
||||
* = false
|
||||
*
|
||||
*
|
||||
* Point Examples:
|
||||
*
|
||||
* start time -> end time |-----|
|
||||
* updatedOn |
|
||||
*
|
||||
* = true
|
||||
*
|
||||
* start time -> end time |-----|
|
||||
* updatedOn |
|
||||
*
|
||||
* = false
|
||||
*
|
||||
* and same for createdOn.
|
||||
*
|
||||
* Assumptions are that startTimeMillis is always before or equal to endTimeMillis,
|
||||
* and createdOn is always before or equal to updatedOn.
|
||||
*
|
||||
* @param startTimeMillis the start time of the window in milliseconds
|
||||
* @param endTimeMillis the end time of the window in milliseconds
|
||||
*
|
||||
* @return Query Builder with time window filters appended.
|
||||
*/
|
||||
private static QueryBuilder buildTimeWindowFilter(final long startTimeMillis, final long endTimeMillis) {
|
||||
final BoolQueryBuilder timeWindowQuery = QueryBuilders.boolQuery();
|
||||
|
||||
/*
|
||||
* To perform comparison:
|
||||
*
|
||||
* If either createdOn or updatedOn time point falls into the startTime->endTime window,
|
||||
* the edge should be included.
|
||||
*
|
||||
* We also verify that the field actually exists (non-null).
|
||||
*/
|
||||
|
||||
// Build filter comparing createdOn time to startTime->endTime window.
|
||||
BoolQueryBuilder createdOnFilter = QueryBuilders.boolQuery();
|
||||
createdOnFilter.must(QueryBuilders.existsQuery(CREATED_ON));
|
||||
createdOnFilter.must(QueryBuilders.rangeQuery(CREATED_ON).gte(startTimeMillis).lte(endTimeMillis));
|
||||
|
||||
// Build filter comparing updatedOn time to startTime->endTime window.
|
||||
BoolQueryBuilder updatedOnFilter = QueryBuilders.boolQuery();
|
||||
updatedOnFilter.must(QueryBuilders.existsQuery(UPDATED_ON));
|
||||
updatedOnFilter.must(QueryBuilders.rangeQuery(UPDATED_ON).gte(startTimeMillis).lte(endTimeMillis));
|
||||
|
||||
// Now - OR the 2 point comparison conditions together.
|
||||
timeWindowQuery.should(createdOnFilter);
|
||||
timeWindowQuery.should(updatedOnFilter);
|
||||
return timeWindowQuery;
|
||||
}
|
||||
|
||||
private static QueryBuilder buildTimestampsMissingFilter() {
|
||||
// If both createdOn and updatedOn do NOT EXIST (either are null or 0), then
|
||||
// return the edge.
|
||||
final BoolQueryBuilder boolExistenceBuilder = QueryBuilders.boolQuery();
|
||||
boolExistenceBuilder.must(buildNotExistsFilter(CREATED_ON));
|
||||
boolExistenceBuilder.must(buildNotExistsFilter(UPDATED_ON));
|
||||
return boolExistenceBuilder;
|
||||
}
|
||||
|
||||
private static QueryBuilder buildNotExistsFilter(String fieldName) {
|
||||
// This filter returns 'true' if the field DOES NOT EXIST or it exists but is equal to 0.
|
||||
final BoolQueryBuilder notExistsFilter = QueryBuilders.boolQuery();
|
||||
notExistsFilter.should(QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery(fieldName)));
|
||||
notExistsFilter.should(QueryBuilders.boolQuery().must(QueryBuilders.termQuery(fieldName, 0L)));
|
||||
return notExistsFilter;
|
||||
}
|
||||
|
||||
private static QueryBuilder buildManualLineageFilter() {
|
||||
return QueryBuilders.termQuery(String.format("%s.%s", PROPERTIES, SOURCE), UI);
|
||||
}
|
||||
|
||||
private TimeFilterUtils() { }
|
||||
}
|
@ -264,6 +264,7 @@ public class ESUtils {
|
||||
public static String toKeywordField(@Nonnull final String filterField, @Nonnull final boolean skipKeywordSuffix) {
|
||||
return skipKeywordSuffix
|
||||
|| "urn".equals(filterField)
|
||||
|| "runId".equals(filterField)
|
||||
|| filterField.contains(".") ? filterField : filterField + ESUtils.KEYWORD_SUFFIX;
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,46 @@
|
||||
package com.linkedin.metadata.graph.elastic;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.io.Resources;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import com.linkedin.metadata.graph.GraphFilters;
|
||||
import com.linkedin.metadata.models.registry.LineageRegistry;
|
||||
import com.linkedin.metadata.query.filter.RelationshipDirection;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class ESGraphQueryDAOTest {
|
||||
|
||||
private static final String TEST_QUERY_FILE = "elasticsearch/sample_filters/lineage_query_filters_1.json";
|
||||
|
||||
@Test
|
||||
private static void testGetQueryForLineageFullArguments() throws Exception {
|
||||
|
||||
URL url = Resources.getResource(TEST_QUERY_FILE);
|
||||
String expectedQuery = Resources.toString(url, StandardCharsets.UTF_8);
|
||||
|
||||
List<Urn> urns = new ArrayList<>();
|
||||
List<LineageRegistry.EdgeInfo> edgeInfos = new ArrayList<>(ImmutableList.of(
|
||||
new LineageRegistry.EdgeInfo("DownstreamOf", RelationshipDirection.INCOMING, Constants.DATASET_ENTITY_NAME)
|
||||
));
|
||||
GraphFilters graphFilters = new GraphFilters(ImmutableList.of(Constants.DATASET_ENTITY_NAME));
|
||||
Long startTime = 0L;
|
||||
Long endTime = 1L;
|
||||
|
||||
QueryBuilder builder = ESGraphQueryDAO.getQueryForLineage(
|
||||
urns,
|
||||
edgeInfos,
|
||||
graphFilters,
|
||||
startTime,
|
||||
endTime
|
||||
);
|
||||
|
||||
Assert.assertEquals(builder.toString(), expectedQuery);
|
||||
}
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
package com.linkedin.metadata.graph.elastic;
|
||||
|
||||
import com.google.common.io.Resources;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class TimeFilterUtilsTest {
|
||||
|
||||
private static final String TEST_QUERY_FILE = "elasticsearch/sample_filters/lineage_time_query_filters_1.json";
|
||||
@Test
|
||||
private static void testGetEdgeTimeFilterQuery() throws Exception {
|
||||
URL url = Resources.getResource(TEST_QUERY_FILE);
|
||||
String expectedQuery = Resources.toString(url, StandardCharsets.UTF_8);
|
||||
long startTime = 1L;
|
||||
long endTime = 2L;
|
||||
QueryBuilder result = TimeFilterUtils.getEdgeTimeFilterQuery(startTime, endTime);
|
||||
Assert.assertEquals(result.toString(), expectedQuery);
|
||||
}
|
||||
}
|
@ -0,0 +1,206 @@
|
||||
{
|
||||
"bool" : {
|
||||
"must" : [
|
||||
{
|
||||
"bool" : {
|
||||
"should" : [
|
||||
{
|
||||
"bool" : {
|
||||
"should" : [
|
||||
{
|
||||
"bool" : {
|
||||
"must" : [
|
||||
{
|
||||
"exists" : {
|
||||
"field" : "createdOn",
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"range" : {
|
||||
"createdOn" : {
|
||||
"from" : 0,
|
||||
"to" : 1,
|
||||
"include_lower" : true,
|
||||
"include_upper" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"bool" : {
|
||||
"must" : [
|
||||
{
|
||||
"exists" : {
|
||||
"field" : "updatedOn",
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"range" : {
|
||||
"updatedOn" : {
|
||||
"from" : 0,
|
||||
"to" : 1,
|
||||
"include_lower" : true,
|
||||
"include_upper" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"bool" : {
|
||||
"must" : [
|
||||
{
|
||||
"bool" : {
|
||||
"should" : [
|
||||
{
|
||||
"bool" : {
|
||||
"must_not" : [
|
||||
{
|
||||
"exists" : {
|
||||
"field" : "createdOn",
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"bool" : {
|
||||
"must" : [
|
||||
{
|
||||
"term" : {
|
||||
"createdOn" : {
|
||||
"value" : 0,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"bool" : {
|
||||
"should" : [
|
||||
{
|
||||
"bool" : {
|
||||
"must_not" : [
|
||||
{
|
||||
"exists" : {
|
||||
"field" : "updatedOn",
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"bool" : {
|
||||
"must" : [
|
||||
{
|
||||
"term" : {
|
||||
"updatedOn" : {
|
||||
"value" : 0,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"term" : {
|
||||
"properties.source" : {
|
||||
"value" : "UI",
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"should" : [
|
||||
{
|
||||
"bool" : {
|
||||
"must" : [
|
||||
{
|
||||
"terms" : {
|
||||
"destination.urn" : [ ],
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"terms" : {
|
||||
"relationshipType" : [
|
||||
"DownstreamOf"
|
||||
],
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"terms" : {
|
||||
"source.entityType" : [
|
||||
"dataset"
|
||||
],
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"terms" : {
|
||||
"destination.entityType" : [
|
||||
"dataset"
|
||||
],
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
@ -0,0 +1,158 @@
|
||||
{
|
||||
"bool" : {
|
||||
"should" : [
|
||||
{
|
||||
"bool" : {
|
||||
"should" : [
|
||||
{
|
||||
"bool" : {
|
||||
"must" : [
|
||||
{
|
||||
"exists" : {
|
||||
"field" : "createdOn",
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"range" : {
|
||||
"createdOn" : {
|
||||
"from" : 1,
|
||||
"to" : 2,
|
||||
"include_lower" : true,
|
||||
"include_upper" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"bool" : {
|
||||
"must" : [
|
||||
{
|
||||
"exists" : {
|
||||
"field" : "updatedOn",
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"range" : {
|
||||
"updatedOn" : {
|
||||
"from" : 1,
|
||||
"to" : 2,
|
||||
"include_lower" : true,
|
||||
"include_upper" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"bool" : {
|
||||
"must" : [
|
||||
{
|
||||
"bool" : {
|
||||
"should" : [
|
||||
{
|
||||
"bool" : {
|
||||
"must_not" : [
|
||||
{
|
||||
"exists" : {
|
||||
"field" : "createdOn",
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"bool" : {
|
||||
"must" : [
|
||||
{
|
||||
"term" : {
|
||||
"createdOn" : {
|
||||
"value" : 0,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"bool" : {
|
||||
"should" : [
|
||||
{
|
||||
"bool" : {
|
||||
"must_not" : [
|
||||
{
|
||||
"exists" : {
|
||||
"field" : "updatedOn",
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"bool" : {
|
||||
"must" : [
|
||||
{
|
||||
"term" : {
|
||||
"updatedOn" : {
|
||||
"value" : 0,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
},
|
||||
{
|
||||
"term" : {
|
||||
"properties.source" : {
|
||||
"value" : "UI",
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"adjust_pure_negative" : true,
|
||||
"boost" : 1.0
|
||||
}
|
||||
}
|
@ -138,10 +138,14 @@ public class GraphIndexUtils {
|
||||
systemMetadata = event.hasPreviousSystemMetadata() ? event.getPreviousSystemMetadata() : null;
|
||||
}
|
||||
|
||||
if (createdOn == null && systemMetadata != null) {
|
||||
if ((createdOn == null || createdOn == 0) && systemMetadata != null) {
|
||||
createdOn = systemMetadata.getLastObserved();
|
||||
}
|
||||
|
||||
if ((updatedOn == null || updatedOn == 0) && systemMetadata != null) {
|
||||
updatedOn = systemMetadata.getLastObserved();
|
||||
}
|
||||
|
||||
if (createdActor == null && event.hasCreated()) {
|
||||
createdActor = event.getCreated().getActor();
|
||||
updatedActor = event.getCreated().getActor();
|
||||
|
Loading…
x
Reference in New Issue
Block a user