- Update lineage model and api

This commit is contained in:
mohitdeuex 2025-02-02 22:46:28 +05:30
parent b792cd0f99
commit c5cce2b0c7
28 changed files with 986 additions and 539 deletions

View File

@ -33,6 +33,7 @@ import java.util.Base64;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -177,6 +178,10 @@ public final class CommonUtil {
return Optional.ofNullable(list).orElse(Collections.emptyList()); return Optional.ofNullable(list).orElse(Collections.emptyList());
} }
public static <K, V> Map<K, V> collectionOrEmpty(Map<K, V> input) {
return Optional.ofNullable(input).orElse(new HashMap<>());
}
public static <T> List<T> listOrEmptyMutable(List<T> list) { public static <T> List<T> listOrEmptyMutable(List<T> list) {
return nullOrEmpty(list) ? new ArrayList<>() : new ArrayList<>(list); return nullOrEmpty(list) ? new ArrayList<>() : new ArrayList<>(list);
} }
@ -209,6 +214,14 @@ public final class CommonUtil {
} }
} }
public static <T> List<T> collectionOrDefault(List<T> c, List<T> defaultValue) {
if (nullOrEmpty(c)) {
return defaultValue;
} else {
return c;
}
}
public static String getResourceAsStream(ClassLoader loader, String file) throws IOException { public static String getResourceAsStream(ClassLoader loader, String file) throws IOException {
return IOUtils.toString(Objects.requireNonNull(loader.getResourceAsStream(file)), UTF_8); return IOUtils.toString(Objects.requireNonNull(loader.getResourceAsStream(file)), UTF_8);
} }

View File

