diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java index 6fdd97a7a76..25407b7046b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java @@ -14,6 +14,9 @@ package org.openmetadata.service.jdbi3; import static javax.ws.rs.core.Response.Status.OK; +import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; +import static org.openmetadata.csv.CsvUtil.addField; +import static org.openmetadata.csv.EntityCsv.getCsvDocumentation; import static org.openmetadata.service.Entity.CONTAINER; import static org.openmetadata.service.Entity.DASHBOARD; import static org.openmetadata.service.Entity.DASHBOARD_DATA_MODEL; @@ -24,8 +27,11 @@ import static org.openmetadata.service.Entity.TOPIC; import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS; import static org.openmetadata.service.search.SearchClient.REMOVE_LINEAGE_SCRIPT; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.UUID; @@ -35,6 +41,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.common.utils.CommonUtil; +import org.openmetadata.csv.CsvUtil; import org.openmetadata.schema.api.lineage.AddLineage; import org.openmetadata.schema.entity.data.Container; import org.openmetadata.schema.entity.data.Dashboard; @@ -51,6 +58,9 @@ import org.openmetadata.schema.type.EventType; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.LineageDetails; import org.openmetadata.schema.type.Relationship; +import org.openmetadata.schema.type.csv.CsvDocumentation; +import org.openmetadata.schema.type.csv.CsvFile; +import org.openmetadata.schema.type.csv.CsvHeader; import org.openmetadata.service.Entity; import org.openmetadata.service.exception.CatalogExceptionMessage; import org.openmetadata.service.exception.EntityNotFoundException; @@ -202,6 +212,80 @@ public class LineageRepository { return JsonUtils.pojoToJson(details); } + public final String exportCsv( + String fqn, + int upstreamDepth, + int downstreamDepth, + String queryFilter, + boolean deleted, + String entityType) + throws IOException { + CsvDocumentation DOCUMENTATION = getCsvDocumentation("lineage"); + List HEADERS = DOCUMENTATION.getHeaders(); + Map lineageMap = + Entity.getSearchRepository() + .searchLineageForExport( + fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); + CsvFile csvFile = new CsvFile().withHeaders(HEADERS); + + addRecords(csvFile, lineageMap); + return CsvUtil.formatCsv(csvFile); + } + + private String getStringOrNull(HashMap map, String key) { + return nullOrEmpty(map.get(key)) ? "" : map.get(key).toString(); + } + + private String getStringOrNull(HashMap map, String key, String nestedKey) { + return nullOrEmpty(map.get(key)) + ? "" + : getStringOrNull((HashMap) map.get(key), nestedKey); + } + + private String processColumnLineage(HashMap lineageMap) { + if (lineageMap.get("columns") != null) { + StringBuilder str = new StringBuilder(); + Collection collection = (Collection) lineageMap.get("columns"); + HashSet hashSet = new HashSet(collection); + for (HashMap colLineage : hashSet) { + for (String fromColumn : (List) colLineage.get("fromColumns")) { + str.append(fromColumn); + str.append(":"); + str.append(colLineage.get("toColumn")); + str.append(";"); + } + // remove the last ; + return str.toString().substring(0, str.toString().length() - 1); + } + } + return ""; + } + + protected void addRecords(CsvFile csvFile, Map lineageMap) { + if (lineageMap.get("edges") != null && lineageMap.get("edges") instanceof Collection) { + Collection collection = (Collection) lineageMap.get("edges"); + HashSet edges = new HashSet(collection); + List> finalRecordList = csvFile.getRecords(); + for (HashMap edge : edges) { + List recordList = new ArrayList<>(); + addField(recordList, getStringOrNull(edge, "fromEntity", "id")); + addField(recordList, getStringOrNull(edge, "fromEntity", "type")); + addField(recordList, getStringOrNull(edge, "fromEntity", "fqn")); + addField(recordList, getStringOrNull(edge, "toEntity", "id")); + addField(recordList, getStringOrNull(edge, "toEntity", "type")); + addField(recordList, getStringOrNull(edge, "toEntity", "fqn")); + addField(recordList, getStringOrNull(edge, "description")); + addField(recordList, getStringOrNull(edge, "pipeline", "id")); + addField(recordList, getStringOrNull(edge, "pipeline", "fullyQualifiedName")); + addField(recordList, processColumnLineage(edge)); + addField(recordList, getStringOrNull(edge, "sqlQuery")); + addField(recordList, getStringOrNull(edge, "source")); + finalRecordList.add(recordList); + } + csvFile.withRecords(finalRecordList); + } + } + private void validateChildren(String columnFQN, EntityReference entityReference) { switch (entityReference.getType()) { case TABLE -> { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java index 880b10c294b..8038728f782 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/lineage/LineageResource.java @@ -208,6 +208,42 @@ public class LineageResource { .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); } + @GET + @Path("/export") + @Produces(MediaType.TEXT_PLAIN) + @Operation( + operationId = "exportLineage", + summary = "Export lineage", + responses = { + @ApiResponse( + responseCode = "200", + description = "search response", + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = SearchResponse.class))) + }) + public String exportLineage( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "fqn") @QueryParam("fqn") String fqn, + @Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth, + @Parameter(description = "downstreamDepth") @QueryParam("downstreamDepth") + int downstreamDepth, + @Parameter( + description = + "Elasticsearch query that will be combined with the query_string query generator from the `query` argument") + @QueryParam("query_filter") + String queryFilter, + @Parameter(description = "Filter documents by deleted param. By default deleted is false") + @QueryParam("includeDeleted") + boolean deleted, + @Parameter(description = "entity type") @QueryParam("type") String entityType) + throws IOException { + + return dao.exportCsv(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); + } + @PUT @Operation( operationId = "addLineageEdge", diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java index b2d6ec7d35b..0e07d5238b3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java @@ -95,6 +95,15 @@ public interface SearchClient { String entityType) throws IOException; + Map searchLineageInternal( + String fqn, + int upstreamDepth, + int downstreamDepth, + String queryFilter, + boolean deleted, + String entityType) + throws IOException; + Response searchByField(String fieldName, String fieldValue, String index) throws IOException; Response aggregate(String index, String fieldName, String value, String query) throws IOException; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index a3e5a2204cc..63dfb495e31 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -690,6 +690,18 @@ public class SearchRepository { fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); } + public Map searchLineageForExport( + String fqn, + int upstreamDepth, + int downstreamDepth, + String queryFilter, + boolean deleted, + String entityType) + throws IOException { + return searchClient.searchLineageInternal( + fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); + } + public Response searchByField(String fieldName, String fieldValue, String index) throws IOException { return searchClient.searchByField(fieldName, fieldValue, index); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index ac0bb2708b2..7a9339d7463 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -595,7 +595,7 @@ public class ElasticSearchClient implements SearchClient { } @Override - public Response searchLineage( + public Map searchLineageInternal( String fqn, int upstreamDepth, int downstreamDepth, @@ -629,6 +629,21 @@ public class ElasticSearchClient implements SearchClient { fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqn.keyword", deleted); responseMap.put("edges", edges); responseMap.put("nodes", nodes); + return responseMap; + } + + @Override + public Response searchLineage( + String fqn, + int upstreamDepth, + int downstreamDepth, + String queryFilter, + boolean deleted, + String entityType) + throws IOException { + Map responseMap = + searchLineageInternal( + fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); return Response.status(OK).entity(responseMap).build(); } @@ -697,7 +712,7 @@ public class ElasticSearchClient implements SearchClient { } } - private Response searchPipelineLineage( + private Map searchPipelineLineage( String fqn, int upstreamDepth, int downstreamDepth, @@ -784,7 +799,7 @@ public class ElasticSearchClient implements SearchClient { } responseMap.put("edges", edges); responseMap.put("nodes", nodes); - return Response.status(OK).entity(responseMap).build(); + return responseMap; } @Override diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index 45851e6fdb3..8eb30e090a8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -596,8 +596,7 @@ public class OpenSearchClient implements SearchClient { return Response.status(OK).entity(response).build(); } - @Override - public Response searchLineage( + public Map searchLineageInternal( String fqn, int upstreamDepth, int downstreamDepth, @@ -605,14 +604,13 @@ public class OpenSearchClient implements SearchClient { boolean deleted, String entityType) throws IOException { + if (entityType.equalsIgnoreCase(Entity.PIPELINE) + || entityType.equalsIgnoreCase(Entity.STORED_PROCEDURE)) { + return searchPipelineLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); + } Map responseMap = new HashMap<>(); Set> edges = new HashSet<>(); Set> nodes = new HashSet<>(); - if (entityType.equalsIgnoreCase(Entity.PIPELINE) - || entityType.equalsIgnoreCase(Entity.STORED_PROCEDURE)) { - return searchPipelineLineage( - fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, responseMap); - } os.org.opensearch.action.search.SearchRequest searchRequest = new os.org.opensearch.action.search.SearchRequest(GLOBAL_SEARCH_ALIAS); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); @@ -631,6 +629,21 @@ public class OpenSearchClient implements SearchClient { fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqn.keyword", deleted); responseMap.put("edges", edges); responseMap.put("nodes", nodes); + return responseMap; + } + + @Override + public Response searchLineage( + String fqn, + int upstreamDepth, + int downstreamDepth, + String queryFilter, + boolean deleted, + String entityType) + throws IOException { + Map responseMap = + searchLineageInternal( + fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); return Response.status(OK).entity(responseMap).build(); } @@ -700,14 +713,10 @@ public class OpenSearchClient implements SearchClient { } } - private Response searchPipelineLineage( - String fqn, - int upstreamDepth, - int downstreamDepth, - String queryFilter, - boolean deleted, - Map responseMap) + private Map searchPipelineLineage( + String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) throws IOException { + Map responseMap = new HashMap<>(); Set> edges = new HashSet<>(); Set> nodes = new HashSet<>(); responseMap.put("entity", null); @@ -787,7 +796,7 @@ public class OpenSearchClient implements SearchClient { } responseMap.put("edges", edges); responseMap.put("nodes", nodes); - return Response.status(OK).entity(responseMap).build(); + return responseMap; } private static ScriptScoreFunctionBuilder boostScore() { diff --git a/openmetadata-service/src/main/resources/json/data/lineage/lineageCsvDocumentation.json b/openmetadata-service/src/main/resources/json/data/lineage/lineageCsvDocumentation.json new file mode 100644 index 00000000000..2ee9fd6ee5a --- /dev/null +++ b/openmetadata-service/src/main/resources/json/data/lineage/lineageCsvDocumentation.json @@ -0,0 +1,101 @@ +{ + "summary": "Lineage CSV file is used for exporting lineage of an entity.", + "headers": [ + { + "name": "fromId", + "required": true, + "description": "The UUID of the source entity", + "examples": [ + "`123e4567-e89b-12d3-a456-426614174000`" + ] + }, + { + "name": "fromEntityType", + "required": true, + "description": "Entity type of the source entity", + "examples": [ + "`table`, `topic`" + ] + }, + { + "name": "fromFullyQualifiedName", + "required": true, + "description": "Fully qualified name of the source entity", + "examples": [ + "`Redshift_DWH.prod.sales.customer`" + ] + }, + { + "name": "toId", + "required": true, + "description": "The UUID of the destination entity", + "examples": [ + "`123e4567-e89b-12d3-a456-426614174000`" + ] + }, + { + "name": "toEntityType", + "required": true, + "description": "Entity type of the destination entity", + "examples": [ + "`table`, `topic`" + ] + }, + { + "name": "toFullyQualifiedName", + "required": true, + "description": "Fully qualified name of the destination entity", + "examples": [ + "`Redshift_DWH.prod.sales.customer`" + ] + }, + { + "name": "description", + "required": false, + "description": "Description for the lineage edge.", + "examples": [ + "Data flow from `Redshift_DWH.prod.sales.customer` to `Redshift_DWH.prod.sales.orders`" + ] + }, + { + "name": "pipelineId", + "required": false, + "description": "Id of the pipeline involved in lineage", + "examples": [ + "`123e4567-e89b-12d3-a456-426614174000`" + ] + }, + { + "name": "pipelineFullyQualifiedName", + "required": false, + "description": "Fully Qualified Name of the pipeline involved in lineage", + "examples": [ + "`Airflow.transformation_pipeline`" + ] + }, + { + "name": "columnLineage", + "required": false, + "description": "Columns involved in the lineage in format `fromCol1:toCol1;fromCol2:toCol2`.", + "examples": [ + "`sample_data.ecommerce_db.shopify.raw_customer.comments:sample_data.ecommerce_db.shopify.dim_address.address_id;sample_data.ecommerce_db.shopify.raw_customer.creditcard:sample_data.ecommerce_db.shopify.dim_address.address_id`" + ] + }, + { + "name": "sqlQuery", + "required": false, + "description": "SQL used for transformation", + "examples": [ + "`create table dest as select * from source`" + ] + }, + { + "name": "source", + "required": false, + "description": "Source of lineage information", + "examples": [ + "`Manual`, `ViewLineage`, `PipelineLineage`" + ] + } + ] + } \ No newline at end of file