MINOR - Review query performance (#15553)

* MINOR - Review query performance

* MINOR - Review query performance

* MINOR - Review query performance

* MINOR - Review query performance
This commit is contained in:
Pere Miquel Brull 2024-03-14 06:37:38 +01:00 committed by GitHub
parent dcbb1dd0f8
commit fd403bae9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 230 additions and 66 deletions

View File

@ -5,4 +5,50 @@ WHERE serviceType = 'MongoDB';
ALTER TABLE query_entity ADD COLUMN checksum VARCHAR(32) GENERATED ALWAYS AS (json ->> '$.checksum') NOT NULL UNIQUE;
UPDATE query_entity SET json = JSON_INSERT(json, '$.checksum', MD5(JSON_UNQUOTE(JSON_EXTRACT(json, '$.checksum'))));
UPDATE query_entity SET json = JSON_INSERT(json, '$.checksum', MD5(JSON_UNQUOTE(JSON_EXTRACT(json, '$.checksum'))));
ALTER TABLE chart_entity ADD INDEX index_chart_entity_deleted(fqnHash, deleted);
ALTER TABLE dashboard_data_model_entity ADD INDEX index_dashboard_data_model_entity_deleted(fqnHash, deleted);
ALTER TABLE dashboard_entity ADD INDEX index_dashboard_entity_deleted(fqnHash, deleted);
ALTER TABLE data_insight_chart ADD INDEX index_data_insight_chart_deleted(fqnHash, deleted);
ALTER TABLE database_entity ADD INDEX index_database_entity_deleted(fqnHash, deleted);
ALTER TABLE database_schema_entity ADD INDEX index_database_schema_entity_deleted(fqnHash, deleted);
ALTER TABLE glossary_term_entity ADD INDEX index_glossary_term_entity_deleted(fqnHash, deleted);
ALTER TABLE ingestion_pipeline_entity ADD INDEX index_ingestion_pipeline_entity_deleted(fqnHash, deleted);
ALTER TABLE metric_entity ADD INDEX index_metric_entity_deleted(fqnHash, deleted);
ALTER TABLE ml_model_entity ADD INDEX index_ml_model_entity_deleted(fqnHash, deleted);
ALTER TABLE pipeline_entity ADD INDEX index_pipeline_entity_deleted(fqnHash, deleted);
ALTER TABLE policy_entity ADD INDEX index_policy_entity_deleted(fqnHash, deleted);
ALTER TABLE report_entity ADD INDEX index_report_entity_deleted(fqnHash, deleted);
ALTER TABLE search_index_entity ADD INDEX index_search_index_entity_deleted(fqnHash, deleted);
ALTER TABLE storage_container_entity ADD INDEX index_storage_container_entity_deleted(fqnHash, deleted);
ALTER TABLE stored_procedure_entity ADD INDEX index_stored_procedure_entity_deleted(fqnHash, deleted);
ALTER TABLE table_entity ADD INDEX index_table_entity_deleted(fqnHash, deleted);
ALTER TABLE tag ADD INDEX index_tag_deleted(fqnHash, deleted);
ALTER TABLE test_case ADD INDEX index_test_case_deleted(fqnHash, deleted);
ALTER TABLE test_suite ADD INDEX index_test_suite_deleted(fqnHash, deleted);
ALTER TABLE topic_entity ADD INDEX index_topic_entity_deleted(fqnHash, deleted);
ALTER TABLE web_analytic_event ADD INDEX index_web_analytic_event_deleted(fqnHash, deleted);
ALTER TABLE apps_marketplace ADD INDEX index_apps_marketplace_deleted(nameHash, deleted);
ALTER TABLE bot_entity ADD INDEX index_bot_entity_deleted(nameHash, deleted);
ALTER TABLE classification ADD INDEX index_classification_deleted(nameHash, deleted);
ALTER TABLE dashboard_service_entity ADD INDEX index_dashboard_service_entity_deleted(nameHash, deleted);
ALTER TABLE dbservice_entity ADD INDEX index_dbservice_entity_deleted(nameHash, deleted);
ALTER TABLE glossary_entity ADD INDEX index_glossary_entity_deleted(nameHash, deleted);
ALTER TABLE installed_apps ADD INDEX index_installed_apps_deleted(nameHash, deleted);
ALTER TABLE knowledge_center ADD INDEX index_knowledge_center_deleted(nameHash, deleted);
ALTER TABLE kpi_entity ADD INDEX index_kpi_entity_deleted(nameHash, deleted);
ALTER TABLE messaging_service_entity ADD INDEX index_messaging_service_entity_deleted(nameHash, deleted);
ALTER TABLE metadata_service_entity ADD INDEX index_metadata_service_entity_deleted(nameHash, deleted);
ALTER TABLE mlmodel_service_entity ADD INDEX index_mlmodel_service_entity_deleted(nameHash, deleted);
ALTER TABLE pipeline_service_entity ADD INDEX index_pipeline_service_entity_deleted(nameHash, deleted);
ALTER TABLE role_entity ADD INDEX index_role_entity_deleted(nameHash, deleted);
ALTER TABLE search_service_entity ADD INDEX index_search_service_entity_deleted(nameHash, deleted);
ALTER TABLE storage_service_entity ADD INDEX index_storage_service_entity_deleted(nameHash, deleted);
ALTER TABLE team_entity ADD INDEX index_team_entity_deleted(nameHash, deleted);
ALTER TABLE user_entity ADD INDEX index_user_entity_deleted(nameHash, deleted);
ALTER TABLE apps_extension_time_series ADD INDEX apps_extension_time_series_index(appId);
ALTER TABLE suggestions ADD INDEX index_suggestions_type(suggestionType);
ALTER TABLE suggestions ADD INDEX index_suggestions_status(status);

View File

@ -6,4 +6,50 @@ WHERE serviceType = 'MongoDB';
ALTER TABLE query_entity ADD COLUMN checksum varchar(32) GENERATED ALWAYS AS (json ->> 'checksum') STORED NOT NULL,
ADD UNIQUE(checksum);
UPDATE query_entity SET json = jsonb_set(json::jsonb, '{checksum}', MD5(json->'connection'));
UPDATE query_entity SET json = jsonb_set(json::jsonb, '{checksum}', MD5(json->'connection'));
CREATE INDEX index_chart_entity_deleted ON chart_entity (fqnHash, deleted);
CREATE INDEX index_dashboard_data_model_entity_deleted ON dashboard_data_model_entity (fqnHash, deleted);
CREATE INDEX index_dashboard_entity_deleted ON dashboard_entity (fqnHash, deleted);
CREATE INDEX index_data_insight_chart_deleted ON data_insight_chart (fqnHash, deleted);
CREATE INDEX index_database_entity_deleted ON database_entity (fqnHash, deleted);
CREATE INDEX index_database_schema_entity_deleted ON database_schema_entity (fqnHash, deleted);
CREATE INDEX index_glossary_term_entity_deleted ON glossary_term_entity (fqnHash, deleted);
CREATE INDEX index_ingestion_pipeline_entity_deleted ON ingestion_pipeline_entity (fqnHash, deleted);
CREATE INDEX index_metric_entity_deleted ON metric_entity (fqnHash, deleted);
CREATE INDEX index_ml_model_entity_deleted ON ml_model_entity (fqnHash, deleted);
CREATE INDEX index_pipeline_entity_deleted ON pipeline_entity (fqnHash, deleted);
CREATE INDEX index_policy_entity_deleted ON policy_entity (fqnHash, deleted);
CREATE INDEX index_report_entity_deleted ON report_entity (fqnHash, deleted);
CREATE INDEX index_search_index_entity_deleted ON search_index_entity (fqnHash, deleted);
CREATE INDEX index_storage_container_entity_deleted ON storage_container_entity (fqnHash, deleted);
CREATE INDEX index_stored_procedure_entity_deleted ON stored_procedure_entity (fqnHash, deleted);
CREATE INDEX index_table_entity_deleted ON table_entity (fqnHash, deleted);
CREATE INDEX index_tag_deleted ON tag (fqnHash, deleted);
CREATE INDEX index_test_case_deleted ON test_case (fqnHash, deleted);
CREATE INDEX index_test_suite_deleted ON test_suite (fqnHash, deleted);
CREATE INDEX index_topic_entity_deleted ON topic_entity (fqnHash, deleted);
CREATE INDEX index_web_analytic_event_deleted ON web_analytic_event (fqnHash, deleted);
CREATE INDEX index_apps_marketplace_deleted ON apps_marketplace (nameHash, deleted);
CREATE INDEX index_bot_entity_deleted ON bot_entity (nameHash, deleted);
CREATE INDEX index_classification_deleted ON classification (nameHash, deleted);
CREATE INDEX index_dashboard_service_entity_deleted ON dashboard_service_entity (nameHash, deleted);
CREATE INDEX index_dbservice_entity_deleted ON dbservice_entity (nameHash, deleted);
CREATE INDEX index_glossary_entity_deleted ON glossary_entity (nameHash, deleted);
CREATE INDEX index_installed_apps_deleted ON installed_apps (nameHash, deleted);
CREATE INDEX index_knowledge_center_deleted ON knowledge_center (nameHash, deleted);
CREATE INDEX index_kpi_entity_deleted ON kpi_entity (nameHash, deleted);
CREATE INDEX index_messaging_service_entity_deleted ON messaging_service_entity (nameHash, deleted);
CREATE INDEX index_metadata_service_entity_deleted ON metadata_service_entity (nameHash, deleted);
CREATE INDEX index_mlmodel_service_entity_deleted ON mlmodel_service_entity (nameHash, deleted);
CREATE INDEX index_pipeline_service_entity_deleted ON pipeline_service_entity (nameHash, deleted);
CREATE INDEX index_role_entity_deleted ON role_entity (nameHash, deleted);
CREATE INDEX index_search_service_entity_deleted ON search_service_entity (nameHash, deleted);
CREATE INDEX index_storage_service_entity_deleted ON storage_service_entity (nameHash, deleted);
CREATE INDEX index_team_entity_deleted ON team_entity (nameHash, deleted);
CREATE INDEX index_user_entity_deleted ON user_entity (nameHash, deleted);
CREATE INDEX apps_extension_time_series_index ON apps_extension_time_series (appId);
CREATE INDEX index_suggestions_type ON suggestions (suggestionType);
CREATE INDEX index_suggestions_status ON suggestions (status);

View File

@ -201,6 +201,9 @@ class REST:
)
time.sleep(retry_wait)
retry -= 1
if retry == 0:
logger.error(f"No more retries left for {url}")
traceback.format_exc()
return None
def _one_request(self, method: str, url: URL, opts: dict, retry: int):

