Merge branch 'data-transfer/pull' of https://github.com/strapi/strapi into data-transfer/pull

This commit is contained in:
Christian Capeans 2023-03-14 09:13:08 +01:00
commit 284cc7512f
13 changed files with 212 additions and 41 deletions

View File

@ -298,7 +298,7 @@ describe('Transfer engine', () => {
versionStrategy: 'exact',
schemaStrategy: 'exact',
exclude: [],
} as ITransferEngineOptions;
} as unknown as ITransferEngineOptions;
let completeSource;
let completeDestination;
@ -490,7 +490,7 @@ describe('Transfer engine', () => {
versionStrategy: 'exact',
schemaStrategy: 'exact',
exclude: [],
} as ITransferEngineOptions;
} as unknown as ITransferEngineOptions;
test('source with source schema missing in destination fails', async () => {
const source = createSource();
source.getSchemas = jest.fn().mockResolvedValue([...schemas, { foo: 'bar' }]);

View File

@ -1,7 +1,7 @@
import { PassThrough, Transform, Readable, Writable, Stream } from 'stream';
import { PassThrough, Transform, Readable, Writable } from 'stream';
import { extname } from 'path';
import { EOL } from 'os';
import { isEmpty, uniq, last } from 'lodash/fp';
import { isEmpty, uniq, last, isNumber } from 'lodash/fp';
import { diff as semverDiff } from 'semver';
import type { Schema } from '@strapi/strapi';
@ -26,7 +26,7 @@ import type { Diff } from '../utils/json';
import { compareSchemas, validateProvider } from './validation';
import { filter, map } from '../utils/stream';
import { TransferEngineValidationError } from './errors';
import { TransferEngineError, TransferEngineValidationError } from './errors';
import {
createDiagnosticReporter,
IDiagnosticReporter,
@ -88,13 +88,19 @@ class TransferEngine<
#metadata: { source?: IMetadata; destination?: IMetadata } = {};
// Progress of the current stage
progress: {
// metrics on the progress such as size and record count
data: TransferProgress;
// stream that emits events
stream: PassThrough;
};
diagnostics: IDiagnosticReporter;
// Save the currently open stream so that we can access it at any time
#currentStream?: Writable;
constructor(sourceProvider: S, destinationProvider: D, options: ITransferEngineOptions) {
this.diagnostics = createDiagnosticReporter();
@ -163,6 +169,7 @@ class TransferEngine<
options: { includeGlobal?: boolean } = {}
): PassThrough | Transform {
const { includeGlobal = true } = options;
const { throttle } = this.options;
const { global: globalTransforms, [key]: stageTransforms } = this.options?.transforms ?? {};
let stream = new PassThrough({ objectMode: true });
@ -183,6 +190,20 @@ class TransferEngine<
applyTransforms(globalTransforms);
}
if (isNumber(throttle) && throttle > 0) {
stream = stream.pipe(
new PassThrough({
objectMode: true,
async transform(data, _encoding, callback) {
await new Promise((resolve) => {
setTimeout(resolve, throttle);
});
callback(null, data);
},
})
);
}
applyTransforms(stageTransforms as TransferTransform<unknown>[]);
return stream;
@ -266,7 +287,10 @@ class TransferEngine<
/**
* Shorthand method used to trigger stage update events to every listeners
*/
#emitStageUpdate(type: 'start' | 'finish' | 'progress' | 'skip', transferStage: TransferStage) {
#emitStageUpdate(
type: 'start' | 'finish' | 'progress' | 'skip' | 'error',
transferStage: TransferStage
) {
this.progress.stream.emit(`stage::${type}`, {
data: this.progress.data,
stage: transferStage,
@ -475,7 +499,7 @@ class TransferEngine<
this.#emitStageUpdate('start', stage);
await new Promise<void>((resolve, reject) => {
let stream: Stream = source;
let stream: Readable = source;
if (transform) {
stream = stream.pipe(transform);
@ -485,15 +509,17 @@ class TransferEngine<
stream = stream.pipe(tracker);
}
stream
this.#currentStream = stream
.pipe(destination)
.on('error', (e) => {
updateEndTime();
this.#emitStageUpdate('error', stage);
this.#reportError(e, 'error');
destination.destroy(e);
reject(e);
})
.on('close', () => {
this.#currentStream = undefined;
updateEndTime();
resolve();
});
@ -502,6 +528,11 @@ class TransferEngine<
this.#emitStageUpdate('finish', stage);
}
// Cause an ongoing transfer to abort gracefully
async abortTransfer(): Promise<void> {
this.#currentStream?.destroy(new TransferEngineError('fatal', 'Transfer aborted.'));
}
async init(): Promise<void> {
// Resolve providers' resource and store
// them in the engine's internal state

View File

@ -7,6 +7,7 @@ import type { IHandlerOptions, TransferMethod } from './types';
import { ProviderTransferError } from '../../../errors/providers';
type WSCallback = (client: WebSocket, request: IncomingMessage) => void;
type BufferLike = Parameters<WebSocket['send']>[0];
const VALID_TRANSFER_COMMANDS = ['init', 'end', 'status'] as const;
@ -85,7 +86,7 @@ export interface Handler {
/**
* It sends a message to the client
*/
send<T = unknown>(message: T, cb?: (err?: Error) => void): void;
send<T extends BufferLike>(message: T, cb?: (err?: Error) => void): void;
/**
* It sends a message to the client and waits for a confirmation
@ -185,7 +186,7 @@ export const handlerFactory =
return;
}
const payload = {
const payload = JSON.stringify({
uuid,
data: data ?? null,
error: e
@ -194,7 +195,7 @@ export const handlerFactory =
message: e?.message,
}
: null,
};
});
this.send(payload, (error) => (error ? reject(error) : resolve()));
});
@ -216,7 +217,7 @@ export const handlerFactory =
return new Promise((resolve, reject) => {
const uuid = randomUUID();
const payload = { uuid, data: message };
const payload = JSON.stringify({ uuid, data: message });
this.send(payload, (error) => {
if (error) {

View File

@ -144,4 +144,7 @@ export interface ITransferEngineOptions {
// List of TransferTransformList preset options to exclude/include
exclude: TransferFilterPreset[];
only: TransferFilterPreset[];
// delay after each record
throttle: number;
}

View File

@ -24,13 +24,10 @@ const { exitWith, ifOptions, assertUrlHasProtocol } = require('../lib/commands/u
const {
excludeOption,
onlyOption,
throttleOption,
validateExcludeOnly,
} = require('../lib/commands/transfer/utils');
process.on('SIGINT', () => {
process.exit();
});
const checkCwdIsStrapiApp = (name) => {
const logErrorAndExit = () => {
console.log(
@ -295,7 +292,16 @@ program
.addOption(forceOption)
.addOption(excludeOption)
.addOption(onlyOption)
.addOption(throttleOption)
.hook('preAction', validateExcludeOnly)
.hook(
'preAction',
ifOptions(
(opts) => !(opts.from || opts.to) || (opts.from && opts.to),
() =>
exitWith(1, 'Exactly one remote source (from) or destination (to) option must be provided')
)
)
// If --from is used, validate the URL and token
.hook(
'preAction',
@ -312,7 +318,7 @@ program
},
]);
if (!answers.fromToken?.length) {
exitWith(1, 'No token entered, aborting transfer.');
exitWith(1, 'No token provided for remote source, aborting transfer.');
}
thisCommand.opts().fromToken = answers.fromToken;
}
@ -335,7 +341,7 @@ program
},
]);
if (!answers.toToken?.length) {
exitWith(1, 'No token entered, aborting transfer.');
exitWith(1, 'No token provided for remote destination, aborting transfer.');
}
thisCommand.opts().toToken = answers.toToken;
}
@ -367,6 +373,7 @@ program
.addOption(new Option('-f, --file <file>', 'name to use for exported file (without extensions)'))
.addOption(excludeOption)
.addOption(onlyOption)
.addOption(throttleOption)
.hook('preAction', validateExcludeOnly)
.hook('preAction', promptEncryptionKey)
.action(getLocalScript('transfer/export'));
@ -389,6 +396,7 @@ program
.addOption(forceOption)
.addOption(excludeOption)
.addOption(onlyOption)
.addOption(throttleOption)
.hook('preAction', validateExcludeOnly)
.hook('preAction', async (thisCommand) => {
const opts = thisCommand.opts();

View File

@ -72,6 +72,7 @@ describe('Export', () => {
},
};
}),
exitMessageText: jest.fn(),
};
jest.mock(
'../../transfer/utils',

View File

@ -78,6 +78,7 @@ describe('Import', () => {
},
};
}),
exitMessageText: jest.fn(),
};
jest.mock(
'../../transfer/utils',

View File

@ -22,6 +22,7 @@ describe('Transfer', () => {
},
};
}),
exitMessageText: jest.fn(),
};
jest.mock(
'../../transfer/utils',
@ -35,6 +36,9 @@ describe('Transfer', () => {
strapi: {
providers: {
createLocalStrapiSourceProvider: jest.fn().mockReturnValue({ name: 'testLocalSource' }),
createLocalStrapiDestinationProvider: jest
.fn()
.mockReturnValue({ name: 'testLocalDestination' }),
createRemoteStrapiDestinationProvider: jest
.fn()
.mockReturnValue({ name: 'testRemoteDest' }),
@ -75,9 +79,11 @@ describe('Transfer', () => {
jest.spyOn(console, 'info').mockImplementation(() => {});
jest.spyOn(console, 'error').mockImplementation(() => {});
const destinationUrl = new URL('http://strapi.com/admin');
const destinationUrl = new URL('http://one.localhost/admin');
const destinationToken = 'test-token';
const sourceUrl = new URL('http://two.localhost/admin');
beforeEach(() => {
jest.clearAllMocks();
});
@ -87,7 +93,19 @@ describe('Transfer', () => {
await transferCommand({ from: undefined, to: undefined });
});
expect(console.error).toHaveBeenCalledWith(expect.stringMatching(/at least one source/i));
expect(console.error).toHaveBeenCalledWith(expect.stringMatching(/one source/i));
expect(
mockDataTransfer.strapi.providers.createRemoteStrapiDestinationProvider
).not.toHaveBeenCalled();
});
it('exits with error when both --to and --from are provided', async () => {
await expectExit(1, async () => {
await transferCommand({ from: sourceUrl, to: destinationUrl });
});
expect(console.error).toHaveBeenCalledWith(expect.stringMatching(/one source/i));
expect(
mockDataTransfer.strapi.providers.createRemoteStrapiDestinationProvider
@ -139,6 +157,8 @@ describe('Transfer', () => {
});
});
it.todo('uses local Strapi destination when to is not specified');
it('uses restore as the default strategy', async () => {
await expectExit(0, async () => {
await transferCommand({ from: undefined, to: destinationUrl, toToken: destinationToken });

View File

@ -22,15 +22,20 @@ const {
createStrapiInstance,
formatDiagnostic,
loadersFactory,
exitMessageText,
abortTransfer,
} = require('./utils');
const { exitWith } = require('../utils/helpers');
/**
* @typedef ExportCommandOptions Options given to the CLI import command
*
* @property {string} [file] The file path to import
* @property {string} [file] The file path to export to
* @property {boolean} [encrypt] Used to encrypt the final archive
* @property {string} [key] Encryption key, only useful when encryption is enabled
* @property {string} [key] Encryption key, used only when encryption is enabled
* @property {boolean} [compress] Used to compress the final archive
* @property {(keyof import('@strapi/data-transfer/src/engine').TransferGroupFilter)[]} [only] If present, only include these filtered groups of data
* @property {(keyof import('@strapi/data-transfer/src/engine').TransferGroupFilter)[]} [exclude] If present, exclude these filtered groups of data
* @property {number|undefined} [throttle] Delay in ms after each record
*/
const BYTES_IN_MB = 1024 * 1024;
@ -58,6 +63,7 @@ module.exports = async (opts) => {
schemaStrategy: 'ignore', // for an export to file, schemaStrategy will always be skipped
exclude: opts.exclude,
only: opts.only,
throttle: opts.throttle,
transforms: {
links: [
{
@ -108,12 +114,19 @@ module.exports = async (opts) => {
progress.on('transfer::start', async () => {
console.log(`Starting export...`);
await strapi.telemetry.send('didDEITSProcessStart', getTelemetryPayload());
});
let results;
let outFile;
try {
// Abort transfer if user interrupts process
['SIGTERM', 'SIGINT', 'SIGQUIT'].forEach((signal) => {
process.removeAllListeners(signal);
process.on(signal, () => abortTransfer({ engine, strapi }));
});
results = await engine.transfer();
outFile = results.destination.file.path;
const outFileExists = await fs.pathExists(outFile);
@ -122,7 +135,7 @@ module.exports = async (opts) => {
}
} catch {
await strapi.telemetry.send('didDEITSProcessFail', getTelemetryPayload());
exitWith(1, 'Export process failed.');
exitWith(1, exitMessageText('export', true));
}
await strapi.telemetry.send('didDEITSProcessFinish', getTelemetryPayload());
@ -133,8 +146,8 @@ module.exports = async (opts) => {
console.error('There was an error displaying the results of the transfer.');
}
console.log(`${chalk.bold('Export process has been completed successfully!')}`);
exitWith(0, `Export archive is in ${chalk.green(outFile)}`);
console.log(`Export archive is in ${chalk.green(outFile)}`);
exitWith(0, exitMessageText('export'));
};
/**

View File

@ -18,13 +18,32 @@ const {
createStrapiInstance,
formatDiagnostic,
loadersFactory,
exitMessageText,
abortTransfer,
} = require('./utils');
const { exitWith } = require('../utils/helpers');
/**
* @typedef {import('@strapi/data-transfer').ILocalFileSourceProviderOptions} ILocalFileSourceProviderOptions
* @typedef {import('@strapi/data-transfer/src/file/providers').ILocalFileSourceProviderOptions} ILocalFileSourceProviderOptions
*/
/**
* @typedef ImportCommandOptions Options given to the CLI import command
*
* @property {string} [file] The file path to import
* @property {string} [key] Encryption key, used when encryption is enabled
* @property {(keyof import('@strapi/data-transfer/src/engine').TransferGroupFilter)[]} [only] If present, only include these filtered groups of data
* @property {(keyof import('@strapi/data-transfer/src/engine').TransferGroupFilter)[]} [exclude] If present, exclude these filtered groups of data
* @property {number|undefined} [throttle] Delay in ms after each record
*/
/**
* Import command.
*
* It transfers data from a file to a local Strapi instance
*
* @param {ImportCommandOptions} opts
*/
module.exports = async (opts) => {
// validate inputs from Commander
if (!isObject(opts)) {
@ -63,6 +82,7 @@ module.exports = async (opts) => {
schemaStrategy: opts.schemaStrategy || DEFAULT_SCHEMA_STRATEGY,
exclude: opts.exclude,
only: opts.only,
throttle: opts.throttle,
rules: {
links: [
{
@ -118,11 +138,16 @@ module.exports = async (opts) => {
let results;
try {
// Abort transfer if user interrupts process
['SIGTERM', 'SIGINT', 'SIGQUIT'].forEach((signal) => {
process.removeAllListeners(signal);
process.on(signal, () => abortTransfer({ engine, strapi }));
});
results = await engine.transfer();
} catch (e) {
await strapiInstance.telemetry.send('didDEITSProcessFail', getTelemetryPayload());
console.error('Import process failed.');
process.exit(1);
exitWith(1, exitMessageText('import', true));
}
try {
@ -135,7 +160,7 @@ module.exports = async (opts) => {
await strapiInstance.telemetry.send('didDEITSProcessFinish', getTelemetryPayload());
await strapiInstance.destroy();
exitWith(0, 'Import process has been completed successfully!');
exitWith(0, exitMessageText('import'));
};
/**

View File

@ -12,7 +12,6 @@ const {
},
} = require('@strapi/data-transfer');
const { isObject } = require('lodash/fp');
const chalk = require('chalk');
const {
buildTransferTable,
@ -20,6 +19,8 @@ const {
DEFAULT_IGNORED_CONTENT_TYPES,
formatDiagnostic,
loadersFactory,
exitMessageText,
abortTransfer,
} = require('./utils');
const { exitWith } = require('../utils/helpers');
@ -30,6 +31,9 @@ const { exitWith } = require('../utils/helpers');
* @property {URL|undefined} [from] The url of a remote Strapi to use as remote source
* @property {string|undefined} [toToken] The transfer token for the remote Strapi destination
* @property {string|undefined} [fromToken] The transfer token for the remote Strapi source
* @property {(keyof import('@strapi/data-transfer/src/engine').TransferGroupFilter)[]} [only] If present, only include these filtered groups of data
* @property {(keyof import('@strapi/data-transfer/src/engine').TransferGroupFilter)[]} [exclude] If present, exclude these filtered groups of data
* @property {number|undefined} [throttle] Delay in ms after each record
*/
/**
@ -45,15 +49,14 @@ module.exports = async (opts) => {
exitWith(1, 'Could not parse command arguments');
}
const strapi = await createStrapiInstance();
if (!(opts.from || opts.to) || (opts.from && opts.to)) {
exitWith(1, 'Exactly one source (from) or destination (to) option must be provided');
}
const strapi = await createStrapiInstance();
let source;
let destination;
if (!opts.from && !opts.to) {
exitWith(1, 'At least one source (from) or destination (to) option must be provided');
}
// if no URL provided, use local Strapi
if (!opts.from) {
source = createLocalStrapiSourceProvider({
@ -62,6 +65,10 @@ module.exports = async (opts) => {
}
// if URL provided, set up a remote source provider
else {
if (!opts.fromToken) {
exitWith(1, 'Missing token for remote destination');
}
source = createRemoteStrapiSourceProvider({
getStrapi: () => strapi,
url: opts.from,
@ -108,6 +115,9 @@ module.exports = async (opts) => {
const engine = createTransferEngine(source, destination, {
versionStrategy: 'exact',
schemaStrategy: 'strict',
exclude: opts.exclude,
only: opts.only,
throttle: opts.throttle,
transforms: {
links: [
{
@ -147,15 +157,26 @@ module.exports = async (opts) => {
updateLoader(stage, data);
});
progress.on('stage::error', ({ stage, data }) => {
updateLoader(stage, data).fail();
});
let results;
try {
console.log(`Starting transfer...`);
// Abort transfer if user interrupts process
['SIGTERM', 'SIGINT', 'SIGQUIT'].forEach((signal) => {
process.removeAllListeners(signal);
process.on(signal, () => abortTransfer({ engine, strapi }));
});
results = await engine.transfer();
} catch (e) {
exitWith(1, 'Transfer process failed.');
exitWith(1, exitMessageText('transfer', true));
}
const table = buildTransferTable(results.engine);
console.log(table.toString());
exitWith(0, `${chalk.bold('Transfer process has been completed successfully!')}`);
exitWith(0, exitMessageText('transfer'));
};

View File

@ -14,7 +14,19 @@ const {
const ora = require('ora');
const { readableBytes, exitWith } = require('../utils/helpers');
const strapi = require('../../index');
const { getParseListWithChoices } = require('../utils/commander');
const { getParseListWithChoices, parseInteger } = require('../utils/commander');
const exitMessageText = (process, error = false) => {
const processCapitalized = process[0].toUpperCase() + process.slice(1);
if (!error) {
return chalk.bold(
chalk.green(`${processCapitalized} process has been completed successfully!`)
);
}
return chalk.bold(chalk.red(`${processCapitalized} process failed.`));
};
const pad = (n) => {
return (n < 10 ? '0' : '') + String(n);
@ -90,12 +102,23 @@ const DEFAULT_IGNORED_CONTENT_TYPES = [
'admin::audit-log',
];
const createStrapiInstance = async (logLevel = 'error') => {
const abortTransfer = async ({ engine, strapi }) => {
try {
await engine.abortTransfer();
await strapi.destroy();
} catch (e) {
// ignore because there's not much else we can do
return false;
}
return true;
};
const createStrapiInstance = async (opts = {}) => {
try {
const appContext = await strapi.compile();
const app = strapi(appContext);
const app = strapi({ ...opts, ...appContext });
app.log.level = logLevel;
app.log.level = opts.logLevel || 'error';
return await app.load();
} catch (err) {
if (err.code === 'ECONNREFUSED') {
@ -107,6 +130,13 @@ const createStrapiInstance = async (logLevel = 'error') => {
const transferDataTypes = Object.keys(TransferGroupPresets);
const throttleOption = new Option(
'--throttle <delay after each entity>',
`Add a delay in milliseconds between each transferred entity`
)
.argParser(parseInteger)
.hideHelp(); // This option is not publicly documented
const excludeOption = new Option(
'--exclude <comma-separated data types>',
`Exclude data using comma-separated types. Available types: ${transferDataTypes.join(',')}`
@ -219,7 +249,10 @@ module.exports = {
DEFAULT_IGNORED_CONTENT_TYPES,
createStrapiInstance,
excludeOption,
exitMessageText,
onlyOption,
throttleOption,
validateExcludeOnly,
formatDiagnostic,
abortTransfer,
};

View File

@ -7,6 +7,7 @@
const inquirer = require('inquirer');
const { InvalidOptionArgumentError, Option } = require('commander');
const { bold, green, cyan } = require('chalk');
const { isNaN } = require('lodash/fp');
const { exitWith } = require('./helpers');
/**
@ -40,6 +41,18 @@ const getParseListWithChoices = (choices, errorMessage = 'Invalid options:') =>
};
};
/**
* argParser: Parse a string as an integer
*/
const parseInteger = (value) => {
// parseInt takes a string and a radix
const parsedValue = parseInt(value, 10);
if (isNaN(parsedValue)) {
throw new InvalidOptionArgumentError(`Not an integer: ${value}`);
}
return parsedValue;
};
/**
* argParser: Parse a string as a URL object
*/
@ -131,6 +144,7 @@ module.exports = {
getParseListWithChoices,
parseList,
parseURL,
parseInteger,
promptEncryptionKey,
confirmMessage,
forceOption,