trial.executor.parallel 127/137(92%) line coverage

      
10
20
30
40
50
60
70
80
90
100
110
120
130
140
150
160
170
180
190
200
210
220
230
240
250
260
270
280
290
307
310
320
330
347
350
360
370
380
390
400
410
420
430
440
450
460
470
480
490
500
510
520
530
540
550
560
570
580
590
600
610
620
630
640
650
660
670
680
690
700
710
720
730
740
750
760
770
780
790
809
818
820
830
840
850
860
870
880
890
900
910
920
930
940
950
960
970
980
990
1000
1010
1020
1030
1040
1050
1060
1070
1080
1090
1100
1110
1120
1130
114193059
1150
1160
1170
11815
1195
1205
1215
1220
1235
1245
1250
1265
1270
1280
12924
1308
1310
1320
13324
1348
1358
1360
1370
138288834
13996278
1400
1410
14221
1437
1440
1450
14621
1477
1480
1490
1503
1511
1520
1530
154290235
15596745
1560
1570
1580
1590
1600
1610
1620
16396745
1640
16596745
16696745
16796745
1680
16996745
1700
1710
1720
1730
1740
1750
1768
1778
1788
1790
1800
1810
1820
1830
1840
1850
1860
1870
1880
1890
1905
1915
1920
1935
1940
1954
1960
1970
1980
1990
2000
2010
2020
2030
2040
2050
2060
2070
2080
2090
2106
2116
2120
2136
2140
2150
2160
2176
2186
2190
2206
2210
2220
2230
2248
2250
2268
2276
2286
2290
2300
2318
2320
2330
23411
2350
2360
2378
2388
2398
2400
2418
2428
2430
2440
2450
2468
2470
2488
2490
2500
25111
2520
2530
2548
25516
2568
2570
2588
2590
2608
2618
2620
2630
2640
2657
2667
2677
2687
2690
2707
2717
2720
2737
2740
2750
2760
2777
2780
2797
2807
2817
2820
2837
2840
2850
2860
28796745
2880
28996745
2900
291290259
2928
2930
2940
295290277
29614
2977
2980
2990
30014
3017
3020
3030
3040
305290259
3068
3070
3088
3091
3100
3110
3128
3130
3140
315677971
316193867
3176
3180
3190
3200
32196745
3220
3230
3240
3255
3260
3270
328475
329475
330475
331475
3320
3330
3340
3350
3360
3370
3388
3390
3408
3418
3420
3438
3440
3458
3468
3470
3480
3498
3500
3511
3520
3530
3540
3558
3560
357192548
35896270
35996270
3600
3610
3628
3630
3640
3650
36639
3678
3688
36912
3700
3710
3728
3730
3740
3755
3765
3770
3780
3790
3805
3810
38233
3836
3840
3850
3860
3870
3885
3890
3905
3916
3920
3930
3945
3950
3960
/++ A module containing the parallel test runner Copyright: © 2017 Szabo Bogdan License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. Authors: Szabo Bogdan +/ module trial.executor.parallel; public import trial.interfaces; import std.datetime; import std.exception; import std.algorithm; import std.array; import core.thread; version(unittest) { version(Have_fluent_asserts) { import fluent.asserts; } } /// The Lifecycle listener used to send data from the tests threads to /// the main thread class ThreadLifeCycleListener : LifeCycleListeners { static string currentTest; override { void begin(string suite, string test, ref StepResult step) { ThreadProxy.instance.beginStep(currentTest, step.name, step.begin); } void end(string suite, string test, ref StepResult step) { ThreadProxy.instance.endStep(currentTest, step.name, step.end); } void end(string, ref TestResult test) { assert(false, "You can not call `end` outside of the main thread"); } void begin(string, ref TestResult test) { assert(false, "You can not call `begin` outside of the main thread"); } void add(T)(T listener) { assert(false, "You can not call `add` outside of the main thread"); } void begin(ulong) { assert(false, "You can not call `begin` outside of the main thread"); } void end(SuiteResult[] result) { assert(false, "You can not call `end` outside of the main thread"); } void begin(ref SuiteResult suite) { assert(false, "You can not call `begin` outside of the main thread"); } void end(ref SuiteResult suite) { assert(false, "You can not call `end` outside of the main thread"); } SuiteResult[] execute(ref const(TestCase)) { assert(false, "You can not call `execute` outside of the main thread"); } SuiteResult[] beginExecution(ref const(TestCase)[]) { assert(false, "You can not call `beginExecution` outside of the main thread"); } SuiteResult[] endExecution() { assert(false, "You can not call `endExecution` outside of the main thread"); } } } static ~this() { if(ThreadLifeCycleListener.currentTest != "") { ThreadProxy.instance.end(ThreadLifeCycleListener.currentTest); } } private { import core.atomic; struct StepAction { enum Type { begin, end } string test; string name; SysTime time; Type type; } synchronized class ThreadProxy { private shared static ThreadProxy _instance = new shared ThreadProxy; shared { private { string[] beginTests; string[] endTests; StepAction[] steps; Throwable[string] failures; ulong testCount; } static { shared(ThreadProxy) instance() { return _instance; } } void reset() { beginTests = []; endTests = []; steps = []; failures.clear; failures.rehash; testCount = 0; } void begin(string name) { beginTests ~= name; } void end(string name) { core.atomic.atomicOp!"+="(this.testCount, 1); endTests ~= name; } auto getTestCount() { return testCount; } void beginStep(shared(string) testName, string stepName, SysTime begin) { steps ~= StepAction(testName, stepName, begin, StepAction.Type.begin); } void endStep(shared(string) testName, string stepName, SysTime end) { steps ~= StepAction(testName, stepName, end, StepAction.Type.end); } void setFailure(string key, shared(Throwable) t) { failures[key] = t; } auto getStatus() { struct Status { string[] begin; StepAction[] steps; string[] end; Throwable[string] failures; ulong testCount; } auto status = shared Status(beginTests.dup, steps.dup, endTests.dup, failures, testCount); beginTests = []; steps = []; endTests = []; return status; } } } } private void testThreadSetup(string testName) { ThreadLifeCycleListener.currentTest = testName; LifeCycleListeners.instance = new ThreadLifeCycleListener; ThreadProxy.instance.begin(testName); } /// The parallel executors runs tests in a sepparate thread class ParallelExecutor : ITestExecutor { struct SuiteStats { SuiteResult result; ulong testsFinished; bool isDone; } this(uint maxTestCount = 0) { this.maxTestCount = maxTestCount; if(this.maxTestCount <= 0) { import core.cpuid : threadsPerCPU; this.maxTestCount = threadsPerCPU; } } private { ulong testCount; uint maxTestCount; string currentSuite = ""; SuiteStats[string] suiteStats; TestCase[string] testCases; StepResult[][string] stepStack; void addSuiteResult(string name) { suiteStats[name].result.begin = Clock.currTime; suiteStats[name].result.end = Clock.currTime; LifeCycleListeners.instance.begin(suiteStats[name].result); } void endSuiteResult(string name) { suiteStats[name].result.end = Clock.currTime; suiteStats[name].isDone = true; LifeCycleListeners.instance.end(suiteStats[name].result); } void addTestResult(string key) { auto testCase = testCases[key]; if(currentSuite != testCase.suiteName) { addSuiteResult(testCase.suiteName); currentSuite = testCase.suiteName; } auto testResult = suiteStats[testCase.suiteName] .result .tests .filter!(a => a.name == testCase.name) .front; testResult.begin = Clock.currTime; testResult.end = Clock.currTime; testResult.status = TestResult.Status.started; LifeCycleListeners.instance.begin(testCase.suiteName, testResult); stepStack[key] = [ testResult ]; } void endTestResult(string key, Throwable t) { auto testCase = testCases[key]; auto testResult = suiteStats[testCase.suiteName] .result .tests .filter!(a => a.name == testCase.name) .front; testResult.end = Clock.currTime; testResult.status = t is null ? TestResult.Status.success : TestResult.Status.failure; testResult.throwable = t; suiteStats[testCases[key].suiteName].testsFinished++; LifeCycleListeners.instance.end(testCases[key].suiteName, testResult); stepStack.remove(key); } void addStep(string key, string name, SysTime time) { auto step = new StepResult; step.name = name; step.begin = time; step.end = time; stepStack[key][stepStack[key].length - 1].steps ~= step; stepStack[key] ~= step; LifeCycleListeners.instance.begin(testCases[key].suiteName, testCases[key].name, step); } void endStep(string key, string name, SysTime time) { auto step = stepStack[key][stepStack[key].length - 1]; enforce(step.name == name, "unexpected step name"); step.end = time; stepStack[key] ~= stepStack[key][0..$-1]; LifeCycleListeners.instance.end(testCases[key].suiteName, testCases[key].name, step); } auto processEvents() { LifeCycleListeners.instance.update; auto status = ThreadProxy.instance.getStatus; foreach(beginKey; status.begin) { addTestResult(beginKey); } foreach(step; status.steps) { if(step.type == StepAction.Type.begin) { addStep(step.test, step.name, step.time); } if(step.type == StepAction.Type.end) { endStep(step.test, step.name, step.time); } } foreach(endKey; status.end) { Throwable failure = null; if(endKey in status.failures) { failure = cast() status.failures[endKey]; } endTestResult(endKey, failure); } foreach(ref index, ref stat; suiteStats.values) { if(!stat.isDone && stat.result.tests.length == stat.testsFinished) { endSuiteResult(stat.result.name); } } return status.testCount; } void wait() { ulong executedTestCount; do { LifeCycleListeners.instance.update(); executedTestCount = processEvents; Thread.sleep(1.msecs); } while(executedTestCount < testCount); } } SuiteResult[] execute(ref const(TestCase) testCase) { import std.parallelism; SuiteResult[] result; auto key = testCase.suiteName ~ "|" ~ testCase.name; testCases[key] = TestCase(testCase); testCount++; task({ testThreadSetup(key); try { testCase.func(); } catch(Throwable t) { ThreadProxy.instance.setFailure(key, cast(shared)t); } }).executeInNewThread(); auto runningTests = testCount - ThreadProxy.instance.getTestCount; while(maxTestCount <= runningTests && runningTests > 0) { processEvents; runningTests = testCount - ThreadProxy.instance.getTestCount; } return result; } SuiteResult[] beginExecution(ref const(TestCase)[] tests) { foreach(test; tests) { auto const suite = test.suiteName; if(suite !in suiteStats) { suiteStats[suite] = SuiteStats(SuiteResult(suite)); } suiteStats[suite].result.tests ~= new TestResult(test.name); } ThreadProxy.instance.reset(); return []; } SuiteResult[] endExecution() { wait; foreach(stat; suiteStats.values) { if(!stat.isDone) { endSuiteResult(stat.result.name); } } SuiteResult[] results; foreach(stat; suiteStats) { results ~= stat.result; } return results; } }