Fix #783 Update table entity version when the entity changes during PUT and POST operations

This commit is contained in:
sureshms 2021-10-15 16:47:45 -07:00
parent b1ee1a23f1
commit 4ba975262b
14 changed files with 478 additions and 217 deletions

View File

@ -35,10 +35,13 @@ import org.openmetadata.catalog.type.ColumnProfile;
import org.openmetadata.catalog.type.DailyCount;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.JoinedWith;
import org.openmetadata.catalog.type.TableConstraint;
import org.openmetadata.catalog.type.TableData;
import org.openmetadata.catalog.type.TableJoins;
import org.openmetadata.catalog.type.TableProfile;
import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.EntityUpdater;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.EntityUtil.Fields;
import org.openmetadata.catalog.util.EventUtils;
@ -195,35 +198,23 @@ public abstract class TableRepository {
}
@Transaction
public PutResponse<Table> createOrUpdate(Table updatedTable, EntityReference newOwner, UUID databaseId) throws
public PutResponse<Table> createOrUpdate(Table updated, EntityReference newOwner, UUID databaseId) throws
IOException, ParseException {
Database database = EntityUtil.validate(databaseId.toString(), databaseDAO().findById(databaseId.toString()),
Database.class);
String tableFQName = database.getFullyQualifiedName() + "." + updatedTable.getName();
Table storedTable = JsonUtils.readValue(tableDAO().findByFQN(tableFQName), Table.class);
if (storedTable == null) {
return new PutResponse<>(Status.CREATED, createInternal(database.getId(), updatedTable, newOwner));
String tableFQName = database.getFullyQualifiedName() + "." + updated.getName();
Table stored = JsonUtils.readValue(tableDAO().findByFQN(tableFQName), Table.class);
if (stored == null) {
return new PutResponse<>(Status.CREATED, createInternal(database.getId(), updated, newOwner));
}
updatedTable.setId(storedTable.getId());
validateRelationships(updatedTable, database, newOwner);
setFields(stored, TABLE_UPDATE_FIELDS);
updated.setId(stored.getId());
validateRelationships(updated, database, newOwner);
// Carry forward non-empty description
if (storedTable.getDescription() != null && !storedTable.getDescription().isEmpty()) {
// Update description only when it is empty
updatedTable.setDescription(storedTable.getDescription());
}
// PUT operation merges tags in the request with what already exists
// Remove current table tags in the database. It will be added back later from the merged tag list.
EntityUtil.removeTagsByPrefix(tagDAO(), storedTable.getFullyQualifiedName());
updatedTable.setTags(EntityUtil.mergeTags(updatedTable.getTags(), storedTable.getTags()));
ColumnChanges columnChanges = new ColumnChanges();
updateColumns(storedTable, updatedTable, columnChanges, false);
compareAndVersion(storedTable, updatedTable, columnChanges);
storeTable(updatedTable, true);
updateRelationships(storedTable, updatedTable);
setFields(storedTable, TABLE_UPDATE_FIELDS);
TableUpdater tableUpdater = new TableUpdater(stored, updated, false);
tableUpdater.updateAll();
tableUpdater.store();
// setFields(updated, TABLE_UPDATE_FIELDS); // TODO remove this
// if (updated) {
// // TODO clean this up
@ -232,21 +223,7 @@ public abstract class TableRepository {
// JsonUtils.pojoToJson(tableStored),
// JsonUtils.pojoToJson(tableUpdated));
// }
return new PutResponse<>(Status.OK, updatedTable);
}
private void compareAndVersion(Table storedTable, Table updatedTable, ColumnChanges columnChanges) {
if (columnChanges.columnsDeleted > 0) {
// Backward incompatible change - bump the major version number
updatedTable.setVersion(Math.floor(storedTable.getVersion()) + 1.0);
}
if (columnChanges.columnsUpdated > 0 || columnChanges.columnsAdded > 0 || // Columns added or modified
!Objects.equals(storedTable.getDescription(), updatedTable.getDescription()) || // Description changed
!Objects.equals(storedTable.getOwner(), updatedTable.getOwner()) || // Owner changed
!Objects.equals(storedTable.getTableConstraints(), updatedTable.getTableConstraints())
) {
updatedTable.setVersion(storedTable.getVersion() + 0.1);
}
return new PutResponse<>(Status.OK, updated);
}
@Transaction
@ -363,17 +340,17 @@ public abstract class TableRepository {
});
}
private void validateTags(List<Column> columns) {
private void addDerivedTags(List<Column> columns) throws IOException {
if (columns == null || columns.isEmpty()) {
return;
}
columns.forEach(column -> {
EntityUtil.validateTags(tagDAO(), column.getTags());
for (Column column : columns) {
column.setTags(EntityUtil.addDerivedTags(tagDAO(), column.getTags()));
if (column.getChildren() != null) {
validateTags(column.getChildren());
addDerivedTags(column.getChildren());
}
});
}
}
private void validateRelationships(Table table, Database database, EntityReference owner) throws IOException {
@ -385,11 +362,11 @@ public abstract class TableRepository {
// Check if owner is valid and set the relationship
table.setOwner(EntityUtil.populateOwner(userDAO(), teamDAO(), owner));
// Validate table tags
EntityUtil.validateTags(tagDAO(), table.getTags());
// Validate table tags and add derived tags to the list
table.setTags(EntityUtil.addDerivedTags(tagDAO(), table.getTags()));
// Validate column tags
validateTags(table.getColumns());
addDerivedTags(table.getColumns());
}
private void storeTable(Table table, boolean update) throws JsonProcessingException {
@ -452,14 +429,7 @@ public abstract class TableRepository {
applyTags(table);
}
private void updateRelationships(Table origTable, Table updatedTable) throws IOException {
// Add owner relationship
origTable.setOwner(getOwner(origTable));
EntityUtil.updateOwner(relationshipDAO(), origTable.getOwner(), updatedTable.getOwner(), origTable.getId(),
Entity.TABLE);
applyTags(updatedTable);
}
// TODO remove this
private void applyTags(List<Column> columns) throws IOException {
// Add column level tags by adding tag to column relationship
for (Column column : columns) {
@ -488,13 +458,9 @@ public abstract class TableRepository {
validateRelationships(updated, updated.getDatabase().getId(), updated.getOwner());
// Remove previous tags. Merge tags from the update and the existing tags
EntityUtil.removeTagsByPrefix(tagDAO(), original.getFullyQualifiedName());
ColumnChanges columnChanges = new ColumnChanges();
updateColumns(original, updated, columnChanges, true);
storeTable(updated, true);
updateRelationships(original, updated);
TableUpdater tableUpdater = new TableUpdater(original, updated, true);
tableUpdater.updateAll();
tableUpdater.store();
}
private Database getDatabase(Table table) throws IOException {
@ -537,58 +503,6 @@ public abstract class TableRepository {
return table == null ? null : EntityUtil.getFollowers(table.getId(), relationshipDAO(), userDAO());
}
private void updateColumns(List<Column> storedColumns, List<Column> updatedColumns,
ColumnChanges changes, boolean patchOperation) {
// Carry forward the user generated metadata from existing columns to new columns
for (Column updated : updatedColumns) {
// Find stored column matching name, data type and ordinal position
Column stored = storedColumns.stream()
.filter(s -> s.getName().equals(updated.getName()) &&
s.getDataType() == updated.getDataType() &&
s.getArrayDataType() == updated.getArrayDataType() &&
Objects.equals(s.getOrdinalPosition(), updated.getOrdinalPosition()))
.findAny()
.orElse(null);
if (stored == null) {
changes.columnsAdded++;
LOG.info("Column {} was newly added", updated.getFullyQualifiedName());
continue;
}
// For patch operation don't overwrite updated column with stored column description
// For put operation overwrite updated column with non-empty or non-null stored column description
if (!patchOperation &&
stored.getDescription() != null && !stored.getDescription().isEmpty()) {
updated.setDescription(stored.getDescription()); // Carry forward non-empty description
}
if (updated.getChildren() != null && stored.getChildren() != null) {
updateColumns(stored.getChildren(), updated.getChildren(), changes, patchOperation);
}
}
for (Column stored : storedColumns) {
// Find updated column matching name, data type and ordinal position
Column updated = updatedColumns.stream()
.filter(s -> s.getName().equals(stored.getName()) &&
s.getDataType() == stored.getDataType() &&
s.getArrayDataType() == stored.getArrayDataType() &&
Objects.equals(s.getOrdinalPosition(), stored.getOrdinalPosition()))
.findAny()
.orElse(null);
if (updated == null) {
LOG.info("Column {} was deleted", stored.getFullyQualifiedName());
changes.columnsDeleted++;
}
}
}
// TODO remove this method
private void updateColumns(Table storedTable, Table updatedTable, ColumnChanges changes, boolean patchOperation) {
updateColumns(storedTable.getColumns(), updatedTable.getColumns(), changes, patchOperation);
}
private List<TagLabel> getTags(String fqn) {
return tagDAO().getTags(fqn);
}
@ -817,9 +731,157 @@ public abstract class TableRepository {
int delete(@Bind("id") String id);
}
public static class ColumnChanges {
int columnsAdded = 0;
int columnsUpdated = 0;
int columnsDeleted = 0;
class TableEntityInterface implements EntityInterface {
private final Table table;
TableEntityInterface(Table table) {
this.table = table;
}
@Override
public UUID getId() {
return table.getId();
}
@Override
public String getDescription() {
return table.getDescription();
}
@Override
public EntityReference getOwner() {
return table.getOwner();
}
@Override
public String getFullyQualifiedName() {
return table.getFullyQualifiedName();
}
@Override
public List<TagLabel> getTags() {
return table.getTags();
}
@Override
public void setDescription(String description) {
table.setDescription(description);
}
@Override
public void setTags(List<TagLabel> tags) {
table.setTags(tags);
}
}
/**
* Handles entity updated from PUT and POST operation.
*/
public class TableUpdater extends EntityUpdater {
final Table orig;
final Table updated;
public TableUpdater(Table orig, Table updated, boolean patchOperation) {
super(new TableEntityInterface(orig), new TableEntityInterface(updated), patchOperation, relationshipDAO(),
tagDAO());
this.orig = orig;
this.updated = updated;
}
public void updateAll() throws IOException {
super.updateAll();
updateConstraints();
updateColumns(orig.getColumns(), updated.getColumns());
}
private void updateColumns(List<Column> origColumns, List<Column> updatedColumns) throws IOException {
// Carry forward the user generated metadata from existing columns to new columns
for (Column updated : updatedColumns) {
// Find stored column matching name, data type and ordinal position
Column stored = origColumns.stream()
.filter(s -> s.getName().equals(updated.getName()) &&
s.getDataType() == updated.getDataType() &&
s.getArrayDataType() == updated.getArrayDataType() &&
Objects.equals(s.getOrdinalPosition(), updated.getOrdinalPosition()))
.findAny()
.orElse(null);
if (stored == null) {
fieldsAdded.add("column:" + updated.getFullyQualifiedName());
EntityUtil.applyTags(tagDAO(), updated.getTags(), updated.getFullyQualifiedName());
continue;
}
updateColumnDescription(stored, updated);
updateColumnTags(stored, updated);
updateColumnConstraint(stored, updated);
if (updated.getChildren() != null && stored.getChildren() != null) {
updateColumns(stored.getChildren(), updated.getChildren());
}
}
for (Column stored : origColumns) {
// Find updated column matching name, data type and ordinal position
Column updated = updatedColumns.stream()
.filter(s -> s.getName().equals(stored.getName()) &&
s.getDataType() == stored.getDataType() &&
s.getArrayDataType() == stored.getArrayDataType() &&
Objects.equals(s.getOrdinalPosition(), stored.getOrdinalPosition()))
.findAny()
.orElse(null);
if (updated == null) {
fieldsDeleted.add("column:" + stored.getFullyQualifiedName());
majorVersionChange = true;
}
}
}
private void updateColumnDescription(Column origColumn, Column updatedColumn) {
if (!patchOperation
&& origColumn.getDescription() != null && !origColumn.getDescription().isEmpty()) {
// Update description only when stored is empty to retain user authored descriptions
updatedColumn.setDescription(origColumn.getDescription());
return;
}
update("column:" + origColumn.getFullyQualifiedName() + ":description", origColumn.getDescription(),
updatedColumn.getDescription());
}
private void updateColumnConstraint(Column origColumn, Column updatedColumn) {
update("column:" + orig.getFullyQualifiedName() + ":description", origColumn.getConstraint(),
updatedColumn.getConstraint());
}
private void updateConstraints() {
List<TableConstraint> origConstraints = orig.getTableConstraints();
List<TableConstraint> updatedConstraints = updated.getTableConstraints();
if (origConstraints != null) {
origConstraints.sort(Comparator.comparing(TableConstraint::getConstraintType));
origConstraints.stream().map(TableConstraint::getColumns).forEach(Collections::sort);
}
if (updatedConstraints != null) {
updatedConstraints.sort(Comparator.comparing(TableConstraint::getConstraintType));
updatedConstraints.stream().map(TableConstraint::getColumns).forEach(Collections::sort);
}
update("tableConstraints", origConstraints, updatedConstraints);
}
private void updateColumnTags(Column origColumn, Column updatedColumn) throws IOException {
if (!patchOperation) {
// PUT operation merges tags in the request with what already exists
updatedColumn.setTags(EntityUtil.mergeTags(updatedColumn.getTags(), origColumn.getTags()));
}
update("column:" + origColumn.getFullyQualifiedName() + ":tags",
origColumn.getTags() == null ? 0 : origColumn.getTags().size(),
updatedColumn.getTags() == null ? 0 : updatedColumn.getTags().size());
EntityUtil.applyTags(tagDAO(), updatedColumn.getTags(), updatedColumn.getFullyQualifiedName());
}
public void store() throws IOException {
updated.setVersion(getNewVersion(orig.getVersion()));
storeTable(updated, true);
}
}
}

View File

@ -0,0 +1,4 @@
package org.openmetadata.catalog.resources.databases;
public class TableUtil {
}

View File

@ -0,0 +1,18 @@
package org.openmetadata.catalog.util;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.TagLabel;
import java.util.List;
import java.util.UUID;
public interface EntityInterface {
UUID getId();
String getDescription();
EntityReference getOwner();
String getFullyQualifiedName();
List<TagLabel> getTags();
void setDescription(String description);
void setTags(List<TagLabel> tags);
}

View File

@ -0,0 +1,116 @@
package org.openmetadata.catalog.util;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.jdbi3.EntityRelationshipDAO;
import org.openmetadata.catalog.jdbi3.TagRepository.TagDAO;
import org.openmetadata.catalog.type.EntityReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Used for updating the following common entity fields in PUT and PATCH operations.
* - description
* - tags
* - owner
*
* This class handles tracking all the changes in an update operation and also versioning
* of the entity.
*
* Concrete implementations need to implement update for other fields. See
* {@link org.openmetadata.catalog.jdbi3.TableRepository.TableUpdater} for an example implementation.
*/
public abstract class EntityUpdater {
private static final Logger LOG = LoggerFactory.getLogger(EntityUpdater.class);
private final EntityInterface originalEntity;
private final EntityInterface updatedEntity;
private final EntityRelationshipDAO relationshipDAO;
private final TagDAO tagDAO;
protected final boolean patchOperation;
protected List<String> fieldsUpdated = new ArrayList<>();
protected List<String> fieldsAdded = new ArrayList<>();
protected List<String> fieldsDeleted = new ArrayList<>();
protected boolean majorVersionChange = false;
public EntityUpdater(EntityInterface originalEntity, EntityInterface updatedEntity, boolean patchOperation,
EntityRelationshipDAO relationshipDAO, TagDAO tagDAO) {
this.originalEntity = originalEntity;
this.updatedEntity = updatedEntity;
this.patchOperation = patchOperation;
this.relationshipDAO = relationshipDAO;
this.tagDAO = tagDAO;
}
public void updateAll() throws IOException {
updateDescription();
updateOwner();
updateTags();
}
private void updateDescription() {
if (!patchOperation &&
originalEntity.getDescription() != null && !originalEntity.getDescription().isEmpty()) {
// Update description only when stored is empty to retain user authored descriptions
updatedEntity.setDescription(originalEntity.getDescription());
return;
}
update("description", originalEntity.getDescription(), updatedEntity.getDescription());
}
private void updateOwner() {
EntityReference origOwner = originalEntity.getOwner();
EntityReference updatedOwner = updatedEntity.getOwner();
if (update("owner", origOwner == null ? null : origOwner.getId(),
updatedOwner == null ? null : updatedOwner.getId())) {
EntityUtil.updateOwner(relationshipDAO, origOwner, updatedOwner, originalEntity.getId(), Entity.TABLE);
}
}
private void updateTags() throws IOException {
// Remove current table tags in the database. It will be added back later from the merged tag list.
EntityUtil.removeTagsByPrefix(tagDAO, originalEntity.getFullyQualifiedName());
if (!patchOperation) {
// PUT operation merges tags in the request with what already exists
updatedEntity.setTags(EntityUtil.mergeTags(updatedEntity.getTags(), originalEntity.getTags()));
}
update("tags", originalEntity.getTags() == null ? 0 : originalEntity.getTags().size(),
updatedEntity.getTags() == null ? 0 : updatedEntity.getTags().size());
EntityUtil.applyTags(tagDAO, updatedEntity.getTags(), updatedEntity.getFullyQualifiedName());
}
public Double getNewVersion(Double oldVersion) {
Double newVersion = oldVersion;
if (majorVersionChange) {
newVersion = oldVersion + 1.0;
} else if (!fieldsUpdated.isEmpty() || !fieldsAdded.isEmpty() || !fieldsDeleted.isEmpty()) {
newVersion = oldVersion + 0.1;
}
LOG.info("{}->{} - Fields added {}, updated {}, deleted {}",
oldVersion, newVersion, fieldsAdded, fieldsUpdated, fieldsDeleted);
return newVersion;
}
public abstract void store() throws IOException;
protected boolean update(String field, Object orig, Object updated) {
if (orig == null && updated == null) {
return false;
}
if (orig == null) {
fieldsAdded.add(field);
return true;
} else if (updated == null) {
fieldsDeleted.add(field);
return true;
} else if (!orig.equals(updated)) {
fieldsUpdated.add(field);
return true;
}
return false;
}
}

View File

@ -222,7 +222,7 @@ public final class EntityUtil {
EntityReference owner) {
// Add relationship owner --- owns ---> ownedEntity
if (owner != null) {
LOG.info("Owner {}:{} for entity {}", owner.getType(), owner.getId(), ownedEntityId);
LOG.info("Adding owner {}:{} for entity {}", owner.getType(), owner.getId(), ownedEntityId);
dao.insert(owner.getId().toString(), ownedEntityId.toString(), owner.getType(), ownedEntityType,
Relationship.OWNS.ordinal());
}
@ -243,7 +243,7 @@ public final class EntityUtil {
UUID ownedEntityId, String ownedEntityType) {
// TODO inefficient use replace instead of delete and add?
// TODO check for orig and new owners being the same
EntityUtil.unassignOwner(dao, originalOwner, ownedEntityId.toString());
unassignOwner(dao, originalOwner, ownedEntityId.toString());
setOwner(dao, ownedEntityId, ownedEntityType, newOwner);
}
@ -560,7 +560,6 @@ public final class EntityUtil {
Tag tag = JsonUtils.readValue(json, Tag.class);
// Apply tagLabel to targetFQN that identifies an entity or field
LOG.info("Applying tag {} to targetFQN {}", tagLabel.getTagFQN(), targetFQN);
tagDAO.applyTag(tagLabel.getTagFQN(), targetFQN, tagLabel.getLabelType().ordinal(),
tagLabel.getState().ordinal());
@ -578,6 +577,28 @@ public final class EntityUtil {
return derivedTags;
}
/**
* Validate given list of tags and add derived tags to it
*/
public static List<TagLabel> addDerivedTags(TagDAO tagDAO, List<TagLabel> tagLabels) throws IOException {
List<TagLabel> updatedTagLabels = new ArrayList<>();
for (TagLabel tagLabel : Optional.ofNullable(tagLabels).orElse(Collections.emptyList())) {
String json = tagDAO.findTag(tagLabel.getTagFQN());
if (json == null) {
// Invalid TagLabel
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(Tag.class.getSimpleName(),
tagLabel.getTagFQN()));
}
Tag tag = JsonUtils.readValue(json, Tag.class);
updatedTagLabels.add(tagLabel);
// Apply derived tags
List<TagLabel> derivedTags = getDerivedTags(tagLabel, tag);
updatedTagLabels = EntityUtil.mergeTags(updatedTagLabels, derivedTags);
}
return updatedTagLabels;
}
public static void removeTags(TagDAO tagDAO, String fullyQualifiedName) {
tagDAO.deleteTags(fullyQualifiedName);
}
@ -587,9 +608,10 @@ public final class EntityUtil {
}
public static List<TagLabel> mergeTags(List<TagLabel> list1, List<TagLabel> list2) {
return Stream.concat(Optional.ofNullable(list1).orElse(Collections.emptyList()).stream(),
List<TagLabel> mergedTags = Stream.concat(Optional.ofNullable(list1).orElse(Collections.emptyList()).stream(),
Optional.ofNullable(list2).orElse(Collections.emptyList()).stream())
.distinct().collect(Collectors.toList());
return mergedTags.isEmpty() ? null : mergedTags;
}
public static void publishEntityCreatedEvent(String entity, String entityName, String event) {

View File

@ -86,7 +86,7 @@
"UNIQUE",
"PRIMARY_KEY"
],
"default": "NULL"
"default": null
},
"tableConstraint": {
"type": "object",

View File

@ -5,6 +5,21 @@
"description": "This schema defines the type used for capturing version of history of entity.",
"type": "object",
"javaType": "org.openmetadata.catalog.type.EntityLineage",
"definitions": {
"entityVersion" : {
"type": "object",
"properties" : {
"changeDescription" : {
"description": "Description of the change.",
"type" : "string"
},
"entity" : {
"description": "JSON payload corresponding to a version of the entity.",
"type" : "object"
}
}
}
},
"properties": {
"entityType" : {
"description": "Entity type, such as `database`, `table`, `dashboard`, for which this version history is produced.",
@ -14,7 +29,7 @@
"descriptions" : "All the versions of the entity ordered from the latest to the oldest version. Note the array element object has schema that corresponds to the entity schema. For example, if `entityType` is `table`, then the schema of the object in the array is `table.json`.",
"type" : "array",
"items" : {
"type": "object"
"$ref": "#/definitions/entityVersion"
}
}
},

View File

@ -601,7 +601,7 @@ public class ChartResourceTest extends CatalogApplicationTest {
assertEquals(expectedService.getId(), chart.getService().getId());
assertEquals(expectedService.getType(), chart.getService().getType());
}
TestUtils.validateTags(expectedTags, chart.getTags());
TestUtils.validateTags(chart.getFullyQualifiedName(), expectedTags, chart.getTags());
return chart;
}

View File

@ -666,7 +666,7 @@ public class DashboardResourceTest extends CatalogApplicationTest {
assertEquals(expectedService.getType(), dashboard.getService().getType());
}
validateDashboardCharts(dashboard, charts);
TestUtils.validateTags(expectedTags, dashboard.getTags());
TestUtils.validateTags(dashboard.getFullyQualifiedName(), expectedTags, dashboard.getTags());
return dashboard;
}

View File

@ -42,6 +42,7 @@ import org.openmetadata.catalog.resources.tags.TagResourceTest;
import org.openmetadata.catalog.resources.teams.TeamResourceTest;
import org.openmetadata.catalog.resources.teams.UserResourceTest;
import org.openmetadata.catalog.type.Column;
import org.openmetadata.catalog.type.ColumnConstraint;
import org.openmetadata.catalog.type.ColumnDataType;
import org.openmetadata.catalog.type.ColumnJoin;
import org.openmetadata.catalog.type.ColumnProfile;
@ -58,6 +59,7 @@ import org.openmetadata.catalog.util.EntityUtil.Fields;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.TestUtils;
import org.openmetadata.catalog.util.TestUtils.UpdateType;
import org.openmetadata.common.utils.JsonSchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -99,6 +101,9 @@ import static org.openmetadata.catalog.type.ColumnDataType.INT;
import static org.openmetadata.catalog.type.ColumnDataType.STRUCT;
import static org.openmetadata.catalog.util.RestUtil.DATE_FORMAT;
import static org.openmetadata.catalog.util.TestUtils.NON_EXISTENT_ENTITY;
import static org.openmetadata.catalog.util.TestUtils.UpdateType.MAJOR_UPDATE;
import static org.openmetadata.catalog.util.TestUtils.UpdateType.MINOR_UPDATE;
import static org.openmetadata.catalog.util.TestUtils.UpdateType.NO_CHANGE;
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
import static org.openmetadata.catalog.util.TestUtils.assertEntityPagination;
import static org.openmetadata.catalog.util.TestUtils.assertResponse;
@ -219,7 +224,7 @@ public class TableResourceTest extends CatalogApplicationTest {
}
@Test
public void post_validTables_200_OK(TestInfo test) throws HttpResponseException, InterruptedException {
public void post_validTables_200_OK(TestInfo test) throws HttpResponseException {
// Create table with different optional fields
// Optional field description
CreateTable create = create(test).withDescription("description");
@ -259,9 +264,10 @@ public class TableResourceTest extends CatalogApplicationTest {
// Test PUT operation
CreateTable create2 = create(test, 2).withColumns(Arrays.asList(c1, c2));
updateAndCheckTable(create2.withName("put_complexColumnType"), Status.CREATED, adminAuthHeaders());
Table table2= updateAndCheckTable(null, create2.withName("put_complexColumnType"), Status.CREATED,
adminAuthHeaders(), NO_CHANGE);
// Update without any change
updateAndCheckTable(create2.withName("put_complexColumnType"), Status.OK, adminAuthHeaders());
updateAndCheckTable(table2, create2.withName("put_complexColumnType"), Status.OK, adminAuthHeaders(), NO_CHANGE);
//
// Update the complex columns
@ -272,15 +278,14 @@ public class TableResourceTest extends CatalogApplicationTest {
// c2 from -> to
// struct<a:int, b:char, c:struct<d:int>>>
// struct<-----, b:char, c:struct<d:int, e:char>, f:char>
c2_b.withTags(singletonList(USER_BANK_ACCOUNT_TAG_LABEL)); // Change c2.b tag
c2_b.withTags(List.of(USER_ADDRESS_TAG_LABEL, USER_BANK_ACCOUNT_TAG_LABEL)); // Add new tag to c2.b tag
c2_c.getChildren().add(getColumn("e", INT,USER_ADDRESS_TAG_LABEL)); // Add c2.c.e
c2.getChildren().remove(0); // Remove c2.a from struct
c2.getChildren().add(getColumn("f", CHAR, USER_ADDRESS_TAG_LABEL)); // Add c2.f
create2 = create2.withColumns(Arrays.asList(c1, c2));
// Update the columns with put operation and validate update
updateAndCheckTable(create2.withName("put_complexColumnType"), Status.OK, adminAuthHeaders());
// Update the columns with PUT operation and validate update
updateAndCheckTable(table2, create2.withName("put_complexColumnType"), Status.OK, adminAuthHeaders(), MAJOR_UPDATE);
//
// Patch operations on table1 created by POST operation. Columns can't be added or deleted. Only tags and
@ -355,20 +360,19 @@ public class TableResourceTest extends CatalogApplicationTest {
@Test
public void put_tableUpdateWithNoChange_200(TestInfo test) throws HttpResponseException {
CreateTable request = create(test).withOwner(USER_OWNER1);
createAndCheckTable(request, adminAuthHeaders());
Table table = createAndCheckTable(request, adminAuthHeaders());
// Update table two times successfully with PUT requests
Double version1 = updateAndCheckTable(request, OK, adminAuthHeaders()).getVersion();
Double version2 = updateAndCheckTable(request, OK, adminAuthHeaders()).getVersion();
assertEquals(version1, version2); // No version change
updateAndCheckTable(table, request, OK, adminAuthHeaders(), NO_CHANGE);
updateAndCheckTable(table, request, OK, adminAuthHeaders(), NO_CHANGE);
}
@Test
public void put_tableCreate_200(TestInfo test) throws HttpResponseException {
// Create a new table with put
CreateTable request = create(test).withOwner(USER_OWNER1);
Table table = updateAndCheckTable(request.withName("newName").withDescription(null), CREATED, adminAuthHeaders());
assertEquals(0.1, table.getVersion()); // First version
updateAndCheckTable(null, request.withName("newName").withDescription(null), CREATED,
adminAuthHeaders(), NO_CHANGE);
}
@Test
@ -378,9 +382,7 @@ public class TableResourceTest extends CatalogApplicationTest {
Table table = createAndCheckTable(request, adminAuthHeaders());
// Update null description with a new description
Table updatedTable = updateAndCheckTable(request.withDescription("newDescription"), OK, adminAuthHeaders());
assertEquals("newDescription", updatedTable.getDescription());
assertEquals(table.getVersion() + 0.1, updatedTable.getVersion());
updateAndCheckTable(table, request.withDescription("newDescription"), OK, adminAuthHeaders(), MINOR_UPDATE);
}
@Test
@ -389,10 +391,8 @@ public class TableResourceTest extends CatalogApplicationTest {
CreateTable request = create(test).withOwner(USER_OWNER1);
Table table = createAndCheckTable(request, adminAuthHeaders());
// Update empty description with a new description
Table updatedTable = updateAndCheckTable(request.withDescription("newDescription"), OK, adminAuthHeaders());
assertEquals("newDescription", updatedTable.getDescription());
assertEquals(table.getVersion() + 0.1, updatedTable.getVersion());
// Update empty description with a new description and expect minor version update
updateAndCheckTable(table, request.withDescription("newDescription"), OK, adminAuthHeaders(), MINOR_UPDATE);
}
@Test
@ -413,16 +413,14 @@ public class TableResourceTest extends CatalogApplicationTest {
checkOwnerOwns(USER_OWNER1, table.getId(), true);
// Change ownership from USER_OWNER1 to TEAM_OWNER1
Table updatedTable = updateAndCheckTable(request.withOwner(TEAM_OWNER1), OK, adminAuthHeaders());
Table updatedTable = updateAndCheckTable(table, request.withOwner(TEAM_OWNER1), OK, adminAuthHeaders(), MINOR_UPDATE);
checkOwnerOwns(USER_OWNER1, updatedTable.getId(), false);
checkOwnerOwns(TEAM_OWNER1, updatedTable.getId(), true);
assertEquals(updatedTable.getVersion() + 0.1, updatedTable.getVersion());
// Remove ownership
updatedTable = updateAndCheckTable(request.withOwner(null), OK, adminAuthHeaders());
updatedTable = updateAndCheckTable(updatedTable, request.withOwner(null), OK, adminAuthHeaders(), MINOR_UPDATE);
assertNull(updatedTable.getOwner());
checkOwnerOwns(TEAM_OWNER1, updatedTable.getId(), false);
assertEquals(table.getVersion() + 0.2, updatedTable.getVersion());
}
@Test
@ -432,16 +430,42 @@ public class TableResourceTest extends CatalogApplicationTest {
Table table = createAndCheckTable(request, adminAuthHeaders());
checkOwnerOwns(USER_OWNER1, table.getId(), true);
// Update the table with constraints
request = create(test).withOwner(USER_OWNER1).withDescription("description");
Table updatedTable = updateAndCheckTable(request, OK, adminAuthHeaders());
assertEquals(table.getVersion() + 0.1, updatedTable.getVersion()); // Adding constraint is backward compatible
// Update the table with constraints and ensure minor version change
TableConstraint constraint = new TableConstraint().withConstraintType(ConstraintType.UNIQUE)
.withColumns(List.of(COLUMNS.get(0).getName()));
request = request.withTableConstraints(List.of(constraint));
Table updatedTable = updateAndCheckTable(table, request, OK, adminAuthHeaders(), MINOR_UPDATE);
// Update again with no change. Version must not change
updatedTable = updateAndCheckTable(updatedTable, request, OK, adminAuthHeaders(), NO_CHANGE);
// Update the table with new constraints
request = create(test).withOwner(USER_OWNER1).withDescription("description");
updatedTable = updateAndCheckTable(request, OK, adminAuthHeaders());
assertEquals(Math.floor(table.getVersion()) + 1.0, updatedTable.getVersion()); // Changing constraint is backward
// incompatible
constraint = constraint.withConstraintType(ConstraintType.PRIMARY_KEY);
request = request.withTableConstraints(List.of(constraint));
updatedTable = updateAndCheckTable(updatedTable, request, OK, adminAuthHeaders(), MINOR_UPDATE);
// Remove table constraint and ensure minor version changes
request = request.withTableConstraints(null);
updateAndCheckTable(updatedTable, request, OK, adminAuthHeaders(), MINOR_UPDATE);
}
@Test
public void put_columnConstraintUpdate_200(TestInfo test) throws HttpResponseException {
List<Column> columns = new ArrayList<>();
columns.add(getColumn("c1", INT, null).withConstraint(ColumnConstraint.NULL));
columns.add(getColumn("c2", INT, null).withConstraint(ColumnConstraint.UNIQUE));
CreateTable request = create(test).withColumns(columns);
Table table = createAndCheckTable(request, adminAuthHeaders());
// Change the the column constraints and expect minor version change
request.getColumns().get(0).withConstraint(ColumnConstraint.NOT_NULL);
request.getColumns().get(1).withConstraint(ColumnConstraint.PRIMARY_KEY);
Table updatedTable = updateAndCheckTable(table, request, OK, adminAuthHeaders(), MINOR_UPDATE);
// Remove column constraints and expect minor version change
request.getColumns().get(0).withConstraint(null);
request.getColumns().get(1).withConstraint(null);
updateAndCheckTable(updatedTable, request, OK, adminAuthHeaders(), MINOR_UPDATE);
}
@Test
@ -469,13 +493,13 @@ public class TableResourceTest extends CatalogApplicationTest {
assertEquals(bankTagUsageCount, getTagUsageCount(USER_BANK_ACCOUNT_TAG_LABEL.getTagFQN(), userAuthHeaders()));
//
// Update the c1 with additional tag USER_BANK_ACCOUNT_TAG_LABEL
// Update the c1 tags to USER_ADDRESS_TAB_LABEL, USER_BANK_ACCOUNT_TAG_LABEL (newly added)
// Ensure description and previous tag is carried forward during update
//
tags.add(USER_BANK_ACCOUNT_TAG_LABEL);
List<Column> updatedColumns = new ArrayList<>();
updatedColumns.add(getColumn("c1", BIGINT, null).withTags(tags));
table = updateAndCheckTable(request.withColumns(updatedColumns), OK, adminAuthHeaders());
table = updateAndCheckTable(table, request.withColumns(updatedColumns), OK, adminAuthHeaders(), MINOR_UPDATE);
// Ensure tag usage counts are updated
assertEquals(tagCategoryUsageCount + 2, getTagCategoryUsageCount("user", userAuthHeaders()));
@ -483,14 +507,11 @@ public class TableResourceTest extends CatalogApplicationTest {
assertEquals(bankTagUsageCount + 1, getTagUsageCount(USER_BANK_ACCOUNT_TAG_LABEL.getTagFQN(), userAuthHeaders()));
//
// Add a new column and make sure it is added by PUT
// Add a new column using PUT
//
updatedColumns.add(getColumn("c2", BINARY, null).withOrdinalPosition(2)
.withDataLength(10).withFullyQualifiedName(table.getFullyQualifiedName() + ".c2").withTags(tags));
table = updateAndCheckTable(request.withColumns(updatedColumns), OK, adminAuthHeaders());
assertEquals(2, table.getColumns().size());
TestUtils.validateTags(updatedColumns.get(0).getTags(), table.getColumns().get(0).getTags());
TestUtils.validateTags(updatedColumns.get(1).getTags(), table.getColumns().get(1).getTags());
.withDataLength(10).withTags(tags));
table = updateAndCheckTable(table, request.withColumns(updatedColumns), OK, adminAuthHeaders(), MINOR_UPDATE);
// Ensure tag usage counts are updated - column c2 added both address and bank tags
assertEquals(tagCategoryUsageCount + 4, getTagCategoryUsageCount("user", userAuthHeaders()));
@ -501,9 +522,9 @@ public class TableResourceTest extends CatalogApplicationTest {
// Remove a column c2 and make sure it is deleted by PUT
//
updatedColumns.remove(1);
table = updateAndCheckTable(request.withColumns(updatedColumns), OK, adminAuthHeaders());
table = updateAndCheckTable(table, request.withColumns(updatedColumns), OK, adminAuthHeaders(), MAJOR_UPDATE);
assertEquals(1, table.getColumns().size());
TestUtils.validateTags(columns.get(0).getTags(), table.getColumns().get(0).getTags());
validateTags(columns.get(0), table.getColumns().get(0));
// Ensure tag usage counts are updated to reflect removal of column c2
assertEquals(tagCategoryUsageCount + 2, getTagCategoryUsageCount("user", userAuthHeaders()));
@ -511,6 +532,10 @@ public class TableResourceTest extends CatalogApplicationTest {
assertEquals(bankTagUsageCount + 1, getTagUsageCount(USER_BANK_ACCOUNT_TAG_LABEL.getTagFQN(), userAuthHeaders()));
}
private void validateTags(Column expected, Column actual) throws HttpResponseException {
TestUtils.validateTags(expected.getFullyQualifiedName(), expected.getTags(), actual.getTags());
}
@Test
public void put_tableJoins_200(TestInfo test) throws HttpResponseException, ParseException {
Table table1 = createAndCheckTable(create(test, 1), adminAuthHeaders());
@ -650,41 +675,13 @@ public class TableResourceTest extends CatalogApplicationTest {
assertEquals(actual.getDayCount(), 30);
// Sort the columnJoins and the joinedWith to account for different ordering
expected.sort(new ColumnJoinComparator());
expected.forEach(c -> c.getJoinedWith().sort(new JoinedWithComparator()));
actual.getColumnJoins().sort(new ColumnJoinComparator());
actual.getColumnJoins().forEach(c -> c.getJoinedWith().sort(new JoinedWithComparator()));
expected.sort(Comparator.comparing(ColumnJoin::getColumnName));
expected.forEach(c -> c.getJoinedWith().sort(Comparator.comparing(JoinedWith::getFullyQualifiedName)));
actual.getColumnJoins().sort(Comparator.comparing(ColumnJoin::getColumnName));
actual.getColumnJoins().forEach(c -> c.getJoinedWith().sort(Comparator.comparing(JoinedWith::getFullyQualifiedName)));
assertEquals(expected, actual.getColumnJoins());
}
public static class TagLabelComparator implements Comparator<TagLabel> {
@Override
public int compare(TagLabel label, TagLabel t1) {
return label.getTagFQN().compareTo(t1.getTagFQN());
}
}
public static class ColumnComparator implements Comparator<Column> {
@Override
public int compare(Column column, Column t1) {
return column.getName().compareTo(t1.getName());
}
}
public static class ColumnJoinComparator implements Comparator<ColumnJoin> {
@Override
public int compare(ColumnJoin columnJoin, ColumnJoin t1) {
return columnJoin.getColumnName().compareTo(t1.getColumnName());
}
}
public static class JoinedWithComparator implements Comparator<JoinedWith> {
@Override
public int compare(JoinedWith joinedWith, JoinedWith t1) {
return joinedWith.getFullyQualifiedName().compareTo(t1.getFullyQualifiedName());
}
}
@Test
public void put_tableSampleData_200(TestInfo test) throws HttpResponseException {
Table table = createAndCheckTable(create(test), adminAuthHeaders());
@ -1130,11 +1127,6 @@ public class TableResourceTest extends CatalogApplicationTest {
return patchTable(tableJson, table, authHeaders);
}
// TODO disallow changing href, usage
// TODO allow changing columns, tableConstraints
// TODO Change column attributes
// TODO Add column
// TODO Remove column
public static Table createAndCheckTable(CreateTable create, Map<String, String> authHeaders)
throws HttpResponseException {
// Validate table created has all the information set in create request
@ -1255,10 +1247,9 @@ public class TableResourceTest extends CatalogApplicationTest {
assertEquals(expectedDatabaseId, table.getDatabase().getId());
}
// Validate table constraints
assertEquals(expectedTableConstraints, table.getTableConstraints());
TestUtils.validateTags(expectedTags, table.getTags());
TestUtils.validateTags(table.getFullyQualifiedName(), expectedTags, table.getTags());
TestUtils.validateEntityReference(table.getFollowers());
}
@ -1268,10 +1259,11 @@ public class TableResourceTest extends CatalogApplicationTest {
assertEquals(expectedColumn.getDescription(), actualColumn.getDescription());
assertEquals(expectedColumn.getDataType(), actualColumn.getDataType());
assertEquals(expectedColumn.getArrayDataType(), actualColumn.getArrayDataType());
assertEquals(expectedColumn.getConstraint(), actualColumn.getConstraint());
if (expectedColumn.getDataTypeDisplay() != null) {
assertEquals(expectedColumn.getDataTypeDisplay().toLowerCase(Locale.ROOT), actualColumn.getDataTypeDisplay());
}
TestUtils.validateTags(expectedColumn.getTags(), actualColumn.getTags());
TestUtils.validateTags(actualColumn.getFullyQualifiedName(), expectedColumn.getTags(), actualColumn.getTags());
// Check the nested columns
validateColumns(expectedColumn.getChildren(), actualColumn.getChildren());
@ -1351,17 +1343,32 @@ public class TableResourceTest extends CatalogApplicationTest {
return createTable(create, adminAuthHeaders());
}
public static Table updateAndCheckTable(CreateTable create, Status status, Map<String, String> authHeaders)
public static Table updateAndCheckTable(Table before, CreateTable create, Status status,
Map<String, String> authHeaders,
UpdateType updateType)
throws HttpResponseException {
String updatedBy = TestUtils.getPrincipal(authHeaders);
Table updatedTable = updateTable(create, status, authHeaders);
validateTable(updatedTable, create.getDescription(), create.getColumns(), create.getOwner(), create.getDatabase(),
create.getTableType(), create.getTableConstraints(), create.getTags(), updatedBy);
if (before == null) {
// First version created
assertEquals(0.1, updatedTable.getVersion());
} else {
TestUtils.validateUpdate(before.getVersion(), updatedTable.getVersion(), updateType);
}
// GET the newly updated database and validate
Table getTable = getTable(updatedTable.getId(), "columns,database,owner,tableConstraints,tags", authHeaders);
validateTable(getTable, create.getDescription(), create.getColumns(), create.getOwner(), create.getDatabase(),
create.getTableType(), create.getTableConstraints(), create.getTags(), updatedBy);
if (before == null) {
// First version created
assertEquals(0.1, updatedTable.getVersion());
} else {
TestUtils.validateUpdate(before.getVersion(), updatedTable.getVersion(), updateType);
}
// TODO columns check
return updatedTable;
}

View File

@ -671,7 +671,7 @@ public class PipelineResourceTest extends CatalogApplicationTest {
assertEquals(expectedService.getType(), pipeline.getService().getType());
}
validatePipelineTASKs(pipeline, tasks);
TestUtils.validateTags(expectedTags, pipeline.getTags());
TestUtils.validateTags(pipeline.getFullyQualifiedName(), expectedTags, pipeline.getTags());
return pipeline;
}

View File

@ -584,7 +584,7 @@ public class TaskResourceTest extends CatalogApplicationTest {
assertEquals(expectedService.getId(), task.getService().getId());
assertEquals(expectedService.getType(), task.getService().getType());
}
TestUtils.validateTags(expectedTags, task.getTags());
TestUtils.validateTags(task.getFullyQualifiedName(), expectedTags, task.getTags());
return task;
}

View File

@ -620,7 +620,7 @@ public class TopicResourceTest extends CatalogApplicationTest {
assertEquals(expectedService.getId(), topic.getService().getId());
assertEquals(expectedService.getType(), topic.getService().getType());
}
TestUtils.validateTags(expectedTags, topic.getTags());
TestUtils.validateTags(topic.getFullyQualifiedName(), expectedTags, topic.getTags());
return topic;
}

View File

@ -20,7 +20,6 @@ import org.apache.http.client.HttpResponseException;
import org.eclipse.jetty.http.HttpStatus;
import org.junit.jupiter.api.function.Executable;
import org.openmetadata.catalog.entity.teams.User;
import org.openmetadata.catalog.resources.databases.TableResourceTest.TagLabelComparator;
import org.openmetadata.catalog.resources.tags.TagResourceTest;
import org.openmetadata.catalog.resources.teams.UserResourceTest;
import org.openmetadata.catalog.security.CatalogOpenIdAuthorizationRequestFilter;
@ -40,6 +39,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -61,6 +61,12 @@ public final class TestUtils {
public static URI DASHBOARD_URL;
public static URI PIPELINE_URL;
public enum UpdateType {
NO_CHANGE, // PUT/PATCH made no change
MINOR_UPDATE, // PUT/PATCH made backward compatible minor version change
MAJOR_UPDATE // PUT/PATCH made backward incompatible minor version change
}
static {
JDBC_INFO = new JdbcInfo().withConnectionUrl("scheme://user_name:password#_@%:localhost:1000/test")
.withDriverClass("driverClass");
@ -230,11 +236,12 @@ public final class TestUtils {
return headers;
}
public static void validateTags(List<TagLabel> expectedList, List<TagLabel> actualList)
public static void validateTags(String fqn, List<TagLabel> expectedList, List<TagLabel> actualList)
throws HttpResponseException {
if (expectedList == null) {
return;
}
actualList = Optional.ofNullable(actualList).orElse(Collections.emptyList());
// When tags from the expected list is added to an entity, the derived tags for those tags are automatically added
// So add to the expectedList, the derived tags before validating the tags
List<TagLabel> updatedExpectedList = new ArrayList<>(expectedList);
@ -244,10 +251,10 @@ public final class TestUtils {
updatedExpectedList.addAll(derived);
}
updatedExpectedList = updatedExpectedList.stream().distinct().collect(Collectors.toList());
updatedExpectedList.sort(new TagLabelComparator());
actualList.sort(new TagLabelComparator());
updatedExpectedList.sort(Comparator.comparing(TagLabel::getTagFQN));
actualList.sort(Comparator.comparing(TagLabel::getTagFQN));
assertEquals(updatedExpectedList.size(), actualList.size());
assertEquals(updatedExpectedList.size(), actualList.size(), fqn);
for (int i = 0; i < actualList.size(); i++) {
assertEquals(updatedExpectedList.get(i), actualList.get(i));
}
@ -280,4 +287,14 @@ public final class TestUtils {
// Get user name from the email address
return authHeaders.get(CatalogOpenIdAuthorizationRequestFilter.X_AUTH_PARAMS_EMAIL_HEADER).split("@")[0];
}
public static void validateUpdate(Double previousVersion, Double newVersion, UpdateType updateType) {
if (updateType == UpdateType.NO_CHANGE) {
assertEquals(previousVersion, newVersion); // No change in the version
} else if (updateType == UpdateType.MINOR_UPDATE) {
assertEquals(previousVersion + 0.1, newVersion); // Minor version change
} else if (updateType == UpdateType.MAJOR_UPDATE) {
assertEquals(previousVersion + 1.0, newVersion); // Minor version change
}
}
}