diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/RestoreIndices.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/RestoreIndices.java index d38685553df..f46bb9b0562 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/RestoreIndices.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/RestoreIndices.java @@ -24,6 +24,7 @@ public class RestoreIndices implements Upgrade { public static final String WRITER_POOL_SIZE = "WRITER_POOL_SIZE"; public static final String URN_ARG_NAME = "urn"; public static final String URN_LIKE_ARG_NAME = "urnLike"; + public static final String URN_BASED_PAGINATION_ARG_NAME = "urnBasedPagination"; public static final String STARTING_OFFSET_ARG_NAME = "startingOffset"; diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java index ce59cf2edb8..574b1f08b5f 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java @@ -31,6 +31,7 @@ public class SendMAEStep implements UpgradeStep { private static final int DEFAULT_STARTING_OFFSET = 0; private static final int DEFAULT_THREADS = 1; + private static final boolean DEFAULT_URN_BASED_PAGINATION = false; private final Database _server; private final EntityService _entityService; @@ -89,6 +90,7 @@ public class SendMAEStep implements UpgradeStep { result.numThreads = getThreadCount(context.parsedArgs()); result.batchDelayMs = getBatchDelayMs(context.parsedArgs()); result.start = getStartingOffset(context.parsedArgs()); + result.urnBasedPagination = getUrnBasedPagination(context.parsedArgs()); if (containsKey(context.parsedArgs(), RestoreIndices.ASPECT_NAME_ARG_NAME)) { result.aspectName = context.parsedArgs().get(RestoreIndices.ASPECT_NAME_ARG_NAME).get(); } @@ -140,18 +142,49 @@ public class SendMAEStep implements UpgradeStep { List> futures = new ArrayList<>(); startTime = System.currentTimeMillis(); - while (start < rowCount) { - args = args.clone(); - args.start = start; - futures.add(executor.submit(new KafkaJob(context, args))); - start = start + args.batchSize; - } - while (futures.size() > 0) { - List tmpResults = iterateFutures(futures); - for (RestoreIndicesResult tmpResult : tmpResults) { - reportStats(context, finalJobResult, tmpResult, rowCount, startTime); + if (args.urnBasedPagination) { + RestoreIndicesResult previousResult = null; + int rowsProcessed = 1; + while (rowsProcessed > 0) { + args = args.clone(); + if (previousResult != null) { + args.lastUrn = previousResult.lastUrn; + args.lastAspect = previousResult.lastAspect; + } + args.start = start; + context + .report() + .addLine( + String.format( + "Getting next batch of urns + aspects, starting with %s - %s", + args.lastUrn, args.lastAspect)); + Future future = executor.submit(new KafkaJob(context, args)); + try { + RestoreIndicesResult result = future.get(); + reportStats(context, finalJobResult, result, rowCount, startTime); + previousResult = result; + rowsProcessed = result.rowsMigrated + result.ignored; + context.report().addLine(String.format("Rows processed this loop %d", rowsProcessed)); + start += args.batchSize; + } catch (InterruptedException | ExecutionException e) { + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); + } + } + } else { + while (start < rowCount) { + args = args.clone(); + args.start = start; + futures.add(executor.submit(new KafkaJob(context, args))); + start = start + args.batchSize; + } + while (futures.size() > 0) { + List tmpResults = iterateFutures(futures); + for (RestoreIndicesResult tmpResult : tmpResults) { + reportStats(context, finalJobResult, tmpResult, rowCount, startTime); + } } } + executor.shutdown(); if (finalJobResult.rowsMigrated != rowCount) { float percentFailed = 0.0f; @@ -233,6 +266,15 @@ public class SendMAEStep implements UpgradeStep { return getInt(parsedArgs, DEFAULT_THREADS, RestoreIndices.NUM_THREADS_ARG_NAME); } + private boolean getUrnBasedPagination(final Map> parsedArgs) { + boolean urnBasedPagination = DEFAULT_URN_BASED_PAGINATION; + if (containsKey(parsedArgs, RestoreIndices.URN_BASED_PAGINATION_ARG_NAME)) { + urnBasedPagination = + Boolean.parseBoolean(parsedArgs.get(RestoreIndices.URN_BASED_PAGINATION_ARG_NAME).get()); + } + return urnBasedPagination; + } + private int getInt( final Map> parsedArgs, int defaultVal, String argKey) { int result = defaultVal; diff --git a/docker/datahub-upgrade/README.md b/docker/datahub-upgrade/README.md index 0d019971604..9c96114cdb2 100644 --- a/docker/datahub-upgrade/README.md +++ b/docker/datahub-upgrade/README.md @@ -15,8 +15,16 @@ to metadata_aspect_v2 table. Arguments: 2. **NoCodeDataMigrationCleanup**: Cleanses graph index, search index, and key-value store of legacy DataHub data (metadata_aspect table) once the No Code Data Migration has completed successfully. No arguments. -3. **RestoreIndices**: Restores indices by fetching the latest version of each aspect and producing MAE - +3. **RestoreIndices**: Restores indices by fetching the latest version of each aspect and producing MAE. Arguments: + - *batchSize* (Optional): The number of rows to migrate at a time. Defaults to 1000. + - *batchDelayMs* (Optional): The number of milliseconds of delay between migrated batches. Used for rate limiting. Defaults to 250. + - *numThreads* (Optional): The number of threads to use, defaults to 1. Note that this is not used if `urnBasedPagination` is true. + - *aspectName* (Optional): The aspect name for producing events. + - *urn* (Optional): The urn for producing events. + - *urnLike* (Optional): The urn pattern for producing events, using `%` as a wild card + - *urnBasedPagination* (Optional): Paginate the SQL results using the urn + aspect string instead of `OFFSET`. Defaults to false, + though should improve performance for large amounts of data. + 4. **RestoreBackup**: Restores the storage stack from a backup of the local database ## Environment Variables diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index a3338394165..7bd8e763cdc 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -1161,6 +1161,7 @@ public class EntityServiceImpl implements EntityService { Urn urn; try { urn = Urn.createFromString(aspect.getKey().getUrn()); + result.lastUrn = urn.toString(); } catch (Exception e) { logger.accept( String.format( @@ -1188,6 +1189,7 @@ public class EntityServiceImpl implements EntityService { result.timeEntityRegistryCheckMs += System.currentTimeMillis() - startTime; startTime = System.currentTimeMillis(); final String aspectName = aspect.getKey().getAspect(); + result.lastAspect = aspectName; // 3. Verify that the aspect is a valid aspect associated with the entity AspectSpec aspectSpec = entitySpec.getAspectSpec(aspectName); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java index b2b47c1d5ba..26946890daa 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java @@ -477,11 +477,31 @@ public class EbeanAspectDao implements AspectDao, AspectMigrationsDao { if (args.urnLike != null) { exp = exp.like(EbeanAspectV2.URN_COLUMN, args.urnLike); } + + int start = args.start; + if (args.urnBasedPagination) { + start = 0; + if (args.lastUrn != null && !args.lastUrn.isEmpty()) { + exp = exp.where().ge(EbeanAspectV2.URN_COLUMN, args.lastUrn); + + // To prevent processing the same aspect multiple times in a restore, it compares against + // the last aspect if the urn matches the last urn + if (args.lastAspect != null && !args.lastAspect.isEmpty()) { + exp = + exp.where() + .and() + .or() + .ne(EbeanAspectV2.URN_COLUMN, args.lastUrn) + .gt(EbeanAspectV2.ASPECT_COLUMN, args.lastAspect); + } + } + } + return exp.orderBy() .asc(EbeanAspectV2.URN_COLUMN) .orderBy() .asc(EbeanAspectV2.ASPECT_COLUMN) - .setFirstRow(args.start) + .setFirstRow(start) .setMaxRows(args.batchSize) .findPagedList(); } diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/restoreindices/RestoreIndicesArgs.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/restoreindices/RestoreIndicesArgs.java index d8fcbe0b7d4..e50b44b7f0e 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/restoreindices/RestoreIndicesArgs.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/restoreindices/RestoreIndicesArgs.java @@ -11,6 +11,9 @@ public class RestoreIndicesArgs implements Cloneable { public String aspectName; public String urn; public String urnLike; + public Boolean urnBasedPagination = false; + public String lastUrn = ""; + public String lastAspect = ""; @Override public RestoreIndicesArgs clone() { @@ -51,4 +54,9 @@ public class RestoreIndicesArgs implements Cloneable { } return this; } + + public RestoreIndicesArgs setUrnBasedPagination(Boolean urnBasedPagination) { + this.urnBasedPagination = urnBasedPagination; + return this; + } } diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/restoreindices/RestoreIndicesResult.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/restoreindices/RestoreIndicesResult.java index 8479338660d..a270cf4548b 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/restoreindices/RestoreIndicesResult.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/restoreindices/RestoreIndicesResult.java @@ -13,4 +13,6 @@ public class RestoreIndicesResult { public long aspectCheckMs = 0; public long createRecordMs = 0; public long sendMessageMs = 0; + public String lastUrn = ""; + public String lastAspect = ""; }