833 lines
24 KiB
TypeScript
Raw Normal View History

import { PassThrough, Transform, Readable, Writable } from 'stream';
2022-12-12 20:37:41 +01:00
import { extname } from 'path';
2023-01-13 16:43:20 +01:00
import { EOL } from 'os';
import { isEmpty, uniq, last, isNumber, difference, omit, set } from 'lodash/fp';
2022-12-12 20:36:26 +01:00
import { diff as semverDiff } from 'semver';
2022-12-12 20:37:41 +01:00
import type { Schema } from '@strapi/strapi';
import type {
IAsset,
IDestinationProvider,
IEntity,
ILink,
IMetadata,
ISourceProvider,
ITransferEngine,
ITransferEngineOptions,
2022-12-14 09:59:38 +01:00
TransferProgress,
ITransferResults,
2022-11-11 14:24:30 +01:00
TransferStage,
TransferTransform,
2023-01-13 16:43:20 +01:00
IProvider,
2023-01-16 18:44:14 +01:00
TransferFilters,
TransferFilterPreset,
} from '../../types';
import type { Diff } from '../utils/json';
2023-01-13 16:43:20 +01:00
import { compareSchemas, validateProvider } from './validation';
import { filter, map } from '../utils/stream';
import { TransferEngineError, TransferEngineValidationError } from './errors';
2023-01-13 16:43:20 +01:00
import {
createDiagnosticReporter,
IDiagnosticReporter,
ErrorDiagnosticSeverity,
} from './diagnostic';
import { DataTransferError } from '../errors';
2023-01-13 16:43:20 +01:00
2022-12-13 15:11:00 +01:00
/**
 * Names of the stages handled by the transfer engine.
 *
 * The array is frozen so it cannot be mutated at runtime.
 */
export const TRANSFER_STAGES: ReadonlyArray<TransferStage> = Object.freeze([
  'entities',
  'links',
  'assets',
  'schemas',
  'configuration',
]);
2022-11-11 13:41:01 +01:00
2023-01-16 18:44:14 +01:00
/** Maps every filter preset name to the per-stage filters it enables. */
export type TransferGroupFilter = Record<TransferFilterPreset, TransferFilters>;
/**
 * Stage selection behind each preset usable in the `only` / `exclude` options.
 *
 * A stage flagged `true` under a preset belongs to that preset as a whole.
 */
export const TransferGroupPresets: TransferGroupFilter = {
  // "content" covers the complete links and entities stages
  content: {
    links: true,
    entities: true,
    // TODO: filtering individual records of a running stage is not implemented yet.
    // Once it is, a stage entry could look like this instead of `true`:
    // [
    //   {
    //     filter(data) {
    //       return shouldIncludeThisData(data);
    //     },
    //   },
    // ],
  },
  // "files" covers the assets stage together with the links stage
  files: {
    assets: true,
    links: true,
  },
  // "config" covers the configuration stage only
  config: {
    configuration: true,
  },
};
// Fallback used by the version integrity check when `options.versionStrategy` is not set
export const DEFAULT_VERSION_STRATEGY = 'ignore';
// Fallback used by the schema integrity check when `options.schemaStrategy` is not set
export const DEFAULT_SCHEMA_STRATEGY = 'strict';
2022-12-12 16:51:23 +01:00
// Schemas indexed by a string key (the schema checks below iterate over these keys)
type SchemaMap = Record<string, Schema>;
2022-11-11 13:41:01 +01:00
class TransferEngine<
S extends ISourceProvider = ISourceProvider,
D extends IDestinationProvider = IDestinationProvider
> implements ITransferEngine
{
// Provider the data is read from
sourceProvider: ISourceProvider;

// Provider the data is written to
destinationProvider: IDestinationProvider;

// Options given to the engine at construction time
options: ITransferEngineOptions;

// Metadata fetched from each provider (filled in by #resolveProviderResource)
#metadata: { source?: IMetadata; destination?: IMetadata } = {};

// Progress state of the transfer
progress: {
  // per-stage metrics such as size and record count
  data: TransferProgress;
  // stream on which the transfer and stage events are emitted
  stream: PassThrough;
};

// Collector for the error / warning / info diagnostics reported by the engine
diagnostics: IDiagnosticReporter;

// Stream currently being written to, kept so that it can be reached at any time (e.g. to abort)
#currentStream?: Writable;
2023-01-13 16:43:20 +01:00
/**
 * Build an engine around a source and a destination provider.
 *
 * Both providers go through `validateProvider` before being stored. The
 * progress state starts empty, with a new object-mode PassThrough as its
 * event stream.
 */
constructor(sourceProvider: S, destinationProvider: D, options: ITransferEngineOptions) {
  this.diagnostics = createDiagnosticReporter();

  validateProvider('source', sourceProvider);
  validateProvider('destination', destinationProvider);

  this.sourceProvider = sourceProvider;
  this.destinationProvider = destinationProvider;
  this.options = options;

  const stream = new PassThrough({ objectMode: true });
  this.progress = { data: {}, stream };
}
2023-01-13 16:43:20 +01:00
/**
 * Record the error as a 'fatal' diagnostic, then rethrow it.
 */
#panic(error: Error) {
  const severity: ErrorDiagnosticSeverity = 'fatal';

  this.#reportError(error, severity);

  throw error;
}
/**
 * Push an 'error' diagnostic describing the given error with the given severity.
 */
