diff --git a/common/src/main/java/org/openmetadata/common/utils/CommonUtil.java b/common/src/main/java/org/openmetadata/common/utils/CommonUtil.java index 7e0da8b6ac4..3c63d90e436 100644 --- a/common/src/main/java/org/openmetadata/common/utils/CommonUtil.java +++ b/common/src/main/java/org/openmetadata/common/utils/CommonUtil.java @@ -33,6 +33,7 @@ import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.Enumeration; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -177,6 +178,10 @@ public final class CommonUtil { return Optional.ofNullable(list).orElse(Collections.emptyList()); } + public static Map collectionOrEmpty(Map input) { + return Optional.ofNullable(input).orElse(new HashMap<>()); + } + public static List listOrEmptyMutable(List list) { return nullOrEmpty(list) ? new ArrayList<>() : new ArrayList<>(list); } @@ -209,6 +214,14 @@ public final class CommonUtil { } } + public static List collectionOrDefault(List c, List defaultValue) { + if (nullOrEmpty(c)) { + return defaultValue; + } else { + return c; + } + } + public static String getResourceAsStream(ClassLoader loader, String file) throws IOException { return IOUtils.toString(Objects.requireNonNull(loader.getResourceAsStream(file)), UTF_8); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java index 9026c4f36da..dd3c6f05a88 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java @@ -14,6 +14,8 @@ package org.openmetadata.service.jdbi3; import static javax.ws.rs.core.Response.Status.OK; +import static org.openmetadata.common.utils.CommonUtil.collectionOrDefault; +import static org.openmetadata.common.utils.CommonUtil.nullOrDefault; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.csv.CsvUtil.addField; import static org.openmetadata.csv.EntityCsv.getCsvDocumentation; @@ -48,9 +50,13 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.jdbi.v3.sqlobject.transaction.Transaction; -import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.csv.CsvUtil; import org.openmetadata.schema.api.lineage.AddLineage; +import org.openmetadata.schema.api.lineage.EsLineageData; +import org.openmetadata.schema.api.lineage.LineageDirection; +import org.openmetadata.schema.api.lineage.RelationshipRef; +import org.openmetadata.schema.api.lineage.SearchLineageRequest; +import org.openmetadata.schema.api.lineage.SearchLineageResult; import org.openmetadata.schema.entity.data.APIEndpoint; import org.openmetadata.schema.entity.data.Container; import org.openmetadata.schema.entity.data.Dashboard; @@ -144,84 +150,67 @@ public class LineageRepository { private void addLineageToSearch( EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) { - IndexMapping sourceIndexMapping = - Entity.getSearchRepository().getIndexMapping(fromEntity.getType()); - String sourceIndexName = - sourceIndexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()); IndexMapping destinationIndexMapping = Entity.getSearchRepository().getIndexMapping(toEntity.getType()); String destinationIndexName = destinationIndexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()); - Map relationshipDetails = - buildRelationshipDetailsMap(fromEntity, toEntity, lineageDetails); - Pair from = new ImmutablePair<>("_id", fromEntity.getId().toString()); + // For lineage from -> to (not stored) since the doc itself is the toEntity + EsLineageData lineageData = + buildEntityLineageData(fromEntity, toEntity, lineageDetails).withToEntity(null); Pair to = new ImmutablePair<>("_id", toEntity.getId().toString()); - searchClient.updateLineage(sourceIndexName, from, relationshipDetails); - searchClient.updateLineage(destinationIndexName, to, relationshipDetails); + searchClient.updateLineage(destinationIndexName, to, lineageData); } - public static Map buildEntityRefMap(EntityReference entityRef) { - Map details = new HashMap<>(); - details.put("id", entityRef.getId().toString()); - details.put("type", entityRef.getType()); - details.put("fqn", entityRef.getFullyQualifiedName()); - details.put("fqnHash", FullyQualifiedName.buildHash(entityRef.getFullyQualifiedName())); - return details; + public static RelationshipRef buildEntityRefLineage(EntityReference entityRef) { + return new RelationshipRef() + .withId(entityRef.getId()) + .withType(entityRef.getType()) + .withFqn(entityRef.getFullyQualifiedName()) + .withFqnHash(FullyQualifiedName.buildHash(entityRef.getFullyQualifiedName())); } - public static Map buildRelationshipDetailsMap( + public static EsLineageData buildEntityLineageData( EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) { - Map relationshipDetails = new HashMap<>(); - relationshipDetails.put( - "doc_id", fromEntity.getId().toString() + "-" + toEntity.getId().toString()); - relationshipDetails.put("fromEntity", buildEntityRefMap(fromEntity)); - relationshipDetails.put("toEntity", buildEntityRefMap(toEntity)); + EsLineageData lineageData = + new EsLineageData() + .withDocId(fromEntity.getId().toString() + "-->" + toEntity.getId().toString()) + .withFromEntity(buildEntityRefLineage(fromEntity)); if (lineageDetails != null) { // Add Pipeline Details - addPipelineDetails(relationshipDetails, lineageDetails.getPipeline()); - relationshipDetails.put( - "description", - CommonUtil.nullOrEmpty(lineageDetails.getDescription()) - ? null - : lineageDetails.getDescription()); - if (!CommonUtil.nullOrEmpty(lineageDetails.getColumnsLineage())) { - List> colummnLineageList = new ArrayList<>(); - for (ColumnLineage columnLineage : lineageDetails.getColumnsLineage()) { - colummnLineageList.add(JsonUtils.getMap(columnLineage)); - } - relationshipDetails.put("columns", colummnLineageList); - } - relationshipDetails.put( - "sqlQuery", - CommonUtil.nullOrEmpty(lineageDetails.getSqlQuery()) - ? null - : lineageDetails.getSqlQuery()); - relationshipDetails.put( - "source", - CommonUtil.nullOrEmpty(lineageDetails.getSource()) ? null : lineageDetails.getSource()); + addPipelineDetails(lineageData, lineageDetails.getPipeline()); + lineageData.setDescription(nullOrDefault(lineageDetails.getDescription(), null)); + lineageData.setColumns(collectionOrDefault(lineageDetails.getColumnsLineage(), null)); + lineageData.setSqlQuery(nullOrDefault(lineageDetails.getSqlQuery(), null)); + lineageData.setSource(nullOrDefault(lineageDetails.getSource().value(), null)); } - return relationshipDetails; + return lineageData; } - public static void addPipelineDetails( - Map relationshipDetails, EntityReference pipelineRef) { - if (CommonUtil.nullOrEmpty(pipelineRef)) { - relationshipDetails.put(PIPELINE, JsonUtils.getMap(null)); + public static void addPipelineDetails(EsLineageData lineageData, EntityReference pipelineRef) { + if (nullOrEmpty(pipelineRef)) { + lineageData.setPipeline(null); } else { - Map pipelineMap; - if (pipelineRef.getType().equals(PIPELINE)) { - pipelineMap = - JsonUtils.getMap( - Entity.getEntity(pipelineRef, "pipelineStatus,tags,owners", Include.ALL)); - } else { - pipelineMap = JsonUtils.getMap(Entity.getEntity(pipelineRef, "tags,owners", Include.ALL)); - } - pipelineMap.remove("changeDescription"); - relationshipDetails.put("pipelineEntityType", pipelineRef.getType()); - relationshipDetails.put(PIPELINE, pipelineMap); + Pair> pipelineOrStoredProcedure = + getPipelineOrStoredProcedure(pipelineRef, List.of("changeDescription")); + lineageData.setPipelineEntityType(pipelineOrStoredProcedure.getLeft()); + lineageData.setPipeline(pipelineOrStoredProcedure.getRight()); } } + public static Pair> getPipelineOrStoredProcedure( + EntityReference pipelineRef, List fieldsToRemove) { + Map pipelineMap; + if (pipelineRef.getType().equals(PIPELINE)) { + pipelineMap = + JsonUtils.getMap( + Entity.getEntity(pipelineRef, "pipelineStatus,tags,owners", Include.ALL)); + } else { + pipelineMap = JsonUtils.getMap(Entity.getEntity(pipelineRef, "tags,owners", Include.ALL)); + } + fieldsToRemove.forEach(pipelineMap::remove); + return Pair.of(pipelineRef.getType(), pipelineMap); + } + private String validateLineageDetails( EntityReference from, EntityReference to, LineageDetails details) { if (details == null) { @@ -249,13 +238,21 @@ public class LineageRepository { throws IOException { CsvDocumentation documentation = getCsvDocumentation("lineage"); List headers = documentation.getHeaders(); - Map lineageMap = + // TODO: Fix the lineage we need booth the lineage + SearchLineageResult result = Entity.getSearchRepository() .searchLineageForExport( - fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); + fqn, + upstreamDepth, + downstreamDepth, + queryFilter, + deleted, + entityType, + LineageDirection.UPSTREAM); CsvFile csvFile = new CsvFile().withHeaders(headers); - addRecords(csvFile, lineageMap); + // TODO: Fix This + // addRecords(csvFile, result); return CsvUtil.formatCsv(csvFile); } @@ -278,10 +275,19 @@ public class LineageRepository { String entityType, boolean deleted) { try { - Response response = + // TODO: fix Export to consider for both the nodes + SearchLineageResult response = Entity.getSearchRepository() - .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); - String jsonResponse = JsonUtils.pojoToJson(response.getEntity()); + .searchLineage( + new SearchLineageRequest() + .withFqn(fqn) + .withUpstreamDepth(upstreamDepth) + .withDownstreamDepth(downstreamDepth) + .withQueryFilter(queryFilter) + .withIncludeDeleted(deleted) + .withEntityType(entityType) + .withDirection(LineageDirection.UPSTREAM)); + String jsonResponse = JsonUtils.pojoToJson(response); JsonNode rootNode = JsonUtils.readTree(jsonResponse); Map entityMap = new HashMap<>(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java index cd71dfe5fe8..24ec96c9916 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java @@ -51,6 +51,9 @@ import javax.ws.rs.core.SecurityContext; import javax.ws.rs.core.UriInfo; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.api.lineage.AddLineage; +import org.openmetadata.schema.api.lineage.LineageDirection; +import org.openmetadata.schema.api.lineage.SearchLineageRequest; +import org.openmetadata.schema.api.lineage.SearchLineageResult; import org.openmetadata.schema.type.EntityLineage; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.MetadataOperation; @@ -192,7 +195,7 @@ public class LineageResource { mediaType = "application/json", schema = @Schema(implementation = SearchResponse.class))) }) - public Response searchLineage( + public SearchLineageResult searchLineage( @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Parameter(description = "fqn") @QueryParam("fqn") String fqn, @@ -209,9 +212,61 @@ public class LineageResource { boolean deleted, @Parameter(description = "entity type") @QueryParam("type") String entityType) throws IOException { - return Entity.getSearchRepository() - .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); + .searchLineage( + new SearchLineageRequest() + .withFqn(fqn) + .withUpstreamDepth(upstreamDepth) + .withDownstreamDepth(downstreamDepth) + .withQueryFilter(queryFilter) + .withIncludeDeleted(deleted) + .withEntityType(entityType)); + } + + @GET + @Path("/getLineage/{direction}") + @Operation( + operationId = "searchLineageWithDirection", + summary = "Search lineage with Direction", + responses = { + @ApiResponse( + responseCode = "200", + description = "search response", + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = SearchResponse.class))) + }) + public SearchLineageResult searchLineageWithDirection( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "fqn") @QueryParam("fqn") String fqn, + @Parameter(description = "Direction", required = true, schema = @Schema(type = "string")) + @PathParam("direction") + LineageDirection direction, + @Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth, + @Parameter(description = "downstreamDepth") @QueryParam("downstreamDepth") + int downstreamDepth, + @Parameter( + description = + "Elasticsearch query that will be combined with the query_string query generator from the `query` argument") + @QueryParam("query_filter") + String queryFilter, + @Parameter(description = "Filter documents by deleted param. By default deleted is false") + @QueryParam("includeDeleted") + boolean deleted, + @Parameter(description = "entity type") @QueryParam("type") String entityType) + throws IOException { + return Entity.getSearchRepository() + .searchLineage( + new SearchLineageRequest() + .withFqn(fqn) + .withUpstreamDepth(upstreamDepth) + .withDownstreamDepth(downstreamDepth) + .withQueryFilter(queryFilter) + .withIncludeDeleted(deleted) + .withEntityType(entityType) + .withDirection(direction)); } @GET @@ -279,8 +334,6 @@ public class LineageResource { boolean deleted, @Parameter(description = "entity type") @QueryParam("type") String entityType) throws IOException { - Entity.getSearchRepository() - .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); return dao.exportCsv(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java index d42fab34926..85a2609a8d1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java @@ -7,6 +7,7 @@ import java.security.KeyStoreException; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.json.JsonArray; @@ -15,6 +16,10 @@ import javax.net.ssl.SSLContext; import javax.ws.rs.core.Response; import lombok.Getter; import org.apache.commons.lang3.tuple.Pair; +import org.openmetadata.schema.api.lineage.EsLineageData; +import org.openmetadata.schema.api.lineage.RelationshipRef; +import org.openmetadata.schema.api.lineage.SearchLineageRequest; +import org.openmetadata.schema.api.lineage.SearchLineageResult; import org.openmetadata.schema.api.search.SearchSettings; import org.openmetadata.schema.dataInsight.DataInsightChartResult; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; @@ -28,6 +33,7 @@ import org.openmetadata.service.resources.settings.SettingsCache; import org.openmetadata.service.search.models.IndexMapping; import org.openmetadata.service.search.security.RBACConditionEvaluator; import org.openmetadata.service.security.policyevaluator.SubjectContext; +import org.openmetadata.service.util.FullyQualifiedName; import org.openmetadata.service.util.ResultList; import org.openmetadata.service.util.SSLUtil; import os.org.opensearch.action.bulk.BulkRequest; @@ -35,6 +41,9 @@ import os.org.opensearch.action.bulk.BulkResponse; import os.org.opensearch.client.RequestOptions; public interface SearchClient { + String UPSTREAM_LINEAGE_FIELD = "upstreamLineage"; + String FQN_FIELD = "fullyQualifiedName"; + String ID_FIELD = "id"; ExecutorService asyncExecutor = Executors.newFixedThreadPool(1); String UPDATE = "update"; @@ -81,10 +90,10 @@ public interface SearchClient { "if (ctx._source.certification != null && ctx._source.certification.tagLabel != null) {ctx._source.certification.tagLabel.style = params.style; ctx._source.certification.tagLabel.description = params.description; ctx._source.certification.tagLabel.tagFQN = params.tagFQN; ctx._source.certification.tagLabel.name = params.name; }"; String REMOVE_LINEAGE_SCRIPT = - "for (int i = 0; i < ctx._source.lineage.length; i++) { if (ctx._source.lineage[i].doc_id == '%s') { ctx._source.lineage.remove(i) }}"; + "for (int i = 0; i < ctx._source.upstreamLineage.length; i++) { if (ctx._source.upstreamLineage[i].doc_id == '%s') { ctx._source.upstreamLineage.remove(i) }}"; String ADD_UPDATE_LINEAGE = - "boolean docIdExists = false; for (int i = 0; i < ctx._source.lineage.size(); i++) { if (ctx._source.lineage[i].doc_id.equalsIgnoreCase(params.lineageData.doc_id)) { ctx._source.lineage[i] = params.lineageData; docIdExists = true; break;}}if (!docIdExists) {ctx._source.lineage.add(params.lineageData);}"; + "boolean docIdExists = false; for (int i = 0; i < ctx._source.upstreamLineage.size(); i++) { if (ctx._source.upstreamLineage[i].doc_id.equalsIgnoreCase(params.lineageData.doc_id)) { ctx._source.upstreamLineage[i] = params.lineageData; docIdExists = true; break;}}if (!docIdExists) {ctx._source.upstreamLineage.add(params.lineageData);}"; // The script is used for updating the entityRelationship attribute of the entity in ES // It checks if any duplicate entry is present based on the doc_id and updates only if it is not @@ -188,15 +197,15 @@ public interface SearchClient { Response searchBySourceUrl(String sourceUrl) throws IOException; - Response searchLineage( - String fqn, - int upstreamDepth, - int downstreamDepth, - String queryFilter, - boolean deleted, - String entityType) + SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException; + + SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest) throws IOException; + SearchLineageResult getUpstreamLineage(SearchLineageRequest lineageRequest) throws IOException; + + SearchLineageResult getDownStreamLineage(SearchLineageRequest lineageRequest) throws IOException; + Response searchEntityRelationship( String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) throws IOException; @@ -208,6 +217,10 @@ public interface SearchClient { String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) throws IOException; + Map searchEntityByKey( + String indexAlias, String keyName, String keyValue, List fieldsToRemove) + throws IOException; + /* Used for listing knowledge page hierarchy for a given parent and page type, used in Elastic/Open SearchClientExtension */ @@ -220,6 +233,7 @@ public interface SearchClient { /* Used for listing knowledge page hierarchy for a given active Page and page type, used in Elastic/Open SearchClientExtension */ + @SuppressWarnings("unused") default ResultList listPageHierarchyForActivePage( String activeFqn, String pageType, int offset, int limit) { throw new CustomExceptionMessage( @@ -232,15 +246,6 @@ public interface SearchClient { Response.Status.NOT_IMPLEMENTED, NOT_IMPLEMENTED_ERROR_TYPE, NOT_IMPLEMENTED_METHOD); } - Map searchLineageInternal( - String fqn, - int upstreamDepth, - int downstreamDepth, - String queryFilter, - boolean deleted, - String entityType) - throws IOException; - Response searchByField(String fieldName, String fieldValue, String index) throws IOException; Response aggregate(String index, String fieldName, String value, String query) throws IOException; @@ -291,7 +296,7 @@ public interface SearchClient { Pair> updates); void updateLineage( - String indexName, Pair fieldAndValue, Map lineagaData); + String indexName, Pair fieldAndValue, EsLineageData lineageData); void updateEntityRelationship( String indexName, @@ -393,5 +398,51 @@ public interface SearchClient { && rbacConditionEvaluator != null; } + // default String getLineageDirection(LineageDirection direction, String entityType) { + // if (LineageDirection.UPSTREAM.equals(direction)) { + // if (Boolean.FALSE.equals(entityType.equals(Entity.PIPELINE)) && + // Boolean.FALSE.equals(entityType.equals(Entity.STORED_PROCEDURE))) { + // return + // }else{ + // + // } + // } else { + // + // } + // return ""; + // } + SearchHealthStatus getSearchHealthStatus() throws IOException; + + static RelationshipRef getRelationshipRef(Map entityMap) { + // This assumes these keys exists in the map, use it with caution + return new RelationshipRef() + .withId(UUID.fromString(entityMap.get("id").toString())) + .withType(entityMap.get("entityType").toString()) + .withFqn(entityMap.get("fullyQualifiedName").toString()) + .withFqnHash(FullyQualifiedName.buildHash(entityMap.get("fullyQualifiedName").toString())); + } + + static EsLineageData getEsLineageDataFromUpstreamLineage( + String fqn, List upstream) { + for (EsLineageData esLineageData : upstream) { + if (esLineageData.getFromEntity().getFqn().equals(fqn)) { + return esLineageData; + } + } + return null; + } + + static EsLineageData copyEsLineageData(EsLineageData data) { + return new EsLineageData() + .withDocId(data.getDocId()) + .withFromEntity(data.getFromEntity()) + .withToEntity(data.getToEntity()) + .withPipeline(data.getPipeline()) + .withSqlQuery(data.getSqlQuery()) + .withColumns(data.getColumns()) + .withDescription(data.getDescription()) + .withSource(data.getSource()) + .withPipelineEntityType(data.getPipelineEntityType()); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexUtils.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexUtils.java index ba210001f73..68dbea5060c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexUtils.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexUtils.java @@ -19,6 +19,7 @@ import org.openmetadata.schema.tests.Datum; import org.openmetadata.schema.tests.type.DataQualityReportMetadata; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.TagLabel; +import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.Utilities; public final class SearchIndexUtils { @@ -58,8 +59,9 @@ public final class SearchIndexUtils { if (value instanceof Map) { currentMap = (Map) value; } else if (value instanceof List) { - List> list = (List>) value; - for (Map item : list) { + List list = (List>) value; + for (Object obj : list) { + Map item = JsonUtils.getMap(obj); removeFieldByPath( item, Arrays.stream(pathElements, 1, pathElements.length).collect(Collectors.joining("."))); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 943a23e593f..3a288d09805 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -60,6 +60,9 @@ import org.apache.commons.lang3.tuple.Pair; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.EntityTimeSeriesInterface; import org.openmetadata.schema.analytics.ReportData; +import org.openmetadata.schema.api.lineage.LineageDirection; +import org.openmetadata.schema.api.lineage.SearchLineageRequest; +import org.openmetadata.schema.api.lineage.SearchLineageResult; import org.openmetadata.schema.dataInsight.DataInsightChartResult; import org.openmetadata.schema.entity.classification.Tag; import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; @@ -958,16 +961,13 @@ public class SearchRepository { return searchClient.searchBySourceUrl(sourceUrl); } - public Response searchLineage( - String fqn, - int upstreamDepth, - int downstreamDepth, - String queryFilter, - boolean deleted, - String entityType) + public SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException { + return searchClient.searchLineage(lineageRequest); + } + + public SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest) throws IOException { - return searchClient.searchLineage( - fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); + return searchClient.searchLineage(lineageRequest); } public Response searchEntityRelationship( @@ -989,16 +989,24 @@ public class SearchRepository { fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); } - public Map searchLineageForExport( + public SearchLineageResult searchLineageForExport( String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted, - String entityType) + String entityType, + LineageDirection direction) throws IOException { - return searchClient.searchLineageInternal( - fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); + return searchClient.searchLineage( + new SearchLineageRequest() + .withFqn(fqn) + .withUpstreamDepth(upstreamDepth) + .withDownstreamDepth(downstreamDepth) + .withQueryFilter(queryFilter) + .withIncludeDeleted(deleted) + .withEntityType(entityType) + .withDirection(direction)); } public Response searchByField(String fieldName, String fieldValue, String index) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index 69ebeafd86e..4220fd9487b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -2,6 +2,7 @@ package org.openmetadata.service.search.elasticsearch; import static javax.ws.rs.core.Response.Status.NOT_FOUND; import static javax.ws.rs.core.Response.Status.OK; +import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA; import static org.openmetadata.service.Entity.DATA_PRODUCT; @@ -152,6 +153,11 @@ import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.jetbrains.annotations.NotNull; import org.openmetadata.common.utils.CommonUtil; +import org.openmetadata.schema.api.lineage.EsLineageData; +import org.openmetadata.schema.api.lineage.LineageDirection; +import org.openmetadata.schema.api.lineage.RelationshipRef; +import org.openmetadata.schema.api.lineage.SearchLineageRequest; +import org.openmetadata.schema.api.lineage.SearchLineageResult; import org.openmetadata.schema.dataInsight.DataInsightChartResult; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList; @@ -842,64 +848,26 @@ public class ElasticSearchClient implements SearchClient { } @Override - public Map searchLineageInternal( - String fqn, - int upstreamDepth, - int downstreamDepth, - String queryFilter, - boolean deleted, - String entityType) - throws IOException { - Map responseMap = new HashMap<>(); - Set> edges = new HashSet<>(); - Set> nodes = new HashSet<>(); - if (entityType.equalsIgnoreCase(Entity.PIPELINE) - || entityType.equalsIgnoreCase(Entity.STORED_PROCEDURE)) { - return searchPipelineLineage( - fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, responseMap); - } - es.org.elasticsearch.action.search.SearchRequest searchRequest = - new es.org.elasticsearch.action.search.SearchRequest( - Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - List sourceFieldsToExcludeCopy = new ArrayList<>(SOURCE_FIELDS_TO_EXCLUDE); - searchSourceBuilder.fetchSource(null, sourceFieldsToExcludeCopy.toArray(String[]::new)); - searchSourceBuilder.query( - QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn))); - searchRequest.source(searchSourceBuilder.size(1000)); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - for (var hit : searchResponse.getHits().getHits()) { - Map tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); - responseMap.put("entity", tempMap); - } - getLineage( - fqn, - downstreamDepth, - edges, - nodes, - queryFilter, - "lineage.fromEntity.fqnHash.keyword", - deleted); - getLineage( - fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqnHash.keyword", deleted); - responseMap.put("edges", edges); - responseMap.put("nodes", nodes); - return responseMap; + public SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException { + SearchLineageResult result = + getDownStreamLineage(lineageRequest.withDirection(LineageDirection.DOWNSTREAM)); + SearchLineageResult upstreamLineage = + getUpstreamLineage(lineageRequest.withDirection(LineageDirection.UPSTREAM)); + + // Add All nodes and edges from upstream lineage to result + result.getNodes().putAll(upstreamLineage.getNodes()); + result.getUpstreamEdges().putAll(upstreamLineage.getUpstreamEdges()); + return result; } @Override - public Response searchLineage( - String fqn, - int upstreamDepth, - int downstreamDepth, - String queryFilter, - boolean deleted, - String entityType) + public SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest) throws IOException { - Map responseMap = - searchLineageInternal( - fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); - return Response.status(OK).entity(responseMap).build(); + if (lineageRequest.getDirection().equals(LineageDirection.UPSTREAM)) { + return getUpstreamLineage(lineageRequest); + } else { + return getDownStreamLineage(lineageRequest); + } } private void getEntityRelationship( @@ -1105,54 +1073,155 @@ public class ElasticSearchClient implements SearchClient { return Response.status(OK).entity(responseMap).build(); } - private void getLineage( - String fqn, - int depth, - Set> edges, - Set> nodes, - String queryFilter, - String direction, - boolean deleted) + @Override + public Map searchEntityByKey( + String indexAlias, String keyName, String keyValue, List fieldsToRemove) throws IOException { - if (depth <= 0) { - return; + es.org.elasticsearch.action.search.SearchRequest searchRequest = + getSearchRequest(indexAlias, null, keyName, keyValue, null, null, fieldsToRemove); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + int noOfHits = searchResponse.getHits().getHits().length; + if (noOfHits == 1) { + return new HashMap<>( + JsonUtils.getMap(searchResponse.getHits().getHits()[0].getSourceAsMap())); + } else { + throw new SearchException( + String.format( + "Issue in Search Entity By Key: %s, Value: %s , Number of Hits: %s", + keyName, keyValue, noOfHits)); } + } + + private es.org.elasticsearch.action.search.SearchRequest getSearchRequest( + String indexAlias, + String queryFilter, + String key, + String value, + Boolean deleted, + List fieldsToInclude, + List fieldsToRemove) { es.org.elasticsearch.action.search.SearchRequest searchRequest = new es.org.elasticsearch.action.search.SearchRequest( - Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + Entity.getSearchRepository().getIndexOrAliasName(indexAlias)); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); - searchSourceBuilder.query( - QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn)))); - if (CommonUtil.nullOrEmpty(deleted)) { + searchSourceBuilder.fetchSource( + listOrEmpty(fieldsToInclude).toArray(String[]::new), + listOrEmpty(fieldsToRemove).toArray(String[]::new)); + searchSourceBuilder.query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery(key, value))); + if (!CommonUtil.nullOrEmpty(deleted)) { searchSourceBuilder.query( QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn))) + .must(QueryBuilders.termQuery(key, value)) .must(QueryBuilders.termQuery("deleted", deleted))); } + buildSearchSourceFilter(queryFilter, searchSourceBuilder); searchRequest.source(searchSourceBuilder.size(1000)); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - for (var hit : searchResponse.getHits().getHits()) { - List> lineage = - (List>) hit.getSourceAsMap().get("lineage"); - HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); - nodes.add(tempMap); - for (Map lin : lineage) { - Map fromEntity = (HashMap) lin.get("fromEntity"); - Map toEntity = (HashMap) lin.get("toEntity"); - if (direction.equalsIgnoreCase("lineage.fromEntity.fqnHash.keyword")) { - if (!edges.contains(lin) && fromEntity.get("fqn").equals(fqn)) { - edges.add(lin); - getLineage( - toEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted); + return searchRequest; + } + + public SearchLineageResult getUpstreamLineage(SearchLineageRequest lineageRequest) + throws IOException { + SearchLineageResult result = + new SearchLineageResult() + .withNodes(new HashMap<>()) + .withUpstreamEdges(new HashMap<>()) + .withDownstreamEdges(new HashMap<>()); + getUpstreamLineageRecursively(lineageRequest, result); + return result; + } + + private void getUpstreamLineageRecursively( + SearchLineageRequest lineageRequest, SearchLineageResult result) throws IOException { + if (lineageRequest.getUpstreamDepth() <= 0) { + return; + } + + Map entityMap = + searchEntityByKey( + GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE); + if (!entityMap.isEmpty()) { + result.getNodes().putIfAbsent(entityMap.get(ID_FIELD).toString(), entityMap); + // Contains Upstream Lineage + if (entityMap.containsKey(UPSTREAM_LINEAGE_FIELD)) { + List upStreamEntities = + JsonUtils.readOrConvertValues( + entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class); + for (EsLineageData esLineageData : upStreamEntities) { + result + .getUpstreamEdges() + .putIfAbsent( + esLineageData.getDocId(), + esLineageData.withToEntity(SearchClient.getRelationshipRef(entityMap))); + String upstreamEntityId = esLineageData.getFromEntity().getId().toString(); + if (!result.getNodes().containsKey(upstreamEntityId)) { + SearchLineageRequest updatedRequest = + JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class) + .withUpstreamDepth(lineageRequest.getUpstreamDepth() - 1) + .withFqn(esLineageData.getFromEntity().getFqn()); + getUpstreamLineageRecursively(updatedRequest, result); } - } else { - if (!edges.contains(lin) && toEntity.get("fqn").equals(fqn)) { - edges.add(lin); - getLineage( - fromEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted); + } + } + } + } + + public SearchLineageResult getDownStreamLineage(SearchLineageRequest lineageRequest) + throws IOException { + SearchLineageResult result = + new SearchLineageResult() + .withNodes(new HashMap<>()) + .withUpstreamEdges(new HashMap<>()) + .withDownstreamEdges(new HashMap<>()); + Map entityMap = + searchEntityByKey( + GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE); + if (!entityMap.isEmpty()) { + result.getNodes().putIfAbsent(entityMap.get("id").toString(), entityMap); + getDownStreamRecursively(SearchClient.getRelationshipRef(entityMap), lineageRequest, result); + } + return result; + } + + private void getDownStreamRecursively( + RelationshipRef fromEntity, SearchLineageRequest lineageRequest, SearchLineageResult result) + throws IOException { + if (lineageRequest.getDownstreamDepth() <= 0) { + return; + } + + es.org.elasticsearch.action.search.SearchRequest searchDownstreamEntities = + getSearchRequest( + GLOBAL_SEARCH_ALIAS, + lineageRequest.getQueryFilter(), + "upstreamLineage.fromEntity.fqnHash", + FullyQualifiedName.buildHash(lineageRequest.getFqn()), + lineageRequest.getIncludeDeleted(), + null, + SOURCE_FIELDS_TO_EXCLUDE); + SearchResponse downstreamEntities = + client.search(searchDownstreamEntities, RequestOptions.DEFAULT); + for (SearchHit searchHit : downstreamEntities.getHits().getHits()) { + Map entityMap = new HashMap<>(searchHit.getSourceAsMap()); + if (!entityMap.isEmpty()) { + result.getNodes().putIfAbsent(entityMap.get(ID_FIELD).toString(), entityMap); + List upStreamEntities = + JsonUtils.readOrConvertValues( + entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class); + EsLineageData esLineageData = + SearchClient.getEsLineageDataFromUpstreamLineage(fromEntity.getFqn(), upStreamEntities); + if (esLineageData != null) { + RelationshipRef toEntity = SearchClient.getRelationshipRef(entityMap); + result + .getDownstreamEdges() + .putIfAbsent(esLineageData.getDocId(), esLineageData.withToEntity(toEntity)); + String downStreamEntityId = toEntity.getId().toString(); + if (!result.getNodes().containsKey(downStreamEntityId)) { + SearchLineageRequest updatedRequest = + JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class) + .withDownstreamDepth(lineageRequest.getDownstreamDepth() - 1) + .withFqn(toEntity.getFqn()); + getDownStreamRecursively(toEntity, updatedRequest, result); } } } @@ -1296,112 +1365,6 @@ public class ElasticSearchClient implements SearchClient { return client.search(searchRequest, RequestOptions.DEFAULT); } - private Map searchPipelineLineage( - String fqn, - int upstreamDepth, - int downstreamDepth, - String queryFilter, - boolean deleted, - Map responseMap) - throws IOException { - Set> edges = new HashSet<>(); - Set> nodes = new HashSet<>(); - Object[] searchAfter = null; - long processedRecords = 0; - long totalRecords = -1; - while (totalRecords != processedRecords) { - es.org.elasticsearch.action.search.SearchRequest searchRequest = - new es.org.elasticsearch.action.search.SearchRequest( - Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); - es.org.elasticsearch.index.query.BoolQueryBuilder boolQueryBuilder = - QueryBuilders.boolQuery(); - boolQueryBuilder.should( - QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn))); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); - FieldSortBuilder sortBuilder = SortBuilders.fieldSort("fullyQualifiedName"); - searchSourceBuilder.sort(sortBuilder); - searchSourceBuilder.query(boolQueryBuilder); - if (searchAfter != null) { - searchSourceBuilder.searchAfter(searchAfter); - } - if (CommonUtil.nullOrEmpty(deleted)) { - searchSourceBuilder.query( - QueryBuilders.boolQuery() - .must(boolQueryBuilder) - .must(QueryBuilders.termQuery("deleted", deleted))); - } - buildSearchSourceFilter(queryFilter, searchSourceBuilder); - searchRequest.source(searchSourceBuilder); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - - for (var hit : searchResponse.getHits().getHits()) { - List> lineage = - (List>) hit.getSourceAsMap().get("lineage"); - HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); - nodes.add(tempMap); - for (Map lin : lineage) { - HashMap fromEntity = (HashMap) lin.get("fromEntity"); - HashMap toEntity = (HashMap) lin.get("toEntity"); - HashMap pipeline = (HashMap) lin.get("pipeline"); - if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) { - edges.add(lin); - getLineage( - fromEntity.get("fqn"), - upstreamDepth, - edges, - nodes, - queryFilter, - "lineage.toEntity.fqn.keyword", - deleted); - getLineage( - toEntity.get("fqn"), - downstreamDepth, - edges, - nodes, - queryFilter, - "lineage.fromEntity.fqn.keyword", - deleted); - } - } - } - totalRecords = searchResponse.getHits().getTotalHits().value; - int currentHits = searchResponse.getHits().getHits().length; - processedRecords += currentHits; - if (currentHits > 0) { - searchAfter = searchResponse.getHits().getHits()[currentHits - 1].getSortValues(); - } else { - searchAfter = null; - } - } - getLineage( - fqn, downstreamDepth, edges, nodes, queryFilter, "lineage.fromEntity.fqn.keyword", deleted); - getLineage( - fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqn.keyword", deleted); - - // TODO: Fix this , this is hack - if (edges.isEmpty()) { - es.org.elasticsearch.action.search.SearchRequest searchRequestForEntity = - new es.org.elasticsearch.action.search.SearchRequest( - Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); - SearchSourceBuilder searchSourceBuilderForEntity = new SearchSourceBuilder(); - searchSourceBuilderForEntity.query( - QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn))); - searchRequestForEntity.source(searchSourceBuilderForEntity.size(1000)); - SearchResponse searchResponseForEntity = - client.search(searchRequestForEntity, RequestOptions.DEFAULT); - for (var hit : searchResponseForEntity.getHits().getHits()) { - HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); - tempMap.keySet().removeAll(FIELDS_TO_REMOVE); - responseMap.put("entity", tempMap); - } - } - responseMap.put("edges", edges); - responseMap.put("nodes", nodes); - return responseMap; - } - @Override public Response searchBySourceUrl(String sourceUrl) throws IOException { es.org.elasticsearch.action.search.SearchRequest searchRequest = @@ -2319,13 +2282,14 @@ public class ElasticSearchClient implements SearchClient { @Override public void updateLineage( - String indexName, Pair fieldAndValue, Map lineageData) { + String indexName, Pair fieldAndValue, EsLineageData lineageData) { if (isClientAvailable) { UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName); updateByQueryRequest.setQuery( new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue()) .operator(Operator.AND)); - Map params = Collections.singletonMap("lineageData", lineageData); + Map params = + Collections.singletonMap("lineageData", JsonUtils.getMap(lineageData)); Script script = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params); updateByQueryRequest.setScript(script); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/APIEndpointIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/APIEndpointIndex.java index b55e07b897a..c86dc2c5c6a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/APIEndpointIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/APIEndpointIndex.java @@ -95,7 +95,7 @@ public class APIEndpointIndex implements SearchIndex { .collect(ArrayList::new, ArrayList::add, ArrayList::addAll); Map commonAttributes = getCommonAttributesMap(apiEndpoint, Entity.API_ENDPOINT); doc.putAll(commonAttributes); - doc.put("lineage", SearchIndex.getLineageData(apiEndpoint.getEntityReference())); + doc.put("upstreamLineage", SearchIndex.getLineageData(apiEndpoint.getEntityReference())); doc.put( "requestSchema", apiEndpoint.getRequestSchema() != null ? apiEndpoint.getRequestSchema() : null); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/ContainerIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/ContainerIndex.java index 0f25bcfada7..186b8365c3f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/ContainerIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/ContainerIndex.java @@ -63,7 +63,7 @@ public record ContainerIndex(Container container) implements ColumnIndex { doc.put("column_suggest", columnSuggest); doc.put("serviceType", container.getServiceType()); doc.put("fullPath", container.getFullPath()); - doc.put("lineage", SearchIndex.getLineageData(container.getEntityReference())); + doc.put("upstreamLineage", SearchIndex.getLineageData(container.getEntityReference())); doc.put("service", getEntityWithDisplayName(container.getService())); return doc; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardDataModelIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardDataModelIndex.java index 26ced947d7a..e2051ea9098 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardDataModelIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardDataModelIndex.java @@ -64,7 +64,7 @@ public record DashboardDataModelIndex(DashboardDataModel dashboardDataModel) doc.put("column_suggest", columnSuggest); doc.put("tier", parseTags.getTierTag()); doc.put("service", getEntityWithDisplayName(dashboardDataModel.getService())); - doc.put("lineage", SearchIndex.getLineageData(dashboardDataModel.getEntityReference())); + doc.put("upstreamLineage", SearchIndex.getLineageData(dashboardDataModel.getEntityReference())); doc.put("domain", getEntityWithDisplayName(dashboardDataModel.getDomain())); return doc; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardIndex.java index 374e6bc546c..5c2307b4e05 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/DashboardIndex.java @@ -54,7 +54,7 @@ public class DashboardIndex implements SearchIndex { doc.put("data_model_suggest", dataModelSuggest); doc.put("service_suggest", serviceSuggest); doc.put("serviceType", dashboard.getServiceType()); - doc.put("lineage", SearchIndex.getLineageData(dashboard.getEntityReference())); + doc.put("upstreamLineage", SearchIndex.getLineageData(dashboard.getEntityReference())); doc.put("service", getEntityWithDisplayName(dashboard.getService())); return doc; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MetricIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MetricIndex.java index 787caa5c1a8..99c36d37759 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MetricIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MetricIndex.java @@ -32,7 +32,7 @@ public class MetricIndex implements SearchIndex { public Map buildSearchIndexDocInternal(Map doc) { Map commonAttributes = getCommonAttributesMap(metric, Entity.METRIC); doc.putAll(commonAttributes); - doc.put("lineage", SearchIndex.getLineageData(metric.getEntityReference())); + doc.put("upstreamLineage", SearchIndex.getLineageData(metric.getEntityReference())); return doc; } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MlModelIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MlModelIndex.java index 98865b5f63a..416b775fb54 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MlModelIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/MlModelIndex.java @@ -35,7 +35,7 @@ public class MlModelIndex implements SearchIndex { doc.put("tags", parseTags.getTags()); doc.put("tier", parseTags.getTierTag()); doc.put("serviceType", mlModel.getServiceType()); - doc.put("lineage", SearchIndex.getLineageData(mlModel.getEntityReference())); + doc.put("upstreamLineage", SearchIndex.getLineageData(mlModel.getEntityReference())); doc.put("service", getEntityWithDisplayName(mlModel.getService())); return doc; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/PipelineIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/PipelineIndex.java index 9479ae8d947..ca293b4b989 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/PipelineIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/PipelineIndex.java @@ -51,7 +51,7 @@ public class PipelineIndex implements SearchIndex { doc.put("task_suggest", taskSuggest); doc.put("service_suggest", serviceSuggest); doc.put("serviceType", pipeline.getServiceType()); - doc.put("lineage", SearchIndex.getLineageData(pipeline.getEntityReference())); + doc.put("upstreamLineage", SearchIndex.getLineageData(pipeline.getEntityReference())); doc.put("service", getEntityWithDisplayName(pipeline.getService())); return doc; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchEntityIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchEntityIndex.java index a04ebee91a0..2ed56f73ad3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchEntityIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchEntityIndex.java @@ -32,7 +32,7 @@ public record SearchEntityIndex(org.openmetadata.schema.entity.data.SearchIndex doc.put("tier", parseTags.getTierTag()); doc.put("service", getEntityWithDisplayName(searchIndex.getService())); doc.put("indexType", searchIndex.getIndexType()); - doc.put("lineage", SearchIndex.getLineageData(searchIndex.getEntityReference())); + doc.put("upstreamLineage", SearchIndex.getLineageData(searchIndex.getEntityReference())); return doc; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java index 902366cea84..79a29ba8fa0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java @@ -6,7 +6,7 @@ import static org.openmetadata.schema.type.Include.ALL; import static org.openmetadata.service.Entity.FIELD_DESCRIPTION; import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME; import static org.openmetadata.service.Entity.getEntityByName; -import static org.openmetadata.service.jdbi3.LineageRepository.buildRelationshipDetailsMap; +import static org.openmetadata.service.jdbi3.LineageRepository.buildEntityLineageData; import static org.openmetadata.service.search.EntityBuilderConstant.DISPLAY_NAME_KEYWORD; import static org.openmetadata.service.search.EntityBuilderConstant.FIELD_DISPLAY_NAME_NGRAM; import static org.openmetadata.service.search.EntityBuilderConstant.FULLY_QUALIFIED_NAME; @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.api.lineage.EsLineageData; import org.openmetadata.schema.entity.data.Table; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; @@ -42,7 +43,7 @@ import org.openmetadata.service.util.JsonUtils; public interface SearchIndex { Set DEFAULT_EXCLUDED_FIELDS = - Set.of("changeDescription", "lineage.pipeline.changeDescription", "connection"); + Set.of("changeDescription", "upstreamLineage.pipeline.changeDescription", "connection"); public static final SearchClient searchClient = Entity.getSearchRepository().getSearchClient(); default Map buildSearchIndexDoc() { @@ -149,31 +150,25 @@ public interface SearchIndex { return nullOrEmpty(entity.getDescription()) ? "INCOMPLETE" : "COMPLETE"; } - static List> getLineageData(EntityReference entity) { - List> data = new ArrayList<>(); - CollectionDAO dao = Entity.getCollectionDAO(); - List toRelationshipsRecords = - dao.relationshipDAO() - .findTo(entity.getId(), entity.getType(), Relationship.UPSTREAM.ordinal()); - for (CollectionDAO.EntityRelationshipRecord entityRelationshipRecord : toRelationshipsRecords) { + static List getLineageData(EntityReference entity) { + return new ArrayList<>( + getLineageDataFromRefs( + entity, + Entity.getCollectionDAO() + .relationshipDAO() + .findFrom(entity.getId(), entity.getType(), Relationship.UPSTREAM.ordinal()))); + } + + static List getLineageDataFromRefs( + EntityReference entity, List records) { + List data = new ArrayList<>(); + for (CollectionDAO.EntityRelationshipRecord entityRelationshipRecord : records) { EntityReference ref = Entity.getEntityReferenceById( entityRelationshipRecord.getType(), entityRelationshipRecord.getId(), Include.ALL); LineageDetails lineageDetails = JsonUtils.readValue(entityRelationshipRecord.getJson(), LineageDetails.class); - data.add(buildRelationshipDetailsMap(entity, ref, lineageDetails)); - } - List fromRelationshipsRecords = - dao.relationshipDAO() - .findFrom(entity.getId(), entity.getType(), Relationship.UPSTREAM.ordinal()); - for (CollectionDAO.EntityRelationshipRecord entityRelationshipRecord : - fromRelationshipsRecords) { - EntityReference ref = - Entity.getEntityReferenceById( - entityRelationshipRecord.getType(), entityRelationshipRecord.getId(), Include.ALL); - LineageDetails lineageDetails = - JsonUtils.readValue(entityRelationshipRecord.getJson(), LineageDetails.class); - data.add(buildRelationshipDetailsMap(ref, entity, lineageDetails)); + data.add(buildEntityLineageData(ref, entity, lineageDetails)); } return data; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/StoredProcedureIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/StoredProcedureIndex.java index 06394422581..d6fd404ffda 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/StoredProcedureIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/StoredProcedureIndex.java @@ -30,7 +30,7 @@ public record StoredProcedureIndex(StoredProcedure storedProcedure) implements S ParseTags parseTags = new ParseTags(Entity.getEntityTags(Entity.STORED_PROCEDURE, storedProcedure)); doc.put("tags", parseTags.getTags()); - doc.put("lineage", SearchIndex.getLineageData(storedProcedure.getEntityReference())); + doc.put("upstreamLineage", SearchIndex.getLineageData(storedProcedure.getEntityReference())); doc.put("tier", parseTags.getTierTag()); doc.put("service", getEntityWithDisplayName(storedProcedure.getService())); return doc; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java index d4c44bb1f03..6a43bf3ceba 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java @@ -103,7 +103,7 @@ public record TableIndex(Table table) implements ColumnIndex { doc.put("schemaDefinition", table.getSchemaDefinition()); doc.put("service", getEntityWithDisplayName(table.getService())); doc.put("database", getEntityWithDisplayName(table.getDatabase())); - doc.put("lineage", SearchIndex.getLineageData(table.getEntityReference())); + doc.put("upstreamLineage", SearchIndex.getLineageData(table.getEntityReference())); doc.put("entityRelationship", SearchIndex.populateEntityRelationshipData(table)); doc.put("databaseSchema", getEntityWithDisplayName(table.getDatabaseSchema())); return doc; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TopicIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TopicIndex.java index 109c97217eb..9d9159a08e5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TopicIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TopicIndex.java @@ -81,7 +81,7 @@ public class TopicIndex implements SearchIndex { doc.put("field_suggest", fieldSuggest); doc.put("service_suggest", serviceSuggest); doc.put("serviceType", topic.getServiceType()); - doc.put("lineage", SearchIndex.getLineageData(topic.getEntityReference())); + doc.put("upstreamLineage", SearchIndex.getLineageData(topic.getEntityReference())); doc.put("messageSchema", topic.getMessageSchema() != null ? topic.getMessageSchema() : null); doc.put("service", getEntityWithDisplayName(topic.getService())); return doc; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index 50a90e72486..392b8e37913 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -2,6 +2,7 @@ package org.openmetadata.service.search.opensearch; import static javax.ws.rs.core.Response.Status.NOT_FOUND; import static javax.ws.rs.core.Response.Status.OK; +import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA; import static org.openmetadata.service.Entity.DATA_PRODUCT; @@ -67,6 +68,11 @@ import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.jetbrains.annotations.NotNull; import org.openmetadata.common.utils.CommonUtil; +import org.openmetadata.schema.api.lineage.EsLineageData; +import org.openmetadata.schema.api.lineage.LineageDirection; +import org.openmetadata.schema.api.lineage.RelationshipRef; +import org.openmetadata.schema.api.lineage.SearchLineageRequest; +import org.openmetadata.schema.api.lineage.SearchLineageResult; import org.openmetadata.schema.dataInsight.DataInsightChartResult; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList; @@ -839,64 +845,26 @@ public class OpenSearchClient implements SearchClient { return Response.status(OK).entity(response).build(); } - public Map searchLineageInternal( - String fqn, - int upstreamDepth, - int downstreamDepth, - String queryFilter, - boolean deleted, - String entityType) - throws IOException { - if (entityType.equalsIgnoreCase(Entity.PIPELINE) - || entityType.equalsIgnoreCase(Entity.STORED_PROCEDURE)) { - return searchPipelineLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); - } - Map responseMap = new HashMap<>(); - Set> edges = new HashSet<>(); - Set> nodes = new HashSet<>(); - os.org.opensearch.action.search.SearchRequest searchRequest = - new os.org.opensearch.action.search.SearchRequest( - Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - List sourceFieldsToExcludeCopy = new ArrayList<>(SOURCE_FIELDS_TO_EXCLUDE); - searchSourceBuilder.fetchSource(null, sourceFieldsToExcludeCopy.toArray(String[]::new)); - searchSourceBuilder.query( - QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn))); - searchRequest.source(searchSourceBuilder.size(1000)); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - for (var hit : searchResponse.getHits().getHits()) { - HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); - tempMap.keySet().removeAll(FIELDS_TO_REMOVE); - responseMap.put("entity", tempMap); - } - getLineage( - fqn, - downstreamDepth, - edges, - nodes, - queryFilter, - "lineage.fromEntity.fqnHash.keyword", - deleted); - getLineage( - fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqnHash.keyword", deleted); - responseMap.put("edges", edges); - responseMap.put("nodes", nodes); - return responseMap; + @Override + public SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException { + SearchLineageResult result = + getDownStreamLineage(lineageRequest.withDirection(LineageDirection.DOWNSTREAM)); + SearchLineageResult upstreamLineage = + getDownStreamLineage(lineageRequest.withDirection(LineageDirection.UPSTREAM)); + + // Add All nodes and edges from upstream lineage to result + result.getNodes().putAll(upstreamLineage.getNodes()); + result.getUpstreamEdges().putAll(upstreamLineage.getUpstreamEdges()); + return result; } - @Override - public Response searchLineage( - String fqn, - int upstreamDepth, - int downstreamDepth, - String queryFilter, - boolean deleted, - String entityType) + public SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest) throws IOException { - Map responseMap = - searchLineageInternal( - fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); - return Response.status(OK).entity(responseMap).build(); + if (lineageRequest.getDirection().equals(LineageDirection.UPSTREAM)) { + return getUpstreamLineage(lineageRequest); + } else { + return getDownStreamLineage(lineageRequest); + } } private void getEntityRelationship( @@ -1100,56 +1068,148 @@ public class OpenSearchClient implements SearchClient { return Response.status(OK).entity(responseMap).build(); } - private void getLineage( - String fqn, - int depth, - Set> edges, - Set> nodes, - String queryFilter, - String direction, - boolean deleted) + @Override + public Map searchEntityByKey( + String indexAlias, String keyName, String keyValue, List fieldsToRemove) throws IOException { - if (depth <= 0) { - return; - } os.org.opensearch.action.search.SearchRequest searchRequest = - new os.org.opensearch.action.search.SearchRequest( - Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); - searchSourceBuilder.query( - QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn)))); - if (CommonUtil.nullOrEmpty(deleted)) { - searchSourceBuilder.query( - QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn))) - .must(QueryBuilders.termQuery("deleted", deleted))); - } - buildSearchSourceFilter(queryFilter, searchSourceBuilder); - - searchRequest.source(searchSourceBuilder.size(1000)); + getSearchRequest(indexAlias, null, keyName, keyValue, null, null, fieldsToRemove); os.org.opensearch.action.search.SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - for (var hit : searchResponse.getHits().getHits()) { - List> lineage = - (List>) hit.getSourceAsMap().get("lineage"); - HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); - nodes.add(tempMap); - for (Map lin : lineage) { - HashMap fromEntity = (HashMap) lin.get("fromEntity"); - HashMap toEntity = (HashMap) lin.get("toEntity"); - if (direction.equalsIgnoreCase("lineage.fromEntity.fqnHash.keyword")) { - if (!edges.contains(lin) && fromEntity.get("fqn").equals(fqn)) { - edges.add(lin); - getLineage( - toEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted); + int noOfHits = searchResponse.getHits().getHits().length; + if (noOfHits == 1) { + return new HashMap<>( + JsonUtils.getMap(searchResponse.getHits().getHits()[0].getSourceAsMap())); + } else { + throw new SearchException( + String.format( + "Issue in Search Entity By Key: %s, Value: %s , Number of Hits: %s", + keyName, keyValue, noOfHits)); + } + } + + private os.org.opensearch.action.search.SearchRequest getSearchRequest( + String indexAlias, + String queryFilter, + String key, + String value, + Boolean deleted, + List fieldsToInclude, + List fieldsToRemove) { + os.org.opensearch.action.search.SearchRequest searchRequest = + new os.org.opensearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(indexAlias)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.fetchSource( + listOrEmpty(fieldsToInclude).toArray(String[]::new), + listOrEmpty(fieldsToRemove).toArray(String[]::new)); + searchSourceBuilder.query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery(key, value))); + if (!CommonUtil.nullOrEmpty(deleted)) { + searchSourceBuilder.query( + QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(key, value)) + .must(QueryBuilders.termQuery("deleted", deleted))); + } + + buildSearchSourceFilter(queryFilter, searchSourceBuilder); + searchRequest.source(searchSourceBuilder.size(1000)); + return searchRequest; + } + + public SearchLineageResult getUpstreamLineage(SearchLineageRequest lineageRequest) + throws IOException { + SearchLineageResult result = new SearchLineageResult().withNodes(new HashMap<>()); + getUpstreamLineageRecursively(lineageRequest, result); + return result; + } + + private void getUpstreamLineageRecursively( + SearchLineageRequest lineageRequest, SearchLineageResult result) throws IOException { + if (lineageRequest.getUpstreamDepth() <= 0) { + return; + } + + Map entityMap = + searchEntityByKey( + GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE); + if (!entityMap.isEmpty()) { + result.getNodes().putIfAbsent(entityMap.get("id").toString(), entityMap); + // Contains Upstream Lineage + if (entityMap.containsKey(UPSTREAM_LINEAGE_FIELD)) { + List upStreamEntities = + JsonUtils.readOrConvertValues( + entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class); + for (EsLineageData esLineageData : upStreamEntities) { + result + .getUpstreamEdges() + .putIfAbsent( + esLineageData.getDocId(), + esLineageData.withToEntity(SearchClient.getRelationshipRef(entityMap))); + String upstreamEntityId = esLineageData.getFromEntity().getId().toString(); + if (!result.getNodes().containsKey(upstreamEntityId)) { + SearchLineageRequest updatedRequest = + JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class) + .withUpstreamDepth(lineageRequest.getUpstreamDepth() - 1) + .withFqn(esLineageData.getFromEntity().getFqn()); + getUpstreamLineageRecursively(updatedRequest, result); } - } else { - if (!edges.contains(lin) && toEntity.get("fqn").equals(fqn)) { - edges.add(lin); - getLineage( - fromEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted); + } + } + } + } + + public SearchLineageResult getDownStreamLineage(SearchLineageRequest lineageRequest) + throws IOException { + SearchLineageResult result = new SearchLineageResult().withNodes(new HashMap<>()); + Map entityMap = + searchEntityByKey( + GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE); + if (!entityMap.isEmpty()) { + result.getNodes().putIfAbsent(entityMap.get("id").toString(), entityMap); + getDownStreamRecursively(SearchClient.getRelationshipRef(entityMap), lineageRequest, result); + } + return result; + } + + private void getDownStreamRecursively( + RelationshipRef fromEntity, SearchLineageRequest lineageRequest, SearchLineageResult result) + throws IOException { + if (lineageRequest.getDownstreamDepth() <= 0) { + return; + } + + os.org.opensearch.action.search.SearchRequest searchDownstreamEntities = + getSearchRequest( + GLOBAL_SEARCH_ALIAS, + lineageRequest.getQueryFilter(), + "upstreamLineage.fromEntity.fqnHash", + FullyQualifiedName.buildHash(lineageRequest.getFqn()), + lineageRequest.getIncludeDeleted(), + null, + SOURCE_FIELDS_TO_EXCLUDE); + os.org.opensearch.action.search.SearchResponse downstreamEntities = + client.search(searchDownstreamEntities, RequestOptions.DEFAULT); + for (os.org.opensearch.search.SearchHit searchHit : downstreamEntities.getHits().getHits()) { + Map entityMap = new HashMap<>(searchHit.getSourceAsMap()); + if (!entityMap.isEmpty()) { + result.getNodes().putIfAbsent(entityMap.get(ID_FIELD).toString(), entityMap); + List upStreamEntities = + JsonUtils.readOrConvertValues( + entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class); + EsLineageData esLineageData = + SearchClient.getEsLineageDataFromUpstreamLineage(fromEntity.getFqn(), upStreamEntities); + if (esLineageData != null) { + RelationshipRef toEntity = SearchClient.getRelationshipRef(entityMap); + result + .getDownstreamEdges() + .putIfAbsent(esLineageData.getDocId(), esLineageData.withToEntity(toEntity)); + String downStreamEntityId = toEntity.getId().toString(); + if (!result.getNodes().containsKey(downStreamEntityId)) { + SearchLineageRequest updatedRequest = + JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class) + .withDownstreamDepth(lineageRequest.getDownstreamDepth() - 1) + .withFqn(toEntity.getFqn()); + getDownStreamRecursively(toEntity, updatedRequest, result); } } } @@ -1293,107 +1353,6 @@ public class OpenSearchClient implements SearchClient { return client.search(searchRequest, RequestOptions.DEFAULT); } - private Map searchPipelineLineage( - String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) - throws IOException { - Map responseMap = new HashMap<>(); - Set> edges = new HashSet<>(); - Set> nodes = new HashSet<>(); - responseMap.put("entity", null); - Object[] searchAfter = null; - long processedRecords = 0; - long totalRecords = -1; - while (totalRecords != processedRecords) { - os.org.opensearch.action.search.SearchRequest searchRequest = - new os.org.opensearch.action.search.SearchRequest( - Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); - BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); - boolQueryBuilder.should( - QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn))); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); - FieldSortBuilder sortBuilder = SortBuilders.fieldSort("fullyQualifiedName"); - searchSourceBuilder.sort(sortBuilder); - searchSourceBuilder.query(boolQueryBuilder); - if (searchAfter != null) { - searchSourceBuilder.searchAfter(searchAfter); - } - if (CommonUtil.nullOrEmpty(deleted)) { - searchSourceBuilder.query( - QueryBuilders.boolQuery() - .must(boolQueryBuilder) - .must(QueryBuilders.termQuery("deleted", deleted))); - } - buildSearchSourceFilter(queryFilter, searchSourceBuilder); - - searchRequest.source(searchSourceBuilder); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - for (var hit : searchResponse.getHits().getHits()) { - List> lineage = - (List>) hit.getSourceAsMap().get("lineage"); - HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); - nodes.add(tempMap); - for (Map lin : lineage) { - HashMap fromEntity = (HashMap) lin.get("fromEntity"); - HashMap toEntity = (HashMap) lin.get("toEntity"); - HashMap pipeline = (HashMap) lin.get("pipeline"); - if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) { - edges.add(lin); - getLineage( - fromEntity.get("fqn"), - upstreamDepth, - edges, - nodes, - queryFilter, - "lineage.toEntity.fqn.keyword", - deleted); - getLineage( - toEntity.get("fqn"), - downstreamDepth, - edges, - nodes, - queryFilter, - "lineage.fromEntity.fqn.keyword", - deleted); - } - } - } - totalRecords = searchResponse.getHits().getTotalHits().value; - int currentHits = searchResponse.getHits().getHits().length; - processedRecords += currentHits; - if (currentHits > 0) { - searchAfter = searchResponse.getHits().getHits()[currentHits - 1].getSortValues(); - } else { - searchAfter = null; - } - } - getLineage( - fqn, downstreamDepth, edges, nodes, queryFilter, "lineage.fromEntity.fqn.keyword", deleted); - getLineage( - fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqn.keyword", deleted); - - if (edges.isEmpty()) { - os.org.opensearch.action.search.SearchRequest searchRequestForEntity = - new os.org.opensearch.action.search.SearchRequest( - Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); - SearchSourceBuilder searchSourceBuilderForEntity = new SearchSourceBuilder(); - searchSourceBuilderForEntity.query( - QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn))); - searchRequestForEntity.source(searchSourceBuilderForEntity.size(1000)); - SearchResponse searchResponseForEntity = - client.search(searchRequestForEntity, RequestOptions.DEFAULT); - for (var hit : searchResponseForEntity.getHits().getHits()) { - HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); - tempMap.keySet().removeAll(FIELDS_TO_REMOVE); - responseMap.put("entity", tempMap); - } - } - responseMap.put("edges", edges); - responseMap.put("nodes", nodes); - return responseMap; - } - private static FunctionScoreQueryBuilder boostScore(QueryStringQueryBuilder queryBuilder) { FunctionScoreQueryBuilder.FilterFunctionBuilder tier1Boost = new FunctionScoreQueryBuilder.FilterFunctionBuilder( @@ -2271,13 +2230,14 @@ public class OpenSearchClient implements SearchClient { @Override public void updateLineage( - String indexName, Pair fieldAndValue, Map lineagaData) { + String indexName, Pair fieldAndValue, EsLineageData lineageData) { if (isClientAvailable) { UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName); updateByQueryRequest.setQuery( new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue()) .operator(Operator.AND)); - Map params = Collections.singletonMap("lineageData", lineagaData); + Map params = + Collections.singletonMap("lineageData", JsonUtils.getMap(lineageData)); Script script = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params); updateByQueryRequest.setScript(script); diff --git a/openmetadata-spec/src/main/resources/json/schema/api/lineage/esLineageData.json b/openmetadata-spec/src/main/resources/json/schema/api/lineage/esLineageData.json new file mode 100644 index 00000000000..cf0308bd1c0 --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/api/lineage/esLineageData.json @@ -0,0 +1,73 @@ +{ + "$id": "https://open-metadata.org/schema/api/lineage/esLineageData.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "EsLineageData", + "description": "Response object for the search lineage request from Elastic Search.", + "javaType": "org.openmetadata.schema.api.lineage.EsLineageData", + "type": "object", + "definitions": { + "relationshipRef": { + "description": "Relationship Reference to an Entity.", + "type": "object", + "properties": { + "id": { + "description": "Unique identifier of this entity instance.", + "$ref": "../../type/basic.json#/definitions/uuid" + }, + "fqn": { + "description": "FullyQualifiedName of the entity.", + "$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName" + }, + "fqnHash": { + "description": "FullyQualifiedName Hash of the entity.", + "type": "string" + }, + "type": { + "description": "Type of the entity.", + "type": "string" + } + } + } + }, + "properties": { + "fromEntity": { + "description": "From Entity.", + "$ref": "#/definitions/relationshipRef" + }, + "toEntity": { + "description": "To Entity.", + "$ref": "#/definitions/relationshipRef" + }, + "pipeline": { + "description": "Pipeline in case pipeline is present between entities." + }, + "sqlQuery": { + "description": "Sql Query associated.", + "type": "string" + }, + "columns": { + "description": "Columns associated.", + "type": "array", + "items": { + "$ref": "../../type/entityLineage.json#/definitions/columnLineage" + } + }, + "description": { + "description": "Description.", + "type": "string" + }, + "source": { + "description": "Source of the Lineage.", + "type": "string" + }, + "doc_id": { + "description": "Doc Id for the Lineage.", + "type": "string" + }, + "pipelineEntityType": { + "description": "Pipeline Entity or Stored procedure.", + "type": "string" + } + }, + "additionalProperties": false +} diff --git a/openmetadata-spec/src/main/resources/json/schema/api/lineage/lineageDirection.json b/openmetadata-spec/src/main/resources/json/schema/api/lineage/lineageDirection.json new file mode 100644 index 00000000000..271ffdd2100 --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/api/lineage/lineageDirection.json @@ -0,0 +1,9 @@ +{ + "$id": "https://open-metadata.org/schema/api/lineage/lineageDirection.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "LineageDirection", + "description": "Lineage Direction Schema.", + "javaType": "org.openmetadata.schema.api.lineage.LineageDirection", + "type": "string", + "enum": ["Upstream", "Downstream"] +} diff --git a/openmetadata-spec/src/main/resources/json/schema/api/lineage/searchLineageRequest.json b/openmetadata-spec/src/main/resources/json/schema/api/lineage/searchLineageRequest.json new file mode 100644 index 00000000000..e3c357f22b2 --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/api/lineage/searchLineageRequest.json @@ -0,0 +1,46 @@ +{ + "$id": "https://open-metadata.org/schema/api/lineage/searchLineageRequest.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "SearchLineageRequest", + "description": "Search Lineage Request Schema to find linage from Elastic Search.", + "javaType": "org.openmetadata.schema.api.lineage.SearchLineageRequest", + "type": "object", + "properties": { + "fqn": { + "description": "Entity Fqn to search lineage", + "type": "string" + }, + "entityType": { + "description": "Query Filter", + "type": "string" + }, + "direction": { + "$ref": "./lineageDirection.json" + }, + "directionValue": { + "description": "Lineage Direction Value.", + "type": "string" + }, + "upstreamDepth": { + "description": "The upstream depth of the lineage", + "type": "integer", + "default": 3 + }, + "downstreamDepth": { + "description": "The downstream depth of the lineage", + "type": "integer", + "default": 3 + }, + "queryFilter": { + "description": "Query Filter", + "type": "string" + }, + "includeDeleted": { + "description": "Include deleted entities", + "type": "boolean", + "default": false + } + }, + "required": ["fqn", "entityType", "direction"], + "additionalProperties": false +} diff --git a/openmetadata-spec/src/main/resources/json/schema/api/lineage/searchLineageResult.json b/openmetadata-spec/src/main/resources/json/schema/api/lineage/searchLineageResult.json new file mode 100644 index 00000000000..4613bd1f063 --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/api/lineage/searchLineageResult.json @@ -0,0 +1,35 @@ +{ + "$id": "https://open-metadata.org/schema/api/lineage/searchLineageResult.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "SearchLineageResult", + "description": "Search Lineage Response for the Lineage Request", + "javaType": "org.openmetadata.schema.api.lineage.SearchLineageResult", + "type": "object", + "definitions": { + "entity" : { + "description": "A general object to hold any object." + } + }, + "properties": { + "paging": { + "$ref": "../../type/paging.json" + }, + "direction": { + "description": "Lineage Direction Value.", + "type": "string" + }, + "nodes" : { + "description": "Nodes in the lineage response.", + "existingJavaType": "java.util.Map" + }, + "upstreamEdges": { + "description": "Upstream Edges for the entity.", + "existingJavaType": "java.util.Map" + }, + "downstreamEdges": { + "description": "Downstream Edges for the node.", + "existingJavaType": "java.util.Map" + } + }, + "additionalProperties": false +} diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/esLineageData.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/esLineageData.ts new file mode 100644 index 00000000000..2da1437de9f --- /dev/null +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/esLineageData.ts @@ -0,0 +1,99 @@ +/* + * Copyright 2025 Collate. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Response object for the search lineage request from Elastic Search. + */ +export interface EsLineageData { + /** + * Columns associated. + */ + columns?: ColumnLineage[]; + /** + * Description. + */ + description?: string; + /** + * Doc Id for the Lineage. + */ + doc_id?: string; + /** + * From Entity. + */ + fromEntity?: RelationshipRef; + /** + * Pipeline in case pipeline is present between entities. + */ + pipeline?: any; + /** + * Pipeline Entity or Stored procedure. + */ + pipelineEntityType?: string; + /** + * Source of the Lineage. + */ + source?: string; + /** + * Sql Query associated. + */ + sqlQuery?: string; + /** + * To Entity. + */ + toEntity?: RelationshipRef; +} + +export interface ColumnLineage { + /** + * One or more source columns identified by fully qualified column name used by + * transformation function to create destination column. + */ + fromColumns?: string[]; + /** + * Transformation function applied to source columns to create destination column. That is + * `function(fromColumns) -> toColumn`. + */ + function?: string; + /** + * Destination column identified by fully qualified column name created by the + * transformation of source columns. + */ + toColumn?: string; + [property: string]: any; +} + +/** + * From Entity. + * + * Relationship Reference to an Entity. + * + * To Entity. + */ +export interface RelationshipRef { + /** + * FullyQualifiedName of the entity. + */ + fqn?: string; + /** + * FullyQualifiedName Hash of the entity. + */ + fqnHash?: string; + /** + * Unique identifier of this entity instance. + */ + id?: string; + /** + * Type of the entity. + */ + type?: string; + [property: string]: any; +} diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/lineageDirection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/lineageDirection.ts new file mode 100644 index 00000000000..941f89c401a --- /dev/null +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/lineageDirection.ts @@ -0,0 +1,19 @@ +/* + * Copyright 2025 Collate. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Lineage Direction Schema. + */ +export enum LineageDirection { + Downstream = "Downstream", + Upstream = "Upstream", +} diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/searchLineageRequest.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/searchLineageRequest.ts new file mode 100644 index 00000000000..45caa6d6410 --- /dev/null +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/searchLineageRequest.ts @@ -0,0 +1,54 @@ +/* + * Copyright 2025 Collate. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Search Lineage Request Schema to find linage from Elastic Search. + */ +export interface SearchLineageRequest { + direction: LineageDirection; + /** + * Lineage Direction Value. + */ + directionValue?: string; + /** + * The downstream depth of the lineage + */ + downstreamDepth?: number; + /** + * Query Filter + */ + entityType: string; + /** + * Entity Fqn to search lineage + */ + fqn: string; + /** + * Include deleted entities + */ + includeDeleted?: boolean; + /** + * Query Filter + */ + queryFilter?: string; + /** + * The upstream depth of the lineage + */ + upstreamDepth?: number; +} + +/** + * Lineage Direction Schema. + */ +export enum LineageDirection { + Downstream = "Downstream", + Upstream = "Upstream", +} diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/searchLineageResult.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/searchLineageResult.ts new file mode 100644 index 00000000000..19858c45b81 --- /dev/null +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/searchLineageResult.ts @@ -0,0 +1,60 @@ +/* + * Copyright 2025 Collate. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Search Lineage Response for the Lineage Request + */ +export interface SearchLineageResult { + /** + * Lineage Direction Value. + */ + direction?: string; + /** + * Downstream Edges for the node. + */ + downstreamEdges?: any; + /** + * Nodes in the lineage response. + */ + nodes?: any; + paging?: Paging; + /** + * Upstream Edges for the entity. + */ + upstreamEdges?: any; +} + +/** + * Type used for cursor based pagination information in GET list responses. + */ +export interface Paging { + /** + * After cursor used for getting the next page (see API pagination for details). + */ + after?: string; + /** + * Before cursor used for getting the previous page (see API pagination for details). + */ + before?: string; + /** + * Limit used in case of offset based pagination. + */ + limit?: number; + /** + * Offset used in case of offset based pagination. + */ + offset?: number; + /** + * Total number of entries available to page through. + */ + total: number; +}