Added EntityDAO as the base interface for all entities

This commit is contained in:
sureshms 2021-10-20 21:59:45 -07:00
parent aec36fe664
commit 1952501a6f
6 changed files with 161 additions and 53 deletions

View File

@ -48,6 +48,7 @@ public class CatalogGenericExceptionMapper implements ExceptionMapper<Throwable>
@Override
public Response toResponse(Throwable ex) {
LOG.info("Exception ex", ex);
if (ex instanceof ProcessingException || ex instanceof IllegalArgumentException) {
final Response response = BadRequestException.of().getResponse();
return Response.fromResponse(response)

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.customizer.Define;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.openmetadata.catalog.exception.CatalogExceptionMessage;
import org.openmetadata.catalog.exception.EntityNotFoundException;
import org.openmetadata.catalog.util.JsonUtils;
import java.io.IOException;
import java.util.List;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
public interface EntityDAO<T> {
// TODO javadoc
String getTableName();
Class<T> getEntityClass();
@SqlUpdate("INSERT INTO <table> (json) VALUES (:json)")
void insert(@Define("table") String table, @Bind("json") String json);
@SqlUpdate("UPDATE <table> SET json = :json WHERE id = :id")
void update(@Define("table") String table, @Bind("id") String id, @Bind("json") String json);
@SqlQuery("SELECT json FROM <table> WHERE id = :id")
String findById(@Define("table") String table, @Bind("id") String id);
String findByName(String table, String name);
int listCount(String table, String databaseFQN); // TODO check this
List<String> listBefore(String table, String fqn, int limit, String before);
List<String> listAfter(String table, String databaseFQN, int limit, String after);
boolean exists(String table, String id);
int delete(String table, String id);
default void insert(String json) {
insert(getTableName(), json);
}
default void update(String id, String json) {
update(getTableName(), id, json);
}
default T findEntityById(String id) throws IOException {
Class<T> clz = getEntityClass();
String json = findById(getTableName(), id);
T entity = null;
if (json != null) {
entity = JsonUtils.readValue(json, clz);
}
if (entity == null) {
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(clz.getSimpleName(), id));
}
return entity;
}
default T findEntityByName(String fqn) throws IOException {
Class<T> clz = getEntityClass();
String json = findByName(getTableName(), fqn);
T entity = null;
if (json != null) {
entity = JsonUtils.readValue(json, clz);
}
if (entity == null) {
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(clz.getSimpleName(), fqn));
}
return entity;
}
default String findJsonById(String fqn) throws IOException {
return findById(getTableName(), fqn);
}
default String findJsonByFqn(String fqn) throws IOException {
return findByName(getTableName(), fqn);
}
default int listCount(String databaseFQN) {
return listCount(getTableName(), databaseFQN);
}
default List<String> listBefore(String databaseFQN, int limit, String before) {
return listBefore(getTableName(), databaseFQN, limit, before);
}
default List<String> listAfter(String databaseFQN, int limit, String after) {
return listAfter(getTableName(), databaseFQN, limit, after);
}
default boolean exists(String id) {
return exists(getTableName(), id);
}
default int delete(String id) {
int rowsDeleted = delete(getTableName(), id);
if (rowsDeleted <= 0) {
throw EntityNotFoundException.byMessage(entityNotFound(getEntityClass().getSimpleName(), id));
}
return rowsDeleted;
}
}

View File