#reportError(error: Error, severity: ErrorDiagnosticSeverity) {
  const { name, message } = error;
  const details = { severity, createdAt: new Date(), name, message, error };

  this.diagnostics.report({ kind: 'error', details });
}
/**
 * Push a 'warning' diagnostic carrying a message and, optionally, its origin.
 */
#reportWarning(message: string, origin?: string) {
  const details = { createdAt: new Date(), message, origin };

  this.diagnostics.report({ kind: 'warning', details });
}
/**
 * Push an 'info' diagnostic carrying a message and optional parameters.
 */
#reportInfo(message: string, params?: unknown) {
  const details = { createdAt: new Date(), message, params };

  this.diagnostics.report({ kind: 'info', details });
}
/**
 * Build the transform stream for a stage from the engine options.
 *
 * Streams are appended in this order: global transforms (unless
 * `includeGlobal` is false), an optional throttling step, then the
 * transforms registered for `key`. Each transform may contribute a 'filter'
 * and/or a 'map' step.
 *
 * NOTE(review): `pipe()` returns its destination, so the value returned here
 * is the last stream of the chain — confirm that callers piping into it
 * expect to write to the tail rather than the head.
 */
#createStageTransformStream<T extends TransferStage>(
  key: T,
  options: { includeGlobal?: boolean } = {}
): PassThrough | Transform {
  const { includeGlobal = true } = options;
  const { throttle } = this.options;

  const configured = this.options?.transforms ?? {};
  const globalTransforms = configured.global;
  const stageTransforms = configured[key];

  let stream = new PassThrough({ objectMode: true });

  // Append the filter/map steps of every given transform to the chain
  const applyTransforms = <U>(transforms: TransferTransform<U>[] = []) => {
    transforms.forEach((transform) => {
      if ('filter' in transform) {
        stream = stream.pipe(filter(transform.filter));
      }

      if ('map' in transform) {
        stream = stream.pipe(map(transform.map));
      }
    });
  };

  if (includeGlobal) {
    applyTransforms(globalTransforms);
  }

  if (isNumber(throttle) && throttle > 0) {
    // Resolves after `throttle` milliseconds
    const pause = () =>
      new Promise((resolve) => {
        setTimeout(resolve, throttle);
      });

    const throttled = new PassThrough({
      objectMode: true,
      async transform(data, _encoding, callback) {
        await pause();
        callback(null, data);
      },
    });

    stream = stream.pipe(throttled);
  }

  applyTransforms(stageTransforms as TransferTransform<unknown>[]);

  return stream;
}
2023-01-13 16:43:20 +01:00
/**
 * Account for one more record in the progress data of the given stage.
 *
 * `aggregate.size` overrides how the size of the record is computed (the
 * default is the length of its JSON serialization); `aggregate.key` returns
 * the name of a sub-counter that should also be incremented.
 */
