diff --git a/docs/docs/docs/01-core/data-transfer/02-providers/05-remote-strapi/01-websocket.md b/docs/docs/docs/01-core/data-transfer/02-providers/05-remote-strapi/01-websocket.md index ecdb3b7148..31fad7beb5 100644 --- a/docs/docs/docs/01-core/data-transfer/02-providers/05-remote-strapi/01-websocket.md +++ b/docs/docs/docs/01-core/data-transfer/02-providers/05-remote-strapi/01-websocket.md @@ -38,7 +38,7 @@ Used for switching between stages of a transfer and streaming the actual data of Accepts the following `action` values: - `start`: sent with a `step` value for the name of the step/stage - - any number of `stream`: sent with a `step` value and the `data` being sent (ie, an entity) + - any number of `stream`: sent with a `step` value and the `data` being sent (ie, an array of entities) - `end`: sent with a `step` value for the step being ended ### dispatchTransferAction diff --git a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts index 3fb7fcd8a8..d210b4cf5b 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-source/index.ts @@ -1,6 +1,7 @@ import type { Schema, Utils } from '@strapi/strapi'; -import { PassThrough, Readable } from 'stream'; +import { PassThrough, Readable, Writable } from 'stream'; import { WebSocket } from 'ws'; +import { castArray } from 'lodash/fp'; import type { IAsset, @@ -8,6 +9,7 @@ import type { ISourceProvider, ISourceProviderTransferResults, MaybePromise, + Protocol, ProviderType, TransferStage, } from '../../../../types'; @@ -83,7 +85,11 @@ class RemoteStrapiSourceProvider implements ISourceProvider { return; } - stream.push(data); + // if we get a single items instead of a batch + // TODO V5: in v5 only allow array + for (const item of castArray(data)) { + stream.push(item); + } this.ws?.once('message', listener); @@ -103,36 +109,67 @@ class RemoteStrapiSourceProvider implements ISourceProvider { return this.#createStageReadStream('links'); } + writeAsync = (stream: Writable, data: T) => { + return new Promise((resolve, reject) => { + stream.write(data, (error) => { + if (error) { + reject(error); + } + + resolve(); + }); + }); + }; + async createAssetsReadStream(): Promise { - const assets: { [filename: string]: Readable } = {}; + const assets: { + [filename: string]: IAsset & { + stream: PassThrough; + }; + } = {}; const stream = await this.#createStageReadStream('assets'); const pass = new PassThrough({ objectMode: true }); stream - .on( - 'data', - (asset: Omit & { chunk: { type: 'Buffer'; data: Uint8Array } }) => { - const { chunk, ...rest } = asset; + .on('data', async (payload: Protocol.Client.TransferAssetFlow[]) => { + for (const item of payload) { + const { action } = item; - if (!(asset.filename in assets)) { - const assetStream = new PassThrough(); - assets[asset.filename] = assetStream; - - pass.push({ ...rest, stream: assetStream }); + // Creates the stream to send the incoming asset through + if (action === 'start') { + // Each asset has its own stream identified by its assetID + assets[item.assetID] = { ...item.data, stream: new PassThrough() }; + await this.writeAsync(pass, assets[item.assetID]); } - if (asset.filename in assets) { - // The buffer has gone through JSON operations and is now of shape { type: "Buffer"; data: UInt8Array } - // We need to transform it back into a Buffer instance - assets[asset.filename].push(Buffer.from(chunk.data)); + // Writes the asset data to the created stream + else if (action === 'stream') { + // Converts data into buffer + const rawBuffer = item.data as unknown as { + type: 'Buffer'; + data: Uint8Array; + }; + const chunk = Buffer.from(rawBuffer.data); + + await this.writeAsync(assets[item.assetID].stream, chunk); + } + + // The asset has been transferred + else if (action === 'end') { + await new Promise((resolve, reject) => { + const { stream: assetStream } = assets[item.assetID]; + assetStream + .on('close', () => { + // Deletes the stream for the asset + delete assets[item.assetID]; + resolve(); + }) + .on('error', reject) + .end(); + }); } } - ) - .on('end', () => { - Object.values(assets).forEach((s) => { - s.push(null); - }); }) .on('close', () => { pass.end(); diff --git a/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts b/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts index ace6e41158..f6a1af0ad3 100644 --- a/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts +++ b/packages/core/data-transfer/src/strapi/remote/handlers/pull.ts @@ -6,6 +6,7 @@ import { handlerControllerFactory, isDataTransferMessage } from './utils'; import { createLocalStrapiSourceProvider, ILocalStrapiSourceProvider } from '../../providers'; import { ProviderTransferError } from '../../../errors/providers'; import type { IAsset, TransferStage, Protocol } from '../../../../types'; +import { Client } from '../../../../types/remote/protocol'; const TRANSFER_KIND = 'pull'; const VALID_TRANSFER_ACTIONS = ['bootstrap', 'close', 'getMetadata', 'getSchemas'] as const; @@ -133,19 +134,40 @@ export const createPullController = handlerControllerFactory, id) { + type Stage = typeof stage; + const batchSize = 1024 * 1024; + let batch = [] as Client.GetTransferPullStreamData; const stream = this.streams?.[stage]; + const batchLength = () => Buffer.byteLength(JSON.stringify(batch)); + const sendBatch = async () => { + await this.confirm({ + type: 'transfer', + data: batch, + ended: false, + error: null, + id, + }); + }; + if (!stream) { throw new ProviderTransferError(`No available stream found for ${stage}`); } try { for await (const chunk of stream) { - await this.confirm({ type: 'transfer', data: chunk, ended: false, error: null, id }); + batch.push(chunk); + if (batchLength() >= batchSize) { + await sendBatch(); + batch = []; + } } + if (batch.length > 0) { + await sendBatch(); + batch = []; + } await this.confirm({ type: 'transfer', data: null, ended: true, error: null, id }); } catch (e) { await this.confirm({ type: 'transfer', data: null, ended: true, error: e, id }); @@ -163,7 +185,6 @@ export const createPullController = handlerControllerFactory this.provider?.createConfigurationReadStream(), assets: () => { const assets = this.provider?.createAssetsReadStream(); + let batch: Protocol.Client.TransferAssetFlow[] = []; + + const batchLength = () => { + return batch.reduce( + (acc, chunk) => (chunk.action === 'stream' ? acc + chunk.data.byteLength : acc), + 0 + ); + }; + + const BATCH_MAX_SIZE = 1024 * 1024; // 1MB if (!assets) { throw new Error('bad'); } - + /** + * Generates batches of 1MB of data from the assets stream to avoid + * sending too many small chunks + * + * @param stream Assets stream from the local source provider + */ async function* generator(stream: Readable) { + let hasStarted = false; + let assetID = ''; + for await (const chunk of stream) { - const { stream: assetStream, ...rest } = chunk as IAsset; + const { stream: assetStream, ...assetData } = chunk as IAsset; + if (!hasStarted) { + assetID = randomUUID(); + // Start the transfer of a new asset + batch.push({ action: 'start', assetID, data: assetData }); + hasStarted = true; + } for await (const assetChunk of assetStream) { - yield { ...rest, chunk: assetChunk }; + // Add the asset data to the batch + batch.push({ action: 'stream', assetID, data: assetChunk }); + + // if the batch size is bigger than BATCH_MAX_SIZE stream the batch + if (batchLength() >= BATCH_MAX_SIZE) { + yield batch; + batch = []; + } } + + // All the asset data has been streamed and gets ready for the next one + hasStarted = false; + batch.push({ action: 'end', assetID }); + yield batch; + batch = []; } } diff --git a/packages/core/data-transfer/types/remote/protocol/client/transfer/pull.d.ts b/packages/core/data-transfer/types/remote/protocol/client/transfer/pull.d.ts index 97c28acfe7..e12641bcd8 100644 --- a/packages/core/data-transfer/types/remote/protocol/client/transfer/pull.d.ts +++ b/packages/core/data-transfer/types/remote/protocol/client/transfer/pull.d.ts @@ -3,14 +3,23 @@ import { CreateTransferMessage, TransferAssetFlow } from './utils'; export type TransferPullMessage = CreateTransferMessage< 'step', - | TransferStepCommands<'entities', IEntity> - | TransferStepCommands<'links', ILink> - | TransferStepCommands<'configuration', IConfiguration> - | TransferStepCommands<'assets', TransferAssetFlow | null> + | TransferStepCommands<'entities', IEntity[]> + | TransferStepCommands<'links', ILink[]> + | TransferStepCommands<'configuration', IConfiguration[]> + | TransferStepCommands<'assets', TransferAssetFlow[] | null> >; export type TransferPullStep = TransferPullMessage['step']; +export type GetTransferPullStreamData = { + [key in TransferPullStep]: { + action: 'stream'; + step: key; + } & TransferPullMessage; +}[T] extends { data: infer U } + ? U + : never; + type TransferStepCommands = { step: T } & TransferStepFlow; type TransferStepFlow = { action: 'start' } | { action: 'stream'; data: U } | { action: 'end' };