1 /**
2  * IO related functions
3  */
4 
5 module unit_threaded.io;
6 
7 import std.concurrency;
8 import std.stdio;
9 import std.conv;
10 
/**
 * Writes the textual form of `args` as one line to the writer of the
 * currently running test case. Nothing is written unless debug output
 * has been enabled.
 */
void writelnUt(T...)(T args) {
    import unit_threaded: TestCase;
    import std.conv: text;

    // silent unless debug output was asked for
    if(!isDebugOutputEnabled) return;

    auto sink = TestCase.currentTest.getWriter;
    sink.writeln(text(args));
}
20 
// writelnUt must not print anything while debug output is disabled
unittest {
    import std.string: splitLines;
    import unit_threaded.should;
    import unit_threaded.testcase: TestCase;

    enableDebugOutput(false);

    // accumulates everything sent to it so it can be inspected afterwards
    class CapturingOutput: Output {
        string captured;

        override void send(in string output) {
            captured ~= output;
        }

        override void flush() {}
    }

    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto testCase = new PrintTest;
    auto sink = new CapturingOutput;
    testCase.setOutput(sink);
    testCase();

    // only the line naming the test is expected, "foobar" must be absent
    sink.captured.splitLines.shouldEqual(["PrintTest:"]);
}
57 
// writelnUt must print its arguments while debug output is enabled
unittest {
    import std.string: splitLines;
    import std.traits: fullyQualifiedName;
    import unit_threaded.factory: createTestCase;
    import unit_threaded.reflection: TestData;
    import unit_threaded.should;
    import unit_threaded.testcase: TestCase;

    enableDebugOutput;
    // switch it back off whatever happens below
    scope(exit) enableDebugOutput(false);

    // accumulates everything sent to it so it can be inspected afterwards
    class CapturingOutput: Output {
        string captured;

        override void send(in string output) {
            captured ~= output;
        }

        override void flush() {}
    }

    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto testCase = new PrintTest;
    auto sink = new CapturingOutput;
    testCase.setOutput(sink);
    testCase();

    // the line naming the test, followed by what the test itself printed
    sink.captured.splitLines.shouldEqual(["PrintTest:", "foobar"]);
}
100 
private shared(bool) _debugOutput = false; ///print debug msgs?
private shared(bool) _forceEscCodes = false; ///use ANSI escape codes anyway?
bool _useEscCodes; ///emit ANSI escape codes? (one copy per thread)
/// ANSI escape sequences, in the order of the members of `Colour`.
/// Stored once as immutable data: an `enum` array literal would be
/// built anew every time it is indexed with a runtime value.
immutable string[] _escCodes = ["\033[31;1m", "\033[32;1m", "\033[33;1m", "\033[0;;m"];
105 
106 
// Runs for every thread as it starts: decides whether that thread uses
// ANSI escape codes. Only Posix systems ever set the flag here.
static this() {
    version (Posix) {
        import core.sys.posix.unistd: isatty;
        import std.stdio: stdout;

        if (_forceEscCodes)
            _useEscCodes = true; // forced, the terminal is not even looked at
        else
            _useEscCodes = isatty(stdout.fileno()) != 0;
    }
}
114 
115 
/**
 * Switches debug output on (the default) or off.
 * The flag is written inside a `synchronized` block.
 */
package void enableDebugOutput(bool value = true) nothrow {
    synchronized { _debugOutput = value; }
}
121 
/**
 * Returns whether debug output is currently enabled.
 * The flag is read inside a `synchronized` block.
 */
package bool isDebugOutputEnabled() nothrow {
    bool enabled;
    synchronized { enabled = _debugOutput; }
    return enabled;
}
127 
/**
 * Requests ANSI escape codes regardless of where the output goes.
 * The flag is written inside a `synchronized` block and is never reset here.
 */
package void forceEscCodes() nothrow {
    synchronized { _forceEscCodes = true; }
}
133 
/**
 * Something that text can be sent to.
 */
interface Output {
    /// Hands `output` over to the implementation
    void send(in string output);

