diff --git a/packages/core/data-transfer/src/engine/index.ts b/packages/core/data-transfer/src/engine/index.ts index 2f04bce86e..f430d4e65e 100644 --- a/packages/core/data-transfer/src/engine/index.ts +++ b/packages/core/data-transfer/src/engine/index.ts @@ -691,7 +691,7 @@ class TransferEngine< const transform = this.#createStageTransformStream(stage); const tracker = this.#progressTracker(stage, { size: (value: IAsset) => value.stats.size, - key: (value: IAsset) => extname(value.filename) ?? 'NA', + key: (value: IAsset) => extname(value.filename) || 'No extension', }); await this.#transferStage({ stage, source, destination, transform, tracker }); diff --git a/packages/core/data-transfer/src/strapi/providers/local-destination/index.ts b/packages/core/data-transfer/src/strapi/providers/local-destination/index.ts index f732d07165..93a60c1709 100644 --- a/packages/core/data-transfer/src/strapi/providers/local-destination/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/local-destination/index.ts @@ -171,9 +171,14 @@ class LocalStrapiDestinationProvider implements IDestinationProvider { const entryPath = path.join(assetsDirectory, chunk.filename); const writableStream = fse.createWriteStream(entryPath); + console.log('hello from the other side?'); + chunk.stream .pipe(writableStream) - .on('close', callback) + .on('close', () => { + console.log('close/end substream'); + callback(null); + }) .on('error', async (error: NodeJS.ErrnoException) => { const errorMessage = error.code === 'ENOSPC' diff --git a/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts b/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts index c89c8b18cb..609c5d3d91 100644 --- a/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts +++ b/packages/core/data-transfer/src/strapi/providers/remote-destination/index.ts @@ -5,15 +5,7 @@ import { once } from 'lodash/fp'; import { createDispatcher } from './utils'; -import type { - IDestinationProvider, - IEntity, - ILink, - IMetadata, - ProviderType, - IConfiguration, - IAsset, -} from '../../../../types'; +import type { IDestinationProvider, IMetadata, ProviderType, IAsset } from '../../../../types'; import type { client, server } from '../../../../types/remote/protocol'; import type { ILocalStrapiDestinationProviderOptions } from '../local-destination'; import { TRANSFER_PATH } from '../../remote/constants'; @@ -30,6 +22,8 @@ export interface IRemoteStrapiDestinationProviderOptions auth?: ITransferTokenAuth; } +const jsonLength = (obj: object) => Buffer.byteLength(JSON.stringify(obj)); + class RemoteStrapiDestinationProvider implements IDestinationProvider { name = 'destination::remote-strapi'; @@ -134,6 +128,59 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider { return null; } + #writeStream(step: Exclude): Writable { + type Step = typeof step; + + const batchSize = 1024 * 1024; // 1MB; + const startTransferOnce = this.#startStepOnce(step); + + let batch = [] as client.GetTransferPushStreamData; + + const batchLength = () => jsonLength(batch); + + return new Writable({ + objectMode: true, + + final: async (callback) => { + if (batch.length > 0) { + const streamError = await this.#streamStep(step, batch); + + batch = []; + + if (streamError) { + return callback(streamError); + } + } + const e = await this.#endStep(step); + + callback(e); + }, + + write: async (chunk, _encoding, callback) => { + const startError = await startTransferOnce(); + if (startError) { + return callback(startError); + } + + batch.push(chunk); + + if (batchLength() >= batchSize) { + console.log('flushing', batchLength(), '>', batchSize); + console.log(batch.length, 'items at once'); + const streamError = await this.#streamStep(step, batch); + + if (streamError) { + return callback(streamError); + } + + batch = []; + } + + callback(); + }, + }); + } + async bootstrap(): Promise { const { url, auth } = this.options; const validProtocols = ['https:', 'http:']; @@ -214,135 +261,45 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider { } createEntitiesWriteStream(): Writable { - const chunkSize = 100; - let entities: IEntity[] = []; - const startEntitiesTransferOnce = this.#startStepOnce('entities'); - - return new Writable({ - objectMode: true, - final: async (callback) => { - if (entities.length > 0) { - const streamError = await this.#streamStep('entities', entities); - entities = []; - if (streamError) { - return callback(streamError); - } - } - const e = await this.#endStep('entities'); - - callback(e); - }, - write: async (entity: IEntity, _encoding, callback) => { - const startError = await startEntitiesTransferOnce(); - if (startError) { - return callback(startError); - } - entities.push(entity); - if (entities.length === chunkSize) { - const streamError = await this.#streamStep('entities', entities); - if (streamError) { - return callback(streamError); - } - entities = []; - } - callback(); - }, - }); + return this.#writeStream('entities'); } createLinksWriteStream(): Writable { - const chunkSize = 100; - let links: ILink[] = []; - const startLinksTransferOnce = this.#startStepOnce('links'); - - return new Writable({ - objectMode: true, - final: async (callback) => { - if (links.length > 0) { - const streamError = await this.#streamStep('links', links); - - if (streamError) { - return callback(streamError); - } - links = []; - } - const e = await this.#endStep('links'); - - callback(e); - }, - write: async (link: ILink, _encoding, callback) => { - const startError = await startLinksTransferOnce(); - - if (startError) { - return callback(startError); - } - - if (links.length === chunkSize) { - const streamError = await this.#streamStep('links', links); - - if (streamError) { - return callback(streamError); - } - links = []; - } - callback(); - }, - }); + return this.#writeStream('links'); } createConfigurationWriteStream(): Writable { - const chunkSize = 100; - let configurations: IConfiguration[] = []; - const startConfigurationTransferOnce = this.#startStepOnce('configuration'); - - return new Writable({ - objectMode: true, - final: async (callback) => { - if (configurations.length > 0) { - const streamError = await this.#streamStep('configuration', configurations); - - if (streamError) { - return callback(streamError); - } - configurations = []; - } - const e = await this.#endStep('configuration'); - - callback(e); - }, - write: async (configuration: IConfiguration, _encoding, callback) => { - const startError = await startConfigurationTransferOnce(); - - if (startError) { - return callback(startError); - } - - configurations.push(configuration); - if (configurations.length === chunkSize) { - const streamError = await this.#streamStep('configuration', configurations); - - if (streamError) { - return callback(streamError); - } - configurations = []; - } - - callback(); - }, - }); + return this.#writeStream('configuration'); } createAssetsWriteStream(): Writable | Promise { let batch: client.TransferAssetFlow[] = []; - const batchSize = 100; + const batchSize = 1024 * 1024; // 1MB; + const batchLength = () => { + return batch.reduce( + (acc, chunk) => (chunk.action === 'stream' ? acc + chunk.data.byteLength : acc), + 0 + ); + }; const startAssetsTransferOnce = this.#startStepOnce('assets'); + const flush = async () => { + await this.#streamStep('assets', batch); + batch = []; + }; + + const safePush = async (chunk: client.TransferAssetFlow) => { + batch.push(chunk); + if (batchLength() >= batchSize) { + await flush(); + } + }; + return new Writable({ objectMode: true, final: async (callback) => { if (batch.length > 0) { - await this.#streamStep('assets', batch); - batch = []; + await flush(); } // TODO: replace this stream call by an end call const endError = await this.#streamStep('assets', null); @@ -360,41 +317,23 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider { return callback(null); }, - write: async (asset: IAsset, _encoding, callback) => { + async write(asset: IAsset, _encoding, callback) { const startError = await startAssetsTransferOnce(); if (startError) { return callback(startError); } - const { filename, filepath, stats, stream } = asset; const assetID = v4(); - batch.push({ - action: 'start', - assetID, - data: { filename, filepath, stats }, - }); - if (batch.length === batchSize) { - await this.#streamStep('assets', batch); - batch = []; - } + const { filename, filepath, stats, stream } = asset; + + await safePush({ action: 'start', assetID, data: { filename, filepath, stats } }); for await (const chunk of stream) { - batch.push({ action: 'stream', assetID, data: chunk }); - if (batch.length === batchSize) { - await this.#streamStep('assets', batch); - batch = []; - } + await safePush({ action: 'stream', assetID, data: chunk }); } - batch.push({ - action: 'end', - assetID, - }); - if (batch.length === batchSize) { - await this.#streamStep('assets', batch); - batch = []; - } + await safePush({ action: 'end', assetID }); callback(); }, diff --git a/packages/core/data-transfer/src/strapi/remote/controllers/push.ts b/packages/core/data-transfer/src/strapi/remote/controllers/push.ts index 3dd8bbcb2e..bc3e966298 100644 --- a/packages/core/data-transfer/src/strapi/remote/controllers/push.ts +++ b/packages/core/data-transfer/src/strapi/remote/controllers/push.ts @@ -109,11 +109,16 @@ const createPushController = (options: ILocalStrapiDestinationProviderOptions): streams.assets?.end(); return; } + if (!streams.assets) { streams.assets = await provider.createAssetsWriteStream(); } - payloads.forEach(async (payload) => { + for (const payload of payloads) { + if (streams.assets.closed) { + return; + } + const { action, assetID } = payload; if (action === 'start' && streams.assets) { @@ -139,11 +144,15 @@ const createPushController = (options: ILocalStrapiDestinationProviderOptions): delete assets[assetID]; resolve(); }) - .on('error', reject) + .on('error', (e) => { + reject(e); + }) .end(); }); } - }); + } + + console.log('[assets] done'); }, }, }; diff --git a/packages/core/data-transfer/src/strapi/remote/handlers.ts b/packages/core/data-transfer/src/strapi/remote/handlers.ts index cf8c33c0d7..e59328170c 100644 --- a/packages/core/data-transfer/src/strapi/remote/handlers.ts +++ b/packages/core/data-transfer/src/strapi/remote/handlers.ts @@ -13,7 +13,7 @@ import { ProviderTransferError, ProviderInitializationError } from '../../errors import { TRANSFER_METHODS } from './constants'; import { createFlow, DEFAULT_TRANSFER_FLOW } from './flows'; -type TransferMethod = typeof TRANSFER_METHODS[number]; +type TransferMethod = (typeof TRANSFER_METHODS)[number]; interface ITransferState { transfer?: {