Merge pull request #17122 from strapi/fix/dts-pull-hang

This commit is contained in:
Jean-Sébastien Herbaux 2023-07-03 17:20:54 +02:00 committed by GitHub
commit 8c7ea46ccf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 137 additions and 33 deletions

View File

@ -38,7 +38,7 @@ Used for switching between stages of a transfer and streaming the actual data of
Accepts the following `action` values:
- `start`: sent with a `step` value for the name of the step/stage
- any number of `stream`: sent with a `step` value and the `data` being sent (ie, an entity)
- any number of `stream`: sent with a `step` value and the `data` being sent (ie, an array of entities)
- `end`: sent with a `step` value for the step being ended
### dispatchTransferAction

View File

@ -1,6 +1,7 @@
import type { Schema, Utils } from '@strapi/strapi';
import { PassThrough, Readable } from 'stream';
import { PassThrough, Readable, Writable } from 'stream';
import { WebSocket } from 'ws';
import { castArray } from 'lodash/fp';
import type {
IAsset,
@ -8,6 +9,7 @@ import type {
ISourceProvider,
ISourceProviderTransferResults,
MaybePromise,
Protocol,
ProviderType,
TransferStage,
} from '../../../../types';
@ -83,7 +85,11 @@ class RemoteStrapiSourceProvider implements ISourceProvider {
return;
}
stream.push(data);
// if we get a single items instead of a batch
// TODO V5: in v5 only allow array
for (const item of castArray(data)) {
stream.push(item);
}
this.ws?.once('message', listener);
@ -103,36 +109,67 @@ class RemoteStrapiSourceProvider implements ISourceProvider {
return this.#createStageReadStream('links');
}
writeAsync = <T>(stream: Writable, data: T) => {
return new Promise<void>((resolve, reject) => {
stream.write(data, (error) => {
if (error) {
reject(error);
}
resolve();
});
});
};
async createAssetsReadStream(): Promise<Readable> {
const assets: { [filename: string]: Readable } = {};
const assets: {
[filename: string]: IAsset & {
stream: PassThrough;
};
} = {};
const stream = await this.#createStageReadStream('assets');
const pass = new PassThrough({ objectMode: true });
stream
.on(
'data',
(asset: Omit<IAsset, 'stream'> & { chunk: { type: 'Buffer'; data: Uint8Array } }) => {
const { chunk, ...rest } = asset;
.on('data', async (payload: Protocol.Client.TransferAssetFlow[]) => {
for (const item of payload) {
const { action } = item;
if (!(asset.filename in assets)) {
const assetStream = new PassThrough();
assets[asset.filename] = assetStream;
pass.push({ ...rest, stream: assetStream });
// Creates the stream to send the incoming asset through
if (action === 'start') {
// Each asset has its own stream identified by its assetID
assets[item.assetID] = { ...item.data, stream: new PassThrough() };
await this.writeAsync(pass, assets[item.assetID]);
}
if (asset.filename in assets) {
// The buffer has gone through JSON operations and is now of shape { type: "Buffer"; data: UInt8Array }
// We need to transform it back into a Buffer instance
assets[asset.filename].push(Buffer.from(chunk.data));
// Writes the asset data to the created stream
else if (action === 'stream') {
// Converts data into buffer
const rawBuffer = item.data as unknown as {
type: 'Buffer';
data: Uint8Array;
};
const chunk = Buffer.from(rawBuffer.data);
await this.writeAsync(assets[item.assetID].stream, chunk);
}
// The asset has been transferred
else if (action === 'end') {
await new Promise<void>((resolve, reject) => {
const { stream: assetStream } = assets[item.assetID];
assetStream
.on('close', () => {
// Deletes the stream for the asset
delete assets[item.assetID];
resolve();
})
.on('error', reject)
.end();
});
}
}
)
.on('end', () => {
Object.values(assets).forEach((s) => {
s.push(null);
});
})
.on('close', () => {
pass.end();

View File

@ -6,6 +6,7 @@ import { handlerControllerFactory, isDataTransferMessage } from './utils';
import { createLocalStrapiSourceProvider, ILocalStrapiSourceProvider } from '../../providers';
import { ProviderTransferError } from '../../../errors/providers';
import type { IAsset, TransferStage, Protocol } from '../../../../types';
import { Client } from '../../../../types/remote/protocol';
const TRANSFER_KIND = 'pull';
const VALID_TRANSFER_ACTIONS = ['bootstrap', 'close', 'getMetadata', 'getSchemas'] as const;
@ -133,19 +134,40 @@ export const createPullController = handlerControllerFactory<Partial<PullHandler
return this.provider?.[action]();
},
// TODO: Optimize performances (batching, client packets reconstruction, etc...)
async flush(this: PullHandler, stage: TransferStage, id) {
async flush(this: PullHandler, stage: Exclude<Client.TransferPullStep, 'assets'>, id) {
type Stage = typeof stage;
const batchSize = 1024 * 1024;
let batch = [] as Client.GetTransferPullStreamData<Stage>;
const stream = this.streams?.[stage];
const batchLength = () => Buffer.byteLength(JSON.stringify(batch));
const sendBatch = async () => {
await this.confirm({
type: 'transfer',
data: batch,
ended: false,
error: null,
id,
});
};
if (!stream) {
throw new ProviderTransferError(`No available stream found for ${stage}`);
}
try {
for await (const chunk of stream) {
await this.confirm({ type: 'transfer', data: chunk, ended: false, error: null, id });
batch.push(chunk);
if (batchLength() >= batchSize) {
await sendBatch();
batch = [];
}
}
if (batch.length > 0) {
await sendBatch();
batch = [];
}
await this.confirm({ type: 'transfer', data: null, ended: true, error: null, id });
} catch (e) {
await this.confirm({ type: 'transfer', data: null, ended: true, error: e, id });
@ -163,7 +185,6 @@ export const createPullController = handlerControllerFactory<Partial<PullHandler
const flushUUID = randomUUID();
await this.createReadableStreamForStep(step);
this.flush(step, flushUUID);
return { ok: true, id: flushUUID };
@ -191,18 +212,55 @@ export const createPullController = handlerControllerFactory<Partial<PullHandler
configuration: () => this.provider?.createConfigurationReadStream(),
assets: () => {
const assets = this.provider?.createAssetsReadStream();
let batch: Protocol.Client.TransferAssetFlow[] = [];
const batchLength = () => {
return batch.reduce(
(acc, chunk) => (chunk.action === 'stream' ? acc + chunk.data.byteLength : acc),
0
);
};
const BATCH_MAX_SIZE = 1024 * 1024; // 1MB
if (!assets) {
throw new Error('bad');
}
/**
* Generates batches of 1MB of data from the assets stream to avoid
* sending too many small chunks
*
* @param stream Assets stream from the local source provider
*/
async function* generator(stream: Readable) {
let hasStarted = false;
let assetID = '';
for await (const chunk of stream) {
const { stream: assetStream, ...rest } = chunk as IAsset;
const { stream: assetStream, ...assetData } = chunk as IAsset;
if (!hasStarted) {
assetID = randomUUID();
// Start the transfer of a new asset
batch.push({ action: 'start', assetID, data: assetData });
hasStarted = true;
}
for await (const assetChunk of assetStream) {
yield { ...rest, chunk: assetChunk };
// Add the asset data to the batch
batch.push({ action: 'stream', assetID, data: assetChunk });
// if the batch size is bigger than BATCH_MAX_SIZE stream the batch
if (batchLength() >= BATCH_MAX_SIZE) {
yield batch;
batch = [];
}
}
// All the asset data has been streamed and gets ready for the next one
hasStarted = false;
batch.push({ action: 'end', assetID });
yield batch;
batch = [];
}
}

View File

@ -3,14 +3,23 @@ import { CreateTransferMessage, TransferAssetFlow } from './utils';
export type TransferPullMessage = CreateTransferMessage<
'step',
| TransferStepCommands<'entities', IEntity>
| TransferStepCommands<'links', ILink>
| TransferStepCommands<'configuration', IConfiguration>
| TransferStepCommands<'assets', TransferAssetFlow | null>
| TransferStepCommands<'entities', IEntity[]>
| TransferStepCommands<'links', ILink[]>
| TransferStepCommands<'configuration', IConfiguration[]>
| TransferStepCommands<'assets', TransferAssetFlow[] | null>
>;
export type TransferPullStep = TransferPullMessage['step'];
export type GetTransferPullStreamData<T extends TransferPullStep> = {
[key in TransferPullStep]: {
action: 'stream';
step: key;
} & TransferPullMessage;
}[T] extends { data: infer U }
? U
: never;
type TransferStepCommands<T extends string, U> = { step: T } & TransferStepFlow<U>;
type TransferStepFlow<U> = { action: 'start' } | { action: 'stream'; data: U } | { action: 'end' };