1 /**
2  * IO related functions
3  */
4 
5 module unit_threaded.io;
6 
7 import std.concurrency: Tid;
8 
/**
 * Debug-only print helper for use inside tests.
 *
 * When debug output is enabled, forwards `args` to `writeln` on the writer
 * of `TestCase.currentTest`; when it is disabled, does nothing.
 */
void writelnUt(T...)(auto ref T args) {
    import unit_threaded.testcase: TestCase;

    // Stay silent unless the debug-output flag has been switched on.
    if(!isDebugOutputEnabled)
        return;

    TestCase.currentTest.getWriter.writeln(args);
}
17 
18 
// writelnUt with debug output disabled: the text passed to writelnUt must not
// show up; the captured output is expected to be just the "PrintTest:" line.
unittest {
    import unit_threaded.testcase: TestCase;
    import unit_threaded.should;
    import std.string: splitLines;

    enableDebugOutput(false);

    // Output implementation that accumulates everything sent to it in a string.
    class TestOutput: Output {
        string output;
        override void send(in string output) {
            this.output ~= output;
        }

        override void flush() {}
    }

    // Test case whose body only calls writelnUt.
    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto test = new PrintTest;
    auto writer = new TestOutput;
    test.setOutput(writer);
    test();

    writer.output.splitLines.shouldEqual(
        [
            "PrintTest:",
        ]
    );
}
55 
// writelnUt with debug output enabled: the concatenated arguments ("foobar")
// are expected on their own line after the "PrintTest:" line.
unittest {
    import unit_threaded.should;
    import unit_threaded.testcase: TestCase;
    import std.string: splitLines;

    enableDebugOutput;
    // Switch the flag back off so later tests start from the disabled state.
    scope(exit) enableDebugOutput(false);

    // Output implementation that accumulates everything sent to it in a string.
    class TestOutput: Output {
        string output;
        override void send(in string output) {
            this.output ~= output;
        }

        override void flush() {}
    }

    // Test case whose body only calls writelnUt.
    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto test = new PrintTest;
    auto writer = new TestOutput;
    test.setOutput(writer);
    test();

    writer.output.splitLines.shouldEqual(
        [
            "PrintTest:",
            "foobar",
        ]
    );
}
98 
private shared(bool) _debugOutput = false; ///print debug msgs?
private shared(bool) _forceEscCodes = false; ///use ANSI escape codes anyway?
bool _useEscCodes; ///per-thread flag: should escCode return real escape sequences?
/// ANSI escape sequences, indexed by the value of the Colour enum
/// (red, green, yellow, cancel). Declared as immutable data instead of an
/// `enum` manifest constant: an array-literal `enum` is pasted in at each use
/// and may be re-created every time, whereas this array is stored once.
immutable string[] _escCodes = ["\033[31;1m", "\033[32;1m", "\033[33;1m", "\033[0;;m"];
103 
104 
// Module constructor: on POSIX, decides whether escape codes are used.
// They are used when forcing was requested, or when stdout is a terminal.
// On other platforms _useEscCodes is left untouched.
static this() {
    version (Posix) {
        import core.sys.posix.unistd: isatty;
        import std.stdio: stdout;

        if (_forceEscCodes)
            _useEscCodes = true; // forced: stdout is not queried at all
        else
            _useEscCodes = isatty(stdout.fileno()) != 0;
    }
}
112 
113 
/// Sets the shared debug-output flag to `value` (defaults to enabling it).
package void enableDebugOutput(bool value = true) nothrow {
    // Single-statement critical section guarding the shared flag.
    synchronized
        _debugOutput = value;
}
119 
/// Returns the current value of the shared debug-output flag.
package bool isDebugOutputEnabled() nothrow @trusted {
    bool enabled;
    // Copy the flag out while inside the critical section.
    synchronized {
        enabled = _debugOutput;
    }
    return enabled;
}
125 
/// Sets the shared flag requesting escape codes regardless of the terminal check.
/// The flag is read by this module's static constructor.
package void forceEscCodes() nothrow {
    // Single-statement critical section guarding the shared flag.
    synchronized
        _forceEscCodes = true;
}
131 
/**
 * Destination for text produced by this module's write functions.
 * Implementations decide what happens to the text they are sent.
 */
