/**
 * IO related functions
 */

module unit_threaded.io;

import std.concurrency: Tid;

/**
 * Write if debug output was enabled.
 *
 * The arguments are written, followed by a newline, to the writer of
 * TestCase.currentTest. Nothing happens when debug output is disabled.
 */
void writelnUt(T...)(auto ref T args) {
    import unit_threaded.testcase: TestCase;
    if(isDebugOutputEnabled)
        TestCase.currentTest.getWriter.writeln(args);
}


// With debug output disabled writelnUt must not produce anything.
unittest {
    import unit_threaded.testcase: TestCase;
    import unit_threaded.should;
    import std.string: splitLines;

    enableDebugOutput(false);
    class TestOutput: Output {
        string output;
        override void send(in string output) {
            this.output ~= output;
        }

        override void flush() {}
    }

    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto test = new PrintTest;
    auto writer = new TestOutput;
    test.setOutput(writer);
    test();

    writer.output.splitLines.shouldEqual(
        [
            "PrintTest:",
        ]
    );
}

// With debug output enabled writelnUt output reaches the test's writer.
unittest {
    import unit_threaded.should;
    import unit_threaded.testcase: TestCase;
    import std.string: splitLines;

    enableDebugOutput;
    scope(exit) enableDebugOutput(false);

    class TestOutput: Output {
        string output;
        override void send(in string output) {
            this.output ~= output;
        }

        override void flush() {}
    }

    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto test = new PrintTest;
    auto writer = new TestOutput;
    test.setOutput(writer);
    test();

    writer.output.splitLines.shouldEqual(
        [
            "PrintTest:",
            "foobar",
        ]
    );
}

private shared(bool) _debugOutput = false; ///print debug msgs?
private shared(bool) _forceEscCodes = false; ///use ANSI escape codes anyway?
bool _useEscCodes; ///whether escCode returns escape sequences; set in the static constructor below
/// ANSI escape sequences, indexed by Colour.
/// Stored as immutable data instead of an enum array literal so that
/// indexing it at run time does not build a new array on every use.
immutable string[] _escCodes = ["\033[31;1m", "\033[32;1m", "\033[33;1m", "\033[0;;m"];


/// On Posix, enables escape codes when they were forced or when stdout is a terminal.
/// On other platforms _useEscCodes keeps its default value (false).
static this() {
    version (Posix) {
        import std.stdio: stdout;
        import core.sys.posix.unistd: isatty;
        _useEscCodes = _forceEscCodes || isatty(stdout.fileno()) != 0;
    }
}


/// Turns debug output on (the default argument) or off.
package void enableDebugOutput(bool value = true) nothrow {
    synchronized {
        _debugOutput = value;
    }
}

/// Returns: whether debug output is currently enabled.
package bool isDebugOutputEnabled() nothrow @trusted {
    synchronized {
        return _debugOutput;
    }
}

/// Requests escape codes even when stdout is not a terminal.
/// The flag is read by the module's static constructor.
package void forceEscCodes() nothrow {
    synchronized {
        _forceEscCodes = true;
    }
}

/// Destination for test output.
interface Output {
    /// Sends one piece of output.
    void send(in string output) @safe;
    /// Signals that the sender is done with its current output.
    void flush() @safe;
}

/// Indices into _escCodes.
private enum Colour {
    red,
    green,
    yellow,
    cancel,
}

/// Wraps msg in the escape code for colour C followed by the cancel code.
private string colour(alias C)(in string msg) {
    return escCode(C) ~ msg ~ escCode(Colour.cancel);
}

private alias green = colour!(Colour.green);
private alias red = colour!(Colour.red);
private alias yellow = colour!(Colour.yellow);

/**
 * Returns the escape code for the given colour, or an empty string
 * when escape codes are not in use.
 */
private string escCode(in Colour code) @safe {
    return _useEscCodes ? _escCodes[code] : "";
}


/**
 * Converts the args to text and sends them to output as one message.
 */
void write(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(text(args));
}

/**
 * Converts the args to text, appends a newline and sends the result
 * to output as one message.
 */
void writeln(T...)(Output output, auto ref T args) {
    write(output, args, "\n");
}

/**
 * Sends the args to output in green (POSIX only)
 * and appends a newline.
 */
void writelnGreen(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(green(text(args) ~ "\n"));
}

/**
 * Sends the args to output in red (POSIX only)
 * and appends a newline.
 */
void writelnRed(T...)(Output output, auto ref T args) {
    writeRed(output, args, "\n");
}

/**
 * Sends the args to output in red (POSIX only).
 * No newline is appended.
 */
