From db2972792badc38456ed7d4f0365066ee38f59e3 Mon Sep 17 00:00:00 2001 From: Pavel Feldman Date: Wed, 3 Aug 2022 19:37:06 -0700 Subject: [PATCH] feat(server): queue run-server clients (#16234) --- .../playwright-core/src/browserServerImpl.ts | 2 +- packages/playwright-core/src/cli/driver.ts | 4 +- .../src/grid/gridBrowserWorker.ts | 2 +- .../src/remote/playwrightConnection.ts | 8 ++-- .../src/remote/playwrightServer.ts | 48 +++++++++++++++---- .../src/utils/manualPromise.ts | 2 +- 6 files changed, 51 insertions(+), 15 deletions(-) diff --git a/packages/playwright-core/src/browserServerImpl.ts b/packages/playwright-core/src/browserServerImpl.ts index b6a26d7f6f..494cecf2ba 100644 --- a/packages/playwright-core/src/browserServerImpl.ts +++ b/packages/playwright-core/src/browserServerImpl.ts @@ -54,7 +54,7 @@ export class BrowserServerLauncherImpl implements BrowserServerLauncher { path = options.wsPath.startsWith('/') ? options.wsPath : `/${options.wsPath}`; // 2. Start the server - const server = new PlaywrightServer('use-pre-launched-browser', { path, maxClients: Infinity, enableSocksProxy: false, preLaunchedBrowser: browser }); + const server = new PlaywrightServer('use-pre-launched-browser', { path, maxConcurrentConnections: Infinity, maxIncomingConnections: Infinity, enableSocksProxy: false, preLaunchedBrowser: browser }); const wsEndpoint = await server.listen(options.port); // 3. Return the BrowserServer interface diff --git a/packages/playwright-core/src/cli/driver.ts b/packages/playwright-core/src/cli/driver.ts index 0007d1f72a..29842ae6d0 100644 --- a/packages/playwright-core/src/cli/driver.ts +++ b/packages/playwright-core/src/cli/driver.ts @@ -47,7 +47,9 @@ export function runDriver() { } export async function runServer(port: number | undefined, path = '/', maxClients = Infinity, enableSocksProxy = true, reuseBrowser = false) { - const server = new PlaywrightServer(reuseBrowser ? 'reuse-browser' : 'auto', { path, maxClients, enableSocksProxy }); + const maxIncomingConnections = maxClients; + const maxConcurrentConnections = reuseBrowser ? 1 : maxClients; + const server = new PlaywrightServer(reuseBrowser ? 'reuse-browser' : 'auto', { path, maxIncomingConnections, maxConcurrentConnections, enableSocksProxy }); const wsEndpoint = await server.listen(port); process.on('exit', () => server.close().catch(console.error)); console.log('Listening on ' + wsEndpoint); // eslint-disable-line no-console diff --git a/packages/playwright-core/src/grid/gridBrowserWorker.ts b/packages/playwright-core/src/grid/gridBrowserWorker.ts index 2638c516b0..a49ce27d03 100644 --- a/packages/playwright-core/src/grid/gridBrowserWorker.ts +++ b/packages/playwright-core/src/grid/gridBrowserWorker.ts @@ -23,7 +23,7 @@ function launchGridBrowserWorker(gridURL: string, agentId: string, workerId: str const log = debug(`pw:grid:worker:${workerId}`); log('created'); const ws = new WebSocket(gridURL.replace('http://', 'ws://') + `/registerWorker?agentId=${agentId}&workerId=${workerId}`); - new PlaywrightConnection('auto', ws, { enableSocksProxy: true, browserAlias, launchOptions: {} }, { playwright: null, browser: null }, log, async () => { + new PlaywrightConnection(Promise.resolve(), 'auto', ws, { enableSocksProxy: true, browserAlias, launchOptions: {} }, { playwright: null, browser: null }, log, async () => { log('exiting process'); setTimeout(() => process.exit(0), 30000); // Meanwhile, try to gracefully close all browsers. diff --git a/packages/playwright-core/src/remote/playwrightConnection.ts b/packages/playwright-core/src/remote/playwrightConnection.ts index 812a9bf637..352101ed75 100644 --- a/packages/playwright-core/src/remote/playwrightConnection.ts +++ b/packages/playwright-core/src/remote/playwrightConnection.ts @@ -48,7 +48,7 @@ export class PlaywrightConnection { private _options: Options; private _root: Root; - constructor(mode: Mode, ws: WebSocket, options: Options, preLaunched: PreLaunched, log: (m: string) => void, onClose: () => void) { + constructor(lock: Promise, mode: Mode, ws: WebSocket, options: Options, preLaunched: PreLaunched, log: (m: string) => void, onClose: () => void) { this._ws = ws; this._preLaunched = preLaunched; this._options = options; @@ -60,11 +60,13 @@ export class PlaywrightConnection { this._debugLog = log; this._dispatcherConnection = new DispatcherConnection(); - this._dispatcherConnection.onmessage = message => { + this._dispatcherConnection.onmessage = async message => { + await lock; if (ws.readyState !== ws.CLOSING) ws.send(JSON.stringify(message)); }; - ws.on('message', (message: string) => { + ws.on('message', async (message: string) => { + await lock; this._dispatcherConnection.dispatch(JSON.parse(Buffer.from(message).toString())); }); diff --git a/packages/playwright-core/src/remote/playwrightServer.ts b/packages/playwright-core/src/remote/playwrightServer.ts index 4357f48dd6..091ee3de04 100644 --- a/packages/playwright-core/src/remote/playwrightServer.ts +++ b/packages/playwright-core/src/remote/playwrightServer.ts @@ -24,6 +24,7 @@ import { PlaywrightConnection } from './playwrightConnection'; import { assert } from '../utils'; import { serverSideCallMetadata } from '../server/instrumentation'; import type { LaunchOptions } from '../server/types'; +import { ManualPromise } from '../utils/manualPromise'; const debugLog = debug('pw:server'); @@ -39,7 +40,8 @@ export type Mode = 'use-pre-launched-browser' | 'reuse-browser' | 'auto'; type ServerOptions = { path: string; - maxClients: number; + maxIncomingConnections: number; + maxConcurrentConnections: number; enableSocksProxy: boolean; preLaunchedBrowser?: Browser }; @@ -47,7 +49,6 @@ type ServerOptions = { export class PlaywrightServer { private _preLaunchedPlaywright: Playwright | null = null; private _wsServer: WebSocketServer | undefined; - private _clientsCount = 0; private _mode: Mode; private _options: ServerOptions; @@ -111,10 +112,9 @@ export class PlaywrightServer { debugLog('Listening at ' + wsEndpoint); this._wsServer = new wsServer({ server, path: this._options.path }); - const originalShouldHandle = this._wsServer.shouldHandle.bind(this._wsServer); - this._wsServer.shouldHandle = request => originalShouldHandle(request) && this._clientsCount < this._options.maxClients; - this._wsServer.on('connection', async (ws, request) => { - if (this._clientsCount >= this._options.maxClients) { + const semaphore = new Semaphore(this._options.maxConcurrentConnections); + this._wsServer.on('connection', (ws, request) => { + if (semaphore.requested() >= this._options.maxIncomingConnections) { ws.close(1013, 'Playwright Server is busy'); return; } @@ -137,14 +137,14 @@ export class PlaywrightServer { if (headlessValue && headlessValue !== '0') launchOptions.headless = true; - this._clientsCount++; const log = newLogger(); log(`serving connection: ${request.url}`); const connection = new PlaywrightConnection( + semaphore.aquire(), this._mode, ws, { enableSocksProxy, browserAlias, launchOptions }, { playwright: this._preLaunchedPlaywright, browser: this._options.preLaunchedBrowser || null }, - log, () => this._clientsCount--); + log, () => semaphore.release()); (ws as any)[kConnectionSymbol] = connection; }); @@ -174,3 +174,35 @@ export class PlaywrightServer { debugLog('closed server'); } } + +export class Semaphore { + private _max: number; + private _aquired = 0; + private _queue: ManualPromise[] = []; + constructor(max: number) { + this._max = max; + } + + aquire(): Promise { + const lock = new ManualPromise(); + this._queue.push(lock); + this._flush(); + return lock; + } + + requested() { + return this._aquired + this._queue.length; + } + + release() { + --this._aquired; + this._flush(); + } + + private _flush() { + while (this._aquired < this._max && this._queue.length) { + ++this._aquired; + this._queue.shift()!.resolve(); + } + } +} diff --git a/packages/playwright-core/src/utils/manualPromise.ts b/packages/playwright-core/src/utils/manualPromise.ts index 18b0aaf910..21ce630ca6 100644 --- a/packages/playwright-core/src/utils/manualPromise.ts +++ b/packages/playwright-core/src/utils/manualPromise.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -export class ManualPromise extends Promise { +export class ManualPromise extends Promise { private _resolve!: (t: T) => void; private _reject!: (e: Error) => void; private _isDone: boolean;