Revert "- Update lineage model and api"

This reverts commit c5cce2b0c7c1be7ba654bfc2677da0d05215a895.
This commit is contained in:
mohitdeuex 2025-02-03 12:11:07 +05:30
parent 03f492ad1d
commit 04cab8e71a
28 changed files with 533 additions and 980 deletions

View File

@ -33,7 +33,6 @@ import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -178,10 +177,6 @@ public final class CommonUtil {
return Optional.ofNullable(list).orElse(Collections.emptyList());
}
public static <K, V> Map<K, V> collectionOrEmpty(Map<K, V> input) {
return Optional.ofNullable(input).orElse(new HashMap<>());
}
public static <T> List<T> listOrEmptyMutable(List<T> list) {
return nullOrEmpty(list) ? new ArrayList<>() : new ArrayList<>(list);
}
@ -214,14 +209,6 @@ public final class CommonUtil {
}
}
public static <T> List<T> collectionOrDefault(List<T> c, List<T> defaultValue) {
if (nullOrEmpty(c)) {
return defaultValue;
} else {
return c;
}
}
public static String getResourceAsStream(ClassLoader loader, String file) throws IOException {
return IOUtils.toString(Objects.requireNonNull(loader.getResourceAsStream(file)), UTF_8);
}

View File

@ -14,8 +14,6 @@
package org.openmetadata.service.jdbi3;
import static javax.ws.rs.core.Response.Status.OK;
import static org.openmetadata.common.utils.CommonUtil.collectionOrDefault;
import static org.openmetadata.common.utils.CommonUtil.nullOrDefault;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.csv.CsvUtil.addField;
import static org.openmetadata.csv.EntityCsv.getCsvDocumentation;
@ -50,13 +48,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.csv.CsvUtil;
import org.openmetadata.schema.api.lineage.AddLineage;
import org.openmetadata.schema.api.lineage.EsLineageData;
import org.openmetadata.schema.api.lineage.LineageDirection;
import org.openmetadata.schema.api.lineage.RelationshipRef;
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
import org.openmetadata.schema.api.lineage.SearchLineageResult;
import org.openmetadata.schema.entity.data.APIEndpoint;
import org.openmetadata.schema.entity.data.Container;
import org.openmetadata.schema.entity.data.Dashboard;
@ -150,55 +144,70 @@ public class LineageRepository {
private void addLineageToSearch(
EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) {
IndexMapping sourceIndexMapping =
Entity.getSearchRepository().getIndexMapping(fromEntity.getType());
String sourceIndexName =
sourceIndexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias());
IndexMapping destinationIndexMapping =
Entity.getSearchRepository().getIndexMapping(toEntity.getType());
String destinationIndexName =
destinationIndexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias());
// For lineage from -> to (not stored) since the doc itself is the toEntity
EsLineageData lineageData =
buildEntityLineageData(fromEntity, toEntity, lineageDetails).withToEntity(null);
Map<String, Object> relationshipDetails =
buildRelationshipDetailsMap(fromEntity, toEntity, lineageDetails);
Pair<String, String> from = new ImmutablePair<>("_id", fromEntity.getId().toString());
Pair<String, String> to = new ImmutablePair<>("_id", toEntity.getId().toString());
searchClient.updateLineage(destinationIndexName, to, lineageData);
searchClient.updateLineage(sourceIndexName, from, relationshipDetails);
searchClient.updateLineage(destinationIndexName, to, relationshipDetails);
}
public static RelationshipRef buildEntityRefLineage(EntityReference entityRef) {
return new RelationshipRef()
.withId(entityRef.getId())
.withType(entityRef.getType())
.withFqn(entityRef.getFullyQualifiedName())
.withFqnHash(FullyQualifiedName.buildHash(entityRef.getFullyQualifiedName()));
public static Map<String, Object> buildEntityRefMap(EntityReference entityRef) {
Map<String, Object> details = new HashMap<>();
details.put("id", entityRef.getId().toString());
details.put("type", entityRef.getType());
details.put("fqn", entityRef.getFullyQualifiedName());
details.put("fqnHash", FullyQualifiedName.buildHash(entityRef.getFullyQualifiedName()));
return details;
}
public static EsLineageData buildEntityLineageData(
public static Map<String, Object> buildRelationshipDetailsMap(
EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) {
EsLineageData lineageData =
new EsLineageData()
.withDocId(fromEntity.getId().toString() + "-->" + toEntity.getId().toString())
.withFromEntity(buildEntityRefLineage(fromEntity));
Map<String, Object> relationshipDetails = new HashMap<>();
relationshipDetails.put(
"doc_id", fromEntity.getId().toString() + "-" + toEntity.getId().toString());
relationshipDetails.put("fromEntity", buildEntityRefMap(fromEntity));
relationshipDetails.put("toEntity", buildEntityRefMap(toEntity));
if (lineageDetails != null) {
// Add Pipeline Details
addPipelineDetails(lineageData, lineageDetails.getPipeline());
lineageData.setDescription(nullOrDefault(lineageDetails.getDescription(), null));
lineageData.setColumns(collectionOrDefault(lineageDetails.getColumnsLineage(), null));
lineageData.setSqlQuery(nullOrDefault(lineageDetails.getSqlQuery(), null));
lineageData.setSource(nullOrDefault(lineageDetails.getSource().value(), null));
addPipelineDetails(relationshipDetails, lineageDetails.getPipeline());
relationshipDetails.put(
"description",
CommonUtil.nullOrEmpty(lineageDetails.getDescription())
? null
: lineageDetails.getDescription());
if (!CommonUtil.nullOrEmpty(lineageDetails.getColumnsLineage())) {
List<Map<String, Object>> colummnLineageList = new ArrayList<>();
for (ColumnLineage columnLineage : lineageDetails.getColumnsLineage()) {
colummnLineageList.add(JsonUtils.getMap(columnLineage));
}
return lineageData;
relationshipDetails.put("columns", colummnLineageList);
}
relationshipDetails.put(
"sqlQuery",
CommonUtil.nullOrEmpty(lineageDetails.getSqlQuery())
? null
: lineageDetails.getSqlQuery());
relationshipDetails.put(
"source",
CommonUtil.nullOrEmpty(lineageDetails.getSource()) ? null : lineageDetails.getSource());
}
return relationshipDetails;
}
public static void addPipelineDetails(EsLineageData lineageData, EntityReference pipelineRef) {
if (nullOrEmpty(pipelineRef)) {
lineageData.setPipeline(null);
public static void addPipelineDetails(
Map<String, Object> relationshipDetails, EntityReference pipelineRef) {
if (CommonUtil.nullOrEmpty(pipelineRef)) {
relationshipDetails.put(PIPELINE, JsonUtils.getMap(null));
} else {
Pair<String, Map<String, Object>> pipelineOrStoredProcedure =
getPipelineOrStoredProcedure(pipelineRef, List.of("changeDescription"));
lineageData.setPipelineEntityType(pipelineOrStoredProcedure.getLeft());
lineageData.setPipeline(pipelineOrStoredProcedure.getRight());
}
}
public static Pair<String, Map<String, Object>> getPipelineOrStoredProcedure(
EntityReference pipelineRef, List<String> fieldsToRemove) {
Map<String, Object> pipelineMap;
if (pipelineRef.getType().equals(PIPELINE)) {
pipelineMap =
@ -207,8 +216,10 @@ public class LineageRepository {
} else {
pipelineMap = JsonUtils.getMap(Entity.getEntity(pipelineRef, "tags,owners", Include.ALL));
}
fieldsToRemove.forEach(pipelineMap::remove);
return Pair.of(pipelineRef.getType(), pipelineMap);
pipelineMap.remove("changeDescription");
relationshipDetails.put("pipelineEntityType", pipelineRef.getType());
relationshipDetails.put(PIPELINE, pipelineMap);
}
}
private String validateLineageDetails(
@ -238,21 +249,13 @@ public class LineageRepository {
throws IOException {
CsvDocumentation documentation = getCsvDocumentation("lineage");
List<CsvHeader> headers = documentation.getHeaders();
// TODO: Fix the lineage we need booth the lineage
SearchLineageResult result =
Map<String, Object> lineageMap =
Entity.getSearchRepository()
.searchLineageForExport(
fqn,
upstreamDepth,
downstreamDepth,
queryFilter,
deleted,
entityType,
LineageDirection.UPSTREAM);
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
CsvFile csvFile = new CsvFile().withHeaders(headers);
// TODO: Fix This
// addRecords(csvFile, result);
addRecords(csvFile, lineageMap);
return CsvUtil.formatCsv(csvFile);
}
@ -275,19 +278,10 @@ public class LineageRepository {
String entityType,
boolean deleted) {
try {
// TODO: fix Export to consider for both the nodes
SearchLineageResult response =
Response response =
Entity.getSearchRepository()
.searchLineage(
new SearchLineageRequest()
.withFqn(fqn)
.withUpstreamDepth(upstreamDepth)
.withDownstreamDepth(downstreamDepth)
.withQueryFilter(queryFilter)
.withIncludeDeleted(deleted)
.withEntityType(entityType)
.withDirection(LineageDirection.UPSTREAM));
String jsonResponse = JsonUtils.pojoToJson(response);
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
String jsonResponse = JsonUtils.pojoToJson(response.getEntity());
JsonNode rootNode = JsonUtils.readTree(jsonResponse);
Map<String, JsonNode> entityMap = new HashMap<>();

View File

@ -51,9 +51,6 @@ import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.lineage.AddLineage;
import org.openmetadata.schema.api.lineage.LineageDirection;
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
import org.openmetadata.schema.api.lineage.SearchLineageResult;
import org.openmetadata.schema.type.EntityLineage;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.MetadataOperation;
@ -195,7 +192,7 @@ public class LineageResource {
mediaType = "application/json",
schema = @Schema(implementation = SearchResponse.class)))
})
public SearchLineageResult searchLineage(
public Response searchLineage(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "fqn") @QueryParam("fqn") String fqn,
@ -212,61 +209,9 @@ public class LineageResource {
boolean deleted,
@Parameter(description = "entity type") @QueryParam("type") String entityType)
throws IOException {
return Entity.getSearchRepository()
.searchLineage(
new SearchLineageRequest()
.withFqn(fqn)
.withUpstreamDepth(upstreamDepth)
.withDownstreamDepth(downstreamDepth)
.withQueryFilter(queryFilter)
.withIncludeDeleted(deleted)
.withEntityType(entityType));
}
@GET
@Path("/getLineage/{direction}")
@Operation(
operationId = "searchLineageWithDirection",
summary = "Search lineage with Direction",
responses = {
@ApiResponse(
responseCode = "200",
description = "search response",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = SearchResponse.class)))
})
public SearchLineageResult searchLineageWithDirection(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "fqn") @QueryParam("fqn") String fqn,
@Parameter(description = "Direction", required = true, schema = @Schema(type = "string"))
@PathParam("direction")
LineageDirection direction,
@Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth,
@Parameter(description = "downstreamDepth") @QueryParam("downstreamDepth")
int downstreamDepth,
@Parameter(
description =
"Elasticsearch query that will be combined with the query_string query generator from the `query` argument")
@QueryParam("query_filter")
String queryFilter,
@Parameter(description = "Filter documents by deleted param. By default deleted is false")
@QueryParam("includeDeleted")
boolean deleted,
@Parameter(description = "entity type") @QueryParam("type") String entityType)
throws IOException {
return Entity.getSearchRepository()
.searchLineage(
new SearchLineageRequest()
.withFqn(fqn)
.withUpstreamDepth(upstreamDepth)
.withDownstreamDepth(downstreamDepth)
.withQueryFilter(queryFilter)
.withIncludeDeleted(deleted)
.withEntityType(entityType)
.withDirection(direction));
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
}
@GET
@ -334,6 +279,8 @@ public class LineageResource {
boolean deleted,
@Parameter(description = "entity type") @QueryParam("type") String entityType)
throws IOException {
Entity.getSearchRepository()
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
return dao.exportCsv(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
}

View File

@ -7,7 +7,6 @@ import java.security.KeyStoreException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.json.JsonArray;
@ -16,10 +15,6 @@ import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.api.lineage.EsLineageData;
import org.openmetadata.schema.api.lineage.RelationshipRef;
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
import org.openmetadata.schema.api.lineage.SearchLineageResult;
import org.openmetadata.schema.api.search.SearchSettings;
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart;
@ -33,7 +28,6 @@ import org.openmetadata.service.resources.settings.SettingsCache;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.search.security.RBACConditionEvaluator;
import org.openmetadata.service.security.policyevaluator.SubjectContext;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.util.SSLUtil;
import os.org.opensearch.action.bulk.BulkRequest;
@ -41,9 +35,6 @@ import os.org.opensearch.action.bulk.BulkResponse;
import os.org.opensearch.client.RequestOptions;
public interface SearchClient {
String UPSTREAM_LINEAGE_FIELD = "upstreamLineage";
String FQN_FIELD = "fullyQualifiedName";
String ID_FIELD = "id";
ExecutorService asyncExecutor = Executors.newFixedThreadPool(1);
String UPDATE = "update";
@ -90,10 +81,10 @@ public interface SearchClient {
"if (ctx._source.certification != null && ctx._source.certification.tagLabel != null) {ctx._source.certification.tagLabel.style = params.style; ctx._source.certification.tagLabel.description = params.description; ctx._source.certification.tagLabel.tagFQN = params.tagFQN; ctx._source.certification.tagLabel.name = params.name; }";
String REMOVE_LINEAGE_SCRIPT =
"for (int i = 0; i < ctx._source.upstreamLineage.length; i++) { if (ctx._source.upstreamLineage[i].doc_id == '%s') { ctx._source.upstreamLineage.remove(i) }}";
"for (int i = 0; i < ctx._source.lineage.length; i++) { if (ctx._source.lineage[i].doc_id == '%s') { ctx._source.lineage.remove(i) }}";
String ADD_UPDATE_LINEAGE =
"boolean docIdExists = false; for (int i = 0; i < ctx._source.upstreamLineage.size(); i++) { if (ctx._source.upstreamLineage[i].doc_id.equalsIgnoreCase(params.lineageData.doc_id)) { ctx._source.upstreamLineage[i] = params.lineageData; docIdExists = true; break;}}if (!docIdExists) {ctx._source.upstreamLineage.add(params.lineageData);}";
"boolean docIdExists = false; for (int i = 0; i < ctx._source.lineage.size(); i++) { if (ctx._source.lineage[i].doc_id.equalsIgnoreCase(params.lineageData.doc_id)) { ctx._source.lineage[i] = params.lineageData; docIdExists = true; break;}}if (!docIdExists) {ctx._source.lineage.add(params.lineageData);}";
// The script is used for updating the entityRelationship attribute of the entity in ES
// It checks if any duplicate entry is present based on the doc_id and updates only if it is not
@ -197,15 +188,15 @@ public interface SearchClient {
Response searchBySourceUrl(String sourceUrl) throws IOException;
SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException;
SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest)
Response searchLineage(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException;
SearchLineageResult getUpstreamLineage(SearchLineageRequest lineageRequest) throws IOException;
SearchLineageResult getDownStreamLineage(SearchLineageRequest lineageRequest) throws IOException;
Response searchEntityRelationship(
String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted)
throws IOException;
@ -217,10 +208,6 @@ public interface SearchClient {
String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted)
throws IOException;
Map<String, Object> searchEntityByKey(
String indexAlias, String keyName, String keyValue, List<String> fieldsToRemove)
throws IOException;
/*
Used for listing knowledge page hierarchy for a given parent and page type, used in Elastic/Open SearchClientExtension
*/
@ -233,7 +220,6 @@ public interface SearchClient {
/*
Used for listing knowledge page hierarchy for a given active Page and page type, used in Elastic/Open SearchClientExtension
*/
@SuppressWarnings("unused")
default ResultList listPageHierarchyForActivePage(
String activeFqn, String pageType, int offset, int limit) {
throw new CustomExceptionMessage(
@ -246,6 +232,15 @@ public interface SearchClient {
Response.Status.NOT_IMPLEMENTED, NOT_IMPLEMENTED_ERROR_TYPE, NOT_IMPLEMENTED_METHOD);
}
Map<String, Object> searchLineageInternal(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException;
Response searchByField(String fieldName, String fieldValue, String index) throws IOException;
Response aggregate(String index, String fieldName, String value, String query) throws IOException;
@ -296,7 +291,7 @@ public interface SearchClient {
Pair<String, Map<String, Object>> updates);
void updateLineage(
String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData);
String indexName, Pair<String, String> fieldAndValue, Map<String, Object> lineagaData);
void updateEntityRelationship(
String indexName,
@ -398,51 +393,5 @@ public interface SearchClient {
&& rbacConditionEvaluator != null;
}
// default String getLineageDirection(LineageDirection direction, String entityType) {
// if (LineageDirection.UPSTREAM.equals(direction)) {
// if (Boolean.FALSE.equals(entityType.equals(Entity.PIPELINE)) &&
// Boolean.FALSE.equals(entityType.equals(Entity.STORED_PROCEDURE))) {
// return
// }else{
//
// }
// } else {
//
// }
// return "";
// }
SearchHealthStatus getSearchHealthStatus() throws IOException;
static RelationshipRef getRelationshipRef(Map<String, Object> entityMap) {
// This assumes these keys exists in the map, use it with caution
return new RelationshipRef()
.withId(UUID.fromString(entityMap.get("id").toString()))
.withType(entityMap.get("entityType").toString())
.withFqn(entityMap.get("fullyQualifiedName").toString())
.withFqnHash(FullyQualifiedName.buildHash(entityMap.get("fullyQualifiedName").toString()));
}
static EsLineageData getEsLineageDataFromUpstreamLineage(
String fqn, List<EsLineageData> upstream) {
for (EsLineageData esLineageData : upstream) {
if (esLineageData.getFromEntity().getFqn().equals(fqn)) {
return esLineageData;
}
}
return null;
}
static EsLineageData copyEsLineageData(EsLineageData data) {
return new EsLineageData()
.withDocId(data.getDocId())
.withFromEntity(data.getFromEntity())
.withToEntity(data.getToEntity())
.withPipeline(data.getPipeline())
.withSqlQuery(data.getSqlQuery())
.withColumns(data.getColumns())
.withDescription(data.getDescription())
.withSource(data.getSource())
.withPipelineEntityType(data.getPipelineEntityType());
}
}

View File

@ -19,7 +19,6 @@ import org.openmetadata.schema.tests.Datum;
import org.openmetadata.schema.tests.type.DataQualityReportMetadata;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.Utilities;
public final class SearchIndexUtils {
@ -59,9 +58,8 @@ public final class SearchIndexUtils {
if (value instanceof Map) {
currentMap = (Map<String, Object>) value;
} else if (value instanceof List) {
List<?> list = (List<Map<String, Object>>) value;
for (Object obj : list) {
Map<String, Object> item = JsonUtils.getMap(obj);
List<Map<String, Object>> list = (List<Map<String, Object>>) value;
for (Map<String, Object> item : list) {
removeFieldByPath(
item,
Arrays.stream(pathElements, 1, pathElements.length).collect(Collectors.joining(".")));

View File

@ -60,9 +60,6 @@ import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.api.lineage.LineageDirection;
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
import org.openmetadata.schema.api.lineage.SearchLineageResult;
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.entity.classification.Tag;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
@ -961,13 +958,16 @@ public class SearchRepository {
return searchClient.searchBySourceUrl(sourceUrl);
}
public SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException {
return searchClient.searchLineage(lineageRequest);
}
public SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest)
public Response searchLineage(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException {
return searchClient.searchLineage(lineageRequest);
return searchClient.searchLineage(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
}
public Response searchEntityRelationship(
@ -989,24 +989,16 @@ public class SearchRepository {
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted);
}
public SearchLineageResult searchLineageForExport(
public Map<String, Object> searchLineageForExport(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType,
LineageDirection direction)
String entityType)
throws IOException {
return searchClient.searchLineage(
new SearchLineageRequest()
.withFqn(fqn)
.withUpstreamDepth(upstreamDepth)
.withDownstreamDepth(downstreamDepth)
.withQueryFilter(queryFilter)
.withIncludeDeleted(deleted)
.withEntityType(entityType)
.withDirection(direction));
return searchClient.searchLineageInternal(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
}
public Response searchByField(String fieldName, String fieldValue, String index)

View File

@ -2,7 +2,6 @@ package org.openmetadata.service.search.elasticsearch;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA;
import static org.openmetadata.service.Entity.DATA_PRODUCT;
@ -153,11 +152,6 @@ import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.jetbrains.annotations.NotNull;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.api.lineage.EsLineageData;
import org.openmetadata.schema.api.lineage.LineageDirection;
import org.openmetadata.schema.api.lineage.RelationshipRef;
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
import org.openmetadata.schema.api.lineage.SearchLineageResult;
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList;
@ -848,26 +842,64 @@ public class ElasticSearchClient implements SearchClient {
}
@Override
public SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException {
SearchLineageResult result =
getDownStreamLineage(lineageRequest.withDirection(LineageDirection.DOWNSTREAM));
SearchLineageResult upstreamLineage =
getUpstreamLineage(lineageRequest.withDirection(LineageDirection.UPSTREAM));
// Add All nodes and edges from upstream lineage to result
result.getNodes().putAll(upstreamLineage.getNodes());
result.getUpstreamEdges().putAll(upstreamLineage.getUpstreamEdges());
return result;
public Map<String, Object> searchLineageInternal(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException {
Map<String, Object> responseMap = new HashMap<>();
Set<Map<String, Object>> edges = new HashSet<>();
Set<Map<String, Object>> nodes = new HashSet<>();
if (entityType.equalsIgnoreCase(Entity.PIPELINE)
|| entityType.equalsIgnoreCase(Entity.STORED_PROCEDURE)) {
return searchPipelineLineage(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, responseMap);
}
es.org.elasticsearch.action.search.SearchRequest searchRequest =
new es.org.elasticsearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
List<String> sourceFieldsToExcludeCopy = new ArrayList<>(SOURCE_FIELDS_TO_EXCLUDE);
searchSourceBuilder.fetchSource(null, sourceFieldsToExcludeCopy.toArray(String[]::new));
searchSourceBuilder.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn)));
searchRequest.source(searchSourceBuilder.size(1000));
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) {
Map<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
responseMap.put("entity", tempMap);
}
getLineage(
fqn,
downstreamDepth,
edges,
nodes,
queryFilter,
"lineage.fromEntity.fqnHash.keyword",
deleted);
getLineage(
fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqnHash.keyword", deleted);
responseMap.put("edges", edges);
responseMap.put("nodes", nodes);
return responseMap;
}
@Override
public SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest)
public Response searchLineage(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException {
if (lineageRequest.getDirection().equals(LineageDirection.UPSTREAM)) {
return getUpstreamLineage(lineageRequest);
} else {
return getDownStreamLineage(lineageRequest);
}
Map<String, Object> responseMap =
searchLineageInternal(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
return Response.status(OK).entity(responseMap).build();
}
private void getEntityRelationship(
@ -1073,155 +1105,54 @@ public class ElasticSearchClient implements SearchClient {
return Response.status(OK).entity(responseMap).build();
}
@Override
public Map<String, Object> searchEntityByKey(
String indexAlias, String keyName, String keyValue, List<String> fieldsToRemove)
throws IOException {
es.org.elasticsearch.action.search.SearchRequest searchRequest =
getSearchRequest(indexAlias, null, keyName, keyValue, null, null, fieldsToRemove);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
int noOfHits = searchResponse.getHits().getHits().length;
if (noOfHits == 1) {
return new HashMap<>(
JsonUtils.getMap(searchResponse.getHits().getHits()[0].getSourceAsMap()));
} else {
throw new SearchException(
String.format(
"Issue in Search Entity By Key: %s, Value: %s , Number of Hits: %s",
keyName, keyValue, noOfHits));
}
}
private es.org.elasticsearch.action.search.SearchRequest getSearchRequest(
String indexAlias,
private void getLineage(
String fqn,
int depth,
Set<Map<String, Object>> edges,
Set<Map<String, Object>> nodes,
String queryFilter,
String key,
String value,
Boolean deleted,
List<String> fieldsToInclude,
List<String> fieldsToRemove) {
String direction,
boolean deleted)
throws IOException {
if (depth <= 0) {
return;
}
es.org.elasticsearch.action.search.SearchRequest searchRequest =
new es.org.elasticsearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(indexAlias));
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.fetchSource(
listOrEmpty(fieldsToInclude).toArray(String[]::new),
listOrEmpty(fieldsToRemove).toArray(String[]::new));
searchSourceBuilder.query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery(key, value)));
if (!CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new));
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(key, value))
.must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn))));
if (CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn)))
.must(QueryBuilders.termQuery("deleted", deleted)));
}
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
searchRequest.source(searchSourceBuilder.size(1000));
return searchRequest;
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) {
List<Map<String, Object>> lineage =
(List<Map<String, Object>>) hit.getSourceAsMap().get("lineage");
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
nodes.add(tempMap);
for (Map<String, Object> lin : lineage) {
Map<String, String> fromEntity = (HashMap<String, String>) lin.get("fromEntity");
Map<String, String> toEntity = (HashMap<String, String>) lin.get("toEntity");
if (direction.equalsIgnoreCase("lineage.fromEntity.fqnHash.keyword")) {
if (!edges.contains(lin) && fromEntity.get("fqn").equals(fqn)) {
edges.add(lin);
getLineage(
toEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted);
}
public SearchLineageResult getUpstreamLineage(SearchLineageRequest lineageRequest)
throws IOException {
SearchLineageResult result =
new SearchLineageResult()
.withNodes(new HashMap<>())
.withUpstreamEdges(new HashMap<>())
.withDownstreamEdges(new HashMap<>());
getUpstreamLineageRecursively(lineageRequest, result);
return result;
}
private void getUpstreamLineageRecursively(
SearchLineageRequest lineageRequest, SearchLineageResult result) throws IOException {
if (lineageRequest.getUpstreamDepth() <= 0) {
return;
}
Map<String, Object> entityMap =
searchEntityByKey(
GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE);
if (!entityMap.isEmpty()) {
result.getNodes().putIfAbsent(entityMap.get(ID_FIELD).toString(), entityMap);
// Contains Upstream Lineage
if (entityMap.containsKey(UPSTREAM_LINEAGE_FIELD)) {
List<EsLineageData> upStreamEntities =
JsonUtils.readOrConvertValues(
entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class);
for (EsLineageData esLineageData : upStreamEntities) {
result
.getUpstreamEdges()
.putIfAbsent(
esLineageData.getDocId(),
esLineageData.withToEntity(SearchClient.getRelationshipRef(entityMap)));
String upstreamEntityId = esLineageData.getFromEntity().getId().toString();
if (!result.getNodes().containsKey(upstreamEntityId)) {
SearchLineageRequest updatedRequest =
JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class)
.withUpstreamDepth(lineageRequest.getUpstreamDepth() - 1)
.withFqn(esLineageData.getFromEntity().getFqn());
getUpstreamLineageRecursively(updatedRequest, result);
}
}
}
}
}
public SearchLineageResult getDownStreamLineage(SearchLineageRequest lineageRequest)
throws IOException {
SearchLineageResult result =
new SearchLineageResult()
.withNodes(new HashMap<>())
.withUpstreamEdges(new HashMap<>())
.withDownstreamEdges(new HashMap<>());
Map<String, Object> entityMap =
searchEntityByKey(
GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE);
if (!entityMap.isEmpty()) {
result.getNodes().putIfAbsent(entityMap.get("id").toString(), entityMap);
getDownStreamRecursively(SearchClient.getRelationshipRef(entityMap), lineageRequest, result);
}
return result;
}
private void getDownStreamRecursively(
RelationshipRef fromEntity, SearchLineageRequest lineageRequest, SearchLineageResult result)
throws IOException {
if (lineageRequest.getDownstreamDepth() <= 0) {
return;
}
es.org.elasticsearch.action.search.SearchRequest searchDownstreamEntities =
getSearchRequest(
GLOBAL_SEARCH_ALIAS,
lineageRequest.getQueryFilter(),
"upstreamLineage.fromEntity.fqnHash",
FullyQualifiedName.buildHash(lineageRequest.getFqn()),
lineageRequest.getIncludeDeleted(),
null,
SOURCE_FIELDS_TO_EXCLUDE);
SearchResponse downstreamEntities =
client.search(searchDownstreamEntities, RequestOptions.DEFAULT);
for (SearchHit searchHit : downstreamEntities.getHits().getHits()) {
Map<String, Object> entityMap = new HashMap<>(searchHit.getSourceAsMap());
if (!entityMap.isEmpty()) {
result.getNodes().putIfAbsent(entityMap.get(ID_FIELD).toString(), entityMap);
List<EsLineageData> upStreamEntities =
JsonUtils.readOrConvertValues(
entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class);
EsLineageData esLineageData =
SearchClient.getEsLineageDataFromUpstreamLineage(fromEntity.getFqn(), upStreamEntities);
if (esLineageData != null) {
RelationshipRef toEntity = SearchClient.getRelationshipRef(entityMap);
result
.getDownstreamEdges()
.putIfAbsent(esLineageData.getDocId(), esLineageData.withToEntity(toEntity));
String downStreamEntityId = toEntity.getId().toString();
if (!result.getNodes().containsKey(downStreamEntityId)) {
SearchLineageRequest updatedRequest =
JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class)
.withDownstreamDepth(lineageRequest.getDownstreamDepth() - 1)
.withFqn(toEntity.getFqn());
getDownStreamRecursively(toEntity, updatedRequest, result);
} else {
if (!edges.contains(lin) && toEntity.get("fqn").equals(fqn)) {
edges.add(lin);
getLineage(
fromEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted);
}
}
}
@ -1365,6 +1296,112 @@ public class ElasticSearchClient implements SearchClient {
return client.search(searchRequest, RequestOptions.DEFAULT);
}
private Map<String, Object> searchPipelineLineage(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
Map<String, Object> responseMap)
throws IOException {
Set<Map<String, Object>> edges = new HashSet<>();
Set<Map<String, Object>> nodes = new HashSet<>();
Object[] searchAfter = null;
long processedRecords = 0;
long totalRecords = -1;
while (totalRecords != processedRecords) {
es.org.elasticsearch.action.search.SearchRequest searchRequest =
new es.org.elasticsearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
es.org.elasticsearch.index.query.BoolQueryBuilder boolQueryBuilder =
QueryBuilders.boolQuery();
boolQueryBuilder.should(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn)));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new));
FieldSortBuilder sortBuilder = SortBuilders.fieldSort("fullyQualifiedName");
searchSourceBuilder.sort(sortBuilder);
searchSourceBuilder.query(boolQueryBuilder);
if (searchAfter != null) {
searchSourceBuilder.searchAfter(searchAfter);
}
if (CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(boolQueryBuilder)
.must(QueryBuilders.termQuery("deleted", deleted)));
}
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) {
List<Map<String, Object>> lineage =
(List<Map<String, Object>>) hit.getSourceAsMap().get("lineage");
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
nodes.add(tempMap);
for (Map<String, Object> lin : lineage) {
HashMap<String, String> fromEntity = (HashMap<String, String>) lin.get("fromEntity");
HashMap<String, String> toEntity = (HashMap<String, String>) lin.get("toEntity");
HashMap<String, String> pipeline = (HashMap<String, String>) lin.get("pipeline");
if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) {
edges.add(lin);
getLineage(
fromEntity.get("fqn"),
upstreamDepth,
edges,
nodes,
queryFilter,
"lineage.toEntity.fqn.keyword",
deleted);
getLineage(
toEntity.get("fqn"),
downstreamDepth,
edges,
nodes,
queryFilter,
"lineage.fromEntity.fqn.keyword",
deleted);
}
}
}
totalRecords = searchResponse.getHits().getTotalHits().value;
int currentHits = searchResponse.getHits().getHits().length;
processedRecords += currentHits;
if (currentHits > 0) {
searchAfter = searchResponse.getHits().getHits()[currentHits - 1].getSortValues();
} else {
searchAfter = null;
}
}
getLineage(
fqn, downstreamDepth, edges, nodes, queryFilter, "lineage.fromEntity.fqn.keyword", deleted);
getLineage(
fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqn.keyword", deleted);
// TODO: Fix this , this is hack
if (edges.isEmpty()) {
es.org.elasticsearch.action.search.SearchRequest searchRequestForEntity =
new es.org.elasticsearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilderForEntity = new SearchSourceBuilder();
searchSourceBuilderForEntity.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn)));
searchRequestForEntity.source(searchSourceBuilderForEntity.size(1000));
SearchResponse searchResponseForEntity =
client.search(searchRequestForEntity, RequestOptions.DEFAULT);
for (var hit : searchResponseForEntity.getHits().getHits()) {
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
tempMap.keySet().removeAll(FIELDS_TO_REMOVE);
responseMap.put("entity", tempMap);
}
}
responseMap.put("edges", edges);
responseMap.put("nodes", nodes);
return responseMap;
}
@Override
public Response searchBySourceUrl(String sourceUrl) throws IOException {
es.org.elasticsearch.action.search.SearchRequest searchRequest =
@ -2282,14 +2319,13 @@ public class ElasticSearchClient implements SearchClient {
@Override
public void updateLineage(
String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData) {
String indexName, Pair<String, String> fieldAndValue, Map<String, Object> lineageData) {
if (isClientAvailable) {
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName);
updateByQueryRequest.setQuery(
new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue())
.operator(Operator.AND));
Map<String, Object> params =
Collections.singletonMap("lineageData", JsonUtils.getMap(lineageData));
Map<String, Object> params = Collections.singletonMap("lineageData", lineageData);
Script script =
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params);
updateByQueryRequest.setScript(script);

View File

@ -95,7 +95,7 @@ public class APIEndpointIndex implements SearchIndex {
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
Map<String, Object> commonAttributes = getCommonAttributesMap(apiEndpoint, Entity.API_ENDPOINT);
doc.putAll(commonAttributes);
doc.put("upstreamLineage", SearchIndex.getLineageData(apiEndpoint.getEntityReference()));
doc.put("lineage", SearchIndex.getLineageData(apiEndpoint.getEntityReference()));
doc.put(
"requestSchema",
apiEndpoint.getRequestSchema() != null ? apiEndpoint.getRequestSchema() : null);

View File

@ -63,7 +63,7 @@ public record ContainerIndex(Container container) implements ColumnIndex {
doc.put("column_suggest", columnSuggest);
doc.put("serviceType", container.getServiceType());
doc.put("fullPath", container.getFullPath());
doc.put("upstreamLineage", SearchIndex.getLineageData(container.getEntityReference()));
doc.put("lineage", SearchIndex.getLineageData(container.getEntityReference()));
doc.put("service", getEntityWithDisplayName(container.getService()));
return doc;
}

View File

@ -64,7 +64,7 @@ public record DashboardDataModelIndex(DashboardDataModel dashboardDataModel)
doc.put("column_suggest", columnSuggest);
doc.put("tier", parseTags.getTierTag());
doc.put("service", getEntityWithDisplayName(dashboardDataModel.getService()));
doc.put("upstreamLineage", SearchIndex.getLineageData(dashboardDataModel.getEntityReference()));
doc.put("lineage", SearchIndex.getLineageData(dashboardDataModel.getEntityReference()));
doc.put("domain", getEntityWithDisplayName(dashboardDataModel.getDomain()));
return doc;
}

View File

@ -54,7 +54,7 @@ public class DashboardIndex implements SearchIndex {
doc.put("data_model_suggest", dataModelSuggest);
doc.put("service_suggest", serviceSuggest);
doc.put("serviceType", dashboard.getServiceType());
doc.put("upstreamLineage", SearchIndex.getLineageData(dashboard.getEntityReference()));
doc.put("lineage", SearchIndex.getLineageData(dashboard.getEntityReference()));
doc.put("service", getEntityWithDisplayName(dashboard.getService()));
return doc;
}

View File

@ -32,7 +32,7 @@ public class MetricIndex implements SearchIndex {
public Map<String, Object> buildSearchIndexDocInternal(Map<String, Object> doc) {
Map<String, Object> commonAttributes = getCommonAttributesMap(metric, Entity.METRIC);
doc.putAll(commonAttributes);
doc.put("upstreamLineage", SearchIndex.getLineageData(metric.getEntityReference()));
doc.put("lineage", SearchIndex.getLineageData(metric.getEntityReference()));
return doc;
}
}

View File

@ -35,7 +35,7 @@ public class MlModelIndex implements SearchIndex {
doc.put("tags", parseTags.getTags());
doc.put("tier", parseTags.getTierTag());
doc.put("serviceType", mlModel.getServiceType());
doc.put("upstreamLineage", SearchIndex.getLineageData(mlModel.getEntityReference()));
doc.put("lineage", SearchIndex.getLineageData(mlModel.getEntityReference()));
doc.put("service", getEntityWithDisplayName(mlModel.getService()));
return doc;
}

View File

@ -51,7 +51,7 @@ public class PipelineIndex implements SearchIndex {
doc.put("task_suggest", taskSuggest);
doc.put("service_suggest", serviceSuggest);
doc.put("serviceType", pipeline.getServiceType());
doc.put("upstreamLineage", SearchIndex.getLineageData(pipeline.getEntityReference()));
doc.put("lineage", SearchIndex.getLineageData(pipeline.getEntityReference()));
doc.put("service", getEntityWithDisplayName(pipeline.getService()));
return doc;
}

View File

@ -32,7 +32,7 @@ public record SearchEntityIndex(org.openmetadata.schema.entity.data.SearchIndex
doc.put("tier", parseTags.getTierTag());
doc.put("service", getEntityWithDisplayName(searchIndex.getService()));
doc.put("indexType", searchIndex.getIndexType());
doc.put("upstreamLineage", SearchIndex.getLineageData(searchIndex.getEntityReference()));
doc.put("lineage", SearchIndex.getLineageData(searchIndex.getEntityReference()));
return doc;
}

View File

@ -6,7 +6,7 @@ import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.service.Entity.FIELD_DESCRIPTION;
import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME;
import static org.openmetadata.service.Entity.getEntityByName;
import static org.openmetadata.service.jdbi3.LineageRepository.buildEntityLineageData;
import static org.openmetadata.service.jdbi3.LineageRepository.buildRelationshipDetailsMap;
import static org.openmetadata.service.search.EntityBuilderConstant.DISPLAY_NAME_KEYWORD;
import static org.openmetadata.service.search.EntityBuilderConstant.FIELD_DISPLAY_NAME_NGRAM;
import static org.openmetadata.service.search.EntityBuilderConstant.FULLY_QUALIFIED_NAME;
@ -24,7 +24,6 @@ import java.util.Set;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.lineage.EsLineageData;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
@ -43,7 +42,7 @@ import org.openmetadata.service.util.JsonUtils;
public interface SearchIndex {
Set<String> DEFAULT_EXCLUDED_FIELDS =
Set.of("changeDescription", "upstreamLineage.pipeline.changeDescription", "connection");
Set.of("changeDescription", "lineage.pipeline.changeDescription", "connection");
public static final SearchClient searchClient = Entity.getSearchRepository().getSearchClient();
default Map<String, Object> buildSearchIndexDoc() {
@ -150,25 +149,31 @@ public interface SearchIndex {
return nullOrEmpty(entity.getDescription()) ? "INCOMPLETE" : "COMPLETE";
}
static List<EsLineageData> getLineageData(EntityReference entity) {
return new ArrayList<>(
getLineageDataFromRefs(
entity,
Entity.getCollectionDAO()
.relationshipDAO()
.findFrom(entity.getId(), entity.getType(), Relationship.UPSTREAM.ordinal())));
}
static List<EsLineageData> getLineageDataFromRefs(
EntityReference entity, List<CollectionDAO.EntityRelationshipRecord> records) {
List<EsLineageData> data = new ArrayList<>();
for (CollectionDAO.EntityRelationshipRecord entityRelationshipRecord : records) {
static List<Map<String, Object>> getLineageData(EntityReference entity) {
List<Map<String, Object>> data = new ArrayList<>();
CollectionDAO dao = Entity.getCollectionDAO();
List<CollectionDAO.EntityRelationshipRecord> toRelationshipsRecords =
dao.relationshipDAO()
.findTo(entity.getId(), entity.getType(), Relationship.UPSTREAM.ordinal());
for (CollectionDAO.EntityRelationshipRecord entityRelationshipRecord : toRelationshipsRecords) {
EntityReference ref =
Entity.getEntityReferenceById(
entityRelationshipRecord.getType(), entityRelationshipRecord.getId(), Include.ALL);
LineageDetails lineageDetails =
JsonUtils.readValue(entityRelationshipRecord.getJson(), LineageDetails.class);
data.add(buildEntityLineageData(ref, entity, lineageDetails));
data.add(buildRelationshipDetailsMap(entity, ref, lineageDetails));
}
List<CollectionDAO.EntityRelationshipRecord> fromRelationshipsRecords =
dao.relationshipDAO()
.findFrom(entity.getId(), entity.getType(), Relationship.UPSTREAM.ordinal());
for (CollectionDAO.EntityRelationshipRecord entityRelationshipRecord :
fromRelationshipsRecords) {
EntityReference ref =
Entity.getEntityReferenceById(
entityRelationshipRecord.getType(), entityRelationshipRecord.getId(), Include.ALL);
LineageDetails lineageDetails =
JsonUtils.readValue(entityRelationshipRecord.getJson(), LineageDetails.class);
data.add(buildRelationshipDetailsMap(ref, entity, lineageDetails));
}
return data;
}

View File

@ -30,7 +30,7 @@ public record StoredProcedureIndex(StoredProcedure storedProcedure) implements S
ParseTags parseTags =
new ParseTags(Entity.getEntityTags(Entity.STORED_PROCEDURE, storedProcedure));
doc.put("tags", parseTags.getTags());
doc.put("upstreamLineage", SearchIndex.getLineageData(storedProcedure.getEntityReference()));
doc.put("lineage", SearchIndex.getLineageData(storedProcedure.getEntityReference()));
doc.put("tier", parseTags.getTierTag());
doc.put("service", getEntityWithDisplayName(storedProcedure.getService()));
return doc;

View File

@ -103,7 +103,7 @@ public record TableIndex(Table table) implements ColumnIndex {
doc.put("schemaDefinition", table.getSchemaDefinition());
doc.put("service", getEntityWithDisplayName(table.getService()));
doc.put("database", getEntityWithDisplayName(table.getDatabase()));
doc.put("upstreamLineage", SearchIndex.getLineageData(table.getEntityReference()));
doc.put("lineage", SearchIndex.getLineageData(table.getEntityReference()));
doc.put("entityRelationship", SearchIndex.populateEntityRelationshipData(table));
doc.put("databaseSchema", getEntityWithDisplayName(table.getDatabaseSchema()));
return doc;

View File

@ -81,7 +81,7 @@ public class TopicIndex implements SearchIndex {
doc.put("field_suggest", fieldSuggest);
doc.put("service_suggest", serviceSuggest);
doc.put("serviceType", topic.getServiceType());
doc.put("upstreamLineage", SearchIndex.getLineageData(topic.getEntityReference()));
doc.put("lineage", SearchIndex.getLineageData(topic.getEntityReference()));
doc.put("messageSchema", topic.getMessageSchema() != null ? topic.getMessageSchema() : null);
doc.put("service", getEntityWithDisplayName(topic.getService()));
return doc;

View File

@ -2,7 +2,6 @@ package org.openmetadata.service.search.opensearch;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA;
import static org.openmetadata.service.Entity.DATA_PRODUCT;
@ -68,11 +67,6 @@ import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.jetbrains.annotations.NotNull;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.api.lineage.EsLineageData;
import org.openmetadata.schema.api.lineage.LineageDirection;
import org.openmetadata.schema.api.lineage.RelationshipRef;
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
import org.openmetadata.schema.api.lineage.SearchLineageResult;
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList;
@ -845,26 +839,64 @@ public class OpenSearchClient implements SearchClient {
return Response.status(OK).entity(response).build();
}
@Override
public SearchLineageResult searchLineage(SearchLineageRequest lineageRequest) throws IOException {
SearchLineageResult result =
getDownStreamLineage(lineageRequest.withDirection(LineageDirection.DOWNSTREAM));
SearchLineageResult upstreamLineage =
getDownStreamLineage(lineageRequest.withDirection(LineageDirection.UPSTREAM));
// Add All nodes and edges from upstream lineage to result
result.getNodes().putAll(upstreamLineage.getNodes());
result.getUpstreamEdges().putAll(upstreamLineage.getUpstreamEdges());
return result;
}
public SearchLineageResult searchLineageWithDirection(SearchLineageRequest lineageRequest)
public Map<String, Object> searchLineageInternal(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException {
if (lineageRequest.getDirection().equals(LineageDirection.UPSTREAM)) {
return getUpstreamLineage(lineageRequest);
} else {
return getDownStreamLineage(lineageRequest);
if (entityType.equalsIgnoreCase(Entity.PIPELINE)
|| entityType.equalsIgnoreCase(Entity.STORED_PROCEDURE)) {
return searchPipelineLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted);
}
Map<String, Object> responseMap = new HashMap<>();
Set<Map<String, Object>> edges = new HashSet<>();
Set<Map<String, Object>> nodes = new HashSet<>();
os.org.opensearch.action.search.SearchRequest searchRequest =
new os.org.opensearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
List<String> sourceFieldsToExcludeCopy = new ArrayList<>(SOURCE_FIELDS_TO_EXCLUDE);
searchSourceBuilder.fetchSource(null, sourceFieldsToExcludeCopy.toArray(String[]::new));
searchSourceBuilder.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn)));
searchRequest.source(searchSourceBuilder.size(1000));
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) {
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
tempMap.keySet().removeAll(FIELDS_TO_REMOVE);
responseMap.put("entity", tempMap);
}
getLineage(
fqn,
downstreamDepth,
edges,
nodes,
queryFilter,
"lineage.fromEntity.fqnHash.keyword",
deleted);
getLineage(
fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqnHash.keyword", deleted);
responseMap.put("edges", edges);
responseMap.put("nodes", nodes);
return responseMap;
}
@Override
public Response searchLineage(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException {
Map<String, Object> responseMap =
searchLineageInternal(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
return Response.status(OK).entity(responseMap).build();
}
private void getEntityRelationship(
@ -1068,148 +1100,56 @@ public class OpenSearchClient implements SearchClient {
return Response.status(OK).entity(responseMap).build();
}
@Override
public Map<String, Object> searchEntityByKey(
String indexAlias, String keyName, String keyValue, List<String> fieldsToRemove)
throws IOException {
os.org.opensearch.action.search.SearchRequest searchRequest =
getSearchRequest(indexAlias, null, keyName, keyValue, null, null, fieldsToRemove);
os.org.opensearch.action.search.SearchResponse searchResponse =
client.search(searchRequest, RequestOptions.DEFAULT);
int noOfHits = searchResponse.getHits().getHits().length;
if (noOfHits == 1) {
return new HashMap<>(
JsonUtils.getMap(searchResponse.getHits().getHits()[0].getSourceAsMap()));
} else {
throw new SearchException(
String.format(
"Issue in Search Entity By Key: %s, Value: %s , Number of Hits: %s",
keyName, keyValue, noOfHits));
}
}
private os.org.opensearch.action.search.SearchRequest getSearchRequest(
String indexAlias,
private void getLineage(
String fqn,
int depth,
Set<Map<String, Object>> edges,
Set<Map<String, Object>> nodes,
String queryFilter,
String key,
String value,
Boolean deleted,
List<String> fieldsToInclude,
List<String> fieldsToRemove) {
String direction,
boolean deleted)
throws IOException {
if (depth <= 0) {
return;
}
os.org.opensearch.action.search.SearchRequest searchRequest =
new os.org.opensearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(indexAlias));
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.fetchSource(
listOrEmpty(fieldsToInclude).toArray(String[]::new),
listOrEmpty(fieldsToRemove).toArray(String[]::new));
searchSourceBuilder.query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery(key, value)));
if (!CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new));
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(key, value))
.must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn))));
if (CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn)))
.must(QueryBuilders.termQuery("deleted", deleted)));
}
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
searchRequest.source(searchSourceBuilder.size(1000));
return searchRequest;
os.org.opensearch.action.search.SearchResponse searchResponse =
client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) {
List<Map<String, Object>> lineage =
(List<Map<String, Object>>) hit.getSourceAsMap().get("lineage");
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
nodes.add(tempMap);
for (Map<String, Object> lin : lineage) {
HashMap<String, String> fromEntity = (HashMap<String, String>) lin.get("fromEntity");
HashMap<String, String> toEntity = (HashMap<String, String>) lin.get("toEntity");
if (direction.equalsIgnoreCase("lineage.fromEntity.fqnHash.keyword")) {
if (!edges.contains(lin) && fromEntity.get("fqn").equals(fqn)) {
edges.add(lin);
getLineage(
toEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted);
}
public SearchLineageResult getUpstreamLineage(SearchLineageRequest lineageRequest)
throws IOException {
SearchLineageResult result = new SearchLineageResult().withNodes(new HashMap<>());
getUpstreamLineageRecursively(lineageRequest, result);
return result;
}
private void getUpstreamLineageRecursively(
SearchLineageRequest lineageRequest, SearchLineageResult result) throws IOException {
if (lineageRequest.getUpstreamDepth() <= 0) {
return;
}
Map<String, Object> entityMap =
searchEntityByKey(
GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE);
if (!entityMap.isEmpty()) {
result.getNodes().putIfAbsent(entityMap.get("id").toString(), entityMap);
// Contains Upstream Lineage
if (entityMap.containsKey(UPSTREAM_LINEAGE_FIELD)) {
List<EsLineageData> upStreamEntities =
JsonUtils.readOrConvertValues(
entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class);
for (EsLineageData esLineageData : upStreamEntities) {
result
.getUpstreamEdges()
.putIfAbsent(
esLineageData.getDocId(),
esLineageData.withToEntity(SearchClient.getRelationshipRef(entityMap)));
String upstreamEntityId = esLineageData.getFromEntity().getId().toString();
if (!result.getNodes().containsKey(upstreamEntityId)) {
SearchLineageRequest updatedRequest =
JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class)
.withUpstreamDepth(lineageRequest.getUpstreamDepth() - 1)
.withFqn(esLineageData.getFromEntity().getFqn());
getUpstreamLineageRecursively(updatedRequest, result);
}
}
}
}
}
public SearchLineageResult getDownStreamLineage(SearchLineageRequest lineageRequest)
throws IOException {
SearchLineageResult result = new SearchLineageResult().withNodes(new HashMap<>());
Map<String, Object> entityMap =
searchEntityByKey(
GLOBAL_SEARCH_ALIAS, FQN_FIELD, lineageRequest.getFqn(), SOURCE_FIELDS_TO_EXCLUDE);
if (!entityMap.isEmpty()) {
result.getNodes().putIfAbsent(entityMap.get("id").toString(), entityMap);
getDownStreamRecursively(SearchClient.getRelationshipRef(entityMap), lineageRequest, result);
}
return result;
}
private void getDownStreamRecursively(
RelationshipRef fromEntity, SearchLineageRequest lineageRequest, SearchLineageResult result)
throws IOException {
if (lineageRequest.getDownstreamDepth() <= 0) {
return;
}
os.org.opensearch.action.search.SearchRequest searchDownstreamEntities =
getSearchRequest(
GLOBAL_SEARCH_ALIAS,
lineageRequest.getQueryFilter(),
"upstreamLineage.fromEntity.fqnHash",
FullyQualifiedName.buildHash(lineageRequest.getFqn()),
lineageRequest.getIncludeDeleted(),
null,
SOURCE_FIELDS_TO_EXCLUDE);
os.org.opensearch.action.search.SearchResponse downstreamEntities =
client.search(searchDownstreamEntities, RequestOptions.DEFAULT);
for (os.org.opensearch.search.SearchHit searchHit : downstreamEntities.getHits().getHits()) {
Map<String, Object> entityMap = new HashMap<>(searchHit.getSourceAsMap());
if (!entityMap.isEmpty()) {
result.getNodes().putIfAbsent(entityMap.get(ID_FIELD).toString(), entityMap);
List<EsLineageData> upStreamEntities =
JsonUtils.readOrConvertValues(
entityMap.get(UPSTREAM_LINEAGE_FIELD), EsLineageData.class);
EsLineageData esLineageData =
SearchClient.getEsLineageDataFromUpstreamLineage(fromEntity.getFqn(), upStreamEntities);
if (esLineageData != null) {
RelationshipRef toEntity = SearchClient.getRelationshipRef(entityMap);
result
.getDownstreamEdges()
.putIfAbsent(esLineageData.getDocId(), esLineageData.withToEntity(toEntity));
String downStreamEntityId = toEntity.getId().toString();
if (!result.getNodes().containsKey(downStreamEntityId)) {
SearchLineageRequest updatedRequest =
JsonUtils.deepCopy(lineageRequest, SearchLineageRequest.class)
.withDownstreamDepth(lineageRequest.getDownstreamDepth() - 1)
.withFqn(toEntity.getFqn());
getDownStreamRecursively(toEntity, updatedRequest, result);
} else {
if (!edges.contains(lin) && toEntity.get("fqn").equals(fqn)) {
edges.add(lin);
getLineage(
fromEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted);
}
}
}
@ -1353,6 +1293,107 @@ public class OpenSearchClient implements SearchClient {
return client.search(searchRequest, RequestOptions.DEFAULT);
}
private Map<String, Object> searchPipelineLineage(
String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted)
throws IOException {
Map<String, Object> responseMap = new HashMap<>();
Set<Map<String, Object>> edges = new HashSet<>();
Set<Map<String, Object>> nodes = new HashSet<>();
responseMap.put("entity", null);
Object[] searchAfter = null;
long processedRecords = 0;
long totalRecords = -1;
while (totalRecords != processedRecords) {
os.org.opensearch.action.search.SearchRequest searchRequest =
new os.org.opensearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.should(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("lineage.pipeline.fullyQualifiedName.keyword", fqn)));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.fetchSource(null, SOURCE_FIELDS_TO_EXCLUDE.toArray(String[]::new));
FieldSortBuilder sortBuilder = SortBuilders.fieldSort("fullyQualifiedName");
searchSourceBuilder.sort(sortBuilder);
searchSourceBuilder.query(boolQueryBuilder);
if (searchAfter != null) {
searchSourceBuilder.searchAfter(searchAfter);
}
if (CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(boolQueryBuilder)
.must(QueryBuilders.termQuery("deleted", deleted)));
}
buildSearchSourceFilter(queryFilter, searchSourceBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) {
List<Map<String, Object>> lineage =
(List<Map<String, Object>>) hit.getSourceAsMap().get("lineage");
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
nodes.add(tempMap);
for (Map<String, Object> lin : lineage) {
HashMap<String, String> fromEntity = (HashMap<String, String>) lin.get("fromEntity");
HashMap<String, String> toEntity = (HashMap<String, String>) lin.get("toEntity");
HashMap<String, String> pipeline = (HashMap<String, String>) lin.get("pipeline");
if (pipeline != null && pipeline.get("fullyQualifiedName").equalsIgnoreCase(fqn)) {
edges.add(lin);
getLineage(
fromEntity.get("fqn"),
upstreamDepth,
edges,
nodes,
queryFilter,
"lineage.toEntity.fqn.keyword",
deleted);
getLineage(
toEntity.get("fqn"),
downstreamDepth,
edges,
nodes,
queryFilter,
"lineage.fromEntity.fqn.keyword",
deleted);
}
}
}
totalRecords = searchResponse.getHits().getTotalHits().value;
int currentHits = searchResponse.getHits().getHits().length;
processedRecords += currentHits;
if (currentHits > 0) {
searchAfter = searchResponse.getHits().getHits()[currentHits - 1].getSortValues();
} else {
searchAfter = null;
}
}
getLineage(
fqn, downstreamDepth, edges, nodes, queryFilter, "lineage.fromEntity.fqn.keyword", deleted);
getLineage(
fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqn.keyword", deleted);
if (edges.isEmpty()) {
os.org.opensearch.action.search.SearchRequest searchRequestForEntity =
new os.org.opensearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilderForEntity = new SearchSourceBuilder();
searchSourceBuilderForEntity.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn)));
searchRequestForEntity.source(searchSourceBuilderForEntity.size(1000));
SearchResponse searchResponseForEntity =
client.search(searchRequestForEntity, RequestOptions.DEFAULT);
for (var hit : searchResponseForEntity.getHits().getHits()) {
HashMap<String, Object> tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap()));
tempMap.keySet().removeAll(FIELDS_TO_REMOVE);
responseMap.put("entity", tempMap);
}
}
responseMap.put("edges", edges);
responseMap.put("nodes", nodes);
return responseMap;
}
private static FunctionScoreQueryBuilder boostScore(QueryStringQueryBuilder queryBuilder) {
FunctionScoreQueryBuilder.FilterFunctionBuilder tier1Boost =
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
@ -2230,14 +2271,13 @@ public class OpenSearchClient implements SearchClient {
@Override
public void updateLineage(
String indexName, Pair<String, String> fieldAndValue, EsLineageData lineageData) {
String indexName, Pair<String, String> fieldAndValue, Map<String, Object> lineagaData) {
if (isClientAvailable) {
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName);
updateByQueryRequest.setQuery(
new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue())
.operator(Operator.AND));
Map<String, Object> params =
Collections.singletonMap("lineageData", JsonUtils.getMap(lineageData));
Map<String, Object> params = Collections.singletonMap("lineageData", lineagaData);
Script script =
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params);
updateByQueryRequest.setScript(script);

View File

@ -1,73 +0,0 @@
{
"$id": "https://open-metadata.org/schema/api/lineage/esLineageData.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "EsLineageData",
"description": "Response object for the search lineage request from Elastic Search.",
"javaType": "org.openmetadata.schema.api.lineage.EsLineageData",
"type": "object",
"definitions": {
"relationshipRef": {
"description": "Relationship Reference to an Entity.",
"type": "object",
"properties": {
"id": {
"description": "Unique identifier of this entity instance.",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"fqn": {
"description": "FullyQualifiedName of the entity.",
"$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName"
},
"fqnHash": {
"description": "FullyQualifiedName Hash of the entity.",
"type": "string"
},
"type": {
"description": "Type of the entity.",
"type": "string"
}
}
}
},
"properties": {
"fromEntity": {
"description": "From Entity.",
"$ref": "#/definitions/relationshipRef"
},
"toEntity": {
"description": "To Entity.",
"$ref": "#/definitions/relationshipRef"
},
"pipeline": {
"description": "Pipeline in case pipeline is present between entities."
},
"sqlQuery": {
"description": "Sql Query associated.",
"type": "string"
},
"columns": {
"description": "Columns associated.",
"type": "array",
"items": {
"$ref": "../../type/entityLineage.json#/definitions/columnLineage"
}
},
"description": {
"description": "Description.",
"type": "string"
},
"source": {
"description": "Source of the Lineage.",
"type": "string"
},
"doc_id": {
"description": "Doc Id for the Lineage.",
"type": "string"
},
"pipelineEntityType": {
"description": "Pipeline Entity or Stored procedure.",
"type": "string"
}
},
"additionalProperties": false
}

View File

@ -1,9 +0,0 @@
{
"$id": "https://open-metadata.org/schema/api/lineage/lineageDirection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "LineageDirection",
"description": "Lineage Direction Schema.",
"javaType": "org.openmetadata.schema.api.lineage.LineageDirection",
"type": "string",
"enum": ["Upstream", "Downstream"]
}

View File

@ -1,46 +0,0 @@
{
"$id": "https://open-metadata.org/schema/api/lineage/searchLineageRequest.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SearchLineageRequest",
"description": "Search Lineage Request Schema to find linage from Elastic Search.",
"javaType": "org.openmetadata.schema.api.lineage.SearchLineageRequest",
"type": "object",
"properties": {
"fqn": {
"description": "Entity Fqn to search lineage",
"type": "string"
},
"entityType": {
"description": "Query Filter",
"type": "string"
},
"direction": {
"$ref": "./lineageDirection.json"
},
"directionValue": {
"description": "Lineage Direction Value.",
"type": "string"
},
"upstreamDepth": {
"description": "The upstream depth of the lineage",
"type": "integer",
"default": 3
},
"downstreamDepth": {
"description": "The downstream depth of the lineage",
"type": "integer",
"default": 3
},
"queryFilter": {
"description": "Query Filter",
"type": "string"
},
"includeDeleted": {
"description": "Include deleted entities",
"type": "boolean",
"default": false
}
},
"required": ["fqn", "entityType", "direction"],
"additionalProperties": false
}

View File

@ -1,35 +0,0 @@
{
"$id": "https://open-metadata.org/schema/api/lineage/searchLineageResult.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SearchLineageResult",
"description": "Search Lineage Response for the Lineage Request",
"javaType": "org.openmetadata.schema.api.lineage.SearchLineageResult",
"type": "object",
"definitions": {
"entity" : {
"description": "A general object to hold any object."
}
},
"properties": {
"paging": {
"$ref": "../../type/paging.json"
},
"direction": {
"description": "Lineage Direction Value.",
"type": "string"
},
"nodes" : {
"description": "Nodes in the lineage response.",
"existingJavaType": "java.util.Map<java.lang.String, java.lang.Object>"
},
"upstreamEdges": {
"description": "Upstream Edges for the entity.",
"existingJavaType": "java.util.Map<java.lang.String, org.openmetadata.schema.api.lineage.EsLineageData>"
},
"downstreamEdges": {
"description": "Downstream Edges for the node.",
"existingJavaType": "java.util.Map<java.lang.String, org.openmetadata.schema.api.lineage.EsLineageData>"
}
},
"additionalProperties": false
}

View File

@ -1,99 +0,0 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Response object for the search lineage request from Elastic Search.
*/
export interface EsLineageData {
/**
* Columns associated.
*/
columns?: ColumnLineage[];
/**
* Description.
*/
description?: string;
/**
* Doc Id for the Lineage.
*/
doc_id?: string;
/**
* From Entity.
*/
fromEntity?: RelationshipRef;
/**
* Pipeline in case pipeline is present between entities.
*/
pipeline?: any;
/**
* Pipeline Entity or Stored procedure.
*/
pipelineEntityType?: string;
/**
* Source of the Lineage.
*/
source?: string;
/**
* Sql Query associated.
*/
sqlQuery?: string;
/**
* To Entity.
*/
toEntity?: RelationshipRef;
}
export interface ColumnLineage {
/**
* One or more source columns identified by fully qualified column name used by
* transformation function to create destination column.
*/
fromColumns?: string[];
/**
* Transformation function applied to source columns to create destination column. That is
* `function(fromColumns) -> toColumn`.
*/
function?: string;
/**
* Destination column identified by fully qualified column name created by the
* transformation of source columns.
*/
toColumn?: string;
[property: string]: any;
}
/**
* From Entity.
*
* Relationship Reference to an Entity.
*
* To Entity.
*/
export interface RelationshipRef {
/**
* FullyQualifiedName of the entity.
*/
fqn?: string;
/**
* FullyQualifiedName Hash of the entity.
*/
fqnHash?: string;
/**
* Unique identifier of this entity instance.
*/
id?: string;
/**
* Type of the entity.
*/
type?: string;
[property: string]: any;
}

View File

@ -1,19 +0,0 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Lineage Direction Schema.
*/
export enum LineageDirection {
Downstream = "Downstream",
Upstream = "Upstream",
}

View File

@ -1,54 +0,0 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Search Lineage Request Schema to find linage from Elastic Search.
*/
export interface SearchLineageRequest {
direction: LineageDirection;
/**
* Lineage Direction Value.
*/
directionValue?: string;
/**
* The downstream depth of the lineage
*/
downstreamDepth?: number;
/**
* Query Filter
*/
entityType: string;
/**
* Entity Fqn to search lineage
*/
fqn: string;
/**
* Include deleted entities
*/
includeDeleted?: boolean;
/**
* Query Filter
*/
queryFilter?: string;
/**
* The upstream depth of the lineage
*/
upstreamDepth?: number;
}
/**
* Lineage Direction Schema.
*/
export enum LineageDirection {
Downstream = "Downstream",
Upstream = "Upstream",
}

View File

@ -1,60 +0,0 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Search Lineage Response for the Lineage Request
*/
export interface SearchLineageResult {
/**
* Lineage Direction Value.
*/
direction?: string;
/**
* Downstream Edges for the node.
*/
downstreamEdges?: any;
/**
* Nodes in the lineage response.
*/
nodes?: any;
paging?: Paging;
/**
* Upstream Edges for the entity.
*/
upstreamEdges?: any;
}
/**
* Type used for cursor based pagination information in GET list responses.
*/
export interface Paging {
/**
* After cursor used for getting the next page (see API pagination for details).
*/
after?: string;
/**
* Before cursor used for getting the previous page (see API pagination for details).
*/
before?: string;
/**
* Limit used in case of offset based pagination.
*/
limit?: number;
/**
* Offset used in case of offset based pagination.
*/
offset?: number;
/**
* Total number of entries available to page through.
*/
total: number;
}