Table entity migrated to jdbi3

This commit is contained in:
sureshms 2021-10-20 14:21:24 -07:00
parent dd12eb69a2
commit aec36fe664
29 changed files with 1291 additions and 147 deletions

View File

@ -35,6 +35,10 @@
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-jdbi</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-jdbi3</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>

View File

@ -20,7 +20,9 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import io.dropwizard.health.conf.HealthConfiguration;
import io.dropwizard.health.core.HealthCheckBundle;
import io.dropwizard.jdbi3.JdbiFactory;
import io.dropwizard.jersey.jackson.JsonProcessingExceptionMapper;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.catalog.events.EventFilter;
import org.openmetadata.catalog.exception.CatalogGenericExceptionMapper;
import org.openmetadata.catalog.exception.ConstraintViolationExceptionMapper;
@ -78,6 +80,9 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
final DBIFactory factory = new DBIFactory();
final DBI jdbi = factory.build(environment, catalogConfig.getDataSourceFactory(), "mysql");
final JdbiFactory factory3 = new JdbiFactory();
final Jdbi jdbi3 = factory3.build(environment, catalogConfig.getDataSourceFactory(), "mysql3");
// Register Authorizer
registerAuthorizer(catalogConfig, environment, jdbi);
@ -101,6 +106,7 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
environment.jersey().register(JsonMappingExceptionMapper.class);
environment.healthChecks().register("UserDatabaseCheck", new CatalogHealthCheck(catalogConfig, jdbi));
registerResources(catalogConfig, environment, jdbi);
registerResources(catalogConfig, environment, jdbi3);
// Register Event Handler
registerEventFilter(catalogConfig, environment, jdbi);
@ -166,6 +172,28 @@ public class CatalogApplication extends Application<CatalogApplicationConfig> {
jdbi.registerContainerFactory(new OptionalContainerFactory());
CollectionRegistry.getInstance().registerResources(jdbi, environment, authorizer);
environment.lifecycle().manage(new Managed() {
@Override
public void start() {
}
@Override
public void stop() {
long startTime = System.currentTimeMillis();
LOG.info("Took " + (System.currentTimeMillis() - startTime) + " ms to close all the services");
}
});
environment.jersey().register(new SearchResource(config.getElasticSearchConfiguration()));
environment.jersey().register(new JsonPatchProvider());
ErrorPageErrorHandler eph = new ErrorPageErrorHandler();
eph.addErrorPage(Response.Status.NOT_FOUND.getStatusCode(), "/");
environment.getApplicationContext().setErrorHandler(eph);
}
private void registerResources(CatalogApplicationConfig config, Environment environment, Jdbi jdbi) {
CollectionRegistry.getInstance().registerResources3(jdbi, environment, authorizer);
environment.lifecycle().manage(new Managed() {
@Override
public void start() {

View File

@ -55,6 +55,16 @@ public class CatalogGenericExceptionMapper implements ExceptionMapper<Throwable>
.entity(new ErrorMessage(response.getStatus(), ex.getLocalizedMessage()))
.build();
} else if (ex instanceof UnableToExecuteStatementException) {
// TODO remove this
if (ex.getCause() instanceof SQLIntegrityConstraintViolationException) {
return Response.status(CONFLICT)
.type(MediaType.APPLICATION_JSON_TYPE)
.entity(new ErrorMessage(CONFLICT.getStatusCode(), CatalogExceptionMessage.ENTITY_ALREADY_EXISTS))
.build();
}
} else if (ex instanceof org.jdbi.v3.core.statement.UnableToExecuteStatementException) {
// TODO remove this
if (ex.getCause() instanceof SQLIntegrityConstraintViolationException) {
return Response.status(CONFLICT)
.type(MediaType.APPLICATION_JSON_TYPE)

View File

@ -0,0 +1,51 @@
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.sqlobject.SqlObject;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import java.util.List;
public interface DatabaseDAO {
@SqlUpdate("INSERT INTO database_entity (json) VALUES (:json)")
void insert(@Bind("json") String json);
@SqlUpdate("UPDATE database_entity SET json = :json where id = :id")
void update(@Bind("id") String id, @Bind("json") String json);
@SqlQuery("SELECT json FROM database_entity WHERE fullyQualifiedName = :name")
String findByFQN(@Bind("name") String name);
@SqlQuery("SELECT json FROM database_entity WHERE id = :id")
String findById(@Bind("id") String id);
@SqlQuery("SELECT count(*) FROM database_entity WHERE " +
"(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL)")
int listCount(@Bind("fqnPrefix") String fqnPrefix);
@SqlQuery(
"SELECT json FROM (" +
"SELECT fullyQualifiedName, json FROM database_entity WHERE " +
"(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL) AND " +// Filter by service name
"fullyQualifiedName < :before " + // Pagination by database fullyQualifiedName
"ORDER BY fullyQualifiedName DESC " + // Pagination ordering by database fullyQualifiedName
"LIMIT :limit" +
") last_rows_subquery ORDER BY fullyQualifiedName")
List<String> listBefore(@Bind("fqnPrefix") String fqnPrefix, @Bind("limit") int limit,
@Bind("before") String before);
@SqlQuery("SELECT json FROM database_entity WHERE " +
"(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL) AND " +
"fullyQualifiedName > :after " +
"ORDER BY fullyQualifiedName " +
"LIMIT :limit")
List<String> listAfter(@Bind("fqnPrefix") String fqnPrefix, @Bind("limit") int limit,
@Bind("after") String after);
@SqlQuery("SELECT EXISTS (SELECT * FROM database_entity WHERE id = :id)")
boolean exists(@Bind("id") String id);
@SqlUpdate("DELETE FROM database_entity WHERE id = :id")
int delete(@Bind("id") String id);
}

View File

@ -23,7 +23,6 @@ import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.services.DatabaseService;
import org.openmetadata.catalog.exception.EntityNotFoundException;
import org.openmetadata.catalog.jdbi3.DatabaseServiceRepository.DatabaseServiceDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
import org.openmetadata.catalog.jdbi3.UsageRepository.UsageDAO;
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
public interface EntityExtensionDAO3 {
@SqlUpdate("REPLACE INTO entity_extension(id, extension, jsonSchema, json) " +
"VALUES (:id, :extension, :jsonSchema, :json)")
void insert(@Bind("id") String id, @Bind("extension") String extension, @Bind("jsonSchema") String jsonSchema,
@Bind("json") String json);
@SqlQuery("SELECT json FROM entity_extension WHERE id = :id AND extension = :extension")
String getExtension(@Bind("id") String id, @Bind("extension") String extension);
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.openmetadata.catalog.type.EntityReference;
import java.util.List;
public interface EntityRelationshipDAO3 {
@SqlUpdate("INSERT IGNORE INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation) " +
"VALUES (:fromId, :toId, :fromEntity, :toEntity, :relation)")
int insert(@Bind("fromId") String fromId, @Bind("toId") String toId, @Bind("fromEntity") String fromEntity,
@Bind("toEntity") String toEntity, @Bind("relation") int relation);
//
// Find to operations
//
@SqlQuery("SELECT toId, toEntity FROM entity_relationship WHERE fromId = :fromId AND relation = :relation")
@RegisterRowMapper(ToEntityReferenceMapper3.class)
List<EntityReference> findTo(@Bind("fromId") String fromId, @Bind("relation") int relation);
@SqlQuery("SELECT toId FROM entity_relationship WHERE " +
"fromId = :fromId AND relation = :relation AND toEntity = :toEntity ORDER BY fromId")
List<String> findTo(@Bind("fromId") String fromId, @Bind("relation") int relation,
@Bind("toEntity") String toEntity);
@SqlQuery("SELECT count(*) FROM entity_relationship WHERE " +
"fromId = :fromId AND relation = :relation AND toEntity = :toEntity ORDER BY fromId")
int findToCount(@Bind("fromId") String fromId, @Bind("relation") int relation, @Bind("toEntity") String toEntity);
//
// Find from operations
//
@SqlQuery("SELECT fromId FROM entity_relationship WHERE " +
"toId = :toId AND relation = :relation AND fromEntity = :fromEntity ORDER BY fromId")
List<String> findFrom(@Bind("toId") String toId, @Bind("relation") int relation,
@Bind("fromEntity") String fromEntity);
@SqlQuery("SELECT fromId, fromEntity FROM entity_relationship WHERE toId = :toId AND relation = :relation " +
"ORDER BY fromId")
@RegisterRowMapper(FromEntityReferenceMapper3.class)
List<EntityReference> findFrom(@Bind("toId") String toId, @Bind("relation") int relation);
@SqlQuery("SELECT fromId, fromEntity FROM entity_relationship WHERE toId = :toId AND relation = :relation AND " +
"fromEntity = :fromEntity ORDER BY fromId")
@RegisterRowMapper(FromEntityReferenceMapper3.class)
List<EntityReference> findFromEntity(@Bind("toId") String toId, @Bind("relation") int relation,
@Bind("fromEntity") String fromEntity);
//
// Delete Operations
//
@SqlUpdate("DELETE from entity_relationship WHERE fromId = :fromId AND toId = :toId AND relation = :relation")
void delete(@Bind("fromId") String fromId, @Bind("toId") String toId, @Bind("relation") int relation);
// Delete all the entity relationship fromID --- relation --> entity of type toEntity
@SqlUpdate("DELETE from entity_relationship WHERE fromId = :fromId AND relation = :relation AND toEntity = :toEntity")
void deleteFrom(@Bind("fromId") String fromId, @Bind("relation") int relation, @Bind("toEntity") String toEntity);
// Delete all the entity relationship fromID --- relation --> to any entity
@SqlUpdate("DELETE from entity_relationship WHERE fromId = :fromId AND relation = :relation")
void deleteFrom(@Bind("fromId") String fromId, @Bind("relation") int relation);
// Delete all the entity relationship toId <-- relation -- entity of type fromEntity
@SqlUpdate("DELETE from entity_relationship WHERE toId = :toId AND relation = :relation AND fromEntity = :fromEntity")
void deleteTo(@Bind("toId") String toId, @Bind("relation") int relation, @Bind("fromEntity") String fromEntity);
@SqlUpdate("DELETE from entity_relationship WHERE toId = :id OR fromId = :id")
void deleteAll(@Bind("id") String id);
}

View File

@ -23,7 +23,6 @@ import org.openmetadata.catalog.jdbi3.DashboardRepository.DashboardDAO;
import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseDAO;
import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO;
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
public interface FieldRelationshipDAO3 {
@SqlUpdate("INSERT IGNORE INTO field_relationship(fromFQN, toFQN, fromType, toType, relation) " +
"VALUES (:fromFQN, :toFQN, :fromType, :toType, :relation)")
void insert(@Bind("fromFQN") String fromFQN, @Bind("toFQN") String toFQN, @Bind("fromType") String fromType,
@Bind("toType") String toType, @Bind("relation") int relation);
@SqlUpdate("INSERT INTO field_relationship(fromFQN, toFQN, fromType, toType, relation, jsonSchema, json) " +
"VALUES (:fromFQN, :toFQN, :fromType, :toType, :relation, :jsonSchema, :json) " +
"ON DUPLICATE KEY UPDATE json = :json")
void upsert(@Bind("fromFQN") String fromFQN, @Bind("toFQN") String toFQN, @Bind("fromType") String fromType,
@Bind("toType") String toType, @Bind("relation") int relation,
@Bind("jsonSchema") String jsonSchema, @Bind("json") String json);
@SqlQuery("SELECT json FROM field_relationship WHERE " +
"fromFQN = :fromFQN AND toFQN = :toFQN AND fromType = :fromType " +
"AND toType = :toType AND relation = :relation")
String find(@Bind("fromFQN") String fromFQN, @Bind("toFQN") String toFQN,
@Bind("fromType") String fromType, @Bind("toType") String toType,
@Bind("relation") int relation);
@SqlQuery("SELECT fromFQN, toFQN, json FROM field_relationship WHERE " +
"toFQN LIKE CONCAT(:fqnPrefix, '%') AND fromType = :fromType AND toType = :toType AND relation = :relation")
@RegisterRowMapper(FromFieldMapper.class)
List<List<String>> listFromByPrefix(@Bind("fqnPrefix") String fqnPrefix, @Bind("fromType") String fromType,
@Bind("toType") String toType, @Bind("relation") int relation);
@SqlQuery("SELECT fromFQN, toFQN, json FROM field_relationship WHERE " +
"fromFQN LIKE CONCAT(:fqnPrefix, '%') AND fromType = :fromType AND toType = :toType AND relation = :relation")
@RegisterRowMapper(ToFieldMapper.class)
List<List<String>> listToByPrefix(@Bind("fqnPrefix") String fqnPrefix, @Bind("fromType") String fromType,
@Bind("toType") String toType, @Bind("relation") int relation);
@SqlUpdate("DELETE from field_relationship WHERE " +
"(toFQN LIKE CONCAT(:fqnPrefix, '.%') OR fromFQN LIKE CONCAT(:fqnPrefix, '.%')) " +
"AND relation = :relation")
void deleteAllByPrefix(@Bind("fqnPrefix") String fqnPrefix, @Bind("relation") int relation);
class ToFieldMapper implements RowMapper<List<String>> {
@Override
public List<String> map(ResultSet rs, StatementContext ctx) throws SQLException {
return Arrays.asList(rs.getString("fromFQN"), rs.getString("toFQN"), rs.getString("json"));
}
}
class FromFieldMapper implements RowMapper<List<String>> {
@Override
public List<String> map(ResultSet rs, StatementContext ctx) throws SQLException {
return Arrays.asList(rs.getString("toFQN"), rs.getString("fromFQN"), rs.getString("json"));
}
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.core.mapper.RowMapper;
import org.openmetadata.catalog.type.EntityReference;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.UUID;
public class FromEntityReferenceMapper3 implements RowMapper<EntityReference> {
@Override
public EntityReference map(ResultSet rs, org.jdbi.v3.core.statement.StatementContext ctx) throws SQLException {
return new EntityReference().withId(UUID.fromString(rs.getString("fromId")))
.withType(rs.getString("fromEntity"));
}
}

View File

@ -23,7 +23,6 @@ import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseDAO;
import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO;
import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO;
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;
import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;

View File

@ -0,0 +1,50 @@
package org.openmetadata.catalog.jdbi3;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import java.util.List;
public interface TableDAO {
@SqlUpdate("INSERT INTO table_entity (json) VALUES (:json)")
void insert(@Bind("json") String json);
@SqlUpdate("UPDATE table_entity SET json = :json WHERE id = :id")
void update(@Bind("id") String id, @Bind("json") String json);
@SqlQuery("SELECT json FROM table_entity WHERE id = :tableId")
String findById(@Bind("tableId") String tableId);
@SqlQuery("SELECT json FROM table_entity WHERE fullyQualifiedName = :tableFQN")
String findByFqn(@Bind("tableFQN") String tableFQN);
@SqlQuery("SELECT count(*) FROM table_entity WHERE " +
"(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL)")
int listCount(@Bind("databaseFQN") String databaseFQN);
@SqlQuery(
"SELECT json FROM (" +
"SELECT fullyQualifiedName, json FROM table_entity WHERE " +
"(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL) AND " +
"fullyQualifiedName < :before " + // Pagination by table fullyQualifiedName
"ORDER BY fullyQualifiedName DESC " + // Pagination ordering by table fullyQualifiedName
"LIMIT :limit" +
") last_rows_subquery ORDER BY fullyQualifiedName")
List<String> listBefore(@Bind("databaseFQN") String databaseFQN, @Bind("limit") int limit,
@Bind("before") String before);
@SqlQuery("SELECT json FROM table_entity WHERE " +
"(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL) AND "+//Filter by databaseName
"fullyQualifiedName > :after " + // Pagination by table fullyQualifiedName
"ORDER BY fullyQualifiedName " + // Pagination ordering by table fullyQualifiedName
"LIMIT :limit")
List<String> listAfter(@Bind("databaseFQN") String databaseFQN, @Bind("limit") int limit,
@Bind("after") String after);
@SqlQuery("SELECT EXISTS (SELECT * FROM table_entity WHERE id = :id)")
boolean exists(@Bind("id") String id);
@SqlUpdate("DELETE FROM table_entity WHERE id = :id")
int delete(@Bind("id") String id);
}

View File

@ -0,0 +1,54 @@
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.customizer.Define;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import java.util.List;
public interface TableDAO3 extends EntityDAO {
@Override
default String getTableName() {
return "table_entity";
}
@Override
@SqlQuery("SELECT json FROM <table> WHERE fullyQualifiedName = :tableFQN")
String findByFqn(@Define("table") String table, @Bind("tableFQN") String tableFQN);
@Override
@SqlQuery("SELECT count(*) FROM <table> WHERE " +
"(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL)")
int listCount(@Define("table") String table, @Bind("databaseFQN") String databaseFQN);
@Override
@SqlQuery(
"SELECT json FROM (" +
"SELECT fullyQualifiedName, json FROM <table> WHERE " +
"(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL) AND " +
"fullyQualifiedName < :before " + // Pagination by table fullyQualifiedName
"ORDER BY fullyQualifiedName DESC " + // Pagination ordering by table fullyQualifiedName
"LIMIT :limit" +
") last_rows_subquery ORDER BY fullyQualifiedName")
List<String> listBefore(@Define("table") String table, @Bind("databaseFQN") String databaseFQN, @Bind("limit") int limit,
@Bind("before") String before);
@Override
@SqlQuery("SELECT json FROM <table> WHERE " +
"(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL) AND "+//Filter by databaseName
"fullyQualifiedName > :after " + // Pagination by table fullyQualifiedName
"ORDER BY fullyQualifiedName " + // Pagination ordering by table fullyQualifiedName
"LIMIT :limit")
List<String> listAfter(@Define("table") String table, @Bind("databaseFQN") String databaseFQN, @Bind("limit") int limit,
@Bind("after") String after);
@Override
@SqlQuery("SELECT EXISTS (SELECT * FROM <table> WHERE id = :id)")
boolean exists(@Define("table") String table, @Bind("id") String id);
@Override
@SqlUpdate("DELETE FROM <table> WHERE id = :id")
int delete(@Define("table") String table, @Bind("id") String id);
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.sqlobject.CreateSqlObject;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import java.util.List;
public interface TableRepository3 {
@CreateSqlObject
DatabaseDAO databaseDAO();
@CreateSqlObject
EntityRelationshipDAO3 relationshipDAO();
@CreateSqlObject
FieldRelationshipDAO3 fieldRelationshipDAO();
@CreateSqlObject
EntityExtensionDAO3 entityExtensionDAO();
@CreateSqlObject
UserDAO userDAO();
@CreateSqlObject
TeamDAO teamDAO();
@CreateSqlObject
UsageDAO usageDAO();
@CreateSqlObject
TagDAO tagDAO();
@CreateSqlObject
TableDAO3 tableDAO();
}

View File

@ -17,16 +17,12 @@
package org.openmetadata.catalog.jdbi3;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.data.Database;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.exception.CatalogExceptionMessage;
import org.openmetadata.catalog.exception.EntityNotFoundException;
import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseDAO;
import org.openmetadata.catalog.jdbi3.TagRepository.TagDAO;
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
import org.openmetadata.catalog.jdbi3.UsageRepository.UsageDAO;
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
import org.openmetadata.catalog.resources.databases.TableResource;
import org.openmetadata.catalog.resources.databases.TableResource.TableList;
import org.openmetadata.catalog.type.Column;
@ -41,7 +37,7 @@ import org.openmetadata.catalog.type.TableJoins;
import org.openmetadata.catalog.type.TableProfile;
import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.EntityUpdater;
import org.openmetadata.catalog.util.EntityUpdater3;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.EntityUtil.Fields;
import org.openmetadata.catalog.util.EventUtils;
@ -50,11 +46,6 @@ import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.RestUtil.PutResponse;
import org.openmetadata.catalog.util.ResultList;
import org.openmetadata.common.utils.CommonUtil;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.CreateSqlObject;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import org.skife.jdbi.v2.sqlobject.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -81,56 +72,36 @@ import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityN
import static org.openmetadata.catalog.jdbi3.Relationship.JOINED_WITH;
import static org.openmetadata.common.utils.CommonUtil.parseDate;
public abstract class TableRepository {
private static final Logger LOG = LoggerFactory.getLogger(TableRepository.class);
public class TableRepositoryHelper {
static final Logger LOG = LoggerFactory.getLogger(TableRepositoryHelper.class);
// Table fields that can be patched in a PATCH request
private static final Fields TABLE_PATCH_FIELDS = new Fields(TableResource.FIELD_LIST,
static final Fields TABLE_PATCH_FIELDS = new Fields(TableResource.FIELD_LIST,
"owner,columns,database,tags,tableConstraints");
// Table fields that can be updated in a PUT request
private static final Fields TABLE_UPDATE_FIELDS = new Fields(TableResource.FIELD_LIST,
static final Fields TABLE_UPDATE_FIELDS = new Fields(TableResource.FIELD_LIST,
"owner,columns,database,tags,tableConstraints");
@CreateSqlObject
abstract DatabaseDAO databaseDAO();
public TableRepositoryHelper(TableRepository3 tableRepo3) {
this.tableRepo3 = tableRepo3;
}
@CreateSqlObject
abstract EntityRelationshipDAO relationshipDAO();
@CreateSqlObject
abstract FieldRelationshipDAO fieldRelationshipDAO();
@CreateSqlObject
abstract EntityExtensionDAO entityExtensionDAO();
@CreateSqlObject
abstract TableDAO tableDAO();
@CreateSqlObject
abstract UserDAO userDAO();
@CreateSqlObject
abstract TeamDAO teamDAO();
@CreateSqlObject
abstract UsageDAO usageDAO();
@CreateSqlObject
abstract TagDAO tagDAO();
// TODO initialize
private TableRepository3 tableRepo3;
EntityRepository<Table> entityRepository = new EntityRepository<>() {
@Override
public List<String> listAfter(String fqnPrefix, int limitParam, String after) {
return tableDAO().listAfter(fqnPrefix, limitParam, after);
return tableRepo3.tableDAO().listAfter(fqnPrefix, limitParam, after);
}
@Override
public List<String> listBefore(String fqnPrefix, int limitParam, String after) {
return tableDAO().listBefore(fqnPrefix, limitParam, after);
return tableRepo3.tableDAO().listBefore(fqnPrefix, limitParam, after);
}
@Override
public int listCount(String fqnPrefix) {
return tableDAO().listCount(fqnPrefix);
return tableRepo3.tableDAO().listCount(fqnPrefix);
}
@Override
@ -140,7 +111,7 @@ public abstract class TableRepository {
@Override
public Table setFields(Table entity, Fields fields) throws IOException, ParseException {
return TableRepository.this.setFields(entity, fields);
return TableRepositoryHelper.this.setFields(entity, fields);
}
@Override
@ -173,7 +144,7 @@ public abstract class TableRepository {
@Transaction
public Table getByName(String fqn, Fields fields) throws IOException, ParseException {
Table table = EntityUtil.validate(fqn, tableDAO().findByFQN(fqn), Table.class);
Table table = EntityUtil.validate(fqn, tableRepo3.tableDAO().findByFqn(fqn), Table.class);
return setFields(table, fields);
}
@ -194,17 +165,17 @@ public abstract class TableRepository {
@Transaction
public void delete(String id) {
if (tableDAO().delete(id) <= 0) {
if (tableRepo3.tableDAO().delete(id) <= 0) {
throw EntityNotFoundException.byMessage(entityNotFound(Entity.TABLE, id));
}
// Remove all relationships
relationshipDAO().deleteAll(id);
tableRepo3.relationshipDAO().deleteAll(id);
}
@Transaction
public PutResponse<Table> createOrUpdate(Table updated, UUID databaseId) throws IOException, ParseException {
validateRelationships(updated, databaseId);
Table stored = JsonUtils.readValue(tableDAO().findByFQN(updated.getFullyQualifiedName()), Table.class);
Table stored = JsonUtils.readValue(tableRepo3.tableDAO().findByFqn(updated.getFullyQualifiedName()), Table.class);
if (stored == null) {
return new PutResponse<>(Status.CREATED, createInternal(updated));
}
@ -228,15 +199,15 @@ public abstract class TableRepository {
@Transaction
public Status addFollower(String tableId, String userId) throws IOException {
EntityUtil.validate(tableId, tableDAO().findById(tableId), Table.class);
return EntityUtil.addFollower(relationshipDAO(), userDAO(), tableId, Entity.TABLE, userId, Entity.USER) ?
EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class);
return EntityUtil.addFollower(tableRepo3.relationshipDAO(), tableRepo3.userDAO(), tableId, Entity.TABLE, userId, Entity.USER) ?
Status.CREATED : Status.OK;
}
@Transaction
public void addJoins(String tableId, TableJoins joins) throws IOException, ParseException {
// Validate the request content
Table table = EntityUtil.validate(tableId, tableDAO().findById(tableId), Table.class);
Table table = EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class);
if (!CommonUtil.dateInRange(RestUtil.DATE_FORMAT, joins.getStartDate(), 0, 30)) {
throw new IllegalArgumentException("Date range can only include past 30 days starting today");
}
@ -257,7 +228,7 @@ public abstract class TableRepository {
@Transaction
public void addSampleData(String tableId, TableData tableData) throws IOException {
// Validate the request content
Table table = EntityUtil.validate(tableId, tableDAO().findById(tableId), Table.class);
Table table = EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class);
// Validate all the columns
for (String columnName : tableData.getColumns()) {
@ -271,14 +242,14 @@ public abstract class TableRepository {
}
}
entityExtensionDAO().insert(tableId, "table.sampleData", "tableData",
tableRepo3.entityExtensionDAO().insert(tableId, "table.sampleData", "tableData",
JsonUtils.pojoToJson(tableData));
}
@Transaction
public void addTableProfileData(String tableId, TableProfile tableProfile) throws IOException {
// Validate the request content
Table table = EntityUtil.validate(tableId, tableDAO().findById(tableId), Table.class);
Table table = EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class);
List<TableProfile> storedTableProfiles = getTableProfile(table);
Map<String, TableProfile> storedMapTableProfiles = new HashMap<>();
@ -294,19 +265,19 @@ public abstract class TableRepository {
storedMapTableProfiles.put(tableProfile.getProfileDate(), tableProfile);
List<TableProfile> updatedProfiles = new ArrayList<>(storedMapTableProfiles.values());
entityExtensionDAO().insert(tableId, "table.tableProfile", "tableProfile",
tableRepo3.entityExtensionDAO().insert(tableId, "table.tableProfile", "tableProfile",
JsonUtils.pojoToJson(updatedProfiles));
}
@Transaction
public void deleteFollower(String tableId, String userId) {
EntityUtil.validateUser(userDAO(), userId);
EntityUtil.removeFollower(relationshipDAO(), tableId, userId);
EntityUtil.validateUser(tableRepo3.userDAO(), userId);
EntityUtil.removeFollower(tableRepo3.relationshipDAO(), tableId, userId);
}
@Transaction
public EntityReference getOwnerReference(Table table) throws IOException {
return EntityUtil.populateOwner(userDAO(), teamDAO(), table.getOwner());
return EntityUtil.populateOwner(tableRepo3.userDAO(), tableRepo3.teamDAO(), table.getOwner());
}
@ -322,7 +293,7 @@ public abstract class TableRepository {
private void validateRelationships(Table table, UUID databaseId) throws IOException {
// Validate database
Database db = EntityUtil.validate(databaseId.toString(), databaseDAO().findById(databaseId.toString()),
Database db = EntityUtil.validate(databaseId.toString(), tableRepo3.databaseDAO().findById(databaseId.toString()),
Database.class);
table.setDatabase(EntityUtil.getEntityReference(db));
// Validate and set other relationships
@ -345,7 +316,7 @@ public abstract class TableRepository {
}
for (Column column : columns) {
column.setTags(EntityUtil.addDerivedTags(tagDAO(), column.getTags()));
column.setTags(EntityUtil.addDerivedTags(tableRepo3.tagDAO(), column.getTags()));
if (column.getChildren() != null) {
addDerivedTags(column.getChildren());
}
@ -358,10 +329,10 @@ public abstract class TableRepository {
setColumnFQN(table.getFullyQualifiedName(), table.getColumns());
// Check if owner is valid and set the relationship
table.setOwner(EntityUtil.populateOwner(userDAO(), teamDAO(), table.getOwner()));
table.setOwner(EntityUtil.populateOwner(tableRepo3.userDAO(), tableRepo3.teamDAO(), table.getOwner()));
// Validate table tags and add derived tags to the list
table.setTags(EntityUtil.addDerivedTags(tagDAO(), table.getTags()));
table.setTags(EntityUtil.addDerivedTags(tableRepo3.tagDAO(), table.getTags()));
// Validate column tags
addDerivedTags(table.getColumns());
@ -382,9 +353,9 @@ public abstract class TableRepository {
table.getColumns().forEach(column -> column.setTags(null));
if (update) {
tableDAO().update(table.getId().toString(), JsonUtils.pojoToJson(table));
tableRepo3.tableDAO().update(table.getId().toString(), JsonUtils.pojoToJson(table));
} else {
tableDAO().insert(JsonUtils.pojoToJson(table));
tableRepo3.tableDAO().insert(JsonUtils.pojoToJson(table));
}
// Restore the relationships
@ -417,11 +388,11 @@ public abstract class TableRepository {
private void addRelationships(Table table) throws IOException {
// Add relationship from database to table
String databaseId = table.getDatabase().getId().toString();
relationshipDAO().insert(databaseId, table.getId().toString(), Entity.DATABASE, Entity.TABLE,
tableRepo3.relationshipDAO().insert(databaseId, table.getId().toString(), Entity.DATABASE, Entity.TABLE,
Relationship.CONTAINS.ordinal());
// Add table owner relationship
EntityUtil.setOwner(relationshipDAO(), table.getId(), Entity.TABLE, table.getOwner());
EntityUtil.setOwner(tableRepo3.relationshipDAO(), table.getId(), Entity.TABLE, table.getOwner());
// Add tag to table relationship
applyTags(table);
@ -431,7 +402,7 @@ public abstract class TableRepository {
private void applyTags(List<Column> columns) throws IOException {
// Add column level tags by adding tag to column relationship
for (Column column : columns) {
EntityUtil.applyTags(tagDAO(), column.getTags(), column.getFullyQualifiedName());
EntityUtil.applyTags(tableRepo3.tagDAO(), column.getTags(), column.getFullyQualifiedName());
column.setTags(getTags(column.getFullyQualifiedName())); // Update tag list to handle derived tags
if (column.getChildren() != null) {
applyTags(column.getChildren());
@ -441,7 +412,7 @@ public abstract class TableRepository {
private void applyTags(Table table) throws IOException {
// Add table level tags by adding tag to table relationship
EntityUtil.applyTags(tagDAO(), table.getTags(), table.getFullyQualifiedName());
EntityUtil.applyTags(tableRepo3.tagDAO(), table.getTags(), table.getFullyQualifiedName());
table.setTags(getTags(table.getFullyQualifiedName())); // Update tag to handle additional derived tags
applyTags(table.getColumns());
}
@ -465,16 +436,16 @@ public abstract class TableRepository {
private Database getDatabase(Table table) throws IOException {
// Find database for the table
String id = table.getId().toString();
List<String> result = relationshipDAO().findFrom(id, Relationship.CONTAINS.ordinal(), Entity.DATABASE);
List<String> result = tableRepo3.relationshipDAO().findFrom(id, Relationship.CONTAINS.ordinal(), Entity.DATABASE);
if (result.size() != 1) {
throw EntityNotFoundException.byMessage(String.format("Database for table %s Not found", id));
}
String databaseId = result.get(0);
return EntityUtil.validate(databaseId, databaseDAO().findById(databaseId), Database.class);
return EntityUtil.validate(databaseId, tableRepo3.databaseDAO().findById(databaseId), Database.class);
}
private Table validateTable(String tableId) throws IOException {
return EntityUtil.validate(tableId, tableDAO().findById(tableId), Table.class);
return EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class);
}
private Table setFields(Table table, Fields fields) throws IOException, ParseException {
@ -482,7 +453,7 @@ public abstract class TableRepository {
table.setTableConstraints(fields.contains("tableConstraints") ? table.getTableConstraints() : null);
table.setOwner(fields.contains("owner") ? getOwner(table) : null);
table.setFollowers(fields.contains("followers") ? getFollowers(table) : null);
table.setUsageSummary(fields.contains("usageSummary") ? EntityUtil.getLatestUsage(usageDAO(), table.getId()) :
table.setUsageSummary(fields.contains("usageSummary") ? EntityUtil.getLatestUsage(tableRepo3.usageDAO(), table.getId()) :
null);
table.setDatabase(fields.contains("database") ? EntityUtil.getEntityReference(getDatabase(table)) : null);
table.setTags(fields.contains("tags") ? getTags(table.getFullyQualifiedName()) : null);
@ -495,15 +466,15 @@ public abstract class TableRepository {
}
private EntityReference getOwner(Table table) throws IOException {
return table == null ? null : EntityUtil.populateOwner(table.getId(), relationshipDAO(), userDAO(), teamDAO());
return table == null ? null : EntityUtil.populateOwner(table.getId(), tableRepo3.relationshipDAO(), tableRepo3.userDAO(), tableRepo3.teamDAO());
}
private List<EntityReference> getFollowers(Table table) throws IOException {
return table == null ? null : EntityUtil.getFollowers(table.getId(), relationshipDAO(), userDAO());
return table == null ? null : EntityUtil.getFollowers(table.getId(), tableRepo3.relationshipDAO(), tableRepo3.userDAO());
}
private List<TagLabel> getTags(String fqn) {
return tagDAO().getTags(fqn);
return tableRepo3.tagDAO().getTags(fqn);
}
private void getColumnTags(boolean setTags, List<Column> columns) {
@ -545,7 +516,7 @@ public abstract class TableRepository {
for (JoinedWith joinedWith : joinedWithList) {
// Validate table
String tableFQN = getTableFQN(joinedWith.getFullyQualifiedName());
Table joinedWithTable = EntityUtil.validate(tableFQN, tableDAO().findByFQN(tableFQN), Table.class);
Table joinedWithTable = EntityUtil.validate(tableFQN, tableRepo3.tableDAO().findByFqn(tableFQN), Table.class);
// Validate column
validateColumnFQN(joinedWithTable, joinedWith.getFullyQualifiedName());
@ -580,7 +551,7 @@ public abstract class TableRepository {
fromColumnFQN = joinedWith.getFullyQualifiedName();
toColumnFQN = columnFQN;
}
String json = fieldRelationshipDAO().find(fromColumnFQN, toColumnFQN, "table.columns.column",
String json = tableRepo3.fieldRelationshipDAO().find(fromColumnFQN, toColumnFQN, "table.columns.column",
"table.columns.column", JOINED_WITH.ordinal());
DailyCount dailyCount = new DailyCount().withCount(joinedWith.getJoinCount()).withDate(date);
@ -624,7 +595,7 @@ public abstract class TableRepository {
}
json = JsonUtils.pojoToJson(dailyCountList);
fieldRelationshipDAO().upsert(fromColumnFQN, toColumnFQN, "table.columns.column",
tableRepo3.fieldRelationshipDAO().upsert(fromColumnFQN, toColumnFQN, "table.columns.column",
"table.columns.column", JOINED_WITH.ordinal(), "dailyCount", json);
}
}
@ -635,9 +606,9 @@ public abstract class TableRepository {
TableJoins tableJoins = new TableJoins().withStartDate(todayMinus30Days).withDayCount(30)
.withColumnJoins(Collections.emptyList());
List<List<String>> list = fieldRelationshipDAO().listToByPrefix(table.getFullyQualifiedName(),
List<List<String>> list = tableRepo3.fieldRelationshipDAO().listToByPrefix(table.getFullyQualifiedName(),
"table.columns.column", "table.columns.column", JOINED_WITH.ordinal());
list.addAll(fieldRelationshipDAO().listFromByPrefix(table.getFullyQualifiedName(), "table.columns.column",
list.addAll(tableRepo3.fieldRelationshipDAO().listFromByPrefix(table.getFullyQualifiedName(), "table.columns.column",
"table.columns.column", JOINED_WITH.ordinal()));
if (list.size() == 0) { // No join information found. Return empty list
@ -671,12 +642,12 @@ public abstract class TableRepository {
}
private TableData getSampleData(Table table) throws IOException {
return JsonUtils.readValue(entityExtensionDAO().getExtension(table.getId().toString(), "table.sampleData"),
return JsonUtils.readValue(tableRepo3.entityExtensionDAO().getExtension(table.getId().toString(), "table.sampleData"),
TableData.class);
}
private List<TableProfile> getTableProfile(Table table) throws IOException {
List<TableProfile> tableProfiles = JsonUtils.readObjects(entityExtensionDAO().getExtension(table.getId().toString(),
List<TableProfile> tableProfiles = JsonUtils.readObjects(tableRepo3.entityExtensionDAO().getExtension(table.getId().toString(),
"table.tableProfile"),
TableProfile.class);
if (tableProfiles != null) {
@ -687,49 +658,6 @@ public abstract class TableRepository {
}
public interface TableDAO {
@SqlUpdate("INSERT INTO table_entity (json) VALUES (:json)")
void insert(@Bind("json") String json);
@SqlUpdate("UPDATE table_entity SET json = :json WHERE id = :id")
void update(@Bind("id") String id, @Bind("json") String json);
@SqlQuery("SELECT json FROM table_entity WHERE id = :tableId")
String findById(@Bind("tableId") String tableId);
@SqlQuery("SELECT json FROM table_entity WHERE fullyQualifiedName = :tableFQN")
String findByFQN(@Bind("tableFQN") String tableFQN);
@SqlQuery("SELECT count(*) FROM table_entity WHERE " +
"(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL)")
int listCount(@Bind("databaseFQN") String databaseFQN);
@SqlQuery(
"SELECT json FROM (" +
"SELECT fullyQualifiedName, json FROM table_entity WHERE " +
"(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL) AND " +
"fullyQualifiedName < :before " + // Pagination by table fullyQualifiedName
"ORDER BY fullyQualifiedName DESC " + // Pagination ordering by table fullyQualifiedName
"LIMIT :limit" +
") last_rows_subquery ORDER BY fullyQualifiedName")
List<String> listBefore(@Bind("databaseFQN") String databaseFQN, @Bind("limit") int limit,
@Bind("before") String before);
@SqlQuery("SELECT json FROM table_entity WHERE " +
"(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL) AND "+//Filter by databaseName
"fullyQualifiedName > :after " + // Pagination by table fullyQualifiedName
"ORDER BY fullyQualifiedName " + // Pagination ordering by table fullyQualifiedName
"LIMIT :limit")
List<String> listAfter(@Bind("databaseFQN") String databaseFQN, @Bind("limit") int limit,
@Bind("after") String after);
@SqlQuery("SELECT EXISTS (SELECT * FROM table_entity WHERE id = :id)")
boolean exists(@Bind("id") String id);
@SqlUpdate("DELETE FROM table_entity WHERE id = :id")
int delete(@Bind("id") String id);
}
static class TableEntityInterface implements EntityInterface {
private final Table table;
@ -786,13 +714,13 @@ public abstract class TableRepository {
/**
* Handles entity updated from PUT and POST operation.
*/
public class TableUpdater extends EntityUpdater {
public class TableUpdater extends EntityUpdater3 {
final Table orig;
final Table updated;
public TableUpdater(Table orig, Table updated, boolean patchOperation) {
super(new TableEntityInterface(orig), new TableEntityInterface(updated), patchOperation, relationshipDAO(),
tagDAO());
super(new TableEntityInterface(orig), new TableEntityInterface(updated), patchOperation, tableRepo3.relationshipDAO(),
tableRepo3.tagDAO());
this.orig = orig;
this.updated = updated;
}
@ -816,7 +744,7 @@ public abstract class TableRepository {
.orElse(null);
if (stored == null) {
fieldsAdded.add("column:" + updated.getFullyQualifiedName());
EntityUtil.applyTags(tagDAO(), updated.getTags(), updated.getFullyQualifiedName());
EntityUtil.applyTags(tableRepo3.tagDAO(), updated.getTags(), updated.getFullyQualifiedName());
continue;
}
@ -885,7 +813,7 @@ public abstract class TableRepository {
update("column:" + origColumn.getFullyQualifiedName() + ":tags",
origColumn.getTags() == null ? 0 : origColumn.getTags().size(),
updatedColumn.getTags() == null ? 0 : updatedColumn.getTags().size());
EntityUtil.applyTags(tagDAO(), updatedColumn.getTags(), updatedColumn.getFullyQualifiedName());
EntityUtil.applyTags(tableRepo3.tagDAO(), updatedColumn.getTags(), updatedColumn.getFullyQualifiedName());
}
public void store() throws IOException {

View File

@ -0,0 +1,69 @@
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.openmetadata.catalog.jdbi3.TagDAO.TagLabelMapper;
import org.openmetadata.catalog.type.TagLabel;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
@RegisterRowMapper(TagLabelMapper.class)
public interface TagDAO {
@SqlUpdate("INSERT INTO tag_category (json) VALUES (:json)")
void insertCategory(@Bind("json") String json);
@SqlUpdate("INSERT INTO tag(json) VALUES (:json)")
void insertTag(@Bind("json") String json);
@SqlUpdate("UPDATE tag_category SET json = :json where name = :name")
void updateCategory(@Bind("name") String name, @Bind("json") String json);
@SqlUpdate("UPDATE tag SET json = :json where fullyQualifiedName = :fqn")
void updateTag(@Bind("fqn") String fqn, @Bind("json") String json);
@SqlQuery("SELECT json FROM tag_category ORDER BY name")
List<String> listCategories();
@SqlQuery("SELECT json FROM tag WHERE fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') ORDER BY fullyQualifiedName")
List<String> listChildrenTags(@Bind("fqnPrefix") String fqnPrefix);
@SqlQuery("SELECT json FROM tag_category WHERE name = :name")
String findCategory(@Bind("name") String name);
@SqlQuery("SELECT EXISTS (SELECT * FROM tag WHERE fullyQualifiedName = :fqn)")
boolean tagExists(@Bind("fqn") String fqn);
@SqlQuery("SELECT json FROM tag WHERE fullyQualifiedName = :fqn")
String findTag(@Bind("fqn") String fqn);
@SqlUpdate("INSERT IGNORE INTO tag_usage (tagFQN, targetFQN, labelType, state) VALUES (:tagFQN, :targetFQN, " +
":labelType, :state)")
void applyTag(@Bind("tagFQN") String tagFQN, @Bind("targetFQN") String targetFQN,
@Bind("labelType") int labelType, @Bind("state") int state);
@SqlQuery("SELECT tagFQN, labelType, state FROM tag_usage WHERE targetFQN = :targetFQN ORDER BY tagFQN")
List<TagLabel> getTags(@Bind("targetFQN") String targetFQN);
@SqlQuery("SELECT COUNT(*) FROM tag_usage WHERE tagFQN LIKE CONCAT(:fqnPrefix, '%')")
int getTagCount(@Bind("fqnPrefix") String fqnPrefix);
@SqlUpdate("DELETE FROM tag_usage where targetFQN = :targetFQN")
void deleteTags(@Bind("targetFQN") String targetFQN);
@SqlUpdate("DELETE FROM tag_usage where targetFQN LIKE CONCAT(:fqnPrefix, '%')")
void deleteTagsByPrefix(@Bind("fqnPrefix") String fqnPrefix);
class TagLabelMapper implements RowMapper<TagLabel> {
@Override
public TagLabel map(ResultSet r, org.jdbi.v3.core.statement.StatementContext ctx) throws SQLException {
return new TagLabel().withLabelType(TagLabel.LabelType.values()[r.getInt("labelType")])
.withState(TagLabel.State.values()[r.getInt("state")])
.withTagFQN(r.getString("tagFQN"));
}
}
}

View File

@ -0,0 +1,43 @@
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import java.util.List;
public interface TeamDAO {
@SqlUpdate("INSERT INTO team_entity (json) VALUES (:json)")
void insert(@Bind("json") String json);
@SqlQuery("SELECT json FROM team_entity where id = :teamId")
String findById(@Bind("teamId") String teamId);
@SqlQuery("SELECT json FROM team_entity where name = :name")
String findByName(@Bind("name") String name);
@SqlQuery("SELECT count(*) FROM team_entity")
int listCount();
@SqlQuery(
"SELECT json FROM (" +
"SELECT name, json FROM team_entity WHERE " +
"name < :before " + // Pagination by team name
"ORDER BY name DESC " + // Pagination ordering by team name
"LIMIT :limit" +
") last_rows_subquery ORDER BY name")
List<String> listBefore(@Bind("limit") int limit, @Bind("before") String before);
@SqlQuery("SELECT json FROM team_entity WHERE " +
"name > :after " + // Pagination by team name
"ORDER BY name " + // Pagination ordering by team name
"LIMIT :limit")
List<String> listAfter(@Bind("limit") int limit, @Bind("after") String after);
@SqlUpdate("DELETE FROM team_entity WHERE id = :teamId")
int delete(@Bind("teamId") String teamId);
@SqlUpdate("UPDATE team_entity SET json = :json WHERE id = :id")
void update(@Bind("id") String id, @Bind("json") String json);
}

View File

@ -28,7 +28,6 @@ import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO;
import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO;
import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineDAO;
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.core.mapper.RowMapper;
import org.openmetadata.catalog.type.EntityReference;
import org.skife.jdbi.v2.StatementContext;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.UUID;
public class ToEntityReferenceMapper3 implements RowMapper<EntityReference> {
@Override
public EntityReference map(ResultSet rs, org.jdbi.v3.core.statement.StatementContext ctx) throws SQLException {
return new EntityReference().withId(UUID.fromString(rs.getString("toId"))).withType(rs.getString("toEntity"));
}
}

View File

@ -0,0 +1,86 @@
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.openmetadata.catalog.jdbi3.UsageDAO.UsageDetailsMapper;
import org.openmetadata.catalog.type.UsageDetails;
import org.openmetadata.catalog.type.UsageStats;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
@RegisterRowMapper(UsageDetailsMapper.class)
public interface UsageDAO {
@SqlUpdate("INSERT INTO entity_usage (usageDate, id, entityType, count1, count7, count30) " +
"SELECT :date, :id, :entityType, :count1, " +
"(:count1 + (SELECT COALESCE(SUM(count1), 0) FROM entity_usage WHERE id = :id AND usageDate >= :date - " +
"INTERVAL 6 DAY)), " +
"(:count1 + (SELECT COALESCE(SUM(count1), 0) FROM entity_usage WHERE id = :id AND usageDate >= :date - " +
"INTERVAL 29 DAY))")
void insert(@Bind("date") String date, @Bind("id") String id, @Bind("entityType") String entityType, @Bind(
"count1") int count1);
@SqlUpdate("INSERT INTO entity_usage (usageDate, id, entityType, count1, count7, count30) " +
"SELECT :date, :id, :entityType, :count1, " +
"(:count1 + (SELECT COALESCE(SUM(count1), 0) FROM entity_usage WHERE id = :id AND usageDate >= :date - " +
"INTERVAL 6 DAY)), " +
"(:count1 + (SELECT COALESCE(SUM(count1), 0) FROM entity_usage WHERE id = :id AND usageDate >= :date - " +
"INTERVAL 29 DAY)) " +
"ON DUPLICATE KEY UPDATE count1 = count1 + :count1, count7 = count7 + :count1, count30 = count30 + :count1")
void insertOrUpdateCount(@Bind("date") String date, @Bind("id") String id, @Bind("entityType") String entityType,
@Bind("count1") int count1);
@SqlQuery("SELECT id, usageDate, entityType, count1, count7, count30, " +
"percentile1, percentile7, percentile30 FROM entity_usage " +
"WHERE id = :id AND usageDate >= :date - INTERVAL :days DAY AND usageDate <= :date ORDER BY usageDate DESC")
List<UsageDetails> getUsageById(@Bind("id") String id, @Bind("date") String date, @Bind("days") int days);
/**
* Get latest usage record
**/
@SqlQuery("SELECT id, usageDate, entityType, count1, count7, count30, " +
"percentile1, percentile7, percentile30 FROM entity_usage " +
"WHERE usageDate IN (SELECT MAX(usageDate) FROM entity_usage WHERE id = :id) AND id = :id")
UsageDetails getLatestUsage(@Bind("id") String id);
@SqlUpdate("DELETE FROM entity_usage WHERE id = :id")
int delete(@Bind("id") String id);
/**
* Note not using in following percentile computation PERCENT_RANK function as unit tests use mysql5.7 and it does
* not have window function
*/
@SqlUpdate("UPDATE entity_usage u JOIN ( " +
"SELECT u1.id, " +
"(SELECT COUNT(*) FROM entity_usage as u2 WHERE u2.count1 < u1.count1 AND u2.entityType = :entityType " +
"AND u2.usageDate = :date) as p1, " +
"(SELECT COUNT(*) FROM entity_usage as u3 WHERE u3.count7 < u1.count7 AND u3.entityType = :entityType " +
"AND u3.usageDate = :date) as p7, " +
"(SELECT COUNT(*) FROM entity_usage as u4 WHERE u4.count30 < u1.count30 AND u4.entityType = :entityType " +
"AND u4.usageDate = :date) as p30, " +
"(SELECT COUNT(*) FROM entity_usage WHERE entityType = :entityType AND usageDate = :date) as total " +
"FROM entity_usage u1 WHERE u1.entityType = :entityType AND u1.usageDate = :date" +
") vals ON u.id = vals.id AND usageDate = :date " +
"SET u.percentile1 = ROUND(100 * p1/total, 2), u.percentile7 = ROUND(p7 * 100/total, 2), u.percentile30 =" +
" ROUND(p30*100/total, 2)")
void computePercentile(@Bind("entityType") String entityType, @Bind("date") String date);
class UsageDetailsMapper implements RowMapper<UsageDetails> {
@Override
public UsageDetails map(ResultSet r, org.jdbi.v3.core.statement.StatementContext ctx) throws SQLException {
UsageStats dailyStats = new UsageStats().withCount(r.getInt("count1")).withPercentileRank(r.getDouble(
"percentile1"));
UsageStats weeklyStats = new UsageStats().withCount(r.getInt("count7")).withPercentileRank(r.getDouble(
"percentile7"));
UsageStats monthlyStats = new UsageStats().withCount(r.getInt("count30")).withPercentileRank(r.getDouble(
"percentile30"));
return new UsageDetails().withDate(r.getString("usageDate")).withDailyStats(dailyStats)
.withWeeklyStats(weeklyStats).withMonthlyStats(monthlyStats);
}
}
}

View File

@ -19,7 +19,6 @@ package org.openmetadata.catalog.jdbi3;
import org.openmetadata.catalog.jdbi3.DashboardRepository.DashboardDAO;
import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseDAO;
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;

View File

@ -0,0 +1,48 @@
package org.openmetadata.catalog.jdbi3;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import java.util.List;
public interface UserDAO {
@SqlUpdate("INSERT INTO user_entity (json) VALUES (:json)")
void insert(@Bind("json") String json);
@SqlQuery("SELECT json FROM user_entity WHERE id = :id")
String findById(@Bind("id") String id);
@SqlQuery("SELECT json FROM user_entity WHERE name = :name")
String findByName(@Bind("name") String name);
@SqlQuery("SELECT json FROM user_entity WHERE email = :email")
String findByEmail(@Bind("email") String email);
@SqlQuery("SELECT json FROM user_entity")
List<String> list();
@SqlQuery("SELECT count(*) FROM user_entity")
int listCount();
@SqlQuery(
"SELECT json FROM (" +
"SELECT name, json FROM user_entity WHERE " +
"name < :before " + // Pagination by user name
"ORDER BY name DESC " + // Pagination ordering by user name
"LIMIT :limit" +
") last_rows_subquery ORDER BY name")
List<String> listBefore(@Bind("limit") int limit, @Bind("before") String before);
@SqlQuery("SELECT json FROM user_entity WHERE " +
"name > :after " + // Pagination by user name
"ORDER BY name " + // Pagination ordering by user name
"LIMIT :limit")
List<String> listAfter(@Bind("limit") int limit, @Bind("after") String after);
@SqlUpdate("UPDATE user_entity SET json = :json WHERE id = :id")
void update(@Bind("id") String id, @Bind("json") String json);
@SqlQuery("SELECT EXISTS (SELECT * FROM user_entity where id = :id)")
boolean exists(@Bind("id") String id);
}

View File

@ -27,7 +27,6 @@ import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO;
import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO;
import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineDAO;
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;

View File

@ -18,6 +18,10 @@ package org.openmetadata.catalog.resources;
import io.dropwizard.setup.Environment;
import io.swagger.annotations.Api;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.catalog.jdbi3.TableRepository3;
import org.openmetadata.catalog.jdbi3.TableRepositoryHelper;
import org.openmetadata.catalog.resources.databases.TableResource;
import org.openmetadata.catalog.type.CollectionDescriptor;
import org.openmetadata.catalog.type.CollectionInfo;
import org.openmetadata.catalog.util.RestUtil;
@ -158,6 +162,20 @@ public final class CollectionRegistry {
}
}
/**
* Register resources from CollectionRegistry
*/
public void registerResources3(Jdbi jdbi, Environment environment, CatalogAuthorizer authorizer) {
LOG.info("Initializing jdbi3");
Class<?> repositoryClz = TableRepository3.class;
final TableRepository3 daoObject = (TableRepository3) jdbi.onDemand(repositoryClz);
TableRepositoryHelper helper = new TableRepositoryHelper(daoObject);
TableResource resource = new TableResource(helper, authorizer);
environment.jersey().register(resource);
LOG.info("Registering {}", resource);
LOG.info("Initialized jdbi3");
}
/** Get collection details based on annotations in Resource classes */
private static CollectionDetails getCollection(Class<?> cl) {
String href, doc, name, repoClass;

View File

@ -28,7 +28,8 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import org.openmetadata.catalog.api.data.CreateTable;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.jdbi3.TableRepository;
import org.openmetadata.catalog.jdbi3.TableRepository3;
import org.openmetadata.catalog.jdbi3.TableRepositoryHelper;
import org.openmetadata.catalog.resources.Collection;
import org.openmetadata.catalog.security.CatalogAuthorizer;
import org.openmetadata.catalog.security.SecurityUtil;
@ -38,7 +39,6 @@ import org.openmetadata.catalog.type.TableJoins;
import org.openmetadata.catalog.type.TableProfile;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.EntityUtil.Fields;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.RestUtil.PutResponse;
import org.openmetadata.catalog.util.ResultList;
@ -80,11 +80,11 @@ import java.util.UUID;
@Api(value = "Tables collection", tags = "Tables collection")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "tables", repositoryClass = "org.openmetadata.catalog.jdbi3.TableRepository")
//@Collection(name = "tables", repositoryClass = "org.openmetadata.catalog.jdbi3.TableRepository3")
public class TableResource {
private static final Logger LOG = LoggerFactory.getLogger(TableResource.class);
private static final String TABLE_COLLECTION_PATH = "v1/tables/";
private final TableRepository dao;
private final TableRepositoryHelper dao;
private final CatalogAuthorizer authorizer;
public static void addHref(UriInfo uriInfo, EntityReference ref) {
@ -102,8 +102,8 @@ public class TableResource {
}
@Inject
public TableResource(TableRepository dao, CatalogAuthorizer authorizer) {
Objects.requireNonNull(dao, "TableRepository must not be null");
public TableResource(TableRepositoryHelper dao, CatalogAuthorizer authorizer) {
Objects.requireNonNull(dao, "TableRepository3 must not be null");
this.dao = dao;
this.authorizer = authorizer;
}

View File

@ -2,6 +2,7 @@ package org.openmetadata.catalog.util;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.jdbi3.EntityRelationshipDAO;
import org.openmetadata.catalog.jdbi3.TableRepository3;
import org.openmetadata.catalog.jdbi3.TagRepository.TagDAO;
import org.openmetadata.catalog.type.EntityReference;
import org.slf4j.Logger;
@ -21,7 +22,7 @@ import java.util.List;
* of the entity.
*
* Concrete implementations need to implement update for other fields. See
* {@link org.openmetadata.catalog.jdbi3.TableRepository.TableUpdater} for an example implementation.
* {@link TableRepository3.TableUpdater} for an example implementation.
*/
public abstract class EntityUpdater {
private static final Logger LOG = LoggerFactory.getLogger(EntityUpdater.class);

View File

@ -0,0 +1,130 @@
package org.openmetadata.catalog.util;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.jdbi3.EntityRelationshipDAO3;
import org.openmetadata.catalog.jdbi3.TableRepository3;
import org.openmetadata.catalog.jdbi3.TagDAO;
import org.openmetadata.catalog.type.EntityReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Used for updating the following common entity fields in PUT and PATCH operations.
* - description
* - tags
* - owner
*
* This class handles tracking all the changes in an update operation and also versioning
* of the entity.
*
* Concrete implementations need to implement update for other fields. See
* {@link TableRepository3.TableUpdater} for an example implementation.
*/
public abstract class EntityUpdater3 {
private static final Logger LOG = LoggerFactory.getLogger(EntityUpdater3.class);
private final EntityInterface originalEntity;
private final EntityInterface updatedEntity;
private final EntityRelationshipDAO3 relationshipDAO;
private final TagDAO tagDAO;
protected final boolean patchOperation;
protected List<String> fieldsUpdated = new ArrayList<>();
protected List<String> fieldsAdded = new ArrayList<>();
protected List<String> fieldsDeleted = new ArrayList<>();
protected boolean majorVersionChange = false;
public EntityUpdater3(EntityInterface originalEntity, EntityInterface updatedEntity, boolean patchOperation,
EntityRelationshipDAO3 relationshipDAO, TagDAO tagDAO) {
this.originalEntity = originalEntity;
this.updatedEntity = updatedEntity;
this.patchOperation = patchOperation;
this.relationshipDAO = relationshipDAO;
this.tagDAO = tagDAO;
}
public void updateAll() throws IOException {
updateDescription();
updateDisplayName();
updateOwner();
if (tagDAO != null) {
updateTags(); // If tagDAO != null, the Entity supports tags
}
}
private void updateDescription() {
if (!patchOperation &&
originalEntity.getDescription() != null && !originalEntity.getDescription().isEmpty()) {
// Update description only when stored is empty to retain user authored descriptions
updatedEntity.setDescription(originalEntity.getDescription());
return;
}
update("description", originalEntity.getDescription(), updatedEntity.getDescription());
}
private void updateDisplayName() {
if (!patchOperation &&
originalEntity.getDisplayName() != null && !originalEntity.getDisplayName().isEmpty()) {
// Update displayName only when stored is empty to retain user authored descriptions
updatedEntity.setDisplayName(originalEntity.getDisplayName());
return;
}
update("displayName", originalEntity.getDisplayName(), updatedEntity.getDisplayName());
}
private void updateOwner() {
EntityReference origOwner = originalEntity.getOwner();
EntityReference updatedOwner = updatedEntity.getOwner();
if (update("owner", origOwner == null ? null : origOwner.getId(),
updatedOwner == null ? null : updatedOwner.getId())) {
EntityUtil.updateOwner(relationshipDAO, origOwner, updatedOwner, originalEntity.getId(), Entity.TABLE);
}
}
private void updateTags() throws IOException {
// Remove current table tags in the database. It will be added back later from the merged tag list.
EntityUtil.removeTagsByPrefix(tagDAO, originalEntity.getFullyQualifiedName());
if (!patchOperation) {
// PUT operation merges tags in the request with what already exists
updatedEntity.setTags(EntityUtil.mergeTags(updatedEntity.getTags(), originalEntity.getTags()));
}
update("tags", originalEntity.getTags() == null ? 0 : originalEntity.getTags().size(),
updatedEntity.getTags() == null ? 0 : updatedEntity.getTags().size());
EntityUtil.applyTags(tagDAO, updatedEntity.getTags(), updatedEntity.getFullyQualifiedName());
}
public Double getNewVersion(Double oldVersion) {
Double newVersion = oldVersion;
if (majorVersionChange) {
newVersion = oldVersion + 1.0;
} else if (!fieldsUpdated.isEmpty() || !fieldsAdded.isEmpty() || !fieldsDeleted.isEmpty()) {
newVersion = oldVersion + 0.1;
}
LOG.info("{}->{} - Fields added {}, updated {}, deleted {}",
oldVersion, newVersion, fieldsAdded, fieldsUpdated, fieldsDeleted);
return newVersion;
}
public abstract void store() throws IOException;
protected boolean update(String field, Object orig, Object updated) {
if (orig == null && updated == null) {
return false;
}
if (orig == null) {
fieldsAdded.add(field);
return true;
} else if (updated == null) {
fieldsDeleted.add(field);
return true;
} else if (!orig.equals(updated)) {
fieldsUpdated.add(field);
return true;
}
return false;
}
}

View File

@ -38,6 +38,7 @@ import org.openmetadata.catalog.exception.EntityNotFoundException;
import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO;
import org.openmetadata.catalog.jdbi3.DashboardRepository.DashboardDAO;
import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseDAO;
import org.openmetadata.catalog.jdbi3.EntityRelationshipDAO3;
import org.openmetadata.catalog.jdbi3.EntityRepository;
import org.openmetadata.catalog.jdbi3.EntityRelationshipDAO;
import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO;
@ -45,7 +46,8 @@ import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO;
import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineDAO;
import org.openmetadata.catalog.jdbi3.Relationship;
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TableDAO;
import org.openmetadata.catalog.jdbi3.TableDAO3;
import org.openmetadata.catalog.jdbi3.TagRepository.TagDAO;
import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO;
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
@ -183,6 +185,12 @@ public final class EntityUtil {
}
}
public static void validateUser(org.openmetadata.catalog.jdbi3.UserDAO userDAO, String userId) {
if (!userDAO.exists(userId)) {
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(Entity.USER, userId));
}
}
// Get owner for a given entity
public static EntityReference populateOwner(UUID id, EntityRelationshipDAO entityRelationshipDAO, UserDAO userDAO,
TeamDAO teamDAO) throws IOException {
@ -193,6 +201,16 @@ public final class EntityUtil {
return ids.isEmpty() ? null : EntityUtil.populateOwner(userDAO, teamDAO, ids.get(0));
}
// Get owner for a given entity
public static EntityReference populateOwner(UUID id, EntityRelationshipDAO3 entityRelationshipDAO, org.openmetadata.catalog.jdbi3.UserDAO userDAO,
org.openmetadata.catalog.jdbi3.TeamDAO teamDAO) throws IOException {
List<EntityReference> ids = entityRelationshipDAO.findFrom(id.toString(), Relationship.OWNS.ordinal());
if (ids.size() > 1) {
LOG.warn("Possible database issues - multiple owners {} found for entity {}", ids, id);
}
return ids.isEmpty() ? null : EntityUtil.populateOwner(userDAO, teamDAO, ids.get(0));
}
/**
* For given Owner with Id and Type that can be either team or user,
* validate Owner ID and return fully populated Owner
@ -219,6 +237,28 @@ public final class EntityUtil {
return owner;
}
public static EntityReference populateOwner(org.openmetadata.catalog.jdbi3.UserDAO userDAO, org.openmetadata.catalog.jdbi3.TeamDAO teamDAO,
EntityReference owner)
throws IOException {
if (owner == null) {
return null;
}
String id = owner.getId().toString();
if (owner.getType().equalsIgnoreCase("user")) {
User ownerInstance = EntityUtil.validate(id, userDAO.findById(id), User.class);
owner.setName(ownerInstance.getName());
if (Optional.ofNullable(ownerInstance.getDeactivated()).orElse(false)) {
throw new IllegalArgumentException(CatalogExceptionMessage.deactivatedUser(id));
}
} else if (owner.getType().equalsIgnoreCase("team")) {
Team ownerInstance = EntityUtil.validate(id, teamDAO.findById(id), Team.class);
owner.setDescription(ownerInstance.getDescription());
owner.setName(ownerInstance.getName());
} else {
throw new IllegalArgumentException(String.format("Invalid ownerType %s", owner.getType()));
}
return owner;
}
public static void setOwner(EntityRelationshipDAO dao, UUID ownedEntityId, String ownedEntityType,
EntityReference owner) {
// Add relationship owner --- owns ---> ownedEntity
@ -229,6 +269,16 @@ public final class EntityUtil {
}
}
public static void setOwner(EntityRelationshipDAO3 dao, UUID ownedEntityId, String ownedEntityType,
EntityReference owner) {
// Add relationship owner --- owns ---> ownedEntity
if (owner != null) {
LOG.info("Adding owner {}:{} for entity {}", owner.getType(), owner.getId(), ownedEntityId);
dao.insert(owner.getId().toString(), ownedEntityId.toString(), owner.getType(), ownedEntityType,
Relationship.OWNS.ordinal());
}
}
/**
* Unassign owner relationship for a given entity
*/
@ -240,6 +290,17 @@ public final class EntityUtil {
}
}
/**
* Unassign owner relationship for a given entity
*/
public static void unassignOwner(EntityRelationshipDAO3 dao, EntityReference owner, String ownedEntityId) {
if (owner != null && owner.getId() != null) {
LOG.info("Removing owner {}:{} for entity {}", owner.getType(), owner.getId(),
ownedEntityId);
dao.delete(owner.getId().toString(), ownedEntityId, Relationship.OWNS.ordinal());
}
}
public static void updateOwner(EntityRelationshipDAO dao, EntityReference originalOwner, EntityReference newOwner,
UUID ownedEntityId, String ownedEntityType) {
// TODO inefficient use replace instead of delete and add?
@ -248,6 +309,14 @@ public final class EntityUtil {
setOwner(dao, ownedEntityId, ownedEntityType, newOwner);
}
public static void updateOwner(EntityRelationshipDAO3 dao, EntityReference originalOwner, EntityReference newOwner,
UUID ownedEntityId, String ownedEntityType) {
// TODO inefficient use replace instead of delete and add?
// TODO check for orig and new owners being the same
unassignOwner(dao, originalOwner, ownedEntityId.toString());
setOwner(dao, ownedEntityId, ownedEntityType, newOwner);
}
public static List<EntityReference> getEntityReference(List<EntityReference> list, TableDAO tableDAO,
DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
DashboardDAO dashboardDAO, ReportDAO reportDAO,
@ -263,6 +332,22 @@ public final class EntityUtil {
return list;
}
public static List<EntityReference> getEntityReference(List<EntityReference> list, TableDAO3 tableDAO,
DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
DashboardDAO dashboardDAO, ReportDAO reportDAO,
TopicDAO topicDAO, ChartDAO chartDAO,
TaskDAO taskDAO, ModelDAO modelDAO,
PipelineDAO pipelineDAO) throws IOException {
for (EntityReference ref : list) {
getEntityReference(
ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO,
topicDAO, chartDAO, taskDAO, modelDAO, pipelineDAO
);
}
return list;
}
public static EntityReference getEntityReference(EntityReference ref, TableDAO tableDAO, DatabaseDAO databaseDAO,
MetricsDAO metricsDAO, DashboardDAO dashboardDAO,
ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO,
@ -305,6 +390,48 @@ public final class EntityUtil {
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(entity));
}
public static EntityReference getEntityReference(EntityReference ref, TableDAO3 tableDAO3, DatabaseDAO databaseDAO,
MetricsDAO metricsDAO, DashboardDAO dashboardDAO,
ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO,
TaskDAO taskDAO, ModelDAO modelDAO, PipelineDAO pipelineDAO)
throws IOException {
// Note href to entity reference is not added here
String entity = ref.getType();
String id = ref.getId().toString();
if (entity.equalsIgnoreCase(Entity.TABLE)) {
Table instance = EntityUtil.validate(id, tableDAO3.findById(id), Table.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} else if (entity.equalsIgnoreCase(Entity.DATABASE)) {
Database instance = EntityUtil.validate(id, databaseDAO.findById(id), Database.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} else if (entity.equalsIgnoreCase(Entity.METRICS)) {
Metrics instance = EntityUtil.validate(id, metricsDAO.findById(id), Metrics.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} else if (entity.equalsIgnoreCase(Entity.DASHBOARD)) {
Dashboard instance = EntityUtil.validate(id, dashboardDAO.findById(id), Dashboard.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} else if (entity.equalsIgnoreCase(Entity.REPORT)) {
Report instance = EntityUtil.validate(id, reportDAO.findById(id), Report.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} else if (entity.equalsIgnoreCase(Entity.TOPIC)) {
Topic instance = EntityUtil.validate(id, topicDAO.findById(id), Topic.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} else if (entity.equalsIgnoreCase(Entity.CHART)) {
Chart instance = EntityUtil.validate(id, chartDAO.findById(id), Chart.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} else if (entity.equalsIgnoreCase(Entity.TASK)) {
Task instance = EntityUtil.validate(id, taskDAO.findById(id), Task.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} else if (entity.equalsIgnoreCase(Entity.PIPELINE)) {
Pipeline instance = EntityUtil.validate(id, pipelineDAO.findById(id), Pipeline.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} else if (entity.equalsIgnoreCase(Entity.MODEL)) {
Model instance = EntityUtil.validate(id, modelDAO.findById(id), Model.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
}
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(entity));
}
public static EntityReference getEntityReference(String entity, UUID id, TableDAO tableDAO, DatabaseDAO databaseDAO,
MetricsDAO metricsDAO, DashboardDAO dashboardDAO,
ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO,
@ -315,6 +442,16 @@ public final class EntityUtil {
reportDAO, topicDAO, chartDAO, taskDAO, modelDAO, pipelineDAO);
}
public static EntityReference getEntityReference(String entity, UUID id, TableDAO3 tableDAO3, DatabaseDAO databaseDAO,
MetricsDAO metricsDAO, DashboardDAO dashboardDAO,
ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO,
TaskDAO taskDAO, ModelDAO modelDAO, PipelineDAO pipelineDAO)
throws IOException {
EntityReference ref = new EntityReference().withId(id).withType(entity);
return getEntityReference(ref, tableDAO3, databaseDAO, metricsDAO, dashboardDAO,
reportDAO, topicDAO, chartDAO, taskDAO, modelDAO, pipelineDAO);
}
public static EntityReference getEntityReferenceByName(String entity, String fqn, TableDAO tableDAO,
DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO,
@ -322,7 +459,47 @@ public final class EntityUtil {
PipelineDAO pipelineDAO)
throws IOException {
if (entity.equalsIgnoreCase(Entity.TABLE)) {
Table instance = EntityUtil.validate(fqn, tableDAO.findByFQN(fqn), Table.class);
Table instance = EntityUtil.validate(fqn, tableDAO.findByFqn(fqn), Table.class);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.DATABASE)) {
Database instance = EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.METRICS)) {
Metrics instance = EntityUtil.validate(fqn, metricsDAO.findByFQN(fqn), Metrics.class);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.REPORT)) {
Report instance = EntityUtil.validate(fqn, reportDAO.findByFQN(fqn), Report.class);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.TOPIC)) {
Topic instance = EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Topic.class);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.CHART)) {
Chart instance = EntityUtil.validate(fqn, chartDAO.findByFQN(fqn), Chart.class);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.DASHBOARD)) {
Dashboard instance = EntityUtil.validate(fqn, dashboardDAO.findByFQN(fqn), Dashboard.class);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.TASK)) {
Task instance = EntityUtil.validate(fqn, taskDAO.findByFQN(fqn), Task.class);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.PIPELINE)) {
Pipeline instance = EntityUtil.validate(fqn, pipelineDAO.findByFQN(fqn), Pipeline.class);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.MODEL)) {
Model instance = EntityUtil.validate(fqn, modelDAO.findByFQN(fqn), Model.class);
return getEntityReference(instance);
}
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entity, fqn));
}
public static EntityReference getEntityReferenceByName(String entity, String fqn, TableDAO3 tableDAO3,
DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO,
DashboardDAO dashboardDAO, TaskDAO taskDAO, ModelDAO modelDAO,
PipelineDAO pipelineDAO)
throws IOException {
if (entity.equalsIgnoreCase(Entity.TABLE)) {
Table instance = EntityUtil.validate(fqn, tableDAO3.findByFqn(fqn), Table.class);
return getEntityReference(instance);
} else if (entity.equalsIgnoreCase(Entity.DATABASE)) {
Database instance = EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class);
@ -430,7 +607,7 @@ public final class EntityUtil {
}
public static EntityReference validateEntityLink(EntityLink entityLink, UserDAO userDAO, TeamDAO teamDAO,
TableDAO tableDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
TableDAO3 tableDAO3, DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
DashboardDAO dashboardDAO, ReportDAO reportDAO, TopicDAO topicDAO,
TaskDAO taskDAO, ModelDAO modelDAO, PipelineDAO pipelineDAO)
throws IOException {
@ -441,7 +618,7 @@ public final class EntityUtil {
} else if (entityType.equalsIgnoreCase(Entity.TEAM)) {
return getEntityReference(EntityUtil.validate(fqn, teamDAO.findByName(fqn), Team.class));
} else if (entityType.equalsIgnoreCase(Entity.TABLE)) {
return getEntityReference(EntityUtil.validate(fqn, tableDAO.findByFQN(fqn), Table.class));
return getEntityReference(EntityUtil.validate(fqn, tableDAO3.findByFqn(fqn), Table.class));
} else if (entityType.equalsIgnoreCase(Entity.DATABASE)) {
return getEntityReference(EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class));
} else if (entityType.equalsIgnoreCase(Entity.METRICS)) {
@ -463,6 +640,39 @@ public final class EntityUtil {
}
}
public static EntityReference validateEntityLink(EntityLink entityLink, UserDAO userDAO, TeamDAO teamDAO,
TableDAO tableDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
DashboardDAO dashboardDAO, ReportDAO reportDAO, TopicDAO topicDAO,
TaskDAO taskDAO, ModelDAO modelDAO, PipelineDAO pipelineDAO)
throws IOException {
String entityType = entityLink.getEntityType();
String fqn = entityLink.getEntityId();
if (entityType.equalsIgnoreCase(Entity.USER)) {
return getEntityReference(EntityUtil.validate(fqn, userDAO.findByName(fqn), User.class));
} else if (entityType.equalsIgnoreCase(Entity.TEAM)) {
return getEntityReference(EntityUtil.validate(fqn, teamDAO.findByName(fqn), Team.class));
} else if (entityType.equalsIgnoreCase(Entity.TABLE)) {
return getEntityReference(EntityUtil.validate(fqn, tableDAO.findByFqn(fqn), Table.class));
} else if (entityType.equalsIgnoreCase(Entity.DATABASE)) {
return getEntityReference(EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class));
} else if (entityType.equalsIgnoreCase(Entity.METRICS)) {
return getEntityReference(EntityUtil.validate(fqn, metricsDAO.findByFQN(fqn), Metrics.class));
} else if (entityType.equalsIgnoreCase(Entity.DASHBOARD)) {
return getEntityReference(EntityUtil.validate(fqn, dashboardDAO.findByFQN(fqn), Dashboard.class));
} else if (entityType.equalsIgnoreCase(Entity.REPORT)) {
return getEntityReference(EntityUtil.validate(fqn, reportDAO.findByFQN(fqn), Report.class));
} else if (entityType.equalsIgnoreCase(Entity.TOPIC)) {
return getEntityReference(EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Topic.class));
} else if (entityType.equalsIgnoreCase(Entity.TASK)) {
return getEntityReference(EntityUtil.validate(fqn, taskDAO.findByFQN(fqn), Task.class));
} else if (entityType.equalsIgnoreCase(Entity.PIPELINE)) {
return getEntityReference(EntityUtil.validate(fqn, pipelineDAO.findByFQN(fqn), Pipeline.class));
} else if (entityType.equalsIgnoreCase(Entity.MODEL)) {
return getEntityReference(EntityUtil.validate(fqn, modelDAO.findByFQN(fqn), Model.class));
} else {
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entityType, fqn));
}
}
public static UsageDetails getLatestUsage(UsageDAO usageDAO, UUID entityId) {
LOG.debug("Getting latest usage for {}", entityId);
@ -476,6 +686,18 @@ public final class EntityUtil {
return details;
}
public static UsageDetails getLatestUsage(org.openmetadata.catalog.jdbi3.UsageDAO usageDAO, UUID entityId) {
LOG.debug("Getting latest usage for {}", entityId);
UsageDetails details = usageDAO.getLatestUsage(entityId.toString());
if (details == null) {
LOG.debug("Usage details not found. Sending default usage");
UsageStats stats = new UsageStats().withCount(0).withPercentileRank(0.0);
details = new UsageDetails().withDailyStats(stats).withWeeklyStats(stats).withMonthlyStats(stats)
.withDate(RestUtil.DATE_FORMAT.format(new Date()));
}
return details;
}
public static EntityReference getEntityReference(Pipeline pipeline) {
return new EntityReference().withDescription(pipeline.getDescription()).withId(pipeline.getId())
.withName(pipeline.getFullyQualifiedName()).withType(Entity.PIPELINE);
@ -570,6 +792,28 @@ public final class EntityUtil {
}
}
/**
* Apply tags {@code tagLabels} to the entity or field identified by {@code targetFQN}
*/
public static void applyTags(org.openmetadata.catalog.jdbi3.TagDAO tagDAO, List<TagLabel> tagLabels, String targetFQN) throws IOException {
for (TagLabel tagLabel : Optional.ofNullable(tagLabels).orElse(Collections.emptyList())) {
String json = tagDAO.findTag(tagLabel.getTagFQN());
if (json == null) {
// Invalid TagLabel
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(Tag.class.getSimpleName(),
tagLabel.getTagFQN()));
}
Tag tag = JsonUtils.readValue(json, Tag.class);
// Apply tagLabel to targetFQN that identifies an entity or field
tagDAO.applyTag(tagLabel.getTagFQN(), targetFQN, tagLabel.getLabelType().ordinal(),
tagLabel.getState().ordinal());
// Apply derived tags
List<TagLabel> derivedTags = getDerivedTags(tagLabel, tag);
applyTags(tagDAO, derivedTags, targetFQN);
}
}
public static List<TagLabel> getDerivedTags(TagLabel tagLabel, Tag tag) {
List<TagLabel> derivedTags = new ArrayList<>();
for (String fqn : Optional.ofNullable(tag.getAssociatedTags()).orElse(Collections.emptyList())) {
@ -600,6 +844,27 @@ public final class EntityUtil {
return updatedTagLabels;
}
/**
* Validate given list of tags and add derived tags to it
*/
public static List<TagLabel> addDerivedTags(org.openmetadata.catalog.jdbi3.TagDAO tagDAO, List<TagLabel> tagLabels) throws IOException {
List<TagLabel> updatedTagLabels = new ArrayList<>();
for (TagLabel tagLabel : Optional.ofNullable(tagLabels).orElse(Collections.emptyList())) {
String json = tagDAO.findTag(tagLabel.getTagFQN());
if (json == null) {
// Invalid TagLabel
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(Tag.class.getSimpleName(),
tagLabel.getTagFQN()));
}
Tag tag = JsonUtils.readValue(json, Tag.class);
updatedTagLabels.add(tagLabel);
// Apply derived tags
List<TagLabel> derivedTags = getDerivedTags(tagLabel, tag);
updatedTagLabels = EntityUtil.mergeTags(updatedTagLabels, derivedTags);
}
return updatedTagLabels;
}
public static void removeTags(TagDAO tagDAO, String fullyQualifiedName) {
tagDAO.deleteTags(fullyQualifiedName);
}
@ -608,6 +873,10 @@ public final class EntityUtil {
tagDAO.deleteTagsByPrefix(fullyQualifiedName);
}
public static void removeTagsByPrefix(org.openmetadata.catalog.jdbi3.TagDAO tagDAO, String fullyQualifiedName) {
tagDAO.deleteTagsByPrefix(fullyQualifiedName);
}
public static List<TagLabel> mergeTags(List<TagLabel> list1, List<TagLabel> list2) {
List<TagLabel> mergedTags = Stream.concat(Optional.ofNullable(list1).orElse(Collections.emptyList()).stream(),
Optional.ofNullable(list2).orElse(Collections.emptyList()).stream())
@ -640,10 +909,26 @@ public final class EntityUtil {
Relationship.FOLLOWS.ordinal()) > 0;
}
public static boolean addFollower(EntityRelationshipDAO3 dao, org.openmetadata.catalog.jdbi3.UserDAO userDAO,
String followedEntityId,
String followedEntityType, String followerId, String followerEntity)
throws IOException {
User user = EntityUtil.validate(followerId, userDAO.findById(followerId), User.class);
if (Optional.ofNullable(user.getDeactivated()).orElse(false)) {
throw new IllegalArgumentException(CatalogExceptionMessage.deactivatedUser(followerId));
}
return dao.insert(followerId, followedEntityId, followerEntity, followedEntityType,
Relationship.FOLLOWS.ordinal()) > 0;
}
public static void removeFollower(EntityRelationshipDAO dao, String followedEntityId, String followerId) {
dao.delete(followerId, followedEntityId, Relationship.FOLLOWS.ordinal());
}
public static void removeFollower(EntityRelationshipDAO3 dao, String followedEntityId, String followerId) {
dao.delete(followerId, followedEntityId, Relationship.FOLLOWS.ordinal());
}
public static List<EntityReference> getFollowers(UUID followedEntityId, EntityRelationshipDAO entityRelationshipDAO,
UserDAO userDAO) throws IOException {
List<String> followerIds = entityRelationshipDAO.findFrom(followedEntityId.toString(),
@ -657,6 +942,18 @@ public final class EntityUtil {
return followers;
}
public static List<EntityReference> getFollowers(UUID followedEntityId, EntityRelationshipDAO3 entityRelationshipDAO,
org.openmetadata.catalog.jdbi3.UserDAO userDAO) throws IOException {
List<String> followerIds = entityRelationshipDAO.findFrom(followedEntityId.toString(),
Relationship.FOLLOWS.ordinal(),
Entity.USER);
List<EntityReference> followers = new ArrayList<>();
for (String followerId : followerIds) {
User user = EntityUtil.validate(followerId, userDAO.findById(followerId), User.class);
followers.add(new EntityReference().withName(user.getName()).withId(user.getId()).withType("user"));
}
return followers;
}
public static class Fields {
private final List<String> fieldList;

12
pom.xml
View File

@ -43,6 +43,7 @@
<jackson.version>2.12.3</jackson.version>
<dropwizard.version>2.0.23</dropwizard.version>
<dropwizard-jdbi.version>2.0.0-rc9</dropwizard-jdbi.version>
<dropwizard-jdbi3.version>2.0.25</dropwizard-jdbi3.version>
<jersey-media-multipart.version>2.33</jersey-media-multipart.version>
<jersey-client.version>2.33</jersey-client.version>
<jersey-media-json-jackson.version>2.33</jersey-media-json-jackson.version>
@ -61,6 +62,7 @@
<wiremock-standalone.version>2.15.0</wiremock-standalone.version>
<jacoco-plugin.version>0.8.6</jacoco-plugin.version>
<jdbi.version>2.78</jdbi.version>
<jdbi3.version>3.23.0</jdbi3.version>
<commons-cli.version>1.4</commons-cli.version>
<commons-io.version>2.10.0</commons-io.version>
<flyway.version>7.11.0</flyway.version>
@ -137,11 +139,21 @@
<artifactId>dropwizard-jdbi</artifactId>
<version>${dropwizard-jdbi.version}</version>
</dependency>
<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-jdbi3</artifactId>
<version>${dropwizard-jdbi3.version}</version>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
<version>${jdbi.version}</version>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-core</artifactId>
<version>${jdbi3.version}</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>