/**
 * IO related functions
 */

module unit_threaded.io;

import std.concurrency: Tid;

/**
 * Write if debug output was enabled.
 *
 * The arguments are forwarded to the writer of the currently running
 * test case, followed by a newline. Nothing happens when debug output
 * is disabled.
 */
void writelnUt(T...)(auto ref T args) {
    import unit_threaded.testcase: TestCase;
    if(isDebugOutputEnabled)
        TestCase.currentTest.getWriter.writeln(args);
}


// With debug output disabled, writelnUt must not produce any output.
unittest {
    import unit_threaded.testcase: TestCase;
    import unit_threaded.should;
    import std.string: splitLines;

    enableDebugOutput(false);
    class TestOutput: Output {
        string output;
        override void send(in string output) {
            this.output ~= output;
        }

        override void flush() {}
    }

    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto test = new PrintTest;
    auto writer = new TestOutput;
    test.setOutput(writer);
    test();

    writer.output.splitLines.shouldEqual(
        [
            "PrintTest:",
        ]
    );
}

// With debug output enabled, writelnUt output reaches the test's writer.
unittest {
    import unit_threaded.should;
    import unit_threaded.testcase: TestCase;
    import unit_threaded.reflection: TestData;
    import unit_threaded.factory: createTestCase;
    import std.string: splitLines;

    enableDebugOutput;
    scope(exit) enableDebugOutput(false);

    class TestOutput: Output {
        string output;
        override void send(in string output) {
            this.output ~= output;
        }

        override void flush() {}
    }

    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto test = new PrintTest;
    auto writer = new TestOutput;
    test.setOutput(writer);
    test();

    writer.output.splitLines.shouldEqual(
        [
            "PrintTest:",
            "foobar",
        ]
    );
}

private shared(bool) _debugOutput = false; ///print debug msgs?
private shared(bool) _forceEscCodes = false; ///use ANSI escape codes anyway?
bool _useEscCodes; ///whether escCode returns real escape sequences
/// Escape sequences, indexed by the members of Colour (red, green, yellow, cancel).
enum _escCodes = ["\033[31;1m", "\033[32;1m", "\033[33;1m", "\033[0;;m"];


/**
 * Decides whether escape codes are used: on Posix they are enabled when
 * they were forced or when stdout is a terminal. On other platforms
 * _useEscCodes keeps its default value.
 */
static this() {
    version (Posix) {
        import std.stdio: stdout;
        import core.sys.posix.unistd: isatty;
        _useEscCodes = _forceEscCodes || isatty(stdout.fileno()) != 0;
    }
}


/**
 * Enables (or, with `value == false`, disables) debug output.
 */
package void enableDebugOutput(bool value = true) nothrow {
    synchronized {
        _debugOutput = value;
    }
}

/**
 * Returns: whether debug output is currently enabled.
 */
package bool isDebugOutputEnabled() nothrow @trusted {
    synchronized {
        return _debugOutput;
    }
}

/**
 * Requests escape codes even when stdout is not a terminal.
 * The flag is only read by the module's static constructor.
 */
package void forceEscCodes() nothrow {
    synchronized {
        _forceEscCodes = true;
    }
}

/**
 * A sink for test output.
 */
interface Output {
    /// Accepts a chunk of output.
    void send(in string output) @safe;
    /// Signals that the sender has finished its current batch of output.
    void flush() @safe;
}

/// Indices into _escCodes.
private enum Colour {
    red,
    green,
    yellow,
    cancel,
}

/**
 * Wraps msg between the escape code for colour C and the cancel code.
 */
private string colour(alias C)(in string msg) {
    return escCode(C) ~ msg ~ escCode(Colour.cancel);
}

private alias green = colour!(Colour.green);
private alias red = colour!(Colour.red);
private alias yellow = colour!(Colour.yellow);

/**
 * Returns the escape code for the given colour, or an empty string
 * when escape codes are not in use.
 */
private string escCode(in Colour code) @safe {
    return _useEscCodes ? _escCodes[code] : "";
}


/**
 * Converts the args to text and sends the result to output.
 */
void write(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(text(args));
}

/**
 * Converts the args to text, appends a newline and sends the result
 * to output.
 */
void writeln(T...)(Output output, auto ref T args) {
    write(output, args, "\n");
}

/**
 * Sends the args to output in green (POSIX only)
 * and appends a newline.
 */
void writelnGreen(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(green(text(args) ~ "\n"));
}

/**
 * Sends the args to output in red (POSIX only)
 * and appends a newline.
 */
void writelnRed(T...)(Output output, auto ref T args) {
    writeRed(output, args, "\n");
}

/**
 * Sends the args to output in red (POSIX only).
 * No newline is appended.
 */
