From 157363586e6a61c154bc9c5acdabcd0be1d87e57 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Tue, 30 Aug 2022 17:38:20 +0530 Subject: [PATCH] feat(restore-indices): add timing info (#5773) --- .../upgrade/restoreindices/SendMAEStep.java | 80 +++++++++++++------ 1 file changed, 56 insertions(+), 24 deletions(-) diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java index e1ab85a6d3..9bce7dd7c2 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java @@ -47,12 +47,19 @@ public class SendMAEStep implements UpgradeStep { private final EntityRegistry _entityRegistry; public static class KafkaJobResult { - public int ignored; - public int rowsMigrated; - public KafkaJobResult(int ignored, int rowsMigrated) { - this.ignored = ignored; - this.rowsMigrated = rowsMigrated; - } + public int ignored = 0; + public int rowsMigrated = 0; + public long timeSqlQueryMs = 0; + public long timeUrnMs = 0; + public long timeEntityRegistryCheckMs = 0; + public long aspectCheckMs = 0; + public long createRecordMs = 0; + public long sendMessageMs = 0; + } + + public static void reportStat(UpgradeContext context, String id, long timeMs) { + context.report().addLine(String.format( + "Mins taken for %s so far is %.2f", id, (float) timeMs / 1000 / 60)); } public class KafkaJob implements Callable { @@ -66,16 +73,20 @@ public class SendMAEStep implements UpgradeStep { } @Override public KafkaJobResult call() { + KafkaJobResult result = new KafkaJobResult(); int ignored = 0; int rowsMigrated = 0; context.report().addLine(String.format( "Reading rows %s through %s from the aspects table started.", start, start + args.batchSize)); + long startTime = System.currentTimeMillis(); PagedList rows = getPagedAspects(start, args); + result.timeSqlQueryMs = System.currentTimeMillis() - startTime; context.report().addLine(String.format( "Reading rows %s through %s from the aspects table completed.", start, start + args.batchSize)); for (EbeanAspectV2 aspect : rows.getList()) { // 1. Extract an Entity type from the entity Urn + startTime = System.currentTimeMillis(); Urn urn; try { urn = Urn.createFromString(aspect.getKey().getUrn()); @@ -86,6 +97,8 @@ public class SendMAEStep implements UpgradeStep { ignored = ignored + 1; continue; } + result.timeUrnMs += System.currentTimeMillis() - startTime; + startTime = System.currentTimeMillis(); // 2. Verify that the entity associated with the aspect is found in the registry. final String entityName = urn.getEntityType(); @@ -99,6 +112,8 @@ public class SendMAEStep implements UpgradeStep { ignored = ignored + 1; continue; } + result.timeEntityRegistryCheckMs += System.currentTimeMillis() - startTime; + startTime = System.currentTimeMillis(); final String aspectName = aspect.getKey().getAspect(); // 3. Verify that the aspect is a valid aspect associated with the entity @@ -110,6 +125,8 @@ public class SendMAEStep implements UpgradeStep { ignored = ignored + 1; continue; } + result.aspectCheckMs += System.currentTimeMillis() - startTime; + startTime = System.currentTimeMillis(); // 4. Create record from json aspect final RecordTemplate aspectRecord; @@ -122,6 +139,8 @@ public class SendMAEStep implements UpgradeStep { ignored = ignored + 1; continue; } + result.createRecordMs += System.currentTimeMillis() - startTime; + startTime = System.currentTimeMillis(); SystemMetadata latestSystemMetadata = EntityUtils.parseSystemMetadata(aspect.getSystemMetadata()); @@ -130,6 +149,7 @@ public class SendMAEStep implements UpgradeStep { latestSystemMetadata, new AuditStamp().setActor(UrnUtils.getUrn(SYSTEM_ACTOR)).setTime(System.currentTimeMillis()), ChangeType.RESTATE); + result.sendMessageMs += System.currentTimeMillis() - startTime; rowsMigrated++; } @@ -139,7 +159,9 @@ public class SendMAEStep implements UpgradeStep { } catch (InterruptedException e) { throw new RuntimeException("Thread interrupted while sleeping after successful batch migration."); } - return new KafkaJobResult(ignored, rowsMigrated); + result.ignored = ignored; + result.rowsMigrated = rowsMigrated; + return result; } } @@ -245,6 +267,7 @@ public class SendMAEStep implements UpgradeStep { @Override public Function executable() { return (context) -> { + KafkaJobResult finalJobResult = new KafkaJobResult(); JobArgs args = getArgs(context); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(args.numThreads); @@ -253,10 +276,7 @@ public class SendMAEStep implements UpgradeStep { final int rowCount = getRowCount(args); context.report().addLine(String.format("Found %s latest aspects in aspects table in %.2f minutes.", rowCount, (float) (System.currentTimeMillis() - startTime) / 1000 / 60)); - - int totalRowsMigrated = 0; int start = 0; - int ignored = 0; List> futures = new ArrayList<>(); startTime = System.currentTimeMillis(); @@ -267,38 +287,50 @@ public class SendMAEStep implements UpgradeStep { } List tmpResults = iterateFutures(futures); for (KafkaJobResult tmpResult: tmpResults) { - totalRowsMigrated += tmpResult.rowsMigrated; - ignored += tmpResult.ignored; - reportStats(context, totalRowsMigrated, ignored, rowCount, startTime); + reportStats(context, finalJobResult, tmpResult, rowCount, startTime); } } while (futures.size() > 0) { List tmpResults = iterateFutures(futures); for (KafkaJobResult tmpResult: tmpResults) { - totalRowsMigrated += tmpResult.rowsMigrated; - ignored += tmpResult.ignored; - reportStats(context, totalRowsMigrated, ignored, rowCount, startTime); + reportStats(context, finalJobResult, tmpResult, rowCount, startTime); } } executor.shutdown(); - if (totalRowsMigrated != rowCount) { + if (finalJobResult.rowsMigrated != rowCount) { float percentFailed = 0.0f; if (rowCount > 0) { - percentFailed = (float) (rowCount - totalRowsMigrated) * 100 / rowCount; + percentFailed = (float) (rowCount - finalJobResult.rowsMigrated) * 100 / rowCount; } context.report().addLine(String.format( - "Failed to send MAEs for %d rows (%.2f%% of total).", rowCount - totalRowsMigrated, percentFailed)); + "Failed to send MAEs for %d rows (%.2f%% of total).", + rowCount - finalJobResult.rowsMigrated, percentFailed)); } return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); }; } - private static void reportStats(UpgradeContext context, int totalRowsMigrated, int ignored, int rowCount, - long startTime) { + private static void reportStats(UpgradeContext context, KafkaJobResult finalResult, KafkaJobResult tmpResult, + int rowCount, long startTime) { + finalResult.ignored += tmpResult.ignored; + finalResult.rowsMigrated += tmpResult.rowsMigrated; + finalResult.timeSqlQueryMs += tmpResult.timeSqlQueryMs; + reportStat(context, "sql query", finalResult.timeSqlQueryMs); + finalResult.timeUrnMs += tmpResult.timeUrnMs; + reportStat(context, "timeUrnMs", finalResult.timeUrnMs); + finalResult.timeEntityRegistryCheckMs += tmpResult.timeEntityRegistryCheckMs; + reportStat(context, "timeEntityRegistryCheckMs", finalResult.timeEntityRegistryCheckMs); + finalResult.aspectCheckMs += tmpResult.aspectCheckMs; + reportStat(context, "aspectCheckMs", finalResult.aspectCheckMs); + finalResult.createRecordMs += tmpResult.createRecordMs; + reportStat(context, "createRecordMs", finalResult.createRecordMs); + finalResult.sendMessageMs += tmpResult.sendMessageMs; + reportStat(context, "sendMessageMs", finalResult.sendMessageMs); + long currentTime = System.currentTimeMillis(); float timeSoFarMinutes = (float) (currentTime - startTime) / 1000 / 60; - float percentSent = (float) totalRowsMigrated * 100 / rowCount; - float percentIgnored = (float) ignored * 100 / rowCount; + float percentSent = (float) finalResult.rowsMigrated * 100 / rowCount; + float percentIgnored = (float) finalResult.ignored * 100 / rowCount; float estimatedTimeMinutesComplete = -1; if (percentSent > 0) { estimatedTimeMinutesComplete = timeSoFarMinutes * (100 - percentSent) / percentSent; @@ -306,7 +338,7 @@ public class SendMAEStep implements UpgradeStep { float totalTimeComplete = timeSoFarMinutes + estimatedTimeMinutesComplete; context.report().addLine(String.format( "Successfully sent MAEs for %s/%s rows (%.2f%% of total). %s rows ignored (%.2f%% of total)", - totalRowsMigrated, rowCount, percentSent, ignored, percentIgnored)); + finalResult.rowsMigrated, rowCount, percentSent, finalResult.ignored, percentIgnored)); context.report().addLine(String.format("%.2f mins taken. %.2f est. mins to completion. Total mins est. = %.2f.", timeSoFarMinutes, estimatedTimeMinutesComplete, totalTimeComplete)); }