trial.executor.parallel 207/224(92%) line coverage

      
10
20
30
40
50
60
70
80
90
100
110
120
130
140
150
160
170
180
190
200
210
220
230
240
250
2610
270
280
290
3010
310
320
330
340
350
360
370
380
390
400
410
420
430
440
450
460
470
480
490
500
510
520
530
540
550
560
570
580
590
600
610
620
630
640
650
660
670
680
690
700
710
720
730
740
750
7612
7711
780
790
800
810
820
830
840
850
860
870
880
890
900
910
920
930
940
950
960
970
980
990
1000
1010
1020
1030
1040
1050
1060
1070
1080
1090
110337828
1110
1120
1130
11418
1156
1166
1176
1180
1196
1206
1210
1226
1230
1240
12533
12611
1270
1280
12933
13011
13111
1320
1330
134505821
135168607
1360
1370
13830
13910
1400
1410
14230
14310
1440
1450
1463
1471
1480
1490
150507516
151169172
1520
1530
1540
1550
1560
1570
1580
159169172
1600
161169172
162169172
163169172
1640
165169172
1660
1670
1680
1690
1700
1710
17211
17311
17411
1750
1760
1770
1780
1790
1800
1810
1820
1830
1840
1850
1866
1876
1880
1896
1900
1915
1920
1930
1940
1950
1960
1970
1980
1990
2000
2010
2020
2030
2040
2050
2067
2077
2080
2097
2100
2110
2120
2137
2147
2150
2167
2170
2180
2190
22011
2210
22211
2237
2247
2250
2260
22711
2280
2290
23017
2310
2320
23311
23411
23511
2360
23711
23811
2390
2400
2410
24211
2430
24411
2450
2460
24717
2480
2490
25011
25122
25211
2530
25411
2550
25611
25711
2580
2590
2600
26110
26210
26310
26410
2650
26610
26710
2680
26910
2700
2710
2720
27310
2740
27510
27610
27710
2780
27910
2800
2810
2820
283169172
2840
285169172
2860
287507549
28811
2890
2900
291507576
29220
29310
2940
2950
29620
29710
2980
2990
3000
301507549
30211
3030
30411
3051
3060
3070
30811
3090
3100
3111184964
312338724
3137
3140
3150
3160
317169172
3180
3190
3200
3216
3220
3230
324576
325576
326576
327576
3280
3290
3300
3310
3320
3330
33411
3350
33611
33711
3380
33911
3400
34111
34211
3430
3440
34511
3460
3471
3480
3490
3500
35111
3520
353337203
354168596
355168596
3560
3570
35811
3590
3600
3610
36251
36311
36411
36514
3660
3670
36811
3690
3700
3716
3726
3730
3740
3750
3766
3770
37839
3797
3800
3810
3820
3830
3846
3850
3866
3877
3880
3890
3906
3910
3920
3930
3940
3950
3960
3970
3980
3990
4001
4010
4020
4030
4048
40516
4068
4070
4080
4090
4102
4114
4122
4130
4140
4150
4160
4170
4180
4190
4200
4210
4220
4230
4240
4250
4260
4270
4280
4290
4300
4311
4320
4331
4341
4351
4361
4370
4381
4391
4400
4412
4422
4430
4442
4452
4462
4472
4480
4490
4500
4510
4520
4531
4540
4551
4561
4571
4581
4590
4601
4611
4620
4632
4642
4650
4662
4672
4682
4692
4700
4710
4720
4730
4740
4751
4760
4771
4780
479192
4800
4810
4820
4831
4840
4851
4861
4871
4880
4891
4901
4910
4921
4930
4942
4950
4960
4970
4980
4990
5001
5010
5021
5031
5041
5051
5060
5071
5080
5092
5102
5110
5122
5130
5140
5150
5160
5170
5181
5190
5201
5211
5221
5230
5241
5250
5261
5270
5282
5292
5300
5312
5320
5330
5340
5350
5360
5370
5380
5391
5401
5411
5420
5432
5440
5450
5460
5472
5480
5490
5500
5512
5520
5530
5540
5552
5560
5570
5580
5592
5600
5610
5620
5632
5640
5650
5660
5671
5680
5691
5701
5711
5720
5731
5741
5750
5761
5770
5782
5790
5802
5810
/++
  A module containing the parallel test runner

  Copyright: © 2017 Szabo Bogdan
  License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
  Authors: Szabo Bogdan
