Init diagnostics

This commit is contained in:
Convly 2023-01-13 16:43:20 +01:00
parent 544fb057c2
commit ec41c1ce38
11 changed files with 507 additions and 55 deletions

View File

@ -0,0 +1,119 @@
import { EventEmitter } from 'events';
export interface IDiagnosticReporterOptions {
stackSize?: number;
}
export type GenericDiagnostic<K extends DiagnosticKind, T = unknown> = {
kind: K;
details: {
message: string;
createdAt: Date;
} & T;
};
export type DiagnosticKind = 'error' | 'warning' | 'info';
export type DiagnosticListener<T extends DiagnosticKind = DiagnosticKind> = (
diagnostic: { kind: T } & Diagnostic extends infer U ? U : 'foo'
) => void | Promise<void>;
export type DiagnosticEvent = 'diagnostic' | `diagnostic.${DiagnosticKind}`;
export type GetEventListener<E extends DiagnosticEvent> = E extends 'diagnostic'
? DiagnosticListener
: E extends `diagnostic.${infer K}`
? K extends DiagnosticKind
? DiagnosticListener<K>
: never
: never;
export type Diagnostic = ErrorDiagnostic | WarningDiagnostic | InfoDiagnostic;
export type ErrorDiagnosticSeverity = 'fatal' | 'error' | 'silly';
export type ErrorDiagnostic = GenericDiagnostic<
'error',
{
name: string;
severity: ErrorDiagnosticSeverity;
error: Error;
}
>;
export type WarningDiagnostic = GenericDiagnostic<
'warning',
{
origin?: string;
}
>;
export type InfoDiagnostic<T = unknown> = GenericDiagnostic<
'info',
{
params?: T;
}
>;
export interface IDiagnosticReporter {
stack: {
readonly size: number;
readonly items: Diagnostic[];
};
report(diagnostic: Diagnostic): IDiagnosticReporter;
onDiagnostic(listener: DiagnosticListener): IDiagnosticReporter;
on<T extends DiagnosticKind>(kind: T, listener: DiagnosticListener<T>): IDiagnosticReporter;
}
const createDiagnosticReporter = (
options: IDiagnosticReporterOptions = {}
): IDiagnosticReporter => {
const { stackSize = -1 } = options;
const emitter = new EventEmitter();
const stack: Diagnostic[] = [];
const addListener = <T extends DiagnosticEvent>(event: T, listener: GetEventListener<T>) => {
emitter.on(event, listener);
};
return {
stack: {
get size() {
return stack.length;
},
get items() {
return stack;
},
},
report(diagnostic: Diagnostic) {
emitter.emit('diagnostic', diagnostic);
emitter.emit(`diagnostic.${diagnostic.kind}`, diagnostic);
if (stackSize !== -1 && stack.length >= stackSize) {
stack.shift();
}
stack.push(diagnostic);
return this;
},
onDiagnostic(listener: DiagnosticListener) {
addListener('diagnostic', listener);
return this;
},
on<T extends DiagnosticKind>(kind: T, listener: DiagnosticListener<T>) {
addListener(`diagnostic.${kind}`, listener as never);
return this;
},
};
};
export { createDiagnosticReporter };

View File

@ -0,0 +1,33 @@
import { DataTransferError, Severity, SeverityKind } from '../errors';
type TransferEngineStep = 'initialization' | 'validation' | 'transfer';
type TransferEngineErrorDetails<P extends TransferEngineStep = TransferEngineStep, U = never> = {
step: P;
} & ([U] extends [never] ? unknown : { details?: U });
class TransferEngineError<
P extends TransferEngineStep = TransferEngineStep,
U = never,
T extends TransferEngineErrorDetails<P, U> = TransferEngineErrorDetails<P, U>
> extends DataTransferError<T> {
constructor(severity: Severity, message?: string, details?: T | null) {
super('engine', severity, message, details);
}
}
class TransferEngineInitializationError extends TransferEngineError<'initialization'> {
constructor(message?: string) {
super(SeverityKind.FATAL, message, { step: 'initialization' });
}
}
class TransferEngineValidationError<
T extends { check: string } = { check: string }
> extends TransferEngineError<'validation', T> {
constructor(message?: string, details?: T) {
super(SeverityKind.FATAL, message, { step: 'validation', details });
}
}
export { TransferEngineError, TransferEngineInitializationError, TransferEngineValidationError };

View File

