/**
 * IO related functions
 */

module unit_threaded.io;

import std.concurrency;
import std.stdio;
import std.conv;

/**
 * Write if debug output was enabled.
 *
 * The arguments are converted to text and written, followed by a newline,
 * to the writer of the currently running test case. Nothing is written
 * when debug output is disabled.
 */
void writelnUt(T...)(T args) {
    import std.conv: text;
    import unit_threaded: TestCase;
    if(isDebugOutputEnabled)
        TestCase.currentTest.getWriter.writeln(text(args));
}

// With debug output disabled, writelnUt must not add anything to the
// test's output.
unittest {
    import unit_threaded.testcase: TestCase;
    import unit_threaded.should;
    import std.string: splitLines;

    enableDebugOutput(false);

    // Output implementation that accumulates everything sent to it
    class TestOutput: Output {
        string output;
        override void send(in string output) {
            this.output ~= output;
        }

        override void flush() {}
    }

    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto test = new PrintTest;
    auto writer = new TestOutput;
    test.setOutput(writer);
    test();

    writer.output.splitLines.shouldEqual(
        [
            "PrintTest:",
        ]
    );
}

// With debug output enabled, writelnUt must forward its arguments to the
// test's output.
unittest {
    import unit_threaded.should;
    import unit_threaded.testcase: TestCase;
    import std.string: splitLines;

    enableDebugOutput;
    scope(exit) enableDebugOutput(false);

    // Output implementation that accumulates everything sent to it
    class TestOutput: Output {
        string output;
        override void send(in string output) {
            this.output ~= output;
        }

        override void flush() {}
    }

    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto test = new PrintTest;
    auto writer = new TestOutput;
    test.setOutput(writer);
    test();

    writer.output.splitLines.shouldEqual(
        [
            "PrintTest:",
            "foobar",
        ]
    );
}

private shared(bool) _debugOutput = false; ///print debug msgs?
private shared(bool) _forceEscCodes = false; ///use ANSI escape codes anyway?
/// Whether escCode returns escape sequences. Thread-local; set in `static this`.
bool _useEscCodes;
/// Escape sequences, indexed by the members of Colour (red, green, yellow, cancel).
enum _escCodes = ["\033[31;1m", "\033[32;1m", "\033[33;1m", "\033[0;;m"];


/**
 * Per-thread initialisation: on POSIX, escape codes are used if they were
 * forced or if stdout is a terminal. On other platforms `_useEscCodes`
 * is left at its default value.
 */
static this() {
    version (Posix) {
        import std.stdio: stdout;
        import core.sys.posix.unistd: isatty;
        _useEscCodes = _forceEscCodes || isatty(stdout.fileno()) != 0;
    }
}


/**
 * Enables or disables debug output.
 */
package void enableDebugOutput(bool value = true) nothrow {
    synchronized {
        _debugOutput = value;
    }
}

/**
 * Returns: whether debug output is currently enabled.
 */
package bool isDebugOutputEnabled() nothrow {
    synchronized {
        return _debugOutput;
    }
}

/**
 * Requests escape codes even when stdout is not a terminal.
 * The flag is only read in this module's `static this`, so it affects
 * threads whose module constructor runs after this call.
 */
package void forceEscCodes() nothrow {
    synchronized {
        _forceEscCodes = true;
    }
}

/**
 * Destination for test output.
 */
interface Output {
    /// Sends a piece of text to the destination.
    void send(in string output);
    /// Signals that the sender has finished a batch of output.
    void flush();
}

/// Indices into `_escCodes`.
private enum Colour {
    red,
    green,
    yellow,
    cancel,
}

/**
 * Wraps msg between the escape code for colour C and the cancel code.
 */
private string colour(alias C)(in string msg) {
    return escCode(C) ~ msg ~ escCode(Colour.cancel);
}

private alias green = colour!(Colour.green);
private alias red = colour!(Colour.red);
private alias yellow = colour!(Colour.yellow);

/**
 * Returns the escape sequence for the given colour, or an empty string
 * when escape codes are not in use for the current thread.
 */
private string escCode(in Colour code) @safe {
    return _useEscCodes ? _escCodes[code] : "";
}


/**
 * Converts the args to text and sends the result to output.
 */
void write(T...)(Output output, T args) {
    output.send(text(args));
}

/**
 * Converts the args to text, appends a newline and sends the result
 * to output.
 */
void writeln(T...)(Output output, T args) {
    write(output, args, "\n");
}

/**
 * Sends the args, followed by a newline, to output, wrapped in the
 * escape codes for green when escape codes are in use.
 */
