Batching using the amount of bytes

Co-authored-by: Jean-Sébastien Herbaux <Convly@users.noreply.github.com>
This commit is contained in:
Bassel 2023-02-24 13:31:58 +02:00
parent db1c010f5b
commit 2bea33bf67
5 changed files with 105 additions and 152 deletions

View File

@ -691,7 +691,7 @@ class TransferEngine<
const transform = this.#createStageTransformStream(stage);
const tracker = this.#progressTracker(stage, {
size: (value: IAsset) => value.stats.size,
key: (value: IAsset) => extname(value.filename) ?? 'NA',
key: (value: IAsset) => extname(value.filename) || 'No extension',
});
await this.#transferStage({ stage, source, destination, transform, tracker });

View File

@ -171,9 +171,14 @@ class LocalStrapiDestinationProvider implements IDestinationProvider {
const entryPath = path.join(assetsDirectory, chunk.filename);
const writableStream = fse.createWriteStream(entryPath);
console.log('hello from the other side?');
chunk.stream
.pipe(writableStream)
.on('close', callback)
.on('close', () => {
console.log('close/end substream');
callback(null);
})
.on('error', async (error: NodeJS.ErrnoException) => {
const errorMessage =
error.code === 'ENOSPC'

View File

@ -5,15 +5,7 @@ import { once } from 'lodash/fp';
import { createDispatcher } from './utils';
import type {
IDestinationProvider,
IEntity,
ILink,
IMetadata,
ProviderType,
IConfiguration,
IAsset,
} from '../../../../types';
import type { IDestinationProvider, IMetadata, ProviderType, IAsset } from '../../../../types';
import type { client, server } from '../../../../types/remote/protocol';
import type { ILocalStrapiDestinationProviderOptions } from '../local-destination';
import { TRANSFER_PATH } from '../../remote/constants';
@ -30,6 +22,8 @@ export interface IRemoteStrapiDestinationProviderOptions
auth?: ITransferTokenAuth;
}
const jsonLength = (obj: object) => Buffer.byteLength(JSON.stringify(obj));
class RemoteStrapiDestinationProvider implements IDestinationProvider {
name = 'destination::remote-strapi';
@ -134,6 +128,59 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
return null;
}
#writeStream(step: Exclude<client.TransferPushStep, 'assets'>): Writable {
type Step = typeof step;
const batchSize = 1024 * 1024; // 1MB;
const startTransferOnce = this.#startStepOnce(step);
let batch = [] as client.GetTransferPushStreamData<Step>;
const batchLength = () => jsonLength(batch);
return new Writable({
objectMode: true,
final: async (callback) => {
if (batch.length > 0) {
const streamError = await this.#streamStep(step, batch);
batch = [];
if (streamError) {
return callback(streamError);
}
}
const e = await this.#endStep(step);
callback(e);
},
write: async (chunk, _encoding, callback) => {
const startError = await startTransferOnce();
if (startError) {
return callback(startError);
}
batch.push(chunk);
if (batchLength() >= batchSize) {
console.log('flushing', batchLength(), '>', batchSize);
console.log(batch.length, 'items at once');
const streamError = await this.#streamStep(step, batch);
if (streamError) {
return callback(streamError);
}
batch = [];
}
callback();
},
});
}
async bootstrap(): Promise<void> {
const { url, auth } = this.options;
const validProtocols = ['https:', 'http:'];
@ -214,135 +261,45 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
}
createEntitiesWriteStream(): Writable {
const chunkSize = 100;
let entities: IEntity[] = [];
const startEntitiesTransferOnce = this.#startStepOnce('entities');
return new Writable({
objectMode: true,
final: async (callback) => {
if (entities.length > 0) {
const streamError = await this.#streamStep('entities', entities);
entities = [];
if (streamError) {
return callback(streamError);
}
}
const e = await this.#endStep('entities');
callback(e);
},
write: async (entity: IEntity, _encoding, callback) => {
const startError = await startEntitiesTransferOnce();
if (startError) {
return callback(startError);
}
entities.push(entity);
if (entities.length === chunkSize) {
const streamError = await this.#streamStep('entities', entities);
if (streamError) {
return callback(streamError);
}
entities = [];
}
callback();
},
});
return this.#writeStream('entities');
}
createLinksWriteStream(): Writable {
const chunkSize = 100;
let links: ILink[] = [];
const startLinksTransferOnce = this.#startStepOnce('links');
return new Writable({
objectMode: true,
final: async (callback) => {
if (links.length > 0) {
const streamError = await this.#streamStep('links', links);
if (streamError) {
return callback(streamError);
}
links = [];
}
const e = await this.#endStep('links');
callback(e);
},
write: async (link: ILink, _encoding, callback) => {
const startError = await startLinksTransferOnce();
if (startError) {
return callback(startError);
}
if (links.length === chunkSize) {
const streamError = await this.#streamStep('links', links);
if (streamError) {
return callback(streamError);
}
links = [];
}
callback();
},
});
return this.#writeStream('links');
}
createConfigurationWriteStream(): Writable {
const chunkSize = 100;
let configurations: IConfiguration[] = [];
const startConfigurationTransferOnce = this.#startStepOnce('configuration');
return new Writable({
objectMode: true,
final: async (callback) => {
if (configurations.length > 0) {
const streamError = await this.#streamStep('configuration', configurations);
if (streamError) {
return callback(streamError);
}
configurations = [];
}
const e = await this.#endStep('configuration');
callback(e);
},
write: async (configuration: IConfiguration, _encoding, callback) => {
const startError = await startConfigurationTransferOnce();
if (startError) {
return callback(startError);
}
configurations.push(configuration);
if (configurations.length === chunkSize) {
const streamError = await this.#streamStep('configuration', configurations);
if (streamError) {
return callback(streamError);
}
configurations = [];
}
callback();
},
});
return this.#writeStream('configuration');
}
createAssetsWriteStream(): Writable | Promise<Writable> {
let batch: client.TransferAssetFlow[] = [];
const batchSize = 100;
const batchSize = 1024 * 1024; // 1MB;
const batchLength = () => {
return batch.reduce(
(acc, chunk) => (chunk.action === 'stream' ? acc + chunk.data.byteLength : acc),
0
);
};
const startAssetsTransferOnce = this.#startStepOnce('assets');
const flush = async () => {
await this.#streamStep('assets', batch);
batch = [];
};
const safePush = async (chunk: client.TransferAssetFlow) => {
batch.push(chunk);
if (batchLength() >= batchSize) {
await flush();
}
};
return new Writable({
objectMode: true,
final: async (callback) => {
if (batch.length > 0) {
await this.#streamStep('assets', batch);
batch = [];
await flush();
}
// TODO: replace this stream call by an end call
const endError = await this.#streamStep('assets', null);
@ -360,41 +317,23 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
return callback(null);
},
write: async (asset: IAsset, _encoding, callback) => {
async write(asset: IAsset, _encoding, callback) {
const startError = await startAssetsTransferOnce();
if (startError) {
return callback(startError);
}
const { filename, filepath, stats, stream } = asset;
const assetID = v4();
batch.push({
action: 'start',
assetID,
data: { filename, filepath, stats },
});
if (batch.length === batchSize) {
await this.#streamStep('assets', batch);
batch = [];
}
const { filename, filepath, stats, stream } = asset;
await safePush({ action: 'start', assetID, data: { filename, filepath, stats } });
for await (const chunk of stream) {
batch.push({ action: 'stream', assetID, data: chunk });
if (batch.length === batchSize) {
await this.#streamStep('assets', batch);
batch = [];
}
await safePush({ action: 'stream', assetID, data: chunk });
}
batch.push({
action: 'end',
assetID,
});
if (batch.length === batchSize) {
await this.#streamStep('assets', batch);
batch = [];
}
await safePush({ action: 'end', assetID });
callback();
},

View File

@ -109,11 +109,16 @@ const createPushController = (options: ILocalStrapiDestinationProviderOptions):
streams.assets?.end();
return;
}
if (!streams.assets) {
streams.assets = await provider.createAssetsWriteStream();
}
payloads.forEach(async (payload) => {
for (const payload of payloads) {
if (streams.assets.closed) {
return;
}
const { action, assetID } = payload;
if (action === 'start' && streams.assets) {
@ -139,11 +144,15 @@ const createPushController = (options: ILocalStrapiDestinationProviderOptions):
delete assets[assetID];
resolve();
})
.on('error', reject)
.on('error', (e) => {
reject(e);
})
.end();
});
}
});
}
console.log('[assets] done');
},
},
};

View File

@ -13,7 +13,7 @@ import { ProviderTransferError, ProviderInitializationError } from '../../errors
import { TRANSFER_METHODS } from './constants';
import { createFlow, DEFAULT_TRANSFER_FLOW } from './flows';
type TransferMethod = typeof TRANSFER_METHODS[number];
type TransferMethod = (typeof TRANSFER_METHODS)[number];
interface ITransferState {
transfer?: {