#updateTransferProgress<T = unknown>(
  stage: TransferStage,
  data: T,
  aggregate?: {
    size?: (value: T) => number;
    key?: (value: T) => string;
  }
) {
  const progressByStage = this.progress.data;

  // First record of the stage: create its counters and stamp the start time
  if (!progressByStage[stage]) {
    progressByStage[stage] = { count: 0, bytes: 0, startTime: Date.now() };
  }

  const current = progressByStage[stage];

  if (!current) {
    return;
  }

  const size = aggregate?.size?.(data) ?? JSON.stringify(data).length;
  const key = aggregate?.key?.(data);

  current.count += 1;
  current.bytes += size;

  // Nothing more to do when the record has no aggregate key
  if (!key) {
    return;
  }

  const aggregates = current.aggregates || (current.aggregates = {});
  const bucket = aggregates[key] || (aggregates[key] = { count: 0, bytes: 0 });

  bucket.count += 1;
  bucket.bytes += size;
}
2023-01-13 16:43:20 +01:00
/**
 * Build an object-mode PassThrough that observes a stage.
 *
 * Every chunk going through it updates the stage's progress data and emits
 * a 'progress' stage event, then is forwarded unchanged.
 */
#progressTracker(
  stage: TransferStage,
  aggregate?: {
    size?(value: unknown): number;
    key?(value: unknown): string;
  }
) {
  const track = (data: unknown) => {
    this.#updateTransferProgress(stage, data, aggregate);
    this.#emitStageUpdate('progress', stage);
  };

  return new PassThrough({
    objectMode: true,
    transform(data, _encoding, callback) {
      track(data);
      callback(null, data);
    },
  });
}
2022-11-11 13:41:01 +01:00
2023-01-13 16:43:20 +01:00
/**
 * Emit a `transfer::<type>` event, with an optional payload, on the progress stream.
 */
#emitTransferUpdate(type: 'init' | 'start' | 'finish' | 'error', payload?: object) {
  const eventName = `transfer::${type}`;

  this.progress.stream.emit(eventName, payload);
}
2023-01-13 16:43:20 +01:00
/**
 * Emit a `stage::<type>` event on the progress stream.
 *
 * The payload holds the engine's current progress data and the stage name.
 */
#emitStageUpdate(
  type: 'start' | 'finish' | 'progress' | 'skip' | 'error',
  transferStage: TransferStage
) {
  const eventName = `stage::${type}`;
  const payload = { data: this.progress.data, stage: transferStage };

  this.progress.stream.emit(eventName, payload);
}
2022-11-11 13:41:01 +01:00
2023-01-13 16:43:20 +01:00
/**
 * Compare the Strapi versions of the source and the destination using the
 * version strategy given to the engine (`DEFAULT_VERSION_STRATEGY` when unset).
 *
 * The check passes when a version is missing, when the strategy is 'ignore',
 * when both versions are equal, or when the kind of difference reported by
 * semver is tolerated by the strategy ('patch', 'minor' or 'major').
 *
 * @throws TransferEngineValidationError when the versions are incompatible
 *   for the strategy, or when semver cannot compare them.
 */
