mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-30 20:59:57 +00:00
* Add CSV support for bulk edits * Add CSV support for bulk edits - updates to the previous patch * APIs for Glossary bulk export/import support
This commit is contained in:
parent
d6a8646fa8
commit
240920fa9f
@ -410,6 +410,11 @@
|
||||
<artifactId>diff-match-patch</artifactId>
|
||||
<version>${diffMatch.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-csv</artifactId>
|
||||
<version>1.9.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -0,0 +1,129 @@
|
||||
/*
|
||||
* Copyright 2021 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.csv;
|
||||
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOf;
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
|
||||
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StringWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.csv.CSVFormat;
|
||||
import org.apache.commons.csv.CSVFormat.Builder;
|
||||
import org.apache.commons.csv.CSVPrinter;
|
||||
import org.apache.commons.csv.CSVRecord;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.csv.CsvFile;
|
||||
import org.openmetadata.schema.type.csv.CsvHeader;
|
||||
|
||||
public final class CsvUtil {
|
||||
public static String SEPARATOR = ",";
|
||||
public static String FIELD_SEPARATOR = ";";
|
||||
public static String LINE_SEPARATOR = "\r\n";
|
||||
|
||||
private CsvUtil() {
|
||||
// Utility class hides the constructor
|
||||
}
|
||||
|
||||
public static String formatCsv(CsvFile csvFile) throws IOException {
|
||||
// CSV file is generated by the backend and the data exported is expected to be correct. Hence, no validation
|
||||
StringWriter writer = new StringWriter();
|
||||
List<String> headers = getHeaders(csvFile.getHeaders());
|
||||
CSVFormat csvFormat = Builder.create(CSVFormat.DEFAULT).setHeader(headers.toArray(new String[0])).build();
|
||||
try (CSVPrinter printer = new CSVPrinter(writer, csvFormat)) {
|
||||
for (List<String> record : listOrEmpty(csvFile.getRecords())) {
|
||||
printer.printRecord(record);
|
||||
}
|
||||
}
|
||||
return writer.toString();
|
||||
}
|
||||
|
||||
/** Get headers from CsvHeaders */
|
||||
public static List<String> getHeaders(List<CsvHeader> csvHeaders) {
|
||||
List<String> headers = new ArrayList<>();
|
||||
for (CsvHeader header : csvHeaders) {
|
||||
String headerString = header.getRequired() ? String.format("%s*", header.getName()) : header.getName();
|
||||
headers.add(quoteField(headerString));
|
||||
}
|
||||
return headers;
|
||||
}
|
||||
|
||||
public static String recordToString(CSVRecord record) {
|
||||
return recordToString(record.toList());
|
||||
}
|
||||
|
||||
public static String recordToString(List<String> fields) {
|
||||
return String.join(SEPARATOR, fields);
|
||||
}
|
||||
|
||||
public static String recordToString(String[] fields) {
|
||||
return String.join(SEPARATOR, fields);
|
||||
}
|
||||
|
||||
public static List<String> fieldToStrings(String field) {
|
||||
// Split a field that contains multiple strings separated by FIELD_SEPARATOR
|
||||
return field == null ? null : listOf(field.split(FIELD_SEPARATOR));
|
||||
}
|
||||
|
||||
public static String quote(String field) {
|
||||
return String.format("\"%s\"", field);
|
||||
}
|
||||
|
||||
/** Quote a CSV field that has SEPARATOR with " " */
|
||||
public static String quoteField(String field) {
|
||||
return field == null ? "" : field.contains(SEPARATOR) ? quote(field) : field;
|
||||
}
|
||||
|
||||
/** Quote a CSV field made of multiple strings that has SEPARATOR or FIELD_SEPARATOR with " " */
|
||||
public static String quoteField(List<String> field) {
|
||||
return nullOrEmpty(field)
|
||||
? ""
|
||||
: field.stream()
|
||||
.map(str -> str.contains(SEPARATOR) || str.contains(FIELD_SEPARATOR) ? quote(str) : str)
|
||||
.collect(Collectors.joining(FIELD_SEPARATOR));
|
||||
}
|
||||
|
||||
public static List<String> addField(List<String> record, String field) {
|
||||
record.add(quoteField(field));
|
||||
return record;
|
||||
}
|
||||
|
||||
public static List<String> addFieldList(List<String> record, List<String> field) {
|
||||
record.add(quoteField(field));
|
||||
return record;
|
||||
}
|
||||
|
||||
public static List<String> addEntityReferences(List<String> record, List<EntityReference> refs) {
|
||||
record.add(
|
||||
nullOrEmpty(refs)
|
||||
? null
|
||||
: refs.stream().map(EntityReference::getFullyQualifiedName).collect(Collectors.joining(FIELD_SEPARATOR)));
|
||||
return record;
|
||||
}
|
||||
|
||||
public static List<String> addEntityReference(List<String> record, EntityReference ref) {
|
||||
record.add(nullOrEmpty(ref) ? null : quoteField(ref.getFullyQualifiedName()));
|
||||
return record;
|
||||
}
|
||||
|
||||
public static List<String> addTagLabels(List<String> record, List<TagLabel> tags) {
|
||||
record.add(
|
||||
nullOrEmpty(tags) ? null : tags.stream().map(TagLabel::getTagFQN).collect(Collectors.joining(FIELD_SEPARATOR)));
|
||||
return record;
|
||||
}
|
||||
}
|
@ -0,0 +1,345 @@
|
||||
/*
|
||||
* Copyright 2021 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.csv;
|
||||
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOf;
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
|
||||
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
|
||||
import static org.openmetadata.csv.CsvUtil.FIELD_SEPARATOR;
|
||||
import static org.openmetadata.csv.CsvUtil.recordToString;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Reader;
|
||||
import java.io.StringReader;
|
||||
import java.io.StringWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import javax.ws.rs.core.Response;
|
||||
import org.apache.commons.csv.CSVFormat;
|
||||
import org.apache.commons.csv.CSVFormat.Builder;
|
||||
import org.apache.commons.csv.CSVPrinter;
|
||||
import org.apache.commons.csv.CSVRecord;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.TagLabel.TagSource;
|
||||
import org.openmetadata.schema.type.csv.CsvErrorType;
|
||||
import org.openmetadata.schema.type.csv.CsvFile;
|
||||
import org.openmetadata.schema.type.csv.CsvHeader;
|
||||
import org.openmetadata.schema.type.csv.CsvImportResult;
|
||||
import org.openmetadata.schema.type.csv.CsvImportResult.Status;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.jdbi3.EntityRepository;
|
||||
import org.openmetadata.service.util.RestUtil.PutResponse;
|
||||
|
||||
/**
|
||||
* EntityCsv provides export and import capabilities for an entity. Each entity must implement the abstract methods to
|
||||
* provide entity specific processing functionality to export an entity to a CSV record, and import an entity from a CSV
|
||||
* record.
|
||||
*/
|
||||
public abstract class EntityCsv<T extends EntityInterface> {
|
||||
public static final String IMPORT_STATUS_HEADER = "status";
|
||||
public static final String IMPORT_STATUS_DETAILS = "details";
|
||||
public static final String IMPORT_STATUS_SUCCESS = "success";
|
||||
public static final String IMPORT_STATUS_FAILED = "failure";
|
||||
private final String entityType;
|
||||
private final List<CsvHeader> csvHeaders;
|
||||
private final CsvImportResult importResult = new CsvImportResult();
|
||||
protected boolean processRecord; // When set to false record processing is discontinued
|
||||
public static final String ENTITY_CREATED = "Entity created";
|
||||
public static final String ENTITY_UPDATED = "Entity updated";
|
||||
private final Map<String, T> dryRunCreatedEntities = new HashMap<>();
|
||||
private final String user;
|
||||
|
||||
protected EntityCsv(String entityType, List<CsvHeader> csvHeaders, String user) {
|
||||
this.entityType = entityType;
|
||||
this.csvHeaders = csvHeaders;
|
||||
this.user = user;
|
||||
}
|
||||
|
||||
// Import entities from the CSV file
|
||||
public final CsvImportResult importCsv(String csv, boolean dryRun) throws IOException {
|
||||
importResult.withDryRun(dryRun);
|
||||
StringWriter writer = new StringWriter();
|
||||
CSVPrinter resultsPrinter = getResultsCsv(csvHeaders, writer);
|
||||
if (resultsPrinter == null) {
|
||||
return importResult;
|
||||
}
|
||||
|
||||
// Parse CSV
|
||||
Iterator<CSVRecord> records = parse(csv);
|
||||
if (records == null) {
|
||||
return importResult; // Error during parsing
|
||||
}
|
||||
|
||||
// Validate headers
|
||||
List<String> expectedHeaders = CsvUtil.getHeaders(csvHeaders);
|
||||
if (!validateHeaders(expectedHeaders, records.next())) {
|
||||
return importResult;
|
||||
}
|
||||
importResult.withNumberOfRowsPassed(importResult.getNumberOfRowsPassed() + 1);
|
||||
|
||||
// Validate and load each record
|
||||
while (records.hasNext()) {
|
||||
CSVRecord record = records.next();
|
||||
processRecord(resultsPrinter, expectedHeaders, record);
|
||||
}
|
||||
|
||||
// Finally, create the entities parsed from the record
|
||||
setFinalStatus();
|
||||
importResult.withImportResultsCsv(writer.toString());
|
||||
return importResult;
|
||||
}
|
||||
|
||||
/** Implement this method to validate each record */
|
||||
protected abstract T toEntity(CSVPrinter resultsPrinter, CSVRecord record) throws IOException;
|
||||
|
||||
public final String exportCsv(List<T> entities) throws IOException {
|
||||
CsvFile csvFile = new CsvFile().withHeaders(csvHeaders);
|
||||
List<List<String>> records = new ArrayList<>();
|
||||
for (T entity : entities) {
|
||||
records.add(toRecord(entity));
|
||||
}
|
||||
csvFile.withRecords(records);
|
||||
return CsvUtil.formatCsv(csvFile);
|
||||
}
|
||||
|
||||
/** Implement this method to turn an entity into a list of fields */
|
||||
protected abstract List<String> toRecord(T entity);
|
||||
|
||||
protected final EntityReference getEntityReference(
|
||||
CSVPrinter printer, CSVRecord record, int fieldNumber, String entityType) throws IOException {
|
||||
String fqn = record.get(fieldNumber);
|
||||
return getEntityReference(printer, record, fieldNumber, entityType, fqn);
|
||||
}
|
||||
|
||||
private EntityInterface getEntity(String entityType, String fqn) {
|
||||
EntityInterface entity = entityType.equals(this.entityType) ? dryRunCreatedEntities.get(fqn) : null;
|
||||
if (entity == null) {
|
||||
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
|
||||
entity = entityRepository.findByNameOrNull(fqn, "", Include.NON_DELETED);
|
||||
}
|
||||
return entity;
|
||||
}
|
||||
|
||||
protected final EntityReference getEntityReference(
|
||||
CSVPrinter printer, CSVRecord record, int fieldNumber, String entityType, String fqn) throws IOException {
|
||||
if (nullOrEmpty(fqn)) {
|
||||
return null;
|
||||
}
|
||||
EntityInterface entity = getEntity(entityType, fqn);
|
||||
if (entity == null) {
|
||||
importFailure(printer, entityNotFound(fieldNumber, fqn), record);
|
||||
processRecord = false;
|
||||
return null;
|
||||
}
|
||||
return entity.getEntityReference();
|
||||
}
|
||||
|
||||
protected final List<EntityReference> getEntityReferences(
|
||||
CSVPrinter printer, CSVRecord record, int fieldNumber, String entityType) throws IOException {
|
||||
String fqns = record.get(fieldNumber);
|
||||
if (nullOrEmpty(fqns)) {
|
||||
return null;
|
||||
}
|
||||
List<String> fqnList = listOrEmpty(CsvUtil.fieldToStrings(fqns));
|
||||
List<EntityReference> refs = new ArrayList<>();
|
||||
for (String fqn : fqnList) {
|
||||
EntityReference ref = getEntityReference(printer, record, fieldNumber, entityType, fqn);
|
||||
if (!processRecord) {
|
||||
return null;
|
||||
}
|
||||
if (ref != null) {
|
||||
refs.add(ref);
|
||||
}
|
||||
}
|
||||
return refs.isEmpty() ? null : refs;
|
||||
}
|
||||
|
||||
protected final List<TagLabel> getTagLabels(CSVPrinter printer, CSVRecord record, int fieldNumber)
|
||||
throws IOException {
|
||||
List<EntityReference> refs = getEntityReferences(printer, record, fieldNumber, Entity.TAG);
|
||||
if (!processRecord || nullOrEmpty(refs)) {
|
||||
return null;
|
||||
}
|
||||
List<TagLabel> tagLabels = new ArrayList<>();
|
||||
for (EntityReference ref : refs) {
|
||||
tagLabels.add(new TagLabel().withSource(TagSource.TAG).withTagFQN(ref.getFullyQualifiedName()));
|
||||
}
|
||||
return tagLabels;
|
||||
}
|
||||
|
||||
public static String[] getResultHeaders(List<CsvHeader> csvHeaders) {
|
||||
List<String> importResultsCsvHeader = listOf(IMPORT_STATUS_HEADER, IMPORT_STATUS_DETAILS);
|
||||
importResultsCsvHeader.addAll(CsvUtil.getHeaders(csvHeaders));
|
||||
return importResultsCsvHeader.toArray(new String[0]);
|
||||
}
|
||||
|
||||
// Create a CSVPrinter to capture the import results
|
||||
private CSVPrinter getResultsCsv(List<CsvHeader> csvHeaders, StringWriter writer) {
|
||||
CSVFormat format = Builder.create(CSVFormat.DEFAULT).setHeader(getResultHeaders(csvHeaders)).build();
|
||||
try {
|
||||
return new CSVPrinter(writer, format);
|
||||
} catch (IOException e) {
|
||||
documentFailure(failed(e.getMessage(), CsvErrorType.UNKNOWN));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private Iterator<CSVRecord> parse(String csv) {
|
||||
Reader in = new StringReader(csv);
|
||||
try {
|
||||
return CSVFormat.DEFAULT.parse(in).iterator();
|
||||
} catch (IOException e) {
|
||||
documentFailure(failed(e.getMessage(), CsvErrorType.PARSER_FAILURE));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private boolean validateHeaders(List<String> expectedHeaders, CSVRecord record) {
|
||||
importResult.withNumberOfRowsProcessed((int) record.getRecordNumber());
|
||||
if (expectedHeaders.equals(record.toList())) {
|
||||
return true;
|
||||
}
|
||||
importResult.withNumberOfRowsFailed(1);
|
||||
documentFailure(invalidHeader(recordToString(expectedHeaders), recordToString(record)));
|
||||
return false;
|
||||
}
|
||||
|
||||
private void processRecord(CSVPrinter resultsPrinter, List<String> expectedHeader, CSVRecord record)
|
||||
throws IOException {
|
||||
processRecord = true;
|
||||
// Every row must have total fields corresponding to the number of headers
|
||||
if (csvHeaders.size() != record.size()) {
|
||||
importFailure(resultsPrinter, invalidFieldCount(expectedHeader.size(), record.size()), record);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if required values are present
|
||||
List<String> errors = new ArrayList<>();
|
||||
for (int i = 0; i < csvHeaders.size(); i++) {
|
||||
String field = record.get(i);
|
||||
boolean fieldRequired = Boolean.TRUE.equals(csvHeaders.get(i).getRequired());
|
||||
if (fieldRequired && nullOrEmpty(field)) {
|
||||
errors.add(fieldRequired(i));
|
||||
}
|
||||
}
|
||||
|
||||
if (!errors.isEmpty()) {
|
||||
importFailure(resultsPrinter, String.join(FIELD_SEPARATOR, errors), record);
|
||||
return;
|
||||
}
|
||||
|
||||
// Finally, convert record into entity for importing
|
||||
T entity = toEntity(resultsPrinter, record);
|
||||
if (entity != null) {
|
||||
// Finally, create entities
|
||||
createEntity(resultsPrinter, record, entity);
|
||||
}
|
||||
}
|
||||
|
||||
private void createEntity(CSVPrinter resultsPrinter, CSVRecord record, T entity) throws IOException {
|
||||
entity.setId(UUID.randomUUID());
|
||||
entity.setUpdatedBy(user);
|
||||
entity.setUpdatedAt(System.currentTimeMillis());
|
||||
EntityRepository<EntityInterface> repository = Entity.getEntityRepository(entityType);
|
||||
Response.Status responseStatus;
|
||||
if (!importResult.getDryRun()) {
|
||||
try {
|
||||
repository.prepareInternal(entity);
|
||||
PutResponse<EntityInterface> response = repository.createOrUpdate(null, entity);
|
||||
responseStatus = response.getStatus();
|
||||
} catch (Exception ex) {
|
||||
importFailure(resultsPrinter, ex.getMessage(), record);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
repository.setFullyQualifiedName(entity);
|
||||
responseStatus =
|
||||
repository.findByNameOrNull(entity.getFullyQualifiedName(), "", Include.NON_DELETED) == null
|
||||
? Response.Status.CREATED
|
||||
: Response.Status.OK;
|
||||
// Track the dryRun created entities, as they may be referred by other entities being created during import
|
||||
dryRunCreatedEntities.put(entity.getFullyQualifiedName(), entity);
|
||||
}
|
||||
|
||||
if (Response.Status.CREATED.equals(responseStatus)) {
|
||||
importSuccess(resultsPrinter, record, ENTITY_CREATED);
|
||||
} else {
|
||||
importSuccess(resultsPrinter, record, ENTITY_UPDATED);
|
||||
}
|
||||
}
|
||||
|
||||
public String failed(String exception, CsvErrorType errorType) {
|
||||
return String.format("#%s: Failed to parse the CSV filed - reason %s", errorType, exception);
|
||||
}
|
||||
|
||||
public static String invalidHeader(String expected, String actual) {
|
||||
return String.format("#%s: Headers [%s] doesn't match [%s]", CsvErrorType.INVALID_HEADER, actual, expected);
|
||||
}
|
||||
|
||||
public static String invalidFieldCount(int expectedFieldCount, int actualFieldCount) {
|
||||
return String.format(
|
||||
"#%s: Field count %d does not match the expected field count of %d",
|
||||
CsvErrorType.INVALID_FIELD_COUNT, actualFieldCount, expectedFieldCount);
|
||||
}
|
||||
|
||||
public static String fieldRequired(int field) {
|
||||
return String.format("#%s: Field %d is required", CsvErrorType.FIELD_REQUIRED, field + 1);
|
||||
}
|
||||
|
||||
public static String invalidField(int field, String error) {
|
||||
return String.format("#%s: Field %d error - %s", CsvErrorType.INVALID_FIELD, field + 1, error);
|
||||
}
|
||||
|
||||
public static String entityNotFound(int field, String fqn) {
|
||||
String error = String.format("Entity %s not found", fqn);
|
||||
return String.format("#%s: Field %d error - %s", CsvErrorType.INVALID_FIELD, field + 1, error);
|
||||
}
|
||||
|
||||
private void documentFailure(String error) {
|
||||
importResult.withStatus(Status.ABORTED);
|
||||
importResult.withAbortReason(error);
|
||||
}
|
||||
|
||||
private void importSuccess(CSVPrinter printer, CSVRecord inputRecord, String successDetails) throws IOException {
|
||||
List<String> record = listOf(IMPORT_STATUS_SUCCESS, successDetails);
|
||||
record.addAll(inputRecord.toList());
|
||||
printer.printRecord(record);
|
||||
importResult.withNumberOfRowsProcessed((int) inputRecord.getRecordNumber());
|
||||
importResult.withNumberOfRowsPassed(importResult.getNumberOfRowsPassed() + 1);
|
||||
}
|
||||
|
||||
protected void importFailure(CSVPrinter printer, String failedReason, CSVRecord inputRecord) throws IOException {
|
||||
List<String> record = listOf(IMPORT_STATUS_FAILED, failedReason);
|
||||
record.addAll(inputRecord.toList());
|
||||
printer.printRecord(record);
|
||||
importResult.withNumberOfRowsProcessed((int) inputRecord.getRecordNumber());
|
||||
importResult.withNumberOfRowsFailed(importResult.getNumberOfRowsFailed() + 1);
|
||||
processRecord = false;
|
||||
}
|
||||
|
||||
private void setFinalStatus() {
|
||||
Status status =
|
||||
importResult.getNumberOfRowsPassed().equals(importResult.getNumberOfRowsProcessed())
|
||||
? Status.SUCCESS
|
||||
: importResult.getNumberOfRowsPassed() > 1 ? Status.PARTIAL_SUCCESS : Status.FAILURE;
|
||||
importResult.setStatus(status);
|
||||
}
|
||||
}
|
@ -197,4 +197,8 @@ public final class CatalogExceptionMessage {
|
||||
"Tag labels %s and %s are mutually exclusive and can't be assigned together",
|
||||
tag1.getTagFQN(), tag2.getTagFQN());
|
||||
}
|
||||
|
||||
public static String csvNotSupported(String entityType) {
|
||||
return String.format("Upload/download CSV for bulk operations is not supported for entity [%s]", entityType);
|
||||
}
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ import static org.openmetadata.service.Entity.FIELD_FOLLOWERS;
|
||||
import static org.openmetadata.service.Entity.FIELD_OWNER;
|
||||
import static org.openmetadata.service.Entity.FIELD_TAGS;
|
||||
import static org.openmetadata.service.Entity.getEntityFields;
|
||||
import static org.openmetadata.service.exception.CatalogExceptionMessage.csvNotSupported;
|
||||
import static org.openmetadata.service.util.EntityUtil.compareTagLabel;
|
||||
import static org.openmetadata.service.util.EntityUtil.entityReferenceMatch;
|
||||
import static org.openmetadata.service.util.EntityUtil.fieldAdded;
|
||||
@ -79,6 +80,7 @@ import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.schema.type.ProviderType;
|
||||
import org.openmetadata.schema.type.Relationship;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.csv.CsvImportResult;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||
import org.openmetadata.service.TypeRegistry;
|
||||
@ -326,9 +328,25 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
}
|
||||
|
||||
  /**
   * Find an entity by fully qualified name, or return null when it does not exist.
   *
   * @param fqn fully qualified name of the entity
   * @param fields comma-separated list of fields to populate on the returned entity
   * @param include whether to include deleted/non-deleted entities in the lookup
   * @return the entity with the requested fields set, or null when not found
   */
  @Transaction
  public final T findByNameOrNull(String fqn, String fields, Include include) {
    String json = dao.findJsonByFqn(fqn, include);
    try {
      return json == null ? null : setFieldsInternal(JsonUtils.readValue(json, entityClass), getFields(fields));
    } catch (IOException e) {
      // NOTE(review): deserialization/field-setting failures are swallowed and reported the same
      // as "not found" (null) - confirm this is intended for callers such as CSV import
      return null;
    }
  }
||||
|
||||
@Transaction
|
||||
public final List<T> listAll(Fields fields, ListFilter filter) throws IOException {
|
||||
// forward scrolling, if after == null then first page is being asked
|
||||
List<String> jsons = dao.listAfter(filter, Integer.MAX_VALUE, "");
|
||||
List<T> entities = new ArrayList<>();
|
||||
for (String json : jsons) {
|
||||
T entity = setFieldsInternal(JsonUtils.readValue(json, entityClass), fields);
|
||||
entities.add(entity);
|
||||
}
|
||||
return entities;
|
||||
}
|
||||
|
||||
@Transaction
|
||||
@ -1162,6 +1180,16 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
return Entity.getEntityReferenceById(owner.getType(), owner.getId(), ALL);
|
||||
}
|
||||
|
||||
  /**
   * Override this method to support downloading CSV functionality.
   *
   * @param name name of the entity to export
   * @param user name of the user performing the export
   * @throws IllegalArgumentException always, unless a repository overrides this to support CSV
   */
  public String exportToCsv(String name, String user) throws IOException {
    throw new IllegalArgumentException(csvNotSupported(entityType));
  }
|
||||
|
||||
  /**
   * Load CSV provided for bulk upload.
   *
   * @param name name of the entity the CSV is imported into
   * @param csv the CSV content to import
   * @param dryRun when true, validate only without persisting entities
   * @param user name of the user performing the import
   * @throws IllegalArgumentException always, unless a repository overrides this to support CSV
   */
  public CsvImportResult importFromCsv(String name, String csv, boolean dryRun, String user) throws IOException {
    throw new IllegalArgumentException(csvNotSupported(entityType));
  }
|
||||
|
||||
public enum Operation {
|
||||
PUT,
|
||||
PATCH,
|
||||
@ -1541,8 +1569,6 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
String toEntityType,
|
||||
UUID toId)
|
||||
throws JsonProcessingException {
|
||||
List<EntityReference> added = new ArrayList<>();
|
||||
List<EntityReference> deleted = new ArrayList<>();
|
||||
if (!recordChange(field, originFromRef, updatedFromRef, true, entityReferenceMatch)) {
|
||||
return; // No changes between original and updated.
|
||||
}
|
||||
|
@ -17,17 +17,36 @@
|
||||
package org.openmetadata.service.jdbi3;
|
||||
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
|
||||
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
|
||||
import static org.openmetadata.csv.CsvUtil.addEntityReference;
|
||||
import static org.openmetadata.csv.CsvUtil.addEntityReferences;
|
||||
import static org.openmetadata.csv.CsvUtil.addField;
|
||||
import static org.openmetadata.csv.CsvUtil.addTagLabels;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.csv.CSVPrinter;
|
||||
import org.apache.commons.csv.CSVRecord;
|
||||
import org.openmetadata.csv.CsvUtil;
|
||||
import org.openmetadata.csv.EntityCsv;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.api.data.TermReference;
|
||||
import org.openmetadata.schema.entity.data.Glossary;
|
||||
import org.openmetadata.schema.entity.data.GlossaryTerm;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.schema.type.ProviderType;
|
||||
import org.openmetadata.schema.type.Relationship;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.TagLabel.TagSource;
|
||||
import org.openmetadata.schema.type.csv.CsvHeader;
|
||||
import org.openmetadata.schema.type.csv.CsvImportResult;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipRecord;
|
||||
@ -96,6 +115,123 @@ public class GlossaryRepository extends EntityRepository<Glossary> {
|
||||
return new GlossaryUpdater(original, updated, operation);
|
||||
}
|
||||
|
||||
  /** Export all terms of the named glossary as CSV, sorted by fully qualified name. */
  @Override
  public String exportToCsv(String name, String user) throws IOException {
    Glossary glossary = getByName(null, name, Fields.EMPTY_FIELDS); // Validate glossary name
    EntityRepository<GlossaryTerm> repository = Entity.getEntityRepository(Entity.GLOSSARY_TERM);
    // Only non-deleted terms whose parent is this glossary are exported
    ListFilter filter = new ListFilter(Include.NON_DELETED).addQueryParam("parent", name);
    List<GlossaryTerm> terms = repository.listAll(repository.getFields("reviewers,tags,relatedTerms"), filter);
    terms.sort(Comparator.comparing(EntityInterface::getFullyQualifiedName));
    return new GlossaryCsv(glossary, user).exportCsv(terms);
  }
|
||||
|
||||
  /** Load CSV provided for bulk upload of glossary terms into the named glossary */
  @Override
  public CsvImportResult importFromCsv(String name, String csv, boolean dryRun, String user) throws IOException {
    Glossary glossary = getByName(null, name, Fields.EMPTY_FIELDS); // Validate glossary name
    GlossaryCsv glossaryCsv = new GlossaryCsv(glossary, user);
    return glossaryCsv.importCsv(csv, dryRun);
  }
|
||||
|
||||
public static class GlossaryCsv extends EntityCsv<GlossaryTerm> {
|
||||
public static final List<CsvHeader> HEADERS = new ArrayList<>();
|
||||
private final Glossary glossary;
|
||||
|
||||
static {
|
||||
HEADERS.add(new CsvHeader().withName("parent").withRequired(false));
|
||||
HEADERS.add(new CsvHeader().withName("name").withRequired(true));
|
||||
HEADERS.add(new CsvHeader().withName("displayName").withRequired(false));
|
||||
HEADERS.add(new CsvHeader().withName("description").withRequired(true));
|
||||
HEADERS.add(new CsvHeader().withName("synonyms").withRequired(false));
|
||||
HEADERS.add(new CsvHeader().withName("relatedTerms").withRequired(false));
|
||||
HEADERS.add(new CsvHeader().withName("references").withRequired(false));
|
||||
HEADERS.add(new CsvHeader().withName("tags").withRequired(false));
|
||||
}
|
||||
|
||||
GlossaryCsv(Glossary glossary, String user) {
|
||||
super(Entity.GLOSSARY_TERM, HEADERS, user);
|
||||
this.glossary = glossary;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected GlossaryTerm toEntity(CSVPrinter printer, CSVRecord record) throws IOException {
|
||||
GlossaryTerm glossaryTerm = new GlossaryTerm().withGlossary(glossary.getEntityReference());
|
||||
|
||||
// Field 1 - parent term
|
||||
glossaryTerm.withParent(getEntityReference(printer, record, 0, Entity.GLOSSARY_TERM));
|
||||
if (!processRecord) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Field 2,3,4 - Glossary name, displayName, description
|
||||
glossaryTerm.withName(record.get(1)).withDisplayName(record.get(2)).withDescription(record.get(3));
|
||||
|
||||
// Field 5 - Synonym list
|
||||
glossaryTerm.withSynonyms(CsvUtil.fieldToStrings(record.get(4)));
|
||||
|
||||
// Field 6 - Related terms
|
||||
glossaryTerm.withRelatedTerms(getEntityReferences(printer, record, 5, Entity.GLOSSARY_TERM));
|
||||
if (!processRecord) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Field 7 - TermReferences
|
||||
glossaryTerm.withReferences(getTermReferences(printer, record, 6));
|
||||
if (!processRecord) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Field 8 - tags
|
||||
glossaryTerm.withTags(getTagLabels(printer, record, 7));
|
||||
if (!processRecord) {
|
||||
return null;
|
||||
}
|
||||
return glossaryTerm;
|
||||
}
|
||||
|
||||
private List<TermReference> getTermReferences(CSVPrinter printer, CSVRecord record, int fieldNumber)
|
||||
throws IOException {
|
||||
String termRefs = record.get(fieldNumber);
|
||||
if (nullOrEmpty(termRefs)) {
|
||||
return null;
|
||||
}
|
||||
List<String> termRefList = CsvUtil.fieldToStrings(termRefs);
|
||||
if (termRefList.size() % 2 != 0) {
|
||||
// List should have even numbered terms - termName and endPoint
|
||||
importFailure(printer, invalidField(fieldNumber, "Term references should termName;endpoint"), record);
|
||||
processRecord = false;
|
||||
return null;
|
||||
}
|
||||
List<TermReference> list = new ArrayList<>();
|
||||
for (int i = 0; i < termRefList.size(); ) {
|
||||
list.add(new TermReference().withName(termRefList.get(i++)).withEndpoint(URI.create(termRefList.get(i++))));
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<String> toRecord(GlossaryTerm entity) {
|
||||
List<String> record = new ArrayList<>();
|
||||
addEntityReference(record, entity.getParent());
|
||||
addField(record, entity.getName());
|
||||
addField(record, entity.getDisplayName());
|
||||
addField(record, entity.getDescription());
|
||||
CsvUtil.addFieldList(record, entity.getSynonyms());
|
||||
addEntityReferences(record, entity.getRelatedTerms());
|
||||
addField(record, termReferencesToRecord(entity.getReferences()));
|
||||
addTagLabels(record, entity.getTags());
|
||||
return record;
|
||||
}
|
||||
|
||||
private String termReferencesToRecord(List<TermReference> list) {
|
||||
return nullOrEmpty(list)
|
||||
? null
|
||||
: list.stream()
|
||||
.map(termReference -> termReference.getName() + CsvUtil.FIELD_SEPARATOR + termReference.getEndpoint())
|
||||
.collect(Collectors.joining(";"));
|
||||
}
|
||||
}
|
||||
|
||||
private List<EntityReference> getReviewers(Glossary entity) throws IOException {
|
||||
List<EntityRelationshipRecord> ids = findFrom(entity.getId(), Entity.GLOSSARY, Relationship.REVIEWS, Entity.USER);
|
||||
return EntityUtil.populateEntityReferences(ids, Entity.USER);
|
||||
|
@ -19,6 +19,7 @@ import org.openmetadata.schema.type.EntityHistory;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.schema.type.MetadataOperation;
|
||||
import org.openmetadata.schema.type.csv.CsvImportResult;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||
import org.openmetadata.service.jdbi3.EntityRepository;
|
||||
@ -252,6 +253,19 @@ public abstract class EntityResource<T extends EntityInterface, K extends Entity
|
||||
return Response.ok(entity.getHref()).entity(entity).build();
|
||||
}
|
||||
|
||||
public String exportCsvInternal(UriInfo uriInfo, SecurityContext securityContext, String name) throws IOException {
|
||||
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.VIEW_ALL);
|
||||
authorizer.authorize(securityContext, operationContext, getResourceContextByName(name));
|
||||
return dao.exportToCsv(name, securityContext.getUserPrincipal().getName());
|
||||
}
|
||||
|
||||
protected CsvImportResult importCsvInternal(
|
||||
UriInfo uriInfo, SecurityContext securityContext, String name, String csv, boolean dryRun) throws IOException {
|
||||
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL);
|
||||
authorizer.authorize(securityContext, operationContext, getResourceContextByName(name));
|
||||
return dao.importFromCsv(name, csv, dryRun, securityContext.getUserPrincipal().getName());
|
||||
}
|
||||
|
||||
public T copy(T entity, CreateEntity request, String updatedBy) throws IOException {
|
||||
EntityReference owner = dao.validateOwner(request.getOwner());
|
||||
entity.setId(UUID.randomUUID());
|
||||
|
@ -50,12 +50,14 @@ import org.openmetadata.schema.api.data.RestoreEntity;
|
||||
import org.openmetadata.schema.entity.data.Glossary;
|
||||
import org.openmetadata.schema.type.EntityHistory;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.schema.type.csv.CsvImportResult;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.jdbi3.GlossaryRepository;
|
||||
import org.openmetadata.service.jdbi3.ListFilter;
|
||||
import org.openmetadata.service.resources.Collection;
|
||||
import org.openmetadata.service.resources.EntityResource;
|
||||
import org.openmetadata.service.resources.glossary.GlossaryTermResource.GlossaryTermList;
|
||||
import org.openmetadata.service.security.Authorizer;
|
||||
import org.openmetadata.service.util.RestUtil;
|
||||
import org.openmetadata.service.util.ResultList;
|
||||
@ -365,6 +367,58 @@ public class GlossaryResource extends EntityResource<Glossary, GlossaryRepositor
|
||||
return restoreEntity(uriInfo, securityContext, restore.getId());
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/name/{name}/export")
|
||||
@Produces(MediaType.TEXT_PLAIN)
|
||||
@Valid
|
||||
@Operation(
|
||||
operationId = "exportGlossary",
|
||||
summary = "Export glossary in CSV format",
|
||||
tags = "glossaries",
|
||||
responses = {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "List of glossary terms",
|
||||
content =
|
||||
@Content(mediaType = "application/json", schema = @Schema(implementation = GlossaryTermList.class)))
|
||||
})
|
||||
public String exportCsv(
|
||||
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam("name") String name)
|
||||
throws IOException {
|
||||
return super.exportCsvInternal(uriInfo, securityContext, name);
|
||||
}
|
||||
|
||||
@PUT
|
||||
@Path("/name/{name}/import")
|
||||
@Consumes(MediaType.TEXT_PLAIN)
|
||||
@Valid
|
||||
@Operation(
|
||||
operationId = "importGlossary",
|
||||
summary = "Import glossary terms from CSV to create, and update glossary terms.",
|
||||
tags = "glossaries",
|
||||
responses = {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "List of glossary terms",
|
||||
content =
|
||||
@Content(mediaType = "application/json", schema = @Schema(implementation = GlossaryTermList.class)))
|
||||
})
|
||||
public CsvImportResult importCsv(
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@PathParam("name") String name,
|
||||
@Parameter(
|
||||
description =
|
||||
"Dry-run when true is used for validating the CSV without really importing it. (default=true)",
|
||||
schema = @Schema(type = "boolean"))
|
||||
@DefaultValue("true")
|
||||
@QueryParam("dryRun")
|
||||
boolean dryRun,
|
||||
String csv)
|
||||
throws IOException {
|
||||
return super.importCsvInternal(uriInfo, securityContext, name, csv, dryRun);
|
||||
}
|
||||
|
||||
private Glossary getGlossary(CreateGlossary create, String user) throws IOException {
|
||||
return copy(new Glossary(), create, user)
|
||||
.withReviewers(create.getReviewers())
|
||||
|
@ -0,0 +1,82 @@
|
||||
/*
|
||||
* Copyright 2021 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.csv;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOf;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
|
||||
class CsvUtilTest {
|
||||
@Test
|
||||
void testQuoteField() {
|
||||
// Single string field related tests
|
||||
assertEquals("abc", CsvUtil.quoteField("abc")); // Strings without separator is not quoted
|
||||
assertEquals("\"a,bc\"", CsvUtil.quoteField("a,bc")); // Strings with separator are quoted
|
||||
|
||||
// List of strings in a field related tests
|
||||
assertEquals("abc;def;ghi", CsvUtil.quoteField(listOf("abc", "def", "ghi")));
|
||||
assertEquals("\"a;bc\";\"d,ef\";ghi", CsvUtil.quoteField(listOf("a;bc", "d,ef", "ghi")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAddRecord() {
|
||||
List<String> expectedRecord = new ArrayList<>();
|
||||
List<String> actualRecord = new ArrayList<>();
|
||||
|
||||
// Add string
|
||||
expectedRecord.add("");
|
||||
assertEquals(expectedRecord, CsvUtil.addField(actualRecord, null));
|
||||
|
||||
expectedRecord.add("abc");
|
||||
assertEquals(expectedRecord, CsvUtil.addField(actualRecord, "abc"));
|
||||
|
||||
// Add list of strings
|
||||
expectedRecord.add("");
|
||||
assertEquals(expectedRecord, CsvUtil.addFieldList(actualRecord, null));
|
||||
|
||||
expectedRecord.add("def;ghi");
|
||||
assertEquals(expectedRecord, CsvUtil.addFieldList(actualRecord, listOf("def", "ghi")));
|
||||
|
||||
// Add entity reference
|
||||
expectedRecord.add("");
|
||||
assertEquals(expectedRecord, CsvUtil.addEntityReference(actualRecord, null)); // Null entity reference
|
||||
|
||||
expectedRecord.add("fqn");
|
||||
assertEquals(
|
||||
expectedRecord, CsvUtil.addEntityReference(actualRecord, new EntityReference().withFullyQualifiedName("fqn")));
|
||||
|
||||
// Add entity references
|
||||
expectedRecord.add("");
|
||||
assertEquals(expectedRecord, CsvUtil.addEntityReferences(actualRecord, null)); // Null entity references
|
||||
|
||||
expectedRecord.add("fqn1;fqn2");
|
||||
List<EntityReference> refs =
|
||||
listOf(
|
||||
new EntityReference().withFullyQualifiedName("fqn1"), new EntityReference().withFullyQualifiedName("fqn2"));
|
||||
assertEquals(expectedRecord, CsvUtil.addEntityReferences(actualRecord, refs));
|
||||
|
||||
// Add tag labels
|
||||
expectedRecord.add("");
|
||||
assertEquals(expectedRecord, CsvUtil.addTagLabels(actualRecord, null)); // Null entity references
|
||||
|
||||
expectedRecord.add("t1;t2");
|
||||
List<TagLabel> tags = listOf(new TagLabel().withTagFQN("t1"), new TagLabel().withTagFQN("t2"));
|
||||
assertEquals(expectedRecord, CsvUtil.addTagLabels(actualRecord, tags));
|
||||
}
|
||||
}
|
@ -0,0 +1,173 @@
|
||||
package org.openmetadata.csv;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOf;
|
||||
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
|
||||
import static org.openmetadata.csv.CsvUtil.LINE_SEPARATOR;
|
||||
import static org.openmetadata.csv.CsvUtil.recordToString;
|
||||
import static org.openmetadata.csv.EntityCsv.ENTITY_CREATED;
|
||||
import static org.openmetadata.csv.EntityCsv.ENTITY_UPDATED;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.commons.csv.CSVPrinter;
|
||||
import org.apache.commons.csv.CSVRecord;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.entity.data.Table;
|
||||
import org.openmetadata.schema.type.csv.CsvFile;
|
||||
import org.openmetadata.schema.type.csv.CsvHeader;
|
||||
import org.openmetadata.schema.type.csv.CsvImportResult;
|
||||
import org.openmetadata.schema.type.csv.CsvImportResult.Status;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO.TableDAO;
|
||||
import org.openmetadata.service.jdbi3.TableRepository;
|
||||
|
||||
public class EntityCsvTest {
|
||||
private static final List<CsvHeader> CSV_HEADERS;
|
||||
private static final String HEADER_STRING = "h1*,h2,h3" + LINE_SEPARATOR;
|
||||
|
||||
static {
|
||||
Object[][] headers = {
|
||||
{"h1", Boolean.TRUE},
|
||||
{"h2", Boolean.FALSE},
|
||||
{"h3", Boolean.FALSE}
|
||||
};
|
||||
CSV_HEADERS = getHeaders(headers);
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
public static void setup() {
|
||||
Entity.registerEntity(Table.class, Entity.TABLE, Mockito.mock(TableDAO.class), Mockito.mock(TableRepository.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void test_formatHeader() throws IOException {
|
||||
CsvFile csvFile = new CsvFile();
|
||||
csvFile.withHeaders(CSV_HEADERS);
|
||||
String file = CsvUtil.formatCsv(csvFile);
|
||||
assertEquals(HEADER_STRING, file);
|
||||
}
|
||||
|
||||
@Test
|
||||
void test_validateCsvInvalidHeader() throws IOException {
|
||||
String csv = ",h2,h3" + LINE_SEPARATOR; // Header h1 is missing in the CSV file
|
||||
TestCsv testCsv = new TestCsv(CSV_HEADERS);
|
||||
CsvImportResult importResult = testCsv.importCsv(csv, true);
|
||||
assertSummary(importResult, Status.ABORTED, 1, 0, 1);
|
||||
assertNull(importResult.getImportResultsCsv());
|
||||
assertEquals(TestCsv.invalidHeader("h1*,h2,h3", ",h2,h3"), importResult.getAbortReason());
|
||||
}
|
||||
|
||||
@Test
|
||||
void test_validateCsvInvalidRecords() throws IOException {
|
||||
// Invalid record 2 - Missing required value in h1
|
||||
// Invalid record 3 - Record with only two fields instead of 3
|
||||
List<String> records = listOf(",2,3", "1,2", "1,2,3");
|
||||
String csv = createCsv(CSV_HEADERS, records);
|
||||
|
||||
TestCsv testCsv = new TestCsv(CSV_HEADERS);
|
||||
CsvImportResult importResult = testCsv.importCsv(csv, true);
|
||||
assertSummary(importResult, Status.PARTIAL_SUCCESS, 4, 2, 2);
|
||||
|
||||
String[] expectedRecords = {
|
||||
CsvUtil.recordToString(EntityCsv.getResultHeaders(CSV_HEADERS)),
|
||||
getFailedRecord(",2,3", TestCsv.fieldRequired(0)),
|
||||
getFailedRecord("1,2", TestCsv.invalidFieldCount(3, 2)),
|
||||
getSuccessRecord("1,2,3", ENTITY_CREATED)
|
||||
};
|
||||
|
||||
assertRows(importResult, expectedRecords);
|
||||
}
|
||||
|
||||
public static void assertSummary(
|
||||
CsvImportResult importResult,
|
||||
Status expectedStatus,
|
||||
int expectedRowsProcessed,
|
||||
int expectedRowsPassed,
|
||||
int expectedRowsFailed) {
|
||||
assertEquals(expectedStatus, importResult.getStatus());
|
||||
assertEquals(expectedRowsProcessed, importResult.getNumberOfRowsProcessed());
|
||||
assertEquals(expectedRowsPassed, importResult.getNumberOfRowsPassed());
|
||||
assertEquals(expectedRowsFailed, importResult.getNumberOfRowsFailed());
|
||||
}
|
||||
|
||||
public static void assertRows(CsvImportResult importResult, String... expectedRows) {
|
||||
String[] resultRecords = importResult.getImportResultsCsv().split(LINE_SEPARATOR);
|
||||
assertEquals(expectedRows.length, resultRecords.length);
|
||||
for (int i = 0; i < resultRecords.length; i++) {
|
||||
assertEquals(expectedRows[i], resultRecords[i], "Row number is " + i);
|
||||
}
|
||||
}
|
||||
|
||||
public static String getSuccessRecord(String record, String successDetails) {
|
||||
return String.format("%s,%s,%s", EntityCsv.IMPORT_STATUS_SUCCESS, successDetails, record);
|
||||
}
|
||||
|
||||
public static String getFailedRecord(String record, String errorDetails) {
|
||||
return String.format("%s,\"%s\",%s", EntityCsv.IMPORT_STATUS_FAILED, errorDetails, record);
|
||||
}
|
||||
|
||||
private static List<CsvHeader> getHeaders(Object[][] headers) {
|
||||
List<CsvHeader> csvHeaders = new ArrayList<>();
|
||||
for (Object[] header : headers) {
|
||||
csvHeaders.add(new CsvHeader().withName((String) header[0]).withRequired((Boolean) header[1]));
|
||||
}
|
||||
return csvHeaders;
|
||||
}
|
||||
|
||||
public static String createCsv(List<CsvHeader> csvHeaders, List<String> records) {
|
||||
records.add(0, recordToString(CsvUtil.getHeaders(csvHeaders)));
|
||||
return String.join(LINE_SEPARATOR, records) + LINE_SEPARATOR;
|
||||
}
|
||||
|
||||
public static String createCsv(List<CsvHeader> csvHeaders, List<String> createRecords, List<String> updateRecords) {
|
||||
// Create CSV
|
||||
List<String> csvRecords = new ArrayList<>();
|
||||
if (!nullOrEmpty(createRecords)) {
|
||||
csvRecords.addAll(createRecords);
|
||||
}
|
||||
if (!nullOrEmpty(updateRecords)) {
|
||||
csvRecords.addAll(updateRecords);
|
||||
}
|
||||
return createCsv(csvHeaders, csvRecords);
|
||||
}
|
||||
|
||||
public static String createCsvResult(
|
||||
List<CsvHeader> csvHeaders, List<String> createRecords, List<String> updateRecords) {
|
||||
// Create CSV
|
||||
List<String> csvRecords = new ArrayList<>();
|
||||
csvRecords.add(recordToString(EntityCsv.getResultHeaders(csvHeaders)));
|
||||
if (!nullOrEmpty(createRecords)) {
|
||||
for (String record : createRecords) {
|
||||
csvRecords.add(getSuccessRecord(record, ENTITY_CREATED));
|
||||
}
|
||||
}
|
||||
if (!nullOrEmpty(updateRecords)) {
|
||||
for (String record : updateRecords) {
|
||||
csvRecords.add(getSuccessRecord(record, ENTITY_UPDATED));
|
||||
}
|
||||
}
|
||||
return String.join(LINE_SEPARATOR, csvRecords) + LINE_SEPARATOR;
|
||||
}
|
||||
|
||||
private static class TestCsv extends EntityCsv<EntityInterface> {
|
||||
protected TestCsv(List<CsvHeader> csvHeaders) {
|
||||
super(Entity.TABLE, csvHeaders, "admin");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EntityInterface toEntity(CSVPrinter resultsPrinter, CSVRecord record) {
|
||||
return new Table(); // Return a random entity to mark successfully processing a record
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<String> toRecord(EntityInterface entity) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
@ -16,9 +16,17 @@
|
||||
|
||||
package org.openmetadata.service.resources.glossary;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOf;
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
|
||||
import static org.openmetadata.csv.CsvUtil.recordToString;
|
||||
import static org.openmetadata.csv.EntityCsv.entityNotFound;
|
||||
import static org.openmetadata.csv.EntityCsvTest.assertRows;
|
||||
import static org.openmetadata.csv.EntityCsvTest.assertSummary;
|
||||
import static org.openmetadata.csv.EntityCsvTest.createCsv;
|
||||
import static org.openmetadata.csv.EntityCsvTest.getFailedRecord;
|
||||
import static org.openmetadata.schema.type.ProviderType.SYSTEM;
|
||||
import static org.openmetadata.service.util.EntityUtil.fieldAdded;
|
||||
import static org.openmetadata.service.util.EntityUtil.fieldDeleted;
|
||||
@ -36,6 +44,7 @@ import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import javax.ws.rs.client.WebTarget;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.http.client.HttpResponseException;
|
||||
@ -43,7 +52,8 @@ import org.junit.jupiter.api.MethodOrderer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInfo;
|
||||
import org.junit.jupiter.api.TestMethodOrder;
|
||||
import org.openmetadata.common.utils.CommonUtil;
|
||||
import org.openmetadata.csv.EntityCsv;
|
||||
import org.openmetadata.csv.EntityCsvTest;
|
||||
import org.openmetadata.schema.api.data.CreateGlossary;
|
||||
import org.openmetadata.schema.api.data.CreateGlossaryTerm;
|
||||
import org.openmetadata.schema.api.data.CreateTable;
|
||||
@ -56,8 +66,11 @@ import org.openmetadata.schema.type.ColumnDataType;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.ProviderType;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.csv.CsvHeader;
|
||||
import org.openmetadata.schema.type.csv.CsvImportResult;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.jdbi3.GlossaryRepository.GlossaryCsv;
|
||||
import org.openmetadata.service.resources.EntityResourceTest;
|
||||
import org.openmetadata.service.resources.databases.TableResourceTest;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
@ -177,7 +190,7 @@ public class GlossaryResourceTest extends EntityResourceTest<Glossary, CreateGlo
|
||||
List<TagLabel> tagLabels = toTagLabels(t1, t11, t12, t2, t21, t22);
|
||||
Column column = new Column().withName("c1").withDataType(ColumnDataType.INT).withTags(tagLabels);
|
||||
CreateTable createTable =
|
||||
tableResourceTest.createRequest(getEntityName(test)).withTags(tagLabels).withColumns(CommonUtil.listOf(column));
|
||||
tableResourceTest.createRequest(getEntityName(test)).withTags(tagLabels).withColumns(listOf(column));
|
||||
Table table = tableResourceTest.createEntity(createTable, ADMIN_AUTH_HEADERS);
|
||||
|
||||
//
|
||||
@ -234,7 +247,7 @@ public class GlossaryResourceTest extends EntityResourceTest<Glossary, CreateGlo
|
||||
List<TagLabel> tagLabels = toTagLabels(t1, t11, t111, t12, t121, t13, t131, t2, t21, t211, h1, h11, h111);
|
||||
Column column = new Column().withName("c1").withDataType(ColumnDataType.INT).withTags(tagLabels);
|
||||
CreateTable createTable =
|
||||
tableResourceTest.createRequest(getEntityName(test)).withTags(tagLabels).withColumns(CommonUtil.listOf(column));
|
||||
tableResourceTest.createRequest(getEntityName(test)).withTags(tagLabels).withColumns(listOf(column));
|
||||
Table table = tableResourceTest.createEntity(createTable, ADMIN_AUTH_HEADERS);
|
||||
|
||||
Object[][] scenarios = {
|
||||
@ -287,6 +300,92 @@ public class GlossaryResourceTest extends EntityResourceTest<Glossary, CreateGlo
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGlossaryImportInvalidCsv() throws IOException {
|
||||
String glossaryName = "invalidCsv";
|
||||
createEntity(createRequest(glossaryName), ADMIN_AUTH_HEADERS);
|
||||
|
||||
// Create glossaryTerm with invalid parent
|
||||
String resultsHeader = recordToString(EntityCsv.getResultHeaders(GlossaryCsv.HEADERS));
|
||||
String record = "invalidParent,g1,dsp1,dsc1,h1;h2;h3,,term1;http://term1,Tier.Tier1";
|
||||
String csv = createCsv(GlossaryCsv.HEADERS, listOf(record), null);
|
||||
CsvImportResult result = importCsv(glossaryName, csv, false);
|
||||
assertSummary(result, CsvImportResult.Status.FAILURE, 2, 1, 1);
|
||||
String[] expectedRows = {resultsHeader, getFailedRecord(record, entityNotFound(0, "invalidParent"))};
|
||||
assertRows(result, expectedRows);
|
||||
|
||||
// Create glossaryTerm with invalid tags field
|
||||
record = "invalidParent,g1,dsp1,dsc1,h1;h2;h3,,term1;http://term1,Tier.Tier1";
|
||||
csv = createCsv(GlossaryCsv.HEADERS, listOf(record), null);
|
||||
result = importCsv(glossaryName, csv, false);
|
||||
assertSummary(result, CsvImportResult.Status.FAILURE, 2, 1, 1);
|
||||
expectedRows = new String[] {resultsHeader, getFailedRecord(record, entityNotFound(0, "invalidParent"))};
|
||||
assertRows(result, expectedRows);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGlossaryImportExport() throws IOException {
|
||||
String glossaryName = "importExportTest";
|
||||
createEntity(createRequest(glossaryName), ADMIN_AUTH_HEADERS);
|
||||
|
||||
// CSV Header "parent" "name" "displayName" "description" "synonyms" "relatedTerms" "references" "tags"
|
||||
// Create two records
|
||||
List<String> createRecords =
|
||||
listOf(
|
||||
",g1,dsp1,dsc1,h1;h2;h3,,term1;http://term1,Tier.Tier1",
|
||||
",g2,dsp2,dsc3,h1;h3;h3,,term2;https://term2,Tier.Tier2");
|
||||
importCsvAndValidate(glossaryName, GlossaryCsv.HEADERS, createRecords, null); // Dry run
|
||||
|
||||
// Update terms with change in description
|
||||
List<String> updateRecords =
|
||||
listOf(
|
||||
",g1,dsp1,new-dsc1,h1;h2;h3,,term1;http://term1,Tier.Tier1",
|
||||
",g2,dsp2,new-dsc3,h1;h3;h3,,term2;https://term2,Tier.Tier2");
|
||||
importCsvAndValidate(glossaryName, GlossaryCsv.HEADERS, null, updateRecords);
|
||||
|
||||
// Add new row to existing rows
|
||||
createRecords = listOf(",g0,dsp0,dsc0,h1;h2;h3,,term0;http://term0,Tier.Tier3");
|
||||
importCsvAndValidate(glossaryName, GlossaryCsv.HEADERS, createRecords, updateRecords);
|
||||
}
|
||||
|
||||
private void importCsvAndValidate(
|
||||
String glossaryName, List<CsvHeader> csvHeaders, List<String> createRecords, List<String> updateRecords)
|
||||
throws HttpResponseException {
|
||||
createRecords = listOrEmpty(createRecords);
|
||||
updateRecords = listOrEmpty(updateRecords);
|
||||
|
||||
// Import CSV with dryRun=true first
|
||||
String csv = EntityCsvTest.createCsv(csvHeaders, createRecords, updateRecords);
|
||||
CsvImportResult dryRunResult = importCsv(glossaryName, csv, true);
|
||||
|
||||
// Validate the import result summary
|
||||
int totalRows = 1 + createRecords.size() + updateRecords.size();
|
||||
assertSummary(dryRunResult, CsvImportResult.Status.SUCCESS, totalRows, totalRows, 0);
|
||||
|
||||
// Validate the import result CSV
|
||||
String resultsCsv = EntityCsvTest.createCsvResult(csvHeaders, createRecords, updateRecords);
|
||||
assertEquals(resultsCsv, dryRunResult.getImportResultsCsv());
|
||||
|
||||
// Import CSV with dryRun=false to import the data
|
||||
CsvImportResult result = importCsv(glossaryName, csv, false);
|
||||
assertEquals(dryRunResult.withDryRun(false), result);
|
||||
|
||||
// Finally, export CSV and ensure the exported CSV is same as imported CSV
|
||||
String exportedCsv = exportCsv(glossaryName);
|
||||
assertEquals(csv, exportedCsv);
|
||||
}
|
||||
|
||||
private CsvImportResult importCsv(String glossaryName, String csv, boolean dryRun) throws HttpResponseException {
|
||||
WebTarget target = getResourceByName(glossaryName + "/import");
|
||||
target = !dryRun ? target.queryParam("dryRun", false) : target;
|
||||
return TestUtils.putCsv(target, csv, CsvImportResult.class, Status.OK, ADMIN_AUTH_HEADERS);
|
||||
}
|
||||
|
||||
private String exportCsv(String glossaryName) throws HttpResponseException {
|
||||
WebTarget target = getResourceByName(glossaryName + "/export");
|
||||
return TestUtils.get(target, String.class, ADMIN_AUTH_HEADERS);
|
||||
}
|
||||
|
||||
private void copyGlossaryTerm(GlossaryTerm from, GlossaryTerm to) {
|
||||
to.withGlossary(from.getGlossary())
|
||||
.withParent(from.getParent())
|
||||
|
@ -341,6 +341,14 @@ public final class TestUtils {
|
||||
return readResponse(response, clz, expectedStatus.getStatusCode());
|
||||
}
|
||||
|
||||
public static <T> T putCsv(
|
||||
WebTarget target, String request, Class<T> clz, Status expectedStatus, Map<String, String> headers)
|
||||
throws HttpResponseException {
|
||||
Response response =
|
||||
SecurityUtil.addHeaders(target, headers).method("PUT", Entity.entity(request, MediaType.TEXT_PLAIN));
|
||||
return readResponse(response, clz, expectedStatus.getStatusCode());
|
||||
}
|
||||
|
||||
public static void get(WebTarget target, Map<String, String> headers) throws HttpResponseException {
|
||||
final Response response = SecurityUtil.addHeaders(target, headers).get();
|
||||
readResponse(response, Status.NO_CONTENT.getStatusCode());
|
||||
|
@ -140,6 +140,11 @@
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"status" : {
|
||||
"description": "State of an action over API.",
|
||||
"type" : "string",
|
||||
"enum" : ["success", "failure", "aborted", "partialSuccess"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,16 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/type/csvErrorType.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "csvErrorType",
|
||||
"type": "string",
|
||||
"javaType": "org.openmetadata.schema.type.csv.CsvErrorType",
|
||||
"description": "What type of error occurred when importing a CSV file.",
|
||||
"enum": [
|
||||
"UNKNOWN",
|
||||
"PARSER_FAILURE",
|
||||
"INVALID_HEADER",
|
||||
"INVALID_FIELD_COUNT",
|
||||
"FIELD_REQUIRED",
|
||||
"INVALID_FIELD"
|
||||
]
|
||||
}
|
@ -0,0 +1,48 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/type/csvFile.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "csvFile",
|
||||
"description": "Represents a CSV file.",
|
||||
"javaType": "org.openmetadata.schema.type.csv.CsvFile",
|
||||
"definitions": {
|
||||
"csvHeader": {
|
||||
"description": "Represents a header for a field in a CSV file.",
|
||||
"javaType": "org.openmetadata.schema.type.csv.CsvHeader",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"required": {
|
||||
"type": "boolean",
|
||||
"default": false
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"csvRecord": {
|
||||
"javaType": "org.openmetadata.schema.type.csv.CsvRecord",
|
||||
"description": "Represents a CSV record that contains one row values separated by a separator.",
|
||||
"type": "array",
|
||||
"items" : {
|
||||
"type" : "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"headers": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/csvHeader"
|
||||
}
|
||||
},
|
||||
"records": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/csvRecord"
|
||||
}
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
}
|
@ -0,0 +1,50 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/type/csvImportResult.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "csvImportResult",
|
||||
"description": "Represents result of importing a CSV file. Detailed error is provided on if the CSV file is conformant and failure to load any of the records in the CSV file.",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.schema.type.csv.CsvImportResult",
|
||||
"definitions": {
|
||||
"rowCount" : {
|
||||
"description": "Type used to indicate row count",
|
||||
"type": "integer",
|
||||
"format" : "int64",
|
||||
"minimum": 0,
|
||||
"default": 0
|
||||
},
|
||||
"index" : {
|
||||
"description": "Type used to indicate row number or field number. In CSV the indexes start with 1.",
|
||||
"type": "integer",
|
||||
"format" : "int64",
|
||||
"minimum": 1
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"dryRun" : {
|
||||
"description": "True if the CSV import has dryRun flag enabled",
|
||||
"type" : "boolean"
|
||||
},
|
||||
"status" : {
|
||||
"$ref" : "basic.json#/definitions/status"
|
||||
},
|
||||
"abortReason": {
|
||||
"description": "Reason why import was aborted. This is set only when the `status` field is set to `aborted`",
|
||||
"type" : "string"
|
||||
},
|
||||
"numberOfRowsProcessed" : {
|
||||
"$ref": "#/definitions/rowCount"
|
||||
},
|
||||
"numberOfRowsPassed" : {
|
||||
"$ref": "#/definitions/rowCount"
|
||||
},
|
||||
"numberOfRowsFailed" : {
|
||||
"$ref": "#/definitions/rowCount"
|
||||
},
|
||||
"importResultsCsv" : {
|
||||
"description": "CSV file that captures the result of import operation.",
|
||||
"type" : "string"
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user