440 lines
12 KiB
TypeScript
Raw Normal View History

2022-11-11 13:41:01 +01:00
import { PassThrough } from 'stream-chain';
import type {
Diff,
IDestinationProvider,
IMetadata,
ISourceProvider,
ITransferEngine,
ITransferEngineOptions,
ITransferResults,
2022-11-11 14:24:30 +01:00
TransferStage,
} from '../../types';
import { isEmpty, uniq, has } from 'lodash/fp';
2022-11-15 13:52:11 +01:00
import compareSchemas from '../strategies';
2022-11-11 13:41:01 +01:00
type TransferProgress = {
2022-11-11 14:24:30 +01:00
[key in TransferStage]?: {
2022-11-11 13:41:01 +01:00
count: number;
2022-11-16 11:04:40 +01:00
bytes: number;
2022-11-11 16:14:21 +01:00
aggregates?: {
[key: string]: {
count: number;
2022-11-16 11:04:40 +01:00
bytes: number;
2022-11-11 16:14:21 +01:00
};
};
2022-11-11 13:41:01 +01:00
};
};
2022-11-15 18:42:34 +01:00
type TransferEngineProgress = {
data: any;
stream: PassThrough;
};
2022-11-15 20:58:50 +01:00
export const VALID_STRATEGIES = ['restore', 'merge'];
class TransferEngine<
S extends ISourceProvider = ISourceProvider,
D extends IDestinationProvider = IDestinationProvider
> implements ITransferEngine
{
sourceProvider: ISourceProvider;
destinationProvider: IDestinationProvider;
options: ITransferEngineOptions;
#metadata: { source?: IMetadata; destination?: IMetadata } = {};
2022-11-15 14:30:42 +01:00
2022-11-15 18:42:34 +01:00
#transferProgress: TransferProgress = {};
2022-11-15 18:43:53 +01:00
// TODO: Type the stream chunks. Doesn't seem trivial, especially since PassThrough doesn't provide a PassThroughOptions type
2022-11-11 13:41:01 +01:00
#progressStream: PassThrough = new PassThrough({ objectMode: true });
2022-11-15 18:42:34 +01:00
get progress(): TransferEngineProgress {
return {
data: this.#transferProgress,
stream: this.#progressStream,
};
2022-11-11 13:41:01 +01:00
}
constructor(
sourceProvider: ISourceProvider,
destinationProvider: IDestinationProvider,
options: ITransferEngineOptions
) {
2022-11-17 21:19:33 +01:00
if (sourceProvider.type !== 'source') {
throw new Error("SourceProvider does not have type 'source'");
}
if (destinationProvider.type !== 'destination') {
throw new Error("DestinationProvider does not have type 'destination'");
2022-11-17 21:19:33 +01:00
}
this.sourceProvider = sourceProvider;
this.destinationProvider = destinationProvider;
this.options = options;
}
2022-11-15 14:30:42 +01:00
#increaseTransferProgress(transferStage: TransferStage, data: any, aggregateKey?: string) {
2022-11-15 18:42:34 +01:00
if (!this.#transferProgress[transferStage]) {
this.#transferProgress[transferStage] = { count: 0, bytes: 0 };
2022-11-11 13:41:01 +01:00
}
2022-11-15 18:42:34 +01:00
this.#transferProgress[transferStage]!.count += 1;
2022-11-11 16:14:21 +01:00
const size = JSON.stringify(data).length;
2022-11-15 18:42:34 +01:00
this.#transferProgress[transferStage]!.bytes! += size;
2022-11-11 16:14:21 +01:00
2022-11-18 09:47:09 +01:00
if (aggregateKey && data && data[aggregateKey]) {
2022-11-11 16:14:21 +01:00
const aggKeyValue = data[aggregateKey];
2022-11-18 09:47:09 +01:00
if (!this.#transferProgress[transferStage]!['aggregates']) {
2022-11-15 18:42:34 +01:00
this.#transferProgress[transferStage]!.aggregates = {};
2022-11-11 16:14:21 +01:00
}
2022-11-18 09:47:09 +01:00
if (
!(
this.#transferProgress[transferStage]!.aggregates &&
this.#transferProgress[transferStage]!.aggregates![aggKeyValue]
)
) {
2022-11-15 18:42:34 +01:00
this.#transferProgress[transferStage]!.aggregates![aggKeyValue] = { count: 0, bytes: 0 };
2022-11-11 16:14:21 +01:00
}
2022-11-15 18:42:34 +01:00
this.#transferProgress[transferStage]!.aggregates![aggKeyValue].count += 1;
this.#transferProgress[transferStage]!.aggregates![aggKeyValue].bytes! += size;
2022-11-11 16:14:21 +01:00
}
2022-11-11 13:41:01 +01:00
}
2022-11-15 14:30:42 +01:00
#countRecorder = (transferStage: TransferStage, aggregateKey?: string) => {
2022-11-11 13:41:01 +01:00
return new PassThrough({
objectMode: true,
2022-11-15 14:30:42 +01:00
transform: (data, _encoding, callback) => {
this.#increaseTransferProgress(transferStage, data, aggregateKey);
2022-11-15 18:23:31 +01:00
this.#updateStage('progress', transferStage);
2022-11-11 15:25:19 +01:00
callback(null, data);
2022-11-11 13:41:01 +01:00
},
});
};
2022-11-15 18:23:31 +01:00
#updateStage = (type: 'start' | 'complete' | 'progress', transferStage: TransferStage) => {
2022-11-15 18:42:34 +01:00
this.#progressStream.emit(type, {
data: this.#transferProgress,
2022-11-15 18:23:31 +01:00
stage: transferStage,
2022-11-11 13:41:01 +01:00
});
};
#assertStrapiVersionIntegrity(sourceVersion?: string, destinationVersion?: string) {
const strategy = this.options.versionMatching;
if (!sourceVersion || !destinationVersion) {
return;
}
if (strategy === 'ignore') {
return;
}
if (strategy === 'exact' && sourceVersion === destinationVersion) {
return;
}
const sourceTokens = sourceVersion.split('.');
const destinationTokens = destinationVersion.split('.');
const [major, minor, patch] = sourceTokens.map(
(value, index) => value === destinationTokens[index]
);
if (
(strategy === 'major' && major) ||
(strategy === 'minor' && major && minor) ||
(strategy === 'patch' && major && minor && patch)
) {
return;
}
throw new Error(
`Strapi versions doesn't match (${strategy} check): ${sourceVersion} does not match with ${destinationVersion}`
);
}
#assertSchemasMatching(sourceSchemas: any, destinationSchemas: any) {
const strategy = this.options.schemasMatching || 'strict';
const keys = uniq(Object.keys(sourceSchemas).concat(Object.keys(destinationSchemas)));
const diffs: { [key: string]: Diff[] } = {};
keys.forEach((key) => {
const sourceSchema = sourceSchemas[key];
const destinationSchema = destinationSchemas[key];
const schemaDiffs = compareSchemas(sourceSchema, destinationSchema, strategy);
if (schemaDiffs.length) {
diffs[key] = schemaDiffs;
}
});
if (!isEmpty(diffs)) {
throw new Error(
`Import process failed because the project doesn't have a matching data structure
${JSON.stringify(diffs, null, 2)}
`
);
}
}
async init(): Promise<void> {
// Resolve providers' resource and store
// them in the engine's internal state
await this.#resolveProviderResource();
// Update the destination provider's source metadata
const { source: sourceMetadata } = this.#metadata;
if (sourceMetadata) {
this.destinationProvider.setMetadata?.('source', sourceMetadata);
}
}
2022-11-15 18:51:43 +01:00
async bootstrap(): Promise<void> {
2022-11-17 20:06:53 +01:00
await Promise.all([this.sourceProvider.bootstrap?.(), this.destinationProvider.bootstrap?.()]);
}
async close(): Promise<void> {
2022-11-17 20:06:53 +01:00
await Promise.all([this.sourceProvider.close?.(), this.destinationProvider.close?.()]);
}
async #resolveProviderResource() {
const sourceMetadata = await this.sourceProvider.getMetadata();
const destinationMetadata = await this.destinationProvider.getMetadata();
if (sourceMetadata) {
this.#metadata.source = sourceMetadata;
}
if (destinationMetadata) {
this.#metadata.destination = destinationMetadata;
}
}
async integrityCheck(): Promise<boolean> {
try {
const sourceMetadata = await this.sourceProvider.getMetadata();
const destinationMetadata = await this.destinationProvider.getMetadata();
if (sourceMetadata && destinationMetadata) {
this.#assertStrapiVersionIntegrity(
sourceMetadata?.strapi?.version,
destinationMetadata?.strapi?.version
);
}
const sourceSchemas = await this.sourceProvider.getSchemas?.();
const destinationSchemas = await this.destinationProvider.getSchemas?.();
if (sourceSchemas && destinationSchemas) {
this.#assertSchemasMatching(sourceSchemas, destinationSchemas);
}
return true;
} catch (error) {
if (error instanceof Error) {
console.error('Integrity checks failed:', error.message);
}
return false;
}
}
2022-11-15 20:58:50 +01:00
validateTransferOptions() {
if (!VALID_STRATEGIES.includes(this.options.strategy)) {
throw new Error('Invalid stategy ' + this.options.strategy);
}
}
async transfer(): Promise<ITransferResults<S, D>> {
try {
2022-11-15 20:58:50 +01:00
this.validateTransferOptions();
2022-11-15 18:51:43 +01:00
await this.bootstrap();
await this.init();
const isValidTransfer = await this.integrityCheck();
if (!isValidTransfer) {
throw new Error(
`Unable to transfer the data between ${this.sourceProvider.name} and ${this.destinationProvider.name}.\nPlease refer to the log above for more information.`
);
}
// Run the transfer stages
2022-11-03 10:12:16 +02:00
await this.transferSchemas();
2022-11-08 15:23:14 +01:00
await this.transferEntities();
await this.transferMedia();
await this.transferLinks();
await this.transferConfiguration();
// Gracefully close the providers
await this.close();
2022-11-17 14:32:28 +01:00
} catch (e: unknown) {
// Rollback the destination provider if an exception is thrown during the transfer
// Note: This will be configurable in the future
2022-11-17 14:32:28 +01:00
await this.destinationProvider.rollback?.(e as Error);
throw e;
}
return {
source: this.sourceProvider.results,
destination: this.destinationProvider.results,
};
}
2022-11-03 10:12:16 +02:00
async transferSchemas(): Promise<void> {
2022-11-15 14:30:42 +01:00
const stageName: TransferStage = 'schemas';
2022-11-07 11:10:05 +02:00
2022-11-15 20:58:50 +01:00
const inStream = await this.sourceProvider.streamSchemas?.();
if (!inStream) {
2022-11-15 20:58:50 +01:00
return;
}
2022-11-15 20:58:50 +01:00
const outStream = await this.destinationProvider.getSchemasStream?.();
if (!outStream) {
2022-11-15 20:58:50 +01:00
return;
2022-11-03 10:12:16 +02:00
}
2022-11-15 14:30:42 +01:00
this.#updateStage('start', stageName);
2022-11-03 10:12:16 +02:00
return new Promise((resolve, reject) => {
inStream
// Throw on error in the source
.on('error', reject);
outStream
// Throw on error in the destination
.on('error', reject)
2022-11-03 10:12:16 +02:00
// Resolve the promise when the destination has finished reading all the data from the source
2022-11-11 13:41:01 +01:00
.on('close', () => {
2022-11-15 14:30:42 +01:00
this.#updateStage('complete', stageName);
2022-11-11 13:41:01 +01:00
resolve();
});
2022-11-03 10:12:16 +02:00
2022-11-15 14:30:42 +01:00
inStream.pipe(this.#countRecorder(stageName)).pipe(outStream);
2022-11-03 10:12:16 +02:00
});
}
async transferEntities(): Promise<void> {
2022-11-15 14:30:42 +01:00
const stageName: TransferStage = 'entities';
2022-11-15 20:58:50 +01:00
const inStream = await this.sourceProvider.streamEntities?.();
2022-10-31 10:21:23 +01:00
if (!inStream) {
2022-11-15 20:58:50 +01:00
return;
2022-10-31 10:21:23 +01:00
}
2022-11-15 20:58:50 +01:00
const outStream = await this.destinationProvider.getEntitiesStream?.();
2022-10-31 10:21:23 +01:00
if (!outStream) {
2022-11-15 20:58:50 +01:00
return;
}
2022-11-15 14:30:42 +01:00
this.#updateStage('start', stageName);
2022-11-11 13:41:01 +01:00
return new Promise((resolve, reject) => {
inStream
// Throw on error in the source
.on('error', (e) => {
reject(e);
});
outStream
// Throw on error in the destination
.on('error', (e) => {
reject(e);
})
// Resolve the promise when the destination has finished reading all the data from the source
2022-11-11 13:41:01 +01:00
.on('close', () => {
2022-11-15 14:30:42 +01:00
this.#updateStage('complete', stageName);
2022-11-11 13:41:01 +01:00
resolve();
});
2022-11-15 14:30:42 +01:00
inStream.pipe(this.#countRecorder(stageName, 'type')).pipe(outStream);
});
}
async transferLinks(): Promise<void> {
2022-11-15 14:30:42 +01:00
const stageName: TransferStage = 'links';
2022-11-15 20:58:50 +01:00
const inStream = await this.sourceProvider.streamLinks?.();
2022-10-31 10:21:23 +01:00
if (!inStream) {
2022-11-15 20:58:50 +01:00
return;
2022-10-31 10:21:23 +01:00
}
2022-11-15 20:58:50 +01:00
const outStream = await this.destinationProvider.getLinksStream?.();
2022-10-31 10:21:23 +01:00
if (!outStream) {
2022-11-15 20:58:50 +01:00
return;
}
2022-11-15 14:30:42 +01:00
this.#updateStage('start', 'links');
2022-11-11 13:41:01 +01:00
return new Promise((resolve, reject) => {
inStream
// Throw on error in the source
.on('error', reject);
outStream
// Throw on error in the destination
.on('error', reject)
// Resolve the promise when the destination has finished reading all the data from the source
2022-11-11 13:41:01 +01:00
.on('close', () => {
2022-11-15 14:30:42 +01:00
this.#updateStage('complete', stageName);
2022-11-11 13:41:01 +01:00
resolve();
});
2022-11-15 14:30:42 +01:00
inStream.pipe(this.#countRecorder(stageName)).pipe(outStream);
});
}
async transferMedia(): Promise<void> {
2022-11-15 14:30:42 +01:00
const stageName: TransferStage = 'media';
this.#updateStage('start', stageName);
console.warn('transferMedia not yet implemented');
2022-11-11 13:41:01 +01:00
return new Promise((resolve) =>
(() => {
2022-11-15 14:30:42 +01:00
this.#updateStage('complete', stageName);
2022-11-11 13:41:01 +01:00
resolve();
})()
);
}
async transferConfiguration(): Promise<void> {
2022-11-15 14:30:42 +01:00
const stageName: TransferStage = 'configuration';
2022-11-02 17:26:32 +01:00
2022-11-15 20:58:50 +01:00
const inStream = await this.sourceProvider.streamConfiguration?.();
2022-11-02 17:26:32 +01:00
if (!inStream) {
2022-11-15 20:58:50 +01:00
return;
2022-11-02 17:26:32 +01:00
}
2022-11-15 20:58:50 +01:00
const outStream = await this.destinationProvider.getConfigurationStream?.();
2022-11-02 17:26:32 +01:00
if (!outStream) {
2022-11-15 20:58:50 +01:00
return;
2022-11-02 17:26:32 +01:00
}
2022-11-15 14:30:42 +01:00
this.#updateStage('start', stageName);
2022-11-11 13:41:01 +01:00
2022-11-02 17:26:32 +01:00
return new Promise((resolve, reject) => {
inStream
// Throw on error in the source
.on('error', reject);
outStream
// Throw on error in the destination
.on('error', reject)
// Resolve the promise when the destination has finished reading all the data from the source
2022-11-11 13:41:01 +01:00
.on('close', () => {
2022-11-15 14:30:42 +01:00
this.#updateStage('complete', stageName);
2022-11-11 13:41:01 +01:00
resolve();
});
2022-11-02 17:26:32 +01:00
2022-11-15 14:30:42 +01:00
inStream.pipe(this.#countRecorder(stageName)).pipe(outStream);
2022-11-02 17:26:32 +01:00
});
}
}
2022-11-15 21:15:16 +01:00
export const createTransferEngine = <
S extends ISourceProvider = ISourceProvider,
D extends IDestinationProvider = IDestinationProvider
>(
sourceProvider: S,
destinationProvider: D,
options: ITransferEngineOptions
): TransferEngine<S, D> => {
return new TransferEngine<S, D>(sourceProvider, destinationProvider, options);
};