From 0e83bc28d9e9a5bf4bdf1ffeee9ae3230fb6ecb1 Mon Sep 17 00:00:00 2001 From: Christian Capeans Date: Fri, 23 Jun 2023 13:57:54 +0200 Subject: [PATCH 1/8] Optimise pull transfer for entities, configurations and links --- .../strapi/providers/remote-source/index.ts | 4 ++- .../src/strapi/remote/handlers/pull.ts | 29 +++++++++++++++++-- .../remote/protocol/client/transfer/pull.d.ts | 17 ++++++++--- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts index d51e6a9a41..48e7a541e3 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts @@ -85,7 +85,9 @@ class RemoteStrapiSourceProvider implements ISourceProvider { return; } - stream.push(data); + for (const item of data) { + stream.push(item); + } this.ws?.once('message', listener); diff --git a/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts b/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts index ace6e41158..27d56426a2 100644 --- a/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts +++ b/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts @@ -6,6 +6,7 @@ import { handlerControllerFactory, isDataTransferMessage } from './utils'; import { createLocalStrapiSourceProvider, ILocalStrapiSourceProvider } from '../../providers'; import { ProviderTransferError } from '../../../errors/providers'; import type { IAsset, TransferStage, Protocol } from '../../../../types'; +import { Client } from '../../../../types/remote/protocol'; const TRANSFER_KIND = 'pull'; const VALID_TRANSFER_ACTIONS = ['bootstrap', 'close', 'getMetadata', 'getSchemas'] as const; @@ -134,18 +135,40 @@ export const createPullController = handlerControllerFactory, id) { + type Stage = typeof stage; + const batchSize = 1024 * 1024; + let batch = [] as 
Client.GetTransferPullStreamData; const stream = this.streams?.[stage]; + const batchLength = () => Buffer.byteLength(JSON.stringify(batch)); + const sendBatch = async () => { + await this.confirm({ + type: 'transfer', + data: batch, + ended: false, + error: null, + id, + }); + }; + if (!stream) { throw new ProviderTransferError(`No available stream found for ${stage}`); } try { for await (const chunk of stream) { - await this.confirm({ type: 'transfer', data: chunk, ended: false, error: null, id }); + batch.push(chunk); + if (batchLength() >= batchSize) { + await sendBatch(); + batch = []; + } } + if (batch.length > 0) { + await sendBatch(); + batch = []; + } await this.confirm({ type: 'transfer', data: null, ended: true, error: null, id }); } catch (e) { await this.confirm({ type: 'transfer', data: null, ended: true, error: e, id }); @@ -163,7 +186,7 @@ export const createPullController = handlerControllerFactory - | TransferStepCommands<'links', ILink> - | TransferStepCommands<'configuration', IConfiguration> - | TransferStepCommands<'assets', TransferAssetFlow | null> + | TransferStepCommands<'entities', IEntity[]> + | TransferStepCommands<'links', ILink[]> + | TransferStepCommands<'configuration', IConfiguration[]> + | TransferStepCommands<'assets', TransferAssetFlow[] | null> >; export type TransferPullStep = TransferPullMessage['step']; +export type GetTransferPullStreamData = { + [key in TransferPullStep]: { + action: 'stream'; + step: key; + } & TransferPullMessage; +}[T] extends { data: infer U } + ? 
U + : never; + type TransferStepCommands = { step: T } & TransferStepFlow; type TransferStepFlow = { action: 'start' } | { action: 'stream'; data: U } | { action: 'end' }; From 6359676a9b79506581b4d730829d8b36f0661a3c Mon Sep 17 00:00:00 2001 From: Christian Capeans Date: Wed, 28 Jun 2023 09:30:04 +0200 Subject: [PATCH 2/8] Optimize assets transfer --- .../strapi/providers/remote-source/index.ts | 65 +++++++++++++------ .../src/strapi/remote/handlers/pull.ts | 34 ++++++++-- 2 files changed, 74 insertions(+), 25 deletions(-) diff --git a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts index 48e7a541e3..12b979fb98 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts @@ -1,5 +1,5 @@ import type { Schema, Utils } from '@strapi/strapi'; -import { PassThrough, Readable } from 'stream'; +import { PassThrough, Readable, Writable } from 'stream'; import { WebSocket } from 'ws'; import type { @@ -8,6 +8,7 @@ import type { ISourceProvider, ISourceProviderTransferResults, MaybePromise, + Protocol, ProviderType, TransferStage, } from '../../../../types'; @@ -107,36 +108,58 @@ class RemoteStrapiSourceProvider implements ISourceProvider { return this.#createStageReadStream('links'); } + writeAsync = (stream: Writable, data: T) => { + return new Promise((resolve, reject) => { + stream.write(data, (error) => { + if (error) { + reject(error); + } + + resolve(); + }); + }); + }; + async createAssetsReadStream(): Promise { - const assets: { [filename: string]: Readable } = {}; + const assets: { + [filename: string]: IAsset & { + stream: PassThrough; + }; + } = {}; const stream = await this.#createStageReadStream('assets'); const pass = new PassThrough({ objectMode: true }); stream - .on( - 'data', - (asset: Omit & { chunk: { type: 'Buffer'; data: Uint8Array } }) => { - const { 
chunk, ...rest } = asset; - - if (!(asset.filename in assets)) { - const assetStream = new PassThrough(); - assets[asset.filename] = assetStream; - - pass.push({ ...rest, stream: assetStream }); + .on('data', async (payload: Protocol.Client.TransferAssetFlow[]) => { + for (const item of payload) { + const { action } = item; + if (action === 'start') { + assets[item.assetID] = { ...item.data, stream: new PassThrough() }; + await this.writeAsync(pass, assets[item.assetID]); } + if (action === 'stream') { + const rawBuffer = item.data as unknown as { + type: 'Buffer'; + data: Uint8Array; + }; + const chunk = Buffer.from(rawBuffer.data); - if (asset.filename in assets) { - // The buffer has gone through JSON operations and is now of shape { type: "Buffer"; data: UInt8Array } - // We need to transform it back into a Buffer instance - assets[asset.filename].push(Buffer.from(chunk.data)); + await this.writeAsync(assets[item.assetID].stream, chunk); + } + if (action === 'end') { + await new Promise((resolve, reject) => { + const { stream: assetStream } = assets[item.assetID]; + assetStream + .on('close', () => { + delete assets[item.assetID]; + resolve(); + }) + .on('error', reject) + .end(); + }); } } - ) - .on('end', () => { - Object.values(assets).forEach((s) => { - s.push(null); - }); }) .on('close', () => { pass.end(); diff --git a/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts b/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts index 27d56426a2..4c73d0bf40 100644 --- a/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts +++ b/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts @@ -134,7 +134,6 @@ export const createPullController = handlerControllerFactory, id) { type Stage = typeof stage; const batchSize = 1024 * 1024; @@ -186,7 +185,6 @@ export const createPullController = handlerControllerFactory this.provider?.createConfigurationReadStream(), assets: () => { const assets = this.provider?.createAssetsReadStream(); 
+ let batch: Protocol.Client.TransferAssetFlow[] = []; + + const batchLength = () => { + return batch.reduce( + (acc, chunk) => (chunk.action === 'stream' ? acc + chunk.data.byteLength : acc), + 0 + ); + }; + + const batchSize = 1024 * 1024; // 1MB if (!assets) { throw new Error('bad'); } async function* generator(stream: Readable) { + let hasStarted = false; + let assetID = ''; + for await (const chunk of stream) { - const { stream: assetStream, ...rest } = chunk as IAsset; + const { stream: assetStream, ...assetData } = chunk as IAsset; + if (!hasStarted) { + assetID = randomUUID(); + batch.push({ action: 'start', assetID, data: assetData }); + hasStarted = true; + } for await (const assetChunk of assetStream) { - yield { ...rest, chunk: assetChunk }; + batch.push({ action: 'stream', assetID, data: assetChunk }); + if (batchLength() >= batchSize) { + yield batch; + batch = []; + } } + + // end + hasStarted = false; + batch.push({ action: 'end', assetID }); + yield batch; + batch = []; } } From e9e5727f9fa5866551769d50c174af4f29772d98 Mon Sep 17 00:00:00 2001 From: Christian Capeans Date: Thu, 29 Jun 2023 12:24:26 +0200 Subject: [PATCH 3/8] Add comments to clarify the code --- .../src/strapi/providers/remote-source/index.ts | 9 +++++++++ .../src/strapi/remote/handlers/pull.ts | 17 +++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts index 12b979fb98..c7c0eca328 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts @@ -134,11 +134,17 @@ class RemoteStrapiSourceProvider implements ISourceProvider { .on('data', async (payload: Protocol.Client.TransferAssetFlow[]) => { for (const item of payload) { const { action } = item; + + // Creates the stream to send the incoming asset through if 
(action === 'start') { + // Each asset has its own stream identified by its assetID assets[item.assetID] = { ...item.data, stream: new PassThrough() }; await this.writeAsync(pass, assets[item.assetID]); } + + // Writes the asset data to the created stream if (action === 'stream') { + // Converts data into buffer const rawBuffer = item.data as unknown as { type: 'Buffer'; data: Uint8Array; @@ -147,11 +153,14 @@ class RemoteStrapiSourceProvider implements ISourceProvider { await this.writeAsync(assets[item.assetID].stream, chunk); } + + // The asset has been transferred if (action === 'end') { await new Promise((resolve, reject) => { const { stream: assetStream } = assets[item.assetID]; assetStream .on('close', () => { + // Deletes the stream for the asset delete assets[item.assetID]; resolve(); }) diff --git a/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts b/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts index 4c73d0bf40..f6a1af0ad3 100644 --- a/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts +++ b/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts @@ -221,12 +221,17 @@ export const createPullController = handlerControllerFactory= batchSize) { + + // if the batch size is bigger than BATCH_MAX_SIZE stream the batch + if (batchLength() >= BATCH_MAX_SIZE) { yield batch; batch = []; } } - // end + // All the asset data has been streamed and gets ready for the next one hasStarted = false; batch.push({ action: 'end', assetID }); yield batch; From 623ab4747a59df8a3eb9b7a10a5fae4c35c19071 Mon Sep 17 00:00:00 2001 From: Christian Capeans Date: Thu, 29 Jun 2023 12:30:18 +0200 Subject: [PATCH 4/8] Update documentation --- .../data-transfer/02-providers/05-remote-strapi/01-websocket.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/docs/docs/01-core/data-transfer/02-providers/05-remote-strapi/01-websocket.md b/docs/docs/docs/01-core/data-transfer/02-providers/05-remote-strapi/01-websocket.md 
index ecdb3b7148..31fad7beb5 100644 --- a/docs/docs/docs/01-core/data-transfer/02-providers/05-remote-strapi/01-websocket.md +++ b/docs/docs/docs/01-core/data-transfer/02-providers/05-remote-strapi/01-websocket.md @@ -38,7 +38,7 @@ Used for switching between stages of a transfer and streaming the actual data of Accepts the following `action` values: - `start`: sent with a `step` value for the name of the step/stage - - any number of `stream`: sent with a `step` value and the `data` being sent (ie, an entity) + - any number of `stream`: sent with a `step` value and the `data` being sent (ie, an array of entities) - `end`: sent with a `step` value for the step being ended ### dispatchTransferAction From 9f7b6a28aefbed1f99d3bc21e1d65c2983dfc051 Mon Sep 17 00:00:00 2001 From: Christian Date: Fri, 30 Jun 2023 13:24:54 +0200 Subject: [PATCH 5/8] Update packages/core/data-transfer/src/strapi/providers/remote-source/index.ts Co-authored-by: Ben Irvin --- .../data-transfer/src/strapi/providers/remote-source/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts index 887f0ebd8d..906b22eb44 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts @@ -141,7 +141,7 @@ class RemoteStrapiSourceProvider implements ISourceProvider { } // Writes the asset data to the created stream - if (action === 'stream') { + else if (action === 'stream') { // Converts data into buffer const rawBuffer = item.data as unknown as { type: 'Buffer'; From 5229428c2005c14ec14e401d25dbc7d32058c946 Mon Sep 17 00:00:00 2001 From: Christian Date: Fri, 30 Jun 2023 13:24:59 +0200 Subject: [PATCH 6/8] Update packages/core/data-transfer/src/strapi/providers/remote-source/index.ts Co-authored-by: Ben Irvin --- 
.../data-transfer/src/strapi/providers/remote-source/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts index 906b22eb44..6496753c44 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts @@ -153,7 +153,7 @@ class RemoteStrapiSourceProvider implements ISourceProvider { } // The asset has been transferred - if (action === 'end') { + else if (action === 'end') { await new Promise((resolve, reject) => { const { stream: assetStream } = assets[item.assetID]; assetStream From 533bd1f310a080b88ac4f3c4b6787520c78b57af Mon Sep 17 00:00:00 2001 From: Ben Irvin Date: Fri, 30 Jun 2023 14:44:43 +0200 Subject: [PATCH 7/8] cast to array for backwards compatibility --- .../data-transfer/src/strapi/providers/remote-source/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts index 6496753c44..4c7029da7f 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts @@ -1,6 +1,7 @@ import type { Schema, Utils } from '@strapi/strapi'; import { PassThrough, Readable, Writable } from 'stream'; import { WebSocket } from 'ws'; +import { castArray } from 'lodash/fp'; import type { IAsset, From 67e2799601bef2fe9d3c153bf6056c979f64bcb5 Mon Sep 17 00:00:00 2001 From: Ben Irvin Date: Fri, 30 Jun 2023 14:46:43 +0200 Subject: [PATCH 8/8] fix for backwards compatibility --- .../data-transfer/src/strapi/providers/remote-source/index.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts 
b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts index 4c7029da7f..d210b4cf5b 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts @@ -85,7 +85,9 @@ class RemoteStrapiSourceProvider implements ISourceProvider { return; } - for (const item of data) { + // if we get a single item instead of a batch + // TODO V5: in v5 only allow array + for (const item of castArray(data)) { stream.push(item); }