    /// Tells the implementation that the sender is done with its current
    /// output; what that entails is up to the implementation
    void flush();
}
138 
// Index into _escCodes, so the order of the members matters
private enum Colour {
    red,    // _escCodes[0]
    green,  // _escCodes[1]
    yellow, // _escCodes[2]
    cancel, // _escCodes[3], ends a coloured span
}
145 
// Wraps msg between the escape code for colour C and the cancel code.
// Both codes are empty strings when escape codes are not in use.
private string colour(alias C)(in string msg) {
    string prefix = escCode(C);
    string suffix = escCode(Colour.cancel);
    return prefix ~ msg ~ suffix;
}

// one shorthand per colour
private alias red = colour!(Colour.red);
private alias green = colour!(Colour.green);
private alias yellow = colour!(Colour.yellow);
153 
/**
 * Returns the ANSI escape sequence for `code`, or an empty string when
 * escape codes are neither in use for this thread nor forced.
 */
private string escCode(in Colour code) @safe {
    // _useEscCodes is only computed when a thread starts (and only on
    // Posix), so a later call to forceEscCodes would go unnoticed by
    // threads that are already running. Checking the force flag here
    // makes the request effective everywhere.
    return (_useEscCodes || _forceEscCodes) ? _escCodes[code] : "";
}
160 
161 
/**
 * Converts the args to a single string and sends it to `output`.
 * No newline is added.
 */
void write(T...)(Output output, T args) {
    string rendered = text(args);
    output.send(rendered);
}
168 
/**
 * Converts the args to a single string, appends a newline and sends the
 * result to `output`.
 */
void writeln(T...)(Output output, T args) {
    // same as writing the args with "\n" as one extra, last argument
    output.send(text(args, "\n"));
}
175 
/**
 * Sends the args followed by a newline to `output`, in green when escape
 * codes are in use. The newline is part of the coloured span.
 */
void writelnGreen(T...)(Output output, T args) {
    string line = text(args) ~ "\n";
    output.send(green(line));
}
183 
/**
 * Sends the args followed by a newline to `output`, in red when escape
 * codes are in use. The newline is part of the coloured span.
 */
void writelnRed(T...)(Output output, T args) {
    string line = text(args) ~ "\n";
    output.send(red(line));
}
191 
/**
 * Sends the args to `output`, in red when escape codes are in use.
 * No newline is added.
 */
void writeRed(T...)(Output output, T args) {
    string rendered = text(args);
    output.send(red(rendered));
}
199 
/**
 * Sends the args to `output`, in yellow when escape codes are in use.
 * No newline is added.
 */
void writeYellow(T...)(Output output, T args) {
    string rendered = text(args);
    output.send(yellow(rendered));
}
207 
/**
 * Output that hands everything over to a separately spawned writer thread.
 * When compiled with version unitUnthreaded no thread is spawned and text
 * is written directly with std.stdio.write instead.
 */
class WriterThread: Output {
    /**
     * Returns the one shared instance, creating it if there is none yet.
     */
    static WriterThread get() {
        // this thread has been through the locked section before
        if (_instantiated) return _instance;

        synchronized {
            if (_instance is null)
                _instance = new WriterThread;
            _instantiated = true;
        }

        return _instance;
    }


    /**
     * Passes `output` on, together with the id of the calling thread.
     */
    override void send(in string output) {
        version(unitUnthreaded) {
            import std.stdio: write;
            write(output);
        } else {
            _tid.send(output, thisTid);
        }
    }

    /**
     * Sends a Flush message to the writer thread.
     * Does nothing with version unitUnthreaded.
     */
    override void flush() {
        version(unitUnthreaded) {
        } else {
            _tid.send(Flush());
        }
    }

    /**
     * Makes sure the instance exists, then blocks until the writer thread
     * has answered that it is running.
     */
    static void start() {
        version(unitUnthreaded) {
        } else {
            auto writer = WriterThread.get;
            writer._tid.send(ThreadWait());
            receiveOnly!ThreadStarted;
        }
    }