interface Output {
    /// Receives one chunk of text to output.
    void send(in string output) @safe;
    /// Signals that the sender wants its output flushed.
    void flush() @safe;
}
136 
/// Colours understood by escCode. The member values are used as indices into
/// _escCodes, so the declaration order must match the order of that array.
private enum Colour {
    red,
    green,
    yellow,
    cancel, // restores the default colour
}
143 
/// Wraps `msg` between the escape code for colour `C` and the cancel code.
/// Both codes are empty strings when escape codes are not in use.
private string colour(alias C)(in string msg) {
    string opening = escCode(C);
    string closing = escCode(Colour.cancel);
    return opening ~ msg ~ closing;
}
147 
// Convenience instantiations of colour!() for each supported colour.
private alias green = colour!(Colour.green);
private alias red = colour!(Colour.red);
private alias yellow = colour!(Colour.yellow);
151 
/**
 * Returns the escape sequence for `code`, or an empty string when
 * escape codes are not in use. Nothing is written by this function.
 */
private string escCode(in Colour code) @safe {
    if (!_useEscCodes)
        return "";

    return _escCodes[code];
}
158 
159 
/**
 * Converts all of `args` to one string and sends it to `output`.
 */
void write(T...)(Output output, auto ref T args) {
    import std.conv: text;

    string rendered = text(args);
    output.send(rendered);
}
167 
/**
 * Same as write, with a trailing newline added after `args`.
 */
void writeln(T...)(Output output, auto ref T args) {
    output.write(args, "\n");
}
174 
/**
 * Sends `args` plus a trailing newline to `output`, wrapped in the green
 * escape codes when escape codes are in use (this module's static
 * constructor only enables them on POSIX).
 */
void writelnGreen(T...)(Output output, auto ref T args) {
    import std.conv: text;

    // The newline is part of the coloured text, i.e. it precedes the cancel code.
    string line = text(args) ~ "\n";
    output.send(green(line));
}
183 
/**
 * Sends `args` plus a trailing newline to `output` in red, by delegating
 * to writeRed with the newline as an extra argument.
 */
void writelnRed(T...)(Output output, auto ref T args) {
    output.writeRed(args, "\n");
}
191 
/**
 * Sends `args` to `output`, wrapped in the red escape codes when escape
 * codes are in use (this module's static constructor only enables them
 * on POSIX). No newline is appended.
 */
void writeRed(T...)(Output output, auto ref T args) {
    import std.conv: text;

    string plain = text(args);
    output.send(red(plain));
}
200 
/**
 * Sends `args` to `output`, wrapped in the yellow escape codes when escape
 * codes are in use (this module's static constructor only enables them
 * on POSIX). No newline is appended.
 */
void writeYellow(T...)(Output output, auto ref T args) {
    import std.conv: text;

    string plain = text(args);
    output.send(yellow(plain));
}
209 
/**
 * Output implementation backed by a separate writer thread.
 *
 * Unless compiled with version unitUnthreaded, constructing the instance
 * spawns threadWriter and every send/flush becomes a message to that thread.
 * With version unitUnthreaded, send writes directly via std.stdio.write and
 * the other operations do nothing.
 */
class WriterThread: Output {

    import std.concurrency: Tid;

    /**
     * Returns a reference to the only instance of this class.
     *
     * A per-thread flag is checked first; only when it is unset does the
     * calling thread enter the synchronized block, where the instance is
     * created if it does not exist yet and the flag is set.
     */
    static WriterThread get() @trusted {
        if (!_instantiated) {
            synchronized {
                if (_instance is null) {
                    _instance = new WriterThread;
                }
                _instantiated = true;
            }
        }
        return _instance;
    }


    /**
     * Outputs a chunk of text: printed directly with version unitUnthreaded,
     * otherwise sent to the writer thread together with the sender's Tid.
     */
    override void send(in string output) @safe {

        version(unitUnthreaded) {
            import std.stdio: write;
            write(output);
        } else {
            import std.concurrency: send, thisTid;
            () @trusted { _tid.send(output, thisTid); }();
        }
    }

    /**
     * Sends a Flush message to the writer thread
     * (no-op with version unitUnthreaded).
     */
    override void flush() @safe {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send;
            () @trusted { _tid.send(Flush()); }();
        }
    }

    /**
     * Creates the singleton instance and waits until it's ready.
     *
     * Sends ThreadWait to the writer thread and blocks until a
     * ThreadStarted message is received by the calling thread.
     */
    static void start() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send, receiveOnly;
            WriterThread.get._tid.send(ThreadWait());
            receiveOnly!ThreadStarted;
        }
    }

    /**
     * Waits for the writer thread to terminate.
     *
     * Afterwards the singleton reference is cleared so that a later call
     * to get creates a fresh instance.
     */
    void join() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send, receiveOnly;
            _tid.send(ThreadFinish()); //tell it to join
            receiveOnly!ThreadEnded;
            _instance = null;
            // NOTE(review): _instantiated is per-thread, so only the calling
            // thread's flag is reset here; confirm no other thread calls
            // get() after join, since it would be handed the nulled instance.
            _instantiated = false;
        }
    }

