diff --git a/packages/core/data-transfer/src/engine/diagnostic.ts b/packages/core/data-transfer/src/engine/diagnostic.ts new file mode 100644 index 0000000000..4ea31ceb2d --- /dev/null +++ b/packages/core/data-transfer/src/engine/diagnostic.ts @@ -0,0 +1,119 @@ +import { EventEmitter } from 'events'; + +export interface IDiagnosticReporterOptions { + stackSize?: number; +} + +export type GenericDiagnostic = { + kind: K; + details: { + message: string; + createdAt: Date; + } & T; +}; + +export type DiagnosticKind = 'error' | 'warning' | 'info'; + +export type DiagnosticListener = ( + diagnostic: { kind: T } & Diagnostic extends infer U ? U : 'foo' +) => void | Promise; + +export type DiagnosticEvent = 'diagnostic' | `diagnostic.${DiagnosticKind}`; + +export type GetEventListener = E extends 'diagnostic' + ? DiagnosticListener + : E extends `diagnostic.${infer K}` + ? K extends DiagnosticKind + ? DiagnosticListener + : never + : never; + +export type Diagnostic = ErrorDiagnostic | WarningDiagnostic | InfoDiagnostic; + +export type ErrorDiagnosticSeverity = 'fatal' | 'error' | 'silly'; + +export type ErrorDiagnostic = GenericDiagnostic< + 'error', + { + name: string; + severity: ErrorDiagnosticSeverity; + error: Error; + } +>; + +export type WarningDiagnostic = GenericDiagnostic< + 'warning', + { + origin?: string; + } +>; + +export type InfoDiagnostic = GenericDiagnostic< + 'info', + { + params?: T; + } +>; + +export interface IDiagnosticReporter { + stack: { + readonly size: number; + readonly items: Diagnostic[]; + }; + + report(diagnostic: Diagnostic): IDiagnosticReporter; + onDiagnostic(listener: DiagnosticListener): IDiagnosticReporter; + on(kind: T, listener: DiagnosticListener): IDiagnosticReporter; +} + +const createDiagnosticReporter = ( + options: IDiagnosticReporterOptions = {} +): IDiagnosticReporter => { + const { stackSize = -1 } = options; + + const emitter = new EventEmitter(); + const stack: Diagnostic[] = []; + + const addListener = (event: T, listener: GetEventListener) => { + emitter.on(event, listener); + }; + + return { + stack: { + get size() { + return stack.length; + }, + + get items() { + return stack; + }, + }, + + report(diagnostic: Diagnostic) { + emitter.emit('diagnostic', diagnostic); + emitter.emit(`diagnostic.${diagnostic.kind}`, diagnostic); + + if (stackSize !== -1 && stack.length >= stackSize) { + stack.shift(); + } + + stack.push(diagnostic); + + return this; + }, + + onDiagnostic(listener: DiagnosticListener) { + addListener('diagnostic', listener); + + return this; + }, + + on(kind: T, listener: DiagnosticListener) { + addListener(`diagnostic.${kind}`, listener as never); + + return this; + }, + }; +}; + +export { createDiagnosticReporter }; diff --git a/packages/core/data-transfer/src/engine/errors.ts b/packages/core/data-transfer/src/engine/errors.ts new file mode 100644 index 0000000000..aedd1bfd47 --- /dev/null +++ b/packages/core/data-transfer/src/engine/errors.ts @@ -0,0 +1,33 @@ +import { DataTransferError, Severity, SeverityKind } from '../errors'; + +type TransferEngineStep = 'initialization' | 'validation' | 'transfer'; + +type TransferEngineErrorDetails

