mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-25 18:30:00 +00:00
Merge pull request #569 from open-metadata/issue567
This commit is contained in:
commit
fe1e3c5ac5
@ -72,7 +72,6 @@ import java.util.Map.Entry;
|
|||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
|
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
|
||||||
import static org.openmetadata.catalog.jdbi3.Relationship.JOINED_WITH;
|
import static org.openmetadata.catalog.jdbi3.Relationship.JOINED_WITH;
|
||||||
@ -332,22 +331,39 @@ public abstract class TableRepository {
|
|||||||
validateRelationships(table, db, owner);
|
validateRelationships(table, db, owner);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setColumnFQN(String parentFQN, List<Column> columns) {
|
||||||
|
columns.forEach(c -> {
|
||||||
|
String columnFqn = parentFQN + "." + c.getName();
|
||||||
|
c.setFullyQualifiedName(columnFqn);
|
||||||
|
if (c.getChildren() != null) {
|
||||||
|
setColumnFQN(columnFqn, c.getChildren());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void validateTags(List<Column> columns) {
|
||||||
|
columns.forEach(column -> {
|
||||||
|
EntityUtil.validateTags(tagDAO(), column.getTags());
|
||||||
|
if (column.getChildren() != null) {
|
||||||
|
validateTags(column.getChildren());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private void validateRelationships(Table table, Database database, EntityReference owner) throws IOException {
|
private void validateRelationships(Table table, Database database, EntityReference owner) throws IOException {
|
||||||
// Set data in table entity based on database relationship
|
// Set data in table entity based on database relationship
|
||||||
table.setDatabase(EntityUtil.getEntityReference(database));
|
table.setDatabase(EntityUtil.getEntityReference(database));
|
||||||
table.setFullyQualifiedName(database.getFullyQualifiedName() + "." + table.getName());
|
table.setFullyQualifiedName(database.getFullyQualifiedName() + "." + table.getName());
|
||||||
for (Column c : table.getColumns()) {
|
setColumnFQN(table.getFullyQualifiedName(), table.getColumns());
|
||||||
c.setFullyQualifiedName(table.getFullyQualifiedName() + "." + c.getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if owner is valid and set the relationship
|
// Check if owner is valid and set the relationship
|
||||||
table.setOwner(EntityUtil.populateOwner(userDAO(), teamDAO(), owner));
|
table.setOwner(EntityUtil.populateOwner(userDAO(), teamDAO(), owner));
|
||||||
|
|
||||||
// Check table tags
|
// Validate table tags
|
||||||
EntityUtil.validateTags(tagDAO(), table.getTags());
|
EntityUtil.validateTags(tagDAO(), table.getTags());
|
||||||
|
|
||||||
// Check column tags
|
// Validate column tags
|
||||||
table.getColumns().forEach(column -> EntityUtil.validateTags(tagDAO(), column.getTags()));
|
validateTags(table.getColumns());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void storeTable(Table table, boolean update) throws JsonProcessingException {
|
private void storeTable(Table table, boolean update) throws JsonProcessingException {
|
||||||
@ -360,8 +376,8 @@ public abstract class TableRepository {
|
|||||||
table.withOwner(null).withDatabase(null).withHref(null).withTags(null);
|
table.withOwner(null).withDatabase(null).withHref(null).withTags(null);
|
||||||
|
|
||||||
// Don't store column tags as JSON but build it on the fly based on relationships
|
// Don't store column tags as JSON but build it on the fly based on relationships
|
||||||
Map<String, List<TagLabel>> columnTags = table.getColumns().stream()
|
List<Column> columnWithTags = table.getColumns();
|
||||||
.filter(c -> c.getTags() != null).collect(Collectors.toMap(Column::getName, Column::getTags, (a, b) -> b));
|
table.setColumns(cloneWithoutTags(columnWithTags));
|
||||||
table.getColumns().forEach(column -> column.setTags(null));
|
table.getColumns().forEach(column -> column.setTags(null));
|
||||||
|
|
||||||
if (update) {
|
if (update) {
|
||||||
@ -370,9 +386,28 @@ public abstract class TableRepository {
|
|||||||
tableDAO().insert(JsonUtils.pojoToJson(table));
|
tableDAO().insert(JsonUtils.pojoToJson(table));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Restore the relatinships
|
// Restore the relationships
|
||||||
table.withOwner(owner).withDatabase(database).withTags(tags);
|
table.withOwner(owner).withDatabase(database).withTags(tags);
|
||||||
table.getColumns().forEach(c -> c.withTags(columnTags.get(c.getName())));
|
table.setColumns(columnWithTags);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Column> cloneWithoutTags(List<Column> columns) {
|
||||||
|
List<Column> copy = new ArrayList<>();
|
||||||
|
columns.forEach(c -> copy.add(cloneWithoutTags(c)));
|
||||||
|
return copy;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Column cloneWithoutTags(Column column) {
|
||||||
|
List<Column> children = cloneWithoutTags(column.getChildren());
|
||||||
|
return new Column().withDescription(column.getDescription()).withName(column.getName())
|
||||||
|
.withFullyQualifiedName(column.getFullyQualifiedName())
|
||||||
|
.withArrayDataType(column.getArrayDataType())
|
||||||
|
.withColumnConstraint(column.getColumnConstraint())
|
||||||
|
.withColumnDataTypeDisplay(column.getColumnDataTypeDisplay())
|
||||||
|
.withColumnDataType(column.getColumnDataType())
|
||||||
|
.withDataLength(column.getDataLength())
|
||||||
|
.withOrdinalPosition(column.getOrdinalPosition())
|
||||||
|
.withChildren(children);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addRelationships(Table table) throws IOException {
|
private void addRelationships(Table table) throws IOException {
|
||||||
@ -381,7 +416,7 @@ public abstract class TableRepository {
|
|||||||
relationshipDAO().insert(databaseId, table.getId().toString(), Entity.DATABASE, Entity.TABLE,
|
relationshipDAO().insert(databaseId, table.getId().toString(), Entity.DATABASE, Entity.TABLE,
|
||||||
Relationship.CONTAINS.ordinal());
|
Relationship.CONTAINS.ordinal());
|
||||||
|
|
||||||
// Add owner relationship
|
// Add table owner relationship
|
||||||
EntityUtil.setOwner(relationshipDAO(), table.getId(), Entity.TABLE, table.getOwner());
|
EntityUtil.setOwner(relationshipDAO(), table.getId(), Entity.TABLE, table.getOwner());
|
||||||
|
|
||||||
// Add tag to table relationship
|
// Add tag to table relationship
|
||||||
@ -396,16 +431,22 @@ public abstract class TableRepository {
|
|||||||
applyTags(updatedTable);
|
applyTags(updatedTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void applyTags(List<Column> columns) throws IOException {
|
||||||
|
// Add column level tags by adding tag to column relationship
|
||||||
|
for (Column column : columns) {
|
||||||
|
EntityUtil.applyTags(tagDAO(), column.getTags(), column.getFullyQualifiedName());
|
||||||
|
column.setTags(getTags(column.getFullyQualifiedName())); // Update tag list to handle derived tags
|
||||||
|
if (column.getChildren() != null) {
|
||||||
|
applyTags(column.getChildren());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void applyTags(Table table) throws IOException {
|
private void applyTags(Table table) throws IOException {
|
||||||
// Add table level tags by adding tag to table relationship
|
// Add table level tags by adding tag to table relationship
|
||||||
EntityUtil.applyTags(tagDAO(), table.getTags(), table.getFullyQualifiedName());
|
EntityUtil.applyTags(tagDAO(), table.getTags(), table.getFullyQualifiedName());
|
||||||
table.setTags(getTags(table.getFullyQualifiedName())); // Update tag to handle additional derived tags
|
table.setTags(getTags(table.getFullyQualifiedName())); // Update tag to handle additional derived tags
|
||||||
|
applyTags(table.getColumns());
|
||||||
// Add column level tags by adding tag to column relationship
|
|
||||||
for (Column column : table.getColumns()) {
|
|
||||||
EntityUtil.applyTags(tagDAO(), column.getTags(), column.getFullyQualifiedName());
|
|
||||||
column.setTags(getTags(column.getFullyQualifiedName())); // Update tag list to handle derived tags
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -459,7 +500,7 @@ public abstract class TableRepository {
|
|||||||
null);
|
null);
|
||||||
table.setDatabase(fields.contains("database") ? EntityUtil.getEntityReference(getDatabase(table)) : null);
|
table.setDatabase(fields.contains("database") ? EntityUtil.getEntityReference(getDatabase(table)) : null);
|
||||||
table.setTags(fields.contains("tags") ? getTags(table.getFullyQualifiedName()) : null);
|
table.setTags(fields.contains("tags") ? getTags(table.getFullyQualifiedName()) : null);
|
||||||
getColumnTags(fields.contains("tags"), table);
|
getColumnTags(fields.contains("tags"), table.getColumns());
|
||||||
table.setJoins(fields.contains("joins") ? getJoins(table) : null);
|
table.setJoins(fields.contains("joins") ? getJoins(table) : null);
|
||||||
table.setSampleData(fields.contains("sampleData") ? getSampleData(table) : null);
|
table.setSampleData(fields.contains("sampleData") ? getSampleData(table) : null);
|
||||||
table.setViewDefinition(fields.contains("viewDefinition") ? table.getViewDefinition() : null);
|
table.setViewDefinition(fields.contains("viewDefinition") ? table.getViewDefinition() : null);
|
||||||
@ -475,9 +516,9 @@ public abstract class TableRepository {
|
|||||||
return table == null ? null : EntityUtil.getFollowers(table.getId(), relationshipDAO(), userDAO());
|
return table == null ? null : EntityUtil.getFollowers(table.getId(), relationshipDAO(), userDAO());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateColumns(Table storedTable, Table updatedTable) {
|
//TODO modified columns
|
||||||
List<Column> storedColumns = storedTable.getColumns();
|
private void updateColumns(List<Column> storedColumns, List<Column> updatedColumns, List<Column> addedColumns,
|
||||||
List<Column> updatedColumns = updatedTable.getColumns();
|
List<Column> deletedColumns) {
|
||||||
// Carry forward the user generated metadata from existing columns to new columns
|
// Carry forward the user generated metadata from existing columns to new columns
|
||||||
for (Column updated : updatedColumns) {
|
for (Column updated : updatedColumns) {
|
||||||
// Find stored column matching name, data type and ordinal position
|
// Find stored column matching name, data type and ordinal position
|
||||||
@ -489,8 +530,8 @@ public abstract class TableRepository {
|
|||||||
.orElse(null);
|
.orElse(null);
|
||||||
if (stored == null) {
|
if (stored == null) {
|
||||||
// TODO versioning of schema
|
// TODO versioning of schema
|
||||||
// TODO identify column that was deleted
|
addedColumns.add(updated);
|
||||||
LOG.info("Table {} has new column {}", storedTable.getFullyQualifiedName(), updated.getName());
|
LOG.info("Column {} was newly added", updated.getFullyQualifiedName());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -499,20 +540,46 @@ public abstract class TableRepository {
|
|||||||
updated.setDescription(stored.getDescription()); // Carry forward non-empty description
|
updated.setDescription(stored.getDescription()); // Carry forward non-empty description
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove all tags for the table and columns
|
||||||
EntityUtil.removeTagsByPrefix(tagDAO(), stored.getFullyQualifiedName());
|
EntityUtil.removeTagsByPrefix(tagDAO(), stored.getFullyQualifiedName());
|
||||||
//update tags
|
|
||||||
|
// Update tags
|
||||||
updated.setTags(updated.getTags());
|
updated.setTags(updated.getTags());
|
||||||
|
|
||||||
|
if (updated.getChildren() != null && stored.getChildren() != null) {
|
||||||
|
updateColumns(stored.getChildren(), updated.getChildren(), addedColumns, deletedColumns);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
storedTable.setColumns(updatedColumns);
|
|
||||||
|
for (Column stored : storedColumns) {
|
||||||
|
// Find updated column matching name, data type and ordinal position
|
||||||
|
Column updated = storedColumns.stream()
|
||||||
|
.filter(s -> s.getName().equals(stored.getName()) &&
|
||||||
|
s.getColumnDataType() == stored.getColumnDataType() &&
|
||||||
|
Objects.equals(s.getOrdinalPosition(), stored.getOrdinalPosition()))
|
||||||
|
.findAny()
|
||||||
|
.orElse(null);
|
||||||
|
if (updated == null) {
|
||||||
|
// TODO versioning of schema addedColumns.add(stored);
|
||||||
|
deletedColumns.add(stored);
|
||||||
|
LOG.info("Column {} was deleted", stored.getFullyQualifiedName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateColumns(Table storedTable, Table updatedTable) {
|
||||||
|
updateColumns(storedTable.getColumns(), updatedTable.getColumns(), new ArrayList<>(), new ArrayList<>());
|
||||||
|
storedTable.setColumns(updatedTable.getColumns());
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<TagLabel> getTags(String fqn) {
|
private List<TagLabel> getTags(String fqn) {
|
||||||
return tagDAO().getTags(fqn);
|
return tagDAO().getTags(fqn);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void getColumnTags(boolean setTags, Table table) {
|
private void getColumnTags(boolean setTags, List<Column> columns) {
|
||||||
for (Column c : Optional.ofNullable(table.getColumns()).orElse(Collections.emptyList())) {
|
for (Column c : Optional.ofNullable(columns).orElse(Collections.emptyList())) {
|
||||||
c.setTags(setTags ? getTags(c.getFullyQualifiedName()) : null);
|
c.setTags(setTags ? getTags(c.getFullyQualifiedName()) : null);
|
||||||
|
getColumnTags(setTags, c.getChildren());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -587,11 +654,11 @@ public abstract class TableRepository {
|
|||||||
"table.columns.column", JOINED_WITH.ordinal());
|
"table.columns.column", JOINED_WITH.ordinal());
|
||||||
|
|
||||||
DailyCount dailyCount = new DailyCount().withCount(joinedWith.getJoinCount()).withDate(date);
|
DailyCount dailyCount = new DailyCount().withCount(joinedWith.getJoinCount()).withDate(date);
|
||||||
|
List<DailyCount> dailyCountList;
|
||||||
if (json == null) { // Create first entry
|
if (json == null) { // Create first entry
|
||||||
List<DailyCount> dailyCountList = Collections.singletonList(dailyCount);
|
dailyCountList = Collections.singletonList(dailyCount);
|
||||||
json = JsonUtils.pojoToJson(dailyCountList);
|
|
||||||
} else { // Update the existing entry
|
} else { // Update the existing entry
|
||||||
List<DailyCount> dailyCountList = JsonUtils.readObjects(json, DailyCount.class);
|
dailyCountList = JsonUtils.readObjects(json, DailyCount.class);
|
||||||
boolean foundDate = false;
|
boolean foundDate = false;
|
||||||
for (DailyCount d : dailyCountList) { // If the date already exists, update the count. Otherwise add a new entry
|
for (DailyCount d : dailyCountList) { // If the date already exists, update the count. Otherwise add a new entry
|
||||||
if (d.getDate().equals(dailyCount.getDate())) {
|
if (d.getDate().equals(dailyCount.getDate())) {
|
||||||
@ -624,8 +691,8 @@ public abstract class TableRepository {
|
|||||||
joinedWith.getFullyQualifiedName(), reportedOnDate);
|
joinedWith.getFullyQualifiedName(), reportedOnDate);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
json = JsonUtils.pojoToJson(dailyCountList);
|
|
||||||
}
|
}
|
||||||
|
json = JsonUtils.pojoToJson(dailyCountList);
|
||||||
|
|
||||||
fieldRelationshipDAO().upsert(fromColumnFQN, toColumnFQN, "table.columns.column",
|
fieldRelationshipDAO().upsert(fromColumnFQN, toColumnFQN, "table.columns.column",
|
||||||
"table.columns.column", JOINED_WITH.ordinal(), "dailyCount", json);
|
"table.columns.column", JOINED_WITH.ordinal(), "dailyCount", json);
|
||||||
|
@ -45,6 +45,7 @@
|
|||||||
"SMALLINT",
|
"SMALLINT",
|
||||||
"INT",
|
"INT",
|
||||||
"BIGINT",
|
"BIGINT",
|
||||||
|
"BYTEINT",
|
||||||
"FLOAT",
|
"FLOAT",
|
||||||
"DOUBLE",
|
"DOUBLE",
|
||||||
"DECIMAL",
|
"DECIMAL",
|
||||||
@ -110,7 +111,7 @@
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
"columnName": {
|
"columnName": {
|
||||||
"description": "Local name (not fully qualified name) of the column.",
|
"description": "Local name (not fully qualified name) of the column. ColumnName is `-` when the column is not named in struct columnDataType. For example, BigQuery supports struct with unnamed fields",
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"minLength": 1,
|
"minLength": 1,
|
||||||
"maxLength": 64,
|
"maxLength": 64,
|
||||||
@ -124,7 +125,7 @@
|
|||||||
"pattern": "^[^.]*$"
|
"pattern": "^[^.]*$"
|
||||||
},
|
},
|
||||||
"fullyQualifiedColumnName": {
|
"fullyQualifiedColumnName": {
|
||||||
"description": "Fully qualified name of the column that includes `serviceName.databaseName.tableName.columnName`.",
|
"description": "Fully qualified name of the column that includes `serviceName.databaseName.tableName.columnName[.nestedColumnName]`. When columnName is null for columnDataType struct fields, `field_#` where `#` is field index is used. For map columnDataType, for key the field name `key` is used and for the value field `value` is used.",
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"minLength": 1,
|
"minLength": 1,
|
||||||
"maxLength": 256
|
"maxLength": 256
|
||||||
@ -141,6 +142,16 @@
|
|||||||
"description": "Data type of the column (int, date etc.).",
|
"description": "Data type of the column (int, date etc.).",
|
||||||
"$ref": "#/definitions/columnDataType"
|
"$ref": "#/definitions/columnDataType"
|
||||||
},
|
},
|
||||||
|
"arrayDataType" : {
|
||||||
|
"description": "Data type used array in columnDataType. For example, `array<int>` has columnDataType as `array` and arrayDataType as `int`."
|
||||||
|
},
|
||||||
|
"dataLength" : {
|
||||||
|
"description": "Length of `char`, `varchar`, `binary`, `varbinary` `columnDataTypes`, else null. For example, `varchar(20)` has columnDataType as `varchar` and dataLength as `20`.",
|
||||||
|
"type": "integer"
|
||||||
|
},
|
||||||
|
"columnDataTypeDisplay" : {
|
||||||
|
"description" : "Display name used for columnDataType. This is useful for complex types, such as `array<int>, map<int,string>, struct<>, and union types."
|
||||||
|
},
|
||||||
"description": {
|
"description": {
|
||||||
"description": "Description of the column.",
|
"description": "Description of the column.",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
@ -163,12 +174,24 @@
|
|||||||
"ordinalPosition": {
|
"ordinalPosition": {
|
||||||
"description": "Ordinal position of the column.",
|
"description": "Ordinal position of the column.",
|
||||||
"type": "integer"
|
"type": "integer"
|
||||||
|
},
|
||||||
|
"jsonSchema" : {
|
||||||
|
"description": "Json schema only if the columnDataType is JSON else null.",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"children" : {
|
||||||
|
"description": "Child columns if columnDataType or arrayDataType is `map`, `struct`, or `union` else `null`.",
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"$ref": "#/definitions/column"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"required": [
|
"required": [
|
||||||
"name",
|
"name",
|
||||||
"columnDataType"
|
"columnDataType"
|
||||||
]
|
],
|
||||||
|
"additionalProperties": false
|
||||||
},
|
},
|
||||||
"columnJoins": {
|
"columnJoins": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
|
@ -39,5 +39,6 @@
|
|||||||
"required": [
|
"required": [
|
||||||
"id",
|
"id",
|
||||||
"type"
|
"type"
|
||||||
]
|
],
|
||||||
|
"additionalProperties": false
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user