fix(transport): dispatch messages in separate tasks (#841)

Fixes a bug in our pipe, and the same one in the non-standard `ws` module. Our protocol messages are I/O events, and therefore they should each be executed in their own task.
This commit is contained in:
Joel Einbinder 2020-02-06 14:14:46 -08:00 committed by GitHub
parent 6202ff12fd
commit 126eb505e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 58 additions and 6 deletions

View File

@ -276,6 +276,42 @@ export function fetchUrl(url: string): Promise<string> {
});
}
// See https://joel.tools/microtasks/
export function makeWaitForNextTask() {
assert(isNode, 'Waitng for the next task is only supported in nodejs');
if (parseInt(process.versions.node, 10) >= 11)
return setImmediate;
// Unlike Node 11, Node 10 and less have a bug with Task and MicroTask execution order:
// - https://github.com/nodejs/node/issues/22257
//
// So we can't simply run setImmediate to dispatch code in a following task.
// However, we can run setImmediate from-inside setImmediate to make sure we're getting
// in the following task.
let spinning = false;
const callbacks: (() => void)[] = [];
const loop = () => {
const callback = callbacks.shift();
if (!callback) {
spinning = false;
return;
}
setImmediate(loop);
// Make sure to call callback() as the last thing since it's
// untrusted code that might throw.
callback();
};
return (callback: () => void) => {
callbacks.push(callback);
if (!spinning) {
spinning = true;
setImmediate(loop);
}
};
}
export class WebSocketTransport implements ConnectionTransport {
private _ws: WebSocket;
@ -292,10 +328,19 @@ export class WebSocketTransport implements ConnectionTransport {
this._ws.addEventListener('open', () => fulfill());
this._ws.addEventListener('error', event => reject(new Error(event.toString())));
});
// The 'ws' module in node sometimes sends us multiple messages in a single task.
// In Web, all IO callbacks (e.g. WebSocket callbacks)
// are dispatched into separate tasks, so there's no need
// to do anything extra.
const messageWrap: (cb :() => void) => void = isNode ? makeWaitForNextTask() : cb => cb();
this._ws.addEventListener('message', event => {
if (this.onmessage)
this.onmessage.call(null, event.data);
messageWrap(() => {
if (this.onmessage)
this.onmessage.call(null, event.data);
});
});
this._ws.addEventListener('close', event => {
if (this.onclose)
this.onclose.call(null);

View File

@ -17,11 +17,13 @@
import { debugError, helper, RegisteredListener } from '../helper';
import { ConnectionTransport } from '../transport';
import { makeWaitForNextTask } from '../platform';
export class PipeTransport implements ConnectionTransport {
private _pipeWrite: NodeJS.WritableStream | null;
private _pendingMessage = '';
private _eventListeners: RegisteredListener[];
private _waitForNextTask = makeWaitForNextTask();
onmessage?: (message: string) => void;
onclose?: () => void;
@ -52,14 +54,19 @@ export class PipeTransport implements ConnectionTransport {
return;
}
const message = this._pendingMessage + buffer.toString(undefined, 0, end);
if (this.onmessage)
this.onmessage.call(null, message);
this._waitForNextTask(() => {
if (this.onmessage)
this.onmessage.call(null, message);
});
let start = end + 1;
end = buffer.indexOf('\0', start);
while (end !== -1) {
if (this.onmessage)
this.onmessage.call(null, buffer.toString(undefined, start, end));
const message = buffer.toString(undefined, start, end);
this._waitForNextTask(() => {
if (this.onmessage)
this.onmessage.call(null, message);
});
start = end + 1;
end = buffer.indexOf('\0', start);
}