fix: make Transport.send() synchronous (#1177)

This commit is contained in:
Yury Semikhatsky 2020-03-02 13:51:32 -08:00 committed by GitHub
parent 5bd6e4970b
commit f242e0c74f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 83 additions and 82 deletions

View File

@ -140,12 +140,12 @@ export class CRSession extends platform.EventEmitter {
this.once = super.once; this.once = super.once;
} }
send<T extends keyof Protocol.CommandParameters>( async send<T extends keyof Protocol.CommandParameters>(
method: T, method: T,
params?: Protocol.CommandParameters[T] params?: Protocol.CommandParameters[T]
): Promise<Protocol.CommandReturnValues[T]> { ): Promise<Protocol.CommandReturnValues[T]> {
if (!this._connection) if (!this._connection)
return Promise.reject(new Error(`Protocol error (${method}): Session closed. Most likely the ${this._targetType} has been closed.`)); throw new Error(`Protocol error (${method}): Session closed. Most likely the ${this._targetType} has been closed.`);
const id = this._connection._rawSend(this._sessionId, { method, params }); const id = this._connection._rawSend(this._sessionId, { method, params });
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this._callbacks.set(id, {resolve, reject, error: new Error(), method}); this._callbacks.set(id, {resolve, reject, error: new Error(), method});

View File

@ -72,22 +72,15 @@ export class CRExecutionContext implements js.ExecutionContextDelegate {
throw new Error('Passed function is not well-serializable!'); throw new Error('Passed function is not well-serializable!');
} }
} }
let callFunctionOnPromise;
try { const { exceptionDetails, result: remoteObject } = await this._client.send('Runtime.callFunctionOn', {
callFunctionOnPromise = this._client.send('Runtime.callFunctionOn', { functionDeclaration: functionText + '\n' + suffix + '\n',
functionDeclaration: functionText + '\n' + suffix + '\n', executionContextId: this._contextId,
executionContextId: this._contextId, arguments: args.map(convertArgument.bind(this)),
arguments: args.map(convertArgument.bind(this)), returnByValue,
returnByValue, awaitPromise: true,
awaitPromise: true, userGesture: true
userGesture: true }).catch(rewriteError);
});
} catch (err) {
if (err instanceof TypeError && err.message.startsWith('Converting circular structure to JSON'))
err.message += ' Are you passing a nested JSHandle?';
throw err;
}
const { exceptionDetails, result: remoteObject } = await callFunctionOnPromise.catch(rewriteError);
if (exceptionDetails) if (exceptionDetails)
throw new Error('Evaluation failed: ' + getExceptionMessage(exceptionDetails)); throw new Error('Evaluation failed: ' + getExceptionMessage(exceptionDetails));
return returnByValue ? valueFromRemoteObject(remoteObject) : context._createHandle(remoteObject); return returnByValue ? valueFromRemoteObject(remoteObject) : context._createHandle(remoteObject);
@ -127,6 +120,8 @@ export class CRExecutionContext implements js.ExecutionContextDelegate {
if (error.message.endsWith('Cannot find context with specified id') || error.message.endsWith('Inspected target navigated or closed') || error.message.endsWith('Execution context was destroyed.')) if (error.message.endsWith('Cannot find context with specified id') || error.message.endsWith('Inspected target navigated or closed') || error.message.endsWith('Execution context was destroyed.'))
throw new Error('Execution context was destroyed, most likely because of a navigation.'); throw new Error('Execution context was destroyed, most likely because of a navigation.');
if (error instanceof TypeError && error.message.startsWith('Converting circular structure to JSON'))
error.message += ' Are you passing a nested JSHandle?';
throw error; throw error;
} }
} }

View File