@ -1,6 +1,7 @@
import { PassThrough, Transform, Readable, Writable, Stream } from 'stream';
import { extname } from 'path';
import { isEmpty, uniq } from 'lodash/fp';
import { EOL } from 'os';
import { isEmpty, uniq, last } from 'lodash/fp';
import { diff as semverDiff } from 'semver';
import type { Schema } from '@strapi/strapi';
@ -16,12 +17,20 @@ import type {
ITransferResults,
TransferStage,
TransferTransform,
IProvider,
} from '../../types';
import type { Diff } from '../utils/json';
import { compareSchemas } from './validation/schemas';
import { compareSchemas, validateProvider } from './validation';
import { filter, map } from '../utils/stream';
import { TransferEngineValidationError } from './errors';
import {
createDiagnosticReporter,
IDiagnosticReporter,
ErrorDiagnosticSeverity,
} from './diagnostic';
export const TRANSFER_STAGES: ReadonlyArray<TransferStage> = Object.freeze([
'entities',
'links',
@ -53,17 +62,13 @@ class TransferEngine<
stream: PassThrough;
};
constructor(
sourceProvider: ISourceProvider,
destinationProvider: IDestinationProvider,
options: ITransferEngineOptions
) {
if (sourceProvider.type !== 'source') {
throw new Error("SourceProvider does not have type 'source'");
}
if (destinationProvider.type !== 'destination') {
throw new Error("DestinationProvider does not have type 'destination'");
}
diagnostics: IDiagnosticReporter;
constructor(sourceProvider: S, destinationProvider: D, options: ITransferEngineOptions) {
this.diagnostics = createDiagnosticReporter();
validateProvider('source', sourceProvider);
this.sourceProvider = sourceProvider;
this.destinationProvider = destinationProvider;
this.options = options;
@ -71,6 +76,56 @@ class TransferEngine<
this.progress = { data: {}, stream: new PassThrough({ objectMode: true }) };
}
/**
* Report a fatal error and throw it
*/
#panic(error: Error) {
this.#reportError(error, 'fatal');
throw error;
}
/**
* Report an error diagnostic
*/
#reportError(error: Error, severity: ErrorDiagnosticSeverity) {
this.diagnostics.report({
kind: 'error',
details: {
severity,
createdAt: new Date(),
name: error.name,
message: error.message,
error,
},
});
}
/**
* Report a warning diagnostic
*/
#reportWarning(message: string, origin?: string) {
this.diagnostics.report({
kind: 'warning',
details: { createdAt: new Date(), message, origin },
});
}
/**
* Report an info diagnostic
*/
#reportInfo(message: string, params?: unknown) {
this.diagnostics.report({
kind: 'info',
details: { createdAt: new Date(), message, params },
});
}
/**
* Create and return a transform stream based on the given stage and options.
*
* Allowed transformations includes 'filter' and 'map'.
*/
#createStageTransformStream<T extends TransferStage>(
key: T,
options: { includeGlobal?: boolean } = {}
@ -101,6 +156,11 @@ class TransferEngine<
return stream;
}
/**
* Update the Engine's transfer progress data for a given stage.
*
* Providing aggregate options enable custom computation to get the size (bytes) or the aggregate key associated with the data
*/
#updateTransferProgress<T = unknown>(
stage: TransferStage,
data: T,
@ -142,6 +202,11 @@ class TransferEngine<
}
}
/**
* Create and return a PassThrough stream.
*
* Upon writing data into it, it'll update the Engine's transfer progress data and trigger stage update events.
*/
#progressTracker(
stage: TransferStage,
aggregate?: {
@ -159,10 +224,16 @@ class TransferEngine<
});
}
/**
* Shorthand method used to trigger transfer update events to every listeners
*/
#emitTransferUpdate(type: 'init' | 'start' | 'finish' | 'error', payload?: object) {
this.progress.stream.emit(`transfer::${type}`, payload);
}
/**
* Shorthand method used to trigger stage update events to every listeners
*/
#emitStageUpdate(type: 'start' | 'finish' | 'progress' | 'skip', transferStage: TransferStage) {
this.progress.stream.emit(`stage::${type}`, {
data: this.progress.data,
@ -170,9 +241,25 @@ class TransferEngine<
});
}
/**
* Run a version check between two strapi version (source and destination) using the strategy given to the engine during initialization.
*
* If there is a mismatch, throws a validation error.
*/
#assertStrapiVersionIntegrity(sourceVersion?: string, destinationVersion?: string) {
const strategy = this.options.versionStrategy || DEFAULT_VERSION_STRATEGY;
const reject = () => {
throw new TransferEngineValidationError(
`The source and destination provide are targeting incompatible Strapi versions (using the "${strategy}" strategy). The source (${this.sourceProvider.name}) version is ${sourceVersion} and the destination (${this.destinationProvider.name}) version is ${destinationVersion}`,
{
check: 'strapi.version',
strategy,
versions: { source: sourceVersion, destination: destinationVersion },
}
);
};
if (
!sourceVersion ||
!destinationVersion ||
@ -185,11 +272,10 @@ class TransferEngine<
let diff;
try {
diff = semverDiff(sourceVersion, destinationVersion);
} catch (e: unknown) {
throw new Error(
`Strapi versions doesn't match (${strategy} check): ${sourceVersion} does not match with ${destinationVersion}`
);
} catch {
reject();
}
if (!diff) {
return;
}
@ -207,13 +293,17 @@ class TransferEngine<
return;
}
throw new Error(
`Strapi versions doesn't match (${strategy} check): ${sourceVersion} does not match with ${destinationVersion}`
);
reject();
}
/**
* Run a check between two set of schemas (source and destination) using the strategy given to the engine during initialization.
*
* If there are differences and/or incompatibilities between source and destination schemas, then throw a validation error.
*/
#assertSchemasMatching(sourceSchemas: SchemaMap, destinationSchemas: SchemaMap) {
const strategy = this.options.schemaStrategy || DEFAULT_SCHEMA_STRATEGY;
if (strategy === 'ignore') {
return;
}
@ -232,14 +322,48 @@ class TransferEngine<
});
if (!isEmpty(diffs)) {
throw new Error(
`Import process failed because the project doesn't have a matching data structure
${JSON.stringify(diffs, null, 2)}
`
const formattedDiffs = Object.entries(diffs)
.map(([uid, ctDiffs]) => {
let msg = `- ${uid}:${EOL}`;
msg += ctDiffs
.sort((a, b) => (a.kind > b.kind ? -1 : 1))
.map((diff) => {
if (diff.kind === 'added') {
return `Added "${diff.path}": "${diff.value}" (${diff.type})`;
}
if (diff.kind === 'deleted') {
return `Removed "${diff.path}"`;
}
if (diff.kind === 'modified') {
return `Modified "${diff.path}". "${diff.values[0]}" (${diff.types[0]}) => "${diff.values[1]}" (${diff.types[1]})`;
}
throw new Error(`Invalid diff found for "${uid}"`);
})
.map((line) => ` - ${line}`)
.join(EOL);
return msg;
})
.join(EOL);
throw new TransferEngineValidationError(
`Invalid schema changes detected during integrity checks (using the ${strategy} strategy). Please find a summary of the changes below:\n${formattedDiffs}`,
{
check: 'schema.changes',
strategy,
diffs,
}
);
}
}
/**
* Build a run a stage transfer based on the given parameters.
*/
async #transferStage(options: {
stage: TransferStage;
source?: Readable;
@ -251,7 +375,7 @@ class TransferEngine<
if (!source || !destination) {
// Wait until source and destination are closed
await Promise.allSettled(
const results = await Promise.allSettled(
[source, destination].map((stream) => {
// if stream is undefined or already closed, resolve immediately
if (!stream || stream.destroyed) {
@ -265,6 +389,12 @@ class TransferEngine<
})
);
results.forEach((state) => {
if (state.status === 'rejected') {
this.#reportWarning(state.reason, `transfer(${stage})`);
}
});
this.#emitStageUpdate('skip', stage);
return;
@ -283,7 +413,15 @@ class TransferEngine<
stream = stream.pipe(tracker);
}
stream.pipe(destination).on('error', reject).on('close', resolve);
stream
.pipe(destination)
.on('error', (e) => {
// TODO ?
// this.#reportError(e, 'error');
// destination.destroy(e);
reject(e);
})
.on('close', resolve);
});
this.#emitStageUpdate('finish', stage);
@ -302,12 +440,36 @@ class TransferEngine<
}
}
/**
* Run the bootstrap method in both source and destination providers
*/
async bootstrap(): Promise<void> {
await Promise.all([this.sourceProvider.bootstrap?.(), this.destinationProvider.bootstrap?.()]);
const results = await Promise.allSettled([
this.sourceProvider.bootstrap?.(),
this.destinationProvider.bootstrap?.(),
]);
results.forEach((result) => {
if (result.status === 'rejected') {
this.#panic(result.reason);
}
});
}
/**
* Run the close method in both source and destination providers
*/
async close(): Promise<void> {
await Promise.all([this.sourceProvider.close?.(), this.destinationProvider.close?.()]);
const results = await Promise.allSettled([
this.sourceProvider.close?.(),
this.destinationProvider.bootstrap?.(),
]);
results.forEach((result) => {
if (result.status === 'rejected') {
this.#panic(result.reason);
}
});
}
async #resolveProviderResource() {
@ -323,7 +485,7 @@ class TransferEngine<
}
}
async integrityCheck(): Promise<boolean> {
async integrityCheck() {
try {
const sourceMetadata = await this.sourceProvider.getMetadata();
const destinationMetadata = await this.destinationProvider.getMetadata();
@ -341,10 +503,12 @@ class TransferEngine<
if (sourceSchemas && destinationSchemas) {
this.#assertSchemasMatching(sourceSchemas, destinationSchemas);
}
return true;
} catch (error) {
return false;
if (error instanceof Error) {
this.#panic(error);
}
throw error;
}
}
@ -356,17 +520,13 @@ class TransferEngine<
this.#emitTransferUpdate('init');
await this.bootstrap();
await this.init();
const isValidTransfer = await this.integrityCheck();
if (!isValidTransfer) {
// TODO: provide the log from the integrity check
throw new Error(
`Unable to transfer the data between ${this.sourceProvider.name} and ${this.destinationProvider.name}.\nPlease refer to the log above for more information.`
);
}
await this.integrityCheck();
this.#emitTransferUpdate('start');
await this.beforeTransfer();
// Run the transfer stages
await this.transferSchemas();
await this.transferEntities();
@ -380,9 +540,20 @@ class TransferEngine<
} catch (e: unknown) {
this.#emitTransferUpdate('error', { error: e });
const lastDiagnostic = last(this.diagnostics.stack.items);
// Do not report an error diagnostic if the last one reported the same error
if (
e instanceof Error &&
(!lastDiagnostic || lastDiagnostic.kind !== 'error' || lastDiagnostic.details.error !== e)
) {
this.#reportError(e, 'fatal');
}
// Rollback the destination provider if an exception is thrown during the transfer
// Note: This will be configurable in the future
await this.destinationProvider.rollback?.(e as Error);
throw e;
}
@ -394,8 +565,23 @@ class TransferEngine<
}
async beforeTransfer(): Promise<void> {
await this.sourceProvider.beforeTransfer?.();
await this.destinationProvider.beforeTransfer?.();
const runWithDiagnostic = async (provider: IProvider) => {
try {
await provider.beforeTransfer?.();
} catch (error) {
// Error happening during the before transfer step should be considered fatal errors
if (error instanceof Error) {
this.#panic(error);
} else {
this.#panic(
new Error(`Unknwon error when executing "beforeTransfer" on the ${origin} provider`)
);
}
}
};
await runWithDiagnostic(this.sourceProvider);
await runWithDiagnostic(this.destinationProvider);
}
async transferSchemas(): Promise<void> {
@ -443,7 +629,7 @@ class TransferEngine<
const transform = this.#createStageTransformStream(stage);
const tracker = this.#progressTracker(stage, {
size: (value: IAsset) => value.stats.size,
key: (value: IAsset) => extname(value.filename),
key: (value: IAsset) => extname(value.filename) ?? 'NA',
});
await this.#transferStage({ stage, source, destination, transform, tracker });
@ -462,10 +648,7 @@ class TransferEngine<
}
}
export const createTransferEngine = <
S extends ISourceProvider = ISourceProvider,
D extends IDestinationProvider = IDestinationProvider
>(
export const createTransferEngine = <S extends ISourceProvider, D extends IDestinationProvider>(
sourceProvider: S,
destinationProvider: D,
options: ITransferEngineOptions

View File

@ -1 +1,2 @@
export * as schemas from './schemas';
export * from './schemas';
export * from './provider';

View File

@ -0,0 +1,27 @@
import { capitalize } from 'lodash/fp';
import type { IDestinationProvider, ISourceProvider, ProviderType } from '../../../types';
import { TransferEngineValidationError } from '../errors';
const reject = (reason: string): never => {
throw new TransferEngineValidationError(`Invalid provider supplied. ${reason}`);
};
const validateProvider = <T extends ProviderType>(
type: ProviderType,
provider?: ([T] extends ['source'] ? ISourceProvider : IDestinationProvider) | null
) => {
if (!provider) {
return reject(
`Expected an instance of "${capitalize(type)}Provider", but got "${typeof provider}" instead.`
);
}
if (provider.type !== type) {
return reject(
`Expected the provider to be of type "${type}" but got "${provider.type}" instead.`
);
}
};
export { validateProvider };

View File

@ -0,0 +1,19 @@
import { Severity } from './constants';
class DataTransferError<T = unknown> extends Error {
origin: string;
severity: Severity;
details: T | null;
constructor(origin: string, severity: Severity, message?: string, details?: T | null) {
super(message);
this.origin = origin;
this.severity = severity;
this.details = details ?? null;
}
}
export { DataTransferError };

View File

@ -0,0 +1,6 @@
export const SeverityKind = {
FATAL: 1,
ERROR: 2,
SILLY: 3,
} as const;
export type Severity = typeof SeverityKind[keyof typeof SeverityKind];

View File

@ -0,0 +1,2 @@
export * from './constants';
export * from './base';

View File

@ -1,7 +1,10 @@
import type { IAsset, IEntity, ILink } from './common-entities';
import type { ITransferResults, TransferTransform } from './utils';
import type { ISourceProvider, IDestinationProvider } from './providers';
import { PassThrough } from 'stream';
import type { Schema } from '@strapi/strapi';
import type { IAsset, IEntity, ILink } from './common-entities';
import type { ITransferResults, TransferTransform, TransferProgress } from './utils';
import type { ISourceProvider, IDestinationProvider } from './providers';
import type { Severity } from '../src/errors';
import type { DiagnosticReporter } from '../src/engine/diagnostic';
/**
* Defines the capabilities and properties of the transfer engine
@ -22,6 +25,18 @@ export interface ITransferEngine<
* The options used to customize the behavio of the transfer engine
*/
options: ITransferEngineOptions;
/**
* A diagnostic reporter instance used to gather information about
* errors, warnings and information emitted by the engine
*/
diagnostics: DiagnosticReporter;
/**
* Utilities used to retrieve transfer progress data
*/
progress: {
data: TransferProgress;
stream: PassThrough;
};
/**
* Runs the integrity check which will make sure it's possible
@ -29,7 +44,7 @@ export interface ITransferEngine<
*
* Note: It requires to read the content of the source & destination metadata files
*/
integrityCheck(): Promise<boolean>;
integrityCheck(): Promise<void | never>;
/**
* Start streaming selected data from the source to the destination

View File

@ -74,6 +74,29 @@ module.exports = async (opts) => {
},
});
engine.diagnostics
.on('error', ({ details }) => {
const { createdAt, severity, name, message } = details;
logger.error(
chalk.red(`[${createdAt.toISOString()}] [error] (${severity}) ${name}: ${message}`)
);
})
.on('info', ({ details }) => {
const { createdAt, message, params } = details;
const msg = typeof message === 'function' ? message(params) : message;
logger.info(chalk.blue(`[${createdAt.toISOString()}] [info] ${msg}`));
})
.on('warning', ({ details }) => {
const { createdAt, origin, message } = details;
logger.warn(
chalk.yellow(`[${createdAt.toISOString()}] [warning] (${origin ?? 'transfer'}) ${message}`)
);
});
const progress = engine.progress.stream;
const getTelemetryPayload = (/* payload */) => {
@ -104,9 +127,9 @@ module.exports = async (opts) => {
logger.log(`${chalk.bold('Export process has been completed successfully!')}`);
logger.log(`Export archive is in ${chalk.green(outFile)}`);
} catch (e) {
} catch {
await strapi.telemetry.send('didDEITSProcessFail', getTelemetryPayload());
logger.error('Export process failed unexpectedly:', e.toString());
logger.error('Export process failed');
process.exit(1);
}

View File

@ -14,6 +14,7 @@ const {
const { isObject } = require('lodash/fp');
const path = require('path');
const chalk = require('chalk');
const strapi = require('../../index');
const { buildTransferTable, DEFAULT_IGNORED_CONTENT_TYPES } = require('./utils');
@ -83,6 +84,29 @@ module.exports = async (opts) => {
const engine = createTransferEngine(source, destination, engineOptions);
engine.diagnostics
.on('error', ({ details }) => {
const { createdAt, severity, name, message } = details;
logger.error(
chalk.red(`[${createdAt.toISOString()}] [error] (${severity}) ${name}: ${message}`)
);
})
.on('info', ({ details }) => {
const { createdAt, message, params } = details;
const msg = typeof message === 'function' ? message(params) : message;
logger.info(chalk.blue(`[${createdAt.toISOString()}] [info] ${msg}`));
})
.on('warning', ({ details }) => {
const { createdAt, origin, message } = details;
logger.warn(
chalk.yellow(`[${createdAt.toISOString()}] [warning] (${origin ?? 'transfer'}) ${message}`)
);
});
const progress = engine.progress.stream;
const getTelemetryPayload = () => {
return {
@ -106,8 +130,8 @@ module.exports = async (opts) => {
logger.info('Import process has been completed successfully!');
} catch (e) {
await strapiInstance.telemetry.send('didDEITSProcessFail', getTelemetryPayload());
logger.error('Import process failed unexpectedly:');
logger.error(e);
logger.error('Import process failed');
process.exit(1);
}