+/
module trial.executor.parallel;

public import trial.interfaces;

import trial.runner;

import std.datetime;
import std.exception;
import std.algorithm;
import std.array;
import core.thread;

/// The Lifecycle listener used to send data from the tests threads to
/// the main thread.
///
/// Only the step `begin`/`end` events are forwarded (through `ThreadProxy`);
/// every other listener entry point asserts, because it must only be
/// called from the main thread.
class ThreadLifeCycleListener : LifeCycleListeners {
  /// Key of the test that is running on the current thread.
  /// It is set by `testThreadSetup`.
  static string currentTest;

  override {
    /// Queue a "step begin" event for the current test.
    void begin(string suite, string test, ref StepResult step) {
      ThreadProxy.instance.beginStep(currentTest, step.name, step.begin);
    }

    /// Queue a "step end" event for the current test.
    void end(string suite, string test, ref StepResult step) {
      ThreadProxy.instance.endStep(currentTest, step.name, step.end);
    }

    void end(string, ref TestResult test) {
      assert(false, "You can not call `end` outside of the main thread");
    }

    void begin(string, ref TestResult test) {
      assert(false, "You can not call `begin` outside of the main thread");
    }

    void add(T)(T listener) {
      assert(false, "You can not call `add` outside of the main thread");
    }

    void begin(ulong) {
      assert(false, "You can not call `begin` outside of the main thread");
    }

    void end(SuiteResult[] result) {
      assert(false, "You can not call `end` outside of the main thread");
    }

    void begin(ref SuiteResult suite) {
      assert(false, "You can not call `begin` outside of the main thread");
    }

    void end(ref SuiteResult suite) {
      assert(false, "You can not call `end` outside of the main thread");
    }

    SuiteResult[] execute(ref const(TestCase)) {
      assert(false, "You can not call `execute` outside of the main thread");
    }

    SuiteResult[] beginExecution(ref const(TestCase)[]) {
      assert(false, "You can not call `beginExecution` outside of the main thread");
    }

    SuiteResult[] endExecution() {
      assert(false, "You can not call `endExecution` outside of the main thread");
    }
  }
}

/// Module destructor: if a test key was registered through
/// `ThreadLifeCycleListener.currentTest`, report that test as ended.
static ~this() {
  if(ThreadLifeCycleListener.currentTest != "") {
    ThreadProxy.instance.end(ThreadLifeCycleListener.currentTest);
  }
}