#assertStrapiVersionIntegrity(sourceVersion?: string, destinationVersion?: string) {
  const strategy = this.options.versionStrategy || DEFAULT_VERSION_STRATEGY;

  const reject = () => {
    throw new TransferEngineValidationError(
      `The source and destination provide are targeting incompatible Strapi versions (using the "${strategy}" strategy). The source (${this.sourceProvider.name}) version is ${sourceVersion} and the destination (${this.destinationProvider.name}) version is ${destinationVersion}`,
      {
        check: 'strapi.version',
        strategy,
        versions: { source: sourceVersion, destination: destinationVersion },
      }
    );
  };

  if (
    !sourceVersion ||
    !destinationVersion ||
    strategy === 'ignore' ||
    destinationVersion === sourceVersion
  ) {
    return;
  }

  let diff;
  try {
    diff = semverDiff(sourceVersion, destinationVersion);
  } catch {
    // semver could not compare the two versions: treat them as incompatible
    reject();
  }

  // No difference reported by semver
  if (!diff) {
    return;
  }

  // Kinds of difference tolerated by each strategy; every level also accepts
  // what the level below accepts. semver's diff() reports a pre-release-only
  // difference as 'prerelease'.
  const validPatch = ['prerelease', 'build'];
  const validMinor = [...validPatch, 'patch', 'prepatch'];
  const validMajor = [...validMinor, 'minor', 'preminor'];

  if (strategy === 'patch' && validPatch.includes(diff)) {
    return;
  }

  if (strategy === 'minor' && validMinor.includes(diff)) {
    return;
  }

  if (strategy === 'major' && validMajor.includes(diff)) {
    return;
  }

  reject();
}
2023-01-13 16:43:20 +01:00
/**
 * Compare source and destination schemas using the schema strategy given to
 * the engine (`DEFAULT_SCHEMA_STRATEGY` when unset).
 *
 * Nothing is checked with the 'ignore' strategy. Otherwise every key found in
 * either map is run through `compareSchemas`.
 *
 * @throws TransferEngineValidationError listing every difference (also
 *   available as `diffs` in the error details) when at least one is found.
 */
#assertSchemasMatching(sourceSchemas: SchemaMap, destinationSchemas: SchemaMap) {
  const strategy = this.options.schemaStrategy || DEFAULT_SCHEMA_STRATEGY;

  if (strategy === 'ignore') {
    return;
  }

  const diffs: { [key: string]: Diff[] } = {};
  const keys = uniq(Object.keys(sourceSchemas).concat(Object.keys(destinationSchemas)));

  for (const key of keys) {
    const schemaDiffs = compareSchemas(sourceSchemas[key], destinationSchemas[key], strategy);

    if (schemaDiffs.length) {
      diffs[key] = schemaDiffs as Diff<Schema>[];
    }
  }

  if (isEmpty(diffs)) {
    return;
  }

  // Turn one diff into a human readable sentence
  const describeDiff = (uid: string, diff: Diff) => {
    const path = diff.path.join('.');

    switch (diff.kind) {
      case 'added':
        return `${path} exists in destination schema but not in source schema`;
      case 'deleted':
        return `${path} exists in source schema but not in destination schema`;
      case 'modified':
        if (diff.types[0] === diff.types[1]) {
          return `Schema value changed at "${path}": "${diff.values[0]}" (${diff.types[0]}) => "${diff.values[1]}" (${diff.types[1]})`;
        }

        return `Schema has differing data types at "${path}": "${diff.values[0]}" (${diff.types[0]}) => "${diff.values[1]}" (${diff.types[1]})`;
      default:
        throw new TransferEngineValidationError(`Invalid diff found for "${uid}"`, {
          check: `schema on ${uid}`,
        });
    }
  };

  const sections = Object.entries(diffs).map(([uid, ctDiffs]) => {
    const lines = ctDiffs
      .sort((a, b) => (a.kind > b.kind ? -1 : 1))
      .map((diff) => ` - ${describeDiff(uid, diff)}`);

    return `- ${uid}:${EOL}` + lines.join(EOL);
  });

  const formattedDiffs = sections.join(EOL);

  throw new TransferEngineValidationError(
    `Invalid schema changes detected during integrity checks (using the ${strategy} strategy). Please find a summary of the changes below:\n${formattedDiffs}`,
    {
      check: 'schema.changes',
      strategy,
      diffs,
    }
  );
}
2023-01-16 18:44:14 +01:00
/**
 * Tell whether a stage is filtered out by the `only` / `exclude` options.
 *
 * The 'schemas' stage is never skipped. Any other stage is included by
 * default, unless `only` is set and none of its presets covers the stage, or
 * one of the `exclude` presets covers it.
 */
