diff --git a/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts index bb7e1d7500..157a7e4020 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts @@ -151,7 +151,6 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider { } const wsProtocol = url.protocol === 'https:' ? 'wss:' : 'ws:'; const wsUrl = `${wsProtocol}//${url.host}${url.pathname}${TRANSFER_PATH}`; - // No auth defined, trying public access for transfer if (!auth) { ws = new WebSocket(wsUrl); @@ -215,35 +214,58 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider { } createEntitiesWriteStream(): Writable { + const chunkSize = 100; + let entities: IEntity[] = []; const startEntitiesTransferOnce = this.#startStepOnce('entities'); return new Writable({ objectMode: true, final: async (callback) => { + if (entities.length > 0) { + const streamError = await this.#streamStep('entities', entities); + entities = []; + if (streamError) { + return callback(streamError); + } + } const e = await this.#endStep('entities'); callback(e); }, write: async (entity: IEntity, _encoding, callback) => { const startError = await startEntitiesTransferOnce(); - if (startError) { return callback(startError); } - - const streamError = await this.#streamStep('entities', entity); - - callback(streamError); + entities.push(entity); + if (entities.length === chunkSize) { + const streamError = await this.#streamStep('entities', entities); + if (streamError) { + return callback(streamError); + } + entities = []; + } + callback(); }, }); } createLinksWriteStream(): Writable { + const chunkSize = 100; + let links: ILink[] = []; const startLinksTransferOnce = this.#startStepOnce('links'); return new Writable({ objectMode: true, final: async (callback) => { + if (links.length > 0) { + const streamError = await this.#streamStep('links', links); + + if (streamError) { + return callback(streamError); + } + links = []; + } const e = await this.#endStep('links'); callback(e); @@ -255,19 +277,35 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider { return callback(startError); } - const streamError = await this.#streamStep('links', link); + if (links.length === chunkSize) { + const streamError = await this.#streamStep('links', links); - callback(streamError); + if (streamError) { + return callback(streamError); + } + links = []; + } + callback(); }, }); } createConfigurationWriteStream(): Writable { + const chunkSize = 100; + let configurations: IConfiguration[] = []; const startConfigurationTransferOnce = this.#startStepOnce('configuration'); return new Writable({ objectMode: true, final: async (callback) => { + if (configurations.length > 0) { + const streamError = await this.#streamStep('configuration', configurations); + + if (streamError) { + return callback(streamError); + } + configurations = []; + } const e = await this.#endStep('configuration'); callback(e); @@ -279,9 +317,17 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider { return callback(startError); } - const streamError = await this.#streamStep('configuration', configuration); + configurations.push(configuration); + if (configurations.length === chunkSize) { + const streamError = await this.#streamStep('configuration', configurations); - callback(streamError); + if (streamError) { + return callback(streamError); + } + configurations = []; + } + + callback(); }, }); } diff --git a/packages/core/data-transfer/src/strapi/remote/controllers/push.ts b/packages/core/data-transfer/src/strapi/remote/controllers/push.ts index 9181e955ad..a1a0a33324 100644 --- a/packages/core/data-transfer/src/strapi/remote/controllers/push.ts +++ b/packages/core/data-transfer/src/strapi/remote/controllers/push.ts @@ -70,28 +70,37 @@ const createPushController = (options: ILocalStrapiDestinationProviderOptions): }, transfer: { - async entities(entity) { + async entities(entities) { if (!streams.entities) { streams.entities = provider.createEntitiesWriteStream(); } - - await writeAsync(streams.entities, entity); + entities.map(async (entity) => { + if (streams.entities) { + await writeAsync(streams.entities, entity); + } + }); }, - async links(link) { + async links(links) { if (!streams.links) { streams.links = await provider.createLinksWriteStream(); } - - await writeAsync(streams.links, link); + links.map(async (link) => { + if (streams.links) { + await writeAsync(streams.links, link); + } + }); }, - async configuration(config) { + async configuration(configs) { if (!streams.configuration) { streams.configuration = await provider.createConfigurationWriteStream(); } - - await writeAsync(streams.configuration, config); + configs.map(async (config) => { + if (streams.configuration) { + await writeAsync(streams.configuration, config); + } + }); }, async assets(payload) { diff --git a/packages/core/data-transfer/types/remote/protocol/client/transfer/push.d.ts b/packages/core/data-transfer/types/remote/protocol/client/transfer/push.d.ts index cc5fc9b760..a76afd377e 100644 --- a/packages/core/data-transfer/types/remote/protocol/client/transfer/push.d.ts +++ b/packages/core/data-transfer/types/remote/protocol/client/transfer/push.d.ts @@ -3,9 +3,9 @@ import type { IEntity, ILink, IConfiguration, IAsset } from '../../../../common- export type TransferPushMessage = CreateTransferMessage< 'step', - | TransferStepCommands<'entities', IEntity> - | TransferStepCommands<'links', ILink> - | TransferStepCommands<'configuration', IConfiguration> + | TransferStepCommands<'entities', IEntity[]> + | TransferStepCommands<'links', ILink[]> + | TransferStepCommands<'configuration', IConfiguration[]> | TransferStepCommands<'assets', TransferAssetFlow | null> >;