1 /**
2  * IO related functions
3  */
4 
5 module unit_threaded.io;
6 
7 import std.concurrency: Tid;
8 
/**
 * Writes `args` followed by a newline to the writer of the currently
 * running test case, but only when debug output has been enabled
 * (see `isDebugOutputEnabled`). Otherwise does nothing.
 */
void writelnUt(T...)(auto ref T args) {
    import unit_threaded.testcase: TestCase;

    // nothing to do unless debug output was switched on
    if(!isDebugOutputEnabled)
        return;

    TestCase.currentTest.getWriter.writeln(args);
}
17 
18 
// writelnUt must not produce any output while debug output is disabled
unittest {
    import unit_threaded.testcase: TestCase;
    import unit_threaded.should;
    import std.string: splitLines;

    enableDebugOutput(false);

    // Output implementation that accumulates everything sent to it
    class TestOutput: Output {
        string output;
        override void send(in string output) {
            this.output ~= output;
        }

        override void flush() {}
    }

    // test case whose only action is a writelnUt call
    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto test = new PrintTest;
    auto writer = new TestOutput;
    test.setOutput(writer);
    test();

    // "foobar" must be absent: only the "PrintTest:" line is expected
    writer.output.splitLines.shouldEqual(
        [
            "PrintTest:",
        ]
    );
}
55 
// writelnUt must write through the test's Output once debug output is enabled
unittest {
    import unit_threaded.should;
    import unit_threaded.testcase: TestCase;
    import std.string: splitLines;

    enableDebugOutput;
    // switch debug output back off when this test finishes
    scope(exit) enableDebugOutput(false);

    // Output implementation that accumulates everything sent to it
    class TestOutput: Output {
        string output;
        override void send(in string output) {
            this.output ~= output;
        }

        override void flush() {}
    }

    // test case whose only action is a writelnUt call
    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto test = new PrintTest;
    auto writer = new TestOutput;
    test.setOutput(writer);
    test();

    // the writelnUt arguments are concatenated and appear after "PrintTest:"
    writer.output.splitLines.shouldEqual(
        [
            "PrintTest:",
            "foobar",
        ]
    );
}
98 
// Flags shared between all threads; accessed through the synchronized
// helpers below (enableDebugOutput, isDebugOutputEnabled, forceEscCodes).
private shared {
    bool _debugOutput = false;   /// print debug msgs?
    bool _forceEscCodes = false; /// use ANSI escape codes anyway?
}

/// Whether escCode returns real escape sequences; set by the module constructor.
bool _useEscCodes;

/// ANSI escape sequences, indexed by the members of Colour in declaration order.
enum string[] _escCodes = ["\033[31;1m", "\033[32;1m", "\033[33;1m", "\033[0;;m"];
103 
104 
/**
 * Module constructor: on Posix, decides whether escape codes are used.
 * They are used when forced via `_forceEscCodes`, or when stdout is a
 * terminal according to `isatty`. On other platforms `_useEscCodes`
 * is left untouched.
 */
static this() {
    version (Posix) {
        import core.sys.posix.unistd: isatty;
        import std.stdio: stdout;

        if (_forceEscCodes)
            _useEscCodes = true; // forced: no need to query the terminal
        else
            _useEscCodes = isatty(stdout.fileno()) != 0;
    }
}
112 
113 
/**
 * Turns debug output on (the default) or off.
 * The shared flag is written inside a synchronized statement.
 */
package void enableDebugOutput(bool value = true) nothrow {
    synchronized _debugOutput = value;
}
119 
/**
 * Returns the current value of the debug output flag.
 * The shared flag is read inside a synchronized statement.
 */
package bool isDebugOutputEnabled() nothrow @trusted {
    bool enabled;
    synchronized {
        enabled = _debugOutput;
    }
    return enabled;
}
125 
/**
 * Sets the flag requesting ANSI escape codes regardless of the terminal check.
 * The flag is only consulted by the module constructor in this file.
 */
package void forceEscCodes() nothrow {
    synchronized _forceEscCodes = true;
}
131 
/**
 * Destination for test output: accepts strings and can be asked to flush.
 */
interface Output {
@safe:
    /// Hands a piece of output text to the destination.
    void send(in string output);
    /// Signals that the caller is done with its current batch of output.
    void flush();
}
136 
/// Index into `_escCodes`; the member order must match that array.
private enum Colour { red, green, yellow, cancel }
143 
/**
 * Wraps `msg` between the escape code for colour `C` and the
 * cancel code. Both are empty strings when escape codes are not in use.
 */
