2021-06-06 17:09:53 -07:00
|
|
|
/**
|
|
|
|
* Copyright Microsoft Corporation. All rights reserved.
|
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
import child_process from 'child_process';
|
|
|
|
import path from 'path';
|
|
|
|
import { EventEmitter } from 'events';
|
|
|
|
import { RunPayload, TestBeginPayload, TestEndPayload, DonePayload, TestOutputPayload, WorkerInitParams } from './ipc';
|
2021-07-28 15:43:37 -07:00
|
|
|
import type { TestResult, Reporter } from '../../types/testReporter';
|
2021-07-27 11:04:38 -07:00
|
|
|
import { TestCase } from './test';
|
2021-06-06 17:09:53 -07:00
|
|
|
import { Loader } from './loader';
|
|
|
|
|
2021-07-27 11:04:38 -07:00
|
|
|
// TODO: use TestGroup instead of DispatcherEntry
|
2021-06-06 17:09:53 -07:00
|
|
|
type DispatcherEntry = {
|
|
|
|
runPayload: RunPayload;
|
|
|
|
hash: string;
|
|
|
|
repeatEachIndex: number;
|
|
|
|
projectIndex: number;
|
|
|
|
};
|
|
|
|
|
2021-07-27 11:04:38 -07:00
|
|
|
export type TestGroup = {
|
|
|
|
workerHash: string;
|
|
|
|
requireFile: string;
|
|
|
|
repeatEachIndex: number;
|
|
|
|
projectIndex: number;
|
|
|
|
tests: TestCase[];
|
|
|
|
};
|
|
|
|
|
2021-06-06 17:09:53 -07:00
|
|
|
export class Dispatcher {
|
|
|
|
private _workers = new Set<Worker>();
|
|
|
|
private _freeWorkers: Worker[] = [];
|
|
|
|
private _workerClaimers: (() => void)[] = [];
|
|
|
|
|
2021-07-19 14:54:18 -07:00
|
|
|
private _testById = new Map<string, { test: TestCase, result: TestResult }>();
|
2021-06-06 17:09:53 -07:00
|
|
|
private _queue: DispatcherEntry[] = [];
|
|
|
|
private _stopCallback = () => {};
|
|
|
|
readonly _loader: Loader;
|
|
|
|
private _reporter: Reporter;
|
|
|
|
private _hasWorkerErrors = false;
|
|
|
|
private _isStopped = false;
|
|
|
|
private _failureCount = 0;
|
|
|
|
|
2021-07-27 11:04:38 -07:00
|
|
|
constructor(loader: Loader, testGroups: TestGroup[], reporter: Reporter) {
|
2021-06-06 17:09:53 -07:00
|
|
|
this._loader = loader;
|
|
|
|
this._reporter = reporter;
|
|
|
|
|
2021-07-27 11:04:38 -07:00
|
|
|
this._queue = [];
|
|
|
|
for (const group of testGroups) {
|
|
|
|
const entry: DispatcherEntry = {
|
|
|
|
runPayload: {
|
|
|
|
file: group.requireFile,
|
|
|
|
entries: []
|
|
|
|
},
|
|
|
|
hash: group.workerHash,
|
|
|
|
repeatEachIndex: group.repeatEachIndex,
|
|
|
|
projectIndex: group.projectIndex,
|
|
|
|
};
|
|
|
|
for (const test of group.tests) {
|
|
|
|
const result = test._appendTestResult();
|
|
|
|
this._testById.set(test._id, { test, result });
|
2021-07-15 22:02:10 -07:00
|
|
|
entry.runPayload.entries.push({
|
2021-07-27 11:04:38 -07:00
|
|
|
retry: result.retry,
|
2021-07-15 22:02:10 -07:00
|
|
|
testId: test._id,
|
|
|
|
});
|
2021-06-06 17:09:53 -07:00
|
|
|
}
|
2021-07-27 11:04:38 -07:00
|
|
|
this._queue.push(entry);
|
2021-06-06 17:09:53 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async run() {
|
|
|
|
// Loop in case job schedules more jobs
|
|
|
|
while (this._queue.length && !this._isStopped)
|
|
|
|
await this._dispatchQueue();
|
|
|
|
}
|
|
|
|
|
|
|
|
async _dispatchQueue() {
|
|
|
|
const jobs = [];
|
|
|
|
while (this._queue.length) {
|
|
|
|
if (this._isStopped)
|
|
|
|
break;
|
|
|
|
const entry = this._queue.shift()!;
|
|
|
|
const requiredHash = entry.hash;
|
|
|
|
let worker = await this._obtainWorker(entry);
|
|
|
|
while (!this._isStopped && worker.hash && worker.hash !== requiredHash) {
|
|
|
|
worker.stop();
|
|
|
|
worker = await this._obtainWorker(entry);
|
|
|
|
}
|
|
|
|
if (this._isStopped)
|
|
|
|
break;
|
|
|
|
jobs.push(this._runJob(worker, entry));
|
|
|
|
}
|
|
|
|
await Promise.all(jobs);
|
|
|
|
}
|
|
|
|
|
|
|
|
async _runJob(worker: Worker, entry: DispatcherEntry) {
|
|
|
|
worker.run(entry.runPayload);
|
|
|
|
let doneCallback = () => {};
|
|
|
|
const result = new Promise<void>(f => doneCallback = f);
|
2021-07-07 12:04:43 -07:00
|
|
|
const doneWithJob = () => {
|
|
|
|
worker.removeListener('testBegin', onTestBegin);
|
|
|
|
worker.removeListener('testEnd', onTestEnd);
|
|
|
|
worker.removeListener('done', onDone);
|
|
|
|
worker.removeListener('exit', onExit);
|
|
|
|
doneCallback();
|
|
|
|
};
|
|
|
|
|
|
|
|
const remainingByTestId = new Map(entry.runPayload.entries.map(e => [ e.testId, e ]));
|
|
|
|
let lastStartedTestId: string | undefined;
|
|
|
|
|
|
|
|
const onTestBegin = (params: TestBeginPayload) => {
|
|
|
|
lastStartedTestId = params.testId;
|
|
|
|
};
|
|
|
|
worker.addListener('testBegin', onTestBegin);
|
|
|
|
|
|
|
|
const onTestEnd = (params: TestEndPayload) => {
|
|
|
|
remainingByTestId.delete(params.testId);
|
|
|
|
};
|
|
|
|
worker.addListener('testEnd', onTestEnd);
|
|
|
|
|
|
|
|
const onDone = (params: DonePayload) => {
|
|
|
|
let remaining = [...remainingByTestId.values()];
|
|
|
|
|
2021-06-06 17:09:53 -07:00
|
|
|
// We won't file remaining if:
|
|
|
|
// - there are no remaining
|
|
|
|
// - we are here not because something failed
|
|
|
|
// - no unrecoverable worker error
|
2021-07-07 12:04:43 -07:00
|
|
|
if (!remaining.length && !params.failedTestId && !params.fatalError) {
|
2021-06-06 17:09:53 -07:00
|
|
|
this._freeWorkers.push(worker);
|
|
|
|
this._notifyWorkerClaimer();
|
2021-07-07 12:04:43 -07:00
|
|
|
doneWithJob();
|
2021-06-06 17:09:53 -07:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
// When worker encounters error, we will stop it and create a new one.
|
|
|
|
worker.stop();
|
|
|
|
|
|
|
|
const failedTestIds = new Set<string>();
|
|
|
|
|
2021-07-28 15:43:37 -07:00
|
|
|
// In case of fatal error, report first remaining test as failing with this error,
|
|
|
|
// and all others as skipped.
|
2021-06-06 17:09:53 -07:00
|
|
|
if (params.fatalError) {
|
2021-07-28 15:43:37 -07:00
|
|
|
let first = true;
|
2021-06-06 17:09:53 -07:00
|
|
|
for (const { testId } of remaining) {
|
|
|
|
const { test, result } = this._testById.get(testId)!;
|
2021-07-28 15:43:37 -07:00
|
|
|
if (this._hasReachedMaxFailures())
|
|
|
|
break;
|
2021-07-07 12:04:43 -07:00
|
|
|
// There might be a single test that has started but has not finished yet.
|
|
|
|
if (testId !== lastStartedTestId)
|
2021-07-28 15:43:37 -07:00
|
|
|
this._reporter.onTestBegin?.(test);
|
2021-06-06 17:09:53 -07:00
|
|
|
result.error = params.fatalError;
|
2021-07-28 15:43:37 -07:00
|
|
|
result.status = first ? 'failed' : 'skipped';
|
|
|
|
this._reportTestEnd(test, result);
|
2021-06-06 17:09:53 -07:00
|
|
|
failedTestIds.add(testId);
|
2021-07-28 15:43:37 -07:00
|
|
|
first = false;
|
2021-06-06 17:09:53 -07:00
|
|
|
}
|
2021-07-07 12:04:43 -07:00
|
|
|
// Since we pretend that all remaining tests failed, there is nothing else to run,
|
2021-06-06 17:09:53 -07:00
|
|
|
// except for possible retries.
|
|
|
|
remaining = [];
|
|
|
|
}
|
|
|
|
if (params.failedTestId)
|
|
|
|
failedTestIds.add(params.failedTestId);
|
|
|
|
|
|
|
|
// Only retry expected failures, not passes and only if the test failed.
|
|
|
|
for (const testId of failedTestIds) {
|
|
|
|
const pair = this._testById.get(testId)!;
|
2021-06-14 22:16:16 -07:00
|
|
|
if (!this._isStopped && pair.test.expectedStatus === 'passed' && pair.test.results.length < pair.test.retries + 1) {
|
2021-06-06 17:09:53 -07:00
|
|
|
pair.result = pair.test._appendTestResult();
|
|
|
|
remaining.unshift({
|
|
|
|
retry: pair.result.retry,
|
|
|
|
testId: pair.test._id,
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (remaining.length)
|
|
|
|
this._queue.unshift({ ...entry, runPayload: { ...entry.runPayload, entries: remaining } });
|
|
|
|
|
|
|
|
// This job is over, we just scheduled another one.
|
2021-07-07 12:04:43 -07:00
|
|
|
doneWithJob();
|
|
|
|
};
|
|
|
|
worker.on('done', onDone);
|
|
|
|
|
|
|
|
const onExit = () => {
|
2021-07-28 15:43:37 -07:00
|
|
|
if (worker.didSendStop)
|
|
|
|
onDone({});
|
|
|
|
else
|
|
|
|
onDone({ fatalError: { value: 'Worker process exited unexpectedly' } });
|
2021-07-07 12:04:43 -07:00
|
|
|
};
|
|
|
|
worker.on('exit', onExit);
|
|
|
|
|
2021-06-06 17:09:53 -07:00
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
async _obtainWorker(entry: DispatcherEntry) {
|
|
|
|
const claimWorker = (): Promise<Worker> | null => {
|
|
|
|
// Use available worker.
|
|
|
|
if (this._freeWorkers.length)
|
|
|
|
return Promise.resolve(this._freeWorkers.pop()!);
|
|
|
|
// Create a new worker.
|
|
|
|
if (this._workers.size < this._loader.fullConfig().workers)
|
|
|
|
return this._createWorker(entry);
|
|
|
|
return null;
|
|
|
|
};
|
|
|
|
|
|
|
|
// Note: it is important to claim the worker synchronously,
|
|
|
|
// so that we won't miss a _notifyWorkerClaimer call while awaiting.
|
|
|
|
let worker = claimWorker();
|
|
|
|
if (!worker) {
|
|
|
|
// Wait for available or stopped worker.
|
|
|
|
await new Promise<void>(f => this._workerClaimers.push(f));
|
|
|
|
worker = claimWorker();
|
|
|
|
}
|
|
|
|
return worker!;
|
|
|
|
}
|
|
|
|
|
|
|
|
async _notifyWorkerClaimer() {
|
|
|
|
if (this._isStopped || !this._workerClaimers.length)
|
|
|
|
return;
|
|
|
|
const callback = this._workerClaimers.shift()!;
|
|
|
|
callback();
|
|
|
|
}
|
|
|
|
|
|
|
|
_createWorker(entry: DispatcherEntry) {
|
|
|
|
const worker = new Worker(this);
|
|
|
|
worker.on('testBegin', (params: TestBeginPayload) => {
|
2021-07-28 15:43:37 -07:00
|
|
|
if (this._hasReachedMaxFailures())
|
|
|
|
return;
|
2021-06-06 17:09:53 -07:00
|
|
|
const { test, result: testRun } = this._testById.get(params.testId)!;
|
|
|
|
testRun.workerIndex = params.workerIndex;
|
2021-07-18 17:40:59 -07:00
|
|
|
testRun.startTime = new Date(params.startWallTime);
|
2021-07-28 15:43:37 -07:00
|
|
|
this._reporter.onTestBegin?.(test);
|
2021-06-06 17:09:53 -07:00
|
|
|
});
|
|
|
|
worker.on('testEnd', (params: TestEndPayload) => {
|
2021-07-28 15:43:37 -07:00
|
|
|
if (this._hasReachedMaxFailures())
|
|
|
|
return;
|
2021-06-06 17:09:53 -07:00
|
|
|
const { test, result } = this._testById.get(params.testId)!;
|
|
|
|
result.duration = params.duration;
|
|
|
|
result.error = params.error;
|
2021-07-16 13:48:37 -07:00
|
|
|
result.attachments = params.attachments.map(a => ({
|
|
|
|
name: a.name,
|
|
|
|
path: a.path,
|
|
|
|
contentType: a.contentType,
|
|
|
|
body: a.body ? Buffer.from(a.body, 'base64') : undefined
|
|
|
|
}));
|
2021-07-28 15:43:37 -07:00
|
|
|
result.status = params.status;
|
2021-06-06 17:09:53 -07:00
|
|
|
test.expectedStatus = params.expectedStatus;
|
|
|
|
test.annotations = params.annotations;
|
|
|
|
test.timeout = params.timeout;
|
2021-07-28 15:43:37 -07:00
|
|
|
this._reportTestEnd(test, result);
|
2021-06-06 17:09:53 -07:00
|
|
|
});
|
|
|
|
worker.on('stdOut', (params: TestOutputPayload) => {
|
|
|
|
const chunk = chunkFromParams(params);
|
|
|
|
const pair = params.testId ? this._testById.get(params.testId) : undefined;
|
|
|
|
if (pair)
|
|
|
|
pair.result.stdout.push(chunk);
|
2021-07-16 12:40:33 -07:00
|
|
|
this._reporter.onStdOut?.(chunk, pair ? pair.test : undefined);
|
2021-06-06 17:09:53 -07:00
|
|
|
});
|
|
|
|
worker.on('stdErr', (params: TestOutputPayload) => {
|
|
|
|
const chunk = chunkFromParams(params);
|
|
|
|
const pair = params.testId ? this._testById.get(params.testId) : undefined;
|
|
|
|
if (pair)
|
|
|
|
pair.result.stderr.push(chunk);
|
2021-07-16 12:40:33 -07:00
|
|
|
this._reporter.onStdErr?.(chunk, pair ? pair.test : undefined);
|
2021-06-06 17:09:53 -07:00
|
|
|
});
|
|
|
|
worker.on('teardownError', ({error}) => {
|
|
|
|
this._hasWorkerErrors = true;
|
2021-07-16 12:40:33 -07:00
|
|
|
this._reporter.onError?.(error);
|
2021-06-06 17:09:53 -07:00
|
|
|
});
|
|
|
|
worker.on('exit', () => {
|
|
|
|
this._workers.delete(worker);
|
|
|
|
this._notifyWorkerClaimer();
|
|
|
|
if (this._stopCallback && !this._workers.size)
|
|
|
|
this._stopCallback();
|
|
|
|
});
|
|
|
|
this._workers.add(worker);
|
|
|
|
return worker.init(entry).then(() => worker);
|
|
|
|
}
|
|
|
|
|
|
|
|
async stop() {
|
|
|
|
this._isStopped = true;
|
|
|
|
if (this._workers.size) {
|
|
|
|
const result = new Promise<void>(f => this._stopCallback = f);
|
|
|
|
for (const worker of this._workers)
|
|
|
|
worker.stop();
|
|
|
|
await result;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-28 15:43:37 -07:00
|
|
|
private _hasReachedMaxFailures() {
|
2021-07-07 12:04:43 -07:00
|
|
|
const maxFailures = this._loader.fullConfig().maxFailures;
|
2021-07-28 15:43:37 -07:00
|
|
|
return maxFailures > 0 && this._failureCount >= maxFailures;
|
2021-07-07 12:04:43 -07:00
|
|
|
}
|
|
|
|
|
2021-07-28 15:43:37 -07:00
|
|
|
private _reportTestEnd(test: TestCase, result: TestResult) {
|
2021-06-06 17:09:53 -07:00
|
|
|
if (result.status !== 'skipped' && result.status !== test.expectedStatus)
|
|
|
|
++this._failureCount;
|
2021-07-28 15:43:37 -07:00
|
|
|
this._reporter.onTestEnd?.(test, result);
|
2021-06-06 17:09:53 -07:00
|
|
|
const maxFailures = this._loader.fullConfig().maxFailures;
|
|
|
|
if (maxFailures && this._failureCount === maxFailures)
|
2021-07-07 12:04:43 -07:00
|
|
|
this.stop().catch(e => {});
|
2021-06-06 17:09:53 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
hasWorkerErrors(): boolean {
|
|
|
|
return this._hasWorkerErrors;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let lastWorkerIndex = 0;
|
|
|
|
|
|
|
|
class Worker extends EventEmitter {
|
|
|
|
process: child_process.ChildProcess;
|
|
|
|
runner: Dispatcher;
|
|
|
|
hash = '';
|
|
|
|
index: number;
|
2021-07-28 15:43:37 -07:00
|
|
|
didSendStop = false;
|
2021-06-06 17:09:53 -07:00
|
|
|
|
|
|
|
constructor(runner: Dispatcher) {
|
|
|
|
super();
|
|
|
|
this.runner = runner;
|
|
|
|
this.index = lastWorkerIndex++;
|
|
|
|
|
|
|
|
this.process = child_process.fork(path.join(__dirname, 'worker.js'), {
|
|
|
|
detached: false,
|
|
|
|
env: {
|
|
|
|
FORCE_COLOR: process.stdout.isTTY ? '1' : '0',
|
|
|
|
DEBUG_COLORS: process.stdout.isTTY ? '1' : '0',
|
|
|
|
TEST_WORKER_INDEX: String(this.index),
|
|
|
|
...process.env
|
|
|
|
},
|
|
|
|
// Can't pipe since piping slows down termination for some reason.
|
|
|
|
stdio: ['ignore', 'ignore', process.env.PW_RUNNER_DEBUG ? 'inherit' : 'ignore', 'ipc']
|
|
|
|
});
|
|
|
|
this.process.on('exit', () => this.emit('exit'));
|
|
|
|
this.process.on('error', e => {}); // do not yell at a send to dead process.
|
|
|
|
this.process.on('message', (message: any) => {
|
|
|
|
const { method, params } = message;
|
|
|
|
this.emit(method, params);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
async init(entry: DispatcherEntry) {
|
|
|
|
this.hash = entry.hash;
|
|
|
|
const params: WorkerInitParams = {
|
|
|
|
workerIndex: this.index,
|
|
|
|
repeatEachIndex: entry.repeatEachIndex,
|
|
|
|
projectIndex: entry.projectIndex,
|
|
|
|
loader: this.runner._loader.serialize(),
|
|
|
|
};
|
|
|
|
this.process.send({ method: 'init', params });
|
|
|
|
await new Promise(f => this.process.once('message', f)); // Ready ack
|
|
|
|
}
|
|
|
|
|
|
|
|
run(runPayload: RunPayload) {
|
|
|
|
this.process.send({ method: 'run', params: runPayload });
|
|
|
|
}
|
|
|
|
|
|
|
|
stop() {
|
2021-07-07 12:04:43 -07:00
|
|
|
if (!this.didSendStop)
|
|
|
|
this.process.send({ method: 'stop' });
|
|
|
|
this.didSendStop = true;
|
2021-06-06 17:09:53 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
function chunkFromParams(params: TestOutputPayload): string | Buffer {
|
|
|
|
if (typeof params.text === 'string')
|
|
|
|
return params.text;
|
|
|
|
return Buffer.from(params.buffer!, 'base64');
|
|
|
|
}
|