private {
  import core.atomic;

  /// A step event recorded by a test thread
  struct StepAction {
    /// The kind of the recorded event
    enum Type {
      begin,
      end
    }

    /// The key of the test that produced the event
    string test;

    /// The step name
    string name;

    /// The time when the event was produced
    SysTime time;

    /// The event kind
    Type type;
  }

  /// The shared event queue used by the test threads to publish their
  /// events and by the executor to collect them
  synchronized class ThreadProxy {
    private shared static ThreadProxy _instance = new shared ThreadProxy;

    shared {
      private {
        string[] beginTests;
        string[] endTests;
        StepAction[] steps;
        Throwable[string] failures;
        ulong testCount;
      }

      static {
        /// The shared proxy instance
        shared(ThreadProxy) instance() {
          return _instance;
        }
      }

      /// Drop all the queued events, the stored failures and the
      /// finished tests counter
      void reset() {
        beginTests = [];
        endTests = [];
        steps = [];
        failures.clear;
        failures.rehash;
        testCount = 0;
      }

      /// Queue a "test started" event
      void begin(string name) {
        beginTests ~= name;
      }

      /// Queue a "test ended" event and increment the finished tests counter
      void end(string name) {
        core.atomic.atomicOp!"+="(this.testCount, 1);
        endTests ~= name;
      }

      /// The number of tests that ended since the last `reset`
      auto getTestCount() {
        return testCount;
      }

      /// Queue a "step started" event
      void beginStep(shared(string) testName, string stepName, SysTime begin) {
        steps ~= StepAction(testName, stepName, begin, StepAction.Type.begin);
      }

      /// Queue a "step ended" event
      void endStep(shared(string) testName, string stepName, SysTime end) {
        steps ~= StepAction(testName, stepName, end, StepAction.Type.end);
      }

      /// Store the throwable raised by the test identified by `key`
      void setFailure(string key, shared(Throwable) t) {
        failures[key] = t;
      }

      /// Get a snapshot of the queued events and empty the queues.
      /// The failures and the finished tests counter are reported but
      /// they are not cleared.
      auto getStatus() {
        struct Status {
          string[] begin;
          StepAction[] steps;
          string[] end;
          Throwable[string] failures;
          ulong testCount;
        }

        auto status = shared Status(beginTests.dup, steps.dup, endTests.dup, failures, testCount);

        beginTests = [];
        steps = [];
        endTests = [];

        return status;
      }
    }
  }
}

/// Prepare the current thread for running the test identified by `testName`:
/// remember the test key, install the thread listener and queue the
/// "test started" event
private void testThreadSetup(string testName) {
  ThreadLifeCycleListener.currentTest = testName;
  LifeCycleListeners.instance = new ThreadLifeCycleListener;
  ThreadProxy.instance.begin(testName);
}

/// The parallel executor runs each test in a separate thread
class ParallelExecutor : ITestExecutor {
  /// The state of a suite
  struct SuiteStats {
    /// The suite result that is reported to the listeners
    SuiteResult result;

    /// How many tests of this suite have ended
    ulong testsFinished;

    /// True after the suite end event was sent
    bool isDone;
  }

  /// Params:
  ///   maxTestCount = the maximum number of tests that are allowed to run
  ///                  at the same time. When it is `0`,
  ///                  `core.cpuid.threadsPerCPU` is used.
  this(uint maxTestCount = 0) {
    this.maxTestCount = maxTestCount;

    if(this.maxTestCount == 0) {
      import core.cpuid : threadsPerCPU;
      this.maxTestCount = threadsPerCPU;
    }
  }

