refactor assets streaming

This commit is contained in:
Bassel 2023-02-23 15:25:19 +02:00
parent 1c5fd3d1b1
commit e40e411433
3 changed files with 50 additions and 56 deletions

View File

@ -333,13 +333,7 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
} }
createAssetsWriteStream(): Writable | Promise<Writable> { createAssetsWriteStream(): Writable | Promise<Writable> {
type TransferAssetFlow = { let batch: any[] = [];
assetID: string;
action: 'stream';
data: Omit<IAsset, 'stream'>;
chunk: Buffer;
};
let batch: TransferAssetFlow[] = [];
const batchSize = 100; const batchSize = 100;
const startAssetsTransferOnce = this.#startStepOnce('assets'); const startAssetsTransferOnce = this.#startStepOnce('assets');
@ -375,23 +369,33 @@ class RemoteStrapiDestinationProvider implements IDestinationProvider {
const { filename, filepath, stats, stream } = asset; const { filename, filepath, stats, stream } = asset;
const assetID = v4(); const assetID = v4();
for await (const chunk of stream) {
batch.push({ batch.push({
action: 'stream', action: 'start',
assetID, assetID,
data: { filename, filepath, stats }, data: { filename, filepath, stats },
chunk,
}); });
if (batch.length === batchSize) { if (batch.length === batchSize) {
await this.#streamStep('assets', batch); await this.#streamStep('assets', batch);
batch = []; batch = [];
} }
}
for await (const chunk of stream) {
batch.push({ action: 'stream', assetID, data: chunk });
if (batch.length === batchSize) { if (batch.length === batchSize) {
await this.#streamStep('assets', batch); await this.#streamStep('assets', batch);
batch = []; batch = [];
} }
}
batch.push({
action: 'end',
assetID,
});
if (batch.length === batchSize) {
await this.#streamStep('assets', batch);
batch = [];
}
callback(); callback();
}, },
}); });

View File

@ -103,54 +103,45 @@ const createPushController = (options: ILocalStrapiDestinationProviderOptions):
}); });
}, },
async assets(payload) { async assets(payloads) {
// TODO: close the stream upong receiving an 'end' event instead // TODO: close the stream upong receiving an 'end' event instead
if (payload === null) { if (payloads === null) {
const assetsKeys = Object.keys(assets);
const previousAssetId = assetsKeys[assetsKeys.length - 1];
const { stream } = assets[previousAssetId];
stream
.on('close', () => {
delete assets[previousAssetId];
})
.on('error', (error) => {
throw new Error(`error while closing stream ${error}`);
})
.end();
streams.assets?.end(); streams.assets?.end();
return; return;
} }
if (!streams.assets) { if (!streams.assets) {
streams.assets = await provider.createAssetsWriteStream(); streams.assets = await provider.createAssetsWriteStream();
} }
payload.map(async (asset) => { payloads.forEach(async (payload) => {
const { action, assetID, data, chunk } = asset; const { action, assetID } = payload;
const assetsKeys = Object.keys(assets);
if (action === 'stream') {
if (!assets[assetID] && assetsKeys.length !== 0) {
const previousAssetId = assetsKeys[assetsKeys.length - 1];
const { stream } = assets[previousAssetId];
stream
.on('close', () => {
delete assets[previousAssetId];
})
.on('error', (error) => {
throw new Error(`error while closing stream ${error}`);
})
.end();
}
if (!assets[assetID] && streams.assets) { if (action === 'start' && streams.assets) {
assets[assetID] = { ...data, stream: new PassThrough() }; assets[assetID] = { ...payload.data, stream: new PassThrough() };
writeAsync(streams.assets, assets[assetID]); writeAsync(streams.assets, assets[assetID]);
} }
if (action === 'stream') {
// The buffer has gone through JSON operations and is now of shape { type: "Buffer"; data: UInt8Array } // The buffer has gone through JSON operations and is now of shape { type: "Buffer"; data: UInt8Array }
// We need to transform it back into a Buffer instance // We need to transform it back into a Buffer instance
const rawBuffer = chunk as unknown as { type: 'Buffer'; data: Uint8Array }; const rawBuffer = payload.data as unknown as { type: 'Buffer'; data: Uint8Array };
const buffer = Buffer.from(rawBuffer.data); const chunk = Buffer.from(rawBuffer.data);
await writeAsync(assets[assetID].stream, buffer);
await writeAsync(assets[assetID].stream, chunk);
}
if (action === 'end') {
await new Promise<void>((resolve, reject) => {
const { stream } = assets[assetID];
stream
.on('close', () => {
delete assets[assetID];
resolve();
})
.on('error', reject)
.end();
});
} }
}); });
}, },

View File

@ -24,9 +24,8 @@ type TransferStepCommands<T extends string, U> = { step: T } & TransferStepFlow<
type TransferStepFlow<U> = { action: 'start' } | { action: 'stream'; data: U } | { action: 'end' }; type TransferStepFlow<U> = { action: 'start' } | { action: 'stream'; data: U } | { action: 'end' };
type TransferAssetFlow = { type TransferAssetFlow = { assetID: string } & (
assetID: string; | { action: 'start'; data: Omit<IAsset, 'stream'> }
action: 'stream'; | { action: 'stream'; data: Buffer }
data: Omit<IAsset, 'stream'>; | { action: 'end' }
chunk: Buffer; );
};