From f79061edcde7d060aad6ef1a94dc9da0b07fc62e Mon Sep 17 00:00:00 2001 From: sureshms Date: Thu, 16 Sep 2021 15:57:46 -0700 Subject: [PATCH] "Fix #567 Add APIs for support table complextType columns - array, struct, map, and union" --- .../catalog/jdbi3/TableRepository.java | 131 +++++++++++++----- .../json/schema/entity/data/table.json | 29 +++- .../json/schema/type/entityReference.json | 3 +- 3 files changed, 127 insertions(+), 36 deletions(-) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java index b942db4e4d9..a86ec6da14f 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java @@ -72,7 +72,6 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.UUID; -import java.util.stream.Collectors; import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound; import static org.openmetadata.catalog.jdbi3.Relationship.JOINED_WITH; @@ -332,22 +331,39 @@ public abstract class TableRepository { validateRelationships(table, db, owner); } + private void setColumnFQN(String parentFQN, List columns) { + columns.forEach(c -> { + String columnFqn = parentFQN + "." + c.getName(); + c.setFullyQualifiedName(columnFqn); + if (c.getChildren() != null) { + setColumnFQN(columnFqn, c.getChildren()); + } + }); + } + + private void validateTags(List columns) { + columns.forEach(column -> { + EntityUtil.validateTags(tagDAO(), column.getTags()); + if (column.getChildren() != null) { + validateTags(column.getChildren()); + } + }); + } + private void validateRelationships(Table table, Database database, EntityReference owner) throws IOException { // Set data in table entity based on database relationship table.setDatabase(EntityUtil.getEntityReference(database)); table.setFullyQualifiedName(database.getFullyQualifiedName() + "." + table.getName()); - for (Column c : table.getColumns()) { - c.setFullyQualifiedName(table.getFullyQualifiedName() + "." + c.getName()); - } + setColumnFQN(table.getFullyQualifiedName(), table.getColumns()); // Check if owner is valid and set the relationship table.setOwner(EntityUtil.populateOwner(userDAO(), teamDAO(), owner)); - // Check table tags + // Validate table tags EntityUtil.validateTags(tagDAO(), table.getTags()); - // Check column tags - table.getColumns().forEach(column -> EntityUtil.validateTags(tagDAO(), column.getTags())); + // Validate column tags + validateTags(table.getColumns()); } private void storeTable(Table table, boolean update) throws JsonProcessingException { @@ -360,8 +376,8 @@ public abstract class TableRepository { table.withOwner(null).withDatabase(null).withHref(null).withTags(null); // Don't store column tags as JSON but build it on the fly based on relationships - Map> columnTags = table.getColumns().stream() - .filter(c -> c.getTags() != null).collect(Collectors.toMap(Column::getName, Column::getTags, (a, b) -> b)); + List columnWithTags = table.getColumns(); + table.setColumns(cloneWithoutTags(columnWithTags)); table.getColumns().forEach(column -> column.setTags(null)); if (update) { @@ -370,9 +386,28 @@ public abstract class TableRepository { tableDAO().insert(JsonUtils.pojoToJson(table)); } - // Restore the relatinships + // Restore the relationships table.withOwner(owner).withDatabase(database).withTags(tags); - table.getColumns().forEach(c -> c.withTags(columnTags.get(c.getName()))); + table.setColumns(columnWithTags); + } + + List cloneWithoutTags(List columns) { + List copy = new ArrayList<>(); + columns.forEach(c -> copy.add(cloneWithoutTags(c))); + return copy; + } + + private Column cloneWithoutTags(Column column) { + List children = cloneWithoutTags(column.getChildren()); + return new Column().withDescription(column.getDescription()).withName(column.getName()) + .withFullyQualifiedName(column.getFullyQualifiedName()) + .withArrayDataType(column.getArrayDataType()) + .withColumnConstraint(column.getColumnConstraint()) + .withColumnDataTypeDisplay(column.getColumnDataTypeDisplay()) + .withColumnDataType(column.getColumnDataType()) + .withDataLength(column.getDataLength()) + .withOrdinalPosition(column.getOrdinalPosition()) + .withChildren(children); } private void addRelationships(Table table) throws IOException { @@ -381,7 +416,7 @@ public abstract class TableRepository { relationshipDAO().insert(databaseId, table.getId().toString(), Entity.DATABASE, Entity.TABLE, Relationship.CONTAINS.ordinal()); - // Add owner relationship + // Add table owner relationship EntityUtil.setOwner(relationshipDAO(), table.getId(), Entity.TABLE, table.getOwner()); // Add tag to table relationship @@ -396,16 +431,22 @@ public abstract class TableRepository { applyTags(updatedTable); } + private void applyTags(List columns) throws IOException { + // Add column level tags by adding tag to column relationship + for (Column column : columns) { + EntityUtil.applyTags(tagDAO(), column.getTags(), column.getFullyQualifiedName()); + column.setTags(getTags(column.getFullyQualifiedName())); // Update tag list to handle derived tags + if (column.getChildren() != null) { + applyTags(column.getChildren()); + } + } + } + private void applyTags(Table table) throws IOException { // Add table level tags by adding tag to table relationship EntityUtil.applyTags(tagDAO(), table.getTags(), table.getFullyQualifiedName()); table.setTags(getTags(table.getFullyQualifiedName())); // Update tag to handle additional derived tags - - // Add column level tags by adding tag to column relationship - for (Column column : table.getColumns()) { - EntityUtil.applyTags(tagDAO(), column.getTags(), column.getFullyQualifiedName()); - column.setTags(getTags(column.getFullyQualifiedName())); // Update tag list to handle derived tags - } + applyTags(table.getColumns()); } /** @@ -459,7 +500,7 @@ public abstract class TableRepository { null); table.setDatabase(fields.contains("database") ? EntityUtil.getEntityReference(getDatabase(table)) : null); table.setTags(fields.contains("tags") ? getTags(table.getFullyQualifiedName()) : null); - getColumnTags(fields.contains("tags"), table); + getColumnTags(fields.contains("tags"), table.getColumns()); table.setJoins(fields.contains("joins") ? getJoins(table) : null); table.setSampleData(fields.contains("sampleData") ? getSampleData(table) : null); table.setViewDefinition(fields.contains("viewDefinition") ? table.getViewDefinition() : null); @@ -475,9 +516,9 @@ public abstract class TableRepository { return table == null ? null : EntityUtil.getFollowers(table.getId(), relationshipDAO(), userDAO()); } - private void updateColumns(Table storedTable, Table updatedTable) { - List storedColumns = storedTable.getColumns(); - List updatedColumns = updatedTable.getColumns(); + //TODO modified columns + private void updateColumns(List storedColumns, List updatedColumns, List addedColumns, + List deletedColumns) { // Carry forward the user generated metadata from existing columns to new columns for (Column updated : updatedColumns) { // Find stored column matching name, data type and ordinal position @@ -489,8 +530,8 @@ public abstract class TableRepository { .orElse(null); if (stored == null) { // TODO versioning of schema - // TODO identify column that was deleted - LOG.info("Table {} has new column {}", storedTable.getFullyQualifiedName(), updated.getName()); + addedColumns.add(updated); + LOG.info("Column {} was newly added", updated.getFullyQualifiedName()); continue; } @@ -499,20 +540,46 @@ public abstract class TableRepository { updated.setDescription(stored.getDescription()); // Carry forward non-empty description } + // Remove all tags for the table and columns EntityUtil.removeTagsByPrefix(tagDAO(), stored.getFullyQualifiedName()); - //update tags + + // Update tags updated.setTags(updated.getTags()); + + if (updated.getChildren() != null && stored.getChildren() != null) { + updateColumns(stored.getChildren(), updated.getChildren(), addedColumns, deletedColumns); + } } - storedTable.setColumns(updatedColumns); + + for (Column stored : storedColumns) { + // Find updated column matching name, data type and ordinal position + Column updated = storedColumns.stream() + .filter(s -> s.getName().equals(stored.getName()) && + s.getColumnDataType() == stored.getColumnDataType() && + Objects.equals(s.getOrdinalPosition(), stored.getOrdinalPosition())) + .findAny() + .orElse(null); + if (updated == null) { + // TODO versioning of schema addedColumns.add(stored); + deletedColumns.add(stored); + LOG.info("Column {} was deleted", stored.getFullyQualifiedName()); + } + } + } + + private void updateColumns(Table storedTable, Table updatedTable) { + updateColumns(storedTable.getColumns(), updatedTable.getColumns(), new ArrayList<>(), new ArrayList<>()); + storedTable.setColumns(updatedTable.getColumns()); } private List getTags(String fqn) { return tagDAO().getTags(fqn); } - private void getColumnTags(boolean setTags, Table table) { - for (Column c : Optional.ofNullable(table.getColumns()).orElse(Collections.emptyList())) { + private void getColumnTags(boolean setTags, List columns) { + for (Column c : Optional.ofNullable(columns).orElse(Collections.emptyList())) { c.setTags(setTags ? getTags(c.getFullyQualifiedName()) : null); + getColumnTags(setTags, c.getChildren()); } } @@ -587,11 +654,11 @@ public abstract class TableRepository { "table.columns.column", JOINED_WITH.ordinal()); DailyCount dailyCount = new DailyCount().withCount(joinedWith.getJoinCount()).withDate(date); + List dailyCountList; if (json == null) { // Create first entry - List dailyCountList = Collections.singletonList(dailyCount); - json = JsonUtils.pojoToJson(dailyCountList); + dailyCountList = Collections.singletonList(dailyCount); } else { // Update the existing entry - List dailyCountList = JsonUtils.readObjects(json, DailyCount.class); + dailyCountList = JsonUtils.readObjects(json, DailyCount.class); boolean foundDate = false; for (DailyCount d : dailyCountList) { // If the date already exists, update the count. Otherwise add a new entry if (d.getDate().equals(dailyCount.getDate())) { @@ -624,8 +691,8 @@ public abstract class TableRepository { joinedWith.getFullyQualifiedName(), reportedOnDate); } } - json = JsonUtils.pojoToJson(dailyCountList); } + json = JsonUtils.pojoToJson(dailyCountList); fieldRelationshipDAO().upsert(fromColumnFQN, toColumnFQN, "table.columns.column", "table.columns.column", JOINED_WITH.ordinal(), "dailyCount", json); diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/table.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/table.json index 8d051191f99..f50cbb77f91 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/table.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/table.json @@ -45,6 +45,7 @@ "SMALLINT", "INT", "BIGINT", + "BYTEINT", "FLOAT", "DOUBLE", "DECIMAL", @@ -110,7 +111,7 @@ } }, "columnName": { - "description": "Local name (not fully qualified name) of the column.", + "description": "Local name (not fully qualified name) of the column. ColumnName is `-` when the column is not named in struct columnDataType. For example, BigQuery supports struct with unnamed fields", "type": "string", "minLength": 1, "maxLength": 64, @@ -124,7 +125,7 @@ "pattern": "^[^.]*$" }, "fullyQualifiedColumnName": { - "description": "Fully qualified name of the column that includes `serviceName.databaseName.tableName.columnName`.", + "description": "Fully qualified name of the column that includes `serviceName.databaseName.tableName.columnName[.nestedColumnName]`. When columnName is null for columnDataType struct fields, `field_#` where `#` is field index is used. For map columnDataType, for key the field name `key` is used and for the value field `value` is used.", "type": "string", "minLength": 1, "maxLength": 256 @@ -141,6 +142,16 @@ "description": "Data type of the column (int, date etc.).", "$ref": "#/definitions/columnDataType" }, + "arrayDataType" : { + "description": "Data type used array in columnDataType. For example, `array` has columnDataType as `array` and arrayDataType as `int`." + }, + "dataLength" : { + "description": "Length of `char`, `varchar`, `binary`, `varbinary` `columnDataTypes`, else null. For example, `varchar(20)` has columnDataType as `varchar` and dataLength as `20`.", + "type": "integer" + }, + "columnDataTypeDisplay" : { + "description" : "Display name used for columnDataType. This is useful for complex types, such as `array, map, struct<>, and union types." + }, "description": { "description": "Description of the column.", "type": "string" @@ -163,12 +174,24 @@ "ordinalPosition": { "description": "Ordinal position of the column.", "type": "integer" + }, + "jsonSchema" : { + "description": "Json schema only if the columnDataType is JSON else null.", + "type": "string" + }, + "children" : { + "description": "Child columns if columnDataType or arrayDataType is `map`, `struct`, or `union` else `null`.", + "type": "array", + "items": { + "$ref": "#/definitions/column" + } } }, "required": [ "name", "columnDataType" - ] + ], + "additionalProperties": false }, "columnJoins": { "type": "object", diff --git a/catalog-rest-service/src/main/resources/json/schema/type/entityReference.json b/catalog-rest-service/src/main/resources/json/schema/type/entityReference.json index 60379cb539f..37f4aadd42e 100644 --- a/catalog-rest-service/src/main/resources/json/schema/type/entityReference.json +++ b/catalog-rest-service/src/main/resources/json/schema/type/entityReference.json @@ -39,5 +39,6 @@ "required": [ "id", "type" - ] + ], + "additionalProperties": false } \ No newline at end of file