Optimize db calls for entity_extension and field_relationship table (#12543)

* Optimize db calls for the entity_extension_time_series and field_relationship tables

* Change the count SQL for Postgres
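The Postgres-specific count query is needed because MySQL accepts a multi-column DISTINCT inside COUNT, while PostgreSQL does not; there the distinct (fromFQN, toFQN) pairs have to be de-duplicated in a subquery first. A minimal illustration, using the two queries from this diff:

    // MySQL permits COUNT over a multi-column DISTINCT directly:
    String mysqlCount = "SELECT count(DISTINCT fromFQN, toFQN) FROM field_relationship";
    // PostgreSQL rejects COUNT(DISTINCT col1, col2), so count the rows of a DISTINCT subquery instead:
    String postgresCount =
        "SELECT COUNT(*) FROM ( SELECT DISTINCT fromFQN, toFQN FROM field_relationship) AS subquery";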
Mohit Yadav 2023-07-21 15:23:37 +05:30 committed by GitHub
parent 2a03cb0c7c
commit 977e13d00c
4 changed files with 179 additions and 183 deletions

CollectionDAO.java

@@ -35,6 +35,7 @@ import java.util.stream.Collectors;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import org.jdbi.v3.core.mapper.RowMapper;
 import org.jdbi.v3.core.statement.StatementContext;
@@ -1253,12 +1254,19 @@ public interface CollectionDAO {
         @Bind("toType") String toType,
         @Bind("relation") int relation);

-    @SqlQuery("SELECT count(*) FROM field_relationship")
-    int listCount();
+    @Deprecated
+    @ConnectionAwareSqlQuery(
+        value = "SELECT count(DISTINCT fromFQN, toFQN) FROM field_relationship",
+        connectionType = MYSQL)
+    @ConnectionAwareSqlQuery(
+        value = "SELECT COUNT(*) FROM ( SELECT DISTINCT fromFQN, toFQN FROM field_relationship) AS subquery",
+        connectionType = POSTGRES)
+    int listDistinctCount();

-    @SqlQuery("SELECT * FROM field_relationship LIMIT :limit OFFSET :offset")
+    @Deprecated
+    @SqlQuery("SELECT DISTINCT fromFQN, toFQN FROM field_relationship LIMIT :limit OFFSET :offset")
     @RegisterRowMapper(FieldRelationShipMapper.class)
-    List<FieldRelationship> listWithOffset(@Bind("limit") int limit, @Bind("offset") int offset);
+    List<Pair<String, String>> listDistinctWithOffset(@Bind("limit") int limit, @Bind("offset") int offset);

     @SqlQuery(
         "SELECT fromFQN, toFQN, json FROM field_relationship WHERE "
@@ -1314,20 +1322,10 @@ public interface CollectionDAO {
     }
   }

-  class FieldRelationShipMapper implements RowMapper<FieldRelationship> {
+  class FieldRelationShipMapper implements RowMapper<Pair<String, String>> {
     @Override
-    public FieldRelationship map(ResultSet rs, StatementContext ctx) throws SQLException {
-      FieldRelationship result = new FieldRelationship();
-      result.setFromFQNHash(rs.getString("fromFQNHash"));
-      result.setToFQNHash(rs.getString("toFQNHash"));
-      result.setFromFQN(rs.getString("fromFQN"));
-      result.setToFQN(rs.getString("toFQN"));
-      result.setFromType(rs.getString("fromType"));
-      result.setToType(rs.getString("toType"));
-      result.setRelation(rs.getInt("relation"));
-      result.setJsonSchema(rs.getString("jsonSchema"));
-      result.setJson(rs.getString("json"));
-      return result;
+    public Pair<String, String> map(ResultSet rs, StatementContext ctx) throws SQLException {
+      return Pair.of(rs.getString("fromFQN"), rs.getString("toFQN"));
     }
   }
@@ -3151,11 +3149,12 @@ public interface CollectionDAO {
         "SELECT json FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash AND extension = :extension")
     String getExtension(@Bind("entityFQNHash") String entityId, @Bind("extension") String extension);

-    @SqlQuery("SELECT count(*) FROM entity_extension_time_series WHERE EntityFQNHash = :entityFQNHash")
+    @SqlQuery("SELECT count(*) FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash")
     int listCount(@Bind("entityFQNHash") String entityFQNHash);

-    @SqlQuery("SELECT count(*) FROM entity_extension_time_series")
-    int listAllCount();
+    @SqlQuery("SELECT COUNT(DISTINCT entityFQN) FROM entity_extension_time_series")
+    @Deprecated
+    int listDistinctCount();

     @ConnectionAwareSqlQuery(
         value =
@@ -3405,39 +3404,9 @@ public interface CollectionDAO {
     }
   }

-    @SqlQuery("select * from entity_extension_time_series LIMIT :limit OFFSET :offset")
-    @RegisterRowMapper(EntityExtensionTimeSeries.class)
-    List<EntityExtensionTimeSeriesTable> listWithOffset(@Bind("limit") int limit, @Bind("offset") int offset);
-
-    @Getter
-    @Setter
-    class EntityExtensionTimeSeriesTable {
-      private String entityFQN;
-      private String extension;
-      private String jsonSchema;
-      private String json;
-      private long timestamp;
-      private String entityFQNHash;
-    }
-
-    class EntityExtensionTimeSeries implements RowMapper<EntityExtensionTimeSeriesTable> {
-      @Override
-      public EntityExtensionTimeSeriesTable map(ResultSet rs, StatementContext ctx) throws SQLException {
-        EntityExtensionTimeSeriesTable result = new EntityExtensionTimeSeriesTable();
-        // TODO : Ugly , after migration this is removed
-        try {
-          result.setEntityFQN(rs.getString("entityFQN"));
-        } catch (Exception ex) {
-          // Nothing
-        }
-        result.setExtension(rs.getString("extension"));
-        result.setJsonSchema(rs.getString("jsonSchema"));
-        result.setJson(rs.getString("json"));
-        result.setTimestamp(rs.getLong("timestamp"));
-        result.setEntityFQNHash(rs.getString("entityFQNHash"));
-        return result;
-      }
-    }
+    @SqlQuery("SELECT DISTINCT entityFQN FROM entity_extension_time_series LIMIT :limit OFFSET :offset")
+    @Deprecated
+    List<String> listDistinctWithOffset(@Bind("limit") int limit, @Bind("offset") int offset);
   }

   class EntitiesCountRowMapper implements RowMapper<EntitiesCount> {
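The DAO changes above replace full-row paging (listWithOffset plus a row mapper that materialized every column) with paging over only the DISTINCT FQN values that the hash backfill actually needs. A sketch of how a caller would walk these pages, assuming a FieldRelationshipDAO instance; forEachDistinctPair is a hypothetical helper, not part of this commit:

    import java.util.List;
    import org.apache.commons.lang3.tuple.Pair;

    // Hypothetical helper (not in this commit) showing the paging contract of the
    // new migration-only methods: visit DISTINCT (fromFQN, toFQN) pairs page by
    // page instead of materializing entire field_relationship rows.
    static void forEachDistinctPair(CollectionDAO.FieldRelationshipDAO dao, int limit) {
      int offset = 0;
      int total = dao.listDistinctCount();
      while (offset < total) {
        List<Pair<String, String>> page = dao.listDistinctWithOffset(limit, offset);
        for (Pair<String, String> pair : page) {
          // pair.getLeft() is fromFQN, pair.getRight() is toFQN
        }
        offset += limit;
      }
    }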

MigrationUtil.java

@@ -12,6 +12,7 @@ import java.util.Set;
 import java.util.UUID;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
 import org.jdbi.v3.core.Handle;
 import org.jdbi.v3.core.statement.PreparedBatch;
 import org.openmetadata.common.utils.CommonUtil;
@@ -88,44 +89,64 @@ public class MigrationUtil {
   private static final String MYSQL_ENTITY_UPDATE = "UPDATE %s SET %s = :nameHashColumnValue WHERE id = :id";
   private static final String POSTGRES_ENTITY_UPDATE = "UPDATE %s SET %s = :nameHashColumnValue WHERE id = :id";
   private static final String MYSQL_ENTITY_EXTENSION_TIME_SERIES_UPDATE =
-      "UPDATE entity_extension_time_series set entityFQNHash = :entityFQNHash where entityFQN=:entityFQN and extension=:extension and timestamp=:timestamp";
+      "UPDATE entity_extension_time_series set entityFQNHash = :entityFQNHash where entityFQN=:entityFQN";
   private static final String POSTGRES_ENTITY_EXTENSION_TIME_SERIES_UPDATE =
-      "UPDATE entity_extension_time_series set entityFQNHash = :entityFQNHash where entityFQN=:entityFQN and extension=:extension and timestamp=:timestamp";
+      "UPDATE entity_extension_time_series set entityFQNHash = :entityFQNHash where entityFQN=:entityFQN";
+  private static final String MYSQL_FIELD_RELATIONSHIP_UPDATE =
+      "UPDATE field_relationship SET fromFQNHash = :fromFQNHash, toFQNHash = :toFQNHash where fromFQN= :fromFQN and toFQN = :toFQN";
+  private static final String POSTGRES_FIELD_RELATIONSHIP_UPDATE =
+      "UPDATE field_relationship SET fromFQNHash = :fromFQNHash, toFQNHash = :toFQNHash where fromFQN= :fromFQN and toFQN = :toFQN";

   @SneakyThrows
   public static <T extends EntityInterface> void updateFQNHashForEntity(
-      Handle handle, Class<T> clazz, EntityDAO<T> dao) {
+      Handle handle, Class<T> clazz, EntityDAO<T> dao, int limitParam) {
     if (Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL())) {
       readAndProcessEntity(
-          handle, String.format(MYSQL_ENTITY_UPDATE, dao.getTableName(), dao.getNameHashColumn()), clazz, dao, false);
+          handle,
+          String.format(MYSQL_ENTITY_UPDATE, dao.getTableName(), dao.getNameHashColumn()),
+          clazz,
+          dao,
+          false,
+          limitParam);
     } else {
       readAndProcessEntity(
           handle,
           String.format(POSTGRES_ENTITY_UPDATE, dao.getTableName(), dao.getNameHashColumn()),
           clazz,
           dao,
-          false);
+          false,
+          limitParam);
     }
   }

   @SneakyThrows
   public static <T extends EntityInterface> void updateFQNHashForEntityWithName(
-      Handle handle, Class<T> clazz, EntityDAO<T> dao) {
+      Handle handle, Class<T> clazz, EntityDAO<T> dao, int limitParam) {
     if (Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL())) {
       readAndProcessEntity(
-          handle, String.format(MYSQL_ENTITY_UPDATE, dao.getTableName(), dao.getNameHashColumn()), clazz, dao, true);
+          handle,
+          String.format(MYSQL_ENTITY_UPDATE, dao.getTableName(), dao.getNameHashColumn()),
+          clazz,
+          dao,
+          true,
+          limitParam);
     } else {
       readAndProcessEntity(
-          handle, String.format(POSTGRES_ENTITY_UPDATE, dao.getTableName(), dao.getNameHashColumn()), clazz, dao, true);
+          handle,
+          String.format(POSTGRES_ENTITY_UPDATE, dao.getTableName(), dao.getNameHashColumn()),
+          clazz,
+          dao,
+          true,
+          limitParam);
     }
   }

   public static <T extends EntityInterface> void readAndProcessEntity(
-      Handle handle, String updateSql, Class<T> clazz, EntityDAO<T> dao, boolean withName) throws IOException {
+      Handle handle, String updateSql, Class<T> clazz, EntityDAO<T> dao, boolean withName, int limitParam)
+      throws IOException {
     LOG.debug("Starting Migration for table : {}", dao.getTableName());
-    int limitParam = 1000;
     int offset = 0;
     int totalCount = dao.listTotalCount();
     while (offset < totalCount) {
@@ -137,18 +158,15 @@ public class MigrationUtil {
       for (String json : jsons) {
         // Update the Statements to Database
         T entity = JsonUtils.readValue(json, clazz);
-        String hash;
         try {
-          hash =
+          String hash =
               withName
                   ? FullyQualifiedName.buildHash(EntityInterfaceUtil.quoteName(entity.getFullyQualifiedName()))
                   : FullyQualifiedName.buildHash(entity.getFullyQualifiedName());
+          upsertBatch.bind("nameHashColumnValue", hash).bind("id", entity.getId().toString()).add();
         } catch (Exception ex) {
           LOG.error("Failed in creating FQN Hash for Entity Name : {}", entity.getFullyQualifiedName(), ex);
-          // Continue to update further jsons
-          continue;
         }
-        upsertBatch.bind("nameHashColumnValue", hash).bind("id", entity.getId().toString()).add();
       }
       upsertBatch.execute();
     }
@@ -177,148 +195,151 @@ public class MigrationUtil {
     return result;
   }

-  public static void dataMigrationFQNHashing(Handle handle, CollectionDAO collectionDAO) {
+  public static void dataMigrationFQNHashing(Handle handle, CollectionDAO collectionDAO, int limitParam) {
     // Migration for Entities with Name as their FQN
     // We need to quote the FQN, if these entities have "." in their name we are storing it as it is
     // into the FQN field.
-    updateFQNHashForEntityWithName(handle, Bot.class, collectionDAO.botDAO());
-    updateFQNHashForEntityWithName(handle, User.class, collectionDAO.userDAO());
-    updateFQNHashForEntityWithName(handle, Team.class, collectionDAO.teamDAO());
+    updateFQNHashForEntityWithName(handle, Bot.class, collectionDAO.botDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, User.class, collectionDAO.userDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, Team.class, collectionDAO.teamDAO(), limitParam);

     // Update all the services
-    updateFQNHashForEntityWithName(handle, DatabaseService.class, collectionDAO.dbServiceDAO());
-    updateFQNHashForEntityWithName(handle, DashboardService.class, collectionDAO.dashboardServiceDAO());
-    updateFQNHashForEntityWithName(handle, MessagingService.class, collectionDAO.messagingServiceDAO());
-    updateFQNHashForEntityWithName(handle, MetadataService.class, collectionDAO.metadataServiceDAO());
-    updateFQNHashForEntityWithName(handle, MlModelService.class, collectionDAO.mlModelServiceDAO());
-    updateFQNHashForEntityWithName(handle, StorageService.class, collectionDAO.storageServiceDAO());
-    updateFQNHashForEntityWithName(handle, PipelineService.class, collectionDAO.pipelineServiceDAO());
-    updateFQNHashForEntity(handle, IngestionPipeline.class, collectionDAO.ingestionPipelineDAO());
+    updateFQNHashForEntityWithName(handle, DatabaseService.class, collectionDAO.dbServiceDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, DashboardService.class, collectionDAO.dashboardServiceDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, MessagingService.class, collectionDAO.messagingServiceDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, MetadataService.class, collectionDAO.metadataServiceDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, MlModelService.class, collectionDAO.mlModelServiceDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, StorageService.class, collectionDAO.storageServiceDAO(), limitParam);
+    updateFQNHashForEntityWithName(handle, PipelineService.class, collectionDAO.pipelineServiceDAO(), limitParam);
+    updateFQNHashForEntity(handle, IngestionPipeline.class, collectionDAO.ingestionPipelineDAO(), limitParam);

     // Update Entities
-    updateFQNHashForEntity(handle, Database.class, collectionDAO.databaseDAO());
-    updateFQNHashForEntity(handle, DatabaseSchema.class, collectionDAO.databaseSchemaDAO());
-    updateFQNHashForEntity(handle, Table.class, collectionDAO.tableDAO());
-    updateFQNHashForEntity(handle, Query.class, collectionDAO.queryDAO());
-    updateFQNHashForEntity(handle, Topic.class, collectionDAO.topicDAO());
-    updateFQNHashForEntity(handle, Dashboard.class, collectionDAO.dashboardDAO());
-    updateFQNHashForEntity(handle, DashboardDataModel.class, collectionDAO.dashboardDataModelDAO());
-    updateFQNHashForEntity(handle, Chart.class, collectionDAO.chartDAO());
-    updateFQNHashForEntity(handle, Container.class, collectionDAO.containerDAO());
-    updateFQNHashForEntity(handle, MlModel.class, collectionDAO.mlModelDAO());
-    updateFQNHashForEntity(handle, Pipeline.class, collectionDAO.pipelineDAO());
-    updateFQNHashForEntity(handle, Metrics.class, collectionDAO.metricsDAO());
-    updateFQNHashForEntity(handle, Report.class, collectionDAO.reportDAO());
+    updateFQNHashForEntity(handle, Database.class, collectionDAO.databaseDAO(), limitParam);
+    updateFQNHashForEntity(handle, DatabaseSchema.class, collectionDAO.databaseSchemaDAO(), limitParam);
+    updateFQNHashForEntity(handle, Table.class, collectionDAO.tableDAO(), limitParam);
+    updateFQNHashForEntity(handle, Query.class, collectionDAO.queryDAO(), limitParam);
+    updateFQNHashForEntity(handle, Topic.class, collectionDAO.topicDAO(), limitParam);
+    updateFQNHashForEntity(handle, Dashboard.class, collectionDAO.dashboardDAO(), limitParam);
+    updateFQNHashForEntity(handle, DashboardDataModel.class, collectionDAO.dashboardDataModelDAO(), limitParam);
+    updateFQNHashForEntity(handle, Chart.class, collectionDAO.chartDAO(), limitParam);
+    updateFQNHashForEntity(handle, Container.class, collectionDAO.containerDAO(), limitParam);
+    updateFQNHashForEntity(handle, MlModel.class, collectionDAO.mlModelDAO(), limitParam);
+    updateFQNHashForEntity(handle, Pipeline.class, collectionDAO.pipelineDAO(), limitParam);
+    updateFQNHashForEntity(handle, Metrics.class, collectionDAO.metricsDAO(), limitParam);
+    updateFQNHashForEntity(handle, Report.class, collectionDAO.reportDAO(), limitParam);

     // Update Glossaries & Classifications
-    updateFQNHashForEntity(handle, Classification.class, collectionDAO.classificationDAO());
-    updateFQNHashForEntity(handle, Glossary.class, collectionDAO.glossaryDAO());
-    updateFQNHashForEntity(handle, GlossaryTerm.class, collectionDAO.glossaryTermDAO());
-    updateFQNHashForEntity(handle, Tag.class, collectionDAO.tagDAO());
+    updateFQNHashForEntity(handle, Classification.class, collectionDAO.classificationDAO(), limitParam);
+    updateFQNHashForEntity(handle, Glossary.class, collectionDAO.glossaryDAO(), limitParam);
+    updateFQNHashForEntity(handle, GlossaryTerm.class, collectionDAO.glossaryTermDAO(), limitParam);
+    updateFQNHashForEntity(handle, Tag.class, collectionDAO.tagDAO(), limitParam);

     // Update DataInsights
-    updateFQNHashForEntity(handle, DataInsightChart.class, collectionDAO.dataInsightChartDAO());
-    updateFQNHashForEntity(handle, Kpi.class, collectionDAO.kpiDAO());
+    updateFQNHashForEntity(handle, DataInsightChart.class, collectionDAO.dataInsightChartDAO(), limitParam);
+    updateFQNHashForEntity(handle, Kpi.class, collectionDAO.kpiDAO(), limitParam);

     // Update DQ
-    updateFQNHashForEntity(handle, TestCase.class, collectionDAO.testCaseDAO());
-    updateFQNHashForEntity(handle, TestConnectionDefinition.class, collectionDAO.testConnectionDefinitionDAO());
-    updateFQNHashForEntity(handle, TestDefinition.class, collectionDAO.testDefinitionDAO());
-    updateFQNHashForEntity(handle, TestSuite.class, collectionDAO.testSuiteDAO());
+    updateFQNHashForEntity(handle, TestCase.class, collectionDAO.testCaseDAO(), limitParam);
+    updateFQNHashForEntity(
+        handle, TestConnectionDefinition.class, collectionDAO.testConnectionDefinitionDAO(), limitParam);
+    updateFQNHashForEntity(handle, TestDefinition.class, collectionDAO.testDefinitionDAO(), limitParam);
+    updateFQNHashForEntity(handle, TestSuite.class, collectionDAO.testSuiteDAO(), limitParam);

     // Update Misc
-    updateFQNHashForEntity(handle, Policy.class, collectionDAO.policyDAO());
-    updateFQNHashForEntity(handle, EventSubscription.class, collectionDAO.eventSubscriptionDAO());
-    updateFQNHashForEntity(handle, Role.class, collectionDAO.roleDAO());
-    updateFQNHashForEntity(handle, Type.class, collectionDAO.typeEntityDAO());
-    updateFQNHashForEntity(handle, WebAnalyticEvent.class, collectionDAO.webAnalyticEventDAO());
-    updateFQNHashForEntity(handle, Workflow.class, collectionDAO.workflowDAO());
+    updateFQNHashForEntity(handle, Policy.class, collectionDAO.policyDAO(), limitParam);
+    updateFQNHashForEntity(handle, EventSubscription.class, collectionDAO.eventSubscriptionDAO(), limitParam);
+    updateFQNHashForEntity(handle, Role.class, collectionDAO.roleDAO(), limitParam);
+    updateFQNHashForEntity(handle, Type.class, collectionDAO.typeEntityDAO(), limitParam);
+    updateFQNHashForEntity(handle, WebAnalyticEvent.class, collectionDAO.webAnalyticEventDAO(), limitParam);
+    updateFQNHashForEntity(handle, Workflow.class, collectionDAO.workflowDAO(), limitParam);

     // Field Relationship
-    updateFQNHashForFieldRelationship(collectionDAO);
+    if (Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL())) {
+      updateFQNHashForFieldRelationship(handle, MYSQL_FIELD_RELATIONSHIP_UPDATE, collectionDAO, limitParam);
+    } else {
+      updateFQNHashForFieldRelationship(handle, POSTGRES_FIELD_RELATIONSHIP_UPDATE, collectionDAO, limitParam);
+    }

     // TimeSeries
     if (Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL())) {
-      updateFQNHashEntityExtensionTimeSeries(handle, MYSQL_ENTITY_EXTENSION_TIME_SERIES_UPDATE, collectionDAO);
+      updateFQNHashEntityExtensionTimeSeries(
+          handle, MYSQL_ENTITY_EXTENSION_TIME_SERIES_UPDATE, collectionDAO, limitParam);
     } else {
-      updateFQNHashEntityExtensionTimeSeries(handle, POSTGRES_ENTITY_EXTENSION_TIME_SERIES_UPDATE, collectionDAO);
+      updateFQNHashEntityExtensionTimeSeries(
+          handle, POSTGRES_ENTITY_EXTENSION_TIME_SERIES_UPDATE, collectionDAO, limitParam);
     }

     // Tag Usage
     updateFQNHashTagUsage(collectionDAO);
   }

-  private static void updateFQNHashForFieldRelationship(CollectionDAO collectionDAO) {
+  private static void updateFQNHashForFieldRelationship(
+      Handle handle, String updateSql, CollectionDAO collectionDAO, int limitParam) {
     LOG.debug("Starting Migration for Field Relationship");
-    int limitParam = 200;
     int offset = 0;
-    int totalCount = collectionDAO.fieldRelationshipDAO().listCount();
-    while (offset < totalCount) {
-      List<CollectionDAO.FieldRelationshipDAO.FieldRelationship> fieldRelationships =
-          collectionDAO.fieldRelationshipDAO().listWithOffset(limitParam, offset);
-      for (CollectionDAO.FieldRelationshipDAO.FieldRelationship fieldRelationship : fieldRelationships) {
-        if (CommonUtil.nullOrEmpty(fieldRelationship.getFromFQNHash())
-            && CommonUtil.nullOrEmpty(fieldRelationship.getToFQNHash())) {
-          String fromFQNHash = "";
-          String toFQNHash = "";
-          try {
-            fromFQNHash = FullyQualifiedName.buildHash(fieldRelationship.getFromFQN());
-            toFQNHash = FullyQualifiedName.buildHash(fieldRelationship.getToFQN());
-          } catch (Exception ex) {
-            LOG.error("Failed in creating FromFQNHash : {} , toFQNHash : {}", fromFQNHash, toFQNHash, ex);
-            // Update further rows
-            continue;
-          }
-          collectionDAO
-              .fieldRelationshipDAO()
-              .upsertFQNHash(
-                  fromFQNHash,
-                  toFQNHash,
-                  fieldRelationship.getFromFQN(),
-                  fieldRelationship.getToFQN(),
-                  fieldRelationship.getFromType(),
-                  fieldRelationship.getToType(),
-                  fieldRelationship.getRelation(),
-                  fieldRelationship.getJsonSchema(),
-                  fieldRelationship.getJson());
-        }
-      }
-      offset = offset + limitParam;
-    }
+    int totalCount;
+    try {
+      // This might result into exceptions if the column entityFQN is dropped once
+      totalCount = collectionDAO.fieldRelationshipDAO().listDistinctCount();
+    } catch (Exception ex) {
+      return;
+    }
+    if (totalCount > 0) {
+      while (offset < totalCount) {
+        PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
+        List<Pair<String, String>> entityFQNPairList =
+            collectionDAO.fieldRelationshipDAO().listDistinctWithOffset(limitParam, offset);
+        for (Pair<String, String> entityFQNPair : entityFQNPairList) {
+          try {
+            String fromFQNHash = FullyQualifiedName.buildHash(entityFQNPair.getLeft());
+            String toFQNHash = FullyQualifiedName.buildHash(entityFQNPair.getRight());
+            upsertBatch
+                .bind("fromFQNHash", fromFQNHash)
+                .bind("toFQNHash", toFQNHash)
+                .bind("fromFQN", entityFQNPair.getLeft())
+                .bind("toFQN", entityFQNPair.getRight())
+                .add();
+          } catch (Exception ex) {
+            LOG.error(
+                "Failed in creating fromFQN : {} , toFQN : {}", entityFQNPair.getLeft(), entityFQNPair.getRight(), ex);
+          }
+        }
+        upsertBatch.execute();
+        offset = offset + limitParam;
+      }
+    }
     LOG.debug("End Migration for Field Relationship");
   }

   private static void updateFQNHashEntityExtensionTimeSeries(
-      Handle handle, String updateSql, CollectionDAO collectionDAO) {
+      Handle handle, String updateSql, CollectionDAO collectionDAO, int limitParam) {
     LOG.debug("Starting Migration for Entity Extension Time Series");
-    int limitParam = 1000;
     int offset = 0;
-    int totalCount = collectionDAO.entityExtensionTimeSeriesDao().listAllCount();
-    PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
-    while (offset < totalCount) {
-      List<CollectionDAO.EntityExtensionTimeSeriesDAO.EntityExtensionTimeSeriesTable> timeSeriesTables =
-          collectionDAO.entityExtensionTimeSeriesDao().listWithOffset(limitParam, offset);
-
-      for (CollectionDAO.EntityExtensionTimeSeriesDAO.EntityExtensionTimeSeriesTable timeSeries : timeSeriesTables) {
-        if (CommonUtil.nullOrEmpty(timeSeries.getEntityFQNHash())) {
-          String entityFQN = "";
-          try {
-            entityFQN = FullyQualifiedName.buildHash(timeSeries.getEntityFQN());
-          } catch (Exception ex) {
-            LOG.error("Failed in creating EntityFQN : {}", entityFQN, ex);
-            // Update further rows
-            continue;
-          }
-          upsertBatch
-              .bind("entityFQNHash", entityFQN)
-              .bind("entityFQN", timeSeries.getEntityFQN())
-              .bind("extension", timeSeries.getExtension())
-              .bind("timestamp", timeSeries.getTimestamp())
-              .add();
-        }
-      }
-      upsertBatch.execute();
-      offset = offset + limitParam;
-    }
+    int totalCount;
+    try {
+      // This might result into exceptions if the column entityFQN is dropped once
+      totalCount = collectionDAO.entityExtensionTimeSeriesDao().listDistinctCount();
+    } catch (Exception ex) {
+      return;
+    }
+    if (totalCount > 0) {
+      while (offset < totalCount) {
+        PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
+        List<String> entityFQNLists =
+            collectionDAO.entityExtensionTimeSeriesDao().listDistinctWithOffset(limitParam, offset);
+        for (String entityFQN : entityFQNLists) {
+          try {
+            upsertBatch
+                .bind("entityFQNHash", FullyQualifiedName.buildHash(entityFQN))
+                .bind("entityFQN", entityFQN)
+                .add();
+          } catch (Exception ex) {
+            LOG.error("Failed in creating EntityFQN : {}", entityFQN, ex);
+          }
+        }
+        upsertBatch.execute();
+        offset = offset + limitParam;
+      }
+    }
     LOG.debug("Ended Migration for Entity Extension Time Series");
   }
@@ -328,26 +349,22 @@ public class MigrationUtil {
     List<CollectionDAO.TagUsageDAO.TagLabelMigration> tagLabelMigrationList = collectionDAO.tagUsageDAO().listAll();
     for (CollectionDAO.TagUsageDAO.TagLabelMigration tagLabel : tagLabelMigrationList) {
       if (CommonUtil.nullOrEmpty(tagLabel.getTagFQNHash()) && CommonUtil.nullOrEmpty(tagLabel.getTargetFQNHash())) {
-        String tagFQNHash = "";
-        String targetFQNHash = "";
         try {
-          tagFQNHash = FullyQualifiedName.buildHash(tagLabel.getTagFQN());
-          targetFQNHash = FullyQualifiedName.buildHash(tagLabel.getTargetFQN());
+          String tagFQNHash = FullyQualifiedName.buildHash(tagLabel.getTagFQN());
+          String targetFQNHash = FullyQualifiedName.buildHash(tagLabel.getTargetFQN());
+          collectionDAO
+              .tagUsageDAO()
+              .upsertFQNHash(
+                  tagLabel.getSource(),
+                  tagLabel.getTagFQN(),
+                  tagFQNHash,
+                  targetFQNHash,
+                  tagLabel.getLabelType(),
+                  tagLabel.getState(),
+                  tagLabel.getTargetFQN());
         } catch (Exception ex) {
-          LOG.error("Failed in creating tagFQNHash : {}, targetFQNHash: {}", tagFQNHash, targetFQNHash, ex);
-          // Update further rows
-          continue;
+          LOG.error("Failed in creating tagFQN : {}, targetFQN: {}", tagLabel.getTagFQN(), tagLabel.getTargetFQN(), ex);
        }
-        collectionDAO
-            .tagUsageDAO()
-            .upsertFQNHash(
-                tagLabel.getSource(),
-                tagLabel.getTagFQN(),
-                tagFQNHash,
-                targetFQNHash,
-                tagLabel.getLabelType(),
-                tagLabel.getState(),
-                tagLabel.getTargetFQN());
       }
     }
     LOG.debug("Ended Migration for Tag Usage");

MySQLMigration.java

@@ -58,7 +58,12 @@ public class MySQLMigration implements MigrationStep {
   @Override
   public void runDataMigration() {
     // FQN Hashing Migrations
-    dataMigrationFQNHashing(handle, collectionDAO);
+    String envVariableValue = System.getenv("MIGRATION_LIMIT_PARAM");
+    if (envVariableValue != null) {
+      dataMigrationFQNHashing(handle, collectionDAO, Integer.parseInt(envVariableValue));
+    } else {
+      dataMigrationFQNHashing(handle, collectionDAO, 1000);
+    }
   }

   @Override

PostgresMigration.java

@@ -58,7 +58,12 @@ public class PostgresMigration implements MigrationStep {
   @Override
   public void runDataMigration() {
-    dataMigrationFQNHashing(handle, collectionDAO);
+    String envVariableValue = System.getenv("MIGRATION_LIMIT_PARAM");
+    if (envVariableValue != null) {
+      dataMigrationFQNHashing(handle, collectionDAO, Integer.parseInt(envVariableValue));
+    } else {
+      dataMigrationFQNHashing(handle, collectionDAO, 1000);
+    }
   }

   @Override
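Both runDataMigration() overrides read the page size from the MIGRATION_LIMIT_PARAM environment variable, defaulting to 1000. Note that Integer.parseInt throws NumberFormatException on a non-numeric value, which would abort the migration step; a hypothetical defensive variant (not in this commit) would fall back to the default instead:

    // Hypothetical: tolerate a malformed MIGRATION_LIMIT_PARAM instead of failing.
    static int resolveMigrationLimit() {
      String raw = System.getenv("MIGRATION_LIMIT_PARAM");
      try {
        return raw != null ? Integer.parseInt(raw) : 1000;
      } catch (NumberFormatException ex) {
        return 1000; // default page size used by this commit
      }
    }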