diff --git a/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts index 6b56e8d8c7..9b9c2699a2 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts @@ -333,13 +333,7 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider { } createAssetsWriteStream(): Writable | Promise { - type TransferAssetFlow = { - assetID: string; - action: 'stream'; - data: Omit; - chunk: Buffer; - }; - let batch: TransferAssetFlow[] = []; + let batch: any[] = []; const batchSize = 100; const startAssetsTransferOnce = this.#startStepOnce('assets'); @@ -375,23 +369,33 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider { const { filename, filepath, stats, stream } = asset; const assetID = v4(); + batch.push({ + action: 'start', + assetID, + data: { filename, filepath, stats }, + }); + if (batch.length === batchSize) { + await this.#streamStep('assets', batch); + batch = []; + } for await (const chunk of stream) { - batch.push({ - action: 'stream', - assetID, - data: { filename, filepath, stats }, - chunk, - }); + batch.push({ action: 'stream', assetID, data: chunk }); if (batch.length === batchSize) { await this.#streamStep('assets', batch); batch = []; } } + + batch.push({ + action: 'end', + assetID, + }); if (batch.length === batchSize) { await this.#streamStep('assets', batch); batch = []; } + callback(); }, }); diff --git a/packages/core/data-transfer/src/strapi/remote/controllers/push.ts b/packages/core/data-transfer/src/strapi/remote/controllers/push.ts index a2b6048ebc..3dd8bbcb2e 100644 --- a/packages/core/data-transfer/src/strapi/remote/controllers/push.ts +++ b/packages/core/data-transfer/src/strapi/remote/controllers/push.ts @@ -103,54 +103,45 @@ const createPushController = (options: ILocalStrapiDestinationProviderOptions): }); }, - async assets(payload) { + async assets(payloads) { // TODO: close the stream upong receiving an 'end' event instead - if (payload === null) { - const assetsKeys = Object.keys(assets); - const previousAssetId = assetsKeys[assetsKeys.length - 1]; - const { stream } = assets[previousAssetId]; - stream - .on('close', () => { - delete assets[previousAssetId]; - }) - .on('error', (error) => { - throw new Error(`error while closing stream ${error}`); - }) - .end(); + if (payloads === null) { streams.assets?.end(); return; } - if (!streams.assets) { streams.assets = await provider.createAssetsWriteStream(); } - payload.map(async (asset) => { - const { action, assetID, data, chunk } = asset; - const assetsKeys = Object.keys(assets); - if (action === 'stream') { - if (!assets[assetID] && assetsKeys.length !== 0) { - const previousAssetId = assetsKeys[assetsKeys.length - 1]; - const { stream } = assets[previousAssetId]; - stream - .on('close', () => { - delete assets[previousAssetId]; - }) - .on('error', (error) => { - throw new Error(`error while closing stream ${error}`); - }) - .end(); - } + payloads.forEach(async (payload) => { + const { action, assetID } = payload; - if (!assets[assetID] && streams.assets) { - assets[assetID] = { ...data, stream: new PassThrough() }; - writeAsync(streams.assets, assets[assetID]); - } + if (action === 'start' && streams.assets) { + assets[assetID] = { ...payload.data, stream: new PassThrough() }; + writeAsync(streams.assets, assets[assetID]); + } + + if (action === 'stream') { // The buffer has gone through JSON operations and is now of shape { type: "Buffer"; data: UInt8Array } // We need to transform it back into a Buffer instance - const rawBuffer = chunk as unknown as { type: 'Buffer'; data: Uint8Array }; - const buffer = Buffer.from(rawBuffer.data); - await writeAsync(assets[assetID].stream, buffer); + const rawBuffer = payload.data as unknown as { type: 'Buffer'; data: Uint8Array }; + const chunk = Buffer.from(rawBuffer.data); + + await writeAsync(assets[assetID].stream, chunk); + } + + if (action === 'end') { + await new Promise((resolve, reject) => { + const { stream } = assets[assetID]; + + stream + .on('close', () => { + delete assets[assetID]; + resolve(); + }) + .on('error', reject) + .end(); + }); } }); }, diff --git a/packages/core/data-transfer/types/remote/protocol/client/transfer/push.d.ts b/packages/core/data-transfer/types/remote/protocol/client/transfer/push.d.ts index 53881403a3..38867fad1b 100644 --- a/packages/core/data-transfer/types/remote/protocol/client/transfer/push.d.ts +++ b/packages/core/data-transfer/types/remote/protocol/client/transfer/push.d.ts @@ -24,9 +24,8 @@ type TransferStepCommands = { step: T } & TransferStepFlow< type TransferStepFlow = { action: 'start' } | { action: 'stream'; data: U } | { action: 'end' }; -type TransferAssetFlow = { - assetID: string; - action: 'stream'; - data: Omit; - chunk: Buffer; -}; +type TransferAssetFlow = { assetID: string } & ( + | { action: 'start'; data: Omit } + | { action: 'stream'; data: Buffer } + | { action: 'end' } +);