10203040506070809010011012013014015016017018019020021022023024025026027028029030731032033034735036037038039040041042043044045046047048049050051052053054055056057058059060061062063064065066067068069070071072073074075076077078079080981882083084085086087088089090091092093094095096097098099010001010102010301040105010601070108010901100111011201130114595906115011601170118151195120512151220123512401255126012701282412981300131013224133813481350136013789308813829769613901400141211427143014401452114671470148014931501151015201538945221540155015601570158015901600161016229817416301642981741652981741662981741670168298174169017001710172017301740175817681778178017901800181018201830184018501860187018801895190519101925193019441950196019701980199020002010202020302040205020602070208020962106211021262130214021502166217621802196220022102220223822402258226622762280229023082310232023311234023502368237823882390240824182420243024402458246024782480249025011251025202538254162558256025782580259826082610262026302647265726672677268026972707271027272730274027502767277027872797280728102827283028402850286298174287028829817428902908945462918292029302948945642951429672970298029914300730103020303030489454630583060307830813090310031183120313031417896233155967333166317031803190320298174321032203230324532503260327486328486329486330486331033203330334033503360337833803398340834103428343034483458346034703488349035013510352035303548355035659538435729768835829768835903600361836203630364036539366836783681236903700371837203730374537553760377037803795380038133382638303840385038603875388038911390639103920393539403950 /++ A module containing the parallel test runner Copyright: © 2017 Szabo Bogdan License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 
Authors: Szabo Bogdan +/
module trial.executor.parallel;

public import trial.interfaces;

import std.datetime;
import std.exception;
import std.algorithm;
import std.array;
import core.thread;

version(unittest) {
  version(Have_fluent_asserts) {
    import fluent.asserts;
  }
}

/// The Lifecycle listener used to send data from the tests threads to
/// the main thread. Only the step events are forwarded (to `ThreadProxy`);
/// every other listener/executor entry point asserts, because it must only
/// be called from the main thread.
class ThreadLifeCycleListener : LifeCycleListeners {
  /// Key of the test running on the current thread (set by `testThreadSetup`).
  /// It is not declared `shared`, so each thread has its own value.
  static string currentTest;

  override {
    /// Queue a "step started" event for the test running on this thread.
    void begin(string suite, string test, ref StepResult step) {
      ThreadProxy.instance.beginStep(currentTest, step.name, step.begin);
    }

    /// Queue a "step finished" event for the test running on this thread.
    void end(string suite, string test, ref StepResult step) {
      ThreadProxy.instance.endStep(currentTest, step.name, step.end);
    }

    void end(string, ref TestResult test) {
      assert(false, "You can not call `end` outside of the main thread");
    }

    void begin(string, ref TestResult test) {
      assert(false, "You can not call `begin` outside of the main thread");
    }

    void add(T)(T listener) {
      assert(false, "You can not call `add` outside of the main thread");
    }

    void begin(ulong) {
      assert(false, "You can not call `begin` outside of the main thread");
    }

    void end(SuiteResult[] result) {
      assert(false, "You can not call `end` outside of the main thread");
    }

    void begin(ref SuiteResult suite) {
      assert(false, "You can not call `begin` outside of the main thread");
    }

    void end(ref SuiteResult suite) {
      assert(false, "You can not call `end` outside of the main thread");
    }

    SuiteResult[] execute(ref const(TestCase)) {
      assert(false, "You can not call `execute` outside of the main thread");
    }

    SuiteResult[] beginExecution(ref const(TestCase)[]) {
      assert(false, "You can not call `beginExecution` outside of the main thread");
    }

    SuiteResult[] endExecution() {
      assert(false, "You can not call `endExecution` outside of the main thread");
    }
  }
}

/// Thread-local module destructor: if the terminating thread was set up to
/// run a test (`currentTest` is not empty), report that test as finished.
static ~this() {
  if(ThreadLifeCycleListener.currentTest != "") {
    ThreadProxy.instance.end(ThreadLifeCycleListener.currentTest);
  }
}