shouldSkipStage(stage: TransferStage) {
  const { exclude, only } = this.options;

  // schemas must always be included
  if (stage === 'schemas') {
    return false;
  }

  const coversStage = (transferGroup: TransferFilterPreset) =>
    TransferGroupPresets[transferGroup][stage];

  // everything is included by default unless 'only' has been set
  let included = isEmpty(only);

  if (only && only.length > 0) {
    included = only.some(coversStage);
  }

  if (included && exclude && exclude.length > 0) {
    included = !exclude.some(coversStage);
  }

  return !included;
}
/**
 * Run one transfer stage by piping `source` through the optional `transform`
 * and `tracker` streams into `destination`.
 *
 * When `source` or `destination` is missing, or when `shouldSkipStage` says
 * so, the provided streams are destroyed, a 'skip' stage event is emitted and
 * nothing is transferred. Failures while closing them are reported as
 * warnings.
 *
 * Otherwise a 'start' event is emitted and the returned promise resolves once
 * the destination emits 'close' (a 'finish' event follows), or rejects when
 * the destination emits 'error' (after an 'error' stage event and an error
 * diagnostic).
 */
async #transferStage(options: {
  stage: TransferStage;
  source?: Readable;
  destination?: Writable;
  transform?: PassThrough;
  tracker?: PassThrough;
}) {
  const { stage, source, destination, transform, tracker } = options;

  // Stamp the end time on the stage's progress data, if it exists
  const updateEndTime = () => {
    const stageData = this.progress.data[stage];

    if (stageData) {
      stageData.endTime = Date.now();
    }
  };

  if (!source || !destination || this.shouldSkipStage(stage)) {
    // Wait until source and destination are closed
    const results = await Promise.allSettled(
      [source, destination].map((stream) => {
        // if stream is undefined or already closed, resolve immediately
        if (!stream || stream.destroyed) {
          return Promise.resolve();
        }

        // Destroy the stream, then settle on its 'close' or 'error' event
        return new Promise((resolve, reject) => {
          stream.on('close', resolve).on('error', reject).destroy();
        });
      })
    );

    results.forEach((state) => {
      if (state.status === 'rejected') {
        // The rejection reason is whatever the stream emitted as 'error';
        // the warning diagnostic expects a string message
        const { reason } = state;
        const message = reason instanceof Error ? reason.message : String(reason);

        this.#reportWarning(message, `transfer(${stage})`);
      }
    });

    this.#emitStageUpdate('skip', stage);

    return;
  }

  this.#emitStageUpdate('start', stage);

  await new Promise<void>((resolve, reject) => {
    let stream: Readable = source;

    if (transform) {
      stream = stream.pipe(transform);
    }

    if (tracker) {
      stream = stream.pipe(tracker);
    }

    // pipe() returns the destination, so the listeners below are attached to it
    this.#currentStream = stream
      .pipe(destination)
      .on('error', (e) => {
        updateEndTime();
        this.#emitStageUpdate('error', stage);
        this.#reportError(e, 'error');
        destination.destroy(e);
        reject(e);
      })
      .on('close', () => {
        this.#currentStream = undefined;
        updateEndTime();
        resolve();
      });
  });

  this.#emitStageUpdate('finish', stage);
}
/**
 * Abort the ongoing transfer, if any, by destroying the currently open
 * stream with a fatal TransferEngineError.
 */
async abortTransfer(): Promise<void> {
  const activeStream = this.#currentStream;

  if (!activeStream) {
    return;
  }

  activeStream.destroy(new TransferEngineError('fatal', 'Transfer aborted.'));
}
/**
 * Load the providers' metadata into the engine, then hand the source
 * metadata (when there is one) over to the destination provider.
 */
