Merge branch 'features/deits' into deits/restore-mechanism

commit 6d2ea2b675
Bassel, 2022-11-18 16:32:30 +02:00
12 changed files with 2400 additions and 2582 deletions

View File

@@ -1,17 +1,53 @@
import _ from 'lodash/fp';
import { PassThrough } from 'stream-chain';
import type {
IDestinationProvider,
IMetadata,
ISourceProvider,
ITransferEngine,
ITransferEngineOptions,
ITransferResults,
TransferStage,
} from '../../types';
-class TransferEngine implements ITransferEngine {
+type TransferProgress = {
+[key in TransferStage]?: {
+count: number;
+bytes: number;
+aggregates?: {
+[key: string]: {
+count: number;
+bytes: number;
+};
+};
+};
+};
+type TransferEngineProgress = {
+data: any;
+stream: PassThrough;
+};
+class TransferEngine<
+S extends ISourceProvider = ISourceProvider,
+D extends IDestinationProvider = IDestinationProvider
+> implements ITransferEngine
+{
sourceProvider: ISourceProvider;
destinationProvider: IDestinationProvider;
options: ITransferEngineOptions;
#metadata: { source?: IMetadata; destination?: IMetadata } = {};
#transferProgress: TransferProgress = {};
// TODO: Type the stream chunks. Doesn't seem trivial, especially since PassThrough doesn't provide a PassThroughOptions type
#progressStream: PassThrough = new PassThrough({ objectMode: true });
get progress(): TransferEngineProgress {
return {
data: this.#transferProgress,
stream: this.#progressStream,
};
}
constructor(
sourceProvider: ISourceProvider,
destinationProvider: IDestinationProvider,
@@ -22,6 +58,45 @@ class TransferEngine implements ITransferEngine {
this.options = options;
}
#increaseTransferProgress(transferStage: TransferStage, data: any, aggregateKey?: string) {
if (!this.#transferProgress[transferStage]) {
this.#transferProgress[transferStage] = { count: 0, bytes: 0 };
}
this.#transferProgress[transferStage]!.count += 1;
const size = JSON.stringify(data).length;
this.#transferProgress[transferStage]!.bytes! += size;
if (aggregateKey && _.has(aggregateKey, data)) {
const aggKeyValue = data[aggregateKey];
if (!_.has('aggregates', this.#transferProgress[transferStage])) {
this.#transferProgress[transferStage]!.aggregates = {};
}
if (!_.has(aggKeyValue, this.#transferProgress[transferStage]!.aggregates)) {
this.#transferProgress[transferStage]!.aggregates![aggKeyValue] = { count: 0, bytes: 0 };
}
this.#transferProgress[transferStage]!.aggregates![aggKeyValue].count += 1;
this.#transferProgress[transferStage]!.aggregates![aggKeyValue].bytes! += size;
}
}
#countRecorder = (transferStage: TransferStage, aggregateKey?: string) => {
return new PassThrough({
objectMode: true,
transform: (data, _encoding, callback) => {
this.#increaseTransferProgress(transferStage, data, aggregateKey);
this.#updateStage('progress', transferStage);
callback(null, data);
},
});
};
#updateStage = (type: 'start' | 'complete' | 'progress', transferStage: TransferStage) => {
this.#progressStream.emit(type, {
data: this.#transferProgress,
stage: transferStage,
});
};
#assertStrapiVersionIntegrity(sourceVersion?: string, destinationVersion?: string) {
const strategy = this.options.versionMatching;
@@ -125,7 +200,7 @@ class TransferEngine implements ITransferEngine {
}
}
-async transfer(): Promise<void> {
+async transfer(): Promise<ITransferResults<S, D>> {
try {
await this.bootstrap();
await this.init();
@@ -148,12 +223,17 @@ class TransferEngine implements ITransferEngine {
// Gracefully close the providers
await this.close();
-} catch (e) {
-console.log('error', e);
+} catch (e: any) {
+throw e;
+// Rollback the destination provider if an exception is thrown during the transfer
+// Note: This will be configurable in the future
+// await this.destinationProvider?.rollback(e);
}
+return {
+source: this.sourceProvider.results,
+destination: this.destinationProvider.results,
+};
}
async beforeTransfer(): Promise<void> {
@@ -162,6 +242,7 @@ class TransferEngine implements ITransferEngine {
}
async transferSchemas(): Promise<void> {
const stageName: TransferStage = 'schemas';
const inStream = await this.sourceProvider.streamSchemas?.();
const outStream = await this.destinationProvider.getSchemasStream?.();
@@ -173,6 +254,7 @@ class TransferEngine implements ITransferEngine {
throw new Error('Unable to transfer schemas, destination stream is missing');
}
this.#updateStage('start', stageName);
return new Promise((resolve, reject) => {
inStream
// Throw on error in the source
@@ -182,13 +264,17 @@ class TransferEngine implements ITransferEngine {
// Throw on error in the destination
.on('error', reject)
// Resolve the promise when the destination has finished reading all the data from the source
-.on('close', resolve);
+.on('close', () => {
+this.#updateStage('complete', stageName);
+resolve();
+});
-inStream.pipe(outStream);
+inStream.pipe(this.#countRecorder(stageName)).pipe(outStream);
});
}
async transferEntities(): Promise<void> {
const stageName: TransferStage = 'entities';
const inStream = await this.sourceProvider.streamEntities?.();
const outStream = await this.destinationProvider.getEntitiesStream?.();
@@ -200,6 +286,8 @@ class TransferEngine implements ITransferEngine {
throw new Error('Unable to transfer entities, destination stream is missing');
}
this.#updateStage('start', stageName);
return new Promise((resolve, reject) => {
inStream
// Throw on error in the source
@@ -213,13 +301,17 @@ class TransferEngine implements ITransferEngine {
reject(e);
})
// Resolve the promise when the destination has finished reading all the data from the source
-.on('close', resolve);
+.on('close', () => {
+this.#updateStage('complete', stageName);
+resolve();
+});
-inStream.pipe(outStream);
+inStream.pipe(this.#countRecorder(stageName, 'type')).pipe(outStream);
});
}
async transferLinks(): Promise<void> {
const stageName: TransferStage = 'links';
const inStream = await this.sourceProvider.streamLinks?.();
const outStream = await this.destinationProvider.getLinksStream?.();
@@ -231,6 +323,8 @@ class TransferEngine implements ITransferEngine {
throw new Error('Unable to transfer links, destination stream is missing');
}
this.#updateStage('start', 'links');
return new Promise((resolve, reject) => {
inStream
// Throw on error in the source
@@ -240,18 +334,29 @@ class TransferEngine implements ITransferEngine {
// Throw on error in the destination
.on('error', reject)
// Resolve the promise when the destination has finished reading all the data from the source
-.on('close', resolve);
+.on('close', () => {
+this.#updateStage('complete', stageName);
+resolve();
+});
-inStream.pipe(outStream);
+inStream.pipe(this.#countRecorder(stageName)).pipe(outStream);
});
}
async transferMedia(): Promise<void> {
const stageName: TransferStage = 'media';
this.#updateStage('start', stageName);
console.warn('transferMedia not yet implemented');
-return new Promise((resolve) => resolve());
+return new Promise((resolve) =>
+(() => {
+this.#updateStage('complete', stageName);
+resolve();
+})()
+);
}
async transferConfiguration(): Promise<void> {
const stageName: TransferStage = 'configuration';
const inStream = await this.sourceProvider.streamConfiguration?.();
const outStream = await this.destinationProvider.getConfigurationStream?.();
@@ -263,6 +368,8 @@ class TransferEngine implements ITransferEngine {
throw new Error('Unable to transfer configuration, destination stream is missing');
}
this.#updateStage('start', stageName);
return new Promise((resolve, reject) => {
inStream
// Throw on error in the source
@@ -272,17 +379,20 @@ class TransferEngine implements ITransferEngine {
// Throw on error in the destination
.on('error', reject)
// Resolve the promise when the destination has finished reading all the data from the source
-.on('close', resolve);
+.on('close', () => {
+this.#updateStage('complete', stageName);
+resolve();
+});
-inStream.pipe(outStream);
+inStream.pipe(this.#countRecorder(stageName)).pipe(outStream);
});
}
}
-export const createTransferEngine = <T extends ISourceProvider, U extends IDestinationProvider>(
-sourceProvider: T,
-destinationProvider: U,
+export const createTransferEngine = <S extends ISourceProvider, D extends IDestinationProvider>(
+sourceProvider: S,
+destinationProvider: D,
options: ITransferEngineOptions
-): TransferEngine => {
-return new TransferEngine(sourceProvider, destinationProvider, options);
+): TransferEngine<S, D> => {
+return new TransferEngine<S, D>(sourceProvider, destinationProvider, options);
};
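Taken together, the changes above give the engine an observable progress stream and a typed transfer result. A minimal sketch of how a caller might consume both, with provider wiring elided; the event payload shape ({ stage, data }) is read off #updateStage, and the module path is an assumption based only on this diff:

// Sketch only: not part of this commit.
import { createTransferEngine } from './transfer-engine'; // assumed module path
declare const source: any, destination: any, options: any; // provider setup elided

const engine = createTransferEngine(source, destination, options);
engine.progress.stream.on('start', ({ stage }) => console.log(`starting ${stage}...`));
engine.progress.stream.on('complete', ({ stage, data }) => console.log(`${stage} done`, data[stage]));

const results = await engine.transfer(); // now resolves with per-provider results instead of void
console.log(results.source, results.destination);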

View File

@@ -1,11 +1,16 @@
-import fs from 'fs';
+import fs from 'fs-extra';
import path from 'path';
import zip from 'zlib';
import { Writable, Readable } from 'stream';
import { chain } from 'stream-chain';
import { stringer } from 'stream-json/jsonl/Stringer';
-import type { IDestinationProvider, IMetadata, ProviderType } from '../../types';
+import {
+IDestinationProvider,
+IDestinationProviderTransferResults,
+ProviderType,
+IMetadata,
+} from '../../types';
import { createEncryptionCipher } from '../encryption/encrypt';
export interface ILocalFileDestinationProviderOptions {
@@ -33,6 +38,13 @@
};
}
export interface ILocalFileDestinationProviderTransferResults
extends IDestinationProviderTransferResults {
file?: {
path?: string;
};
}
export const createLocalFileDestinationProvider = (
options: ILocalFileDestinationProviderOptions
) => {
@@ -43,6 +55,7 @@ class LocalFileDestinationProvider implements IDestinationProvider {
name: string = 'destination::local-file';
type: ProviderType = 'destination';
options: ILocalFileDestinationProviderOptions;
results: ILocalFileDestinationProviderTransferResults = {};
#providersMetadata: { source?: IMetadata; destination?: IMetadata } = {};
constructor(options: ILocalFileDestinationProviderOptions) {
@@ -85,10 +98,10 @@ class LocalFileDestinationProvider implements IDestinationProvider {
bootstrap(): void | Promise<void> {
const rootDir = this.options.file.path;
-const dirExists = fs.existsSync(rootDir);
+const dirExists = fs.pathExistsSync(rootDir);
if (dirExists) {
-fs.rmSync(rootDir, { force: true, recursive: true });
+throw new Error('File with that name already exists');
}
if (this.options.encryption.enabled) {
@@ -103,10 +116,13 @@ class LocalFileDestinationProvider implements IDestinationProvider {
fs.mkdirSync(path.join(rootDir, 'links'));
fs.mkdirSync(path.join(rootDir, 'media'));
fs.mkdirSync(path.join(rootDir, 'configuration'));
this.results.file = { path: this.options.file.path };
}
async close(): Promise<void> {
await this.#writeMetadata();
this.results.file = { path: this.options.file.path };
}
rollback(): void {

View File

@@ -1,7 +1,7 @@
-import type { IMedia, IMetadata, ISourceProvider, ProviderType } from '../../../types';
import { chain } from 'stream-chain';
+import { Readable } from 'stream';
+import type { IMetadata, ISourceProvider, ProviderType } from '../../../types';
import { createEntitiesStream, createEntitiesTransformStream } from './entities';
import { createLinksStream } from './links';
import { createConfigurationStream } from './configuration';
@@ -67,6 +67,7 @@ class LocalStrapiSourceProvider implements ISourceProvider {
return chain([
// Entities stream
createEntitiesStream(this.strapi),
// Transform stream
createEntitiesTransformStream(),
]);
@@ -93,10 +94,17 @@ class LocalStrapiSourceProvider implements ISourceProvider {
throw new Error('Not able to get Schemas. Strapi instance not found');
}
-return [...Object.values(this.strapi.contentTypes), ...Object.values(this.strapi.components)];
+const schemas = [
+...Object.values(this.strapi.contentTypes),
+...Object.values(this.strapi.components),
+];
+return schemas;
}
streamSchemas(): NodeJS.ReadableStream {
return Readable.from(this.getSchemas());
}
}
export type ILocalStrapiSourceProvider = InstanceType<typeof LocalStrapiSourceProvider>;
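The new streamSchemas method leans on Readable.from, which wraps any iterable in a readable stream (object mode by default for non-string values). A standalone illustration of that behavior; the schema shapes below are hypothetical, not Strapi's:

import { Readable } from 'stream';

// Hypothetical stand-ins for the values pulled from strapi.contentTypes / strapi.components.
const schemas = [{ uid: 'api::article.article' }, { uid: 'api::page.page' }];

const stream = Readable.from(schemas); // emits one object per array element
stream.on('data', (schema) => console.log(schema.uid));
stream.on('end', () => console.log('all schemas streamed'));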

View File

@@ -40,6 +40,7 @@
"@strapi/logger": "4.5.0",
"@strapi/strapi": "4.5.0",
"chalk": "4.1.2",
"fs-extra": "10.0.0",
"lodash": "4.17.21",
"prettier": "2.7.1",
"stream-chain": "2.2.5",
@@ -48,6 +49,7 @@
},
"devDependencies": {
"@tsconfig/node16": "1.0.3",
"@types/fs-extra": "9.0.13",
"@types/jest": "29.2.0",
"@types/stream-chain": "2.0.1",
"@types/stream-json": "1.7.2",

View File

@@ -1,4 +1,9 @@
-import { Stream } from './utils';
+import {
+IDestinationProviderTransferResults,
+IProviderTransferResults,
+ISourceProviderTransferResults,
+Stream,
+} from './utils';
import { IMetadata } from './common-entities';
import { PipelineSource, PipelineDestination } from 'stream';
@@ -7,6 +12,7 @@ type ProviderType = 'source' | 'destination';
interface IProvider {
type: ProviderType;
name: string;
results?: IProviderTransferResults;
bootstrap?(): Promise<void> | void;
getSchemas?(): any;
@@ -16,6 +22,8 @@ interface IProvider {
}
export interface ISourceProvider extends IProvider {
results?: ISourceProviderTransferResults;
// Getters for the source's transfer streams
streamEntities?(): NodeJS.ReadableStream | Promise<NodeJS.ReadableStream>;
streamLinks?(): NodeJS.ReadableStream | Promise<NodeJS.ReadableStream>;
@@ -25,6 +33,7 @@ export interface ISourceProvider extends IProvider {
}
export interface IDestinationProvider extends IProvider {
results?: IDestinationProviderTransferResults;
#providersMetadata?: { source?: IMetadata; destination?: IMetadata };
/**

View File

@@ -6,7 +6,10 @@ import { ISourceProvider, IDestinationProvider } from './provider';
/**
* Defines the capabilities and properties of the transfer engine
*/
-export interface ITransferEngine {
+export interface ITransferEngine<
+S extends ISourceProvider = ISourceProvider,
+D extends IDestinationProvider = IDestinationProvider
+> {
/**
* Provider used as a source that will stream its data to the transfer engine
*/
@@ -31,7 +34,7 @@ export interface ITransferEngine {
/**
* Start streaming selected data from the source to the destination
*/
-transfer(): Promise<void>;
+transfer(): Promise<ITransferResults<S, D>>;
/**
* Run the bootstrap lifecycle method of each provider

View File

@@ -1,4 +1,5 @@
-import { Readable, Writable, Duplex, Transform } from 'stream';
+import type { Readable, Writable, Duplex, Transform } from 'stream';
+import type { IDestinationProvider, ISourceProvider } from './providers';
/**
* Default signature for transfer rules' filter methods
@@ -25,3 +26,15 @@ export interface ITransferRule<
export type TransformFunction = (chunk: any, encoding?: string) => any;
export type StreamItem = Stream | TransformFunction;
type Stream = Readable | Writable | Duplex | Transform;
export type TransferStage = 'entities' | 'links' | 'media' | 'schemas' | 'configuration';
export interface ITransferResults<S extends ISourceProvider, D extends IDestinationProvider> {
source?: S['results'];
destination?: D['results'];
}
// There aren't currently any universal results provided but there likely will be in the future, so providers that have their own results should extend from these to be safe
export type IProviderTransferResults = {};
export type ISourceProviderTransferResults = {};
export type IDestinationProviderTransferResults = {};
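Concrete providers are expected to extend these empty contracts, which is exactly what ILocalFileDestinationProviderTransferResults does earlier in this diff. A minimal sketch of the pattern; the interface names and import path are illustrative only:

import type {
  IDestinationProviderTransferResults,
  ISourceProviderTransferResults,
} from './utils';

// A destination provider reporting where it wrote its output.
interface IFileDestinationResults extends IDestinationProviderTransferResults {
  file?: { path?: string };
}

// A source provider reporting how much it read.
interface ICountingSourceResults extends ISourceProviderTransferResults {
  recordsRead?: number;
}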

View File

@@ -272,24 +272,32 @@ program
.command('export')
.description('Export data from Strapi to file')
.addOption(
-new Option('--encrypt [boolean]', `Encrypt output file using the 'aes-128-ecb' algorithm`)
+new Option(
+'--encrypt <boolean>',
+`Encrypt output file using the 'aes-128-ecb' algorithm. Prompts for key unless key option is used.`
+)
.default(true)
.argParser(parseInputBool)
)
.addOption(
-new Option('--compress [boolean]', 'Compress output file using gzip compression')
+new Option('--compress <boolean>', 'Compress output file using gzip compression')
.default(true)
.argParser(parseInputBool)
)
.addOption(
new Option(
-'--archive [boolean]',
+'--archive <boolean>',
'Export all backup files into a single tar archive instead of a folder'
)
.default(true)
.argParser(parseInputBool)
)
-.addOption(new Option('--key', 'Provide encryption key in command instead of using a prompt'))
+.addOption(
+new Option(
+'--key <encryption key>',
+'Provide encryption key directly instead of being prompted'
+)
+)
.addOption(
new Option('--max-size <max MB per file>', 'split final file when exceeding size in MB')
)

View File

@@ -8,10 +8,33 @@ const {
// eslint-disable-next-line import/no-unresolved, node/no-missing-require
} = require('@strapi/data-transfer');
const _ = require('lodash/fp');
const Table = require('cli-table3');
const fs = require('fs-extra');
const chalk = require('chalk');
const strapi = require('../../Strapi');
const { readableBytes } = require('../utils');
-const getDefaultExportBackupName = () => `strapi-backup`;
const pad = (n) => {
return (n < 10 ? '0' : '') + String(n);
};
const yyyymmddHHMMSS = () => {
const date = new Date();
return (
date.getFullYear() +
pad(date.getMonth() + 1) +
pad(date.getDate()) +
pad(date.getHours()) +
pad(date.getMinutes()) +
pad(date.getSeconds())
);
};
const getDefaultExportName = () => {
return `export_${yyyymmddHHMMSS()}`;
};
const logger = console;
@@ -33,13 +56,15 @@ module.exports = async (filename, opts) => {
};
const source = createLocalStrapiSourceProvider(sourceOptions);
const file = _.isString(filename) && filename.length > 0 ? filename : getDefaultExportName();
/**
* To a Strapi backup file
*/
// treat any unknown arguments as filenames
const destinationOptions = {
file: {
-path: _.isString(filename) && filename.length > 0 ? filename : getDefaultExportBackupName(),
+path: file,
maxSize: _.isFinite(opts.maxSize) ? Math.floor(opts.maxSize) * BYTES_IN_MB : undefined,
maxSizeJsonl: _.isFinite(opts.maxSizeJsonl)
? Math.floor(opts.maxSizeJsonl) * BYTES_IN_MB
@@ -69,12 +94,71 @@ module.exports = async (filename, opts) => {
const engine = createTransferEngine(source, destination, engineOptions);
try {
-const result = await engine.transfer();
-if (!result?.destination?.path) throw new Error('Export file not created');
-logger.log(
-'Export process has been completed successfully! Export archive is in %s',
-result.destination.path
-);
let resultData = [];
logger.log(`Starting export...`);
engine.progress.stream.on('start', ({ stage }) => {
logger.log(`Starting transfer of ${stage}...`);
});
// engine.progress.stream.on('progress', ({ stage, data }) => {
// logger.log('progress');
// });
engine.progress.stream.on('complete', ({ stage, data }) => {
logger.log(`...${stage} complete`);
resultData = data;
});
const results = await engine.transfer();
// Build pretty table
const table = new Table({
head: ['Type', 'Count', 'Size'],
});
let totalBytes = 0;
let totalItems = 0;
Object.keys(resultData).forEach((key) => {
const item = resultData[key];
table.push([
{ hAlign: 'left', content: chalk.bold(key) },
{ hAlign: 'right', content: item.count },
{ hAlign: 'right', content: `${readableBytes(item.bytes, 1, 11)} ` },
]);
totalBytes += item.bytes;
totalItems += item.count;
if (item.aggregates) {
Object.keys(item.aggregates).forEach((subkey) => {
const subitem = item.aggregates[subkey];
table.push([
{ hAlign: 'left', content: `-- ${chalk.bold(subkey)}` },
{ hAlign: 'right', content: subitem.count },
{ hAlign: 'right', content: `(${chalk.grey(readableBytes(subitem.bytes, 1, 11))})` },
]);
});
}
});
table.push([
{ hAlign: 'left', content: chalk.bold.green('Total') },
{ hAlign: 'right', content: chalk.bold.green(totalItems) },
{ hAlign: 'right', content: `${chalk.bold.green(readableBytes(totalBytes, 1, 11))} ` },
]);
logger.log(table.toString());
// TODO: once archiving is implemented, we need to check file extensions
if (!fs.pathExistsSync(file)) {
logger.log(file);
throw new Error('Export file not created');
}
logger.log(`
${chalk.bold('Export process has been completed successfully!')}
Export archive is in ${chalk.green(results.destination.file.path)}
`);
process.exit(0);
} catch (e) {
logger.error('Export process failed unexpectedly:', e.toString());
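For reference, the summary table above uses cli-table3's object-cell form, where each cell carries its own alignment. A pared-down sketch with hypothetical values:

import Table from 'cli-table3';
import chalk from 'chalk';

const table = new Table({ head: ['Type', 'Count', 'Size'] });
table.push(
  [
    { hAlign: 'left', content: chalk.bold('entities') },
    { hAlign: 'right', content: 42 },
    { hAlign: 'right', content: '1.5 KB' },
  ],
  [
    { hAlign: 'left', content: chalk.bold.green('Total') },
    { hAlign: 'right', content: chalk.bold.green(42) },
    { hAlign: 'right', content: chalk.bold.green('1.5 KB') },
  ]
);
console.log(table.toString());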

View File

@@ -28,8 +28,13 @@ const parseInputList = (value) => {
const promptEncryptionKey = async (thisCommand) => {
const opts = thisCommand.opts();
if (!opts.encrypt && opts.key) {
console.error('Key may not be present unless encryption is used');
process.exit(1);
}
// if encrypt is set but we have no key, prompt for it
-if (opts.encrypt && !opts.key) {
+if (opts.encrypt && !(opts.key && opts.key.length > 0)) {
try {
const answers = await inquirer.prompt([
{
@@ -46,12 +51,10 @@ const promptEncryptionKey = async (thisCommand) => {
opts.key = answers.key;
} catch (e) {
console.error('Failed to get encryption key');
console.error('Export process failed unexpectedly');
process.exit(1);
}
if (!opts.key) {
console.error('Failed to get encryption key');
console.error('Export process failed unexpectedly');
process.exit(1);
}
}
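The body of the inquirer prompt is cut off by the hunk above. For orientation, a minimal sketch of what such a prompt typically looks like; the field values are assumptions inferred from the surrounding opts.key = answers.key assignment, not taken from this diff:

import inquirer from 'inquirer';

const answers = await inquirer.prompt([
  {
    type: 'password', // assumed: hides the key while it is typed
    name: 'key', // matches the later answers.key read
    message: 'Please enter an encryption key',
  },
]);
console.log(typeof answers.key); // 'string'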

View File

@@ -0,0 +1,20 @@
'use strict';
const bytesPerKb = 1024;
const sizes = ['B ', 'KB', 'MB', 'GB', 'TB', 'PB'];
const readableBytes = (bytes, decimals = 1, padStart = 0) => {
if (!bytes) {
return '0';
}
const i = Math.floor(Math.log(bytes) / Math.log(bytesPerKb));
const result = `${parseFloat((bytes / bytesPerKb ** i).toFixed(decimals))} ${sizes[i].padStart(
2
)}`;
return result.padStart(padStart);
};
module.exports = {
readableBytes,
};
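A few sample values, computed from the formula above (note the trailing space baked into the 'B ' unit and the left padding from padStart):

// Assumed relative import of the module above.
import { readableBytes } from './utils';

console.log(readableBytes(0));           // '0'
console.log(readableBytes(512));         // '512 B '
console.log(readableBytes(1536));        // '1.5 KB'
console.log(readableBytes(1536, 1, 11)); // '     1.5 KB' (left-padded to 11 characters)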

yarn.lock: 4618 changed lines

File diff suppressed because it is too large