1 /**
2  * IO related functions
3  */
4 
5 module unit_threaded.io;
6 
7 import unit_threaded.from;
8 
/**
 * Prints `args` followed by a newline through the writer of the test
 * case that is currently running.
 *
 * The body only exists in builds compiled with `debug` enabled, and even
 * then nothing is printed unless `isDebugOutputEnabled` returns true.
 */
void writelnUt(T...)(auto ref T args) {
    debug {
        // Bail out early when debug output has not been switched on.
        if(!isDebugOutputEnabled)
            return;

        import unit_threaded.testcase: TestCase;
        TestCase.currentTest.getWriter.writeln(args);
    }
}
19 
20 
21 
private shared(bool) _debugOutput = false; ///print debug msgs?
private shared(bool) _forceEscCodes = false; ///use ANSI escape codes anyway?
/// Whether `escCode` returns real escape sequences. Assigned by the module
/// constructor (Posix builds only). It is neither `shared` nor `__gshared`,
/// so every thread has its own copy.
bool _useEscCodes;
/// Escape sequences indexed by `Colour` value in `escCode`; the order here
/// (red, green, yellow, cancel) has to match the order of the enum members.
enum _escCodes = ["\033[31;1m", "\033[32;1m", "\033[33;1m", "\033[0;;m"];
26 
27 
/**
 * Thread-local module constructor: decides whether this thread emits
 * colour escape codes.
 *
 * On Posix the codes are enabled when they have been forced, or otherwise
 * when stdout is attached to a terminal. On other platforms
 * `_useEscCodes` is left at its default value.
 */
static this() {
    version (Posix) {
        if (_forceEscCodes) {
            // Forced on: stdout is not inspected at all.
            _useEscCodes = true;
        } else {
            import core.sys.posix.unistd: isatty;
            import std.stdio: stdout;
            _useEscCodes = isatty(stdout.fileno()) != 0;
        }
    }
}
35 
36 
/**
 * Switches debug output on or off for the whole process.
 *
 * The flag is written with an atomic store. A `synchronized` statement
 * without an argument owns a mutex of its own, so wrapping the write in one
 * would not order it against reads performed in other functions.
 *
 * Params:
 *   value = the new state of the flag; enables debug output by default
 */
package void enableDebugOutput(bool value = true) nothrow {
    import core.atomic: atomicStore;
    atomicStore(_debugOutput, value);
}
42 
/**
 * Reports whether debug output is currently enabled.
 *
 * The shared flag is read with an atomic load. A `synchronized` statement
 * without an argument owns a mutex of its own, so it would not order this
 * read against writes performed in other functions.
 *
 * Returns: the current value of the debug-output flag
 */
package bool isDebugOutputEnabled() nothrow @trusted {
    import core.atomic: atomicLoad;
    return atomicLoad(_debugOutput);
}
48 
/**
 * Requests that ANSI escape codes be used even when stdout is not a
 * terminal.
 *
 * Only sets the shared flag (atomically); the flag is consulted by the
 * thread-local module constructor, so it affects threads whose constructor
 * runs after this call.
 */
package void forceEscCodes() nothrow {
    import core.atomic: atomicStore;
    atomicStore(_forceEscCodes, true);
}
54 
/**
 * Sink for text produced by the `write*` helpers in this module.
 */
interface Output {
    /// Accepts one chunk of already-formatted text.
    void send(in string output) @safe;
    /// Signals the end of a batch of output; the effect is up to the implementation.
    void flush() @safe;
}
59 
/**
 * Colours understood by `escCode`. Each member's numeric value is used as an
 * index into `_escCodes`, so the member order must match that array.
 */
private enum Colour {
    red,
    green,
    yellow,
    cancel, // ends a coloured span
}
66 
/**
 * Surrounds `msg` with the escape code for colour `C` and the cancel code.
 * When escape codes are disabled both codes are empty strings, so the
 * result equals `msg`.
 */
private string colour(alias C)(in string msg) {
    string opening = escCode(C);
    string closing = escCode(Colour.cancel);
    return opening ~ msg ~ closing;
}

/// Shorthands for wrapping a message in one specific colour.
private alias green = colour!(Colour.green);
private alias red = colour!(Colour.red);
private alias yellow = colour!(Colour.yellow);
74 
/**
 * Looks up the escape sequence for `code`.
 *
 * Nothing is written anywhere; the sequence is only returned.
 *
 * Returns: the matching entry of `_escCodes`, or an empty string when
 *          escape codes are disabled for the current thread
 */