private {
  import core.atomic;

  /// A step event recorded by a test thread.
  struct StepAction {
    /// Whether the step started or finished.
    enum Type {
      begin,
      end
    }

    /// Key of the test that produced the event.
    string test;

    /// Name of the step.
    string name;

    /// The time reported with the event.
    SysTime time;

    /// The kind of event.
    Type type;
  }

  /// Collects the events produced by the test threads until the main thread
  /// drains them with `getStatus`. The class is `synchronized`, so its
  /// methods exclude each other.
  synchronized class ThreadProxy {
    private shared static ThreadProxy _instance = new shared ThreadProxy;

    shared {
      private {
        /// Keys of the tests that started since the last `getStatus` call.
        string[] beginTests;

        /// Keys of the tests that finished since the last `getStatus` call.
        string[] endTests;

        /// Step events recorded since the last `getStatus` call.
        StepAction[] steps;

        /// Throwables caught while running a test, indexed by test key.
        Throwable[string] failures;

        /// How many tests have finished since the last `reset`.
        ulong testCount;
      }

      static {
        /// The proxy instance used by all threads.
        shared(ThreadProxy) instance() {
          return _instance;
        }
      }

      /// Drop every queued event, every stored failure and the finished counter.
      void reset() {
        beginTests = [];
        endTests = [];
        steps = [];
        failures = typeof(failures).init;
        testCount = 0;
      }

      /// Record that the test identified by `name` has started.
      void begin(string name) {
        beginTests ~= name;
      }

      /// Record that the test identified by `name` has finished and count it.
      void end(string name) {
        core.atomic.atomicOp!"+="(this.testCount, 1);
        endTests ~= name;
      }

      /// Returns: the number of tests that finished since the last `reset`.
      auto getTestCount() {
        return testCount;
      }

      /// Queue a "step started" event for `testName`.
      void beginStep(shared(string) testName, string stepName, SysTime begin) {
        steps ~= StepAction(testName, stepName, begin, StepAction.Type.begin);
      }

      /// Queue a "step finished" event for `testName`.
      void endStep(shared(string) testName, string stepName, SysTime end) {
        steps ~= StepAction(testName, stepName, end, StepAction.Type.end);
      }

      /// Store the throwable raised by the test identified by `key`.
      void setFailure(string key, shared(Throwable) t) {
        failures[key] = t;
      }

      /// Take a snapshot of the queued events and empty the three queues.
      /// `failures` and `testCount` are reported but not cleared; the
      /// failures map is handed over without being duplicated.
      auto getStatus() {
        struct Status {
          string[] begin;
          StepAction[] steps;
          string[] end;
          Throwable[string] failures;
          ulong testCount;
        }

        auto status = shared Status(beginTests.dup, steps.dup, endTests.dup, failures, testCount);

        beginTests = [];
        steps = [];
        endTests = [];

        return status;
      }
    }
  }
}

/// Prepare the current thread for running the test identified by `testName`:
/// remember the key, install a `ThreadLifeCycleListener` as the listeners
/// instance and announce the test start to the proxy.
private void testThreadSetup(string testName) {
  ThreadLifeCycleListener.currentTest = testName;
  LifeCycleListeners.instance = new ThreadLifeCycleListener;
  ThreadProxy.instance.begin(testName);
}

/// The parallel executors runs tests in a separate thread
class ParallelExecutor : ITestExecutor {
  /// Book keeping for one suite.
  struct SuiteStats {
    /// The result that is reported for the suite.
    SuiteResult result;

    /// How many tests of the suite have finished.
    ulong testsFinished;

    /// True once the suite end was sent to the listeners.
    bool isDone;
  }

  /// Params:
  ///   maxTestCount = the maximum number of tests that may run at the same
  ///                  time; `0` selects `core.cpuid.threadsPerCPU`
  this(uint maxTestCount = 0) {
    this.maxTestCount = maxTestCount;

    // The value is unsigned, so "not set" can only be 0.
    if(this.maxTestCount == 0) {
      import core.cpuid : threadsPerCPU;
      this.maxTestCount = threadsPerCPU;
    }
  }