private string colour(alias C)(in string msg) {
    string start = escCode(C);
    string reset = escCode(Colour.cancel);
    return start ~ msg ~ reset;
}

// convenience wrappers, one per colour
private alias green = colour!(Colour.green);
private alias red = colour!(Colour.red);
private alias yellow = colour!(Colour.yellow);
151 
/**
 * Returns the escape sequence for `code`, or an empty string
 * when escape codes are not in use. Nothing is written to the console here.
 */
private string escCode(in Colour code) @safe {
    if (!_useEscCodes)
        return "";

    return _escCodes[code];
}
158 
159 
/**
 * Converts all of `args` to text, concatenates them and sends the
 * result to `output` in a single `send` call.
 */
void write(T...)(Output output, auto ref T args) {
    import std.conv: text;

    immutable message = text(args);
    output.send(message);
}
167 
/**
 * Like `write`, but with a newline appended after the last argument.
 * Text and newline go out in a single `send` call.
 */
void writeln(T...)(Output output, auto ref T args) {
    import std.conv: text;

    output.send(text(args, "\n"));
}
174 
/**
 * Sends the text of `args` plus a newline to `output`, wrapped in
 * the green escape codes when escape codes are in use. The newline is
 * placed inside the coloured span.
 */
void writelnGreen(T...)(Output output, auto ref T args) {
    import std.conv: text;

    immutable line = text(args) ~ "\n";
    output.send(green(line));
}
183 
/**
 * Sends the text of `args` plus a newline to `output`, wrapped in
 * the red escape codes when escape codes are in use. The newline is
 * placed inside the coloured span.
 */
void writelnRed(T...)(Output output, auto ref T args) {
    import std.conv: text;

    output.send(red(text(args, "\n")));
}
191 
/**
 * Sends the text of `args` to `output`, wrapped in the red escape
 * codes when escape codes are in use. No newline is appended.
 */
void writeRed(T...)(Output output, auto ref T args) {
    import std.conv: text;

    immutable message = text(args);
    output.send(red(message));
}
200 
/**
 * Sends the text of `args` to `output`, wrapped in the yellow escape
 * codes when escape codes are in use. No newline is appended.
 */
void writeYellow(T...)(Output output, auto ref T args) {
    import std.conv: text;

    immutable message = text(args);
    output.send(yellow(message));
}
209 
/**
 * Output implementation that forwards everything to a writer thread
 * running `threadWriter` on stdout/stderr. Used as a singleton via `get`.
 * When compiled with version `unitUnthreaded` no thread is spawned and
 * `send` writes directly with std.stdio.
 */
class WriterThread: Output {

    import std.concurrency: Tid;

    /**
     * Returns the singleton instance, creating it on first use.
     */
    static WriterThread get() @trusted {
        // per-thread flag: once set, this thread no longer takes the lock
        if (_instantiated)
            return _instance;

        synchronized {
            if (_instance is null)
                _instance = new WriterThread;
            _instantiated = true;
        }

        return _instance;
    }


    /**
     * Passes `output` on, tagged with the calling thread's Tid
     * (or prints it directly in the unthreaded version).
     */
    override void send(in string output) @safe {

        version(unitUnthreaded) {
            import std.stdio: write;
            write(output);
        } else {
            // the local import makes `send` below refer to std.concurrency.send
            import std.concurrency: send, thisTid;
            () @trusted { _tid.send(output, thisTid); }();
        }
    }

    /**
     * Sends a Flush message tagged with the calling thread's Tid.
     * Does nothing in the unthreaded version.
     */
    override void flush() @safe {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send, thisTid;
            () @trusted { _tid.send(Flush(), thisTid); }();
        }
    }

    /**
     * Obtains the singleton (creating it if needed), sends it a ThreadWait
     * message and blocks until a ThreadStarted reply is received.
     */
    static void start() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send, receiveOnly;
            WriterThread.get._tid.send(ThreadWait());
            receiveOnly!ThreadStarted;
        }
    }


    /**
     * Flushes and joins the singleton if one exists; otherwise does nothing.
     */
    static void stop() {

        void flushAndJoin() {
            WriterThread.get.flush;
            WriterThread.get.join;
        }

        if (_instantiated) {
            // this thread has already obtained the instance through get()
            flushAndJoin;
        } else {
            // this thread never called get(): check the global under the lock
            synchronized {
                if (_instance !is null)
                    flushAndJoin;
            }
        }
    }

    /**
     * Asks the writer thread to finish, waits for its ThreadEnded reply and
     * then clears the singleton so that a later `get` creates a new one.
     * Does nothing in the unthreaded version.
     */
    void join() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send, receiveOnly;
            _tid.send(ThreadFinish()); // tell the writer loop to exit
            receiveOnly!ThreadEnded;
            _instance = null;
            _instantiated = false;
        }
    }