  private {
    /// The number of tests that were started
    ulong testCount;

    /// The maximum number of tests running at the same time
    uint maxTestCount;

    /// The name of the last suite for which a begin event was sent
    string currentSuite = "";

    /// The suite states, by suite name
    SuiteStats[string] suiteStats;

    /// The started tests, by `suite|test` key
    TestCase[string] testCases;

    /// The open steps of each running test. The first element is the
    /// test result and the last one is the innermost open step.
    StepResult[][string] stepStack;

    /// Send the suite begin event
    void addSuiteResult(string name) {
      suiteStats[name].result.begin = Clock.currTime;
      suiteStats[name].result.end = Clock.currTime;

      LifeCycleListeners.instance.begin(suiteStats[name].result);
    }

    /// Mark the suite as done and send the suite end event
    void endSuiteResult(string name) {
      suiteStats[name].result.end = Clock.currTime;
      suiteStats[name].isDone = true;

      LifeCycleListeners.instance.end(suiteStats[name].result);
    }

    /// Handle a "test started" event
    void addTestResult(string key) {
      auto testCase = testCases[key];

      // the suite begin event is sent when the started test belongs to
      // a different suite than the previous started test
      if(currentSuite != testCase.suiteName) {
        addSuiteResult(testCase.suiteName);
        currentSuite = testCase.suiteName;
      }

      auto testResult = suiteStats[testCase.suiteName]
        .result
        .tests
        .filter!(a => a.name == testCase.name)
        .front;

      testResult.begin = Clock.currTime;
      testResult.end = Clock.currTime;
      testResult.status = TestResult.Status.started;

      LifeCycleListeners.instance.begin(testCase.suiteName, testResult);
      stepStack[key] = [ testResult ];
    }

    /// Handle a "test ended" event. A `null` throwable means success.
    void endTestResult(string key, Throwable t) {
      auto testCase = testCases[key];

      auto testResult = suiteStats[testCase.suiteName]
        .result
        .tests
        .filter!(a => a.name == testCase.name)
        .front;

      testResult.end = Clock.currTime;
      testResult.status = t is null ? TestResult.Status.success : TestResult.Status.failure;
      testResult.throwable = t;

      suiteStats[testCases[key].suiteName].testsFinished++;

      LifeCycleListeners.instance.end(testCases[key].suiteName, testResult);
      stepStack.remove(key);
    }

    /// Handle a "step started" event: attach the new step to the
    /// innermost open step and push it on the stack
    void addStep(string key, string name, SysTime time) {
      auto step = new StepResult;
      step.name = name;
      step.begin = time;
      step.end = time;

      stepStack[key][stepStack[key].length - 1].steps ~= step;
      stepStack[key] ~= step;

      LifeCycleListeners.instance.begin(testCases[key].suiteName, testCases[key].name, step);
    }

    /// Handle a "step ended" event: close the innermost open step and
    /// pop it from the stack
    ///
    /// Throws: an exception raised by `enforce` when `name` is not the
    ///         name of the innermost open step
    void endStep(string key, string name, SysTime time) {
      auto step = stepStack[key][stepStack[key].length - 1];

      enforce(step.name == name, "unexpected step name");
      step.end = time;

      // Pop the finished step. This must be an assignment: appending the
      // shortened slice would keep the finished step and make the stack
      // grow with every ended step.
      stepStack[key] = stepStack[key][0..$-1];

      LifeCycleListeners.instance.end(testCases[key].suiteName, testCases[key].name, step);
    }

    /// Collect the events queued by the test threads and forward them to
    /// the listeners in this order: test begins, steps, test ends and
    /// the end of the suites that have all their tests finished.
    ///
    /// Returns: the number of finished tests reported by the proxy
    auto processEvents() {
      LifeCycleListeners.instance.update;

      auto status = ThreadProxy.instance.getStatus;

      foreach(beginKey; status.begin) {
        addTestResult(beginKey);
      }

      foreach(step; status.steps) {
        if(step.type == StepAction.Type.begin) {
          addStep(step.test, step.name, step.time);
        }

        if(step.type == StepAction.Type.end) {
          endStep(step.test, step.name, step.time);
        }
      }

      foreach(endKey; status.end) {
        Throwable failure = null;

        if(endKey in status.failures) {
          failure = cast() status.failures[endKey];
        }

        endTestResult(endKey, failure);
      }

      // `.values` is a copy, so the map entry is updated by name
      foreach(stat; suiteStats.values) {
        if(!stat.isDone && stat.result.tests.length == stat.testsFinished) {
          endSuiteResult(stat.result.name);
        }
      }

      return status.testCount;
    }

    /// Process the events, with a 1ms pause between the polls, until
    /// all the started tests are reported as finished
    void wait() {
      ulong executedTestCount;

      do {
        LifeCycleListeners.instance.update();
        executedTestCount = processEvents;
        Thread.sleep(1.msecs);
      } while(executedTestCount < testCount);
    }
  }