@ -14,6 +14,8 @@
package org.openmetadata.service.jdbi3; package org.openmetadata.service.jdbi3;
import static javax.ws.rs.core.Response.Status.OK; import static javax.ws.rs.core.Response.Status.OK;
import static org.openmetadata.common.utils.CommonUtil.collectionOrDefault;
import static org.openmetadata.common.utils.CommonUtil.nullOrDefault;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.csv.CsvUtil.addField; import static org.openmetadata.csv.CsvUtil.addField;
import static org.openmetadata.csv.EntityCsv.getCsvDocumentation; import static org.openmetadata.csv.EntityCsv.getCsvDocumentation;
@ -48,9 +50,13 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.transaction.Transaction; import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.csv.CsvUtil; import org.openmetadata.csv.CsvUtil;
import org.openmetadata.schema.api.lineage.AddLineage; import org.openmetadata.schema.api.lineage.AddLineage;
import org.openmetadata.schema.api.lineage.EsLineageData;
import org.openmetadata.schema.api.lineage.LineageDirection;
import org.openmetadata.schema.api.lineage.RelationshipRef;
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
import org.openmetadata.schema.api.lineage.SearchLineageResult;
import org.openmetadata.schema.entity.data.APIEndpoint; import org.openmetadata.schema.entity.data.APIEndpoint;
import org.openmetadata.schema.entity.data.Container; import org.openmetadata.schema.entity.data.Container;
import org.openmetadata.schema.entity.data.Dashboard; import org.openmetadata.schema.entity.data.Dashboard;
@ -144,70 +150,55 @@ public class LineageRepository {
private void addLineageToSearch( private void addLineageToSearch(
EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) { EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) {
IndexMapping sourceIndexMapping =
Entity.getSearchRepository().getIndexMapping(fromEntity.getType());
String sourceIndexName =
sourceIndexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias());
IndexMapping destinationIndexMapping = IndexMapping destinationIndexMapping =
Entity.getSearchRepository().getIndexMapping(toEntity.getType()); Entity.getSearchRepository().getIndexMapping(toEntity.getType());
String destinationIndexName = String destinationIndexName =
destinationIndexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()); destinationIndexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias());
Map<String, Object> relationshipDetails = // For lineage from -> to (not stored) since the doc itself is the toEntity
buildRelationshipDetailsMap(fromEntity, toEntity, lineageDetails); EsLineageData lineageData =
Pair<String, String> from = new ImmutablePair<>("_id", fromEntity.getId().toString()); buildEntityLineageData(fromEntity, toEntity, lineageDetails).withToEntity(null);
Pair<String, String> to = new ImmutablePair<>("_id", toEntity.getId().toString()); Pair<String, String> to = new ImmutablePair<>("_id", toEntity.getId().toString());
searchClient.updateLineage(sourceIndexName, from, relationshipDetails); searchClient.updateLineage(destinationIndexName, to, lineageData);
searchClient.updateLineage(destinationIndexName, to, relationshipDetails);
} }
public static Map<String, Object> buildEntityRefMap(EntityReference entityRef) { public static RelationshipRef buildEntityRefLineage(EntityReference entityRef) {
Map<String, Object> details = new HashMap<>(); return new RelationshipRef()
details.put("id", entityRef.getId().toString()); .withId(entityRef.getId())
details.put("type", entityRef.getType()); .withType(entityRef.getType())
details.put("fqn", entityRef.getFullyQualifiedName()); .withFqn(entityRef.getFullyQualifiedName())
details.put("fqnHash", FullyQualifiedName.buildHash(entityRef.getFullyQualifiedName())); .withFqnHash(FullyQualifiedName.buildHash(entityRef.getFullyQualifiedName()));
return details;
} }
public static Map<String, Object> buildRelationshipDetailsMap( public static EsLineageData buildEntityLineageData(
EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) { EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) {
Map<String, Object> relationshipDetails = new HashMap<>(); EsLineageData lineageData =
relationshipDetails.put( new EsLineageData()
"doc_id", fromEntity.getId().toString() + "-" + toEntity.getId().toString()); .withDocId(fromEntity.getId().toString() + "-->" + toEntity.getId().toString())
relationshipDetails.put("fromEntity", buildEntityRefMap(fromEntity)); .withFromEntity(buildEntityRefLineage(fromEntity));
relationshipDetails.put("toEntity", buildEntityRefMap(toEntity));
if (lineageDetails != null) { if (lineageDetails != null) {
// Add Pipeline Details // Add Pipeline Details
addPipelineDetails(relationshipDetails, lineageDetails.getPipeline()); addPipelineDetails(lineageData, lineageDetails.getPipeline());
relationshipDetails.put( lineageData.setDescription(nullOrDefault(lineageDetails.getDescription(), null));
"description", lineageData.setColumns(collectionOrDefault(lineageDetails.getColumnsLineage(), null));
CommonUtil.nullOrEmpty(lineageDetails.getDescription()) lineageData.setSqlQuery(nullOrDefault(lineageDetails.getSqlQuery(), null));
? null lineageData.setSource(nullOrDefault(lineageDetails.getSource().value(), null));
: lineageDetails.getDescription());
if (!CommonUtil.nullOrEmpty(lineageDetails.getColumnsLineage())) {
List<Map<String, Object>> colummnLineageList = new ArrayList<>();
for (ColumnLineage columnLineage : lineageDetails.getColumnsLineage()) {
colummnLineageList.add(JsonUtils.getMap(columnLineage));
} }
relationshipDetails.put("columns", colummnLineageList); return lineageData;
}
relationshipDetails.put(
"sqlQuery",
CommonUtil.nullOrEmpty(lineageDetails.getSqlQuery())
? null
: lineageDetails.getSqlQuery());
relationshipDetails.put(
"source",
CommonUtil.nullOrEmpty(lineageDetails.getSource()) ? null : lineageDetails.getSource());
}
return relationshipDetails;
} }
public static void addPipelineDetails( public static void addPipelineDetails(EsLineageData lineageData, EntityReference pipelineRef) {
Map<String, Object> relationshipDetails, EntityReference pipelineRef) { if (nullOrEmpty(pipelineRef)) {
if (CommonUtil.nullOrEmpty(pipelineRef)) { lineageData.setPipeline(null);
relationshipDetails.put(PIPELINE, JsonUtils.getMap(null));
} else { } else {
Pair<String, Map<String, Object>> pipelineOrStoredProcedure =
getPipelineOrStoredProcedure(pipelineRef, List.of("changeDescription"));
lineageData.setPipelineEntityType(pipelineOrStoredProcedure.getLeft());
lineageData.setPipeline(pipelineOrStoredProcedure.getRight());
}
}
public static Pair<String, Map<String, Object>> getPipelineOrStoredProcedure(
EntityReference pipelineRef, List<String> fieldsToRemove) {
Map<String, Object> pipelineMap; Map<String, Object> pipelineMap;
if (pipelineRef.getType().equals(PIPELINE)) { if (pipelineRef.getType().equals(PIPELINE)) {
pipelineMap = pipelineMap =
@ -216,10 +207,8 @@ public class LineageRepository {
} else { } else {
pipelineMap = JsonUtils.getMap(Entity.getEntity(pipelineRef, "tags,owners", Include.ALL)); pipelineMap = JsonUtils.getMap(Entity.getEntity(pipelineRef, "tags,owners", Include.ALL));
} }
pipelineMap.remove("changeDescription"); fieldsToRemove.forEach(pipelineMap::remove);
relationshipDetails.put("pipelineEntityType", pipelineRef.getType()); return Pair.of(pipelineRef.getType(), pipelineMap);
relationshipDetails.put(PIPELINE, pipelineMap);
}
} }
private String validateLineageDetails( private String validateLineageDetails(
@ -249,13 +238,21 @@ public class LineageRepository {
throws IOException { throws IOException {
CsvDocumentation documentation = getCsvDocumentation("lineage"); CsvDocumentation documentation = getCsvDocumentation("lineage");
List<CsvHeader> headers = documentation.getHeaders(); List<CsvHeader> headers = documentation.getHeaders();
Map<String, Object> lineageMap = // TODO: Fix the lineage we need booth the lineage
SearchLineageResult result =
Entity.getSearchRepository() Entity.getSearchRepository()
.searchLineageForExport( .searchLineageForExport(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); fqn,
upstreamDepth,
downstreamDepth,
queryFilter,
deleted,
entityType,
LineageDirection.UPSTREAM);
CsvFile csvFile = new CsvFile().withHeaders(headers); CsvFile csvFile = new CsvFile().withHeaders(headers);
addRecords(csvFile, lineageMap); // TODO: Fix This
// addRecords(csvFile, result);
return CsvUtil.formatCsv(csvFile); return CsvUtil.formatCsv(csvFile);
} }
@ -278,10 +275,19 @@ public class LineageRepository {
String entityType, String entityType,
boolean deleted) { boolean deleted) {
try { try {
Response response = // TODO: fix Export to consider for both the nodes
SearchLineageResult response =
Entity.getSearchRepository() Entity.getSearchRepository()
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); .searchLineage(
String jsonResponse = JsonUtils.pojoToJson(response.getEntity()); new SearchLineageRequest()
.withFqn(fqn)
.withUpstreamDepth(upstreamDepth)
.withDownstreamDepth(downstreamDepth)
.withQueryFilter(queryFilter)
.withIncludeDeleted(deleted)
.withEntityType(entityType)
.withDirection(LineageDirection.UPSTREAM));
String jsonResponse = JsonUtils.pojoToJson(response);
JsonNode rootNode = JsonUtils.readTree(jsonResponse); JsonNode rootNode = JsonUtils.readTree(jsonResponse);
Map<String, JsonNode> entityMap = new HashMap<>(); Map<String, JsonNode> entityMap = new HashMap<>();

View File

@ -51,6 +51,9 @@ import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo; import javax.ws.rs.core.UriInfo;
import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.lineage.AddLineage; import org.openmetadata.schema.api.lineage.AddLineage;
import org.openmetadata.schema.api.lineage.LineageDirection;
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
import org.openmetadata.schema.api.lineage.SearchLineageResult;
import org.openmetadata.schema.type.EntityLineage; import org.openmetadata.schema.type.EntityLineage;
import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.MetadataOperation; import org.openmetadata.schema.type.MetadataOperation;
@ -192,7 +195,7 @@ public class LineageResource {
mediaType = "application/json", mediaType = "application/json",
schema = @Schema(implementation = SearchResponse.class))) schema = @Schema(implementation = SearchResponse.class)))
}) })
public Response searchLineage( public SearchLineageResult searchLineage(
@Context UriInfo uriInfo, @Context UriInfo uriInfo,
@Context SecurityContext securityContext, @Context SecurityContext securityContext,
@Parameter(description = "fqn") @QueryParam("fqn") String fqn, @Parameter(description = "fqn") @QueryParam("fqn") String fqn,
@ -209,9 +212,61 @@ public class LineageResource {
boolean deleted, boolean deleted,
@Parameter(description = "entity type") @QueryParam("type") String entityType) @Parameter(description = "entity type") @QueryParam("type") String entityType)
throws IOException { throws IOException {
return Entity.getSearchRepository() return Entity.getSearchRepository()
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); .searchLineage(
new SearchLineageRequest()
.withFqn(fqn)
.withUpstreamDepth(upstreamDepth)
.withDownstreamDepth(downstreamDepth)
.withQueryFilter(queryFilter)
.withIncludeDeleted(deleted)
.withEntityType(entityType));
}
@GET
@Path("/getLineage/{direction}")
@Operation(
operationId = "searchLineageWithDirection",
summary = "Search lineage with Direction",
responses = {
@ApiResponse(
responseCode = "200",
description = "search response",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = SearchResponse.class)))
})
public SearchLineageResult searchLineageWithDirection(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "fqn") @QueryParam("fqn") String fqn,
@Parameter(description = "Direction", required = true, schema = @Schema(type = "string"))
@PathParam("direction")
LineageDirection direction,
@Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth,
@Parameter(description = "downstreamDepth") @QueryParam("downstreamDepth")
int downstreamDepth,
@Parameter(
description =
"Elasticsearch query that will be combined with the query_string query generator from the `query` argument")
@QueryParam("query_filter")
String queryFilter,
@Parameter(description = "Filter documents by deleted param. By default deleted is false")
@QueryParam("includeDeleted")
boolean deleted,
@Parameter(description = "entity type") @QueryParam("type") String entityType)
throws IOException {
return Entity.getSearchRepository()
.searchLineage(
new SearchLineageRequest()
.withFqn(fqn)
.withUpstreamDepth(upstreamDepth)
.withDownstreamDepth(downstreamDepth)
.withQueryFilter(queryFilter)
.withIncludeDeleted(deleted)
.withEntityType(entityType)
.withDirection(direction));
} }
@GET @GET
@ -279,8 +334,6 @@ public class LineageResource {
boolean deleted, boolean deleted,
@Parameter(description = "entity type") @QueryParam("type") String entityType) @Parameter(description = "entity type") @QueryParam("type") String entityType)
throws IOException { throws IOException {
Entity.getSearchRepository()
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
return dao.exportCsv(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); return dao.exportCsv(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
} }

View File

@ -7,6 +7,7 @@ import java.security.KeyStoreException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import javax.json.JsonArray; import javax.json.JsonArray;
@ -15,6 +16,10 @@ import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import lombok.Getter; import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.api.lineage.EsLineageData;
import org.openmetadata.schema.api.lineage.RelationshipRef;
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
import org.openmetadata.schema.api.lineage.SearchLineageResult;
import org.openmetadata.schema.api.search.SearchSettings; import org.openmetadata.schema.api.search.SearchSettings;
import org.openmetadata.schema.dataInsight.DataInsightChartResult; import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart;
@ -28,6 +33,7 @@ import org.openmetadata.service.resources.settings.SettingsCache;
import org.openmetadata.service.search.models.IndexMapping; import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.search.security.RBACConditionEvaluator; import org.openmetadata.service.search.security.RBACConditionEvaluator;
import org.openmetadata.service.security.policyevaluator.SubjectContext; import org.openmetadata.service.security.policyevaluator.SubjectContext;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.ResultList; import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.util.SSLUtil; import org.openmetadata.service.util.SSLUtil;
import os.org.opensearch.action.bulk.BulkRequest; import os.org.opensearch.action.bulk.BulkRequest;
@ -35,6 +41,9 @@ import os.org.opensearch.action.bulk.BulkResponse;
import os.org.opensearch.client.RequestOptions; import os.org.opensearch.client.RequestOptions;
public interface SearchClient { public interface SearchClient {
String UPSTREAM_LINEAGE_FIELD = "upstreamLineage";
String FQN_FIELD = "fullyQualifiedName";
String ID_FIELD = "id";
ExecutorService asyncExecutor = Executors.newFixedThreadPool(1); ExecutorService asyncExecutor = Executors.newFixedThreadPool(1);
String UPDATE = "update"; String UPDATE = "update";
@ -81,10 +90,10 @@ public interface SearchClient {
"if (ctx._source.certification != null && ctx._source.certification.tagLabel != null) {ctx._source.certification.tagLabel.style = params.style; ctx._source.certification.tagLabel.description = params.description; ctx._source.certification.tagLabel.tagFQN = params.tagFQN; ctx._source.certification.tagLabel.name = params.name; }"; "if (ctx._source.certification != null && ctx._source.certification.tagLabel != null) {ctx._source.certification.tagLabel.style = params.style; ctx._source.certification.tagLabel.description = params.description; ctx._source.certification.tagLabel.tagFQN = params.tagFQN; ctx._source.certification.tagLabel.name = params.name; }";
String REMOVE_LINEAGE_SCRIPT = String REMOVE_LINEAGE_SCRIPT =
"for (int i = 0; i < ctx._source.lineage.length; i++) { if (ctx._source.lineage[i].doc_id == '%s') { ctx._source.lineage.remove(i) }}"; "for (int i = 0; i < ctx._source.upstreamLineage.length; i++) { if (ctx._source.upstreamLineage[i].doc_id == '%s') { ctx._source.upstreamLineage.remove(i) }}";
String ADD_UPDATE_LINEAGE = String ADD_UPDATE_LINEAGE =
"boolean docIdExists = false; for (int i = 0; i < ctx._source.lineage.size(); i++) { if (ctx._source.lineage[i].doc_id.equalsIgnoreCase(params.lineageData.doc_id)) { ctx._source.lineage[i] = params.lineageData; docIdExists = true; break;}}if (!docIdExists) {ctx._source.lineage.add(params.lineageData);}"; "boolean docIdExists = false; for (int i = 0; i < ctx._source.upstreamLineage.size(); i++) { if (ctx._source.upstreamLineage[i].doc_id.equalsIgnoreCase(params.lineageData.doc_id)) { ctx._source.upstreamLineage[i] = params.lineageData; docIdExists = true; break;}}if (!docIdExists) {ctx._source.upstreamLineage.add(params.lineageData);}";
// The script is used for updating the entityRelationship attribute of the entity in ES // The script is used for updating the entityRelationship attribute of the entity in ES
// It checks if any duplicate entry is present based on the doc_id and updates only if it is not // It checks if any duplicate entry is present based on the doc_id and updates only if it is not
@ -188,15 +197,15 @@ public interface SearchClient {
Response searchBySourceUrl(String sourceUrl) throws IOException; Response searchBySourceUrl(String sourceUrl) throws IOException;
Response searchLineage( SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException;
String fqn,
int upstreamDepth, SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest)
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException; throws IOException;
SearchLineageResult getUpstreamLineage(SearchLineageRequest lineageRequest) throws IOException;
SearchLineageResult getDownStreamLineage(SearchLineageRequest lineageRequest) throws IOException;
Response searchEntityRelationship( Response searchEntityRelationship(
String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted)
throws IOException; throws IOException;
@ -208,6 +217,10 @@ public interface SearchClient {
String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted)
throws IOException; throws IOException;
Map<String, Object> searchEntityByKey(
String indexAlias, String keyName, String keyValue, List<String> fieldsToRemove)
throws IOException;
/* /*
Used for listing knowledge page hierarchy for a given parent and page type, used in Elastic/Open SearchClientExtension Used for listing knowledge page hierarchy for a given parent and page type, used in Elastic/Open SearchClientExtension
*/ */
@ -220,6 +233,7 @@ public interface SearchClient {
/* /*
Used for listing knowledge page hierarchy for a given active Page and page type, used in Elastic/Open SearchClientExtension Used for listing knowledge page hierarchy for a given active Page and page type, used in Elastic/Open SearchClientExtension
*/ */
@SuppressWarnings("unused")
default ResultList listPageHierarchyForActivePage( default ResultList listPageHierarchyForActivePage(
String activeFqn, String pageType, int offset, int limit) { String activeFqn, String pageType, int offset, int limit) {
throw new CustomExceptionMessage( throw new CustomExceptionMessage(
@ -232,15 +246,6 @@ public interface SearchClient {
Response.Status.NOT_IMPLEMENTED, NOT_IMPLEMENTED_ERROR_TYPE, NOT_IMPLEMENTED_METHOD); Response.Status.NOT_IMPLEMENTED, NOT_IMPLEMENTED_ERROR_TYPE, NOT_IMPLEMENTED_METHOD);
} }
Map<String, Object> searchLineageInternal(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException;
Response searchByField(String fieldName, String fieldValue, String index) throws IOException; Response searchByField(String fieldName, String fieldValue, String index) throws IOException;
Response aggregate(String index, String fieldName, String value, String query) throws IOException; Response aggregate(String index, String fieldName, String value, String query) throws IOException;
@ -291,7 +296,7 @@ public interface SearchClient {
Pair<String, Map<String, Object>> updates); Pair<String, Map<String, Object>> updates);
void updateLineage( void updateLineage(
String indexName, Pair<String, String> fieldAndValue, Map<String, Object> lineagaData); String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData);
void updateEntityRelationship( void updateEntityRelationship(
String indexName, String indexName,
@ -393,5 +398,51 @@ public interface SearchClient {
&& rbacConditionEvaluator != null; && rbacConditionEvaluator != null;
} }
// default String getLineageDirection(LineageDirection direction, String entityType) {
// if (LineageDirection.UPSTREAM.equals(direction)) {
// if (Boolean.FALSE.equals(entityType.equals(Entity.PIPELINE)) &&
// Boolean.FALSE.equals(entityType.equals(Entity.STORED_PROCEDURE))) {
// return
// }else{
//
// }
// } else {
//
// }
// return "";
// }
SearchHealthStatus getSearchHealthStatus() throws IOException; SearchHealthStatus getSearchHealthStatus() throws IOException;
static RelationshipRef getRelationshipRef(Map<String, Object> entityMap) {
// This assumes these keys exists in the map, use it with caution
return new RelationshipRef()
.withId(UUID.fromString(entityMap.get("id").toString()))
.withType(entityMap.get("entityType").toString())
.withFqn(entityMap.get("fullyQualifiedName").toString())
.withFqnHash(FullyQualifiedName.buildHash(entityMap.get("fullyQualifiedName").toString()));
}
static EsLineageData getEsLineageDataFromUpstreamLineage(
String fqn, List<EsLineageData> upstream) {
for (EsLineageData esLineageData : upstream) {
if (esLineageData.getFromEntity().getFqn().equals(fqn)) {
return esLineageData;
}
}
return null;
}
static EsLineageData copyEsLineageData(EsLineageData data) {
return new EsLineageData()
.withDocId(data.getDocId())
.withFromEntity(data.getFromEntity())
.withToEntity(data.getToEntity())
.withPipeline(data.getPipeline())
.withSqlQuery(data.getSqlQuery())
.withColumns(data.getColumns())
.withDescription(data.getDescription())
.withSource(data.getSource())
.withPipelineEntityType(data.getPipelineEntityType());
}
} }

View File

@ -19,6 +19,7 @@ import org.openmetadata.schema.tests.Datum;
import org.openmetadata.schema.tests.type.DataQualityReportMetadata; import org.openmetadata.schema.tests.type.DataQualityReportMetadata;
import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.TagLabel; import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.Utilities; import org.openmetadata.service.util.Utilities;
public final class SearchIndexUtils { public final class SearchIndexUtils {
@ -58,8 +59,9 @@ public final class SearchIndexUtils {
if (value instanceof Map) { if (value instanceof Map) {
currentMap = (Map<String, Object>) value; currentMap = (Map<String, Object>) value;
} else if (value instanceof List) { } else if (value instanceof List) {
List<Map<String, Object>> list = (List<Map<String, Object>>) value; List<?> list = (List<Map<String, Object>>) value;
for (Map<String, Object> item : list) { for (Object obj : list) {
Map<String, Object> item = JsonUtils.getMap(obj);
removeFieldByPath( removeFieldByPath(
item, item,
Arrays.stream(pathElements, 1, pathElements.length).collect(Collectors.joining("."))); Arrays.stream(pathElements, 1, pathElements.length).collect(Collectors.joining(".")));

View File

@ -60,6 +60,9 @@ import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.EntityTimeSeriesInterface; import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.schema.analytics.ReportData; import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.api.lineage.LineageDirection;
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
import org.openmetadata.schema.api.lineage.SearchLineageResult;
import org.openmetadata.schema.dataInsight.DataInsightChartResult; import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.entity.classification.Tag; import org.openmetadata.schema.entity.classification.Tag;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
@ -958,16 +961,13 @@ public class SearchRepository {
return searchClient.searchBySourceUrl(sourceUrl); return searchClient.searchBySourceUrl(sourceUrl);
} }
public Response searchLineage( public SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException {
String fqn, return searchClient.searchLineage(lineageRequest);
int upstreamDepth, }
int downstreamDepth,
String queryFilter, public SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest)
boolean deleted,
String entityType)
throws IOException { throws IOException {
return searchClient.searchLineage( return searchClient.searchLineage(lineageRequest);
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
} }
public Response searchEntityRelationship( public Response searchEntityRelationship(
@ -989,16 +989,24 @@ public class SearchRepository {
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); fqn, upstreamDepth, downstreamDepth, queryFilter, deleted);
} }
public Map<String, Object> searchLineageForExport( public SearchLineageResult searchLineageForExport(
String fqn, String fqn,
int upstreamDepth, int upstreamDepth,
int downstreamDepth, int downstreamDepth,
String queryFilter, String queryFilter,
boolean deleted, boolean deleted,
String entityType) String entityType,
LineageDirection direction)
throws IOException { throws IOException {
return searchClient.searchLineageInternal( return searchClient.searchLineage(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); new SearchLineageRequest()
.withFqn(fqn)
.withUpstreamDepth(upstreamDepth)
.withDownstreamDepth(downstreamDepth)
.withQueryFilter(queryFilter)
.withIncludeDeleted(deleted)
.withEntityType(entityType)
.withDirection(direction));
} }
public Response searchByField(String fieldName, String fieldValue, String index) public Response searchByField(String fieldName, String fieldValue, String index)

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.search.elasticsearch;
import static javax.ws.rs.core.Response.Status.NOT_FOUND; import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK; import static javax.ws.rs.core.Response.Status.OK;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA; import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA;
import static org.openmetadata.service.Entity.DATA_PRODUCT; import static org.openmetadata.service.Entity.DATA_PRODUCT;
@ -152,6 +153,11 @@ import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.api.lineage.EsLineageData;
import org.openmetadata.schema.api.lineage.LineageDirection;
import org.openmetadata.schema.api.lineage.RelationshipRef;
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
import org.openmetadata.schema.api.lineage.SearchLineageResult;
import org.openmetadata.schema.dataInsight.DataInsightChartResult; import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList;
@ -842,64 +848,26 @@ public class ElasticSearchClient implements SearchClient {
} }
@Override @Override
public Map<String, Object> searchLineageInternal( public SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException {
String fqn, SearchLineageResult result =
int upstreamDepth, getDownStreamLineage(lineageRequest.withDirection(LineageDirection.DOWNSTREAM));
int downstreamDepth, SearchLineageResult upstreamLineage =
String queryFilter, getUpstreamLineage(lineageRequest.withDirection(LineageDirection.UPSTREAM));
boolean deleted,
String entityType) // Add All nodes and edges from upstream lineage to result
throws IOException { result.getNodes().putAll(upstreamLineage.getNodes());
Map<String, Object> responseMap = new HashMap<>(); result.getUpstreamEdges().putAll(upstreamLineage.getUpstreamEdges());
Set<Map<String, Object>> edges = new HashSet<>(); return result;
Set<Map<String, Object>> nodes = new HashSet<>();
if (entityType.equalsIgnoreCase(Entity.PIPELINE)
|| entityType.equalsIgnoreCase(Entity.STORED_PROCEDURE)) {
return searchPipelineLineage(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, responseMap);
}
es.org.elasticsearch.action.search.SearchRequest searchRequest =
new es.org.elasticsearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
List<String> sourceFieldsToExcludeCopy = new ArrayList<>(SOURCE_FIELDS_TO_EXCLUDE);
searchSourceBuilder.fetchSource(null, sourceFieldsToExcludeCopy.toArray(String[]::new));
searchSourceBuilder.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn)));
searchRequest.source(searchSourceBuilder.size(1000));
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) {
Map<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
responseMap.put("entity", tempMap);
}
getLineage(
fqn,
downstreamDepth,
edges,
nodes,
queryFilter,
"lineage.fromEntity.fqnHash.keyword",
deleted);
getLineage(
fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqnHash.keyword", deleted);
responseMap.put("edges", edges);
responseMap.put("nodes", nodes);
return responseMap;
} }
@Override @Override
public Response searchLineage( public SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest)
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException { throws IOException {
Map<String, Object> responseMap = if (lineageRequest.getDirection().equals(LineageDirection.UPSTREAM)) {
searchLineageInternal( return getUpstreamLineage(lineageRequest);
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); } else {
return Response.status(OK).entity(responseMap).build(); return getDownStreamLineage(lineageRequest);
}
} }
private void getEntityRelationship( private void getEntityRelationship(
@ -1105,54 +1073,155 @@ public class ElasticSearchClient implements SearchClient {
return Response.status(OK).entity(responseMap).build(); return Response.status(OK).entity(responseMap).build();
} }
private void getLineage( @Override
String fqn, public Map<String, Object> searchEntityByKey(
int depth, String indexAlias, String keyName, String keyValue, List<String> fieldsToRemove)
Set<Map<String, Object>> edges,
Set<Map<String, Object>> nodes,
String queryFilter,
String direction,
boolean deleted)
throws IOException { throws IOException {
if (depth <= 0) { es.org.elasticsearch.action.search.SearchRequest searchRequest =
return; getSearchRequest(indexAlias, null, keyName, keyValue, null, null, fieldsToRemove);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
int noOfHits = searchResponse.getHits().getHits().length;
if (noOfHits == 1) {
return new HashMap<>(
JsonUtils.getMap(searchResponse.getHits().getHits()[0].getSourceAsMap()));
} else {
throw new SearchException(
String.format(
"Issue in Search Entity By Key: %s, Value: %s , Number of Hits: %s",
keyName, keyValue, noOfHits));
} }
}
private es.org.elasticsearch.action.search.SearchRequest getSearchRequest(
String indexAlias,
String queryFilter,
String key,
String value,
Boolean deleted,
List<String> fieldsToInclude,
List<String> fieldsToRemove) {
es.org.elasticsearch.action.search.SearchRequest searchRequest = es.org.elasticsearch.action.search.SearchRequest searchRequest =
new es.org.elasticsearch.action.search.SearchRequest( new es.org.elasticsearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); Entity.getSearchRepository().getIndexOrAliasName(indexAlias));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new)); searchSourceBuilder.fetchSource(
listOrEmpty(fieldsToInclude).toArray(String[]::new),
listOrEmpty(fieldsToRemove).toArray(String[]::new));
searchSourceBuilder.query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery(key, value)));
if (!CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.query( searchSourceBuilder.query(
QueryBuilders.boolQuery() QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn)))); .must(QueryBuilders.termQuery(key, value))
if (CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn)))
.must(QueryBuilders.termQuery("deleted", deleted))); .must(QueryBuilders.termQuery("deleted", deleted)));
} }
buildSearchSourceFilter(queryFilter, searchSourceBuilder); buildSearchSourceFilter(queryFilter, searchSourceBuilder);
searchRequest.source(searchSourceBuilder.size(1000)); searchRequest.source(searchSourceBuilder.size(1000));
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); return searchRequest;
for (var hit : searchResponse.getHits().getHits()) {
List<Map<String, Object>> lineage =
(List<Map<String, Object>>) hit.getSourceAsMap().get("lineage");
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
nodes.add(tempMap);
for (Map<String, Object> lin : lineage) {
Map<String, String> fromEntity = (HashMap<String, String>) lin.get("fromEntity");
Map<String, String> toEntity = (HashMap<String, String>) lin.get("toEntity");
if (direction.equalsIgnoreCase("lineage.fromEntity.fqnHash.keyword")) {
if (!edges.contains(lin) && fromEntity.get("fqn").equals(fqn)) {
edges.add(lin);
getLineage(
toEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted);
} }
} else {
if (!edges.contains(lin) && toEntity.get("fqn").equals(fqn)) { public SearchLineageResult getUpstreamLineage(SearchLineageRequest lineageRequest)
edges.add(lin); throws IOException {
getLineage( SearchLineageResult result =
fromEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted); new SearchLineageResult()
.withNodes(new HashMap<>())
.withUpstreamEdges(new HashMap<>())
.withDownstreamEdges(new HashMap<>());
getUpstreamLineageRecursively(lineageRequest, result);
return result;
}
private void getUpstreamLineageRecursively(
SearchLineageRequest lineageRequest, SearchLineageResult result) throws IOException {
if (lineageRequest.getUpstreamDepth() <= 0) {
return;
}
Map<String, Object> entityMap =
searchEntityByKey(
GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE);
if (!entityMap.isEmpty()) {
result.getNodes().putIfAbsent(entityMap.get(ID_FIELD).toString(), entityMap);
// Contains Upstream Lineage
if (entityMap.containsKey(UPSTREAM_LINEAGE_FIELD)) {
List<EsLineageData> upStreamEntities =
JsonUtils.readOrConvertValues(
entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class);
for (EsLineageData esLineageData : upStreamEntities) {
result
.getUpstreamEdges()
.putIfAbsent(
esLineageData.getDocId(),
esLineageData.withToEntity(SearchClient.getRelationshipRef(entityMap)));
String upstreamEntityId = esLineageData.getFromEntity().getId().toString();
if (!result.getNodes().containsKey(upstreamEntityId)) {
SearchLineageRequest updatedRequest =
JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class)
.withUpstreamDepth(lineageRequest.getUpstreamDepth() - 1)
.withFqn(esLineageData.getFromEntity().getFqn());
getUpstreamLineageRecursively(updatedRequest, result);
}
}
}
}
}
public SearchLineageResult getDownStreamLineage(SearchLineageRequest lineageRequest)
throws IOException {
SearchLineageResult result =
new SearchLineageResult()
.withNodes(new HashMap<>())
.withUpstreamEdges(new HashMap<>())
.withDownstreamEdges(new HashMap<>());
Map<String, Object> entityMap =
searchEntityByKey(
GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE);
if (!entityMap.isEmpty()) {
result.getNodes().putIfAbsent(entityMap.get("id").toString(), entityMap);
getDownStreamRecursively(SearchClient.getRelationshipRef(entityMap), lineageRequest, result);
}
return result;
}
private void getDownStreamRecursively(
RelationshipRef fromEntity, SearchLineageRequest lineageRequest, SearchLineageResult result)
throws IOException {
if (lineageRequest.getDownstreamDepth() <= 0) {
return;
}
es.org.elasticsearch.action.search.SearchRequest searchDownstreamEntities =
getSearchRequest(
GLOBAL_SEARCH_ALIAS,
lineageRequest.getQueryFilter(),
"upstreamLineage.fromEntity.fqnHash",
FullyQualifiedName.buildHash(lineageRequest.getFqn()),
lineageRequest.getIncludeDeleted(),
null,
SOURCE_FIELDS_TO_EXCLUDE);
SearchResponse downstreamEntities =
client.search(searchDownstreamEntities, RequestOptions.DEFAULT);
for (SearchHit searchHit : downstreamEntities.getHits().getHits()) {
Map<String, Object> entityMap = new HashMap<>(searchHit.getSourceAsMap());
if (!entityMap.isEmpty()) {
result.getNodes().putIfAbsent(entityMap.get(ID_FIELD).toString(), entityMap);
List<EsLineageData> upStreamEntities =
JsonUtils.readOrConvertValues(
entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class);
EsLineageData esLineageData =
SearchClient.getEsLineageDataFromUpstreamLineage(fromEntity.getFqn(), upStreamEntities);
if (esLineageData != null) {
RelationshipRef toEntity = SearchClient.getRelationshipRef(entityMap);
result
.getDownstreamEdges()
.putIfAbsent(esLineageData.getDocId(), esLineageData.withToEntity(toEntity));
String downStreamEntityId = toEntity.getId().toString();
if (!result.getNodes().containsKey(downStreamEntityId)) {
SearchLineageRequest updatedRequest =
JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class)
.withDownstreamDepth(lineageRequest.getDownstreamDepth() - 1)
.withFqn(toEntity.getFqn());
getDownStreamRecursively(toEntity, updatedRequest, result);
} }
} }
} }
@ -1296,112 +1365,6 @@ public class ElasticSearchClient implements SearchClient {
return client.search(searchRequest, RequestOptions.DEFAULT); return client.search(searchRequest, RequestOptions.DEFAULT);
} }
private Map<String, Object> searchPipelineLineage(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
Map<String, Object> responseMap)
throws IOException {
Set<Map<String, Object>> edges = new HashSet<>();
Set<Map<String, Object>> nodes = new HashSet<>();
Object[] searchAfter = null;
long processedRecords = 0;
long totalRecords = -1;
while (totalRecords != processedRecords) {
es.org.elasticsearch.action.search.SearchRequest searchRequest =
new es.org.elasticsearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
es.org.elasticsearch.index.query.BoolQueryBuilder boolQueryBuilder =
QueryBuilders.boolQuery();
boolQueryBuilder.should(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn)));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new));
FieldSortBuilder sortBuilder = SortBuilders.fieldSort("fullyQualifiedName");
searchSourceBuilder.sort(sortBuilder);
searchSourceBuilder.query(boolQueryBuilder);
if (searchAfter != null) {
searchSourceBuilder.searchAfter(searchAfter);
}
if (CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(boolQueryBuilder)
.must(QueryBuilders.termQuery("deleted", deleted)));
}
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) {
List<Map<String, Object>> lineage =
(List<Map<String, Object>>) hit.getSourceAsMap().get("lineage");
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
nodes.add(tempMap);
for (Map<String, Object> lin : lineage) {
HashMap<String, String> fromEntity = (HashMap<String, String>) lin.get("fromEntity");
HashMap<String, String> toEntity = (HashMap<String, String>) lin.get("toEntity");
HashMap<String, String> pipeline = (HashMap<String, String>) lin.get("pipeline");
if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) {
edges.add(lin);
getLineage(
fromEntity.get("fqn"),
upstreamDepth,
edges,
nodes,
queryFilter,
"lineage.toEntity.fqn.keyword",
deleted);
getLineage(
toEntity.get("fqn"),
downstreamDepth,
edges,
nodes,
queryFilter,
"lineage.fromEntity.fqn.keyword",
deleted);
}
}
}
totalRecords = searchResponse.getHits().getTotalHits().value;
int currentHits = searchResponse.getHits().getHits().length;
processedRecords += currentHits;
if (currentHits > 0) {
searchAfter = searchResponse.getHits().getHits()[currentHits - 1].getSortValues();
} else {
searchAfter = null;
}
}
getLineage(
fqn, downstreamDepth, edges, nodes, queryFilter, "lineage.fromEntity.fqn.keyword", deleted);
getLineage(
fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqn.keyword", deleted);
// TODO: Fix this , this is hack
if (edges.isEmpty()) {
es.org.elasticsearch.action.search.SearchRequest searchRequestForEntity =
new es.org.elasticsearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilderForEntity = new SearchSourceBuilder();
searchSourceBuilderForEntity.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn)));
searchRequestForEntity.source(searchSourceBuilderForEntity.size(1000));
SearchResponse searchResponseForEntity =
client.search(searchRequestForEntity, RequestOptions.DEFAULT);
for (var hit : searchResponseForEntity.getHits().getHits()) {
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
tempMap.keySet().removeAll(FIELDS_TO_REMOVE);
responseMap.put("entity", tempMap);
}
}
responseMap.put("edges", edges);
responseMap.put("nodes", nodes);
return responseMap;
}
@Override @Override
public Response searchBySourceUrl(String sourceUrl) throws IOException { public Response searchBySourceUrl(String sourceUrl) throws IOException {
es.org.elasticsearch.action.search.SearchRequest searchRequest = es.org.elasticsearch.action.search.SearchRequest searchRequest =
@ -2319,13 +2282,14 @@ public class ElasticSearchClient implements SearchClient {
@Override @Override
public void updateLineage( public void updateLineage(
String indexName, Pair<String, String> fieldAndValue, Map<String, Object> lineageData) { String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData) {
if (isClientAvailable) { if (isClientAvailable) {
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName); UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName);
updateByQueryRequest.setQuery( updateByQueryRequest.setQuery(
new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue()) new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue())
.operator(Operator.AND)); .operator(Operator.AND));
Map<String, Object> params = Collections.singletonMap("lineageData", lineageData); Map<String, Object> params =
Collections.singletonMap("lineageData", JsonUtils.getMap(lineageData));
Script script = Script script =
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params); new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params);
updateByQueryRequest.setScript(script); updateByQueryRequest.setScript(script);

View File

@ -95,7 +95,7 @@ public class APIEndpointIndex implements SearchIndex {
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll); .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
Map<String, Object> commonAttributes = getCommonAttributesMap(apiEndpoint, Entity.API_ENDPOINT); Map<String, Object> commonAttributes = getCommonAttributesMap(apiEndpoint, Entity.API_ENDPOINT);
doc.putAll(commonAttributes); doc.putAll(commonAttributes);
doc.put("lineage", SearchIndex.getLineageData(apiEndpoint.getEntityReference())); doc.put("upstreamLineage", SearchIndex.getLineageData(apiEndpoint.getEntityReference()));
doc.put( doc.put(
"requestSchema", "requestSchema",
apiEndpoint.getRequestSchema() != null ? apiEndpoint.getRequestSchema() : null); apiEndpoint.getRequestSchema() != null ? apiEndpoint.getRequestSchema() : null);

View File

@ -63,7 +63,7 @@ public record ContainerIndex(Container container) implements ColumnIndex {
doc.put("column_suggest", columnSuggest); doc.put("column_suggest", columnSuggest);
doc.put("serviceType", container.getServiceType()); doc.put("serviceType", container.getServiceType());
doc.put("fullPath", container.getFullPath()); doc.put("fullPath", container.getFullPath());
doc.put("lineage", SearchIndex.getLineageData(container.getEntityReference())); doc.put("upstreamLineage", SearchIndex.getLineageData(container.getEntityReference()));
doc.put("service", getEntityWithDisplayName(container.getService())); doc.put("service", getEntityWithDisplayName(container.getService()));
return doc; return doc;
} }