private:

    /// Spawns the writer thread, passing it the creating thread's Tid.
    this() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: spawn, thisTid;
            import std.stdio: stdout, stderr;
            _tid = spawn(&threadWriter!(stdout, stderr), thisTid);
        }
    }


    /// Tid of the spawned writer thread
    Tid _tid;

    static bool _instantiated; /// Thread local
    __gshared WriterThread _instance; /// shared by all threads
}
315 
unittest
{
    // the singleton must survive being brought up and torn down repeatedly
    foreach (_; 0 .. 2)
        WriterThread.get.join;
}
322 
// Message types exchanged with threadWriter
private struct ThreadWait {}    /// request: answered with ThreadStarted
private struct ThreadFinish {}  /// request: makes the writer loop exit
private struct ThreadStarted {} /// reply to ThreadWait
private struct ThreadEnded {}   /// sent by the writer once its loop has exited
private struct Flush {}         /// request: sent together with the sender's Tid

/// Name of the platform's null device
version (Posix)
    enum nullFileName = "/dev/null";
else
    enum nullFileName = "NUL";

// NOTE(review): not referenced anywhere in the visible part of this file
shared bool gBool;
/**
 * Body of the writer thread.
 *
 * Keeps copies of `OUT` and `ERR` and, unless debug output is enabled,
 * replaces both with handles opened on the null device. It then processes
 * messages until told to finish (ThreadFinish) or until the owner thread
 * terminates. Output is written to the saved copy of `OUT`. On exit any
 * output still buffered is written out, `OUT`/`ERR` are restored and a
 * ThreadEnded message is sent to `tid`.
 *
 * Params:
 *     OUT = symbol used as standard output (replaced while running)
 *     ERR = symbol used as standard error (replaced while running)
 *     tid = thread that receives the ThreadStarted / ThreadEnded replies
 */
