/* * Copyright 2021 Collate * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.openmetadata.csv; import static org.openmetadata.common.utils.CommonUtil.listOf; import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.csv.CsvUtil.FIELD_SEPARATOR; import static org.openmetadata.csv.CsvUtil.recordToString; import java.io.IOException; import java.io.Reader; import java.io.StringReader; import java.io.StringWriter; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVFormat.Builder; import org.apache.commons.csv.CSVPrinter; import org.apache.commons.csv.CSVRecord; import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.TagLabel; import org.openmetadata.schema.type.TagLabel.TagSource; import org.openmetadata.schema.type.csv.CsvDocumentation; import org.openmetadata.schema.type.csv.CsvErrorType; import org.openmetadata.schema.type.csv.CsvFile; import org.openmetadata.schema.type.csv.CsvHeader; import org.openmetadata.schema.type.csv.CsvImportResult; import 
org.openmetadata.schema.type.csv.CsvImportResult.Status;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.RestUtil.PutResponse;
import org.openmetadata.service.util.ValidatorUtil;

/**
 * EntityCsv provides export and import capabilities for an entity. Each entity must implement the abstract methods to
 * provide entity specific processing functionality to export an entity to a CSV record, and import an entity from a CSV
 * record.
 *
 * <p>The import result and the dry-run entity cache are instance fields, so their contents accumulate on this instance
 * across calls.
 *
 * @param <T> type of the entity that is imported from and exported to CSV
 */
@Slf4j
public abstract class EntityCsv<T extends EntityInterface> {
  public static final String FIELD_ERROR_MSG = "#%s: Field %d error - %s";
  public static final String IMPORT_STATUS_HEADER = "status";
  public static final String IMPORT_STATUS_DETAILS = "details";
  public static final String IMPORT_STATUS_SUCCESS = "success";
  public static final String IMPORT_STATUS_FAILED = "failure";
  public static final String ENTITY_CREATED = "Entity created";
  public static final String ENTITY_UPDATED = "Entity updated";

  private final String entityType;
  private final List<CsvHeader> csvHeaders;
  private final CsvImportResult importResult = new CsvImportResult();

  /** When set to false, processing of the current record is discontinued. Reset to true for every record. */
  protected boolean processRecord;

  /** Entities "created" during a dry run, keyed by fully qualified name, so later records can refer to them. */
  protected final Map<String, T> dryRunCreatedEntities = new HashMap<>();

  private final String importedBy;

  /**
   * @param entityType type name of the entity handled by this instance
   * @param csvHeaders ordered CSV column definitions for the entity
   * @param importedBy value recorded as {@code updatedBy} on every imported entity
   */
  protected EntityCsv(String entityType, List<CsvHeader> csvHeaders, String importedBy) {
    this.entityType = entityType;
    this.csvHeaders = csvHeaders;
    this.importedBy = importedBy;
  }

  /**
   * Import entities from a CSV file.
   *
   * @param csv CSV content; the first record must be the header row
   * @param dryRun when true, entities are validated but the repository is not asked to create or update them
   * @return the import result; its status is {@code ABORTED} with an abort reason when the results printer cannot be
   *     created, the CSV cannot be parsed, the CSV has no records, or the header row does not match
   * @throws IOException if writing a row to the results CSV fails
   */
  public final CsvImportResult importCsv(String csv, boolean dryRun) throws IOException {
    importResult.withDryRun(dryRun);
    StringWriter writer = new StringWriter();
    CSVPrinter resultsPrinter = getResultsCsv(csvHeaders, writer);
    if (resultsPrinter == null) {
      return importResult; // Failure has already been documented
    }

    // Parse CSV
    Iterator<CSVRecord> records = parse(csv);
    if (records == null) {
      return importResult; // Error during parsing
    }

    // First record is CSV header - Validate headers
    List<String> expectedHeaders = CsvUtil.getHeaders(csvHeaders);
    if (!records.hasNext()) {
      // Empty input has no header row at all; abort cleanly instead of failing in records.next()
      documentFailure(invalidHeader(recordToString(expectedHeaders), ""));
      return importResult;
    }
    if (!validateHeaders(expectedHeaders, records.next())) {
      return importResult;
    }
    // The header row is counted as a passed row
    importResult.withNumberOfRowsPassed(importResult.getNumberOfRowsPassed() + 1);

    // Validate and load each record
    while (records.hasNext()) {
      CSVRecord csvRecord = records.next();
      processRecord(resultsPrinter, expectedHeaders, csvRecord);
    }

    // All records are processed - derive the overall status and attach the per-row results
    setFinalStatus();
    importResult.withImportResultsCsv(writer.toString());
    return importResult;
  }

