288 lines
8.0 KiB
TypeScript
Raw Normal View History

import type {
IDestinationProvider,
IMetadata,
ISourceProvider,
ITransferEngine,
ITransferEngineOptions,
} from '../../types';
/**
 * Orchestrates a data transfer from a source provider to a destination provider.
 *
 * A transfer bootstraps both providers, collects their metadata, runs the
 * integrity checks and then pipes each stage (schemas, entities, media, links,
 * configuration) from the source into the destination, one after the other.
 */
class TransferEngine implements ITransferEngine {
  sourceProvider: ISourceProvider;

  destinationProvider: IDestinationProvider;

  options: ITransferEngineOptions;

  // Metadata reported by each provider; filled in by `init()`
  #metadata: { source?: IMetadata; destination?: IMetadata } = {};

  /**
   * @param sourceProvider - Provider the data is read from
   * @param destinationProvider - Provider the data is written to
   * @param options - Engine options (`versionMatching` is read by the version check)
   */
  constructor(
    sourceProvider: ISourceProvider,
    destinationProvider: IDestinationProvider,
    options: ITransferEngineOptions
  ) {
    this.sourceProvider = sourceProvider;
    this.destinationProvider = destinationProvider;
    this.options = options;
  }

  /**
   * Compare both Strapi versions according to `options.versionMatching`.
   *
   * The check is skipped when either version is missing or when the strategy
   * is 'ignore'. Otherwise the versions must be identical ('exact') or share
   * the same leading tokens ('major', 'minor', 'patch').
   *
   * @throws Error when the versions don't satisfy the selected strategy
   */
  #assertStrapiVersionIntegrity(sourceVersion?: string, destinationVersion?: string): void {
    const strategy = this.options.versionMatching;

    if (!sourceVersion || !destinationVersion) {
      return;
    }

    if (strategy === 'ignore') {
      return;
    }

    if (strategy === 'exact' && sourceVersion === destinationVersion) {
      return;
    }

    // Compare the dot-separated tokens pairwise: [major, minor, patch]
    const sourceTokens = sourceVersion.split('.');
    const destinationTokens = destinationVersion.split('.');

    const [major, minor, patch] = sourceTokens.map(
      (value, index) => value === destinationTokens[index]
    );

    if (
      (strategy === 'major' && major) ||
      (strategy === 'minor' && major && minor) ||
      (strategy === 'patch' && major && minor && patch)
    ) {
      return;
    }

