Optimize assets transfer

This commit is contained in:
Christian Capeans 2023-06-28 09:30:04 +02:00
parent 0e83bc28d9
commit 6359676a9b
2 changed files with 74 additions and 25 deletions

View File

@ -1,5 +1,5 @@
import type { Schema, Utils } from '@strapi/strapi';
import { PassThrough, Readable } from 'stream';
import { PassThrough, Readable, Writable } from 'stream';
import { WebSocket } from 'ws';
import type {
@ -8,6 +8,7 @@ import type {
ISourceProvider,
ISourceProviderTransferResults,
MaybePromise,
Protocol,
ProviderType,
TransferStage,
} from '../../../../types';
@ -107,36 +108,58 @@ class RemoteStrapiSourceProvider implements ISourceProvider {
return this.#createStageReadStream('links');
}
/**
 * Promise wrapper around `stream.write` that settles from the write callback.
 *
 * @param stream - Writable that receives `data`.
 * @param data - Value handed to `stream.write` unchanged.
 * @returns A promise that resolves when the write callback fires without an
 * error, and rejects with the callback's error otherwise.
 */
writeAsync = <T>(stream: Writable, data: T) => {
  return new Promise<void>((resolve, reject) => {
    stream.write(data, (error) => {
      if (error) {
        // Return right after rejecting so the failure path never falls
        // through to `resolve()`.
        reject(error);
        return;
      }

      resolve();
    });
  });
};
async createAssetsReadStream(): Promise<Readable> {
const assets: { [filename: string]: Readable } = {};
const assets: {
[filename: string]: IAsset & {
stream: PassThrough;
};
} = {};
const stream = await this.#createStageReadStream('assets');
const pass = new PassThrough({ objectMode: true });
stream
.on(
'data',
(asset: Omit<IAsset, 'stream'> & { chunk: { type: 'Buffer'; data: Uint8Array } }) => {
const { chunk, ...rest } = asset;
if (!(asset.filename in assets)) {
const assetStream = new PassThrough();
assets[asset.filename] = assetStream;
pass.push({ ...rest, stream: assetStream });
.on('data', async (payload: Protocol.Client.TransferAssetFlow[]) => {
for (const item of payload) {
const { action } = item;
if (action === 'start') {
assets[item.assetID] = { ...item.data, stream: new PassThrough() };
await this.writeAsync(pass, assets[item.assetID]);
}
if (action === 'stream') {
const rawBuffer = item.data as unknown as {
type: 'Buffer';
data: Uint8Array;
};
const chunk = Buffer.from(rawBuffer.data);
if (asset.filename in assets) {
// The buffer has gone through JSON operations and is now of shape { type: "Buffer"; data: UInt8Array }
// We need to transform it back into a Buffer instance
assets[asset.filename].push(Buffer.from(chunk.data));
await this.writeAsync(assets[item.assetID].stream, chunk);
}
if (action === 'end') {
await new Promise<void>((resolve, reject) => {
const { stream: assetStream } = assets[item.assetID];
assetStream
.on('close', () => {
delete assets[item.assetID];
resolve();
})
.on('error', reject)
.end();
});
}
}
)
.on('end', () => {
Object.values(assets).forEach((s) => {
s.push(null);
});
})
.on('close', () => {
pass.end();

View File

@ -134,7 +134,6 @@ export const createPullController = handlerControllerFactory<Partial<PullHandler
return this.provider?.[action]();
},
// TODO: Optimize performances (batching, client packets reconstruction, etc...)
async flush(this: PullHandler, stage: Exclude<Client.TransferPullStep, 'assets'>, id) {
type Stage = typeof stage;
const batchSize = 1024 * 1024;
@ -186,7 +185,6 @@ export const createPullController = handlerControllerFactory<Partial<PullHandler
const flushUUID = randomUUID();
await this.createReadableStreamForStep(step);
console.log('flushing', step);
this.flush(step, flushUUID);
return { ok: true, id: flushUUID };
@ -214,18 +212,46 @@ export const createPullController = handlerControllerFactory<Partial<PullHandler
configuration: () => this.provider?.createConfigurationReadStream(),
assets: () => {
const assets = this.provider?.createAssetsReadStream();
let batch: Protocol.Client.TransferAssetFlow[] = [];
const batchLength = () => {
return batch.reduce(
(acc, chunk) => (chunk.action === 'stream' ? acc + chunk.data.byteLength : acc),
0
);
};
const batchSize = 1024 * 1024; // 1MB
if (!assets) {
throw new Error('bad');
}
/**
 * Async generator that walks every asset produced by `stream` and emits the
 * asset transfer as batches of flow messages collected in the outer `batch`
 * array.
 *
 * For each asset it queues a `start` message (fresh `assetID` plus the asset's
 * fields other than `stream`), then one `stream` message per chunk read from
 * the asset's own stream, and finally an `end` message. The batch is yielded
 * whenever `batchLength()` reaches `batchSize`, and once more after each
 * asset's `end` message.
 *
 * @param stream - Readable whose items are treated as `IAsset` objects.
 */
async function* generator(stream: Readable) {
// True while the current asset's `start` message has already been queued.
let hasStarted = false;
// Identifier shared by the start/stream/end messages of the current asset.
let assetID = '';
for await (const chunk of stream) {
// NOTE(review): `assetStream` is destructured twice in this scope (this line
// and the next). The first looks like the pre-change side of a diff that was
// kept alongside its replacement; confirm which one is live.
const { stream: assetStream, ...rest } = chunk as IAsset;
const { stream: assetStream, ...assetData } = chunk as IAsset;
// `hasStarted` is reset after every asset, so `start` is queued once per asset.
if (!hasStarted) {
assetID = randomUUID();
batch.push({ action: 'start', assetID, data: assetData });
hasStarted = true;
}
for await (const assetChunk of assetStream) {
// NOTE(review): this yields a single `{ ...rest, chunk }` object, whereas every
// other yield in this generator emits the `batch` array. Presumably the
// superseded pre-change yield; confirm before relying on it.
yield { ...rest, chunk: assetChunk };
batch.push({ action: 'stream', assetID, data: assetChunk });
// Flush once the queued `stream` payloads reach the batch size threshold.
if (batchLength() >= batchSize) {
yield batch;
batch = [];
}
}
// The asset's stream is exhausted: queue the `end` message, flush whatever is
// pending, and reset state for the next asset.
hasStarted = false;
batch.push({ action: 'end', assetID });
yield batch;
batch = [];
}
}