send chunks of data instead of 1 at a time

This commit is contained in:
Bassel 2023-02-20 18:04:39 +02:00
parent 1472a5369c
commit 4c6a4d97dc
3 changed files with 77 additions and 22 deletions

View File

@ -151,7 +151,6 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
} }
const wsProtocol = url.protocol === 'https:' ? 'wss:' : 'ws:'; const wsProtocol = url.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${wsProtocol}//${url.host}${url.pathname}${TRANSFER_PATH}`; const wsUrl = `${wsProtocol}//${url.host}${url.pathname}${TRANSFER_PATH}`;
// No auth defined, trying public access for transfer // No auth defined, trying public access for transfer
if (!auth) { if (!auth) {
ws = new WebSocket(wsUrl); ws = new WebSocket(wsUrl);
@ -215,35 +214,58 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
} }
createEntitiesWriteStream(): Writable { createEntitiesWriteStream(): Writable {
const chunkSize = 100;
let entities: IEntity[] = [];
const startEntitiesTransferOnce = this.#startStepOnce('entities'); const startEntitiesTransferOnce = this.#startStepOnce('entities');
return new Writable({ return new Writable({
objectMode: true, objectMode: true,
final: async (callback) => { final: async (callback) => {
if (entities.length > 0) {
const streamError = await this.#streamStep('entities', entities);
entities = [];
if (streamError) {
return callback(streamError);
}
}
const e = await this.#endStep('entities'); const e = await this.#endStep('entities');
callback(e); callback(e);
}, },
write: async (entity: IEntity, _encoding, callback) => { write: async (entity: IEntity, _encoding, callback) => {
const startError = await startEntitiesTransferOnce(); const startError = await startEntitiesTransferOnce();
if (startError) { if (startError) {
return callback(startError); return callback(startError);
} }
entities.push(entity);
const streamError = await this.#streamStep('entities', entity); if (entities.length === chunkSize) {
const streamError = await this.#streamStep('entities', entities);
callback(streamError); if (streamError) {
return callback(streamError);
}
entities = [];
}
callback();
}, },
}); });
} }
createLinksWriteStream(): Writable { createLinksWriteStream(): Writable {
const chunkSize = 100;
let links: ILink[] = [];
const startLinksTransferOnce = this.#startStepOnce('links'); const startLinksTransferOnce = this.#startStepOnce('links');
return new Writable({ return new Writable({
objectMode: true, objectMode: true,
final: async (callback) => { final: async (callback) => {
if (links.length > 0) {
const streamError = await this.#streamStep('links', links);
if (streamError) {
return callback(streamError);
}
links = [];
}
const e = await this.#endStep('links'); const e = await this.#endStep('links');
callback(e); callback(e);
@ -255,19 +277,35 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
return callback(startError); return callback(startError);
} }
const streamError = await this.#streamStep('links', link); if (links.length === chunkSize) {
const streamError = await this.#streamStep('links', links);
callback(streamError); if (streamError) {
return callback(streamError);
}
links = [];
}
callback();
}, },
}); });
} }
createConfigurationWriteStream(): Writable { createConfigurationWriteStream(): Writable {
const chunkSize = 100;
let configurations: IConfiguration[] = [];
const startConfigurationTransferOnce = this.#startStepOnce('configuration'); const startConfigurationTransferOnce = this.#startStepOnce('configuration');
return new Writable({ return new Writable({
objectMode: true, objectMode: true,
final: async (callback) => { final: async (callback) => {
if (configurations.length > 0) {
const streamError = await this.#streamStep('configuration', configurations);
if (streamError) {
return callback(streamError);
}
configurations = [];
}
const e = await this.#endStep('configuration'); const e = await this.#endStep('configuration');
callback(e); callback(e);
@ -279,9 +317,17 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
return callback(startError); return callback(startError);
} }
const streamError = await this.#streamStep('configuration', configuration); configurations.push(configuration);
if (configurations.length === chunkSize) {
const streamError = await this.#streamStep('configuration', configurations);
callback(streamError); if (streamError) {
return callback(streamError);
}
configurations = [];
}
callback();
}, },
}); });
} }

View File

@ -70,28 +70,37 @@ const createPushController = (options: ILocalStrapiDestinationProviderOptions):
}, },
transfer: { transfer: {
async entities(entity) { async entities(entities) {
if (!streams.entities) { if (!streams.entities) {
streams.entities = provider.createEntitiesWriteStream(); streams.entities = provider.createEntitiesWriteStream();
} }
entities.map(async (entity) => {
await writeAsync(streams.entities, entity); if (streams.entities) {
await writeAsync(streams.entities, entity);
}
});
}, },
async links(link) { async links(links) {
if (!streams.links) { if (!streams.links) {
streams.links = await provider.createLinksWriteStream(); streams.links = await provider.createLinksWriteStream();
} }
links.map(async (link) => {
await writeAsync(streams.links, link); if (streams.links) {
await writeAsync(streams.links, link);
}
});
}, },
async configuration(config) { async configuration(configs) {
if (!streams.configuration) { if (!streams.configuration) {
streams.configuration = await provider.createConfigurationWriteStream(); streams.configuration = await provider.createConfigurationWriteStream();
} }
configs.map(async (config) => {
await writeAsync(streams.configuration, config); if (streams.configuration) {
await writeAsync(streams.configuration, config);
}
});
}, },
async assets(payload) { async assets(payload) {

View File

@ -3,9 +3,9 @@ import type { IEntity, ILink, IConfiguration, IAsset } from '../../../../common-
export type TransferPushMessage = CreateTransferMessage< export type TransferPushMessage = CreateTransferMessage<
'step', 'step',
| TransferStepCommands<'entities', IEntity> | TransferStepCommands<'entities', IEntity[]>
| TransferStepCommands<'links', ILink> | TransferStepCommands<'links', ILink[]>
| TransferStepCommands<'configuration', IConfiguration> | TransferStepCommands<'configuration', IConfiguration[]>
| TransferStepCommands<'assets', TransferAssetFlow | null> | TransferStepCommands<'assets', TransferAssetFlow | null>
>; >;