    /**
     * Asks the writer thread to finish, blocks until it has confirmed and
     * then forgets the instance so that a later call to get creates a new one.
     */
    void join() {
        version(unitUnthreaded) {
        } else {
            _tid.send(ThreadFinish()); // ask it to stop
            receiveOnly!ThreadEnded;   // ...and wait until it did

            _instantiated = false;
            _instance = null;
        }
    }

private:

    // spawns the writer thread, handing it the id of the creating thread
    this() {
        version(unitUnthreaded) {
        } else {
            _tid = spawn(&threadWriter!(stdout, stderr), thisTid);
        }
    }


    Tid _tid; // id of the spawned writer thread

    static bool _instantiated; /// Thread local
    __gshared WriterThread _instance; // shared by all threads
}
279 
unittest
{
    // the writer must be able to come up and go down more than once
    foreach(_; 0 .. 2)
        WriterThread.get.join;
}
286 
// Messages exchanged with the writer thread
private struct ThreadWait {}    // to the writer: answer with ThreadStarted
private struct ThreadFinish {}  // to the writer: stop
private struct ThreadStarted {} // from the writer: reply to ThreadWait
private struct ThreadEnded {}   // from the writer: sent as it ends
private struct Flush {}         // to the writer: print what was held back
292 
// name of the file that swallows whatever is written to it
version (Posix)
    enum nullFileName = "/dev/null";
else
    enum nullFileName = "NUL";
298 
shared bool gBool; // NOTE(review): not used anywhere in the visible part of this file

/**
 * Body of the writer thread.
 *
 * Keeps copies of OUT and ERR and, unless debug output is enabled, points
 * both at the null file for as long as the thread runs. Text received from
 * other threads is written to the saved copy of OUT. OUT and ERR are put
 * back when the thread ends or fails.
 *
 * Params:
 *   OUT = the output file object to take over
 *   ERR = the error file object to take over
 *   tid = thread that is sent ThreadStarted (in reply to ThreadWait) and
 *         ThreadEnded (when this function is done)
 */
