feat(upgrade): Make restore from backup logic generic (#6536)

Pedro Silva 2022-11-24 16:19:24 +00:00 committed by GitHub
parent f77117a9e0
commit 5fd5866a03
9 changed files with 367 additions and 133 deletions

View File

@@ -1,6 +1,6 @@
package com.linkedin.datahub.upgrade.restorebackup;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableBiMap;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
@@ -9,17 +9,27 @@ import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.datahub.upgrade.restorebackup.backupreader.BackupReader;
import com.linkedin.datahub.upgrade.restorebackup.backupreader.BackupReaderArgs;
import com.linkedin.datahub.upgrade.restorebackup.backupreader.EbeanAspectBackupIterator;
import com.linkedin.datahub.upgrade.restorebackup.backupreader.LocalParquetReader;
import com.linkedin.datahub.upgrade.restorebackup.backupreader.ReaderWrapper;
import com.linkedin.datahub.upgrade.restoreindices.RestoreIndices;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.EntityUtils;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -27,17 +37,34 @@ import java.util.stream.Collectors;
public class RestoreStorageStep implements UpgradeStep {
private static final int REPORT_BATCH_SIZE = 1000;
private static final int DEFAULT_THREAD_POOL = 4;
private final EntityService _entityService;
private final EntityRegistry _entityRegistry;
private final Map<String, BackupReader> _backupReaders;
private final Map<String, Class<? extends BackupReader<? extends ReaderWrapper<?>>>> _backupReaders;
private final ExecutorService _fileReaderThreadPool;
private final ExecutorService _gmsThreadPool;
public RestoreStorageStep(final EntityService entityService, final EntityRegistry entityRegistry) {
_entityService = entityService;
_entityRegistry = entityRegistry;
_backupReaders = ImmutableList.of(new LocalParquetReader())
.stream()
.collect(Collectors.toMap(BackupReader::getName, Function.identity()));
_backupReaders = ImmutableBiMap.of(LocalParquetReader.READER_NAME, LocalParquetReader.class);
final String readerPoolSize = System.getenv(RestoreIndices.READER_POOL_SIZE);
final String writerPoolSize = System.getenv(RestoreIndices.WRITER_POOL_SIZE);
int filePoolSize;
int gmsPoolSize;
try {
filePoolSize = Integer.parseInt(readerPoolSize);
} catch (NumberFormatException e) {
filePoolSize = DEFAULT_THREAD_POOL;
}
try {
gmsPoolSize = Integer.parseInt(writerPoolSize);
} catch (NumberFormatException e) {
gmsPoolSize = DEFAULT_THREAD_POOL;
}
_fileReaderThreadPool = Executors.newFixedThreadPool(filePoolSize);
_gmsThreadPool = Executors.newFixedThreadPool(gmsPoolSize);
}
@Override
@@ -57,61 +84,38 @@ public class RestoreStorageStep implements UpgradeStep {
context.report().addLine("Starting backup restore...");
int numRows = 0;
Optional<String> backupReaderName = context.parsedArgs().get("BACKUP_READER");
context.report().addLine("Inputs!: " + context.parsedArgs());
context.report().addLine("BACKUP_READER: " + backupReaderName.toString());
if (!backupReaderName.isPresent() || !_backupReaders.containsKey(backupReaderName.get())) {
context.report().addLine("BACKUP_READER is not set or is not valid");
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED);
}
EbeanAspectBackupIterator iterator = _backupReaders.get(backupReaderName.get()).getBackupIterator(context);
EbeanAspectV2 aspect;
while ((aspect = iterator.next()) != null) {
numRows++;
// 1. Extract an Entity type from the entity Urn
Urn urn;
Class<? extends BackupReader<? extends ReaderWrapper>> clazz = _backupReaders.get(backupReaderName.get());
List<String> argNames = BackupReaderArgs.getArgNames(clazz);
List<Optional<String>> args = argNames.stream().map(argName -> context.parsedArgs().get(argName)).collect(
Collectors.toList());
BackupReader<? extends ReaderWrapper> backupReader;
try {
backupReader = clazz.getConstructor(List.class).newInstance(args);
} catch (InstantiationException | InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
e.printStackTrace();
context.report().addLine("Invalid BackupReader, not able to construct instance of " + clazz.getSimpleName());
throw new IllegalArgumentException("Invalid BackupReader: " + clazz.getSimpleName() + ", need to implement proper constructor.");
}
EbeanAspectBackupIterator<? extends ReaderWrapper> iterator = backupReader.getBackupIterator(context);
ReaderWrapper reader;
List<Future<?>> futureList = new ArrayList<>();
while ((reader = iterator.getNextReader()) != null) {
final ReaderWrapper readerRef = reader;
futureList.add(_fileReaderThreadPool.submit(() -> readerExecutable(readerRef, context)));
}
for (Future<?> future : futureList) {
try {
urn = Urn.createFromString(aspect.getKey().getUrn());
} catch (Exception e) {
context.report()
.addLine(
String.format("Failed to bind Urn with value %s into Urn object: %s", aspect.getKey().getUrn(), e));
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED);
}
// 2. Verify that the entity associated with the aspect is found in the registry.
final String entityName = urn.getEntityType();
final EntitySpec entitySpec;
try {
entitySpec = _entityRegistry.getEntitySpec(entityName);
} catch (Exception e) {
context.report()
.addLine(String.format("Failed to find Entity with name %s in Entity Registry: %s", entityName, e));
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED);
}
final String aspectName = aspect.getKey().getAspect();
// 3. Create record from json aspect
final RecordTemplate aspectRecord =
EntityUtils.toAspectRecord(entityName, aspectName, aspect.getMetadata(), _entityRegistry);
// 4. Verify that the aspect is a valid aspect associated with the entity
AspectSpec aspectSpec;
try {
aspectSpec = entitySpec.getAspectSpec(aspectName);
} catch (Exception e) {
context.report()
.addLine(String.format("Failed to find aspect spec with name %s associated with entity named %s: %s",
aspectName, entityName, e));
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED);
}
// 5. Write the row back using the EntityService
boolean emitMae = aspect.getKey().getVersion() == 0L;
_entityService.updateAspect(urn, entityName, aspectName, aspectSpec, aspectRecord, toAuditStamp(aspect),
aspect.getKey().getVersion(), emitMae);
if (numRows % REPORT_BATCH_SIZE == 0) {
context.report().addLine(String.format("Successfully inserted %d rows", numRows));
future.get();
} catch (InterruptedException | ExecutionException e) {
context.report().addLine("Reading interrupted, not able to finish processing.");
throw new RuntimeException(e);
}
}
@@ -120,6 +124,80 @@ public class RestoreStorageStep implements UpgradeStep {
};
}
private void readerExecutable(ReaderWrapper reader, UpgradeContext context) {
EbeanAspectV2 aspect;
int numRows = 0;
final ArrayList<Future<RecordTemplate>> futureList = new ArrayList<>();
while ((aspect = reader.next()) != null) {
numRows++;
// 1. Extract an Entity type from the entity Urn
final Urn urn;
try {
urn = Urn.createFromString(aspect.getKey().getUrn());
} catch (Exception e) {
context.report()
.addLine(
String.format("Failed to bind Urn with value %s into Urn object: %s", aspect.getKey().getUrn(), e));
continue;
}
// 2. Verify that the entity associated with the aspect is found in the registry.
final String entityName = urn.getEntityType();
final EntitySpec entitySpec;
try {
entitySpec = _entityRegistry.getEntitySpec(entityName);
} catch (Exception e) {
context.report()
.addLine(String.format("Failed to find Entity with name %s in Entity Registry: %s", entityName, e));
continue;
}
final String aspectName = aspect.getKey().getAspect();
// 3. Create record from json aspect
final RecordTemplate aspectRecord;
try {
aspectRecord =
EntityUtils.toAspectRecord(entityName, aspectName, aspect.getMetadata(), _entityRegistry);
} catch (Exception e) {
context.report()
.addLine(String.format("Failed to create aspect record with name %s associated with entity named %s: %s",
aspectName, entityName, e));
continue;
}
// 4. Verify that the aspect is a valid aspect associated with the entity
final AspectSpec aspectSpec;
try {
aspectSpec = entitySpec.getAspectSpec(aspectName);
} catch (Exception e) {
context.report()
.addLine(String.format("Failed to find aspect spec with name %s associated with entity named %s: %s",
aspectName, entityName, e));
continue;
}
// 5. Write the row back using the EntityService
final long version = aspect.getKey().getVersion();
final AuditStamp auditStamp = toAuditStamp(aspect);
futureList.add(_gmsThreadPool.submit(() ->
_entityService.updateAspect(urn, entityName, aspectName, aspectSpec, aspectRecord, auditStamp,
version, version == 0L)));
if (numRows % REPORT_BATCH_SIZE == 0) {
for (Future<?> future : futureList) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
context.report().addLine("Reading interrupted, not able to finish processing.");
throw new RuntimeException(e);
}
}
futureList.clear();
context.report().addLine(String.format("Successfully inserted %d rows", numRows));
}
}
}
private AuditStamp toAuditStamp(final EbeanAspectV2 aspect) {
final AuditStamp auditStamp = new AuditStamp();
auditStamp.setTime(aspect.getCreatedOn().getTime());
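
For context on the threading model introduced above, here is a minimal, stand-alone sketch of the same two-pool producer/consumer split: one pool drains backup readers file by file, a second pool writes rows back. The Row, RowReader and Sink types are hypothetical stand-ins for EbeanAspectV2, ReaderWrapper and EntityService.updateAspect; this is illustrative only, not code from this commit.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TwoPoolRestoreSketch {
  interface Row { }                        // stand-in for EbeanAspectV2
  interface RowReader { Row next(); }      // stand-in for ReaderWrapper: returns null when exhausted
  interface Sink { void write(Row row); }  // stand-in for EntityService.updateAspect

  private final ExecutorService fileReaderPool = Executors.newFixedThreadPool(4);
  private final ExecutorService writerPool = Executors.newFixedThreadPool(4);

  void restore(List<RowReader> readers, Sink sink) throws Exception {
    final List<Future<?>> readerFutures = new ArrayList<>();
    for (final RowReader reader : readers) {
      // One task per backup file: parse rows and hand each row off to the writer pool.
      readerFutures.add(fileReaderPool.submit(() -> {
        final List<Future<?>> writes = new ArrayList<>();
        Row row;
        while ((row = reader.next()) != null) {
          final Row current = row;
          writes.add(writerPool.submit(() -> sink.write(current)));
        }
        for (Future<?> write : writes) {
          try {
            write.get(); // block until this file's writes have landed
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
      }));
    }
    for (Future<?> future : readerFutures) {
      future.get(); // surface any reader failure, as the loop over futureList does above
    }
  }
}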

View File

@@ -6,11 +6,11 @@ import javax.annotation.Nonnull;
/**
* Base interface for BackupReader used for creating the BackupIterator to retrieve EbeanAspectV2 objects to be
* ingested back into GMS
* ingested back into GMS. Must have a constructor that takes a List of Optional Strings
*/
public interface BackupReader {
public interface BackupReader<T extends ReaderWrapper> {
String getName();
@Nonnull
EbeanAspectBackupIterator getBackupIterator(UpgradeContext context);
EbeanAspectBackupIterator<T> getBackupIterator(UpgradeContext context);
}
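
The constructor contract stated in the interface comment (a single constructor taking a List of Optional Strings) is what allows RestoreStorageStep to build readers reflectively from the registered Class objects. A small stand-alone sketch of that pattern, with a hypothetical DummyReader in place of a real BackupReader implementation:

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class ReflectiveReaderSketch {
  // Hypothetical reader obeying the contract: one public constructor taking List<Optional<String>>.
  public static class DummyReader {
    private final List<Optional<String>> args;
    public DummyReader(List<Optional<String>> args) {
      this.args = args;
    }
    public int argCount() {
      return args.size();
    }
  }

  public static void main(String[] unused) throws Exception {
    final Class<DummyReader> clazz = DummyReader.class;
    final List<Optional<String>> args = Arrays.asList(Optional.of("/tmp/backup.parquet"));
    // Same lookup RestoreStorageStep performs: getConstructor(List.class).newInstance(args).
    final DummyReader reader = clazz.getConstructor(List.class).newInstance(args);
    System.out.println("constructed with " + reader.argCount() + " arg(s)");
  }
}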

View File

@@ -0,0 +1,26 @@
package com.linkedin.datahub.upgrade.restorebackup.backupreader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Retains a map of what arguments are passed in to a backup reader
*/
public final class BackupReaderArgs {
private BackupReaderArgs() {
}
private static final Map<Class<? extends BackupReader>, List<String>> ARGS_MAP;
static {
ARGS_MAP = new HashMap<>();
ARGS_MAP.put(LocalParquetReader.class, LocalParquetReader.argNames());
}
public static List<String> getArgNames(Class<? extends BackupReader> clazz) {
return ARGS_MAP.get(clazz);
}
}

View File

@@ -1,14 +1,46 @@
package com.linkedin.datahub.upgrade.restorebackup.backupreader;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* Base interface for iterators that retrieve EbeanAspectV2 objects
* This allows us to restore from backups of various formats
*/
public interface EbeanAspectBackupIterator extends Closeable {
// Get the next row in backup. Return null if finished.
EbeanAspectV2 next();
@Slf4j
@RequiredArgsConstructor
public class EbeanAspectBackupIterator<T extends ReaderWrapper> implements Closeable {
private final Collection<T> _readers;
private final Iterator<T> it;
public EbeanAspectBackupIterator(final Collection<T> readers) {
this._readers = readers;
it = _readers.iterator();
}
public T getNextReader() {
while (it.hasNext()) {
final T element = it.next();
log.warn("Iterating over reader {}", element.getFileName());
return element;
}
return null;
}
@Override
public void close() {
_readers.forEach(reader -> {
try {
reader.close();
} catch (IOException e) {
log.error("Error while closing parquet reader", e);
}
});
}
}
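
The contract change is worth spelling out: the old interface handed back individual rows via next(), while the new class hands back whole readers, one per backup file, via getNextReader(). A hypothetical single-threaded consumer, assuming the DataHub classes shown in this diff, would look roughly like this:

// Illustrative helper, not part of this commit.
static long drainSequentially(EbeanAspectBackupIterator<? extends ReaderWrapper> iterator) {
  long rows = 0;
  try {
    ReaderWrapper reader;
    while ((reader = iterator.getNextReader()) != null) {
      // Each reader corresponds to one backup file; its next() yields EbeanAspectV2 rows.
      while (reader.next() != null) {
        rows++; // a real consumer would convert and write the aspect here
      }
    }
  } finally {
    iterator.close();
  }
  return rows;
}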

View File

@@ -3,6 +3,8 @@ package com.linkedin.datahub.upgrade.restorebackup.backupreader;
import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeContext;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
@@ -17,9 +19,18 @@ import org.apache.parquet.hadoop.ParquetReader;
* BackupReader for retrieving EbeanAspectV2 objects from a local parquet file
*/
@Slf4j
public class LocalParquetReader implements BackupReader {
public class LocalParquetReader implements BackupReader<ParquetReaderWrapper> {
public LocalParquetReader() {
public static final String READER_NAME = "LOCAL_PARQUET";
public static List<String> argNames() {
return Collections.emptyList();
}
public LocalParquetReader(@Nonnull List<Optional<String>> args) {
if (args.size() != argNames().size()) {
throw new IllegalArgumentException("Incorrect number of arguments for LocalParquetReader.");
}
// Need below to solve issue with hadoop path class not working in linux systems
// https://stackoverflow.com/questions/41864985/hadoop-ioexception-failure-to-login
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("hduser"));
@@ -32,7 +43,7 @@ public class LocalParquetReader implements BackupReader {
@Nonnull
@Override
public EbeanAspectBackupIterator getBackupIterator(UpgradeContext context) {
public EbeanAspectBackupIterator<ParquetReaderWrapper> getBackupIterator(UpgradeContext context) {
Optional<String> path = context.parsedArgs().get("BACKUP_FILE_PATH");
if (!path.isPresent()) {
context.report().addLine("BACKUP_FILE_PATH must be set to run RestoreBackup through local parquet file");
@@ -42,9 +53,9 @@ public class LocalParquetReader implements BackupReader {
try {
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(path.get())).build();
return new ParquetEbeanAspectBackupIterator(ImmutableList.of(reader));
return new EbeanAspectBackupIterator<>(ImmutableList.of(new ParquetReaderWrapper(reader, path.get())));
} catch (IOException e) {
throw new RuntimeException(String.format("Failed to build ParquetReader: %s", e));
}
}
}
}

View File

@@ -1,66 +0,0 @@
package com.linkedin.datahub.upgrade.restorebackup.backupreader;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import java.io.IOException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.hadoop.ParquetReader;
/**
* Iterator to retrieve EbeanAspectV2 objects from the ParquetReader
* Converts the avro GenericRecord object into EbeanAspectV2
*/
@Slf4j
@RequiredArgsConstructor
public class ParquetEbeanAspectBackupIterator implements EbeanAspectBackupIterator {
private final List<ParquetReader<GenericRecord>> _parquetReaders;
private int currentReaderIndex = 0;
@Override
public EbeanAspectV2 next() {
if (currentReaderIndex >= _parquetReaders.size()) {
return null;
}
ParquetReader<GenericRecord> parquetReader = _parquetReaders.get(currentReaderIndex);
try {
GenericRecord record = parquetReader.read();
if (record == null) {
log.info("Record is null, moving to next reader {} {}", currentReaderIndex, _parquetReaders.size());
parquetReader.close();
currentReaderIndex++;
return next();
}
return convertRecord(record);
} catch (IOException e) {
log.error("Error while reading backed up aspect", e);
return null;
}
}
@Override
public void close() {
_parquetReaders.forEach(reader -> {
try {
reader.close();
} catch (IOException e) {
log.error("Error while closing parquet reader", e);
}
});
}
private EbeanAspectV2 convertRecord(GenericRecord record) {
return new EbeanAspectV2(record.get("urn").toString(), record.get("aspect").toString(),
(Long) record.get("version"), record.get("metadata").toString(),
Timestamp.from(Instant.ofEpochMilli((Long) record.get("createdon") / 1000)), record.get("createdby").toString(),
Optional.ofNullable(record.get("createdfor")).map(Object::toString).orElse(null),
Optional.ofNullable(record.get("systemmetadata")).map(Object::toString).orElse(null));
}
}

View File

@@ -0,0 +1,70 @@
package com.linkedin.datahub.upgrade.restorebackup.backupreader;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import java.io.IOException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.hadoop.ParquetReader;
@Slf4j
public class ParquetReaderWrapper extends ReaderWrapper<GenericRecord> {
private final static long NANOS_PER_MILLISECOND = 1000000;
private final static long MILLIS_IN_DAY = 86400000;
private final static long JULIAN_EPOCH_OFFSET_DAYS = 2440588;
private final ParquetReader<GenericRecord> _parquetReader;
public ParquetReaderWrapper(ParquetReader<GenericRecord> parquetReader, String fileName) {
super(fileName);
_parquetReader = parquetReader;
}
@Override
boolean isLatestVersion(GenericRecord record) {
return (Long) record.get("version") == 0L;
}
@Override
GenericRecord read() throws IOException {
return _parquetReader.read();
}
EbeanAspectV2 convertRecord(GenericRecord record) {
long ts;
if (record.get("createdon") instanceof GenericFixed) {
ts = convertFixed96IntToTs((GenericFixed) record.get("createdon"));
} else {
ts = (Long) record.get("createdon");
}
return new EbeanAspectV2(record.get("urn").toString(), record.get("aspect").toString(),
(Long) record.get("version"), record.get("metadata").toString(),
Timestamp.from(Instant.ofEpochMilli(ts / 1000)), record.get("createdby").toString(),
Optional.ofNullable(record.get("createdfor")).map(Object::toString).orElse(null),
Optional.ofNullable(record.get("systemmetadata")).map(Object::toString).orElse(null));
}
private long convertFixed96IntToTs(GenericFixed createdon) {
// From https://github.com/apache/parquet-format/pull/49/filesParquetTimestampUtils.java
// and ParquetTimestampUtils.java from https://github.com/kube-reporting/presto/blob/master/presto-parquet/
// src/main/java/io/prestosql/parquet/ParquetTimestampUtils.java
byte[] bytes = createdon.bytes(); // little endian encoding - need to invert byte order
long timeOfDayNanos = Longs.fromBytes(bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]);
int julianDay = Ints.fromBytes(bytes[11], bytes[10], bytes[9], bytes[8]);
return ((julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY) + (timeOfDayNanos / NANOS_PER_MILLISECOND);
}
@Override
public void close() throws IOException {
_parquetReader.close();
}
}
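
A quick, self-contained check of the INT96 decoding above (assumed byte layout: 8 little-endian bytes of nanos-of-day followed by 4 little-endian bytes of Julian day), encoding Julian day 2440589 at 01:00:00 and decoding it the same way:

import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class Int96TimestampExample {
  public static void main(String[] args) {
    final long timeOfDayNanos = 3_600_000_000_000L; // one hour into the day
    final int julianDay = 2_440_589;                // 1970-01-02 (the Unix epoch is Julian day 2440588)

    // Little-endian INT96 value as it would appear in the parquet file.
    final byte[] bytes = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
        .putLong(timeOfDayNanos).putInt(julianDay).array();

    // Same byte-order inversion as convertFixed96IntToTs above.
    final long nanos = Longs.fromBytes(bytes[7], bytes[6], bytes[5], bytes[4],
        bytes[3], bytes[2], bytes[1], bytes[0]);
    final int day = Ints.fromBytes(bytes[11], bytes[10], bytes[9], bytes[8]);
    final long epochMillis = (day - 2440588L) * 86_400_000L + nanos / 1_000_000L;

    System.out.println(epochMillis); // 90000000 -> 1970-01-02T01:00:00Z
  }
}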

View File

@@ -0,0 +1,81 @@
package com.linkedin.datahub.upgrade.restorebackup.backupreader;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import java.io.Closeable;
import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
/**
* Abstract class that reads entries from a given source and transforms them into {@link EbeanAspectV2} instances.
* @param <T> The object type to read from a reader source.
*/
@Slf4j
public abstract class ReaderWrapper<T> implements Closeable {
private long totalTimeSpentInRead = 0L;
private long lastTimeLogged = 0L;
private int recordsSkipped = 0;
private int recordsFailed = 0;
private int recordsProcessed = 0;
private long totalTimeSpentInConvert = 0L;
private final String _fileName;
ReaderWrapper(String fileName) {
this._fileName = fileName;
}
public EbeanAspectV2 next() {
try {
long readStart = System.nanoTime();
T record = read();
long readEnd = System.nanoTime();
totalTimeSpentInRead += readEnd - readStart;
while ((record != null) && !isLatestVersion(record)) {
recordsSkipped += 1;
readStart = System.nanoTime();
record = read();
readEnd = System.nanoTime();
totalTimeSpentInRead += readEnd - readStart;
}
if ((readEnd - lastTimeLogged) > 1000L * 1000 * 1000 * 5) {
// print every 5 seconds
printStat("Running: ");
lastTimeLogged = readEnd;
}
if (record == null) {
printStat("Closing: ");
close();
return null;
}
long convertStart = System.nanoTime();
final EbeanAspectV2 ebeanAspectV2 = convertRecord(record);
long convertEnd = System.nanoTime();
this.totalTimeSpentInConvert += convertEnd - convertStart;
this.recordsProcessed++;
return ebeanAspectV2;
} catch (Exception e) {
log.error("Error while reading backed up aspect", e);
this.recordsFailed++;
return null;
}
}
abstract T read() throws IOException;
abstract boolean isLatestVersion(T record);
abstract EbeanAspectV2 convertRecord(T record);
private void printStat(String prefix) {
log.info("{} Reader {}. Stats: records processed: {}, Total millis spent in reading: {}, records skipped: {},"
+ " records failed: {}, Total millis in convert: {}", prefix, _fileName,
recordsProcessed, totalTimeSpentInRead / 1000 / 1000, recordsSkipped, recordsFailed,
totalTimeSpentInConvert / 1000 / 1000);
}
public String getFileName() {
return _fileName;
}
}
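
To show what a second backup format would take, here is a hypothetical ReaderWrapper for newline-delimited JSON backups. It is illustrative only: Jackson is an assumed dependency, the field names simply mirror the ones ParquetReaderWrapper reads, and the class must live in the backupreader package because the abstract hooks are package-private.

package com.linkedin.datahub.upgrade.restorebackup.backupreader;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import java.io.BufferedReader;
import java.io.IOException;
import java.sql.Timestamp;
import java.time.Instant;

public class JsonLinesReaderWrapper extends ReaderWrapper<JsonNode> {
  private static final ObjectMapper MAPPER = new ObjectMapper();
  private final BufferedReader _reader;

  public JsonLinesReaderWrapper(BufferedReader reader, String fileName) {
    super(fileName);
    _reader = reader;
  }

  @Override
  JsonNode read() throws IOException {
    final String line = _reader.readLine();
    return line == null ? null : MAPPER.readTree(line); // one JSON object per line
  }

  @Override
  boolean isLatestVersion(JsonNode record) {
    return record.get("version").asLong() == 0L;
  }

  @Override
  EbeanAspectV2 convertRecord(JsonNode record) {
    // Field names assumed to match the parquet backup columns above.
    return new EbeanAspectV2(record.get("urn").asText(), record.get("aspect").asText(),
        record.get("version").asLong(), record.get("metadata").asText(),
        Timestamp.from(Instant.ofEpochMilli(record.get("createdon").asLong())),
        record.get("createdby").asText(),
        record.hasNonNull("createdfor") ? record.get("createdfor").asText() : null,
        record.hasNonNull("systemmetadata") ? record.get("systemmetadata").asText() : null);
  }

  @Override
  public void close() throws IOException {
    _reader.close();
  }
}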

View File

@@ -20,6 +20,8 @@ public class RestoreIndices implements Upgrade {
public static final String BATCH_DELAY_MS_ARG_NAME = "batchDelayMs";
public static final String NUM_THREADS_ARG_NAME = "numThreads";
public static final String ASPECT_NAME_ARG_NAME = "aspectName";
public static final String READER_POOL_SIZE = "READER_POOL_SIZE";
public static final String WRITER_POOL_SIZE = "WRITER_POOL_SIZE";
public static final String URN_ARG_NAME = "urn";
public static final String URN_LIKE_ARG_NAME = "urnLike";