From fd403bae9a88f1b0cccd6f91c419e2418a755cef Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 14 Mar 2024 06:37:38 +0100 Subject: [PATCH] MINOR - Review query performance (#15553) * MINOR - Review query performance * MINOR - Review query performance * MINOR - Review query performance * MINOR - Review query performance --- .../native/1.4.0/mysql/schemaChanges.sql | 48 ++++++- .../native/1.4.0/postgres/schemaChanges.sql | 48 ++++++- .../src/metadata/ingestion/ometa/client.py | 3 + .../src/metadata/ingestion/ometa/ometa_api.py | 7 +- .../service/jdbi3/CollectionDAO.java | 126 ++++++++++-------- .../openmetadata/service/jdbi3/EntityDAO.java | 32 +++-- .../service/util/OpenMetadataOperations.java | 32 ++++- 7 files changed, 230 insertions(+), 66 deletions(-) diff --git a/bootstrap/sql/migrations/native/1.4.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.4.0/mysql/schemaChanges.sql index d47997009b3..38a7bdc8811 100644 --- a/bootstrap/sql/migrations/native/1.4.0/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.4.0/mysql/schemaChanges.sql @@ -5,4 +5,50 @@ WHERE serviceType = 'MongoDB'; ALTER TABLE query_entity ADD COLUMN checksum VARCHAR(32) GENERATED ALWAYS AS (json ->> '$.checksum') NOT NULL UNIQUE; -UPDATE query_entity SET json = JSON_INSERT(json, '$.checksum', MD5(JSON_UNQUOTE(JSON_EXTRACT(json, '$.checksum')))); \ No newline at end of file +UPDATE query_entity SET json = JSON_INSERT(json, '$.checksum', MD5(JSON_UNQUOTE(JSON_EXTRACT(json, '$.checksum')))); + +ALTER TABLE chart_entity ADD INDEX index_chart_entity_deleted(fqnHash, deleted); +ALTER TABLE dashboard_data_model_entity ADD INDEX index_dashboard_data_model_entity_deleted(fqnHash, deleted); +ALTER TABLE dashboard_entity ADD INDEX index_dashboard_entity_deleted(fqnHash, deleted); +ALTER TABLE data_insight_chart ADD INDEX index_data_insight_chart_deleted(fqnHash, deleted); +ALTER TABLE database_entity ADD INDEX index_database_entity_deleted(fqnHash, deleted); +ALTER TABLE database_schema_entity ADD INDEX index_database_schema_entity_deleted(fqnHash, deleted); +ALTER TABLE glossary_term_entity ADD INDEX index_glossary_term_entity_deleted(fqnHash, deleted); +ALTER TABLE ingestion_pipeline_entity ADD INDEX index_ingestion_pipeline_entity_deleted(fqnHash, deleted); +ALTER TABLE metric_entity ADD INDEX index_metric_entity_deleted(fqnHash, deleted); +ALTER TABLE ml_model_entity ADD INDEX index_ml_model_entity_deleted(fqnHash, deleted); +ALTER TABLE pipeline_entity ADD INDEX index_pipeline_entity_deleted(fqnHash, deleted); +ALTER TABLE policy_entity ADD INDEX index_policy_entity_deleted(fqnHash, deleted); +ALTER TABLE report_entity ADD INDEX index_report_entity_deleted(fqnHash, deleted); +ALTER TABLE search_index_entity ADD INDEX index_search_index_entity_deleted(fqnHash, deleted); +ALTER TABLE storage_container_entity ADD INDEX index_storage_container_entity_deleted(fqnHash, deleted); +ALTER TABLE stored_procedure_entity ADD INDEX index_stored_procedure_entity_deleted(fqnHash, deleted); +ALTER TABLE table_entity ADD INDEX index_table_entity_deleted(fqnHash, deleted); +ALTER TABLE tag ADD INDEX index_tag_deleted(fqnHash, deleted); +ALTER TABLE test_case ADD INDEX index_test_case_deleted(fqnHash, deleted); +ALTER TABLE test_suite ADD INDEX index_test_suite_deleted(fqnHash, deleted); +ALTER TABLE topic_entity ADD INDEX index_topic_entity_deleted(fqnHash, deleted); +ALTER TABLE web_analytic_event ADD INDEX index_web_analytic_event_deleted(fqnHash, deleted); + +ALTER TABLE apps_marketplace ADD INDEX index_apps_marketplace_deleted(nameHash, deleted); +ALTER TABLE bot_entity ADD INDEX index_bot_entity_deleted(nameHash, deleted); +ALTER TABLE classification ADD INDEX index_classification_deleted(nameHash, deleted); +ALTER TABLE dashboard_service_entity ADD INDEX index_dashboard_service_entity_deleted(nameHash, deleted); +ALTER TABLE dbservice_entity ADD INDEX index_dbservice_entity_deleted(nameHash, deleted); +ALTER TABLE glossary_entity ADD INDEX index_glossary_entity_deleted(nameHash, deleted); +ALTER TABLE installed_apps ADD INDEX index_installed_apps_deleted(nameHash, deleted); +ALTER TABLE knowledge_center ADD INDEX index_knowledge_center_deleted(nameHash, deleted); +ALTER TABLE kpi_entity ADD INDEX index_kpi_entity_deleted(nameHash, deleted); +ALTER TABLE messaging_service_entity ADD INDEX index_messaging_service_entity_deleted(nameHash, deleted); +ALTER TABLE metadata_service_entity ADD INDEX index_metadata_service_entity_deleted(nameHash, deleted); +ALTER TABLE mlmodel_service_entity ADD INDEX index_mlmodel_service_entity_deleted(nameHash, deleted); +ALTER TABLE pipeline_service_entity ADD INDEX index_pipeline_service_entity_deleted(nameHash, deleted); +ALTER TABLE role_entity ADD INDEX index_role_entity_deleted(nameHash, deleted); +ALTER TABLE search_service_entity ADD INDEX index_search_service_entity_deleted(nameHash, deleted); +ALTER TABLE storage_service_entity ADD INDEX index_storage_service_entity_deleted(nameHash, deleted); +ALTER TABLE team_entity ADD INDEX index_team_entity_deleted(nameHash, deleted); +ALTER TABLE user_entity ADD INDEX index_user_entity_deleted(nameHash, deleted); + +ALTER TABLE apps_extension_time_series ADD INDEX apps_extension_time_series_index(appId); +ALTER TABLE suggestions ADD INDEX index_suggestions_type(suggestionType); +ALTER TABLE suggestions ADD INDEX index_suggestions_status(status); diff --git a/bootstrap/sql/migrations/native/1.4.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.4.0/postgres/schemaChanges.sql index bcab92ec586..6eb94adca62 100644 --- a/bootstrap/sql/migrations/native/1.4.0/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.4.0/postgres/schemaChanges.sql @@ -6,4 +6,50 @@ WHERE serviceType = 'MongoDB'; ALTER TABLE query_entity ADD COLUMN checksum varchar(32) GENERATED ALWAYS AS (json ->> 'checksum') STORED NOT NULL, ADD UNIQUE(checksum); -UPDATE query_entity SET json = jsonb_set(json::jsonb, '{checksum}', MD5(json->'connection')); \ No newline at end of file +UPDATE query_entity SET json = jsonb_set(json::jsonb, '{checksum}', MD5(json->'connection')); + +CREATE INDEX index_chart_entity_deleted ON chart_entity (fqnHash, deleted); +CREATE INDEX index_dashboard_data_model_entity_deleted ON dashboard_data_model_entity (fqnHash, deleted); +CREATE INDEX index_dashboard_entity_deleted ON dashboard_entity (fqnHash, deleted); +CREATE INDEX index_data_insight_chart_deleted ON data_insight_chart (fqnHash, deleted); +CREATE INDEX index_database_entity_deleted ON database_entity (fqnHash, deleted); +CREATE INDEX index_database_schema_entity_deleted ON database_schema_entity (fqnHash, deleted); +CREATE INDEX index_glossary_term_entity_deleted ON glossary_term_entity (fqnHash, deleted); +CREATE INDEX index_ingestion_pipeline_entity_deleted ON ingestion_pipeline_entity (fqnHash, deleted); +CREATE INDEX index_metric_entity_deleted ON metric_entity (fqnHash, deleted); +CREATE INDEX index_ml_model_entity_deleted ON ml_model_entity (fqnHash, deleted); +CREATE INDEX index_pipeline_entity_deleted ON pipeline_entity (fqnHash, deleted); +CREATE INDEX index_policy_entity_deleted ON policy_entity (fqnHash, deleted); +CREATE INDEX index_report_entity_deleted ON report_entity (fqnHash, deleted); +CREATE INDEX index_search_index_entity_deleted ON search_index_entity (fqnHash, deleted); +CREATE INDEX index_storage_container_entity_deleted ON storage_container_entity (fqnHash, deleted); +CREATE INDEX index_stored_procedure_entity_deleted ON stored_procedure_entity (fqnHash, deleted); +CREATE INDEX index_table_entity_deleted ON table_entity (fqnHash, deleted); +CREATE INDEX index_tag_deleted ON tag (fqnHash, deleted); +CREATE INDEX index_test_case_deleted ON test_case (fqnHash, deleted); +CREATE INDEX index_test_suite_deleted ON test_suite (fqnHash, deleted); +CREATE INDEX index_topic_entity_deleted ON topic_entity (fqnHash, deleted); +CREATE INDEX index_web_analytic_event_deleted ON web_analytic_event (fqnHash, deleted); + +CREATE INDEX index_apps_marketplace_deleted ON apps_marketplace (nameHash, deleted); +CREATE INDEX index_bot_entity_deleted ON bot_entity (nameHash, deleted); +CREATE INDEX index_classification_deleted ON classification (nameHash, deleted); +CREATE INDEX index_dashboard_service_entity_deleted ON dashboard_service_entity (nameHash, deleted); +CREATE INDEX index_dbservice_entity_deleted ON dbservice_entity (nameHash, deleted); +CREATE INDEX index_glossary_entity_deleted ON glossary_entity (nameHash, deleted); +CREATE INDEX index_installed_apps_deleted ON installed_apps (nameHash, deleted); +CREATE INDEX index_knowledge_center_deleted ON knowledge_center (nameHash, deleted); +CREATE INDEX index_kpi_entity_deleted ON kpi_entity (nameHash, deleted); +CREATE INDEX index_messaging_service_entity_deleted ON messaging_service_entity (nameHash, deleted); +CREATE INDEX index_metadata_service_entity_deleted ON metadata_service_entity (nameHash, deleted); +CREATE INDEX index_mlmodel_service_entity_deleted ON mlmodel_service_entity (nameHash, deleted); +CREATE INDEX index_pipeline_service_entity_deleted ON pipeline_service_entity (nameHash, deleted); +CREATE INDEX index_role_entity_deleted ON role_entity (nameHash, deleted); +CREATE INDEX index_search_service_entity_deleted ON search_service_entity (nameHash, deleted); +CREATE INDEX index_storage_service_entity_deleted ON storage_service_entity (nameHash, deleted); +CREATE INDEX index_team_entity_deleted ON team_entity (nameHash, deleted); +CREATE INDEX index_user_entity_deleted ON user_entity (nameHash, deleted); + +CREATE INDEX apps_extension_time_series_index ON apps_extension_time_series (appId); +CREATE INDEX index_suggestions_type ON suggestions (suggestionType); +CREATE INDEX index_suggestions_status ON suggestions (status); diff --git a/ingestion/src/metadata/ingestion/ometa/client.py b/ingestion/src/metadata/ingestion/ometa/client.py index e52a2f324ef..50627c79253 100644 --- a/ingestion/src/metadata/ingestion/ometa/client.py +++ b/ingestion/src/metadata/ingestion/ometa/client.py @@ -201,6 +201,9 @@ class REST: ) time.sleep(retry_wait) retry -= 1 + if retry == 0: + logger.error(f"No more retries left for {url}") + traceback.format_exc() return None def _one_request(self, method: str, url: URL, opts: dict, retry: int): diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index b278a54dad9..0e2d5e2479c 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -405,7 +405,12 @@ class OpenMetadata( try: entities.append(entity(**elmt)) except Exception as exc: - logger.error(f"Error creating entity. Failed with exception {exc}") + logger.error( + f"Error creating entity [{entity.__name__}]. Failed with exception {exc}" + ) + logger.debug( + f"Can't create [{entity.__name__}] from [{elmt}]. Skipping." + ) continue else: entities = [entity(**elmt) for elmt in resp["data"]] diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 7d9c5190852..094ce932de0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -522,7 +522,7 @@ public interface CollectionDAO { } String sqlCondition = String.format("%s AND er.toId is NULL", condition); - return listCount(getTableName(), sqlCondition); + return listCount(getTableName(), getNameHashColumn(), sqlCondition); } @SqlQuery( @@ -563,7 +563,17 @@ public interface CollectionDAO { @Bind("limit") int limit, @Bind("after") String after); - @SqlQuery( + @ConnectionAwareSqlQuery( + value = + "SELECT count() FROM ce " + + "LEFT JOIN (" + + " SELECT toId FROM entity_relationship " + + " WHERE fromEntity = 'container' AND toEntity = 'container' AND relation = 0 " + + ") er " + + "on ce.id = er.toId " + + "", + connectionType = MYSQL) + @ConnectionAwareSqlQuery( value = "SELECT count(*) FROM
ce " + "LEFT JOIN (" @@ -571,8 +581,12 @@ public interface CollectionDAO { + " WHERE fromEntity = 'container' AND toEntity = 'container' AND relation = 0 " + ") er " + "on ce.id = er.toId " - + "") - int listCount(@Define("table") String table, @Define("sqlCondition") String mysqlCond); + + "", + connectionType = POSTGRES) + int listCount( + @Define("table") String table, + @Define("nameHashColumn") String nameHashColumn, + @Define("sqlCondition") String mysqlCond); } interface SearchServiceDAO extends EntityDAO { @@ -1971,11 +1985,11 @@ public interface CollectionDAO { String.format("%s %s", mySqlCondition, filter.getCondition(getTableName())); postgresCondition = String.format("%s %s", postgresCondition, filter.getCondition(getTableName())); - return listCount(getTableName(), mySqlCondition, postgresCondition); + return listCount(getTableName(), getNameHashColumn(), mySqlCondition, postgresCondition); } String condition = filter.getCondition(getTableName()); - return listCount(getTableName(), condition, condition); + return listCount(getTableName(), getNameHashColumn(), condition, condition); } @Override @@ -2217,7 +2231,7 @@ public interface CollectionDAO { mySqlCondition = String.format("%s %s", mySqlCondition, filter.getCondition("tag")); postgresCondition = String.format("%s %s", postgresCondition, filter.getCondition("tag")); - return listCount(getTableName(), mySqlCondition, postgresCondition); + return listCount(getTableName(), getNameHashColumn(), mySqlCondition, postgresCondition); } @Override @@ -2645,7 +2659,7 @@ public interface CollectionDAO { "%s AND ((json#>'{isJoinable}')::boolean) = %s ", postgresCondition, isJoinable); } - return listCount(getTableName(), mySqlCondition, postgresCondition); + return listCount(getTableName(), getNameHashColumn(), mySqlCondition, postgresCondition); } @Override @@ -3386,7 +3400,8 @@ public interface CollectionDAO { String psqlStr = String.format("AND supported_data_types @> '`%s`' ", supportedDataType); psqlCondition.append(psqlStr.replace('`', '"')); } - return listCount(getTableName(), mysqlCondition.toString(), psqlCondition.toString()); + return listCount( + getTableName(), getNameHashColumn(), mysqlCondition.toString(), psqlCondition.toString()); } @ConnectionAwareSqlQuery( @@ -3428,13 +3443,14 @@ public interface CollectionDAO { @Bind("after") String after); @ConnectionAwareSqlQuery( - value = "SELECT count(*) FROM
", + value = "SELECT count() FROM
", connectionType = MYSQL) @ConnectionAwareSqlQuery( value = "SELECT count(*) FROM
", connectionType = POSTGRES) int listCount( @Define("table") String table, + @Define("nameHashColumn") String nameHashColumn, @Define("mysqlCond") String mysqlCond, @Define("psqlCond") String psqlCond); } @@ -3474,7 +3490,7 @@ public interface CollectionDAO { postgresCondition = String.format("%s %s", postgresCondition, filter.getCondition(getTableName())); } - return listCount( + return listCountDistinct( getTableName(), mySqlCondition, postgresCondition, @@ -3862,6 +3878,29 @@ public interface CollectionDAO { } interface SystemDAO { + @ConnectionAwareSqlQuery( + value = + "SELECT (SELECT COUNT(fqnHash) FROM table_entity ) as tableCount, " + + "(SELECT COUNT(fqnHash) FROM topic_entity ) as topicCount, " + + "(SELECT COUNT(fqnHash) FROM dashboard_entity ) as dashboardCount, " + + "(SELECT COUNT(fqnHash) FROM pipeline_entity ) as pipelineCount, " + + "(SELECT COUNT(fqnHash) FROM ml_model_entity ) as mlmodelCount, " + + "(SELECT COUNT(fqnHash) FROM storage_container_entity ) as storageContainerCount, " + + "(SELECT COUNT(fqnHash) FROM search_index_entity ) as searchIndexCount, " + + "(SELECT COUNT(nameHash) FROM glossary_entity ) as glossaryCount, " + + "(SELECT COUNT(fqnHash) FROM glossary_term_entity ) as glossaryTermCount, " + + "(SELECT (SELECT COUNT(nameHash) FROM metadata_service_entity ) + " + + "(SELECT COUNT(nameHash) FROM dbservice_entity )+" + + "(SELECT COUNT(nameHash) FROM messaging_service_entity )+ " + + "(SELECT COUNT(nameHash) FROM dashboard_service_entity )+ " + + "(SELECT COUNT(nameHash) FROM pipeline_service_entity )+ " + + "(SELECT COUNT(nameHash) FROM mlmodel_service_entity )+ " + + "(SELECT COUNT(nameHash) FROM search_service_entity )+ " + + "(SELECT COUNT(nameHash) FROM storage_service_entity )) as servicesCount, " + + "(SELECT COUNT(nameHash) FROM user_entity AND (JSON_EXTRACT(json, '$.isBot') IS NULL OR JSON_EXTRACT(json, '$.isBot') = FALSE)) as userCount, " + + "(SELECT COUNT(nameHash) FROM team_entity ) as teamCount, " + + "(SELECT COUNT(fqnHash) FROM test_suite ) as testSuiteCount", + connectionType = MYSQL) @ConnectionAwareSqlQuery( value = "SELECT (SELECT COUNT(*) FROM table_entity ) as tableCount, " @@ -3881,29 +3920,6 @@ public interface CollectionDAO { + "(SELECT COUNT(*) FROM mlmodel_service_entity )+ " + "(SELECT COUNT(*) FROM search_service_entity )+ " + "(SELECT COUNT(*) FROM storage_service_entity )) as servicesCount, " - + "(SELECT COUNT(*) FROM user_entity AND (JSON_EXTRACT(json, '$.isBot') IS NULL OR JSON_EXTRACT(json, '$.isBot') = FALSE)) as userCount, " - + "(SELECT COUNT(*) FROM team_entity ) as teamCount, " - + "(SELECT COUNT(*) FROM test_suite ) as testSuiteCount", - connectionType = MYSQL) - @ConnectionAwareSqlQuery( - value = - "SELECT (SELECT COUNT(*) FROM table_entity ) as tableCount, " - + "(SELECT COUNT(*) FROM topic_entity ) as topicCount, " - + "(SELECT COUNT(*) FROM dashboard_entity ) as dashboardCount, " - + "(SELECT COUNT(*) FROM pipeline_entity ) as pipelineCount, " - + "(SELECT COUNT(*) FROM ml_model_entity ) as mlmodelCount, " - + "(SELECT COUNT(*) FROM storage_container_entity ) as storageContainerCount, " - + "(SELECT COUNT(*) FROM search_index_entity ) as searchIndexCount, " - + "(SELECT COUNT(*) FROM glossary_entity ) as glossaryCount, " - + "(SELECT COUNT(*) FROM glossary_term_entity ) as glossaryTermCount, " - + "(SELECT (SELECT COUNT(*) FROM metadata_service_entity ) + " - + "(SELECT COUNT(*) FROM dbservice_entity )+ " - + "(SELECT COUNT(*) FROM messaging_service_entity )+ " - + "(SELECT COUNT(*) FROM dashboard_service_entity )+ " - + "(SELECT COUNT(*) FROM pipeline_service_entity )+ " - + "(SELECT COUNT(*) FROM mlmodel_service_entity )+ " - + "(SELECT COUNT(*) FROM search_service_entity )+ " - + "(SELECT COUNT(*) FROM storage_service_entity )) as servicesCount, " + "(SELECT COUNT(*) FROM user_entity AND (json#>'{isBot}' IS NULL OR ((json#>'{isBot}')::boolean) = FALSE)) as userCount, " + "(SELECT COUNT(*) FROM team_entity ) as teamCount, " + "(SELECT COUNT(*) FROM test_suite ) as testSuiteCount", @@ -3911,14 +3927,26 @@ public interface CollectionDAO { @RegisterRowMapper(EntitiesCountRowMapper.class) EntitiesCount getAggregatedEntitiesCount(@Define("cond") String cond) throws StatementException; - @SqlQuery( - "SELECT (SELECT COUNT(*) FROM database_entity ) as databaseServiceCount, " - + "(SELECT COUNT(*) FROM messaging_service_entity ) as messagingServiceCount, " - + "(SELECT COUNT(*) FROM dashboard_service_entity ) as dashboardServiceCount, " - + "(SELECT COUNT(*) FROM pipeline_service_entity ) as pipelineServiceCount, " - + "(SELECT COUNT(*) FROM mlmodel_service_entity ) as mlModelServiceCount, " - + "(SELECT COUNT(*) FROM storage_service_entity ) as storageServiceCount, " - + "(SELECT COUNT(*) FROM search_service_entity ) as searchServiceCount") + @ConnectionAwareSqlQuery( + value = + "SELECT (SELECT COUNT(nameHash) FROM dbservice_entity ) as databaseServiceCount, " + + "(SELECT COUNT(nameHash) FROM messaging_service_entity ) as messagingServiceCount, " + + "(SELECT COUNT(nameHash) FROM dashboard_service_entity ) as dashboardServiceCount, " + + "(SELECT COUNT(nameHash) FROM pipeline_service_entity ) as pipelineServiceCount, " + + "(SELECT COUNT(nameHash) FROM mlmodel_service_entity ) as mlModelServiceCount, " + + "(SELECT COUNT(nameHash) FROM storage_service_entity ) as storageServiceCount, " + + "(SELECT COUNT(nameHash) FROM search_service_entity ) as searchServiceCount", + connectionType = MYSQL) + @ConnectionAwareSqlQuery( + value = + "SELECT (SELECT COUNT(*) FROM dbservice_entity ) as databaseServiceCount, " + + "(SELECT COUNT(*) FROM messaging_service_entity ) as messagingServiceCount, " + + "(SELECT COUNT(*) FROM dashboard_service_entity ) as dashboardServiceCount, " + + "(SELECT COUNT(*) FROM pipeline_service_entity ) as pipelineServiceCount, " + + "(SELECT COUNT(*) FROM mlmodel_service_entity ) as mlModelServiceCount, " + + "(SELECT COUNT(*) FROM storage_service_entity ) as storageServiceCount, " + + "(SELECT COUNT(*) FROM search_service_entity ) as searchServiceCount", + connectionType = POSTGRES) @RegisterRowMapper(ServicesCountRowMapper.class) ServicesCount getAggregatedServicesCount(@Define("cond") String cond) throws StatementException; @@ -4290,7 +4318,8 @@ public interface CollectionDAO { psqlCondition.append(String.format(" AND entityType='%s' ", entityType)); } - return listCount(getTableName(), mysqlCondition.toString(), psqlCondition.toString()); + return listCount( + getTableName(), getNameHashColumn(), mysqlCondition.toString(), psqlCondition.toString()); } @ConnectionAwareSqlQuery( @@ -4330,17 +4359,6 @@ public interface CollectionDAO { @Define("psqlCond") String psqlCond, @Bind("limit") int limit, @Bind("after") String after); - - @ConnectionAwareSqlQuery( - value = "SELECT count(*) FROM
", - connectionType = MYSQL) - @ConnectionAwareSqlQuery( - value = "SELECT count(*) FROM
", - connectionType = POSTGRES) - int listCount( - @Define("table") String table, - @Define("mysqlCond") String mysqlCond, - @Define("psqlCond") String psqlCond); } interface SuggestionDAO { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java index 9a89b16120f..8749654944d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java @@ -133,17 +133,21 @@ public interface EntityDAO { @BindFQN("name") String name, @Define("cond") String cond); - @SqlQuery("SELECT count(*) FROM
") - int listCount(@Define("table") String table, @Define("cond") String cond); + @SqlQuery("SELECT count() FROM
") + int listCount( + @Define("table") String table, + @Define("nameHashColumn") String nameHashColumn, + @Define("cond") String cond); @ConnectionAwareSqlQuery( - value = "SELECT count(*) FROM
", + value = "SELECT count() FROM
", connectionType = MYSQL) @ConnectionAwareSqlQuery( value = "SELECT count(*) FROM
", connectionType = POSTGRES) int listCount( @Define("table") String table, + @Define("nameHashColumn") String nameHashColumn, @Define("mysqlCond") String mysqlCond, @Define("postgresCond") String postgresCond); @@ -199,8 +203,12 @@ public interface EntityDAO { @Bind("limit") int limit, @Bind("after") String after); - @SqlQuery("SELECT count(*) FROM
") - int listTotalCount(@Define("table") String table); + @ConnectionAwareSqlQuery( + value = "SELECT count() FROM
", + connectionType = MYSQL) + @ConnectionAwareSqlQuery(value = "SELECT count(*) FROM
", connectionType = POSTGRES) + int listTotalCount( + @Define("table") String table, @Define("nameHashColumn") String nameHashColumn); @ConnectionAwareSqlQuery( value = "SELECT count(distinct()) FROM
", @@ -208,7 +216,7 @@ public interface EntityDAO { @ConnectionAwareSqlQuery( value = "SELECT count(distinct()) FROM
", connectionType = POSTGRES) - int listCount( + int listCountDistinct( @Define("table") String table, @Define("mysqlCond") String mysqlCond, @Define("postgresCond") String postgresCond, @@ -325,6 +333,14 @@ public interface EntityDAO { @SqlUpdate("DELETE FROM
WHERE id = :id") int delete(@Define("table") String table, @BindUUID("id") UUID id); + @ConnectionAwareSqlUpdate(value = "ANALYZE TABLE
", connectionType = MYSQL) + @ConnectionAwareSqlUpdate(value = "ANALYZE
", connectionType = POSTGRES) + void analyze(@Define("table") String table); + + default void analyzeTable() { + analyze(getTableName()); + } + /** Default methods that interfaces with implementation. Don't override */ default void insert(EntityInterface entity, String fqn) { insert(getTableName(), getNameHashColumn(), fqn, JsonUtils.pojoToJson(entity)); @@ -403,11 +419,11 @@ public interface EntityDAO { } default int listCount(ListFilter filter) { - return listCount(getTableName(), filter.getCondition()); + return listCount(getTableName(), getNameHashColumn(), filter.getCondition()); } default int listTotalCount() { - return listTotalCount(getTableName()); + return listTotalCount(getTableName(), getNameHashColumn()); } default List listBefore(ListFilter filter, int limit, String before) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java index 39b752e531e..c9ff047b7f9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java @@ -35,6 +35,7 @@ import org.flywaydb.core.api.MigrationVersion; import org.jdbi.v3.core.Jdbi; import org.jdbi.v3.sqlobject.SqlObjectPlugin; import org.jdbi.v3.sqlobject.SqlObjects; +import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.ServiceEntityInterface; import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.AppSchedule; @@ -49,8 +50,10 @@ import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.apps.ApplicationHandler; import org.openmetadata.service.apps.scheduler.AppScheduler; import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory; +import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.fernet.Fernet; import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.IngestionPipelineRepository; import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.jdbi3.MigrationDAO; @@ -313,11 +316,38 @@ public class OpenMetadataOperations implements Callable { new SecretsManagerUpdateService(secretsManager, config.getClusterName()).updateEntities(); return 0; } catch (Exception e) { - LOG.error("Failed to deploy pipelines due to ", e); + LOG.error("Failed to migrate secrets due to ", e); return 1; } } + @Command( + name = "analyze-tables", + description = + "Migrate secrets from DB to the configured Secrets Manager. " + + "Note that this does not support migrating between external Secrets Managers") + public Integer analyzeTables() { + try { + LOG.info("Analyzing Tables..."); + parseConfig(); + Entity.getEntityList().forEach(this::analyzeEntityTable); + return 0; + } catch (Exception e) { + LOG.error("Failed to analyze tables due to ", e); + return 1; + } + } + + private void analyzeEntityTable(String entity) { + try { + EntityRepository repository = Entity.getEntityRepository(entity); + LOG.info("Analyzing table for [{}] Entity", entity); + repository.getDao().analyzeTable(); + } catch (EntityNotFoundException e) { + LOG.debug("No repository for [{}] Entity", entity); + } + } + private void deployPipeline( IngestionPipeline pipeline, PipelineServiceClient pipelineServiceClient,