View File

@ -64,7 +64,7 @@ public record DashboardDataModelIndex(DashboardDataModel dashboardDataModel)
doc.put("column_suggest", columnSuggest); doc.put("column_suggest", columnSuggest);
doc.put("tier", parseTags.getTierTag()); doc.put("tier", parseTags.getTierTag());
doc.put("service", getEntityWithDisplayName(dashboardDataModel.getService())); doc.put("service", getEntityWithDisplayName(dashboardDataModel.getService()));
doc.put("lineage", SearchIndex.getLineageData(dashboardDataModel.getEntityReference())); doc.put("upstreamLineage", SearchIndex.getLineageData(dashboardDataModel.getEntityReference()));
doc.put("domain", getEntityWithDisplayName(dashboardDataModel.getDomain())); doc.put("domain", getEntityWithDisplayName(dashboardDataModel.getDomain()));
return doc; return doc;
} }

View File

@ -54,7 +54,7 @@ public class DashboardIndex implements SearchIndex {
doc.put("data_model_suggest", dataModelSuggest); doc.put("data_model_suggest", dataModelSuggest);
doc.put("service_suggest", serviceSuggest); doc.put("service_suggest", serviceSuggest);
doc.put("serviceType", dashboard.getServiceType()); doc.put("serviceType", dashboard.getServiceType());
doc.put("lineage", SearchIndex.getLineageData(dashboard.getEntityReference())); doc.put("upstreamLineage", SearchIndex.getLineageData(dashboard.getEntityReference()));
doc.put("service", getEntityWithDisplayName(dashboard.getService())); doc.put("service", getEntityWithDisplayName(dashboard.getService()));
return doc; return doc;
} }