    throw new Error(
      `Strapi versions doesn't match (${strategy} check): ${sourceVersion} does not match with ${destinationVersion} `
    );
  }

  /**
   * Pipe one transfer stage from the source stream into the destination stream.
   *
   * The returned promise resolves when the destination stream emits 'close'
   * and rejects when either stream emits 'error'.
   *
   * @param stage - Stage name, only used in the error messages
   * @throws Error when one of the streams is missing
   */
  async #pipeStage(
    stage: string,
    inStream: NodeJS.ReadableStream | undefined,
    outStream: NodeJS.WritableStream | undefined
  ): Promise<void> {
    if (!inStream) {
      throw new Error(`Unable to transfer ${stage}, source stream is missing`);
    }

    if (!outStream) {
      throw new Error(`Unable to transfer ${stage}, destination stream is missing`);
    }

    return new Promise<void>((resolve, reject) => {
      // Throw on error in the source
      inStream.on('error', reject);

      outStream
        // Throw on error in the destination
        .on('error', reject)
        // Resolve the promise when the destination has finished reading all the data from the source
        .on('close', resolve);

      inStream.pipe(outStream);
    });
  }

  /**
   * Load the metadata of both providers into the engine's state and, when the
   * source exposes some, hand it over to the destination provider.
   */
  async init(): Promise<void> {
    // Resolve providers' resource and store
    // them in the engine's internal state
    await this.#resolveProviderResource();

    // Update the destination provider's source metadata
    const { source: sourceMetadata } = this.#metadata;

    if (sourceMetadata) {
      this.destinationProvider.setMetadata?.('source', sourceMetadata);
    }
  }

  /** Bootstrap both providers concurrently (providers may omit the hook). */
  async bootstrap(): Promise<void> {
    await Promise.all([
      // bootstrap source provider
      this.sourceProvider.bootstrap?.(),
      // bootstrap destination provider
      this.destinationProvider.bootstrap?.(),
    ]);
  }

  /** Close both providers concurrently (providers may omit the hook). */
  async close(): Promise<void> {
    await Promise.all([
      // close source provider
      this.sourceProvider.close?.(),
      // close destination provider
      this.destinationProvider.close?.(),
    ]);
  }

  /** Fetch the metadata of the source, then of the destination, keeping the truthy ones. */
  async #resolveProviderResource(): Promise<void> {
    const sourceMetadata = await this.sourceProvider.getMetadata();
    const destinationMetadata = await this.destinationProvider.getMetadata();

    if (sourceMetadata) {
      this.#metadata.source = sourceMetadata;
    }

    if (destinationMetadata) {
      this.#metadata.destination = destinationMetadata;
    }
  }

  /**
   * Run the integrity checks against the metadata collected by `init()`.
   *
   * @returns `true` when the checks pass or when either metadata is missing,
   * `false` (after logging the reason) when a check fails
   */
  async integrityCheck(): Promise<boolean> {
    const { source: sourceMetadata, destination: destinationMetadata } = this.#metadata;

    // Nothing to compare against
    if (!sourceMetadata || !destinationMetadata) {
      return true;
    }

    try {
      // Version check
      this.#assertStrapiVersionIntegrity(
        sourceMetadata?.strapi?.version,
        destinationMetadata?.strapi?.version
      );

      return true;
    } catch (error) {
      if (error instanceof Error) {
        console.error('Integrity checks failed:', error.message);
      }

      return false;
    }
  }

  /**
   * Run the whole transfer: bootstrap, init, integrity check, then every stage.
   *
   * Failures are logged and never rethrown. Whatever the outcome, the
   * providers are closed before the returned promise settles.
   */
  async transfer(): Promise<void> {
    // Tracks whether close() has been attempted so it is never called twice
    let closeAttempted = false;

    try {
      await this.bootstrap();
      await this.init();

      const isValidTransfer = await this.integrityCheck();

      if (!isValidTransfer) {
        throw new Error(
          `Unable to transfer the data between ${this.sourceProvider.name} and ${this.destinationProvider.name}.\nPlease refer to the log above for more information.`
        );
      }

      await this.prepareTransfer();

      // Run the transfer stages
      await this.transferSchemas();
      await this.transferEntities();
      await this.transferMedia();
      await this.transferLinks();
      await this.transferConfiguration();

      // Gracefully close the providers
      closeAttempted = true;
      await this.close();
    } catch (e) {
      console.log('error', e);

      // Rollback the destination provider if an exception is thrown during the transfer
      // Note: This will be configurable in the future
      // await this.destinationProvider?.rollback(e);

      // Don't leave the providers open when the transfer aborts midway
      if (!closeAttempted) {
        try {
          await this.close();
        } catch (closeError) {
          console.log('error', closeError);
        }
      }
    }
  }

  /** Give the destination provider a chance to get ready before streaming starts. */
  async prepareTransfer(): Promise<void> {
    await this.destinationProvider.prepareForStreaming?.();
  }

  /**
   * Transfer the schemas from the source to the destination.
   *
   * @throws Error when one of the providers doesn't supply its stream
   */
  async transferSchemas(): Promise<void> {
    const inStream = await this.sourceProvider.streamSchemas?.();
    const outStream = await this.destinationProvider.getSchemasStream?.();

    return this.#pipeStage('schemas', inStream, outStream);
  }

  /**
   * Transfer the entities from the source to the destination.
   *
   * @throws Error when one of the providers doesn't supply its stream
   */
  async transferEntities(): Promise<void> {
    const inStream = await this.sourceProvider.streamEntities?.();
    const outStream = await this.destinationProvider.getEntitiesStream?.();

    return this.#pipeStage('entities', inStream, outStream);
  }

  /**
   * Transfer the links from the source to the destination.
   *
   * @throws Error when one of the providers doesn't supply its stream
   */
  async transferLinks(): Promise<void> {
    const inStream = await this.sourceProvider.streamLinks?.();
    const outStream = await this.destinationProvider.getLinksStream?.();

    return this.#pipeStage('links', inStream, outStream);
  }

  /** Placeholder stage: only warns that media transfer isn't implemented yet. */
  async transferMedia(): Promise<void> {
    console.warn('transferMedia not yet implemented');
  }

  /**
   * Transfer the configuration from the source to the destination.
   *
   * @throws Error when one of the providers doesn't supply its stream
   */
  async transferConfiguration(): Promise<void> {
    const inStream = await this.sourceProvider.streamConfiguration?.();
    const outStream = await this.destinationProvider.getConfigurationStream?.();

    return this.#pipeStage('configuration', inStream, outStream);
  }
}
/**
 * Build a transfer engine wired to the given providers.
 *
 * @param sourceProvider - Provider the data is read from
 * @param destinationProvider - Provider the data is written to
 * @param options - Options forwarded as-is to the engine
 * @returns A new `TransferEngine` instance
 */
export const createTransferEngine = <T extends ISourceProvider, U extends IDestinationProvider>(
  sourceProvider: T,
  destinationProvider: U,
  options: ITransferEngineOptions
): TransferEngine => new TransferEngine(sourceProvider, destinationProvider, options);