1 /**
2  * IO related functions
3  */
4 
5 module unit_threaded.runner.io;
6 
7 import unit_threaded.from;
8 
/**
 * Prints `args` followed by a newline through the writer of the test that
 * is currently running. Only has an effect in debug builds, and only when
 * debug output has been switched on (see `enableDebugOutput`).
 */
void writelnUt(T...)(auto ref T args) {
    debug {
        import unit_threaded.runner.testcase: TestCase;

        // nothing to do unless the user asked for debug output
        if(!isDebugOutputEnabled) return;

        TestCase.currentTest.getWriter.writeln(args);
    }
}
19 
20 
21 
private shared(bool) _debugOutput = false; ///print debug msgs?
private shared(bool) _forceEscCodes = false; ///use ANSI escape codes anyway?
/// Whether escape codes are emitted. Declared without `shared`, so every thread has its own copy.
package bool _useEscCodes;
/**
 * ANSI escape sequences: bold red, bold green, bold yellow and reset, in that order.
 *
 * Stored as module-level immutable data rather than as an `enum`: an `enum`
 * array literal is a manifest constant that is re-materialised (allocating
 * on the GC heap) every time it is indexed with a runtime value.
 */
immutable string[] _escCodes = ["\033[31;1m", "\033[32;1m", "\033[33;1m", "\033[0;;m"];
26 
27 
28 
/**
 * Decides whether ANSI escape codes should be emitted.
 *
 * On Posix this is true when escape codes were forced or when stdout is a
 * terminal; on every other platform it is always false.
 */
package bool shouldUseEscCodes() {
    version (Posix) {
        import core.sys.posix.unistd: isatty;
        import std.stdio: stdout;

        // an explicit request wins without consulting the terminal
        if (_forceEscCodes) return true;

        return isatty(stdout.fileno()) != 0;
    } else {
        return false;
    }
}
37 
38 
/**
 * Turns debug output on (the default) or off.
 *
 * Params:
 *   value = the new state of the debug-output flag
 */
void enableDebugOutput(bool value = true) nothrow {
    // the flag is shared between threads, so update it under the lock
    synchronized { _debugOutput = value; }
}
44 
/**
 * Returns: the current state of the debug-output flag, read under the same
 * lock that `enableDebugOutput` takes when writing it.
 */
package bool isDebugOutputEnabled() nothrow @trusted {
    bool enabled;

    synchronized {
        enabled = _debugOutput;
    }

    return enabled;
}
50 
/**
 * Sets the flag that `shouldUseEscCodes` consults to emit ANSI escape codes
 * even when stdout is not a terminal. This function only ever sets the flag.
 */
package void forceEscCodes() nothrow {
    // shared flag: write it under the lock
    synchronized { _forceEscCodes = true; }
}
56 
/**
 * Destination for text produced by the `write*` helpers in this module.
 * `WriterThread` is the implementation provided here.
 */
interface Output {
    /// Hands one piece of already-formatted text to the implementation.
    void send(in string output) @safe;
    /// Tells the implementation that the caller is done with its current batch of output.
    void flush() @safe;
}
61 
// Colours understood by `escCode`. The numeric value of each member is used
// as an index into `_escCodes`, so the order here must match that array.
private enum Colour {
    red,    // _escCodes[0]
    green,  // _escCodes[1]
    yellow, // _escCodes[2]
    cancel, // _escCodes[3]: the sequence appended after coloured text
}
68 
// Wraps `msg` in the escape sequence for colour `C` followed by the cancel
// sequence. Both are empty strings when escape codes are not in use.
private string colour(alias C)(in string msg) {
    string opening = escCode(C);
    string closing = escCode(Colour.cancel);
    return opening ~ msg ~ closing;
}
72 
// Ready-made instantiations of `colour`, one per colour used by the
// write*Green / write*Red / write*Yellow helpers in this module.
private alias green = colour!(Colour.green);
private alias red = colour!(Colour.red);
private alias yellow = colour!(Colour.yellow);
76 
/**
 * Looks up the escape sequence for `code`.
 *
 * Returns: the matching entry of `_escCodes`, or an empty string when
 * `_useEscCodes` is false. Nothing is written to the console here.
 */
private string escCode(in Colour code) @safe {
    if (!_useEscCodes)
        return "";

    return _escCodes[code];
}
83 
84 
/**
 * Converts all of `args` to one string and sends it to `output` in a
 * single `send` call.
 */