private:

    // Spawns the writer thread on the real stdout/stderr, passing it the
    // creating thread's Tid (nothing is spawned with version unitUnthreaded).
    this() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: spawn, thisTid;
            import std.stdio: stdout, stderr;
            _tid = spawn(&threadWriter!(stdout, stderr), thisTid);
        }
    }


    Tid _tid; // the spawned writer thread

    static bool _instantiated; /// Thread local
    __gshared WriterThread _instance; // the single instance, shared by all threads
}
295 
// Obtaining the singleton and joining it twice in a row must work:
// join clears the instance, so the second get creates a new one.
unittest
{
    //make sure this can be brought up and down again
    WriterThread.get.join;
    WriterThread.get.join;
}
302 
// Empty marker types used as messages to and from the writer thread.

/// Request: reply with ThreadStarted.
private struct ThreadWait {}
/// Request: leave the message loop and terminate.
private struct ThreadFinish {}
/// Reply to ThreadWait.
private struct ThreadStarted {}
/// Sent by the writer thread just before it ends.
private struct ThreadEnded {}
/// Request: print the buffered output and release the current sender.
private struct Flush {}
308 
// File name used to discard output: "/dev/null" on POSIX, "NUL" otherwise.
version (Posix)
    enum nullFileName = "/dev/null";
else
    enum nullFileName = "NUL";
314 
// NOTE(review): not referenced anywhere in the visible part of this file;
// confirm whether other modules use it before removing.
shared bool gBool;
/**
 * Body of the writer thread.
 *
 * OUT and ERR are the output/error stream symbols to manage. Copies of both
 * are saved on entry; unless debug output is enabled, OUT and ERR are then
 * replaced with streams opened on nullFileName. Text received from other
 * threads is written to the saved copy of OUT. OUT and ERR are restored
 * when the loop ends or when an exception escapes.
 *
 * Params:
 *     tid = thread that receives the ThreadStarted and ThreadEnded replies
 */