View File

@ -32,7 +32,7 @@ public class MetricIndex implements SearchIndex {
public Map<String, Object> buildSearchIndexDocInternal(Map<String, Object> doc) { public Map<String, Object> buildSearchIndexDocInternal(Map<String, Object> doc) {
Map<String, Object> commonAttributes = getCommonAttributesMap(metric, Entity.METRIC); Map<String, Object> commonAttributes = getCommonAttributesMap(metric, Entity.METRIC);
doc.putAll(commonAttributes); doc.putAll(commonAttributes);
doc.put("lineage", SearchIndex.getLineageData(metric.getEntityReference())); doc.put("upstreamLineage", SearchIndex.getLineageData(metric.getEntityReference()));
return doc; return doc;
} }
} }

View File

@ -35,7 +35,7 @@ public class MlModelIndex implements SearchIndex {
doc.put("tags", parseTags.getTags()); doc.put("tags", parseTags.getTags());
doc.put("tier", parseTags.getTierTag()); doc.put("tier", parseTags.getTierTag());
doc.put("serviceType", mlModel.getServiceType()); doc.put("serviceType", mlModel.getServiceType());
doc.put("lineage", SearchIndex.getLineageData(mlModel.getEntityReference())); doc.put("upstreamLineage", SearchIndex.getLineageData(mlModel.getEntityReference()));
doc.put("service", getEntityWithDisplayName(mlModel.getService())); doc.put("service", getEntityWithDisplayName(mlModel.getService()));
return doc; return doc;
} }

