mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-27 18:07:57 +00:00
feat(restore-indices): add timing info (#5773)
This commit is contained in:
parent
b477047b3b
commit
157363586e
@ -47,12 +47,19 @@ public class SendMAEStep implements UpgradeStep {
|
||||
private final EntityRegistry _entityRegistry;
|
||||
|
||||
public static class KafkaJobResult {
|
||||
public int ignored;
|
||||
public int rowsMigrated;
|
||||
public KafkaJobResult(int ignored, int rowsMigrated) {
|
||||
this.ignored = ignored;
|
||||
this.rowsMigrated = rowsMigrated;
|
||||
}
|
||||
public int ignored = 0;
|
||||
public int rowsMigrated = 0;
|
||||
public long timeSqlQueryMs = 0;
|
||||
public long timeUrnMs = 0;
|
||||
public long timeEntityRegistryCheckMs = 0;
|
||||
public long aspectCheckMs = 0;
|
||||
public long createRecordMs = 0;
|
||||
public long sendMessageMs = 0;
|
||||
}
|
||||
|
||||
public static void reportStat(UpgradeContext context, String id, long timeMs) {
|
||||
context.report().addLine(String.format(
|
||||
"Mins taken for %s so far is %.2f", id, (float) timeMs / 1000 / 60));
|
||||
}
|
||||
|
||||
public class KafkaJob implements Callable<KafkaJobResult> {
|
||||
@ -66,16 +73,20 @@ public class SendMAEStep implements UpgradeStep {
|
||||
}
|
||||
@Override
|
||||
public KafkaJobResult call() {
|
||||
KafkaJobResult result = new KafkaJobResult();
|
||||
int ignored = 0;
|
||||
int rowsMigrated = 0;
|
||||
context.report().addLine(String.format(
|
||||
"Reading rows %s through %s from the aspects table started.", start, start + args.batchSize));
|
||||
long startTime = System.currentTimeMillis();
|
||||
PagedList<EbeanAspectV2> rows = getPagedAspects(start, args);
|
||||
result.timeSqlQueryMs = System.currentTimeMillis() - startTime;
|
||||
context.report().addLine(String.format(
|
||||
"Reading rows %s through %s from the aspects table completed.", start, start + args.batchSize));
|
||||
|
||||
for (EbeanAspectV2 aspect : rows.getList()) {
|
||||
// 1. Extract an Entity type from the entity Urn
|
||||
startTime = System.currentTimeMillis();
|
||||
Urn urn;
|
||||
try {
|
||||
urn = Urn.createFromString(aspect.getKey().getUrn());
|
||||
@ -86,6 +97,8 @@ public class SendMAEStep implements UpgradeStep {
|
||||
ignored = ignored + 1;
|
||||
continue;
|
||||
}
|
||||
result.timeUrnMs += System.currentTimeMillis() - startTime;
|
||||
startTime = System.currentTimeMillis();
|
||||
|
||||
// 2. Verify that the entity associated with the aspect is found in the registry.
|
||||
final String entityName = urn.getEntityType();
|
||||
@ -99,6 +112,8 @@ public class SendMAEStep implements UpgradeStep {
|
||||
ignored = ignored + 1;
|
||||
continue;
|
||||
}
|
||||
result.timeEntityRegistryCheckMs += System.currentTimeMillis() - startTime;
|
||||
startTime = System.currentTimeMillis();
|
||||
final String aspectName = aspect.getKey().getAspect();
|
||||
|
||||
// 3. Verify that the aspect is a valid aspect associated with the entity
|
||||
@ -110,6 +125,8 @@ public class SendMAEStep implements UpgradeStep {
|
||||
ignored = ignored + 1;
|
||||
continue;
|
||||
}
|
||||
result.aspectCheckMs += System.currentTimeMillis() - startTime;
|
||||
startTime = System.currentTimeMillis();
|
||||
|
||||
// 4. Create record from json aspect
|
||||
final RecordTemplate aspectRecord;
|
||||
@ -122,6 +139,8 @@ public class SendMAEStep implements UpgradeStep {
|
||||
ignored = ignored + 1;
|
||||
continue;
|
||||
}
|
||||
result.createRecordMs += System.currentTimeMillis() - startTime;
|
||||
startTime = System.currentTimeMillis();
|
||||
|
||||
SystemMetadata latestSystemMetadata = EntityUtils.parseSystemMetadata(aspect.getSystemMetadata());
|
||||
|
||||
@ -130,6 +149,7 @@ public class SendMAEStep implements UpgradeStep {
|
||||
latestSystemMetadata,
|
||||
new AuditStamp().setActor(UrnUtils.getUrn(SYSTEM_ACTOR)).setTime(System.currentTimeMillis()),
|
||||
ChangeType.RESTATE);
|
||||
result.sendMessageMs += System.currentTimeMillis() - startTime;
|
||||
|
||||
rowsMigrated++;
|
||||
}
|
||||
@ -139,7 +159,9 @@ public class SendMAEStep implements UpgradeStep {
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("Thread interrupted while sleeping after successful batch migration.");
|
||||
}
|
||||
return new KafkaJobResult(ignored, rowsMigrated);
|
||||
result.ignored = ignored;
|
||||
result.rowsMigrated = rowsMigrated;
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
@ -245,6 +267,7 @@ public class SendMAEStep implements UpgradeStep {
|
||||
@Override
|
||||
public Function<UpgradeContext, UpgradeStepResult> executable() {
|
||||
return (context) -> {
|
||||
KafkaJobResult finalJobResult = new KafkaJobResult();
|
||||
JobArgs args = getArgs(context);
|
||||
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(args.numThreads);
|
||||
|
||||
@ -253,10 +276,7 @@ public class SendMAEStep implements UpgradeStep {
|
||||
final int rowCount = getRowCount(args);
|
||||
context.report().addLine(String.format("Found %s latest aspects in aspects table in %.2f minutes.",
|
||||
rowCount, (float) (System.currentTimeMillis() - startTime) / 1000 / 60));
|
||||
|
||||
int totalRowsMigrated = 0;
|
||||
int start = 0;
|
||||
int ignored = 0;
|
||||
|
||||
List<Future<KafkaJobResult>> futures = new ArrayList<>();
|
||||
startTime = System.currentTimeMillis();
|
||||
@ -267,38 +287,50 @@ public class SendMAEStep implements UpgradeStep {
|
||||
}
|
||||
List<KafkaJobResult> tmpResults = iterateFutures(futures);
|
||||
for (KafkaJobResult tmpResult: tmpResults) {
|
||||
totalRowsMigrated += tmpResult.rowsMigrated;
|
||||
ignored += tmpResult.ignored;
|
||||
reportStats(context, totalRowsMigrated, ignored, rowCount, startTime);
|
||||
reportStats(context, finalJobResult, tmpResult, rowCount, startTime);
|
||||
}
|
||||
}
|
||||
while (futures.size() > 0) {
|
||||
List<KafkaJobResult> tmpResults = iterateFutures(futures);
|
||||
for (KafkaJobResult tmpResult: tmpResults) {
|
||||
totalRowsMigrated += tmpResult.rowsMigrated;
|
||||
ignored += tmpResult.ignored;
|
||||
reportStats(context, totalRowsMigrated, ignored, rowCount, startTime);
|
||||
reportStats(context, finalJobResult, tmpResult, rowCount, startTime);
|
||||
}
|
||||
}
|
||||
executor.shutdown();
|
||||
if (totalRowsMigrated != rowCount) {
|
||||
if (finalJobResult.rowsMigrated != rowCount) {
|
||||
float percentFailed = 0.0f;
|
||||
if (rowCount > 0) {
|
||||
percentFailed = (float) (rowCount - totalRowsMigrated) * 100 / rowCount;
|
||||
percentFailed = (float) (rowCount - finalJobResult.rowsMigrated) * 100 / rowCount;
|
||||
}
|
||||
context.report().addLine(String.format(
|
||||
"Failed to send MAEs for %d rows (%.2f%% of total).", rowCount - totalRowsMigrated, percentFailed));
|
||||
"Failed to send MAEs for %d rows (%.2f%% of total).",
|
||||
rowCount - finalJobResult.rowsMigrated, percentFailed));
|
||||
}
|
||||
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
|
||||
};
|
||||
}
|
||||
|
||||
private static void reportStats(UpgradeContext context, int totalRowsMigrated, int ignored, int rowCount,
|
||||
long startTime) {
|
||||
private static void reportStats(UpgradeContext context, KafkaJobResult finalResult, KafkaJobResult tmpResult,
|
||||
int rowCount, long startTime) {
|
||||
finalResult.ignored += tmpResult.ignored;
|
||||
finalResult.rowsMigrated += tmpResult.rowsMigrated;
|
||||
finalResult.timeSqlQueryMs += tmpResult.timeSqlQueryMs;
|
||||
reportStat(context, "sql query", finalResult.timeSqlQueryMs);
|
||||
finalResult.timeUrnMs += tmpResult.timeUrnMs;
|
||||
reportStat(context, "timeUrnMs", finalResult.timeUrnMs);
|
||||
finalResult.timeEntityRegistryCheckMs += tmpResult.timeEntityRegistryCheckMs;
|
||||
reportStat(context, "timeEntityRegistryCheckMs", finalResult.timeEntityRegistryCheckMs);
|
||||
finalResult.aspectCheckMs += tmpResult.aspectCheckMs;
|
||||
reportStat(context, "aspectCheckMs", finalResult.aspectCheckMs);
|
||||
finalResult.createRecordMs += tmpResult.createRecordMs;
|
||||
reportStat(context, "createRecordMs", finalResult.createRecordMs);
|
||||
finalResult.sendMessageMs += tmpResult.sendMessageMs;
|
||||
reportStat(context, "sendMessageMs", finalResult.sendMessageMs);
|
||||
|
||||
long currentTime = System.currentTimeMillis();
|
||||
float timeSoFarMinutes = (float) (currentTime - startTime) / 1000 / 60;
|
||||
float percentSent = (float) totalRowsMigrated * 100 / rowCount;
|
||||
float percentIgnored = (float) ignored * 100 / rowCount;
|
||||
float percentSent = (float) finalResult.rowsMigrated * 100 / rowCount;
|
||||
float percentIgnored = (float) finalResult.ignored * 100 / rowCount;
|
||||
float estimatedTimeMinutesComplete = -1;
|
||||
if (percentSent > 0) {
|
||||
estimatedTimeMinutesComplete = timeSoFarMinutes * (100 - percentSent) / percentSent;
|
||||
@ -306,7 +338,7 @@ public class SendMAEStep implements UpgradeStep {
|
||||
float totalTimeComplete = timeSoFarMinutes + estimatedTimeMinutesComplete;
|
||||
context.report().addLine(String.format(
|
||||
"Successfully sent MAEs for %s/%s rows (%.2f%% of total). %s rows ignored (%.2f%% of total)",
|
||||
totalRowsMigrated, rowCount, percentSent, ignored, percentIgnored));
|
||||
finalResult.rowsMigrated, rowCount, percentSent, finalResult.ignored, percentIgnored));
|
||||
context.report().addLine(String.format("%.2f mins taken. %.2f est. mins to completion. Total mins est. = %.2f.",
|
||||
timeSoFarMinutes, estimatedTimeMinutesComplete, totalTimeComplete));
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user