  /// Start the test in a new thread. When the number of running tests
  /// reaches `maxTestCount`, the call processes events until a test ends.
  ///
  /// Returns: always an empty list. The results are provided by `endExecution`.
  SuiteResult[] execute(ref const(TestCase) testCase) {
    import std.parallelism;

    SuiteResult[] result;

    auto key = testCase.suiteName ~ "|" ~ testCase.name;
    testCases[key] = TestCase(testCase);

    testCount++;

    task({
      testThreadSetup(key);

      try {
        testCase.func();
      } catch(Throwable t) {
        // the failure is collected by the main thread when the test ends
        ThreadProxy.instance.setFailure(key, cast(shared)t);
      }
    }).executeInNewThread();

    auto runningTests = testCount - ThreadProxy.instance.getTestCount;

    while(maxTestCount <= runningTests && runningTests > 0) {
      processEvents;
      runningTests = testCount - ThreadProxy.instance.getTestCount;
    }

    return result;
  }

  /// Create the suite and test results for all the tests that will be
  /// executed and reset the thread proxy
  ///
  /// Returns: always an empty list
  SuiteResult[] beginExecution(ref const(TestCase)[] tests) {
    foreach(test; tests) {
      auto const suite = test.suiteName;

      if(suite !in suiteStats) {
        suiteStats[suite] = SuiteStats(SuiteResult(suite));
      }

      suiteStats[suite].result.tests ~= new TestResult(test.name);
    }

    ThreadProxy.instance.reset();

    return [];
  }

  /// Wait for all the started tests, end the suites that are not done
  /// and collect the results
  ///
  /// Returns: the results of all the suites
  SuiteResult[] endExecution() {
    wait;

    foreach(stat; suiteStats.values) {
      if(!stat.isDone) {
        endSuiteResult(stat.result.name);
      }
    }

    SuiteResult[] results;

    foreach(stat; suiteStats) {
      results ~= stat.result;
    }

    return results;
  }
}

version(unittest) {
  import fluent.asserts;
  import trial.step;
  import trial.runner;

  // NOTE(review): `executed` and `stepFunction` are not defined in this
  // module; presumably they come from one of the imports above - confirm.

  /// A test that always fails
  void failMock() @system {
    assert(false);
  }

  /// A test that sleeps 100ms and then creates a step
  void stepMock1() @system {
    Thread.sleep(100.msecs);
    auto a = Step("some step");
    executed = true;
  }

  /// A test that sleeps 200ms and then creates a step
  void stepMock2() @system {
    Thread.sleep(200.msecs);
    auto a = Step("some step");
    executed = true;
  }

  /// A test that creates a step and calls `stepFunction` three times
  void stepMock3() @system {
    Thread.sleep(120.msecs);
    auto a = Step("some step");
    executed = true;

    for(int i=0; i<3; i++) {
      Thread.sleep(120.msecs);
      stepFunction(i);
      Thread.sleep(120.msecs);
    }
  }
}

@("A parallel executor should get the result of a success test")
unittest
{
  TestCase[] tests = [ TestCase("suite1", "test1", &stepMock1)];

  auto old = LifeCycleListeners.instance;
  scope(exit) LifeCycleListeners.instance = old;

  LifeCycleListeners.instance = new LifeCycleListeners;
  LifeCycleListeners.instance.add(new ParallelExecutor);

  auto result = tests.runTests;

  result.length.should.equal(1);
  result[0].name.should.equal("suite1");
  result[0].tests.length.should.equal(1);
  result[0].tests[0].status.should.equal(TestResult.Status.success);
  (result[0].tests[0].throwable is null).should.equal(true);
}

@("A parallel executor should get the result of a failing test")
unittest
{
  TestCase[] tests = [ TestCase("suite1", "test1", &failMock)];

  auto old = LifeCycleListeners.instance;
  scope(exit) LifeCycleListeners.instance = old;

  LifeCycleListeners.instance = new LifeCycleListeners;
  LifeCycleListeners.instance.add(new ParallelExecutor);

  auto result = tests.runTests;

  result.length.should.equal(1);
  result[0].name.should.equal("suite1");
  result[0].tests.length.should.equal(1);
  result[0].tests[0].status.should.equal(TestResult.Status.failure);
  (result[0].tests[0].throwable !is null).should.equal(true);
}