void writelnGreen(T...)(Output output, T args) {
    output.send(green(text(args) ~ "\n"));
}

/**
 * Sends the args, followed by a newline, to output, wrapped in the
 * escape codes for red when escape codes are in use.
 */
void writelnRed(T...)(Output output, T args) {
    output.send(red(text(args) ~ "\n"));
}

/**
 * Sends the args to output, wrapped in the escape codes for red when
 * escape codes are in use. No newline is appended.
 */
void writeRed(T...)(Output output, T args) {
    output.send(red(text(args)));
}

/**
 * Sends the args to output, wrapped in the escape codes for yellow when
 * escape codes are in use. No newline is appended.
 */
void writeYellow(T...)(Output output, T args) {
    output.send(yellow(text(args)));
}

/**
 * Thread to output to stdout
 */
class WriterThread: Output {
    /**
     * Returns a reference to the only instance of this class.
     *
     * The instance is created on first use. `_instantiated` is
     * thread-local, so each thread enters the synchronized section at
     * most once until it calls join.
     */
    static WriterThread get() {
        if (!_instantiated) {
            synchronized {
                if (_instance is null) {
                    _instance = new WriterThread;
                }
                _instantiated = true;
            }
        }
        return _instance;
    }


    /**
     * Forwards output to the writer thread, tagged with the sending
     * thread's Tid. With version unitUnthreaded it is written directly
     * to stdout instead.
     */
    override void send(in string output) {
        version(unitUnthreaded) {
            import std.stdio: write;
            write(output);
        } else
            _tid.send(output, thisTid);
    }

    /**
     * Sends a Flush message to the writer thread (no-op with version
     * unitUnthreaded).
     */
    override void flush() {
        version(unitUnthreaded) {}
        else _tid.send(Flush());
    }

    /**
     * Creates the singleton instance and waits until it's ready.
     */
    static void start() {
        version(unitUnthreaded) {}
        else {
            WriterThread.get._tid.send(ThreadWait());
            receiveOnly!ThreadStarted;
        }
    }

    /**
     * Waits for the writer thread to terminate.
     *
     * Afterwards the shared instance is cleared so that a later call to
     * get creates a new one.
     */
    void join() {
        version(unitUnthreaded) {}
        else {
            _tid.send(ThreadFinish()); //tell it to join
            receiveOnly!ThreadEnded;
            _instance = null;
            // NOTE(review): only the calling thread's flag is reset here;
            // other threads that already called get keep their own flag
            // set -- confirm join is only used once they are done.
            _instantiated = false;
        }
    }

private:

    this() {
        version(unitUnthreaded) {}
        else
            _tid = spawn(&threadWriter!(stdout, stderr), thisTid);
    }


    Tid _tid;

    static bool _instantiated; /// Thread local
    __gshared WriterThread _instance;
}

unittest
{
    //make sure this can be brought up and down again
    WriterThread.get.join;
    WriterThread.get.join;
}

// Messages exchanged with the writer thread
private struct ThreadWait {}
private struct ThreadFinish {}
private struct ThreadStarted {}
private struct ThreadEnded {}
private struct Flush {}

version (Posix) {
    enum nullFileName = "/dev/null";
} else {
    enum nullFileName = "NUL";
}

// NOTE(review): not referenced anywhere in this file; kept in case other
// modules use it.
shared bool gBool;

/**
 * Body of the writer thread.
 *
 * Saves OUT and ERR and, unless debug output is enabled, replaces them
 * with instances constructed from (nullFileName, "w"). It then processes
 * messages until told to finish, printing to the saved OUT. Before
 * returning (or on failure) OUT and ERR are restored.
 *
 * Params:
 *   OUT = the variable used as standard output
 *   ERR = the variable used as standard error
 *   tid = the thread that receives ThreadStarted and ThreadEnded
 */
