Add Lineage Export Async endpoint using Search (#18553)

* Add Lineage Export Async endpoint using Search

* Lineage api (#18593)

* add api for lineage export

* update API call for lineage async

* use JsonUtils instead of objectMapper to read lineage search response

---------

Co-authored-by: Karan Hotchandani <33024356+karanh37@users.noreply.github.com>
Co-authored-by: Sweta Agarwalla <105535990+sweta1308@users.noreply.github.com>
Co-authored-by: sonikashah <sonikashah94@gmail.com>
Sriharsha Chintalapani 2024-11-15 09:11:23 -08:00 committed by GitHub
parent 71f93633ac
commit 783ec62e4c
7 changed files with 355 additions and 29 deletions
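
At a high level, the new GET /v1/lineage/exportAsync endpoint submits the CSV export to a background executor and returns a job id right away; the finished CSV (or the failure reason) is then pushed to the requesting user through a websocket notification. A minimal client sketch, assuming the API is served locally under /api with a placeholder bearer token; the FQN and depths below are hypothetical:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LineageExportAsyncClientSketch {
  public static void main(String[] args) throws Exception {
    // Query parameters mirror the resource method: fqn, type, upstreamDepth,
    // downstreamDepth, query_filter, includeDeleted.
    String query =
        "fqn=mysql_svc.shop.public.dim_orders" // hypothetical table FQN
            + "&type=table&upstreamDepth=2&downstreamDepth=2&includeDeleted=false";
    HttpRequest request =
        HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8585/api/v1/lineage/exportAsync?" + query)) // assumed base URL
            .header("Authorization", "Bearer <jwt>") // placeholder token
            .GET()
            .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    // Expect 202 Accepted and a small JSON body carrying the export job id.
    System.out.println(response.statusCode() + " -> " + response.body());
  }
}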

View File

@@ -1,13 +0,0 @@
<component name="ProjectRunConfigurationManager">
<configuration default="true" type="JUnit" factoryName="JUnit">
<module name="openmetadata-service" />
<option name="PACKAGE_NAME" value="" />
<option name="MAIN_CLASS_NAME" value="" />
<option name="METHOD_NAME" value="" />
<option name="TEST_OBJECT" value="package" />
<option name="VM_PARAMETERS" value="-ea -DjdbcContainerClassName=org.testcontainers.containers.MySQLContainer -DjdbcContainerImage=mysql:8 -DelasticSearchContainerClassName=docker.elastic.co/elasticsearch/elasticsearch:8.11.4 -DopenSearchContainerClassName=opensearchproject/opensearch:1.3.0 -DrunESTestCases=false"/>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>

View File

@@ -577,6 +577,11 @@
<artifactId>commons-csv</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>5.9</version>
</dependency>
<dependency>
<groupId>com.onelogin</groupId>
<artifactId>java-saml</artifactId>

View File

@@ -30,7 +30,10 @@ import static org.openmetadata.service.Entity.TOPIC;
import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS;
import static org.openmetadata.service.search.SearchClient.REMOVE_LINEAGE_SCRIPT;
import com.fasterxml.jackson.databind.JsonNode;
import com.opencsv.CSVWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -66,6 +69,7 @@ import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.csv.CsvDocumentation;
import org.openmetadata.schema.type.csv.CsvFile;
import org.openmetadata.schema.type.csv.CsvHeader;
import org.openmetadata.sdk.exception.CSVExportException;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.exception.EntityNotFoundException;
@@ -254,6 +258,231 @@ public class LineageRepository {
return CsvUtil.formatCsv(csvFile);
}
public final String exportCsvAsync(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
String entityType,
boolean deleted)
throws IOException {
Response response =
Entity.getSearchRepository()
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
try {
String jsonResponse = JsonUtils.pojoToJson(response.getEntity());
JsonNode rootNode = JsonUtils.readTree(jsonResponse);
Map<String, JsonNode> entityMap = new HashMap<>();
JsonNode nodes = rootNode.path("nodes");
for (JsonNode node : nodes) {
String id = node.path("id").asText();
entityMap.put(id, node);
}
StringWriter csvContent = new StringWriter();
CSVWriter csvWriter = new CSVWriter(csvContent);
String[] headers = {
"fromEntityFQN", "fromServiceName", "fromServiceType", "fromOwners", "fromDomain",
"toEntityFQN", "toServiceName", "toServiceType", "toOwners", "toDomain",
"fromChildEntityFQN", "toChildEntityFQN"
};
csvWriter.writeNext(headers);
JsonNode edges = rootNode.path("edges");
for (JsonNode edge : edges) {
String fromEntityId = edge.path("fromEntity").path("id").asText();
String toEntityId = edge.path("toEntity").path("id").asText();
JsonNode fromEntity = entityMap.getOrDefault(fromEntityId, null);
JsonNode toEntity = entityMap.getOrDefault(toEntityId, null);
Map<String, String> baseRow = new HashMap<>();
baseRow.put("fromEntityFQN", getText(fromEntity, "fullyQualifiedName"));
baseRow.put("fromServiceName", getText(fromEntity.path("service"), "name"));
baseRow.put("fromServiceType", getText(fromEntity, "serviceType"));
baseRow.put("fromOwners", getOwners(fromEntity.path("owners")));
baseRow.put("fromDomain", getText(fromEntity, "domain"));
baseRow.put("toEntityFQN", getText(toEntity, "fullyQualifiedName"));
baseRow.put("toServiceName", getText(toEntity.path("service"), "name"));
baseRow.put("toServiceType", getText(toEntity, "serviceType"));
baseRow.put("toOwners", getOwners(toEntity.path("owners")));
baseRow.put("toDomain", getText(toEntity, "domain"));
List<String> fromChildFQNs = new ArrayList<>();
List<String> toChildFQNs = new ArrayList<>();
extractChildEntities(fromEntity, fromChildFQNs);
extractChildEntities(toEntity, toChildFQNs);
JsonNode columns = edge.path("columns");
if (columns.isArray() && !columns.isEmpty()) {
for (JsonNode columnMapping : columns) {
JsonNode fromColumns = columnMapping.path("fromColumns");
String toColumn = columnMapping.path("toColumn").asText();
for (JsonNode fromColumn : fromColumns) {
String fromChildFQN = fromColumn.asText();
String toChildFQN = toColumn;
writeCsvRow(csvWriter, baseRow, fromChildFQN, toChildFQN);
}
}
} else if (!fromChildFQNs.isEmpty() || !toChildFQNs.isEmpty()) {
if (!fromChildFQNs.isEmpty() && !toChildFQNs.isEmpty()) {
for (String fromChildFQN : fromChildFQNs) {
for (String toChildFQN : toChildFQNs) {
writeCsvRow(csvWriter, baseRow, fromChildFQN, toChildFQN);
}
}
} else if (!fromChildFQNs.isEmpty()) {
for (String fromChildFQN : fromChildFQNs) {
writeCsvRow(csvWriter, baseRow, fromChildFQN, "");
}
} else {
for (String toChildFQN : toChildFQNs) {
writeCsvRow(csvWriter, baseRow, "", toChildFQN);
}
}
} else {
writeCsvRow(csvWriter, baseRow, "", "");
}
}
csvWriter.close();
return csvContent.toString();
} catch (IOException e) {
throw CSVExportException.byMessage("Failed to export lineage data to CSV", e.getMessage());
}
}
private static void writeCsvRow(
CSVWriter csvWriter, Map<String, String> baseRow, String fromChildFQN, String toChildFQN) {
String[] row = {
baseRow.get("fromEntityFQN"),
baseRow.get("fromServiceName"),
baseRow.get("fromServiceType"),
baseRow.get("fromOwners"),
baseRow.get("fromDomain"),
baseRow.get("toEntityFQN"),
baseRow.get("toServiceName"),
baseRow.get("toServiceType"),
baseRow.get("toOwners"),
baseRow.get("toDomain"),
fromChildFQN,
toChildFQN
};
csvWriter.writeNext(row);
}
private static String getText(JsonNode node, String fieldName) {
if (node != null && node.has(fieldName)) {
JsonNode fieldNode = node.get(fieldName);
return fieldNode.isNull() ? "" : fieldNode.asText();
}
return "";
}
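// Flattens the owners array into a semicolon-separated list of owner names,
// e.g. [{"name":"alice"},{"name":"bob"}] -> "alice;bob" (hypothetical names);
// entries without a name are skipped.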
private static String getOwners(JsonNode ownersNode) {
if (ownersNode != null && ownersNode.isArray()) {
List<String> ownersList = new ArrayList<>();
for (JsonNode owner : ownersNode) {
String ownerName = getText(owner, "name");
if (!ownerName.isEmpty()) {
ownersList.add(ownerName);
}
}
return String.join(";", ownersList);
}
return "";
}
private static void extractChildEntities(JsonNode entityNode, List<String> childFQNs) {
if (entityNode == null) {
return;
}
String entityType = getText(entityNode, "entityType");
switch (entityType) {
case TABLE:
extractColumns(entityNode.path("columns"), childFQNs);
break;
case DASHBOARD:
extractCharts(entityNode.path("charts"), childFQNs);
break;
case SEARCH_INDEX:
extractFields(entityNode.path("fields"), childFQNs);
break;
case CONTAINER:
extractContainers(entityNode.path("children"), childFQNs);
extractColumns(entityNode.path("dataModel").path("columns"), childFQNs);
break;
case TOPIC:
extractSchemaFields(entityNode.path("messageSchema").path("schemaFields"), childFQNs);
break;
case DASHBOARD_DATA_MODEL:
extractColumns(entityNode.path("columns"), childFQNs);
break;
default:
break;
}
}
private static void extractColumns(JsonNode columnsNode, List<String> childFQNs) {
if (columnsNode != null && columnsNode.isArray()) {
for (JsonNode column : columnsNode) {
if (column != null) {
String columnFQN = getText(column, "fullyQualifiedName");
childFQNs.add(columnFQN);
extractColumns(column.path("children"), childFQNs);
}
}
}
}
private static void extractCharts(JsonNode chartsNode, List<String> childFQNs) {
if (chartsNode != null && chartsNode.isArray()) {
for (JsonNode chart : chartsNode) {
String chartFQN = getText(chart, "fullyQualifiedName");
childFQNs.add(chartFQN);
}
}
}
private static void extractFields(JsonNode fieldsNode, List<String> childFQNs) {
if (fieldsNode != null && fieldsNode.isArray()) {
for (JsonNode field : fieldsNode) {
if (field != null) {
String fieldFQN = getText(field, "fullyQualifiedName");
childFQNs.add(fieldFQN);
extractFields(field.path("children"), childFQNs);
}
}
}
}
private static void extractContainers(JsonNode containersNode, List<String> childFQNs) {
if (containersNode != null && containersNode.isArray()) {
for (JsonNode container : containersNode) {
if (container != null) {
String containerFQN = getText(container, "fullyQualifiedName");
childFQNs.add(containerFQN);
extractContainers(container.path("children"), childFQNs);
}
}
}
}
private static void extractSchemaFields(JsonNode schemaFieldsNode, List<String> childFQNs) {
if (schemaFieldsNode != null && schemaFieldsNode.isArray()) {
for (JsonNode field : schemaFieldsNode) {
if (field != null) {
String fieldFQN = getText(field, "fullyQualifiedName");
childFQNs.add(fieldFQN);
extractSchemaFields(field.path("children"), childFQNs);
}
}
}
}
private String getStringOrNull(HashMap map, String key) {
return nullOrEmpty(map.get(key)) ? "" : map.get(key).toString();
}
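
For reference, the export emits one row per lineage edge and repeats the edge-level columns for every column mapping (or every child entity when no column lineage exists). A rough sketch of the output under the headers defined above, with a single hypothetical table-to-table edge carrying one column mapping; opencsv's CSVWriter quotes each field by default, and every value below is made up:

"fromEntityFQN","fromServiceName","fromServiceType","fromOwners","fromDomain","toEntityFQN","toServiceName","toServiceType","toOwners","toDomain","fromChildEntityFQN","toChildEntityFQN"
"mysql_svc.shop.public.raw_orders","mysql_svc","Mysql","data-team","Sales","mysql_svc.shop.public.dim_orders","mysql_svc","Mysql","data-team","Sales","mysql_svc.shop.public.raw_orders.order_id","mysql_svc.shop.public.dim_orders.order_id"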

View File

@@ -29,6 +29,7 @@ import io.swagger.v3.oas.annotations.tags.Tag;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import javax.json.JsonPatch;
import javax.validation.Valid;
import javax.validation.constraints.Max;
@@ -60,6 +61,10 @@ import org.openmetadata.service.resources.Collection;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.security.policyevaluator.ResourceContextInterface;
import org.openmetadata.service.util.AsyncService;
import org.openmetadata.service.util.CSVExportMessage;
import org.openmetadata.service.util.CSVExportResponse;
import org.openmetadata.service.util.WebsocketNotificationHandler;
@Path("/v1/lineage")
@Tag(
@@ -273,10 +278,61 @@ public class LineageResource {
boolean deleted,
@Parameter(description = "entity type") @QueryParam("type") String entityType)
throws IOException {
Entity.getSearchRepository()
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
return dao.exportCsv(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
}
@GET
@Path("/exportAsync")
@Produces(MediaType.APPLICATION_JSON)
@Operation(
operationId = "exportLineage",
summary = "Export lineage",
responses = {
@ApiResponse(
responseCode = "200",
description = "search response",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CSVExportMessage.class)))
})
public Response exportLineageAsync(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "fqn") @QueryParam("fqn") String fqn,
@Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth,
@Parameter(description = "downstreamDepth") @QueryParam("downstreamDepth")
int downstreamDepth,
@Parameter(
description =
"Elasticsearch query that will be combined with the query_string query generator from the `query` argument")
@QueryParam("query_filter")
String queryFilter,
@Parameter(description = "Filter documents by deleted param. By default deleted is false")
@QueryParam("includeDeleted")
boolean deleted,
@Parameter(description = "entity type") @QueryParam("type") String entityType) {
String jobId = UUID.randomUUID().toString();
ExecutorService executorService = AsyncService.getInstance().getExecutorService();
executorService.submit(
() -> {
try {
String csvData =
dao.exportCsvAsync(
fqn, upstreamDepth, downstreamDepth, queryFilter, entityType, deleted);
WebsocketNotificationHandler.sendCsvExportCompleteNotification(
jobId, securityContext, csvData);
} catch (Exception e) {
WebsocketNotificationHandler.sendCsvExportFailedNotification(
jobId, securityContext, e.getMessage());
}
});
CSVExportResponse response = new CSVExportResponse(jobId, "Export initiated successfully.");
return Response.accepted().entity(response).type(MediaType.APPLICATION_JSON).build();
}
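// A client sees 202 Accepted with the serialized CSVExportResponse; assuming its
// fields are named jobId and message, the body looks roughly like
//   {"jobId":"<uuid>","message":"Export initiated successfully."}
// The CSV itself (or the failure reason) arrives later through the websocket
// notifications sent above.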
@PUT
@Operation(
operationId = "addLineageEdge",

View File

@@ -0,0 +1,30 @@
package org.openmetadata.sdk.exception;
import javax.ws.rs.core.Response;
public class CSVExportException extends WebServiceException {
private static final String BY_NAME_MESSAGE = "CSVExport Exception [%s] due to [%s].";
private static final String ERROR_TYPE = "CSV_EXPORT_ERROR";
public CSVExportException(String message) {
super(Response.Status.BAD_REQUEST, ERROR_TYPE, message);
}
public CSVExportException(Response.Status status, String message) {
super(status, ERROR_TYPE, message);
}
public static CSVExportException byMessage(
String name, String errorMessage, Response.Status status) {
return new CSVExportException(status, buildMessageByName(name, errorMessage));
}
public static CSVExportException byMessage(String name, String errorMessage) {
return new CSVExportException(
Response.Status.BAD_REQUEST, buildMessageByName(name, errorMessage));
}
private static String buildMessageByName(String name, String errorMessage) {
return String.format(BY_NAME_MESSAGE, name, errorMessage);
}
}
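
The two-argument byMessage used by LineageRepository above maps export failures to 400 Bad Request, while the three-argument overload lets a caller pick a different status. A minimal sketch of the call-site pattern:

try {
  // ... write lineage rows to the CSVWriter ...
} catch (IOException e) {
  // Message becomes: "CSVExport Exception [Failed to export lineage data to CSV] due to [<cause>]."
  throw CSVExportException.byMessage("Failed to export lineage data to CSV", e.getMessage());
}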

View File

@@ -80,6 +80,7 @@ import { useApplicationStore } from '../../hooks/useApplicationStore';
import useCustomLocation from '../../hooks/useCustomLocation/useCustomLocation';
import { useFqn } from '../../hooks/useFqn';
import {
exportLineageAsync,
getDataQualityLineage,
getLineageDataByFQN,
updateLineageEdge,
@@ -96,7 +97,6 @@ import {
getChildMap,
getClassifiedEdge,
getConnectedNodesEdges,
getExportData,
getLayoutedElements,
getLineageEdge,
getLineageEdgeForAPI,
@@ -338,20 +338,14 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
const exportLineageData = useCallback(
async (_: string) => {
if (
entityType === EntityType.PIPELINE ||
entityType === EntityType.STORED_PROCEDURE
) {
// Since pipeline is an edge, we cannot create a tree, hence we take the nodes from the lineage response
// to get the export data.
return getExportData(entityLineage.nodes ?? []);
}
const { exportResult } = getChildMap(entityLineage, decodedFqn);
return exportResult;
return exportLineageAsync(
decodedFqn,
entityType,
lineageConfig,
queryFilter
);
},
[entityType, decodedFqn, entityLineage]
[entityType, decodedFqn, lineageConfig, queryFilter]
);
const onExportClick = useCallback(() => {
@@ -361,7 +355,7 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
onExport: exportLineageData,
});
}
}, [entityType, decodedFqn, entityLineage]);
}, [entityType, decodedFqn, lineageConfig, queryFilter]);
const loadChildNodesHandler = useCallback(
async (node: SourceType, direction: EdgeTypeEnum) => {

View File

@@ -11,6 +11,7 @@
* limitations under the License.
*/
import { CSVExportResponse } from '../components/Entity/EntityExportModalProvider/EntityExportModalProvider.interface';
import { LineageConfig } from '../components/Entity/EntityLineage/EntityLineage.interface';
import { EntityLineageResponse } from '../components/Lineage/Lineage.interface';
import { AddLineage } from '../generated/api/lineage/addLineage';
@@ -22,6 +23,30 @@ export const updateLineageEdge = async (edge: AddLineage) => {
return response.data;
};
export const exportLineageAsync = async (
fqn: string,
entityType: string,
config?: LineageConfig,
queryFilter?: string
) => {
const { upstreamDepth = 1, downstreamDepth = 1 } = config ?? {};
const response = await APIClient.get<CSVExportResponse>(
`/lineage/exportAsync`,
{
params: {
fqn,
type: entityType,
upstreamDepth,
downstreamDepth,
query_filter: queryFilter,
includeDeleted: false,
},
}
);
return response.data;
};
export const getLineageDataByFQN = async (
fqn: string,
entityType: string,