  /**
   * Implement this method to convert a CSV record into an entity.
   *
   * @param resultsPrinter printer to which a failure row is written when the record is invalid
   * @param csvRecord record to convert
   * @return the entity to import; when null is returned, no entity is created for the record
   * @throws IOException if writing to the results CSV fails
   */
  protected abstract T toEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord) throws IOException;

  /**
   * Export the given entities as CSV using the headers of this instance.
   *
   * @param entities entities to export, one CSV record each
   * @return the CSV produced by {@code CsvUtil.formatCsv}
   * @throws IOException if formatting the CSV fails
   */
  public final String exportCsv(List<T> entities) throws IOException {
    CsvFile csvFile = new CsvFile().withHeaders(csvHeaders);
    List<List<String>> records = new ArrayList<>();
    for (T entity : entities) {
      records.add(toRecord(entity));
    }
    csvFile.withRecords(records);
    return CsvUtil.formatCsv(csvFile);
  }

  /**
   * Load the CSV documentation JSON resource for an entity type.
   *
   * @param entityType entity type used to build the resource path pattern
   * @return the parsed documentation, or null when no matching resource exists or it cannot be read
   */
  public static CsvDocumentation getCsvDocumentation(String entityType) {
    LOG.info("Initializing CSV documentation for entity {}", entityType);
    String path = String.format(".*json/data/%s/%sCsvDocumentation.json$", entityType, entityType);
    try {
      List<String> jsonDataFiles = EntityUtil.getJsonDataResources(path);
      if (nullOrEmpty(jsonDataFiles)) {
        // Guard the get(0) below; report it the same way as a load failure
        LOG.error("FATAL - No CSV documentation found for entity {} from the path {}", entityType, path);
        return null;
      }
      String json = CommonUtil.getResourceAsStream(EntityRepository.class.getClassLoader(), jsonDataFiles.get(0));
      return JsonUtils.readValue(json, CsvDocumentation.class);
    } catch (IOException e) {
      // Pass the exception as the last argument so the cause and stack trace are logged
      LOG.error("FATAL - Failed to load CSV documentation for entity {} from the path {}", entityType, path, e);
    }
    return null;
  }

  /** Implement this method to export an entity into a list of fields to create a CSV record */
  protected abstract List<String> toRecord(T entity);

  /**
   * Read an owner reference from a field. Owner field is in entityType;entityName format.
   *
   * @return the owner reference; null when the field is empty, does not split into exactly two parts (a failure row is
   *     written), or the entity is not found
   * @throws IOException if writing to the results CSV fails
   */
  public EntityReference getOwner(CSVPrinter printer, CSVRecord csvRecord, int fieldNumber) throws IOException {
    String owner = csvRecord.get(fieldNumber);
    if (nullOrEmpty(owner)) {
      return null;
    }
    List<String> list = CsvUtil.fieldToStrings(owner);
    if (list.size() != 2) {
      importFailure(printer, invalidOwner(fieldNumber), csvRecord);
      return null;
    }
    return getEntityReference(printer, csvRecord, fieldNumber, list.get(0), list.get(1));
  }

  /**
   * Read an owner reference from a field, always resolving it as a user. Owner field is in entityName format.
   *
   * @return the user reference; null when the field is empty or the user is not found
   * @throws IOException if writing to the results CSV fails
   */
  public EntityReference getOwnerAsUser(CSVPrinter printer, CSVRecord csvRecord, int fieldNumber) throws IOException {
    String owner = csvRecord.get(fieldNumber);
    if (nullOrEmpty(owner)) {
      return null;
    }
    return getEntityReference(printer, csvRecord, fieldNumber, Entity.USER, owner);
  }

  /**
   * Read a boolean from a field.
   *
   * @return null when the field is empty; true or false when the field is exactly {@code "true"} or {@code "false"};
   *     otherwise a failure row is written, {@link #processRecord} is set to false and false is returned
   * @throws IOException if writing to the results CSV fails
   */
  protected final Boolean getBoolean(CSVPrinter printer, CSVRecord csvRecord, int fieldNumber) throws IOException {
    String field = csvRecord.get(fieldNumber);
    if (nullOrEmpty(field)) {
      return null;
    }
    if (field.equals(Boolean.TRUE.toString())) {
      return true;
    }
    if (field.equals(Boolean.FALSE.toString())) {
      return false;
    }
    importFailure(printer, invalidBoolean(fieldNumber, field), csvRecord);
    processRecord = false;
    return false;
  }

  /**
   * Resolve the value of a field as the name of an entity of the given type.
   *
   * @return the entity reference; null when the field is empty or the entity is not found
   * @throws IOException if writing to the results CSV fails
   */
  protected final EntityReference getEntityReference(
      CSVPrinter printer, CSVRecord csvRecord, int fieldNumber, String entityType) throws IOException {
    String fqn = csvRecord.get(fieldNumber);
    return getEntityReference(printer, csvRecord, fieldNumber, entityType, fqn);
  }

  /**
   * Look up an entity by name. When the requested type is the type handled by this instance, entities tracked in
   * {@link #dryRunCreatedEntities} are consulted first; otherwise, or on a miss, the repository of the requested type
   * is queried for a non-deleted entity.
   *
   * @return the entity, or null when neither lookup finds it
   */
  protected EntityInterface getEntityByName(String entityType, String fqn) {
    EntityInterface entity = entityType.equals(this.entityType) ? dryRunCreatedEntities.get(fqn) : null;
    if (entity == null) {
      EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
      entity = entityRepository.findByNameOrNull(fqn, Include.NON_DELETED);
    }
    return entity;
  }

  /**
   * Resolve an entity name to a reference.
   *
   * @return the entity reference; null when {@code fqn} is empty, or when the entity is not found, in which case a
   *     failure row is written and {@link #processRecord} is set to false
   * @throws IOException if writing to the results CSV fails
   */
  protected final EntityReference getEntityReference(
      CSVPrinter printer, CSVRecord csvRecord, int fieldNumber, String entityType, String fqn) throws IOException {
    if (nullOrEmpty(fqn)) {
      return null;
    }
    EntityInterface entity = getEntityByName(entityType, fqn);
    if (entity == null) {
      importFailure(printer, entityNotFound(fieldNumber, fqn), csvRecord);
      processRecord = false;
      return null;
    }
    return entity.getEntityReference();
  }

  /**
   * Resolve every name in a multi-valued field to an entity reference of the given type.
   *
   * @return the references; null when the field is empty, when no reference was resolved, or as soon as one of the
   *     names fails to resolve
   * @throws IOException if writing to the results CSV fails
   */
  protected final List<EntityReference> getEntityReferences(
      CSVPrinter printer, CSVRecord csvRecord, int fieldNumber, String entityType) throws IOException {
    String fqns = csvRecord.get(fieldNumber);
    if (nullOrEmpty(fqns)) {
      return null;
    }
    List<String> fqnList = listOrEmpty(CsvUtil.fieldToStrings(fqns));
    List<EntityReference> refs = new ArrayList<>();
    for (String fqn : fqnList) {
      EntityReference ref = getEntityReference(printer, csvRecord, fieldNumber, entityType, fqn);
      if (!processRecord) {
        return null; // A lookup failed and the failure has been recorded; stop processing this field
      }
      if (ref != null) {
        refs.add(ref);
      }
    }
    return refs.isEmpty() ? null : refs;
  }

  /**
   * Resolve every name in a multi-valued field to a reference of the given entity type. Behaves exactly like {@link
   * #getEntityReferences(CSVPrinter, CSVRecord, int, String)}, to which it delegates.
   *
   * @throws IOException if writing to the results CSV fails
   */
  protected final List<EntityReference> getUserOrTeamEntityReferences(
      CSVPrinter printer, CSVRecord csvRecord, int fieldNumber, String entityType) throws IOException {
    return getEntityReferences(printer, csvRecord, fieldNumber, entityType);
  }

  /**
   * Resolve the tags named in a field and convert them into classification tag labels.
   *
   * @return one label per resolved tag; null when the field is empty or any tag fails to resolve
   * @throws IOException if writing to the results CSV fails
   */
  protected final List<TagLabel> getTagLabels(CSVPrinter printer, CSVRecord csvRecord, int fieldNumber)
      throws IOException {
    List<EntityReference> refs = getEntityReferences(printer, csvRecord, fieldNumber, Entity.TAG);
    if (!processRecord || nullOrEmpty(refs)) {
      return null;
    }
    List<TagLabel> tagLabels = new ArrayList<>();
    for (EntityReference ref : refs) {
      tagLabels.add(new TagLabel().withSource(TagSource.CLASSIFICATION).withTagFQN(ref.getFullyQualifiedName()));
    }
    return tagLabels;
  }

  /**
   * Build the header row of the import results CSV: the status and details columns followed by the entity headers.
   */
  public static String[] getResultHeaders(List<CsvHeader> csvHeaders) {
    List<String> importResultsCsvHeader = listOf(IMPORT_STATUS_HEADER, IMPORT_STATUS_DETAILS);
    importResultsCsvHeader.addAll(CsvUtil.getHeaders(csvHeaders));
    return importResultsCsvHeader.toArray(new String[0]);
  }

  /** Create a CSVPrinter to capture the import results; returns null after documenting the failure on error. */
  private CSVPrinter getResultsCsv(List<CsvHeader> csvHeaders, StringWriter writer) {
    CSVFormat format = Builder.create(CSVFormat.DEFAULT).setHeader(getResultHeaders(csvHeaders)).build();
    try {
      return new CSVPrinter(writer, format);
    } catch (IOException e) {
      documentFailure(failed(e.getMessage(), CsvErrorType.UNKNOWN));
    }
    return null;
  }

  /** Parse the CSV into records; returns null after documenting the failure when the parser cannot be created. */
  private Iterator<CSVRecord> parse(String csv) {
    // A null CSV is treated as empty input so that the caller reports it as a missing header
    Reader in = new StringReader(csv == null ? "" : csv);
    try {
      return CSVFormat.DEFAULT.parse(in).iterator();
    } catch (IOException e) {
      documentFailure(failed(e.getMessage(), CsvErrorType.PARSER_FAILURE));
    }
    return null;
  }

  /** Check that the header record matches the expected headers exactly; aborts the import when it does not. */
  private boolean validateHeaders(List<String> expectedHeaders, CSVRecord csvRecord) {
    importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber());
    if (expectedHeaders.equals(csvRecord.toList())) {
      return true;
    }
    importResult.withNumberOfRowsFailed(1);
    documentFailure(invalidHeader(recordToString(expectedHeaders), recordToString(csvRecord)));
    return false;
  }

  /** Validate a single data record and, when it is valid, convert it into an entity and import it. */
  private void processRecord(CSVPrinter resultsPrinter, List<String> expectedHeader, CSVRecord csvRecord)
      throws IOException {
    processRecord = true;

    // Every row must have total fields corresponding to the number of headers
    if (csvHeaders.size() != csvRecord.size()) {
      importFailure(resultsPrinter, invalidFieldCount(expectedHeader.size(), csvRecord.size()), csvRecord);
      return;
    }

    // Check if required values are present
    List<String> errors = new ArrayList<>();
    for (int i = 0; i < csvHeaders.size(); i++) {
      String field = csvRecord.get(i);
      boolean fieldRequired = Boolean.TRUE.equals(csvHeaders.get(i).getRequired());
      if (fieldRequired && nullOrEmpty(field)) {
        errors.add(fieldRequired(i));
      }
    }
    if (!errors.isEmpty()) {
      importFailure(resultsPrinter, String.join(FIELD_SEPARATOR, errors), csvRecord);
      return;
    }

    // Finally, convert record into entity for importing
    T entity = toEntity(resultsPrinter, csvRecord);
    if (entity != null) {
      createEntity(resultsPrinter, csvRecord, entity);
    }
  }

  /**
   * Validate the entity and create or update it through the repository. In a dry run nothing is written: the entity is
   * only tracked in {@link #dryRunCreatedEntities} and reported as created or updated depending on whether a
   * non-deleted entity with the same fully qualified name already exists.
   */
  private void createEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, T entity) throws IOException {
    entity.setId(UUID.randomUUID());
    entity.setUpdatedBy(importedBy);
    entity.setUpdatedAt(System.currentTimeMillis());
    @SuppressWarnings("unchecked") // The repository registered for entityType is expected to handle entities of type T
    EntityRepository<T> repository = (EntityRepository<T>) Entity.getEntityRepository(entityType);
    Response.Status responseStatus;

    String violations = ValidatorUtil.validate(entity);
    if (violations != null) {
      // JSON schema based validation failed for the entity
      importFailure(resultsPrinter, violations, csvRecord);
      return;
    }

    if (Boolean.FALSE.equals(importResult.getDryRun())) {
      try {
        repository.prepareInternal(entity);
        PutResponse<T> response = repository.createOrUpdate(null, entity);
        responseStatus = response.getStatus();
      } catch (Exception ex) {
        // Any failure while storing the entity is reported against the record instead of aborting the import
        importFailure(resultsPrinter, ex.getMessage(), csvRecord);
        return;
      }
    } else {
      repository.setFullyQualifiedName(entity);
      responseStatus =
          repository.findByNameOrNull(entity.getFullyQualifiedName(), Include.NON_DELETED) == null
              ? Response.Status.CREATED
              : Response.Status.OK;
      // Track the dryRun created entities, as they may be referred by other entities being created during import
      dryRunCreatedEntities.put(entity.getFullyQualifiedName(), entity);
    }

    if (Response.Status.CREATED.equals(responseStatus)) {
      importSuccess(resultsPrinter, csvRecord, ENTITY_CREATED);
    } else {
      importSuccess(resultsPrinter, csvRecord, ENTITY_UPDATED);
    }
  }

  /** Format the abort reason used when the import fails as a whole. */
  public String failed(String exception, CsvErrorType errorType) {
    return String.format("#%s: Failed to parse the CSV filed - reason %s", errorType, exception);
  }

  /** Format the error reported when the header row does not match the expected headers. */
  public static String invalidHeader(String expected, String actual) {
    return String.format("#%s: Headers [%s] doesn't match [%s]", CsvErrorType.INVALID_HEADER, actual, expected);
  }

  /** Format the error reported when a record does not have the expected number of fields. */
  public static String invalidFieldCount(int expectedFieldCount, int actualFieldCount) {
    return String.format(
        "#%s: Field count %d does not match the expected field count of %d",
        CsvErrorType.INVALID_FIELD_COUNT, actualFieldCount, expectedFieldCount);
  }

  /**
   * Format the error reported when a required field is empty.
   *
   * @param field zero-based field index; reported one-based
   */
  public static String fieldRequired(int field) {
    return String.format("#%s: Field %d is required", CsvErrorType.FIELD_REQUIRED, field + 1);
  }

  /**
   * Format a generic invalid field error.
   *
   * @param field zero-based field index; reported one-based
   */
  public static String invalidField(int field, String error) {
    return String.format(FIELD_ERROR_MSG, CsvErrorType.INVALID_FIELD, field + 1, error);
  }

  /**
   * Format the error reported when a referenced entity does not exist.
   *
   * @param field zero-based field index; reported one-based
   */
  public static String entityNotFound(int field, String fqn) {
    String error = String.format("Entity %s not found", fqn);
    return String.format(FIELD_ERROR_MSG, CsvErrorType.INVALID_FIELD, field + 1, error);
  }

  /**
   * Format the error reported when an owner field is malformed.
   *
   * @param field zero-based field index; reported one-based
   */
  public static String invalidOwner(int field) {
    String error = "Owner should be of format user;userName or team;teamName";
    return String.format(FIELD_ERROR_MSG, CsvErrorType.INVALID_FIELD, field + 1, error);
  }

  /**
   * Format the error reported when a boolean field holds an unexpected value.
   *
   * @param field zero-based field index; reported one-based
   */
  public static String invalidBoolean(int field, String fieldValue) {
    String error = String.format("Field %s should be either 'true' of 'false'", fieldValue);
    return String.format(FIELD_ERROR_MSG, CsvErrorType.INVALID_FIELD, field + 1, error);
  }

  /** Mark the whole import as aborted with the given reason. */
  private void documentFailure(String error) {
    importResult.withStatus(Status.ABORTED);
    importResult.withAbortReason(error);
  }

  /** Write a success row for the record to the results CSV and update the processed and passed counters. */
  private void importSuccess(CSVPrinter printer, CSVRecord inputRecord, String successDetails) throws IOException {
    List<String> recordList = listOf(IMPORT_STATUS_SUCCESS, successDetails);
    recordList.addAll(inputRecord.toList());
    printer.printRecord(recordList);
    importResult.withNumberOfRowsProcessed((int) inputRecord.getRecordNumber());
    importResult.withNumberOfRowsPassed(importResult.getNumberOfRowsPassed() + 1);
  }

  /**
   * Write a failure row for the record to the results CSV, update the processed and failed counters, and stop further
   * processing of the record by setting {@link #processRecord} to false.
   *
   * @throws IOException if writing to the results CSV fails
   */
  protected void importFailure(CSVPrinter printer, String failedReason, CSVRecord inputRecord) throws IOException {
    List<String> recordList = listOf(IMPORT_STATUS_FAILED, failedReason);
    recordList.addAll(inputRecord.toList());
    printer.printRecord(recordList);
    importResult.withNumberOfRowsProcessed((int) inputRecord.getRecordNumber());
    importResult.withNumberOfRowsFailed(importResult.getNumberOfRowsFailed() + 1);
    processRecord = false;
  }

  /**
   * Derive the overall status from the row counters: SUCCESS when every processed row passed, PARTIAL_SUCCESS when more
   * than one row passed (the header row is itself counted as passed), FAILURE otherwise.
   */
  private void setFinalStatus() {
    Status status = Status.FAILURE;
    if (importResult.getNumberOfRowsPassed().equals(importResult.getNumberOfRowsProcessed())) {
      status = Status.SUCCESS;
    } else if (importResult.getNumberOfRowsPassed() > 1) {
      status = Status.PARTIAL_SUCCESS;
    }
    importResult.setStatus(status);
  }
}