View File

@ -51,7 +51,7 @@ public class PipelineIndex implements SearchIndex {
doc.put("task_suggest", taskSuggest); doc.put("task_suggest", taskSuggest);
doc.put("service_suggest", serviceSuggest); doc.put("service_suggest", serviceSuggest);
doc.put("serviceType", pipeline.getServiceType()); doc.put("serviceType", pipeline.getServiceType());
doc.put("lineage", SearchIndex.getLineageData(pipeline.getEntityReference())); doc.put("upstreamLineage", SearchIndex.getLineageData(pipeline.getEntityReference()));
doc.put("service", getEntityWithDisplayName(pipeline.getService())); doc.put("service", getEntityWithDisplayName(pipeline.getService()));
return doc; return doc;
} }

View File

@ -32,7 +32,7 @@ public record SearchEntityIndex(org.openmetadata.schema.entity.data.SearchIndex
doc.put("tier", parseTags.getTierTag()); doc.put("tier", parseTags.getTierTag());
doc.put("service", getEntityWithDisplayName(searchIndex.getService())); doc.put("service", getEntityWithDisplayName(searchIndex.getService()));
doc.put("indexType", searchIndex.getIndexType()); doc.put("indexType", searchIndex.getIndexType());
doc.put("lineage", SearchIndex.getLineageData(searchIndex.getEntityReference())); doc.put("upstreamLineage", SearchIndex.getLineageData(searchIndex.getEntityReference()));
return doc; return doc;
} }

View File