async init(): Promise<void> {
  // Fetch and store the metadata of both providers
  await this.#resolveProviderResource();

  const sourceMetadata = this.#metadata.source;

  if (!sourceMetadata) {
    return;
  }

  // Let the destination provider know about the source metadata
  this.destinationProvider.setMetadata?.('source', sourceMetadata);
}
2023-01-13 16:43:20 +01:00
/**
 * Call the optional `bootstrap` method of both providers and wait for both
 * to settle. The first rejection found is reported as fatal and rethrown.
 */
async bootstrap(): Promise<void> {
  const pending = [this.sourceProvider.bootstrap?.(), this.destinationProvider.bootstrap?.()];
  const outcomes = await Promise.allSettled(pending);

  for (const outcome of outcomes) {
    if (outcome.status === 'rejected') {
      this.#panic(outcome.reason);
    }
  }
}
2023-01-13 16:43:20 +01:00
/**
 * Call the optional `close` method of both providers and wait for both to
 * settle. The first rejection found is reported as fatal and rethrown.
 */
async close(): Promise<void> {
  const pending = [this.sourceProvider.close?.(), this.destinationProvider.close?.()];
  const outcomes = await Promise.allSettled(pending);

  for (const outcome of outcomes) {
    if (outcome.status === 'rejected') {
      this.#panic(outcome.reason);
    }
  }
}
/**
 * Fetch the metadata of the source, then of the destination, and keep the
 * ones that exist in the engine's internal state.
 */
async #resolveProviderResource() {
  const source = await this.sourceProvider.getMetadata();
  const destination = await this.destinationProvider.getMetadata();

  if (source) {
    this.#metadata.source = source;
  }

  if (destination) {
    this.#metadata.destination = destination;
  }
}
// Schema differences captured by integrityCheck from a validation error, keyed by uid
#schemaDiffs: Record<string, Diff[]> = {};
2023-01-13 16:43:20 +01:00
/**
 * Run the integrity checks between source and destination: first the Strapi
 * version check (when both providers return metadata), then the schema check
 * (when both providers return schemas).
 *
 * A validation error that carries schema `diffs` is not fatal: the diffs are
 * stored on the engine and each one is reported as a warning. Any other
 * error, including a validation error without diffs such as a version
 * mismatch, is rethrown as-is.
 */
async integrityCheck() {
  try {
    const sourceMetadata = await this.sourceProvider.getMetadata();
    const destinationMetadata = await this.destinationProvider.getMetadata();

    if (sourceMetadata && destinationMetadata) {
      this.#assertStrapiVersionIntegrity(
        sourceMetadata?.strapi?.version,
        destinationMetadata?.strapi?.version
      );
    }

    const sourceSchemas = (await this.sourceProvider.getSchemas?.()) as SchemaMap;
    const destinationSchemas = (await this.destinationProvider.getSchemas?.()) as SchemaMap;

    if (sourceSchemas && destinationSchemas) {
      this.#assertSchemasMatching(sourceSchemas, destinationSchemas);
    }
  } catch (error) {
    if (error instanceof TransferEngineValidationError) {
      const diffs = error.details?.details?.diffs as Record<string, Diff[]> | undefined;

      // Only schema validation errors carry diffs; without this guard a
      // version mismatch would end up in Object.entries(undefined) and the
      // resulting TypeError would hide the real validation error
      if (diffs) {
        this.#schemaDiffs = diffs;

        // TODO: implement a confirmation callback to confirm or reject the error
        Object.entries(diffs).forEach(([uid, uidDiffs]) => {
          for (const diff of uidDiffs) {
            this.#reportWarning(`${diff.path.join('.')} for ${uid}`, 'Schema Integrity Check');
          }
        });

        return;
      }
    }

    throw error;
  }
}
/**
 * Run a complete transfer: bootstrap, init, integrity check, the
 * before-transfer hooks, every stage (schemas, entities, assets, links,
 * configuration), then close the providers.
 *
 * Emits the 'init', 'start' and 'finish' transfer events along the way. On
 * failure it emits an 'error' transfer event, reports the error unless the
 * last diagnostic already holds it, calls the destination's optional
 * `rollback` and rethrows.
 *
 * @returns the results of both providers and the engine's progress data
 */