View File

@ -405,7 +405,12 @@ class OpenMetadata(
try:
entities.append(entity(**elmt))
except Exception as exc:
logger.error(f"Error creating entity. Failed with exception {exc}")
logger.error(
f"Error creating entity [{entity.__name__}]. Failed with exception {exc}"
)
logger.debug(
f"Can't create [{entity.__name__}] from [{elmt}]. Skipping."
)
continue
else:
entities = [entity(**elmt) for elmt in resp["data"]]

View File

@ -522,7 +522,7 @@ public interface CollectionDAO {
}
String sqlCondition = String.format("%s AND er.toId is NULL", condition);
return listCount(getTableName(), sqlCondition);
return listCount(getTableName(), getNameHashColumn(), sqlCondition);
}
@SqlQuery(
@ -563,7 +563,17 @@ public interface CollectionDAO {
@Bind("limit") int limit,
@Bind("after") String after);
@SqlQuery(
@ConnectionAwareSqlQuery(
value =
"SELECT count(<nameHashColumn>) FROM <table> ce "
+ "LEFT JOIN ("
+ " SELECT toId FROM entity_relationship "
+ " WHERE fromEntity = 'container' AND toEntity = 'container' AND relation = 0 "
+ ") er "
+ "on ce.id = er.toId "
+ "<sqlCondition>",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT count(*) FROM <table> ce "
+ "LEFT JOIN ("
@ -571,8 +581,12 @@ public interface CollectionDAO {
+ " WHERE fromEntity = 'container' AND toEntity = 'container' AND relation = 0 "
+ ") er "
+ "on ce.id = er.toId "
+ "<sqlCondition>")
int listCount(@Define("table") String table, @Define("sqlCondition") String mysqlCond);
+ "<sqlCondition>",
connectionType = POSTGRES)
int listCount(
@Define("table") String table,
@Define("nameHashColumn") String nameHashColumn,
@Define("sqlCondition") String mysqlCond);
}
interface SearchServiceDAO extends EntityDAO<SearchService> {
@ -1971,11 +1985,11 @@ public interface CollectionDAO {
String.format("%s %s", mySqlCondition, filter.getCondition(getTableName()));
postgresCondition =
String.format("%s %s", postgresCondition, filter.getCondition(getTableName()));
return listCount(getTableName(), mySqlCondition, postgresCondition);
return listCount(getTableName(), getNameHashColumn(), mySqlCondition, postgresCondition);
}
String condition = filter.getCondition(getTableName());
return listCount(getTableName(), condition, condition);
return listCount(getTableName(), getNameHashColumn(), condition, condition);
}
@Override
@ -2217,7 +2231,7 @@ public interface CollectionDAO {
mySqlCondition = String.format("%s %s", mySqlCondition, filter.getCondition("tag"));
postgresCondition = String.format("%s %s", postgresCondition, filter.getCondition("tag"));
return listCount(getTableName(), mySqlCondition, postgresCondition);
return listCount(getTableName(), getNameHashColumn(), mySqlCondition, postgresCondition);
}
@Override
@ -2645,7 +2659,7 @@ public interface CollectionDAO {
"%s AND ((json#>'{isJoinable}')::boolean) = %s ", postgresCondition, isJoinable);
}
return listCount(getTableName(), mySqlCondition, postgresCondition);
return listCount(getTableName(), getNameHashColumn(), mySqlCondition, postgresCondition);
}
@Override
@ -3386,7 +3400,8 @@ public interface CollectionDAO {
String psqlStr = String.format("AND supported_data_types @> '`%s`' ", supportedDataType);
psqlCondition.append(psqlStr.replace('`', '"'));
}
return listCount(getTableName(), mysqlCondition.toString(), psqlCondition.toString());
return listCount(
getTableName(), getNameHashColumn(), mysqlCondition.toString(), psqlCondition.toString());
}
@ConnectionAwareSqlQuery(
@ -3428,13 +3443,14 @@ public interface CollectionDAO {
@Bind("after") String after);
@ConnectionAwareSqlQuery(
value = "SELECT count(*) FROM <table> <mysqlCond>",
value = "SELECT count(<nameHashColumn>) FROM <table> <mysqlCond>",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value = "SELECT count(*) FROM <table> <psqlCond>",
connectionType = POSTGRES)
int listCount(
@Define("table") String table,
@Define("nameHashColumn") String nameHashColumn,
@Define("mysqlCond") String mysqlCond,
@Define("psqlCond") String psqlCond);
}
@ -3474,7 +3490,7 @@ public interface CollectionDAO {
postgresCondition =
String.format("%s %s", postgresCondition, filter.getCondition(getTableName()));
}
return listCount(
return listCountDistinct(
getTableName(),
mySqlCondition,
postgresCondition,
@ -3862,6 +3878,29 @@ public interface CollectionDAO {
}
interface SystemDAO {
@ConnectionAwareSqlQuery(
value =
"SELECT (SELECT COUNT(fqnHash) FROM table_entity <cond>) as tableCount, "
+ "(SELECT COUNT(fqnHash) FROM topic_entity <cond>) as topicCount, "
+ "(SELECT COUNT(fqnHash) FROM dashboard_entity <cond>) as dashboardCount, "
+ "(SELECT COUNT(fqnHash) FROM pipeline_entity <cond>) as pipelineCount, "
+ "(SELECT COUNT(fqnHash) FROM ml_model_entity <cond>) as mlmodelCount, "
+ "(SELECT COUNT(fqnHash) FROM storage_container_entity <cond>) as storageContainerCount, "
+ "(SELECT COUNT(fqnHash) FROM search_index_entity <cond>) as searchIndexCount, "
+ "(SELECT COUNT(nameHash) FROM glossary_entity <cond>) as glossaryCount, "
+ "(SELECT COUNT(fqnHash) FROM glossary_term_entity <cond>) as glossaryTermCount, "
+ "(SELECT (SELECT COUNT(nameHash) FROM metadata_service_entity <cond>) + "
+ "(SELECT COUNT(nameHash) FROM dbservice_entity <cond>)+"
+ "(SELECT COUNT(nameHash) FROM messaging_service_entity <cond>)+ "
+ "(SELECT COUNT(nameHash) FROM dashboard_service_entity <cond>)+ "
+ "(SELECT COUNT(nameHash) FROM pipeline_service_entity <cond>)+ "
+ "(SELECT COUNT(nameHash) FROM mlmodel_service_entity <cond>)+ "
+ "(SELECT COUNT(nameHash) FROM search_service_entity <cond>)+ "
+ "(SELECT COUNT(nameHash) FROM storage_service_entity <cond>)) as servicesCount, "
+ "(SELECT COUNT(nameHash) FROM user_entity <cond> AND (JSON_EXTRACT(json, '$.isBot') IS NULL OR JSON_EXTRACT(json, '$.isBot') = FALSE)) as userCount, "
+ "(SELECT COUNT(nameHash) FROM team_entity <cond>) as teamCount, "
+ "(SELECT COUNT(fqnHash) FROM test_suite <cond>) as testSuiteCount",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT (SELECT COUNT(*) FROM table_entity <cond>) as tableCount, "
@ -3881,29 +3920,6 @@ public interface CollectionDAO {
+ "(SELECT COUNT(*) FROM mlmodel_service_entity <cond>)+ "
+ "(SELECT COUNT(*) FROM search_service_entity <cond>)+ "
+ "(SELECT COUNT(*) FROM storage_service_entity <cond>)) as servicesCount, "
+ "(SELECT COUNT(*) FROM user_entity <cond> AND (JSON_EXTRACT(json, '$.isBot') IS NULL OR JSON_EXTRACT(json, '$.isBot') = FALSE)) as userCount, "
+ "(SELECT COUNT(*) FROM team_entity <cond>) as teamCount, "
+ "(SELECT COUNT(*) FROM test_suite <cond>) as testSuiteCount",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT (SELECT COUNT(*) FROM table_entity <cond>) as tableCount, "
+ "(SELECT COUNT(*) FROM topic_entity <cond>) as topicCount, "
+ "(SELECT COUNT(*) FROM dashboard_entity <cond>) as dashboardCount, "
+ "(SELECT COUNT(*) FROM pipeline_entity <cond>) as pipelineCount, "
+ "(SELECT COUNT(*) FROM ml_model_entity <cond>) as mlmodelCount, "
+ "(SELECT COUNT(*) FROM storage_container_entity <cond>) as storageContainerCount, "
+ "(SELECT COUNT(*) FROM search_index_entity <cond>) as searchIndexCount, "
+ "(SELECT COUNT(*) FROM glossary_entity <cond>) as glossaryCount, "
+ "(SELECT COUNT(*) FROM glossary_term_entity <cond>) as glossaryTermCount, "
+ "(SELECT (SELECT COUNT(*) FROM metadata_service_entity <cond>) + "
+ "(SELECT COUNT(*) FROM dbservice_entity <cond>)+ "
+ "(SELECT COUNT(*) FROM messaging_service_entity <cond>)+ "
+ "(SELECT COUNT(*) FROM dashboard_service_entity <cond>)+ "
+ "(SELECT COUNT(*) FROM pipeline_service_entity <cond>)+ "
+ "(SELECT COUNT(*) FROM mlmodel_service_entity <cond>)+ "
+ "(SELECT COUNT(*) FROM search_service_entity <cond>)+ "
+ "(SELECT COUNT(*) FROM storage_service_entity <cond>)) as servicesCount, "
+ "(SELECT COUNT(*) FROM user_entity <cond> AND (json#>'{isBot}' IS NULL OR ((json#>'{isBot}')::boolean) = FALSE)) as userCount, "
+ "(SELECT COUNT(*) FROM team_entity <cond>) as teamCount, "
+ "(SELECT COUNT(*) FROM test_suite <cond>) as testSuiteCount",
@ -3911,14 +3927,26 @@ public interface CollectionDAO {
@RegisterRowMapper(EntitiesCountRowMapper.class)
EntitiesCount getAggregatedEntitiesCount(@Define("cond") String cond) throws StatementException;
@SqlQuery(
"SELECT (SELECT COUNT(*) FROM database_entity <cond>) as databaseServiceCount, "
+ "(SELECT COUNT(*) FROM messaging_service_entity <cond>) as messagingServiceCount, "
+ "(SELECT COUNT(*) FROM dashboard_service_entity <cond>) as dashboardServiceCount, "
+ "(SELECT COUNT(*) FROM pipeline_service_entity <cond>) as pipelineServiceCount, "
+ "(SELECT COUNT(*) FROM mlmodel_service_entity <cond>) as mlModelServiceCount, "
+ "(SELECT COUNT(*) FROM storage_service_entity <cond>) as storageServiceCount, "
+ "(SELECT COUNT(*) FROM search_service_entity <cond>) as searchServiceCount")
@ConnectionAwareSqlQuery(
value =
"SELECT (SELECT COUNT(nameHash) FROM dbservice_entity <cond>) as databaseServiceCount, "
+ "(SELECT COUNT(nameHash) FROM messaging_service_entity <cond>) as messagingServiceCount, "
+ "(SELECT COUNT(nameHash) FROM dashboard_service_entity <cond>) as dashboardServiceCount, "
+ "(SELECT COUNT(nameHash) FROM pipeline_service_entity <cond>) as pipelineServiceCount, "
+ "(SELECT COUNT(nameHash) FROM mlmodel_service_entity <cond>) as mlModelServiceCount, "
+ "(SELECT COUNT(nameHash) FROM storage_service_entity <cond>) as storageServiceCount, "
+ "(SELECT COUNT(nameHash) FROM search_service_entity <cond>) as searchServiceCount",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT (SELECT COUNT(*) FROM dbservice_entity <cond>) as databaseServiceCount, "
+ "(SELECT COUNT(*) FROM messaging_service_entity <cond>) as messagingServiceCount, "
+ "(SELECT COUNT(*) FROM dashboard_service_entity <cond>) as dashboardServiceCount, "
+ "(SELECT COUNT(*) FROM pipeline_service_entity <cond>) as pipelineServiceCount, "
+ "(SELECT COUNT(*) FROM mlmodel_service_entity <cond>) as mlModelServiceCount, "
+ "(SELECT COUNT(*) FROM storage_service_entity <cond>) as storageServiceCount, "
+ "(SELECT COUNT(*) FROM search_service_entity <cond>) as searchServiceCount",
connectionType = POSTGRES)
@RegisterRowMapper(ServicesCountRowMapper.class)
ServicesCount getAggregatedServicesCount(@Define("cond") String cond) throws StatementException;
@ -4290,7 +4318,8 @@ public interface CollectionDAO {
psqlCondition.append(String.format(" AND entityType='%s' ", entityType));
}
return listCount(getTableName(), mysqlCondition.toString(), psqlCondition.toString());
return listCount(
getTableName(), getNameHashColumn(), mysqlCondition.toString(), psqlCondition.toString());
}
@ConnectionAwareSqlQuery(
@ -4330,17 +4359,6 @@ public interface CollectionDAO {
@Define("psqlCond") String psqlCond,
@Bind("limit") int limit,
@Bind("after") String after);
@ConnectionAwareSqlQuery(
value = "SELECT count(*) FROM <table> <mysqlCond>",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value = "SELECT count(*) FROM <table> <psqlCond>",
connectionType = POSTGRES)
int listCount(
@Define("table") String table,
@Define("mysqlCond") String mysqlCond,
@Define("psqlCond") String psqlCond);
}
interface SuggestionDAO {

View File

@ -133,17 +133,21 @@ public interface EntityDAO<T extends EntityInterface> {
@BindFQN("name") String name,
@Define("cond") String cond);
@SqlQuery("SELECT count(*) FROM <table> <cond>")
int listCount(@Define("table") String table, @Define("cond") String cond);
@SqlQuery("SELECT count(<nameHashColumn>) FROM <table> <cond>")
int listCount(
@Define("table") String table,
@Define("nameHashColumn") String nameHashColumn,
@Define("cond") String cond);
@ConnectionAwareSqlQuery(
value = "SELECT count(*) FROM <table> <mysqlCond>",
value = "SELECT count(<nameHashColumn>) FROM <table> <mysqlCond>",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value = "SELECT count(*) FROM <table> <postgresCond>",
connectionType = POSTGRES)
int listCount(
@Define("table") String table,
@Define("nameHashColumn") String nameHashColumn,
@Define("mysqlCond") String mysqlCond,
@Define("postgresCond") String postgresCond);
@ -199,8 +203,12 @@ public interface EntityDAO<T extends EntityInterface> {
@Bind("limit") int limit,
@Bind("after") String after);
@SqlQuery("SELECT count(*) FROM <table>")
int listTotalCount(@Define("table") String table);
@ConnectionAwareSqlQuery(
value = "SELECT count(<nameHashColumn>) FROM <table>",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(value = "SELECT count(*) FROM <table>", connectionType = POSTGRES)
int listTotalCount(
@Define("table") String table, @Define("nameHashColumn") String nameHashColumn);
@ConnectionAwareSqlQuery(
value = "SELECT count(distinct(<distinctColumn>)) FROM <table> <mysqlCond>",
@ -208,7 +216,7 @@ public interface EntityDAO<T extends EntityInterface> {
@ConnectionAwareSqlQuery(
value = "SELECT count(distinct(<distinctColumn>)) FROM <table> <postgresCond>",
connectionType = POSTGRES)
int listCount(
int listCountDistinct(
@Define("table") String table,
@Define("mysqlCond") String mysqlCond,
@Define("postgresCond") String postgresCond,
@ -325,6 +333,14 @@ public interface EntityDAO<T extends EntityInterface> {
@SqlUpdate("DELETE FROM <table> WHERE id = :id")
int delete(@Define("table") String table, @BindUUID("id") UUID id);
@ConnectionAwareSqlUpdate(value = "ANALYZE TABLE <table>", connectionType = MYSQL)
@ConnectionAwareSqlUpdate(value = "ANALYZE <table>", connectionType = POSTGRES)
void analyze(@Define("table") String table);
default void analyzeTable() {
analyze(getTableName());
}
/** Default methods that interfaces with implementation. Don't override */
default void insert(EntityInterface entity, String fqn) {
insert(getTableName(), getNameHashColumn(), fqn, JsonUtils.pojoToJson(entity));
@ -403,11 +419,11 @@ public interface EntityDAO<T extends EntityInterface> {
}
default int listCount(ListFilter filter) {
return listCount(getTableName(), filter.getCondition());
return listCount(getTableName(), getNameHashColumn(), filter.getCondition());
}
default int listTotalCount() {
return listTotalCount(getTableName());
return listTotalCount(getTableName(), getNameHashColumn());
}
default List<String> listBefore(ListFilter filter, int limit, String before) {

View File

@ -35,6 +35,7 @@ import org.flywaydb.core.api.MigrationVersion;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;
import org.jdbi.v3.sqlobject.SqlObjects;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppSchedule;
@ -49,8 +50,10 @@ import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.apps.ApplicationHandler;
import org.openmetadata.service.apps.scheduler.AppScheduler;
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.fernet.Fernet;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.MigrationDAO;
@ -313,11 +316,38 @@ public class OpenMetadataOperations implements Callable<Integer> {
new SecretsManagerUpdateService(secretsManager, config.getClusterName()).updateEntities();
return 0;
} catch (Exception e) {
LOG.error("Failed to deploy pipelines due to ", e);
LOG.error("Failed to migrate secrets due to ", e);
return 1;
}
}
@Command(
name = "analyze-tables",
description =
"Migrate secrets from DB to the configured Secrets Manager. "
+ "Note that this does not support migrating between external Secrets Managers")
public Integer analyzeTables() {
try {
LOG.info("Analyzing Tables...");
parseConfig();
Entity.getEntityList().forEach(this::analyzeEntityTable);
return 0;
} catch (Exception e) {
LOG.error("Failed to analyze tables due to ", e);
return 1;
}
}
private void analyzeEntityTable(String entity) {
try {
EntityRepository<? extends EntityInterface> repository = Entity.getEntityRepository(entity);
LOG.info("Analyzing table for [{}] Entity", entity);
repository.getDao().analyzeTable();
} catch (EntityNotFoundException e) {
LOG.debug("No repository for [{}] Entity", entity);
}
}
private void deployPipeline(
IngestionPipeline pipeline,
PipelineServiceClient pipelineServiceClient,