private void threadWriter(alias OUT, alias ERR)(Tid tid)
{
    import std.concurrency: receive, send, OwnerTerminated;

    auto done = false;

    auto saveStdout = OUT;
    auto saveStderr = ERR;

    void restore() {
        saveStdout.flush();
        OUT = saveStdout;
        ERR = saveStderr;
    }

    // put the original handles back if anything below throws
    scope (failure) restore;

    if (!isDebugOutputEnabled()) {
        OUT = typeof(OUT)(nullFileName, "w");
        ERR = typeof(ERR)(nullFileName, "w");
    }

    // the first thread to send output becomes the current
    // until that thread sends a Flush message no other thread
    // can print to stdout, so we store their outputs in the meanwhile
    string[Tid] outputs;
    Tid currentTid;

    void actuallyPrint(in string msg) {
        if(msg.length) saveStdout.write(msg);
    }

    // writes out everything buffered for non-current threads and clears the buffer
    void printPending() {
        foreach(output; outputs)
            actuallyPrint(output);

        outputs = outputs.init;
    }

    while (!done) {
        receive(
            (string msg, Tid originTid) {
                if(currentTid == currentTid.init)
                    currentTid = originTid;

                if(currentTid == originTid)
                    actuallyPrint(msg);
                else
                    outputs[originTid] ~= msg;
            },
            (ThreadWait w) {
                tid.send(ThreadStarted());
            },
            (ThreadFinish f) {
                done = true;
            },
            (Flush f, Tid originTid) {

                // only the current thread (if there is one) may flush
                if(currentTid != currentTid.init && currentTid != originTid)
                    return;

                printPending;
                currentTid = currentTid.init;
            },
            (OwnerTerminated trm) {
                done = true;
            }
        );
    }

    // don't lose output that was buffered for a thread whose turn never came
    printPending;

    restore;
    tid.send(ThreadEnded());
}
405 
406 version(testing_unit_threaded) {
407     struct FakeFile {
408         string fileName;
409         string mode;
410         string output;
411         void flush() shared {}
412         void write(string s) shared {
413             output ~= s;
414         }
415         string[] lines() shared const @safe pure {
416             import std.string: splitLines;
417             return output.splitLines;
418         }
419     }
420     shared FakeFile gOut;
421     shared FakeFile gErr;
422     void resetFakeFiles() {
423         gOut = FakeFile("out", "mode");
424         gErr = FakeFile("err", "mode");
425     }
426 
427 
428     unittest {
429         import std.concurrency: spawn, thisTid, send, receiveOnly;
430         import unit_threaded.should;
431 
432         enableDebugOutput(false);
433         resetFakeFiles;
434 
435         auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
436         tid.send(ThreadWait());
437         receiveOnly!ThreadStarted;
438 
439         gOut.shouldEqual(shared FakeFile(nullFileName, "w"));
440         gErr.shouldEqual(shared FakeFile(nullFileName, "w"));
441 
442         tid.send(ThreadFinish());
443         receiveOnly!ThreadEnded;
444     }
445 
446     unittest {
447         import std.concurrency: spawn, send, thisTid, receiveOnly;
448         import unit_threaded.should;
449 
450         enableDebugOutput(true);
451         scope(exit) enableDebugOutput(false);
452         resetFakeFiles;
453 
454         auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
455         tid.send(ThreadWait());
456         receiveOnly!ThreadStarted;
457 
458         gOut.shouldEqual(shared FakeFile("out", "mode"));
459         gErr.shouldEqual(shared FakeFile("err", "mode"));
460 
461         tid.send(ThreadFinish());
462         receiveOnly!ThreadEnded;
463     }
464 
465     unittest {
466         import std.concurrency: spawn, thisTid, send, receiveOnly;
467         import unit_threaded.should;
468 
469         resetFakeFiles;
470 
471         auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
472         tid.send(ThreadWait());
473         receiveOnly!ThreadStarted;
474 
475         tid.send("foobar\n", thisTid);
476         tid.send("toto\n", thisTid);
477         gOut.output.shouldBeEmpty; // since it writes to the old gOut
478 
479         tid.send(ThreadFinish());
480         receiveOnly!ThreadEnded;
481 
482         // gOut is restored so the output should be here
483         gOut.lines.shouldEqual(
484             [
485                 "foobar",
486                 "toto",
487                 ]
488             );
489     }
490 
491     void otherThread(Tid writerTid, Tid testTid) {
492         import std.concurrency: send, receiveOnly, OwnerTerminated, thisTid;
493         try {
494             writerTid.send("what about me?\n", thisTid);
495             testTid.send(true);
496             receiveOnly!bool;
497 
498             writerTid.send("seriously, what about me?\n", thisTid);
499             testTid.send(true);
500             receiveOnly!bool;
501 
502             writerTid.send(Flush(), thisTid);
503             testTid.send(true);
504             receiveOnly!bool;
505 
506             writerTid.send("final attempt\n", thisTid);
507             testTid.send(true);
508 
509         } catch(OwnerTerminated ex) {}
510     }
511 
512 
513     unittest {
514         import std.concurrency: spawn, thisTid, send, receiveOnly;
515         import unit_threaded.should;
516 
517         resetFakeFiles;
518 
519         auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
520         writerTid.send(ThreadWait());
521         receiveOnly!ThreadStarted;
522 
523         writerTid.send("foobar\n", thisTid);
524         auto otherTid = spawn(&otherThread, writerTid, thisTid);
525         receiveOnly!bool; //wait for otherThread 1st message
526 
527         writerTid.send("toto\n", thisTid);
528         otherTid.send(true); //tell otherThread to continue
529         receiveOnly!bool; //wait for otherThread 2nd message
530 
531         writerTid.send("last one from me\n", thisTid);
532         otherTid.send(true); // tell otherThread to continue
533         receiveOnly!bool; // wait for otherThread to try and flush (won't work)
534 
535         writerTid.send(Flush(), thisTid); //finish with our output
536         otherTid.send(true); //finish
537         receiveOnly!bool; // wait for otherThread to finish
538 
539         writerTid.send(ThreadFinish());
540         receiveOnly!ThreadEnded;
541 
542         // gOut is restored so the output should be here
543         // the output should also be serialised despite
544         // sending messages from two threads
545         gOut.lines.shouldEqual(
546             [
547                 "foobar",
548                 "toto",
549                 "last one from me",
550                 "what about me?",
551                 "seriously, what about me?",
552                 "final attempt",
553                 ]
554             );
555     }
556 }