  private {
    /// Number of tests handed to `execute` so far.
    ulong testCount;

    /// Maximum number of tests that may run at the same time.
    uint maxTestCount;

    /// Name of the suite for which the last suite begin event was sent.
    string currentSuite = "";

    /// Suite book keeping, indexed by suite name.
    SuiteStats[string] suiteStats;

    /// The executed test cases, indexed by "suite|test" key.
    TestCase[string] testCases;

    /// For every running test, the stack of open steps. The first element is
    /// the test result itself.
    StepResult[][string] stepStack;

    /// Stamp the suite result with the current time and notify the listeners
    /// that the suite has started.
    void addSuiteResult(string name) {
      suiteStats[name].result.begin = Clock.currTime;
      suiteStats[name].result.end = Clock.currTime;

      LifeCycleListeners.instance.begin(suiteStats[name].result);
    }

    /// Set the suite end time, mark it as done and notify the listeners.
    void endSuiteResult(string name) {
      suiteStats[name].result.end = Clock.currTime;
      suiteStats[name].isDone = true;

      LifeCycleListeners.instance.end(suiteStats[name].result);
    }

    /// Handle a "test started" event: mark the test result as started,
    /// notify the listeners and open the step stack of the test.
    void addTestResult(string key) {
      auto testCase = testCases[key];

      // NOTE(review): the suite begin event is sent whenever the suite name
      // differs from the previous started test, so interleaved suites may
      // get it more than once - confirm that this is intended.
      if(currentSuite != testCase.suiteName) {
        addSuiteResult(testCase.suiteName);
        currentSuite = testCase.suiteName;
      }

      // The result objects are created by `beginExecution`.
      auto testResult = suiteStats[testCase.suiteName]
        .result
        .tests
        .filter!(a => a.name == testCase.name)
        .front;

      testResult.begin = Clock.currTime;
      testResult.end = Clock.currTime;
      testResult.status = TestResult.Status.started;

      LifeCycleListeners.instance.begin(testCase.suiteName, testResult);
      stepStack[key] = [ testResult ];
    }

    /// Handle a "test finished" event: store the outcome, notify the
    /// listeners and drop the step stack of the test.
    ///
    /// Params:
    ///   key = the "suite|test" key of the test
    ///   t   = the throwable raised by the test or `null` on success
    void endTestResult(string key, Throwable t) {
      auto testCase = testCases[key];

      auto testResult = suiteStats[testCase.suiteName]
        .result
        .tests
        .filter!(a => a.name == testCase.name)
        .front;

      testResult.end = Clock.currTime;
      testResult.status = t is null ? TestResult.Status.success : TestResult.Status.failure;
      testResult.throwable = t;

      suiteStats[testCases[key].suiteName].testsFinished++;

      LifeCycleListeners.instance.end(testCases[key].suiteName, testResult);
      stepStack.remove(key);
    }

    /// Handle a "step started" event: attach a new step to the innermost
    /// open step (or to the test result) and push it on the stack.
    void addStep(string key, string name, SysTime time) {
      auto step = new StepResult;
      step.name = name;
      step.begin = time;
      step.end = time;

      stepStack[key][stepStack[key].length - 1].steps ~= step;
      stepStack[key] ~= step;

      LifeCycleListeners.instance.begin(testCases[key].suiteName, testCases[key].name, step);
    }

    /// Handle a "step finished" event: close the innermost open step and
    /// pop it from the stack.
    ///
    /// Throws: an `Exception` when `name` is not the innermost open step.
    void endStep(string key, string name, SysTime time) {
      auto step = stepStack[key][stepStack[key].length - 1];

      enforce(step.name == name, "unexpected step name");
      step.end = time;

      // Pop the finished step. The slice has to be assigned: appending it
      // (`~=`) left the right element on top but roughly doubled the stack
      // on every finished step.
      stepStack[key] = stepStack[key][0..$-1];

      LifeCycleListeners.instance.end(testCases[key].suiteName, testCases[key].name, step);
    }

    /// Drain the events queued by the test threads and replay them on the
    /// listeners: test begins first, then the steps, then the test ends.
    /// Suites that have all their tests finished are closed.
    ///
    /// Returns: the number of finished tests reported by the proxy.
    auto processEvents() {
      LifeCycleListeners.instance.update;

      auto status = ThreadProxy.instance.getStatus;

      foreach(beginKey; status.begin) {
        addTestResult(beginKey);
      }

      foreach(step; status.steps) {
        if(step.type == StepAction.Type.begin) {
          addStep(step.test, step.name, step.time);
        }

        if(step.type == StepAction.Type.end) {
          endStep(step.test, step.name, step.time);
        }
      }

      foreach(endKey; status.end) {
        Throwable failure = null;

        if(endKey in status.failures) {
          failure = cast() status.failures[endKey];
        }

        endTestResult(endKey, failure);
      }

      // `values` is a copy; the stored stats are updated by name inside
      // `endSuiteResult`.
      foreach(stat; suiteStats.values) {
        if(!stat.isDone && stat.result.tests.length == stat.testsFinished) {
          endSuiteResult(stat.result.name);
        }
      }

      return status.testCount;
    }

    /// Process events, sleeping 1 msec between polls, until the proxy
    /// reports at least as many finished tests as were executed.
    void wait() {
      ulong executedTestCount;

      do {
        LifeCycleListeners.instance.update();
        executedTestCount = processEvents;
        Thread.sleep(1.msecs);
      } while(executedTestCount < testCount);
    }
  }

