C# - TransformBlock items get stuck in the output queue. Why and how to fix?


I have been working through TPL Dataflow and have run into an irritating problem in code that uses a TransformBlock linked to an ActionBlock.

Eventually I found that items got stuck in the TransformBlock's output queue, since its OutputCount property continuously returned a value higher than 0. That is why the whole application deadlocked. However, it unblocks after a call to transformBlock.TryReceiveAll().

Can anyone please let me know if there is something I missed, or how to prevent such behavior?

    // Namespaces this snippet relies on
    using System;
    using System.Reflection;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    static void Main()
    {
        int total = 0;
        int itemsProcessing = 0;

        TransformBlock<int, Tuple<int, double>> transformBlock = new TransformBlock<int, Tuple<int, double>>(
            i => new Tuple<int, double>(i, Math.Sqrt(i)),
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 20,
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        ActionBlock<Tuple<int, double>> outputBlock = new ActionBlock<Tuple<int, double>>(async tuple =>
            {
                await Task.Delay(1000); // simulating data output delay
                Interlocked.Decrement(ref itemsProcessing);
            },
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 5,
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        transformBlock.Completion.ContinueWith(t => outputBlock.Complete());

        // Shows progress in the console title:
        // itemsProcessing/total, then InputCount/OutputCount of the transform block and InputCount of the output block
        using (Timer timer = new Timer(o =>
            {
                Console.Title = string.Format(
                    "{0}: {1}/{2} {3}/{4}/{5}",
                    Assembly.GetExecutingAssembly().GetName().Name,
                    Volatile.Read(ref itemsProcessing), Volatile.Read(ref total),
                    transformBlock.InputCount, transformBlock.OutputCount, outputBlock.InputCount);
            }, null, 100, 100))
        {
            using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
            {
                for (int i = 0; i < 40; i++)
                {
                    Thread.Sleep(100); // simulating new item retrieval delay

                    Interlocked.Increment(ref total);
                    Interlocked.Increment(ref itemsProcessing);

                    transformBlock.SendAsync(i).Wait();
                }
            }

            Console.WriteLine("enqueued");

            transformBlock.Complete();
            outputBlock.Completion.Wait();

            Console.WriteLine("finish");

            timer.Change(Timeout.Infinite, Timeout.Infinite);
            timer.Dispose();
        }
    }

Invoking transformBlock.LinkTo returns a disposable registration. When you dispose of that registration, the blocks unlink.

Your using scope ends and the blocks unlink before the TransformBlock has had a chance to empty itself into the ActionBlock, which prevents it from being able to complete. Since the first block doesn't complete, the next one doesn't start completing, let alone finish.

Moving the wait inside the using block solves the deadlock:
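To see the unlinking effect in isolation, here is a minimal sketch. It is not taken from the question's code: the block names, the doubling transform, and the 500 ms grace period are assumptions chosen for illustration. The link is disposed before any item is posted, so the transformed item should stay in the output queue and the block's Completion task should stay pending until the queue is drained by hand with TryReceiveAll, the same call the question reports as unblocking the application.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class UnlinkDemo
{
    static async Task Main()
    {
        var transform = new TransformBlock<int, int>(i => i * 2);
        var sink = new ActionBlock<int>(i => Console.WriteLine("consumed " + i));

        // LinkTo returns an IDisposable; disposing it removes the link.
        IDisposable link = transform.LinkTo(
            sink, new DataflowLinkOptions { PropagateCompletion = true });
        link.Dispose();

        transform.Post(21);
        transform.Complete();

        // Give the block time to process the item; with no link,
        // the result has nowhere to go and waits in the output queue.
        await Task.WhenAny(transform.Completion, Task.Delay(500));
        Console.WriteLine("OutputCount: " + transform.OutputCount);
        Console.WriteLine("Completed: " + transform.Completion.IsCompleted);

        // Draining the output queue manually lets the block complete.
        if (transform.TryReceiveAll(out IList<int> leftovers))
        {
            Console.WriteLine("drained " + leftovers.Count + " item(s)");
        }

        await transform.Completion;
        Console.WriteLine("Completed: " + transform.Completion.IsCompleted);
    }
}
```

The sink never receives anything here, which mirrors the situation in the question: completion cannot propagate through a link that no longer exists.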

    using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    {
        for (int i = 0; i < 40; i++)
        {
            Thread.Sleep(100); // simulating new item retrieval delay

            Interlocked.Increment(ref total);
            Interlocked.Increment(ref itemsProcessing);

            transformBlock.SendAsync(i).Wait();
        }

        Console.WriteLine("enqueued");
        transformBlock.Complete();
        outputBlock.Completion.Wait(); // the link is still alive while we wait
        Console.WriteLine("finish");
    }

As a side note, you shouldn't be blocking on async code in such a way. It would be simpler to use async-await instead of Wait(), Task.Delay instead of Thread.Sleep, etc.

Also, since you're using PropagateCompletion, you don't need to call outputBlock.Complete() explicitly.
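A rough sketch of what the async-await variant could look like is shown here. It assumes a compiler that supports an async Main (C# 7.1 or later), leaves out the timer and counters for brevity, and keeps the link alive for the whole run instead of wrapping it in a using block; those simplifications are mine, not part of the original code.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Program
{
    static async Task Main()
    {
        var transformBlock = new TransformBlock<int, Tuple<int, double>>(
            i => Tuple.Create(i, Math.Sqrt(i)),
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 20,
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        var outputBlock = new ActionBlock<Tuple<int, double>>(
            async tuple => await Task.Delay(1000), // simulating data output delay
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 5,
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        // The link is never disposed, so items and completion can always flow.
        transformBlock.LinkTo(
            outputBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 40; i++)
        {
            await Task.Delay(100);             // simulating new item retrieval delay
            await transformBlock.SendAsync(i); // waits asynchronously when the block is full
        }

        Console.WriteLine("enqueued");

        transformBlock.Complete();
        await outputBlock.Completion; // completes via PropagateCompletion

        Console.WriteLine("finish");
    }
}
```

Awaiting SendAsync keeps the back-pressure from BoundedCapacity without tying up a thread while the pipeline is full.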