void writeRed(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(red(text(args)));
}

/**
 * Sends the args to output in yellow (POSIX only).
 * No newline is appended.
 */
void writeYellow(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(yellow(text(args)));
}

/**
 * Thread to output to stdout
 */
class WriterThread: Output {

    import std.concurrency: Tid;

    /**
     * Returns a reference to the only instance of this class.
     *
     * The instance is created on first use. The thread-local
     * _instantiated flag lets a thread that has already been through
     * the synchronized section skip it on later calls.
     */
    static WriterThread get() @trusted {
        if (!_instantiated) {
            synchronized {
                if (_instance is null) {
                    _instance = new WriterThread;
                }
                _instantiated = true;
            }
        }
        return _instance;
    }


    /**
     * Forwards output, tagged with the calling thread's Tid, to the
     * writer thread. With version unitUnthreaded it is written
     * directly with std.stdio.write instead.
     */
    override void send(in string output) @safe {

        version(unitUnthreaded) {
            import std.stdio: write;
            write(output);
        } else {
            import std.concurrency: send, thisTid;
            () @trusted { _tid.send(output, thisTid); }();
        }
    }

    /**
     * Sends a Flush message, tagged with the calling thread's Tid, to
     * the writer thread. Does nothing with version unitUnthreaded.
     */
    override void flush() @safe {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send, thisTid;
            () @trusted { _tid.send(Flush(), thisTid); }();
        }
    }

    /**
     * Creates the singleton instance and waits until it's ready.
     */
    static void start() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send, receiveOnly;
            WriterThread.get._tid.send(ThreadWait());
            receiveOnly!ThreadStarted;
        }
    }


    /**
     * Flushes and joins the writer thread if an instance exists.
     */
    static void stop() {

        void impl() {
            WriterThread.get.flush;
            WriterThread.get.join;
        }

        // this thread has already obtained the instance through get
        if (_instantiated) {
            impl;
            return;
        }

        synchronized {
            if (_instance !is null) {
                impl;
            }
        }
    }

    /**
     * Waits for the writer thread to terminate.
     *
     * Afterwards the shared instance and the calling thread's
     * _instantiated flag are reset so that get creates a new instance.
     * NOTE(review): _instantiated is thread local, so only the calling
     * thread's flag is reset here - confirm no other thread calls get
     * after a join.
     */
    void join() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send, receiveOnly;
            _tid.send(ThreadFinish()); //tell it to join
            receiveOnly!ThreadEnded;
            _instance = null;
            _instantiated = false;
        }
    }

private:

    /// Spawns the writer thread on stdout/stderr (not with version unitUnthreaded).
    this() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: spawn, thisTid;
            import std.stdio: stdout, stderr;
            _tid = spawn(&threadWriter!(stdout, stderr), thisTid);
        }
    }


    Tid _tid; /// the spawned writer thread

    static bool _instantiated; /// Thread local
    __gshared WriterThread _instance;
}

unittest
{
    //make sure this can be brought up and down again
    WriterThread.get.join;
    WriterThread.get.join;
}

// Messages understood or sent by threadWriter
private struct ThreadWait{};
private struct ThreadFinish{};
private struct ThreadStarted{};
private struct ThreadEnded{};
private struct Flush{};

version (Posix) {
    enum nullFileName = "/dev/null";
} else {
    enum nullFileName = "NUL";
}

shared bool gBool;

/**
 * Body of the writer thread: receives messages and prints them to the
 * original value of OUT.
 *
 * OUT and ERR are saved on entry. Unless debug output is enabled they
 * are then replaced by files opened on nullFileName. The saved values
 * are put back when the message loop ends or when the function exits
 * with an exception.
 *
 * Params:
 *   tid = thread that is sent the ThreadStarted and ThreadEnded replies
 */