  /// Start `testCase` in a new thread. While the number of running tests is
  /// at least `maxTestCount`, the call keeps processing events before it
  /// returns.
  ///
  /// Returns: always an empty list; the results are returned by `endExecution`.
  SuiteResult[] execute(ref const(TestCase) testCase) {
    import std.parallelism;

    SuiteResult[] result;

    auto key = testCase.suiteName ~ "|" ~ testCase.name;
    testCases[key] = TestCase(testCase);
    testCount++;

    task({
      testThreadSetup(key);

      try {
        testCase.func();
      } catch(Throwable t) {
        // Stored here; the test end is reported when the thread terminates.
        ThreadProxy.instance.setFailure(key, cast(shared)t);
      }
    }).executeInNewThread();

    auto runningTests = testCount - ThreadProxy.instance.getTestCount;

    // This loop polls without sleeping.
    while(maxTestCount <= runningTests && runningTests > 0) {
      processEvents;
      runningTests = testCount - ThreadProxy.instance.getTestCount;
    }

    return result;
  }

  /// Create the suite and test result objects for `tests` and reset the
  /// thread proxy.
  ///
  /// Returns: always an empty list.
  SuiteResult[] beginExecution(ref const(TestCase)[] tests) {
    foreach(test; tests) {
      auto const suite = test.suiteName;

      if(suite !in suiteStats) {
        suiteStats[suite] = SuiteStats(SuiteResult(suite));
      }

      suiteStats[suite].result.tests ~= new TestResult(test.name);
    }

    ThreadProxy.instance.reset();

    return [];
  }

  /// Wait for the running tests, close the suites that are still open and
  /// collect the results.
  ///
  /// Returns: the results of all the suites.
  SuiteResult[] endExecution() {
    wait;

    foreach(stat; suiteStats.values) {
      if(!stat.isDone) {
        endSuiteResult(stat.result.name);
      }
    }

    SuiteResult[] results;

    foreach(stat; suiteStats) {
      results ~= stat.result;
    }

    return results;
  }
}
/++
  A module containing the parallel test runner

  Copyright: © 2017 Szabo Bogdan
  License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
  Authors: Szabo Bogdan
+/
module trial.executor.parallel;

public import trial.interfaces;

import std.datetime;
import std.exception;
import std.algorithm;
import std.array;
import core.thread;

version(unittest) {
  version(Have_fluent_asserts) {
    import fluent.asserts;
  }
}

/// The Lifecycle listener used to send data from the tests threads to
/// the main thread. Only the step events are forwarded (to `ThreadProxy`);
/// every other listener/executor entry point asserts, because it must only
/// be called from the main thread.
class ThreadLifeCycleListener : LifeCycleListeners {
  /// Key of the test running on the current thread (set by `testThreadSetup`).
  /// It is not declared `shared`, so each thread has its own value.
  static string currentTest;