@ -68,7 +68,7 @@ export class FFConnection extends platform.EventEmitter {
return this._sessions.get(sessionId) || null; return this._sessions.get(sessionId) || null;
} }
send<T extends keyof Protocol.CommandParameters>( async send<T extends keyof Protocol.CommandParameters>(
method: T, method: T,
params?: Protocol.CommandParameters[T] params?: Protocol.CommandParameters[T]
): Promise<Protocol.CommandReturnValues[T]> { ): Promise<Protocol.CommandReturnValues[T]> {
@ -179,12 +179,12 @@ export class FFSession extends platform.EventEmitter {
this.once = super.once; this.once = super.once;
} }
send<T extends keyof Protocol.CommandParameters>( async send<T extends keyof Protocol.CommandParameters>(
method: T, method: T,
params?: Protocol.CommandParameters[T] params?: Protocol.CommandParameters[T]
): Promise<Protocol.CommandReturnValues[T]> { ): Promise<Protocol.CommandReturnValues[T]> {
if (this._disposed) if (this._disposed)
return Promise.reject(new Error(`Protocol error (${method}): Session closed. Most likely the ${this._targetType} has been closed.`)); throw new Error(`Protocol error (${method}): Session closed. Most likely the ${this._targetType} has been closed.`);
const id = this._connection.nextMessageId(); const id = this._connection.nextMessageId();
this._rawSend({method, params, id}); this._rawSend({method, params, id});
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {

View File

@ -79,20 +79,13 @@ export class FFExecutionContext implements js.ExecutionContextDelegate {
return {unserializableValue: 'NaN'}; return {unserializableValue: 'NaN'};
return {value: arg}; return {value: arg};
}); });
let callFunctionPromise;
try { const payload = await this._session.send('Runtime.callFunction', {
callFunctionPromise = this._session.send('Runtime.callFunction', { functionDeclaration: functionText,
functionDeclaration: functionText, args: protocolArgs,
args: protocolArgs, returnByValue,
returnByValue, executionContextId: this._executionContextId
executionContextId: this._executionContextId }).catch(rewriteError);
});
} catch (err) {
if (err instanceof TypeError && err.message.startsWith('Converting circular structure to JSON'))
err.message += ' Are you passing a nested JSHandle?';
throw err;
}
const payload = await callFunctionPromise.catch(rewriteError);
checkException(payload.exceptionDetails); checkException(payload.exceptionDetails);
if (returnByValue) if (returnByValue)
return deserializeValue(payload.result!); return deserializeValue(payload.result!);
@ -103,6 +96,8 @@ export class FFExecutionContext implements js.ExecutionContextDelegate {
return {result: {type: 'undefined', value: undefined}}; return {result: {type: 'undefined', value: undefined}};
if (error.message.includes('Failed to find execution context with id') || error.message.includes('Execution context was destroyed!')) if (error.message.includes('Failed to find execution context with id') || error.message.includes('Execution context was destroyed!'))
throw new Error('Execution context was destroyed, most likely because of a navigation.'); throw new Error('Execution context was destroyed, most likely because of a navigation.');
if (error instanceof TypeError && error.message.startsWith('Converting circular structure to JSON'))
error.message += ' Are you passing a nested JSHandle?';
throw error; throw error;
} }
} }

View File

