feat(server): queue run-server clients (#16234)

This commit is contained in:
Pavel Feldman 2022-08-03 19:37:06 -07:00 committed by GitHub
parent e5cc78af67
commit db2972792b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 51 additions and 15 deletions

View File

@ -54,7 +54,7 @@ export class BrowserServerLauncherImpl implements BrowserServerLauncher {
path = options.wsPath.startsWith('/') ? options.wsPath : `/${options.wsPath}`;
// 2. Start the server
const server = new PlaywrightServer('use-pre-launched-browser', { path, maxClients: Infinity, enableSocksProxy: false, preLaunchedBrowser: browser });
const server = new PlaywrightServer('use-pre-launched-browser', { path, maxConcurrentConnections: Infinity, maxIncomingConnections: Infinity, enableSocksProxy: false, preLaunchedBrowser: browser });
const wsEndpoint = await server.listen(options.port);
// 3. Return the BrowserServer interface

View File

@ -47,7 +47,9 @@ export function runDriver() {
}
export async function runServer(port: number | undefined, path = '/', maxClients = Infinity, enableSocksProxy = true, reuseBrowser = false) {
const server = new PlaywrightServer(reuseBrowser ? 'reuse-browser' : 'auto', { path, maxClients, enableSocksProxy });
const maxIncomingConnections = maxClients;
const maxConcurrentConnections = reuseBrowser ? 1 : maxClients;
const server = new PlaywrightServer(reuseBrowser ? 'reuse-browser' : 'auto', { path, maxIncomingConnections, maxConcurrentConnections, enableSocksProxy });
const wsEndpoint = await server.listen(port);
process.on('exit', () => server.close().catch(console.error));
console.log('Listening on ' + wsEndpoint); // eslint-disable-line no-console

View File

@ -23,7 +23,7 @@ function launchGridBrowserWorker(gridURL: string, agentId: string, workerId: str
const log = debug(`pw:grid:worker:${workerId}`);
log('created');
const ws = new WebSocket(gridURL.replace('http://', 'ws://') + `/registerWorker?agentId=${agentId}&workerId=${workerId}`);
new PlaywrightConnection('auto', ws, { enableSocksProxy: true, browserAlias, launchOptions: {} }, { playwright: null, browser: null }, log, async () => {
new PlaywrightConnection(Promise.resolve(), 'auto', ws, { enableSocksProxy: true, browserAlias, launchOptions: {} }, { playwright: null, browser: null }, log, async () => {
log('exiting process');
setTimeout(() => process.exit(0), 30000);
// Meanwhile, try to gracefully close all browsers.

View File

@ -48,7 +48,7 @@ export class PlaywrightConnection {
private _options: Options;
private _root: Root;
constructor(mode: Mode, ws: WebSocket, options: Options, preLaunched: PreLaunched, log: (m: string) => void, onClose: () => void) {
constructor(lock: Promise<void>, mode: Mode, ws: WebSocket, options: Options, preLaunched: PreLaunched, log: (m: string) => void, onClose: () => void) {
this._ws = ws;
this._preLaunched = preLaunched;
this._options = options;
@ -60,11 +60,13 @@ export class PlaywrightConnection {
this._debugLog = log;
this._dispatcherConnection = new DispatcherConnection();
this._dispatcherConnection.onmessage = message => {
this._dispatcherConnection.onmessage = async message => {
await lock;
if (ws.readyState !== ws.CLOSING)
ws.send(JSON.stringify(message));
};
ws.on('message', (message: string) => {
ws.on('message', async (message: string) => {
await lock;
this._dispatcherConnection.dispatch(JSON.parse(Buffer.from(message).toString()));
});

View File

@ -24,6 +24,7 @@ import { PlaywrightConnection } from './playwrightConnection';
import { assert } from '../utils';
import { serverSideCallMetadata } from '../server/instrumentation';
import type { LaunchOptions } from '../server/types';
import { ManualPromise } from '../utils/manualPromise';
const debugLog = debug('pw:server');
@ -39,7 +40,8 @@ export type Mode = 'use-pre-launched-browser' | 'reuse-browser' | 'auto';
type ServerOptions = {
path: string;
maxClients: number;
maxIncomingConnections: number;
maxConcurrentConnections: number;
enableSocksProxy: boolean;
preLaunchedBrowser?: Browser
};
@ -47,7 +49,6 @@ type ServerOptions = {
export class PlaywrightServer {
private _preLaunchedPlaywright: Playwright | null = null;
private _wsServer: WebSocketServer | undefined;
private _clientsCount = 0;
private _mode: Mode;
private _options: ServerOptions;
@ -111,10 +112,9 @@ export class PlaywrightServer {
debugLog('Listening at ' + wsEndpoint);
this._wsServer = new wsServer({ server, path: this._options.path });
const originalShouldHandle = this._wsServer.shouldHandle.bind(this._wsServer);
this._wsServer.shouldHandle = request => originalShouldHandle(request) && this._clientsCount < this._options.maxClients;
this._wsServer.on('connection', async (ws, request) => {
if (this._clientsCount >= this._options.maxClients) {
const semaphore = new Semaphore(this._options.maxConcurrentConnections);
this._wsServer.on('connection', (ws, request) => {
if (semaphore.requested() >= this._options.maxIncomingConnections) {
ws.close(1013, 'Playwright Server is busy');
return;
}
@ -137,14 +137,14 @@ export class PlaywrightServer {
if (headlessValue && headlessValue !== '0')
launchOptions.headless = true;
this._clientsCount++;
const log = newLogger();
log(`serving connection: ${request.url}`);
const connection = new PlaywrightConnection(
semaphore.aquire(),
this._mode, ws,
{ enableSocksProxy, browserAlias, launchOptions },
{ playwright: this._preLaunchedPlaywright, browser: this._options.preLaunchedBrowser || null },
log, () => this._clientsCount--);
log, () => semaphore.release());
(ws as any)[kConnectionSymbol] = connection;
});
@ -174,3 +174,35 @@ export class PlaywrightServer {
debugLog('closed server');
}
}
export class Semaphore {
private _max: number;
private _aquired = 0;
private _queue: ManualPromise[] = [];
constructor(max: number) {
this._max = max;
}
aquire(): Promise<void> {
const lock = new ManualPromise();
this._queue.push(lock);
this._flush();
return lock;
}
requested() {
return this._aquired + this._queue.length;
}
release() {
--this._aquired;
this._flush();
}
private _flush() {
while (this._aquired < this._max && this._queue.length) {
++this._aquired;
this._queue.shift()!.resolve();
}
}
}

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
export class ManualPromise<T> extends Promise<T> {
export class ManualPromise<T = void> extends Promise<T> {
private _resolve!: (t: T) => void;
private _reject!: (e: Error) => void;
private _isDone: boolean;