feat(test runner): rewrite dispatcher to avoid unneded stalls (#9629)

In some circumstances, dispatcher was waiting for all exisitng jobs
to finish before scheduling a new one. This leads to unneded stalls.
Instead, we can schedule jobs right away, if we have a worker
available.
This commit is contained in:
Dmitry Gozman 2021-10-22 11:10:37 -07:00 committed by GitHub
parent 8d05cdacbc
commit 193c79a685
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 160 additions and 108 deletions

View File

@ -17,10 +17,11 @@
import child_process from 'child_process'; import child_process from 'child_process';
import path from 'path'; import path from 'path';
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
import { RunPayload, TestBeginPayload, TestEndPayload, DonePayload, TestOutputPayload, WorkerInitParams, StepBeginPayload, StepEndPayload } from './ipc'; import { RunPayload, TestBeginPayload, TestEndPayload, DonePayload, TestOutputPayload, WorkerInitParams, StepBeginPayload, StepEndPayload, SerializedLoaderData } from './ipc';
import type { TestResult, Reporter, TestStep } from '../types/testReporter'; import type { TestResult, Reporter, TestStep } from '../types/testReporter';
import { Suite, TestCase } from './test'; import { Suite, TestCase } from './test';
import { Loader } from './loader'; import { Loader } from './loader';
import { ManualPromise } from 'playwright-core/src/utils/async';
export type TestGroup = { export type TestGroup = {
workerHash: string; workerHash: string;
@ -31,17 +32,15 @@ export type TestGroup = {
}; };
export class Dispatcher { export class Dispatcher {
private _workers = new Set<Worker>(); private _workerSlots: { busy: boolean, worker?: Worker }[] = [];
private _freeWorkers: Worker[] = []; private _queue: TestGroup[] = [];
private _workerClaimers: (() => void)[] = []; private _finished = new ManualPromise<void>();
private _isStopped = false;
private _testById = new Map<string, { test: TestCase, result: TestResult, steps: Map<string, TestStep>, stepStack: Set<TestStep> }>(); private _testById = new Map<string, { test: TestCase, result: TestResult, steps: Map<string, TestStep>, stepStack: Set<TestStep> }>();
private _queue: TestGroup[] = []; private _loader: Loader;
private _stopCallback = () => {};
readonly _loader: Loader;
private _reporter: Reporter; private _reporter: Reporter;
private _hasWorkerErrors = false; private _hasWorkerErrors = false;
private _isStopped = false;
private _failureCount = 0; private _failureCount = 0;
constructor(loader: Loader, testGroups: TestGroup[], reporter: Reporter) { constructor(loader: Loader, testGroups: TestGroup[], reporter: Reporter) {
@ -57,29 +56,77 @@ export class Dispatcher {
} }
} }
async run() { private async _scheduleJob() {
// Loop in case job schedules more jobs // 1. Find a job to run.
while (this._queue.length && !this._isStopped) if (this._isStopped || !this._queue.length)
await this._dispatchQueue(); return;
const job = this._queue[0];
// 2. Find a worker with the same hash, or just some free worker.
let index = this._workerSlots.findIndex(w => !w.busy && w.worker && w.worker.hash() === job.workerHash);
if (index === -1)
index = this._workerSlots.findIndex(w => !w.busy);
// No workers available, bail out.
if (index === -1)
return;
// 3. Claim both the job and the worker, run the job and release the worker.
this._queue.shift();
this._workerSlots[index].busy = true;
await this._startJobInWorker(index, job);
this._workerSlots[index].busy = false;
// 4. Check the "finished" condition.
this._checkFinished();
// 5. We got a free worker - perhaps we can immediately start another job?
this._scheduleJob();
} }
async _dispatchQueue() { private async _startJobInWorker(index: number, job: TestGroup) {
const jobs = []; let worker = this._workerSlots[index].worker;
while (this._queue.length) {
if (this._isStopped) // 1. Restart the worker if it has the wrong hash.
break; if (worker && worker.hash() !== job.workerHash) {
const testGroup = this._queue.shift()!; await worker.stop();
const requiredHash = testGroup.workerHash; worker = undefined;
let worker = await this._obtainWorker(testGroup); if (this._isStopped) // Check stopped signal after async hop.
while (worker && worker.hash && worker.hash !== requiredHash) { return;
worker.stop();
worker = await this._obtainWorker(testGroup);
}
if (this._isStopped || !worker)
break;
jobs.push(this._runJob(worker, testGroup));
} }
await Promise.all(jobs);
// 2. Start the worker if it is down.
if (!worker) {
worker = this._createWorker(job.workerHash);
this._workerSlots[index].worker = worker;
worker.on('exit', () => this._workerSlots[index].worker = undefined);
await worker.init(job, this._loader.serialize());
if (this._isStopped) // Check stopped signal after async hop.
return;
}
// 3. Run the job.
await this._runJob(worker, job);
}
private _checkFinished() {
const hasMoreJobs = !!this._queue.length && !this._isStopped;
const allWorkersFree = this._workerSlots.every(w => !w.busy);
if (!hasMoreJobs && allWorkersFree)
this._finished.resolve();
}
async run() {
this._workerSlots = [];
// 1. Allocate workers.
for (let i = 0; i < this._loader.fullConfig().workers; i++)
this._workerSlots.push({ busy: false });
// 2. Schedule enough jobs.
for (let i = 0; i < this._workerSlots.length; i++)
this._scheduleJob();
this._checkFinished();
// 3. More jobs are scheduled when the worker becomes free, or a new job is added.
// 4. Wait for all jobs to finish.
await this._finished;
} }
async _runJob(worker: Worker, testGroup: TestGroup) { async _runJob(worker: Worker, testGroup: TestGroup) {
@ -185,15 +232,12 @@ export class Dispatcher {
// - we are here not because something failed // - we are here not because something failed
// - no unrecoverable worker error // - no unrecoverable worker error
if (!remaining.length && !failedTestIds.size && !params.fatalError) { if (!remaining.length && !failedTestIds.size && !params.fatalError) {
this._freeWorkers.push(worker);
this._notifyWorkerClaimer();
doneWithJob(); doneWithJob();
return; return;
} }
// When worker encounters error, we will stop it and create a new one. // When worker encounters error, we will stop it and create a new one.
worker.stop(); worker.stop(true /* didFail */);
worker.didFail = true;
// In case of fatal error, report first remaining test as failing with this error, // In case of fatal error, report first remaining test as failing with this error,
// and all others as skipped. // and all others as skipped.
@ -274,61 +318,30 @@ export class Dispatcher {
} }
} }
if (remaining.length) if (remaining.length) {
this._queue.unshift({ ...testGroup, tests: remaining }); this._queue.unshift({ ...testGroup, tests: remaining });
// Perhaps we can immediately start the new job if there is a worker available?
this._scheduleJob();
}
// This job is over, we just scheduled another one. // This job is over, we just scheduled another one.
doneWithJob(); doneWithJob();
}; };
worker.on('done', onDone); worker.on('done', onDone);
const onExit = () => { const onExit = (expectedly: boolean) => {
if (worker.didSendStop) onDone(expectedly ? {} : { fatalError: { value: 'Worker process exited unexpectedly' } });
onDone({});
else
onDone({ fatalError: { value: 'Worker process exited unexpectedly' } });
}; };
worker.on('exit', onExit); worker.on('exit', onExit);
return result; return result;
} }
async _obtainWorker(testGroup: TestGroup) { _createWorker(hash: string) {
const claimWorker = (): Promise<Worker> | null => { const worker = new Worker(hash);
if (this._isStopped)
return null;
// Use available worker.
if (this._freeWorkers.length)
return Promise.resolve(this._freeWorkers.pop()!);
// Create a new worker.
if (this._workers.size < this._loader.fullConfig().workers)
return this._createWorker(testGroup);
return null;
};
// Note: it is important to claim the worker synchronously,
// so that we won't miss a _notifyWorkerClaimer call while awaiting.
let worker = claimWorker();
if (!worker) {
// Wait for available or stopped worker.
await new Promise<void>(f => this._workerClaimers.push(f));
worker = claimWorker();
}
return worker;
}
async _notifyWorkerClaimer() {
if (this._isStopped || !this._workerClaimers.length)
return;
const callback = this._workerClaimers.shift()!;
callback();
}
_createWorker(testGroup: TestGroup) {
const worker = new Worker(this);
worker.on('stdOut', (params: TestOutputPayload) => { worker.on('stdOut', (params: TestOutputPayload) => {
const chunk = chunkFromParams(params); const chunk = chunkFromParams(params);
if (worker.didFail) { if (worker.didFail()) {
// Note: we keep reading stdout from workers that are currently stopping after failure, // Note: we keep reading stdout from workers that are currently stopping after failure,
// to debug teardown issues. However, we avoid spoiling the test result from // to debug teardown issues. However, we avoid spoiling the test result from
// the next retry. // the next retry.
@ -342,7 +355,7 @@ export class Dispatcher {
}); });
worker.on('stdErr', (params: TestOutputPayload) => { worker.on('stdErr', (params: TestOutputPayload) => {
const chunk = chunkFromParams(params); const chunk = chunkFromParams(params);
if (worker.didFail) { if (worker.didFail()) {
// Note: we keep reading stderr from workers that are currently stopping after failure, // Note: we keep reading stderr from workers that are currently stopping after failure,
// to debug teardown issues. However, we avoid spoiling the test result from // to debug teardown issues. However, we avoid spoiling the test result from
// the next retry. // the next retry.
@ -358,26 +371,13 @@ export class Dispatcher {
this._hasWorkerErrors = true; this._hasWorkerErrors = true;
this._reporter.onError?.(error); this._reporter.onError?.(error);
}); });
worker.on('exit', () => { return worker;
this._workers.delete(worker);
this._notifyWorkerClaimer();
if (this._stopCallback && !this._workers.size)
this._stopCallback();
});
this._workers.add(worker);
return worker.init(testGroup).then(() => worker);
} }
async stop() { async stop() {
this._isStopped = true; this._isStopped = true;
if (this._workers.size) { await Promise.all(this._workerSlots.map(({ worker }) => worker?.stop()));
const result = new Promise<void>(f => this._stopCallback = f); this._checkFinished();
for (const worker of this._workers)
worker.stop();
await result;
}
while (this._workerClaimers.length)
this._workerClaimers.shift()!();
} }
private _hasReachedMaxFailures() { private _hasReachedMaxFailures() {
@ -402,30 +402,33 @@ export class Dispatcher {
let lastWorkerIndex = 0; let lastWorkerIndex = 0;
class Worker extends EventEmitter { class Worker extends EventEmitter {
process: child_process.ChildProcess; private process: child_process.ChildProcess;
runner: Dispatcher; private _hash: string;
hash = ''; private workerIndex: number;
index: number; private didSendStop = false;
didSendStop = false; private _didFail = false;
didFail = false; private didExit = false;
constructor(runner: Dispatcher) { constructor(hash: string) {
super(); super();
this.runner = runner; this.workerIndex = lastWorkerIndex++;
this.index = lastWorkerIndex++; this._hash = hash;
this.process = child_process.fork(path.join(__dirname, 'worker.js'), { this.process = child_process.fork(path.join(__dirname, 'worker.js'), {
detached: false, detached: false,
env: { env: {
FORCE_COLOR: process.stdout.isTTY ? '1' : '0', FORCE_COLOR: process.stdout.isTTY ? '1' : '0',
DEBUG_COLORS: process.stdout.isTTY ? '1' : '0', DEBUG_COLORS: process.stdout.isTTY ? '1' : '0',
TEST_WORKER_INDEX: String(this.index), TEST_WORKER_INDEX: String(this.workerIndex),
...process.env ...process.env
}, },
// Can't pipe since piping slows down termination for some reason. // Can't pipe since piping slows down termination for some reason.
stdio: ['ignore', 'ignore', process.env.PW_RUNNER_DEBUG ? 'inherit' : 'ignore', 'ipc'] stdio: ['ignore', 'ignore', process.env.PW_RUNNER_DEBUG ? 'inherit' : 'ignore', 'ipc']
}); });
this.process.on('exit', () => this.emit('exit')); this.process.on('exit', () => {
this.didExit = true;
this.emit('exit', this.didSendStop /* expectedly */);
});
this.process.on('error', e => {}); // do not yell at a send to dead process. this.process.on('error', e => {}); // do not yell at a send to dead process.
this.process.on('message', (message: any) => { this.process.on('message', (message: any) => {
const { method, params } = message; const { method, params } = message;
@ -433,13 +436,12 @@ class Worker extends EventEmitter {
}); });
} }
async init(testGroup: TestGroup) { async init(testGroup: TestGroup, loaderData: SerializedLoaderData) {
this.hash = testGroup.workerHash;
const params: WorkerInitParams = { const params: WorkerInitParams = {
workerIndex: this.index, workerIndex: this.workerIndex,
repeatEachIndex: testGroup.repeatEachIndex, repeatEachIndex: testGroup.repeatEachIndex,
projectIndex: testGroup.projectIndex, projectIndex: testGroup.projectIndex,
loader: this.runner._loader.serialize(), loader: loaderData,
}; };
this.process.send({ method: 'init', params }); this.process.send({ method: 'init', params });
await new Promise(f => this.process.once('message', f)); // Ready ack await new Promise(f => this.process.once('message', f)); // Ready ack
@ -455,10 +457,24 @@ class Worker extends EventEmitter {
this.process.send({ method: 'run', params: runPayload }); this.process.send({ method: 'run', params: runPayload });
} }
stop() { didFail() {
if (!this.didSendStop) return this._didFail;
}
hash() {
return this._hash;
}
async stop(didFail?: boolean) {
if (didFail)
this._didFail = true;
if (this.didExit)
return;
if (!this.didSendStop) {
this.process.send({ method: 'stop' }); this.process.send({ method: 'stop' });
this.didSendStop = true; this.didSendStop = true;
}
await new Promise(f => this.once('exit', f));
} }
} }

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
import path from 'path'; import path from 'path';
import { test, expect } from './playwright-test-fixtures'; import { test, expect, stripAscii } from './playwright-test-fixtures';
test('it should not allow multiple tests with the same name per suite', async ({ runInlineTest }) => { test('it should not allow multiple tests with the same name per suite', async ({ runInlineTest }) => {
const result = await runInlineTest({ const result = await runInlineTest({
@ -166,3 +166,39 @@ test('worker interrupt should report errors', async ({ runInlineTest }) => {
expect(result.output).toContain('%%SEND-SIGINT%%'); expect(result.output).toContain('%%SEND-SIGINT%%');
expect(result.output).toContain('Error: INTERRUPT'); expect(result.output).toContain('Error: INTERRUPT');
}); });
test('should not stall when workers are available', async ({ runInlineTest }) => {
const result = await runInlineTest({
'a.spec.js': `
const { test } = pwt
test('fails-1', async () => {
console.log('\\n%%fails-1-started');
await new Promise(f => setTimeout(f, 2000));
console.log('\\n%%fails-1-done');
expect(1).toBe(2);
});
test('passes-1', async () => {
console.log('\\n%%passes-1');
});
`,
'b.spec.js': `
const { test } = pwt
test('passes-2', async () => {
await new Promise(f => setTimeout(f, 1000));
console.log('\\n%%passes-2-started');
await new Promise(f => setTimeout(f, 3000));
console.log('\\n%%passes-2-done');
});
`,
}, { workers: 2 });
expect(result.exitCode).toBe(1);
expect(result.passed).toBe(2);
expect(result.failed).toBe(1);
expect(stripAscii(result.output).split('\n').filter(line => line.startsWith('%%'))).toEqual([
'%%fails-1-started',
'%%passes-2-started',
'%%fails-1-done',
'%%passes-1',
'%%passes-2-done',
]);
});