From 126eb505e8395a88c0386ba10fcebaeec91ac27c Mon Sep 17 00:00:00 2001 From: Joel Einbinder Date: Thu, 6 Feb 2020 14:14:46 -0800 Subject: [PATCH] fix(transport): dispatch messages in separate tasks (#841) Fixes a bug in our pipe, and the same one in the non-standard `ws` module. Our protocol messages are I/O events, and therefore they should each be executed in their own task. --- src/platform.ts | 49 +++++++++++++++++++++++++++++++++++-- src/server/pipeTransport.ts | 15 +++++++++--- 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/src/platform.ts b/src/platform.ts index c4883ad546..8572f20a14 100644 --- a/src/platform.ts +++ b/src/platform.ts @@ -276,6 +276,42 @@ export function fetchUrl(url: string): Promise { }); } +// See https://joel.tools/microtasks/ +export function makeWaitForNextTask() { + assert(isNode, 'Waitng for the next task is only supported in nodejs'); + if (parseInt(process.versions.node, 10) >= 11) + return setImmediate; + + // Unlike Node 11, Node 10 and less have a bug with Task and MicroTask execution order: + // - https://github.com/nodejs/node/issues/22257 + // + // So we can't simply run setImmediate to dispatch code in a following task. + // However, we can run setImmediate from-inside setImmediate to make sure we're getting + // in the following task. + + let spinning = false; + const callbacks: (() => void)[] = []; + const loop = () => { + const callback = callbacks.shift(); + if (!callback) { + spinning = false; + return; + } + setImmediate(loop); + // Make sure to call callback() as the last thing since it's + // untrusted code that might throw. + callback(); + }; + + return (callback: () => void) => { + callbacks.push(callback); + if (!spinning) { + spinning = true; + setImmediate(loop); + } + }; +} + export class WebSocketTransport implements ConnectionTransport { private _ws: WebSocket; @@ -292,10 +328,19 @@ export class WebSocketTransport implements ConnectionTransport { this._ws.addEventListener('open', () => fulfill()); this._ws.addEventListener('error', event => reject(new Error(event.toString()))); }); + // The 'ws' module in node sometimes sends us multiple messages in a single task. + // In Web, all IO callbacks (e.g. WebSocket callbacks) + // are dispatched into separate tasks, so there's no need + // to do anything extra. + const messageWrap: (cb :() => void) => void = isNode ? makeWaitForNextTask() : cb => cb(); + this._ws.addEventListener('message', event => { - if (this.onmessage) - this.onmessage.call(null, event.data); + messageWrap(() => { + if (this.onmessage) + this.onmessage.call(null, event.data); + }); }); + this._ws.addEventListener('close', event => { if (this.onclose) this.onclose.call(null); diff --git a/src/server/pipeTransport.ts b/src/server/pipeTransport.ts index cb1a858e87..76a01375c7 100644 --- a/src/server/pipeTransport.ts +++ b/src/server/pipeTransport.ts @@ -17,11 +17,13 @@ import { debugError, helper, RegisteredListener } from '../helper'; import { ConnectionTransport } from '../transport'; +import { makeWaitForNextTask } from '../platform'; export class PipeTransport implements ConnectionTransport { private _pipeWrite: NodeJS.WritableStream | null; private _pendingMessage = ''; private _eventListeners: RegisteredListener[]; + private _waitForNextTask = makeWaitForNextTask(); onmessage?: (message: string) => void; onclose?: () => void; @@ -52,14 +54,19 @@ export class PipeTransport implements ConnectionTransport { return; } const message = this._pendingMessage + buffer.toString(undefined, 0, end); - if (this.onmessage) - this.onmessage.call(null, message); + this._waitForNextTask(() => { + if (this.onmessage) + this.onmessage.call(null, message); + }); let start = end + 1; end = buffer.indexOf('\0', start); while (end !== -1) { - if (this.onmessage) - this.onmessage.call(null, buffer.toString(undefined, start, end)); + const message = buffer.toString(undefined, start, end); + this._waitForNextTask(() => { + if (this.onmessage) + this.onmessage.call(null, message); + }); start = end + 1; end = buffer.indexOf('\0', start); }