@ -6,7 +6,7 @@ import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.service.Entity.FIELD_DESCRIPTION; import static org.openmetadata.service.Entity.FIELD_DESCRIPTION;
import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME; import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME;
import static org.openmetadata.service.Entity.getEntityByName; import static org.openmetadata.service.Entity.getEntityByName;
import static org.openmetadata.service.jdbi3.LineageRepository.buildRelationshipDetailsMap; import static org.openmetadata.service.jdbi3.LineageRepository.buildEntityLineageData;
import static org.openmetadata.service.search.EntityBuilderConstant.DISPLAY_NAME_KEYWORD; import static org.openmetadata.service.search.EntityBuilderConstant.DISPLAY_NAME_KEYWORD;
import static org.openmetadata.service.search.EntityBuilderConstant.FIELD_DISPLAY_NAME_NGRAM; import static org.openmetadata.service.search.EntityBuilderConstant.FIELD_DISPLAY_NAME_NGRAM;
import static org.openmetadata.service.search.EntityBuilderConstant.FULLY_QUALIFIED_NAME; import static org.openmetadata.service.search.EntityBuilderConstant.FULLY_QUALIFIED_NAME;
@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.lineage.EsLineageData;
import org.openmetadata.schema.entity.data.Table; import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.Include;
@ -42,7 +43,7 @@ import org.openmetadata.service.util.JsonUtils;
public interface SearchIndex { public interface SearchIndex {
Set<String> DEFAULT_EXCLUDED_FIELDS = Set<String> DEFAULT_EXCLUDED_FIELDS =
Set.of("changeDescription", "lineage.pipeline.changeDescription", "connection"); Set.of("changeDescription", "upstreamLineage.pipeline.changeDescription", "connection");
public static final SearchClient searchClient = Entity.getSearchRepository().getSearchClient(); public static final SearchClient searchClient = Entity.getSearchRepository().getSearchClient();
default Map<String, Object> buildSearchIndexDoc() { default Map<String, Object> buildSearchIndexDoc() {
@ -149,31 +150,25 @@ public interface SearchIndex {
return nullOrEmpty(entity.getDescription()) ? "INCOMPLETE" : "COMPLETE"; return nullOrEmpty(entity.getDescription()) ? "INCOMPLETE" : "COMPLETE";
} }
static List<Map<String, Object>> getLineageData(EntityReference entity) { static List<EsLineageData> getLineageData(EntityReference entity) {
List<Map<String, Object>> data = new ArrayList<>(); return new ArrayList<>(
CollectionDAO dao = Entity.getCollectionDAO(); getLineageDataFromRefs(
List<CollectionDAO.EntityRelationshipRecord> toRelationshipsRecords = entity,
dao.relationshipDAO() Entity.getCollectionDAO()
.findTo(entity.getId(), entity.getType(), Relationship.UPSTREAM.ordinal()); .relationshipDAO()
for (CollectionDAO.EntityRelationshipRecord entityRelationshipRecord : toRelationshipsRecords) { .findFrom(entity.getId(), entity.getType(), Relationship.UPSTREAM.ordinal())));
EntityReference ref =
Entity.getEntityReferenceById(
entityRelationshipRecord.getType(), entityRelationshipRecord.getId(), Include.ALL);
LineageDetails lineageDetails =
JsonUtils.readValue(entityRelationshipRecord.getJson(), LineageDetails.class);
data.add(buildRelationshipDetailsMap(entity, ref, lineageDetails));
} }
List<CollectionDAO.EntityRelationshipRecord> fromRelationshipsRecords =
dao.relationshipDAO() static List<EsLineageData> getLineageDataFromRefs(
.findFrom(entity.getId(), entity.getType(), Relationship.UPSTREAM.ordinal()); EntityReference entity, List<CollectionDAO.EntityRelationshipRecord> records) {
for (CollectionDAO.EntityRelationshipRecord entityRelationshipRecord : List<EsLineageData> data = new ArrayList<>();
fromRelationshipsRecords) { for (CollectionDAO.EntityRelationshipRecord entityRelationshipRecord : records) {
EntityReference ref = EntityReference ref =
Entity.getEntityReferenceById( Entity.getEntityReferenceById(
entityRelationshipRecord.getType(), entityRelationshipRecord.getId(), Include.ALL); entityRelationshipRecord.getType(), entityRelationshipRecord.getId(), Include.ALL);
LineageDetails lineageDetails = LineageDetails lineageDetails =
JsonUtils.readValue(entityRelationshipRecord.getJson(), LineageDetails.class); JsonUtils.readValue(entityRelationshipRecord.getJson(), LineageDetails.class);
data.add(buildRelationshipDetailsMap(ref, entity, lineageDetails)); data.add(buildEntityLineageData(ref, entity, lineageDetails));
} }
return data; return data;
} }

View File

@ -30,7 +30,7 @@ public record StoredProcedureIndex(StoredProcedure storedProcedure) implements S
ParseTags parseTags = ParseTags parseTags =
new ParseTags(Entity.getEntityTags(Entity.STORED_PROCEDURE, storedProcedure)); new ParseTags(Entity.getEntityTags(Entity.STORED_PROCEDURE, storedProcedure));
doc.put("tags", parseTags.getTags()); doc.put("tags", parseTags.getTags());
doc.put("lineage", SearchIndex.getLineageData(storedProcedure.getEntityReference())); doc.put("upstreamLineage", SearchIndex.getLineageData(storedProcedure.getEntityReference()));
doc.put("tier", parseTags.getTierTag()); doc.put("tier", parseTags.getTierTag());
doc.put("service", getEntityWithDisplayName(storedProcedure.getService())); doc.put("service", getEntityWithDisplayName(storedProcedure.getService()));
return doc; return doc;

View File

@ -103,7 +103,7 @@ public record TableIndex(Table table) implements ColumnIndex {
doc.put("schemaDefinition", table.getSchemaDefinition()); doc.put("schemaDefinition", table.getSchemaDefinition());
doc.put("service", getEntityWithDisplayName(table.getService())); doc.put("service", getEntityWithDisplayName(table.getService()));
doc.put("database", getEntityWithDisplayName(table.getDatabase())); doc.put("database", getEntityWithDisplayName(table.getDatabase()));
doc.put("lineage", SearchIndex.getLineageData(table.getEntityReference())); doc.put("upstreamLineage", SearchIndex.getLineageData(table.getEntityReference()));
doc.put("entityRelationship", SearchIndex.populateEntityRelationshipData(table)); doc.put("entityRelationship", SearchIndex.populateEntityRelationshipData(table));
doc.put("databaseSchema", getEntityWithDisplayName(table.getDatabaseSchema())); doc.put("databaseSchema", getEntityWithDisplayName(table.getDatabaseSchema()));
return doc; return doc;

View File

@ -81,7 +81,7 @@ public class TopicIndex implements SearchIndex {
doc.put("field_suggest", fieldSuggest); doc.put("field_suggest", fieldSuggest);
doc.put("service_suggest", serviceSuggest); doc.put("service_suggest", serviceSuggest);
doc.put("serviceType", topic.getServiceType()); doc.put("serviceType", topic.getServiceType());
doc.put("lineage", SearchIndex.getLineageData(topic.getEntityReference())); doc.put("upstreamLineage", SearchIndex.getLineageData(topic.getEntityReference()));
doc.put("messageSchema", topic.getMessageSchema() != null ? topic.getMessageSchema() : null); doc.put("messageSchema", topic.getMessageSchema() != null ? topic.getMessageSchema() : null);
doc.put("service", getEntityWithDisplayName(topic.getService())); doc.put("service", getEntityWithDisplayName(topic.getService()));
return doc; return doc;

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.search.opensearch;
import static javax.ws.rs.core.Response.Status.NOT_FOUND; import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK; import static javax.ws.rs.core.Response.Status.OK;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA; import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA;
import static org.openmetadata.service.Entity.DATA_PRODUCT; import static org.openmetadata.service.Entity.DATA_PRODUCT;
@ -67,6 +68,11 @@ import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.api.lineage.EsLineageData;
import org.openmetadata.schema.api.lineage.LineageDirection;
import org.openmetadata.schema.api.lineage.RelationshipRef;
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
import org.openmetadata.schema.api.lineage.SearchLineageResult;
import org.openmetadata.schema.dataInsight.DataInsightChartResult; import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList;
@ -839,64 +845,26 @@ public class OpenSearchClient implements SearchClient {
return Response.status(OK).entity(response).build(); return Response.status(OK).entity(response).build();
} }
public Map<String, Object> searchLineageInternal( @Override
String fqn, public SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException {
int upstreamDepth, SearchLineageResult result =
int downstreamDepth, getDownStreamLineage(lineageRequest.withDirection(LineageDirection.DOWNSTREAM));
String queryFilter, SearchLineageResult upstreamLineage =
boolean deleted, getDownStreamLineage(lineageRequest.withDirection(LineageDirection.UPSTREAM));
String entityType)
throws IOException { // Add All nodes and edges from upstream lineage to result
if (entityType.equalsIgnoreCase(Entity.PIPELINE) result.getNodes().putAll(upstreamLineage.getNodes());
|| entityType.equalsIgnoreCase(Entity.STORED_PROCEDURE)) { result.getUpstreamEdges().putAll(upstreamLineage.getUpstreamEdges());
return searchPipelineLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); return result;
}
Map<String, Object> responseMap = new HashMap<>();
Set<Map<String, Object>> edges = new HashSet<>();
Set<Map<String, Object>> nodes = new HashSet<>();
os.org.opensearch.action.search.SearchRequest searchRequest =
new os.org.opensearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
List<String> sourceFieldsToExcludeCopy = new ArrayList<>(SOURCE_FIELDS_TO_EXCLUDE);
searchSourceBuilder.fetchSource(null, sourceFieldsToExcludeCopy.toArray(String[]::new));
searchSourceBuilder.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn)));
searchRequest.source(searchSourceBuilder.size(1000));
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) {
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
tempMap.keySet().removeAll(FIELDS_TO_REMOVE);
responseMap.put("entity", tempMap);
}
getLineage(
fqn,
downstreamDepth,
edges,
nodes,
queryFilter,
"lineage.fromEntity.fqnHash.keyword",
deleted);
getLineage(
fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqnHash.keyword", deleted);
responseMap.put("edges", edges);
responseMap.put("nodes", nodes);
return responseMap;
} }
@Override public SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest)
public Response searchLineage(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException { throws IOException {
Map<String, Object> responseMap = if (lineageRequest.getDirection().equals(LineageDirection.UPSTREAM)) {
searchLineageInternal( return getUpstreamLineage(lineageRequest);
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); } else {
return Response.status(OK).entity(responseMap).build(); return getDownStreamLineage(lineageRequest);
}
} }
private void getEntityRelationship( private void getEntityRelationship(
@ -1100,56 +1068,148 @@ public class OpenSearchClient implements SearchClient {
return Response.status(OK).entity(responseMap).build(); return Response.status(OK).entity(responseMap).build();
} }
private void getLineage( @Override
String fqn, public Map<String, Object> searchEntityByKey(
int depth, String indexAlias, String keyName, String keyValue, List<String> fieldsToRemove)
Set<Map<String, Object>> edges,
Set<Map<String, Object>> nodes,
String queryFilter,
String direction,
boolean deleted)
throws IOException { throws IOException {
if (depth <= 0) {
return;
}
os.org.opensearch.action.search.SearchRequest searchRequest = os.org.opensearch.action.search.SearchRequest searchRequest =
new os.org.opensearch.action.search.SearchRequest( getSearchRequest(indexAlias, null, keyName, keyValue, null, null, fieldsToRemove);
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new));
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn))));
if (CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn)))
.must(QueryBuilders.termQuery("deleted", deleted)));
}
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
searchRequest.source(searchSourceBuilder.size(1000));
os.org.opensearch.action.search.SearchResponse searchResponse = os.org.opensearch.action.search.SearchResponse searchResponse =
client.search(searchRequest, RequestOptions.DEFAULT); client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) { int noOfHits = searchResponse.getHits().getHits().length;
List<Map<String, Object>> lineage = if (noOfHits == 1) {
(List<Map<String, Object>>) hit.getSourceAsMap().get("lineage"); return new HashMap<>(
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); JsonUtils.getMap(searchResponse.getHits().getHits()[0].getSourceAsMap()));
nodes.add(tempMap);
for (Map<String, Object> lin : lineage) {
HashMap<String, String> fromEntity = (HashMap<String, String>) lin.get("fromEntity");
HashMap<String, String> toEntity = (HashMap<String, String>) lin.get("toEntity");
if (direction.equalsIgnoreCase("lineage.fromEntity.fqnHash.keyword")) {
if (!edges.contains(lin) && fromEntity.get("fqn").equals(fqn)) {
edges.add(lin);
getLineage(
toEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted);
}
} else { } else {
if (!edges.contains(lin) && toEntity.get("fqn").equals(fqn)) { throw new SearchException(
edges.add(lin); String.format(
getLineage( "Issue in Search Entity By Key: %s, Value: %s , Number of Hits: %s",
fromEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted); keyName, keyValue, noOfHits));
}
}
private os.org.opensearch.action.search.SearchRequest getSearchRequest(
String indexAlias,
String queryFilter,
String key,
String value,
Boolean deleted,
List<String> fieldsToInclude,
List<String> fieldsToRemove) {
os.org.opensearch.action.search.SearchRequest searchRequest =
new os.org.opensearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(indexAlias));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.fetchSource(
listOrEmpty(fieldsToInclude).toArray(String[]::new),
listOrEmpty(fieldsToRemove).toArray(String[]::new));
searchSourceBuilder.query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery(key, value)));
if (!CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(key, value))
.must(QueryBuilders.termQuery("deleted", deleted)));
}
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
searchRequest.source(searchSourceBuilder.size(1000));
return searchRequest;
}
public SearchLineageResult getUpstreamLineage(SearchLineageRequest lineageRequest)
throws IOException {
SearchLineageResult result = new SearchLineageResult().withNodes(new HashMap<>());
getUpstreamLineageRecursively(lineageRequest, result);
return result;
}
private void getUpstreamLineageRecursively(
SearchLineageRequest lineageRequest, SearchLineageResult result) throws IOException {
if (lineageRequest.getUpstreamDepth() <= 0) {
return;
}
Map<String, Object> entityMap =
searchEntityByKey(
GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE);
if (!entityMap.isEmpty()) {
result.getNodes().putIfAbsent(entityMap.get("id").toString(), entityMap);
// Contains Upstream Lineage
if (entityMap.containsKey(UPSTREAM_LINEAGE_FIELD)) {
List<EsLineageData> upStreamEntities =
JsonUtils.readOrConvertValues(
entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class);
for (EsLineageData esLineageData : upStreamEntities) {
result
.getUpstreamEdges()
.putIfAbsent(
esLineageData.getDocId(),
esLineageData.withToEntity(SearchClient.getRelationshipRef(entityMap)));
String upstreamEntityId = esLineageData.getFromEntity().getId().toString();
if (!result.getNodes().containsKey(upstreamEntityId)) {
SearchLineageRequest updatedRequest =
JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class)
.withUpstreamDepth(lineageRequest.getUpstreamDepth() - 1)
.withFqn(esLineageData.getFromEntity().getFqn());
getUpstreamLineageRecursively(updatedRequest, result);
}
}
}
}
}
public SearchLineageResult getDownStreamLineage(SearchLineageRequest lineageRequest)
throws IOException {
SearchLineageResult result = new SearchLineageResult().withNodes(new HashMap<>());
Map<String, Object> entityMap =
searchEntityByKey(
GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE);
if (!entityMap.isEmpty()) {
result.getNodes().putIfAbsent(entityMap.get("id").toString(), entityMap);
getDownStreamRecursively(SearchClient.getRelationshipRef(entityMap), lineageRequest, result);
}
return result;
}
private void getDownStreamRecursively(
RelationshipRef fromEntity, SearchLineageRequest lineageRequest, SearchLineageResult result)
throws IOException {
if (lineageRequest.getDownstreamDepth() <= 0) {
return;
}
os.org.opensearch.action.search.SearchRequest searchDownstreamEntities =
getSearchRequest(
GLOBAL_SEARCH_ALIAS,
lineageRequest.getQueryFilter(),
"upstreamLineage.fromEntity.fqnHash",
FullyQualifiedName.buildHash(lineageRequest.getFqn()),
lineageRequest.getIncludeDeleted(),
null,
SOURCE_FIELDS_TO_EXCLUDE);
os.org.opensearch.action.search.SearchResponse downstreamEntities =
client.search(searchDownstreamEntities, RequestOptions.DEFAULT);
for (os.org.opensearch.search.SearchHit searchHit : downstreamEntities.getHits().getHits()) {
Map<String, Object> entityMap = new HashMap<>(searchHit.getSourceAsMap());
if (!entityMap.isEmpty()) {
result.getNodes().putIfAbsent(entityMap.get(ID_FIELD).toString(), entityMap);
List<EsLineageData> upStreamEntities =
JsonUtils.readOrConvertValues(
entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class);
EsLineageData esLineageData =
SearchClient.getEsLineageDataFromUpstreamLineage(fromEntity.getFqn(), upStreamEntities);
if (esLineageData != null) {
RelationshipRef toEntity = SearchClient.getRelationshipRef(entityMap);
result
.getDownstreamEdges()
.putIfAbsent(esLineageData.getDocId(), esLineageData.withToEntity(toEntity));
String downStreamEntityId = toEntity.getId().toString();
if (!result.getNodes().containsKey(downStreamEntityId)) {
SearchLineageRequest updatedRequest =
JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class)
.withDownstreamDepth(lineageRequest.getDownstreamDepth() - 1)
.withFqn(toEntity.getFqn());
getDownStreamRecursively(toEntity, updatedRequest, result);
} }
} }
} }
@ -1293,107 +1353,6 @@ public class OpenSearchClient implements SearchClient {
return client.search(searchRequest, RequestOptions.DEFAULT); return client.search(searchRequest, RequestOptions.DEFAULT);
} }
private Map<String, Object> searchPipelineLineage(
String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted)
throws IOException {
Map<String, Object> responseMap = new HashMap<>();
Set<Map<String, Object>> edges = new HashSet<>();
Set<Map<String, Object>> nodes = new HashSet<>();
responseMap.put("entity", null);
Object[] searchAfter = null;
long processedRecords = 0;
long totalRecords = -1;
while (totalRecords != processedRecords) {
os.org.opensearch.action.search.SearchRequest searchRequest =
new os.org.opensearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.should(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn)));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new));
FieldSortBuilder sortBuilder = SortBuilders.fieldSort("fullyQualifiedName");
searchSourceBuilder.sort(sortBuilder);
searchSourceBuilder.query(boolQueryBuilder);
if (searchAfter != null) {
searchSourceBuilder.searchAfter(searchAfter);
}
if (CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(boolQueryBuilder)
.must(QueryBuilders.termQuery("deleted", deleted)));
}
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) {
List<Map<String, Object>> lineage =
(List<Map<String, Object>>) hit.getSourceAsMap().get("lineage");
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
nodes.add(tempMap);
for (Map<String, Object> lin : lineage) {
HashMap<String, String> fromEntity = (HashMap<String, String>) lin.get("fromEntity");
HashMap<String, String> toEntity = (HashMap<String, String>) lin.get("toEntity");
HashMap<String, String> pipeline = (HashMap<String, String>) lin.get("pipeline");
if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) {
edges.add(lin);
getLineage(
fromEntity.get("fqn"),
upstreamDepth,
edges,
nodes,
queryFilter,
"lineage.toEntity.fqn.keyword",
deleted);
getLineage(
toEntity.get("fqn"),
downstreamDepth,
edges,
nodes,
queryFilter,
"lineage.fromEntity.fqn.keyword",
deleted);
}
}
}
totalRecords = searchResponse.getHits().getTotalHits().value;
int currentHits = searchResponse.getHits().getHits().length;
processedRecords += currentHits;
if (currentHits > 0) {
searchAfter = searchResponse.getHits().getHits()[currentHits - 1].getSortValues();
} else {
searchAfter = null;
}
}
getLineage(
fqn, downstreamDepth, edges, nodes, queryFilter, "lineage.fromEntity.fqn.keyword", deleted);
getLineage(
fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqn.keyword", deleted);
if (edges.isEmpty()) {
os.org.opensearch.action.search.SearchRequest searchRequestForEntity =
new os.org.opensearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilderForEntity = new SearchSourceBuilder();
searchSourceBuilderForEntity.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn)));
searchRequestForEntity.source(searchSourceBuilderForEntity.size(1000));
SearchResponse searchResponseForEntity =
client.search(searchRequestForEntity, RequestOptions.DEFAULT);
for (var hit : searchResponseForEntity.getHits().getHits()) {
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
tempMap.keySet().removeAll(FIELDS_TO_REMOVE);
responseMap.put("entity", tempMap);
}
}
responseMap.put("edges", edges);
responseMap.put("nodes", nodes);
return responseMap;
}
private static FunctionScoreQueryBuilder boostScore(QueryStringQueryBuilder queryBuilder) { private static FunctionScoreQueryBuilder boostScore(QueryStringQueryBuilder queryBuilder) {
FunctionScoreQueryBuilder.FilterFunctionBuilder tier1Boost = FunctionScoreQueryBuilder.FilterFunctionBuilder tier1Boost =
new FunctionScoreQueryBuilder.FilterFunctionBuilder( new FunctionScoreQueryBuilder.FilterFunctionBuilder(
@ -2271,13 +2230,14 @@ public class OpenSearchClient implements SearchClient {
@Override @Override
public void updateLineage( public void updateLineage(
String indexName, Pair<String, String> fieldAndValue, Map<String, Object> lineagaData) { String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData) {
if (isClientAvailable) { if (isClientAvailable) {
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName); UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName);
updateByQueryRequest.setQuery( updateByQueryRequest.setQuery(
new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue()) new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue())
.operator(Operator.AND)); .operator(Operator.AND));
Map<String, Object> params = Collections.singletonMap("lineageData", lineagaData); Map<String, Object> params =
Collections.singletonMap("lineageData", JsonUtils.getMap(lineageData));
Script script = Script script =
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params); new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params);
updateByQueryRequest.setScript(script); updateByQueryRequest.setScript(script);

View File

@ -0,0 +1,73 @@
{
"$id": "https://open-metadata.org/schema/api/lineage/esLineageData.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "EsLineageData",
"description": "Response object for the search lineage request from Elastic Search.",
"javaType": "org.openmetadata.schema.api.lineage.EsLineageData",
"type": "object",
"definitions": {
"relationshipRef": {
"description": "Relationship Reference to an Entity.",
"type": "object",
"properties": {
"id": {
"description": "Unique identifier of this entity instance.",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"fqn": {
"description": "FullyQualifiedName of the entity.",
"$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName"
},
"fqnHash": {
"description": "FullyQualifiedName Hash of the entity.",
"type": "string"
},
"type": {
"description": "Type of the entity.",
"type": "string"
}
}
}
},
"properties": {
"fromEntity": {
"description": "From Entity.",
"$ref": "#/definitions/relationshipRef"
},
"toEntity": {
"description": "To Entity.",
"$ref": "#/definitions/relationshipRef"
},
"pipeline": {
"description": "Pipeline in case pipeline is present between entities."
},
"sqlQuery": {
"description": "Sql Query associated.",
"type": "string"
},
"columns": {
"description": "Columns associated.",
"type": "array",
"items": {
"$ref": "../../type/entityLineage.json#/definitions/columnLineage"
}
},
"description": {
"description": "Description.",
"type": "string"
},
"source": {
"description": "Source of the Lineage.",
"type": "string"
},
"doc_id": {
"description": "Doc Id for the Lineage.",
"type": "string"
},
"pipelineEntityType": {
"description": "Pipeline Entity or Stored procedure.",
"type": "string"
}
},
"additionalProperties": false
}

View File

@ -0,0 +1,9 @@
{
"$id": "https://open-metadata.org/schema/api/lineage/lineageDirection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "LineageDirection",
"description": "Lineage Direction Schema.",
"javaType": "org.openmetadata.schema.api.lineage.LineageDirection",
"type": "string",
"enum": ["Upstream", "Downstream"]
}

View File

@ -0,0 +1,46 @@
{
"$id": "https://open-metadata.org/schema/api/lineage/searchLineageRequest.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SearchLineageRequest",
"description": "Search Lineage Request Schema to find linage from Elastic Search.",
"javaType": "org.openmetadata.schema.api.lineage.SearchLineageRequest",
"type": "object",
"properties": {
"fqn": {
"description": "Entity Fqn to search lineage",
"type": "string"
},
"entityType": {
"description": "Query Filter",
"type": "string"
},
"direction": {
"$ref": "./lineageDirection.json"
},
"directionValue": {
"description": "Lineage Direction Value.",
"type": "string"
},
"upstreamDepth": {
"description": "The upstream depth of the lineage",
"type": "integer",
"default": 3
},
"downstreamDepth": {
"description": "The downstream depth of the lineage",
"type": "integer",
"default": 3
},
"queryFilter": {
"description": "Query Filter",
"type": "string"
},
"includeDeleted": {
"description": "Include deleted entities",
"type": "boolean",
"default": false
}
},
"required": ["fqn", "entityType", "direction"],
"additionalProperties": false
}

View File

@ -0,0 +1,35 @@
{
"$id": "https://open-metadata.org/schema/api/lineage/searchLineageResult.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SearchLineageResult",
"description": "Search Lineage Response for the Lineage Request",
"javaType": "org.openmetadata.schema.api.lineage.SearchLineageResult",
"type": "object",
"definitions": {
"entity" : {
"description": "A general object to hold any object."
}
},
"properties": {
"paging": {
"$ref": "../../type/paging.json"
},
"direction": {
"description": "Lineage Direction Value.",
"type": "string"
},
"nodes" : {
"description": "Nodes in the lineage response.",
"existingJavaType": "java.util.Map<java.lang.String, java.lang.Object>"
},
"upstreamEdges": {
"description": "Upstream Edges for the entity.",
"existingJavaType": "java.util.Map<java.lang.String, org.openmetadata.schema.api.lineage.EsLineageData>"
},
"downstreamEdges": {
"description": "Downstream Edges for the node.",
"existingJavaType": "java.util.Map<java.lang.String, org.openmetadata.schema.api.lineage.EsLineageData>"
}
},
"additionalProperties": false
}

View File

@ -0,0 +1,99 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Response object for the search lineage request from Elastic Search.
*/
export interface EsLineageData {
/**
* Columns associated.
*/
columns?: ColumnLineage[];
/**
* Description.
*/
description?: string;
/**
* Doc Id for the Lineage.
*/
doc_id?: string;
/**
* From Entity.
*/
fromEntity?: RelationshipRef;
/**
* Pipeline in case pipeline is present between entities.
*/
pipeline?: any;
/**
* Pipeline Entity or Stored procedure.
*/
pipelineEntityType?: string;
/**
* Source of the Lineage.
*/
source?: string;
/**
* Sql Query associated.
*/
sqlQuery?: string;
/**
* To Entity.
*/
toEntity?: RelationshipRef;
}
export interface ColumnLineage {
/**
* One or more source columns identified by fully qualified column name used by
* transformation function to create destination column.
*/
fromColumns?: string[];
/**
* Transformation function applied to source columns to create destination column. That is
* `function(fromColumns) -> toColumn`.
*/
function?: string;
/**
* Destination column identified by fully qualified column name created by the
* transformation of source columns.
*/
toColumn?: string;
[property: string]: any;
}
/**
* From Entity.
*
* Relationship Reference to an Entity.
*
* To Entity.
*/
export interface RelationshipRef {
/**
* FullyQualifiedName of the entity.
*/
fqn?: string;
/**
* FullyQualifiedName Hash of the entity.
*/
fqnHash?: string;
/**
* Unique identifier of this entity instance.
*/
id?: string;
/**
* Type of the entity.
*/
type?: string;
[property: string]: any;
}

View File

@ -0,0 +1,19 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Lineage Direction Schema.
*/
export enum LineageDirection {
Downstream = "Downstream",
Upstream = "Upstream",
}

View File

@ -0,0 +1,54 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Search Lineage Request Schema to find linage from Elastic Search.
*/
export interface SearchLineageRequest {
direction: LineageDirection;
/**
* Lineage Direction Value.
*/
directionValue?: string;
/**
* The downstream depth of the lineage
*/
downstreamDepth?: number;
/**
* Query Filter
*/
entityType: string;
/**
* Entity Fqn to search lineage
*/
fqn: string;
/**
* Include deleted entities
*/
includeDeleted?: boolean;
/**
* Query Filter
*/
queryFilter?: string;
/**
* The upstream depth of the lineage
*/
upstreamDepth?: number;
}
/**
* Lineage Direction Schema.
*/
export enum LineageDirection {
Downstream = "Downstream",
Upstream = "Upstream",
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Search Lineage Response for the Lineage Request
*/
export interface SearchLineageResult {
/**
* Lineage Direction Value.
*/
direction?: string;
/**
* Downstream Edges for the node.
*/
downstreamEdges?: any;
/**
* Nodes in the lineage response.
*/
nodes?: any;
paging?: Paging;
/**
* Upstream Edges for the entity.
*/
upstreamEdges?: any;
}
/**
* Type used for cursor based pagination information in GET list responses.
*/
export interface Paging {
/**
* After cursor used for getting the next page (see API pagination for details).
*/
after?: string;
/**
* Before cursor used for getting the previous page (see API pagination for details).
*/
before?: string;
/**
* Limit used in case of offset based pagination.
*/
limit?: number;
/**
* Offset used in case of offset based pagination.
*/
offset?: number;
/**
* Total number of entries available to page through.
*/
total: number;
}