async transfer(): Promise<ITransferResults<S, D>> {
  // Start every run from empty progress data
  this.progress.data = {};

  try {
    this.#emitTransferUpdate('init');
    await this.bootstrap();
    await this.init();
    await this.integrityCheck();

    this.#emitTransferUpdate('start');
    await this.beforeTransfer();

    // The stages run one after the other, in this order
    const stageRunners = [
      () => this.transferSchemas(),
      () => this.transferEntities(),
      () => this.transferAssets(),
      () => this.transferLinks(),
      () => this.transferConfiguration(),
    ];

    for (const runStage of stageRunners) {
      await runStage();
    }

    // Gracefully close the providers
    await this.close();

    this.#emitTransferUpdate('finish');
  } catch (e: unknown) {
    this.#emitTransferUpdate('error', { error: e });

    const lastDiagnostic = last(this.diagnostics.stack.items);

    // Skip the report when the last diagnostic is already about this very error
    const alreadyReported =
      !!lastDiagnostic && lastDiagnostic.kind === 'error' && lastDiagnostic.details.error === e;

    if (e instanceof Error && !alreadyReported) {
      this.#reportError(e, (e as DataTransferError).severity || 'fatal');
    }

    // Rollback the destination provider if an exception is thrown during the transfer
    // Note: This will be configurable in the future
    await this.destinationProvider.rollback?.(e as Error);

    throw e;
  }

  const source = this.sourceProvider.results;
  const destination = this.destinationProvider.results;

  return { source, destination, engine: this.progress.data };
}
/**
 * Run the optional `beforeTransfer` hook of the source provider, then of the
 * destination provider.
 *
 * Anything thrown by a hook is reported as a fatal diagnostic and rethrown;
 * a thrown value that is not an Error is replaced by a generic Error naming
 * the side whose hook failed.
 */
async beforeTransfer(): Promise<void> {
  const runWithDiagnostic = async (provider: IProvider, origin: 'source' | 'destination') => {
    try {
      await provider.beforeTransfer?.();
    } catch (error) {
      // Error happening during the before transfer step should be considered fatal errors
      if (error instanceof Error) {
        this.#panic(error);
      } else {
        this.#panic(
          new Error(`Unknown error when executing "beforeTransfer" on the ${origin} provider`)
        );
      }
    }
  };

  await runWithDiagnostic(this.sourceProvider, 'source');
  await runWithDiagnostic(this.destinationProvider, 'destination');
}
2022-11-03 10:12:16 +02:00
/**
 * Run the 'schemas' stage, aggregating its progress by schema model type.
 */
async transferSchemas(): Promise<void> {
  const stage: TransferStage = 'schemas';

  const source = await this.sourceProvider.createSchemasReadStream?.();
  const destination = await this.destinationProvider.createSchemasWriteStream?.();

  const transform = this.#createStageTransformStream(stage);
  const byModelType = (value: Schema) => value.modelType;
  const tracker = this.#progressTracker(stage, { key: byModelType });

  await this.#transferStage({ stage, source, destination, transform, tracker });
}
/**
 * Run the 'entities' stage, aggregating its progress by entity type.
 *
 * When the destination exposes its schemas, an entity whose type is not one
 * of the destination's content types is discarded, and attributes missing
 * from the destination schema are stripped from the entity data.
 */
