mirror of
https://github.com/strapi/strapi.git
synced 2025-09-19 05:23:05 +00:00
Merge branch 'deits/transfer-push' into deits/transfer-push-cli
This commit is contained in:
commit
bc609a2f0a
6
.github/workflows/tests.yml
vendored
6
.github/workflows/tests.yml
vendored
@ -34,6 +34,8 @@ jobs:
|
||||
key: ${{ runner.os }}-${{ matrix.node }}-${{ hashFiles('**/yarn.lock') }}
|
||||
|
||||
- run: yarn install --frozen-lockfile
|
||||
- name: Build TypeScript packages # for lint we need to build ts, for rules checking if paths exist
|
||||
run: yarn build:ts
|
||||
- name: Run lint
|
||||
run: yarn run -s lint
|
||||
|
||||
@ -55,6 +57,8 @@ jobs:
|
||||
key: ${{ runner.os }}-${{ matrix.node }}-${{ hashFiles('**/yarn.lock') }}
|
||||
|
||||
- run: yarn install --frozen-lockfile
|
||||
- name: Build TypeScript packages # for unit tests we need to build ts, for finding and mocking ts packages
|
||||
run: yarn build:ts
|
||||
- name: Run tests
|
||||
run: yarn run -s test:unit --coverage
|
||||
- name: Upload coverage to Codecov
|
||||
@ -231,6 +235,7 @@ jobs:
|
||||
SQLITE_PKG: ${{ matrix.sqlite_pkg }}
|
||||
with:
|
||||
dbOptions: '--dbclient=sqlite-legacy --dbfile=./tmp/data.db'
|
||||
|
||||
# EE
|
||||
api_ee_pg:
|
||||
runs-on: ubuntu-latest
|
||||
@ -275,6 +280,7 @@ jobs:
|
||||
with:
|
||||
dbOptions: '--dbclient=postgres --dbhost=localhost --dbport=5432 --dbname=strapi_test --dbusername=strapi --dbpassword=strapi'
|
||||
runEE: true
|
||||
|
||||
api_ee_mysql:
|
||||
runs-on: ubuntu-latest
|
||||
needs: [lint, unit_back, unit_front]
|
||||
|
@ -33,6 +33,7 @@
|
||||
"clean": "lerna run --stream clean --no-private",
|
||||
"watch": "lerna run --stream watch --no-private --parallel",
|
||||
"build": "lerna run --stream build --no-private",
|
||||
"build:ts": "lerna run --stream build:ts --no-private",
|
||||
"generate": "plop --plopfile ./packages/generators/admin/plopfile.js",
|
||||
"lint": "npm-run-all -p lint:code lint:css",
|
||||
"lint:code": "eslint .",
|
||||
|
@ -1,7 +1,5 @@
|
||||
'use strict';
|
||||
|
||||
// TODO: we need to solve this issue with typescript modules
|
||||
// eslint-disable-next-line import/no-unresolved, node/no-missing-require
|
||||
const { register: registerDataTransfer } = require('@strapi/data-transfer');
|
||||
|
||||
const registerAdminPanelRoute = require('./routes/serve-admin-panel');
|
||||
|
@ -0,0 +1,2 @@
|
||||
export * from './push';
|
||||
export { default as createTransferController } from './transfer';
|
134
packages/core/data-transfer/lib/bootstrap/controllers/push.ts
Normal file
134
packages/core/data-transfer/lib/bootstrap/controllers/push.ts
Normal file
@ -0,0 +1,134 @@
|
||||
import { PassThrough, Writable } from 'stream-chain';
|
||||
|
||||
import { IAsset, IMetadata, PushTransferMessage, PushTransferStage } from '../../../types';
|
||||
import {
|
||||
createLocalStrapiDestinationProvider,
|
||||
ILocalStrapiDestinationProviderOptions,
|
||||
} from '../../providers';
|
||||
|
||||
export interface IPushController {
|
||||
streams: { [stage in PushTransferStage]?: Writable };
|
||||
actions: {
|
||||
getMetadata(): Promise<IMetadata>;
|
||||
getSchemas(): Strapi.Schemas;
|
||||
bootstrap(): Promise<void>;
|
||||
close(): Promise<void>;
|
||||
beforeTransfer(): Promise<void>;
|
||||
};
|
||||
transfer: {
|
||||
[key in PushTransferStage]: <T extends PushTransferMessage>(
|
||||
value: T extends { stage: key; data: infer U } ? U : never
|
||||
) => Promise<void>;
|
||||
};
|
||||
}
|
||||
|
||||
const createPushController = (options: ILocalStrapiDestinationProviderOptions): IPushController => {
|
||||
const provider = createLocalStrapiDestinationProvider(options);
|
||||
|
||||
const streams: { [stage in PushTransferStage]?: Writable } = {};
|
||||
const assets: { [filepath: string]: IAsset & { stream: PassThrough } } = {};
|
||||
|
||||
const writeAsync = <T>(stream: Writable, data: T) => {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
stream.write(data, (error) => {
|
||||
if (error) {
|
||||
reject(error);
|
||||
}
|
||||
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
return {
|
||||
streams,
|
||||
|
||||
actions: {
|
||||
async getSchemas(): Promise<Strapi.Schemas> {
|
||||
return provider.getSchemas();
|
||||
},
|
||||
|
||||
async getMetadata() {
|
||||
return provider.getMetadata();
|
||||
},
|
||||
|
||||
async bootstrap() {
|
||||
return provider.bootstrap();
|
||||
},
|
||||
|
||||
async close() {
|
||||
return provider.close();
|
||||
},
|
||||
|
||||
async beforeTransfer() {
|
||||
return provider.beforeTransfer();
|
||||
},
|
||||
},
|
||||
|
||||
transfer: {
|
||||
async entities(entity) {
|
||||
if (!streams.entities) {
|
||||
streams.entities = provider.getEntitiesStream();
|
||||
}
|
||||
|
||||
await writeAsync(streams.entities!, entity);
|
||||
},
|
||||
|
||||
async links(link) {
|
||||
if (!streams.links) {
|
||||
streams.links = await provider.getLinksStream();
|
||||
}
|
||||
|
||||
await writeAsync(streams.links!, link);
|
||||
},
|
||||
|
||||
async configuration(config) {
|
||||
if (!streams.configuration) {
|
||||
streams.configuration = await provider.getConfigurationStream();
|
||||
}
|
||||
|
||||
await writeAsync(streams.configuration!, config);
|
||||
},
|
||||
|
||||
async assets(payload) {
|
||||
if (payload === null) {
|
||||
streams.assets?.end();
|
||||
return;
|
||||
}
|
||||
|
||||
const { step, assetID } = payload;
|
||||
|
||||
if (!streams.assets) {
|
||||
streams.assets = await provider.getAssetsStream();
|
||||
}
|
||||
|
||||
if (step === 'start') {
|
||||
assets[assetID] = { ...payload.data, stream: new PassThrough() };
|
||||
writeAsync(streams.assets, assets[assetID]);
|
||||
}
|
||||
|
||||
if (step === 'stream') {
|
||||
const chunk = Buffer.from(payload.data.chunk.data);
|
||||
|
||||
await writeAsync(assets[assetID].stream, chunk);
|
||||
}
|
||||
|
||||
if (step === 'end') {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const { stream } = assets[assetID];
|
||||
|
||||
stream
|
||||
.on('close', () => {
|
||||
delete assets[assetID];
|
||||
resolve();
|
||||
})
|
||||
.on('error', reject)
|
||||
.end();
|
||||
});
|
||||
}
|
||||
},
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
export default createPushController;
|
@ -0,0 +1,147 @@
|
||||
import type { Context } from 'koa';
|
||||
import type { ServerOptions } from 'ws';
|
||||
|
||||
import { v4 } from 'uuid';
|
||||
import { WebSocket } from 'ws';
|
||||
|
||||
import type { IPushController } from './push';
|
||||
|
||||
import { InitMessage, Message, TransferKind } from '../../../types';
|
||||
import createPushController from './push';
|
||||
|
||||
interface ITransferState {
|
||||
kind?: TransferKind;
|
||||
transferID?: string;
|
||||
controller?: IPushController;
|
||||
}
|
||||
|
||||
const createTransferController =
|
||||
(options: ServerOptions = {}) =>
|
||||
async (ctx: Context) => {
|
||||
const upgradeHeader = (ctx.request.headers.upgrade || '')
|
||||
.split(',')
|
||||
.map((s) => s.trim().toLowerCase());
|
||||
|
||||
// Create the websocket server
|
||||
const wss = new WebSocket.Server({ ...options, noServer: true });
|
||||
|
||||
if (upgradeHeader.includes('websocket')) {
|
||||
wss.handleUpgrade(ctx.req, ctx.request.socket, Buffer.alloc(0), (ws) => {
|
||||
// Create a connection between the client & the server
|
||||
wss.emit('connection', ws, ctx.req);
|
||||
|
||||
const state: ITransferState = {};
|
||||
let uuid: string | undefined;
|
||||
|
||||
const callback = <T = unknown>(e: Error | null = null, data?: T) => {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
if (!uuid) {
|
||||
reject(new Error('Missing uuid for this message'));
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = JSON.stringify({
|
||||
uuid,
|
||||
data: data ?? {},
|
||||
error: e,
|
||||
});
|
||||
|
||||
ws.send(payload, (error) => (error ? reject(error) : resolve()));
|
||||
});
|
||||
};
|
||||
|
||||
const answer = async <T = unknown>(fn: () => T) => {
|
||||
try {
|
||||
const response = await fn();
|
||||
callback(null, response);
|
||||
} catch (e) {
|
||||
if (e instanceof Error) {
|
||||
callback(e);
|
||||
} else if (typeof e === 'string') {
|
||||
callback(new Error(e));
|
||||
} else {
|
||||
callback(new Error('Unexpected error'));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const teardown = () => {
|
||||
delete state.kind;
|
||||
delete state.controller;
|
||||
delete state.transferID;
|
||||
|
||||
return { ok: true };
|
||||
};
|
||||
|
||||
const init = (msg: InitMessage) => {
|
||||
const { kind, options: controllerOptions } = msg;
|
||||
|
||||
if (state.controller) {
|
||||
throw new Error('Transfer already in progres');
|
||||
}
|
||||
|
||||
if (kind === 'push') {
|
||||
state.controller = createPushController({
|
||||
...controllerOptions,
|
||||
autoDestroy: false,
|
||||
getStrapi() {
|
||||
return strapi;
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// Pull or others
|
||||
else {
|
||||
throw new Error(`${kind} transfer not implemented`);
|
||||
}
|
||||
|
||||
state.transferID = v4();
|
||||
|
||||
return { transferID: state.transferID };
|
||||
};
|
||||
|
||||
ws.on('close', () => {
|
||||
teardown();
|
||||
});
|
||||
|
||||
ws.on('error', (e) => {
|
||||
teardown();
|
||||
console.error(e);
|
||||
});
|
||||
|
||||
ws.on('message', async (raw) => {
|
||||
const msg: Message = JSON.parse(raw.toString());
|
||||
|
||||
if (!msg.uuid) {
|
||||
throw new Error('Missing uuid in message');
|
||||
}
|
||||
|
||||
uuid = msg.uuid;
|
||||
|
||||
if (msg.type === 'init') {
|
||||
await answer(() => init(msg));
|
||||
}
|
||||
|
||||
if (msg.type === 'teardown') {
|
||||
await answer(teardown);
|
||||
}
|
||||
|
||||
if (msg.type === 'action') {
|
||||
await answer(() => state.controller?.actions[msg.action]?.());
|
||||
}
|
||||
|
||||
if (msg.type === 'transfer') {
|
||||
await answer(() => {
|
||||
const { stage, data } = msg;
|
||||
|
||||
return state.controller?.transfer[stage](data as never);
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
ctx.respond = false;
|
||||
}
|
||||
};
|
||||
|
||||
export default createTransferController;
|
@ -356,9 +356,7 @@ class TransferEngine<
|
||||
this.#emitTransferUpdate('init');
|
||||
await this.bootstrap();
|
||||
await this.init();
|
||||
|
||||
const isValidTransfer = await this.integrityCheck();
|
||||
|
||||
if (!isValidTransfer) {
|
||||
// TODO: provide the log from the integrity check
|
||||
throw new Error(
|
||||
@ -369,14 +367,12 @@ class TransferEngine<
|
||||
this.#emitTransferUpdate('start');
|
||||
|
||||
await this.beforeTransfer();
|
||||
|
||||
// Run the transfer stages
|
||||
await this.transferSchemas();
|
||||
await this.transferEntities();
|
||||
await this.transferAssets();
|
||||
await this.transferLinks();
|
||||
await this.transferConfiguration();
|
||||
|
||||
// Gracefully close the providers
|
||||
await this.close();
|
||||
|
||||
|
@ -0,0 +1,32 @@
|
||||
import { createTransferController } from '../../bootstrap/controllers';
|
||||
import register from '../../register';
|
||||
|
||||
afterEach(() => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
const strapiMock = {
|
||||
admin: {
|
||||
routes: {
|
||||
push: jest.fn(),
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
jest.mock('../../bootstrap/controllers', () => ({
|
||||
createTransferController: jest.fn(),
|
||||
}));
|
||||
|
||||
describe('Register the Transfer route', () => {
|
||||
test('registers the /transfer route', () => {
|
||||
register(strapiMock);
|
||||
expect(strapiMock.admin.routes.push).toHaveBeenCalledWith({
|
||||
method: 'GET',
|
||||
path: '/transfer',
|
||||
handler: createTransferController(),
|
||||
config: {
|
||||
auth: false,
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
@ -0,0 +1,48 @@
|
||||
import type { IRemoteStrapiDestinationProviderOptions } from '..';
|
||||
|
||||
import { createRemoteStrapiDestinationProvider } from '..';
|
||||
|
||||
const defaultOptions: IRemoteStrapiDestinationProviderOptions = {
|
||||
strategy: 'restore',
|
||||
url: 'ws://test.com/admin/transfer',
|
||||
};
|
||||
|
||||
jest.mock('../utils', () => ({
|
||||
dispatch: jest.fn(),
|
||||
}));
|
||||
|
||||
jest.mock('ws', () => ({
|
||||
WebSocket: jest.fn().mockImplementation(() => {
|
||||
return {
|
||||
...jest.requireActual('ws').WebSocket,
|
||||
send: jest.fn(),
|
||||
once: jest.fn((type, callback) => {
|
||||
callback();
|
||||
return {
|
||||
once: jest.fn((t, c) => c),
|
||||
};
|
||||
}),
|
||||
};
|
||||
}),
|
||||
}));
|
||||
|
||||
afterEach(() => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('Remote Strapi Destination', () => {
|
||||
describe('Bootstrap', () => {
|
||||
test('Should not have a defined websocket connection if bootstrap has not been called', () => {
|
||||
const provider = createRemoteStrapiDestinationProvider(defaultOptions);
|
||||
|
||||
expect(provider.ws).toBeNull();
|
||||
});
|
||||
|
||||
test('Should have a defined websocket connection if bootstrap has been called', async () => {
|
||||
const provider = createRemoteStrapiDestinationProvider(defaultOptions);
|
||||
await provider.bootstrap();
|
||||
|
||||
expect(provider.ws).not.toBeNull();
|
||||
});
|
||||
});
|
||||
});
|
@ -0,0 +1,43 @@
|
||||
import { WebSocket } from 'ws';
|
||||
import { dispatch } from '../utils';
|
||||
|
||||
jest.mock('ws', () => ({
|
||||
WebSocket: jest.fn().mockImplementation(() => {
|
||||
return {
|
||||
...jest.requireActual('ws').WebSocket,
|
||||
send: jest.fn(),
|
||||
once: jest.fn(),
|
||||
};
|
||||
}),
|
||||
}));
|
||||
|
||||
afterEach(() => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('Remote Strapi Destination Utils', () => {
|
||||
test('Dispatch method sends payload', () => {
|
||||
const ws = new WebSocket('ws://test/admin/transfer');
|
||||
const message = {
|
||||
test: 'hello',
|
||||
};
|
||||
|
||||
dispatch(ws, message);
|
||||
|
||||
expect.extend({
|
||||
toContain(receivedString, expected) {
|
||||
const jsonReceived = JSON.parse(receivedString);
|
||||
const pass = Object.keys(expected).every((key) => jsonReceived[key] === expected[key]);
|
||||
|
||||
return {
|
||||
message: () =>
|
||||
`Expected ${jsonReceived} ${!pass && 'not'} to contain properties ${expected}`,
|
||||
pass,
|
||||
};
|
||||
},
|
||||
});
|
||||
|
||||
// @ts-ignore
|
||||
expect(ws.send).toHaveBeenCalledWith(expect.toContain(message), expect.anything());
|
||||
});
|
||||
});
|
@ -13,6 +13,7 @@ import type {
|
||||
IAsset,
|
||||
} from '../../../types';
|
||||
import type { ILocalStrapiDestinationProviderOptions } from '../local-strapi-destination-provider';
|
||||
import { dispatch } from './utils';
|
||||
|
||||
interface ITokenAuth {
|
||||
type: 'token';
|
||||
@ -25,7 +26,7 @@ interface ICredentialsAuth {
|
||||
password: string;
|
||||
}
|
||||
|
||||
interface IRemoteStrapiDestinationProvider
|
||||
export interface IRemoteStrapiDestinationProviderOptions
|
||||
extends Pick<ILocalStrapiDestinationProviderOptions, 'restore' | 'strategy'> {
|
||||
url: string;
|
||||
auth?: ITokenAuth | ICredentialsAuth;
|
||||
@ -34,7 +35,7 @@ interface IRemoteStrapiDestinationProvider
|
||||
type Actions = 'bootstrap' | 'close' | 'beforeTransfer' | 'getMetadata' | 'getSchemas';
|
||||
|
||||
export const createRemoteStrapiDestinationProvider = (
|
||||
options: IRemoteStrapiDestinationProvider
|
||||
options: IRemoteStrapiDestinationProviderOptions
|
||||
) => {
|
||||
return new RemoteStrapiDestinationProvider(options);
|
||||
};
|
||||
@ -44,61 +45,28 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
|
||||
|
||||
type: ProviderType = 'destination';
|
||||
|
||||
options: IRemoteStrapiDestinationProvider;
|
||||
options: IRemoteStrapiDestinationProviderOptions;
|
||||
|
||||
ws: WebSocket | null;
|
||||
|
||||
constructor(options: IRemoteStrapiDestinationProvider) {
|
||||
constructor(options: IRemoteStrapiDestinationProviderOptions) {
|
||||
this.options = options;
|
||||
this.ws = null;
|
||||
}
|
||||
|
||||
async #dispatch<U = unknown, T extends object = object>(message: T): Promise<U> {
|
||||
const { ws } = this;
|
||||
|
||||
if (!ws) {
|
||||
throw new Error('No ws connection found');
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const uuid = v4();
|
||||
const payload = JSON.stringify({ ...message, uuid });
|
||||
|
||||
ws.send(payload, (error) => {
|
||||
if (error) {
|
||||
reject(error);
|
||||
}
|
||||
});
|
||||
|
||||
ws.once('message', (raw) => {
|
||||
const response: { uuid: string; data: U; error: string | null } = JSON.parse(
|
||||
raw.toString()
|
||||
);
|
||||
|
||||
if (response.error) {
|
||||
return reject(new Error(response.error));
|
||||
}
|
||||
|
||||
if (response.uuid === uuid) {
|
||||
return resolve(response.data);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async #dispatchAction<T = unknown>(action: Actions) {
|
||||
return this.#dispatch<T>({ type: 'action', action });
|
||||
return dispatch<T>(this.ws, { type: 'action', action });
|
||||
}
|
||||
|
||||
async #dispatchTransfer<T = unknown>(stage: TransferStage, data: T) {
|
||||
try {
|
||||
await this.#dispatch({ type: 'transfer', stage, data });
|
||||
await dispatch(this.ws, { type: 'transfer', stage, data });
|
||||
} catch (e) {
|
||||
if (e instanceof Error) {
|
||||
return e;
|
||||
}
|
||||
|
||||
return new Error('Unexected error');
|
||||
return new Error('Unexpected error');
|
||||
}
|
||||
|
||||
return null;
|
||||
@ -131,7 +99,7 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
|
||||
// Wait for the connection to be made to the server, then init the transfer
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
ws.once('open', async () => {
|
||||
await this.#dispatch({ type: 'init', kind: 'push', data: { strategy, restore } });
|
||||
await dispatch(this.ws, { type: 'init', kind: 'push', options: { strategy, restore } });
|
||||
resolve();
|
||||
}).once('error', reject);
|
||||
});
|
||||
|
@ -0,0 +1,34 @@
|
||||
import { v4 } from 'uuid';
|
||||
import { WebSocket } from 'ws';
|
||||
|
||||
export async function dispatch<U = unknown, T extends object = object>(
|
||||
ws: WebSocket | null,
|
||||
message: T
|
||||
): Promise<U> {
|
||||
if (!ws) {
|
||||
throw new Error('No websocket connection found');
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const uuid = v4();
|
||||
const payload = JSON.stringify({ ...message, uuid });
|
||||
|
||||
ws.send(payload, (error) => {
|
||||
if (error) {
|
||||
reject(error);
|
||||
}
|
||||
});
|
||||
|
||||
ws.once('message', (raw) => {
|
||||
const response: { uuid: string; data: U; error: string | null } = JSON.parse(raw.toString());
|
||||
|
||||
if (response.error) {
|
||||
return reject(new Error(response.error));
|
||||
}
|
||||
|
||||
if (response.uuid === uuid) {
|
||||
return resolve(response.data);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
@ -1,280 +1,4 @@
|
||||
import type { Context } from 'koa';
|
||||
import type { ServerOptions } from 'ws';
|
||||
import { WebSocket } from 'ws';
|
||||
import { Writable, PassThrough } from 'stream';
|
||||
import { v4 } from 'uuid';
|
||||
import {
|
||||
IAsset,
|
||||
Message,
|
||||
IMetadata,
|
||||
PushTransferMessage,
|
||||
TransferKind,
|
||||
InitMessage,
|
||||
PushTransferStage,
|
||||
} from '../types';
|
||||
import {
|
||||
ILocalStrapiDestinationProviderOptions,
|
||||
createLocalStrapiDestinationProvider,
|
||||
} from './providers';
|
||||
|
||||
interface ITransferState {
|
||||
kind?: TransferKind;
|
||||
transferID?: string;
|
||||
controller?: IPushController;
|
||||
}
|
||||
|
||||
// Controllers
|
||||
|
||||
interface IPushController {
|
||||
actions: {
|
||||
getMetadata(): Promise<IMetadata>;
|
||||
getSchemas(): Strapi.Schemas;
|
||||
bootstrap(): Promise<void>;
|
||||
close(): Promise<void>;
|
||||
beforeTransfer(): Promise<void>;
|
||||
};
|
||||
transfer: {
|
||||
[key in PushTransferStage]: <T extends PushTransferMessage>(
|
||||
value: T extends { stage: key; data: infer U } ? U : never
|
||||
) => Promise<void>;
|
||||
};
|
||||
}
|
||||
|
||||
const createPushController = (options: ILocalStrapiDestinationProviderOptions): IPushController => {
|
||||
const provider = createLocalStrapiDestinationProvider(options);
|
||||
|
||||
const streams: { [stage in PushTransferStage]?: Writable } = {};
|
||||
const assets: { [filepath: string]: IAsset & { stream: PassThrough } } = {};
|
||||
|
||||
const writeAsync = <T>(stream: Writable, data: T) => {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
stream.write(data, (error) => {
|
||||
if (error) {
|
||||
reject(error);
|
||||
}
|
||||
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
return {
|
||||
actions: {
|
||||
async getSchemas(): Promise<Strapi.Schemas> {
|
||||
return provider.getSchemas();
|
||||
},
|
||||
|
||||
async getMetadata() {
|
||||
return provider.getMetadata();
|
||||
},
|
||||
|
||||
async bootstrap() {
|
||||
return provider.bootstrap();
|
||||
},
|
||||
|
||||
async close() {
|
||||
return provider.close();
|
||||
},
|
||||
|
||||
async beforeTransfer() {
|
||||
return provider.beforeTransfer();
|
||||
},
|
||||
},
|
||||
|
||||
transfer: {
|
||||
async entities(entity) {
|
||||
if (!streams.entities) {
|
||||
streams.entities = provider.getEntitiesStream();
|
||||
}
|
||||
|
||||
await writeAsync(streams.entities, entity);
|
||||
},
|
||||
|
||||
async links(link) {
|
||||
if (!streams.links) {
|
||||
streams.links = await provider.getLinksStream();
|
||||
}
|
||||
|
||||
await writeAsync(streams.links, link);
|
||||
},
|
||||
|
||||
async configuration(config) {
|
||||
if (!streams.configuration) {
|
||||
streams.configuration = await provider.getConfigurationStream();
|
||||
}
|
||||
|
||||
await writeAsync(streams.configuration, config);
|
||||
},
|
||||
|
||||
async assets(payload) {
|
||||
if (payload === null) {
|
||||
streams.assets?.end();
|
||||
return;
|
||||
}
|
||||
|
||||
const { step, assetID } = payload;
|
||||
|
||||
if (!streams.assets) {
|
||||
streams.assets = await provider.getAssetsStream();
|
||||
}
|
||||
|
||||
if (step === 'start') {
|
||||
assets[assetID] = { ...payload.data, stream: new PassThrough() };
|
||||
writeAsync(streams.assets, assets[assetID]);
|
||||
}
|
||||
|
||||
if (step === 'stream') {
|
||||
const chunk = Buffer.from(payload.data.chunk.data);
|
||||
|
||||
await writeAsync(assets[assetID].stream, chunk);
|
||||
}
|
||||
|
||||
if (step === 'end') {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const { stream } = assets[assetID];
|
||||
|
||||
stream
|
||||
.on('close', () => {
|
||||
delete assets[assetID];
|
||||
resolve();
|
||||
})
|
||||
.on('error', reject)
|
||||
.end();
|
||||
});
|
||||
}
|
||||
},
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
const createTransferController =
|
||||
(options: ServerOptions = {}) =>
|
||||
async (ctx: Context) => {
|
||||
const upgradeHeader = (ctx.request.headers.upgrade || '')
|
||||
.split(',')
|
||||
.map((s) => s.trim().toLowerCase());
|
||||
|
||||
// Create the websocket server
|
||||
const wss = new WebSocket.Server({ ...options, noServer: true });
|
||||
|
||||
if (upgradeHeader.includes('websocket')) {
|
||||
wss.handleUpgrade(ctx.req, ctx.request.socket, Buffer.alloc(0), (ws) => {
|
||||
// Create a connection between the client & the server
|
||||
wss.emit('connection', ws, ctx.req);
|
||||
|
||||
const state: ITransferState = {};
|
||||
let uuid: string | undefined;
|
||||
|
||||
const callback = <T = unknown>(e: Error | null = null, data?: T) => {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
if (!uuid) {
|
||||
reject(new Error('Missing uuid for this message'));
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = JSON.stringify({
|
||||
uuid,
|
||||
data: data ?? {},
|
||||
error: e,
|
||||
});
|
||||
|
||||
ws.send(payload, (error) => (error ? reject(error) : resolve()));
|
||||
});
|
||||
};
|
||||
|
||||
const answer = async <T = unknown>(fn: () => T) => {
|
||||
try {
|
||||
const response = await fn();
|
||||
callback(null, response);
|
||||
} catch (e) {
|
||||
if (e instanceof Error) {
|
||||
callback(e);
|
||||
} else if (typeof e === 'string') {
|
||||
callback(new Error(e));
|
||||
} else {
|
||||
callback(new Error('Unexpected error'));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const teardown = () => {
|
||||
delete state.kind;
|
||||
delete state.controller;
|
||||
delete state.transferID;
|
||||
|
||||
return { ok: true };
|
||||
};
|
||||
|
||||
const init = (msg: InitMessage) => {
|
||||
const { kind, options: controllerOptions } = msg;
|
||||
|
||||
if (state.controller) {
|
||||
throw new Error('Transfer already in progres');
|
||||
}
|
||||
|
||||
if (kind === 'push') {
|
||||
state.controller = createPushController({
|
||||
...controllerOptions,
|
||||
autoDestroy: false,
|
||||
getStrapi() {
|
||||
return strapi;
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// Pull or others
|
||||
else {
|
||||
throw new Error(`${kind} transfer not implemented`);
|
||||
}
|
||||
|
||||
state.transferID = v4();
|
||||
|
||||
return { transferID: state.transferID };
|
||||
};
|
||||
|
||||
ws.on('close', () => {
|
||||
teardown();
|
||||
});
|
||||
|
||||
ws.on('error', (e) => {
|
||||
teardown();
|
||||
console.error(e);
|
||||
});
|
||||
|
||||
ws.on('message', async (raw) => {
|
||||
const msg: Message = JSON.parse(raw.toString());
|
||||
|
||||
if (!msg.uuid) {
|
||||
throw new Error('Missing uuid in message');
|
||||
}
|
||||
|
||||
uuid = msg.uuid;
|
||||
|
||||
if (msg.type === 'init') {
|
||||
await answer(() => init(msg));
|
||||
}
|
||||
|
||||
if (msg.type === 'teardown') {
|
||||
await answer(teardown);
|
||||
}
|
||||
|
||||
if (msg.type === 'action') {
|
||||
await answer(() => state.controller?.actions[msg.action]?.());
|
||||
}
|
||||
|
||||
if (msg.type === 'transfer') {
|
||||
await answer(() => {
|
||||
const { stage, data } = msg;
|
||||
|
||||
return state.controller?.transfer[stage](data as never);
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
ctx.respond = false;
|
||||
}
|
||||
};
|
||||
import { createTransferController } from './bootstrap/controllers';
|
||||
|
||||
const registerTransferRoute = (strapi: any) => {
|
||||
strapi.admin.routes.push({
|
||||
|
@ -27,11 +27,13 @@
|
||||
"main": "./dist/index.js",
|
||||
"types": "./dist/index.d.ts",
|
||||
"scripts": {
|
||||
"build": "tsc -p tsconfig.json",
|
||||
"clean": "rimraf ./dist",
|
||||
"build": "yarn build:ts",
|
||||
"build:ts": "tsc -p tsconfig.json",
|
||||
"build:clean": "yarn clean && yarn build",
|
||||
"watch": "yarn build -w --preserveWatchOutput",
|
||||
"test:unit": "jest --verbose"
|
||||
"clean": "rimraf ./dist",
|
||||
"prepublishOnly": "yarn build:clean",
|
||||
"test:unit": "jest --verbose",
|
||||
"watch": "yarn build:ts -w --preserveWatchOutput"
|
||||
},
|
||||
"directories": {
|
||||
"lib": "./dist"
|
||||
|
@ -41,7 +41,7 @@ describe('export', () => {
|
||||
getDefaultExportName: jest.fn(() => defaultFileName),
|
||||
};
|
||||
jest.mock(
|
||||
'../transfer/utils',
|
||||
'../../transfer/utils',
|
||||
() => {
|
||||
return mockUtils;
|
||||
},
|
||||
@ -54,7 +54,7 @@ describe('export', () => {
|
||||
jest.spyOn(console, 'error').mockImplementation(() => {});
|
||||
|
||||
// Now that everything is mocked, import export command
|
||||
const exportCommand = require('../transfer/export');
|
||||
const exportCommand = require('../../transfer/export');
|
||||
|
||||
const expectExit = async (code, fn) => {
|
||||
const exit = jest.spyOn(process, 'exit').mockImplementation((number) => {
|
@ -0,0 +1,110 @@
|
||||
'use strict';
|
||||
|
||||
const utils = require('../../transfer/utils');
|
||||
|
||||
const mockDataTransfer = {
|
||||
createRemoteStrapiDestinationProvider: jest.fn(),
|
||||
createLocalStrapiSourceProvider: jest.fn(),
|
||||
createTransferEngine: jest.fn().mockReturnValue({
|
||||
transfer: jest.fn().mockReturnValue(Promise.resolve({})),
|
||||
}),
|
||||
};
|
||||
|
||||
jest.mock(
|
||||
'@strapi/data-transfer',
|
||||
() => {
|
||||
return mockDataTransfer;
|
||||
},
|
||||
{ virtual: true }
|
||||
);
|
||||
|
||||
const expectExit = async (code, fn) => {
|
||||
const exit = jest.spyOn(process, 'exit').mockImplementation((number) => {
|
||||
throw new Error(`process.exit: ${number}`);
|
||||
});
|
||||
await expect(async () => {
|
||||
await fn();
|
||||
}).rejects.toThrow();
|
||||
expect(exit).toHaveBeenCalledWith(code);
|
||||
exit.mockRestore();
|
||||
};
|
||||
|
||||
const transferCommand = require('../../transfer/transfer');
|
||||
|
||||
const logger = jest.spyOn(console, 'error').mockImplementation(() => {});
|
||||
|
||||
jest.mock('../../transfer/utils');
|
||||
|
||||
const destinationUrl = 'ws://strapi.com';
|
||||
|
||||
describe('transfer', () => {
|
||||
beforeEach(() => {
|
||||
jest.resetAllMocks();
|
||||
});
|
||||
|
||||
it('uses destination url provided by user without authentication', async () => {
|
||||
await expectExit(1, async () => {
|
||||
await transferCommand({ from: 'local', to: destinationUrl });
|
||||
});
|
||||
|
||||
expect(mockDataTransfer.createRemoteStrapiDestinationProvider).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
url: destinationUrl,
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it.todo('uses destination url provided by user with authentication');
|
||||
|
||||
it('uses restore as the default strategy', async () => {
|
||||
await expectExit(1, async () => {
|
||||
await transferCommand({ from: 'local', to: destinationUrl });
|
||||
});
|
||||
|
||||
expect(mockDataTransfer.createRemoteStrapiDestinationProvider).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
strategy: 'restore',
|
||||
})
|
||||
);
|
||||
});
|
||||
it('uses destination url provided by user without authentication', async () => {
|
||||
await expectExit(1, async () => {
|
||||
await transferCommand({ from: 'local', to: destinationUrl });
|
||||
});
|
||||
|
||||
expect(mockDataTransfer.createRemoteStrapiDestinationProvider).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
url: destinationUrl,
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it('uses restore as the default strategy', async () => {
|
||||
await expectExit(1, async () => {
|
||||
await transferCommand({ from: 'local', to: destinationUrl });
|
||||
});
|
||||
|
||||
expect(mockDataTransfer.createRemoteStrapiDestinationProvider).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
strategy: 'restore',
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it('uses local strapi instance when local specified', async () => {
|
||||
await expectExit(1, async () => {
|
||||
await transferCommand({ from: 'local', to: destinationUrl });
|
||||
});
|
||||
|
||||
expect(mockDataTransfer.createLocalStrapiSourceProvider).toHaveBeenCalled();
|
||||
expect(utils.createStrapiInstance).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('Logs an error when the source provider does not exist', async () => {
|
||||
await expectExit(1, async () => {
|
||||
await transferCommand({ from: 'test', to: destinationUrl });
|
||||
});
|
||||
|
||||
expect(logger).toHaveBeenCalledWith("Couldn't create providers");
|
||||
});
|
||||
});
|
@ -4,8 +4,6 @@ const {
|
||||
createLocalFileDestinationProvider,
|
||||
createLocalStrapiSourceProvider,
|
||||
createTransferEngine,
|
||||
// TODO: we need to solve this issue with typescript modules
|
||||
// eslint-disable-next-line import/no-unresolved, node/no-missing-require
|
||||
} = require('@strapi/data-transfer');
|
||||
const { isObject, isString, isFinite, toNumber } = require('lodash/fp');
|
||||
const fs = require('fs-extra');
|
||||
|
@ -7,8 +7,6 @@ const {
|
||||
DEFAULT_VERSION_STRATEGY,
|
||||
DEFAULT_SCHEMA_STRATEGY,
|
||||
DEFAULT_CONFLICT_STRATEGY,
|
||||
// TODO: we need to solve this issue with typescript modules
|
||||
// eslint-disable-next-line import/no-unresolved, node/no-missing-require
|
||||
} = require('@strapi/data-transfer');
|
||||
const { isObject } = require('lodash/fp');
|
||||
const path = require('path');
|
||||
|
Loading…
x
Reference in New Issue
Block a user