  override {
    /// Queue a "step started" event for the test running on this thread.
    void begin(string suite, string test, ref StepResult step) {
      ThreadProxy.instance.beginStep(currentTest, step.name, step.begin);
    }

    /// Queue a "step finished" event for the test running on this thread.
    void end(string suite, string test, ref StepResult step) {
      ThreadProxy.instance.endStep(currentTest, step.name, step.end);
    }

    void end(string, ref TestResult test) {
      assert(false, "You can not call `end` outside of the main thread");
    }

    void begin(string, ref TestResult test) {
      assert(false, "You can not call `begin` outside of the main thread");
    }

    void add(T)(T listener) {
      assert(false, "You can not call `add` outside of the main thread");
    }

    void begin(ulong) {
      assert(false, "You can not call `begin` outside of the main thread");
    }

    void end(SuiteResult[] result) {
      assert(false, "You can not call `end` outside of the main thread");
    }

    void begin(ref SuiteResult suite) {
      assert(false, "You can not call `begin` outside of the main thread");
    }

    void end(ref SuiteResult suite) {
      assert(false, "You can not call `end` outside of the main thread");
    }

    SuiteResult[] execute(ref const(TestCase)) {
      assert(false, "You can not call `execute` outside of the main thread");
    }

    SuiteResult[] beginExecution(ref const(TestCase)[]) {
      assert(false, "You can not call `beginExecution` outside of the main thread");
    }

    SuiteResult[] endExecution() {
      assert(false, "You can not call `endExecution` outside of the main thread");
    }
  }
}

/// Thread-local module destructor: if the terminating thread was set up to
/// run a test (`currentTest` is not empty), report that test as finished.
static ~this() {
  if(ThreadLifeCycleListener.currentTest != "") {
    ThreadProxy.instance.end(ThreadLifeCycleListener.currentTest);
  }
}

private {
  import core.atomic;

  /// A step event recorded by a test thread.
  struct StepAction {
    /// Whether the step started or finished.
    enum Type {
      begin,
      end
    }

    /// Key of the test that produced the event.
    string test;

    /// Name of the step.
    string name;

    /// The time reported with the event.
    SysTime time;

    /// The kind of event.
    Type type;
  }

  /// Collects the events produced by the test threads until the main thread
  /// drains them with `getStatus`. The class is `synchronized`, so its
  /// methods exclude each other.
  synchronized class ThreadProxy {
    private shared static ThreadProxy _instance = new shared ThreadProxy;

    shared {
      private {
        /// Keys of the tests that started since the last `getStatus` call.
        string[] beginTests;

        /// Keys of the tests that finished since the last `getStatus` call.
        string[] endTests;

        /// Step events recorded since the last `getStatus` call.
        StepAction[] steps;

        /// Throwables caught while running a test, indexed by test key.
        Throwable[string] failures;

        /// How many tests have finished since the last `reset`.
        ulong testCount;
      }

      static {
        /// The proxy instance used by all threads.
        shared(ThreadProxy) instance() {
          return _instance;
        }
      }

      /// Drop every queued event, every stored failure and the finished counter.
      void reset() {
        beginTests = [];
        endTests = [];
        steps = [];
        failures = typeof(failures).init;
        testCount = 0;
      }

      /// Record that the test identified by `name` has started.
      void begin(string name) {
        beginTests ~= name;
      }

      /// Record that the test identified by `name` has finished and count it.
      void end(string name) {
        core.atomic.atomicOp!"+="(this.testCount, 1);
        endTests ~= name;
      }

      /// Returns: the number of tests that finished since the last `reset`.
      auto getTestCount() {
        return testCount;
      }

      /// Queue a "step started" event for `testName`.
      void beginStep(shared(string) testName, string stepName, SysTime begin) {
        steps ~= StepAction(testName, stepName, begin, StepAction.Type.begin);
      }

      /// Queue a "step finished" event for `testName`.
      void endStep(shared(string) testName, string stepName, SysTime end) {
        steps ~= StepAction(testName, stepName, end, StepAction.Type.end);
      }

      /// Store the throwable raised by the test identified by `key`.
      void setFailure(string key, shared(Throwable) t) {
        failures[key] = t;
      }

      /// Take a snapshot of the queued events and empty the three queues.
      /// `failures` and `testCount` are reported but not cleared; the
      /// failures map is handed over without being duplicated.
      auto getStatus() {
        struct Status {
          string[] begin;
          StepAction[] steps;
          string[] end;
          Throwable[string] failures;
          ulong testCount;
        }

        auto status = shared Status(beginTests.dup, steps.dup, endTests.dup, failures, testCount);

        beginTests = [];
        steps = [];
        endTests = [];

        return status;
      }
    }
  }
}

/// Prepare the current thread for running the test identified by `testName`:
/// remember the key, install a `ThreadLifeCycleListener` as the listeners
/// instance and announce the test start to the proxy.
private void testThreadSetup(string testName) {
  ThreadLifeCycleListener.currentTest = testName;
  LifeCycleListeners.instance = new ThreadLifeCycleListener;
  ThreadProxy.instance.begin(testName);
}