async transferEntities(): Promise<void> {
  const stage: TransferStage = 'entities';

  const source = await this.sourceProvider.createEntitiesReadStream?.();
  const destination = await this.destinationProvider.createEntitiesWriteStream?.();

  const stageTransforms = this.#createStageTransformStream(stage);

  const schemaFilter = new Transform({
    objectMode: true,
    transform: async (entity: IEntity, _encoding, callback) => {
      const schemas = await this.destinationProvider.getSchemas?.();

      // Without destination schemas the entity goes through untouched
      if (!schemas) {
        return callback(null, entity);
      }

      const availableContentTypes = Object.keys(schemas).filter(
        (uid) => schemas[uid].modelType === 'contentType'
      );

      // Discard entities whose type does not exist in the destination
      if (!availableContentTypes.includes(entity.type)) {
        return callback(null, undefined);
      }

      const { type, data } = entity;
      const attributes = (schemas[type] as Record<string, unknown>).attributes as object;

      // Keep only the attributes known to the destination schema
      const unknownAttributes = difference(Object.keys(data), Object.keys(attributes));
      const cleanedData = omit(unknownAttributes, data);

      callback(null, set('data', cleanedData, entity));
    },
  });

  const transform = stageTransforms.pipe(schemaFilter);
  const tracker = this.#progressTracker(stage, { key: (value: IEntity) => value.type });

  await this.#transferStage({ stage, source, destination, transform, tracker });
}
/**
 * Run the 'links' stage.
 *
 * When the destination exposes its schemas, a link is dropped unless both of
 * its sides are of a content type that exists in the destination.
 */
async transferLinks(): Promise<void> {
  const stage: TransferStage = 'links';

  const source = await this.sourceProvider.createLinksReadStream?.();
  const destination = await this.destinationProvider.createLinksWriteStream?.();

  const stageTransforms = this.#createStageTransformStream(stage);

  const schemaFilter = new Transform({
    objectMode: true,
    transform: async (link: ILink, _encoding, callback) => {
      const schemas = await this.destinationProvider.getSchemas?.();

      // Without destination schemas the link goes through untouched
      if (!schemas) {
        return callback(null, link);
      }

      const availableContentTypes = Object.keys(schemas).filter(
        (uid) => schemas[uid].modelType === 'contentType'
      );

      const bothSidesKnown = [link.left.type, link.right.type].every((uid) =>
        availableContentTypes.includes(uid)
      );

      // An unknown side means the link is ignored
      callback(null, bothSidesKnown ? link : undefined);
    },
  });

  const transform = stageTransforms.pipe(schemaFilter);
  const tracker = this.#progressTracker(stage);

  await this.#transferStage({ stage, source, destination, transform, tracker });
}
2022-11-24 16:07:07 +01:00
/**
 * Run the 'assets' stage, unless it is filtered out, in which case no stream
 * is created at all.
 *
 * Progress is measured with the size found in the asset stats and aggregated
 * by file extension.
 */
async transferAssets(): Promise<void> {
  const stage: TransferStage = 'assets';

  if (this.shouldSkipStage(stage)) {
    return;
  }

  const source = await this.sourceProvider.createAssetsReadStream?.();
  const destination = await this.destinationProvider.createAssetsWriteStream?.();

  const transform = this.#createStageTransformStream(stage);

  const assetSize = (value: IAsset) => value.stats.size;
  const assetExtension = (value: IAsset) => extname(value.filename) || 'No extension';
  const tracker = this.#progressTracker(stage, { size: assetSize, key: assetExtension });

  await this.#transferStage({ stage, source, destination, transform, tracker });
}
/**
 * Run the 'configuration' stage with the default progress tracking.
 */
async transferConfiguration(): Promise<void> {
  const stage: TransferStage = 'configuration';

  const source = await this.sourceProvider.createConfigurationReadStream?.();
  const destination = await this.destinationProvider.createConfigurationWriteStream?.();

  const streams = {
    transform: this.#createStageTransformStream(stage),
    tracker: this.#progressTracker(stage),
  };

  await this.#transferStage({ stage, source, destination, ...streams });
}
}
2023-01-13 16:43:20 +01:00
/**
 * Factory creating a TransferEngine for the given providers and options.
 */
export const createTransferEngine = <S extends ISourceProvider, D extends IDestinationProvider>(
  sourceProvider: S,
  destinationProvider: D,
  options: ITransferEngineOptions
): TransferEngine<S, D> => new TransferEngine<S, D>(sourceProvider, destinationProvider, options);
export * as errors from './errors';