java - How to catch an exception when any Thread/Runnable/Callable in an ExecutorService fails while awaiting termination -


i have code looks this:

public void dothings() {     int numthreads = 4;     executorservice threadpool = executors.newfixedthreadpool(numthreads);     (int = 0; < numthreads; i++) {          final int index = i;         runnable runnable = () -> {              // things based on index         };          threadpool.execute(runnable);      }      threadpool.shutdown();      try {         // i'd catch exceptions here of runnables         threadpool.awaittermination(1, timeunit.hours);     } catch (interruptedexception e) {         utils.throwruntimeinterruptedexception(e);     } } 

basically create lot of work in parallel , wait done. if of processing fails need know , abort all. threadpool.awaittermination doesn't seem notice if exception thrown inside 1 of threads. see stacktrace in console.

i don't know lot concurrency i'm bit lost in available interfaces/objects such callable, future, task, etc.

i see threadpool.invokeall(callables) give me list<future> , future.get() can throw exceptions within thread, if call on (if callable throws exception in own thread). if .get each callable have in sequential collection won't know last 1 failing until others have finished.

my best guess have queue on runnables put boolean success or failure , take() queue many times there threads.

i feel inordinate amount of complexity (even code i've pasted surprisingly long) seems common, simple use case. , doesn't include aborting runnables when 1 fails. there has better way, , beginner don't know it.

i found executorcompletionservice designed this. wrote following class abstract process , simplify usage bit:

import java.util.iterator; import java.util.concurrent.callable; import java.util.concurrent.executionexception; import java.util.concurrent.executorcompletionservice; import java.util.concurrent.executorservice; import java.util.concurrent.atomic.atomicinteger;  /**  * wrapper around executorservice allows submit callables, results via iteration,  * , handle failure quickly. when submitted callable throws exception in thread  * result in runtimeexception when iterating on results. typical usage follows:  *  * <ol>  *     <li>create executorservice , pass constructor.</li>  *     <li>create callables , ensure respond interruption, e.g. regularly call: <pre>{@code  *     if (thread.currentthread().isinterrupted()) {            throw new runtimeexception("the thread interrupted, indicating failure in sibling thread.");  *     }}</pre></li>  *     <li>pass callables submit() method.</li>  *     <li>call finishedsubmitting().</li>  *     <li>iterate on object (e.g. foreach loop) results callables.  *     each iteration block waiting next result.  *     if 1 of callables throws unhandled exception or thread interrupted during iteration  *     executorservice.shutdownnow() called resulting in still running callables being interrupted,  *     , runtimeexception thrown </li>  * </ol>  */ public class executorserviceresultshandler<v> implements iterable<v> {      private executorcompletionservice<v> completionservice;     private executorservice executorservice;     atomicinteger taskcount = new atomicinteger(0);      public executorserviceresultshandler(executorservice executorservice) {         this.executorservice = executorservice;         completionservice = new executorcompletionservice<v>(executorservice);     }      public void submit(callable<v> task) {         completionservice.submit(task);         taskcount.incrementandget();     }      public void finishedsubmitting() {         executorservice.shutdown();     }      @override     public iterator<v> iterator() {         return new iterator<v>() {             @override             public boolean hasnext() {                 return taskcount.getanddecrement() > 0;             }              @override             public v next() {                 exception exception;                 try {                     return completionservice.take().get();                 } catch (interruptedexception e) {                     thread.currentthread().interrupt();                     exception = e;                 } catch (executionexception e) {                     exception = e;                 }                 executorservice.shutdownnow();                 executorservice = null;                 completionservice = null;                 throw new runtimeexception(exception);             }         };     }      /**      * convenience method wait callables finish when don't care results.      */     public void awaitcompletion() {         (v ignored : this) {             // nothing         }     }  } 

Comments

Popular posts from this blog

html - Outlook 2010 Anchor (url/address/link) -

javascript - Why does running this loop 9 times take 100x longer than running it 8 times? -

Getting gateway time-out Rails app with Nginx + Puma running on Digital Ocean -