diff --git a/packages/playwright-test/src/dispatcher.ts b/packages/playwright-test/src/dispatcher.ts index 2272a83bbd..f88779b6f8 100644 --- a/packages/playwright-test/src/dispatcher.ts +++ b/packages/playwright-test/src/dispatcher.ts @@ -17,10 +17,11 @@ import child_process from 'child_process'; import path from 'path'; import { EventEmitter } from 'events'; -import { RunPayload, TestBeginPayload, TestEndPayload, DonePayload, TestOutputPayload, WorkerInitParams, StepBeginPayload, StepEndPayload } from './ipc'; +import { RunPayload, TestBeginPayload, TestEndPayload, DonePayload, TestOutputPayload, WorkerInitParams, StepBeginPayload, StepEndPayload, SerializedLoaderData } from './ipc'; import type { TestResult, Reporter, TestStep } from '../types/testReporter'; import { Suite, TestCase } from './test'; import { Loader } from './loader'; +import { ManualPromise } from 'playwright-core/src/utils/async'; export type TestGroup = { workerHash: string; @@ -31,17 +32,15 @@ export type TestGroup = { }; export class Dispatcher { - private _workers = new Set(); - private _freeWorkers: Worker[] = []; - private _workerClaimers: (() => void)[] = []; + private _workerSlots: { busy: boolean, worker?: Worker }[] = []; + private _queue: TestGroup[] = []; + private _finished = new ManualPromise(); + private _isStopped = false; private _testById = new Map, stepStack: Set }>(); - private _queue: TestGroup[] = []; - private _stopCallback = () => {}; - readonly _loader: Loader; + private _loader: Loader; private _reporter: Reporter; private _hasWorkerErrors = false; - private _isStopped = false; private _failureCount = 0; constructor(loader: Loader, testGroups: TestGroup[], reporter: Reporter) { @@ -57,29 +56,77 @@ export class Dispatcher { } } - async run() { - // Loop in case job schedules more jobs - while (this._queue.length && !this._isStopped) - await this._dispatchQueue(); + private async _scheduleJob() { + // 1. Find a job to run. + if (this._isStopped || !this._queue.length) + return; + const job = this._queue[0]; + + // 2. Find a worker with the same hash, or just some free worker. + let index = this._workerSlots.findIndex(w => !w.busy && w.worker && w.worker.hash() === job.workerHash); + if (index === -1) + index = this._workerSlots.findIndex(w => !w.busy); + // No workers available, bail out. + if (index === -1) + return; + + // 3. Claim both the job and the worker, run the job and release the worker. + this._queue.shift(); + this._workerSlots[index].busy = true; + await this._startJobInWorker(index, job); + this._workerSlots[index].busy = false; + + // 4. Check the "finished" condition. + this._checkFinished(); + + // 5. We got a free worker - perhaps we can immediately start another job? + this._scheduleJob(); } - async _dispatchQueue() { - const jobs = []; - while (this._queue.length) { - if (this._isStopped) - break; - const testGroup = this._queue.shift()!; - const requiredHash = testGroup.workerHash; - let worker = await this._obtainWorker(testGroup); - while (worker && worker.hash && worker.hash !== requiredHash) { - worker.stop(); - worker = await this._obtainWorker(testGroup); - } - if (this._isStopped || !worker) - break; - jobs.push(this._runJob(worker, testGroup)); + private async _startJobInWorker(index: number, job: TestGroup) { + let worker = this._workerSlots[index].worker; + + // 1. Restart the worker if it has the wrong hash. + if (worker && worker.hash() !== job.workerHash) { + await worker.stop(); + worker = undefined; + if (this._isStopped) // Check stopped signal after async hop. + return; } - await Promise.all(jobs); + + // 2. Start the worker if it is down. + if (!worker) { + worker = this._createWorker(job.workerHash); + this._workerSlots[index].worker = worker; + worker.on('exit', () => this._workerSlots[index].worker = undefined); + await worker.init(job, this._loader.serialize()); + if (this._isStopped) // Check stopped signal after async hop. + return; + } + + // 3. Run the job. + await this._runJob(worker, job); + } + + private _checkFinished() { + const hasMoreJobs = !!this._queue.length && !this._isStopped; + const allWorkersFree = this._workerSlots.every(w => !w.busy); + if (!hasMoreJobs && allWorkersFree) + this._finished.resolve(); + } + + async run() { + this._workerSlots = []; + // 1. Allocate workers. + for (let i = 0; i < this._loader.fullConfig().workers; i++) + this._workerSlots.push({ busy: false }); + // 2. Schedule enough jobs. + for (let i = 0; i < this._workerSlots.length; i++) + this._scheduleJob(); + this._checkFinished(); + // 3. More jobs are scheduled when the worker becomes free, or a new job is added. + // 4. Wait for all jobs to finish. + await this._finished; } async _runJob(worker: Worker, testGroup: TestGroup) { @@ -185,15 +232,12 @@ export class Dispatcher { // - we are here not because something failed // - no unrecoverable worker error if (!remaining.length && !failedTestIds.size && !params.fatalError) { - this._freeWorkers.push(worker); - this._notifyWorkerClaimer(); doneWithJob(); return; } // When worker encounters error, we will stop it and create a new one. - worker.stop(); - worker.didFail = true; + worker.stop(true /* didFail */); // In case of fatal error, report first remaining test as failing with this error, // and all others as skipped. @@ -274,61 +318,30 @@ export class Dispatcher { } } - if (remaining.length) + if (remaining.length) { this._queue.unshift({ ...testGroup, tests: remaining }); + // Perhaps we can immediately start the new job if there is a worker available? + this._scheduleJob(); + } // This job is over, we just scheduled another one. doneWithJob(); }; worker.on('done', onDone); - const onExit = () => { - if (worker.didSendStop) - onDone({}); - else - onDone({ fatalError: { value: 'Worker process exited unexpectedly' } }); + const onExit = (expectedly: boolean) => { + onDone(expectedly ? {} : { fatalError: { value: 'Worker process exited unexpectedly' } }); }; worker.on('exit', onExit); return result; } - async _obtainWorker(testGroup: TestGroup) { - const claimWorker = (): Promise | null => { - if (this._isStopped) - return null; - // Use available worker. - if (this._freeWorkers.length) - return Promise.resolve(this._freeWorkers.pop()!); - // Create a new worker. - if (this._workers.size < this._loader.fullConfig().workers) - return this._createWorker(testGroup); - return null; - }; - - // Note: it is important to claim the worker synchronously, - // so that we won't miss a _notifyWorkerClaimer call while awaiting. - let worker = claimWorker(); - if (!worker) { - // Wait for available or stopped worker. - await new Promise(f => this._workerClaimers.push(f)); - worker = claimWorker(); - } - return worker; - } - - async _notifyWorkerClaimer() { - if (this._isStopped || !this._workerClaimers.length) - return; - const callback = this._workerClaimers.shift()!; - callback(); - } - - _createWorker(testGroup: TestGroup) { - const worker = new Worker(this); + _createWorker(hash: string) { + const worker = new Worker(hash); worker.on('stdOut', (params: TestOutputPayload) => { const chunk = chunkFromParams(params); - if (worker.didFail) { + if (worker.didFail()) { // Note: we keep reading stdout from workers that are currently stopping after failure, // to debug teardown issues. However, we avoid spoiling the test result from // the next retry. @@ -342,7 +355,7 @@ export class Dispatcher { }); worker.on('stdErr', (params: TestOutputPayload) => { const chunk = chunkFromParams(params); - if (worker.didFail) { + if (worker.didFail()) { // Note: we keep reading stderr from workers that are currently stopping after failure, // to debug teardown issues. However, we avoid spoiling the test result from // the next retry. @@ -358,26 +371,13 @@ export class Dispatcher { this._hasWorkerErrors = true; this._reporter.onError?.(error); }); - worker.on('exit', () => { - this._workers.delete(worker); - this._notifyWorkerClaimer(); - if (this._stopCallback && !this._workers.size) - this._stopCallback(); - }); - this._workers.add(worker); - return worker.init(testGroup).then(() => worker); + return worker; } async stop() { this._isStopped = true; - if (this._workers.size) { - const result = new Promise(f => this._stopCallback = f); - for (const worker of this._workers) - worker.stop(); - await result; - } - while (this._workerClaimers.length) - this._workerClaimers.shift()!(); + await Promise.all(this._workerSlots.map(({ worker }) => worker?.stop())); + this._checkFinished(); } private _hasReachedMaxFailures() { @@ -402,30 +402,33 @@ export class Dispatcher { let lastWorkerIndex = 0; class Worker extends EventEmitter { - process: child_process.ChildProcess; - runner: Dispatcher; - hash = ''; - index: number; - didSendStop = false; - didFail = false; + private process: child_process.ChildProcess; + private _hash: string; + private workerIndex: number; + private didSendStop = false; + private _didFail = false; + private didExit = false; - constructor(runner: Dispatcher) { + constructor(hash: string) { super(); - this.runner = runner; - this.index = lastWorkerIndex++; + this.workerIndex = lastWorkerIndex++; + this._hash = hash; this.process = child_process.fork(path.join(__dirname, 'worker.js'), { detached: false, env: { FORCE_COLOR: process.stdout.isTTY ? '1' : '0', DEBUG_COLORS: process.stdout.isTTY ? '1' : '0', - TEST_WORKER_INDEX: String(this.index), + TEST_WORKER_INDEX: String(this.workerIndex), ...process.env }, // Can't pipe since piping slows down termination for some reason. stdio: ['ignore', 'ignore', process.env.PW_RUNNER_DEBUG ? 'inherit' : 'ignore', 'ipc'] }); - this.process.on('exit', () => this.emit('exit')); + this.process.on('exit', () => { + this.didExit = true; + this.emit('exit', this.didSendStop /* expectedly */); + }); this.process.on('error', e => {}); // do not yell at a send to dead process. this.process.on('message', (message: any) => { const { method, params } = message; @@ -433,13 +436,12 @@ class Worker extends EventEmitter { }); } - async init(testGroup: TestGroup) { - this.hash = testGroup.workerHash; + async init(testGroup: TestGroup, loaderData: SerializedLoaderData) { const params: WorkerInitParams = { - workerIndex: this.index, + workerIndex: this.workerIndex, repeatEachIndex: testGroup.repeatEachIndex, projectIndex: testGroup.projectIndex, - loader: this.runner._loader.serialize(), + loader: loaderData, }; this.process.send({ method: 'init', params }); await new Promise(f => this.process.once('message', f)); // Ready ack @@ -455,10 +457,24 @@ class Worker extends EventEmitter { this.process.send({ method: 'run', params: runPayload }); } - stop() { - if (!this.didSendStop) + didFail() { + return this._didFail; + } + + hash() { + return this._hash; + } + + async stop(didFail?: boolean) { + if (didFail) + this._didFail = true; + if (this.didExit) + return; + if (!this.didSendStop) { this.process.send({ method: 'stop' }); - this.didSendStop = true; + this.didSendStop = true; + } + await new Promise(f => this.once('exit', f)); } } diff --git a/tests/playwright-test/runner.spec.ts b/tests/playwright-test/runner.spec.ts index ed70a77884..df287a26a7 100644 --- a/tests/playwright-test/runner.spec.ts +++ b/tests/playwright-test/runner.spec.ts @@ -14,7 +14,7 @@ * limitations under the License. */ import path from 'path'; -import { test, expect } from './playwright-test-fixtures'; +import { test, expect, stripAscii } from './playwright-test-fixtures'; test('it should not allow multiple tests with the same name per suite', async ({ runInlineTest }) => { const result = await runInlineTest({ @@ -166,3 +166,39 @@ test('worker interrupt should report errors', async ({ runInlineTest }) => { expect(result.output).toContain('%%SEND-SIGINT%%'); expect(result.output).toContain('Error: INTERRUPT'); }); + +test('should not stall when workers are available', async ({ runInlineTest }) => { + const result = await runInlineTest({ + 'a.spec.js': ` + const { test } = pwt + test('fails-1', async () => { + console.log('\\n%%fails-1-started'); + await new Promise(f => setTimeout(f, 2000)); + console.log('\\n%%fails-1-done'); + expect(1).toBe(2); + }); + test('passes-1', async () => { + console.log('\\n%%passes-1'); + }); + `, + 'b.spec.js': ` + const { test } = pwt + test('passes-2', async () => { + await new Promise(f => setTimeout(f, 1000)); + console.log('\\n%%passes-2-started'); + await new Promise(f => setTimeout(f, 3000)); + console.log('\\n%%passes-2-done'); + }); + `, + }, { workers: 2 }); + expect(result.exitCode).toBe(1); + expect(result.passed).toBe(2); + expect(result.failed).toBe(1); + expect(stripAscii(result.output).split('\n').filter(line => line.startsWith('%%'))).toEqual([ + '%%fails-1-started', + '%%passes-2-started', + '%%fails-1-done', + '%%passes-1', + '%%passes-2-done', + ]); +});