Init push transfer capabilities

Co-authored-by: Christian <christiancp100@gmail.com>
Co-authored-by: Ben Irvin <innerdvations@users.noreply.github.com>
This commit is contained in:
Convly 2022-12-22 12:30:45 +01:00
parent 7127ce5d6a
commit 4b2b00a3dd
14 changed files with 729 additions and 24 deletions

View File

@ -52,6 +52,7 @@
"@fortawesome/react-fontawesome": "^0.2.0",
"@pmmmwh/react-refresh-webpack-plugin": "0.5.7",
"@strapi/babel-plugin-switch-ee-ce": "4.5.4",
"@strapi/data-transfer": "4.5.4",
"@strapi/design-system": "1.4.0",
"@strapi/helper-plugin": "4.5.4",
"@strapi/icons": "1.4.0",

View File

@ -1,5 +1,7 @@
'use strict';
const { register: registerDataTransfer } = require('@strapi/data-transfer');
const registerAdminPanelRoute = require('./routes/serve-admin-panel');
const adminAuthStrategy = require('./strategies/admin');
const apiTokenAuthStrategy = require('./strategies/api-token');
@ -14,4 +16,6 @@ module.exports = ({ strapi }) => {
if (strapi.config.serveAdminPanel) {
registerAdminPanelRoute({ strapi });
}
registerDataTransfer(strapi);
};

View File

@ -239,6 +239,21 @@ class TransferEngine<
const { stage, source, destination, transform, tracker } = options;
if (!source || !destination) {
// Wait until source and destination are closed
await Promise.allSettled(
[source, destination].map((stream) => {
// if stream is undefined or already closed, resolve immediately
if (!stream || stream.destroyed) {
return Promise.resolve();
}
// Wait until the close event is produced and then destroy the stream and resolve
return new Promise((resolve, reject) => {
stream.on('close', resolve).on('error', reject).destroy();
});
})
);
return;
}

View File

@ -1,2 +1,4 @@
export * from './engine';
export * from './providers';
export { default as register } from './register';

View File

@ -5,3 +5,5 @@ export * from './local-strapi-source-provider';
// destination providers
export * from './local-file-destination-provider';
export * from './local-strapi-destination-provider';
export * from './remote-strapi-destination-provider';

View File