= { + step: P; +} & ([U] extends [never] ? unknown : { details?: U }); + +class TransferEngineError< + P extends TransferEngineStep = TransferEngineStep, + U = never, + T extends TransferEngineErrorDetails = TransferEngineErrorDetails +> extends DataTransferError { + constructor(severity: Severity, message?: string, details?: T | null) { + super('engine', severity, message, details); + } +} + +class TransferEngineInitializationError extends TransferEngineError<'initialization'> { + constructor(message?: string) { + super(SeverityKind.FATAL, message, { step: 'initialization' }); + } +} + +class TransferEngineValidationError< + T extends { check: string } = { check: string } +> extends TransferEngineError<'validation', T> { + constructor(message?: string, details?: T) { + super(SeverityKind.FATAL, message, { step: 'validation', details }); + } +} + +export { TransferEngineError, TransferEngineInitializationError, TransferEngineValidationError }; diff --git a/packages/core/data-transfer/src/engine/index.ts b/packages/core/data-transfer/src/engine/index.ts index f26706a4e0..14f15e811d 100644 --- a/packages/core/data-transfer/src/engine/index.ts +++ b/packages/core/data-transfer/src/engine/index.ts @@ -1,6 +1,7 @@ import { PassThrough, Transform, Readable, Writable, Stream } from 'stream'; import { extname } from 'path'; -import { isEmpty, uniq } from 'lodash/fp'; +import { EOL } from 'os'; +import { isEmpty, uniq, last } from 'lodash/fp'; import { diff as semverDiff } from 'semver'; import type { Schema } from '@strapi/strapi'; @@ -16,12 +17,20 @@ import type { ITransferResults, TransferStage, TransferTransform, + IProvider, } from '../../types'; import type { Diff } from '../utils/json'; -import { compareSchemas } from './validation/schemas'; +import { compareSchemas, validateProvider } from './validation'; import { filter, map } from '../utils/stream'; +import { TransferEngineValidationError } from './errors'; +import { + createDiagnosticReporter, + IDiagnosticReporter, + ErrorDiagnosticSeverity, +} from './diagnostic'; + export const TRANSFER_STAGES: ReadonlyArray = Object.freeze([ 'entities', 'links', @@ -53,17 +62,13 @@ class TransferEngine< stream: PassThrough; }; - constructor( - sourceProvider: ISourceProvider, - destinationProvider: IDestinationProvider, - options: ITransferEngineOptions - ) { - if (sourceProvider.type !== 'source') { - throw new Error("SourceProvider does not have type 'source'"); - } - if (destinationProvider.type !== 'destination') { - throw new Error("DestinationProvider does not have type 'destination'"); - } + diagnostics: IDiagnosticReporter; + + constructor(sourceProvider: S, destinationProvider: D, options: ITransferEngineOptions) { + this.diagnostics = createDiagnosticReporter(); + + validateProvider('source', sourceProvider); + this.sourceProvider = sourceProvider; this.destinationProvider = destinationProvider; this.options = options; @@ -71,6 +76,56 @@ class TransferEngine< this.progress = { data: {}, stream: new PassThrough({ objectMode: true }) }; } + /** + * Report a fatal error and throw it + */ + #panic(error: Error) { + this.#reportError(error, 'fatal'); + + throw error; + } + + /** + * Report an error diagnostic + */ + #reportError(error: Error, severity: ErrorDiagnosticSeverity) { + this.diagnostics.report({ + kind: 'error', + details: { + severity, + createdAt: new Date(), + name: error.name, + message: error.message, + error, + }, + }); + } + + /** + * Report a warning diagnostic + */ + #reportWarning(message: string, origin?: string) { + this.diagnostics.report({ + kind: 'warning', + details: { createdAt: new Date(), message, origin }, + }); + } + + /** + * Report an info diagnostic + */ + #reportInfo(message: string, params?: unknown) { + this.diagnostics.report({ + kind: 'info', + details: { createdAt: new Date(), message, params }, + }); + } + + /** + * Create and return a transform stream based on the given stage and options. + * + * Allowed transformations includes 'filter' and 'map'. + */ #createStageTransformStream( key: T, options: { includeGlobal?: boolean } = {} @@ -101,6 +156,11 @@ class TransferEngine< return stream; } + /** + * Update the Engine's transfer progress data for a given stage. + * + * Providing aggregate options enable custom computation to get the size (bytes) or the aggregate key associated with the data + */ #updateTransferProgress( stage: TransferStage, data: T, @@ -142,6 +202,11 @@ class TransferEngine< } } + /** + * Create and return a PassThrough stream. + * + * Upon writing data into it, it'll update the Engine's transfer progress data and trigger stage update events. + */ #progressTracker( stage: TransferStage, aggregate?: { @@ -159,10 +224,16 @@ class TransferEngine< }); } + /** + * Shorthand method used to trigger transfer update events to every listeners + */ #emitTransferUpdate(type: 'init' | 'start' | 'finish' | 'error', payload?: object) { this.progress.stream.emit(`transfer::${type}`, payload); } + /** + * Shorthand method used to trigger stage update events to every listeners + */ #emitStageUpdate(type: 'start' | 'finish' | 'progress' | 'skip', transferStage: TransferStage) { this.progress.stream.emit(`stage::${type}`, { data: this.progress.data, @@ -170,9 +241,25 @@ class TransferEngine< }); } + /** + * Run a version check between two strapi version (source and destination) using the strategy given to the engine during initialization. + * + * If there is a mismatch, throws a validation error. + */ #assertStrapiVersionIntegrity(sourceVersion?: string, destinationVersion?: string) { const strategy = this.options.versionStrategy || DEFAULT_VERSION_STRATEGY; + const reject = () => { + throw new TransferEngineValidationError( + `The source and destination provide are targeting incompatible Strapi versions (using the "${strategy}" strategy). The source (${this.sourceProvider.name}) version is ${sourceVersion} and the destination (${this.destinationProvider.name}) version is ${destinationVersion}`, + { + check: 'strapi.version', + strategy, + versions: { source: sourceVersion, destination: destinationVersion }, + } + ); + }; + if ( !sourceVersion || !destinationVersion || @@ -185,11 +272,10 @@ class TransferEngine< let diff; try { diff = semverDiff(sourceVersion, destinationVersion); - } catch (e: unknown) { - throw new Error( - `Strapi versions doesn't match (${strategy} check): ${sourceVersion} does not match with ${destinationVersion}` - ); + } catch { + reject(); } + if (!diff) { return; } @@ -207,13 +293,17 @@ class TransferEngine< return; } - throw new Error( - `Strapi versions doesn't match (${strategy} check): ${sourceVersion} does not match with ${destinationVersion}` - ); + reject(); } + /** + * Run a check between two set of schemas (source and destination) using the strategy given to the engine during initialization. + * + * If there are differences and/or incompatibilities between source and destination schemas, then throw a validation error. + */ #assertSchemasMatching(sourceSchemas: SchemaMap, destinationSchemas: SchemaMap) { const strategy = this.options.schemaStrategy || DEFAULT_SCHEMA_STRATEGY; + if (strategy === 'ignore') { return; } @@ -232,14 +322,48 @@ class TransferEngine< }); if (!isEmpty(diffs)) { - throw new Error( - `Import process failed because the project doesn't have a matching data structure - ${JSON.stringify(diffs, null, 2)} - ` + const formattedDiffs = Object.entries(diffs) + .map(([uid, ctDiffs]) => { + let msg = `- ${uid}:${EOL}`; + + msg += ctDiffs + .sort((a, b) => (a.kind > b.kind ? -1 : 1)) + .map((diff) => { + if (diff.kind === 'added') { + return `Added "${diff.path}": "${diff.value}" (${diff.type})`; + } + + if (diff.kind === 'deleted') { + return `Removed "${diff.path}"`; + } + + if (diff.kind === 'modified') { + return `Modified "${diff.path}". "${diff.values[0]}" (${diff.types[0]}) => "${diff.values[1]}" (${diff.types[1]})`; + } + + throw new Error(`Invalid diff found for "${uid}"`); + }) + .map((line) => ` - ${line}`) + .join(EOL); + + return msg; + }) + .join(EOL); + + throw new TransferEngineValidationError( + `Invalid schema changes detected during integrity checks (using the ${strategy} strategy). Please find a summary of the changes below:\n${formattedDiffs}`, + { + check: 'schema.changes', + strategy, + diffs, + } ); } } + /** + * Build a run a stage transfer based on the given parameters. + */ async #transferStage(options: { stage: TransferStage; source?: Readable; @@ -251,7 +375,7 @@ class TransferEngine< if (!source || !destination) { // Wait until source and destination are closed - await Promise.allSettled( + const results = await Promise.allSettled( [source, destination].map((stream) => { // if stream is undefined or already closed, resolve immediately if (!stream || stream.destroyed) { @@ -265,6 +389,12 @@ class TransferEngine< }) ); + results.forEach((state) => { + if (state.status === 'rejected') { + this.#reportWarning(state.reason, `transfer(${stage})`); + } + }); + this.#emitStageUpdate('skip', stage); return; @@ -283,7 +413,15 @@ class TransferEngine< stream = stream.pipe(tracker); } - stream.pipe(destination).on('error', reject).on('close', resolve); + stream + .pipe(destination) + .on('error', (e) => { + // TODO ? + // this.#reportError(e, 'error'); + // destination.destroy(e); + reject(e); + }) + .on('close', resolve); }); this.#emitStageUpdate('finish', stage); @@ -302,12 +440,36 @@ class TransferEngine< } } + /** + * Run the bootstrap method in both source and destination providers + */ async bootstrap(): Promise { - await Promise.all([this.sourceProvider.bootstrap?.(), this.destinationProvider.bootstrap?.()]); + const results = await Promise.allSettled([ + this.sourceProvider.bootstrap?.(), + this.destinationProvider.bootstrap?.(), + ]); + + results.forEach((result) => { + if (result.status === 'rejected') { + this.#panic(result.reason); + } + }); } + /** + * Run the close method in both source and destination providers + */ async close(): Promise { - await Promise.all([this.sourceProvider.close?.(), this.destinationProvider.close?.()]); + const results = await Promise.allSettled([ + this.sourceProvider.close?.(), + this.destinationProvider.bootstrap?.(), + ]); + + results.forEach((result) => { + if (result.status === 'rejected') { + this.#panic(result.reason); + } + }); } async #resolveProviderResource() { @@ -323,7 +485,7 @@ class TransferEngine< } } - async integrityCheck(): Promise { + async integrityCheck() { try { const sourceMetadata = await this.sourceProvider.getMetadata(); const destinationMetadata = await this.destinationProvider.getMetadata(); @@ -341,10 +503,12 @@ class TransferEngine< if (sourceSchemas && destinationSchemas) { this.#assertSchemasMatching(sourceSchemas, destinationSchemas); } - - return true; } catch (error) { - return false; + if (error instanceof Error) { + this.#panic(error); + } + + throw error; } } @@ -356,17 +520,13 @@ class TransferEngine< this.#emitTransferUpdate('init'); await this.bootstrap(); await this.init(); - const isValidTransfer = await this.integrityCheck(); - if (!isValidTransfer) { - // TODO: provide the log from the integrity check - throw new Error( - `Unable to transfer the data between ${this.sourceProvider.name} and ${this.destinationProvider.name}.\nPlease refer to the log above for more information.` - ); - } + + await this.integrityCheck(); this.#emitTransferUpdate('start'); await this.beforeTransfer(); + // Run the transfer stages await this.transferSchemas(); await this.transferEntities(); @@ -380,9 +540,20 @@ class TransferEngine< } catch (e: unknown) { this.#emitTransferUpdate('error', { error: e }); + const lastDiagnostic = last(this.diagnostics.stack.items); + + // Do not report an error diagnostic if the last one reported the same error + if ( + e instanceof Error && + (!lastDiagnostic || lastDiagnostic.kind !== 'error' || lastDiagnostic.details.error !== e) + ) { + this.#reportError(e, 'fatal'); + } + // Rollback the destination provider if an exception is thrown during the transfer // Note: This will be configurable in the future await this.destinationProvider.rollback?.(e as Error); + throw e; } @@ -394,8 +565,23 @@ class TransferEngine< } async beforeTransfer(): Promise { - await this.sourceProvider.beforeTransfer?.(); - await this.destinationProvider.beforeTransfer?.(); + const runWithDiagnostic = async (provider: IProvider) => { + try { + await provider.beforeTransfer?.(); + } catch (error) { + // Error happening during the before transfer step should be considered fatal errors + if (error instanceof Error) { + this.#panic(error); + } else { + this.#panic( + new Error(`Unknwon error when executing "beforeTransfer" on the ${origin} provider`) + ); + } + } + }; + + await runWithDiagnostic(this.sourceProvider); + await runWithDiagnostic(this.destinationProvider); } async transferSchemas(): Promise { @@ -443,7 +629,7 @@ class TransferEngine< const transform = this.#createStageTransformStream(stage); const tracker = this.#progressTracker(stage, { size: (value: IAsset) => value.stats.size, - key: (value: IAsset) => extname(value.filename), + key: (value: IAsset) => extname(value.filename) ?? 'NA', }); await this.#transferStage({ stage, source, destination, transform, tracker }); @@ -462,10 +648,7 @@ class TransferEngine< } } -export const createTransferEngine = < - S extends ISourceProvider = ISourceProvider, - D extends IDestinationProvider = IDestinationProvider ->( +export const createTransferEngine = ( sourceProvider: S, destinationProvider: D, options: ITransferEngineOptions diff --git a/packages/core/data-transfer/src/engine/validation/index.ts b/packages/core/data-transfer/src/engine/validation/index.ts index 197f1788ef..9fc174d463 100644 --- a/packages/core/data-transfer/src/engine/validation/index.ts +++ b/packages/core/data-transfer/src/engine/validation/index.ts @@ -1 +1,2 @@ -export * as schemas from './schemas'; +export * from './schemas'; +export * from './provider'; diff --git a/packages/core/data-transfer/src/engine/validation/provider.ts b/packages/core/data-transfer/src/engine/validation/provider.ts new file mode 100644 index 0000000000..b5d95eeece --- /dev/null +++ b/packages/core/data-transfer/src/engine/validation/provider.ts @@ -0,0 +1,27 @@ +import { capitalize } from 'lodash/fp'; + +import type { IDestinationProvider, ISourceProvider, ProviderType } from '../../../types'; +import { TransferEngineValidationError } from '../errors'; + +const reject = (reason: string): never => { + throw new TransferEngineValidationError(`Invalid provider supplied. ${reason}`); +}; + +const validateProvider = ( + type: ProviderType, + provider?: ([T] extends ['source'] ? ISourceProvider : IDestinationProvider) | null +) => { + if (!provider) { + return reject( + `Expected an instance of "${capitalize(type)}Provider", but got "${typeof provider}" instead.` + ); + } + + if (provider.type !== type) { + return reject( + `Expected the provider to be of type "${type}" but got "${provider.type}" instead.` + ); + } +}; + +export { validateProvider }; diff --git a/packages/core/data-transfer/src/errors/base.ts b/packages/core/data-transfer/src/errors/base.ts new file mode 100644 index 0000000000..1d3987a39f --- /dev/null +++ b/packages/core/data-transfer/src/errors/base.ts @@ -0,0 +1,19 @@ +import { Severity } from './constants'; + +class DataTransferError extends Error { + origin: string; + + severity: Severity; + + details: T | null; + + constructor(origin: string, severity: Severity, message?: string, details?: T | null) { + super(message); + + this.origin = origin; + this.severity = severity; + this.details = details ?? null; + } +} + +export { DataTransferError }; diff --git a/packages/core/data-transfer/src/errors/constants.ts b/packages/core/data-transfer/src/errors/constants.ts new file mode 100644 index 0000000000..6c3964cc3f --- /dev/null +++ b/packages/core/data-transfer/src/errors/constants.ts @@ -0,0 +1,6 @@ +export const SeverityKind = { + FATAL: 1, + ERROR: 2, + SILLY: 3, +} as const; +export type Severity = typeof SeverityKind[keyof typeof SeverityKind]; diff --git a/packages/core/data-transfer/src/errors/index.ts b/packages/core/data-transfer/src/errors/index.ts new file mode 100644 index 0000000000..41bf5eaffb --- /dev/null +++ b/packages/core/data-transfer/src/errors/index.ts @@ -0,0 +1,2 @@ +export * from './constants'; +export * from './base'; diff --git a/packages/core/data-transfer/types/transfer-engine.d.ts b/packages/core/data-transfer/types/transfer-engine.d.ts index bbf6d588bc..cec7ae9f00 100644 --- a/packages/core/data-transfer/types/transfer-engine.d.ts +++ b/packages/core/data-transfer/types/transfer-engine.d.ts @@ -1,7 +1,10 @@ -import type { IAsset, IEntity, ILink } from './common-entities'; -import type { ITransferResults, TransferTransform } from './utils'; -import type { ISourceProvider, IDestinationProvider } from './providers'; +import { PassThrough } from 'stream'; import type { Schema } from '@strapi/strapi'; +import type { IAsset, IEntity, ILink } from './common-entities'; +import type { ITransferResults, TransferTransform, TransferProgress } from './utils'; +import type { ISourceProvider, IDestinationProvider } from './providers'; +import type { Severity } from '../src/errors'; +import type { DiagnosticReporter } from '../src/engine/diagnostic'; /** * Defines the capabilities and properties of the transfer engine @@ -22,6 +25,18 @@ export interface ITransferEngine< * The options used to customize the behavio of the transfer engine */ options: ITransferEngineOptions; + /** + * A diagnostic reporter instance used to gather information about + * errors, warnings and information emitted by the engine + */ + diagnostics: DiagnosticReporter; + /** + * Utilities used to retrieve transfer progress data + */ + progress: { + data: TransferProgress; + stream: PassThrough; + }; /** * Runs the integrity check which will make sure it's possible @@ -29,7 +44,7 @@ export interface ITransferEngine< * * Note: It requires to read the content of the source & destination metadata files */ - integrityCheck(): Promise; + integrityCheck(): Promise; /** * Start streaming selected data from the source to the destination diff --git a/packages/core/strapi/lib/commands/transfer/export.js b/packages/core/strapi/lib/commands/transfer/export.js index be5d33eb08..fc5029d0e9 100644 --- a/packages/core/strapi/lib/commands/transfer/export.js +++ b/packages/core/strapi/lib/commands/transfer/export.js @@ -74,6 +74,29 @@ module.exports = async (opts) => { }, }); + engine.diagnostics + .on('error', ({ details }) => { + const { createdAt, severity, name, message } = details; + + logger.error( + chalk.red(`[${createdAt.toISOString()}] [error] (${severity}) ${name}: ${message}`) + ); + }) + .on('info', ({ details }) => { + const { createdAt, message, params } = details; + + const msg = typeof message === 'function' ? message(params) : message; + + logger.info(chalk.blue(`[${createdAt.toISOString()}] [info] ${msg}`)); + }) + .on('warning', ({ details }) => { + const { createdAt, origin, message } = details; + + logger.warn( + chalk.yellow(`[${createdAt.toISOString()}] [warning] (${origin ?? 'transfer'}) ${message}`) + ); + }); + const progress = engine.progress.stream; const getTelemetryPayload = (/* payload */) => { @@ -104,9 +127,9 @@ module.exports = async (opts) => { logger.log(`${chalk.bold('Export process has been completed successfully!')}`); logger.log(`Export archive is in ${chalk.green(outFile)}`); - } catch (e) { + } catch { await strapi.telemetry.send('didDEITSProcessFail', getTelemetryPayload()); - logger.error('Export process failed unexpectedly:', e.toString()); + logger.error('Export process failed'); process.exit(1); } diff --git a/packages/core/strapi/lib/commands/transfer/import.js b/packages/core/strapi/lib/commands/transfer/import.js index 72fd378d79..61e36b228b 100644 --- a/packages/core/strapi/lib/commands/transfer/import.js +++ b/packages/core/strapi/lib/commands/transfer/import.js @@ -14,6 +14,7 @@ const { const { isObject } = require('lodash/fp'); const path = require('path'); +const chalk = require('chalk'); const strapi = require('../../index'); const { buildTransferTable, DEFAULT_IGNORED_CONTENT_TYPES } = require('./utils'); @@ -83,6 +84,29 @@ module.exports = async (opts) => { const engine = createTransferEngine(source, destination, engineOptions); + engine.diagnostics + .on('error', ({ details }) => { + const { createdAt, severity, name, message } = details; + + logger.error( + chalk.red(`[${createdAt.toISOString()}] [error] (${severity}) ${name}: ${message}`) + ); + }) + .on('info', ({ details }) => { + const { createdAt, message, params } = details; + + const msg = typeof message === 'function' ? message(params) : message; + + logger.info(chalk.blue(`[${createdAt.toISOString()}] [info] ${msg}`)); + }) + .on('warning', ({ details }) => { + const { createdAt, origin, message } = details; + + logger.warn( + chalk.yellow(`[${createdAt.toISOString()}] [warning] (${origin ?? 'transfer'}) ${message}`) + ); + }); + const progress = engine.progress.stream; const getTelemetryPayload = () => { return { @@ -106,8 +130,8 @@ module.exports = async (opts) => { logger.info('Import process has been completed successfully!'); } catch (e) { await strapiInstance.telemetry.send('didDEITSProcessFail', getTelemetryPayload()); - logger.error('Import process failed unexpectedly:'); - logger.error(e); + logger.error('Import process failed'); + process.exit(1); }