private void threadWriter(alias OUT, alias ERR)(Tid tid)
{
    import std.concurrency: receive, send, OwnerTerminated;

    auto done = false;

    auto saveStdout = OUT;
    auto saveStderr = ERR;

    void restore() {
        saveStdout.flush();
        OUT = saveStdout;
        ERR = saveStderr;
    }

    scope (failure) restore;

    if (!isDebugOutputEnabled()) {
        OUT = typeof(OUT)(nullFileName, "w");
        ERR = typeof(ERR)(nullFileName, "w");
    }

    // the first thread to send output becomes the current
    // until that thread sends a Flush message no other thread
    // can print to stdout, so we store their outputs in the meanwhile
    string[Tid] outputs;
    Tid currentTid;

    void actuallyPrint(in string msg) {
        if(msg.length) saveStdout.write(msg);
    }

    while (!done) {
        receive(
            (string msg, Tid originTid) {
                if(currentTid == currentTid.init)
                    currentTid = originTid;

                if(currentTid == originTid)
                    actuallyPrint(msg);
                else
                    outputs[originTid] ~= msg;
            },
            (ThreadWait w) {
                tid.send(ThreadStarted());
            },
            (ThreadFinish f) {
                // NOTE(review): anything still buffered in outputs is
                // not printed when the loop ends here
                done = true;
            },
            (Flush f, Tid originTid) {

                // a flush from a thread other than the current one is ignored
                if(currentTid != currentTid.init && currentTid != originTid)
                    return;

                foreach(t, output; outputs)
                    actuallyPrint(output);

                outputs = outputs.init;
                currentTid = currentTid.init;
            },
            (OwnerTerminated trm) {
                done = true;
            }
        );
    }

    restore;
    tid.send(ThreadEnded());
}

version(testing_unit_threaded) {
    /// Stand-in for a file that records what is written to it.
    struct FakeFile {
        string fileName;
        string mode;
        string output;
        void flush() shared {}
        void write(string s) shared {
            output ~= s;
        }
        string[] lines() shared const @safe pure {
            import std.string: splitLines;
            return output.splitLines;
        }
    }
    shared FakeFile gOut;
    shared FakeFile gErr;
    void resetFakeFiles() {
        gOut = FakeFile("out", "mode");
        gErr = FakeFile("err", "mode");
    }


    // debug output disabled: both files are replaced by the null file
    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly;
        import unit_threaded.should;

        enableDebugOutput(false);
        resetFakeFiles;

        auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
        tid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        gOut.shouldEqual(shared FakeFile(nullFileName, "w"));
        gErr.shouldEqual(shared FakeFile(nullFileName, "w"));

        tid.send(ThreadFinish());
        receiveOnly!ThreadEnded;
    }

    // debug output enabled: the files are left alone
    unittest {
        import std.concurrency: spawn, send, thisTid, receiveOnly;
        import unit_threaded.should;

        enableDebugOutput(true);
        scope(exit) enableDebugOutput(false);
        resetFakeFiles;

        auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
        tid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        gOut.shouldEqual(shared FakeFile("out", "mode"));
        gErr.shouldEqual(shared FakeFile("err", "mode"));

        tid.send(ThreadFinish());
        receiveOnly!ThreadEnded;
    }

    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly;
        import unit_threaded.should;

        resetFakeFiles;

        auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
        tid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        tid.send("foobar\n", thisTid);
        tid.send("toto\n", thisTid);
        gOut.output.shouldBeEmpty; // since it writes to the old gOut

        tid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        // gOut is restored so the output should be here
        gOut.lines.shouldEqual(
            [
                "foobar",
                "toto",
            ]
        );
    }

    // Second sender for the serialisation test below; it waits for a
    // bool from the test thread between each of its steps.
    void otherThread(Tid writerTid, Tid testTid) {
        import std.concurrency: send, receiveOnly, OwnerTerminated, thisTid;
        try {
            writerTid.send("what about me?\n", thisTid);
            testTid.send(true);
            receiveOnly!bool;

            writerTid.send("seriously, what about me?\n", thisTid);
            testTid.send(true);
            receiveOnly!bool;

            writerTid.send(Flush(), thisTid);
            testTid.send(true);
            receiveOnly!bool;

            writerTid.send("final attempt\n", thisTid);
            testTid.send(true);

        } catch(OwnerTerminated ex) {}
    }


    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly;
        import unit_threaded.should;

        resetFakeFiles;

        auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
        writerTid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        writerTid.send("foobar\n", thisTid);
        auto otherTid = spawn(&otherThread, writerTid, thisTid);
        receiveOnly!bool; //wait for otherThread 1st message

        writerTid.send("toto\n", thisTid);
        otherTid.send(true); //tell otherThread to continue
        receiveOnly!bool; //wait for otherThread 2nd message

        writerTid.send("last one from me\n", thisTid);
        otherTid.send(true); // tell otherThread to continue
        receiveOnly!bool; // wait for otherThread to try and flush (won't work)

        writerTid.send(Flush(), thisTid); //finish with our output
        otherTid.send(true); //finish
        receiveOnly!bool; // wait for otherThread to finish

        writerTid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        // gOut is restored so the output should be here
        // the output should also be serialised despite
        // sending messages from two threads
        gOut.lines.shouldEqual(
            [
                "foobar",
                "toto",
                "last one from me",
                "what about me?",
                "seriously, what about me?",
                "final attempt",
            ]
        );
    }
}