c# - TransformBlock Items get stuck in the output queue. Why and how to fix? -
i have walked through tpl dataflow , have faced irritating problem occurred in code using trasformblock linked actionblock.
eventually have found items got stuck in transformblock's output queue, since outputcount property continuously returned value higher "0". that's why whole application deadlocked. however, unblocks call transformblock.tryreceiveall().
can anyone, please, let me know if there missed or how prevent such behavior?
static void main() { int total = 0; int itemsprocessing = 0; transformblock<int, tuple<int, double>> transformblock = new transformblock<int, tuple<int, double>>( => new tuple<int, double>(i, math.sqrt(i)), new executiondataflowblockoptions { boundedcapacity = 20, maxdegreeofparallelism = dataflowblockoptions.unbounded }); actionblock<tuple<int, double>> outputblock = new actionblock<tuple<int, double>>(async tuple => { await task.delay(1000); // simulating data output delay interlocked.decrement(ref itemsprocessing); }, new executiondataflowblockoptions { boundedcapacity = 5, maxdegreeofparallelism = dataflowblockoptions.unbounded }); transformblock.completion.continuewith(t => outputblock.complete()); using (timer timer = new timer(o => { console.title = string.format( "{0}: {1}/{2} {3}/{4}/{5}", assembly.getexecutingassembly().getname().name, volatile.read(ref itemsprocessing), volatile.read(ref total), transformblock.inputcount, transformblock.outputcount, outputblock.inputcount); }, null, 100, 100)) { using (transformblock.linkto(outputblock, new dataflowlinkoptions { propagatecompletion = true })) { (int = 0; < 40; i++) { thread.sleep(100); // simulating new item retrieval delay interlocked.increment(ref total); interlocked.increment(ref itemsprocessing); transformblock.sendasync(i).wait(); } } console.writeline("enqueued"); transformblock.complete(); outputblock.completion.wait(); console.writeline("finish"); timer.change(timeout.infinite, timeout.infinite); timer.dispose(); } }
invoking transformblock.linkto gets disposable registration. when dispose of registration blocks unlink.
your using scope ends , blocks unlink before transformblock has chance empty actionblock preventing being able complete. since first block doesn't complete next 1 doesn't start completing, let alone finish.
moving wait inside using block solves deadlock:
using (transformblock.linkto(outputblock, new dataflowlinkoptions { propagatecompletion = true })) { (int = 0; < 40; i++) { thread.sleep(100); // simulating new item retrieval delay interlocked.increment(ref total); interlocked.increment(ref itemsprocessing); transformblock.sendasync(i).wait(); } console.writeline("enqueued"); transformblock.complete(); outputblock.completion.wait(); console.writeline("finish"); } as side note, shouldn't blocking on async code in such way. simpler use async-await instead of wait(), task.delay instead of thread.sleep, etc.
also, since you're using propagatecompletion don't need call outputblock.complete() explicitly.
Comments
Post a Comment