/// The parallel executors runs tests in a separate thread
class ParallelExecutor : ITestExecutor {
  /// Book keeping for one suite.
  struct SuiteStats {
    /// The result that is reported for the suite.
    SuiteResult result;

    /// How many tests of the suite have finished.
    ulong testsFinished;

    /// True once the suite end was sent to the listeners.
    bool isDone;
  }

  /// Params:
  ///   maxTestCount = the maximum number of tests that may run at the same
  ///                  time; `0` selects `core.cpuid.threadsPerCPU`
  this(uint maxTestCount = 0) {
    this.maxTestCount = maxTestCount;

    // The value is unsigned, so "not set" can only be 0.
    if(this.maxTestCount == 0) {
      import core.cpuid : threadsPerCPU;
      this.maxTestCount = threadsPerCPU;
    }
  }

  private {
    /// Number of tests handed to `execute` so far.
    ulong testCount;

    /// Maximum number of tests that may run at the same time.
    uint maxTestCount;

    /// Name of the suite for which the last suite begin event was sent.
    string currentSuite = "";

    /// Suite book keeping, indexed by suite name.
    SuiteStats[string] suiteStats;

    /// The executed test cases, indexed by "suite|test" key.
    TestCase[string] testCases;

    /// For every running test, the stack of open steps. The first element is
    /// the test result itself.
    StepResult[][string] stepStack;

    /// Stamp the suite result with the current time and notify the listeners
    /// that the suite has started.
    void addSuiteResult(string name) {
      suiteStats[name].result.begin = Clock.currTime;
      suiteStats[name].result.end = Clock.currTime;

      LifeCycleListeners.instance.begin(suiteStats[name].result);
    }

    /// Set the suite end time, mark it as done and notify the listeners.
    void endSuiteResult(string name) {
      suiteStats[name].result.end = Clock.currTime;
      suiteStats[name].isDone = true;

      LifeCycleListeners.instance.end(suiteStats[name].result);
    }

    /// Handle a "test started" event: mark the test result as started,
    /// notify the listeners and open the step stack of the test.
    void addTestResult(string key) {
      auto testCase = testCases[key];

      // NOTE(review): the suite begin event is sent whenever the suite name
      // differs from the previous started test, so interleaved suites may
      // get it more than once - confirm that this is intended.
      if(currentSuite != testCase.suiteName) {
        addSuiteResult(testCase.suiteName);
        currentSuite = testCase.suiteName;
      }

      // The result objects are created by `beginExecution`.
      auto testResult = suiteStats[testCase.suiteName]
        .result
        .tests
        .filter!(a => a.name == testCase.name)
        .front;

      testResult.begin = Clock.currTime;
      testResult.end = Clock.currTime;
      testResult.status = TestResult.Status.started;

      LifeCycleListeners.instance.begin(testCase.suiteName, testResult);
      stepStack[key] = [ testResult ];
    }

    /// Handle a "test finished" event: store the outcome, notify the
    /// listeners and drop the step stack of the test.
    ///
    /// Params:
    ///   key = the "suite|test" key of the test
    ///   t   = the throwable raised by the test or `null` on success
    void endTestResult(string key, Throwable t) {
      auto testCase = testCases[key];

      auto testResult = suiteStats[testCase.suiteName]
        .result
        .tests
        .filter!(a => a.name == testCase.name)
        .front;

      testResult.end = Clock.currTime;
      testResult.status = t is null ? TestResult.Status.success : TestResult.Status.failure;
      testResult.throwable = t;

      suiteStats[testCases[key].suiteName].testsFinished++;

      LifeCycleListeners.instance.end(testCases[key].suiteName, testResult);
      stepStack.remove(key);
    }

    /// Handle a "step started" event: attach a new step to the innermost
    /// open step (or to the test result) and push it on the stack.
    void addStep(string key, string name, SysTime time) {
      auto step = new StepResult;
      step.name = name;
      step.begin = time;
      step.end = time;

      stepStack[key][stepStack[key].length - 1].steps ~= step;
      stepStack[key] ~= step;

      LifeCycleListeners.instance.begin(testCases[key].suiteName, testCases[key].name, step);
    }

    /// Handle a "step finished" event: close the innermost open step and
    /// pop it from the stack.
    ///
    /// Throws: an `Exception` when `name` is not the innermost open step.
    void endStep(string key, string name, SysTime time) {
      auto step = stepStack[key][stepStack[key].length - 1];

      enforce(step.name == name, "unexpected step name");
      step.end = time;

      // Pop the finished step. The slice has to be assigned: appending it
      // (`~=`) left the right element on top but roughly doubled the stack
      // on every finished step.
      stepStack[key] = stepStack[key][0..$-1];

      LifeCycleListeners.instance.end(testCases[key].suiteName, testCases[key].name, step);
    }

    /// Drain the events queued by the test threads and replay them on the
    /// listeners: test begins first, then the steps, then the test ends.
    /// Suites that have all their tests finished are closed.
    ///
    /// Returns: the number of finished tests reported by the proxy.
    auto processEvents() {
      LifeCycleListeners.instance.update;

      auto status = ThreadProxy.instance.getStatus;

      foreach(beginKey; status.begin) {
        addTestResult(beginKey);
      }

      foreach(step; status.steps) {
        if(step.type == StepAction.Type.begin) {
          addStep(step.test, step.name, step.time);
        }

        if(step.type == StepAction.Type.end) {
          endStep(step.test, step.name, step.time);
        }
      }

      foreach(endKey; status.end) {
        Throwable failure = null;

        if(endKey in status.failures) {
          failure = cast() status.failures[endKey];
        }

        endTestResult(endKey, failure);
      }

      // `values` is a copy; the stored stats are updated by name inside
      // `endSuiteResult`.
      foreach(stat; suiteStats.values) {
        if(!stat.isDone && stat.result.tests.length == stat.testsFinished) {
          endSuiteResult(stat.result.name);
        }
      }

      return status.testCount;
    }

    /// Process events, sleeping 1 msec between polls, until the proxy
    /// reports at least as many finished tests as were executed.
    void wait() {
      ulong executedTestCount;

      do {
        LifeCycleListeners.instance.update();
        executedTestCount = processEvents;
        Thread.sleep(1.msecs);
      } while(executedTestCount < testCount);
    }
  }