private void threadWriter(alias OUT, alias ERR)(Tid tid)
{
    auto done = false;

    auto saveStdout = OUT;
    auto saveStderr = ERR;

    void restore() {
        saveStdout.flush();
        OUT = saveStdout;
        ERR = saveStderr;
    }

    scope (failure) restore;

    if (!isDebugOutputEnabled()) {
        OUT = typeof(OUT)(nullFileName, "w");
        ERR = typeof(ERR)(nullFileName, "w");
    }

    // the first thread to send output becomes the current
    // until that thread sends a Flush message no other thread
    // can print to stdout, so we store their outputs in the meanwhile
    string[Tid] outputs;
    Tid currentTid;

    void actuallyPrint(in string msg) {
        if(msg.length) saveStdout.write(msg);
    }

    while (!done) {
        receive(
            (string msg, Tid originTid) {
                if(currentTid == currentTid.init)
                    currentTid = originTid;

                if(currentTid == originTid)
                    actuallyPrint(msg);
                else
                    outputs[originTid] ~= msg;
            },
            (ThreadWait w) {
                tid.send(ThreadStarted());
            },
            (ThreadFinish f) {
                // NOTE(review): anything still buffered in `outputs` at
                // this point is never printed -- confirm that is intended.
                done = true;
            },
            (Flush f) {
                // the current thread is done: print what the others
                // sent in the meanwhile and let a new thread take over
                foreach(t, output; outputs)
                    actuallyPrint(output);

                outputs = outputs.init;
                currentTid = currentTid.init;
            },
            (OwnerTerminated trm) {
                done = true;
            }
        );
    }

    restore;
    tid.send(ThreadEnded());
}

// Everything below depends on the fake files, which only exist when
// testing unit_threaded itself. The unittests are therefore kept inside
// the version block so the module still compiles with -unittest when
// testing_unit_threaded is not defined.
version(testing_unit_threaded) {
    /// Stand-in for std.stdio.File that records what is written to it
    struct FakeFile {
        string fileName;
        string mode;
        string output;
        void flush() shared {}
        void write(string s) shared {
            output ~= s;
        }
        string[] lines() shared const @safe pure {
            import std.string: splitLines;
            return output.splitLines;
        }
    }
    shared FakeFile gOut;
    shared FakeFile gErr;
    void resetFakeFiles() {
        gOut = FakeFile("out", "mode");
        gErr = FakeFile("err", "mode");
    }

    /// Sends three messages to the writer, synchronising with the test
    /// thread after each one
    void otherThread(Tid writerTid, Tid testTid) {
        try {
            writerTid.send("what about me?\n", thisTid);
            testTid.send(true);
            receiveOnly!bool;
            writerTid.send("seriously, what about me?\n", thisTid);
            testTid.send(true);
            receiveOnly!bool;
            writerTid.send("final attempt\n", thisTid);
            testTid.send(true);
            receiveOnly!bool;
        } catch(OwnerTerminated ex) {}
    }

    // debug output disabled: the files are redirected to the null file
    unittest {
        import std.concurrency: spawn;
        import unit_threaded.should;

        enableDebugOutput(false);
        resetFakeFiles;

        auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
        tid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        gOut.shouldEqual(shared FakeFile(nullFileName, "w"));
        gErr.shouldEqual(shared FakeFile(nullFileName, "w"));

        tid.send(ThreadFinish());
        receiveOnly!ThreadEnded;
    }

    // debug output enabled: the files are left alone
    unittest {
        import std.concurrency: spawn;
        import unit_threaded.should;

        enableDebugOutput(true);
        scope(exit) enableDebugOutput(false);
        resetFakeFiles;

        auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
        tid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        gOut.shouldEqual(shared FakeFile("out", "mode"));
        gErr.shouldEqual(shared FakeFile("err", "mode"));

        tid.send(ThreadFinish());
        receiveOnly!ThreadEnded;
    }

    unittest {
        import std.concurrency: spawn;
        import unit_threaded.should;

        resetFakeFiles;

        auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
        tid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        tid.send("foobar\n", thisTid);
        tid.send("toto\n", thisTid);
        gOut.output.shouldBeEmpty; // since it writes to the old gOut

        tid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        // gOut is restored so the output should be here
        gOut.lines.shouldEqual(
            [
                "foobar",
                "toto",
            ]
        );
    }

    unittest {
        import std.concurrency: spawn;
        import unit_threaded.should;

        resetFakeFiles;

        auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
        writerTid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        writerTid.send("foobar\n", thisTid);
        auto otherTid = spawn(&otherThread, writerTid, thisTid);
        receiveOnly!bool; //wait for otherThread 1st message
        writerTid.send("toto\n", thisTid);
        otherTid.send(true); //tell otherThread to continue
        receiveOnly!bool; //wait for otherThread 2nd message
        writerTid.send("last one from me\n", thisTid);
        writerTid.send(Flush()); //finish with our output
        otherTid.send(true); //finish
        receiveOnly!bool;

        writerTid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        // gOut is restored so the output should be here
        // the output should also be serialised despite
        // sending messages from two threads
        gOut.lines.shouldEqual(
            [
                "foobar",
                "toto",
                "last one from me",
                "what about me?",
                "seriously, what about me?",
                "final attempt",
            ]
        );
    }
}