/**
 * IO related functions
 */

module unit_threaded.io;

import unit_threaded.from;

/**
 * Write if debug output was enabled.
 *
 * Only has an effect in `debug` builds: in that case, and when
 * `isDebugOutputEnabled` returns true, the arguments are written
 * (followed by a newline) to the writer of the currently running test case.
 */
void writelnUt(T...)(auto ref T args) {
    debug {
        import unit_threaded.testcase: TestCase;
        if(isDebugOutputEnabled)
            TestCase.currentTest.getWriter.writeln(args);
    }
}



private shared(bool) _debugOutput = false; ///print debug msgs?
private shared(bool) _forceEscCodes = false; ///use ANSI escape codes anyway?
/// Thread-local: whether stdout was detected to be a terminal (set in `static this`).
bool _useEscCodes;
/// Escape sequences, indexed by `Colour` (red, green, yellow, cancel).
enum _escCodes = ["\033[31;1m", "\033[32;1m", "\033[33;1m", "\033[0;;m"];


// Runs once per thread, at thread start.
static this() {
    version (Posix) {
        import std.stdio: stdout;
        import core.sys.posix.unistd: isatty;
        _useEscCodes = _forceEscCodes || isatty(stdout.fileno()) != 0;
    }
}


/// Enables (or disables) debug output for all threads.
package void enableDebugOutput(bool value = true) nothrow {
    synchronized {
        _debugOutput = value;
    }
}

/// Returns whether debug output is currently enabled.
package bool isDebugOutputEnabled() nothrow @trusted {
    synchronized {
        return _debugOutput;
    }
}

/// Forces the use of ANSI escape codes even if stdout is not a terminal.
package void forceEscCodes() nothrow {
    synchronized {
        _forceEscCodes = true;
    }
}

/// Returns whether escape codes have been forced with `forceEscCodes`.
private bool isForceEscCodesEnabled() nothrow @trusted {
    synchronized {
        return _forceEscCodes;
    }
}

/// A sink for test output.
interface Output {
    /// Sends a string to the sink.
    void send(in string output) @safe;
    /// Signals that the sender has finished a unit of output.
    void flush() @safe;
}

private enum Colour {
    red,
    green,
    yellow,
    cancel,
}

/// Wraps `msg` in the escape code for colour `C` and the cancel code.
private string colour(alias C)(in string msg) {
    return escCode(C) ~ msg ~ escCode(Colour.cancel);
}

private alias green = colour!(Colour.green);
private alias red = colour!(Colour.red);
private alias yellow = colour!(Colour.yellow);

/**
 * Returns the escape code for the given colour, or the empty string
 * if escape codes are not in use.
 *
 * The force flag is consulted on every call and not only via the
 * thread-local `_useEscCodes`: that variable is computed once per thread in
 * `static this`, which for the main thread (and any thread started earlier)
 * runs before `forceEscCodes` can possibly have been called.
 */
private string escCode(in Colour code) @safe {
    return (_useEscCodes || isForceEscCodesEnabled) ? _escCodes[code] : "";
}


/**
 * Converts the args to text and sends them to the output.
 */
void write(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(text(args));
}

/**
 * Converts the args to text, appends a newline and sends them to the output.
 */
void writeln(T...)(Output output, auto ref T args) {
    write(output, args, "\n");
}

/**
 * Sends the args followed by a newline to the output, in green
 * when escape codes are in use.
 */
void writelnGreen(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(green(text(args) ~ "\n"));
}

/**
 * Sends the args followed by a newline to the output, in red
 * when escape codes are in use.
 */
void writelnRed(T...)(Output output, auto ref T args) {
    writeRed(output, args, "\n");
}

/**
 * Sends the args to the output, in red when escape codes are in use.
 * Does not append a newline.
 */
void writeRed(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(red(text(args)));
}

/**
 * Sends the args to the output, in yellow when escape codes are in use.
 * Does not append a newline.
 */
void writeYellow(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(yellow(text(args)));
}

/**
 * Thread to output to stdout
 */
class WriterThread: Output {

    import std.concurrency: Tid;


    /**
     * Returns a reference to the only instance of this class.
     */
    static WriterThread get() @trusted {
        import std.concurrency: initOnce;
        static __gshared WriterThread instance;
        return initOnce!instance(new WriterThread);
    }

