diff --git a/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts index 413f73d4c5..bb7e1d7500 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts @@ -1,6 +1,7 @@ import { WebSocket } from 'ws'; import { v4 } from 'uuid'; import { Writable } from 'stream'; +import { once } from 'lodash/fp'; import { createDispatcher } from './utils'; @@ -72,6 +73,46 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider { }); } + #startStepOnce(stage: client.TransferPushStep) { + return once(() => this.#startStep(stage)); + } + + async #startStep(step: T) { + try { + await this.dispatcher?.dispatchTransferStep({ action: 'start', step }); + } catch (e) { + if (e instanceof Error) { + return e; + } + + if (typeof e === 'string') { + return new ProviderTransferError(e); + } + + return new ProviderTransferError('Unexpected error'); + } + + return null; + } + + async #endStep(step: T) { + try { + await this.dispatcher?.dispatchTransferStep({ action: 'end', step }); + } catch (e) { + if (e instanceof Error) { + return e; + } + + if (typeof e === 'string') { + return new ProviderTransferError(e); + } + + return new ProviderTransferError('Unexpected error'); + } + + return null; + } + async #streamStep( step: T, data: client.GetTransferPushStreamData @@ -174,48 +215,106 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider { } createEntitiesWriteStream(): Writable { + const startEntitiesTransferOnce = this.#startStepOnce('entities'); + return new Writable({ objectMode: true, - write: async (entity: IEntity, _encoding, callback) => { - const e = await this.#streamStep('entities', entity); + final: async (callback) => { + const e = await this.#endStep('entities'); callback(e); }, + write: async (entity: IEntity, _encoding, callback) => { + const startError = await startEntitiesTransferOnce(); + + if (startError) { + return callback(startError); + } + + const streamError = await this.#streamStep('entities', entity); + + callback(streamError); + }, }); } createLinksWriteStream(): Writable { + const startLinksTransferOnce = this.#startStepOnce('links'); + return new Writable({ objectMode: true, - write: async (link: ILink, _encoding, callback) => { - const e = await this.#streamStep('links', link); + final: async (callback) => { + const e = await this.#endStep('links'); callback(e); }, + write: async (link: ILink, _encoding, callback) => { + const startError = await startLinksTransferOnce(); + + if (startError) { + return callback(startError); + } + + const streamError = await this.#streamStep('links', link); + + callback(streamError); + }, }); } createConfigurationWriteStream(): Writable { + const startConfigurationTransferOnce = this.#startStepOnce('configuration'); + return new Writable({ objectMode: true, - write: async (configuration: IConfiguration, _encoding, callback) => { - const e = await this.#streamStep('configuration', configuration); + final: async (callback) => { + const e = await this.#endStep('configuration'); callback(e); }, + write: async (configuration: IConfiguration, _encoding, callback) => { + const startError = await startConfigurationTransferOnce(); + + if (startError) { + return callback(startError); + } + + const streamError = await this.#streamStep('configuration', configuration); + + callback(streamError); + }, }); } createAssetsWriteStream(): Writable | Promise { + const startAssetsTransferOnce = this.#startStepOnce('assets'); + return new Writable({ objectMode: true, final: async (callback) => { // TODO: replace this stream call by an end call - const e = await this.#streamStep('assets', null); + const endError = await this.#streamStep('assets', null); - callback(e); + if (endError) { + return callback(endError); + } + + const endStepError = await this.#endStep('assets'); + + if (endStepError) { + return callback(endStepError); + } + + return callback(null); }, + write: async (asset: IAsset, _encoding, callback) => { + const startError = await startAssetsTransferOnce(); + + if (startError) { + return callback(startError); + } + const { filename, filepath, stats, stream } = asset; const assetID = v4(); diff --git a/packages/core/data-transfer/src/strapi/providers/remote-destination/utils.ts b/packages/core/data-transfer/src/strapi/providers/remote-destination/utils.ts index 2f55ca9227..89f9f7bfad 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-destination/utils.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-destination/utils.ts @@ -2,6 +2,7 @@ import { v4 } from 'uuid'; import { RawData, WebSocket } from 'ws'; import type { client, server } from '../../../../types/remote/protocol'; +import { ProviderError } from '../../../errors/providers'; interface IDispatcherState { transfer?: { kind: client.TransferKind; id: string }; @@ -46,7 +47,7 @@ const createDispatcher = (ws: WebSocket) => { const response: server.Message = JSON.parse(raw.toString()); if (response.uuid === uuid) { if (response.error) { - return reject(new Error(response.error.message)); + return reject(new ProviderError('error', response.error.message)); } resolve(response.data ?? null); diff --git a/packages/core/data-transfer/src/strapi/remote/flows/default.ts b/packages/core/data-transfer/src/strapi/remote/flows/default.ts new file mode 100644 index 0000000000..6f1dbf41dc --- /dev/null +++ b/packages/core/data-transfer/src/strapi/remote/flows/default.ts @@ -0,0 +1,40 @@ +import type { Step } from '.'; + +export default [ + { + kind: 'action', + action: 'bootstrap', + }, + { + kind: 'action', + action: 'init', + }, + { + kind: 'action', + action: 'beforeTransfer', + }, + { + kind: 'transfer', + stage: 'schemas', + }, + { + kind: 'transfer', + stage: 'entities', + }, + { + kind: 'transfer', + stage: 'assets', + }, + { + kind: 'transfer', + stage: 'links', + }, + { + kind: 'transfer', + stage: 'configuration', + }, + { + kind: 'action', + action: 'close', + }, +] as readonly Step[]; diff --git a/packages/core/data-transfer/src/strapi/remote/flows/index.ts b/packages/core/data-transfer/src/strapi/remote/flows/index.ts new file mode 100644 index 0000000000..528670eb28 --- /dev/null +++ b/packages/core/data-transfer/src/strapi/remote/flows/index.ts @@ -0,0 +1,78 @@ +import type { TransferStage } from '../../../../types'; + +export type Step = + | { kind: 'action'; action: string } + | { kind: 'transfer'; stage: TransferStage; locked?: boolean }; + +export { default as DEFAULT_TRANSFER_FLOW } from './default'; + +interface IState { + step: Step | null; +} + +export const createFlow = (flow: readonly Step[]) => { + const state: IState = { step: null }; + + /** + * Equality check between two steps + */ + const stepEqual = (stepA: Step, stepB: Step): boolean => { + if (stepA.kind === 'action' && stepB.kind === 'action') { + return stepA.action === stepB.action; + } + + if (stepA.kind === 'transfer' && stepB.kind === 'transfer') { + return stepA.stage === stepB.stage; + } + + return false; + }; + + /** + * Find the index for a given step + */ + const findStepIndex = (step: Step) => flow.findIndex((flowStep) => stepEqual(step, flowStep)); + + return { + has(step: Step) { + return findStepIndex(step) !== -1; + }, + + can(step: Step) { + if (state.step === null) { + return true; + } + + const indexesDifference = findStepIndex(step) - findStepIndex(state.step); + + // It's possible to send multiple time the same transfer step in a row + if (indexesDifference === 0 && step.kind === 'transfer') { + return true; + } + + return indexesDifference > 0; + }, + + cannot(step: Step) { + return !this.can(step); + }, + + set(step: Step) { + const canSwitch = this.can(step); + + if (!canSwitch) { + throw new Error('Impossible to proceed to the given step'); + } + + state.step = step; + + return this; + }, + + get() { + return state.step; + }, + }; +}; + +export type TransferFlow = ReturnType; diff --git a/packages/core/data-transfer/src/strapi/remote/handlers.ts b/packages/core/data-transfer/src/strapi/remote/handlers.ts index 9b4cf00534..cf8c33c0d7 100644 --- a/packages/core/data-transfer/src/strapi/remote/handlers.ts +++ b/packages/core/data-transfer/src/strapi/remote/handlers.ts @@ -5,16 +5,23 @@ import { randomUUID } from 'crypto'; import { WebSocket } from 'ws'; import type { IPushController } from './controllers/push'; +import type { TransferFlow, Step } from './flows'; +import type { client, server } from '../../../types/remote/protocol'; import createPushController from './controllers/push'; -import type { client, server } from '../../../types/remote/protocol'; import { ProviderTransferError, ProviderInitializationError } from '../../errors/providers'; import { TRANSFER_METHODS } from './constants'; +import { createFlow, DEFAULT_TRANSFER_FLOW } from './flows'; type TransferMethod = typeof TRANSFER_METHODS[number]; interface ITransferState { - transfer?: { id: string; kind: client.TransferKind }; + transfer?: { + id: string; + kind: client.TransferKind; + startedAt: number; + flow: TransferFlow; + }; controller?: IPushController; } @@ -44,6 +51,16 @@ export const createTransferHandler = (options: IHandlerOptions) => { const state: ITransferState = {}; let uuid: string | undefined; + function assertValidTransfer( + transferState: ITransferState + ): asserts transferState is Required { + const { transfer, controller } = transferState; + + if (!controller || !transfer) { + throw new ProviderTransferError('Invalid transfer process'); + } + } + /** * Format error & message to follow the remote transfer protocol */ @@ -91,11 +108,19 @@ export const createTransferHandler = (options: IHandlerOptions) => { } }; - const teardown = async (): Promise> => { - await verifyAuth(state.transfer?.kind); - + const teardown = (): void => { delete state.controller; delete state.transfer; + }; + + const end = async (msg: client.EndCommand): Promise> => { + await verifyAuth(state.transfer?.kind); + + if (msg.params.transferID !== state.transfer?.id) { + throw new ProviderTransferError('Bad transfer ID provided'); + } + + teardown(); return { ok: true }; }; @@ -103,8 +128,9 @@ export const createTransferHandler = (options: IHandlerOptions) => { const init = async ( msg: client.InitCommand ): Promise> => { - // TODO: this only checks for this instance of node: we should consider a database lock - if (state.controller) { + // TODO: For push transfer, we'll probably have to trigger a + // maintenance mode to prevent other transfer at the same time. + if (state.transfer || state.controller) { throw new ProviderInitializationError('Transfer already in progres'); } @@ -131,11 +157,32 @@ export const createTransferHandler = (options: IHandlerOptions) => { }); } - state.transfer = { id: randomUUID(), kind: transfer }; + state.transfer = { + id: randomUUID(), + kind: transfer, + startedAt: Date.now(), + flow: createFlow(DEFAULT_TRANSFER_FLOW), + }; return { transferID: state.transfer.id }; }; + const status = (): server.Payload => { + if (state.transfer) { + const { transfer } = state; + const elapsed = Date.now() - transfer.startedAt; + + return { + active: true, + kind: transfer.kind, + startedAt: transfer.startedAt, + elapsed, + }; + } + + return { active: false, kind: null, elapsed: null, startedAt: null }; + }; + /** * On command message (init, end, status, ...) */ @@ -147,29 +194,27 @@ export const createTransferHandler = (options: IHandlerOptions) => { } if (command === 'end') { - await answer(teardown); + assertValidTransfer(state); + await answer(() => end(msg)); } if (command === 'status') { - await callback( - new ProviderTransferError('Command not implemented: "status"', { - command, - validCommands: ['init', 'end', 'status'], - }) - ); + await answer(status); } }; const onTransferCommand = async (msg: client.TransferMessage) => { - const { transferID, kind } = msg; - const { controller } = state; + assertValidTransfer(state); - await verifyAuth(state.transfer?.kind); + const { transferID, kind } = msg; + const { controller, transfer } = state; + + await verifyAuth(transfer.kind); // TODO: (re)move this check // It shouldn't be possible to start a pull transfer for now, so reaching // this code should be impossible too, but this has been added by security - if (state.transfer?.kind === 'pull') { + if (transfer.kind === 'pull') { return callback(new ProviderTransferError('Pull transfer not implemented')); } @@ -194,6 +239,24 @@ export const createTransferHandler = (options: IHandlerOptions) => { ); } + const step: Step = { kind: 'action', action }; + const isStepRegistered = transfer.flow.has(step); + + if (isStepRegistered) { + if (transfer.flow.cannot(step)) { + return callback( + new ProviderTransferError( + `Invalid action "${action}" found for the current flow `, + { + action, + } + ) + ); + } + + transfer.flow.set(step); + } + await answer(() => controller.actions[action as keyof typeof controller.actions]()); } @@ -202,19 +265,78 @@ export const createTransferHandler = (options: IHandlerOptions) => { // We can only have push transfer message for the moment const message = msg as client.TransferPushMessage; - // TODO: lock transfer process + const currentStep = transfer.flow.get(); + const step: Step = { kind: 'transfer', stage: message.step }; + + // Lock the current transfer stage if (message.action === 'start') { - // console.log('Starting transfer for ', message.step); + if (currentStep?.kind === 'transfer' && currentStep.locked) { + return callback( + new ProviderTransferError( + `It's not possible to start a new transfer stage (${message.step}) while another one is in progress (${currentStep.stage})` + ) + ); + } + + if (transfer.flow.cannot(step)) { + return callback( + new ProviderTransferError( + `Invalid stage (${message.step}) provided for the current flow`, + { step } + ) + ); + } + + transfer?.flow.set({ ...step, locked: true }); + + return callback(null, { ok: true }); } - // Stream step - else if (message.action === 'stream') { + // Stream operation on the current transfer stage + if (message.action === 'stream') { + if (currentStep?.kind === 'transfer' && !currentStep.locked) { + return callback( + new ProviderTransferError( + `You need to initialize the transfer stage (${message.step}) before starting to stream data` + ) + ); + } + if (transfer?.flow.cannot(step)) { + return callback( + new ProviderTransferError( + `Invalid stage (${message.step}) provided for the current flow`, + { step } + ) + ); + } + await answer(() => controller.transfer[message.step]?.(message.data as never)); } - // TODO: unlock transfer process - else if (message.action === 'end') { - // console.log('Ending transfer for ', message.step); + // Unlock the current transfer stage + if (message.action === 'end') { + // Cannot unlock if not locked (aka: started) + if (currentStep?.kind === 'transfer' && !currentStep.locked) { + return callback( + new ProviderTransferError( + `You need to initialize the transfer stage (${message.step}) before ending it` + ) + ); + } + + // Cannot unlock if invalid step provided + if (transfer?.flow.cannot(step)) { + return callback( + new ProviderTransferError( + `Invalid stage (${message.step}) provided for the current flow`, + { step } + ) + ); + } + + transfer?.flow.set({ ...step, locked: false }); + + return callback(null, { ok: true }); } } }; diff --git a/packages/core/data-transfer/types/remote/protocol/server/messaging.d.ts b/packages/core/data-transfer/types/remote/protocol/server/messaging.d.ts index 1c0d5cd71d..1ce09c071e 100644 --- a/packages/core/data-transfer/types/remote/protocol/server/messaging.d.ts +++ b/packages/core/data-transfer/types/remote/protocol/server/messaging.d.ts @@ -1,3 +1,4 @@ +import { TransferKind } from '../client'; import type { ServerError } from './error'; export type Message = { @@ -10,5 +11,9 @@ export type Message = { export type OKMessage = Message<{ ok: true }>; export type InitMessage = Message<{ transferID: string }>; export type EndMessage = OKMessage; +export type StatusMessage = Message< + | { active: true; kind: TransferKind; startedAt: number; elapsed: number } + | { active: false; kind: null; startedAt: null; elapsed: null } +>; export type Payload = T['data'];