Handle object store tags & Create abstract EntityUpdaterWithColumn (#10548)

* Handle object store tags

* Set major update in test

* Handle datamodel updates

* Rename column updater
This commit is contained in:
Pere Miquel Brull 2023-03-15 09:21:44 +01:00 committed by GitHub
parent 7e4ba4567b
commit d84a2b3ed7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 167 additions and 147 deletions

View File

@ -7,7 +7,6 @@ import static org.openmetadata.service.Entity.FIELD_FOLLOWERS;
import static org.openmetadata.service.Entity.FIELD_TAGS;
import static org.openmetadata.service.Entity.OBJECT_STORE_SERVICE;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Collections;
@ -174,11 +173,11 @@ public class ContainerRepository extends EntityRepository<Container> {
@Override
public EntityUpdater getUpdater(Container original, Container updated, Operation operation) {
return new ContainerRepository.ContainerUpdater(original, updated, operation);
return new ContainerUpdater(original, updated, operation);
}
/** Handles entity updated from PUT and POST operations */
public class ContainerUpdater extends EntityUpdater {
public class ContainerUpdater extends ColumnEntityUpdater {
public ContainerUpdater(Container original, Container updated, Operation operation) {
super(original, updated, operation);
}
@ -188,8 +187,23 @@ public class ContainerRepository extends EntityRepository<Container> {
updateDataModel(original, updated);
}
private void updateDataModel(Container original, Container updated) throws JsonProcessingException {
recordChange("dataModel", original.getDataModel(), updated.getDataModel(), true);
private void updateDataModel(Container original, Container updated) throws IOException {
if (original.getDataModel() == null || updated.getDataModel() == null) {
recordChange("dataModel", original.getDataModel(), updated.getDataModel(), true);
}
if (original.getDataModel() != null && updated.getDataModel() != null) {
updateColumns(
"dataModel.columns",
original.getDataModel().getColumns(),
updated.getDataModel().getColumns(),
EntityUtil.columnMatch);
recordChange(
"dataModel.partition",
original.getDataModel().getIsPartitioned(),
updated.getDataModel().getIsPartitioned());
}
}
}
}

View File

@ -32,6 +32,7 @@ import static org.openmetadata.service.util.EntityUtil.compareTagLabel;
import static org.openmetadata.service.util.EntityUtil.entityReferenceMatch;
import static org.openmetadata.service.util.EntityUtil.fieldAdded;
import static org.openmetadata.service.util.EntityUtil.fieldDeleted;
import static org.openmetadata.service.util.EntityUtil.getColumnField;
import static org.openmetadata.service.util.EntityUtil.getExtensionField;
import static org.openmetadata.service.util.EntityUtil.nextMajorVersion;
import static org.openmetadata.service.util.EntityUtil.nextVersion;
@ -56,6 +57,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.json.JsonPatch;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;
@ -72,6 +75,7 @@ import org.openmetadata.schema.entity.teams.Team;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.type.ChangeDescription;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.Column;
import org.openmetadata.schema.type.EntityHistory;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.EventType;
@ -91,7 +95,6 @@ import org.openmetadata.service.exception.UnhandledServerException;
import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipRecord;
import org.openmetadata.service.jdbi3.CollectionDAO.EntityVersionPair;
import org.openmetadata.service.jdbi3.CollectionDAO.ExtensionRecord;
import org.openmetadata.service.jdbi3.TableRepository.TableUpdater;
import org.openmetadata.service.resources.tags.TagLabelCache;
import org.openmetadata.service.security.policyevaluator.SubjectCache;
import org.openmetadata.service.util.EntityUtil;
@ -1205,7 +1208,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
* Common entity attributes such as description, displayName, owner, tags are handled by this class. Override {@code
* entitySpecificUpdate()} to add additional entity specific fields to be updated.
*
* @see TableUpdater#entitySpecificUpdate() for example.
* @see TableRepository.TableUpdater#entitySpecificUpdate() for example.
*/
public class EntityUpdater {
protected final T original;
@ -1593,4 +1596,132 @@ public abstract class EntityRepository<T extends EntityInterface> {
return Boolean.TRUE.equals(updatingUser.getIsBot());
}
}
/** Handle column-specific updates for entities such as Tables, Containers' dataModel or Dashboard Model Entities. */
abstract class ColumnEntityUpdater extends EntityUpdater {
public ColumnEntityUpdater(T original, T updated, Operation operation) {
super(original, updated, operation);
}
public void updateColumns(
String fieldName,
List<Column> origColumns,
List<Column> updatedColumns,
BiPredicate<Column, Column> columnMatch)
throws IOException {
List<Column> deletedColumns = new ArrayList<>();
List<Column> addedColumns = new ArrayList<>();
recordListChange(fieldName, origColumns, updatedColumns, addedColumns, deletedColumns, columnMatch);
// carry forward tags and description if deletedColumns matches added column
Map<String, Column> addedColumnMap =
addedColumns.stream().collect(Collectors.toMap(Column::getName, Function.identity()));
for (Column deleted : deletedColumns) {
if (addedColumnMap.containsKey(deleted.getName())) {
Column addedColumn = addedColumnMap.get(deleted.getName());
if (nullOrEmpty(addedColumn.getDescription())) {
addedColumn.setDescription(deleted.getDescription());
}
if (nullOrEmpty(addedColumn.getTags()) && nullOrEmpty(deleted.getTags())) {
addedColumn.setTags(deleted.getTags());
}
}
}
// Delete tags related to deleted columns
deletedColumns.forEach(
deleted -> daoCollection.tagUsageDAO().deleteTagsByTarget(deleted.getFullyQualifiedName()));
// Add tags related to newly added columns
for (Column added : addedColumns) {
applyTags(added.getTags(), added.getFullyQualifiedName());
}
// Carry forward the user generated metadata from existing columns to new columns
for (Column updated : updatedColumns) {
// Find stored column matching name, data type and ordinal position
Column stored = origColumns.stream().filter(c -> columnMatch.test(c, updated)).findAny().orElse(null);
if (stored == null) { // New column added
continue;
}
updateColumnDescription(stored, updated);
updateColumnDisplayName(stored, updated);
updateColumnDataLength(stored, updated);
updateColumnPrecision(stored, updated);
updateColumnScale(stored, updated);
updateTags(
stored.getFullyQualifiedName(),
EntityUtil.getFieldName(fieldName, updated.getName(), FIELD_TAGS),
stored.getTags(),
updated.getTags());
updateColumnConstraint(stored, updated);
if (updated.getChildren() != null && stored.getChildren() != null) {
String childrenFieldName = EntityUtil.getFieldName(fieldName, updated.getName());
updateColumns(childrenFieldName, stored.getChildren(), updated.getChildren(), columnMatch);
}
}
majorVersionChange = majorVersionChange || !deletedColumns.isEmpty();
}
private void updateColumnDescription(Column origColumn, Column updatedColumn) throws JsonProcessingException {
if (operation.isPut() && !nullOrEmpty(origColumn.getDescription()) && updatedByBot()) {
// Revert the non-empty task description if being updated by a bot
updatedColumn.setDescription(origColumn.getDescription());
return;
}
String columnField = getColumnField(original, origColumn, FIELD_DESCRIPTION);
recordChange(columnField, origColumn.getDescription(), updatedColumn.getDescription());
}
private void updateColumnDisplayName(Column origColumn, Column updatedColumn) throws JsonProcessingException {
if (operation.isPut() && !nullOrEmpty(origColumn.getDescription()) && updatedByBot()) {
// Revert the non-empty task description if being updated by a bot
updatedColumn.setDisplayName(origColumn.getDisplayName());
return;
}
String columnField = getColumnField(original, origColumn, FIELD_DISPLAY_NAME);
recordChange(columnField, origColumn.getDisplayName(), updatedColumn.getDisplayName());
}
private void updateColumnConstraint(Column origColumn, Column updatedColumn) throws JsonProcessingException {
String columnField = getColumnField(original, origColumn, "constraint");
recordChange(columnField, origColumn.getConstraint(), updatedColumn.getConstraint());
}
protected void updateColumnDataLength(Column origColumn, Column updatedColumn) throws JsonProcessingException {
String columnField = getColumnField(original, origColumn, "dataLength");
boolean updated = recordChange(columnField, origColumn.getDataLength(), updatedColumn.getDataLength());
if (updated
&& (origColumn.getDataLength() == null || updatedColumn.getDataLength() < origColumn.getDataLength())) {
// The data length of a column was reduced or added. Treat it as backward-incompatible change
majorVersionChange = true;
}
}
private void updateColumnPrecision(Column origColumn, Column updatedColumn) throws JsonProcessingException {
String columnField = getColumnField(original, origColumn, "precision");
boolean updated = recordChange(columnField, origColumn.getPrecision(), updatedColumn.getPrecision());
if (origColumn.getPrecision() != null
&& updated
&& updatedColumn.getPrecision() < origColumn.getPrecision()) { // Previously precision was set
// The precision was reduced. Treat it as backward-incompatible change
majorVersionChange = true;
}
}
private void updateColumnScale(Column origColumn, Column updatedColumn) throws JsonProcessingException {
String columnField = getColumnField(original, origColumn, "scale");
boolean updated = recordChange(columnField, origColumn.getScale(), updatedColumn.getScale());
if (origColumn.getScale() != null
&& updated
&& updatedColumn.getScale() < origColumn.getScale()) { // Previously scale was set
// The scale was reduced. Treat it as backward-incompatible change
majorVersionChange = true;
}
}
}
}

View File

@ -19,14 +19,11 @@ import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.service.Entity.DATABASE_SCHEMA;
import static org.openmetadata.service.Entity.FIELD_DESCRIPTION;
import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME;
import static org.openmetadata.service.Entity.FIELD_FOLLOWERS;
import static org.openmetadata.service.Entity.FIELD_OWNER;
import static org.openmetadata.service.Entity.FIELD_TAGS;
import static org.openmetadata.service.Entity.LOCATION;
import static org.openmetadata.service.Entity.TABLE;
import static org.openmetadata.service.util.EntityUtil.getColumnField;
import static org.openmetadata.service.util.LambdaExceptionUtil.ignoringComparator;
import static org.openmetadata.service.util.LambdaExceptionUtil.rethrowFunction;
@ -45,8 +42,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -1044,7 +1039,7 @@ public class TableRepository extends EntityRepository<Table> {
}
/** Handles entity updated from PUT and POST operation. */
public class TableUpdater extends EntityUpdater {
public class TableUpdater extends ColumnEntityUpdater {
public TableUpdater(Table original, Table updated, Operation operation) {
super(original, updated, operation);
}
@ -1074,125 +1069,5 @@ public class TableRepository extends EntityRepository<Table> {
recordListChange(
"tableConstraints", origConstraints, updatedConstraints, added, deleted, EntityUtil.tableConstraintMatch);
}
private void updateColumns(
String fieldName,
List<Column> origColumns,
List<Column> updatedColumns,
BiPredicate<Column, Column> columnMatch)
throws IOException {
List<Column> deletedColumns = new ArrayList<>();
List<Column> addedColumns = new ArrayList<>();
recordListChange(fieldName, origColumns, updatedColumns, addedColumns, deletedColumns, columnMatch);
// carry forward tags and description if deletedColumns matches added column
Map<String, Column> addedColumnMap =
addedColumns.stream().collect(Collectors.toMap(Column::getName, Function.identity()));
for (Column deleted : deletedColumns) {
if (addedColumnMap.containsKey(deleted.getName())) {
Column addedColumn = addedColumnMap.get(deleted.getName());
if (nullOrEmpty(addedColumn.getDescription())) {
addedColumn.setDescription(deleted.getDescription());
}
if (nullOrEmpty(addedColumn.getTags()) && nullOrEmpty(deleted.getTags())) {
addedColumn.setTags(deleted.getTags());
}
}
}
// Delete tags related to deleted columns
deletedColumns.forEach(
deleted -> daoCollection.tagUsageDAO().deleteTagsByTarget(deleted.getFullyQualifiedName()));
// Add tags related to newly added columns
for (Column added : addedColumns) {
applyTags(added.getTags(), added.getFullyQualifiedName());
}
// Carry forward the user generated metadata from existing columns to new columns
for (Column updated : updatedColumns) {
// Find stored column matching name, data type and ordinal position
Column stored = origColumns.stream().filter(c -> columnMatch.test(c, updated)).findAny().orElse(null);
if (stored == null) { // New column added
continue;
}
updateColumnDescription(stored, updated);
updateColumnDisplayName(stored, updated);
updateColumnDataLength(stored, updated);
updateColumnPrecision(stored, updated);
updateColumnScale(stored, updated);
updateTags(
stored.getFullyQualifiedName(),
EntityUtil.getFieldName(fieldName, updated.getName(), FIELD_TAGS),
stored.getTags(),
updated.getTags());
updateColumnConstraint(stored, updated);
if (updated.getChildren() != null && stored.getChildren() != null) {
String childrenFieldName = EntityUtil.getFieldName(fieldName, updated.getName());
updateColumns(childrenFieldName, stored.getChildren(), updated.getChildren(), columnMatch);
}
}
majorVersionChange = majorVersionChange || !deletedColumns.isEmpty();
}
private void updateColumnDescription(Column origColumn, Column updatedColumn) throws JsonProcessingException {
if (operation.isPut() && !nullOrEmpty(origColumn.getDescription()) && updatedByBot()) {
// Revert the non-empty task description if being updated by a bot
updatedColumn.setDescription(origColumn.getDescription());
return;
}
String columnField = getColumnField(original, origColumn, FIELD_DESCRIPTION);
recordChange(columnField, origColumn.getDescription(), updatedColumn.getDescription());
}
private void updateColumnDisplayName(Column origColumn, Column updatedColumn) throws JsonProcessingException {
if (operation.isPut() && !nullOrEmpty(origColumn.getDescription()) && updatedByBot()) {
// Revert the non-empty task description if being updated by a bot
updatedColumn.setDisplayName(origColumn.getDisplayName());
return;
}
String columnField = getColumnField(original, origColumn, FIELD_DISPLAY_NAME);
recordChange(columnField, origColumn.getDisplayName(), updatedColumn.getDisplayName());
}
private void updateColumnConstraint(Column origColumn, Column updatedColumn) throws JsonProcessingException {
String columnField = getColumnField(original, origColumn, "constraint");
recordChange(columnField, origColumn.getConstraint(), updatedColumn.getConstraint());
}
protected void updateColumnDataLength(Column origColumn, Column updatedColumn) throws JsonProcessingException {
String columnField = getColumnField(original, origColumn, "dataLength");
boolean updated = recordChange(columnField, origColumn.getDataLength(), updatedColumn.getDataLength());
if (updated
&& (origColumn.getDataLength() == null || updatedColumn.getDataLength() < origColumn.getDataLength())) {
// The data length of a column was reduced or added. Treat it as backward-incompatible change
majorVersionChange = true;
}
}
private void updateColumnPrecision(Column origColumn, Column updatedColumn) throws JsonProcessingException {
String columnField = getColumnField(original, origColumn, "precision");
boolean updated = recordChange(columnField, origColumn.getPrecision(), updatedColumn.getPrecision());
if (origColumn.getPrecision() != null
&& updated
&& updatedColumn.getPrecision() < origColumn.getPrecision()) { // Previously precision was set
// The precision was reduced. Treat it as backward-incompatible change
majorVersionChange = true;
}
}
private void updateColumnScale(Column origColumn, Column updatedColumn) throws JsonProcessingException {
String columnField = getColumnField(original, origColumn, "scale");
boolean updated = recordChange(columnField, origColumn.getScale(), updatedColumn.getScale());
if (origColumn.getScale() != null
&& updated
&& updatedColumn.getScale() < origColumn.getScale()) { // Previously scale was set
// The scale was reduced. Treat it as backward-incompatible change
majorVersionChange = true;
}
}
}
}

View File

@ -328,10 +328,11 @@ public final class EntityUtil {
}
/** Return column field name of format "columns".columnName.columnFieldName */
public static String getColumnField(Table table, Column column, String columnField) {
public static <T extends EntityInterface> String getColumnField(
T entityWithColumns, Column column, String columnField) {
// Remove table FQN from column FQN to get the local name
String localColumnName =
EntityUtil.getLocalColumnName(table.getFullyQualifiedName(), column.getFullyQualifiedName());
EntityUtil.getLocalColumnName(entityWithColumns.getFullyQualifiedName(), column.getFullyQualifiedName());
return columnField == null
? FullyQualifiedName.build("columns", localColumnName)
: FullyQualifiedName.build("columns", localColumnName, columnField);

View File

@ -19,8 +19,6 @@ import static org.openmetadata.service.util.TestUtils.assertListNull;
import static org.openmetadata.service.util.TestUtils.assertResponse;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@ -36,6 +34,7 @@ import org.junit.jupiter.api.TestMethodOrder;
import org.openmetadata.schema.api.data.CreateContainer;
import org.openmetadata.schema.entity.data.Container;
import org.openmetadata.schema.type.ChangeDescription;
import org.openmetadata.schema.type.Column;
import org.openmetadata.schema.type.ColumnDataType;
import org.openmetadata.schema.type.ContainerDataModel;
import org.openmetadata.schema.type.ContainerFileFormat;
@ -52,14 +51,14 @@ import org.openmetadata.service.util.TestUtils;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class ContainerResourceTest extends EntityResourceTest<Container, CreateContainer> {
public static final List<Column> dataModelColumns =
List.of(
getColumn(C1, BIGINT, USER_ADDRESS_TAG_LABEL),
getColumn(C2, ColumnDataType.VARCHAR, USER_ADDRESS_TAG_LABEL).withDataLength(10),
getColumn(C3, BIGINT, GLOSSARY1_TERM1_LABEL));
public static final ContainerDataModel PARTITIONED_DATA_MODEL =
new ContainerDataModel()
.withIsPartitioned(true)
.withColumns(
Arrays.asList(
getColumn(C1, BIGINT, USER_ADDRESS_TAG_LABEL),
getColumn(C2, ColumnDataType.VARCHAR, USER_ADDRESS_TAG_LABEL).withDataLength(10),
getColumn(C3, BIGINT, GLOSSARY1_TERM1_LABEL)));
new ContainerDataModel().withIsPartitioned(true).withColumns(dataModelColumns);
public static final List<ContainerFileFormat> FILE_FORMATS = List.of(ContainerFileFormat.Parquet);
@ -154,9 +153,9 @@ public class ContainerResourceTest extends EntityResourceTest<Container, CreateC
Container container = createAndCheckEntity(request, ADMIN_AUTH_HEADERS);
ChangeDescription change = getChangeDescription(container.getVersion());
ContainerDataModel newDataModel =
new ContainerDataModel().withIsPartitioned(false).withColumns(Collections.emptyList());
fieldUpdated(change, "dataModel", PARTITIONED_DATA_MODEL, newDataModel);
// We are removing the columns here. This is a major change
ContainerDataModel newDataModel = PARTITIONED_DATA_MODEL.withIsPartitioned(false);
fieldUpdated(change, "dataModel.partition", true, false);
updateAndCheckEntity(request.withDataModel(newDataModel), OK, ADMIN_AUTH_HEADERS, MINOR_UPDATE, change);
}