    /**
     * With version `unitUnthreaded` writes directly to stdout; otherwise
     * forwards the text, tagged with the calling thread's Tid, to the
     * writer thread.
     */
    override void send(in string output) @safe {

        version(unitUnthreaded) {
            import std.stdio: write;
            write(output);
        } else {
            import std.concurrency: send, thisTid;
            () @trusted { _tid.send(output, thisTid); }();
        }
    }

    /**
     * With version `unitUnthreaded` does nothing; otherwise sends a `Flush`
     * message, tagged with the calling thread's Tid, to the writer thread.
     */
    override void flush() @safe {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send, thisTid;
            () @trusted { _tid.send(Flush(), thisTid); }();
        }
    }


private:

    // Spawns the writer thread and blocks until it has replied with ThreadStarted.
    this() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: spawn, thisTid, receiveOnly, send;
            import std.stdio: stdout, stderr;
            _tid = spawn(&threadWriter!(stdout, stderr), thisTid);
            _tid.send(ThreadWait());
            receiveOnly!ThreadStarted;
        }
    }


    Tid _tid;
}


// Messages exchanged with the writer thread.
struct ThreadWait{}
struct ThreadFinish{}
struct ThreadStarted{}
struct ThreadEnded{}
struct Flush{}

version (Posix) {
    enum nullFileName = "/dev/null";
} else {
    enum nullFileName = "NUL";
}


/**
 * Body of the writer thread.
 *
 * Receives messages until `ThreadFinish` or `OwnerTerminated` arrives,
 * printing text to the original `OUT`. Unless debug output is enabled,
 * `OUT` and `ERR` are redirected to the null device while the loop runs and
 * are restored afterwards. On normal termination `ThreadEnded` is sent to `tid`.
 *
 * Params:
 *   OUT = the output stream to print to (and to silence meanwhile)
 *   ERR = the error stream to silence meanwhile
 *   tid = the thread to notify with `ThreadStarted` / `ThreadEnded`
 */
void threadWriter(alias OUT, alias ERR)(from!"std.concurrency".Tid tid)
{
    import std.concurrency: receive, send, OwnerTerminated, Tid;

    auto done = false;

    auto saveStdout = OUT;
    auto saveStderr = ERR;

    void restore() {
        saveStdout.flush();
        OUT = saveStdout;
        ERR = saveStderr;
    }

    scope (failure) restore();

    if (!isDebugOutputEnabled()) {
        OUT = typeof(OUT)(nullFileName, "w");
        ERR = typeof(ERR)(nullFileName, "w");
    }

    // always prints to the saved (original) stream, never to the redirected one
    void actuallyPrint(in string msg) {
        if(msg.length) saveStdout.write(msg);
    }

    // the first thread to send output becomes the current
    // until that thread sends a Flush message no other thread
    // can print to stdout, so we store their outputs in the meanwhile
    static struct ThreadOutput {
        string currentOutput;  // text received since that thread's last Flush
        string[] outputs;      // flushed chunks waiting to be printed

        void store(in string msg) {
            currentOutput ~= msg;
        }

        void flush() {
            outputs ~= currentOutput;
            currentOutput = "";
        }
    }
    ThreadOutput[Tid] outputs;

    Tid currentTid;

    while (!done) {
        receive(
            (string msg, Tid originTid) {

                if(currentTid == currentTid.init) {
                    currentTid = originTid;

                    // it could be that this thread became the current thread but had output not yet printed
                    if(originTid in outputs) {
                        actuallyPrint(outputs[originTid].currentOutput);
                        outputs[originTid].currentOutput = "";
                    }
                }

                if(currentTid == originTid)
                    actuallyPrint(msg);
                else {
                    if(originTid !in outputs) outputs[originTid] = typeof(outputs[originTid]).init;
                    outputs[originTid].store(msg);
                }
            },
            (ThreadWait w) {
                tid.send(ThreadStarted());
            },
            (ThreadFinish f) {
                done = true;
            },
            (Flush f, Tid originTid) {

                if(originTid in outputs) outputs[originTid].flush;

                // a flush from a thread other than the current one only queues its output
                if(currentTid != currentTid.init && currentTid != originTid)
                    return;

                // iterate over values only, so the `tid` parameter is not shadowed
                foreach(ref threadOutput; outputs) {
                    foreach(o; threadOutput.outputs)
                        actuallyPrint(o);
                    threadOutput.outputs = [];
                }

                // nobody owns stdout any more
                currentTid = currentTid.init;
            },
            (OwnerTerminated trm) {
                done = true;
            }
        );
    }

    restore();
    tid.send(ThreadEnded());
}