private void threadWriter(alias OUT, alias ERR)(Tid tid)
{
    auto done = false;

    auto saveStdout = OUT;
    auto saveStderr = ERR;

    void restore() {
        saveStdout.flush();
        OUT = saveStdout;
        ERR = saveStderr;
    }

    scope (failure) restore;

    if (!isDebugOutputEnabled()) {
        OUT = typeof(OUT)(nullFileName, "w");
        ERR = typeof(ERR)(nullFileName, "w");
    }

    // the first thread to send output becomes the current
    // until a Flush message arrives no other thread
    // can print to stdout, so we store their outputs in the meanwhile
    string[Tid] outputs;
    Tid currentTid;

    void actuallyPrint(in string msg) {
        if(msg.length) saveStdout.write(msg);
    }

    // prints and then forgets everything that was held back
    void printPending() {
        foreach(t, output; outputs)
            actuallyPrint(output);

        outputs = outputs.init;
    }

    while (!done) {
        receive(
            (string msg, Tid originTid) {
                if(currentTid == currentTid.init)
                    currentTid = originTid;

                if(currentTid == originTid)
                    actuallyPrint(msg);
                else
                    outputs[originTid] ~= msg;
            },
            (ThreadWait w) {
                tid.send(ThreadStarted());
            },
            (ThreadFinish f) {
                done = true;
            },
            (Flush f) {
                printPending();
                currentTid = currentTid.init;
            },
            (OwnerTerminated trm) {
                done = true;
            }
        );
    }

    // text held back for other threads would otherwise be lost if no
    // Flush arrived before the loop ended; print it before restoring
    // so that restore's flush covers it as well
    printPending();

    restore;
    tid.send(ThreadEnded());
}
364 
version(testing_unit_threaded) {
    /// Stand-in for a file that remembers what was written to it
    struct FakeFile {
        string fileName;
        string mode;
        string output; /// everything written so far

        /// Nothing to do, there is no buffer
        void flush() shared {}

        /// Appends `s` to what has been written so far
        void write(string s) shared {
            output ~= s;
        }

        /// What has been written so far, split into lines
        string[] lines() shared const @safe pure {
            import std.string: splitLines;
            return splitLines(output);
        }
    }

    shared FakeFile gOut; /// plays the part of stdout in the tests
    shared FakeFile gErr; /// plays the part of stderr in the tests

    /// Puts both fake files back into their initial, empty state
    void resetFakeFiles() {
        gOut = FakeFile("out", "mode");
        gErr = FakeFile("err", "mode");
    }
}
386 
// without debug output both files must be pointed at the null file
unittest {
    import std.concurrency: spawn;
    import unit_threaded.should;

    enableDebugOutput(false);
    resetFakeFiles;

    auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);

    // wait until the writer is up
    writerTid.send(ThreadWait());
    receiveOnly!ThreadStarted;

    gOut.shouldEqual(shared FakeFile(nullFileName, "w"));
    gErr.shouldEqual(shared FakeFile(nullFileName, "w"));

    // shut the writer down again
    writerTid.send(ThreadFinish());
    receiveOnly!ThreadEnded;
}
404 
// with debug output both files must be left alone
unittest {
    import std.concurrency: spawn;
    import unit_threaded.should;

    enableDebugOutput(true);
    scope(exit) enableDebugOutput(false);
    resetFakeFiles;

    auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);

    // wait until the writer is up
    writerTid.send(ThreadWait());
    receiveOnly!ThreadStarted;

    gOut.shouldEqual(shared FakeFile("out", "mode"));
    gErr.shouldEqual(shared FakeFile("err", "mode"));

    // shut the writer down again
    writerTid.send(ThreadFinish());
    receiveOnly!ThreadEnded;
}
423 
// text sent by a single thread shows up once the writer has finished
unittest {
    import std.concurrency: spawn;
    import unit_threaded.should;

    resetFakeFiles;

    auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);

    // wait until the writer is up
    writerTid.send(ThreadWait());
    receiveOnly!ThreadStarted;

    writerTid.send("foobar\n", thisTid);
    writerTid.send("toto\n", thisTid);
    // the writer prints to its saved copy, so nothing is visible here yet
    gOut.output.shouldBeEmpty;

    writerTid.send(ThreadFinish());
    receiveOnly!ThreadEnded;

    // the saved copy has been assigned back to gOut, output included
    gOut.lines.shouldEqual(["foobar", "toto"]);
}
449 
450 
version(testing_unit_threaded) {
    // Second sender for the test below. After each message to the writer
    // it reports to the test thread and waits to be told to carry on.
    void otherThread(Tid writerTid, Tid testTid) {
        try {
            foreach(msg; ["what about me?\n",
                          "seriously, what about me?\n",
                          "final attempt\n"]) {
                writerTid.send(msg, thisTid);
                testTid.send(true);
                receiveOnly!bool;
            }
        } catch(OwnerTerminated ex) {}
    }
}
466 
// two threads sending at the same time must not get their lines mixed up
unittest {
    import std.concurrency: spawn;
    import unit_threaded.should;

    resetFakeFiles;

    auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
    writerTid.send(ThreadWait());
    receiveOnly!ThreadStarted;

    // this thread sends first and so becomes the current one
    writerTid.send("foobar\n", thisTid);

    auto rivalTid = spawn(&otherThread, writerTid, thisTid);
    receiveOnly!bool;                   // rival has sent its 1st message

    writerTid.send("toto\n", thisTid);
    rivalTid.send(true);                // let the rival carry on
    receiveOnly!bool;                   // rival has sent its 2nd message

    writerTid.send("last one from me\n", thisTid);
    writerTid.send(Flush());            // we are done with our output

    rivalTid.send(true);                // let the rival send its last message
    receiveOnly!bool;

    writerTid.send(ThreadFinish());
    receiveOnly!ThreadEnded;

    // gOut has been given the saved copy back, so the output is visible
    // here; each thread's lines stay together although they were sent
    // interleaved
    gOut.lines.shouldEqual(
        [
            "foobar",
            "toto",
            "last one from me",
            "what about me?",
            "seriously, what about me?",
            "final attempt",
        ]
    );
}