mirror of
https://github.com/strapi/strapi.git
synced 2025-08-08 08:46:42 +00:00
Merge pull request #15323 from strapi/deits/transfer-protocol
This commit is contained in:
commit
2dedb47b45
@ -31,6 +31,7 @@ module.exports = {
|
||||
'no-continue': 'warn',
|
||||
'no-process-exit': 'off',
|
||||
'no-loop-func': 'off',
|
||||
'max-classes-per-file': 'off',
|
||||
'no-param-reassign': [
|
||||
'error',
|
||||
{
|
||||
|
@ -22,6 +22,7 @@ module.exports = {
|
||||
'@typescript-eslint/brace-style': 'off', // TODO: fix conflict with prettier/prettier in data-transfer/engine/index.ts
|
||||
// to be cleaned up throughout codebase (too many to fix at the moment)
|
||||
'@typescript-eslint/no-use-before-define': 'warn',
|
||||
'@typescript-eslint/comma-dangle': 'off',
|
||||
},
|
||||
// Disable only for tests
|
||||
overrides: [
|
||||
|
@ -63,7 +63,8 @@
|
||||
"@types/tar": "6.1.3",
|
||||
"@types/tar-stream": "2.2.2",
|
||||
"@types/uuid": "9.0.0",
|
||||
"@types/koa": "2.13.1",
|
||||
"koa": "2.13.4",
|
||||
"@types/koa": "2.13.4",
|
||||
"rimraf": "3.0.2",
|
||||
"typescript": "4.6.2"
|
||||
},
|
||||
|
@ -1,14 +1,15 @@
|
||||
import { WebSocket } from 'ws';
|
||||
import type { IRemoteStrapiDestinationProviderOptions } from '..';
|
||||
|
||||
import { createRemoteStrapiDestinationProvider } from '..';
|
||||
|
||||
const defaultOptions: IRemoteStrapiDestinationProviderOptions = {
|
||||
strategy: 'restore',
|
||||
url: 'ws://test.com/admin/transfer',
|
||||
url: '<some_url>',
|
||||
};
|
||||
|
||||
jest.mock('../utils', () => ({
|
||||
dispatch: jest.fn(),
|
||||
createDispatcher: jest.fn(),
|
||||
}));
|
||||
|
||||
jest.mock('ws', () => ({
|
||||
@ -40,9 +41,14 @@ describe('Remote Strapi Destination', () => {
|
||||
|
||||
test('Should have a defined websocket connection if bootstrap has been called', async () => {
|
||||
const provider = createRemoteStrapiDestinationProvider(defaultOptions);
|
||||
await provider.bootstrap();
|
||||
try {
|
||||
await provider.bootstrap();
|
||||
} catch {
|
||||
// ignore ws connection error
|
||||
}
|
||||
|
||||
expect(provider.ws).not.toBeNull();
|
||||
expect(provider.ws?.readyState).toBe(WebSocket.CLOSED);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -1,5 +1,6 @@
|
||||
import { WebSocket } from 'ws';
|
||||
import { dispatch } from '../utils';
|
||||
import { CommandMessage } from '../../../../../types/remote/protocol/client';
|
||||
import { createDispatcher } from '../utils';
|
||||
|
||||
jest.mock('ws', () => ({
|
||||
WebSocket: jest.fn().mockImplementation(() => {
|
||||
@ -18,11 +19,12 @@ afterEach(() => {
|
||||
describe('Remote Strapi Destination Utils', () => {
|
||||
test('Dispatch method sends payload', () => {
|
||||
const ws = new WebSocket('ws://test/admin/transfer');
|
||||
const message = {
|
||||
test: 'hello',
|
||||
const message: CommandMessage = {
|
||||
type: 'command',
|
||||
command: 'status',
|
||||
};
|
||||
|
||||
dispatch(ws, message);
|
||||
createDispatcher(ws).dispatch(message);
|
||||
|
||||
expect.extend({
|
||||
toContain(receivedString, expected) {
|
||||
|
@ -2,6 +2,8 @@ import { WebSocket } from 'ws';
|
||||
import { v4 } from 'uuid';
|
||||
import { Writable } from 'stream';
|
||||
|
||||
import { createDispatcher } from './utils';
|
||||
|
||||
import type {
|
||||
IDestinationProvider,
|
||||
IEntity,
|
||||
@ -9,11 +11,10 @@ import type {
|
||||
IMetadata,
|
||||
ProviderType,
|
||||
IConfiguration,
|
||||
TransferStage,
|
||||
IAsset,
|
||||
} from '../../../../types';
|
||||
import type { client, server } from '../../../../types/remote/protocol';
|
||||
import type { ILocalStrapiDestinationProviderOptions } from '../local-destination';
|
||||
import { dispatch } from './utils';
|
||||
|
||||
interface ITokenAuth {
|
||||
type: 'token';
|
||||
@ -32,14 +33,6 @@ export interface IRemoteStrapiDestinationProviderOptions
|
||||
auth?: ITokenAuth | ICredentialsAuth;
|
||||
}
|
||||
|
||||
type Actions = 'bootstrap' | 'close' | 'beforeTransfer' | 'getMetadata' | 'getSchemas';
|
||||
|
||||
export const createRemoteStrapiDestinationProvider = (
|
||||
options: IRemoteStrapiDestinationProviderOptions
|
||||
) => {
|
||||
return new RemoteStrapiDestinationProvider(options);
|
||||
};
|
||||
|
||||
class RemoteStrapiDestinationProvider implements IDestinationProvider {
|
||||
name = 'destination::remote-strapi';
|
||||
|
||||
@ -49,23 +42,53 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
|
||||
|
||||
ws: WebSocket | null;
|
||||
|
||||
dispatcher: ReturnType<typeof createDispatcher> | null;
|
||||
|
||||
constructor(options: IRemoteStrapiDestinationProviderOptions) {
|
||||
this.options = options;
|
||||
this.ws = null;
|
||||
this.dispatcher = null;
|
||||
}
|
||||
|
||||
async #dispatchAction<T = unknown>(action: Actions) {
|
||||
return dispatch<T>(this.ws, { type: 'action', action });
|
||||
async initTransfer(): Promise<string> {
|
||||
const { strategy, restore } = this.options;
|
||||
|
||||
// Wait for the connection to be made to the server, then init the transfer
|
||||
return new Promise<string>((resolve, reject) => {
|
||||
this.ws
|
||||
?.once('open', async () => {
|
||||
const query = this.dispatcher?.dispatchCommand({
|
||||
command: 'init',
|
||||
params: { options: { strategy, restore }, transfer: 'push' },
|
||||
});
|
||||
|
||||
const res = (await query) as server.Payload<server.InitMessage>;
|
||||
|
||||
if (!res?.transferID) {
|
||||
return reject(new Error('Init failed, invalid response from the server'));
|
||||
}
|
||||
|
||||
resolve(res.transferID);
|
||||
})
|
||||
.once('error', reject);
|
||||
});
|
||||
}
|
||||
|
||||
async #dispatchTransfer<T = unknown>(stage: TransferStage, data: T) {
|
||||
async #streamStep<T extends client.TransferPushStep>(
|
||||
step: T,
|
||||
data: client.GetTransferPushStreamData<T>
|
||||
) {
|
||||
try {
|
||||
await dispatch(this.ws, { type: 'transfer', stage, data });
|
||||
await this.dispatcher?.dispatchTransferStep({ action: 'stream', step, data });
|
||||
} catch (e) {
|
||||
if (e instanceof Error) {
|
||||
return e;
|
||||
}
|
||||
|
||||
if (typeof e === 'string') {
|
||||
return new Error(e);
|
||||
}
|
||||
|
||||
return new Error('Unexpected error');
|
||||
}
|
||||
|
||||
@ -73,7 +96,7 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
|
||||
}
|
||||
|
||||
async bootstrap(): Promise<void> {
|
||||
const { url, auth, strategy, restore } = this.options;
|
||||
const { url, auth } = this.options;
|
||||
|
||||
let ws: WebSocket;
|
||||
|
||||
@ -95,21 +118,17 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
|
||||
}
|
||||
|
||||
this.ws = ws;
|
||||
this.dispatcher = createDispatcher(this.ws);
|
||||
|
||||
// Wait for the connection to be made to the server, then init the transfer
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
ws.once('open', async () => {
|
||||
await dispatch(this.ws, { type: 'init', kind: 'push', options: { strategy, restore } });
|
||||
resolve();
|
||||
}).once('error', reject);
|
||||
});
|
||||
const transferID = await this.initTransfer();
|
||||
|
||||
// Run the bootstrap
|
||||
await this.#dispatchAction('bootstrap');
|
||||
this.dispatcher.setTransferProperties({ id: transferID, kind: 'push' });
|
||||
|
||||
await this.dispatcher.dispatchTransferAction('bootstrap');
|
||||
}
|
||||
|
||||
async close() {
|
||||
await this.#dispatchAction('close');
|
||||
await this.dispatcher?.dispatchTransferAction('close');
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
const { ws } = this;
|
||||
@ -124,22 +143,26 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
|
||||
}
|
||||
|
||||
getMetadata() {
|
||||
return this.#dispatchAction<IMetadata>('getMetadata');
|
||||
return this.dispatcher?.dispatchTransferAction<IMetadata>('getMetadata') ?? null;
|
||||
}
|
||||
|
||||
async beforeTransfer() {
|
||||
await this.#dispatchAction('beforeTransfer');
|
||||
await this.dispatcher?.dispatchTransferAction('beforeTransfer');
|
||||
}
|
||||
|
||||
getSchemas(): Promise<Strapi.Schemas> {
|
||||
return this.#dispatchAction<Strapi.Schemas>('getSchemas');
|
||||
getSchemas(): Promise<Strapi.Schemas | null> {
|
||||
if (!this.dispatcher) {
|
||||
return Promise.resolve(null);
|
||||
}
|
||||
|
||||
return this.dispatcher.dispatchTransferAction<Strapi.Schemas>('getSchemas');
|
||||
}
|
||||
|
||||
getEntitiesStream(): Writable {
|
||||
return new Writable({
|
||||
objectMode: true,
|
||||
write: async (entity: IEntity, _encoding, callback) => {
|
||||
const e = await this.#dispatchTransfer('entities', entity);
|
||||
const e = await this.#streamStep('entities', entity);
|
||||
|
||||
callback(e);
|
||||
},
|
||||
@ -150,7 +173,7 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
|
||||
return new Writable({
|
||||
objectMode: true,
|
||||
write: async (link: ILink, _encoding, callback) => {
|
||||
const e = await this.#dispatchTransfer('links', link);
|
||||
const e = await this.#streamStep('links', link);
|
||||
|
||||
callback(e);
|
||||
},
|
||||
@ -161,7 +184,7 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
|
||||
return new Writable({
|
||||
objectMode: true,
|
||||
write: async (configuration: IConfiguration, _encoding, callback) => {
|
||||
const e = await this.#dispatchTransfer('configuration', configuration);
|
||||
const e = await this.#streamStep('configuration', configuration);
|
||||
|
||||
callback(e);
|
||||
},
|
||||
@ -172,29 +195,31 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
|
||||
return new Writable({
|
||||
objectMode: true,
|
||||
final: async (callback) => {
|
||||
const e = await this.#dispatchTransfer('assets', null);
|
||||
// TODO: replace this stream call by an end call
|
||||
const e = await this.#streamStep('assets', null);
|
||||
|
||||
callback(e);
|
||||
},
|
||||
write: async (asset: IAsset, _encoding, callback) => {
|
||||
const { filename, filepath, stats, stream } = asset;
|
||||
const assetID = v4();
|
||||
|
||||
await this.#dispatchTransfer('assets', {
|
||||
step: 'start',
|
||||
await this.#streamStep('assets', {
|
||||
action: 'start',
|
||||
assetID,
|
||||
data: { filename, filepath, stats },
|
||||
});
|
||||
|
||||
for await (const chunk of stream) {
|
||||
await this.#dispatchTransfer('assets', {
|
||||
step: 'stream',
|
||||
await this.#streamStep('assets', {
|
||||
action: 'stream',
|
||||
assetID,
|
||||
data: { chunk },
|
||||
data: chunk,
|
||||
});
|
||||
}
|
||||
|
||||
await this.#dispatchTransfer('assets', {
|
||||
step: 'end',
|
||||
await this.#streamStep('assets', {
|
||||
action: 'end',
|
||||
assetID,
|
||||
});
|
||||
|
||||
@ -203,3 +228,9 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export const createRemoteStrapiDestinationProvider = (
|
||||
options: IRemoteStrapiDestinationProviderOptions
|
||||
) => {
|
||||
return new RemoteStrapiDestinationProvider(options);
|
||||
};
|
||||
|
@ -1,34 +1,123 @@
|
||||
import { set } from 'lodash/fp';
|
||||
import { v4 } from 'uuid';
|
||||
import { WebSocket } from 'ws';
|
||||
import { RawData, WebSocket } from 'ws';
|
||||
|
||||
export async function dispatch<U = unknown, T extends object = object>(
|
||||
ws: WebSocket | null,
|
||||
message: T
|
||||
): Promise<U> {
|
||||
if (!ws) {
|
||||
throw new Error('No websocket connection found');
|
||||
}
|
||||
import type { client, server } from '../../../../types/remote/protocol';
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const uuid = v4();
|
||||
const payload = JSON.stringify({ ...message, uuid });
|
||||
|
||||
ws.send(payload, (error) => {
|
||||
if (error) {
|
||||
reject(error);
|
||||
}
|
||||
});
|
||||
|
||||
ws.once('message', (raw) => {
|
||||
const response: { uuid: string; data: U; error: string | null } = JSON.parse(raw.toString());
|
||||
|
||||
if (response.error) {
|
||||
return reject(new Error(response.error));
|
||||
}
|
||||
|
||||
if (response.uuid === uuid) {
|
||||
return resolve(response.data);
|
||||
}
|
||||
});
|
||||
});
|
||||
interface IDispatcherState {
|
||||
transfer?: { kind: client.TransferKind; id: string };
|
||||
}
|
||||
|
||||
interface IDispatchOptions {
|
||||
attachTransfer?: boolean;
|
||||
}
|
||||
|
||||
type Dispatch<T> = Omit<T, 'transferID' | 'uuid'>;
|
||||
|
||||
const createDispatcher = (ws: WebSocket) => {
|
||||
const state: IDispatcherState = {};
|
||||
|
||||
type DispatchMessage = Dispatch<client.Message>;
|
||||
|
||||
const dispatch = async <U = null>(
|
||||
message: DispatchMessage,
|
||||
options: IDispatchOptions = {}
|
||||
): Promise<U | null> => {
|
||||
if (!ws) {
|
||||
throw new Error('No websocket connection found');
|
||||
}
|
||||
|
||||
return new Promise<U | null>((resolve, reject) => {
|
||||
const uuid = v4();
|
||||
const payload = { ...message, uuid };
|
||||
|
||||
if (options.attachTransfer) {
|
||||
Object.assign(payload, { transferID: state.transfer?.id });
|
||||
}
|
||||
|
||||
const stringifiedPayload = JSON.stringify(payload);
|
||||
|
||||
ws.send(stringifiedPayload, (error) => {
|
||||
if (error) {
|
||||
reject(error);
|
||||
}
|
||||
});
|
||||
|
||||
const onResponse = (raw: RawData) => {
|
||||
const response: server.Message<U> = JSON.parse(raw.toString());
|
||||
if (response.uuid === uuid) {
|
||||
if (response.error) {
|
||||
return reject(new Error(response.error.message));
|
||||
}
|
||||
|
||||
resolve(response.data ?? null);
|
||||
} else {
|
||||
ws.once('message', onResponse);
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: What happens if the server sends another message (not a response to this message)
|
||||
ws.once('message', onResponse);
|
||||
});
|
||||
};
|
||||
|
||||
const dispatchCommand = <U extends client.Command>(
|
||||
payload: {
|
||||
command: U;
|
||||
} & ([client.GetCommandParams<U>] extends [never]
|
||||
? unknown
|
||||
: { params: client.GetCommandParams<U> })
|
||||
) => {
|
||||
return dispatch({ type: 'command', ...payload } as client.CommandMessage);
|
||||
};
|
||||
|
||||
const dispatchTransferAction = async <T>(action: client.Action['action']) => {
|
||||
const payload: Dispatch<client.Action> = { type: 'transfer', kind: 'action', action };
|
||||
|
||||
return dispatch<T>(payload, { attachTransfer: true }) ?? Promise.resolve(null);
|
||||
};
|
||||
|
||||
const dispatchTransferStep = async <
|
||||
T,
|
||||
A extends client.TransferPushMessage['action'] = client.TransferPushMessage['action'],
|
||||
S extends client.TransferPushStep = client.TransferPushStep
|
||||
>(
|
||||
payload: {
|
||||
step: S;
|
||||
action: A;
|
||||
} & (A extends 'stream' ? { data: client.GetTransferPushStreamData<S> } : unknown)
|
||||
) => {
|
||||
const message: Dispatch<client.TransferPushMessage> = {
|
||||
type: 'transfer',
|
||||
kind: 'step',
|
||||
...payload,
|
||||
};
|
||||
|
||||
return dispatch<T>(message, { attachTransfer: true }) ?? Promise.resolve(null);
|
||||
};
|
||||
|
||||
const setTransferProperties = (
|
||||
properties: Exclude<IDispatcherState['transfer'], undefined>
|
||||
): void => {
|
||||
state.transfer = { ...properties };
|
||||
};
|
||||
|
||||
return {
|
||||
get transferID() {
|
||||
return state.transfer?.id;
|
||||
},
|
||||
|
||||
get transferKind() {
|
||||
return state.transfer?.kind;
|
||||
},
|
||||
|
||||
setTransferProperties,
|
||||
|
||||
dispatch,
|
||||
dispatchCommand,
|
||||
dispatchTransferAction,
|
||||
dispatchTransferStep,
|
||||
};
|
||||
};
|
||||
|
||||
export { createDispatcher };
|
||||
|
@ -1,13 +1,17 @@
|
||||
import { PassThrough, Writable } from 'stream-chain';
|
||||
|
||||
import { IAsset, IMetadata, PushTransferMessage, PushTransferStage } from '../../../../types';
|
||||
import type { IAsset, IMetadata } from '../../../../types';
|
||||
import type {
|
||||
TransferPushMessage,
|
||||
TransferPushStep,
|
||||
} from '../../../../types/remote/protocol/client';
|
||||
import {
|
||||
createLocalStrapiDestinationProvider,
|
||||
ILocalStrapiDestinationProviderOptions,
|
||||
} from '../../providers';
|
||||
|
||||
export interface IPushController {
|
||||
streams: { [stage in PushTransferStage]?: Writable };
|
||||
streams: { [stage in TransferPushStep]?: Writable };
|
||||
actions: {
|
||||
getMetadata(): Promise<IMetadata>;
|
||||
getSchemas(): Strapi.Schemas;
|
||||
@ -16,8 +20,8 @@ export interface IPushController {
|
||||
beforeTransfer(): Promise<void>;
|
||||
};
|
||||
transfer: {
|
||||
[key in PushTransferStage]: <T extends PushTransferMessage>(
|
||||
value: T extends { stage: key; data: infer U } ? U : never
|
||||
[key in TransferPushStep]: <T extends TransferPushMessage>(
|
||||
value: T extends { step: key; data: infer U } ? U : never
|
||||
) => Promise<void>;
|
||||
};
|
||||
}
|
||||
@ -25,7 +29,7 @@ export interface IPushController {
|
||||
const createPushController = (options: ILocalStrapiDestinationProviderOptions): IPushController => {
|
||||
const provider = createLocalStrapiDestinationProvider(options);
|
||||
|
||||
const streams: { [stage in PushTransferStage]?: Writable } = {};
|
||||
const streams: { [stage in TransferPushStep]?: Writable } = {};
|
||||
const assets: { [filepath: string]: IAsset & { stream: PassThrough } } = {};
|
||||
|
||||
const writeAsync = <T>(stream: Writable, data: T) => {
|
||||
@ -91,29 +95,33 @@ const createPushController = (options: ILocalStrapiDestinationProviderOptions):
|
||||
},
|
||||
|
||||
async assets(payload) {
|
||||
// TODO: close the stream upong receiving an 'end' event instead
|
||||
if (payload === null) {
|
||||
streams.assets?.end();
|
||||
return;
|
||||
}
|
||||
|
||||
const { step, assetID } = payload;
|
||||
const { action, assetID } = payload;
|
||||
|
||||
if (!streams.assets) {
|
||||
streams.assets = await provider.getAssetsStream();
|
||||
}
|
||||
|
||||
if (step === 'start') {
|
||||
if (action === 'start') {
|
||||
assets[assetID] = { ...payload.data, stream: new PassThrough() };
|
||||
writeAsync(streams.assets, assets[assetID]);
|
||||
}
|
||||
|
||||
if (step === 'stream') {
|
||||
const chunk = Buffer.from(payload.data.chunk.data);
|
||||
if (action === 'stream') {
|
||||
// The buffer has gone through JSON operations and is now of shape { type: "Buffer"; data: UInt8Array }
|
||||
// We need to transform it back into a Buffer instance
|
||||
const rawBuffer = payload.data as unknown as { type: 'Buffer'; data: Uint8Array };
|
||||
const chunk = Buffer.from(rawBuffer.data);
|
||||
|
||||
await writeAsync(assets[assetID].stream, chunk);
|
||||
}
|
||||
|
||||
if (step === 'end') {
|
||||
if (action === 'end') {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const { stream } = assets[assetID];
|
||||
|
||||
|
@ -1,18 +1,16 @@
|
||||
// eslint-disable-next-line node/no-extraneous-import
|
||||
import type { Context } from 'koa';
|
||||
import type { ServerOptions } from 'ws';
|
||||
|
||||
import { v4 } from 'uuid';
|
||||
import { randomUUID } from 'crypto';
|
||||
import { WebSocket } from 'ws';
|
||||
|
||||
import type { IPushController } from './controllers/push';
|
||||
|
||||
import { InitMessage, Message, TransferKind } from '../../../types';
|
||||
import createPushController from './controllers/push';
|
||||
import type { client, server } from '../../../types/remote/protocol';
|
||||
|
||||
interface ITransferState {
|
||||
kind?: TransferKind;
|
||||
transferID?: string;
|
||||
transfer?: { id: string; kind: client.TransferKind };
|
||||
controller?: IPushController;
|
||||
}
|
||||
|
||||
@ -34,6 +32,9 @@ export const createTransferHandler =
|
||||
const state: ITransferState = {};
|
||||
let uuid: string | undefined;
|
||||
|
||||
/**
|
||||
* Format error & message to follow the remote transfer protocol
|
||||
*/
|
||||
const callback = <T = unknown>(e: Error | null = null, data?: T) => {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
if (!uuid) {
|
||||
@ -43,14 +44,22 @@ export const createTransferHandler =
|
||||
|
||||
const payload = JSON.stringify({
|
||||
uuid,
|
||||
data: data ?? {},
|
||||
error: e,
|
||||
data: data ?? null,
|
||||
error: e
|
||||
? {
|
||||
code: 'ERR',
|
||||
message: e?.message,
|
||||
}
|
||||
: null,
|
||||
});
|
||||
|
||||
ws.send(payload, (error) => (error ? reject(error) : resolve()));
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* Wrap a function call to catch errors and answer the request with the correct format
|
||||
*/
|
||||
const answer = async <T = unknown>(fn: () => T) => {
|
||||
try {
|
||||
const response = await fn();
|
||||
@ -66,39 +75,110 @@ export const createTransferHandler =
|
||||
}
|
||||
};
|
||||
|
||||
const teardown = () => {
|
||||
delete state.kind;
|
||||
const teardown = (): server.Payload<server.EndMessage> => {
|
||||
delete state.controller;
|
||||
delete state.transferID;
|
||||
delete state.transfer;
|
||||
|
||||
return { ok: true };
|
||||
};
|
||||
|
||||
const init = (msg: InitMessage) => {
|
||||
const { kind, options: controllerOptions } = msg;
|
||||
|
||||
const init = (msg: client.InitCommand): server.Payload<server.InitMessage> => {
|
||||
if (state.controller) {
|
||||
throw new Error('Transfer already in progres');
|
||||
}
|
||||
|
||||
if (kind === 'push') {
|
||||
const { transfer } = msg.params;
|
||||
|
||||
// Push transfer
|
||||
if (transfer === 'push') {
|
||||
const { options: controllerOptions } = msg.params;
|
||||
|
||||
state.controller = createPushController({
|
||||
...controllerOptions,
|
||||
autoDestroy: false,
|
||||
getStrapi() {
|
||||
return strapi;
|
||||
},
|
||||
getStrapi: () => strapi,
|
||||
});
|
||||
}
|
||||
|
||||
// Pull or others
|
||||
// Pull or any other string
|
||||
else {
|
||||
throw new Error(`${kind} transfer not implemented`);
|
||||
throw new Error(`Transfer not implemented: "${transfer}"`);
|
||||
}
|
||||
|
||||
state.transferID = v4();
|
||||
state.transfer = { id: randomUUID(), kind: transfer };
|
||||
|
||||
return { transferID: state.transferID };
|
||||
return { transferID: state.transfer.id };
|
||||
};
|
||||
|
||||
/**
|
||||
* On command message (init, end, status, ...)
|
||||
*/
|
||||
const onCommand = async (msg: client.CommandMessage) => {
|
||||
const { command } = msg;
|
||||
|
||||
if (command === 'init') {
|
||||
await answer(() => init(msg));
|
||||
}
|
||||
|
||||
if (command === 'end') {
|
||||
await answer(teardown);
|
||||
}
|
||||
|
||||
if (command === 'status') {
|
||||
await callback(new Error('Command not implemented: "status"'));
|
||||
}
|
||||
};
|
||||
|
||||
const onTransferCommand = async (msg: client.TransferMessage) => {
|
||||
const { transferID, kind } = msg;
|
||||
const { controller } = state;
|
||||
|
||||
// TODO: (re)move this check
|
||||
// It shouldn't be possible to strart a pull transfer for now, so reaching
|
||||
// this code should be impossible too, but this has been added by security
|
||||
if (state.transfer?.kind === 'pull') {
|
||||
return callback(new Error('Pull transfer not implemented'));
|
||||
}
|
||||
|
||||
if (!controller) {
|
||||
return callback(new Error("The transfer hasn't been initialized"));
|
||||
}
|
||||
|
||||
if (!transferID) {
|
||||
return callback(new Error('Missing transfer ID'));
|
||||
}
|
||||
|
||||
// Action
|
||||
if (kind === 'action') {
|
||||
const { action } = msg;
|
||||
|
||||
if (!(action in controller.actions)) {
|
||||
return callback(new Error(`Invalid action provided: "${action}"`));
|
||||
}
|
||||
|
||||
await answer(() => controller.actions[action as keyof typeof controller.actions]());
|
||||
}
|
||||
|
||||
// Transfer
|
||||
else if (kind === 'step') {
|
||||
// We can only have push transfer message for the moment
|
||||
const message = msg as client.TransferPushMessage;
|
||||
|
||||
// TODO: lock transfer process
|
||||
if (message.action === 'start') {
|
||||
// console.log('Starting transfer for ', message.step);
|
||||
}
|
||||
|
||||
// Stream step
|
||||
else if (message.action === 'stream') {
|
||||
await answer(() => controller.transfer[message.step]?.(message.data as never));
|
||||
}
|
||||
|
||||
// TODO: unlock transfer process
|
||||
else if (message.action === 'end') {
|
||||
// console.log('Ending transfer for ', message.step);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
ws.on('close', () => {
|
||||
@ -111,7 +191,7 @@ export const createTransferHandler =
|
||||
});
|
||||
|
||||
ws.on('message', async (raw) => {
|
||||
const msg: Message = JSON.parse(raw.toString());
|
||||
const msg: client.Message = JSON.parse(raw.toString());
|
||||
|
||||
if (!msg.uuid) {
|
||||
throw new Error('Missing uuid in message');
|
||||
@ -119,24 +199,19 @@ export const createTransferHandler =
|
||||
|
||||
uuid = msg.uuid;
|
||||
|
||||
if (msg.type === 'init') {
|
||||
await answer(() => init(msg));
|
||||
// Regular command message (init, end, status)
|
||||
if (msg.type === 'command') {
|
||||
await onCommand(msg);
|
||||
}
|
||||
|
||||
if (msg.type === 'teardown') {
|
||||
await answer(teardown);
|
||||
// Transfer message (the transfer must be initialized first)
|
||||
else if (msg.type === 'transfer') {
|
||||
await onTransferCommand(msg);
|
||||
}
|
||||
|
||||
if (msg.type === 'action') {
|
||||
await answer(() => state.controller?.actions[msg.action]?.());
|
||||
}
|
||||
|
||||
if (msg.type === 'transfer') {
|
||||
await answer(() => {
|
||||
const { stage, data } = msg;
|
||||
|
||||
return state.controller?.transfer[stage](data as never);
|
||||
});
|
||||
// Invalid messages
|
||||
else {
|
||||
await callback(new Error('Bad request'));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
90
packages/core/data-transfer/types/remote.d.ts
vendored
90
packages/core/data-transfer/types/remote.d.ts
vendored
@ -1,90 +0,0 @@
|
||||
import type { ILocalStrapiDestinationProviderOptions } from '../src/strapi/providers';
|
||||
import type { IAsset, IConfiguration, IEntity, ILink } from './common-entities';
|
||||
|
||||
/**
|
||||
* Utils
|
||||
*/
|
||||
|
||||
type EmptyObject = Record<string, never>;
|
||||
|
||||
/**
|
||||
* Messages
|
||||
*/
|
||||
|
||||
export type Message = { uuid: string | null | undefined } & (
|
||||
| InitMessage
|
||||
| ActionMessage
|
||||
| PushTransferMessage
|
||||
| TeardownMessage
|
||||
);
|
||||
|
||||
export type MessageType = Message['type'];
|
||||
export type TransferKind = InitMessage['kind'];
|
||||
export type PushTransferStage = PushTransferMessage['stage'];
|
||||
|
||||
/**
|
||||
* Init
|
||||
*/
|
||||
|
||||
// init should return a transfer ID used in the teardown
|
||||
export type InitMessage = { type: 'init' } & (
|
||||
| { kind: 'pull'; options: EmptyObject }
|
||||
| { kind: 'push'; options: Pick<ILocalStrapiDestinationProviderOptions, 'strategy' | 'restore'> }
|
||||
);
|
||||
|
||||
/**
|
||||
* Action
|
||||
*/
|
||||
|
||||
export type ActionMessage = { type: 'action' } & (
|
||||
| { action: 'getMetadata'; options: EmptyObject }
|
||||
| { action: 'getSchemas'; options: EmptyObject }
|
||||
| { action: 'bootstrap'; options: EmptyObject }
|
||||
| { action: 'close'; options: EmptyObject }
|
||||
| { action: 'beforeTransfer'; options: EmptyObject }
|
||||
);
|
||||
|
||||
/**
|
||||
* Transfer
|
||||
*/
|
||||
|
||||
export type PushTransferMessage = {
|
||||
type: 'transfer';
|
||||
} & (
|
||||
| PushEntitiesTransferMessage
|
||||
| PushLinksTransferMessage
|
||||
| PushConfigurationTransferMessage
|
||||
| PushAssetTransferMessage
|
||||
);
|
||||
|
||||
export type PushEntitiesTransferMessage = {
|
||||
stage: 'entities';
|
||||
data: IEntity | null;
|
||||
};
|
||||
|
||||
export type PushLinksTransferMessage = { stage: 'links'; data: ILink | null };
|
||||
|
||||
export type PushConfigurationTransferMessage = {
|
||||
stage: 'configuration';
|
||||
data: IConfiguration | null;
|
||||
};
|
||||
|
||||
export type PushAssetTransferMessage = {
|
||||
stage: 'assets';
|
||||
data:
|
||||
| ({ assetID: string } & (
|
||||
| { step: 'start'; data: Omit<IAsset, 'stream'> }
|
||||
| { step: 'stream'; data: { chunk: { type: 'Buffer'; data: number[] } } }
|
||||
| { step: 'end'; data: EmptyObject }
|
||||
))
|
||||
| null;
|
||||
};
|
||||
|
||||
/**
|
||||
* Teardown
|
||||
*/
|
||||
|
||||
export type TeardownMessage = {
|
||||
type: 'teardown';
|
||||
transferID: string;
|
||||
};
|
1
packages/core/data-transfer/types/remote/index.d.ts
vendored
Normal file
1
packages/core/data-transfer/types/remote/index.d.ts
vendored
Normal file
@ -0,0 +1 @@
|
||||
export * as protocol from './protocol';
|
30
packages/core/data-transfer/types/remote/protocol/client/commands.d.ts
vendored
Normal file
30
packages/core/data-transfer/types/remote/protocol/client/commands.d.ts
vendored
Normal file
@ -0,0 +1,30 @@
|
||||
import type { ILocalStrapiDestinationProviderOptions } from '../../../../src/strapi/providers';
|
||||
|
||||
export type CommandMessage = { type: 'command' } & (InitCommand | EndCommand | StatusCommand);
|
||||
|
||||
export type Command = CommandMessage['command'];
|
||||
|
||||
export type GetCommandParams<T extends Command> = {
|
||||
[key in Command]: { command: key } & CommandMessage;
|
||||
}[T] extends { params: infer U }
|
||||
? U
|
||||
: never;
|
||||
|
||||
export type InitCommand = CreateCommand<
|
||||
'init',
|
||||
| {
|
||||
transfer: 'push';
|
||||
options: Pick<ILocalStrapiDestinationProviderOptions, 'strategy' | 'restore'>;
|
||||
}
|
||||
| { transfer: 'pull' }
|
||||
>;
|
||||
export type TransferKind = InitCommand['params']['transfer'];
|
||||
|
||||
export type EndCommand = CreateCommand<'end', { transferID: string }>;
|
||||
|
||||
export type StatusCommand = CreateCommand<'status'>;
|
||||
|
||||
type CreateCommand<T extends string, U extends Record<string, unknown> = never> = {
|
||||
type: 'command';
|
||||
command: T;
|
||||
} & ([U] extends [never] ? unknown : { params: U });
|
8
packages/core/data-transfer/types/remote/protocol/client/index.d.ts
vendored
Normal file
8
packages/core/data-transfer/types/remote/protocol/client/index.d.ts
vendored
Normal file
@ -0,0 +1,8 @@
|
||||
import type { CommandMessage } from './commands';
|
||||
import type { TransferMessage } from './transfer';
|
||||
|
||||
export * from './commands';
|
||||
export * from './transfer';
|
||||
|
||||
export type Message = { uuid: string } & (CommandMessage | TransferMessage);
|
||||
export type MessageType = Message['type'];
|
3
packages/core/data-transfer/types/remote/protocol/client/transfer/action.d.ts
vendored
Normal file
3
packages/core/data-transfer/types/remote/protocol/client/transfer/action.d.ts
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
import type { CreateTransferMessage } from './utils';
|
||||
|
||||
export type Action = CreateTransferMessage<'action', { action: string }>;
|
13
packages/core/data-transfer/types/remote/protocol/client/transfer/index.d.ts
vendored
Normal file
13
packages/core/data-transfer/types/remote/protocol/client/transfer/index.d.ts
vendored
Normal file
@ -0,0 +1,13 @@
|
||||
import type { Action } from './action';
|
||||
import type { TransferPullMessage } from './pull';
|
||||
import type { TransferPushMessage } from './push';
|
||||
|
||||
export * from './action';
|
||||
export * from './pull';
|
||||
export * from './push';
|
||||
|
||||
export type TransferMessage = { type: 'transfer'; transferID: string } & (
|
||||
| Action
|
||||
| TransferPushMessage
|
||||
| TransferPullMessage
|
||||
);
|
8
packages/core/data-transfer/types/remote/protocol/client/transfer/pull.d.ts
vendored
Normal file
8
packages/core/data-transfer/types/remote/protocol/client/transfer/pull.d.ts
vendored
Normal file
@ -0,0 +1,8 @@
|
||||
import { CreateTransferMessage } from './utils';
|
||||
|
||||
export type TransferPullMessage = CreateTransferMessage<
|
||||
'step',
|
||||
{
|
||||
action: 'start' | 'stop';
|
||||
}
|
||||
>;
|
31
packages/core/data-transfer/types/remote/protocol/client/transfer/push.d.ts
vendored
Normal file
31
packages/core/data-transfer/types/remote/protocol/client/transfer/push.d.ts
vendored
Normal file
@ -0,0 +1,31 @@
|
||||
import type { CreateTransferMessage } from './utils';
|
||||
import type { IEntity, ILink, IConfiguration, IAsset } from '../../../../common-entities';
|
||||
|
||||
export type TransferPushMessage = CreateTransferMessage<
|
||||
'step',
|
||||
| TransferStepCommands<'entities', IEntity>
|
||||
| TransferStepCommands<'links', ILink>
|
||||
| TransferStepCommands<'configuration', IConfiguration>
|
||||
| TransferStepCommands<'assets', TransferAssetFlow | null>
|
||||
>;
|
||||
|
||||
export type GetTransferPushStreamData<T extends TransferPushStep> = {
|
||||
[key in TransferPushStep]: {
|
||||
action: 'stream';
|
||||
step: key;
|
||||
} & TransferPushMessage;
|
||||
}[T] extends { data: infer U }
|
||||
? U
|
||||
: never;
|
||||
|
||||
export type TransferPushStep = TransferPushMessage['step'];
|
||||
|
||||
type TransferStepCommands<T extends string, U> = { step: T } & TransferStepFlow<U>;
|
||||
|
||||
type TransferStepFlow<U> = { action: 'start' } | { action: 'stream'; data: U } | { action: 'end' };
|
||||
|
||||
type TransferAssetFlow = { assetID: string } & (
|
||||
| { action: 'start'; data: Omit<IAsset, 'stream'> }
|
||||
| { action: 'stream'; data: Buffer }
|
||||
| { action: 'end' }
|
||||
);
|
5
packages/core/data-transfer/types/remote/protocol/client/transfer/utils.d.ts
vendored
Normal file
5
packages/core/data-transfer/types/remote/protocol/client/transfer/utils.d.ts
vendored
Normal file
@ -0,0 +1,5 @@
|
||||
export type CreateTransferMessage<T extends string, U = unknown> = {
|
||||
type: 'transfer';
|
||||
kind: T;
|
||||
transferID: string;
|
||||
} & U;
|
2
packages/core/data-transfer/types/remote/protocol/index.d.ts
vendored
Normal file
2
packages/core/data-transfer/types/remote/protocol/index.d.ts
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
export * as client from './client';
|
||||
export * as server from './server';
|
29
packages/core/data-transfer/types/remote/protocol/server/error.d.ts
vendored
Normal file
29
packages/core/data-transfer/types/remote/protocol/server/error.d.ts
vendored
Normal file
@ -0,0 +1,29 @@
|
||||
export enum ErrorKind {
|
||||
// Generic
|
||||
Unknown = 0,
|
||||
// Chunk transfer
|
||||
DiscardChunk = 1,
|
||||
InvalidChunkFormat = 2,
|
||||
}
|
||||
|
||||
export class ServerError extends Error {
|
||||
constructor(
|
||||
public code: ErrorKind,
|
||||
public message: string,
|
||||
public details?: Record<string, unknown> | null
|
||||
) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
|
||||
export class UnknownError extends ServerError {
|
||||
constructor(message: string, details?: Record<string, unknown> | null) {
|
||||
super(ErrorKind.Unknown, message, details);
|
||||
}
|
||||
}
|
||||
|
||||
export class DiscardChunkError extends ServerError {
|
||||
constructor(message: string, details?: Record<string, unknown> | null) {
|
||||
super(ErrorKind.DiscardChunk, message, details);
|
||||
}
|
||||
}
|
2
packages/core/data-transfer/types/remote/protocol/server/index.d.ts
vendored
Normal file
2
packages/core/data-transfer/types/remote/protocol/server/index.d.ts
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
export * from './messaging';
|
||||
export * as error from './error';
|
14
packages/core/data-transfer/types/remote/protocol/server/messaging.d.ts
vendored
Normal file
14
packages/core/data-transfer/types/remote/protocol/server/messaging.d.ts
vendored
Normal file
@ -0,0 +1,14 @@
|
||||
import type { ServerError } from './error';
|
||||
|
||||
export type Message<T = unknown> = {
|
||||
uuid?: string;
|
||||
data?: T | null;
|
||||
error?: ServerError | null;
|
||||
};
|
||||
|
||||
// Successful
|
||||
export type OKMessage = Message<{ ok: true }>;
|
||||
export type InitMessage = Message<{ transferID: string }>;
|
||||
export type EndMessage = OKMessage;
|
||||
|
||||
export type Payload<T extends Message> = T['data'];
|
@ -6300,10 +6300,10 @@
|
||||
"@types/koa-compose" "*"
|
||||
"@types/node" "*"
|
||||
|
||||
"@types/koa@2.13.1":
|
||||
version "2.13.1"
|
||||
resolved "https://registry.yarnpkg.com/@types/koa/-/koa-2.13.1.tgz#e29877a6b5ad3744ab1024f6ec75b8cbf6ec45db"
|
||||
integrity sha512-Qbno7FWom9nNqu0yHZ6A0+RWt4mrYBhw3wpBAQ3+IuzGcLlfeYkzZrnMq5wsxulN2np8M4KKeUpTodsOsSad5Q==
|
||||
"@types/koa@2.13.4":
|
||||
version "2.13.4"
|
||||
resolved "https://registry.yarnpkg.com/@types/koa/-/koa-2.13.4.tgz#10620b3f24a8027ef5cbae88b393d1b31205726b"
|
||||
integrity sha512-dfHYMfU+z/vKtQB7NUrthdAEiSvnLebvBjwHtfFmpZmB7em2N3WVQdHgnFq+xvyVgxW5jKDmjWfLD3lw4g4uTw==
|
||||
dependencies:
|
||||
"@types/accepts" "*"
|
||||
"@types/content-disposition" "*"
|
||||
|
Loading…
x
Reference in New Issue
Block a user