diff --git a/common/src/main/java/org/openmetadata/common/utils/CommonUtil.java b/common/src/main/java/org/openmetadata/common/utils/CommonUtil.java index 3c63d90e436..7e0da8b6ac4 100644 --- a/common/src/main/java/org/openmetadata/common/utils/CommonUtil.java +++ b/common/src/main/java/org/openmetadata/common/utils/CommonUtil.java @@ -33,7 +33,6 @@ import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.Enumeration; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -178,10 +177,6 @@ public final class CommonUtil { return Optional.ofNullable(list).orElse(Collections.emptyList()); } - public static Map collectionOrEmpty(Map input) { - return Optional.ofNullable(input).orElse(new HashMap<>()); - } - public static List listOrEmptyMutable(List list) { return nullOrEmpty(list) ? new ArrayList<>() : new ArrayList<>(list); } @@ -214,14 +209,6 @@ public final class CommonUtil { } } - public static List collectionOrDefault(List c, List defaultValue) { - if (nullOrEmpty(c)) { - return defaultValue; - } else { - return c; - } - } - public static String getResourceAsStream(ClassLoader loader, String file) throws IOException { return IOUtils.toString(Objects.requireNonNull(loader.getResourceAsStream(file)), UTF_8); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java index dd3c6f05a88..9026c4f36da 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java @@ -14,8 +14,6 @@ package org.openmetadata.service.jdbi3; import static javax.ws.rs.core.Response.Status.OK; -import static org.openmetadata.common.utils.CommonUtil.collectionOrDefault; -import static org.openmetadata.common.utils.CommonUtil.nullOrDefault; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.csv.CsvUtil.addField; import static org.openmetadata.csv.EntityCsv.getCsvDocumentation; @@ -50,13 +48,9 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.jdbi.v3.sqlobject.transaction.Transaction; +import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.csv.CsvUtil; import org.openmetadata.schema.api.lineage.AddLineage; -import org.openmetadata.schema.api.lineage.EsLineageData; -import org.openmetadata.schema.api.lineage.LineageDirection; -import org.openmetadata.schema.api.lineage.RelationshipRef; -import org.openmetadata.schema.api.lineage.SearchLineageRequest; -import org.openmetadata.schema.api.lineage.SearchLineageResult; import org.openmetadata.schema.entity.data.APIEndpoint; import org.openmetadata.schema.entity.data.Container; import org.openmetadata.schema.entity.data.Dashboard; @@ -150,67 +144,84 @@ public class LineageRepository { private void addLineageToSearch( EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) { + IndexMapping sourceIndexMapping = + Entity.getSearchRepository().getIndexMapping(fromEntity.getType()); + String sourceIndexName = + sourceIndexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()); IndexMapping destinationIndexMapping = Entity.getSearchRepository().getIndexMapping(toEntity.getType()); String destinationIndexName = destinationIndexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()); - // For lineage from -> to (not stored) since the doc itself is the toEntity - EsLineageData lineageData = - buildEntityLineageData(fromEntity, toEntity, lineageDetails).withToEntity(null); + Map relationshipDetails = + buildRelationshipDetailsMap(fromEntity, toEntity, lineageDetails); + Pair from = new ImmutablePair<>("_id", fromEntity.getId().toString()); Pair to = new ImmutablePair<>("_id", toEntity.getId().toString()); - searchClient.updateLineage(destinationIndexName, to, lineageData); + searchClient.updateLineage(sourceIndexName, from, relationshipDetails); + searchClient.updateLineage(destinationIndexName, to, relationshipDetails); } - public static RelationshipRef buildEntityRefLineage(EntityReference entityRef) { - return new RelationshipRef() - .withId(entityRef.getId()) - .withType(entityRef.getType()) - .withFqn(entityRef.getFullyQualifiedName()) - .withFqnHash(FullyQualifiedName.buildHash(entityRef.getFullyQualifiedName())); + public static Map buildEntityRefMap(EntityReference entityRef) { + Map details = new HashMap<>(); + details.put("id", entityRef.getId().toString()); + details.put("type", entityRef.getType()); + details.put("fqn", entityRef.getFullyQualifiedName()); + details.put("fqnHash", FullyQualifiedName.buildHash(entityRef.getFullyQualifiedName())); + return details; } - public static EsLineageData buildEntityLineageData( + public static Map buildRelationshipDetailsMap( EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) { - EsLineageData lineageData = - new EsLineageData() - .withDocId(fromEntity.getId().toString() + "-->" + toEntity.getId().toString()) - .withFromEntity(buildEntityRefLineage(fromEntity)); + Map relationshipDetails = new HashMap<>(); + relationshipDetails.put( + "doc_id", fromEntity.getId().toString() + "-" + toEntity.getId().toString()); + relationshipDetails.put("fromEntity", buildEntityRefMap(fromEntity)); + relationshipDetails.put("toEntity", buildEntityRefMap(toEntity)); if (lineageDetails != null) { // Add Pipeline Details - addPipelineDetails(lineageData, lineageDetails.getPipeline()); - lineageData.setDescription(nullOrDefault(lineageDetails.getDescription(), null)); - lineageData.setColumns(collectionOrDefault(lineageDetails.getColumnsLineage(), null)); - lineageData.setSqlQuery(nullOrDefault(lineageDetails.getSqlQuery(), null)); - lineageData.setSource(nullOrDefault(lineageDetails.getSource().value(), null)); + addPipelineDetails(relationshipDetails, lineageDetails.getPipeline()); + relationshipDetails.put( + "description", + CommonUtil.nullOrEmpty(lineageDetails.getDescription()) + ? null + : lineageDetails.getDescription()); + if (!CommonUtil.nullOrEmpty(lineageDetails.getColumnsLineage())) { + List> colummnLineageList = new ArrayList<>(); + for (ColumnLineage columnLineage : lineageDetails.getColumnsLineage()) { + colummnLineageList.add(JsonUtils.getMap(columnLineage)); + } + relationshipDetails.put("columns", colummnLineageList); + } + relationshipDetails.put( + "sqlQuery", + CommonUtil.nullOrEmpty(lineageDetails.getSqlQuery()) + ? null + : lineageDetails.getSqlQuery()); + relationshipDetails.put( + "source", + CommonUtil.nullOrEmpty(lineageDetails.getSource()) ? null : lineageDetails.getSource()); } - return lineageData; + return relationshipDetails; } - public static void addPipelineDetails(EsLineageData lineageData, EntityReference pipelineRef) { - if (nullOrEmpty(pipelineRef)) { - lineageData.setPipeline(null); + public static void addPipelineDetails( + Map relationshipDetails, EntityReference pipelineRef) { + if (CommonUtil.nullOrEmpty(pipelineRef)) { + relationshipDetails.put(PIPELINE, JsonUtils.getMap(null)); } else { - Pair> pipelineOrStoredProcedure = - getPipelineOrStoredProcedure(pipelineRef, List.of("changeDescription")); - lineageData.setPipelineEntityType(pipelineOrStoredProcedure.getLeft()); - lineageData.setPipeline(pipelineOrStoredProcedure.getRight()); + Map pipelineMap; + if (pipelineRef.getType().equals(PIPELINE)) { + pipelineMap = + JsonUtils.getMap( + Entity.getEntity(pipelineRef, "pipelineStatus,tags,owners", Include.ALL)); + } else { + pipelineMap = JsonUtils.getMap(Entity.getEntity(pipelineRef, "tags,owners", Include.ALL)); + } + pipelineMap.remove("changeDescription"); + relationshipDetails.put("pipelineEntityType", pipelineRef.getType()); + relationshipDetails.put(PIPELINE, pipelineMap); } } - public static Pair> getPipelineOrStoredProcedure( - EntityReference pipelineRef, List fieldsToRemove) { - Map pipelineMap; - if (pipelineRef.getType().equals(PIPELINE)) { - pipelineMap = - JsonUtils.getMap( - Entity.getEntity(pipelineRef, "pipelineStatus,tags,owners", Include.ALL)); - } else { - pipelineMap = JsonUtils.getMap(Entity.getEntity(pipelineRef, "tags,owners", Include.ALL)); - } - fieldsToRemove.forEach(pipelineMap::remove); - return Pair.of(pipelineRef.getType(), pipelineMap); - } - private String validateLineageDetails( EntityReference from, EntityReference to, LineageDetails details) { if (details == null) { @@ -238,21 +249,13 @@ public class LineageRepository { throws IOException { CsvDocumentation documentation = getCsvDocumentation("lineage"); List headers = documentation.getHeaders(); - // TODO: Fix the lineage we need booth the lineage - SearchLineageResult result = + Map lineageMap = Entity.getSearchRepository() .searchLineageForExport( - fqn, - upstreamDepth, - downstreamDepth, - queryFilter, - deleted, - entityType, - LineageDirection.UPSTREAM); + fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); CsvFile csvFile = new CsvFile().withHeaders(headers); - // TODO: Fix This - // addRecords(csvFile, result); + addRecords(csvFile, lineageMap); return CsvUtil.formatCsv(csvFile); } @@ -275,19 +278,10 @@ public class LineageRepository { String entityType, boolean deleted) { try { - // TODO: fix Export to consider for both the nodes - SearchLineageResult response = + Response response = Entity.getSearchRepository() - .searchLineage( - new SearchLineageRequest() - .withFqn(fqn) - .withUpstreamDepth(upstreamDepth) - .withDownstreamDepth(downstreamDepth) - .withQueryFilter(queryFilter) - .withIncludeDeleted(deleted) - .withEntityType(entityType) - .withDirection(LineageDirection.UPSTREAM)); - String jsonResponse = JsonUtils.pojoToJson(response); + .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); + String jsonResponse = JsonUtils.pojoToJson(response.getEntity()); JsonNode rootNode = JsonUtils.readTree(jsonResponse); Map entityMap = new HashMap<>(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java index 24ec96c9916..cd71dfe5fe8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java @@ -51,9 +51,6 @@ import javax.ws.rs.core.SecurityContext; import javax.ws.rs.core.UriInfo; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.api.lineage.AddLineage; -import org.openmetadata.schema.api.lineage.LineageDirection; -import org.openmetadata.schema.api.lineage.SearchLineageRequest; -import org.openmetadata.schema.api.lineage.SearchLineageResult; import org.openmetadata.schema.type.EntityLineage; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.MetadataOperation; @@ -195,7 +192,7 @@ public class LineageResource { mediaType = "application/json", schema = @Schema(implementation = SearchResponse.class))) }) - public SearchLineageResult searchLineage( + public Response searchLineage( @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Parameter(description = "fqn") @QueryParam("fqn") String fqn, @@ -212,61 +209,9 @@ public class LineageResource { boolean deleted, @Parameter(description = "entity type") @QueryParam("type") String entityType) throws IOException { - return Entity.getSearchRepository() - .searchLineage( - new SearchLineageRequest() - .withFqn(fqn) - .withUpstreamDepth(upstreamDepth) - .withDownstreamDepth(downstreamDepth) - .withQueryFilter(queryFilter) - .withIncludeDeleted(deleted) - .withEntityType(entityType)); - } - @GET - @Path("/getLineage/{direction}") - @Operation( - operationId = "searchLineageWithDirection", - summary = "Search lineage with Direction", - responses = { - @ApiResponse( - responseCode = "200", - description = "search response", - content = - @Content( - mediaType = "application/json", - schema = @Schema(implementation = SearchResponse.class))) - }) - public SearchLineageResult searchLineageWithDirection( - @Context UriInfo uriInfo, - @Context SecurityContext securityContext, - @Parameter(description = "fqn") @QueryParam("fqn") String fqn, - @Parameter(description = "Direction", required = true, schema = @Schema(type = "string")) - @PathParam("direction") - LineageDirection direction, - @Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth, - @Parameter(description = "downstreamDepth") @QueryParam("downstreamDepth") - int downstreamDepth, - @Parameter( - description = - "Elasticsearch query that will be combined with the query_string query generator from the `query` argument") - @QueryParam("query_filter") - String queryFilter, - @Parameter(description = "Filter documents by deleted param. By default deleted is false") - @QueryParam("includeDeleted") - boolean deleted, - @Parameter(description = "entity type") @QueryParam("type") String entityType) - throws IOException { return Entity.getSearchRepository() - .searchLineage( - new SearchLineageRequest() - .withFqn(fqn) - .withUpstreamDepth(upstreamDepth) - .withDownstreamDepth(downstreamDepth) - .withQueryFilter(queryFilter) - .withIncludeDeleted(deleted) - .withEntityType(entityType) - .withDirection(direction)); + .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); } @GET @@ -334,6 +279,8 @@ public class LineageResource { boolean deleted, @Parameter(description = "entity type") @QueryParam("type") String entityType) throws IOException { + Entity.getSearchRepository() + .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); return dao.exportCsv(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java index 85a2609a8d1..d42fab34926 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java @@ -7,7 +7,6 @@ import java.security.KeyStoreException; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.json.JsonArray; @@ -16,10 +15,6 @@ import javax.net.ssl.SSLContext; import javax.ws.rs.core.Response; import lombok.Getter; import org.apache.commons.lang3.tuple.Pair; -import org.openmetadata.schema.api.lineage.EsLineageData; -import org.openmetadata.schema.api.lineage.RelationshipRef; -import org.openmetadata.schema.api.lineage.SearchLineageRequest; -import org.openmetadata.schema.api.lineage.SearchLineageResult; import org.openmetadata.schema.api.search.SearchSettings; import org.openmetadata.schema.dataInsight.DataInsightChartResult; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; @@ -33,7 +28,6 @@ import org.openmetadata.service.resources.settings.SettingsCache; import org.openmetadata.service.search.models.IndexMapping; import org.openmetadata.service.search.security.RBACConditionEvaluator; import org.openmetadata.service.security.policyevaluator.SubjectContext; -import org.openmetadata.service.util.FullyQualifiedName; import org.openmetadata.service.util.ResultList; import org.openmetadata.service.util.SSLUtil; import os.org.opensearch.action.bulk.BulkRequest; @@ -41,9 +35,6 @@ import os.org.opensearch.action.bulk.BulkResponse; import os.org.opensearch.client.RequestOptions; public interface SearchClient { - String UPSTREAM_LINEAGE_FIELD = "upstreamLineage"; - String FQN_FIELD = "fullyQualifiedName"; - String ID_FIELD = "id"; ExecutorService asyncExecutor = Executors.newFixedThreadPool(1); String UPDATE = "update"; @@ -90,10 +81,10 @@ public interface SearchClient { "if (ctx._source.certification != null && ctx._source.certification.tagLabel != null) {ctx._source.certification.tagLabel.style = params.style; ctx._source.certification.tagLabel.description = params.description; ctx._source.certification.tagLabel.tagFQN = params.tagFQN; ctx._source.certification.tagLabel.name = params.name; }"; String REMOVE_LINEAGE_SCRIPT = - "for (int i = 0; i < ctx._source.upstreamLineage.length; i++) { if (ctx._source.upstreamLineage[i].doc_id == '%s') { ctx._source.upstreamLineage.remove(i) }}"; + "for (int i = 0; i < ctx._source.lineage.length; i++) { if (ctx._source.lineage[i].doc_id == '%s') { ctx._source.lineage.remove(i) }}"; String ADD_UPDATE_LINEAGE = - "boolean docIdExists = false; for (int i = 0; i < ctx._source.upstreamLineage.size(); i++) { if (ctx._source.upstreamLineage[i].doc_id.equalsIgnoreCase(params.lineageData.doc_id)) { ctx._source.upstreamLineage[i] = params.lineageData; docIdExists = true; break;}}if (!docIdExists) {ctx._source.upstreamLineage.add(params.lineageData);}"; + "boolean docIdExists = false; for (int i = 0; i < ctx._source.lineage.size(); i++) { if (ctx._source.lineage[i].doc_id.equalsIgnoreCase(params.lineageData.doc_id)) { ctx._source.lineage[i] = params.lineageData; docIdExists = true; break;}}if (!docIdExists) {ctx._source.lineage.add(params.lineageData);}"; // The script is used for updating the entityRelationship attribute of the entity in ES // It checks if any duplicate entry is present based on the doc_id and updates only if it is not @@ -197,15 +188,15 @@ public interface SearchClient { Response searchBySourceUrl(String sourceUrl) throws IOException; - SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException; - - SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest) + Response searchLineage( + String fqn, + int upstreamDepth, + int downstreamDepth, + String queryFilter, + boolean deleted, + String entityType) throws IOException; - SearchLineageResult getUpstreamLineage(SearchLineageRequest lineageRequest) throws IOException; - - SearchLineageResult getDownStreamLineage(SearchLineageRequest lineageRequest) throws IOException; - Response searchEntityRelationship( String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) throws IOException; @@ -217,10 +208,6 @@ public interface SearchClient { String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) throws IOException; - Map searchEntityByKey( - String indexAlias, String keyName, String keyValue, List fieldsToRemove) - throws IOException; - /* Used for listing knowledge page hierarchy for a given parent and page type, used in Elastic/Open SearchClientExtension */ @@ -233,7 +220,6 @@ public interface SearchClient { /* Used for listing knowledge page hierarchy for a given active Page and page type, used in Elastic/Open SearchClientExtension */ - @SuppressWarnings("unused") default ResultList listPageHierarchyForActivePage( String activeFqn, String pageType, int offset, int limit) { throw new CustomExceptionMessage( @@ -246,6 +232,15 @@ public interface SearchClient { Response.Status.NOT_IMPLEMENTED, NOT_IMPLEMENTED_ERROR_TYPE, NOT_IMPLEMENTED_METHOD); } + Map searchLineageInternal( + String fqn, + int upstreamDepth, + int downstreamDepth, + String queryFilter, + boolean deleted, + String entityType) + throws IOException; + Response searchByField(String fieldName, String fieldValue, String index) throws IOException; Response aggregate(String index, String fieldName, String value, String query) throws IOException; @@ -296,7 +291,7 @@ public interface SearchClient { Pair> updates); void updateLineage( - String indexName, Pair fieldAndValue, EsLineageData lineageData); + String indexName, Pair fieldAndValue, Map lineagaData); void updateEntityRelationship( String indexName, @@ -398,51 +393,5 @@ public interface SearchClient { && rbacConditionEvaluator != null; } - // default String getLineageDirection(LineageDirection direction, String entityType) { - // if (LineageDirection.UPSTREAM.equals(direction)) { - // if (Boolean.FALSE.equals(entityType.equals(Entity.PIPELINE)) && - // Boolean.FALSE.equals(entityType.equals(Entity.STORED_PROCEDURE))) { - // return - // }else{ - // - // } - // } else { - // - // } - // return ""; - // } - SearchHealthStatus getSearchHealthStatus() throws IOException; - - static RelationshipRef getRelationshipRef(Map entityMap) { - // This assumes these keys exists in the map, use it with caution - return new RelationshipRef() - .withId(UUID.fromString(entityMap.get("id").toString())) - .withType(entityMap.get("entityType").toString()) - .withFqn(entityMap.get("fullyQualifiedName").toString()) - .withFqnHash(FullyQualifiedName.buildHash(entityMap.get("fullyQualifiedName").toString())); - } - - static EsLineageData getEsLineageDataFromUpstreamLineage( - String fqn, List upstream) { - for (EsLineageData esLineageData : upstream) { - if (esLineageData.getFromEntity().getFqn().equals(fqn)) { - return esLineageData; - } - } - return null; - } - - static EsLineageData copyEsLineageData(EsLineageData data) { - return new EsLineageData() - .withDocId(data.getDocId()) - .withFromEntity(data.getFromEntity()) - .withToEntity(data.getToEntity()) - .withPipeline(data.getPipeline()) - .withSqlQuery(data.getSqlQuery()) - .withColumns(data.getColumns()) - .withDescription(data.getDescription()) - .withSource(data.getSource()) - .withPipelineEntityType(data.getPipelineEntityType()); - } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexUtils.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexUtils.java index 68dbea5060c..ba210001f73 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexUtils.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexUtils.java @@ -19,7 +19,6 @@ import org.openmetadata.schema.tests.Datum; import org.openmetadata.schema.tests.type.DataQualityReportMetadata; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.TagLabel; -import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.Utilities; public final class SearchIndexUtils { @@ -59,9 +58,8 @@ public final class SearchIndexUtils { if (value instanceof Map) { currentMap = (Map) value; } else if (value instanceof List) { - List list = (List>) value; - for (Object obj : list) { - Map item = JsonUtils.getMap(obj); + List> list = (List>) value; + for (Map item : list) { removeFieldByPath( item, Arrays.stream(pathElements, 1, pathElements.length).collect(Collectors.joining("."))); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 3a288d09805..943a23e593f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -60,9 +60,6 @@ import org.apache.commons.lang3.tuple.Pair; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.EntityTimeSeriesInterface; import org.openmetadata.schema.analytics.ReportData; -import org.openmetadata.schema.api.lineage.LineageDirection; -import org.openmetadata.schema.api.lineage.SearchLineageRequest; -import org.openmetadata.schema.api.lineage.SearchLineageResult; import org.openmetadata.schema.dataInsight.DataInsightChartResult; import org.openmetadata.schema.entity.classification.Tag; import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; @@ -961,13 +958,16 @@ public class SearchRepository { return searchClient.searchBySourceUrl(sourceUrl); } - public SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException { - return searchClient.searchLineage(lineageRequest); - } - - public SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest) + public Response searchLineage( + String fqn, + int upstreamDepth, + int downstreamDepth, + String queryFilter, + boolean deleted, + String entityType) throws IOException { - return searchClient.searchLineage(lineageRequest); + return searchClient.searchLineage( + fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); } public Response searchEntityRelationship( @@ -989,24 +989,16 @@ public class SearchRepository { fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); } - public SearchLineageResult searchLineageForExport( + public Map searchLineageForExport( String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted, - String entityType, - LineageDirection direction) + String entityType) throws IOException { - return searchClient.searchLineage( - new SearchLineageRequest() - .withFqn(fqn) - .withUpstreamDepth(upstreamDepth) - .withDownstreamDepth(downstreamDepth) - .withQueryFilter(queryFilter) - .withIncludeDeleted(deleted) - .withEntityType(entityType) - .withDirection(direction)); + return searchClient.searchLineageInternal( + fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); } public Response searchByField(String fieldName, String fieldValue, String index) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index 4220fd9487b..69ebeafd86e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -2,7 +2,6 @@ package org.openmetadata.service.search.elasticsearch; import static javax.ws.rs.core.Response.Status.NOT_FOUND; import static javax.ws.rs.core.Response.Status.OK; -import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA; import static org.openmetadata.service.Entity.DATA_PRODUCT; @@ -153,11 +152,6 @@ import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.jetbrains.annotations.NotNull; import org.openmetadata.common.utils.CommonUtil; -import org.openmetadata.schema.api.lineage.EsLineageData; -import org.openmetadata.schema.api.lineage.LineageDirection; -import org.openmetadata.schema.api.lineage.RelationshipRef; -import org.openmetadata.schema.api.lineage.SearchLineageRequest; -import org.openmetadata.schema.api.lineage.SearchLineageResult; import org.openmetadata.schema.dataInsight.DataInsightChartResult; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList; @@ -848,26 +842,64 @@ public class ElasticSearchClient implements SearchClient { } @Override - public SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException { - SearchLineageResult result = - getDownStreamLineage(lineageRequest.withDirection(LineageDirection.DOWNSTREAM)); - SearchLineageResult upstreamLineage = - getUpstreamLineage(lineageRequest.withDirection(LineageDirection.UPSTREAM)); - - // Add All nodes and edges from upstream lineage to result - result.getNodes().putAll(upstreamLineage.getNodes()); - result.getUpstreamEdges().putAll(upstreamLineage.getUpstreamEdges()); - return result; + public Map searchLineageInternal( + String fqn, + int upstreamDepth, + int downstreamDepth, + String queryFilter, + boolean deleted, + String entityType) + throws IOException { + Map responseMap = new HashMap<>(); + Set> edges = new HashSet<>(); + Set> nodes = new HashSet<>(); + if (entityType.equalsIgnoreCase(Entity.PIPELINE) + || entityType.equalsIgnoreCase(Entity.STORED_PROCEDURE)) { + return searchPipelineLineage( + fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, responseMap); + } + es.org.elasticsearch.action.search.SearchRequest searchRequest = + new es.org.elasticsearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + List sourceFieldsToExcludeCopy = new ArrayList<>(SOURCE_FIELDS_TO_EXCLUDE); + searchSourceBuilder.fetchSource(null, sourceFieldsToExcludeCopy.toArray(String[]::new)); + searchSourceBuilder.query( + QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn))); + searchRequest.source(searchSourceBuilder.size(1000)); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + for (var hit : searchResponse.getHits().getHits()) { + Map tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + responseMap.put("entity", tempMap); + } + getLineage( + fqn, + downstreamDepth, + edges, + nodes, + queryFilter, + "lineage.fromEntity.fqnHash.keyword", + deleted); + getLineage( + fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqnHash.keyword", deleted); + responseMap.put("edges", edges); + responseMap.put("nodes", nodes); + return responseMap; } @Override - public SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest) + public Response searchLineage( + String fqn, + int upstreamDepth, + int downstreamDepth, + String queryFilter, + boolean deleted, + String entityType) throws IOException { - if (lineageRequest.getDirection().equals(LineageDirection.UPSTREAM)) { - return getUpstreamLineage(lineageRequest); - } else { - return getDownStreamLineage(lineageRequest); - } + Map responseMap = + searchLineageInternal( + fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); + return Response.status(OK).entity(responseMap).build(); } private void getEntityRelationship( @@ -1073,155 +1105,54 @@ public class ElasticSearchClient implements SearchClient { return Response.status(OK).entity(responseMap).build(); } - @Override - public Map searchEntityByKey( - String indexAlias, String keyName, String keyValue, List fieldsToRemove) - throws IOException { - es.org.elasticsearch.action.search.SearchRequest searchRequest = - getSearchRequest(indexAlias, null, keyName, keyValue, null, null, fieldsToRemove); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - int noOfHits = searchResponse.getHits().getHits().length; - if (noOfHits == 1) { - return new HashMap<>( - JsonUtils.getMap(searchResponse.getHits().getHits()[0].getSourceAsMap())); - } else { - throw new SearchException( - String.format( - "Issue in Search Entity By Key: %s, Value: %s , Number of Hits: %s", - keyName, keyValue, noOfHits)); - } - } - - private es.org.elasticsearch.action.search.SearchRequest getSearchRequest( - String indexAlias, + private void getLineage( + String fqn, + int depth, + Set> edges, + Set> nodes, String queryFilter, - String key, - String value, - Boolean deleted, - List fieldsToInclude, - List fieldsToRemove) { + String direction, + boolean deleted) + throws IOException { + if (depth <= 0) { + return; + } es.org.elasticsearch.action.search.SearchRequest searchRequest = new es.org.elasticsearch.action.search.SearchRequest( - Entity.getSearchRepository().getIndexOrAliasName(indexAlias)); + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.fetchSource( - listOrEmpty(fieldsToInclude).toArray(String[]::new), - listOrEmpty(fieldsToRemove).toArray(String[]::new)); - searchSourceBuilder.query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery(key, value))); - if (!CommonUtil.nullOrEmpty(deleted)) { + searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); + searchSourceBuilder.query( + QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn)))); + if (CommonUtil.nullOrEmpty(deleted)) { searchSourceBuilder.query( QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery(key, value)) + .must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn))) .must(QueryBuilders.termQuery("deleted", deleted))); } - buildSearchSourceFilter(queryFilter, searchSourceBuilder); searchRequest.source(searchSourceBuilder.size(1000)); - return searchRequest; - } - - public SearchLineageResult getUpstreamLineage(SearchLineageRequest lineageRequest) - throws IOException { - SearchLineageResult result = - new SearchLineageResult() - .withNodes(new HashMap<>()) - .withUpstreamEdges(new HashMap<>()) - .withDownstreamEdges(new HashMap<>()); - getUpstreamLineageRecursively(lineageRequest, result); - return result; - } - - private void getUpstreamLineageRecursively( - SearchLineageRequest lineageRequest, SearchLineageResult result) throws IOException { - if (lineageRequest.getUpstreamDepth() <= 0) { - return; - } - - Map entityMap = - searchEntityByKey( - GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE); - if (!entityMap.isEmpty()) { - result.getNodes().putIfAbsent(entityMap.get(ID_FIELD).toString(), entityMap); - // Contains Upstream Lineage - if (entityMap.containsKey(UPSTREAM_LINEAGE_FIELD)) { - List upStreamEntities = - JsonUtils.readOrConvertValues( - entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class); - for (EsLineageData esLineageData : upStreamEntities) { - result - .getUpstreamEdges() - .putIfAbsent( - esLineageData.getDocId(), - esLineageData.withToEntity(SearchClient.getRelationshipRef(entityMap))); - String upstreamEntityId = esLineageData.getFromEntity().getId().toString(); - if (!result.getNodes().containsKey(upstreamEntityId)) { - SearchLineageRequest updatedRequest = - JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class) - .withUpstreamDepth(lineageRequest.getUpstreamDepth() - 1) - .withFqn(esLineageData.getFromEntity().getFqn()); - getUpstreamLineageRecursively(updatedRequest, result); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + for (var hit : searchResponse.getHits().getHits()) { + List> lineage = + (List>) hit.getSourceAsMap().get("lineage"); + HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + nodes.add(tempMap); + for (Map lin : lineage) { + Map fromEntity = (HashMap) lin.get("fromEntity"); + Map toEntity = (HashMap) lin.get("toEntity"); + if (direction.equalsIgnoreCase("lineage.fromEntity.fqnHash.keyword")) { + if (!edges.contains(lin) && fromEntity.get("fqn").equals(fqn)) { + edges.add(lin); + getLineage( + toEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted); } - } - } - } - } - - public SearchLineageResult getDownStreamLineage(SearchLineageRequest lineageRequest) - throws IOException { - SearchLineageResult result = - new SearchLineageResult() - .withNodes(new HashMap<>()) - .withUpstreamEdges(new HashMap<>()) - .withDownstreamEdges(new HashMap<>()); - Map entityMap = - searchEntityByKey( - GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE); - if (!entityMap.isEmpty()) { - result.getNodes().putIfAbsent(entityMap.get("id").toString(), entityMap); - getDownStreamRecursively(SearchClient.getRelationshipRef(entityMap), lineageRequest, result); - } - return result; - } - - private void getDownStreamRecursively( - RelationshipRef fromEntity, SearchLineageRequest lineageRequest, SearchLineageResult result) - throws IOException { - if (lineageRequest.getDownstreamDepth() <= 0) { - return; - } - - es.org.elasticsearch.action.search.SearchRequest searchDownstreamEntities = - getSearchRequest( - GLOBAL_SEARCH_ALIAS, - lineageRequest.getQueryFilter(), - "upstreamLineage.fromEntity.fqnHash", - FullyQualifiedName.buildHash(lineageRequest.getFqn()), - lineageRequest.getIncludeDeleted(), - null, - SOURCE_FIELDS_TO_EXCLUDE); - SearchResponse downstreamEntities = - client.search(searchDownstreamEntities, RequestOptions.DEFAULT); - for (SearchHit searchHit : downstreamEntities.getHits().getHits()) { - Map entityMap = new HashMap<>(searchHit.getSourceAsMap()); - if (!entityMap.isEmpty()) { - result.getNodes().putIfAbsent(entityMap.get(ID_FIELD).toString(), entityMap); - List upStreamEntities = - JsonUtils.readOrConvertValues( - entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class); - EsLineageData esLineageData = - SearchClient.getEsLineageDataFromUpstreamLineage(fromEntity.getFqn(), upStreamEntities); - if (esLineageData != null) { - RelationshipRef toEntity = SearchClient.getRelationshipRef(entityMap); - result - .getDownstreamEdges() - .putIfAbsent(esLineageData.getDocId(), esLineageData.withToEntity(toEntity)); - String downStreamEntityId = toEntity.getId().toString(); - if (!result.getNodes().containsKey(downStreamEntityId)) { - SearchLineageRequest updatedRequest = - JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class) - .withDownstreamDepth(lineageRequest.getDownstreamDepth() - 1) - .withFqn(toEntity.getFqn()); - getDownStreamRecursively(toEntity, updatedRequest, result); + } else { + if (!edges.contains(lin) && toEntity.get("fqn").equals(fqn)) { + edges.add(lin); + getLineage( + fromEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted); } } } @@ -1365,6 +1296,112 @@ public class ElasticSearchClient implements SearchClient { return client.search(searchRequest, RequestOptions.DEFAULT); } + private Map searchPipelineLineage( + String fqn, + int upstreamDepth, + int downstreamDepth, + String queryFilter, + boolean deleted, + Map responseMap) + throws IOException { + Set> edges = new HashSet<>(); + Set> nodes = new HashSet<>(); + Object[] searchAfter = null; + long processedRecords = 0; + long totalRecords = -1; + while (totalRecords != processedRecords) { + es.org.elasticsearch.action.search.SearchRequest searchRequest = + new es.org.elasticsearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + es.org.elasticsearch.index.query.BoolQueryBuilder boolQueryBuilder = + QueryBuilders.boolQuery(); + boolQueryBuilder.should( + QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn))); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); + FieldSortBuilder sortBuilder = SortBuilders.fieldSort("fullyQualifiedName"); + searchSourceBuilder.sort(sortBuilder); + searchSourceBuilder.query(boolQueryBuilder); + if (searchAfter != null) { + searchSourceBuilder.searchAfter(searchAfter); + } + if (CommonUtil.nullOrEmpty(deleted)) { + searchSourceBuilder.query( + QueryBuilders.boolQuery() + .must(boolQueryBuilder) + .must(QueryBuilders.termQuery("deleted", deleted))); + } + buildSearchSourceFilter(queryFilter, searchSourceBuilder); + searchRequest.source(searchSourceBuilder); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + + for (var hit : searchResponse.getHits().getHits()) { + List> lineage = + (List>) hit.getSourceAsMap().get("lineage"); + HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + nodes.add(tempMap); + for (Map lin : lineage) { + HashMap fromEntity = (HashMap) lin.get("fromEntity"); + HashMap toEntity = (HashMap) lin.get("toEntity"); + HashMap pipeline = (HashMap) lin.get("pipeline"); + if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) { + edges.add(lin); + getLineage( + fromEntity.get("fqn"), + upstreamDepth, + edges, + nodes, + queryFilter, + "lineage.toEntity.fqn.keyword", + deleted); + getLineage( + toEntity.get("fqn"), + downstreamDepth, + edges, + nodes, + queryFilter, + "lineage.fromEntity.fqn.keyword", + deleted); + } + } + } + totalRecords = searchResponse.getHits().getTotalHits().value; + int currentHits = searchResponse.getHits().getHits().length; + processedRecords += currentHits; + if (currentHits > 0) { + searchAfter = searchResponse.getHits().getHits()[currentHits - 1].getSortValues(); + } else { + searchAfter = null; + } + } + getLineage( + fqn, downstreamDepth, edges, nodes, queryFilter, "lineage.fromEntity.fqn.keyword", deleted); + getLineage( + fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqn.keyword", deleted); + + // TODO: Fix this , this is hack + if (edges.isEmpty()) { + es.org.elasticsearch.action.search.SearchRequest searchRequestForEntity = + new es.org.elasticsearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + SearchSourceBuilder searchSourceBuilderForEntity = new SearchSourceBuilder(); + searchSourceBuilderForEntity.query( + QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn))); + searchRequestForEntity.source(searchSourceBuilderForEntity.size(1000)); + SearchResponse searchResponseForEntity = + client.search(searchRequestForEntity, RequestOptions.DEFAULT); + for (var hit : searchResponseForEntity.getHits().getHits()) { + HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + tempMap.keySet().removeAll(FIELDS_TO_REMOVE); + responseMap.put("entity", tempMap); + } + } + responseMap.put("edges", edges); + responseMap.put("nodes", nodes); + return responseMap; + } + @Override public Response searchBySourceUrl(String sourceUrl) throws IOException { es.org.elasticsearch.action.search.SearchRequest searchRequest = @@ -2282,14 +2319,13 @@ public class ElasticSearchClient implements SearchClient { @Override public void updateLineage( - String indexName, Pair fieldAndValue, EsLineageData lineageData) { + String indexName, Pair fieldAndValue, Map lineageData) { if (isClientAvailable) { UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName); updateByQueryRequest.setQuery( new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue()) .operator(Operator.AND)); - Map params = - Collections.singletonMap("lineageData", JsonUtils.getMap(lineageData)); + Map params = Collections.singletonMap("lineageData", lineageData); Script script = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params); updateByQueryRequest.setScript(script); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/APIEndpointIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/APIEndpointIndex.java index c86dc2c5c6a..b55e07b897a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/APIEndpointIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/APIEndpointIndex.java @@ -95,7 +95,7 @@ public class APIEndpointIndex implements SearchIndex { .collect(ArrayList::new, ArrayList::add, ArrayList::addAll); Map commonAttributes = getCommonAttributesMap(apiEndpoint, Entity.API_ENDPOINT); doc.putAll(commonAttributes); - doc.put("upstreamLineage", SearchIndex.getLineageData(apiEndpoint.getEntityReference())); + doc.put("lineage", SearchIndex.getLineageData(apiEndpoint.getEntityReference())); doc.put( "requestSchema", apiEndpoint.getRequestSchema() != null ? apiEndpoint.getRequestSchema() : null); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/ContainerIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/ContainerIndex.java index 186b8365c3f..0f25bcfada7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/ContainerIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/ContainerIndex.java @@ -63,7 +63,7 @@ public record ContainerIndex(Container container) implements ColumnIndex { doc.put("column_suggest", columnSuggest); doc.put("serviceType", container.getServiceType()); doc.put("fullPath", container.getFullPath()); - doc.put("upstreamLineage", SearchIndex.getLineageData(container.getEntityReference())); + doc.put("lineage", SearchIndex.getLineageData(container.getEntityReference())); doc.put("service", getEntityWithDisplayName(container.getService())); return doc; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardDataModelIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardDataModelIndex.java index e2051ea9098..26ced947d7a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardDataModelIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardDataModelIndex.java @@ -64,7 +64,7 @@ public record DashboardDataModelIndex(DashboardDataModel dashboardDataModel) doc.put("column_suggest", columnSuggest); doc.put("tier", parseTags.getTierTag()); doc.put("service", getEntityWithDisplayName(dashboardDataModel.getService())); - doc.put("upstreamLineage", SearchIndex.getLineageData(dashboardDataModel.getEntityReference())); + doc.put("lineage", SearchIndex.getLineageData(dashboardDataModel.getEntityReference())); doc.put("domain", getEntityWithDisplayName(dashboardDataModel.getDomain())); return doc; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardIndex.java index 5c2307b4e05..374e6bc546c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardIndex.java @@ -54,7 +54,7 @@ public class DashboardIndex implements SearchIndex { doc.put("data_model_suggest", dataModelSuggest); doc.put("service_suggest", serviceSuggest); doc.put("serviceType", dashboard.getServiceType()); - doc.put("upstreamLineage", SearchIndex.getLineageData(dashboard.getEntityReference())); + doc.put("lineage", SearchIndex.getLineageData(dashboard.getEntityReference())); doc.put("service", getEntityWithDisplayName(dashboard.getService())); return doc; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MetricIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MetricIndex.java index 99c36d37759..787caa5c1a8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MetricIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MetricIndex.java @@ -32,7 +32,7 @@ public class MetricIndex implements SearchIndex { public Map buildSearchIndexDocInternal(Map doc) { Map commonAttributes = getCommonAttributesMap(metric, Entity.METRIC); doc.putAll(commonAttributes); - doc.put("upstreamLineage", SearchIndex.getLineageData(metric.getEntityReference())); + doc.put("lineage", SearchIndex.getLineageData(metric.getEntityReference())); return doc; } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MlModelIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MlModelIndex.java index 416b775fb54..98865b5f63a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MlModelIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MlModelIndex.java @@ -35,7 +35,7 @@ public class MlModelIndex implements SearchIndex { doc.put("tags", parseTags.getTags()); doc.put("tier", parseTags.getTierTag()); doc.put("serviceType", mlModel.getServiceType()); - doc.put("upstreamLineage", SearchIndex.getLineageData(mlModel.getEntityReference())); + doc.put("lineage", SearchIndex.getLineageData(mlModel.getEntityReference())); doc.put("service", getEntityWithDisplayName(mlModel.getService())); return doc; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/PipelineIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/PipelineIndex.java index ca293b4b989..9479ae8d947 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/PipelineIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/PipelineIndex.java @@ -51,7 +51,7 @@ public class PipelineIndex implements SearchIndex { doc.put("task_suggest", taskSuggest); doc.put("service_suggest", serviceSuggest); doc.put("serviceType", pipeline.getServiceType()); - doc.put("upstreamLineage", SearchIndex.getLineageData(pipeline.getEntityReference())); + doc.put("lineage", SearchIndex.getLineageData(pipeline.getEntityReference())); doc.put("service", getEntityWithDisplayName(pipeline.getService())); return doc; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchEntityIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchEntityIndex.java index 2ed56f73ad3..a04ebee91a0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchEntityIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchEntityIndex.java @@ -32,7 +32,7 @@ public record SearchEntityIndex(org.openmetadata.schema.entity.data.SearchIndex doc.put("tier", parseTags.getTierTag()); doc.put("service", getEntityWithDisplayName(searchIndex.getService())); doc.put("indexType", searchIndex.getIndexType()); - doc.put("upstreamLineage", SearchIndex.getLineageData(searchIndex.getEntityReference())); + doc.put("lineage", SearchIndex.getLineageData(searchIndex.getEntityReference())); return doc; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java index 79a29ba8fa0..902366cea84 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java @@ -6,7 +6,7 @@ import static org.openmetadata.schema.type.Include.ALL; import static org.openmetadata.service.Entity.FIELD_DESCRIPTION; import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME; import static org.openmetadata.service.Entity.getEntityByName; -import static org.openmetadata.service.jdbi3.LineageRepository.buildEntityLineageData; +import static org.openmetadata.service.jdbi3.LineageRepository.buildRelationshipDetailsMap; import static org.openmetadata.service.search.EntityBuilderConstant.DISPLAY_NAME_KEYWORD; import static org.openmetadata.service.search.EntityBuilderConstant.FIELD_DISPLAY_NAME_NGRAM; import static org.openmetadata.service.search.EntityBuilderConstant.FULLY_QUALIFIED_NAME; @@ -24,7 +24,6 @@ import java.util.Set; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.openmetadata.schema.EntityInterface; -import org.openmetadata.schema.api.lineage.EsLineageData; import org.openmetadata.schema.entity.data.Table; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; @@ -43,7 +42,7 @@ import org.openmetadata.service.util.JsonUtils; public interface SearchIndex { Set DEFAULT_EXCLUDED_FIELDS = - Set.of("changeDescription", "upstreamLineage.pipeline.changeDescription", "connection"); + Set.of("changeDescription", "lineage.pipeline.changeDescription", "connection"); public static final SearchClient searchClient = Entity.getSearchRepository().getSearchClient(); default Map buildSearchIndexDoc() { @@ -150,25 +149,31 @@ public interface SearchIndex { return nullOrEmpty(entity.getDescription()) ? "INCOMPLETE" : "COMPLETE"; } - static List getLineageData(EntityReference entity) { - return new ArrayList<>( - getLineageDataFromRefs( - entity, - Entity.getCollectionDAO() - .relationshipDAO() - .findFrom(entity.getId(), entity.getType(), Relationship.UPSTREAM.ordinal()))); - } - - static List getLineageDataFromRefs( - EntityReference entity, List records) { - List data = new ArrayList<>(); - for (CollectionDAO.EntityRelationshipRecord entityRelationshipRecord : records) { + static List> getLineageData(EntityReference entity) { + List> data = new ArrayList<>(); + CollectionDAO dao = Entity.getCollectionDAO(); + List toRelationshipsRecords = + dao.relationshipDAO() + .findTo(entity.getId(), entity.getType(), Relationship.UPSTREAM.ordinal()); + for (CollectionDAO.EntityRelationshipRecord entityRelationshipRecord : toRelationshipsRecords) { EntityReference ref = Entity.getEntityReferenceById( entityRelationshipRecord.getType(), entityRelationshipRecord.getId(), Include.ALL); LineageDetails lineageDetails = JsonUtils.readValue(entityRelationshipRecord.getJson(), LineageDetails.class); - data.add(buildEntityLineageData(ref, entity, lineageDetails)); + data.add(buildRelationshipDetailsMap(entity, ref, lineageDetails)); + } + List fromRelationshipsRecords = + dao.relationshipDAO() + .findFrom(entity.getId(), entity.getType(), Relationship.UPSTREAM.ordinal()); + for (CollectionDAO.EntityRelationshipRecord entityRelationshipRecord : + fromRelationshipsRecords) { + EntityReference ref = + Entity.getEntityReferenceById( + entityRelationshipRecord.getType(), entityRelationshipRecord.getId(), Include.ALL); + LineageDetails lineageDetails = + JsonUtils.readValue(entityRelationshipRecord.getJson(), LineageDetails.class); + data.add(buildRelationshipDetailsMap(ref, entity, lineageDetails)); } return data; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/StoredProcedureIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/StoredProcedureIndex.java index d6fd404ffda..06394422581 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/StoredProcedureIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/StoredProcedureIndex.java @@ -30,7 +30,7 @@ public record StoredProcedureIndex(StoredProcedure storedProcedure) implements S ParseTags parseTags = new ParseTags(Entity.getEntityTags(Entity.STORED_PROCEDURE, storedProcedure)); doc.put("tags", parseTags.getTags()); - doc.put("upstreamLineage", SearchIndex.getLineageData(storedProcedure.getEntityReference())); + doc.put("lineage", SearchIndex.getLineageData(storedProcedure.getEntityReference())); doc.put("tier", parseTags.getTierTag()); doc.put("service", getEntityWithDisplayName(storedProcedure.getService())); return doc; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java index 6a43bf3ceba..d4c44bb1f03 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java @@ -103,7 +103,7 @@ public record TableIndex(Table table) implements ColumnIndex { doc.put("schemaDefinition", table.getSchemaDefinition()); doc.put("service", getEntityWithDisplayName(table.getService())); doc.put("database", getEntityWithDisplayName(table.getDatabase())); - doc.put("upstreamLineage", SearchIndex.getLineageData(table.getEntityReference())); + doc.put("lineage", SearchIndex.getLineageData(table.getEntityReference())); doc.put("entityRelationship", SearchIndex.populateEntityRelationshipData(table)); doc.put("databaseSchema", getEntityWithDisplayName(table.getDatabaseSchema())); return doc; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TopicIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TopicIndex.java index 9d9159a08e5..109c97217eb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TopicIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TopicIndex.java @@ -81,7 +81,7 @@ public class TopicIndex implements SearchIndex { doc.put("field_suggest", fieldSuggest); doc.put("service_suggest", serviceSuggest); doc.put("serviceType", topic.getServiceType()); - doc.put("upstreamLineage", SearchIndex.getLineageData(topic.getEntityReference())); + doc.put("lineage", SearchIndex.getLineageData(topic.getEntityReference())); doc.put("messageSchema", topic.getMessageSchema() != null ? topic.getMessageSchema() : null); doc.put("service", getEntityWithDisplayName(topic.getService())); return doc; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index 392b8e37913..50a90e72486 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -2,7 +2,6 @@ package org.openmetadata.service.search.opensearch; import static javax.ws.rs.core.Response.Status.NOT_FOUND; import static javax.ws.rs.core.Response.Status.OK; -import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA; import static org.openmetadata.service.Entity.DATA_PRODUCT; @@ -68,11 +67,6 @@ import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.jetbrains.annotations.NotNull; import org.openmetadata.common.utils.CommonUtil; -import org.openmetadata.schema.api.lineage.EsLineageData; -import org.openmetadata.schema.api.lineage.LineageDirection; -import org.openmetadata.schema.api.lineage.RelationshipRef; -import org.openmetadata.schema.api.lineage.SearchLineageRequest; -import org.openmetadata.schema.api.lineage.SearchLineageResult; import org.openmetadata.schema.dataInsight.DataInsightChartResult; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList; @@ -845,26 +839,64 @@ public class OpenSearchClient implements SearchClient { return Response.status(OK).entity(response).build(); } - @Override - public SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException { - SearchLineageResult result = - getDownStreamLineage(lineageRequest.withDirection(LineageDirection.DOWNSTREAM)); - SearchLineageResult upstreamLineage = - getDownStreamLineage(lineageRequest.withDirection(LineageDirection.UPSTREAM)); - - // Add All nodes and edges from upstream lineage to result - result.getNodes().putAll(upstreamLineage.getNodes()); - result.getUpstreamEdges().putAll(upstreamLineage.getUpstreamEdges()); - return result; + public Map searchLineageInternal( + String fqn, + int upstreamDepth, + int downstreamDepth, + String queryFilter, + boolean deleted, + String entityType) + throws IOException { + if (entityType.equalsIgnoreCase(Entity.PIPELINE) + || entityType.equalsIgnoreCase(Entity.STORED_PROCEDURE)) { + return searchPipelineLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); + } + Map responseMap = new HashMap<>(); + Set> edges = new HashSet<>(); + Set> nodes = new HashSet<>(); + os.org.opensearch.action.search.SearchRequest searchRequest = + new os.org.opensearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + List sourceFieldsToExcludeCopy = new ArrayList<>(SOURCE_FIELDS_TO_EXCLUDE); + searchSourceBuilder.fetchSource(null, sourceFieldsToExcludeCopy.toArray(String[]::new)); + searchSourceBuilder.query( + QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn))); + searchRequest.source(searchSourceBuilder.size(1000)); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + for (var hit : searchResponse.getHits().getHits()) { + HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + tempMap.keySet().removeAll(FIELDS_TO_REMOVE); + responseMap.put("entity", tempMap); + } + getLineage( + fqn, + downstreamDepth, + edges, + nodes, + queryFilter, + "lineage.fromEntity.fqnHash.keyword", + deleted); + getLineage( + fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqnHash.keyword", deleted); + responseMap.put("edges", edges); + responseMap.put("nodes", nodes); + return responseMap; } - public SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest) + @Override + public Response searchLineage( + String fqn, + int upstreamDepth, + int downstreamDepth, + String queryFilter, + boolean deleted, + String entityType) throws IOException { - if (lineageRequest.getDirection().equals(LineageDirection.UPSTREAM)) { - return getUpstreamLineage(lineageRequest); - } else { - return getDownStreamLineage(lineageRequest); - } + Map responseMap = + searchLineageInternal( + fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); + return Response.status(OK).entity(responseMap).build(); } private void getEntityRelationship( @@ -1068,148 +1100,56 @@ public class OpenSearchClient implements SearchClient { return Response.status(OK).entity(responseMap).build(); } - @Override - public Map searchEntityByKey( - String indexAlias, String keyName, String keyValue, List fieldsToRemove) - throws IOException { - os.org.opensearch.action.search.SearchRequest searchRequest = - getSearchRequest(indexAlias, null, keyName, keyValue, null, null, fieldsToRemove); - os.org.opensearch.action.search.SearchResponse searchResponse = - client.search(searchRequest, RequestOptions.DEFAULT); - int noOfHits = searchResponse.getHits().getHits().length; - if (noOfHits == 1) { - return new HashMap<>( - JsonUtils.getMap(searchResponse.getHits().getHits()[0].getSourceAsMap())); - } else { - throw new SearchException( - String.format( - "Issue in Search Entity By Key: %s, Value: %s , Number of Hits: %s", - keyName, keyValue, noOfHits)); - } - } - - private os.org.opensearch.action.search.SearchRequest getSearchRequest( - String indexAlias, + private void getLineage( + String fqn, + int depth, + Set> edges, + Set> nodes, String queryFilter, - String key, - String value, - Boolean deleted, - List fieldsToInclude, - List fieldsToRemove) { + String direction, + boolean deleted) + throws IOException { + if (depth <= 0) { + return; + } os.org.opensearch.action.search.SearchRequest searchRequest = new os.org.opensearch.action.search.SearchRequest( - Entity.getSearchRepository().getIndexOrAliasName(indexAlias)); + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.fetchSource( - listOrEmpty(fieldsToInclude).toArray(String[]::new), - listOrEmpty(fieldsToRemove).toArray(String[]::new)); - searchSourceBuilder.query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery(key, value))); - if (!CommonUtil.nullOrEmpty(deleted)) { + searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); + searchSourceBuilder.query( + QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn)))); + if (CommonUtil.nullOrEmpty(deleted)) { searchSourceBuilder.query( QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery(key, value)) + .must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn))) .must(QueryBuilders.termQuery("deleted", deleted))); } - buildSearchSourceFilter(queryFilter, searchSourceBuilder); + searchRequest.source(searchSourceBuilder.size(1000)); - return searchRequest; - } - - public SearchLineageResult getUpstreamLineage(SearchLineageRequest lineageRequest) - throws IOException { - SearchLineageResult result = new SearchLineageResult().withNodes(new HashMap<>()); - getUpstreamLineageRecursively(lineageRequest, result); - return result; - } - - private void getUpstreamLineageRecursively( - SearchLineageRequest lineageRequest, SearchLineageResult result) throws IOException { - if (lineageRequest.getUpstreamDepth() <= 0) { - return; - } - - Map entityMap = - searchEntityByKey( - GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE); - if (!entityMap.isEmpty()) { - result.getNodes().putIfAbsent(entityMap.get("id").toString(), entityMap); - // Contains Upstream Lineage - if (entityMap.containsKey(UPSTREAM_LINEAGE_FIELD)) { - List upStreamEntities = - JsonUtils.readOrConvertValues( - entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class); - for (EsLineageData esLineageData : upStreamEntities) { - result - .getUpstreamEdges() - .putIfAbsent( - esLineageData.getDocId(), - esLineageData.withToEntity(SearchClient.getRelationshipRef(entityMap))); - String upstreamEntityId = esLineageData.getFromEntity().getId().toString(); - if (!result.getNodes().containsKey(upstreamEntityId)) { - SearchLineageRequest updatedRequest = - JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class) - .withUpstreamDepth(lineageRequest.getUpstreamDepth() - 1) - .withFqn(esLineageData.getFromEntity().getFqn()); - getUpstreamLineageRecursively(updatedRequest, result); + os.org.opensearch.action.search.SearchResponse searchResponse = + client.search(searchRequest, RequestOptions.DEFAULT); + for (var hit : searchResponse.getHits().getHits()) { + List> lineage = + (List>) hit.getSourceAsMap().get("lineage"); + HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + nodes.add(tempMap); + for (Map lin : lineage) { + HashMap fromEntity = (HashMap) lin.get("fromEntity"); + HashMap toEntity = (HashMap) lin.get("toEntity"); + if (direction.equalsIgnoreCase("lineage.fromEntity.fqnHash.keyword")) { + if (!edges.contains(lin) && fromEntity.get("fqn").equals(fqn)) { + edges.add(lin); + getLineage( + toEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted); } - } - } - } - } - - public SearchLineageResult getDownStreamLineage(SearchLineageRequest lineageRequest) - throws IOException { - SearchLineageResult result = new SearchLineageResult().withNodes(new HashMap<>()); - Map entityMap = - searchEntityByKey( - GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE); - if (!entityMap.isEmpty()) { - result.getNodes().putIfAbsent(entityMap.get("id").toString(), entityMap); - getDownStreamRecursively(SearchClient.getRelationshipRef(entityMap), lineageRequest, result); - } - return result; - } - - private void getDownStreamRecursively( - RelationshipRef fromEntity, SearchLineageRequest lineageRequest, SearchLineageResult result) - throws IOException { - if (lineageRequest.getDownstreamDepth() <= 0) { - return; - } - - os.org.opensearch.action.search.SearchRequest searchDownstreamEntities = - getSearchRequest( - GLOBAL_SEARCH_ALIAS, - lineageRequest.getQueryFilter(), - "upstreamLineage.fromEntity.fqnHash", - FullyQualifiedName.buildHash(lineageRequest.getFqn()), - lineageRequest.getIncludeDeleted(), - null, - SOURCE_FIELDS_TO_EXCLUDE); - os.org.opensearch.action.search.SearchResponse downstreamEntities = - client.search(searchDownstreamEntities, RequestOptions.DEFAULT); - for (os.org.opensearch.search.SearchHit searchHit : downstreamEntities.getHits().getHits()) { - Map entityMap = new HashMap<>(searchHit.getSourceAsMap()); - if (!entityMap.isEmpty()) { - result.getNodes().putIfAbsent(entityMap.get(ID_FIELD).toString(), entityMap); - List upStreamEntities = - JsonUtils.readOrConvertValues( - entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class); - EsLineageData esLineageData = - SearchClient.getEsLineageDataFromUpstreamLineage(fromEntity.getFqn(), upStreamEntities); - if (esLineageData != null) { - RelationshipRef toEntity = SearchClient.getRelationshipRef(entityMap); - result - .getDownstreamEdges() - .putIfAbsent(esLineageData.getDocId(), esLineageData.withToEntity(toEntity)); - String downStreamEntityId = toEntity.getId().toString(); - if (!result.getNodes().containsKey(downStreamEntityId)) { - SearchLineageRequest updatedRequest = - JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class) - .withDownstreamDepth(lineageRequest.getDownstreamDepth() - 1) - .withFqn(toEntity.getFqn()); - getDownStreamRecursively(toEntity, updatedRequest, result); + } else { + if (!edges.contains(lin) && toEntity.get("fqn").equals(fqn)) { + edges.add(lin); + getLineage( + fromEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted); } } } @@ -1353,6 +1293,107 @@ public class OpenSearchClient implements SearchClient { return client.search(searchRequest, RequestOptions.DEFAULT); } + private Map searchPipelineLineage( + String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) + throws IOException { + Map responseMap = new HashMap<>(); + Set> edges = new HashSet<>(); + Set> nodes = new HashSet<>(); + responseMap.put("entity", null); + Object[] searchAfter = null; + long processedRecords = 0; + long totalRecords = -1; + while (totalRecords != processedRecords) { + os.org.opensearch.action.search.SearchRequest searchRequest = + new os.org.opensearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + boolQueryBuilder.should( + QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn))); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); + FieldSortBuilder sortBuilder = SortBuilders.fieldSort("fullyQualifiedName"); + searchSourceBuilder.sort(sortBuilder); + searchSourceBuilder.query(boolQueryBuilder); + if (searchAfter != null) { + searchSourceBuilder.searchAfter(searchAfter); + } + if (CommonUtil.nullOrEmpty(deleted)) { + searchSourceBuilder.query( + QueryBuilders.boolQuery() + .must(boolQueryBuilder) + .must(QueryBuilders.termQuery("deleted", deleted))); + } + buildSearchSourceFilter(queryFilter, searchSourceBuilder); + + searchRequest.source(searchSourceBuilder); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + for (var hit : searchResponse.getHits().getHits()) { + List> lineage = + (List>) hit.getSourceAsMap().get("lineage"); + HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + nodes.add(tempMap); + for (Map lin : lineage) { + HashMap fromEntity = (HashMap) lin.get("fromEntity"); + HashMap toEntity = (HashMap) lin.get("toEntity"); + HashMap pipeline = (HashMap) lin.get("pipeline"); + if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) { + edges.add(lin); + getLineage( + fromEntity.get("fqn"), + upstreamDepth, + edges, + nodes, + queryFilter, + "lineage.toEntity.fqn.keyword", + deleted); + getLineage( + toEntity.get("fqn"), + downstreamDepth, + edges, + nodes, + queryFilter, + "lineage.fromEntity.fqn.keyword", + deleted); + } + } + } + totalRecords = searchResponse.getHits().getTotalHits().value; + int currentHits = searchResponse.getHits().getHits().length; + processedRecords += currentHits; + if (currentHits > 0) { + searchAfter = searchResponse.getHits().getHits()[currentHits - 1].getSortValues(); + } else { + searchAfter = null; + } + } + getLineage( + fqn, downstreamDepth, edges, nodes, queryFilter, "lineage.fromEntity.fqn.keyword", deleted); + getLineage( + fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqn.keyword", deleted); + + if (edges.isEmpty()) { + os.org.opensearch.action.search.SearchRequest searchRequestForEntity = + new os.org.opensearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + SearchSourceBuilder searchSourceBuilderForEntity = new SearchSourceBuilder(); + searchSourceBuilderForEntity.query( + QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn))); + searchRequestForEntity.source(searchSourceBuilderForEntity.size(1000)); + SearchResponse searchResponseForEntity = + client.search(searchRequestForEntity, RequestOptions.DEFAULT); + for (var hit : searchResponseForEntity.getHits().getHits()) { + HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + tempMap.keySet().removeAll(FIELDS_TO_REMOVE); + responseMap.put("entity", tempMap); + } + } + responseMap.put("edges", edges); + responseMap.put("nodes", nodes); + return responseMap; + } + private static FunctionScoreQueryBuilder boostScore(QueryStringQueryBuilder queryBuilder) { FunctionScoreQueryBuilder.FilterFunctionBuilder tier1Boost = new FunctionScoreQueryBuilder.FilterFunctionBuilder( @@ -2230,14 +2271,13 @@ public class OpenSearchClient implements SearchClient { @Override public void updateLineage( - String indexName, Pair fieldAndValue, EsLineageData lineageData) { + String indexName, Pair fieldAndValue, Map lineagaData) { if (isClientAvailable) { UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName); updateByQueryRequest.setQuery( new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue()) .operator(Operator.AND)); - Map params = - Collections.singletonMap("lineageData", JsonUtils.getMap(lineageData)); + Map params = Collections.singletonMap("lineageData", lineagaData); Script script = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params); updateByQueryRequest.setScript(script); diff --git a/openmetadata-spec/src/main/resources/json/schema/api/lineage/esLineageData.json b/openmetadata-spec/src/main/resources/json/schema/api/lineage/esLineageData.json deleted file mode 100644 index cf0308bd1c0..00000000000 --- a/openmetadata-spec/src/main/resources/json/schema/api/lineage/esLineageData.json +++ /dev/null @@ -1,73 +0,0 @@ -{ - "$id": "https://open-metadata.org/schema/api/lineage/esLineageData.json", - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "EsLineageData", - "description": "Response object for the search lineage request from Elastic Search.", - "javaType": "org.openmetadata.schema.api.lineage.EsLineageData", - "type": "object", - "definitions": { - "relationshipRef": { - "description": "Relationship Reference to an Entity.", - "type": "object", - "properties": { - "id": { - "description": "Unique identifier of this entity instance.", - "$ref": "../../type/basic.json#/definitions/uuid" - }, - "fqn": { - "description": "FullyQualifiedName of the entity.", - "$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName" - }, - "fqnHash": { - "description": "FullyQualifiedName Hash of the entity.", - "type": "string" - }, - "type": { - "description": "Type of the entity.", - "type": "string" - } - } - } - }, - "properties": { - "fromEntity": { - "description": "From Entity.", - "$ref": "#/definitions/relationshipRef" - }, - "toEntity": { - "description": "To Entity.", - "$ref": "#/definitions/relationshipRef" - }, - "pipeline": { - "description": "Pipeline in case pipeline is present between entities." - }, - "sqlQuery": { - "description": "Sql Query associated.", - "type": "string" - }, - "columns": { - "description": "Columns associated.", - "type": "array", - "items": { - "$ref": "../../type/entityLineage.json#/definitions/columnLineage" - } - }, - "description": { - "description": "Description.", - "type": "string" - }, - "source": { - "description": "Source of the Lineage.", - "type": "string" - }, - "doc_id": { - "description": "Doc Id for the Lineage.", - "type": "string" - }, - "pipelineEntityType": { - "description": "Pipeline Entity or Stored procedure.", - "type": "string" - } - }, - "additionalProperties": false -} diff --git a/openmetadata-spec/src/main/resources/json/schema/api/lineage/lineageDirection.json b/openmetadata-spec/src/main/resources/json/schema/api/lineage/lineageDirection.json deleted file mode 100644 index 271ffdd2100..00000000000 --- a/openmetadata-spec/src/main/resources/json/schema/api/lineage/lineageDirection.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "$id": "https://open-metadata.org/schema/api/lineage/lineageDirection.json", - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "LineageDirection", - "description": "Lineage Direction Schema.", - "javaType": "org.openmetadata.schema.api.lineage.LineageDirection", - "type": "string", - "enum": ["Upstream", "Downstream"] -} diff --git a/openmetadata-spec/src/main/resources/json/schema/api/lineage/searchLineageRequest.json b/openmetadata-spec/src/main/resources/json/schema/api/lineage/searchLineageRequest.json deleted file mode 100644 index e3c357f22b2..00000000000 --- a/openmetadata-spec/src/main/resources/json/schema/api/lineage/searchLineageRequest.json +++ /dev/null @@ -1,46 +0,0 @@ -{ - "$id": "https://open-metadata.org/schema/api/lineage/searchLineageRequest.json", - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "SearchLineageRequest", - "description": "Search Lineage Request Schema to find linage from Elastic Search.", - "javaType": "org.openmetadata.schema.api.lineage.SearchLineageRequest", - "type": "object", - "properties": { - "fqn": { - "description": "Entity Fqn to search lineage", - "type": "string" - }, - "entityType": { - "description": "Query Filter", - "type": "string" - }, - "direction": { - "$ref": "./lineageDirection.json" - }, - "directionValue": { - "description": "Lineage Direction Value.", - "type": "string" - }, - "upstreamDepth": { - "description": "The upstream depth of the lineage", - "type": "integer", - "default": 3 - }, - "downstreamDepth": { - "description": "The downstream depth of the lineage", - "type": "integer", - "default": 3 - }, - "queryFilter": { - "description": "Query Filter", - "type": "string" - }, - "includeDeleted": { - "description": "Include deleted entities", - "type": "boolean", - "default": false - } - }, - "required": ["fqn", "entityType", "direction"], - "additionalProperties": false -} diff --git a/openmetadata-spec/src/main/resources/json/schema/api/lineage/searchLineageResult.json b/openmetadata-spec/src/main/resources/json/schema/api/lineage/searchLineageResult.json deleted file mode 100644 index 4613bd1f063..00000000000 --- a/openmetadata-spec/src/main/resources/json/schema/api/lineage/searchLineageResult.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - "$id": "https://open-metadata.org/schema/api/lineage/searchLineageResult.json", - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "SearchLineageResult", - "description": "Search Lineage Response for the Lineage Request", - "javaType": "org.openmetadata.schema.api.lineage.SearchLineageResult", - "type": "object", - "definitions": { - "entity" : { - "description": "A general object to hold any object." - } - }, - "properties": { - "paging": { - "$ref": "../../type/paging.json" - }, - "direction": { - "description": "Lineage Direction Value.", - "type": "string" - }, - "nodes" : { - "description": "Nodes in the lineage response.", - "existingJavaType": "java.util.Map" - }, - "upstreamEdges": { - "description": "Upstream Edges for the entity.", - "existingJavaType": "java.util.Map" - }, - "downstreamEdges": { - "description": "Downstream Edges for the node.", - "existingJavaType": "java.util.Map" - } - }, - "additionalProperties": false -} diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/esLineageData.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/esLineageData.ts deleted file mode 100644 index 2da1437de9f..00000000000 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/esLineageData.ts +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2025 Collate. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Response object for the search lineage request from Elastic Search. - */ -export interface EsLineageData { - /** - * Columns associated. - */ - columns?: ColumnLineage[]; - /** - * Description. - */ - description?: string; - /** - * Doc Id for the Lineage. - */ - doc_id?: string; - /** - * From Entity. - */ - fromEntity?: RelationshipRef; - /** - * Pipeline in case pipeline is present between entities. - */ - pipeline?: any; - /** - * Pipeline Entity or Stored procedure. - */ - pipelineEntityType?: string; - /** - * Source of the Lineage. - */ - source?: string; - /** - * Sql Query associated. - */ - sqlQuery?: string; - /** - * To Entity. - */ - toEntity?: RelationshipRef; -} - -export interface ColumnLineage { - /** - * One or more source columns identified by fully qualified column name used by - * transformation function to create destination column. - */ - fromColumns?: string[]; - /** - * Transformation function applied to source columns to create destination column. That is - * `function(fromColumns) -> toColumn`. - */ - function?: string; - /** - * Destination column identified by fully qualified column name created by the - * transformation of source columns. - */ - toColumn?: string; - [property: string]: any; -} - -/** - * From Entity. - * - * Relationship Reference to an Entity. - * - * To Entity. - */ -export interface RelationshipRef { - /** - * FullyQualifiedName of the entity. - */ - fqn?: string; - /** - * FullyQualifiedName Hash of the entity. - */ - fqnHash?: string; - /** - * Unique identifier of this entity instance. - */ - id?: string; - /** - * Type of the entity. - */ - type?: string; - [property: string]: any; -} diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/lineageDirection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/lineageDirection.ts deleted file mode 100644 index 941f89c401a..00000000000 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/lineageDirection.ts +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2025 Collate. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Lineage Direction Schema. - */ -export enum LineageDirection { - Downstream = "Downstream", - Upstream = "Upstream", -} diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/searchLineageRequest.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/searchLineageRequest.ts deleted file mode 100644 index 45caa6d6410..00000000000 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/searchLineageRequest.ts +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2025 Collate. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Search Lineage Request Schema to find linage from Elastic Search. - */ -export interface SearchLineageRequest { - direction: LineageDirection; - /** - * Lineage Direction Value. - */ - directionValue?: string; - /** - * The downstream depth of the lineage - */ - downstreamDepth?: number; - /** - * Query Filter - */ - entityType: string; - /** - * Entity Fqn to search lineage - */ - fqn: string; - /** - * Include deleted entities - */ - includeDeleted?: boolean; - /** - * Query Filter - */ - queryFilter?: string; - /** - * The upstream depth of the lineage - */ - upstreamDepth?: number; -} - -/** - * Lineage Direction Schema. - */ -export enum LineageDirection { - Downstream = "Downstream", - Upstream = "Upstream", -} diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/searchLineageResult.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/searchLineageResult.ts deleted file mode 100644 index 19858c45b81..00000000000 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/searchLineageResult.ts +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2025 Collate. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Search Lineage Response for the Lineage Request - */ -export interface SearchLineageResult { - /** - * Lineage Direction Value. - */ - direction?: string; - /** - * Downstream Edges for the node. - */ - downstreamEdges?: any; - /** - * Nodes in the lineage response. - */ - nodes?: any; - paging?: Paging; - /** - * Upstream Edges for the entity. - */ - upstreamEdges?: any; -} - -/** - * Type used for cursor based pagination information in GET list responses. - */ -export interface Paging { - /** - * After cursor used for getting the next page (see API pagination for details). - */ - after?: string; - /** - * Before cursor used for getting the previous page (see API pagination for details). - */ - before?: string; - /** - * Limit used in case of offset based pagination. - */ - limit?: number; - /** - * Offset used in case of offset based pagination. - */ - offset?: number; - /** - * Total number of entries available to page through. - */ - total: number; -}