@ -9,8 +9,9 @@ import * as utils from '../../utils';
export const VALID_CONFLICT_STRATEGIES = ['restore', 'merge'];
export const DEFAULT_CONFLICT_STRATEGY = 'restore';
interface ILocalStrapiDestinationProviderOptions {
export interface ILocalStrapiDestinationProviderOptions {
getStrapi(): Strapi.Strapi | Promise<Strapi.Strapi>;
autoDestroy?: boolean;
restore?: restore.IRestoreOptions;
strategy: 'restore' | 'merge';
}
@ -37,7 +38,12 @@ class LocalStrapiDestinationProvider implements IDestinationProvider {
}
async close(): Promise<void> {
await this.strapi?.destroy?.();
const { autoDestroy } = this.options;
// Basically `!== false` but more deterministic
if (autoDestroy === undefined || autoDestroy === true) {
await this.strapi?.destroy();
}
}
#validateOptions() {
@ -60,7 +66,7 @@ class LocalStrapiDestinationProvider implements IDestinationProvider {
}
}
getMetadata(): IMetadata | Promise<IMetadata> {
getMetadata(): IMetadata {
const strapiVersion = strapi.config.get('info.strapi');
const createdAt = new Date().toISOString();

View File

@ -8,24 +8,23 @@ import type { IConfiguration } from '../../../types';
* Create a readable stream that export the Strapi app configuration
*/
export const createConfigurationStream = (strapi: Strapi.Strapi): Readable => {
// Core Store
const coreStoreStream = chain([
strapi.db.queryBuilder('strapi::core-store').stream(),
(data) => set('value', JSON.parse(data.value), data),
wrapConfigurationItem('core-store'),
]);
// Webhook
const webhooksStream = chain([
strapi.db.queryBuilder('webhook').stream(),
wrapConfigurationItem('webhook'),
]);
const streams = [coreStoreStream, webhooksStream];
// Readable configuration stream
return Readable.from(
(async function* configurationGenerator(): AsyncGenerator<IConfiguration> {
// Core Store
const coreStoreStream = chain([
strapi.db.queryBuilder('strapi::core-store').stream(),
(data) => set('value', JSON.parse(data.value), data),
wrapConfigurationItem('core-store'),
]);
// Webhook
const webhooksStream = chain([
strapi.db.queryBuilder('webhook').stream(),
wrapConfigurationItem('webhook'),
]);
const streams = [coreStoreStream, webhooksStream];
for (const stream of streams) {
for await (const item of stream) {
yield item;

View File

@ -32,11 +32,15 @@ export const createEntitiesStream = (strapi: Strapi.Strapi): Readable => {
contentType: ContentTypeSchema;
}> {
for await (const { stream, contentType } of contentTypeStreamGenerator()) {
for await (const entity of stream) {
yield { entity, contentType };
try {
for await (const entity of stream) {
yield { entity, contentType };
}
} catch {
// ignore
} finally {
stream.destroy();
}
stream.destroy();
}
})()
);

View File

@ -0,0 +1,201 @@
import { WebSocket } from 'ws';
import { v4 } from 'uuid';
import { Writable } from 'stream';
import type {
IDestinationProvider,
IEntity,
ILink,
IMetadata,
ProviderType,
IConfiguration,
TransferStage,
} from '../../../types';
import type { ILocalStrapiDestinationProviderOptions } from '../local-strapi-destination-provider';
interface ITokenAuth {
type: 'token';
token: string;
}
interface ICredentialsAuth {
type: 'credentials';
email: string;
password: string;
}
interface IRemoteStrapiDestinationProvider {
url: string;
auth?: ITokenAuth | ICredentialsAuth;
strategy: ILocalStrapiDestinationProviderOptions['strategy'];
}
type Actions = 'bootstrap' | 'close' | 'beforeTransfer' | 'getMetadata' | 'getSchemas';
export const createRemoteStrapiDestinationProvider = (
options: IRemoteStrapiDestinationProvider
) => {
return new RemoteStrapiDestinationProvider(options);
};
class RemoteStrapiDestinationProvider implements IDestinationProvider {
name = 'destination::remote-strapi';
type: ProviderType = 'destination';
options: IRemoteStrapiDestinationProvider;
ws: WebSocket | null;
constructor(options: IRemoteStrapiDestinationProvider) {
this.options = options;
this.ws = null;
}
async #dispatch<U = unknown, T extends object = object>(message: T): Promise<U> {
const { ws } = this;
if (!ws) {
throw new Error('No ws connection found');
}
return new Promise((resolve, reject) => {
const uuid = v4();
const payload = JSON.stringify({ ...message, uuid });
ws.send(payload, (error) => {
if (error) {
reject(error);
}
});
ws.once('message', (raw) => {
const response: { uuid: string; data: U; error: string | null } = JSON.parse(
raw.toString()
);
if (response.error) {
return reject(new Error(response.error));
}
if (response.uuid === uuid) {
return resolve(response.data);
}
});
});
}
async #dispatchAction<T = unknown>(action: Actions) {
return this.#dispatch<T>({ type: 'action', action });
}
async #dispatchTransfer<T = unknown>(stage: TransferStage, data: T) {
try {
await this.#dispatch({ type: 'transfer', stage, data });
} catch (e) {
if (e instanceof Error) {
return e;
}
return new Error('Unexected error');
}
return null;
}
async bootstrap(): Promise<void> {
const { url, auth, strategy } = this.options;
let ws: WebSocket;
// No auth defined, trying public access for transfer
if (!auth) {
ws = new WebSocket(url);
}
// Common token auth, this should be the main auth method
else if (auth.type === 'token') {
const headers = { Authentication: `Bearer ${auth.token}` };
ws = new WebSocket(this.options.url, { headers });
}
// Invalid auth method provided
else {
throw new Error('Auth method not implemented');
}
this.ws = ws;
// Wait for the connection to be made to the server, then init the transfer
await new Promise<void>((resolve, reject) => {
ws.once('open', async () => {
await this.#dispatch({ type: 'init', kind: 'push', data: { strategy } });
resolve();
}).once('error', reject);
});
// Run the bootstrap
await this.#dispatchAction('bootstrap');
}
async close() {
await this.#dispatchAction('close');
await new Promise<void>((resolve) => {
const { ws } = this;
if (!ws || ws.CLOSED) {
resolve();
return;
}
ws.on('close', () => resolve()).close();
});
}
getMetadata() {
return this.#dispatchAction<IMetadata>('getMetadata');
}
async beforeTransfer() {
await this.#dispatchAction('beforeTransfer');
}
getSchemas(): Promise<Strapi.Schemas> {
return this.#dispatchAction<Strapi.Schemas>('getSchemas');
}
getEntitiesStream(): Writable {
return new Writable({
objectMode: true,
write: async (entity: IEntity, _encoding, callback) => {
const e = await this.#dispatchTransfer('entities', entity);
callback(e);
},
});
}
getLinksStream(): Writable {
return new Writable({
objectMode: true,
write: async (link: ILink, _encoding, callback) => {
const e = await this.#dispatchTransfer('links', link);
callback(e);
},
});
}
getConfigurationStream(): Writable {
return new Writable({
objectMode: true,
write: async (configuration: IConfiguration, _encoding, callback) => {
const e = await this.#dispatchTransfer('configuration', configuration);
callback(e);
},
});
}
}

View File

@ -0,0 +1,295 @@
import type { Context } from 'koa';
import type { ServerOptions } from 'ws';
import { WebSocket } from 'ws';
import { Writable } from 'stream';
import { IAsset, IConfiguration, IEntity, ILink, IMetadata, TransferStage } from '../types';
import {
createLocalStrapiDestinationProvider,
ILocalStrapiDestinationProviderOptions,
} from './providers';
type PushTransferStage = Exclude<TransferStage, 'schemas'>;
type MessageKind = 'push' | 'pull';
type Message = { uuid: string } & (InitMessage | TransferMessage | ActionMessage | TeardownMessage);
// init
type InitMessage = { type: 'init' } & (IPushInitMessage | IPullInitMessage);
interface IPushInitMessage {
type: 'init';
kind: 'push';
data: { strategy: ILocalStrapiDestinationProviderOptions['strategy'] };
}
interface IPullInitMessage {
type: 'init';
kind: 'pull';
}
// teardown
type TeardownMessage = { type: 'teardown' };
// action
type ActionMessage = {
type: 'action';
action: 'bootstrap' | 'close' | 'beforeTransfer' | 'getMetadata' | 'getSchemas';
};
// transfer
type TransferMessage = PushTransferMessage;
type PushTransferMessage = { type: 'transfer' } & (
| PushEntityMessage
| PushLinkMessage
| PushAssetMessage
| PushConfigurationMessage
);
type PushEntityMessage = { stage: 'entities'; data: IEntity };
type PushLinkMessage = { stage: 'links'; data: ILink };
type PushAssetMessage = { stage: 'assets'; data: IAsset };
type PushConfigurationMessage = { stage: 'configuration'; data: IConfiguration };
// Internal state
interface ITransferState {
kind?: MessageKind;
controller?: IPushController;
}
// Controllers
interface IPushController {
actions: {
getMetadata(): Promise<IMetadata>;
getSchemas(): Strapi.Schemas;
bootstrap(): Promise<void>;
close(): Promise<void>;
beforeTransfer(): Promise<void>;
};
transfer: {
entities(entity: IEntity): Promise<void> | void;
links(link: ILink): Promise<void> | void;
configuration(configuration: IConfiguration): Promise<void> | void;
assets(asset: IAsset): Promise<void> | void;
};
}
const createPushController = (
ws: WebSocket,
options: ILocalStrapiDestinationProviderOptions
): IPushController => {
const provider = createLocalStrapiDestinationProvider(options);
const streams: { [stage in PushTransferStage]?: Writable } = {};
const writeAsync = <T>(stream: Writable, data: T) => {
return new Promise<void>((resolve, reject) => {
stream.write(data, (error) => {
if (error) {
reject(error);
}
resolve();
});
});
};
return {
actions: {
async getSchemas() {
return provider.getSchemas();
},
async getMetadata() {
return provider.getMetadata();
},
async bootstrap() {
return provider.bootstrap();
},
async close() {
return provider.close();
},
async beforeTransfer() {
return provider.beforeTransfer();
},
},
transfer: {
async entities(entity) {
if (!streams.entities) {
streams.entities = provider.getEntitiesStream();
}
await writeAsync(streams.entities, entity);
},
async links(link) {
if (!streams.links) {
streams.links = await provider.getLinksStream();
}
await writeAsync(streams.links, link);
},
async configuration(config) {
if (!streams.configuration) {
streams.configuration = await provider.getConfigurationStream();
}
await writeAsync(streams.configuration, config);
},
async assets(asset) {
if (!streams.assets) {
streams.assets = await provider.getAssetsStream();
}
await writeAsync(streams.assets, asset);
},
},
};
};
const createTransferController =
(options: ServerOptions = {}) =>
async (ctx: Context) => {
const upgradeHeader = (ctx.request.headers.upgrade || '')
.split(',')
.map((s) => s.trim().toLowerCase());
// Create the websocket server
const wss = new WebSocket.Server({ ...options, noServer: true });
if (upgradeHeader.includes('websocket')) {
wss.handleUpgrade(ctx.req, ctx.request.socket, Buffer.alloc(0), (ws) => {
// Create a connection between the client & the server
wss.emit('connection', ws, ctx.req);
const state: ITransferState = {};
let uuid: string | undefined;
const callback = <T = unknown>(e: Error | null = null, data?: T) => {
return new Promise<void>((resolve, reject) => {
if (!uuid) {
reject(new Error('Missing uuid for this message'));
return;
}
const payload = JSON.stringify({
uuid,
data: data ?? {},
error: e,
});
ws.send(payload, (error) => (error ? reject(error) : resolve()));
});
};
const answer = async <T = unknown>(fn: () => T) => {
try {
const response = await fn();
callback(null, response);
} catch (e) {
if (e instanceof Error) {
callback(e);
} else if (typeof e === 'string') {
callback(new Error(e));
} else {
callback(new Error('Unexpected error'));
}
}
};
const teardown = () => {
delete state.kind;
delete state.controller;
return { ok: true };
};
const init = (kind: MessageKind, data: unknown = {}) => {
if (state.controller) {
throw new Error('Transfer already in progres');
}
if (kind === 'push') {
const { strategy } = data as Partial<IPushInitMessage['data']>;
if (!strategy) {
throw new Error('Tried to initiate a push transfer without a strategy');
}
state.controller = createPushController(ws, {
strategy,
autoDestroy: false,
getStrapi() {
return strapi;
},
});
}
return { ok: true };
};
ws.on('close', () => {
teardown();
});
ws.on('error', (e) => {
teardown();
console.error(e);
});
ws.on('message', async (raw) => {
const msg: Message = JSON.parse(raw.toString());
if (!msg.uuid) {
throw new Error('Missing uuid in message');
}
uuid = msg.uuid;
if (msg.type === 'init') {
await answer(() => init(msg.kind, (msg as any)?.data));
}
if (msg.type === 'teardown') {
await answer(teardown);
}
if (msg.type === 'action') {
await answer(() => state.controller?.actions[msg.action]?.());
}
if (msg.type === 'transfer') {
await answer(() => state.controller?.transfer[msg.stage]?.(msg.data as any));
}
});
});
ctx.respond = false;
}
};
const registerTransferRoute = (strapi: any) => {
strapi.admin.routes.push({
method: 'GET',
path: '/transfer',
handler: createTransferController(),
config: { auth: false },
});
};
const register = (strapi: any) => {
registerTransferRoute(strapi);
};
export default register;

View File

@ -47,7 +47,9 @@
"stream-chain": "2.2.5",
"stream-json": "1.7.4",
"tar": "6.1.12",
"tar-stream": "2.2.0"
"tar-stream": "2.2.0",
"uuid": "9.0.0",
"ws": "8.11.0"
},
"devDependencies": {
"@tsconfig/node16": "1.0.3",
@ -58,6 +60,8 @@
"@types/stream-json": "1.7.2",
"@types/tar": "6.1.3",
"@types/tar-stream": "2.2.2",
"@types/uuid": "9.0.0",
"koa": "2.14.1",
"rimraf": "3.0.2",
"typescript": "4.6.2"
},

View File

@ -258,6 +258,15 @@ program
.option('-s, --silent', `Run the generation silently, without any output`, false)
.action(getLocalScript('ts/generate-types'));
// `$ strapi transfer`
program
.command('transfer')
.description('Transfer data from one source to another')
.addOption(new Option('--from <source>', `Source of your data`).default('local'))
.addOption(new Option('--to <destination>', `Destination of your data`).default('remote'))
.allowExcessArguments(false)
.action(getLocalScript('transfer/transfer'));
// `$ strapi export`
program
.command('export')

View File

@ -0,0 +1,119 @@
'use strict';
const {
createRemoteStrapiDestinationProvider,
createLocalStrapiSourceProvider,
createTransferEngine,
// TODO: we need to solve this issue with typescript modules
// eslint-disable-next-line import/no-unresolved, node/no-missing-require
} = require('@strapi/data-transfer');
const { isObject } = require('lodash/fp');
const chalk = require('chalk');
const {
buildTransferTable,
createStrapiInstance,
DEFAULT_IGNORED_CONTENT_TYPES,
} = require('./utils');
/**
* @typedef ImportCommandOptions Options given to the CLI import command
*
* @property {string} [file] The file path to import
* @property {boolean} [encrypt] Used to encrypt the final archive
* @property {string} [key] Encryption key, only useful when encryption is enabled
* @property {boolean} [compress] Used to compress the final archive
*/
const logger = console;
/**
* Import command.
*
* It transfers data from a local file to a local strapi instance
*
* @param {ImportCommandOptions} opts
*/
module.exports = async (opts) => {
// Validate inputs from Commander
if (!isObject(opts)) {
logger.error('Could not parse command arguments');
process.exit(1);
}
const strapi = await createStrapiInstance();
let source;
let destination;
if (opts.from === 'local') {
source = createSourceProvider(strapi);
}
if (opts.to) {
destination = createDestinationProvider({
url: opts.to,
auth: false,
strategy: 'restore',
});
}
if (!source || !destination) {
logger.error("Couldn't create providers");
process.exit(1);
}
const engine = createTransferEngine(source, destination, {
versionStrategy: 'ignore', // for an export to file, versionStrategy will always be skipped
schemaStrategy: 'ignore', // for an export to file, schemaStrategy will always be skipped
transforms: {
links: [
{
filter(link) {
return (
!DEFAULT_IGNORED_CONTENT_TYPES.includes(link.left.type) &&
!DEFAULT_IGNORED_CONTENT_TYPES.includes(link.right.type)
);
},
},
],
entities: [
{
filter(entity) {
return !DEFAULT_IGNORED_CONTENT_TYPES.includes(entity.type);
},
},
],
},
});
try {
logger.log(`Starting export...`);
const results = await engine.transfer();
const table = buildTransferTable(results.engine);
logger.log(table.toString());
logger.log(`${chalk.bold('Transfer process has been completed successfully!')}`);
process.exit(0);
} catch (e) {
logger.error('Transfer process failed unexpectedly:', e);
process.exit(1);
}
};
/**
* It creates a local strapi destination provider
*/
const createSourceProvider = (strapi) => {
return createLocalStrapiSourceProvider({
async getStrapi() {
return strapi;
},
});
};
/**
* It creates a remote strapi destination provider based on the given options
*/
const createDestinationProvider = (opts) => {
return createRemoteStrapiDestinationProvider(opts);
};

View File

@ -6713,6 +6713,11 @@
resolved "https://registry.yarnpkg.com/@types/unist/-/unist-2.0.6.tgz#250a7b16c3b91f672a24552ec64678eeb1d3a08d"
integrity sha512-PBjIUxZHOuj0R15/xuwJYjFi+KZdNFrehocChv4g5hu6aFroHue8m0lBP0POdK2nKzbw0cgV1mws8+V/JAcEkQ==
"@types/uuid@9.0.0":
version "9.0.0"
resolved "https://registry.yarnpkg.com/@types/uuid/-/uuid-9.0.0.tgz#53ef263e5239728b56096b0a869595135b7952d2"
integrity sha512-kr90f+ERiQtKWMz5rP32ltJ/BtULDI5RVO0uavn1HQUOwjx0R1h0rnDYNL0CepF1zL5bSY6FISAfd9tOdDhU5Q==
"@types/webpack-env@^1.16.0":
version "1.18.0"
resolved "https://registry.yarnpkg.com/@types/webpack-env/-/webpack-env-1.18.0.tgz#ed6ecaa8e5ed5dfe8b2b3d00181702c9925f13fb"
@ -15557,6 +15562,35 @@ koa@2.13.4, koa@^2.13.4:
type-is "^1.6.16"
vary "^1.1.2"
koa@2.14.1:
version "2.14.1"
resolved "https://registry.yarnpkg.com/koa/-/koa-2.14.1.tgz#defb9589297d8eb1859936e777f3feecfc26925c"
integrity sha512-USJFyZgi2l0wDgqkfD27gL4YGno7TfUkcmOe6UOLFOVuN+J7FwnNu4Dydl4CUQzraM1lBAiGed0M9OVJoT0Kqw==
dependencies:
accepts "^1.3.5"
cache-content-type "^1.0.0"
content-disposition "~0.5.2"
content-type "^1.0.4"
cookies "~0.8.0"
debug "^4.3.2"
delegates "^1.0.0"
depd "^2.0.0"
destroy "^1.0.4"
encodeurl "^1.0.2"
escape-html "^1.0.3"
fresh "~0.5.2"
http-assert "^1.3.0"
http-errors "^1.6.3"
is-generator-function "^1.0.7"
koa-compose "^4.1.0"
koa-convert "^2.0.0"
on-finished "^2.3.0"
only "~0.0.2"
parseurl "^1.3.2"
statuses "^1.5.0"
type-is "^1.6.16"
vary "^1.1.2"
kuler@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/kuler/-/kuler-2.0.0.tgz#e2c570a3800388fb44407e851531c1d670b061b3"
@ -22716,6 +22750,11 @@ uuid@8.0.0:
resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.0.0.tgz#bc6ccf91b5ff0ac07bbcdbf1c7c4e150db4dbb6c"
integrity sha512-jOXGuXZAWdsTH7eZLtyXMqUb9EcWMGZNbL9YcGBJl4MH4nrxHmZJhEHvyLFrkxo+28uLb/NYRcStH48fnD0Vzw==
uuid@9.0.0:
version "9.0.0"
resolved "https://registry.yarnpkg.com/uuid/-/uuid-9.0.0.tgz#592f550650024a38ceb0c562f2f6aa435761efb5"
integrity sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==
uuid@^3.3.2:
version "3.4.0"
resolved "https://registry.yarnpkg.com/uuid/-/uuid-3.4.0.tgz#b23e4358afa8a202fe7a100af1f5f883f02007ee"
@ -23421,6 +23460,11 @@ write-pkg@^4.0.0:
type-fest "^0.4.1"
write-json-file "^3.2.0"
ws@8.11.0:
version "8.11.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.11.0.tgz#6a0d36b8edfd9f96d8b25683db2f8d7de6e8e143"
integrity sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==
"ws@^5.2.0 || ^6.0.0 || ^7.0.0", ws@^7.3.1:
version "7.5.9"
resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.9.tgz#54fa7db29f4c7cec68b1ddd3a89de099942bb591"