Merge pull request #15801 from strapi/data-transfer/remote-protocol-restrictions

This commit is contained in:
Jean-Sébastien Herbaux 2023-02-14 18:43:55 +01:00 committed by GitHub
commit 1472a5369c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 380 additions and 35 deletions

View File

@ -1,6 +1,7 @@
import { WebSocket } from 'ws'; import { WebSocket } from 'ws';
import { v4 } from 'uuid'; import { v4 } from 'uuid';
import { Writable } from 'stream'; import { Writable } from 'stream';
import { once } from 'lodash/fp';
import { createDispatcher } from './utils'; import { createDispatcher } from './utils';
@ -72,6 +73,46 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
}); });
} }
#startStepOnce(stage: client.TransferPushStep) {
return once(() => this.#startStep(stage));
}
async #startStep<T extends client.TransferPushStep>(step: T) {
try {
await this.dispatcher?.dispatchTransferStep({ action: 'start', step });
} catch (e) {
if (e instanceof Error) {
return e;
}
if (typeof e === 'string') {
return new ProviderTransferError(e);
}
return new ProviderTransferError('Unexpected error');
}
return null;
}
async #endStep<T extends client.TransferPushStep>(step: T) {
try {
await this.dispatcher?.dispatchTransferStep({ action: 'end', step });
} catch (e) {
if (e instanceof Error) {
return e;
}
if (typeof e === 'string') {
return new ProviderTransferError(e);
}
return new ProviderTransferError('Unexpected error');
}
return null;
}
async #streamStep<T extends client.TransferPushStep>( async #streamStep<T extends client.TransferPushStep>(
step: T, step: T,
data: client.GetTransferPushStreamData<T> data: client.GetTransferPushStreamData<T>
@ -174,48 +215,106 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
} }
createEntitiesWriteStream(): Writable { createEntitiesWriteStream(): Writable {
const startEntitiesTransferOnce = this.#startStepOnce('entities');
return new Writable({ return new Writable({
objectMode: true, objectMode: true,
write: async (entity: IEntity, _encoding, callback) => { final: async (callback) => {
const e = await this.#streamStep('entities', entity); const e = await this.#endStep('entities');
callback(e); callback(e);
}, },
write: async (entity: IEntity, _encoding, callback) => {
const startError = await startEntitiesTransferOnce();
if (startError) {
return callback(startError);
}
const streamError = await this.#streamStep('entities', entity);
callback(streamError);
},
}); });
} }
createLinksWriteStream(): Writable { createLinksWriteStream(): Writable {
const startLinksTransferOnce = this.#startStepOnce('links');
return new Writable({ return new Writable({
objectMode: true, objectMode: true,
write: async (link: ILink, _encoding, callback) => { final: async (callback) => {
const e = await this.#streamStep('links', link); const e = await this.#endStep('links');
callback(e); callback(e);
}, },
write: async (link: ILink, _encoding, callback) => {
const startError = await startLinksTransferOnce();
if (startError) {
return callback(startError);
}
const streamError = await this.#streamStep('links', link);
callback(streamError);
},
}); });
} }
createConfigurationWriteStream(): Writable { createConfigurationWriteStream(): Writable {
const startConfigurationTransferOnce = this.#startStepOnce('configuration');
return new Writable({ return new Writable({
objectMode: true, objectMode: true,
write: async (configuration: IConfiguration, _encoding, callback) => { final: async (callback) => {
const e = await this.#streamStep('configuration', configuration); const e = await this.#endStep('configuration');
callback(e); callback(e);
}, },
write: async (configuration: IConfiguration, _encoding, callback) => {
const startError = await startConfigurationTransferOnce();
if (startError) {
return callback(startError);
}
const streamError = await this.#streamStep('configuration', configuration);
callback(streamError);
},
}); });
} }
createAssetsWriteStream(): Writable | Promise<Writable> { createAssetsWriteStream(): Writable | Promise<Writable> {
const startAssetsTransferOnce = this.#startStepOnce('assets');
return new Writable({ return new Writable({
objectMode: true, objectMode: true,
final: async (callback) => { final: async (callback) => {
// TODO: replace this stream call by an end call // TODO: replace this stream call by an end call
const e = await this.#streamStep('assets', null); const endError = await this.#streamStep('assets', null);
callback(e); if (endError) {
return callback(endError);
}
const endStepError = await this.#endStep('assets');
if (endStepError) {
return callback(endStepError);
}
return callback(null);
}, },
write: async (asset: IAsset, _encoding, callback) => { write: async (asset: IAsset, _encoding, callback) => {
const startError = await startAssetsTransferOnce();
if (startError) {
return callback(startError);
}
const { filename, filepath, stats, stream } = asset; const { filename, filepath, stats, stream } = asset;
const assetID = v4(); const assetID = v4();

View File

@ -2,6 +2,7 @@ import { v4 } from 'uuid';
import { RawData, WebSocket } from 'ws'; import { RawData, WebSocket } from 'ws';
import type { client, server } from '../../../../types/remote/protocol'; import type { client, server } from '../../../../types/remote/protocol';
import { ProviderError } from '../../../errors/providers';
interface IDispatcherState { interface IDispatcherState {
transfer?: { kind: client.TransferKind; id: string }; transfer?: { kind: client.TransferKind; id: string };
@ -46,7 +47,7 @@ const createDispatcher = (ws: WebSocket) => {
const response: server.Message<U> = JSON.parse(raw.toString()); const response: server.Message<U> = JSON.parse(raw.toString());
if (response.uuid === uuid) { if (response.uuid === uuid) {
if (response.error) { if (response.error) {
return reject(new Error(response.error.message)); return reject(new ProviderError('error', response.error.message));
} }
resolve(response.data ?? null); resolve(response.data ?? null);

View File

@ -0,0 +1,40 @@
import type { Step } from '.';
export default [
{
kind: 'action',
action: 'bootstrap',
},
{
kind: 'action',
action: 'init',
},
{
kind: 'action',
action: 'beforeTransfer',
},
{
kind: 'transfer',
stage: 'schemas',
},
{
kind: 'transfer',
stage: 'entities',
},
{
kind: 'transfer',
stage: 'assets',
},
{
kind: 'transfer',
stage: 'links',
},
{
kind: 'transfer',
stage: 'configuration',
},
{
kind: 'action',
action: 'close',
},
] as readonly Step[];

View File

@ -0,0 +1,78 @@
import type { TransferStage } from '../../../../types';
export type Step =
| { kind: 'action'; action: string }
| { kind: 'transfer'; stage: TransferStage; locked?: boolean };
export { default as DEFAULT_TRANSFER_FLOW } from './default';
interface IState {
step: Step | null;
}
export const createFlow = (flow: readonly Step[]) => {
const state: IState = { step: null };
/**
* Equality check between two steps
*/
const stepEqual = (stepA: Step, stepB: Step): boolean => {
if (stepA.kind === 'action' && stepB.kind === 'action') {
return stepA.action === stepB.action;
}
if (stepA.kind === 'transfer' && stepB.kind === 'transfer') {
return stepA.stage === stepB.stage;
}
return false;
};
/**
* Find the index for a given step
*/
const findStepIndex = (step: Step) => flow.findIndex((flowStep) => stepEqual(step, flowStep));
return {
has(step: Step) {
return findStepIndex(step) !== -1;
},
can(step: Step) {
if (state.step === null) {
return true;
}
const indexesDifference = findStepIndex(step) - findStepIndex(state.step);
// It's possible to send multiple time the same transfer step in a row
if (indexesDifference === 0 && step.kind === 'transfer') {
return true;
}
return indexesDifference > 0;
},
cannot(step: Step) {
return !this.can(step);
},
set(step: Step) {
const canSwitch = this.can(step);
if (!canSwitch) {
throw new Error('Impossible to proceed to the given step');
}
state.step = step;
return this;
},
get() {
return state.step;
},
};
};
export type TransferFlow = ReturnType<typeof createFlow>;

View File

@ -5,16 +5,23 @@ import { randomUUID } from 'crypto';
import { WebSocket } from 'ws'; import { WebSocket } from 'ws';
import type { IPushController } from './controllers/push'; import type { IPushController } from './controllers/push';
import type { TransferFlow, Step } from './flows';
import type { client, server } from '../../../types/remote/protocol';
import createPushController from './controllers/push'; import createPushController from './controllers/push';
import type { client, server } from '../../../types/remote/protocol';
import { ProviderTransferError, ProviderInitializationError } from '../../errors/providers'; import { ProviderTransferError, ProviderInitializationError } from '../../errors/providers';
import { TRANSFER_METHODS } from './constants'; import { TRANSFER_METHODS } from './constants';
import { createFlow, DEFAULT_TRANSFER_FLOW } from './flows';
type TransferMethod = typeof TRANSFER_METHODS[number]; type TransferMethod = typeof TRANSFER_METHODS[number];
interface ITransferState { interface ITransferState {
transfer?: { id: string; kind: client.TransferKind }; transfer?: {
id: string;
kind: client.TransferKind;
startedAt: number;
flow: TransferFlow;
};
controller?: IPushController; controller?: IPushController;
} }
@ -44,6 +51,16 @@ export const createTransferHandler = (options: IHandlerOptions) => {
const state: ITransferState = {}; const state: ITransferState = {};
let uuid: string | undefined; let uuid: string | undefined;
function assertValidTransfer(
transferState: ITransferState
): asserts transferState is Required<ITransferState> {
const { transfer, controller } = transferState;
if (!controller || !transfer) {
throw new ProviderTransferError('Invalid transfer process');
}
}
/** /**
* Format error & message to follow the remote transfer protocol * Format error & message to follow the remote transfer protocol
*/ */
@ -91,11 +108,19 @@ export const createTransferHandler = (options: IHandlerOptions) => {
} }
}; };
const teardown = async (): Promise<server.Payload<server.EndMessage>> => { const teardown = (): void => {
await verifyAuth(state.transfer?.kind);
delete state.controller; delete state.controller;
delete state.transfer; delete state.transfer;
};
const end = async (msg: client.EndCommand): Promise<server.Payload<server.EndMessage>> => {
await verifyAuth(state.transfer?.kind);
if (msg.params.transferID !== state.transfer?.id) {
throw new ProviderTransferError('Bad transfer ID provided');
}
teardown();
return { ok: true }; return { ok: true };
}; };
@ -103,8 +128,9 @@ export const createTransferHandler = (options: IHandlerOptions) => {
const init = async ( const init = async (
msg: client.InitCommand msg: client.InitCommand
): Promise<server.Payload<server.InitMessage>> => { ): Promise<server.Payload<server.InitMessage>> => {
// TODO: this only checks for this instance of node: we should consider a database lock // TODO: For push transfer, we'll probably have to trigger a
if (state.controller) { // maintenance mode to prevent other transfer at the same time.
if (state.transfer || state.controller) {
throw new ProviderInitializationError('Transfer already in progres'); throw new ProviderInitializationError('Transfer already in progres');
} }
@ -131,11 +157,32 @@ export const createTransferHandler = (options: IHandlerOptions) => {
}); });
} }
state.transfer = { id: randomUUID(), kind: transfer }; state.transfer = {
id: randomUUID(),
kind: transfer,
startedAt: Date.now(),
flow: createFlow(DEFAULT_TRANSFER_FLOW),
};
return { transferID: state.transfer.id }; return { transferID: state.transfer.id };
}; };
const status = (): server.Payload<server.StatusMessage> => {
if (state.transfer) {
const { transfer } = state;
const elapsed = Date.now() - transfer.startedAt;
return {
active: true,
kind: transfer.kind,
startedAt: transfer.startedAt,
elapsed,
};
}
return { active: false, kind: null, elapsed: null, startedAt: null };
};
/** /**
* On command message (init, end, status, ...) * On command message (init, end, status, ...)
*/ */
@ -147,29 +194,27 @@ export const createTransferHandler = (options: IHandlerOptions) => {
} }
if (command === 'end') { if (command === 'end') {
await answer(teardown); assertValidTransfer(state);
await answer(() => end(msg));
} }
if (command === 'status') { if (command === 'status') {
await callback( await answer(status);
new ProviderTransferError('Command not implemented: "status"', {
command,
validCommands: ['init', 'end', 'status'],
})
);
} }
}; };
const onTransferCommand = async (msg: client.TransferMessage) => { const onTransferCommand = async (msg: client.TransferMessage) => {
const { transferID, kind } = msg; assertValidTransfer(state);
const { controller } = state;
await verifyAuth(state.transfer?.kind); const { transferID, kind } = msg;
const { controller, transfer } = state;
await verifyAuth(transfer.kind);
// TODO: (re)move this check // TODO: (re)move this check
// It shouldn't be possible to start a pull transfer for now, so reaching // It shouldn't be possible to start a pull transfer for now, so reaching
// this code should be impossible too, but this has been added by security // this code should be impossible too, but this has been added by security
if (state.transfer?.kind === 'pull') { if (transfer.kind === 'pull') {
return callback(new ProviderTransferError('Pull transfer not implemented')); return callback(new ProviderTransferError('Pull transfer not implemented'));
} }
@ -194,6 +239,24 @@ export const createTransferHandler = (options: IHandlerOptions) => {
); );
} }
const step: Step = { kind: 'action', action };
const isStepRegistered = transfer.flow.has(step);
if (isStepRegistered) {
if (transfer.flow.cannot(step)) {
return callback(
new ProviderTransferError(
`Invalid action "${action}" found for the current flow `,
{
action,
}
)
);
}
transfer.flow.set(step);
}
await answer(() => controller.actions[action as keyof typeof controller.actions]()); await answer(() => controller.actions[action as keyof typeof controller.actions]());
} }
@ -202,19 +265,78 @@ export const createTransferHandler = (options: IHandlerOptions) => {
// We can only have push transfer message for the moment // We can only have push transfer message for the moment
const message = msg as client.TransferPushMessage; const message = msg as client.TransferPushMessage;
// TODO: lock transfer process const currentStep = transfer.flow.get();
const step: Step = { kind: 'transfer', stage: message.step };
// Lock the current transfer stage
if (message.action === 'start') { if (message.action === 'start') {
// console.log('Starting transfer for ', message.step); if (currentStep?.kind === 'transfer' && currentStep.locked) {
return callback(
new ProviderTransferError(
`It's not possible to start a new transfer stage (${message.step}) while another one is in progress (${currentStep.stage})`
)
);
}
if (transfer.flow.cannot(step)) {
return callback(
new ProviderTransferError(
`Invalid stage (${message.step}) provided for the current flow`,
{ step }
)
);
}
transfer?.flow.set({ ...step, locked: true });
return callback(null, { ok: true });
} }
// Stream step // Stream operation on the current transfer stage
else if (message.action === 'stream') { if (message.action === 'stream') {
if (currentStep?.kind === 'transfer' && !currentStep.locked) {
return callback(
new ProviderTransferError(
`You need to initialize the transfer stage (${message.step}) before starting to stream data`
)
);
}
if (transfer?.flow.cannot(step)) {
return callback(
new ProviderTransferError(
`Invalid stage (${message.step}) provided for the current flow`,
{ step }
)
);
}
await answer(() => controller.transfer[message.step]?.(message.data as never)); await answer(() => controller.transfer[message.step]?.(message.data as never));
} }
// TODO: unlock transfer process // Unlock the current transfer stage
else if (message.action === 'end') { if (message.action === 'end') {
// console.log('Ending transfer for ', message.step); // Cannot unlock if not locked (aka: started)
if (currentStep?.kind === 'transfer' && !currentStep.locked) {
return callback(
new ProviderTransferError(
`You need to initialize the transfer stage (${message.step}) before ending it`
)
);
}
// Cannot unlock if invalid step provided
if (transfer?.flow.cannot(step)) {
return callback(
new ProviderTransferError(
`Invalid stage (${message.step}) provided for the current flow`,
{ step }
)
);
}
transfer?.flow.set({ ...step, locked: false });
return callback(null, { ok: true });
} }
} }
}; };

View File

@ -1,3 +1,4 @@
import { TransferKind } from '../client';
import type { ServerError } from './error'; import type { ServerError } from './error';
export type Message<T = unknown> = { export type Message<T = unknown> = {
@ -10,5 +11,9 @@ export type Message<T = unknown> = {
export type OKMessage = Message<{ ok: true }>; export type OKMessage = Message<{ ok: true }>;
export type InitMessage = Message<{ transferID: string }>; export type InitMessage = Message<{ transferID: string }>;
export type EndMessage = OKMessage; export type EndMessage = OKMessage;
export type StatusMessage = Message<
| { active: true; kind: TransferKind; startedAt: number; elapsed: number }
| { active: false; kind: null; startedAt: null; elapsed: null }
>;
export type Payload<T extends Message> = T['data']; export type Payload<T extends Message> = T['data'];