chore: use transport for BrowserType.connect (#12196)

This gives us logging, ECONNRESET error handling and proper cleanup.
This commit is contained in:
Dmitry Gozman 2022-02-17 20:48:14 -08:00 committed by GitHub
parent a98babec69
commit 15043801cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 58 deletions

View File

@ -20,12 +20,12 @@ import * as channels from '../protocol/channels';
import { Dispatcher, DispatcherScope } from './dispatcher'; import { Dispatcher, DispatcherScope } from './dispatcher';
import { BrowserContextDispatcher } from './browserContextDispatcher'; import { BrowserContextDispatcher } from './browserContextDispatcher';
import { CallMetadata } from '../server/instrumentation'; import { CallMetadata } from '../server/instrumentation';
import WebSocket from 'ws';
import { JsonPipeDispatcher } from '../dispatchers/jsonPipeDispatcher'; import { JsonPipeDispatcher } from '../dispatchers/jsonPipeDispatcher';
import { getUserAgent, makeWaitForNextTask } from '../utils/utils'; import { getUserAgent } from '../utils/utils';
import { ManualPromise } from '../utils/async';
import * as socks from '../utils/socksProxy'; import * as socks from '../utils/socksProxy';
import EventEmitter from 'events'; import EventEmitter from 'events';
import { ProgressController } from '../server/progress';
import { WebSocketTransport } from '../server/transport';
export class BrowserTypeDispatcher extends Dispatcher<BrowserType, channels.BrowserTypeChannel> implements channels.BrowserTypeChannel { export class BrowserTypeDispatcher extends Dispatcher<BrowserType, channels.BrowserTypeChannel> implements channels.BrowserTypeChannel {
_type_BrowserType = true; _type_BrowserType = true;
@ -55,54 +55,41 @@ export class BrowserTypeDispatcher extends Dispatcher<BrowserType, channels.Brow
}; };
} }
async connect(params: channels.BrowserTypeConnectParams): Promise<channels.BrowserTypeConnectResult> { async connect(params: channels.BrowserTypeConnectParams, metadata: CallMetadata): Promise<channels.BrowserTypeConnectResult> {
const waitForNextTask = params.slowMo const controller = new ProgressController(metadata, this._object);
? (cb: () => any) => setTimeout(cb, params.slowMo) controller.setLogName('browser');
: makeWaitForNextTask(); return await controller.run(async progress => {
const paramsHeaders = Object.assign({ 'User-Agent': getUserAgent() }, params.headers || {}); const paramsHeaders = Object.assign({ 'User-Agent': getUserAgent() }, params.headers || {});
const ws = new WebSocket(params.wsEndpoint, [], { const transport = await WebSocketTransport.connect(progress, params.wsEndpoint, paramsHeaders, true);
perMessageDeflate: false, let socksInterceptor: SocksInterceptor | undefined;
maxPayload: 256 * 1024 * 1024, // 256Mb, const pipe = new JsonPipeDispatcher(this._scope);
handshakeTimeout: params.timeout, transport.onmessage = json => {
headers: paramsHeaders, if (json.method === '__create__' && json.params.type === 'SocksSupport')
followRedirects: true, socksInterceptor = new SocksInterceptor(transport, params.socksProxyRedirectPortForTest, json.params.guid);
}); if (socksInterceptor?.interceptMessage(json))
let socksInterceptor: SocksInterceptor | undefined; return;
const pipe = new JsonPipeDispatcher(this._scope); const cb = () => {
const openPromise = new ManualPromise<{ pipe: JsonPipeDispatcher }>(); try {
ws.on('open', () => openPromise.resolve({ pipe }));
ws.on('close', () => {
socksInterceptor?.cleanup();
pipe.wasClosed();
});
ws.on('error', error => {
socksInterceptor?.cleanup();
if (openPromise.isDone()) {
pipe.wasClosed(error);
} else {
pipe.dispose();
openPromise.reject(error);
}
});
pipe.on('close', () => {
socksInterceptor?.cleanup();
ws.close();
});
pipe.on('message', message => ws.send(JSON.stringify(message)));
ws.addEventListener('message', event => {
waitForNextTask(() => {
try {
const json = JSON.parse(event.data as string);
if (json.method === '__create__' && json.params.type === 'SocksSupport')
socksInterceptor = new SocksInterceptor(ws, params.socksProxyRedirectPortForTest, json.params.guid);
if (!socksInterceptor?.interceptMessage(json))
pipe.dispatch(json); pipe.dispatch(json);
} catch (e) { } catch (e) {
ws.close(); transport.close();
} }
};
if (params.slowMo)
setTimeout(cb, params.slowMo);
else
cb();
};
pipe.on('message', message => {
transport.send(message);
}); });
}); transport.onclose = () => {
return openPromise; socksInterceptor?.cleanup();
pipe.wasClosed();
};
pipe.on('close', () => transport.close());
return { pipe };
}, params.timeout || 0);
} }
} }
@ -112,7 +99,7 @@ class SocksInterceptor {
private _socksSupportObjectGuid: string; private _socksSupportObjectGuid: string;
private _ids = new Set<number>(); private _ids = new Set<number>();
constructor(ws: WebSocket, redirectPortForTest: number | undefined, socksSupportObjectGuid: string) { constructor(transport: WebSocketTransport, redirectPortForTest: number | undefined, socksSupportObjectGuid: string) {
this._handler = new socks.SocksProxyHandler(redirectPortForTest); this._handler = new socks.SocksProxyHandler(redirectPortForTest);
this._socksSupportObjectGuid = socksSupportObjectGuid; this._socksSupportObjectGuid = socksSupportObjectGuid;
@ -125,7 +112,7 @@ class SocksInterceptor {
try { try {
const id = --lastId; const id = --lastId;
this._ids.add(id); this._ids.add(id);
ws.send(JSON.stringify({ id, guid: socksSupportObjectGuid, method: prop, params, metadata: { stack: [], apiName: '', internal: true } })); transport.send({ id, guid: socksSupportObjectGuid, method: prop, params, metadata: { stack: [], apiName: '', internal: true } } as any);
} catch (e) { } catch (e) {
} }
}; };

View File

@ -52,9 +52,9 @@ export class WebSocketTransport implements ConnectionTransport {
onclose?: () => void; onclose?: () => void;
readonly wsEndpoint: string; readonly wsEndpoint: string;
static async connect(progress: Progress, url: string, headers?: { [key: string]: string; }): Promise<WebSocketTransport> { static async connect(progress: Progress, url: string, headers?: { [key: string]: string; }, followRedirects?: boolean): Promise<WebSocketTransport> {
progress.log(`<ws connecting> ${url}`); progress.log(`<ws connecting> ${url}`);
const transport = new WebSocketTransport(progress, url, headers); const transport = new WebSocketTransport(progress, url, headers, followRedirects);
let success = false; let success = false;
progress.cleanupWhenAborted(async () => { progress.cleanupWhenAborted(async () => {
if (!success) if (!success)
@ -75,13 +75,14 @@ export class WebSocketTransport implements ConnectionTransport {
return transport; return transport;
} }
constructor(progress: Progress, url: string, headers?: { [key: string]: string; }) { constructor(progress: Progress, url: string, headers?: { [key: string]: string; }, followRedirects?: boolean) {
this.wsEndpoint = url; this.wsEndpoint = url;
this._ws = new WebSocket(url, [], { this._ws = new WebSocket(url, [], {
perMessageDeflate: false, perMessageDeflate: false,
maxPayload: 256 * 1024 * 1024, // 256Mb, maxPayload: 256 * 1024 * 1024, // 256Mb,
handshakeTimeout: progress.timeUntilDeadline(), handshakeTimeout: progress.timeUntilDeadline(),
headers headers,
followRedirects,
}); });
this._progress = progress; this._progress = progress;
// The 'ws' module in node sometimes sends us multiple messages in a single task. // The 'ws' module in node sometimes sends us multiple messages in a single task.
@ -102,12 +103,12 @@ export class WebSocketTransport implements ConnectionTransport {
}); });
this._ws.addEventListener('close', event => { this._ws.addEventListener('close', event => {
this._progress && this._progress.log(`<ws disconnected> ${url}`); this._progress && this._progress.log(`<ws disconnected> ${url} code=${event.code} reason=${event.reason}`);
if (this.onclose) if (this.onclose)
this.onclose.call(null); this.onclose.call(null);
}); });
// Prevent Error: read ECONNRESET. // Prevent Error: read ECONNRESET.
this._ws.addEventListener('error', () => {}); this._ws.addEventListener('error', error => this._progress && this._progress.log(`<ws error> ${error}`));
} }
send(message: ProtocolRequest) { send(message: ProtocolRequest) {
@ -120,6 +121,8 @@ export class WebSocketTransport implements ConnectionTransport {
} }
async closeAndWait() { async closeAndWait() {
if (this._ws.readyState === WebSocket.CLOSED)
return;
const promise = new Promise(f => this._ws.once('close', f)); const promise = new Promise(f => this._ws.once('close', f));
this.close(); this.close();
await promise; // Make sure to await the actual disconnect. await promise; // Make sure to await the actual disconnect.

View File

@ -107,7 +107,7 @@ test('should timeout in socket while connecting', async ({ browserType, startRem
wsEndpoint: `ws://localhost:${server.PORT}/ws-slow`, wsEndpoint: `ws://localhost:${server.PORT}/ws-slow`,
timeout: 1000, timeout: 1000,
}).catch(e => e); }).catch(e => e);
expect(e.message).toContain('browserType.connect: Opening handshake has timed out'); expect(e.message).toContain('browserType.connect: Timeout 1000ms exceeded');
}); });
test('should timeout in connect while connecting', async ({ browserType, startRemoteServer, server }) => { test('should timeout in connect while connecting', async ({ browserType, startRemoteServer, server }) => {