From aec36fe6646add49003a00bb5e211764c7728131 Mon Sep 17 00:00:00 2001 From: sureshms Date: Wed, 20 Oct 2021 14:21:24 -0700 Subject: [PATCH] Table entity migrated to jdbi3 --- catalog-rest-service/pom.xml | 4 + .../catalog/CatalogApplication.java | 28 ++ .../CatalogGenericExceptionMapper.java | 10 + .../catalog/jdbi3/DatabaseDAO.java | 51 +++ .../catalog/jdbi3/DatabaseRepository.java | 1 - .../catalog/jdbi3/EntityExtensionDAO3.java | 32 ++ .../catalog/jdbi3/EntityRelationshipDAO3.java | 88 +++++ .../catalog/jdbi3/FeedRepository.java | 1 - .../catalog/jdbi3/FieldRelationshipDAO3.java | 82 +++++ .../jdbi3/FromEntityReferenceMapper3.java | 34 ++ .../catalog/jdbi3/LineageRepository.java | 1 - .../openmetadata/catalog/jdbi3/TableDAO.java | 50 +++ .../openmetadata/catalog/jdbi3/TableDAO3.java | 54 ++++ .../catalog/jdbi3/TableRepository3.java | 53 +++ ...sitory.java => TableRepositoryHelper.java} | 188 ++++------- .../openmetadata/catalog/jdbi3/TagDAO.java | 69 ++++ .../openmetadata/catalog/jdbi3/TeamDAO.java | 43 +++ .../catalog/jdbi3/TeamRepository.java | 1 - .../jdbi3/ToEntityReferenceMapper3.java | 32 ++ .../openmetadata/catalog/jdbi3/UsageDAO.java | 86 +++++ .../catalog/jdbi3/UsageRepository.java | 1 - .../openmetadata/catalog/jdbi3/UserDAO.java | 48 +++ .../catalog/jdbi3/UserRepository.java | 1 - .../catalog/resources/CollectionRegistry.java | 18 ++ .../resources/databases/TableResource.java | 12 +- .../catalog/util/EntityUpdater.java | 3 +- .../catalog/util/EntityUpdater3.java | 130 ++++++++ .../openmetadata/catalog/util/EntityUtil.java | 305 +++++++++++++++++- pom.xml | 12 + 29 files changed, 1291 insertions(+), 147 deletions(-) create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DatabaseDAO.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityExtensionDAO3.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRelationshipDAO3.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FieldRelationshipDAO3.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FromEntityReferenceMapper3.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableDAO.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableDAO3.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository3.java rename catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/{TableRepository.java => TableRepositoryHelper.java} (80%) create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TagDAO.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamDAO.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ToEntityReferenceMapper3.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UsageDAO.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserDAO.java create mode 100644 catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUpdater3.java diff --git a/catalog-rest-service/pom.xml b/catalog-rest-service/pom.xml index 5770fa0ea2c..06730187e57 100644 --- a/catalog-rest-service/pom.xml +++ b/catalog-rest-service/pom.xml @@ -35,6 +35,10 @@ io.dropwizard dropwizard-jdbi + + io.dropwizard + dropwizard-jdbi3 + mysql mysql-connector-java diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/CatalogApplication.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/CatalogApplication.java index 0e945e84afb..7781ffa09cc 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/CatalogApplication.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/CatalogApplication.java @@ -20,7 +20,9 @@ import com.google.inject.Guice; import com.google.inject.Injector; import io.dropwizard.health.conf.HealthConfiguration; import io.dropwizard.health.core.HealthCheckBundle; +import io.dropwizard.jdbi3.JdbiFactory; import io.dropwizard.jersey.jackson.JsonProcessingExceptionMapper; +import org.jdbi.v3.core.Jdbi; import org.openmetadata.catalog.events.EventFilter; import org.openmetadata.catalog.exception.CatalogGenericExceptionMapper; import org.openmetadata.catalog.exception.ConstraintViolationExceptionMapper; @@ -78,6 +80,9 @@ public class CatalogApplication extends Application { final DBIFactory factory = new DBIFactory(); final DBI jdbi = factory.build(environment, catalogConfig.getDataSourceFactory(), "mysql"); + final JdbiFactory factory3 = new JdbiFactory(); + final Jdbi jdbi3 = factory3.build(environment, catalogConfig.getDataSourceFactory(), "mysql3"); + // Register Authorizer registerAuthorizer(catalogConfig, environment, jdbi); @@ -101,6 +106,7 @@ public class CatalogApplication extends Application { environment.jersey().register(JsonMappingExceptionMapper.class); environment.healthChecks().register("UserDatabaseCheck", new CatalogHealthCheck(catalogConfig, jdbi)); registerResources(catalogConfig, environment, jdbi); + registerResources(catalogConfig, environment, jdbi3); // Register Event Handler registerEventFilter(catalogConfig, environment, jdbi); @@ -166,6 +172,28 @@ public class CatalogApplication extends Application { jdbi.registerContainerFactory(new OptionalContainerFactory()); CollectionRegistry.getInstance().registerResources(jdbi, environment, authorizer); + + environment.lifecycle().manage(new Managed() { + @Override + public void start() { + } + + @Override + public void stop() { + long startTime = System.currentTimeMillis(); + LOG.info("Took " + (System.currentTimeMillis() - startTime) + " ms to close all the services"); + } + }); + environment.jersey().register(new SearchResource(config.getElasticSearchConfiguration())); + environment.jersey().register(new JsonPatchProvider()); + ErrorPageErrorHandler eph = new ErrorPageErrorHandler(); + eph.addErrorPage(Response.Status.NOT_FOUND.getStatusCode(), "/"); + environment.getApplicationContext().setErrorHandler(eph); + } + + private void registerResources(CatalogApplicationConfig config, Environment environment, Jdbi jdbi) { + CollectionRegistry.getInstance().registerResources3(jdbi, environment, authorizer); + environment.lifecycle().manage(new Managed() { @Override public void start() { diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/exception/CatalogGenericExceptionMapper.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/exception/CatalogGenericExceptionMapper.java index 459a166633b..5c0c78a31cc 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/exception/CatalogGenericExceptionMapper.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/exception/CatalogGenericExceptionMapper.java @@ -55,6 +55,16 @@ public class CatalogGenericExceptionMapper implements ExceptionMapper .entity(new ErrorMessage(response.getStatus(), ex.getLocalizedMessage())) .build(); } else if (ex instanceof UnableToExecuteStatementException) { + // TODO remove this + if (ex.getCause() instanceof SQLIntegrityConstraintViolationException) { + return Response.status(CONFLICT) + .type(MediaType.APPLICATION_JSON_TYPE) + .entity(new ErrorMessage(CONFLICT.getStatusCode(), CatalogExceptionMessage.ENTITY_ALREADY_EXISTS)) + .build(); + + } + } else if (ex instanceof org.jdbi.v3.core.statement.UnableToExecuteStatementException) { + // TODO remove this if (ex.getCause() instanceof SQLIntegrityConstraintViolationException) { return Response.status(CONFLICT) .type(MediaType.APPLICATION_JSON_TYPE) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DatabaseDAO.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DatabaseDAO.java new file mode 100644 index 00000000000..a5db64f02da --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DatabaseDAO.java @@ -0,0 +1,51 @@ +package org.openmetadata.catalog.jdbi3; + +import org.jdbi.v3.sqlobject.SqlObject; +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; + +import java.util.List; + +public interface DatabaseDAO { + @SqlUpdate("INSERT INTO database_entity (json) VALUES (:json)") + void insert(@Bind("json") String json); + + @SqlUpdate("UPDATE database_entity SET json = :json where id = :id") + void update(@Bind("id") String id, @Bind("json") String json); + + @SqlQuery("SELECT json FROM database_entity WHERE fullyQualifiedName = :name") + String findByFQN(@Bind("name") String name); + + @SqlQuery("SELECT json FROM database_entity WHERE id = :id") + String findById(@Bind("id") String id); + + @SqlQuery("SELECT count(*) FROM database_entity WHERE " + + "(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL)") + int listCount(@Bind("fqnPrefix") String fqnPrefix); + + @SqlQuery( + "SELECT json FROM (" + + "SELECT fullyQualifiedName, json FROM database_entity WHERE " + + "(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL) AND " +// Filter by service name + "fullyQualifiedName < :before " + // Pagination by database fullyQualifiedName + "ORDER BY fullyQualifiedName DESC " + // Pagination ordering by database fullyQualifiedName + "LIMIT :limit" + + ") last_rows_subquery ORDER BY fullyQualifiedName") + List listBefore(@Bind("fqnPrefix") String fqnPrefix, @Bind("limit") int limit, + @Bind("before") String before); + + @SqlQuery("SELECT json FROM database_entity WHERE " + + "(fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') OR :fqnPrefix IS NULL) AND " + + "fullyQualifiedName > :after " + + "ORDER BY fullyQualifiedName " + + "LIMIT :limit") + List listAfter(@Bind("fqnPrefix") String fqnPrefix, @Bind("limit") int limit, + @Bind("after") String after); + + @SqlQuery("SELECT EXISTS (SELECT * FROM database_entity WHERE id = :id)") + boolean exists(@Bind("id") String id); + + @SqlUpdate("DELETE FROM database_entity WHERE id = :id") + int delete(@Bind("id") String id); +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DatabaseRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DatabaseRepository.java index b89839f0d5c..8023432c18e 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DatabaseRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DatabaseRepository.java @@ -23,7 +23,6 @@ import org.openmetadata.catalog.entity.data.Table; import org.openmetadata.catalog.entity.services.DatabaseService; import org.openmetadata.catalog.exception.EntityNotFoundException; import org.openmetadata.catalog.jdbi3.DatabaseServiceRepository.DatabaseServiceDAO; -import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO; import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO; import org.openmetadata.catalog.jdbi3.UsageRepository.UsageDAO; import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityExtensionDAO3.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityExtensionDAO3.java new file mode 100644 index 00000000000..8635ad9bc17 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityExtensionDAO3.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.jdbi3; + + +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; + +public interface EntityExtensionDAO3 { + @SqlUpdate("REPLACE INTO entity_extension(id, extension, jsonSchema, json) " + + "VALUES (:id, :extension, :jsonSchema, :json)") + void insert(@Bind("id") String id, @Bind("extension") String extension, @Bind("jsonSchema") String jsonSchema, + @Bind("json") String json); + + @SqlQuery("SELECT json FROM entity_extension WHERE id = :id AND extension = :extension") + String getExtension(@Bind("id") String id, @Bind("extension") String extension); +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRelationshipDAO3.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRelationshipDAO3.java new file mode 100644 index 00000000000..6edc57ed3b1 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRelationshipDAO3.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.jdbi3; + +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; +import org.openmetadata.catalog.type.EntityReference; + +import java.util.List; + +public interface EntityRelationshipDAO3 { + @SqlUpdate("INSERT IGNORE INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation) " + + "VALUES (:fromId, :toId, :fromEntity, :toEntity, :relation)") + int insert(@Bind("fromId") String fromId, @Bind("toId") String toId, @Bind("fromEntity") String fromEntity, + @Bind("toEntity") String toEntity, @Bind("relation") int relation); + + // + // Find to operations + // + @SqlQuery("SELECT toId, toEntity FROM entity_relationship WHERE fromId = :fromId AND relation = :relation") + @RegisterRowMapper(ToEntityReferenceMapper3.class) + List findTo(@Bind("fromId") String fromId, @Bind("relation") int relation); + + @SqlQuery("SELECT toId FROM entity_relationship WHERE " + + "fromId = :fromId AND relation = :relation AND toEntity = :toEntity ORDER BY fromId") + List findTo(@Bind("fromId") String fromId, @Bind("relation") int relation, + @Bind("toEntity") String toEntity); + + @SqlQuery("SELECT count(*) FROM entity_relationship WHERE " + + "fromId = :fromId AND relation = :relation AND toEntity = :toEntity ORDER BY fromId") + int findToCount(@Bind("fromId") String fromId, @Bind("relation") int relation, @Bind("toEntity") String toEntity); + + // + // Find from operations + // + @SqlQuery("SELECT fromId FROM entity_relationship WHERE " + + "toId = :toId AND relation = :relation AND fromEntity = :fromEntity ORDER BY fromId") + List findFrom(@Bind("toId") String toId, @Bind("relation") int relation, + @Bind("fromEntity") String fromEntity); + + @SqlQuery("SELECT fromId, fromEntity FROM entity_relationship WHERE toId = :toId AND relation = :relation " + + "ORDER BY fromId") + @RegisterRowMapper(FromEntityReferenceMapper3.class) + List findFrom(@Bind("toId") String toId, @Bind("relation") int relation); + + @SqlQuery("SELECT fromId, fromEntity FROM entity_relationship WHERE toId = :toId AND relation = :relation AND " + + "fromEntity = :fromEntity ORDER BY fromId") + @RegisterRowMapper(FromEntityReferenceMapper3.class) + List findFromEntity(@Bind("toId") String toId, @Bind("relation") int relation, + @Bind("fromEntity") String fromEntity); + + // + // Delete Operations + // + @SqlUpdate("DELETE from entity_relationship WHERE fromId = :fromId AND toId = :toId AND relation = :relation") + void delete(@Bind("fromId") String fromId, @Bind("toId") String toId, @Bind("relation") int relation); + + // Delete all the entity relationship fromID --- relation --> entity of type toEntity + @SqlUpdate("DELETE from entity_relationship WHERE fromId = :fromId AND relation = :relation AND toEntity = :toEntity") + void deleteFrom(@Bind("fromId") String fromId, @Bind("relation") int relation, @Bind("toEntity") String toEntity); + + // Delete all the entity relationship fromID --- relation --> to any entity + @SqlUpdate("DELETE from entity_relationship WHERE fromId = :fromId AND relation = :relation") + void deleteFrom(@Bind("fromId") String fromId, @Bind("relation") int relation); + + // Delete all the entity relationship toId <-- relation -- entity of type fromEntity + @SqlUpdate("DELETE from entity_relationship WHERE toId = :toId AND relation = :relation AND fromEntity = :fromEntity") + void deleteTo(@Bind("toId") String toId, @Bind("relation") int relation, @Bind("fromEntity") String fromEntity); + + @SqlUpdate("DELETE from entity_relationship WHERE toId = :id OR fromId = :id") + void deleteAll(@Bind("id") String id); +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FeedRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FeedRepository.java index 0d86e69691f..f081f31c8cd 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FeedRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FeedRepository.java @@ -23,7 +23,6 @@ import org.openmetadata.catalog.jdbi3.DashboardRepository.DashboardDAO; import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseDAO; import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO; import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO; -import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO; import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO; import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO; import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FieldRelationshipDAO3.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FieldRelationshipDAO3.java new file mode 100644 index 00000000000..dd173d5237a --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FieldRelationshipDAO3.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.jdbi3; + + +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; + +public interface FieldRelationshipDAO3 { + @SqlUpdate("INSERT IGNORE INTO field_relationship(fromFQN, toFQN, fromType, toType, relation) " + + "VALUES (:fromFQN, :toFQN, :fromType, :toType, :relation)") + void insert(@Bind("fromFQN") String fromFQN, @Bind("toFQN") String toFQN, @Bind("fromType") String fromType, + @Bind("toType") String toType, @Bind("relation") int relation); + + @SqlUpdate("INSERT INTO field_relationship(fromFQN, toFQN, fromType, toType, relation, jsonSchema, json) " + + "VALUES (:fromFQN, :toFQN, :fromType, :toType, :relation, :jsonSchema, :json) " + + "ON DUPLICATE KEY UPDATE json = :json") + void upsert(@Bind("fromFQN") String fromFQN, @Bind("toFQN") String toFQN, @Bind("fromType") String fromType, + @Bind("toType") String toType, @Bind("relation") int relation, + @Bind("jsonSchema") String jsonSchema, @Bind("json") String json); + + @SqlQuery("SELECT json FROM field_relationship WHERE " + + "fromFQN = :fromFQN AND toFQN = :toFQN AND fromType = :fromType " + + "AND toType = :toType AND relation = :relation") + String find(@Bind("fromFQN") String fromFQN, @Bind("toFQN") String toFQN, + @Bind("fromType") String fromType, @Bind("toType") String toType, + @Bind("relation") int relation); + + @SqlQuery("SELECT fromFQN, toFQN, json FROM field_relationship WHERE " + + "toFQN LIKE CONCAT(:fqnPrefix, '%') AND fromType = :fromType AND toType = :toType AND relation = :relation") + @RegisterRowMapper(FromFieldMapper.class) + List> listFromByPrefix(@Bind("fqnPrefix") String fqnPrefix, @Bind("fromType") String fromType, + @Bind("toType") String toType, @Bind("relation") int relation); + + @SqlQuery("SELECT fromFQN, toFQN, json FROM field_relationship WHERE " + + "fromFQN LIKE CONCAT(:fqnPrefix, '%') AND fromType = :fromType AND toType = :toType AND relation = :relation") + @RegisterRowMapper(ToFieldMapper.class) + List> listToByPrefix(@Bind("fqnPrefix") String fqnPrefix, @Bind("fromType") String fromType, + @Bind("toType") String toType, @Bind("relation") int relation); + + @SqlUpdate("DELETE from field_relationship WHERE " + + "(toFQN LIKE CONCAT(:fqnPrefix, '.%') OR fromFQN LIKE CONCAT(:fqnPrefix, '.%')) " + + "AND relation = :relation") + void deleteAllByPrefix(@Bind("fqnPrefix") String fqnPrefix, @Bind("relation") int relation); + + class ToFieldMapper implements RowMapper> { + @Override + public List map(ResultSet rs, StatementContext ctx) throws SQLException { + return Arrays.asList(rs.getString("fromFQN"), rs.getString("toFQN"), rs.getString("json")); + } + } + + class FromFieldMapper implements RowMapper> { + @Override + public List map(ResultSet rs, StatementContext ctx) throws SQLException { + return Arrays.asList(rs.getString("toFQN"), rs.getString("fromFQN"), rs.getString("json")); + } + } +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FromEntityReferenceMapper3.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FromEntityReferenceMapper3.java new file mode 100644 index 00000000000..70fb2b10850 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/FromEntityReferenceMapper3.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.jdbi3; + +import org.jdbi.v3.core.mapper.RowMapper; +import org.openmetadata.catalog.type.EntityReference; +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.ResultSetMapper; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.UUID; + +public class FromEntityReferenceMapper3 implements RowMapper { + @Override + public EntityReference map(ResultSet rs, org.jdbi.v3.core.statement.StatementContext ctx) throws SQLException { + return new EntityReference().withId(UUID.fromString(rs.getString("fromId"))) + .withType(rs.getString("fromEntity")); + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java index 0f5761e8fc8..3d25f3add93 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/LineageRepository.java @@ -23,7 +23,6 @@ import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseDAO; import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO; import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO; import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO; -import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO; import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO; import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineDAO; import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableDAO.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableDAO.java new file mode 100644 index 00000000000..9c881aeaa9b --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableDAO.java @@ -0,0 +1,50 @@ +package org.openmetadata.catalog.jdbi3; + +import org.skife.jdbi.v2.sqlobject.Bind; +import org.skife.jdbi.v2.sqlobject.SqlQuery; +import org.skife.jdbi.v2.sqlobject.SqlUpdate; + +import java.util.List; + +public interface TableDAO { + @SqlUpdate("INSERT INTO table_entity (json) VALUES (:json)") + void insert(@Bind("json") String json); + + @SqlUpdate("UPDATE table_entity SET json = :json WHERE id = :id") + void update(@Bind("id") String id, @Bind("json") String json); + + @SqlQuery("SELECT json FROM table_entity WHERE id = :tableId") + String findById(@Bind("tableId") String tableId); + + @SqlQuery("SELECT json FROM table_entity WHERE fullyQualifiedName = :tableFQN") + String findByFqn(@Bind("tableFQN") String tableFQN); + + @SqlQuery("SELECT count(*) FROM table_entity WHERE " + + "(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL)") + int listCount(@Bind("databaseFQN") String databaseFQN); + + @SqlQuery( + "SELECT json FROM (" + + "SELECT fullyQualifiedName, json FROM table_entity WHERE " + + "(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL) AND " + + "fullyQualifiedName < :before " + // Pagination by table fullyQualifiedName + "ORDER BY fullyQualifiedName DESC " + // Pagination ordering by table fullyQualifiedName + "LIMIT :limit" + + ") last_rows_subquery ORDER BY fullyQualifiedName") + List listBefore(@Bind("databaseFQN") String databaseFQN, @Bind("limit") int limit, + @Bind("before") String before); + + @SqlQuery("SELECT json FROM table_entity WHERE " + + "(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL) AND "+//Filter by databaseName + "fullyQualifiedName > :after " + // Pagination by table fullyQualifiedName + "ORDER BY fullyQualifiedName " + // Pagination ordering by table fullyQualifiedName + "LIMIT :limit") + List listAfter(@Bind("databaseFQN") String databaseFQN, @Bind("limit") int limit, + @Bind("after") String after); + + @SqlQuery("SELECT EXISTS (SELECT * FROM table_entity WHERE id = :id)") + boolean exists(@Bind("id") String id); + + @SqlUpdate("DELETE FROM table_entity WHERE id = :id") + int delete(@Bind("id") String id); +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableDAO3.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableDAO3.java new file mode 100644 index 00000000000..7d137ede5fc --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableDAO3.java @@ -0,0 +1,54 @@ +package org.openmetadata.catalog.jdbi3; + + +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.customizer.Define; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; + +import java.util.List; + +public interface TableDAO3 extends EntityDAO { + @Override + default String getTableName() { + return "table_entity"; + } + + @Override + @SqlQuery("SELECT json FROM WHERE fullyQualifiedName = :tableFQN") + String findByFqn(@Define("table") String table, @Bind("tableFQN") String tableFQN); + + @Override + @SqlQuery("SELECT count(*) FROM
WHERE " + + "(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL)") + int listCount(@Define("table") String table, @Bind("databaseFQN") String databaseFQN); + + @Override + @SqlQuery( + "SELECT json FROM (" + + "SELECT fullyQualifiedName, json FROM
WHERE " + + "(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL) AND " + + "fullyQualifiedName < :before " + // Pagination by table fullyQualifiedName + "ORDER BY fullyQualifiedName DESC " + // Pagination ordering by table fullyQualifiedName + "LIMIT :limit" + + ") last_rows_subquery ORDER BY fullyQualifiedName") + List listBefore(@Define("table") String table, @Bind("databaseFQN") String databaseFQN, @Bind("limit") int limit, + @Bind("before") String before); + + @Override + @SqlQuery("SELECT json FROM
WHERE " + + "(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL) AND "+//Filter by databaseName + "fullyQualifiedName > :after " + // Pagination by table fullyQualifiedName + "ORDER BY fullyQualifiedName " + // Pagination ordering by table fullyQualifiedName + "LIMIT :limit") + List listAfter(@Define("table") String table, @Bind("databaseFQN") String databaseFQN, @Bind("limit") int limit, + @Bind("after") String after); + + @Override + @SqlQuery("SELECT EXISTS (SELECT * FROM
WHERE id = :id)") + boolean exists(@Define("table") String table, @Bind("id") String id); + + @Override + @SqlUpdate("DELETE FROM
WHERE id = :id") + int delete(@Define("table") String table, @Bind("id") String id); +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository3.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository3.java new file mode 100644 index 00000000000..c8a2f24680c --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository3.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.jdbi3; + +import org.jdbi.v3.sqlobject.CreateSqlObject; +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; + +import java.util.List; + +public interface TableRepository3 { + @CreateSqlObject + DatabaseDAO databaseDAO(); + + @CreateSqlObject + EntityRelationshipDAO3 relationshipDAO(); + + @CreateSqlObject + FieldRelationshipDAO3 fieldRelationshipDAO(); + + @CreateSqlObject + EntityExtensionDAO3 entityExtensionDAO(); + + @CreateSqlObject + UserDAO userDAO(); + + @CreateSqlObject + TeamDAO teamDAO(); + + @CreateSqlObject + UsageDAO usageDAO(); + + @CreateSqlObject + TagDAO tagDAO(); + + @CreateSqlObject + TableDAO3 tableDAO(); +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepositoryHelper.java similarity index 80% rename from catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java rename to catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepositoryHelper.java index 45d8b4b265e..9a8d6b3cf81 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepositoryHelper.java @@ -17,16 +17,12 @@ package org.openmetadata.catalog.jdbi3; import com.fasterxml.jackson.core.JsonProcessingException; +import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.entity.data.Database; import org.openmetadata.catalog.entity.data.Table; import org.openmetadata.catalog.exception.CatalogExceptionMessage; import org.openmetadata.catalog.exception.EntityNotFoundException; -import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseDAO; -import org.openmetadata.catalog.jdbi3.TagRepository.TagDAO; -import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO; -import org.openmetadata.catalog.jdbi3.UsageRepository.UsageDAO; -import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO; import org.openmetadata.catalog.resources.databases.TableResource; import org.openmetadata.catalog.resources.databases.TableResource.TableList; import org.openmetadata.catalog.type.Column; @@ -41,7 +37,7 @@ import org.openmetadata.catalog.type.TableJoins; import org.openmetadata.catalog.type.TableProfile; import org.openmetadata.catalog.type.TagLabel; import org.openmetadata.catalog.util.EntityInterface; -import org.openmetadata.catalog.util.EntityUpdater; +import org.openmetadata.catalog.util.EntityUpdater3; import org.openmetadata.catalog.util.EntityUtil; import org.openmetadata.catalog.util.EntityUtil.Fields; import org.openmetadata.catalog.util.EventUtils; @@ -50,11 +46,6 @@ import org.openmetadata.catalog.util.RestUtil; import org.openmetadata.catalog.util.RestUtil.PutResponse; import org.openmetadata.catalog.util.ResultList; import org.openmetadata.common.utils.CommonUtil; -import org.skife.jdbi.v2.sqlobject.Bind; -import org.skife.jdbi.v2.sqlobject.CreateSqlObject; -import org.skife.jdbi.v2.sqlobject.SqlQuery; -import org.skife.jdbi.v2.sqlobject.SqlUpdate; -import org.skife.jdbi.v2.sqlobject.Transaction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,56 +72,36 @@ import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityN import static org.openmetadata.catalog.jdbi3.Relationship.JOINED_WITH; import static org.openmetadata.common.utils.CommonUtil.parseDate; -public abstract class TableRepository { - private static final Logger LOG = LoggerFactory.getLogger(TableRepository.class); +public class TableRepositoryHelper { + static final Logger LOG = LoggerFactory.getLogger(TableRepositoryHelper.class); // Table fields that can be patched in a PATCH request - private static final Fields TABLE_PATCH_FIELDS = new Fields(TableResource.FIELD_LIST, + static final Fields TABLE_PATCH_FIELDS = new Fields(TableResource.FIELD_LIST, "owner,columns,database,tags,tableConstraints"); // Table fields that can be updated in a PUT request - private static final Fields TABLE_UPDATE_FIELDS = new Fields(TableResource.FIELD_LIST, + static final Fields TABLE_UPDATE_FIELDS = new Fields(TableResource.FIELD_LIST, "owner,columns,database,tags,tableConstraints"); - @CreateSqlObject - abstract DatabaseDAO databaseDAO(); + public TableRepositoryHelper(TableRepository3 tableRepo3) { + this.tableRepo3 = tableRepo3; + } - @CreateSqlObject - abstract EntityRelationshipDAO relationshipDAO(); - - @CreateSqlObject - abstract FieldRelationshipDAO fieldRelationshipDAO(); - - @CreateSqlObject - abstract EntityExtensionDAO entityExtensionDAO(); - - @CreateSqlObject - abstract TableDAO tableDAO(); - - @CreateSqlObject - abstract UserDAO userDAO(); - - @CreateSqlObject - abstract TeamDAO teamDAO(); - - @CreateSqlObject - abstract UsageDAO usageDAO(); - - @CreateSqlObject - abstract TagDAO tagDAO(); + // TODO initialize + private TableRepository3 tableRepo3; EntityRepository
entityRepository = new EntityRepository<>() { @Override public List listAfter(String fqnPrefix, int limitParam, String after) { - return tableDAO().listAfter(fqnPrefix, limitParam, after); + return tableRepo3.tableDAO().listAfter(fqnPrefix, limitParam, after); } @Override public List listBefore(String fqnPrefix, int limitParam, String after) { - return tableDAO().listBefore(fqnPrefix, limitParam, after); + return tableRepo3.tableDAO().listBefore(fqnPrefix, limitParam, after); } @Override public int listCount(String fqnPrefix) { - return tableDAO().listCount(fqnPrefix); + return tableRepo3.tableDAO().listCount(fqnPrefix); } @Override @@ -140,7 +111,7 @@ public abstract class TableRepository { @Override public Table setFields(Table entity, Fields fields) throws IOException, ParseException { - return TableRepository.this.setFields(entity, fields); + return TableRepositoryHelper.this.setFields(entity, fields); } @Override @@ -173,7 +144,7 @@ public abstract class TableRepository { @Transaction public Table getByName(String fqn, Fields fields) throws IOException, ParseException { - Table table = EntityUtil.validate(fqn, tableDAO().findByFQN(fqn), Table.class); + Table table = EntityUtil.validate(fqn, tableRepo3.tableDAO().findByFqn(fqn), Table.class); return setFields(table, fields); } @@ -194,17 +165,17 @@ public abstract class TableRepository { @Transaction public void delete(String id) { - if (tableDAO().delete(id) <= 0) { + if (tableRepo3.tableDAO().delete(id) <= 0) { throw EntityNotFoundException.byMessage(entityNotFound(Entity.TABLE, id)); } // Remove all relationships - relationshipDAO().deleteAll(id); + tableRepo3.relationshipDAO().deleteAll(id); } @Transaction public PutResponse
createOrUpdate(Table updated, UUID databaseId) throws IOException, ParseException { validateRelationships(updated, databaseId); - Table stored = JsonUtils.readValue(tableDAO().findByFQN(updated.getFullyQualifiedName()), Table.class); + Table stored = JsonUtils.readValue(tableRepo3.tableDAO().findByFqn(updated.getFullyQualifiedName()), Table.class); if (stored == null) { return new PutResponse<>(Status.CREATED, createInternal(updated)); } @@ -228,15 +199,15 @@ public abstract class TableRepository { @Transaction public Status addFollower(String tableId, String userId) throws IOException { - EntityUtil.validate(tableId, tableDAO().findById(tableId), Table.class); - return EntityUtil.addFollower(relationshipDAO(), userDAO(), tableId, Entity.TABLE, userId, Entity.USER) ? + EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class); + return EntityUtil.addFollower(tableRepo3.relationshipDAO(), tableRepo3.userDAO(), tableId, Entity.TABLE, userId, Entity.USER) ? Status.CREATED : Status.OK; } @Transaction public void addJoins(String tableId, TableJoins joins) throws IOException, ParseException { // Validate the request content - Table table = EntityUtil.validate(tableId, tableDAO().findById(tableId), Table.class); + Table table = EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class); if (!CommonUtil.dateInRange(RestUtil.DATE_FORMAT, joins.getStartDate(), 0, 30)) { throw new IllegalArgumentException("Date range can only include past 30 days starting today"); } @@ -257,7 +228,7 @@ public abstract class TableRepository { @Transaction public void addSampleData(String tableId, TableData tableData) throws IOException { // Validate the request content - Table table = EntityUtil.validate(tableId, tableDAO().findById(tableId), Table.class); + Table table = EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class); // Validate all the columns for (String columnName : tableData.getColumns()) { @@ -271,14 +242,14 @@ public abstract class TableRepository { } } - entityExtensionDAO().insert(tableId, "table.sampleData", "tableData", + tableRepo3.entityExtensionDAO().insert(tableId, "table.sampleData", "tableData", JsonUtils.pojoToJson(tableData)); } @Transaction public void addTableProfileData(String tableId, TableProfile tableProfile) throws IOException { // Validate the request content - Table table = EntityUtil.validate(tableId, tableDAO().findById(tableId), Table.class); + Table table = EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class); List storedTableProfiles = getTableProfile(table); Map storedMapTableProfiles = new HashMap<>(); @@ -294,19 +265,19 @@ public abstract class TableRepository { storedMapTableProfiles.put(tableProfile.getProfileDate(), tableProfile); List updatedProfiles = new ArrayList<>(storedMapTableProfiles.values()); - entityExtensionDAO().insert(tableId, "table.tableProfile", "tableProfile", + tableRepo3.entityExtensionDAO().insert(tableId, "table.tableProfile", "tableProfile", JsonUtils.pojoToJson(updatedProfiles)); } @Transaction public void deleteFollower(String tableId, String userId) { - EntityUtil.validateUser(userDAO(), userId); - EntityUtil.removeFollower(relationshipDAO(), tableId, userId); + EntityUtil.validateUser(tableRepo3.userDAO(), userId); + EntityUtil.removeFollower(tableRepo3.relationshipDAO(), tableId, userId); } @Transaction public EntityReference getOwnerReference(Table table) throws IOException { - return EntityUtil.populateOwner(userDAO(), teamDAO(), table.getOwner()); + return EntityUtil.populateOwner(tableRepo3.userDAO(), tableRepo3.teamDAO(), table.getOwner()); } @@ -322,7 +293,7 @@ public abstract class TableRepository { private void validateRelationships(Table table, UUID databaseId) throws IOException { // Validate database - Database db = EntityUtil.validate(databaseId.toString(), databaseDAO().findById(databaseId.toString()), + Database db = EntityUtil.validate(databaseId.toString(), tableRepo3.databaseDAO().findById(databaseId.toString()), Database.class); table.setDatabase(EntityUtil.getEntityReference(db)); // Validate and set other relationships @@ -345,7 +316,7 @@ public abstract class TableRepository { } for (Column column : columns) { - column.setTags(EntityUtil.addDerivedTags(tagDAO(), column.getTags())); + column.setTags(EntityUtil.addDerivedTags(tableRepo3.tagDAO(), column.getTags())); if (column.getChildren() != null) { addDerivedTags(column.getChildren()); } @@ -358,10 +329,10 @@ public abstract class TableRepository { setColumnFQN(table.getFullyQualifiedName(), table.getColumns()); // Check if owner is valid and set the relationship - table.setOwner(EntityUtil.populateOwner(userDAO(), teamDAO(), table.getOwner())); + table.setOwner(EntityUtil.populateOwner(tableRepo3.userDAO(), tableRepo3.teamDAO(), table.getOwner())); // Validate table tags and add derived tags to the list - table.setTags(EntityUtil.addDerivedTags(tagDAO(), table.getTags())); + table.setTags(EntityUtil.addDerivedTags(tableRepo3.tagDAO(), table.getTags())); // Validate column tags addDerivedTags(table.getColumns()); @@ -382,9 +353,9 @@ public abstract class TableRepository { table.getColumns().forEach(column -> column.setTags(null)); if (update) { - tableDAO().update(table.getId().toString(), JsonUtils.pojoToJson(table)); + tableRepo3.tableDAO().update(table.getId().toString(), JsonUtils.pojoToJson(table)); } else { - tableDAO().insert(JsonUtils.pojoToJson(table)); + tableRepo3.tableDAO().insert(JsonUtils.pojoToJson(table)); } // Restore the relationships @@ -417,11 +388,11 @@ public abstract class TableRepository { private void addRelationships(Table table) throws IOException { // Add relationship from database to table String databaseId = table.getDatabase().getId().toString(); - relationshipDAO().insert(databaseId, table.getId().toString(), Entity.DATABASE, Entity.TABLE, + tableRepo3.relationshipDAO().insert(databaseId, table.getId().toString(), Entity.DATABASE, Entity.TABLE, Relationship.CONTAINS.ordinal()); // Add table owner relationship - EntityUtil.setOwner(relationshipDAO(), table.getId(), Entity.TABLE, table.getOwner()); + EntityUtil.setOwner(tableRepo3.relationshipDAO(), table.getId(), Entity.TABLE, table.getOwner()); // Add tag to table relationship applyTags(table); @@ -431,7 +402,7 @@ public abstract class TableRepository { private void applyTags(List columns) throws IOException { // Add column level tags by adding tag to column relationship for (Column column : columns) { - EntityUtil.applyTags(tagDAO(), column.getTags(), column.getFullyQualifiedName()); + EntityUtil.applyTags(tableRepo3.tagDAO(), column.getTags(), column.getFullyQualifiedName()); column.setTags(getTags(column.getFullyQualifiedName())); // Update tag list to handle derived tags if (column.getChildren() != null) { applyTags(column.getChildren()); @@ -441,7 +412,7 @@ public abstract class TableRepository { private void applyTags(Table table) throws IOException { // Add table level tags by adding tag to table relationship - EntityUtil.applyTags(tagDAO(), table.getTags(), table.getFullyQualifiedName()); + EntityUtil.applyTags(tableRepo3.tagDAO(), table.getTags(), table.getFullyQualifiedName()); table.setTags(getTags(table.getFullyQualifiedName())); // Update tag to handle additional derived tags applyTags(table.getColumns()); } @@ -465,16 +436,16 @@ public abstract class TableRepository { private Database getDatabase(Table table) throws IOException { // Find database for the table String id = table.getId().toString(); - List result = relationshipDAO().findFrom(id, Relationship.CONTAINS.ordinal(), Entity.DATABASE); + List result = tableRepo3.relationshipDAO().findFrom(id, Relationship.CONTAINS.ordinal(), Entity.DATABASE); if (result.size() != 1) { throw EntityNotFoundException.byMessage(String.format("Database for table %s Not found", id)); } String databaseId = result.get(0); - return EntityUtil.validate(databaseId, databaseDAO().findById(databaseId), Database.class); + return EntityUtil.validate(databaseId, tableRepo3.databaseDAO().findById(databaseId), Database.class); } private Table validateTable(String tableId) throws IOException { - return EntityUtil.validate(tableId, tableDAO().findById(tableId), Table.class); + return EntityUtil.validate(tableId, tableRepo3.tableDAO().findById(tableId), Table.class); } private Table setFields(Table table, Fields fields) throws IOException, ParseException { @@ -482,7 +453,7 @@ public abstract class TableRepository { table.setTableConstraints(fields.contains("tableConstraints") ? table.getTableConstraints() : null); table.setOwner(fields.contains("owner") ? getOwner(table) : null); table.setFollowers(fields.contains("followers") ? getFollowers(table) : null); - table.setUsageSummary(fields.contains("usageSummary") ? EntityUtil.getLatestUsage(usageDAO(), table.getId()) : + table.setUsageSummary(fields.contains("usageSummary") ? EntityUtil.getLatestUsage(tableRepo3.usageDAO(), table.getId()) : null); table.setDatabase(fields.contains("database") ? EntityUtil.getEntityReference(getDatabase(table)) : null); table.setTags(fields.contains("tags") ? getTags(table.getFullyQualifiedName()) : null); @@ -495,15 +466,15 @@ public abstract class TableRepository { } private EntityReference getOwner(Table table) throws IOException { - return table == null ? null : EntityUtil.populateOwner(table.getId(), relationshipDAO(), userDAO(), teamDAO()); + return table == null ? null : EntityUtil.populateOwner(table.getId(), tableRepo3.relationshipDAO(), tableRepo3.userDAO(), tableRepo3.teamDAO()); } private List getFollowers(Table table) throws IOException { - return table == null ? null : EntityUtil.getFollowers(table.getId(), relationshipDAO(), userDAO()); + return table == null ? null : EntityUtil.getFollowers(table.getId(), tableRepo3.relationshipDAO(), tableRepo3.userDAO()); } private List getTags(String fqn) { - return tagDAO().getTags(fqn); + return tableRepo3.tagDAO().getTags(fqn); } private void getColumnTags(boolean setTags, List columns) { @@ -545,7 +516,7 @@ public abstract class TableRepository { for (JoinedWith joinedWith : joinedWithList) { // Validate table String tableFQN = getTableFQN(joinedWith.getFullyQualifiedName()); - Table joinedWithTable = EntityUtil.validate(tableFQN, tableDAO().findByFQN(tableFQN), Table.class); + Table joinedWithTable = EntityUtil.validate(tableFQN, tableRepo3.tableDAO().findByFqn(tableFQN), Table.class); // Validate column validateColumnFQN(joinedWithTable, joinedWith.getFullyQualifiedName()); @@ -580,7 +551,7 @@ public abstract class TableRepository { fromColumnFQN = joinedWith.getFullyQualifiedName(); toColumnFQN = columnFQN; } - String json = fieldRelationshipDAO().find(fromColumnFQN, toColumnFQN, "table.columns.column", + String json = tableRepo3.fieldRelationshipDAO().find(fromColumnFQN, toColumnFQN, "table.columns.column", "table.columns.column", JOINED_WITH.ordinal()); DailyCount dailyCount = new DailyCount().withCount(joinedWith.getJoinCount()).withDate(date); @@ -624,7 +595,7 @@ public abstract class TableRepository { } json = JsonUtils.pojoToJson(dailyCountList); - fieldRelationshipDAO().upsert(fromColumnFQN, toColumnFQN, "table.columns.column", + tableRepo3.fieldRelationshipDAO().upsert(fromColumnFQN, toColumnFQN, "table.columns.column", "table.columns.column", JOINED_WITH.ordinal(), "dailyCount", json); } } @@ -635,9 +606,9 @@ public abstract class TableRepository { TableJoins tableJoins = new TableJoins().withStartDate(todayMinus30Days).withDayCount(30) .withColumnJoins(Collections.emptyList()); - List> list = fieldRelationshipDAO().listToByPrefix(table.getFullyQualifiedName(), + List> list = tableRepo3.fieldRelationshipDAO().listToByPrefix(table.getFullyQualifiedName(), "table.columns.column", "table.columns.column", JOINED_WITH.ordinal()); - list.addAll(fieldRelationshipDAO().listFromByPrefix(table.getFullyQualifiedName(), "table.columns.column", + list.addAll(tableRepo3.fieldRelationshipDAO().listFromByPrefix(table.getFullyQualifiedName(), "table.columns.column", "table.columns.column", JOINED_WITH.ordinal())); if (list.size() == 0) { // No join information found. Return empty list @@ -671,12 +642,12 @@ public abstract class TableRepository { } private TableData getSampleData(Table table) throws IOException { - return JsonUtils.readValue(entityExtensionDAO().getExtension(table.getId().toString(), "table.sampleData"), + return JsonUtils.readValue(tableRepo3.entityExtensionDAO().getExtension(table.getId().toString(), "table.sampleData"), TableData.class); } private List getTableProfile(Table table) throws IOException { - List tableProfiles = JsonUtils.readObjects(entityExtensionDAO().getExtension(table.getId().toString(), + List tableProfiles = JsonUtils.readObjects(tableRepo3.entityExtensionDAO().getExtension(table.getId().toString(), "table.tableProfile"), TableProfile.class); if (tableProfiles != null) { @@ -687,49 +658,6 @@ public abstract class TableRepository { } - public interface TableDAO { - @SqlUpdate("INSERT INTO table_entity (json) VALUES (:json)") - void insert(@Bind("json") String json); - - @SqlUpdate("UPDATE table_entity SET json = :json WHERE id = :id") - void update(@Bind("id") String id, @Bind("json") String json); - - @SqlQuery("SELECT json FROM table_entity WHERE id = :tableId") - String findById(@Bind("tableId") String tableId); - - @SqlQuery("SELECT json FROM table_entity WHERE fullyQualifiedName = :tableFQN") - String findByFQN(@Bind("tableFQN") String tableFQN); - - @SqlQuery("SELECT count(*) FROM table_entity WHERE " + - "(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL)") - int listCount(@Bind("databaseFQN") String databaseFQN); - - @SqlQuery( - "SELECT json FROM (" + - "SELECT fullyQualifiedName, json FROM table_entity WHERE " + - "(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL) AND " + - "fullyQualifiedName < :before " + // Pagination by table fullyQualifiedName - "ORDER BY fullyQualifiedName DESC " + // Pagination ordering by table fullyQualifiedName - "LIMIT :limit" + - ") last_rows_subquery ORDER BY fullyQualifiedName") - List listBefore(@Bind("databaseFQN") String databaseFQN, @Bind("limit") int limit, - @Bind("before") String before); - - @SqlQuery("SELECT json FROM table_entity WHERE " + - "(fullyQualifiedName LIKE CONCAT(:databaseFQN, '.%') OR :databaseFQN IS NULL) AND "+//Filter by databaseName - "fullyQualifiedName > :after " + // Pagination by table fullyQualifiedName - "ORDER BY fullyQualifiedName " + // Pagination ordering by table fullyQualifiedName - "LIMIT :limit") - List listAfter(@Bind("databaseFQN") String databaseFQN, @Bind("limit") int limit, - @Bind("after") String after); - - @SqlQuery("SELECT EXISTS (SELECT * FROM table_entity WHERE id = :id)") - boolean exists(@Bind("id") String id); - - @SqlUpdate("DELETE FROM table_entity WHERE id = :id") - int delete(@Bind("id") String id); - } - static class TableEntityInterface implements EntityInterface { private final Table table; @@ -786,13 +714,13 @@ public abstract class TableRepository { /** * Handles entity updated from PUT and POST operation. */ - public class TableUpdater extends EntityUpdater { + public class TableUpdater extends EntityUpdater3 { final Table orig; final Table updated; public TableUpdater(Table orig, Table updated, boolean patchOperation) { - super(new TableEntityInterface(orig), new TableEntityInterface(updated), patchOperation, relationshipDAO(), - tagDAO()); + super(new TableEntityInterface(orig), new TableEntityInterface(updated), patchOperation, tableRepo3.relationshipDAO(), + tableRepo3.tagDAO()); this.orig = orig; this.updated = updated; } @@ -816,7 +744,7 @@ public abstract class TableRepository { .orElse(null); if (stored == null) { fieldsAdded.add("column:" + updated.getFullyQualifiedName()); - EntityUtil.applyTags(tagDAO(), updated.getTags(), updated.getFullyQualifiedName()); + EntityUtil.applyTags(tableRepo3.tagDAO(), updated.getTags(), updated.getFullyQualifiedName()); continue; } @@ -885,7 +813,7 @@ public abstract class TableRepository { update("column:" + origColumn.getFullyQualifiedName() + ":tags", origColumn.getTags() == null ? 0 : origColumn.getTags().size(), updatedColumn.getTags() == null ? 0 : updatedColumn.getTags().size()); - EntityUtil.applyTags(tagDAO(), updatedColumn.getTags(), updatedColumn.getFullyQualifiedName()); + EntityUtil.applyTags(tableRepo3.tagDAO(), updatedColumn.getTags(), updatedColumn.getFullyQualifiedName()); } public void store() throws IOException { diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TagDAO.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TagDAO.java new file mode 100644 index 00000000000..9e973422769 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TagDAO.java @@ -0,0 +1,69 @@ +package org.openmetadata.catalog.jdbi3; + +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; +import org.openmetadata.catalog.jdbi3.TagDAO.TagLabelMapper; +import org.openmetadata.catalog.type.TagLabel; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +@RegisterRowMapper(TagLabelMapper.class) +public interface TagDAO { + @SqlUpdate("INSERT INTO tag_category (json) VALUES (:json)") + void insertCategory(@Bind("json") String json); + + @SqlUpdate("INSERT INTO tag(json) VALUES (:json)") + void insertTag(@Bind("json") String json); + + @SqlUpdate("UPDATE tag_category SET json = :json where name = :name") + void updateCategory(@Bind("name") String name, @Bind("json") String json); + + @SqlUpdate("UPDATE tag SET json = :json where fullyQualifiedName = :fqn") + void updateTag(@Bind("fqn") String fqn, @Bind("json") String json); + + @SqlQuery("SELECT json FROM tag_category ORDER BY name") + List listCategories(); + + @SqlQuery("SELECT json FROM tag WHERE fullyQualifiedName LIKE CONCAT(:fqnPrefix, '.%') ORDER BY fullyQualifiedName") + List listChildrenTags(@Bind("fqnPrefix") String fqnPrefix); + + @SqlQuery("SELECT json FROM tag_category WHERE name = :name") + String findCategory(@Bind("name") String name); + + @SqlQuery("SELECT EXISTS (SELECT * FROM tag WHERE fullyQualifiedName = :fqn)") + boolean tagExists(@Bind("fqn") String fqn); + + @SqlQuery("SELECT json FROM tag WHERE fullyQualifiedName = :fqn") + String findTag(@Bind("fqn") String fqn); + + @SqlUpdate("INSERT IGNORE INTO tag_usage (tagFQN, targetFQN, labelType, state) VALUES (:tagFQN, :targetFQN, " + + ":labelType, :state)") + void applyTag(@Bind("tagFQN") String tagFQN, @Bind("targetFQN") String targetFQN, + @Bind("labelType") int labelType, @Bind("state") int state); + + @SqlQuery("SELECT tagFQN, labelType, state FROM tag_usage WHERE targetFQN = :targetFQN ORDER BY tagFQN") + List getTags(@Bind("targetFQN") String targetFQN); + + @SqlQuery("SELECT COUNT(*) FROM tag_usage WHERE tagFQN LIKE CONCAT(:fqnPrefix, '%')") + int getTagCount(@Bind("fqnPrefix") String fqnPrefix); + + @SqlUpdate("DELETE FROM tag_usage where targetFQN = :targetFQN") + void deleteTags(@Bind("targetFQN") String targetFQN); + + @SqlUpdate("DELETE FROM tag_usage where targetFQN LIKE CONCAT(:fqnPrefix, '%')") + void deleteTagsByPrefix(@Bind("fqnPrefix") String fqnPrefix); + + class TagLabelMapper implements RowMapper { + @Override + public TagLabel map(ResultSet r, org.jdbi.v3.core.statement.StatementContext ctx) throws SQLException { + return new TagLabel().withLabelType(TagLabel.LabelType.values()[r.getInt("labelType")]) + .withState(TagLabel.State.values()[r.getInt("state")]) + .withTagFQN(r.getString("tagFQN")); + } + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamDAO.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamDAO.java new file mode 100644 index 00000000000..5543957d547 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamDAO.java @@ -0,0 +1,43 @@ +package org.openmetadata.catalog.jdbi3; + + +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; + +import java.util.List; + +public interface TeamDAO { + @SqlUpdate("INSERT INTO team_entity (json) VALUES (:json)") + void insert(@Bind("json") String json); + + @SqlQuery("SELECT json FROM team_entity where id = :teamId") + String findById(@Bind("teamId") String teamId); + + @SqlQuery("SELECT json FROM team_entity where name = :name") + String findByName(@Bind("name") String name); + + @SqlQuery("SELECT count(*) FROM team_entity") + int listCount(); + + @SqlQuery( + "SELECT json FROM (" + + "SELECT name, json FROM team_entity WHERE " + + "name < :before " + // Pagination by team name + "ORDER BY name DESC " + // Pagination ordering by team name + "LIMIT :limit" + + ") last_rows_subquery ORDER BY name") + List listBefore(@Bind("limit") int limit, @Bind("before") String before); + + @SqlQuery("SELECT json FROM team_entity WHERE " + + "name > :after " + // Pagination by team name + "ORDER BY name " + // Pagination ordering by team name + "LIMIT :limit") + List listAfter(@Bind("limit") int limit, @Bind("after") String after); + + @SqlUpdate("DELETE FROM team_entity WHERE id = :teamId") + int delete(@Bind("teamId") String teamId); + + @SqlUpdate("UPDATE team_entity SET json = :json WHERE id = :id") + void update(@Bind("id") String id, @Bind("json") String json); +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamRepository.java index 8c10214ecd0..0cad028bb86 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TeamRepository.java @@ -28,7 +28,6 @@ import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO; import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO; import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineDAO; import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO; -import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO; import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO; import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO; import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ToEntityReferenceMapper3.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ToEntityReferenceMapper3.java new file mode 100644 index 00000000000..62b025aa215 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ToEntityReferenceMapper3.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.catalog.jdbi3; + +import org.jdbi.v3.core.mapper.RowMapper; +import org.openmetadata.catalog.type.EntityReference; +import org.skife.jdbi.v2.StatementContext; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.UUID; + +public class ToEntityReferenceMapper3 implements RowMapper { + @Override + public EntityReference map(ResultSet rs, org.jdbi.v3.core.statement.StatementContext ctx) throws SQLException { + return new EntityReference().withId(UUID.fromString(rs.getString("toId"))).withType(rs.getString("toEntity")); + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UsageDAO.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UsageDAO.java new file mode 100644 index 00000000000..2fa1b3c9f8f --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UsageDAO.java @@ -0,0 +1,86 @@ +package org.openmetadata.catalog.jdbi3; + +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; +import org.openmetadata.catalog.jdbi3.UsageDAO.UsageDetailsMapper; +import org.openmetadata.catalog.type.UsageDetails; +import org.openmetadata.catalog.type.UsageStats; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +@RegisterRowMapper(UsageDetailsMapper.class) +public interface UsageDAO { + @SqlUpdate("INSERT INTO entity_usage (usageDate, id, entityType, count1, count7, count30) " + + "SELECT :date, :id, :entityType, :count1, " + + "(:count1 + (SELECT COALESCE(SUM(count1), 0) FROM entity_usage WHERE id = :id AND usageDate >= :date - " + + "INTERVAL 6 DAY)), " + + "(:count1 + (SELECT COALESCE(SUM(count1), 0) FROM entity_usage WHERE id = :id AND usageDate >= :date - " + + "INTERVAL 29 DAY))") + void insert(@Bind("date") String date, @Bind("id") String id, @Bind("entityType") String entityType, @Bind( + "count1") int count1); + + @SqlUpdate("INSERT INTO entity_usage (usageDate, id, entityType, count1, count7, count30) " + + "SELECT :date, :id, :entityType, :count1, " + + "(:count1 + (SELECT COALESCE(SUM(count1), 0) FROM entity_usage WHERE id = :id AND usageDate >= :date - " + + "INTERVAL 6 DAY)), " + + "(:count1 + (SELECT COALESCE(SUM(count1), 0) FROM entity_usage WHERE id = :id AND usageDate >= :date - " + + "INTERVAL 29 DAY)) " + + "ON DUPLICATE KEY UPDATE count1 = count1 + :count1, count7 = count7 + :count1, count30 = count30 + :count1") + void insertOrUpdateCount(@Bind("date") String date, @Bind("id") String id, @Bind("entityType") String entityType, + @Bind("count1") int count1); + + @SqlQuery("SELECT id, usageDate, entityType, count1, count7, count30, " + + "percentile1, percentile7, percentile30 FROM entity_usage " + + "WHERE id = :id AND usageDate >= :date - INTERVAL :days DAY AND usageDate <= :date ORDER BY usageDate DESC") + List getUsageById(@Bind("id") String id, @Bind("date") String date, @Bind("days") int days); + + /** + * Get latest usage record + **/ + @SqlQuery("SELECT id, usageDate, entityType, count1, count7, count30, " + + "percentile1, percentile7, percentile30 FROM entity_usage " + + "WHERE usageDate IN (SELECT MAX(usageDate) FROM entity_usage WHERE id = :id) AND id = :id") + UsageDetails getLatestUsage(@Bind("id") String id); + + @SqlUpdate("DELETE FROM entity_usage WHERE id = :id") + int delete(@Bind("id") String id); + + /** + * Note not using in following percentile computation PERCENT_RANK function as unit tests use mysql5.7 and it does + * not have window function + */ + @SqlUpdate("UPDATE entity_usage u JOIN ( " + + "SELECT u1.id, " + + "(SELECT COUNT(*) FROM entity_usage as u2 WHERE u2.count1 < u1.count1 AND u2.entityType = :entityType " + + "AND u2.usageDate = :date) as p1, " + + "(SELECT COUNT(*) FROM entity_usage as u3 WHERE u3.count7 < u1.count7 AND u3.entityType = :entityType " + + "AND u3.usageDate = :date) as p7, " + + "(SELECT COUNT(*) FROM entity_usage as u4 WHERE u4.count30 < u1.count30 AND u4.entityType = :entityType " + + "AND u4.usageDate = :date) as p30, " + + "(SELECT COUNT(*) FROM entity_usage WHERE entityType = :entityType AND usageDate = :date) as total " + + "FROM entity_usage u1 WHERE u1.entityType = :entityType AND u1.usageDate = :date" + + ") vals ON u.id = vals.id AND usageDate = :date " + + "SET u.percentile1 = ROUND(100 * p1/total, 2), u.percentile7 = ROUND(p7 * 100/total, 2), u.percentile30 =" + + " ROUND(p30*100/total, 2)") + void computePercentile(@Bind("entityType") String entityType, @Bind("date") String date); + + class UsageDetailsMapper implements RowMapper { + @Override + public UsageDetails map(ResultSet r, org.jdbi.v3.core.statement.StatementContext ctx) throws SQLException { + UsageStats dailyStats = new UsageStats().withCount(r.getInt("count1")).withPercentileRank(r.getDouble( + "percentile1")); + UsageStats weeklyStats = new UsageStats().withCount(r.getInt("count7")).withPercentileRank(r.getDouble( + "percentile7")); + UsageStats monthlyStats = new UsageStats().withCount(r.getInt("count30")).withPercentileRank(r.getDouble( + "percentile30")); + return new UsageDetails().withDate(r.getString("usageDate")).withDailyStats(dailyStats) + .withWeeklyStats(weeklyStats).withMonthlyStats(monthlyStats); + } + } +} + diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UsageRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UsageRepository.java index fde00412f8d..c0afce6e7cb 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UsageRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UsageRepository.java @@ -19,7 +19,6 @@ package org.openmetadata.catalog.jdbi3; import org.openmetadata.catalog.jdbi3.DashboardRepository.DashboardDAO; import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseDAO; import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO; -import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO; import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO; import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO; import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserDAO.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserDAO.java new file mode 100644 index 00000000000..877a05f84a5 --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserDAO.java @@ -0,0 +1,48 @@ +package org.openmetadata.catalog.jdbi3; + +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; + +import java.util.List; + +public interface UserDAO { + @SqlUpdate("INSERT INTO user_entity (json) VALUES (:json)") + void insert(@Bind("json") String json); + + @SqlQuery("SELECT json FROM user_entity WHERE id = :id") + String findById(@Bind("id") String id); + + @SqlQuery("SELECT json FROM user_entity WHERE name = :name") + String findByName(@Bind("name") String name); + + @SqlQuery("SELECT json FROM user_entity WHERE email = :email") + String findByEmail(@Bind("email") String email); + + @SqlQuery("SELECT json FROM user_entity") + List list(); + + @SqlQuery("SELECT count(*) FROM user_entity") + int listCount(); + + @SqlQuery( + "SELECT json FROM (" + + "SELECT name, json FROM user_entity WHERE " + + "name < :before " + // Pagination by user name + "ORDER BY name DESC " + // Pagination ordering by user name + "LIMIT :limit" + + ") last_rows_subquery ORDER BY name") + List listBefore(@Bind("limit") int limit, @Bind("before") String before); + + @SqlQuery("SELECT json FROM user_entity WHERE " + + "name > :after " + // Pagination by user name + "ORDER BY name " + // Pagination ordering by user name + "LIMIT :limit") + List listAfter(@Bind("limit") int limit, @Bind("after") String after); + + @SqlUpdate("UPDATE user_entity SET json = :json WHERE id = :id") + void update(@Bind("id") String id, @Bind("json") String json); + + @SqlQuery("SELECT EXISTS (SELECT * FROM user_entity where id = :id)") + boolean exists(@Bind("id") String id); +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserRepository.java index 8682b9ff677..febe1e466b3 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/UserRepository.java @@ -27,7 +27,6 @@ import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO; import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO; import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineDAO; import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO; -import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO; import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO; import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO; import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/CollectionRegistry.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/CollectionRegistry.java index bb6b29b1fee..4f023cf703b 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/CollectionRegistry.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/CollectionRegistry.java @@ -18,6 +18,10 @@ package org.openmetadata.catalog.resources; import io.dropwizard.setup.Environment; import io.swagger.annotations.Api; +import org.jdbi.v3.core.Jdbi; +import org.openmetadata.catalog.jdbi3.TableRepository3; +import org.openmetadata.catalog.jdbi3.TableRepositoryHelper; +import org.openmetadata.catalog.resources.databases.TableResource; import org.openmetadata.catalog.type.CollectionDescriptor; import org.openmetadata.catalog.type.CollectionInfo; import org.openmetadata.catalog.util.RestUtil; @@ -158,6 +162,20 @@ public final class CollectionRegistry { } } + /** + * Register resources from CollectionRegistry + */ + public void registerResources3(Jdbi jdbi, Environment environment, CatalogAuthorizer authorizer) { + LOG.info("Initializing jdbi3"); + Class repositoryClz = TableRepository3.class; + final TableRepository3 daoObject = (TableRepository3) jdbi.onDemand(repositoryClz); + TableRepositoryHelper helper = new TableRepositoryHelper(daoObject); + TableResource resource = new TableResource(helper, authorizer); + environment.jersey().register(resource); + LOG.info("Registering {}", resource); + LOG.info("Initialized jdbi3"); + } + /** Get collection details based on annotations in Resource classes */ private static CollectionDetails getCollection(Class cl) { String href, doc, name, repoClass; diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/TableResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/TableResource.java index d869c576e30..04a7629618a 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/TableResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/TableResource.java @@ -28,7 +28,8 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody; import io.swagger.v3.oas.annotations.responses.ApiResponse; import org.openmetadata.catalog.api.data.CreateTable; import org.openmetadata.catalog.entity.data.Table; -import org.openmetadata.catalog.jdbi3.TableRepository; +import org.openmetadata.catalog.jdbi3.TableRepository3; +import org.openmetadata.catalog.jdbi3.TableRepositoryHelper; import org.openmetadata.catalog.resources.Collection; import org.openmetadata.catalog.security.CatalogAuthorizer; import org.openmetadata.catalog.security.SecurityUtil; @@ -38,7 +39,6 @@ import org.openmetadata.catalog.type.TableJoins; import org.openmetadata.catalog.type.TableProfile; import org.openmetadata.catalog.util.EntityUtil; import org.openmetadata.catalog.util.EntityUtil.Fields; -import org.openmetadata.catalog.util.JsonUtils; import org.openmetadata.catalog.util.RestUtil; import org.openmetadata.catalog.util.RestUtil.PutResponse; import org.openmetadata.catalog.util.ResultList; @@ -80,11 +80,11 @@ import java.util.UUID; @Api(value = "Tables collection", tags = "Tables collection") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -@Collection(name = "tables", repositoryClass = "org.openmetadata.catalog.jdbi3.TableRepository") +//@Collection(name = "tables", repositoryClass = "org.openmetadata.catalog.jdbi3.TableRepository3") public class TableResource { private static final Logger LOG = LoggerFactory.getLogger(TableResource.class); private static final String TABLE_COLLECTION_PATH = "v1/tables/"; - private final TableRepository dao; + private final TableRepositoryHelper dao; private final CatalogAuthorizer authorizer; public static void addHref(UriInfo uriInfo, EntityReference ref) { @@ -102,8 +102,8 @@ public class TableResource { } @Inject - public TableResource(TableRepository dao, CatalogAuthorizer authorizer) { - Objects.requireNonNull(dao, "TableRepository must not be null"); + public TableResource(TableRepositoryHelper dao, CatalogAuthorizer authorizer) { + Objects.requireNonNull(dao, "TableRepository3 must not be null"); this.dao = dao; this.authorizer = authorizer; } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUpdater.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUpdater.java index 49dcbbb6c6b..e88577827b8 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUpdater.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUpdater.java @@ -2,6 +2,7 @@ package org.openmetadata.catalog.util; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.jdbi3.EntityRelationshipDAO; +import org.openmetadata.catalog.jdbi3.TableRepository3; import org.openmetadata.catalog.jdbi3.TagRepository.TagDAO; import org.openmetadata.catalog.type.EntityReference; import org.slf4j.Logger; @@ -21,7 +22,7 @@ import java.util.List; * of the entity. * * Concrete implementations need to implement update for other fields. See - * {@link org.openmetadata.catalog.jdbi3.TableRepository.TableUpdater} for an example implementation. + * {@link TableRepository3.TableUpdater} for an example implementation. */ public abstract class EntityUpdater { private static final Logger LOG = LoggerFactory.getLogger(EntityUpdater.class); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUpdater3.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUpdater3.java new file mode 100644 index 00000000000..b3a9cc52c2b --- /dev/null +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUpdater3.java @@ -0,0 +1,130 @@ +package org.openmetadata.catalog.util; + +import org.openmetadata.catalog.Entity; +import org.openmetadata.catalog.jdbi3.EntityRelationshipDAO3; +import org.openmetadata.catalog.jdbi3.TableRepository3; +import org.openmetadata.catalog.jdbi3.TagDAO; +import org.openmetadata.catalog.type.EntityReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Used for updating the following common entity fields in PUT and PATCH operations. + * - description + * - tags + * - owner + * + * This class handles tracking all the changes in an update operation and also versioning + * of the entity. + * + * Concrete implementations need to implement update for other fields. See + * {@link TableRepository3.TableUpdater} for an example implementation. + */ +public abstract class EntityUpdater3 { + private static final Logger LOG = LoggerFactory.getLogger(EntityUpdater3.class); + private final EntityInterface originalEntity; + private final EntityInterface updatedEntity; + private final EntityRelationshipDAO3 relationshipDAO; + private final TagDAO tagDAO; + + protected final boolean patchOperation; + protected List fieldsUpdated = new ArrayList<>(); + protected List fieldsAdded = new ArrayList<>(); + protected List fieldsDeleted = new ArrayList<>(); + protected boolean majorVersionChange = false; + + public EntityUpdater3(EntityInterface originalEntity, EntityInterface updatedEntity, boolean patchOperation, + EntityRelationshipDAO3 relationshipDAO, TagDAO tagDAO) { + this.originalEntity = originalEntity; + this.updatedEntity = updatedEntity; + this.patchOperation = patchOperation; + this.relationshipDAO = relationshipDAO; + this.tagDAO = tagDAO; + } + + public void updateAll() throws IOException { + updateDescription(); + updateDisplayName(); + updateOwner(); + if (tagDAO != null) { + updateTags(); // If tagDAO != null, the Entity supports tags + } + } + + private void updateDescription() { + if (!patchOperation && + originalEntity.getDescription() != null && !originalEntity.getDescription().isEmpty()) { + // Update description only when stored is empty to retain user authored descriptions + updatedEntity.setDescription(originalEntity.getDescription()); + return; + } + update("description", originalEntity.getDescription(), updatedEntity.getDescription()); + } + + private void updateDisplayName() { + if (!patchOperation && + originalEntity.getDisplayName() != null && !originalEntity.getDisplayName().isEmpty()) { + // Update displayName only when stored is empty to retain user authored descriptions + updatedEntity.setDisplayName(originalEntity.getDisplayName()); + return; + } + update("displayName", originalEntity.getDisplayName(), updatedEntity.getDisplayName()); + } + + private void updateOwner() { + EntityReference origOwner = originalEntity.getOwner(); + EntityReference updatedOwner = updatedEntity.getOwner(); + if (update("owner", origOwner == null ? null : origOwner.getId(), + updatedOwner == null ? null : updatedOwner.getId())) { + EntityUtil.updateOwner(relationshipDAO, origOwner, updatedOwner, originalEntity.getId(), Entity.TABLE); + } + } + + private void updateTags() throws IOException { + // Remove current table tags in the database. It will be added back later from the merged tag list. + EntityUtil.removeTagsByPrefix(tagDAO, originalEntity.getFullyQualifiedName()); + if (!patchOperation) { + // PUT operation merges tags in the request with what already exists + updatedEntity.setTags(EntityUtil.mergeTags(updatedEntity.getTags(), originalEntity.getTags())); + } + + update("tags", originalEntity.getTags() == null ? 0 : originalEntity.getTags().size(), + updatedEntity.getTags() == null ? 0 : updatedEntity.getTags().size()); + EntityUtil.applyTags(tagDAO, updatedEntity.getTags(), updatedEntity.getFullyQualifiedName()); + } + + public Double getNewVersion(Double oldVersion) { + Double newVersion = oldVersion; + if (majorVersionChange) { + newVersion = oldVersion + 1.0; + } else if (!fieldsUpdated.isEmpty() || !fieldsAdded.isEmpty() || !fieldsDeleted.isEmpty()) { + newVersion = oldVersion + 0.1; + } + LOG.info("{}->{} - Fields added {}, updated {}, deleted {}", + oldVersion, newVersion, fieldsAdded, fieldsUpdated, fieldsDeleted); + return newVersion; + } + + public abstract void store() throws IOException; + + protected boolean update(String field, Object orig, Object updated) { + if (orig == null && updated == null) { + return false; + } + if (orig == null) { + fieldsAdded.add(field); + return true; + } else if (updated == null) { + fieldsDeleted.add(field); + return true; + } else if (!orig.equals(updated)) { + fieldsUpdated.add(field); + return true; + } + return false; + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java index 6ecf4a8627f..02f471885d3 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java @@ -38,6 +38,7 @@ import org.openmetadata.catalog.exception.EntityNotFoundException; import org.openmetadata.catalog.jdbi3.ChartRepository.ChartDAO; import org.openmetadata.catalog.jdbi3.DashboardRepository.DashboardDAO; import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseDAO; +import org.openmetadata.catalog.jdbi3.EntityRelationshipDAO3; import org.openmetadata.catalog.jdbi3.EntityRepository; import org.openmetadata.catalog.jdbi3.EntityRelationshipDAO; import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO; @@ -45,7 +46,8 @@ import org.openmetadata.catalog.jdbi3.ModelRepository.ModelDAO; import org.openmetadata.catalog.jdbi3.PipelineRepository.PipelineDAO; import org.openmetadata.catalog.jdbi3.Relationship; import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO; -import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO; +import org.openmetadata.catalog.jdbi3.TableDAO; +import org.openmetadata.catalog.jdbi3.TableDAO3; import org.openmetadata.catalog.jdbi3.TagRepository.TagDAO; import org.openmetadata.catalog.jdbi3.TaskRepository.TaskDAO; import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO; @@ -183,6 +185,12 @@ public final class EntityUtil { } } + public static void validateUser(org.openmetadata.catalog.jdbi3.UserDAO userDAO, String userId) { + if (!userDAO.exists(userId)) { + throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(Entity.USER, userId)); + } + } + // Get owner for a given entity public static EntityReference populateOwner(UUID id, EntityRelationshipDAO entityRelationshipDAO, UserDAO userDAO, TeamDAO teamDAO) throws IOException { @@ -193,6 +201,16 @@ public final class EntityUtil { return ids.isEmpty() ? null : EntityUtil.populateOwner(userDAO, teamDAO, ids.get(0)); } + // Get owner for a given entity + public static EntityReference populateOwner(UUID id, EntityRelationshipDAO3 entityRelationshipDAO, org.openmetadata.catalog.jdbi3.UserDAO userDAO, + org.openmetadata.catalog.jdbi3.TeamDAO teamDAO) throws IOException { + List ids = entityRelationshipDAO.findFrom(id.toString(), Relationship.OWNS.ordinal()); + if (ids.size() > 1) { + LOG.warn("Possible database issues - multiple owners {} found for entity {}", ids, id); + } + return ids.isEmpty() ? null : EntityUtil.populateOwner(userDAO, teamDAO, ids.get(0)); + } + /** * For given Owner with Id and Type that can be either team or user, * validate Owner ID and return fully populated Owner @@ -219,6 +237,28 @@ public final class EntityUtil { return owner; } + public static EntityReference populateOwner(org.openmetadata.catalog.jdbi3.UserDAO userDAO, org.openmetadata.catalog.jdbi3.TeamDAO teamDAO, + EntityReference owner) + throws IOException { + if (owner == null) { + return null; + } + String id = owner.getId().toString(); + if (owner.getType().equalsIgnoreCase("user")) { + User ownerInstance = EntityUtil.validate(id, userDAO.findById(id), User.class); + owner.setName(ownerInstance.getName()); + if (Optional.ofNullable(ownerInstance.getDeactivated()).orElse(false)) { + throw new IllegalArgumentException(CatalogExceptionMessage.deactivatedUser(id)); + } + } else if (owner.getType().equalsIgnoreCase("team")) { + Team ownerInstance = EntityUtil.validate(id, teamDAO.findById(id), Team.class); + owner.setDescription(ownerInstance.getDescription()); + owner.setName(ownerInstance.getName()); + } else { + throw new IllegalArgumentException(String.format("Invalid ownerType %s", owner.getType())); + } + return owner; + } public static void setOwner(EntityRelationshipDAO dao, UUID ownedEntityId, String ownedEntityType, EntityReference owner) { // Add relationship owner --- owns ---> ownedEntity @@ -229,6 +269,16 @@ public final class EntityUtil { } } + public static void setOwner(EntityRelationshipDAO3 dao, UUID ownedEntityId, String ownedEntityType, + EntityReference owner) { + // Add relationship owner --- owns ---> ownedEntity + if (owner != null) { + LOG.info("Adding owner {}:{} for entity {}", owner.getType(), owner.getId(), ownedEntityId); + dao.insert(owner.getId().toString(), ownedEntityId.toString(), owner.getType(), ownedEntityType, + Relationship.OWNS.ordinal()); + } + } + /** * Unassign owner relationship for a given entity */ @@ -240,6 +290,17 @@ public final class EntityUtil { } } + /** + * Unassign owner relationship for a given entity + */ + public static void unassignOwner(EntityRelationshipDAO3 dao, EntityReference owner, String ownedEntityId) { + if (owner != null && owner.getId() != null) { + LOG.info("Removing owner {}:{} for entity {}", owner.getType(), owner.getId(), + ownedEntityId); + dao.delete(owner.getId().toString(), ownedEntityId, Relationship.OWNS.ordinal()); + } + } + public static void updateOwner(EntityRelationshipDAO dao, EntityReference originalOwner, EntityReference newOwner, UUID ownedEntityId, String ownedEntityType) { // TODO inefficient use replace instead of delete and add? @@ -248,6 +309,14 @@ public final class EntityUtil { setOwner(dao, ownedEntityId, ownedEntityType, newOwner); } + public static void updateOwner(EntityRelationshipDAO3 dao, EntityReference originalOwner, EntityReference newOwner, + UUID ownedEntityId, String ownedEntityType) { + // TODO inefficient use replace instead of delete and add? + // TODO check for orig and new owners being the same + unassignOwner(dao, originalOwner, ownedEntityId.toString()); + setOwner(dao, ownedEntityId, ownedEntityType, newOwner); + } + public static List getEntityReference(List list, TableDAO tableDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO, DashboardDAO dashboardDAO, ReportDAO reportDAO, @@ -263,6 +332,22 @@ public final class EntityUtil { return list; } + public static List getEntityReference(List list, TableDAO3 tableDAO, + DatabaseDAO databaseDAO, MetricsDAO metricsDAO, + DashboardDAO dashboardDAO, ReportDAO reportDAO, + TopicDAO topicDAO, ChartDAO chartDAO, + TaskDAO taskDAO, ModelDAO modelDAO, + PipelineDAO pipelineDAO) throws IOException { + for (EntityReference ref : list) { + getEntityReference( + ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO, + topicDAO, chartDAO, taskDAO, modelDAO, pipelineDAO + ); + } + return list; + } + + public static EntityReference getEntityReference(EntityReference ref, TableDAO tableDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO, DashboardDAO dashboardDAO, ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO, @@ -305,6 +390,48 @@ public final class EntityUtil { throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(entity)); } + public static EntityReference getEntityReference(EntityReference ref, TableDAO3 tableDAO3, DatabaseDAO databaseDAO, + MetricsDAO metricsDAO, DashboardDAO dashboardDAO, + ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO, + TaskDAO taskDAO, ModelDAO modelDAO, PipelineDAO pipelineDAO) + throws IOException { + // Note href to entity reference is not added here + String entity = ref.getType(); + String id = ref.getId().toString(); + if (entity.equalsIgnoreCase(Entity.TABLE)) { + Table instance = EntityUtil.validate(id, tableDAO3.findById(id), Table.class); + return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName()); + } else if (entity.equalsIgnoreCase(Entity.DATABASE)) { + Database instance = EntityUtil.validate(id, databaseDAO.findById(id), Database.class); + return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName()); + } else if (entity.equalsIgnoreCase(Entity.METRICS)) { + Metrics instance = EntityUtil.validate(id, metricsDAO.findById(id), Metrics.class); + return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName()); + } else if (entity.equalsIgnoreCase(Entity.DASHBOARD)) { + Dashboard instance = EntityUtil.validate(id, dashboardDAO.findById(id), Dashboard.class); + return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName()); + } else if (entity.equalsIgnoreCase(Entity.REPORT)) { + Report instance = EntityUtil.validate(id, reportDAO.findById(id), Report.class); + return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName()); + } else if (entity.equalsIgnoreCase(Entity.TOPIC)) { + Topic instance = EntityUtil.validate(id, topicDAO.findById(id), Topic.class); + return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName()); + } else if (entity.equalsIgnoreCase(Entity.CHART)) { + Chart instance = EntityUtil.validate(id, chartDAO.findById(id), Chart.class); + return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName()); + } else if (entity.equalsIgnoreCase(Entity.TASK)) { + Task instance = EntityUtil.validate(id, taskDAO.findById(id), Task.class); + return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName()); + } else if (entity.equalsIgnoreCase(Entity.PIPELINE)) { + Pipeline instance = EntityUtil.validate(id, pipelineDAO.findById(id), Pipeline.class); + return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName()); + } else if (entity.equalsIgnoreCase(Entity.MODEL)) { + Model instance = EntityUtil.validate(id, modelDAO.findById(id), Model.class); + return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName()); + } + throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(entity)); + } + public static EntityReference getEntityReference(String entity, UUID id, TableDAO tableDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO, DashboardDAO dashboardDAO, ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO, @@ -315,6 +442,16 @@ public final class EntityUtil { reportDAO, topicDAO, chartDAO, taskDAO, modelDAO, pipelineDAO); } + public static EntityReference getEntityReference(String entity, UUID id, TableDAO3 tableDAO3, DatabaseDAO databaseDAO, + MetricsDAO metricsDAO, DashboardDAO dashboardDAO, + ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO, + TaskDAO taskDAO, ModelDAO modelDAO, PipelineDAO pipelineDAO) + throws IOException { + EntityReference ref = new EntityReference().withId(id).withType(entity); + return getEntityReference(ref, tableDAO3, databaseDAO, metricsDAO, dashboardDAO, + reportDAO, topicDAO, chartDAO, taskDAO, modelDAO, pipelineDAO); + } + public static EntityReference getEntityReferenceByName(String entity, String fqn, TableDAO tableDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO, ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO, @@ -322,7 +459,47 @@ public final class EntityUtil { PipelineDAO pipelineDAO) throws IOException { if (entity.equalsIgnoreCase(Entity.TABLE)) { - Table instance = EntityUtil.validate(fqn, tableDAO.findByFQN(fqn), Table.class); + Table instance = EntityUtil.validate(fqn, tableDAO.findByFqn(fqn), Table.class); + return getEntityReference(instance); + } else if (entity.equalsIgnoreCase(Entity.DATABASE)) { + Database instance = EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class); + return getEntityReference(instance); + } else if (entity.equalsIgnoreCase(Entity.METRICS)) { + Metrics instance = EntityUtil.validate(fqn, metricsDAO.findByFQN(fqn), Metrics.class); + return getEntityReference(instance); + } else if (entity.equalsIgnoreCase(Entity.REPORT)) { + Report instance = EntityUtil.validate(fqn, reportDAO.findByFQN(fqn), Report.class); + return getEntityReference(instance); + } else if (entity.equalsIgnoreCase(Entity.TOPIC)) { + Topic instance = EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Topic.class); + return getEntityReference(instance); + } else if (entity.equalsIgnoreCase(Entity.CHART)) { + Chart instance = EntityUtil.validate(fqn, chartDAO.findByFQN(fqn), Chart.class); + return getEntityReference(instance); + } else if (entity.equalsIgnoreCase(Entity.DASHBOARD)) { + Dashboard instance = EntityUtil.validate(fqn, dashboardDAO.findByFQN(fqn), Dashboard.class); + return getEntityReference(instance); + } else if (entity.equalsIgnoreCase(Entity.TASK)) { + Task instance = EntityUtil.validate(fqn, taskDAO.findByFQN(fqn), Task.class); + return getEntityReference(instance); + } else if (entity.equalsIgnoreCase(Entity.PIPELINE)) { + Pipeline instance = EntityUtil.validate(fqn, pipelineDAO.findByFQN(fqn), Pipeline.class); + return getEntityReference(instance); + } else if (entity.equalsIgnoreCase(Entity.MODEL)) { + Model instance = EntityUtil.validate(fqn, modelDAO.findByFQN(fqn), Model.class); + return getEntityReference(instance); + } + throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entity, fqn)); + } + + public static EntityReference getEntityReferenceByName(String entity, String fqn, TableDAO3 tableDAO3, + DatabaseDAO databaseDAO, MetricsDAO metricsDAO, + ReportDAO reportDAO, TopicDAO topicDAO, ChartDAO chartDAO, + DashboardDAO dashboardDAO, TaskDAO taskDAO, ModelDAO modelDAO, + PipelineDAO pipelineDAO) + throws IOException { + if (entity.equalsIgnoreCase(Entity.TABLE)) { + Table instance = EntityUtil.validate(fqn, tableDAO3.findByFqn(fqn), Table.class); return getEntityReference(instance); } else if (entity.equalsIgnoreCase(Entity.DATABASE)) { Database instance = EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class); @@ -430,7 +607,7 @@ public final class EntityUtil { } public static EntityReference validateEntityLink(EntityLink entityLink, UserDAO userDAO, TeamDAO teamDAO, - TableDAO tableDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO, + TableDAO3 tableDAO3, DatabaseDAO databaseDAO, MetricsDAO metricsDAO, DashboardDAO dashboardDAO, ReportDAO reportDAO, TopicDAO topicDAO, TaskDAO taskDAO, ModelDAO modelDAO, PipelineDAO pipelineDAO) throws IOException { @@ -441,7 +618,7 @@ public final class EntityUtil { } else if (entityType.equalsIgnoreCase(Entity.TEAM)) { return getEntityReference(EntityUtil.validate(fqn, teamDAO.findByName(fqn), Team.class)); } else if (entityType.equalsIgnoreCase(Entity.TABLE)) { - return getEntityReference(EntityUtil.validate(fqn, tableDAO.findByFQN(fqn), Table.class)); + return getEntityReference(EntityUtil.validate(fqn, tableDAO3.findByFqn(fqn), Table.class)); } else if (entityType.equalsIgnoreCase(Entity.DATABASE)) { return getEntityReference(EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class)); } else if (entityType.equalsIgnoreCase(Entity.METRICS)) { @@ -463,6 +640,39 @@ public final class EntityUtil { } } + public static EntityReference validateEntityLink(EntityLink entityLink, UserDAO userDAO, TeamDAO teamDAO, + TableDAO tableDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO, + DashboardDAO dashboardDAO, ReportDAO reportDAO, TopicDAO topicDAO, + TaskDAO taskDAO, ModelDAO modelDAO, PipelineDAO pipelineDAO) + throws IOException { + String entityType = entityLink.getEntityType(); + String fqn = entityLink.getEntityId(); + if (entityType.equalsIgnoreCase(Entity.USER)) { + return getEntityReference(EntityUtil.validate(fqn, userDAO.findByName(fqn), User.class)); + } else if (entityType.equalsIgnoreCase(Entity.TEAM)) { + return getEntityReference(EntityUtil.validate(fqn, teamDAO.findByName(fqn), Team.class)); + } else if (entityType.equalsIgnoreCase(Entity.TABLE)) { + return getEntityReference(EntityUtil.validate(fqn, tableDAO.findByFqn(fqn), Table.class)); + } else if (entityType.equalsIgnoreCase(Entity.DATABASE)) { + return getEntityReference(EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class)); + } else if (entityType.equalsIgnoreCase(Entity.METRICS)) { + return getEntityReference(EntityUtil.validate(fqn, metricsDAO.findByFQN(fqn), Metrics.class)); + } else if (entityType.equalsIgnoreCase(Entity.DASHBOARD)) { + return getEntityReference(EntityUtil.validate(fqn, dashboardDAO.findByFQN(fqn), Dashboard.class)); + } else if (entityType.equalsIgnoreCase(Entity.REPORT)) { + return getEntityReference(EntityUtil.validate(fqn, reportDAO.findByFQN(fqn), Report.class)); + } else if (entityType.equalsIgnoreCase(Entity.TOPIC)) { + return getEntityReference(EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Topic.class)); + } else if (entityType.equalsIgnoreCase(Entity.TASK)) { + return getEntityReference(EntityUtil.validate(fqn, taskDAO.findByFQN(fqn), Task.class)); + } else if (entityType.equalsIgnoreCase(Entity.PIPELINE)) { + return getEntityReference(EntityUtil.validate(fqn, pipelineDAO.findByFQN(fqn), Pipeline.class)); + } else if (entityType.equalsIgnoreCase(Entity.MODEL)) { + return getEntityReference(EntityUtil.validate(fqn, modelDAO.findByFQN(fqn), Model.class)); + } else { + throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entityType, fqn)); + } + } public static UsageDetails getLatestUsage(UsageDAO usageDAO, UUID entityId) { LOG.debug("Getting latest usage for {}", entityId); @@ -476,6 +686,18 @@ public final class EntityUtil { return details; } + public static UsageDetails getLatestUsage(org.openmetadata.catalog.jdbi3.UsageDAO usageDAO, UUID entityId) { + LOG.debug("Getting latest usage for {}", entityId); + UsageDetails details = usageDAO.getLatestUsage(entityId.toString()); + if (details == null) { + LOG.debug("Usage details not found. Sending default usage"); + UsageStats stats = new UsageStats().withCount(0).withPercentileRank(0.0); + details = new UsageDetails().withDailyStats(stats).withWeeklyStats(stats).withMonthlyStats(stats) + .withDate(RestUtil.DATE_FORMAT.format(new Date())); + } + return details; + } + public static EntityReference getEntityReference(Pipeline pipeline) { return new EntityReference().withDescription(pipeline.getDescription()).withId(pipeline.getId()) .withName(pipeline.getFullyQualifiedName()).withType(Entity.PIPELINE); @@ -570,6 +792,28 @@ public final class EntityUtil { } } + /** + * Apply tags {@code tagLabels} to the entity or field identified by {@code targetFQN} + */ + public static void applyTags(org.openmetadata.catalog.jdbi3.TagDAO tagDAO, List tagLabels, String targetFQN) throws IOException { + for (TagLabel tagLabel : Optional.ofNullable(tagLabels).orElse(Collections.emptyList())) { + String json = tagDAO.findTag(tagLabel.getTagFQN()); + if (json == null) { + // Invalid TagLabel + throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(Tag.class.getSimpleName(), + tagLabel.getTagFQN())); + } + Tag tag = JsonUtils.readValue(json, Tag.class); + + // Apply tagLabel to targetFQN that identifies an entity or field + tagDAO.applyTag(tagLabel.getTagFQN(), targetFQN, tagLabel.getLabelType().ordinal(), + tagLabel.getState().ordinal()); + + // Apply derived tags + List derivedTags = getDerivedTags(tagLabel, tag); + applyTags(tagDAO, derivedTags, targetFQN); + } + } public static List getDerivedTags(TagLabel tagLabel, Tag tag) { List derivedTags = new ArrayList<>(); for (String fqn : Optional.ofNullable(tag.getAssociatedTags()).orElse(Collections.emptyList())) { @@ -600,6 +844,27 @@ public final class EntityUtil { return updatedTagLabels; } + /** + * Validate given list of tags and add derived tags to it + */ + public static List addDerivedTags(org.openmetadata.catalog.jdbi3.TagDAO tagDAO, List tagLabels) throws IOException { + List updatedTagLabels = new ArrayList<>(); + for (TagLabel tagLabel : Optional.ofNullable(tagLabels).orElse(Collections.emptyList())) { + String json = tagDAO.findTag(tagLabel.getTagFQN()); + if (json == null) { + // Invalid TagLabel + throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(Tag.class.getSimpleName(), + tagLabel.getTagFQN())); + } + Tag tag = JsonUtils.readValue(json, Tag.class); + updatedTagLabels.add(tagLabel); + + // Apply derived tags + List derivedTags = getDerivedTags(tagLabel, tag); + updatedTagLabels = EntityUtil.mergeTags(updatedTagLabels, derivedTags); + } + return updatedTagLabels; + } public static void removeTags(TagDAO tagDAO, String fullyQualifiedName) { tagDAO.deleteTags(fullyQualifiedName); } @@ -608,6 +873,10 @@ public final class EntityUtil { tagDAO.deleteTagsByPrefix(fullyQualifiedName); } + public static void removeTagsByPrefix(org.openmetadata.catalog.jdbi3.TagDAO tagDAO, String fullyQualifiedName) { + tagDAO.deleteTagsByPrefix(fullyQualifiedName); + } + public static List mergeTags(List list1, List list2) { List mergedTags = Stream.concat(Optional.ofNullable(list1).orElse(Collections.emptyList()).stream(), Optional.ofNullable(list2).orElse(Collections.emptyList()).stream()) @@ -640,10 +909,26 @@ public final class EntityUtil { Relationship.FOLLOWS.ordinal()) > 0; } + public static boolean addFollower(EntityRelationshipDAO3 dao, org.openmetadata.catalog.jdbi3.UserDAO userDAO, + String followedEntityId, + String followedEntityType, String followerId, String followerEntity) + throws IOException { + User user = EntityUtil.validate(followerId, userDAO.findById(followerId), User.class); + if (Optional.ofNullable(user.getDeactivated()).orElse(false)) { + throw new IllegalArgumentException(CatalogExceptionMessage.deactivatedUser(followerId)); + } + return dao.insert(followerId, followedEntityId, followerEntity, followedEntityType, + Relationship.FOLLOWS.ordinal()) > 0; + } + public static void removeFollower(EntityRelationshipDAO dao, String followedEntityId, String followerId) { dao.delete(followerId, followedEntityId, Relationship.FOLLOWS.ordinal()); } + public static void removeFollower(EntityRelationshipDAO3 dao, String followedEntityId, String followerId) { + dao.delete(followerId, followedEntityId, Relationship.FOLLOWS.ordinal()); + } + public static List getFollowers(UUID followedEntityId, EntityRelationshipDAO entityRelationshipDAO, UserDAO userDAO) throws IOException { List followerIds = entityRelationshipDAO.findFrom(followedEntityId.toString(), @@ -657,6 +942,18 @@ public final class EntityUtil { return followers; } + public static List getFollowers(UUID followedEntityId, EntityRelationshipDAO3 entityRelationshipDAO, + org.openmetadata.catalog.jdbi3.UserDAO userDAO) throws IOException { + List followerIds = entityRelationshipDAO.findFrom(followedEntityId.toString(), + Relationship.FOLLOWS.ordinal(), + Entity.USER); + List followers = new ArrayList<>(); + for (String followerId : followerIds) { + User user = EntityUtil.validate(followerId, userDAO.findById(followerId), User.class); + followers.add(new EntityReference().withName(user.getName()).withId(user.getId()).withType("user")); + } + return followers; + } public static class Fields { private final List fieldList; diff --git a/pom.xml b/pom.xml index 242d30e410c..e674fcd4d1d 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,7 @@ 2.12.3 2.0.23 2.0.0-rc9 + 2.0.25 2.33 2.33 2.33 @@ -61,6 +62,7 @@ 2.15.0 0.8.6 2.78 + 3.23.0 1.4 2.10.0 7.11.0 @@ -137,11 +139,21 @@ dropwizard-jdbi ${dropwizard-jdbi.version} + + io.dropwizard + dropwizard-jdbi3 + ${dropwizard-jdbi3.version} + org.jdbi jdbi ${jdbi.version} + + org.jdbi + jdbi3-core + ${jdbi3.version} + commons-cli commons-cli