  /// Start `testCase` in a new thread. While the number of running tests is
  /// at least `maxTestCount`, the call keeps processing events before it
  /// returns.
  ///
  /// Returns: always an empty list; the results are returned by `endExecution`.
  SuiteResult[] execute(ref const(TestCase) testCase) {
    import std.parallelism;

    SuiteResult[] result;

    auto key = testCase.suiteName ~ "|" ~ testCase.name;
    testCases[key] = TestCase(testCase);
    testCount++;

    task({
      testThreadSetup(key);

      try {
        testCase.func();
      } catch(Throwable t) {
        // Stored here; the test end is reported when the thread terminates.
        ThreadProxy.instance.setFailure(key, cast(shared)t);
      }
    }).executeInNewThread();

    auto runningTests = testCount - ThreadProxy.instance.getTestCount;

    // This loop polls without sleeping.
    while(maxTestCount <= runningTests && runningTests > 0) {
      processEvents;
      runningTests = testCount - ThreadProxy.instance.getTestCount;
    }

    return result;
  }

  /// Create the suite and test result objects for `tests` and reset the
  /// thread proxy.
  ///
  /// Returns: always an empty list.
  SuiteResult[] beginExecution(ref const(TestCase)[] tests) {
    foreach(test; tests) {
      auto const suite = test.suiteName;

      if(suite !in suiteStats) {
        suiteStats[suite] = SuiteStats(SuiteResult(suite));
      }

      suiteStats[suite].result.tests ~= new TestResult(test.name);
    }

    ThreadProxy.instance.reset();

    return [];
  }

  /// Wait for the running tests, close the suites that are still open and
  /// collect the results.
  ///
  /// Returns: the results of all the suites.
  SuiteResult[] endExecution() {
    wait;

    foreach(stat; suiteStats.values) {
      if(!stat.isDone) {
        endSuiteResult(stat.result.name);
      }
    }

    SuiteResult[] results;

    foreach(stat; suiteStats) {
      results ~= stat.result;
    }

    return results;
  }
}