@ -5,18 +5,24 @@ import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.customizer.Define;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.openmetadata.catalog.entity.data.Table;
import java.util.List;
public interface TableDAO3 extends EntityDAO {
public interface TableDAO3 extends EntityDAO<Table> {
@Override
default String getTableName() {
return "table_entity";
}
@Override
default Class<Table> getEntityClass() {
return Table.class;
}
@Override
@SqlQuery("SELECT json FROM <table> WHERE fullyQualifiedName = :tableFQN")
String findByFqn(@Define("table") String table, @Bind("tableFQN") String tableFQN);
String findByName(@Define("table") String table, @Bind("tableFQN") String tableFQN);
@Override
@SqlQuery("SELECT count(*) FROM <table> WHERE " +

View File

@ -17,11 +17,6 @@
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.sqlobject.CreateSqlObject;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import java.util.List;
public interface TableRepository3 {
@CreateSqlObject

View File

@ -72,7 +72,7 @@ import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityN
import static org.openmetadata.catalog.jdbi3.Relationship.JOINED_WITH;
import static org.openmetadata.common.utils.CommonUtil.parseDate;
public class TableRepositoryHelper {
public class TableRepositoryHelper implements EntityRepository<Table> {
static final Logger LOG = LoggerFactory.getLogger(TableRepositoryHelper.class);
// Table fields that can be patched in a PATCH request
static final Fields TABLE_PATCH_FIELDS = new Fields(TableResource.FIELD_LIST,
@ -88,7 +88,6 @@ public class TableRepositoryHelper {
// TODO initialize
private TableRepository3 tableRepo3;
EntityRepository<Table> entityRepository = new EntityRepository<>() {
@Override
public List<String> listAfter(String fqnPrefix, int limitParam, String after) {
return tableRepo3.tableDAO().listAfter(fqnPrefix, limitParam, after);
@ -110,8 +109,21 @@ public class TableRepositoryHelper {
}
@Override
public Table setFields(Table entity, Fields fields) throws IOException, ParseException {
return TableRepositoryHelper.this.setFields(entity, fields);
public Table setFields(Table table, Fields fields) throws IOException, ParseException {
table.setColumns(fields.contains("columns") ? table.getColumns() : null);
table.setTableConstraints(fields.contains("tableConstraints") ? table.getTableConstraints() : null);
table.setOwner(fields.contains("owner") ? getOwner(table) : null);
table.setFollowers(fields.contains("followers") ? getFollowers(table) : null);
table.setUsageSummary(fields.contains("usageSummary") ? EntityUtil.getLatestUsage(tableRepo3.usageDAO(), table.getId()) :
null);
table.setDatabase(fields.contains("database") ? EntityUtil.getEntityReference(getDatabase(table)) : null);
table.setTags(fields.contains("tags") ? getTags(table.getFullyQualifiedName()) : null);
getColumnTags(fields.contains("tags"), table.getColumns());
table.setJoins(fields.contains("joins") ? getJoins(table) : null);
table.setSampleData(fields.contains("sampleData") ? getSampleData(table) : null);
table.setViewDefinition(fields.contains("viewDefinition") ? table.getViewDefinition() : null);
table.setTableProfile(fields.contains("tableProfile") ? getTableProfile(table): null);
return table;
}
@Override
@ -119,7 +131,6 @@ public class TableRepositoryHelper {
throws GeneralSecurityException, UnsupportedEncodingException {
return new TableList(entities, beforeCursor, afterCursor, total);
}
};
public static String getFQN(Table table) {
return (table.getDatabase().getName() + "." + table.getName());
@ -128,24 +139,23 @@ public class TableRepositoryHelper {
@Transaction
public ResultList<Table> listAfter(Fields fields, String databaseFQN, int limitParam, String after)
throws IOException, ParseException, GeneralSecurityException {
return EntityUtil.listAfter(entityRepository, Table.class, fields, databaseFQN, limitParam, after);
return EntityUtil.listAfter(this, Table.class, fields, databaseFQN, limitParam, after);
}
@Transaction
public ResultList<Table> listBefore(Fields fields, String databaseFQN, int limitParam, String before)
throws IOException, ParseException, GeneralSecurityException {
return EntityUtil.listBefore(entityRepository, Table.class, fields, databaseFQN, limitParam, before);
return EntityUtil.listBefore(this, Table.class, fields, databaseFQN, limitParam, before);
}
@Transaction
public Table get(String id, Fields fields) throws IOException, ParseException {
return setFields(validateTable(id), fields);
return setFields(tableRepo3.tableDAO().findEntityById(id), fields);
}
@Transaction
public Table getByName(String fqn, Fields fields) throws IOException, ParseException {
Table table = EntityUtil.validate(fqn, tableRepo3.tableDAO().findByFqn(fqn), Table.class);
return setFields(table, fields);
return setFields(tableRepo3.tableDAO().findEntityByName(fqn), fields);
}
@Transaction
@ -156,7 +166,7 @@ public class TableRepositoryHelper {
@Transaction
public Table patch(String id, String user, JsonPatch patch) throws IOException, ParseException {
Table original = setFields(validateTable(id), TABLE_PATCH_FIELDS);
Table original = setFields(tableRepo3.tableDAO().findEntityById(id), TABLE_PATCH_FIELDS);
Table updated = JsonUtils.applyPatch(original, patch, Table.class);
updated.withUpdatedBy(user).withUpdatedAt(new Date());
patch(original, updated);
@ -165,17 +175,15 @@ public class TableRepositoryHelper {
@Transaction
public void delete(String id) {
if (tableRepo3.tableDAO().delete(id) <= 0) {
throw EntityNotFoundException.byMessage(entityNotFound(Entity.TABLE, id));
}
// Remove all relationships
tableRepo3.relationshipDAO().deleteAll(id);
tableRepo3.tableDAO().delete(id);
tableRepo3.relationshipDAO().deleteAll(id); // Remove all relationships
}
@Transaction
public PutResponse<Table> createOrUpdate(Table updated, UUID databaseId) throws IOException, ParseException {
validateRelationships(updated, databaseId);
Table stored = JsonUtils.readValue(tableRepo3.tableDAO().findByFqn(updated.getFullyQualifiedName()), Table.class);
Table stored = JsonUtils.readValue(tableRepo3.tableDAO().findJsonByFqn(updated.getFullyQualifiedName()),
Table.class);
if (stored == null) {
return new PutResponse<>(Status.CREATED, createInternal(updated));
}
@ -199,7 +207,7 @@ public class TableRepositoryHelper {
@Transaction
public Status addFollower(String tableId, String userId) throws IOException {
EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class);
tableRepo3.tableDAO().findEntityById(tableId);
return EntityUtil.addFollower(tableRepo3.relationshipDAO(), tableRepo3.userDAO(), tableId, Entity.TABLE, userId, Entity.USER) ?
Status.CREATED : Status.OK;
}
@ -207,7 +215,7 @@ public class TableRepositoryHelper {
@Transaction
public void addJoins(String tableId, TableJoins joins) throws IOException, ParseException {
// Validate the request content
Table table = EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class);
Table table = tableRepo3.tableDAO().findEntityById(tableId);
if (!CommonUtil.dateInRange(RestUtil.DATE_FORMAT, joins.getStartDate(), 0, 30)) {
throw new IllegalArgumentException("Date range can only include past 30 days starting today");
}
@ -228,7 +236,7 @@ public class TableRepositoryHelper {
@Transaction
public void addSampleData(String tableId, TableData tableData) throws IOException {
// Validate the request content
Table table = EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class);
Table table = tableRepo3.tableDAO().findEntityById(tableId);
// Validate all the columns
for (String columnName : tableData.getColumns()) {
@ -249,7 +257,7 @@ public class TableRepositoryHelper {
@Transaction
public void addTableProfileData(String tableId, TableProfile tableProfile) throws IOException {
// Validate the request content
Table table = EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class);
Table table = tableRepo3.tableDAO().findEntityById(tableId);
List<TableProfile> storedTableProfiles = getTableProfile(table);
Map<String, TableProfile> storedMapTableProfiles = new HashMap<>();
@ -444,27 +452,6 @@ public class TableRepositoryHelper {
return EntityUtil.validate(databaseId, tableRepo3.databaseDAO().findById(databaseId), Database.class);
}
private Table validateTable(String tableId) throws IOException {
return EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class);
}
private Table setFields(Table table, Fields fields) throws IOException, ParseException {
table.setColumns(fields.contains("columns") ? table.getColumns() : null);
table.setTableConstraints(fields.contains("tableConstraints") ? table.getTableConstraints() : null);
table.setOwner(fields.contains("owner") ? getOwner(table) : null);
table.setFollowers(fields.contains("followers") ? getFollowers(table) : null);
table.setUsageSummary(fields.contains("usageSummary") ? EntityUtil.getLatestUsage(tableRepo3.usageDAO(), table.getId()) :
null);
table.setDatabase(fields.contains("database") ? EntityUtil.getEntityReference(getDatabase(table)) : null);
table.setTags(fields.contains("tags") ? getTags(table.getFullyQualifiedName()) : null);
getColumnTags(fields.contains("tags"), table.getColumns());
table.setJoins(fields.contains("joins") ? getJoins(table) : null);
table.setSampleData(fields.contains("sampleData") ? getSampleData(table) : null);
table.setViewDefinition(fields.contains("viewDefinition") ? table.getViewDefinition() : null);
table.setTableProfile(fields.contains("tableProfile") ? getTableProfile(table): null);
return table;
}
private EntityReference getOwner(Table table) throws IOException {
return table == null ? null : EntityUtil.populateOwner(table.getId(), tableRepo3.relationshipDAO(), tableRepo3.userDAO(), tableRepo3.teamDAO());
}
@ -516,7 +503,7 @@ public class TableRepositoryHelper {
for (JoinedWith joinedWith : joinedWithList) {
// Validate table
String tableFQN = getTableFQN(joinedWith.getFullyQualifiedName());
Table joinedWithTable = EntityUtil.validate(tableFQN, tableRepo3.tableDAO().findByFqn(tableFQN), Table.class);
Table joinedWithTable = tableRepo3.tableDAO().findEntityByName(tableFQN);
// Validate column
validateColumnFQN(joinedWithTable, joinedWith.getFullyQualifiedName());

View File

@ -399,7 +399,7 @@ public final class EntityUtil {
String entity = ref.getType();
String id = ref.getId().toString();
if (entity.equalsIgnoreCase(Entity.TABLE)) {
Table instance = EntityUtil.validate(id, tableDAO3.findById(id), Table.class);
Table instance = tableDAO3.findEntityById(id);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} else if (entity.equalsIgnoreCase(Entity.DATABASE)) {
Database instance = EntityUtil.validate(id, databaseDAO.findById(id), Database.class);
@ -499,7 +499,7 @@ public final class EntityUtil {
PipelineDAO pipelineDAO)
throws IOException {
if (entity.equalsIgnoreCase(Entity.TABLE)) {
Table instance = EntityUtil.validate(fqn, tableDAO3.findByFqn(fqn), Table.class);
Table instance = tableDAO3.findEntityByName(fqn);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.DATABASE)) {
Database instance = EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class);
@ -618,7 +618,7 @@ public final class EntityUtil {
} else if (entityType.equalsIgnoreCase(Entity.TEAM)) {
return getEntityReference(EntityUtil.validate(fqn, teamDAO.findByName(fqn), Team.class));
} else if (entityType.equalsIgnoreCase(Entity.TABLE)) {
return getEntityReference(EntityUtil.validate(fqn, tableDAO3.findByFqn(fqn), Table.class));
return getEntityReference(tableDAO3.findEntityByName(fqn));
} else if (entityType.equalsIgnoreCase(Entity.DATABASE)) {
return getEntityReference(EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class));
} else if (entityType.equalsIgnoreCase(Entity.METRICS)) {