diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseResultRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseResultRepository.java index 039ec3a11de..d9c937fd807 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseResultRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseResultRepository.java @@ -4,6 +4,7 @@ import static org.openmetadata.schema.type.EventType.ENTITY_DELETED; import static org.openmetadata.service.Entity.TEST_CASE; import static org.openmetadata.service.Entity.TEST_DEFINITION; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -223,6 +224,18 @@ public class TestCaseResultRepository extends EntityTimeSeriesRepository<TestCaseResult> { + public boolean hasTestCaseFailure(String fqn) throws IOException { + ResultList<TestCaseResult> testCaseResultResults = + listLatestFromSearch( + EntityUtil.Fields.EMPTY_FIELDS, + new SearchListFilter().addQueryParam("entityFQN", fqn), + "testCaseFQN.keyword", + null); + return testCaseResultResults.getData().stream() + .anyMatch( + testCaseResult -> testCaseResult.getTestCaseStatus().equals(TestCaseStatus.Failed)); + } + private TestCaseResult getTestCaseResult( CreateTestCaseResult createTestCaseResults, TestCase testCase) { RestUtil.validateTimestampMilliseconds(createTestCaseResults.getTimestamp()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResultResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResultResource.java index 30d8b444eb4..8561d314b3f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResultResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResultResource.java @@ -219,6 +219,11 @@ public class TestCaseResultResource schema = @Schema(type = "string")) @QueryParam("testSuiteId") String testSuiteId, + @Parameter( + description = "Entity FQN the test case belongs to", + schema = @Schema(type = "string")) + @QueryParam("entityFQN") + String entityFQN, @Parameter( description = "Get the latest test case result for each test case -- requires `testSuiteId`. 
Offset and limit are ignored", @@ -263,7 +268,7 @@ public class TestCaseResultResource @QueryParam("q") String q) throws IOException { - if (latest.equals("true") && testSuiteId == null) { - throw new IllegalArgumentException("latest=true requires testSuiteId"); + if (latest.equals("true") && (testSuiteId == null && entityFQN == null)) { + throw new IllegalArgumentException("latest=true requires testSuiteId or entityFQN"); } EntityUtil.Fields fields = repository.getFields(fieldParams); @@ -278,6 +283,7 @@ public class TestCaseResultResource .ifPresent(tcf -> searchListFilter.addQueryParam("testCaseFQN", tcf)); Optional.ofNullable(testSuiteId) .ifPresent(tsi -> searchListFilter.addQueryParam("testSuiteId", tsi)); + Optional.ofNullable(entityFQN).ifPresent(ef -> searchListFilter.addQueryParam("entityFQN", ef)); Optional.ofNullable(type).ifPresent(t -> searchListFilter.addQueryParam("testCaseType", t)); Optional.ofNullable(dataQualityDimension) .ifPresent(dqd -> searchListFilter.addQueryParam("dataQualityDimension", dqd)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java index 2f1dd321201..2a751153dfd 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java @@ -208,6 +208,39 @@ public class LineageResource { .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); } + @GET + @Path("/getDataQualityLineage") + @Operation( + operationId = "searchDataQualityLineage", + summary = "Search Data Quality lineage", + responses = { + @ApiResponse( + responseCode = "200", + description = "search response", + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = SearchResponse.class))) + }) + public Response searchDataQualityLineage( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "fqn") @QueryParam("fqn") String fqn, + @Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth, + @Parameter( + description = + "Elasticsearch query that will be combined with the query_string query generator from the `query` argument") + @QueryParam("query_filter") + String queryFilter, + @Parameter(description = "Filter documents by deleted param. 
By default deleted is false") + @QueryParam("includeDeleted") + boolean deleted) + throws IOException { + + return Entity.getSearchRepository() + .searchDataQualityLineage(fqn, upstreamDepth, queryFilter, deleted); + } + @GET @Path("/export") @Produces(MediaType.TEXT_PLAIN) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java index 74f77ab7161..ad08f8edb64 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java @@ -136,6 +136,9 @@ public interface SearchClient { String entityType) throws IOException; + Response searchDataQualityLineage( + String fqn, int upstreamDepth, String queryFilter, boolean deleted) throws IOException; + /* Used for listing knowledge page hierarchy for a given parent and page type, used in Elastic/Open SearchClientExtension */ diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchListFilter.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchListFilter.java index c3e6bb22d88..bf7580aaf87 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchListFilter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchListFilter.java @@ -177,11 +177,7 @@ public class SearchListFilter extends Filter { if (entityFQN != null) { conditions.add( includeAllTests - ? String.format( - "{\"bool\":{\"should\": [" - + "{\"prefix\": {\"entityFQN\": \"%s%s\"}}," - + "{\"term\": {\"entityFQN\": \"%s\"}}]}}", - escapeDoubleQuotes(entityFQN), Entity.SEPARATOR, escapeDoubleQuotes(entityFQN)) + ? 
getTestCaseForEntityCondition(entityFQN, "entityFQN") : String.format( "{\"term\": {\"entityFQN\": \"%s\"}}", escapeDoubleQuotes(entityFQN))); } @@ -218,6 +214,7 @@ public class SearchListFilter extends Filter { private String getTestCaseResultCondition() { ArrayList conditions = new ArrayList<>(); + String entityFQN = getQueryParam("entityFQN"); String dataQualityDimension = getQueryParam("dataQualityDimension"); String type = getQueryParam("testCaseType"); String startTimestamp = getQueryParam("startTimestamp"); @@ -226,6 +223,9 @@ public class SearchListFilter extends Filter { String testCaseStatus = getQueryParam("testCaseStatus"); String testSuiteId = getQueryParam("testSuiteId"); + if (entityFQN != null) + conditions.add(getTestCaseForEntityCondition(entityFQN, "testCase.entityFQN")); + if (startTimestamp != null && endTimestamp != null) { conditions.add(getTimestampFilter("timestamp", "gte", Long.parseLong(startTimestamp))); conditions.add(getTimestampFilter("timestamp", "lte", Long.parseLong(endTimestamp))); @@ -294,6 +294,18 @@ public class SearchListFilter extends Filter { }; } + private String getTestCaseForEntityCondition(String entityFQN, String field) { + return String.format( + "{\"bool\":{\"should\": [" + + "{\"prefix\": {\"%s\": \"%s%s\"}}," + + "{\"term\": {\"%s\": \"%s\"}}]}}", + field, + escapeDoubleQuotes(entityFQN), + Entity.SEPARATOR, + field, + escapeDoubleQuotes(entityFQN)); + } + private String getDataQualityDimensionCondition(String dataQualityDimension, String field) { return String.format("{\"term\": {\"%s\": \"%s\"}}", field, dataQualityDimension); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 028f7483c4d..ea29b3e17d4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -798,6 +798,11 @@ public class SearchRepository { fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); } + public Response searchDataQualityLineage( + String fqn, int upstreamDepth, String queryFilter, boolean deleted) throws IOException { + return searchClient.searchDataQualityLineage(fqn, upstreamDepth, queryFilter, deleted); + } + public Map searchLineageForExport( String fqn, int upstreamDepth, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index 7acefe87802..2db930e8889 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -154,6 +154,7 @@ import org.openmetadata.sdk.exception.SearchIndexNotFoundException; import org.openmetadata.service.Entity; import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface; import org.openmetadata.service.jdbi3.DataInsightChartRepository; +import org.openmetadata.service.jdbi3.TestCaseResultRepository; import org.openmetadata.service.search.SearchClient; import org.openmetadata.service.search.SearchIndexUtils; import org.openmetadata.service.search.SearchRequest; @@ -361,21 +362,7 @@ public class ElasticSearchClient implements SearchClient { 
buildSearchRBACQuery(subjectContext, searchSourceBuilder); // Add Filter - if (!nullOrEmpty(request.getQueryFilter()) && !request.getQueryFilter().equals("{}")) { - try { - XContentParser filterParser = - XContentType.JSON - .xContent() - .createParser( - xContentRegistry, LoggingDeprecationHandler.INSTANCE, request.getQueryFilter()); - QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query(); - BoolQueryBuilder newQuery = - QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter); - searchSourceBuilder.query(newQuery); - } catch (Exception ex) { - LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex); - } - } + buildSearchSourceFilter(request.getQueryFilter(), searchSourceBuilder); if (!nullOrEmpty(request.getPostFilter())) { try { @@ -757,6 +744,18 @@ public class ElasticSearchClient implements SearchClient { return Response.status(OK).entity(responseMap).build(); } + @Override + public Response searchDataQualityLineage( + String fqn, int upstreamDepth, String queryFilter, boolean deleted) throws IOException { + Map<String, Object> responseMap = new HashMap<>(); + Set<Map<String, Object>> edges = new HashSet<>(); + Set<Map<String, Object>> nodes = new HashSet<>(); + searchDataQualityLineage(fqn, upstreamDepth, queryFilter, deleted, edges, nodes); + responseMap.put("edges", edges); + responseMap.put("nodes", nodes); + return Response.status(OK).entity(responseMap).build(); + } + private void getLineage( String fqn, int depth, @@ -782,21 +781,7 @@ .must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn))) .must(QueryBuilders.termQuery("deleted", deleted))); } - if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) { - try { - XContentParser filterParser = - XContentType.JSON - .xContent() - .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, queryFilter); - es.org.elasticsearch.index.query.QueryBuilder filter = - SearchSourceBuilder.fromXContent(filterParser).query(); - es.org.elasticsearch.index.query.BoolQueryBuilder newQuery = - QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter); - searchSourceBuilder.query(newQuery); - } catch (Exception ex) { - LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex); - } - } + buildSearchSourceFilter(queryFilter, searchSourceBuilder); searchRequest.source(searchSourceBuilder.size(1000)); SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); for (var hit : searchResponse.getHits().getHits()) { @@ -825,6 +810,141 @@ } } + private void searchDataQualityLineage( + String fqn, + int upstreamDepth, + String queryFilter, + boolean deleted, + Set<Map<String, Object>> edges, + Set<Map<String, Object>> nodes) + throws IOException { + Map<String, Map<String, Object>> allNodes = new HashMap<>(); + Map<String, List<Map<String, Object>>> allEdges = new HashMap<>(); + Set<String> nodesWithFailures = new HashSet<>(); + + collectNodesAndEdges( + fqn, + upstreamDepth, + queryFilter, + deleted, + allEdges, + allNodes, + nodesWithFailures, + new HashSet<>()); + for (String nodeWithFailure : nodesWithFailures) { + traceBackDQLineage( + nodeWithFailure, nodesWithFailures, allEdges, allNodes, nodes, edges, new HashSet<>()); + } + } + + private void collectNodesAndEdges( + String fqn, + int upstreamDepth, + String queryFilter, + boolean deleted, + Map<String, List<Map<String, Object>>> allEdges, + Map<String, Map<String, Object>> allNodes, + Set<String> nodesWithFailure, + Set<String> processedNode) + throws IOException { + TestCaseResultRepository testCaseResultRepository = new TestCaseResultRepository(); + if
(upstreamDepth <= 0 || processedNode.contains(fqn)) { + return; + } + processedNode.add(fqn); + SearchResponse searchResponse = performLineageSearch(fqn, queryFilter, deleted); + Optional<List> optionalDocs = + JsonUtils.readJsonAtPath(searchResponse.toString(), "$.hits.hits[*]._source", List.class); + + if (optionalDocs.isPresent()) { + List<Map<String, Object>> docs = (List<Map<String, Object>>) optionalDocs.get(); + for (Map<String, Object> doc : docs) { + String nodeId = doc.get("id").toString(); + allNodes.put(nodeId, doc); + if (testCaseResultRepository.hasTestCaseFailure(doc.get("fullyQualifiedName").toString())) { + nodesWithFailure.add(nodeId); + } + Optional<List> optionalLineageList = + JsonUtils.readJsonAtPath(JsonUtils.pojoToJson(doc), "$.lineage", List.class); + if (optionalLineageList.isPresent()) { + List<Map<String, Object>> lineageList = + (List<Map<String, Object>>) optionalLineageList.get(); + for (Map<String, Object> lineage : lineageList) { + Map<String, String> fromEntity = (Map<String, String>) lineage.get("fromEntity"); + String fromEntityId = fromEntity.get("id"); + allEdges.computeIfAbsent(fromEntityId, k -> new ArrayList<>()).add(lineage); + collectNodesAndEdges( + fromEntity.get("fqn"), + upstreamDepth - 1, + queryFilter, + deleted, + allEdges, + allNodes, + nodesWithFailure, + processedNode); + } + } + } + } + } + + private void traceBackDQLineage( + String nodeFailureId, + Set<String> nodesWithFailures, + Map<String, List<Map<String, Object>>> allEdges, + Map<String, Map<String, Object>> allNodes, + Set<Map<String, Object>> nodes, + Set<Map<String, Object>> edges, + Set<String> processedNodes) { + if (processedNodes.contains(nodeFailureId)) { + return; + } + + processedNodes.add(nodeFailureId); + if (nodesWithFailures.contains(nodeFailureId)) { + Map<String, Object> node = allNodes.get(nodeFailureId); + node.keySet().removeAll(FIELDS_TO_REMOVE); + node.remove("lineage"); + nodes.add(allNodes.get(nodeFailureId)); + } + List<Map<String, Object>> edgesForNode = allEdges.get(nodeFailureId); + if (edgesForNode != null) { + for (Map<String, Object> edge : edgesForNode) { + Map<String, String> fromEntity = (Map<String, String>) edge.get("fromEntity"); + String fromEntityId = fromEntity.get("id"); + if (!fromEntityId.equals(nodeFailureId)) continue; + Map<String, String> toEntity = (Map<String, String>) edge.get("toEntity"); + edges.add(edge); + traceBackDQLineage( + toEntity.get("id"), + nodesWithFailures, + allEdges, + allNodes, + nodes, + edges, + processedNodes); + } + } + } + + private SearchResponse performLineageSearch(String fqn, String queryFilter, boolean deleted) + throws IOException { + es.org.elasticsearch.action.search.SearchRequest searchRequest = + new es.org.elasticsearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query( + QueryBuilders.boolQuery() + .must( + QueryBuilders.termQuery( + "lineage.toEntity.fqnHash.keyword", FullyQualifiedName.buildHash(fqn))) + .must(QueryBuilders.termQuery("deleted", !nullOrEmpty(deleted) && deleted))); + + buildSearchSourceFilter(queryFilter, searchSourceBuilder); + searchRequest.source(searchSourceBuilder.size(1000)); + return client.search(searchRequest, RequestOptions.DEFAULT); + } + private Map<String, Object> searchPipelineLineage( String fqn, int upstreamDepth, @@ -850,21 +970,7 @@ .must(boolQueryBuilder) .must(QueryBuilders.termQuery("deleted", deleted))); } - if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) { - try { - XContentParser filterParser = - XContentType.JSON - .xContent() - .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, queryFilter); - es.org.elasticsearch.index.query.QueryBuilder filter = - SearchSourceBuilder.fromXContent(filterParser).query(); - 
es.org.elasticsearch.index.query.BoolQueryBuilder newQuery = - QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter); - searchSourceBuilder.query(newQuery); - } catch (Exception ex) { - LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex); - } - } + buildSearchSourceFilter(queryFilter, searchSourceBuilder); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); for (var hit : searchResponse.getHits().getHits()) { @@ -2003,21 +2109,7 @@ public class ElasticSearchClient implements SearchClient { searchSourceBuilder.query(searchQueryFiler).fetchSource(false); - if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) { - try { - XContentParser filterParser = - XContentType.JSON - .xContent() - .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, queryFilter); - es.org.elasticsearch.index.query.QueryBuilder filter = - SearchSourceBuilder.fromXContent(filterParser).query(); - es.org.elasticsearch.index.query.BoolQueryBuilder newQuery = - QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter); - searchSourceBuilder.query(newQuery); - } catch (Exception ex) { - LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex); - } - } + buildSearchSourceFilter(queryFilter, searchSourceBuilder); return searchSourceBuilder; } @@ -2341,4 +2433,22 @@ public class ElasticSearchClient implements SearchClient { } } } + + private static void buildSearchSourceFilter( + String queryFilter, SearchSourceBuilder searchSourceBuilder) { + if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) { + try { + XContentParser filterParser = + XContentType.JSON + .xContent() + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, queryFilter); + QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query(); + BoolQueryBuilder newQuery = + QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter); + searchSourceBuilder.query(newQuery); + } catch (Exception ex) { + LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex); + } + } + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index 6e1b3551b93..0d383edc09a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -75,6 +75,7 @@ import org.openmetadata.sdk.exception.SearchIndexNotFoundException; import org.openmetadata.service.Entity; import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface; import org.openmetadata.service.jdbi3.DataInsightChartRepository; +import org.openmetadata.service.jdbi3.TestCaseResultRepository; import org.openmetadata.service.search.SearchClient; import org.openmetadata.service.search.SearchIndexUtils; import org.openmetadata.service.search.SearchRequest; @@ -353,23 +354,7 @@ public class OpenSearchClient implements SearchClient { buildSearchRBACQuery(subjectContext, searchSourceBuilder); // Add Query Filter - if (!nullOrEmpty(request.getQueryFilter()) && !request.getQueryFilter().equals("{}")) { - try { - XContentParser filterParser = - XContentType.JSON - .xContent() - .createParser( - X_CONTENT_REGISTRY, - LoggingDeprecationHandler.INSTANCE, - 
request.getQueryFilter()); - QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query(); - BoolQueryBuilder newQuery = - QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter); - searchSourceBuilder.query(newQuery); - } catch (Exception ex) { - LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex); - } - } + buildSearchSourceFilter(request.getQueryFilter(), searchSourceBuilder); if (!nullOrEmpty(request.getPostFilter())) { try { @@ -760,6 +745,18 @@ public class OpenSearchClient implements SearchClient { return Response.status(OK).entity(responseMap).build(); } + @Override + public Response searchDataQualityLineage( + String fqn, int upstreamDepth, String queryFilter, boolean deleted) throws IOException { + Map<String, Object> responseMap = new HashMap<>(); + Set<Map<String, Object>> edges = new HashSet<>(); + Set<Map<String, Object>> nodes = new HashSet<>(); + searchDataQualityLineage(fqn, upstreamDepth, queryFilter, deleted, edges, nodes); + responseMap.put("edges", edges); + responseMap.put("nodes", nodes); + return Response.status(OK).entity(responseMap).build(); + } + private void getLineage( String fqn, int depth, @@ -785,20 +782,8 @@ .must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn))) .must(QueryBuilders.termQuery("deleted", deleted))); } - if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) { - try { - XContentParser filterParser = - XContentType.JSON - .xContent() - .createParser(X_CONTENT_REGISTRY, LoggingDeprecationHandler.INSTANCE, queryFilter); - QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query(); - BoolQueryBuilder newQuery = - QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter); - searchSourceBuilder.query(newQuery); - } catch (Exception ex) { - LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex); - } - } + buildSearchSourceFilter(queryFilter, searchSourceBuilder); + searchRequest.source(searchSourceBuilder.size(1000)); os.org.opensearch.action.search.SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); @@ -828,6 +813,127 @@ } } + private void searchDataQualityLineage( + String fqn, + int upstreamDepth, + String queryFilter, + boolean deleted, + Set<Map<String, Object>> edges, + Set<Map<String, Object>> nodes) + throws IOException { + Map<String, Map<String, Object>> allNodes = new HashMap<>(); + Map<String, List<Map<String, Object>>> allEdges = new HashMap<>(); + Set<String> nodesWithFailures = new HashSet<>(); + + collectNodesAndEdges( + fqn, + upstreamDepth, + queryFilter, + deleted, + allEdges, + allNodes, + nodesWithFailures, + new HashSet<>()); + for (String nodeWithFailure : nodesWithFailures) { + traceBackDQLineage(nodeWithFailure, allEdges, allNodes, nodes, edges, new HashSet<>()); + } + } + + private void collectNodesAndEdges( + String fqn, + int upstreamDepth, + String queryFilter, + boolean deleted, + Map<String, List<Map<String, Object>>> allEdges, + Map<String, Map<String, Object>> allNodes, + Set<String> nodesWithFailure, + Set<String> processedNode) + throws IOException { + TestCaseResultRepository testCaseResultRepository = new TestCaseResultRepository(); + if (upstreamDepth <= 0 || processedNode.contains(fqn)) { + return; + } + processedNode.add(fqn); + SearchResponse searchResponse = performLineageSearch(fqn, queryFilter, deleted); + Optional<List> optionalDocs = + JsonUtils.readJsonAtPath(searchResponse.toString(), "$.hits.hits[*]._source", List.class); + if (optionalDocs.isPresent()) { + List<Map<String, Object>> docs = (List<Map<String, Object>>) optionalDocs.get(); + for (Map<String, Object> doc : docs) { + String nodeId = 
doc.get("id").toString(); + allNodes.put(nodeId, doc); + if (testCaseResultRepository.hasTestCaseFailure(doc.get("fullyQualifiedName").toString())) { + nodesWithFailure.add(nodeId); + } + Optional optionalLineageList = + JsonUtils.readJsonAtPath(JsonUtils.pojoToJson(doc), "$.lineage", List.class); + if (optionalLineageList.isPresent()) { + List> lineageList = + (List>) optionalLineageList.get(); + for (Map lineage : lineageList) { + Map fromEntity = (Map) lineage.get("fromEntity"); + String fromEntityId = fromEntity.get("id"); + allEdges.computeIfAbsent(fromEntityId, k -> new ArrayList<>()).add(lineage); + collectNodesAndEdges( + fromEntity.get("fqn"), + upstreamDepth - 1, + queryFilter, + deleted, + allEdges, + allNodes, + nodesWithFailure, + processedNode); + } + } + } + } + } + + private void traceBackDQLineage( + String nodeFailureId, + Map>> allEdges, + Map> allNodes, + Set> nodes, + Set> edges, + Set processedNodes) { + if (processedNodes.contains(nodeFailureId)) { + return; + } + + processedNodes.add(nodeFailureId); + nodes.add(allNodes.get(nodeFailureId)); + List> edgesForNode = allEdges.get(nodeFailureId); + if (edgesForNode != null) { + for (Map edge : edgesForNode) { + Map fromEntity = (Map) edge.get("fromEntity"); + String fromEntityId = fromEntity.get("id"); + if (!fromEntityId.equals(nodeFailureId)) continue; // skip if the edge is from the node + Map toEntity = (Map) edge.get("toEntity"); + edges.add(edge); + traceBackDQLineage(toEntity.get("id"), allEdges, allNodes, nodes, edges, processedNodes); + } + } + } + + private SearchResponse performLineageSearch(String fqn, String queryFilter, boolean deleted) + throws IOException { + os.org.opensearch.action.search.SearchRequest searchRequest = + new os.org.opensearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query( + QueryBuilders.boolQuery() + .must( + QueryBuilders.termQuery( + "lineage.fromEntity.fqnHash.keyword", FullyQualifiedName.buildHash(fqn))) + .must(QueryBuilders.termQuery("deleted", !nullOrEmpty(deleted) && deleted))); + + buildSearchSourceFilter(queryFilter, searchSourceBuilder); + searchRequest.source(searchSourceBuilder.size(1000)); + return client.search(searchRequest, RequestOptions.DEFAULT); + } + private Map searchPipelineLineage( String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) throws IOException { @@ -850,20 +956,8 @@ public class OpenSearchClient implements SearchClient { .must(boolQueryBuilder) .must(QueryBuilders.termQuery("deleted", deleted))); } - if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) { - try { - XContentParser filterParser = - XContentType.JSON - .xContent() - .createParser(X_CONTENT_REGISTRY, LoggingDeprecationHandler.INSTANCE, queryFilter); - QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query(); - BoolQueryBuilder newQuery = - QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter); - searchSourceBuilder.query(newQuery); - } catch (Exception ex) { - LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex); - } - } + buildSearchSourceFilter(queryFilter, searchSourceBuilder); + searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); for (var hit : searchResponse.getHits().getHits()) { @@ -1970,20 +2064,7 @@ public class OpenSearchClient implements 
SearchClient { searchSourceBuilder.query(searchQueryFiler).fetchSource(false); - if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) { - try { - XContentParser filterParser = - XContentType.JSON - .xContent() - .createParser(X_CONTENT_REGISTRY, LoggingDeprecationHandler.INSTANCE, queryFilter); - QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query(); - BoolQueryBuilder newQuery = - QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter); - searchSourceBuilder.query(newQuery); - } catch (Exception ex) { - LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex); - } - } + buildSearchSourceFilter(queryFilter, searchSourceBuilder); return searchSourceBuilder; } @@ -2305,4 +2386,22 @@ public class OpenSearchClient implements SearchClient { } } } + + private static void buildSearchSourceFilter( + String queryFilter, SearchSourceBuilder searchSourceBuilder) { + if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) { + try { + XContentParser filterParser = + XContentType.JSON + .xContent() + .createParser(X_CONTENT_REGISTRY, LoggingDeprecationHandler.INSTANCE, queryFilter); + QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query(); + BoolQueryBuilder newQuery = + QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter); + searchSourceBuilder.query(newQuery); + } catch (Exception ex) { + LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex); + } + } + } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/OpenMetadataApplicationTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/OpenMetadataApplicationTest.java index 14bd2bdd5e2..83eaa7add77 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/OpenMetadataApplicationTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/OpenMetadataApplicationTest.java @@ -236,8 +236,7 @@ public abstract class OpenMetadataApplicationTest { authenticationConfiguration, forceMigrations); // Initialize search repository - SearchRepository searchRepository = - new SearchRepository(config.getElasticSearchConfiguration()); + SearchRepository searchRepository = new SearchRepository(getEsConfig()); Entity.setSearchRepository(searchRepository); Entity.setCollectionDAO(jdbi.onDemand(CollectionDAO.class)); Entity.initializeRepositories(config, jdbi); @@ -280,20 +279,7 @@ public abstract class OpenMetadataApplicationTest { } private void createIndices() { - ElasticSearchConfiguration esConfig = new ElasticSearchConfiguration(); - esConfig - .withHost(HOST) - .withPort(ELASTIC_SEARCH_CONTAINER.getMappedPort(9200)) - .withUsername(ELASTIC_USER) - .withPassword(ELASTIC_PASSWORD) - .withScheme(ELASTIC_SCHEME) - .withConnectionTimeoutSecs(ELASTIC_CONNECT_TIMEOUT) - .withSocketTimeoutSecs(ELASTIC_SOCKET_TIMEOUT) - .withKeepAliveTimeoutSecs(ELASTIC_KEEP_ALIVE_TIMEOUT) - .withBatchSize(ELASTIC_BATCH_SIZE) - .withSearchIndexMappingLanguage(ELASTIC_SEARCH_INDEX_MAPPING_LANGUAGE) - .withClusterAlias(ELASTIC_SEARCH_CLUSTER_ALIAS) - .withSearchType(ELASTIC_SEARCH_TYPE); + ElasticSearchConfiguration esConfig = getEsConfig(); SearchRepository searchRepository = new SearchRepository(esConfig); LOG.info("creating indexes."); searchRepository.createIndexes(); @@ -364,4 +350,22 @@ public abstract class OpenMetadataApplicationTest { configOverrides.add(ConfigOverride.config("database.user", sqlContainer.getUsername())); 
configOverrides.add(ConfigOverride.config("database.password", sqlContainer.getPassword())); } + + private static ElasticSearchConfiguration getEsConfig() { + ElasticSearchConfiguration esConfig = new ElasticSearchConfiguration(); + esConfig + .withHost(HOST) + .withPort(ELASTIC_SEARCH_CONTAINER.getMappedPort(9200)) + .withUsername(ELASTIC_USER) + .withPassword(ELASTIC_PASSWORD) + .withScheme(ELASTIC_SCHEME) + .withConnectionTimeoutSecs(ELASTIC_CONNECT_TIMEOUT) + .withSocketTimeoutSecs(ELASTIC_SOCKET_TIMEOUT) + .withKeepAliveTimeoutSecs(ELASTIC_KEEP_ALIVE_TIMEOUT) + .withBatchSize(ELASTIC_BATCH_SIZE) + .withSearchIndexMappingLanguage(ELASTIC_SEARCH_INDEX_MAPPING_LANGUAGE) + .withClusterAlias(ELASTIC_SEARCH_CLUSTER_ALIAS) + .withSearchType(ELASTIC_SEARCH_TYPE); + return esConfig; + } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/lineage/LineageResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/lineage/LineageResourceTest.java index be87e695b7a..dfb9a671ca4 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/lineage/LineageResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/lineage/LineageResourceTest.java @@ -18,6 +18,7 @@ import static javax.ws.rs.core.Response.Status.FORBIDDEN; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.openmetadata.service.Entity.ADMIN_USER_NAME; import static org.openmetadata.service.exception.CatalogExceptionMessage.permissionNotAllowed; import static org.openmetadata.service.security.SecurityUtil.authHeaders; @@ -27,6 +28,7 @@ import static org.openmetadata.service.util.TestUtils.assertResponse; import java.io.IOException; import java.net.URISyntaxException; import java.net.URLEncoder; +import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -50,6 +52,9 @@ import org.openmetadata.schema.api.data.CreateMlModel; import org.openmetadata.schema.api.data.CreateTable; import org.openmetadata.schema.api.data.CreateTopic; import org.openmetadata.schema.api.lineage.AddLineage; +import org.openmetadata.schema.api.tests.CreateTestCase; +import org.openmetadata.schema.api.tests.CreateTestCaseResult; +import org.openmetadata.schema.api.tests.CreateTestSuite; import org.openmetadata.schema.entity.data.Container; import org.openmetadata.schema.entity.data.Dashboard; import org.openmetadata.schema.entity.data.DashboardDataModel; @@ -58,6 +63,10 @@ import org.openmetadata.schema.entity.data.Table; import org.openmetadata.schema.entity.data.Topic; import org.openmetadata.schema.entity.teams.Role; import org.openmetadata.schema.entity.teams.User; +import org.openmetadata.schema.tests.TestCase; +import org.openmetadata.schema.tests.TestDefinition; +import org.openmetadata.schema.tests.TestSuite; +import org.openmetadata.schema.tests.type.TestCaseStatus; import org.openmetadata.schema.type.ColumnLineage; import org.openmetadata.schema.type.ContainerDataModel; import org.openmetadata.schema.type.Edge; @@ -71,6 +80,10 @@ import org.openmetadata.service.OpenMetadataApplicationTest; import org.openmetadata.service.resources.dashboards.DashboardResourceTest; import org.openmetadata.service.resources.databases.TableResourceTest; import 
org.openmetadata.service.resources.datamodels.DashboardDataModelResourceTest; +import org.openmetadata.service.resources.dqtests.TestCaseResourceTest; +import org.openmetadata.service.resources.dqtests.TestDefinitionResourceTest; +import org.openmetadata.service.resources.dqtests.TestSuiteResourceTest; +import org.openmetadata.service.resources.feeds.MessageParser; import org.openmetadata.service.resources.mlmodels.MlModelResourceTest; import org.openmetadata.service.resources.storages.ContainerResourceTest; import org.openmetadata.service.resources.teams.RoleResource; @@ -454,6 +467,108 @@ assertEquals(lineageDetails.getDescription(), edge.getLineageDetails().getDescription()); } + @Order(6) + @Test + void get_dataQualityLineage(TestInfo test) + throws IOException, URISyntaxException, ParseException { + TestSuiteResourceTest testSuiteResourceTest = new TestSuiteResourceTest(); + TestCaseResourceTest testCaseResourceTest = new TestCaseResourceTest(); + TestDefinitionResourceTest testDefinitionResourceTest = new TestDefinitionResourceTest(); + + addEdge(TABLES.get(4), TABLES.get(5)); + addEdge(TABLES.get(5), TABLES.get(6)); + addEdge(TABLES.get(0), TABLES.get(4)); + addEdge(TABLES.get(0), TABLES.get(2)); + addEdge(TABLES.get(2), TABLES.get(1)); + addEdge(TABLES.get(2), TABLES.get(7)); + addEdge(TABLES.get(6), TABLES.get(7)); + + Map<String, String> queryParams = + Map.of("fqn", TABLES.get(7).getFullyQualifiedName(), "upstreamDepth", "3"); + Map<String, Object> lineage = getDataQualityLineage(queryParams, ADMIN_AUTH_HEADERS); + + // we have no failures in the lineage, hence no nodes or edges + assertEquals(0, ((List) lineage.get("nodes")).size()); + assertEquals(0, ((List) lineage.get("edges")).size()); + + // Create test cases with failures for table 4 and table 6 + TestDefinition testDefinition = + testDefinitionResourceTest.getEntityByName( + "columnValuesToBeNotNull", "owners", ADMIN_AUTH_HEADERS); + + CreateTestSuite createTestSuite4 = + testSuiteResourceTest.createRequest(test).withName(TABLES.get(4).getFullyQualifiedName()); + CreateTestSuite createTestSuite6 = + testSuiteResourceTest.createRequest(test).withName(TABLES.get(6).getFullyQualifiedName()); + TestSuite testSuite4 = + testSuiteResourceTest.createExecutableTestSuite(createTestSuite4, ADMIN_AUTH_HEADERS); + TestSuite testSuite6 = + testSuiteResourceTest.createExecutableTestSuite(createTestSuite6, ADMIN_AUTH_HEADERS); + + MessageParser.EntityLink TABLE4_LINK = + new MessageParser.EntityLink(Entity.TABLE, TABLES.get(4).getFullyQualifiedName()); + MessageParser.EntityLink TABLE6_LINK = + new MessageParser.EntityLink(Entity.TABLE, TABLES.get(6).getFullyQualifiedName()); + CreateTestCase create4 = testCaseResourceTest.createRequest(test); + CreateTestCase create6 = testCaseResourceTest.createRequest(test, 2); + create4 + .withEntityLink(TABLE4_LINK.getLinkString()) + .withTestSuite(testSuite4.getFullyQualifiedName()) + .withTestDefinition(testDefinition.getFullyQualifiedName()); + create6 + .withEntityLink(TABLE6_LINK.getLinkString()) + .withTestSuite(testSuite6.getFullyQualifiedName()) + .withTestDefinition(testDefinition.getFullyQualifiedName()); + TestCase testCase4 = testCaseResourceTest.createEntity(create4, ADMIN_AUTH_HEADERS); + TestCase testCase6 = testCaseResourceTest.createEntity(create6, ADMIN_AUTH_HEADERS); + + CreateTestCaseResult createTestCaseResult = + new CreateTestCaseResult() + .withResult("tested") + .withTestCaseStatus(TestCaseStatus.Failed) + 
.withTimestamp(TestUtils.dateToTimestamp(String.format("2024-09-11"))); + testCaseResourceTest.postTestCaseResult( + testCase4.getFullyQualifiedName(), createTestCaseResult, ADMIN_AUTH_HEADERS); + testCaseResourceTest.postTestCaseResult( + testCase6.getFullyQualifiedName(), createTestCaseResult, ADMIN_AUTH_HEADERS); + + lineage = getDataQualityLineage(queryParams, ADMIN_AUTH_HEADERS); + List<Map<String, Object>> edges = ((List<Map<String, Object>>) lineage.get("edges")); + List<Map<String, Object>> nodes = ((List<Map<String, Object>>) lineage.get("nodes")); + // We should have 2 nodes (4 and 6) and 3 edges (4->5, 5->6, 6->7) + assertEquals(3, edges.size()); + assertEquals(2, nodes.size()); + + assertTrue( + nodes.stream() + .allMatch( + n -> + TABLES.get(4).getId().toString().equals(n.get("id")) + || TABLES.get(6).getId().toString().equals(n.get("id")))); + // our lineage is 0 -> 4 -> 5 -> 6 -> 7 + for (Map<String, Object> edge : edges) { + Map<String, String> toEntity = ((Map<String, String>) edge.get("toEntity")); + Map<String, String> fromEntity = ((Map<String, String>) edge.get("fromEntity")); + if (toEntity.get("id").equals(TABLES.get(6).getId().toString())) { + assertEquals(TABLES.get(5).getId().toString(), fromEntity.get("id")); + } else if (fromEntity.get("id").equals(TABLES.get(4).getId().toString())) { + assertEquals(TABLES.get(5).getId().toString(), toEntity.get("id")); + } else if (fromEntity.get("id").equals(TABLES.get(6).getId().toString())) { + assertEquals(TABLES.get(7).getId().toString(), toEntity.get("id")); + } else { + fail(String.format("Unexpected edge: %s", edge)); + } + } + + deleteEdge(TABLES.get(4), TABLES.get(5)); + deleteEdge(TABLES.get(5), TABLES.get(6)); + deleteEdge(TABLES.get(0), TABLES.get(4)); + deleteEdge(TABLES.get(0), TABLES.get(2)); + deleteEdge(TABLES.get(2), TABLES.get(1)); + deleteEdge(TABLES.get(2), TABLES.get(7)); + deleteEdge(TABLES.get(6), TABLES.get(7)); + } + public Edge getEdge(Table from, Table to) { return getEdge(from.getId(), to.getId(), null); } @@ -655,6 +770,17 @@ return lineage; } + public Map<String, Object> getDataQualityLineage( + Map<String, String> queryParams, Map<String, String> authHeaders) + throws HttpResponseException { + WebTarget target = getResource("lineage/getDataQualityLineage"); + for (Map.Entry<String, String> entry : queryParams.entrySet()) { + target = target.queryParam(entry.getKey(), entry.getValue()); + } + + return TestUtils.get(target, Map.class, authHeaders); + } + public void assertEdge(EntityLineage lineage, Edge expectedEdge, boolean downstream) { if (downstream) { assertTrue(lineage.getDownstreamEdges().contains(expectedEdge));
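A minimal, illustrative sketch of exercising the new endpoint by hand (not part of this change): the path and the fqn, upstreamDepth, query_filter, and includeDeleted query parameters come from the LineageResource method added above, while the base URL, token, and table FQN below are placeholders. The response body is the {"nodes": [...], "edges": [...]} map assembled by ElasticSearchClient/OpenSearchClient, i.e. only the upstream nodes that have failed test cases plus the edges tracing those failures back toward the starting entity.

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;

// Sketch only: calls GET <server>/api/v1/lineage/getDataQualityLineage added in this diff.
// The base URL, token, and FQN are placeholder values, not taken from this change.
public class DataQualityLineageSmokeCheck {
  public static void main(String[] args) {
    String baseUrl = "http://localhost:8585/api/v1"; // assumed local OpenMetadata instance
    String token = "REPLACE_WITH_JWT"; // assumed bot/user token

    Client client = ClientBuilder.newClient();
    try {
      String body =
          client
              .target(baseUrl)
              .path("lineage/getDataQualityLineage")
              .queryParam("fqn", "sample_service.sample_db.sample_schema.orders") // placeholder FQN
              .queryParam("upstreamDepth", 3)
              .queryParam("includeDeleted", false)
              .request("application/json")
              .header("Authorization", "Bearer " + token)
              .get(String.class);
      // Raw JSON with "nodes" and "edges"; LineageResourceTest.getDataQualityLineage performs the
      // equivalent request through the test WebTarget helper and deserializes it into a Map.
      System.out.println(body);
    } finally {
      client.close();
    }
  }
}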