void writeRed(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(red(text(args)));
}

/**
 * Sends the args to output in yellow (POSIX only).
 * No newline is appended.
 */
void writeYellow(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(yellow(text(args)));
}

/**
 * Thread to output to stdout
 */
class WriterThread: Output {

    import std.concurrency: Tid;

    /**
     * Returns a reference to the only instance of this class.
     *
     * The thread-local _instantiated flag lets a thread skip the
     * synchronized section once it has already gone through it.
     */
    static WriterThread get() @trusted {
        if (!_instantiated) {
            synchronized {
                if (_instance is null) {
                    _instance = new WriterThread;
                }
                _instantiated = true;
            }
        }
        return _instance;
    }


    /**
     * Writes directly to stdout when built with version unitUnthreaded;
     * otherwise forwards the text, tagged with the calling thread's Tid,
     * to the writer thread.
     */
    override void send(in string output) @safe {

        version(unitUnthreaded) {
            import std.stdio: write;
            write(output);
        } else {
            import std.concurrency: send, thisTid;
            () @trusted { _tid.send(output, thisTid); }();
        }
    }

    /**
     * Sends a Flush message to the writer thread
     * (no-op with version unitUnthreaded).
     */
    override void flush() @safe {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send;
            () @trusted { _tid.send(Flush()); }();
        }
    }

    /**
     * Creates the singleton instance and waits until it's ready.
     */
    static void start() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send, receiveOnly;
            WriterThread.get._tid.send(ThreadWait());
            receiveOnly!ThreadStarted;
        }
    }

    /**
     * Waits for the writer thread to terminate.
     *
     * Afterwards the singleton is cleared so that a later call to get
     * creates a fresh instance.
     * NOTE(review): only the calling thread's _instantiated flag is
     * reset here - confirm no other thread calls get after a join.
     */
    void join() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send, receiveOnly;
            _tid.send(ThreadFinish()); //tell it to join
            receiveOnly!ThreadEnded;
            _instance = null;
            _instantiated = false;
        }
    }

private:

    /// Spawns the writer thread, passing it the Tid of the creating thread.
    this() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: spawn, thisTid;
            import std.stdio: stdout, stderr;
            _tid = spawn(&threadWriter!(stdout, stderr), thisTid);
        }
    }


    Tid _tid; /// The writer thread

    static bool _instantiated; /// Thread local
    __gshared WriterThread _instance;
}

unittest
{
    //make sure this can be brought up and down again
    WriterThread.get.join;
    WriterThread.get.join;
}

// Message tags exchanged with the writer thread.
private struct ThreadWait{}
private struct ThreadFinish{}
private struct ThreadStarted{}
private struct ThreadEnded{}
private struct Flush{}

version (Posix) {
    enum nullFileName = "/dev/null";
} else {
    enum nullFileName = "NUL";
}

shared bool gBool;

/**
 * Body of the writer thread.
 *
 * Unless debug output is enabled, OUT and ERR are replaced by files
 * opened on nullFileName for as long as the thread runs; the text
 * received in messages is written to the value OUT had on entry.
 * OUT and ERR are put back before the thread ends.
 *
 * Params:
 *     OUT = the output file to write to (and possibly redirect)
 *     ERR = the error file to possibly redirect
 *     tid = the thread that gets the ThreadStarted and ThreadEnded replies
 */
private void threadWriter(alias OUT, alias ERR)(Tid tid)
{
    import std.concurrency: receive, send, OwnerTerminated;

    auto done = false;

    auto saveStdout = OUT;
    auto saveStderr = ERR;

    void restore() {
        saveStdout.flush();
        OUT = saveStdout;
        ERR = saveStderr;
    }

    scope (failure) restore;

    if (!isDebugOutputEnabled()) {
        OUT = typeof(OUT)(nullFileName, "w");
        ERR = typeof(ERR)(nullFileName, "w");
    }

    // the first thread to send output becomes the current
    // until that thread sends a Flush message no other thread
    // can print to stdout, so we store their outputs in the meanwhile
    string[Tid] outputs;
    Tid currentTid;

    void actuallyPrint(in string msg) {
        if(msg.length) saveStdout.write(msg);
    }

    while (!done) {
        receive(
            (string msg, Tid originTid) {
                if(currentTid == currentTid.init)
                    currentTid = originTid;

                if(currentTid == originTid)
                    actuallyPrint(msg);
                else
                    outputs[originTid] ~= msg;
            },
            (ThreadWait w) {
                tid.send(ThreadStarted());
            },
            (ThreadFinish f) {
                done = true;
            },
            (Flush f) {

                // print everything buffered from the other threads,
                // then let the next sender become the current one
                foreach(t, output; outputs)
                    actuallyPrint(output);

                outputs = outputs.init;
                currentTid = currentTid.init;
            },
            (OwnerTerminated trm) {
                done = true;
            }
        );
    }

    restore;
    tid.send(ThreadEnded());
}

