Minor: Fix Column Level Lineage export (#18670)

(cherry picked from commit 19497db455eda7e3c4350170dd6fe1a4b794016b)
This commit is contained in:
Sriharsha Chintalapani 2024-11-18 07:29:04 -08:00 committed by karanh37
parent 8eb66cd807
commit a72db2ca19
2 changed files with 131 additions and 135 deletions

View File

@ -101,6 +101,7 @@ public final class CatalogExceptionMessage {
public static final String INVALID_BOT_USER = "Revoke Token can only be applied to Bot Users."; public static final String INVALID_BOT_USER = "Revoke Token can only be applied to Bot Users.";
public static final String NO_MANUAL_TRIGGER_ERR = "App does not support manual trigger."; public static final String NO_MANUAL_TRIGGER_ERR = "App does not support manual trigger.";
public static final String INVALID_APP_TYPE = "Application Type is not valid."; public static final String INVALID_APP_TYPE = "Application Type is not valid.";
public static final String CSV_EXPORT_FAILED = "CSV Export Failed.";
private CatalogExceptionMessage() {} private CatalogExceptionMessage() {}

View File

@ -42,6 +42,8 @@ import java.util.Map;
import java.util.UUID; import java.util.UUID;
import javax.json.JsonPatch; import javax.json.JsonPatch;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.transaction.Transaction; import org.jdbi.v3.sqlobject.transaction.Transaction;
@ -255,19 +257,28 @@ public class LineageRepository {
return CsvUtil.formatCsv(csvFile); return CsvUtil.formatCsv(csvFile);
} }
@Getter
private static class ColumnMapping {
String fromChildFQN;
String toChildFQN;
ColumnMapping(String from, String to) {
this.fromChildFQN = from;
this.toChildFQN = to;
}
}
public final String exportCsvAsync( public final String exportCsvAsync(
String fqn, String fqn,
int upstreamDepth, int upstreamDepth,
int downstreamDepth, int downstreamDepth,
String queryFilter, String queryFilter,
String entityType, String entityType,
boolean deleted) boolean deleted) {
throws IOException {
Response response =
Entity.getSearchRepository()
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
try { try {
Response response =
Entity.getSearchRepository()
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
String jsonResponse = JsonUtils.pojoToJson(response.getEntity()); String jsonResponse = JsonUtils.pojoToJson(response.getEntity());
JsonNode rootNode = JsonUtils.readTree(jsonResponse); JsonNode rootNode = JsonUtils.readTree(jsonResponse);
@ -281,11 +292,28 @@ public class LineageRepository {
StringWriter csvContent = new StringWriter(); StringWriter csvContent = new StringWriter();
CSVWriter csvWriter = new CSVWriter(csvContent); CSVWriter csvWriter = new CSVWriter(csvContent);
String[] headers = { String[] headers = {
"fromEntityFQN", "fromServiceName", "fromServiceType", "fromOwners", "fromDomain", "fromEntityFQN",
"toEntityFQN", "toServiceName", "toServiceType", "toOwners", "toDomain", "fromServiceName",
"fromChildEntityFQN", "toChildEntityFQN" "fromServiceType",
"fromOwners",
"fromDomain",
"toEntityFQN",
"toServiceName",
"toServiceType",
"toOwners",
"toDomain",
"fromChildEntityFQN",
"toChildEntityFQN",
"pipelineName",
"pipelineType",
"pipelineDescription",
"pipelineOwners",
"pipelineDomain",
"pipelineServiceName",
"pipelineServiceType"
}; };
csvWriter.writeNext(headers); csvWriter.writeNext(headers);
JsonNode edges = rootNode.path("edges"); JsonNode edges = rootNode.path("edges");
for (JsonNode edge : edges) { for (JsonNode edge : edges) {
String fromEntityId = edge.path("fromEntity").path("id").asText(); String fromEntityId = edge.path("fromEntity").path("id").asText();
@ -299,50 +327,59 @@ public class LineageRepository {
baseRow.put("fromServiceName", getText(fromEntity.path("service"), "name")); baseRow.put("fromServiceName", getText(fromEntity.path("service"), "name"));
baseRow.put("fromServiceType", getText(fromEntity, "serviceType")); baseRow.put("fromServiceType", getText(fromEntity, "serviceType"));
baseRow.put("fromOwners", getOwners(fromEntity.path("owners"))); baseRow.put("fromOwners", getOwners(fromEntity.path("owners")));
baseRow.put("fromDomain", getText(fromEntity, "domain")); baseRow.put("fromDomain", getDomainFQN(fromEntity.path("domain")));
baseRow.put("toEntityFQN", getText(toEntity, "fullyQualifiedName")); baseRow.put("toEntityFQN", getText(toEntity, "fullyQualifiedName"));
baseRow.put("toServiceName", getText(toEntity.path("service"), "name")); baseRow.put("toServiceName", getText(toEntity.path("service"), "name"));
baseRow.put("toServiceType", getText(toEntity, "serviceType")); baseRow.put("toServiceType", getText(toEntity, "serviceType"));
baseRow.put("toOwners", getOwners(toEntity.path("owners"))); baseRow.put("toOwners", getOwners(toEntity.path("owners")));
baseRow.put("toDomain", getText(toEntity, "domain")); baseRow.put("toDomain", getDomainFQN(toEntity.path("domain")));
List<String> fromChildFQNs = new ArrayList<>();
List<String> toChildFQNs = new ArrayList<>();
extractChildEntities(fromEntity, fromChildFQNs);
extractChildEntities(toEntity, toChildFQNs);
JsonNode columns = edge.path("columns"); JsonNode columns = edge.path("columns");
if (columns.isArray() && !columns.isEmpty()) { if (columns.isArray() && !columns.isEmpty()) {
for (JsonNode columnMapping : columns) { List<ColumnMapping> explicitColumnMappings = extractColumnMappingsFromEdge(columns);
JsonNode fromColumns = columnMapping.path("fromColumns"); for (ColumnMapping mapping : explicitColumnMappings) {
String toColumn = columnMapping.path("toColumn").asText(); writeCsvRow(
csvWriter,
baseRow,
mapping.getFromChildFQN(),
mapping.getToChildFQN(),
"",
"",
"",
"",
"",
"",
"");
LOG.debug(
"Exported explicit ColumnMapping: from='{}', to='{}'",
mapping.getFromChildFQN(),
mapping.getToChildFQN());
}
}
for (JsonNode fromColumn : fromColumns) { JsonNode pipeline = edge.path("pipeline");
String fromChildFQN = fromColumn.asText(); if (!pipeline.isMissingNode() && !pipeline.isNull()) {
String toChildFQN = toColumn; String pipelineName = getText(pipeline, "name");
writeCsvRow(csvWriter, baseRow, fromChildFQN, toChildFQN); String pipelineType = getText(pipeline, "serviceType");
} String pipelineDescription = getText(pipeline, "description");
} String pipelineOwners = getOwners(pipeline.path("owners"));
} else if (!fromChildFQNs.isEmpty() || !toChildFQNs.isEmpty()) { String pipelineServiceName = getText(pipeline.path("service"), "name");
if (!fromChildFQNs.isEmpty() && !toChildFQNs.isEmpty()) { String pipelineServiceType = getText(pipeline, "serviceType");
for (String fromChildFQN : fromChildFQNs) { String pipelineDomain = getDomainFQN(pipeline.path("domain"));
for (String toChildFQN : toChildFQNs) { writeCsvRow(
writeCsvRow(csvWriter, baseRow, fromChildFQN, toChildFQN); csvWriter,
} baseRow,
} "",
} else if (!fromChildFQNs.isEmpty()) { "",
for (String fromChildFQN : fromChildFQNs) { pipelineName,
writeCsvRow(csvWriter, baseRow, fromChildFQN, ""); pipelineType,
} pipelineDescription,
} else { pipelineOwners,
for (String toChildFQN : toChildFQNs) { pipelineDomain,
writeCsvRow(csvWriter, baseRow, "", toChildFQN); pipelineServiceName,
} pipelineServiceType);
} LOG.debug("Exported Pipeline Information: {}", pipelineName);
} else {
writeCsvRow(csvWriter, baseRow, "", "");
} }
} }
csvWriter.close(); csvWriter.close();
@ -353,20 +390,37 @@ public class LineageRepository {
} }
private static void writeCsvRow( private static void writeCsvRow(
CSVWriter csvWriter, Map<String, String> baseRow, String fromChildFQN, String toChildFQN) { CSVWriter csvWriter,
Map<String, String> baseRow,
String fromChildFQN,
String toChildFQN,
String pipelineName,
String pipelineType,
String pipelineDescription,
String pipelineOwners,
String pipelineDomain,
String pipelineServiceName,
String pipelineServiceType) {
String[] row = { String[] row = {
baseRow.get("fromEntityFQN"), baseRow.getOrDefault("fromEntityFQN", ""),
baseRow.get("fromServiceName"), baseRow.getOrDefault("fromServiceName", ""),
baseRow.get("fromServiceType"), baseRow.getOrDefault("fromServiceType", ""),
baseRow.get("fromOwners"), baseRow.getOrDefault("fromOwners", ""),
baseRow.get("fromDomain"), baseRow.getOrDefault("fromDomain", ""),
baseRow.get("toEntityFQN"), baseRow.getOrDefault("toEntityFQN", ""),
baseRow.get("toServiceName"), baseRow.getOrDefault("toServiceName", ""),
baseRow.get("toServiceType"), baseRow.getOrDefault("toServiceType", ""),
baseRow.get("toOwners"), baseRow.getOrDefault("toOwners", ""),
baseRow.get("toDomain"), baseRow.getOrDefault("toDomain", ""),
fromChildFQN, fromChildFQN,
toChildFQN toChildFQN,
pipelineName,
pipelineType,
pipelineDescription,
pipelineOwners,
pipelineDomain,
pipelineServiceName,
pipelineServiceType
}; };
csvWriter.writeNext(row); csvWriter.writeNext(row);
} }
@ -383,7 +437,7 @@ public class LineageRepository {
if (ownersNode != null && ownersNode.isArray()) { if (ownersNode != null && ownersNode.isArray()) {
List<String> ownersList = new ArrayList<>(); List<String> ownersList = new ArrayList<>();
for (JsonNode owner : ownersNode) { for (JsonNode owner : ownersNode) {
String ownerName = getText(owner, "name"); String ownerName = getText(owner, "displayName");
if (!ownerName.isEmpty()) { if (!ownerName.isEmpty()) {
ownersList.add(ownerName); ownersList.add(ownerName);
} }
@ -393,91 +447,32 @@ public class LineageRepository {
return ""; return "";
} }
private static void extractChildEntities(JsonNode entityNode, List<String> childFQNs) { private static String getDomainFQN(JsonNode domainNode) {
if (entityNode == null) { if (domainNode != null && domainNode.has("fullyQualifiedName")) {
return; JsonNode fqnNode = domainNode.get("fullyQualifiedName");
} return fqnNode.isNull() ? "" : fqnNode.asText();
String entityType = getText(entityNode, "entityType");
switch (entityType) {
case TABLE:
extractColumns(entityNode.path("columns"), childFQNs);
break;
case DASHBOARD:
extractCharts(entityNode.path("charts"), childFQNs);
break;
case SEARCH_INDEX:
extractFields(entityNode.path("fields"), childFQNs);
break;
case CONTAINER:
extractContainers(entityNode.path("children"), childFQNs);
extractColumns(entityNode.path("dataModel").path("columns"), childFQNs);
break;
case TOPIC:
extractSchemaFields(entityNode.path("messageSchema").path("schemaFields"), childFQNs);
break;
case DASHBOARD_DATA_MODEL:
extractColumns(entityNode.path("columns"), childFQNs);
break;
default:
break;
} }
return "";
} }
private static void extractColumns(JsonNode columnsNode, List<String> childFQNs) { private static List<ColumnMapping> extractColumnMappingsFromEdge(JsonNode columnsNode) {
List<ColumnMapping> mappings = new ArrayList<>();
if (columnsNode != null && columnsNode.isArray()) { if (columnsNode != null && columnsNode.isArray()) {
for (JsonNode column : columnsNode) { for (JsonNode columnMapping : columnsNode) {
if (column != null) { JsonNode fromColumns = columnMapping.path("fromColumns");
String columnFQN = getText(column, "fullyQualifiedName"); String toColumn = columnMapping.path("toColumn").asText().trim();
childFQNs.add(columnFQN);
extractColumns(column.path("children"), childFQNs); if (fromColumns.isArray() && !toColumn.isEmpty()) {
} for (JsonNode fromColumn : fromColumns) {
} String fromChildFQN = fromColumn.asText().trim();
} if (!fromChildFQN.isEmpty()) {
} mappings.add(new ColumnMapping(fromChildFQN, toColumn));
}
private static void extractCharts(JsonNode chartsNode, List<String> childFQNs) { }
if (chartsNode != null && chartsNode.isArray()) {
for (JsonNode chart : chartsNode) {
String chartFQN = getText(chart, "fullyQualifiedName");
childFQNs.add(chartFQN);
}
}
}
private static void extractFields(JsonNode fieldsNode, List<String> childFQNs) {
if (fieldsNode != null && fieldsNode.isArray()) {
for (JsonNode field : fieldsNode) {
if (field != null) {
String fieldFQN = getText(field, "fullyQualifiedName");
childFQNs.add(fieldFQN);
extractFields(field.path("children"), childFQNs);
}
}
}
}
private static void extractContainers(JsonNode containersNode, List<String> childFQNs) {
if (containersNode != null && containersNode.isArray()) {
for (JsonNode container : containersNode) {
if (container != null) {
String containerFQN = getText(container, "fullyQualifiedName");
childFQNs.add(containerFQN);
extractContainers(container.path("children"), childFQNs);
}
}
}
}
private static void extractSchemaFields(JsonNode schemaFieldsNode, List<String> childFQNs) {
if (schemaFieldsNode != null && schemaFieldsNode.isArray()) {
for (JsonNode field : schemaFieldsNode) {
if (field != null) {
String fieldFQN = getText(field, "fullyQualifiedName");
childFQNs.add(fieldFQN);
extractSchemaFields(field.path("children"), childFQNs);
} }
} }
} }
return mappings;
} }
private String getStringOrNull(HashMap map, String key) { private String getStringOrNull(HashMap map, String key) {