From 977e13d00cc3dbd7396a336c5b33b465779e94de Mon Sep 17 00:00:00 2001
From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com>
Date: Fri, 21 Jul 2023 15:23:37 +0530
Subject: [PATCH] Optimize db calls for entity_extension and field_relationship table (#12543)

* Optimize db calls for entity_extension and field_relationship table

* change count sql for postgres
---
 .../service/jdbi3/CollectionDAO.java          |  75 ++---
 .../service/migration/MigrationUtil.java      | 273 ++++++++++--------
 .../versions/mysql/v110/MySQLMigration.java   |   7 +-
 .../postgres/v110/PostgresMigration.java      |   7 +-
 4 files changed, 179 insertions(+), 183 deletions(-)

diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java
index 7486974ad4e..3658aef6688 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java
@@ -35,6 +35,7 @@ import java.util.stream.Collectors;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import org.jdbi.v3.core.mapper.RowMapper;
 import org.jdbi.v3.core.statement.StatementContext;
@@ -1253,12 +1254,19 @@ public interface CollectionDAO {
         @Bind("toType") String toType,
         @Bind("relation") int relation);
 
-    @SqlQuery("SELECT count(*) FROM field_relationship")
-    int listCount();
+    @Deprecated
+    @ConnectionAwareSqlQuery(
+        value = "SELECT count(DISTINCT fromFQN, toFQN) FROM field_relationship",
+        connectionType = MYSQL)
+    @ConnectionAwareSqlQuery(
+        value = "SELECT COUNT(*) FROM ( SELECT DISTINCT fromFQN, toFQN FROM field_relationship) AS subquery",
+        connectionType = POSTGRES)
+    int listDistinctCount();
 
-    @SqlQuery("SELECT * FROM field_relationship LIMIT :limit OFFSET :offset")
+    @Deprecated
+    @SqlQuery("SELECT DISTINCT fromFQN, toFQN FROM field_relationship LIMIT :limit OFFSET :offset")
     @RegisterRowMapper(FieldRelationShipMapper.class)
-    List<FieldRelationship> listWithOffset(@Bind("limit") int limit, @Bind("offset") int offset);
+    List<Pair<String, String>> listDistinctWithOffset(@Bind("limit") int limit, @Bind("offset") int offset);
 
     @SqlQuery(
         "SELECT fromFQN, toFQN, json FROM field_relationship WHERE "
@@ -1314,20 +1322,10 @@ public interface CollectionDAO {
       }
     }
 
-    class FieldRelationShipMapper implements RowMapper<FieldRelationship> {
+    class FieldRelationShipMapper implements RowMapper<Pair<String, String>> {
       @Override
-      public FieldRelationship map(ResultSet rs, StatementContext ctx) throws SQLException {
-        FieldRelationship result = new FieldRelationship();
-        result.setFromFQNHash(rs.getString("fromFQNHash"));
-        result.setToFQNHash(rs.getString("toFQNHash"));
-        result.setFromFQN(rs.getString("fromFQN"));
-        result.setToFQN(rs.getString("toFQN"));
-        result.setFromType(rs.getString("fromType"));
-        result.setToType(rs.getString("toType"));
-        result.setRelation(rs.getInt("relation"));
-        result.setJsonSchema(rs.getString("jsonSchema"));
-        result.setJson(rs.getString("json"));
-        return result;
+      public Pair<String, String> map(ResultSet rs, StatementContext ctx) throws SQLException {
+        return Pair.of(rs.getString("fromFQN"), rs.getString("toFQN"));
       }
     }
 
@@ -3151,11 +3149,12 @@ public interface CollectionDAO {
         "SELECT json FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash AND extension = :extension")
     String getExtension(@Bind("entityFQNHash") String entityId, @Bind("extension") String extension);
 
-    @SqlQuery("SELECT count(*) FROM entity_extension_time_series WHERE EntityFQNHash = :entityFQNHash")
+    @SqlQuery("SELECT count(*) FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash")
     int listCount(@Bind("entityFQNHash") String entityFQNHash);
 
-    @SqlQuery("SELECT count(*) FROM entity_extension_time_series")
-    int listAllCount();
+    @SqlQuery("SELECT COUNT(DISTINCT entityFQN) FROM entity_extension_time_series")
+    @Deprecated
+    int listDistinctCount();
 
     @ConnectionAwareSqlQuery(
         value =
@@ -3405,39 +3404,9 @@ public interface CollectionDAO {
       }
     }
 
-    @SqlQuery("select * from entity_extension_time_series LIMIT :limit OFFSET :offset")
-    @RegisterRowMapper(EntityExtensionTimeSeries.class)
-    List<EntityExtensionTimeSeriesTable> listWithOffset(@Bind("limit") int limit, @Bind("offset") int offset);
-
-    @Getter
-    @Setter
-    class EntityExtensionTimeSeriesTable {
-      private String entityFQN;
-      private String extension;
-      private String jsonSchema;
-      private String json;
-      private long timestamp;
-      private String entityFQNHash;
-    }
-
-    class EntityExtensionTimeSeries implements RowMapper<EntityExtensionTimeSeriesTable> {
-      @Override
-      public EntityExtensionTimeSeriesTable map(ResultSet rs, StatementContext ctx) throws SQLException {
-        EntityExtensionTimeSeriesTable result = new EntityExtensionTimeSeriesTable();
-        // TODO : Ugly , after migration this is removed
-        try {
-          result.setEntityFQN(rs.getString("entityFQN"));
-        } catch (Exception ex) {
-          // Nothing
-        }
-        result.setExtension(rs.getString("extension"));
-        result.setJsonSchema(rs.getString("jsonSchema"));
-        result.setJson(rs.getString("json"));
-        result.setTimestamp(rs.getLong("timestamp"));
-        result.setEntityFQNHash(rs.getString("entityFQNHash"));
-        return result;
-      }
-    }
+    @SqlQuery("SELECT DISTINCT entityFQN FROM entity_extension_time_series LIMIT :limit OFFSET :offset")
+    @Deprecated
+    List<String> listDistinctWithOffset(@Bind("limit") int limit, @Bind("offset") int offset);
   }
 
   class EntitiesCountRowMapper implements RowMapper<EntitiesCount> {
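Note on the CollectionDAO changes above: the new deprecated DISTINCT helpers exist only to feed the 1.1.0 hash backfill that follows in MigrationUtil. A minimal consumption sketch, where the batch size and loop are illustrative and the real call sites are in the MigrationUtil diff below:

    // Illustrative sketch: page over the deprecated DISTINCT helpers added above.
    int limit = 1000; // default batch size; made configurable later in this patch
    int offset = 0;
    int total = collectionDAO.fieldRelationshipDAO().listDistinctCount();
    while (offset < total) {
      // Each Pair carries (fromFQN, toFQN), as produced by FieldRelationShipMapper.
      List<Pair<String, String>> page =
          collectionDAO.fieldRelationshipDAO().listDistinctWithOffset(limit, offset);
      // ... hash each FQN and queue an UPDATE for it (see MigrationUtil below) ...
      offset = offset + limit;
    }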
where entityFQN=:entityFQN"; + + private static final String MYSQL_FIELD_RELATIONSHIP_UPDATE = + "UPDATE field_relationship SET fromFQNHash = :fromFQNHash, toFQNHash = :toFQNHash where fromFQN= :fromFQN and toFQN = :toFQN"; + private static final String POSTGRES_FIELD_RELATIONSHIP_UPDATE = + "UPDATE field_relationship SET fromFQNHash = :fromFQNHash, toFQNHash = :toFQNHash where fromFQN= :fromFQN and toFQN = :toFQN"; @SneakyThrows public static void updateFQNHashForEntity( - Handle handle, Class clazz, EntityDAO dao) { + Handle handle, Class clazz, EntityDAO dao, int limitParam) { if (Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL())) { readAndProcessEntity( - handle, String.format(MYSQL_ENTITY_UPDATE, dao.getTableName(), dao.getNameHashColumn()), clazz, dao, false); + handle, + String.format(MYSQL_ENTITY_UPDATE, dao.getTableName(), dao.getNameHashColumn()), + clazz, + dao, + false, + limitParam); } else { readAndProcessEntity( handle, String.format(POSTGRES_ENTITY_UPDATE, dao.getTableName(), dao.getNameHashColumn()), clazz, dao, - false); + false, + limitParam); } } @SneakyThrows public static void updateFQNHashForEntityWithName( - Handle handle, Class clazz, EntityDAO dao) { + Handle handle, Class clazz, EntityDAO dao, int limitParam) { if (Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL())) { readAndProcessEntity( - handle, String.format(MYSQL_ENTITY_UPDATE, dao.getTableName(), dao.getNameHashColumn()), clazz, dao, true); + handle, + String.format(MYSQL_ENTITY_UPDATE, dao.getTableName(), dao.getNameHashColumn()), + clazz, + dao, + true, + limitParam); } else { readAndProcessEntity( - handle, String.format(POSTGRES_ENTITY_UPDATE, dao.getTableName(), dao.getNameHashColumn()), clazz, dao, true); + handle, + String.format(POSTGRES_ENTITY_UPDATE, dao.getTableName(), dao.getNameHashColumn()), + clazz, + dao, + true, + limitParam); } } public static void readAndProcessEntity( - Handle handle, String updateSql, Class clazz, EntityDAO dao, boolean withName) throws IOException { + Handle handle, String updateSql, Class clazz, EntityDAO dao, boolean withName, int limitParam) + throws IOException { LOG.debug("Starting Migration for table : {}", dao.getTableName()); - int limitParam = 1000; int offset = 0; int totalCount = dao.listTotalCount(); while (offset < totalCount) { @@ -137,18 +158,15 @@ public class MigrationUtil { for (String json : jsons) { // Update the Statements to Database T entity = JsonUtils.readValue(json, clazz); - String hash; try { - hash = + String hash = withName ? FullyQualifiedName.buildHash(EntityInterfaceUtil.quoteName(entity.getFullyQualifiedName())) : FullyQualifiedName.buildHash(entity.getFullyQualifiedName()); + upsertBatch.bind("nameHashColumnValue", hash).bind("id", entity.getId().toString()).add(); } catch (Exception ex) { LOG.error("Failed in creating FQN Hash for Entity Name : {}", entity.getFullyQualifiedName(), ex); - // Continue to update further jsons - continue; } - upsertBatch.bind("nameHashColumnValue", hash).bind("id", entity.getId().toString()).add(); } upsertBatch.execute(); } @@ -177,148 +195,151 @@ public class MigrationUtil { return result; } - public static void dataMigrationFQNHashing(Handle handle, CollectionDAO collectionDAO) { + public static void dataMigrationFQNHashing(Handle handle, CollectionDAO collectionDAO, int limitParam) { // Migration for Entities with Name as their FQN // We need to quote the FQN, if these entities have "." in their name we are storing it as it is // into the FQN field. 
-    updateFQNHashForEntityWithName(handle, Bot.class, collectionDAO.botDAO());
-    updateFQNHashForEntityWithName(handle, User.class, collectionDAO.userDAO());
-    updateFQNHashForEntityWithName(handle, Team.class, collectionDAO.teamDAO());
+    updateFQNHashForEntityWithName(handle, Bot.class, collectionDAO.botDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, User.class, collectionDAO.userDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, Team.class, collectionDAO.teamDAO(), limitParam);
     // Update all the services
-    updateFQNHashForEntityWithName(handle, DatabaseService.class, collectionDAO.dbServiceDAO());
-    updateFQNHashForEntityWithName(handle, DashboardService.class, collectionDAO.dashboardServiceDAO());
-    updateFQNHashForEntityWithName(handle, MessagingService.class, collectionDAO.messagingServiceDAO());
-    updateFQNHashForEntityWithName(handle, MetadataService.class, collectionDAO.metadataServiceDAO());
-    updateFQNHashForEntityWithName(handle, MlModelService.class, collectionDAO.mlModelServiceDAO());
-    updateFQNHashForEntityWithName(handle, StorageService.class, collectionDAO.storageServiceDAO());
-    updateFQNHashForEntityWithName(handle, PipelineService.class, collectionDAO.pipelineServiceDAO());
-    updateFQNHashForEntity(handle, IngestionPipeline.class, collectionDAO.ingestionPipelineDAO());
+    updateFQNHashForEntityWithName(handle, DatabaseService.class, collectionDAO.dbServiceDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, DashboardService.class, collectionDAO.dashboardServiceDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, MessagingService.class, collectionDAO.messagingServiceDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, MetadataService.class, collectionDAO.metadataServiceDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, MlModelService.class, collectionDAO.mlModelServiceDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, StorageService.class, collectionDAO.storageServiceDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, PipelineService.class, collectionDAO.pipelineServiceDAO(), limitParam);
+    updateFQNHashForEntity(handle, IngestionPipeline.class, collectionDAO.ingestionPipelineDAO(), limitParam);
     // Update Entities
-    updateFQNHashForEntity(handle, Database.class, collectionDAO.databaseDAO());
-    updateFQNHashForEntity(handle, DatabaseSchema.class, collectionDAO.databaseSchemaDAO());
-    updateFQNHashForEntity(handle, Table.class, collectionDAO.tableDAO());
-    updateFQNHashForEntity(handle, Query.class, collectionDAO.queryDAO());
-    updateFQNHashForEntity(handle, Topic.class, collectionDAO.topicDAO());
-    updateFQNHashForEntity(handle, Dashboard.class, collectionDAO.dashboardDAO());
-    updateFQNHashForEntity(handle, DashboardDataModel.class, collectionDAO.dashboardDataModelDAO());
-    updateFQNHashForEntity(handle, Chart.class, collectionDAO.chartDAO());
-    updateFQNHashForEntity(handle, Container.class, collectionDAO.containerDAO());
-    updateFQNHashForEntity(handle, MlModel.class, collectionDAO.mlModelDAO());
-    updateFQNHashForEntity(handle, Pipeline.class, collectionDAO.pipelineDAO());
-    updateFQNHashForEntity(handle, Metrics.class, collectionDAO.metricsDAO());
-    updateFQNHashForEntity(handle, Report.class, collectionDAO.reportDAO());
+    updateFQNHashForEntity(handle, Database.class, collectionDAO.databaseDAO(), limitParam);
+    updateFQNHashForEntity(handle, DatabaseSchema.class, collectionDAO.databaseSchemaDAO(), limitParam);
+    updateFQNHashForEntity(handle, Table.class, collectionDAO.tableDAO(), limitParam);
+    updateFQNHashForEntity(handle, Query.class, collectionDAO.queryDAO(), limitParam);
+    updateFQNHashForEntity(handle, Topic.class, collectionDAO.topicDAO(), limitParam);
+    updateFQNHashForEntity(handle, Dashboard.class, collectionDAO.dashboardDAO(), limitParam);
+    updateFQNHashForEntity(handle, DashboardDataModel.class, collectionDAO.dashboardDataModelDAO(), limitParam);
+    updateFQNHashForEntity(handle, Chart.class, collectionDAO.chartDAO(), limitParam);
+    updateFQNHashForEntity(handle, Container.class, collectionDAO.containerDAO(), limitParam);
+    updateFQNHashForEntity(handle, MlModel.class, collectionDAO.mlModelDAO(), limitParam);
+    updateFQNHashForEntity(handle, Pipeline.class, collectionDAO.pipelineDAO(), limitParam);
+    updateFQNHashForEntity(handle, Metrics.class, collectionDAO.metricsDAO(), limitParam);
+    updateFQNHashForEntity(handle, Report.class, collectionDAO.reportDAO(), limitParam);
     // Update Glossaries & Classifications
-    updateFQNHashForEntity(handle, Classification.class, collectionDAO.classificationDAO());
-    updateFQNHashForEntity(handle, Glossary.class, collectionDAO.glossaryDAO());
-    updateFQNHashForEntity(handle, GlossaryTerm.class, collectionDAO.glossaryTermDAO());
-    updateFQNHashForEntity(handle, Tag.class, collectionDAO.tagDAO());
+    updateFQNHashForEntity(handle, Classification.class, collectionDAO.classificationDAO(), limitParam);
+    updateFQNHashForEntity(handle, Glossary.class, collectionDAO.glossaryDAO(), limitParam);
+    updateFQNHashForEntity(handle, GlossaryTerm.class, collectionDAO.glossaryTermDAO(), limitParam);
+    updateFQNHashForEntity(handle, Tag.class, collectionDAO.tagDAO(), limitParam);
     // Update DataInsights
-    updateFQNHashForEntity(handle, DataInsightChart.class, collectionDAO.dataInsightChartDAO());
-    updateFQNHashForEntity(handle, Kpi.class, collectionDAO.kpiDAO());
+    updateFQNHashForEntity(handle, DataInsightChart.class, collectionDAO.dataInsightChartDAO(), limitParam);
+    updateFQNHashForEntity(handle, Kpi.class, collectionDAO.kpiDAO(), limitParam);
     // Update DQ
-    updateFQNHashForEntity(handle, TestCase.class, collectionDAO.testCaseDAO());
-    updateFQNHashForEntity(handle, TestConnectionDefinition.class, collectionDAO.testConnectionDefinitionDAO());
-    updateFQNHashForEntity(handle, TestDefinition.class, collectionDAO.testDefinitionDAO());
-    updateFQNHashForEntity(handle, TestSuite.class, collectionDAO.testSuiteDAO());
+    updateFQNHashForEntity(handle, TestCase.class, collectionDAO.testCaseDAO(), limitParam);
+    updateFQNHashForEntity(
+        handle, TestConnectionDefinition.class, collectionDAO.testConnectionDefinitionDAO(), limitParam);
+    updateFQNHashForEntity(handle, TestDefinition.class, collectionDAO.testDefinitionDAO(), limitParam);
+    updateFQNHashForEntity(handle, TestSuite.class, collectionDAO.testSuiteDAO(), limitParam);
     // Update Misc
-    updateFQNHashForEntity(handle, Policy.class, collectionDAO.policyDAO());
-    updateFQNHashForEntity(handle, EventSubscription.class, collectionDAO.eventSubscriptionDAO());
-    updateFQNHashForEntity(handle, Role.class, collectionDAO.roleDAO());
-    updateFQNHashForEntity(handle, Type.class, collectionDAO.typeEntityDAO());
-    updateFQNHashForEntity(handle, WebAnalyticEvent.class, collectionDAO.webAnalyticEventDAO());
-    updateFQNHashForEntity(handle, Workflow.class, collectionDAO.workflowDAO());
+    updateFQNHashForEntity(handle, Policy.class, collectionDAO.policyDAO(), limitParam);
+    updateFQNHashForEntity(handle, EventSubscription.class, collectionDAO.eventSubscriptionDAO(), limitParam);
+    updateFQNHashForEntity(handle, Role.class, collectionDAO.roleDAO(), limitParam);
+    updateFQNHashForEntity(handle, Type.class, collectionDAO.typeEntityDAO(), limitParam);
+    updateFQNHashForEntity(handle, WebAnalyticEvent.class, collectionDAO.webAnalyticEventDAO(), limitParam);
+    updateFQNHashForEntity(handle, Workflow.class, collectionDAO.workflowDAO(), limitParam);
     // Field Relationship
-    updateFQNHashForFieldRelationship(collectionDAO);
+    if (Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL())) {
+      updateFQNHashForFieldRelationship(handle, MYSQL_FIELD_RELATIONSHIP_UPDATE, collectionDAO, limitParam);
+    } else {
+      updateFQNHashForFieldRelationship(handle, POSTGRES_FIELD_RELATIONSHIP_UPDATE, collectionDAO, limitParam);
+    }
     // TimeSeries
     if (Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL())) {
-      updateFQNHashEntityExtensionTimeSeries(handle, MYSQL_ENTITY_EXTENSION_TIME_SERIES_UPDATE, collectionDAO);
+      updateFQNHashEntityExtensionTimeSeries(
+          handle, MYSQL_ENTITY_EXTENSION_TIME_SERIES_UPDATE, collectionDAO, limitParam);
     } else {
-      updateFQNHashEntityExtensionTimeSeries(handle, POSTGRES_ENTITY_EXTENSION_TIME_SERIES_UPDATE, collectionDAO);
+      updateFQNHashEntityExtensionTimeSeries(
+          handle, POSTGRES_ENTITY_EXTENSION_TIME_SERIES_UPDATE, collectionDAO, limitParam);
     }
     // Tag Usage
     updateFQNHashTagUsage(collectionDAO);
   }
 
-  private static void updateFQNHashForFieldRelationship(CollectionDAO collectionDAO) {
+  private static void updateFQNHashForFieldRelationship(
+      Handle handle, String updateSql, CollectionDAO collectionDAO, int limitParam) {
     LOG.debug("Starting Migration for Field Relationship");
-    int limitParam = 200;
     int offset = 0;
-    int totalCount = collectionDAO.fieldRelationshipDAO().listCount();
-    while (offset < totalCount) {
-      List<CollectionDAO.FieldRelationshipDAO.FieldRelationship> fieldRelationships =
-          collectionDAO.fieldRelationshipDAO().listWithOffset(limitParam, offset);
-      for (CollectionDAO.FieldRelationshipDAO.FieldRelationship fieldRelationship : fieldRelationships) {
-        if (CommonUtil.nullOrEmpty(fieldRelationship.getFromFQNHash())
-            && CommonUtil.nullOrEmpty(fieldRelationship.getToFQNHash())) {
-          String fromFQNHash = "";
-          String toFQNHash = "";
+    int totalCount;
+    try {
+      // This might result into exceptions if the column entityFQN is dropped once
+      totalCount = collectionDAO.fieldRelationshipDAO().listDistinctCount();
+    } catch (Exception ex) {
+      return;
+    }
+    if (totalCount > 0) {
+      while (offset < totalCount) {
+        PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
+        List<Pair<String, String>> entityFQNPairList =
+            collectionDAO.fieldRelationshipDAO().listDistinctWithOffset(limitParam, offset);
+        for (Pair<String, String> entityFQNPair : entityFQNPairList) {
           try {
-            fromFQNHash = FullyQualifiedName.buildHash(fieldRelationship.getFromFQN());
-            toFQNHash = FullyQualifiedName.buildHash(fieldRelationship.getToFQN());
+            String fromFQNHash = FullyQualifiedName.buildHash(entityFQNPair.getLeft());
+            String toFQNHash = FullyQualifiedName.buildHash(entityFQNPair.getRight());
+            upsertBatch
+                .bind("fromFQNHash", fromFQNHash)
+                .bind("toFQNHash", toFQNHash)
+                .bind("fromFQN", entityFQNPair.getLeft())
+                .bind("toFQN", entityFQNPair.getRight())
+                .add();
           } catch (Exception ex) {
-            LOG.error("Failed in creating FromFQNHash : {} , toFQNHash : {}", fromFQNHash, toFQNHash, ex);
-            // Update further rows
-            continue;
+            LOG.error(
+                "Failed in creating fromFQN : {} , toFQN : {}", entityFQNPair.getLeft(), entityFQNPair.getRight(), ex);
          }
-          collectionDAO
-              .fieldRelationshipDAO()
-              .upsertFQNHash(
-                  fromFQNHash,
-                  toFQNHash,
-                  fieldRelationship.getFromFQN(),
-                  fieldRelationship.getToFQN(),
-                  fieldRelationship.getFromType(),
-                  fieldRelationship.getToType(),
-                  fieldRelationship.getRelation(),
-                  fieldRelationship.getJsonSchema(),
-                  fieldRelationship.getJson());
         }
+        upsertBatch.execute();
+        offset = offset + limitParam;
       }
-      offset = offset + limitParam;
     }
     LOG.debug("End Migration for Field Relationship");
   }
 
   private static void updateFQNHashEntityExtensionTimeSeries(
-      Handle handle, String updateSql, CollectionDAO collectionDAO) {
+      Handle handle, String updateSql, CollectionDAO collectionDAO, int limitParam) {
     LOG.debug("Starting Migration for Entity Extension Time Series");
-    int limitParam = 1000;
     int offset = 0;
-    int totalCount = collectionDAO.entityExtensionTimeSeriesDao().listAllCount();
-    PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
-    while (offset < totalCount) {
-      List<CollectionDAO.EntityExtensionTimeSeriesDAO.EntityExtensionTimeSeriesTable> timeSeriesTables =
-          collectionDAO.entityExtensionTimeSeriesDao().listWithOffset(limitParam, offset);
-
-      for (CollectionDAO.EntityExtensionTimeSeriesDAO.EntityExtensionTimeSeriesTable timeSeries : timeSeriesTables) {
-        if (CommonUtil.nullOrEmpty(timeSeries.getEntityFQNHash())) {
-          String entityFQN = "";
+    int totalCount;
+    try {
+      // This might result into exceptions if the column entityFQN is dropped once
+      totalCount = collectionDAO.entityExtensionTimeSeriesDao().listDistinctCount();
+    } catch (Exception ex) {
+      return;
+    }
+    if (totalCount > 0) {
+      while (offset < totalCount) {
+        PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
+        List<String> entityFQNLists =
+            collectionDAO.entityExtensionTimeSeriesDao().listDistinctWithOffset(limitParam, offset);
+        for (String entityFQN : entityFQNLists) {
          try {
-            entityFQN = FullyQualifiedName.buildHash(timeSeries.getEntityFQN());
+            upsertBatch
+                .bind("entityFQNHash", FullyQualifiedName.buildHash(entityFQN))
+                .bind("entityFQN", entityFQN)
+                .add();
          } catch (Exception ex) {
            LOG.error("Failed in creating EntityFQN : {}", entityFQN, ex);
-            // Update further rows
-            continue;
          }
-          upsertBatch
-              .bind("entityFQNHash", entityFQN)
-              .bind("entityFQN", timeSeries.getEntityFQN())
-              .bind("extension", timeSeries.getExtension())
-              .bind("timestamp", timeSeries.getTimestamp())
-              .add();
        }
+        upsertBatch.execute();
+        offset = offset + limitParam;
      }
-      upsertBatch.execute();
-      offset = offset + limitParam;
    }
    LOG.debug("Ended Migration for Entity Extension Time Series");
  }
@@ -328,26 +349,22 @@ public class MigrationUtil {
     List<CollectionDAO.TagUsageDAO.TagLabelMigration> tagLabelMigrationList = collectionDAO.tagUsageDAO().listAll();
     for (CollectionDAO.TagUsageDAO.TagLabelMigration tagLabel : tagLabelMigrationList) {
       if (CommonUtil.nullOrEmpty(tagLabel.getTagFQNHash()) && CommonUtil.nullOrEmpty(tagLabel.getTargetFQNHash())) {
-        String tagFQNHash = "";
-        String targetFQNHash = "";
         try {
-          tagFQNHash = FullyQualifiedName.buildHash(tagLabel.getTagFQN());
-          targetFQNHash = FullyQualifiedName.buildHash(tagLabel.getTargetFQN());
+          String tagFQNHash = FullyQualifiedName.buildHash(tagLabel.getTagFQN());
+          String targetFQNHash = FullyQualifiedName.buildHash(tagLabel.getTargetFQN());
+          collectionDAO
+              .tagUsageDAO()
+              .upsertFQNHash(
+                  tagLabel.getSource(),
+                  tagLabel.getTagFQN(),
+                  tagFQNHash,
+                  targetFQNHash,
+                  tagLabel.getLabelType(),
+                  tagLabel.getState(),
+                  tagLabel.getTargetFQN());
         } catch (Exception ex) {
-          LOG.error("Failed in creating tagFQNHash : {}, targetFQNHash: {}", tagFQNHash, targetFQNHash, ex);
-          // Update further rows
-          continue;
+          LOG.error("Failed in creating tagFQN : {}, targetFQN: {}", tagLabel.getTagFQN(), tagLabel.getTargetFQN(), ex);
         }
-        collectionDAO
-            .tagUsageDAO()
-            .upsertFQNHash(
-                tagLabel.getSource(),
-                tagLabel.getTagFQN(),
-                tagFQNHash,
-                targetFQNHash,
-                tagLabel.getLabelType(),
-                tagLabel.getState(),
-                tagLabel.getTargetFQN());
       }
     }
     LOG.debug("Ended Migration for Tag Usage");
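The MigrationUtil rewrite above replaces one upsert call per row with a single JDBI PreparedBatch per page. A minimal sketch of that pattern, assuming a JDBI Handle, the MYSQL_FIELD_RELATIONSHIP_UPDATE statement, and an entityFQNPairList page as defined in this patch:

    // Illustrative sketch of the per-page PreparedBatch pattern used above.
    PreparedBatch batch = handle.prepareBatch(MYSQL_FIELD_RELATIONSHIP_UPDATE);
    for (Pair<String, String> fqnPair : entityFQNPairList) {
      batch
          .bind("fromFQNHash", FullyQualifiedName.buildHash(fqnPair.getLeft()))
          .bind("toFQNHash", FullyQualifiedName.buildHash(fqnPair.getRight()))
          .bind("fromFQN", fqnPair.getLeft())
          .bind("toFQN", fqnPair.getRight())
          .add(); // queue one UPDATE
    }
    batch.execute(); // one round trip per page instead of one statement per row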
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/mysql/v110/MySQLMigration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/mysql/v110/MySQLMigration.java
index b29023be1dc..b6e033301ce 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/mysql/v110/MySQLMigration.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/mysql/v110/MySQLMigration.java
@@ -58,7 +58,12 @@ public class MySQLMigration implements MigrationStep {
   @Override
   public void runDataMigration() {
     // FQN Hashing Migrations
-    dataMigrationFQNHashing(handle, collectionDAO);
+    String envVariableValue = System.getenv("MIGRATION_LIMIT_PARAM");
+    if (envVariableValue != null) {
+      dataMigrationFQNHashing(handle, collectionDAO, Integer.parseInt(envVariableValue));
+    } else {
+      dataMigrationFQNHashing(handle, collectionDAO, 1000);
+    }
   }
 
   @Override
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/postgres/v110/PostgresMigration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/postgres/v110/PostgresMigration.java
index 8337d53885d..7014dea05e7 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/postgres/v110/PostgresMigration.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/versions/postgres/v110/PostgresMigration.java
@@ -58,7 +58,12 @@ public class PostgresMigration implements MigrationStep {
   @Override
   public void runDataMigration() {
-    dataMigrationFQNHashing(handle, collectionDAO);
+    String envVariableValue = System.getenv("MIGRATION_LIMIT_PARAM");
+    if (envVariableValue != null) {
+      dataMigrationFQNHashing(handle, collectionDAO, Integer.parseInt(envVariableValue));
+    } else {
+      dataMigrationFQNHashing(handle, collectionDAO, 1000);
+    }
   }
 
   @Override
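Both runDataMigration() overrides above read the batch size the same way: MIGRATION_LIMIT_PARAM from the environment, falling back to 1000. Shown in isolation (the helper name is hypothetical and not part of the patch):

    // Hypothetical helper mirroring the lookup both migration classes perform.
    static int migrationLimit() {
      String envVariableValue = System.getenv("MIGRATION_LIMIT_PARAM");
      return envVariableValue != null ? Integer.parseInt(envVariableValue) : 1000; // default batch size
    }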