private void threadWriter(alias OUT, alias ERR)(Tid tid)
{
    import std.concurrency: receive, send, OwnerTerminated;

    auto done = false;

    // Keep the original streams: they are what gets printed to, and what
    // OUT/ERR are reset to at the end.
    auto saveStdout = OUT;
    auto saveStderr = ERR;

    void restore() {
        saveStdout.flush();
        OUT = saveStdout;
        ERR = saveStderr;
    }

    scope (failure) restore;

    // With debug output off, anything written through OUT/ERR while this
    // thread runs goes to the null file.
    if (!isDebugOutputEnabled()) {
        OUT = typeof(OUT)(nullFileName, "w");
        ERR = typeof(ERR)(nullFileName, "w");
    }

    // the first thread to send output becomes the current
    // until that thread sends a Flush message no other thread
    // can print to stdout, so we store their outputs in the meanwhile
    string[Tid] outputs;
    Tid currentTid;

    // Writes to the saved stream, skipping empty messages.
    void actuallyPrint(in string msg) {
        if(msg.length) saveStdout.write(msg);
    }

    while (!done) {
        receive(
            // Text from a thread: printed at once if that thread is the
            // current one, otherwise buffered under the sender's Tid.
            (string msg, Tid originTid) {
                if(currentTid == currentTid.init)
                    currentTid = originTid;

                if(currentTid == originTid)
                    actuallyPrint(msg);
                else
                    outputs[originTid] ~= msg;
            },
            // Readiness probe: answer with ThreadStarted.
            (ThreadWait w) {
                tid.send(ThreadStarted());
            },
            // Request to stop: leave the loop after this message.
            (ThreadFinish f) {
                done = true;
            },
            // Print everything buffered from the other threads, then clear
            // the buffer and the current sender.
            (Flush f) {

                foreach(t, output; outputs)
                    actuallyPrint(output);

                outputs = outputs.init;
                currentTid = currentTid.init;
            },
            // The owner thread is gone: stop as well.
            (OwnerTerminated trm) {
                done = true;
            }
        );
    }

    restore;
    tid.send(ThreadEnded());
}
382 
// Test-only stand-ins for the stream symbols handed to threadWriter.
version(testing_unit_threaded) {
    // Minimal fake stream: records what is written instead of printing it.
    struct FakeFile {
        string fileName;
        string mode;
        string output; // everything passed to write so far
        void flush() shared {}
        void write(string s) shared {
            output ~= s;
        }
        // The recorded output split into lines.
        string[] lines() shared const @safe pure {
            import std.string: splitLines;
            return output.splitLines;
        }
    }
    shared FakeFile gOut; // used as OUT in the tests below
    shared FakeFile gErr; // used as ERR in the tests below
    // Resets both fake streams to known values with empty output.
    void resetFakeFiles() {
        gOut = FakeFile("out", "mode");
        gErr = FakeFile("err", "mode");
    }
}
404 
// With debug output disabled, threadWriter must replace both streams with
// ones constructed from (nullFileName, "w") while it is running.
unittest {
    import std.concurrency: spawn, thisTid, send, receiveOnly;
    import unit_threaded.should;

    enableDebugOutput(false);
    resetFakeFiles;

    auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
    // Wait for the reply, so the stream replacement has already happened.
    tid.send(ThreadWait());
    receiveOnly!ThreadStarted;

    gOut.shouldEqual(shared FakeFile(nullFileName, "w"));
    gErr.shouldEqual(shared FakeFile(nullFileName, "w"));

    tid.send(ThreadFinish());
    receiveOnly!ThreadEnded;
}
422 
// With debug output enabled, threadWriter must leave both streams untouched.
unittest {
    import std.concurrency: spawn, send, thisTid, receiveOnly;
    import unit_threaded.should;

    enableDebugOutput(true);
    // Switch the flag back off for the tests that follow.
    scope(exit) enableDebugOutput(false);
    resetFakeFiles;

    auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
    tid.send(ThreadWait());
    receiveOnly!ThreadStarted;

    // Still the values set by resetFakeFiles.
    gOut.shouldEqual(shared FakeFile("out", "mode"));
    gErr.shouldEqual(shared FakeFile("err", "mode"));

    tid.send(ThreadFinish());
    receiveOnly!ThreadEnded;
}
441 
// Messages from a single thread end up, in order, in the restored gOut.
// NOTE(review): this test does not set the debug-output flag itself; it
// appears to rely on the flag being off when it runs - confirm.
unittest {
    import std.concurrency: spawn, thisTid, send, receiveOnly;
    import unit_threaded.should;

    resetFakeFiles;

    auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
    tid.send(ThreadWait());
    receiveOnly!ThreadStarted;

    tid.send("foobar\n", thisTid);
    tid.send("toto\n", thisTid);
    gOut.output.shouldBeEmpty; // since it writes to the old gOut

    tid.send(ThreadFinish());
    receiveOnly!ThreadEnded;

    // gOut is restored so the output should be here
    gOut.lines.shouldEqual(
        [
            "foobar",
            "toto",
        ]
    );
}
467 
468 
version(testing_unit_threaded) {
    // Helper thread for the serialisation test: sends three lines to the
    // writer thread. After each one it notifies testTid with `true` and then
    // waits for a bool before continuing, so the test controls the pacing.
    void otherThread(Tid writerTid, Tid testTid) {
        import std.concurrency: send, receiveOnly, OwnerTerminated, thisTid;
        try {
            writerTid.send("what about me?\n", thisTid);
            testTid.send(true);
            receiveOnly!bool;
            writerTid.send("seriously, what about me?\n", thisTid);
            testTid.send(true);
            receiveOnly!bool;
            writerTid.send("final attempt\n", thisTid);
            testTid.send(true);
            receiveOnly!bool;
        } catch(OwnerTerminated ex) {} // owner finished first: just stop
    }
}
485 
// Two threads send interleaved messages. The thread that sent first (this
// one) has its lines printed immediately; the other thread's lines are
// expected after them in the final output.
unittest {
    import std.concurrency: spawn, thisTid, send, receiveOnly;
    import unit_threaded.should;

    resetFakeFiles;

    auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
    writerTid.send(ThreadWait());
    receiveOnly!ThreadStarted;

    // First message: makes this thread the writer's current sender.
    writerTid.send("foobar\n", thisTid);
    auto otherTid = spawn(&otherThread, writerTid, thisTid);
    receiveOnly!bool; //wait for otherThread 1st message
    writerTid.send("toto\n", thisTid);
    otherTid.send(true); //tell otherThread to continue
    receiveOnly!bool; //wait for otherThread 2nd message
    writerTid.send("last one from me\n", thisTid);
    writerTid.send(Flush()); //finish with our output
    otherTid.send(true); //finish
    receiveOnly!bool;

    writerTid.send(ThreadFinish());
    receiveOnly!ThreadEnded;

    // gOut is restored so the output should be here
    // the output should also be serialised despite
    // sending messages from two threads
    gOut.lines.shouldEqual(
        [
            "foobar",
            "toto",
            "last one from me",
            "what about me?",
            "seriously, what about me?",
            "final attempt",
        ]
    );
}