Create a relationship to manage ER based on tableConstraints (#18892)

* Create a relationship to manage ER based on tableConstraints

* Add/remove relationships in the entity_relationship table whenever table constraints are added or updated; validate table constraints

* remove findRelatedTables queries

* Add migrations to add constraint relationships

* remove findRelatedTables code

* Fix postgres migration query

* fix pg migration

* fix test

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
Sriharsha Chintalapani 2024-12-03 20:47:34 -08:00 committed by GitHub
parent 98799ba7f8
commit bf00c9c12e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 326 additions and 32 deletions

View File

@ -31,7 +31,7 @@ public class EntityNotFoundException extends WebServiceException {
super(Response.Status.NOT_FOUND, ERROR_TYPE, message);
}
private EntityNotFoundException(String message, Throwable cause) {
public EntityNotFoundException(String message, Throwable cause) {
super(Response.Status.NOT_FOUND, ERROR_TYPE, message, cause);
}

View File

@ -2741,25 +2741,6 @@ public interface CollectionDAO {
return listAfter(
getTableName(), filter.getQueryParams(), condition, condition, limit, afterName, afterId);
}
@ConnectionAwareSqlQuery(
value =
"SELECT json FROM table_entity "
+ "WHERE JSON_SEARCH(JSON_EXTRACT(json, '$.tableConstraints[*].referredColumns'), "
+ "'one', :fqn) IS NOT NULL",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT json "
+ "FROM table_entity "
+ "WHERE EXISTS ("
+ " SELECT 1"
+ " FROM jsonb_array_elements(json->'tableConstraints') AS constraints"
+ " CROSS JOIN jsonb_array_elements_text(constraints->'referredColumns') AS referredColumn "
+ " WHERE referredColumn LIKE :fqn"
+ ")",
connectionType = POSTGRES)
List<String> findRelatedTables(@Bind("fqn") String fqn);
}
interface StoredProcedureDAO extends EntityDAO<StoredProcedure> {

View File

@ -40,6 +40,7 @@ import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -89,6 +90,7 @@ import org.openmetadata.schema.type.csv.CsvDocumentation;
import org.openmetadata.schema.type.csv.CsvFile;
import org.openmetadata.schema.type.csv.CsvHeader;
import org.openmetadata.schema.type.csv.CsvImportResult;
import org.openmetadata.sdk.exception.EntitySpecViolationException;
import org.openmetadata.sdk.exception.SuggestionException;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.CatalogExceptionMessage;
@ -676,6 +678,7 @@ public class TableRepository extends EntityRepository<Table> {
.withDatabase(schema.getDatabase())
.withService(schema.getService())
.withServiceType(schema.getServiceType());
validateTableConstraints(table);
}
@Override
@ -693,6 +696,8 @@ public class TableRepository extends EntityRepository<Table> {
// Restore the relationships
table.withColumns(columnWithTags).withService(service);
// Store ER relationships based on table constraints
addConstraintRelationship(table, table.getTableConstraints());
}
@Override
@ -1117,6 +1122,33 @@ public class TableRepository extends EntityRepository<Table> {
return customMetrics;
}
/**
 * Validates the constraints declared on {@code table}.
 *
 * <p>Checks performed, in order, for every constraint:
 *
 * <ul>
 *   <li>the constraint must not repeat an earlier constraint of the same table;
 *   <li>every constrained column is passed to {@code validateColumn} against {@code table};
 *   <li>for every referred column, the parent table is resolved by name (non-deleted only) and
 *       the column is passed to {@code validateColumn} against that table.
 * </ul>
 *
 * @param table table whose constraints are checked; a null or empty constraint list is accepted
 * @throws EntitySpecViolationException on a duplicate constraint or when a referred table cannot
 *     be found
 */
private void validateTableConstraints(Table table) {
  List<TableConstraint> declaredConstraints = table.getTableConstraints();
  if (nullOrEmpty(declaredConstraints)) {
    return;
  }

  Set<TableConstraint> seenConstraints = new HashSet<>();
  for (TableConstraint current : declaredConstraints) {
    // Set#add returns false when an equal constraint was already registered.
    boolean firstOccurrence = seenConstraints.add(current);
    if (!firstOccurrence) {
      throw new EntitySpecViolationException(
          "Duplicate constraint found in request: " + current);
    }

    for (String localColumn : current.getColumns()) {
      validateColumn(table, localColumn);
    }

    List<String> referredColumns = current.getReferredColumns();
    if (nullOrEmpty(referredColumns)) {
      continue;
    }
    for (String referredColumn : referredColumns) {
      String referredTableFqn = FullyQualifiedName.getParentFQN(referredColumn);
      String referredColumnName = FullyQualifiedName.getColumnName(referredColumn);
      try {
        Table referredTable = findByName(referredTableFqn, NON_DELETED);
        validateColumn(referredTable, referredColumnName);
      } catch (EntityNotFoundException e) {
        throw new EntitySpecViolationException("Table not found: " + referredTableFqn);
      }
    }
  }
}
/** Handles entity updated from PUT and POST operation. */
public class TableUpdater extends ColumnEntityUpdater {
public TableUpdater(Table original, Table updated, Operation operation) {
@ -1129,7 +1161,7 @@ public class TableRepository extends EntityRepository<Table> {
Table updatedTable = updated;
DatabaseUtil.validateColumns(updatedTable.getColumns());
recordChange("tableType", origTable.getTableType(), updatedTable.getTableType());
updateConstraints(origTable, updatedTable);
updateTableConstraints(origTable, updatedTable, operation);
updateColumns(
COLUMN_FIELD, origTable.getColumns(), updated.getColumns(), EntityUtil.columnMatch);
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
@ -1138,10 +1170,26 @@ public class TableRepository extends EntityRepository<Table> {
recordChange("locationPath", original.getLocationPath(), updated.getLocationPath());
}
private void updateConstraints(Table origTable, Table updatedTable) {
private void updateTableConstraints(Table origTable, Table updatedTable, Operation operation) {
validateTableConstraints(updatedTable);
if (operation.isPatch()
&& !nullOrEmpty(updatedTable.getTableConstraints())
&& !nullOrEmpty(origTable.getTableConstraints())) {
List<TableConstraint> newConstraints = new ArrayList<>();
for (TableConstraint constraint : updatedTable.getTableConstraints()) {
TableConstraint existing =
origTable.getTableConstraints().stream()
.filter(c -> EntityUtil.tableConstraintMatch.test(c, constraint))
.findAny()
.orElse(null);
if (existing == null) {
newConstraints.add(constraint);
}
}
checkDuplicateTableConstraints(origTable, newConstraints);
}
List<TableConstraint> origConstraints = listOrEmpty(origTable.getTableConstraints());
List<TableConstraint> updatedConstraints = listOrEmpty(updatedTable.getTableConstraints());
origConstraints.sort(EntityUtil.compareTableConstraint);
origConstraints.stream().map(TableConstraint::getColumns).forEach(Collections::sort);
@ -1157,6 +1205,66 @@ public class TableRepository extends EntityRepository<Table> {
added,
deleted,
EntityUtil.tableConstraintMatch);
// manage table ER relationship based on table constraints
addConstraintRelationship(origTable, added);
deleteConstraintRelationship(origTable, deleted);
}
}
/**
 * Rejects any constraint in {@code newConstraints} that is equal to a constraint already present
 * on {@code origTable}, or to an earlier entry of {@code newConstraints}.
 *
 * <p>Nothing is checked when either the original constraints or the new constraints are null or
 * empty.
 *
 * @param origTable table holding the existing constraints
 * @param newConstraints constraints about to be added
 * @throws EntitySpecViolationException when a duplicate is detected
 */
private void checkDuplicateTableConstraints(
    Table origTable, List<TableConstraint> newConstraints) {
  if (nullOrEmpty(origTable.getTableConstraints()) || nullOrEmpty(newConstraints)) {
    return;
  }

  Set<TableConstraint> knownConstraints =
      new HashSet<>(listOrEmpty(origTable.getTableConstraints()));
  for (TableConstraint candidate : newConstraints) {
    boolean notSeenBefore = knownConstraints.add(candidate);
    if (!notSeenBefore) {
      throw new EntitySpecViolationException("Table Constraint is Duplicate: " + candidate);
    }
  }
}
/**
 * Stores a {@code RELATED_TO} relationship from {@code table} to the parent table of every
 * referred column found in {@code constraints}.
 *
 * @param table table that owns the constraints (relationship source)
 * @param constraints constraints to process; null or empty is a no-op
 * @throws EntityNotFoundException when the parent table of a referred column cannot be resolved
 *     among non-deleted tables
 */
private void addConstraintRelationship(Table table, List<TableConstraint> constraints) {
  if (nullOrEmpty(constraints)) {
    return;
  }

  for (TableConstraint tableConstraint : constraints) {
    List<String> referredColumns = tableConstraint.getReferredColumns();
    if (nullOrEmpty(referredColumns)) {
      continue;
    }
    for (String referredColumn : referredColumns) {
      String referredTableFqn = FullyQualifiedName.getParentFQN(referredColumn);
      try {
        EntityReference referredTable =
            Entity.getEntityReferenceByName(TABLE, referredTableFqn, NON_DELETED);
        addRelationship(
            table.getId(), referredTable.getId(), TABLE, TABLE, Relationship.RELATED_TO);
      } catch (EntityNotFoundException e) {
        String reason =
            String.format(
                "Failed to add table constraint due to missing table %s", referredTableFqn);
        throw EntityNotFoundException.byName(reason);
      }
    }
  }
}
/**
 * Removes the {@code RELATED_TO} relationship from {@code table} to the parent table of every
 * referred column found in {@code constraints}.
 *
 * <p>Referred tables are resolved with {@code ALL}, so soft-deleted tables are matched as well.
 *
 * <p>NOTE(review): the relationship is removed per referred table, not per constraint. If another
 * remaining constraint still refers to the same table, the link is removed anyway — confirm
 * whether callers guarantee this cannot happen.
 *
 * @param table table that owns the constraints (relationship source)
 * @param constraints constraints being removed; null or empty is a no-op
 * @throws EntityNotFoundException when the parent table of a referred column cannot be resolved
 */
private void deleteConstraintRelationship(Table table, List<TableConstraint> constraints) {
  if (nullOrEmpty(constraints)) {
    return;
  }

  for (TableConstraint constraint : constraints) {
    if (nullOrEmpty(constraint.getReferredColumns())) {
      continue;
    }
    for (String column : constraint.getReferredColumns()) {
      String toParent = FullyQualifiedName.getParentFQN(column);
      try {
        EntityReference toTable = Entity.getEntityReferenceByName(TABLE, toParent, ALL);
        deleteRelationship(
            table.getId(), TABLE, toTable.getId(), TABLE, Relationship.RELATED_TO);
      } catch (EntityNotFoundException e) {
        // This is the delete path; the message previously (incorrectly) said "add".
        throw EntityNotFoundException.byName(
            String.format(
                "Failed to delete table constraint due to missing table %s", toParent));
      }
    }
  }
}

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.migration.mysql.v160;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addDisplayNameToCustomProperty;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addEditGlossaryTermsToDataConsumerPolicy;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addRelationsForTableConstraints;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addViewAllRuleToOrgPolicy;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.migrateServiceTypesAndConnections;
@ -22,5 +23,6 @@ public class Migration extends MigrationProcessImpl {
addViewAllRuleToOrgPolicy(collectionDAO);
addEditGlossaryTermsToDataConsumerPolicy(collectionDAO);
addDisplayNameToCustomProperty(handle, false);
addRelationsForTableConstraints(handle, false);
}
}

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.migration.postgres.v160;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addDisplayNameToCustomProperty;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addEditGlossaryTermsToDataConsumerPolicy;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addRelationsForTableConstraints;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addViewAllRuleToOrgPolicy;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.migrateServiceTypesAndConnections;
@ -22,5 +23,6 @@ public class Migration extends MigrationProcessImpl {
addViewAllRuleToOrgPolicy(collectionDAO);
addEditGlossaryTermsToDataConsumerPolicy(collectionDAO);
addDisplayNameToCustomProperty(handle, true);
addRelationsForTableConstraints(handle, true);
}
}

View File

@ -1,20 +1,31 @@
package org.openmetadata.service.migration.utils.v160;
import static org.openmetadata.common.utils.CommonUtil.listOf;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.schema.type.Include.NON_DELETED;
import static org.openmetadata.service.Entity.TABLE;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.policies.Policy;
import org.openmetadata.schema.entity.policies.accessControl.Rule;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.TableConstraint;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.PolicyRepository;
import org.openmetadata.service.jdbi3.TableRepository;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
@Slf4j
@ -114,6 +125,89 @@ public class MigrationUtil {
}
}
/**
 * Migration step that creates table-to-table relationships for existing tables whose stored JSON
 * contains table constraints with referred columns.
 *
 * <p>Rows are read from {@code table_entity} in pages of 1000. Each row is deserialized to a
 * {@link Table} and handed to {@code addConstraintRelationship}. A failure on one table is logged
 * and does not stop the migration.
 *
 * @param handle JDBI handle used to run the fetch query
 * @param postgresql {@code true} to use the PostgreSQL (jsonb) form of the query, {@code false}
 *     to use the MySQL (JSON_EXTRACT) form
 */
public static void addRelationsForTableConstraints(Handle handle, boolean postgresql) {
  LOG.info("Starting table constraint relationship migration");
  final int batchSize = 1000;
  int offset = 0;

  // ORDER BY id makes LIMIT/OFFSET paging deterministic; without an explicit order the database
  // may return rows in a different order per page, so rows could be skipped or repeated.
  String fetchQuery =
      "SELECT id, json FROM table_entity "
          + "WHERE JSON_LENGTH(JSON_EXTRACT(json, '$.tableConstraints')) > 0 "
          + "AND JSON_LENGTH(JSON_EXTRACT(json, '$.tableConstraints[*].referredColumns')) > 0 "
          + "ORDER BY id "
          + "LIMIT :limit OFFSET :offset";

  if (postgresql) {
    fetchQuery =
        "SELECT id, json FROM table_entity "
            + "WHERE jsonb_typeof(json->'tableConstraints') = 'array' "
            + "AND jsonb_array_length(json->'tableConstraints') > 0 "
            + "AND EXISTS ("
            + "  SELECT 1 FROM jsonb_array_elements(json->'tableConstraints') AS tc "
            + "  WHERE jsonb_typeof(tc->'referredColumns') = 'array' "
            + "  AND jsonb_array_length(tc->'referredColumns') > 0"
            + ") "
            + "ORDER BY id "
            + "LIMIT :limit OFFSET :offset";
  }

  TableRepository tableRepository = (TableRepository) Entity.getEntityRepository(TABLE);

  while (true) {
    List<Map<String, Object>> tables =
        handle
            .createQuery(fetchQuery)
            .bind("limit", batchSize)
            .bind("offset", offset)
            .mapToMap()
            .list();

    if (tables.isEmpty()) {
      break;
    }

    for (Map<String, Object> tableRow : tables) {
      String tableId = (String) tableRow.get("id");
      String json = tableRow.get("json").toString();
      try {
        Table table = JsonUtils.readValue(json, Table.class);
        addConstraintRelationship(table, table.getTableConstraints(), tableRepository);
      } catch (Exception e) {
        // Best effort: keep migrating the remaining tables, but keep the stack trace in the log
        // (a trailing Throwable argument is logged as the exception).
        LOG.error("Error processing table ID '{}': {}", tableId, e.getMessage(), e);
      }
    }

    offset += batchSize;
    LOG.debug("Processed of table constraint up to offset {}", offset);
  }
}
/**
 * Adds a {@code RELATED_TO} relationship from {@code table} to the parent table of every referred
 * column in {@code constraints}, skipping target tables that are already related.
 *
 * <p>The existing relationships are read once per call, and every newly added target is recorded
 * so that several referred columns pointing at the same table produce a single insert.
 *
 * @param table table that owns the constraints (relationship source)
 * @param constraints constraints to process; null or empty is a no-op
 * @param tableRepository repository used to read and write relationships
 * @throws EntityNotFoundException when the parent table of a referred column cannot be resolved
 *     among non-deleted tables
 */
private static void addConstraintRelationship(
    Table table, List<TableConstraint> constraints, TableRepository tableRepository) {
  if (nullOrEmpty(constraints)) {
    return;
  }
  // Only query existing relationships when at least one constraint refers to another table.
  boolean hasReferredColumns =
      constraints.stream().anyMatch(c -> !nullOrEmpty(c.getReferredColumns()));
  if (!hasReferredColumns) {
    return;
  }

  // Loop-invariant lookup: fetch current RELATED_TO targets once instead of once per constraint.
  Map<UUID, EntityReference> relatedTables = new HashMap<>();
  tableRepository
      .findTo(table.getId(), TABLE, Relationship.RELATED_TO, TABLE)
      .forEach(r -> relatedTables.put(r.getId(), r));

  for (TableConstraint constraint : constraints) {
    if (nullOrEmpty(constraint.getReferredColumns())) {
      continue;
    }
    for (String column : constraint.getReferredColumns()) {
      String toParent = FullyQualifiedName.getParentFQN(column);
      try {
        EntityReference toTable = Entity.getEntityReferenceByName(TABLE, toParent, NON_DELETED);
        if (!relatedTables.containsKey(toTable.getId())) {
          tableRepository.addRelationship(
              table.getId(), toTable.getId(), TABLE, TABLE, Relationship.RELATED_TO);
          // Remember the new link so later columns of the same table do not re-insert it.
          relatedTables.put(toTable.getId(), toTable);
        }
      } catch (EntityNotFoundException e) {
        throw EntityNotFoundException.byName(
            String.format("Failed to add table constraint due to missing table %s", toParent));
      }
    }
  }
}
public static void migrateServiceTypesAndConnections(Handle handle, boolean postgresql) {
LOG.info("Starting service type and connection type migrations");
try {

View File

@ -247,12 +247,15 @@ public interface SearchIndex {
// We need to query the table_entity table to find the references this current table
// has with other tables. We pick this info from the ES however in case of re-indexing this info
// needs to be picked from the db
CollectionDAO dao = Entity.getCollectionDAO();
List<String> json_array =
dao.tableDAO().findRelatedTables(entity.getFullyQualifiedName() + "%");
for (String json : json_array) {
Table foreign_table = JsonUtils.readValue(json, Table.class);
processConstraints(foreign_table, entity, constraints, false);
List<CollectionDAO.EntityRelationshipRecord> relatedTables =
Entity.getCollectionDAO()
.relationshipDAO()
.findFrom(entity.getId(), Entity.TABLE, Relationship.RELATED_TO.ordinal());
for (CollectionDAO.EntityRelationshipRecord table : relatedTables) {
Table foreignTable =
Entity.getEntity(Entity.TABLE, table.getId(), "tableConstraints", NON_DELETED);
processConstraints(foreignTable, entity, constraints, false);
}
return constraints;
}

View File

@ -34,7 +34,7 @@ class EnumBackwardCompatibilityTest {
/** */
@Test
void testRelationshipEnumBackwardCompatible() {
assertEquals(22, Relationship.values().length);
assertEquals(23, Relationship.values().length);
assertEquals(21, Relationship.DEFAULTS_TO.ordinal());
assertEquals(20, Relationship.EDITED_BY.ordinal());
assertEquals(19, Relationship.EXPERT.ordinal());

View File

@ -2954,6 +2954,48 @@ public class TableResourceTest extends EntityResourceTest<Table, CreateTable> {
ADMIN_AUTH_HEADERS);
}
/** A PUT whose request lists the same table constraint twice must be rejected with 400. */
@Test
void put_tableTableConstraintDuplicate_400(TestInfo test) throws IOException {
  // Start from a table that has a single column and no constraints.
  CreateTable request =
      createRequest(test)
          .withColumns(List.of(getColumn(C1, BIGINT, USER_ADDRESS_TAG_LABEL)))
          .withTableConstraints(null);
  createAndCheckEntity(request, ADMIN_AUTH_HEADERS);

  // Update the table with the very same UNIQUE constraint listed twice.
  TableConstraint uniqueOnC1 =
      new TableConstraint().withConstraintType(ConstraintType.UNIQUE).withColumns(List.of(C1));
  CreateTable duplicateRequest = request.withTableConstraints(List.of(uniqueOnC1, uniqueOnC1));

  assertResponseContains(
      () -> updateEntity(duplicateRequest, OK, ADMIN_AUTH_HEADERS),
      BAD_REQUEST,
      "Duplicate constraint found in request: ");
}
/** A PUT with a constraint on a column the table does not have must be rejected with 400. */
@Test
void put_tableTableConstraintInvalidColumn_400(TestInfo test) throws IOException {
  // Start from a table that has a single column and no constraints.
  CreateTable request =
      createRequest(test)
          .withColumns(List.of(getColumn(C1, BIGINT, USER_ADDRESS_TAG_LABEL)))
          .withTableConstraints(null);
  createAndCheckEntity(request, ADMIN_AUTH_HEADERS);

  TableConstraint unknownColumnConstraint =
      new TableConstraint()
          .withConstraintType(ConstraintType.UNIQUE)
          .withColumns(List.of("invalid_column")); // Non-existent column
  CreateTable invalidRequest = request.withTableConstraints(List.of(unknownColumnConstraint));

  assertResponseContains(
      () -> updateEntity(invalidRequest, OK, ADMIN_AUTH_HEADERS),
      BAD_REQUEST,
      "Invalid column name found in table constraint");
}
void assertFields(List<Table> tableList, String fieldsParam) {
tableList.forEach(t -> assertFields(t, fieldsParam));
}

View File

@ -0,0 +1,30 @@
package org.openmetadata.sdk.exception;
import javax.ws.rs.core.Response;
/**
 * Web service exception with error type {@code ENTITY_SPEC_VIOLATION}.
 *
 * <p>The HTTP status is {@code BAD_REQUEST} unless a status is passed explicitly.
 */
public class EntitySpecViolationException extends WebServiceException {
  private static final String BY_NAME_MESSAGE = "Entity Spec Violation [%s] due to [%s].";
  private static final String ERROR_TYPE = "ENTITY_SPEC_VIOLATION";

  /** Creates the exception with status {@code BAD_REQUEST} and the given message. */
  public EntitySpecViolationException(String message) {
    this(Response.Status.BAD_REQUEST, message);
  }

  /** Creates the exception with the given status and message. */
  public EntitySpecViolationException(Response.Status status, String message) {
    super(status, ERROR_TYPE, message);
  }

  /**
   * Builds an exception whose message names the entity and the reason, with an explicit status.
   *
   * @param name entity name placed in the message
   * @param errorMessage reason placed in the message
   * @param status HTTP status to report
   */
  public static EntitySpecViolationException byMessage(
      String name, String errorMessage, Response.Status status) {
    String formatted = buildMessageByName(name, errorMessage);
    return new EntitySpecViolationException(status, formatted);
  }

  /** Same as the three-argument variant, with status {@code BAD_REQUEST}. */
  public static EntitySpecViolationException byMessage(String name, String errorMessage) {
    return byMessage(name, errorMessage, Response.Status.BAD_REQUEST);
  }

  /** Formats the "[name] due to [reason]" message. */
  private static String buildMessageByName(String name, String errorMessage) {
    return String.format(BY_NAME_MESSAGE, name, errorMessage);
  }
}

View File

@ -0,0 +1,30 @@
package org.openmetadata.sdk.exception;
import javax.ws.rs.core.Response;
/**
 * Web service exception with error type {@code ENTITY_UPDATE_EXCEPTION}.
 *
 * <p>The HTTP status is {@code BAD_REQUEST} unless a status is passed explicitly.
 */
public class EntityUpdateException extends WebServiceException {
  private static final String BY_NAME_MESSAGE = "Entity Update Exception [%s] due to [%s].";
  private static final String ERROR_TYPE = "ENTITY_UPDATE_EXCEPTION";

  /** Creates the exception with status {@code BAD_REQUEST} and the given message. */
  public EntityUpdateException(String message) {
    this(Response.Status.BAD_REQUEST, message);
  }

  /** Creates the exception with the given status and message. */
  public EntityUpdateException(Response.Status status, String message) {
    super(status, ERROR_TYPE, message);
  }

  /**
   * Builds an exception whose message names the entity and the reason, with an explicit status.
   *
   * @param name entity name placed in the message
   * @param errorMessage reason placed in the message
   * @param status HTTP status to report
   */
  public static EntityUpdateException byMessage(
      String name, String errorMessage, Response.Status status) {
    String formatted = buildMessageByName(name, errorMessage);
    return new EntityUpdateException(status, formatted);
  }

  /** Same as the three-argument variant, with status {@code BAD_REQUEST}. */
  public static EntityUpdateException byMessage(String name, String errorMessage) {
    return byMessage(name, errorMessage, Response.Status.BAD_REQUEST);
  }

  /** Formats the "[name] due to [reason]" message. */
  private static String buildMessageByName(String name, String errorMessage) {
    return String.format(BY_NAME_MESSAGE, name, errorMessage);
  }
}

View File

@ -33,7 +33,8 @@
"voted",
"expert",
"editedBy",
"defaultsTo"
"defaultsTo",
"relatesTo"
],
"javaEnums": [
{ "name": "CONTAINS" },
@ -57,7 +58,8 @@
{ "name": "VOTED" },
{ "name": "EXPERT" },
{ "name": "EDITED_BY" },
{ "name": "DEFAULTS_TO" }
{ "name": "DEFAULTS_TO" },
{ "name": "RELATES_TO" }
]
}
},