Merge branch 'data-transfer/progress' into data-transfer/fix-transfer-transaction-close

This commit is contained in:
Ben Irvin 2023-02-24 10:07:47 +01:00
commit 5889463574
8 changed files with 120 additions and 16 deletions

View File

@ -202,7 +202,7 @@ class TransferEngine<
}
) {
if (!this.progress.data[stage]) {
this.progress.data[stage] = { count: 0, bytes: 0 };
this.progress.data[stage] = { count: 0, bytes: 0, startTime: Date.now() };
}
const stageProgress = this.progress.data[stage];
@ -437,6 +437,14 @@ class TransferEngine<
}) {
const { stage, source, destination, transform, tracker } = options;
const updateEndTime = () => {
const stageData = this.progress.data[stage];
if (stageData) {
stageData.endTime = Date.now();
}
};
if (!source || !destination || this.shouldSkipStage(stage)) {
// Wait until source and destination are closed
const results = await Promise.allSettled(
@ -480,11 +488,15 @@ class TransferEngine<
stream
.pipe(destination)
.on('error', (e) => {
updateEndTime();
this.#reportError(e, 'error');
destination.destroy(e);
reject(e);
})
.on('close', resolve);
.on('close', () => {
updateEndTime();
resolve();
});
});
this.#emitStageUpdate('finish', stage);

View File

@ -54,22 +54,26 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
return new Promise<string>((resolve, reject) => {
this.ws
?.once('open', async () => {
const query = this.dispatcher?.dispatchCommand({
command: 'init',
params: { options: { strategy, restore }, transfer: 'push' },
});
try {
const query = this.dispatcher?.dispatchCommand({
command: 'init',
params: { options: { strategy, restore }, transfer: 'push' },
});
const res = (await query) as server.Payload<server.InitMessage>;
const res = (await query) as server.Payload<server.InitMessage>;
if (!res?.transferID) {
return reject(
new ProviderTransferError('Init failed, invalid response from the server')
);
if (!res?.transferID) {
throw new ProviderTransferError('Init failed, invalid response from the server');
}
resolve(res.transferID);
} catch (e: unknown) {
reject(e);
}
resolve(res.transferID);
})
.once('error', reject);
.once('error', (message) => {
reject(message);
});
});
}

View File

@ -56,7 +56,6 @@ const createDispatcher = (ws: WebSocket) => {
}
};
// TODO: What happens if the server sends another message (not a response to this message)
ws.once('message', onResponse);
});
};

View File

@ -64,6 +64,8 @@ export type TransferProgress = {
[key in TransferStage]?: {
count: number;
bytes: number;
startTime: number;
endTime?: number;
aggregates?: {
[key: string]: {
count: number;

View File

@ -18,9 +18,9 @@ const {
DEFAULT_IGNORED_CONTENT_TYPES,
createStrapiInstance,
formatDiagnostic,
loadersFactory,
} = require('./utils');
const { exitWith } = require('../utils/helpers');
/**
* @typedef ExportCommandOptions Options given to the CLI import command
*
@ -80,6 +80,20 @@ module.exports = async (opts) => {
const progress = engine.progress.stream;
const { updateLoader } = loadersFactory();
progress.on(`stage::start`, ({ stage, data }) => {
updateLoader(stage, data).start();
});
progress.on('stage::finish', ({ stage, data }) => {
updateLoader(stage, data).succeed();
});
progress.on('stage::progress', ({ stage, data }) => {
updateLoader(stage, data);
});
const getTelemetryPayload = (/* payload */) => {
return {
eventProperties: {

View File

@ -19,6 +19,7 @@ const {
DEFAULT_IGNORED_CONTENT_TYPES,
createStrapiInstance,
formatDiagnostic,
loadersFactory,
} = require('./utils');
const { exitWith } = require('../utils/helpers');
@ -88,6 +89,21 @@ module.exports = async (opts) => {
engine.diagnostics.onDiagnostic(formatDiagnostic('import'));
const progress = engine.progress.stream;
const { updateLoader } = loadersFactory();
progress.on(`stage::start`, ({ stage, data }) => {
updateLoader(stage, data).start();
});
progress.on('stage::finish', ({ stage, data }) => {
updateLoader(stage, data).succeed();
});
progress.on('stage::progress', ({ stage, data }) => {
updateLoader(stage, data);
});
const getTelemetryPayload = () => {
return {
eventProperties: {

View File

@ -16,6 +16,7 @@ const {
createStrapiInstance,
DEFAULT_IGNORED_CONTENT_TYPES,
formatDiagnostic,
loadersFactory,
} = require('./utils');
const { exitWith } = require('../utils/helpers');
@ -116,6 +117,22 @@ module.exports = async (opts) => {
engine.diagnostics.onDiagnostic(formatDiagnostic('transfer'));
const progress = engine.progress.stream;
const { updateLoader } = loadersFactory();
progress.on(`stage::start`, ({ stage, data }) => {
updateLoader(stage, data).start();
});
progress.on('stage::finish', ({ stage, data }) => {
updateLoader(stage, data).succeed();
});
progress.on('stage::progress', ({ stage, data }) => {
updateLoader(stage, data);
});
let results;
try {
console.log(`Starting transfer...`);

View File

@ -9,6 +9,7 @@ const {
configs: { createOutputFileConfiguration },
createLogger,
} = require('@strapi/logger');
const ora = require('ora');
const { readableBytes, exitWith } = require('../utils/helpers');
const strapi = require('../../index');
const { getParseListWithChoices } = require('../utils/commander');
@ -171,7 +172,46 @@ const formatDiagnostic =
}
};
const loadersFactory = (defaultLoaders = {}) => {
const loaders = defaultLoaders;
const updateLoader = (stage, data) => {
if (!(stage in loaders)) {
createLoader(stage);
}
const stageData = data[stage];
const elapsedTime = stageData?.startTime
? (stageData?.endTime || Date.now()) - stageData.startTime
: 0;
const size = `size: ${readableBytes(stageData?.bytes ?? 0)}`;
const elapsed = `elapsed: ${elapsedTime} ms`;
const speed =
elapsedTime > 0 && `(${readableBytes(((stageData?.bytes ?? 0) * 1000) / elapsedTime)}/s)`;
loaders[stage].text = `${stage}: ${stageData?.count ?? 0} transfered (${size}) (${elapsed}) ${
!stageData?.endTime ? speed : ''
}`;
return loaders[stage];
};
const createLoader = (stage) => {
Object.assign(loaders, { [stage]: ora() });
return loaders[stage];
};
const getLoader = (stage) => {
return loaders[stage];
};
return {
updateLoader,
createLoader,
getLoader,
};
};
module.exports = {
loadersFactory,
buildTransferTable,
getDefaultExportName,
DEFAULT_IGNORED_CONTENT_TYPES,