version(testing_unit_threaded) {
    /// Stand-in for a file that records what is written to it.
    struct FakeFile {
        string fileName;
        string mode;
        string output;
        void flush() shared {}
        void write(string s) shared {
            output ~= s;
        }
        string[] lines() shared const @safe pure {
            import std.string: splitLines;
            return output.splitLines;
        }
    }
    shared FakeFile gOut;
    shared FakeFile gErr;
    void resetFakeFiles() {
        gOut = FakeFile("out", "mode");
        gErr = FakeFile("err", "mode");
    }
}

// The tests below depend on the fakes declared under
// version(testing_unit_threaded), so they are compiled under it as well.

// With debug output disabled the files are redirected to the null file.
version(testing_unit_threaded) unittest {
    import std.concurrency: spawn, thisTid, send, receiveOnly;
    import unit_threaded.should;

    enableDebugOutput(false);
    resetFakeFiles;

    auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
    tid.send(ThreadWait());
    receiveOnly!ThreadStarted;

    gOut.shouldEqual(shared FakeFile(nullFileName, "w"));
    gErr.shouldEqual(shared FakeFile(nullFileName, "w"));

    tid.send(ThreadFinish());
    receiveOnly!ThreadEnded;
}

// With debug output enabled the files are left untouched.
version(testing_unit_threaded) unittest {
    import std.concurrency: spawn, send, thisTid, receiveOnly;
    import unit_threaded.should;

    enableDebugOutput(true);
    scope(exit) enableDebugOutput(false);
    resetFakeFiles;

    auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
    tid.send(ThreadWait());
    receiveOnly!ThreadStarted;

    gOut.shouldEqual(shared FakeFile("out", "mode"));
    gErr.shouldEqual(shared FakeFile("err", "mode"));

    tid.send(ThreadFinish());
    receiveOnly!ThreadEnded;
}

// Output from a single thread ends up in the restored file.
version(testing_unit_threaded) unittest {
    import std.concurrency: spawn, thisTid, send, receiveOnly;
    import unit_threaded.should;

    resetFakeFiles;

    auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
    tid.send(ThreadWait());
    receiveOnly!ThreadStarted;

    tid.send("foobar\n", thisTid);
    tid.send("toto\n", thisTid);
    gOut.output.shouldBeEmpty; // since it writes to the old gOut

    tid.send(ThreadFinish());
    receiveOnly!ThreadEnded;

    // gOut is restored so the output should be here
    gOut.lines.shouldEqual(
        [
            "foobar",
            "toto",
        ]
    );
}


version(testing_unit_threaded) {
    /// Second sender used to check that output from two threads is serialised.
    void otherThread(Tid writerTid, Tid testTid) {
        import std.concurrency: send, receiveOnly, OwnerTerminated, thisTid;
        try {
            writerTid.send("what about me?\n", thisTid);
            testTid.send(true);
            receiveOnly!bool;
            writerTid.send("seriously, what about me?\n", thisTid);
            testTid.send(true);
            receiveOnly!bool;
            writerTid.send("final attempt\n", thisTid);
            testTid.send(true);
            receiveOnly!bool;
        } catch(OwnerTerminated ex) {}
    }
}

// Output from two interleaved senders is serialised per sender.
version(testing_unit_threaded) unittest {
    import std.concurrency: spawn, thisTid, send, receiveOnly;
    import unit_threaded.should;

    resetFakeFiles;

    auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
    writerTid.send(ThreadWait());
    receiveOnly!ThreadStarted;

    writerTid.send("foobar\n", thisTid);
    auto otherTid = spawn(&otherThread, writerTid, thisTid);
    receiveOnly!bool; //wait for otherThread 1st message
    writerTid.send("toto\n", thisTid);
    otherTid.send(true); //tell otherThread to continue
    receiveOnly!bool; //wait for otherThread 2nd message
    writerTid.send("last one from me\n", thisTid);
    writerTid.send(Flush()); //finish with our output
    otherTid.send(true); //finish
    receiveOnly!bool;

    writerTid.send(ThreadFinish());
    receiveOnly!ThreadEnded;

    // gOut is restored so the output should be here
    // the output should also be serialised despite
    // sending messages from two threads
    gOut.lines.shouldEqual(
        [
            "foobar",
            "toto",
            "last one from me",
            "what about me?",
            "seriously, what about me?",
            "final attempt",
        ]
    );
}