@("it should call update() many times")
unittest
{
  ulong updated = 0;

  class MockListener : ILifecycleListener {
    void begin(ulong) {}
    void update() { updated++; }
    void end(SuiteResult[]) {}
  }

  TestCase[] tests = [ TestCase("suite2", "test1", &stepMock1) ];

  auto old = LifeCycleListeners.instance;
  scope(exit) LifeCycleListeners.instance = old;

  LifeCycleListeners.instance = new LifeCycleListeners;
  LifeCycleListeners.instance.add(new MockListener);
  LifeCycleListeners.instance.add(new ParallelExecutor);

  auto results = tests.runTests;

  updated.should.be.greaterThan(50);
}

@("it should run the tests in parallel")
unittest
{
  TestCase[] tests = [
    TestCase("suite2", "test1", &stepMock1),
    TestCase("suite2", "test3", &stepMock1),
    TestCase("suite2", "test2", &stepMock1)
  ];

  auto old = LifeCycleListeners.instance;
  scope(exit) LifeCycleListeners.instance = old;

  LifeCycleListeners.instance = new LifeCycleListeners;
  LifeCycleListeners.instance.add(new ParallelExecutor);

  auto results = tests.runTests;

  results.length.should.equal(1);
  results[0].tests.length.should.equal(3);

  (results[0].end - results[0].begin).should.be.between(90.msecs, 120.msecs);
}

@("it should be able to limit the parallel tests number")
unittest
{
  TestCase[] tests = [
    TestCase("suite2", "test1", &stepMock1),
    TestCase("suite2", "test3", &stepMock1),
    TestCase("suite2", "test2", &stepMock1)
  ];

  auto old = LifeCycleListeners.instance;
  scope(exit) LifeCycleListeners.instance = old;

  LifeCycleListeners.instance = new LifeCycleListeners;
  LifeCycleListeners.instance.add(new ParallelExecutor(2));

  auto results = tests.runTests;

  results.length.should.equal(1);
  results[0].tests.length.should.equal(3);

  (results[0].end - results[0].begin).should.be.between(200.msecs, 250.msecs);
}

@("A parallel executor should call the events in the right order")
unittest
{
  import core.thread;

  executed = false;
  string[] steps;

  class MockListener : IStepLifecycleListener, ITestCaseLifecycleListener, ISuiteLifecycleListener {
    void begin(string suite, string test, ref StepResult step) {
      steps ~= [ suite ~ "." ~ test ~ ".stepBegin " ~ step.name ];
    }

    void end(string suite, string test, ref StepResult step) {
      steps ~= [ suite ~ "." ~ test ~ ".stepEnd " ~ step.name ];
    }

    void begin(string suite, ref TestResult test) {
      steps ~= [ suite ~ ".testBegin " ~ test.name ];
    }

    void end(string suite, ref TestResult test) {
      steps ~= [ suite ~ ".testEnd " ~ test.name ];
    }

    void begin(ref SuiteResult suite) {
      steps ~= [ "begin " ~ suite.name ];
    }

    void end(ref SuiteResult suite) {
      steps ~= [ "end " ~ suite.name ];
    }
  }

  TestCase[] tests = [
    TestCase("suite1", "test1", &stepMock1),
    TestCase("suite2","test2", &stepMock2)
  ];

  auto old = LifeCycleListeners.instance;
  scope(exit) LifeCycleListeners.instance = old;

  LifeCycleListeners.instance = new LifeCycleListeners;
  LifeCycleListeners.instance.add(new MockListener);
  LifeCycleListeners.instance.add(new ParallelExecutor);

  auto results = tests.runTests;

  executed.should.equal(true);

  steps.should.contain(["begin suite1",
    "suite1.testBegin test1",
    "begin suite2",
    "suite2.testBegin test2",
    "suite1.test1.stepBegin some step",
    "suite1.test1.stepEnd some step",
    "suite2.test2.stepBegin some step",
    "suite2.test2.stepEnd some step",
    "suite1.testEnd test1",
    "suite2.testEnd test2",
    "end suite2",
    "end suite1"]);
}