mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-28 19:05:53 +00:00
GEN 1409 - Logic to fetch DQ Lineage (#18069)
* fix import issue * feat: added dq lineage tracing logic * fix: move dq lineage to its own endpoint --------- Co-authored-by: Chirag Madlani <12962843+chirag-madlani@users.noreply.github.com>
This commit is contained in:
parent
dfddac9a73
commit
4cfce98090
@ -4,6 +4,7 @@ import static org.openmetadata.schema.type.EventType.ENTITY_DELETED;
|
||||
import static org.openmetadata.service.Entity.TEST_CASE;
|
||||
import static org.openmetadata.service.Entity.TEST_DEFINITION;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -223,6 +224,18 @@ public class TestCaseResultRepository extends EntityTimeSeriesRepository<TestCas
|
||||
daoCollection.dataQualityDataTimeSeriesDao().deleteAll(fqn);
|
||||
}
|
||||
|
||||
public boolean hasTestCaseFailure(String fqn) throws IOException {
|
||||
ResultList<TestCaseResult> testCaseResultResults =
|
||||
listLatestFromSearch(
|
||||
EntityUtil.Fields.EMPTY_FIELDS,
|
||||
new SearchListFilter().addQueryParam("entityFQN", fqn),
|
||||
"testCaseFQN.keyword",
|
||||
null);
|
||||
return testCaseResultResults.getData().stream()
|
||||
.anyMatch(
|
||||
testCaseResult -> testCaseResult.getTestCaseStatus().equals(TestCaseStatus.Failed));
|
||||
}
|
||||
|
||||
private TestCaseResult getTestCaseResult(
|
||||
CreateTestCaseResult createTestCaseResults, TestCase testCase) {
|
||||
RestUtil.validateTimestampMilliseconds(createTestCaseResults.getTimestamp());
|
||||
|
@ -219,6 +219,11 @@ public class TestCaseResultResource
|
||||
schema = @Schema(type = "string"))
|
||||
@QueryParam("testSuiteId")
|
||||
String testSuiteId,
|
||||
@Parameter(
|
||||
description = "Entity FQN the test case belongs to",
|
||||
schema = @Schema(type = "string"))
|
||||
@QueryParam("entityFQN")
|
||||
String entityFQN,
|
||||
@Parameter(
|
||||
description =
|
||||
"Get the latest test case result for each test case -- requires `testSuiteId`. Offset and limit are ignored",
|
||||
@ -263,7 +268,7 @@ public class TestCaseResultResource
|
||||
@QueryParam("q")
|
||||
String q)
|
||||
throws IOException {
|
||||
if (latest.equals("true") && testSuiteId == null) {
|
||||
if (latest.equals("true") && (testSuiteId == null && entityFQN == null)) {
|
||||
throw new IllegalArgumentException("latest=true requires testSuiteId");
|
||||
}
|
||||
EntityUtil.Fields fields = repository.getFields(fieldParams);
|
||||
@ -278,6 +283,7 @@ public class TestCaseResultResource
|
||||
.ifPresent(tcf -> searchListFilter.addQueryParam("testCaseFQN", tcf));
|
||||
Optional.ofNullable(testSuiteId)
|
||||
.ifPresent(tsi -> searchListFilter.addQueryParam("testSuiteId", tsi));
|
||||
Optional.ofNullable(entityFQN).ifPresent(ef -> searchListFilter.addQueryParam("entityFQN", ef));
|
||||
Optional.ofNullable(type).ifPresent(t -> searchListFilter.addQueryParam("testCaseType", t));
|
||||
Optional.ofNullable(dataQualityDimension)
|
||||
.ifPresent(dqd -> searchListFilter.addQueryParam("dataQualityDimension", dqd));
|
||||
|
@ -208,6 +208,39 @@ public class LineageResource {
|
||||
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/getDataQualityLineage")
|
||||
@Operation(
|
||||
operationId = "searchDataQualityLineage",
|
||||
summary = "Search Data Quality lineage",
|
||||
responses = {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "search response",
|
||||
content =
|
||||
@Content(
|
||||
mediaType = "application/json",
|
||||
schema = @Schema(implementation = SearchResponse.class)))
|
||||
})
|
||||
public Response searchDataQualityLineage(
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(description = "fqn") @QueryParam("fqn") String fqn,
|
||||
@Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth,
|
||||
@Parameter(
|
||||
description =
|
||||
"Elasticsearch query that will be combined with the query_string query generator from the `query` argument")
|
||||
@QueryParam("query_filter")
|
||||
String queryFilter,
|
||||
@Parameter(description = "Filter documents by deleted param. By default deleted is false")
|
||||
@QueryParam("includeDeleted")
|
||||
boolean deleted)
|
||||
throws IOException {
|
||||
|
||||
return Entity.getSearchRepository()
|
||||
.searchDataQualityLineage(fqn, upstreamDepth, queryFilter, deleted);
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/export")
|
||||
@Produces(MediaType.TEXT_PLAIN)
|
||||
|
@ -136,6 +136,9 @@ public interface SearchClient {
|
||||
String entityType)
|
||||
throws IOException;
|
||||
|
||||
Response searchDataQualityLineage(
|
||||
String fqn, int upstreamDepth, String queryFilter, boolean deleted) throws IOException;
|
||||
|
||||
/*
|
||||
Used for listing knowledge page hierarchy for a given parent and page type, used in Elastic/Open SearchClientExtension
|
||||
*/
|
||||
|
@ -177,11 +177,7 @@ public class SearchListFilter extends Filter<SearchListFilter> {
|
||||
if (entityFQN != null) {
|
||||
conditions.add(
|
||||
includeAllTests
|
||||
? String.format(
|
||||
"{\"bool\":{\"should\": ["
|
||||
+ "{\"prefix\": {\"entityFQN\": \"%s%s\"}},"
|
||||
+ "{\"term\": {\"entityFQN\": \"%s\"}}]}}",
|
||||
escapeDoubleQuotes(entityFQN), Entity.SEPARATOR, escapeDoubleQuotes(entityFQN))
|
||||
? getTestCaseForEntityCondition(entityFQN, "entityFQN")
|
||||
: String.format(
|
||||
"{\"term\": {\"entityFQN\": \"%s\"}}", escapeDoubleQuotes(entityFQN)));
|
||||
}
|
||||
@ -218,6 +214,7 @@ public class SearchListFilter extends Filter<SearchListFilter> {
|
||||
private String getTestCaseResultCondition() {
|
||||
ArrayList<String> conditions = new ArrayList<>();
|
||||
|
||||
String entityFQN = getQueryParam("entityFQN");
|
||||
String dataQualityDimension = getQueryParam("dataQualityDimension");
|
||||
String type = getQueryParam("testCaseType");
|
||||
String startTimestamp = getQueryParam("startTimestamp");
|
||||
@ -226,6 +223,9 @@ public class SearchListFilter extends Filter<SearchListFilter> {
|
||||
String testCaseStatus = getQueryParam("testCaseStatus");
|
||||
String testSuiteId = getQueryParam("testSuiteId");
|
||||
|
||||
if (entityFQN != null)
|
||||
conditions.add(getTestCaseForEntityCondition(entityFQN, "testCase.entityFQN"));
|
||||
|
||||
if (startTimestamp != null && endTimestamp != null) {
|
||||
conditions.add(getTimestampFilter("timestamp", "gte", Long.parseLong(startTimestamp)));
|
||||
conditions.add(getTimestampFilter("timestamp", "lte", Long.parseLong(endTimestamp)));
|
||||
@ -294,6 +294,18 @@ public class SearchListFilter extends Filter<SearchListFilter> {
|
||||
};
|
||||
}
|
||||
|
||||
private String getTestCaseForEntityCondition(String entityFQN, String field) {
|
||||
return String.format(
|
||||
"{\"bool\":{\"should\": ["
|
||||
+ "{\"prefix\": {\"%s\": \"%s%s\"}},"
|
||||
+ "{\"term\": {\"%s\": \"%s\"}}]}}",
|
||||
field,
|
||||
escapeDoubleQuotes(entityFQN),
|
||||
Entity.SEPARATOR,
|
||||
field,
|
||||
escapeDoubleQuotes(entityFQN));
|
||||
}
|
||||
|
||||
private String getDataQualityDimensionCondition(String dataQualityDimension, String field) {
|
||||
return String.format("{\"term\": {\"%s\": \"%s\"}}", field, dataQualityDimension);
|
||||
}
|
||||
|
@ -798,6 +798,11 @@ public class SearchRepository {
|
||||
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
|
||||
}
|
||||
|
||||
public Response searchDataQualityLineage(
|
||||
String fqn, int upstreamDepth, String queryFilter, boolean deleted) throws IOException {
|
||||
return searchClient.searchDataQualityLineage(fqn, upstreamDepth, queryFilter, deleted);
|
||||
}
|
||||
|
||||
public Map<String, Object> searchLineageForExport(
|
||||
String fqn,
|
||||
int upstreamDepth,
|
||||
|
@ -154,6 +154,7 @@ import org.openmetadata.sdk.exception.SearchIndexNotFoundException;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface;
|
||||
import org.openmetadata.service.jdbi3.DataInsightChartRepository;
|
||||
import org.openmetadata.service.jdbi3.TestCaseResultRepository;
|
||||
import org.openmetadata.service.search.SearchClient;
|
||||
import org.openmetadata.service.search.SearchIndexUtils;
|
||||
import org.openmetadata.service.search.SearchRequest;
|
||||
@ -361,21 +362,7 @@ public class ElasticSearchClient implements SearchClient {
|
||||
buildSearchRBACQuery(subjectContext, searchSourceBuilder);
|
||||
|
||||
// Add Filter
|
||||
if (!nullOrEmpty(request.getQueryFilter()) && !request.getQueryFilter().equals("{}")) {
|
||||
try {
|
||||
XContentParser filterParser =
|
||||
XContentType.JSON
|
||||
.xContent()
|
||||
.createParser(
|
||||
xContentRegistry, LoggingDeprecationHandler.INSTANCE, request.getQueryFilter());
|
||||
QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query();
|
||||
BoolQueryBuilder newQuery =
|
||||
QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter);
|
||||
searchSourceBuilder.query(newQuery);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex);
|
||||
}
|
||||
}
|
||||
buildSearchSourceFilter(request.getQueryFilter(), searchSourceBuilder);
|
||||
|
||||
if (!nullOrEmpty(request.getPostFilter())) {
|
||||
try {
|
||||
@ -757,6 +744,18 @@ public class ElasticSearchClient implements SearchClient {
|
||||
return Response.status(OK).entity(responseMap).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response searchDataQualityLineage(
|
||||
String fqn, int upstreamDepth, String queryFilter, boolean deleted) throws IOException {
|
||||
Map<String, Object> responseMap = new HashMap<>();
|
||||
Set<Map<String, Object>> edges = new HashSet<>();
|
||||
Set<Map<String, Object>> nodes = new HashSet<>();
|
||||
searchDataQualityLineage(fqn, upstreamDepth, queryFilter, deleted, edges, nodes);
|
||||
responseMap.put("edges", edges);
|
||||
responseMap.put("nodes", nodes);
|
||||
return Response.status(OK).entity(responseMap).build();
|
||||
}
|
||||
|
||||
private void getLineage(
|
||||
String fqn,
|
||||
int depth,
|
||||
@ -782,21 +781,7 @@ public class ElasticSearchClient implements SearchClient {
|
||||
.must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn)))
|
||||
.must(QueryBuilders.termQuery("deleted", deleted)));
|
||||
}
|
||||
if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) {
|
||||
try {
|
||||
XContentParser filterParser =
|
||||
XContentType.JSON
|
||||
.xContent()
|
||||
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, queryFilter);
|
||||
es.org.elasticsearch.index.query.QueryBuilder filter =
|
||||
SearchSourceBuilder.fromXContent(filterParser).query();
|
||||
es.org.elasticsearch.index.query.BoolQueryBuilder newQuery =
|
||||
QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter);
|
||||
searchSourceBuilder.query(newQuery);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex);
|
||||
}
|
||||
}
|
||||
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
|
||||
searchRequest.source(searchSourceBuilder.size(1000));
|
||||
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
for (var hit : searchResponse.getHits().getHits()) {
|
||||
@ -825,6 +810,141 @@ public class ElasticSearchClient implements SearchClient {
|
||||
}
|
||||
}
|
||||
|
||||
private void searchDataQualityLineage(
|
||||
String fqn,
|
||||
int upstreamDepth,
|
||||
String queryFilter,
|
||||
boolean deleted,
|
||||
Set<Map<String, Object>> edges,
|
||||
Set<Map<String, Object>> nodes)
|
||||
throws IOException {
|
||||
Map<String, Map<String, Object>> allNodes = new HashMap<>();
|
||||
Map<String, List<Map<String, Object>>> allEdges = new HashMap<>();
|
||||
Set<String> nodesWithFailures = new HashSet<>();
|
||||
|
||||
collectNodesAndEdges(
|
||||
fqn,
|
||||
upstreamDepth,
|
||||
queryFilter,
|
||||
deleted,
|
||||
allEdges,
|
||||
allNodes,
|
||||
nodesWithFailures,
|
||||
new HashSet<>());
|
||||
for (String nodeWithFailure : nodesWithFailures) {
|
||||
traceBackDQLineage(
|
||||
nodeWithFailure, nodesWithFailures, allEdges, allNodes, nodes, edges, new HashSet<>());
|
||||
}
|
||||
}
|
||||
|
||||
private void collectNodesAndEdges(
|
||||
String fqn,
|
||||
int upstreamDepth,
|
||||
String queryFilter,
|
||||
boolean deleted,
|
||||
Map<String, List<Map<String, Object>>> allEdges,
|
||||
Map<String, Map<String, Object>> allNodes,
|
||||
Set<String> nodesWithFailure,
|
||||
Set<String> processedNode)
|
||||
throws IOException {
|
||||
TestCaseResultRepository testCaseResultRepository = new TestCaseResultRepository();
|
||||
if (upstreamDepth <= 0 || processedNode.contains(fqn)) {
|
||||
return;
|
||||
}
|
||||
processedNode.add(fqn);
|
||||
SearchResponse searchResponse = performLineageSearch(fqn, queryFilter, deleted);
|
||||
Optional<List> optionalDocs =
|
||||
JsonUtils.readJsonAtPath(searchResponse.toString(), "$.hits.hits[*]._source", List.class);
|
||||
|
||||
if (optionalDocs.isPresent()) {
|
||||
List<Map<String, Object>> docs = (List<Map<String, Object>>) optionalDocs.get();
|
||||
for (Map<String, Object> doc : docs) {
|
||||
String nodeId = doc.get("id").toString();
|
||||
allNodes.put(nodeId, doc);
|
||||
if (testCaseResultRepository.hasTestCaseFailure(doc.get("fullyQualifiedName").toString())) {
|
||||
nodesWithFailure.add(nodeId);
|
||||
}
|
||||
Optional<List> optionalLineageList =
|
||||
JsonUtils.readJsonAtPath(JsonUtils.pojoToJson(doc), "$.lineage", List.class);
|
||||
if (optionalLineageList.isPresent()) {
|
||||
List<Map<String, Object>> lineageList =
|
||||
(List<Map<String, Object>>) optionalLineageList.get();
|
||||
for (Map<String, Object> lineage : lineageList) {
|
||||
Map<String, String> fromEntity = (Map<String, String>) lineage.get("fromEntity");
|
||||
String fromEntityId = fromEntity.get("id");
|
||||
allEdges.computeIfAbsent(fromEntityId, k -> new ArrayList<>()).add(lineage);
|
||||
collectNodesAndEdges(
|
||||
fromEntity.get("fqn"),
|
||||
upstreamDepth - 1,
|
||||
queryFilter,
|
||||
deleted,
|
||||
allEdges,
|
||||
allNodes,
|
||||
nodesWithFailure,
|
||||
processedNode);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void traceBackDQLineage(
|
||||
String nodeFailureId,
|
||||
Set<String> nodesWithFailures,
|
||||
Map<String, List<Map<String, Object>>> allEdges,
|
||||
Map<String, Map<String, Object>> allNodes,
|
||||
Set<Map<String, Object>> nodes,
|
||||
Set<Map<String, Object>> edges,
|
||||
Set<String> processedNodes) {
|
||||
if (processedNodes.contains(nodeFailureId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
processedNodes.add(nodeFailureId);
|
||||
if (nodesWithFailures.contains(nodeFailureId)) {
|
||||
Map<String, Object> node = allNodes.get(nodeFailureId);
|
||||
node.keySet().removeAll(FIELDS_TO_REMOVE);
|
||||
node.remove("lineage");
|
||||
nodes.add(allNodes.get(nodeFailureId));
|
||||
}
|
||||
List<Map<String, Object>> edgesForNode = allEdges.get(nodeFailureId);
|
||||
if (edgesForNode != null) {
|
||||
for (Map<String, Object> edge : edgesForNode) {
|
||||
Map<String, String> fromEntity = (Map<String, String>) edge.get("fromEntity");
|
||||
String fromEntityId = fromEntity.get("id");
|
||||
if (!fromEntityId.equals(nodeFailureId)) continue;
|
||||
Map<String, String> toEntity = (Map<String, String>) edge.get("toEntity");
|
||||
edges.add(edge);
|
||||
traceBackDQLineage(
|
||||
toEntity.get("id"),
|
||||
nodesWithFailures,
|
||||
allEdges,
|
||||
allNodes,
|
||||
nodes,
|
||||
edges,
|
||||
processedNodes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private SearchResponse performLineageSearch(String fqn, String queryFilter, boolean deleted)
|
||||
throws IOException {
|
||||
es.org.elasticsearch.action.search.SearchRequest searchRequest =
|
||||
new es.org.elasticsearch.action.search.SearchRequest(
|
||||
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(
|
||||
QueryBuilders.boolQuery()
|
||||
.must(
|
||||
QueryBuilders.termQuery(
|
||||
"lineage.toEntity.fqnHash.keyword", FullyQualifiedName.buildHash(fqn)))
|
||||
.must(QueryBuilders.termQuery("deleted", !nullOrEmpty(deleted) && deleted)));
|
||||
|
||||
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
|
||||
searchRequest.source(searchSourceBuilder.size(1000));
|
||||
return client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
}
|
||||
|
||||
private Map<String, Object> searchPipelineLineage(
|
||||
String fqn,
|
||||
int upstreamDepth,
|
||||
@ -850,21 +970,7 @@ public class ElasticSearchClient implements SearchClient {
|
||||
.must(boolQueryBuilder)
|
||||
.must(QueryBuilders.termQuery("deleted", deleted)));
|
||||
}
|
||||
if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) {
|
||||
try {
|
||||
XContentParser filterParser =
|
||||
XContentType.JSON
|
||||
.xContent()
|
||||
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, queryFilter);
|
||||
es.org.elasticsearch.index.query.QueryBuilder filter =
|
||||
SearchSourceBuilder.fromXContent(filterParser).query();
|
||||
es.org.elasticsearch.index.query.BoolQueryBuilder newQuery =
|
||||
QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter);
|
||||
searchSourceBuilder.query(newQuery);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex);
|
||||
}
|
||||
}
|
||||
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
|
||||
searchRequest.source(searchSourceBuilder);
|
||||
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
for (var hit : searchResponse.getHits().getHits()) {
|
||||
@ -2003,21 +2109,7 @@ public class ElasticSearchClient implements SearchClient {
|
||||
|
||||
searchSourceBuilder.query(searchQueryFiler).fetchSource(false);
|
||||
|
||||
if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) {
|
||||
try {
|
||||
XContentParser filterParser =
|
||||
XContentType.JSON
|
||||
.xContent()
|
||||
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, queryFilter);
|
||||
es.org.elasticsearch.index.query.QueryBuilder filter =
|
||||
SearchSourceBuilder.fromXContent(filterParser).query();
|
||||
es.org.elasticsearch.index.query.BoolQueryBuilder newQuery =
|
||||
QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter);
|
||||
searchSourceBuilder.query(newQuery);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex);
|
||||
}
|
||||
}
|
||||
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
|
||||
|
||||
return searchSourceBuilder;
|
||||
}
|
||||
@ -2341,4 +2433,22 @@ public class ElasticSearchClient implements SearchClient {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void buildSearchSourceFilter(
|
||||
String queryFilter, SearchSourceBuilder searchSourceBuilder) {
|
||||
if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) {
|
||||
try {
|
||||
XContentParser filterParser =
|
||||
XContentType.JSON
|
||||
.xContent()
|
||||
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, queryFilter);
|
||||
QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query();
|
||||
BoolQueryBuilder newQuery =
|
||||
QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter);
|
||||
searchSourceBuilder.query(newQuery);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -75,6 +75,7 @@ import org.openmetadata.sdk.exception.SearchIndexNotFoundException;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface;
|
||||
import org.openmetadata.service.jdbi3.DataInsightChartRepository;
|
||||
import org.openmetadata.service.jdbi3.TestCaseResultRepository;
|
||||
import org.openmetadata.service.search.SearchClient;
|
||||
import org.openmetadata.service.search.SearchIndexUtils;
|
||||
import org.openmetadata.service.search.SearchRequest;
|
||||
@ -353,23 +354,7 @@ public class OpenSearchClient implements SearchClient {
|
||||
buildSearchRBACQuery(subjectContext, searchSourceBuilder);
|
||||
|
||||
// Add Query Filter
|
||||
if (!nullOrEmpty(request.getQueryFilter()) && !request.getQueryFilter().equals("{}")) {
|
||||
try {
|
||||
XContentParser filterParser =
|
||||
XContentType.JSON
|
||||
.xContent()
|
||||
.createParser(
|
||||
X_CONTENT_REGISTRY,
|
||||
LoggingDeprecationHandler.INSTANCE,
|
||||
request.getQueryFilter());
|
||||
QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query();
|
||||
BoolQueryBuilder newQuery =
|
||||
QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter);
|
||||
searchSourceBuilder.query(newQuery);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex);
|
||||
}
|
||||
}
|
||||
buildSearchSourceFilter(request.getQueryFilter(), searchSourceBuilder);
|
||||
|
||||
if (!nullOrEmpty(request.getPostFilter())) {
|
||||
try {
|
||||
@ -760,6 +745,18 @@ public class OpenSearchClient implements SearchClient {
|
||||
return Response.status(OK).entity(responseMap).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response searchDataQualityLineage(
|
||||
String fqn, int upstreamDepth, String queryFilter, boolean deleted) throws IOException {
|
||||
Map<String, Object> responseMap = new HashMap<>();
|
||||
Set<Map<String, Object>> edges = new HashSet<>();
|
||||
Set<Map<String, Object>> nodes = new HashSet<>();
|
||||
searchDataQualityLineage(fqn, upstreamDepth, queryFilter, deleted, edges, nodes);
|
||||
responseMap.put("edges", edges);
|
||||
responseMap.put("nodes", nodes);
|
||||
return Response.status(OK).entity(responseMap).build();
|
||||
}
|
||||
|
||||
private void getLineage(
|
||||
String fqn,
|
||||
int depth,
|
||||
@ -785,20 +782,8 @@ public class OpenSearchClient implements SearchClient {
|
||||
.must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn)))
|
||||
.must(QueryBuilders.termQuery("deleted", deleted)));
|
||||
}
|
||||
if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) {
|
||||
try {
|
||||
XContentParser filterParser =
|
||||
XContentType.JSON
|
||||
.xContent()
|
||||
.createParser(X_CONTENT_REGISTRY, LoggingDeprecationHandler.INSTANCE, queryFilter);
|
||||
QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query();
|
||||
BoolQueryBuilder newQuery =
|
||||
QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter);
|
||||
searchSourceBuilder.query(newQuery);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex);
|
||||
}
|
||||
}
|
||||
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
|
||||
|
||||
searchRequest.source(searchSourceBuilder.size(1000));
|
||||
os.org.opensearch.action.search.SearchResponse searchResponse =
|
||||
client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
@ -828,6 +813,127 @@ public class OpenSearchClient implements SearchClient {
|
||||
}
|
||||
}
|
||||
|
||||
private void searchDataQualityLineage(
|
||||
String fqn,
|
||||
int upstreamDepth,
|
||||
String queryFilter,
|
||||
boolean deleted,
|
||||
Set<Map<String, Object>> edges,
|
||||
Set<Map<String, Object>> nodes)
|
||||
throws IOException {
|
||||
Map<String, Map<String, Object>> allNodes = new HashMap<>();
|
||||
Map<String, List<Map<String, Object>>> allEdges = new HashMap<>();
|
||||
Set<String> nodesWithFailures = new HashSet<>();
|
||||
|
||||
collectNodesAndEdges(
|
||||
fqn,
|
||||
upstreamDepth,
|
||||
queryFilter,
|
||||
deleted,
|
||||
allEdges,
|
||||
allNodes,
|
||||
nodesWithFailures,
|
||||
new HashSet<>());
|
||||
for (String nodeWithFailure : nodesWithFailures) {
|
||||
traceBackDQLineage(nodeWithFailure, allEdges, allNodes, nodes, edges, new HashSet<>());
|
||||
}
|
||||
}
|
||||
|
||||
private void collectNodesAndEdges(
|
||||
String fqn,
|
||||
int upstreamDepth,
|
||||
String queryFilter,
|
||||
boolean deleted,
|
||||
Map<String, List<Map<String, Object>>> allEdges,
|
||||
Map<String, Map<String, Object>> allNodes,
|
||||
Set<String> nodesWithFailure,
|
||||
Set<String> processedNode)
|
||||
throws IOException {
|
||||
TestCaseResultRepository testCaseResultRepository = new TestCaseResultRepository();
|
||||
if (upstreamDepth <= 0 || processedNode.contains(fqn)) {
|
||||
return;
|
||||
}
|
||||
processedNode.add(fqn);
|
||||
SearchResponse searchResponse = performLineageSearch(fqn, queryFilter, deleted);
|
||||
Optional<List> optionalDocs =
|
||||
JsonUtils.readJsonAtPath(searchResponse.toString(), "$.hits.hits[*]._source", List.class);
|
||||
|
||||
if (optionalDocs.isPresent()) {
|
||||
List<Map<String, Object>> docs = (List<Map<String, Object>>) optionalDocs.get();
|
||||
for (Map<String, Object> doc : docs) {
|
||||
String nodeId = doc.get("id").toString();
|
||||
allNodes.put(nodeId, doc);
|
||||
if (testCaseResultRepository.hasTestCaseFailure(doc.get("fullyQualifiedName").toString())) {
|
||||
nodesWithFailure.add(nodeId);
|
||||
}
|
||||
Optional<List> optionalLineageList =
|
||||
JsonUtils.readJsonAtPath(JsonUtils.pojoToJson(doc), "$.lineage", List.class);
|
||||
if (optionalLineageList.isPresent()) {
|
||||
List<Map<String, Object>> lineageList =
|
||||
(List<Map<String, Object>>) optionalLineageList.get();
|
||||
for (Map<String, Object> lineage : lineageList) {
|
||||
Map<String, String> fromEntity = (Map<String, String>) lineage.get("fromEntity");
|
||||
String fromEntityId = fromEntity.get("id");
|
||||
allEdges.computeIfAbsent(fromEntityId, k -> new ArrayList<>()).add(lineage);
|
||||
collectNodesAndEdges(
|
||||
fromEntity.get("fqn"),
|
||||
upstreamDepth - 1,
|
||||
queryFilter,
|
||||
deleted,
|
||||
allEdges,
|
||||
allNodes,
|
||||
nodesWithFailure,
|
||||
processedNode);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void traceBackDQLineage(
|
||||
String nodeFailureId,
|
||||
Map<String, List<Map<String, Object>>> allEdges,
|
||||
Map<String, Map<String, Object>> allNodes,
|
||||
Set<Map<String, Object>> nodes,
|
||||
Set<Map<String, Object>> edges,
|
||||
Set<String> processedNodes) {
|
||||
if (processedNodes.contains(nodeFailureId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
processedNodes.add(nodeFailureId);
|
||||
nodes.add(allNodes.get(nodeFailureId));
|
||||
List<Map<String, Object>> edgesForNode = allEdges.get(nodeFailureId);
|
||||
if (edgesForNode != null) {
|
||||
for (Map<String, Object> edge : edgesForNode) {
|
||||
Map<String, String> fromEntity = (Map<String, String>) edge.get("fromEntity");
|
||||
String fromEntityId = fromEntity.get("id");
|
||||
if (!fromEntityId.equals(nodeFailureId)) continue; // skip if the edge is from the node
|
||||
Map<String, String> toEntity = (Map<String, String>) edge.get("toEntity");
|
||||
edges.add(edge);
|
||||
traceBackDQLineage(toEntity.get("id"), allEdges, allNodes, nodes, edges, processedNodes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private SearchResponse performLineageSearch(String fqn, String queryFilter, boolean deleted)
|
||||
throws IOException {
|
||||
os.org.opensearch.action.search.SearchRequest searchRequest =
|
||||
new os.org.opensearch.action.search.SearchRequest(
|
||||
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(
|
||||
QueryBuilders.boolQuery()
|
||||
.must(
|
||||
QueryBuilders.termQuery(
|
||||
"lineage.fromEntity.fqnHash.keyword", FullyQualifiedName.buildHash(fqn)))
|
||||
.must(QueryBuilders.termQuery("deleted", !nullOrEmpty(deleted) && deleted)));
|
||||
|
||||
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
|
||||
searchRequest.source(searchSourceBuilder.size(1000));
|
||||
return client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
}
|
||||
|
||||
private Map<String, Object> searchPipelineLineage(
|
||||
String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted)
|
||||
throws IOException {
|
||||
@ -850,20 +956,8 @@ public class OpenSearchClient implements SearchClient {
|
||||
.must(boolQueryBuilder)
|
||||
.must(QueryBuilders.termQuery("deleted", deleted)));
|
||||
}
|
||||
if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) {
|
||||
try {
|
||||
XContentParser filterParser =
|
||||
XContentType.JSON
|
||||
.xContent()
|
||||
.createParser(X_CONTENT_REGISTRY, LoggingDeprecationHandler.INSTANCE, queryFilter);
|
||||
QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query();
|
||||
BoolQueryBuilder newQuery =
|
||||
QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter);
|
||||
searchSourceBuilder.query(newQuery);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex);
|
||||
}
|
||||
}
|
||||
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
|
||||
|
||||
searchRequest.source(searchSourceBuilder);
|
||||
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
for (var hit : searchResponse.getHits().getHits()) {
|
||||
@ -1970,20 +2064,7 @@ public class OpenSearchClient implements SearchClient {
|
||||
|
||||
searchSourceBuilder.query(searchQueryFiler).fetchSource(false);
|
||||
|
||||
if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) {
|
||||
try {
|
||||
XContentParser filterParser =
|
||||
XContentType.JSON
|
||||
.xContent()
|
||||
.createParser(X_CONTENT_REGISTRY, LoggingDeprecationHandler.INSTANCE, queryFilter);
|
||||
QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query();
|
||||
BoolQueryBuilder newQuery =
|
||||
QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter);
|
||||
searchSourceBuilder.query(newQuery);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex);
|
||||
}
|
||||
}
|
||||
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
|
||||
|
||||
return searchSourceBuilder;
|
||||
}
|
||||
@ -2305,4 +2386,22 @@ public class OpenSearchClient implements SearchClient {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void buildSearchSourceFilter(
|
||||
String queryFilter, SearchSourceBuilder searchSourceBuilder) {
|
||||
if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) {
|
||||
try {
|
||||
XContentParser filterParser =
|
||||
XContentType.JSON
|
||||
.xContent()
|
||||
.createParser(X_CONTENT_REGISTRY, LoggingDeprecationHandler.INSTANCE, queryFilter);
|
||||
QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query();
|
||||
BoolQueryBuilder newQuery =
|
||||
QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter);
|
||||
searchSourceBuilder.query(newQuery);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -236,8 +236,7 @@ public abstract class OpenMetadataApplicationTest {
|
||||
authenticationConfiguration,
|
||||
forceMigrations);
|
||||
// Initialize search repository
|
||||
SearchRepository searchRepository =
|
||||
new SearchRepository(config.getElasticSearchConfiguration());
|
||||
SearchRepository searchRepository = new SearchRepository(getEsConfig());
|
||||
Entity.setSearchRepository(searchRepository);
|
||||
Entity.setCollectionDAO(jdbi.onDemand(CollectionDAO.class));
|
||||
Entity.initializeRepositories(config, jdbi);
|
||||
@ -280,20 +279,7 @@ public abstract class OpenMetadataApplicationTest {
|
||||
}
|
||||
|
||||
private void createIndices() {
|
||||
ElasticSearchConfiguration esConfig = new ElasticSearchConfiguration();
|
||||
esConfig
|
||||
.withHost(HOST)
|
||||
.withPort(ELASTIC_SEARCH_CONTAINER.getMappedPort(9200))
|
||||
.withUsername(ELASTIC_USER)
|
||||
.withPassword(ELASTIC_PASSWORD)
|
||||
.withScheme(ELASTIC_SCHEME)
|
||||
.withConnectionTimeoutSecs(ELASTIC_CONNECT_TIMEOUT)
|
||||
.withSocketTimeoutSecs(ELASTIC_SOCKET_TIMEOUT)
|
||||
.withKeepAliveTimeoutSecs(ELASTIC_KEEP_ALIVE_TIMEOUT)
|
||||
.withBatchSize(ELASTIC_BATCH_SIZE)
|
||||
.withSearchIndexMappingLanguage(ELASTIC_SEARCH_INDEX_MAPPING_LANGUAGE)
|
||||
.withClusterAlias(ELASTIC_SEARCH_CLUSTER_ALIAS)
|
||||
.withSearchType(ELASTIC_SEARCH_TYPE);
|
||||
ElasticSearchConfiguration esConfig = getEsConfig();
|
||||
SearchRepository searchRepository = new SearchRepository(esConfig);
|
||||
LOG.info("creating indexes.");
|
||||
searchRepository.createIndexes();
|
||||
@ -364,4 +350,22 @@ public abstract class OpenMetadataApplicationTest {
|
||||
configOverrides.add(ConfigOverride.config("database.user", sqlContainer.getUsername()));
|
||||
configOverrides.add(ConfigOverride.config("database.password", sqlContainer.getPassword()));
|
||||
}
|
||||
|
||||
private static ElasticSearchConfiguration getEsConfig() {
|
||||
ElasticSearchConfiguration esConfig = new ElasticSearchConfiguration();
|
||||
esConfig
|
||||
.withHost(HOST)
|
||||
.withPort(ELASTIC_SEARCH_CONTAINER.getMappedPort(9200))
|
||||
.withUsername(ELASTIC_USER)
|
||||
.withPassword(ELASTIC_PASSWORD)
|
||||
.withScheme(ELASTIC_SCHEME)
|
||||
.withConnectionTimeoutSecs(ELASTIC_CONNECT_TIMEOUT)
|
||||
.withSocketTimeoutSecs(ELASTIC_SOCKET_TIMEOUT)
|
||||
.withKeepAliveTimeoutSecs(ELASTIC_KEEP_ALIVE_TIMEOUT)
|
||||
.withBatchSize(ELASTIC_BATCH_SIZE)
|
||||
.withSearchIndexMappingLanguage(ELASTIC_SEARCH_INDEX_MAPPING_LANGUAGE)
|
||||
.withClusterAlias(ELASTIC_SEARCH_CLUSTER_ALIAS)
|
||||
.withSearchType(ELASTIC_SEARCH_TYPE);
|
||||
return esConfig;
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ import static javax.ws.rs.core.Response.Status.FORBIDDEN;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
import static org.openmetadata.service.Entity.ADMIN_USER_NAME;
|
||||
import static org.openmetadata.service.exception.CatalogExceptionMessage.permissionNotAllowed;
|
||||
import static org.openmetadata.service.security.SecurityUtil.authHeaders;
|
||||
@ -27,6 +28,7 @@ import static org.openmetadata.service.util.TestUtils.assertResponse;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URLEncoder;
|
||||
import java.text.ParseException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
@ -50,6 +52,9 @@ import org.openmetadata.schema.api.data.CreateMlModel;
|
||||
import org.openmetadata.schema.api.data.CreateTable;
|
||||
import org.openmetadata.schema.api.data.CreateTopic;
|
||||
import org.openmetadata.schema.api.lineage.AddLineage;
|
||||
import org.openmetadata.schema.api.tests.CreateTestCase;
|
||||
import org.openmetadata.schema.api.tests.CreateTestCaseResult;
|
||||
import org.openmetadata.schema.api.tests.CreateTestSuite;
|
||||
import org.openmetadata.schema.entity.data.Container;
|
||||
import org.openmetadata.schema.entity.data.Dashboard;
|
||||
import org.openmetadata.schema.entity.data.DashboardDataModel;
|
||||
@ -58,6 +63,10 @@ import org.openmetadata.schema.entity.data.Table;
|
||||
import org.openmetadata.schema.entity.data.Topic;
|
||||
import org.openmetadata.schema.entity.teams.Role;
|
||||
import org.openmetadata.schema.entity.teams.User;
|
||||
import org.openmetadata.schema.tests.TestCase;
|
||||
import org.openmetadata.schema.tests.TestDefinition;
|
||||
import org.openmetadata.schema.tests.TestSuite;
|
||||
import org.openmetadata.schema.tests.type.TestCaseStatus;
|
||||
import org.openmetadata.schema.type.ColumnLineage;
|
||||
import org.openmetadata.schema.type.ContainerDataModel;
|
||||
import org.openmetadata.schema.type.Edge;
|
||||
@ -71,6 +80,10 @@ import org.openmetadata.service.OpenMetadataApplicationTest;
|
||||
import org.openmetadata.service.resources.dashboards.DashboardResourceTest;
|
||||
import org.openmetadata.service.resources.databases.TableResourceTest;
|
||||
import org.openmetadata.service.resources.datamodels.DashboardDataModelResourceTest;
|
||||
import org.openmetadata.service.resources.dqtests.TestCaseResourceTest;
|
||||
import org.openmetadata.service.resources.dqtests.TestDefinitionResourceTest;
|
||||
import org.openmetadata.service.resources.dqtests.TestSuiteResourceTest;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.resources.mlmodels.MlModelResourceTest;
|
||||
import org.openmetadata.service.resources.storages.ContainerResourceTest;
|
||||
import org.openmetadata.service.resources.teams.RoleResource;
|
||||
@ -454,6 +467,108 @@ public class LineageResourceTest extends OpenMetadataApplicationTest {
|
||||
assertEquals(lineageDetails.getDescription(), edge.getLineageDetails().getDescription());
|
||||
}
|
||||
|
||||
@Order(6)
|
||||
@Test
|
||||
void get_dataQualityLineage(TestInfo test)
|
||||
throws IOException, URISyntaxException, ParseException {
|
||||
TestSuiteResourceTest testSuiteResourceTest = new TestSuiteResourceTest();
|
||||
TestCaseResourceTest testCaseResourceTest = new TestCaseResourceTest();
|
||||
TestDefinitionResourceTest testDefinitionResourceTest = new TestDefinitionResourceTest();
|
||||
|
||||
addEdge(TABLES.get(4), TABLES.get(5));
|
||||
addEdge(TABLES.get(5), TABLES.get(6));
|
||||
addEdge(TABLES.get(0), TABLES.get(4));
|
||||
addEdge(TABLES.get(0), TABLES.get(2));
|
||||
addEdge(TABLES.get(2), TABLES.get(1));
|
||||
addEdge(TABLES.get(2), TABLES.get(7));
|
||||
addEdge(TABLES.get(6), TABLES.get(7));
|
||||
|
||||
Map<String, String> queryParams =
|
||||
Map.of("fqn", TABLES.get(7).getFullyQualifiedName(), "upstreamDepth", "3");
|
||||
Map<String, Object> lineage = getDataQualityLineage(queryParams, ADMIN_AUTH_HEADERS);
|
||||
|
||||
// we have no failures in the lineage, hence no
|
||||
assertEquals(0, ((List) lineage.get("nodes")).size());
|
||||
assertEquals(0, ((List) lineage.get("edges")).size());
|
||||
|
||||
// Create test cases with failures for table 4 and table 6
|
||||
TestDefinition testDefinition =
|
||||
testDefinitionResourceTest.getEntityByName(
|
||||
"columnValuesToBeNotNull", "owners", ADMIN_AUTH_HEADERS);
|
||||
|
||||
CreateTestSuite createTestSuite4 =
|
||||
testSuiteResourceTest.createRequest(test).withName(TABLES.get(4).getFullyQualifiedName());
|
||||
CreateTestSuite createTestSuite6 =
|
||||
testSuiteResourceTest.createRequest(test).withName(TABLES.get(6).getFullyQualifiedName());
|
||||
TestSuite testSuite4 =
|
||||
testSuiteResourceTest.createExecutableTestSuite(createTestSuite4, ADMIN_AUTH_HEADERS);
|
||||
TestSuite testSuite6 =
|
||||
testSuiteResourceTest.createExecutableTestSuite(createTestSuite6, ADMIN_AUTH_HEADERS);
|
||||
|
||||
MessageParser.EntityLink TABLE4_LINK =
|
||||
new MessageParser.EntityLink(Entity.TABLE, TABLES.get(4).getFullyQualifiedName());
|
||||
MessageParser.EntityLink TABLE6_LINK =
|
||||
new MessageParser.EntityLink(Entity.TABLE, TABLES.get(6).getFullyQualifiedName());
|
||||
CreateTestCase create4 = testCaseResourceTest.createRequest(test);
|
||||
CreateTestCase create6 = testCaseResourceTest.createRequest(test, 2);
|
||||
create4
|
||||
.withEntityLink(TABLE4_LINK.getLinkString())
|
||||
.withTestSuite(testSuite4.getFullyQualifiedName())
|
||||
.withTestDefinition(testDefinition.getFullyQualifiedName());
|
||||
create6
|
||||
.withEntityLink(TABLE6_LINK.getLinkString())
|
||||
.withTestSuite(testSuite6.getFullyQualifiedName())
|
||||
.withTestDefinition(testDefinition.getFullyQualifiedName());
|
||||
TestCase testCase4 = testCaseResourceTest.createEntity(create4, ADMIN_AUTH_HEADERS);
|
||||
TestCase testCase6 = testCaseResourceTest.createEntity(create6, ADMIN_AUTH_HEADERS);
|
||||
|
||||
CreateTestCaseResult createTestCaseResult =
|
||||
new CreateTestCaseResult()
|
||||
.withResult("tested")
|
||||
.withTestCaseStatus(TestCaseStatus.Failed)
|
||||
.withTimestamp(TestUtils.dateToTimestamp(String.format("2024-09-11")));
|
||||
testCaseResourceTest.postTestCaseResult(
|
||||
testCase4.getFullyQualifiedName(), createTestCaseResult, ADMIN_AUTH_HEADERS);
|
||||
testCaseResourceTest.postTestCaseResult(
|
||||
testCase6.getFullyQualifiedName(), createTestCaseResult, ADMIN_AUTH_HEADERS);
|
||||
|
||||
lineage = getDataQualityLineage(queryParams, ADMIN_AUTH_HEADERS);
|
||||
List<Map<String, Object>> edges = ((List<Map<String, Object>>) lineage.get("edges"));
|
||||
List<Map<String, Object>> nodes = ((List<Map<String, Object>>) lineage.get("nodes"));
|
||||
// We should have 2 nodes (4 and 6) and 3 edges (4->5, 5->6, 6->7)
|
||||
assertEquals(3, edges.size());
|
||||
assertEquals(2, nodes.size());
|
||||
|
||||
assertTrue(
|
||||
nodes.stream()
|
||||
.allMatch(
|
||||
n ->
|
||||
TABLES.get(4).getId().toString().equals(n.get("id"))
|
||||
|| TABLES.get(6).getId().toString().equals(n.get("id"))));
|
||||
// our lineage is 0 -> 4 -> 5 -> 6 -> 7
|
||||
for (Map<String, Object> edge : edges) {
|
||||
Map<String, String> toEntity = ((Map<String, String>) edge.get("toEntity"));
|
||||
Map<String, String> fromEntity = ((Map<String, String>) edge.get("fromEntity"));
|
||||
if (toEntity.get("id").equals(TABLES.get(6).getId().toString())) {
|
||||
assertEquals(TABLES.get(5).getId().toString(), fromEntity.get("id"));
|
||||
} else if (fromEntity.get("id").equals(TABLES.get(4).getId().toString())) {
|
||||
assertEquals(TABLES.get(5).getId().toString(), toEntity.get("id"));
|
||||
} else if (fromEntity.get("id").equals(TABLES.get(6).getId().toString())) {
|
||||
assertEquals(TABLES.get(7).getId().toString(), toEntity.get("id"));
|
||||
} else {
|
||||
fail(String.format("Unexpected edge: %s", edge));
|
||||
}
|
||||
}
|
||||
|
||||
deleteEdge(TABLES.get(4), TABLES.get(5));
|
||||
deleteEdge(TABLES.get(5), TABLES.get(6));
|
||||
deleteEdge(TABLES.get(0), TABLES.get(4));
|
||||
deleteEdge(TABLES.get(0), TABLES.get(2));
|
||||
deleteEdge(TABLES.get(2), TABLES.get(1));
|
||||
deleteEdge(TABLES.get(2), TABLES.get(7));
|
||||
deleteEdge(TABLES.get(6), TABLES.get(7));
|
||||
}
|
||||
|
||||
public Edge getEdge(Table from, Table to) {
|
||||
return getEdge(from.getId(), to.getId(), null);
|
||||
}
|
||||
@ -655,6 +770,17 @@ public class LineageResourceTest extends OpenMetadataApplicationTest {
|
||||
return lineage;
|
||||
}
|
||||
|
||||
public Map<String, Object> getDataQualityLineage(
|
||||
Map<String, String> queryParams, Map<String, String> authHeaders)
|
||||
throws HttpResponseException {
|
||||
WebTarget target = getResource("lineage/getDataQualityLineage");
|
||||
for (Map.Entry<String, String> entry : queryParams.entrySet()) {
|
||||
target = target.queryParam(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
return TestUtils.get(target, Map.class, authHeaders);
|
||||
}
|
||||
|
||||
public void assertEdge(EntityLineage lineage, Edge expectedEdge, boolean downstream) {
|
||||
if (downstream) {
|
||||
assertTrue(lineage.getDownstreamEdges().contains(expectedEdge));
|
||||
|
Loading…
x
Reference in New Issue
Block a user