fix code for tests

This commit is contained in:
Ben Irvin 2022-11-15 20:58:50 +01:00
parent 7f68db7fe7
commit 1a98345a9a
2 changed files with 36 additions and 20 deletions

View File

@ -66,8 +66,6 @@ describe('Transfer engine', () => {
const engine = createTransferEngine(mockedSource, mockedDestination, defaultOptions);
await engine.transfer();
expect(mockedSource.bootstrap).toHaveBeenCalledTimes(1);
expect(mockedDestination.bootstrap).toHaveBeenCalledTimes(1);
});
test('bootstraps all providers with a bootstrap', async () => {
const source = {
@ -82,8 +80,8 @@ describe('Transfer engine', () => {
await engine.transfer();
expect(mockedSource.bootstrap).toHaveBeenCalledTimes(1);
expect(mockedDestination.bootstrap).toHaveBeenCalledTimes(1);
expect(source.bootstrap).toHaveBeenCalledTimes(1);
expect(destination.bootstrap).toHaveBeenCalledTimes(1);
});
});

View File

@ -27,6 +27,8 @@ type TransferEngineProgress = {
stream: PassThrough;
};
export const VALID_STRATEGIES = ['restore', 'merge'];
class TransferEngine<
S extends ISourceProvider = ISourceProvider,
D extends IDestinationProvider = IDestinationProvider
@ -173,8 +175,16 @@ class TransferEngine<
}
}
validateTransferOptions() {
if (!VALID_STRATEGIES.includes(this.options.strategy)) {
throw new Error('Invalid stategy ' + this.options.strategy);
}
}
async transfer(): Promise<ITransferResults<S, D>> {
try {
this.validateTransferOptions();
await this.bootstrap();
const isValidTransfer = await this.integrityCheck();
@ -205,15 +215,17 @@ class TransferEngine<
async transferSchemas(): Promise<void> {
const stageName: TransferStage = 'schemas';
const inStream = await this.sourceProvider.streamSchemas?.();
const outStream = await this.destinationProvider.getSchemasStream?.();
const inStream = await this.sourceProvider.streamSchemas?.();
if (!inStream) {
throw new Error('Unable to transfer schemas, source stream is missing');
console.log('SourceProvider did not return a schemas stream');
return;
}
const outStream = await this.destinationProvider.getSchemasStream?.();
if (!outStream) {
throw new Error('Unable to transfer schemas, destination stream is missing');
console.log('DestinationProvider did not return a schemas stream');
return;
}
this.#updateStage('start', stageName);
@ -237,15 +249,17 @@ class TransferEngine<
async transferEntities(): Promise<void> {
const stageName: TransferStage = 'entities';
const inStream = await this.sourceProvider.streamEntities?.();
const outStream = await this.destinationProvider.getEntitiesStream?.();
const inStream = await this.sourceProvider.streamEntities?.();
if (!inStream) {
throw new Error('Unable to transfer entities, source stream is missing');
console.log('SourceProvider did not return entities stream');
return;
}
const outStream = await this.destinationProvider.getEntitiesStream?.();
if (!outStream) {
throw new Error('Unable to transfer entities, destination stream is missing');
console.log('DestinationProvider did not return entities stream');
return;
}
this.#updateStage('start', stageName);
@ -274,15 +288,17 @@ class TransferEngine<
async transferLinks(): Promise<void> {
const stageName: TransferStage = 'links';
const inStream = await this.sourceProvider.streamLinks?.();
const outStream = await this.destinationProvider.getLinksStream?.();
const inStream = await this.sourceProvider.streamLinks?.();
if (!inStream) {
throw new Error('Unable to transfer links, source stream is missing');
console.log('SourceProvider did not return a stream');
return;
}
const outStream = await this.destinationProvider.getLinksStream?.();
if (!outStream) {
throw new Error('Unable to transfer links, destination stream is missing');
console.log('DestinationProvider did not return a stream');
return;
}
this.#updateStage('start', 'links');
@ -319,15 +335,17 @@ class TransferEngine<
async transferConfiguration(): Promise<void> {
const stageName: TransferStage = 'configuration';
const inStream = await this.sourceProvider.streamConfiguration?.();
const outStream = await this.destinationProvider.getConfigurationStream?.();
const inStream = await this.sourceProvider.streamConfiguration?.();
if (!inStream) {
throw new Error('Unable to transfer configuration, source stream is missing');
console.log('SourceProvider did not return configuration stream');
return;
}
const outStream = await this.destinationProvider.getConfigurationStream?.();
if (!outStream) {
throw new Error('Unable to transfer configuration, destination stream is missing');
console.log('DestinationProvider did not return configuration stream');
return;
}
this.#updateStage('start', stageName);