void write(T...)(Output output, auto ref T args) {
    import std.conv: text;

    string rendered = text(args);
    output.send(rendered);
}
92 
/**
 * Same as `write`, with a trailing newline added to the text that is sent.
 */
void writeln(T...)(Output output, auto ref T args) {
    output.write(args, "\n");
}
99 
/**
 * Converts `args` to text, appends a newline and sends the result to
 * `output` in green. The colour is only applied when escape codes are in
 * use; otherwise the plain text is sent.
 *
 * Params:
 *   output = where the text is sent
 *   args   = values to print; may be empty, in which case only the newline is sent
 */
void writelnGreen(T...)(Output output, auto ref T args) {
    import std.conv: text;
    // Let `text` append the newline instead of using `~`: `text` requires at
    // least one argument, so `text(args) ~ "\n"` does not compile for an
    // empty `args`, whereas `writelnRed(output)` does. The output for
    // non-empty `args` is unchanged.
    output.send(green(text(args, "\n")));
}
108 
/**
 * Same as `writeRed`, with a trailing newline included in the coloured text.
 */
void writelnRed(T...)(Output output, auto ref T args) {
    output.writeRed(args, "\n");
}
116 
/**
 * Converts `args` to text and sends it to `output` in red when escape codes
 * are in use. No newline is added.
 */
void writeRed(T...)(Output output, auto ref T args) {
    import std.conv: text;

    string rendered = text(args);
    output.send(red(rendered));
}
125 
/**
 * Converts `args` to text and sends it to `output` in yellow when escape
 * codes are in use. No newline is added.
 */
void writeYellow(T...)(Output output, auto ref T args) {
    import std.conv: text;

    string rendered = text(args);
    output.send(yellow(rendered));
}
134 
/**
 * `Output` implementation backed by a spawned `threadWriter` thread.
 *
 * Under `version(unitUnthreaded)` no thread is spawned: `send` writes
 * directly with `std.stdio.write` and `flush` does nothing.
 */
class WriterThread: Output {

    import std.concurrency: Tid;


    /**
     * Gives access to the single shared instance, creating it on first use.
     */
    static WriterThread get() @trusted {
        import std.concurrency: initOnce;
        static __gshared WriterThread theInstance;
        return initOnce!theInstance(new WriterThread);
    }

    /// Forwards `output`, tagged with the calling thread's Tid, to the writer thread.
    override void send(in string output) @safe {

        version(unitUnthreaded) {
            static import std.stdio;
            std.stdio.write(output);
        } else {
            import std.concurrency: thisTid, sendMessage = send;

            void post() @trusted { sendMessage(_tid, output, thisTid); }
            post();
        }
    }

    /// Sends a `Flush` message, tagged with the calling thread's Tid, to the writer thread.
    override void flush() @safe {
        version(unitUnthreaded) {
            // nothing is buffered in this configuration
        } else {
            import std.concurrency: thisTid, sendMessage = send;

            void post() @trusted { sendMessage(_tid, Flush(), thisTid); }
            post();
        }
    }


private:

    // Spawns the writer thread and blocks until it reports that it is running.
    this() {
        version(unitUnthreaded) {
            // no writer thread in this configuration
        } else {
            import std.concurrency: receiveOnly, send, spawn, thisTid;
            import std.stdio: stderr, stdout;

            // the worker is given our Tid so that it can answer us
            _tid = spawn(&threadWriter!(stdout, stderr), thisTid);

            // handshake: ask for a ThreadStarted reply and wait for it
            _tid.send(ThreadWait());
            receiveOnly!ThreadStarted();
        }
    }


    // Tid of the spawned writer thread (left at its default under unitUnthreaded)
    Tid _tid;
}
188 
189 
// Message types exchanged with `threadWriter`.

/// Sent to the writer thread; it answers with `ThreadStarted`.
struct ThreadWait{};
/// Sent to the writer thread to make it leave its receive loop.
struct ThreadFinish{};
/// Reply of the writer thread to `ThreadWait`.
struct ThreadStarted{};
/// Sent by the writer thread right before it returns.
struct ThreadEnded{};
/// Sent to the writer thread, together with the sender's Tid, to flush that sender's output.
struct Flush{};
195 
// Name of the platform's null device; `threadWriter` opens it to discard output.
version (Posix)
    enum nullFileName = "/dev/null";
