mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-09 23:40:05 +00:00
Update SQL queries for reading and comming migration
This commit is contained in:
parent
410cb984a2
commit
5af1bc4a55
@ -1264,9 +1264,10 @@ public interface CollectionDAO {
|
|||||||
int listDistinctCount();
|
int listDistinctCount();
|
||||||
|
|
||||||
@Deprecated
|
@Deprecated
|
||||||
@SqlQuery("SELECT DISTINCT fromFQN, toFQN FROM field_relationship LIMIT :limit OFFSET :offset")
|
@SqlQuery(
|
||||||
|
"SELECT DISTINCT fromFQN, toFQN FROM field_relationship WHERE fromFQNHash = '' or fromFQNHash is null or toFQNHash = '' or toFQNHash is null LIMIT :limit")
|
||||||
@RegisterRowMapper(FieldRelationShipMapper.class)
|
@RegisterRowMapper(FieldRelationShipMapper.class)
|
||||||
List<Pair<String, String>> listDistinctWithOffset(@Bind("limit") int limit, @Bind("offset") int offset);
|
List<Pair<String, String>> migrationListDistinctWithOffset(@Bind("limit") int limit);
|
||||||
|
|
||||||
@SqlQuery(
|
@SqlQuery(
|
||||||
"SELECT fromFQN, toFQN, json FROM field_relationship WHERE "
|
"SELECT fromFQN, toFQN, json FROM field_relationship WHERE "
|
||||||
@ -3404,9 +3405,10 @@ public interface CollectionDAO {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SqlQuery("SELECT DISTINCT entityFQN FROM entity_extension_time_series LIMIT :limit OFFSET :offset")
|
@SqlQuery(
|
||||||
|
"SELECT DISTINCT entityFQN FROM entity_extension_time_series WHERE entityFQNHash = '' or entityFQNHash is null LIMIT :limit")
|
||||||
@Deprecated
|
@Deprecated
|
||||||
List<String> listDistinctWithOffset(@Bind("limit") int limit, @Bind("offset") int offset);
|
List<String> migrationListDistinctWithOffset(@Bind("limit") int limit);
|
||||||
}
|
}
|
||||||
|
|
||||||
class EntitiesCountRowMapper implements RowMapper<EntitiesCount> {
|
class EntitiesCountRowMapper implements RowMapper<EntitiesCount> {
|
||||||
|
|||||||
@ -227,6 +227,10 @@ public interface EntityDAO<T extends EntityInterface> {
|
|||||||
@SqlQuery("SELECT json FROM <table> LIMIT :limit OFFSET :offset")
|
@SqlQuery("SELECT json FROM <table> LIMIT :limit OFFSET :offset")
|
||||||
List<String> listAfterWithOffset(@Define("table") String table, @Bind("limit") int limit, @Bind("offset") int offset);
|
List<String> listAfterWithOffset(@Define("table") String table, @Bind("limit") int limit, @Bind("offset") int offset);
|
||||||
|
|
||||||
|
@SqlQuery("SELECT json FROM <table> WHERE <nameHashColumn> = '' or <nameHashColumn> is null LIMIT :limit")
|
||||||
|
List<String> migrationListAfterWithOffset(
|
||||||
|
@Define("table") String table, @Define("nameHashColumn") String nameHashColumnName, @Bind("limit") int limit);
|
||||||
|
|
||||||
@SqlQuery("SELECT json FROM <table> <cond> AND " + "ORDER BY <nameColumn> " + "LIMIT :limit " + "OFFSET :offset")
|
@SqlQuery("SELECT json FROM <table> <cond> AND " + "ORDER BY <nameColumn> " + "LIMIT :limit " + "OFFSET :offset")
|
||||||
List<String> listAfter(
|
List<String> listAfter(
|
||||||
@Define("table") String table,
|
@Define("table") String table,
|
||||||
@ -351,6 +355,11 @@ public interface EntityDAO<T extends EntityInterface> {
|
|||||||
return listAfterWithOffset(getTableName(), limit, offset);
|
return listAfterWithOffset(getTableName(), limit, offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
default List<String> migrationListAfterWithOffset(int limit) {
|
||||||
|
// No ordering
|
||||||
|
return migrationListAfterWithOffset(getTableName(), getNameHashColumn(), limit);
|
||||||
|
}
|
||||||
|
|
||||||
default List<String> listAfter(ListFilter filter, int limit, int offset) {
|
default List<String> listAfter(ListFilter filter, int limit, int offset) {
|
||||||
return listAfter(getTableName(), getNameHashColumn(), filter.getCondition(), limit, offset);
|
return listAfter(getTableName(), getNameHashColumn(), filter.getCondition(), limit, offset);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -147,13 +147,14 @@ public class MigrationUtil {
|
|||||||
Handle handle, String updateSql, Class<T> clazz, EntityDAO<T> dao, boolean withName, int limitParam)
|
Handle handle, String updateSql, Class<T> clazz, EntityDAO<T> dao, boolean withName, int limitParam)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
LOG.debug("Starting Migration for table : {}", dao.getTableName());
|
LOG.debug("Starting Migration for table : {}", dao.getTableName());
|
||||||
int offset = 0;
|
while (true) {
|
||||||
int totalCount = dao.listTotalCount();
|
|
||||||
while (offset < totalCount) {
|
|
||||||
PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
|
PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
|
||||||
// Read from Database
|
// Read from Database
|
||||||
List<String> jsons = dao.listAfterWithOffset(limitParam, offset);
|
List<String> jsons = dao.migrationListAfterWithOffset(limitParam);
|
||||||
offset = offset + limitParam;
|
LOG.debug("[{}]Read a Batch of Size: {}", dao.getTableName(), jsons.size());
|
||||||
|
if (jsons.isEmpty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
// Process Update
|
// Process Update
|
||||||
for (String json : jsons) {
|
for (String json : jsons) {
|
||||||
// Update the Statements to Database
|
// Update the Statements to Database
|
||||||
@ -168,6 +169,7 @@ public class MigrationUtil {
|
|||||||
LOG.error("Failed in creating FQN Hash for Entity Name : {}", entity.getFullyQualifiedName(), ex);
|
LOG.error("Failed in creating FQN Hash for Entity Name : {}", entity.getFullyQualifiedName(), ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LOG.debug("[{}]Committing a Batch of Size: {}", dao.getTableName(), jsons.size());
|
||||||
upsertBatch.execute();
|
upsertBatch.execute();
|
||||||
}
|
}
|
||||||
LOG.debug("End Migration for table : {}", dao.getTableName());
|
LOG.debug("End Migration for table : {}", dao.getTableName());
|
||||||
@ -276,19 +278,14 @@ public class MigrationUtil {
|
|||||||
private static void updateFQNHashForFieldRelationship(
|
private static void updateFQNHashForFieldRelationship(
|
||||||
Handle handle, String updateSql, CollectionDAO collectionDAO, int limitParam) {
|
Handle handle, String updateSql, CollectionDAO collectionDAO, int limitParam) {
|
||||||
LOG.debug("Starting Migration for Field Relationship");
|
LOG.debug("Starting Migration for Field Relationship");
|
||||||
int offset = 0;
|
while (true) {
|
||||||
int totalCount;
|
|
||||||
try {
|
|
||||||
// This might result into exceptions if the column entityFQN is dropped once
|
|
||||||
totalCount = collectionDAO.fieldRelationshipDAO().listDistinctCount();
|
|
||||||
} catch (Exception ex) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (totalCount > 0) {
|
|
||||||
while (offset < totalCount) {
|
|
||||||
PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
|
PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
|
||||||
List<Pair<String, String>> entityFQNPairList =
|
List<Pair<String, String>> entityFQNPairList =
|
||||||
collectionDAO.fieldRelationshipDAO().listDistinctWithOffset(limitParam, offset);
|
collectionDAO.fieldRelationshipDAO().migrationListDistinctWithOffset(limitParam);
|
||||||
|
LOG.debug("[FieldRelationship] Read a Batch of Size: {}", entityFQNPairList.size());
|
||||||
|
if (entityFQNPairList.isEmpty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
for (Pair<String, String> entityFQNPair : entityFQNPairList) {
|
for (Pair<String, String> entityFQNPair : entityFQNPairList) {
|
||||||
try {
|
try {
|
||||||
String fromFQNHash = FullyQualifiedName.buildHash(entityFQNPair.getLeft());
|
String fromFQNHash = FullyQualifiedName.buildHash(entityFQNPair.getLeft());
|
||||||
@ -304,9 +301,8 @@ public class MigrationUtil {
|
|||||||
"Failed in creating fromFQN : {} , toFQN : {}", entityFQNPair.getLeft(), entityFQNPair.getRight(), ex);
|
"Failed in creating fromFQN : {} , toFQN : {}", entityFQNPair.getLeft(), entityFQNPair.getRight(), ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LOG.debug("[FieldRelationship] Committing a Batch of Size: {}", entityFQNPairList.size());
|
||||||
upsertBatch.execute();
|
upsertBatch.execute();
|
||||||
offset = offset + limitParam;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
LOG.debug("End Migration for Field Relationship");
|
LOG.debug("End Migration for Field Relationship");
|
||||||
}
|
}
|
||||||
@ -314,32 +310,23 @@ public class MigrationUtil {
|
|||||||
private static void updateFQNHashEntityExtensionTimeSeries(
|
private static void updateFQNHashEntityExtensionTimeSeries(
|
||||||
Handle handle, String updateSql, CollectionDAO collectionDAO, int limitParam) {
|
Handle handle, String updateSql, CollectionDAO collectionDAO, int limitParam) {
|
||||||
LOG.debug("Starting Migration for Entity Extension Time Series");
|
LOG.debug("Starting Migration for Entity Extension Time Series");
|
||||||
int offset = 0;
|
while (true) {
|
||||||
int totalCount;
|
|
||||||
try {
|
|
||||||
// This might result into exceptions if the column entityFQN is dropped once
|
|
||||||
totalCount = collectionDAO.entityExtensionTimeSeriesDao().listDistinctCount();
|
|
||||||
} catch (Exception ex) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (totalCount > 0) {
|
|
||||||
while (offset < totalCount) {
|
|
||||||
PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
|
PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
|
||||||
List<String> entityFQNLists =
|
List<String> entityFQNLists =
|
||||||
collectionDAO.entityExtensionTimeSeriesDao().listDistinctWithOffset(limitParam, offset);
|
collectionDAO.entityExtensionTimeSeriesDao().migrationListDistinctWithOffset(limitParam);
|
||||||
|
LOG.debug("[TimeSeries] Read a Batch of Size: {}", entityFQNLists.size());
|
||||||
|
if (entityFQNLists.isEmpty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
for (String entityFQN : entityFQNLists) {
|
for (String entityFQN : entityFQNLists) {
|
||||||
try {
|
try {
|
||||||
upsertBatch
|
upsertBatch.bind("entityFQNHash", FullyQualifiedName.buildHash(entityFQN)).bind("entityFQN", entityFQN).add();
|
||||||
.bind("entityFQNHash", FullyQualifiedName.buildHash(entityFQN))
|
|
||||||
.bind("entityFQN", entityFQN)
|
|
||||||
.add();
|
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
LOG.error("Failed in creating EntityFQN : {}", entityFQN, ex);
|
LOG.error("Failed in creating EntityFQN : {}", entityFQN, ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LOG.debug("[TimeSeries] Committing a Batch of Size: {}", entityFQNLists.size());
|
||||||
upsertBatch.execute();
|
upsertBatch.execute();
|
||||||
offset = offset + limitParam;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
LOG.debug("Ended Migration for Entity Extension Time Series");
|
LOG.debug("Ended Migration for Entity Extension Time Series");
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user