Adds urnBasedPagination option to datahub-upgrade RestoreIndices (#9232)

Co-authored-by: RyanHolstien <RyanHolstien@users.noreply.github.com>
This commit is contained in:
Nate Bryant 2023-12-19 15:08:55 -05:00 committed by GitHub
parent 8f19138f68
commit a29fce9d82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 96 additions and 13 deletions

View File

@ -24,6 +24,7 @@ public class RestoreIndices implements Upgrade {
public static final String WRITER_POOL_SIZE = "WRITER_POOL_SIZE";
public static final String URN_ARG_NAME = "urn";
public static final String URN_LIKE_ARG_NAME = "urnLike";
public static final String URN_BASED_PAGINATION_ARG_NAME = "urnBasedPagination";
public static final String STARTING_OFFSET_ARG_NAME = "startingOffset";

View File

@ -31,6 +31,7 @@ public class SendMAEStep implements UpgradeStep {
private static final int DEFAULT_STARTING_OFFSET = 0;
private static final int DEFAULT_THREADS = 1;
private static final boolean DEFAULT_URN_BASED_PAGINATION = false;
private final Database _server;
private final EntityService _entityService;
@ -89,6 +90,7 @@ public class SendMAEStep implements UpgradeStep {
result.numThreads = getThreadCount(context.parsedArgs());
result.batchDelayMs = getBatchDelayMs(context.parsedArgs());
result.start = getStartingOffset(context.parsedArgs());
result.urnBasedPagination = getUrnBasedPagination(context.parsedArgs());
if (containsKey(context.parsedArgs(), RestoreIndices.ASPECT_NAME_ARG_NAME)) {
result.aspectName = context.parsedArgs().get(RestoreIndices.ASPECT_NAME_ARG_NAME).get();
}
@ -140,18 +142,49 @@ public class SendMAEStep implements UpgradeStep {
List<Future<RestoreIndicesResult>> futures = new ArrayList<>();
startTime = System.currentTimeMillis();
while (start < rowCount) {
args = args.clone();
args.start = start;
futures.add(executor.submit(new KafkaJob(context, args)));
start = start + args.batchSize;
}
while (futures.size() > 0) {
List<RestoreIndicesResult> tmpResults = iterateFutures(futures);
for (RestoreIndicesResult tmpResult : tmpResults) {
reportStats(context, finalJobResult, tmpResult, rowCount, startTime);
if (args.urnBasedPagination) {
RestoreIndicesResult previousResult = null;
int rowsProcessed = 1;
while (rowsProcessed > 0) {
args = args.clone();
if (previousResult != null) {
args.lastUrn = previousResult.lastUrn;
args.lastAspect = previousResult.lastAspect;
}
args.start = start;
context
.report()
.addLine(
String.format(
"Getting next batch of urns + aspects, starting with %s - %s",
args.lastUrn, args.lastAspect));
Future<RestoreIndicesResult> future = executor.submit(new KafkaJob(context, args));
try {
RestoreIndicesResult result = future.get();
reportStats(context, finalJobResult, result, rowCount, startTime);
previousResult = result;
rowsProcessed = result.rowsMigrated + result.ignored;
context.report().addLine(String.format("Rows processed this loop %d", rowsProcessed));
start += args.batchSize;
} catch (InterruptedException | ExecutionException e) {
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED);
}
}
} else {
while (start < rowCount) {
args = args.clone();
args.start = start;
futures.add(executor.submit(new KafkaJob(context, args)));
start = start + args.batchSize;
}
while (futures.size() > 0) {
List<RestoreIndicesResult> tmpResults = iterateFutures(futures);
for (RestoreIndicesResult tmpResult : tmpResults) {
reportStats(context, finalJobResult, tmpResult, rowCount, startTime);
}
}
}
executor.shutdown();
if (finalJobResult.rowsMigrated != rowCount) {
float percentFailed = 0.0f;
@ -233,6 +266,15 @@ public class SendMAEStep implements UpgradeStep {
return getInt(parsedArgs, DEFAULT_THREADS, RestoreIndices.NUM_THREADS_ARG_NAME);
}
/**
 * Reads the urnBasedPagination flag from the parsed command-line arguments.
 *
 * @param parsedArgs map of argument name to optional argument value
 * @return the parsed boolean when the argument was supplied with a value,
 *     otherwise {@code DEFAULT_URN_BASED_PAGINATION}
 */
private boolean getUrnBasedPagination(final Map<String, Optional<String>> parsedArgs) {
  if (!containsKey(parsedArgs, RestoreIndices.URN_BASED_PAGINATION_ARG_NAME)) {
    return DEFAULT_URN_BASED_PAGINATION;
  }
  // Avoid an unchecked Optional.get(): if the key is present but its Optional is
  // empty, fall back to the default instead of throwing NoSuchElementException.
  return parsedArgs
      .get(RestoreIndices.URN_BASED_PAGINATION_ARG_NAME)
      .map(Boolean::parseBoolean)
      .orElse(DEFAULT_URN_BASED_PAGINATION);
}
private int getInt(
final Map<String, Optional<String>> parsedArgs, int defaultVal, String argKey) {
int result = defaultVal;

View File

@ -15,8 +15,16 @@ to metadata_aspect_v2 table. Arguments:
2. **NoCodeDataMigrationCleanup**: Cleanses graph index, search index, and key-value store of legacy DataHub data (metadata_aspect table) once
the No Code Data Migration has completed successfully. No arguments.
3. **RestoreIndices**: Restores indices by fetching the latest version of each aspect and producing MAE
3. **RestoreIndices**: Restores indices by fetching the latest version of each aspect and producing MAE. Arguments:
- *batchSize* (Optional): The number of rows to migrate at a time. Defaults to 1000.
- *batchDelayMs* (Optional): The number of milliseconds of delay between migrated batches. Used for rate limiting. Defaults to 250.
- *numThreads* (Optional): The number of threads to use, defaults to 1. Note that this is not used if `urnBasedPagination` is true.
- *aspectName* (Optional): The aspect name for producing events.
- *urn* (Optional): The urn for producing events.
- *urnLike* (Optional): The urn pattern for producing events, using `%` as a wildcard.
- *urnBasedPagination* (Optional): Paginate the SQL results using the urn + aspect string instead of `OFFSET`. Defaults to false;
enabling it should improve performance for large amounts of data.
4. **RestoreBackup**: Restores the storage stack from a backup of the local database
## Environment Variables

View File

@ -1161,6 +1161,7 @@ public class EntityServiceImpl implements EntityService {
Urn urn;
try {
urn = Urn.createFromString(aspect.getKey().getUrn());
result.lastUrn = urn.toString();
} catch (Exception e) {
logger.accept(
String.format(
@ -1188,6 +1189,7 @@ public class EntityServiceImpl implements EntityService {
result.timeEntityRegistryCheckMs += System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();
final String aspectName = aspect.getKey().getAspect();
result.lastAspect = aspectName;
// 3. Verify that the aspect is a valid aspect associated with the entity
AspectSpec aspectSpec = entitySpec.getAspectSpec(aspectName);

View File

@ -477,11 +477,31 @@ public class EbeanAspectDao implements AspectDao, AspectMigrationsDao {
if (args.urnLike != null) {
exp = exp.like(EbeanAspectV2.URN_COLUMN, args.urnLike);
}
int start = args.start;
if (args.urnBasedPagination) {
start = 0;
if (args.lastUrn != null && !args.lastUrn.isEmpty()) {
exp = exp.where().ge(EbeanAspectV2.URN_COLUMN, args.lastUrn);
// To prevent processing the same aspect multiple times in a restore, it compares against
// the last aspect if the urn matches the last urn
if (args.lastAspect != null && !args.lastAspect.isEmpty()) {
exp =
exp.where()
.and()
.or()
.ne(EbeanAspectV2.URN_COLUMN, args.lastUrn)
.gt(EbeanAspectV2.ASPECT_COLUMN, args.lastAspect);
}
}
}
return exp.orderBy()
.asc(EbeanAspectV2.URN_COLUMN)
.orderBy()
.asc(EbeanAspectV2.ASPECT_COLUMN)
.setFirstRow(args.start)
.setFirstRow(start)
.setMaxRows(args.batchSize)
.findPagedList();
}

View File

@ -11,6 +11,9 @@ public class RestoreIndicesArgs implements Cloneable {
public String aspectName;
public String urn;
public String urnLike;
public Boolean urnBasedPagination = false;
public String lastUrn = "";
public String lastAspect = "";
@Override
public RestoreIndicesArgs clone() {
@ -51,4 +54,9 @@ public class RestoreIndicesArgs implements Cloneable {
}
return this;
}
/**
 * Fluent setter for {@code urnBasedPagination}.
 *
 * <p>Normalizes {@code null} to {@code false} so later auto-unboxing (e.g.
 * {@code if (args.urnBasedPagination)}) cannot throw a NullPointerException.
 *
 * @param urnBasedPagination whether to paginate SQL results by urn + aspect
 *     instead of {@code OFFSET}; {@code null} is treated as {@code false}
 * @return this instance, for call chaining
 */
public RestoreIndicesArgs setUrnBasedPagination(Boolean urnBasedPagination) {
  this.urnBasedPagination = urnBasedPagination != null ? urnBasedPagination : Boolean.FALSE;
  return this;
}
}

View File

@ -13,4 +13,6 @@ public class RestoreIndicesResult {
public long aspectCheckMs = 0;
public long createRecordMs = 0;
public long sendMessageMs = 0;
public String lastUrn = "";
public String lastAspect = "";
}