else
    enum nullFileName = "NUL";
201 
202 
/**
 * Body of the writer thread: receives messages and prints the text they
 * carry to the stream that `OUT` referred to when the thread started.
 *
 * Unless debug output is enabled, `OUT` and `ERR` are pointed at the null
 * device for as long as the loop runs; they are put back when the function
 * leaves, whether normally or by an exception.
 *
 * Params:
 *   OUT = stream that is saved, possibly redirected, and used for printing
 *         (`WriterThread` passes `stdout`)
 *   ERR = stream that is saved and possibly redirected (`WriterThread` passes `stderr`)
 *   tid = thread that receives the `ThreadStarted` and `ThreadEnded` notifications
 */
void threadWriter(alias OUT, alias ERR)(from!"std.concurrency".Tid tid)
{
    import std.concurrency: receive, send, OwnerTerminated, Tid;

    // set by the ThreadFinish and OwnerTerminated handlers to end the loop
    auto done = false;

    // keep the original streams: one to print to, both to restore later
    auto saveStdout = OUT;
    auto saveStderr = ERR;

    // flushes what was printed and undoes any redirection of OUT/ERR
    void restore() {
        saveStdout.flush();
        OUT = saveStdout;
        ERR = saveStderr;
    }

    // also restore when an exception escapes this function
    scope (failure) restore;

    // without debug output, anything written directly to OUT/ERR is discarded
    if (!isDebugOutputEnabled()) {
        OUT = typeof(OUT)(nullFileName, "w");
        ERR = typeof(ERR)(nullFileName, "w");
    }

    // writes to the saved (original) stream, skipping empty messages
    void actuallyPrint(in string msg) {
        if(msg.length) saveStdout.write(msg);
    }

    // the first thread to send output becomes the current
    // until that thread sends a Flush message no other thread
    // can print to stdout, so we store their outputs in the meanwhile
    static struct ThreadOutput {
        // text received since this thread's last Flush
        string currentOutput;
        // completed (flushed) chunks that have not been printed yet
        string[] outputs;

        // appends to the chunk that is being accumulated
        void store(in string msg) {
            currentOutput ~= msg;
        }

        // closes the accumulated chunk and starts a new, empty one
        void flush() {
            outputs ~= currentOutput;
            currentOutput = "";
        }
    }
    // buffered output of the threads that are not current, keyed by sender
    ThreadOutput[Tid] outputs;

    // the thread currently allowed to print; Tid.init means "nobody"
    Tid currentTid;

    while (!done) {
        receive(
            // text from a test thread
            (string msg, Tid originTid) {

                // nobody is current: the sender takes over
                if(currentTid == currentTid.init) {
                    currentTid = originTid;

                    // it could be that this thread became the current thread but had output not yet printed
                    if(originTid in outputs) {
                        actuallyPrint(outputs[originTid].currentOutput);
                        outputs[originTid].currentOutput = "";
                    }
                }

                // the current thread prints right away, everyone else is buffered
                if(currentTid == originTid)
                    actuallyPrint(msg);
                else {
                    if(originTid !in outputs) outputs[originTid] = typeof(outputs[originTid]).init;
                    outputs[originTid].store(msg);
                }
            },
            // handshake request: tell `tid` that this thread is running
            (ThreadWait w) {
                tid.send(ThreadStarted());
            },
            // request to stop: leave the loop after this message
            (ThreadFinish f) {
                done = true;
            },
            // a test thread finished a batch of output
            (Flush f, Tid originTid) {

                // close the sender's accumulated chunk, if it has buffered anything
                if(originTid in outputs) outputs[originTid].flush;

                // another thread is still current: keep everything buffered
                if(currentTid != currentTid.init && currentTid != originTid)
                    return;

                // print every completed chunk of every thread, then drop them
                foreach(_, ref threadOutput; outputs) {
                    foreach(o; threadOutput.outputs)
                        actuallyPrint(o);
                    threadOutput.outputs = [];
                }

                // nobody is current any more; the next sender takes over
                currentTid = currentTid.init;
            },
            // the owner thread is gone: stop as well
            (OwnerTerminated trm) {
                done = true;
            }
        );
    }

    // normal exit: undo the redirection and notify `tid`
    restore;
    tid.send(ThreadEnded());
}