1 /**
2  * IO related functions
3  */
4 
5 module unit_threaded.io;
6 
7 import std.concurrency: Tid;
8 
9 /**
10  * Write if debug output was enabled.
11  */
12 void writelnUt(T...)(auto ref T args) {
13     import unit_threaded.testcase: TestCase;
14     if(isDebugOutputEnabled)
15         TestCase.currentTest.getWriter.writeln(args);
16 }
17 
18 
19 unittest {
20     import unit_threaded.testcase: TestCase;
21     import unit_threaded.should;
22     import std..string: splitLines;
23 
24     enableDebugOutput(false);
25     class TestOutput: Output {
26         string output;
27         override void send(in string output) {
28             import std.conv: text;
29             this.output ~= output;
30         }
31 
32         override void flush() {}
33     }
34 
35     class PrintTest: TestCase {
36         override void test() {
37             writelnUt("foo", "bar");
38         }
39         override string getPath() @safe pure nothrow const {
40             return "PrintTest";
41         }
42     }
43 
44     auto test = new PrintTest;
45     auto writer = new TestOutput;
46     test.setOutput(writer);
47     test();
48 
49     writer.output.splitLines.shouldEqual(
50         [
51             "PrintTest:",
52         ]
53     );
54 }
55 
56 unittest {
57     import unit_threaded.should;
58     import unit_threaded.testcase: TestCase;
59     import unit_threaded.reflection: TestData;
60     import unit_threaded.factory: createTestCase;
61     import std.traits: fullyQualifiedName;
62     import std..string: splitLines;
63 
64     enableDebugOutput;
65     scope(exit) enableDebugOutput(false);
66 
67     class TestOutput: Output {
68         string output;
69         override void send(in string output) {
70             import std.conv: text;
71             this.output ~= output;
72         }
73 
74         override void flush() {}
75     }
76 
77     class PrintTest: TestCase {
78         override void test() {
79             writelnUt("foo", "bar");
80         }
81         override string getPath() @safe pure nothrow const {
82             return "PrintTest";
83         }
84     }
85 
86     auto test = new PrintTest;
87     auto writer = new TestOutput;
88     test.setOutput(writer);
89     test();
90 
91     writer.output.splitLines.shouldEqual(
92         [
93             "PrintTest:",
94             "foobar",
95         ]
96     );
97 }
98 
99 private shared(bool) _debugOutput = false; ///print debug msgs?
100 private shared(bool) _forceEscCodes = false; ///use ANSI escape codes anyway?
101 bool _useEscCodes;
102 enum _escCodes = ["\033[31;1m", "\033[32;1m", "\033[33;1m", "\033[0;;m"];
103 
104 
105 static this() {
106     version (Posix) {
107         import std.stdio: stdout;
108         import core.sys.posix.unistd: isatty;
109         _useEscCodes = _forceEscCodes || isatty(stdout.fileno()) != 0;
110     }
111 }
112 
113 
114 package void enableDebugOutput(bool value = true) nothrow {
115     synchronized {
116         _debugOutput = value;
117     }
118 }
119 
120 package bool isDebugOutputEnabled() nothrow @trusted {
121     synchronized {
122         return _debugOutput;
123     }
124 }
125 
126 package void forceEscCodes() nothrow {
127     synchronized {
128         _forceEscCodes = true;
129     }
130 }
131 
132 interface Output {
133     void send(in string output) @safe;
134     void flush() @safe;
135 }
136 
137 private enum Colour {
138     red,
139     green,
140     yellow,
141     cancel,
142 }
143 
144 private string colour(alias C)(in string msg) {
145     return escCode(C) ~ msg ~ escCode(Colour.cancel);
146 }
147 
148 private alias green = colour!(Colour.green);
149 private alias red = colour!(Colour.red);
150 private alias yellow = colour!(Colour.yellow);
151 
152 /**
153  * Send escape code to the console
154  */
155 private string escCode(in Colour code) @safe {
156     return _useEscCodes ? _escCodes[code] : "";
157 }
158 
159 
160 /**
161  * Writes the args in a thread-safe manner.
162  */
163 void write(T...)(Output output, auto ref T args) {
164     import std.conv: text;
165     output.send(text(args));
166 }
167 
168 /**
169  * Writes the args in a thread-safe manner and appends a newline.
170  */
171 void writeln(T...)(Output output, auto ref T args) {
172     write(output, args, "\n");
173 }
174 
175 /**
176  * Writes the args in a thread-safe manner in green (POSIX only).
177  * and appends a newline.
178  */
179 void writelnGreen(T...)(Output output, auto ref T args) {
180     import std.conv: text;
181     output.send(green(text(args) ~ "\n"));
182 }
183 
184 /**
185  * Writes the args in a thread-safe manner in red (POSIX only)
186  * and appends a newline.
187  */
188 void writelnRed(T...)(Output output, auto ref T args) {
189     writeRed(output, args, "\n");
190 }
191 
192 /**
193  * Writes the args in a thread-safe manner in red (POSIX only).
194  * and appends a newline.
195  */
196 void writeRed(T...)(Output output, auto ref T args) {
197     import std.conv: text;
198     output.send(red(text(args)));
199 }
200 
201 /**
202  * Writes the args in a thread-safe manner in yellow (POSIX only).
203  * and appends a newline.
204  */
205 void writeYellow(T...)(Output output, auto ref T args) {
206     import std.conv: text;
207     output.send(yellow(text(args)));
208 }
209 
210 /**
211  * Thread to output to stdout
212  */
213 class WriterThread: Output {
214 
215     import std.concurrency: Tid;
216 
217     /**
218      * Returns a reference to the only instance of this class.
219      */
220     static WriterThread get() @trusted {
221         if (!_instantiated) {
222             synchronized {
223                 if (_instance is null) {
224                     _instance = new WriterThread;
225                 }
226                 _instantiated = true;
227             }
228         }
229         return _instance;
230     }
231 
232 
233     override void send(in string output) @safe {
234 
235         version(unitUnthreaded) {
236             import std.stdio: write;
237             write(output);
238         } else {
239             import std.concurrency: send, thisTid;
240             () @trusted { _tid.send(output, thisTid); }();
241         }
242     }
243 
244     override void flush() @safe {
245         version(unitUnthreaded) {}
246         else {
247             import std.concurrency: send;
248             () @trusted { _tid.send(Flush()); }();
249         }
250     }
251 
252     /**
253      * Creates the singleton instance and waits until it's ready.
254      */
255     static void start() {
256         version(unitUnthreaded) {}
257         else {
258             import std.concurrency: send, receiveOnly;
259             WriterThread.get._tid.send(ThreadWait());
260             receiveOnly!ThreadStarted;
261         }
262     }
263 
264     /**
265      * Waits for the writer thread to terminate.
266      */
267     void join() {
268         version(unitUnthreaded) {}
269         else {
270             import std.concurrency: send, receiveOnly;
271             _tid.send(ThreadFinish()); //tell it to join
272             receiveOnly!ThreadEnded;
273             _instance = null;
274             _instantiated = false;
275         }
276     }
277 
278 private:
279 
280     this() {
281         version(unitUnthreaded) {}
282         else {
283             import std.concurrency: spawn, thisTid;
284             import std.stdio: stdout, stderr;
285             _tid = spawn(&threadWriter!(stdout, stderr), thisTid);
286         }
287     }
288 
289 
290     Tid _tid;
291 
292     static bool _instantiated; /// Thread local
293     __gshared WriterThread _instance;
294 }
295 
296 unittest
297 {
298     //make sure this can be brought up and down again
299     WriterThread.get.join;
300     WriterThread.get.join;
301 }
302 
303 private struct ThreadWait{};
304 private struct ThreadFinish{};
305 private struct ThreadStarted{};
306 private struct ThreadEnded{};
307 private struct Flush{};
308 
309 version (Posix) {
310     enum nullFileName = "/dev/null";
311 } else {
312     enum nullFileName = "NUL";
313 }
314 
315 shared bool gBool;
316 private void threadWriter(alias OUT, alias ERR)(Tid tid)
317 {
318     import std.concurrency: receive, send, OwnerTerminated;
319 
320     auto done = false;
321 
322     auto saveStdout = OUT;
323     auto saveStderr = ERR;
324 
325     void restore() {
326         saveStdout.flush();
327         OUT = saveStdout;
328         ERR = saveStderr;
329     }
330 
331     scope (failure) restore;
332 
333     if (!isDebugOutputEnabled()) {
334         OUT = typeof(OUT)(nullFileName, "w");
335         ERR = typeof(ERR)(nullFileName, "w");
336     }
337 
338     // the first thread to send output becomes the current
339     // until that thread sends a Flush message no other thread
340     // can print to stdout, so we store their outputs in the meanwhile
341     string[Tid] outputs;
342     Tid currentTid;
343 
344     void actuallyPrint(in string msg) {
345         if(msg.length) saveStdout.write(msg);
346     }
347 
348     while (!done) {
349         receive(
350             (string msg, Tid originTid) {
351                 if(currentTid == currentTid.init)
352                     currentTid = originTid;
353 
354                 if(currentTid == originTid)
355                     actuallyPrint(msg);
356                 else
357                     outputs[originTid] ~= msg;
358             },
359             (ThreadWait w) {
360                 tid.send(ThreadStarted());
361             },
362             (ThreadFinish f) {
363                 done = true;
364             },
365             (Flush f) {
366 
367                 foreach(t, output; outputs)
368                     actuallyPrint(output);
369 
370                 outputs = outputs.init;
371                 currentTid = currentTid.init;
372             },
373             (OwnerTerminated trm) {
374                 done = true;
375             }
376         );
377     }
378 
379     restore;
380     tid.send(ThreadEnded());
381 }
382 
383 version(testing_unit_threaded) {
384     struct FakeFile {
385         string fileName;
386         string mode;
387         string output;
388         void flush() shared {}
389         void write(string s) shared {
390             output ~= s;
391         }
392         string[] lines() shared const @safe pure {
393             import std..string: splitLines;
394             return output.splitLines;
395         }
396     }
397     shared FakeFile gOut;
398     shared FakeFile gErr;
399     void resetFakeFiles() {
400         gOut = FakeFile("out", "mode");
401         gErr = FakeFile("err", "mode");
402     }
403 
404 
405     unittest {
406         import std.concurrency: spawn, thisTid, send, receiveOnly;
407         import unit_threaded.should;
408 
409         enableDebugOutput(false);
410         resetFakeFiles;
411 
412         auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
413         tid.send(ThreadWait());
414         receiveOnly!ThreadStarted;
415 
416         gOut.shouldEqual(shared FakeFile(nullFileName, "w"));
417         gErr.shouldEqual(shared FakeFile(nullFileName, "w"));
418 
419         tid.send(ThreadFinish());
420         receiveOnly!ThreadEnded;
421     }
422 
423     unittest {
424         import std.concurrency: spawn, send, thisTid, receiveOnly;
425         import unit_threaded.should;
426 
427         enableDebugOutput(true);
428         scope(exit) enableDebugOutput(false);
429         resetFakeFiles;
430 
431         auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
432         tid.send(ThreadWait());
433         receiveOnly!ThreadStarted;
434 
435         gOut.shouldEqual(shared FakeFile("out", "mode"));
436         gErr.shouldEqual(shared FakeFile("err", "mode"));
437 
438         tid.send(ThreadFinish());
439         receiveOnly!ThreadEnded;
440     }
441 
442     unittest {
443         import std.concurrency: spawn, thisTid, send, receiveOnly;
444         import unit_threaded.should;
445 
446         resetFakeFiles;
447 
448         auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
449         tid.send(ThreadWait());
450         receiveOnly!ThreadStarted;
451 
452         tid.send("foobar\n", thisTid);
453         tid.send("toto\n", thisTid);
454         gOut.output.shouldBeEmpty; // since it writes to the old gOut
455 
456         tid.send(ThreadFinish());
457         receiveOnly!ThreadEnded;
458 
459         // gOut is restored so the output should be here
460         gOut.lines.shouldEqual(
461             [
462                 "foobar",
463                 "toto",
464                 ]
465             );
466     }
467 
468     void otherThread(Tid writerTid, Tid testTid) {
469         import std.concurrency: send, receiveOnly, OwnerTerminated, thisTid;
470         try {
471             writerTid.send("what about me?\n", thisTid);
472             testTid.send(true);
473             receiveOnly!bool;
474             writerTid.send("seriously, what about me?\n", thisTid);
475             testTid.send(true);
476             receiveOnly!bool;
477             writerTid.send("final attempt\n", thisTid);
478             testTid.send(true);
479             receiveOnly!bool;
480         } catch(OwnerTerminated ex) {}
481     }
482 
483 
484     unittest {
485         import std.concurrency: spawn, thisTid, send, receiveOnly;
486         import unit_threaded.should;
487 
488         resetFakeFiles;
489 
490         auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
491         writerTid.send(ThreadWait());
492         receiveOnly!ThreadStarted;
493 
494         writerTid.send("foobar\n", thisTid);
495         auto otherTid = spawn(&otherThread, writerTid, thisTid);
496         receiveOnly!bool; //wait for otherThread 1st message
497         writerTid.send("toto\n", thisTid);
498         otherTid.send(true); //tell otherThread to continue
499         receiveOnly!bool; //wait for otherThread 2nd message
500         writerTid.send("last one from me\n", thisTid);
501         writerTid.send(Flush()); //finish with our output
502         otherTid.send(true); //finish
503         receiveOnly!bool;
504 
505         writerTid.send(ThreadFinish());
506         receiveOnly!ThreadEnded;
507 
508         // gOut is restored so the output should be here
509         // the output should also be serialised despite
510         // sending messages from two threads
511         gOut.lines.shouldEqual(
512             [
513                 "foobar",
514                 "toto",
515                 "last one from me",
516                 "what about me?",
517                 "seriously, what about me?",
518                 "final attempt",
519                 ]
520             );
521     }
522 }