private string escCode(in Colour code) @safe {
    if (!_useEscCodes)
        return "";
    return _escCodes[code];
}
81 
82 
/**
 * Converts all of `args` to a single string and hands it to `output.send`.
 * Any thread-safety comes from the `Output` implementation.
 */
void write(T...)(Output output, auto ref T args) {
    import std.conv: text;
    string rendered = text(args);
    output.send(rendered);
}
90 
/**
 * Same as `write`, with a trailing newline added after the last argument.
 */
void writeln(T...)(Output output, auto ref T args) {
    output.write(args, "\n");
}
97 
/**
 * Sends `args` plus a trailing newline to `output`, wrapped in the green
 * escape codes when those are enabled (they are only ever enabled on
 * Posix). The newline sits inside the coloured span.
 */
void writelnGreen(T...)(Output output, auto ref T args) {
    import std.conv: text;
    string line = text(args) ~ "\n";
    output.send(green(line));
}
106 
/**
 * Same as `writeRed`, with a trailing newline added after the last
 * argument (inside the coloured span).
 */
void writelnRed(T...)(Output output, auto ref T args) {
    output.writeRed(args, "\n");
}
114 
/**
 * Sends `args` to `output`, wrapped in the red escape codes when those are
 * enabled (they are only ever enabled on Posix). No newline is appended.
 */
void writeRed(T...)(Output output, auto ref T args) {
    import std.conv: text;
    string plain = text(args);
    output.send(red(plain));
}
123 
/**
 * Sends `args` to `output`, wrapped in the yellow escape codes when those
 * are enabled (they are only ever enabled on Posix). No newline is appended.
 */
void writeYellow(T...)(Output output, auto ref T args) {
    import std.conv: text;
    string plain = text(args);
    output.send(yellow(plain));
}
132 
/**
 * `Output` implementation backed by a dedicated writer thread.
 *
 * Unless compiled with version `unitUnthreaded`, constructing the instance
 * spawns `threadWriter`, and every `send`/`flush` becomes a message to that
 * thread tagged with the caller's `Tid`. With `unitUnthreaded`, `send`
 * writes directly to stdout and `flush` does nothing.
 */
class WriterThread: Output {

    import std.concurrency: Tid;


    /**
     * Gives access to the one and only instance, creating it on first use.
     */
    static WriterThread get() @trusted {
        import std.concurrency: initOnce;
        static __gshared WriterThread instance;
        return initOnce!instance(new WriterThread);
    }

    /// Passes `output` on for printing on behalf of the calling thread.
    override void send(in string output) @safe {

        version(unitUnthreaded) {
            import std.stdio: write;
            write(output);
        } else {
            import std.concurrency: send, thisTid;
            // Message passing is not @safe, hence the trusted nested function.
            void postText() @trusted { _tid.send(output, thisTid); }
            postText();
        }
    }

    /// Tells the writer thread that the calling thread has finished a batch.
    override void flush() @safe {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send, thisTid;
            void postFlush() @trusted { _tid.send(Flush(), thisTid); }
            postFlush();
        }
    }


private:

    /// Starts the writer thread and blocks until it reports that it is running.
    this() {
        version(unitUnthreaded) {}
        else {
            import std.stdio: stdout, stderr;
            import std.concurrency: spawn, thisTid, receiveOnly, send;

            _tid = spawn(&threadWriter!(stdout, stderr), thisTid);

            // Handshake: ask the new thread to answer, then wait for the answer.
            _tid.send(ThreadWait());
            receiveOnly!ThreadStarted;
        }
    }


    /// Handle of the spawned writer thread.
    Tid _tid;
}
186 
187 
/// Request to the writer thread; it answers with `ThreadStarted`.
struct ThreadWait {}
/// Request to the writer thread to leave its message loop.
struct ThreadFinish {}
/// Writer thread's answer to `ThreadWait`.
struct ThreadStarted {}
/// Sent by the writer thread just before it returns.
struct ThreadEnded {}
/// Marks the end of a batch of output from the sending thread.
struct Flush {}
193 
/// Name of the file that `threadWriter` opens in place of stdout/stderr
/// when debug output is disabled.
version (Posix)
    enum nullFileName = "/dev/null";
