Fix #12502: Add support for lineage export (#16009)

This commit is contained in:
Mayur Singal 2024-04-25 16:08:50 +05:30 committed by GitHub
parent 828e9abc97
commit db8102271e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 284 additions and 18 deletions

View File

@ -14,6 +14,9 @@
package org.openmetadata.service.jdbi3;
import static javax.ws.rs.core.Response.Status.OK;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.csv.CsvUtil.addField;
import static org.openmetadata.csv.EntityCsv.getCsvDocumentation;
import static org.openmetadata.service.Entity.CONTAINER;
import static org.openmetadata.service.Entity.DASHBOARD;
import static org.openmetadata.service.Entity.DASHBOARD_DATA_MODEL;
@ -24,8 +27,11 @@ import static org.openmetadata.service.Entity.TOPIC;
import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS;
import static org.openmetadata.service.search.SearchClient.REMOVE_LINEAGE_SCRIPT;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -35,6 +41,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.csv.CsvUtil;
import org.openmetadata.schema.api.lineage.AddLineage;
import org.openmetadata.schema.entity.data.Container;
import org.openmetadata.schema.entity.data.Dashboard;
@ -51,6 +58,9 @@ import org.openmetadata.schema.type.EventType;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.LineageDetails;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.csv.CsvDocumentation;
import org.openmetadata.schema.type.csv.CsvFile;
import org.openmetadata.schema.type.csv.CsvHeader;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.exception.EntityNotFoundException;
@ -202,6 +212,80 @@ public class LineageRepository {
return JsonUtils.pojoToJson(details);
}
public final String exportCsv(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException {
CsvDocumentation DOCUMENTATION = getCsvDocumentation("lineage");
List<CsvHeader> HEADERS = DOCUMENTATION.getHeaders();
Map lineageMap =
Entity.getSearchRepository()
.searchLineageForExport(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
CsvFile csvFile = new CsvFile().withHeaders(HEADERS);
addRecords(csvFile, lineageMap);
return CsvUtil.formatCsv(csvFile);
}
private String getStringOrNull(HashMap map, String key) {
return nullOrEmpty(map.get(key)) ? "" : map.get(key).toString();
}
private String getStringOrNull(HashMap map, String key, String nestedKey) {
return nullOrEmpty(map.get(key))
? ""
: getStringOrNull((HashMap<String, Object>) map.get(key), nestedKey);
}
private String processColumnLineage(HashMap lineageMap) {
if (lineageMap.get("columns") != null) {
StringBuilder str = new StringBuilder();
Collection collection = (Collection<ColumnLineage>) lineageMap.get("columns");
HashSet<HashMap> hashSet = new HashSet<HashMap>(collection);
for (HashMap colLineage : hashSet) {
for (String fromColumn : (List<String>) colLineage.get("fromColumns")) {
str.append(fromColumn);
str.append(":");
str.append(colLineage.get("toColumn"));
str.append(";");
}
// remove the last ;
return str.toString().substring(0, str.toString().length() - 1);
}
}
return "";
}
protected void addRecords(CsvFile csvFile, Map lineageMap) {
if (lineageMap.get("edges") != null && lineageMap.get("edges") instanceof Collection<?>) {
Collection collection = (Collection<HashMap>) lineageMap.get("edges");
HashSet<HashMap> edges = new HashSet<HashMap>(collection);
List<List<String>> finalRecordList = csvFile.getRecords();
for (HashMap edge : edges) {
List<String> recordList = new ArrayList<>();
addField(recordList, getStringOrNull(edge, "fromEntity", "id"));
addField(recordList, getStringOrNull(edge, "fromEntity", "type"));
addField(recordList, getStringOrNull(edge, "fromEntity", "fqn"));
addField(recordList, getStringOrNull(edge, "toEntity", "id"));
addField(recordList, getStringOrNull(edge, "toEntity", "type"));
addField(recordList, getStringOrNull(edge, "toEntity", "fqn"));
addField(recordList, getStringOrNull(edge, "description"));
addField(recordList, getStringOrNull(edge, "pipeline", "id"));
addField(recordList, getStringOrNull(edge, "pipeline", "fullyQualifiedName"));
addField(recordList, processColumnLineage(edge));
addField(recordList, getStringOrNull(edge, "sqlQuery"));
addField(recordList, getStringOrNull(edge, "source"));
finalRecordList.add(recordList);
}
csvFile.withRecords(finalRecordList);
}
}
private void validateChildren(String columnFQN, EntityReference entityReference) {
switch (entityReference.getType()) {
case TABLE -> {

View File

@ -208,6 +208,42 @@ public class LineageResource {
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
}
@GET
@Path("/export")
@Produces(MediaType.TEXT_PLAIN)
@Operation(
operationId = "exportLineage",
summary = "Export lineage",
responses = {
@ApiResponse(
responseCode = "200",
description = "search response",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = SearchResponse.class)))
})
public String exportLineage(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "fqn") @QueryParam("fqn") String fqn,
@Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth,
@Parameter(description = "downstreamDepth") @QueryParam("downstreamDepth")
int downstreamDepth,
@Parameter(
description =
"Elasticsearch query that will be combined with the query_string query generator from the `query` argument")
@QueryParam("query_filter")
String queryFilter,
@Parameter(description = "Filter documents by deleted param. By default deleted is false")
@QueryParam("includeDeleted")
boolean deleted,
@Parameter(description = "entity type") @QueryParam("type") String entityType)
throws IOException {
return dao.exportCsv(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
}
@PUT
@Operation(
operationId = "addLineageEdge",

View File

@ -95,6 +95,15 @@ public interface SearchClient {
String entityType)
throws IOException;
Map<String, Object> searchLineageInternal(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException;
Response searchByField(String fieldName, String fieldValue, String index) throws IOException;
Response aggregate(String index, String fieldName, String value, String query) throws IOException;

View File

@ -690,6 +690,18 @@ public class SearchRepository {
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
}
public Map<String, Object> searchLineageForExport(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException {
return searchClient.searchLineageInternal(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
}
public Response searchByField(String fieldName, String fieldValue, String index)
throws IOException {
return searchClient.searchByField(fieldName, fieldValue, index);

View File

@ -595,7 +595,7 @@ public class ElasticSearchClient implements SearchClient {
}
@Override
public Response searchLineage(
public Map<String, Object> searchLineageInternal(
String fqn,
int upstreamDepth,
int downstreamDepth,
@ -629,6 +629,21 @@ public class ElasticSearchClient implements SearchClient {
fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqn.keyword", deleted);
responseMap.put("edges", edges);
responseMap.put("nodes", nodes);
return responseMap;
}
@Override
public Response searchLineage(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException {
Map<String, Object> responseMap =
searchLineageInternal(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
return Response.status(OK).entity(responseMap).build();
}
@ -697,7 +712,7 @@ public class ElasticSearchClient implements SearchClient {
}
}
private Response searchPipelineLineage(
private Map<String, Object> searchPipelineLineage(
String fqn,
int upstreamDepth,
int downstreamDepth,
@ -784,7 +799,7 @@ public class ElasticSearchClient implements SearchClient {
}
responseMap.put("edges", edges);
responseMap.put("nodes", nodes);
return Response.status(OK).entity(responseMap).build();
return responseMap;
}
@Override

View File

@ -596,8 +596,7 @@ public class OpenSearchClient implements SearchClient {
return Response.status(OK).entity(response).build();
}
@Override
public Response searchLineage(
public Map<String, Object> searchLineageInternal(
String fqn,
int upstreamDepth,
int downstreamDepth,
@ -605,14 +604,13 @@ public class OpenSearchClient implements SearchClient {
boolean deleted,
String entityType)
throws IOException {
if (entityType.equalsIgnoreCase(Entity.PIPELINE)
|| entityType.equalsIgnoreCase(Entity.STORED_PROCEDURE)) {
return searchPipelineLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted);
}
Map<String, Object> responseMap = new HashMap<>();
Set<Map<String, Object>> edges = new HashSet<>();
Set<Map<String, Object>> nodes = new HashSet<>();
if (entityType.equalsIgnoreCase(Entity.PIPELINE)
|| entityType.equalsIgnoreCase(Entity.STORED_PROCEDURE)) {
return searchPipelineLineage(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, responseMap);
}
os.org.opensearch.action.search.SearchRequest searchRequest =
new os.org.opensearch.action.search.SearchRequest(GLOBAL_SEARCH_ALIAS);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
@ -631,6 +629,21 @@ public class OpenSearchClient implements SearchClient {
fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqn.keyword", deleted);
responseMap.put("edges", edges);
responseMap.put("nodes", nodes);
return responseMap;
}
@Override
public Response searchLineage(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
String entityType)
throws IOException {
Map<String, Object> responseMap =
searchLineageInternal(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
return Response.status(OK).entity(responseMap).build();
}
@ -700,14 +713,10 @@ public class OpenSearchClient implements SearchClient {
}
}
private Response searchPipelineLineage(
String fqn,
int upstreamDepth,
int downstreamDepth,
String queryFilter,
boolean deleted,
Map<String, Object> responseMap)
private Map<String, Object> searchPipelineLineage(
String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted)
throws IOException {
Map<String, Object> responseMap = new HashMap<>();
Set<Map<String, Object>> edges = new HashSet<>();
Set<Map<String, Object>> nodes = new HashSet<>();
responseMap.put("entity", null);
@ -787,7 +796,7 @@ public class OpenSearchClient implements SearchClient {
}
responseMap.put("edges", edges);
responseMap.put("nodes", nodes);
return Response.status(OK).entity(responseMap).build();
return responseMap;
}
private static ScriptScoreFunctionBuilder boostScore() {

View File

@ -0,0 +1,101 @@
{
"summary": "Lineage CSV file is used for exporting lineage of an entity.",
"headers": [
{
"name": "fromId",
"required": true,
"description": "The UUID of the source entity",
"examples": [
"`123e4567-e89b-12d3-a456-426614174000`"
]
},
{
"name": "fromEntityType",
"required": true,
"description": "Entity type of the source entity",
"examples": [
"`table`, `topic`"
]
},
{
"name": "fromFullyQualifiedName",
"required": true,
"description": "Fully qualified name of the source entity",
"examples": [
"`Redshift_DWH.prod.sales.customer`"
]
},
{
"name": "toId",
"required": true,
"description": "The UUID of the destination entity",
"examples": [
"`123e4567-e89b-12d3-a456-426614174000`"
]
},
{
"name": "toEntityType",
"required": true,
"description": "Entity type of the destination entity",
"examples": [
"`table`, `topic`"
]
},
{
"name": "toFullyQualifiedName",
"required": true,
"description": "Fully qualified name of the destination entity",
"examples": [
"`Redshift_DWH.prod.sales.customer`"
]
},
{
"name": "description",
"required": false,
"description": "Description for the lineage edge.",
"examples": [
"Data flow from `Redshift_DWH.prod.sales.customer` to `Redshift_DWH.prod.sales.orders`"
]
},
{
"name": "pipelineId",
"required": false,
"description": "Id of the pipeline involved in lineage",
"examples": [
"`123e4567-e89b-12d3-a456-426614174000`"
]
},
{
"name": "pipelineFullyQualifiedName",
"required": false,
"description": "Fully Qualified Name of the pipeline involved in lineage",
"examples": [
"`Airflow.transformation_pipeline`"
]
},
{
"name": "columnLineage",
"required": false,
"description": "Columns involved in the lineage in format `fromCol1:toCol1;fromCol2:toCol2`.",
"examples": [
"`sample_data.ecommerce_db.shopify.raw_customer.comments:sample_data.ecommerce_db.shopify.dim_address.address_id;sample_data.ecommerce_db.shopify.raw_customer.creditcard:sample_data.ecommerce_db.shopify.dim_address.address_id`"
]
},
{
"name": "sqlQuery",
"required": false,
"description": "SQL used for transformation",
"examples": [
"`create table dest as select * from source`"
]
},
{
"name": "source",
"required": false,
"description": "Source of lineage information",
"examples": [
"`Manual`, `ViewLineage`, `PipelineLineage`"
]
}
]
}