@ -311,22 +311,27 @@ export function makeWaitForNextTask() {
}; };
} }
export class WebSocketTransport implements ConnectionTransport { // 'onmessage' handler must be installed synchronously when 'onopen' callback is invoked to
private _ws: WebSocket; // avoid missing incoming messages.
export async function connectToWebsocket<T>(url: string, onopen: (transport: ConnectionTransport) => Promise<T>): Promise<T> {
const transport = new WebSocketTransport(url);
return new Promise<T>((fulfill, reject) => {
transport._ws.addEventListener('open', async () => fulfill(await onopen(transport)));
transport._ws.addEventListener('error', event => reject(new Error('WebSocket error: ' + (event as ErrorEvent).message)));
});
}
class WebSocketTransport implements ConnectionTransport {
_ws: WebSocket;
onmessage?: (message: string) => void; onmessage?: (message: string) => void;
onclose?: () => void; onclose?: () => void;
private _connectPromise: Promise<(Error|null)>;
constructor(url: string) { constructor(url: string) {
this._ws = (isNode ? new NodeWebSocket(url, [], { this._ws = (isNode ? new NodeWebSocket(url, [], {
perMessageDeflate: false, perMessageDeflate: false,
maxPayload: 256 * 1024 * 1024, // 256Mb maxPayload: 256 * 1024 * 1024, // 256Mb
}) : new WebSocket(url)) as WebSocket; }) : new WebSocket(url)) as WebSocket;
this._connectPromise = new Promise(fulfill => {
this._ws.addEventListener('open', () => fulfill(null));
this._ws.addEventListener('error', event => fulfill(new Error('WebSocket error: ' + (event as ErrorEvent).message)));
});
// The 'ws' module in node sometimes sends us multiple messages in a single task. // The 'ws' module in node sometimes sends us multiple messages in a single task.
// In Web, all IO callbacks (e.g. WebSocket callbacks) // In Web, all IO callbacks (e.g. WebSocket callbacks)
// are dispatched into separate tasks, so there's no need // are dispatched into separate tasks, so there's no need
@ -348,10 +353,7 @@ export class WebSocketTransport implements ConnectionTransport {
this._ws.addEventListener('error', () => {}); this._ws.addEventListener('error', () => {});
} }
async send(message: string) { send(message: string) {
const error = await this._connectPromise;
if (error)
throw error;
this._ws.send(message); this._ws.send(message);
} }

View File

@ -128,7 +128,7 @@ export class Chromium implements BrowserType {
// We try to gracefully close to prevent crash reporting and core dumps. // We try to gracefully close to prevent crash reporting and core dumps.
// Note that it's fine to reuse the pipe transport, since // Note that it's fine to reuse the pipe transport, since
// our connection ignores kBrowserCloseMessageId. // our connection ignores kBrowserCloseMessageId.
const t = transport || new platform.WebSocketTransport(browserWSEndpoint!); const t = transport || await platform.connectToWebsocket(browserWSEndpoint!, async transport => transport);
const message = { method: 'Browser.close', id: kBrowserCloseMessageId }; const message = { method: 'Browser.close', id: kBrowserCloseMessageId };
await t.send(JSON.stringify(message)); await t.send(JSON.stringify(message));
}, },
@ -153,8 +153,9 @@ export class Chromium implements BrowserType {
} }
async connect(options: ConnectOptions): Promise<CRBrowser> { async connect(options: ConnectOptions): Promise<CRBrowser> {
const transport = new platform.WebSocketTransport(options.wsEndpoint); return await platform.connectToWebsocket(options.wsEndpoint, transport => {
return CRBrowser.connect(transport, options.slowMo); return CRBrowser.connect(transport, options.slowMo);
});
} }
executablePath(): string { executablePath(): string {

View File

@ -15,25 +15,24 @@
* limitations under the License. * limitations under the License.
*/ */
import { FFBrowser } from '../firefox/ffBrowser';
import { BrowserFetcher, OnProgressCallback, BrowserFetcherOptions } from './browserFetcher';
import { DeviceDescriptors } from '../deviceDescriptors';
import { launchProcess, waitForLine } from './processLauncher';
import * as types from '../types';
import * as platform from '../platform';
import { kBrowserCloseMessageId } from '../firefox/ffConnection';
import * as fs from 'fs'; import * as fs from 'fs';
import * as os from 'os'; import * as os from 'os';
import * as path from 'path'; import * as path from 'path';
import * as util from 'util'; import * as util from 'util';
import { TimeoutError } from '../errors';
import { assert, helper } from '../helper';
import { LaunchOptions, BrowserArgOptions, BrowserType } from './browserType';
import { ConnectOptions, LaunchType } from '../browser'; import { ConnectOptions, LaunchType } from '../browser';
import { BrowserServer } from './browserServer';
import { Events } from '../events';
import { ConnectionTransport } from '../transport';
import { BrowserContext } from '../browserContext'; import { BrowserContext } from '../browserContext';
import { DeviceDescriptors } from '../deviceDescriptors';
import { TimeoutError } from '../errors';
import { Events } from '../events';
import { FFBrowser } from '../firefox/ffBrowser';
import { kBrowserCloseMessageId } from '../firefox/ffConnection';
import { assert, helper } from '../helper';
import * as platform from '../platform';
import * as types from '../types';
import { BrowserFetcher, BrowserFetcherOptions, OnProgressCallback } from './browserFetcher';
import { BrowserServer } from './browserServer';
import { BrowserArgOptions, BrowserType, LaunchOptions } from './browserType';
import { launchProcess, waitForLine } from './processLauncher';
const mkdtempAsync = platform.promisify(fs.mkdtemp); const mkdtempAsync = platform.promisify(fs.mkdtemp);
@ -64,8 +63,10 @@ export class Firefox implements BrowserType {
async launch(options?: LaunchOptions & { slowMo?: number }): Promise<FFBrowser> { async launch(options?: LaunchOptions & { slowMo?: number }): Promise<FFBrowser> {
if (options && (options as any).userDataDir) if (options && (options as any).userDataDir)
throw new Error('userDataDir option is not supported in `browserType.launch`. Use `browserType.launchPersistent` instead'); throw new Error('userDataDir option is not supported in `browserType.launch`. Use `browserType.launchPersistent` instead');
const { browserServer, transport } = await this._launchServer(options, 'local'); const browserServer = await this._launchServer(options, 'local');
const browser = await FFBrowser.connect(transport!, options && options.slowMo); const browser = await platform.connectToWebsocket(browserServer.wsEndpoint()!, transport => {
return FFBrowser.connect(transport, options && options.slowMo);
});
// Hack: for typical launch scenario, ensure that close waits for actual process termination. // Hack: for typical launch scenario, ensure that close waits for actual process termination.
browser.close = () => browserServer.close(); browser.close = () => browserServer.close();
(browser as any)['__server__'] = browserServer; (browser as any)['__server__'] = browserServer;
@ -73,13 +74,15 @@ export class Firefox implements BrowserType {
} }
async launchServer(options?: LaunchOptions & { port?: number }): Promise<BrowserServer> { async launchServer(options?: LaunchOptions & { port?: number }): Promise<BrowserServer> {
return (await this._launchServer(options, 'server', undefined, options && options.port)).browserServer; return await this._launchServer(options, 'server', undefined, options && options.port);
} }
async launchPersistent(userDataDir: string, options?: LaunchOptions): Promise<BrowserContext> { async launchPersistent(userDataDir: string, options?: LaunchOptions): Promise<BrowserContext> {
const { timeout = 30000 } = options || {}; const { timeout = 30000 } = options || {};
const { browserServer, transport } = await this._launchServer(options, 'persistent', userDataDir); const browserServer = await this._launchServer(options, 'persistent', userDataDir);
const browser = await FFBrowser.connect(transport!); const browser = await platform.connectToWebsocket(browserServer.wsEndpoint()!, transport => {
return FFBrowser.connect(transport);
});
await helper.waitWithTimeout(browser._waitForTarget(t => t.type() === 'page'), 'first page', timeout); await helper.waitWithTimeout(browser._waitForTarget(t => t.type() === 'page'), 'first page', timeout);
// Hack: for typical launch scenario, ensure that close waits for actual process termination. // Hack: for typical launch scenario, ensure that close waits for actual process termination.
const browserContext = browser._defaultContext; const browserContext = browser._defaultContext;
@ -87,7 +90,7 @@ export class Firefox implements BrowserType {
return browserContext; return browserContext;
} }
private async _launchServer(options: LaunchOptions = {}, launchType: LaunchType, userDataDir?: string, port?: number): Promise<{ browserServer: BrowserServer, transport?: ConnectionTransport }> { private async _launchServer(options: LaunchOptions = {}, launchType: LaunchType, userDataDir?: string, port?: number): Promise<BrowserServer> {
const { const {
ignoreDefaultArgs = false, ignoreDefaultArgs = false,
args = [], args = [],
@ -144,7 +147,7 @@ export class Firefox implements BrowserType {
// We try to gracefully close to prevent crash reporting and core dumps. // We try to gracefully close to prevent crash reporting and core dumps.
// Note that it's fine to reuse the pipe transport, since // Note that it's fine to reuse the pipe transport, since
// our connection ignores kBrowserCloseMessageId. // our connection ignores kBrowserCloseMessageId.
const transport = new platform.WebSocketTransport(browserWSEndpoint); const transport = await platform.connectToWebsocket(browserWSEndpoint, async transport => transport);
const message = { method: 'Browser.close', params: {}, id: kBrowserCloseMessageId }; const message = { method: 'Browser.close', params: {}, id: kBrowserCloseMessageId };
await transport.send(JSON.stringify(message)); await transport.send(JSON.stringify(message));
}, },
@ -157,13 +160,14 @@ export class Firefox implements BrowserType {
const timeoutError = new TimeoutError(`Timed out after ${timeout} ms while trying to connect to Firefox!`); const timeoutError = new TimeoutError(`Timed out after ${timeout} ms while trying to connect to Firefox!`);
const match = await waitForLine(launchedProcess, launchedProcess.stdout, /^Juggler listening on (ws:\/\/.*)$/, timeout, timeoutError); const match = await waitForLine(launchedProcess, launchedProcess.stdout, /^Juggler listening on (ws:\/\/.*)$/, timeout, timeoutError);
const browserWSEndpoint = match[1]; const browserWSEndpoint = match[1];
browserServer = new BrowserServer(launchedProcess, gracefullyClose, launchType === 'server' ? browserWSEndpoint : null); browserServer = new BrowserServer(launchedProcess, gracefullyClose, browserWSEndpoint);
return { browserServer, transport: launchType === 'server' ? undefined : new platform.WebSocketTransport(browserWSEndpoint) }; return browserServer;
} }
async connect(options: ConnectOptions): Promise<FFBrowser> { async connect(options: ConnectOptions): Promise<FFBrowser> {
const transport = new platform.WebSocketTransport(options.wsEndpoint); return await platform.connectToWebsocket(options.wsEndpoint, transport => {
return FFBrowser.connect(transport, options.slowMo); return FFBrowser.connect(transport, options.slowMo);
});
} }
executablePath(): string { executablePath(): string {

View File

@ -156,8 +156,9 @@ export class WebKit implements BrowserType {
} }
async connect(options: ConnectOptions): Promise<WKBrowser> { async connect(options: ConnectOptions): Promise<WKBrowser> {
const transport = new platform.WebSocketTransport(options.wsEndpoint); return await platform.connectToWebsocket(options.wsEndpoint, transport => {
return WKBrowser.connect(transport, options.slowMo); return WKBrowser.connect(transport, options.slowMo);
});
} }
executablePath(): string { executablePath(): string {
@ -273,7 +274,7 @@ class SequenceNumberMixer<V> {
} }
} }
async function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number) { function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number) {
const server = new ws.Server({ port }); const server = new ws.Server({ port });
const guid = uuidv4(); const guid = uuidv4();
const idMixer = new SequenceNumberMixer<{id: number, socket: ws}>(); const idMixer = new SequenceNumberMixer<{id: number, socket: ws}>();

View File

@ -22,20 +22,23 @@ import * as platform from './platform';
const connect = { const connect = {
chromium: { chromium: {
connect: async (url: string) => { connect: async (url: string) => {
const transport = new platform.WebSocketTransport(url); return await platform.connectToWebsocket(url, transport => {
return ChromiumBrowser.connect(transport); return ChromiumBrowser.connect(transport);
});
} }
}, },
webkit: { webkit: {
connect: async (url: string) => { connect: async (url: string) => {
const transport = new platform.WebSocketTransport(url); return await platform.connectToWebsocket(url, transport => {
return WebKitBrowser.connect(transport); return WebKitBrowser.connect(transport);
});
} }
}, },
firefox: { firefox: {
connect: async (url: string) => { connect: async (url: string) => {
const transport = new platform.WebSocketTransport(url); return await platform.connectToWebsocket(url, transport => {
return FirefoxBrowser.connect(transport); return FirefoxBrowser.connect(transport);
});
} }
} }
}; };