else
    enum nullFileName = "NUL";
199 
200 
/**
 * Body of the writer thread. Receives text and `Flush` messages tagged with
 * the sender's `Tid` and prints the text to the saved `OUT`, keeping the
 * output of different sender threads from being mixed together.
 *
 * While it runs, and only if debug output is disabled, `OUT` and `ERR` are
 * re-pointed at the null file; both are put back before the function
 * returns or when it fails.
 *
 * Template params:
 *   OUT = file object that receives the printed text (and may be redirected)
 *   ERR = second file object that may be redirected
 * Params:
 *   tid = thread that is sent `ThreadStarted` (in answer to `ThreadWait`)
 *         and `ThreadEnded` (right before returning)
 */
void threadWriter(alias OUT, alias ERR)(from!"std.concurrency".Tid tid)
{
    import std.concurrency: receive, send, OwnerTerminated, Tid;

    auto done = false;

    // Keep handles to the real files so they can be printed to and restored.
    auto saveStdout = OUT;
    auto saveStderr = ERR;

    void restore() {
        saveStdout.flush();
        OUT = saveStdout;
        ERR = saveStderr;
    }

    scope (failure) restore;

    // Without debug output, anything written directly to OUT/ERR is
    // discarded; only messages routed through this thread get printed.
    if (!isDebugOutputEnabled()) {
        OUT = typeof(OUT)(nullFileName, "w");
        ERR = typeof(ERR)(nullFileName, "w");
    }

    // Prints to the saved (real) stdout; empty messages are skipped.
    void actuallyPrint(in string msg) {
        if(msg.length) saveStdout.write(msg);
    }

    // the first thread to send output becomes the current
    // until that thread sends a Flush message no other thread
    // can print to stdout, so we store their outputs in the meanwhile
    static struct ThreadOutput {
        string currentOutput; // text received since this thread's last Flush
        string[] outputs;     // completed (flushed) batches waiting to be printed

        // Appends `msg` to the batch in progress.
        void store(in string msg) {
            currentOutput ~= msg;
        }

        // Closes the batch in progress and starts a new, empty one.
        void flush() {
            outputs ~= currentOutput;
            currentOutput = "";
        }
    }
    // Buffered output per sender thread.
    ThreadOutput[Tid] outputs;

    // Thread currently allowed to print directly; Tid.init means "nobody".
    Tid currentTid;

    while (!done) {
        receive(
            // Text from a sender thread.
            (string msg, Tid originTid) {

                if(currentTid == currentTid.init) {
                    currentTid = originTid;

                    // it could be that this thread became the current thread but had output not yet printed
                    if(originTid in outputs) {
                        actuallyPrint(outputs[originTid].currentOutput);
                        outputs[originTid].currentOutput = "";
                    }
                }

                // The current thread prints straight away; everyone else is buffered.
                if(currentTid == originTid)
                    actuallyPrint(msg);
                else {
                    if(originTid !in outputs) outputs[originTid] = typeof(outputs[originTid]).init;
                    outputs[originTid].store(msg);
                }
            },
            // Handshake: confirm to `tid` that the thread is running.
            (ThreadWait w) {
                tid.send(ThreadStarted());
            },
            // Request to stop: leave the loop after this message.
            (ThreadFinish f) {
                done = true;
            },
            // End of a batch from a sender thread.
            (Flush f, Tid originTid) {

                // Close the sender's batch in progress, if it has any buffer.
                if(originTid in outputs) outputs[originTid].flush;

                // Some other thread still holds the output: nothing is printed yet.
                if(currentTid != currentTid.init && currentTid != originTid)
                    return;

                // Print every thread's completed batches and clear them.
                // NOTE(review): the loop key `tid` is unused and shadows the
                // function parameter of the same name inside this loop.
                foreach(tid, ref threadOutput; outputs) {
                    foreach(o; threadOutput.outputs)
                        actuallyPrint(o);
                    threadOutput.outputs = [];
                }

                // Release the output so the next sender can become current.
                currentTid = currentTid.init;
            },
            // The owner thread is gone: stop as well.
            (OwnerTerminated trm) {
                done = true;
            }
        );
    }

    restore;
    tid.send(ThreadEnded());
}