Thread Concurrency using ExecutorService in Java 8

Multithreading and concurrency have been part of Java for a long time. The concurrency utility package java.util.concurrent (introduced in Java 5) makes it easy to spawn multiple threads that run multiple tasks concurrently. ExecutorService is the main concept behind it: it abstracts thread management (creating, starting and stopping threads) for us. The concurrency package has improved a lot since Java 5, and Java 8 lambdas are the cherry on top. A lambda lets us hand an anonymous task to an ExecutorService without much ceremonial code.
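To make the point about lambdas concrete, here is a minimal sketch that builds the same task twice, once as an anonymous inner class and once as a lambda, and runs each on an executor. The class name LambdaTaskDemo and its method names are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; the names are for illustration only.
class LambdaTaskDemo {

    // Pre-Java 8 style: the task is an anonymous inner class.
    static Callable<Integer> anonymousTask(final int val) {
        return new Callable<Integer>() {
            @Override
            public Integer call() {
                return val * val;
            }
        };
    }

    // Java 8 style: the same task written as a lambda expression.
    static Callable<Integer> lambdaTask(int val) {
        return () -> val * val;
    }

    // Runs a task on a single-thread executor and waits for its result.
    static int run(Callable<Integer> task) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            return executorService.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(anonymousTask(4))); // prints 16
        System.out.println(run(lambdaTask(4)));    // prints 16
    }
}
```

Both versions behave identically; the lambda just removes the boilerplate around the single call() method.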

Let's explore how to use ExecutorService with Java 8.

Different implementations of ExecutorService

ThreadPoolExecutor – Provides a pool of threads to execute the given tasks. Each task is executed only once. We can configure the number of threads in the pool when creating the executor service.

ScheduledThreadPoolExecutor – Does everything ThreadPoolExecutor does, and can also schedule a task to run repeatedly at a fixed rate (e.g. every 5 minutes) or with a fixed delay (e.g. 5 minutes after the previous execution finishes).

ForkJoinPool – Implements a work-stealing mechanism: idle threads in the pool try to steal tasks that were submitted to the pool or created by other running tasks, and work on them. Threads in the pool block and wait when no task is available.

I will focus on the first two in this blog.

How to create ExecutorService instance

We can manually create and configure a new instance of the appropriate ExecutorService implementation.

int corePoolSize = 10;
int maximumPoolSize = 10;
long keepAliveTime = 0L;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();

ExecutorService executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 
                                                        keepAliveTime, unit, workQueue);

There is a very useful utility class, java.util.concurrent.Executors. It has many static helper methods for creating ExecutorService instances, which makes our life easy. These helper methods are usually sufficient unless we have specific configuration needs.

The following creates a ThreadPoolExecutor with a core and maximum pool size of 10 threads. It can be reconfigured; for example, the thread pool size can be changed later on.

ExecutorService executorService = Executors.newFixedThreadPool(10);

The following creates a ThreadPoolExecutor with 1 thread, wrapped inside a FinalizableDelegatedExecutorService, which guarantees that it cannot be reconfigured.

ExecutorService executorService = Executors.newSingleThreadExecutor();
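As a rough sketch of what "reconfigured" means here: the executor returned by newFixedThreadPool can be cast to ThreadPoolExecutor and resized, while the one returned by newSingleThreadExecutor cannot be cast that way. The cast relies on an implementation detail of Executors, and the class name ReconfigureDemo is made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class; the names are for illustration only.
class ReconfigureDemo {

    // Grows a fixed pool of 10 threads to newSize (assumed >= 10)
    // and returns the resulting core pool size.
    static int growFixedPool(int newSize) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        try {
            // Assumption: newFixedThreadPool hands back a plain ThreadPoolExecutor.
            ThreadPoolExecutor pool = (ThreadPoolExecutor) executorService;
            // Raise the maximum first so the core size never exceeds it.
            pool.setMaximumPoolSize(newSize);
            pool.setCorePoolSize(newSize);
            return pool.getCorePoolSize();
        } finally {
            executorService.shutdown();
        }
    }

    // Reports whether the single-thread executor could be cast the same way.
    static boolean singleThreadExecutorIsConfigurable() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            return executorService instanceof ThreadPoolExecutor;
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(growFixedPool(20));                    // prints 20
        System.out.println(singleThreadExecutorIsConfigurable()); // prints false
    }
}
```

To shrink a pool instead, the two setters would need to be called in the opposite order, because the maximum may not drop below the core size.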

The following creates a ScheduledThreadPoolExecutor with 10 threads. It can be reconfigured; for example, the thread pool size can be changed later on.

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);

The above are just a few of the helper methods. Explore java.util.concurrent.Executors for more.

Send tasks to ExecutorService

We can use the following methods of ExecutorService to send tasks to it for execution.

execute(Runnable)
Executes the given Runnable task at some time in the future. Returns nothing.

submit(Callable or Runnable)
Executes the given Callable or Runnable task at some time in the future. Returns a Future object that can be used to check whether the task is complete and to get the value returned by a Callable.

invokeAll(Collection of Callable)
Executes all the given Callable tasks and blocks until all of them have completed. Returns a list of Future objects, one for each task, that can be used to get the value returned by each Callable.

invokeAny(Collection of Callable)
Executes the given Callable tasks and blocks until one of them completes successfully, then returns the result of that task. On return, all tasks from the given collection that have not completed (not yet started or still executing) are cancelled. Note that this does not interfere with tasks sent to the same executor service by other invokeAny calls or any other method calls.
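A minimal sketch of invokeAny, assuming three tasks that differ only in how long they sleep. The class name InvokeAnyDemo, the helper sleepingTask and the sleep times are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are for illustration only.
class InvokeAnyDemo {

    // Builds a task that sleeps for the given time and then returns its name.
    static Callable<String> sleepingTask(String name, long millis) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(millis);
            return name;
        };
    }

    // Runs three tasks concurrently and returns the result of the first to finish.
    static String firstResult() {
        List<Callable<String>> tasks = Arrays.asList(
                sleepingTask("slow", 2000),
                sleepingTask("medium", 1000),
                sleepingTask("fast", 100));

        // One thread per task so that all three start at the same time.
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        try {
            // Blocks until one task succeeds; the other tasks are cancelled.
            return executorService.invokeAny(tasks);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executorService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstResult()); // prints fast
    }
}
```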

In addition to the above, ScheduledExecutorService supports the following methods.

schedule(Callable or Runnable and initial delay)
Executes the given task once after the given delay. Returns a ScheduledFuture object that can be used to check the remaining delay until execution, to check completion, and to get the return value of the task.

scheduleAtFixedRate(Runnable, initial delay and period)
Executes the given task periodically until the executor service is shut down. The task is first executed after the initial delay and then repeatedly at the given period (initial delay + period, initial delay + 2 × period, and so on). Note that a subsequent execution will not start until the previous execution has finished. If an execution takes longer than the period, subsequent executions start late but never run in parallel.

scheduleWithFixedDelay(Runnable, initial delay and delay)
Executes the given task periodically until the executor service is shut down. The task is first executed after the initial delay and then again after the given delay, measured from the finish of the previous execution. Because each execution waits for the given delay after the previous one finishes, executions never overrun each other and never run in parallel.
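The following sketch checks that a fixed-rate task never overlaps with itself, even when it takes longer than its period and the pool has a spare thread. The class name FixedRateOverlapDemo and the timings (a 250 ms task at a 100 ms period, observed for about one second) are assumptions chosen for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are for illustration only.
class FixedRateOverlapDemo {

    // Returns the highest number of simultaneous executions observed.
    static int maxConcurrentRuns() {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();

        Runnable task = () -> {
            // Count this execution and remember the highest count seen so far.
            int now = running.incrementAndGet();
            maxRunning.accumulateAndGet(now, Math::max);
            try {
                // The task takes longer than the 100 ms period.
                TimeUnit.MILLISECONDS.sleep(250);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        };

        // Two threads, so an overlapping run would be possible if it were allowed.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        try {
            scheduler.scheduleAtFixedRate(task, 0, 100, TimeUnit.MILLISECONDS);
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return maxRunning.get();
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrentRuns()); // prints 1
    }
}
```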

Shutdown ExecutorService

We have seen how to create an instance of ExecutorService and how to send tasks to it. Since ExecutorService creates and manages a pool of threads to execute the given tasks, it is very important to shut down the ExecutorService so that the threads are released.

The following methods of ExecutorService are useful for shutting it down properly.

shutdown()
Requests a shutdown. After this call the executor service stops accepting new tasks; tasks that were already submitted are still executed.

awaitTermination(timeout)
Should be called after a shutdown is requested. Blocks until all pending tasks have completed or the given timeout elapses, whichever happens first.

isTerminated()
Should be called after a shutdown is requested. Returns true if all tasks have completed following the shutdown, otherwise false.

shutdownNow()
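Forces a shutdown of the executor service. Attempts to stop the currently executing tasks (typically by interrupting their threads), prevents waiting tasks from starting, and returns the list of tasks (Runnable) that were waiting and never started.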
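A small sketch of what shutdownNow() hands back: with one worker thread and three queued tasks, the running task is interrupted and the two tasks that never started are returned. The class name ShutdownNowDemo and the latch used to make the timing predictable are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are for illustration only.
class ShutdownNowDemo {

    // Returns how many tasks were still waiting when shutdownNow() was called.
    static int pendingAfterShutdownNow() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CountDownLatch firstTaskStarted = new CountDownLatch(1);

        for (int i = 1; i <= 3; i++) {
            executorService.execute(() -> {
                firstTaskStarted.countDown();
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    // shutdownNow() interrupts the running task; finish early.
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            // Wait until task 1 is running, so exactly tasks 2 and 3 are queued.
            firstTaskStarted.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // Interrupts task 1 and drains the queue of tasks that never started.
        List<Runnable> neverStarted = executorService.shutdownNow();
        return neverStarted.size();
    }

    public static void main(String[] args) {
        System.out.println(pendingAfterShutdownNow()); // prints 2
    }
}
```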

The following is a suggested way to use the above methods to properly shut down an ExecutorService.

private static void shutdownExecutor(ExecutorService executorService) {
    try {
        System.out.println("Shutdown executor service");
        // Request shutdown; the executor stops accepting new tasks.
        executorService.shutdown();

        System.out.println("Waiting for pending tasks to finish");
        // Wait up to 10 seconds for pending tasks to finish.
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // Log the exception, restore the interrupt flag and carry on with the finally block.
        System.out.println(e.getMessage());
        Thread.currentThread().interrupt();
    } finally {
        // Check whether all pending tasks completed.
        if (!executorService.isTerminated()) {
            System.out.println("Not all tasks completed. Forcing shutdown; pending tasks will not run.");
        }

        // Force shutdown anyway. Even if all pending tasks completed, it does no harm.
        executorService.shutdownNow();
        System.out.println("Shutdown finish");
    }
}

Code Examples

Let's look at some code examples of ExecutorService. We will use the shutdownExecutor() method (shown above) and the print() method shown below to keep the code samples concise.

private static void print(String str) {
    SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
    System.out.println(dateFormat.format(new Date()) + " - " + Thread.currentThread().getName() + " -> " + str);
}

The following example creates a ThreadPoolExecutor with a pool of 2 threads and sends 5 Runnable tasks to it using the execute method. The Runnable implementation is provided as a lambda expression. In the output you can see that 2 different threads are executing the tasks.

ExecutorService executorService = Executors.newFixedThreadPool(2);

for (int i = 1; i <= 5; i++) {
    int val = i;
    executorService.execute(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        print("Value=" + val + " Square=" + (val * val));
    });
}

print("Tasks sent to executor service");
shutdownExecutor(executorService);

/* 
--- Output ---
20:26:41.694 - main -> Tasks sent to executor service
Shutdown executor service
Waiting for pending tasks to finish
20:26:42.445 - pool-1-thread-2 -> Value=2 Square=4
20:26:42.445 - pool-1-thread-1 -> Value=1 Square=1
20:26:43.451 - pool-1-thread-2 -> Value=3 Square=9
20:26:43.451 - pool-1-thread-1 -> Value=4 Square=16
20:26:44.453 - pool-1-thread-2 -> Value=5 Square=25
Shutdown finish
*/

The following example creates a ThreadPoolExecutor with a single thread and sends 5 Runnable tasks to it using the execute method. Note that the 2nd task (val = 2) throws a RuntimeException. In the output you can see that the exception in the 2nd task terminates the current worker thread. In that case ExecutorService creates a fresh thread in the pool, and the remaining tasks are executed by that new thread.

ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 1; i <= 5; i++) {
    int val = i;
    executorService.execute(() -> {
        if(val==2) {
            throw new RuntimeException("Exception thrown for val=" + val);
        }
        
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        print("Value=" + val + " Square=" + (val * val));
    });
}

print("Tasks sent to executor service");
shutdownExecutor(executorService);

/* 
--- Output ---
20:33:49.160 - main -> Tasks sent to executor service
Shutdown executor service
Waiting for pending tasks to finish
20:33:50.086 - pool-1-thread-1 -> Value=1 Square=1
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: Exception thrown for val=2
    at com.readtorakesh.java8.executor.MainApp.lambda$1(MainApp.java:74)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20:33:51.087 - pool-1-thread-2 -> Value=3 Square=9
20:33:52.089 - pool-1-thread-2 -> Value=4 Square=16
20:33:53.095 - pool-1-thread-2 -> Value=5 Square=25
Shutdown finish
*/
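For contrast, here is a hedged sketch of the same failure when the task is sent with submit instead of execute: the exception is not printed by the worker thread but is stored in the Future and rethrown, wrapped in an ExecutionException, when get() is called. The class name SubmitExceptionDemo is made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are for illustration only.
class SubmitExceptionDemo {

    // Submits a failing task and returns the message of the exception
    // that Future.get() reports.
    static String failureMessage() {
        Callable<String> failingTask = () -> {
            throw new RuntimeException("Exception thrown for val=2");
        };

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executorService.submit(failingTask);
            // get() rethrows the task's exception wrapped in ExecutionException.
            return future.get();
        } catch (ExecutionException e) {
            // The original exception is available as the cause.
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureMessage()); // prints Exception thrown for val=2
    }
}
```

If get() is never called on the Future, the failure goes unnoticed, which is worth keeping in mind when choosing between execute and submit.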

The following example creates a ThreadPoolExecutor with a single thread and sends 5 Callable tasks to it using the submit method, collecting the Future object for each submitted task. After waiting for 5 seconds, it loops through all the Future objects and gets the value returned by each task. Note that Future.get() blocks until the task is complete.

ExecutorService executorService = Executors.newSingleThreadExecutor();

List<Future<String>> futures = new ArrayList<>();

for (int i = 1; i <= 5; i++) {
    int val = i;
    Future<String> future = executorService.submit(() -> {
        return "Value=" + val + " Square=" + (val * val);
    });
    futures.add(future);
}

print("Tasks submitted");

try {
    TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
    e.printStackTrace();
}

for(Future<String> future: futures) {
    try {
        print(future.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

shutdownExecutor(executorService);

/* 
--- Output ---
20:35:40.040 - main -> Tasks submitted
20:35:45.046 - main -> Value=1 Square=1
20:35:45.046 - main -> Value=2 Square=4
20:35:45.046 - main -> Value=3 Square=9
20:35:45.047 - main -> Value=4 Square=16
20:35:45.047 - main -> Value=5 Square=25
Shutdown executor service
Waiting for pending tasks to finish
Shutdown finish
*/
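Because Future.get() blocks, it is often useful to bound the wait. The sketch below uses the overload get(timeout, unit), which throws a TimeoutException if the result is not ready in time. The class name TimedGetDemo and the timings are made up for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; the names are for illustration only.
class TimedGetDemo {

    // Runs a task that takes taskMillis and waits at most timeoutMillis for it.
    static String getWithTimeout(long taskMillis, long timeoutMillis) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> future = executorService.submit(() -> {
            TimeUnit.MILLISECONDS.sleep(taskMillis);
            return "done";
        });

        try {
            // Blocks until the result is ready or the timeout elapses.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Give up on the task and interrupt it if it is still running.
            future.cancel(true);
            return "timed out";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(getWithTimeout(50, 2000));  // prints done
        System.out.println(getWithTimeout(2000, 100)); // prints timed out
    }
}
```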

The following example creates a ThreadPoolExecutor with a single thread and sends a collection of 3 Callable tasks to it using the invokeAll method. It then loops through the returned list of Future objects and gets the value returned by each task.

Callable<String> callable1 = () -> {return "Callable 1"; };
Callable<String> callable2 = () -> {return "Callable 2"; };
Callable<String> callable3 = () -> {return "Callable 3"; };

List<Callable<String>> tasks = new ArrayList<>();
tasks.add(callable1);
tasks.add(callable2);
tasks.add(callable3);

ExecutorService executorService = Executors.newSingleThreadExecutor();
print("Tasks submitted");

try {
    print("Output of tasks from Future object");
    List<Future<String>> futures = executorService.invokeAll(tasks);
    for(Future<String> future: futures) {
        print(future.get());
    }
} catch (Exception e) {
    e.printStackTrace();
}

shutdownExecutor(executorService);

/* 
--- Output ---
20:41:03.262 - main -> Tasks submitted
20:41:03.264 - main -> Output of tasks from Future object
20:41:03.265 - main -> Callable 1
20:41:03.266 - main -> Callable 2
20:41:03.266 - main -> Callable 3
Shutdown executor service
Waiting for pending tasks to finish
Shutdown finish
*/

The following example creates a ScheduledThreadPoolExecutor with a single thread and schedules a Runnable task to be executed after a delay of 10 seconds. The schedule method returns a ScheduledFuture object, which we use to check whether the task is complete with ScheduledFuture.isDone(). ScheduledFuture.getDelay() shows the remaining delay before task execution. A negative value means the scheduled execution time passed that long ago.

Runnable task = () -> {
    print("Task is running");
    print("Task is complete");
};

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> scheduledFuture = scheduledExecutorService.schedule(task, 10, TimeUnit.SECONDS);
print("Task scheduled to run after 10 seconds");

try {
    TimeUnit.SECONDS.sleep(15);
} catch (InterruptedException e) {
    e.printStackTrace();
}

print("isDone(): "+scheduledFuture.isDone());
print("getDelay(): "+scheduledFuture.getDelay(TimeUnit.SECONDS));

shutdownExecutor(scheduledExecutorService);

/* 
--- Output ---
20:42:39.666 - main -> Task scheduled to run after 10 seconds
20:42:49.585 - pool-1-thread-1 -> Task is running
20:42:49.585 - pool-1-thread-1 -> Task is complete
20:42:54.670 - main -> isDone(): true
20:42:54.670 - main -> getDelay(): -5
Shutdown executor service
Waiting for pending tasks to finish
Shutdown finish
*/

The following example creates a ScheduledThreadPoolExecutor with a single thread and schedules a Callable task to be executed after a delay of 10 seconds. The schedule method returns a ScheduledFuture object, which we use to check whether the task is complete with ScheduledFuture.isDone() and to get the value returned by the Callable task with ScheduledFuture.get().

Callable<String> task = () -> {
    print("Task is running");
    print("Task is complete");
    return "This is return value";
};

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
ScheduledFuture<String> scheduledFuture = scheduledExecutorService.schedule(task, 10, TimeUnit.SECONDS);
print("Task scheduled to run after 10 seconds");

try {
    TimeUnit.SECONDS.sleep(15);
} catch (InterruptedException e) {
    e.printStackTrace();
}

print("isDone(): "+scheduledFuture.isDone());
try {
    print("Return Value: "+ scheduledFuture.get());
} catch (Exception e) {
    e.printStackTrace();
}

shutdownExecutor(scheduledExecutorService);

/* 
--- Output ---
20:44:14.504 - main -> Task scheduled to run after 10 seconds
20:44:24.398 - pool-1-thread-1 -> Task is running
20:44:24.399 - pool-1-thread-1 -> Task is complete
20:44:29.511 - main -> isDone(): true
20:44:29.511 - main -> Return Value: This is return value
Shutdown executor service
Waiting for pending tasks to finish
Shutdown finish
*/

The following example creates a ScheduledThreadPoolExecutor with a single thread and schedules a Runnable task using scheduleAtFixedRate, to be executed after an initial delay of 3 seconds and then at a period of 2 seconds for subsequent executions.

Runnable task = () -> {
    print("Task is running");
    print("Task is complete");
};

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(task, 3, 2, TimeUnit.SECONDS);
print("Task scheduled");
try {
    TimeUnit.SECONDS.sleep(9);
} catch (InterruptedException e) {
    e.printStackTrace();
}
shutdownExecutor(scheduledExecutorService);

/* 
--- Output ---
20:45:45.049 - main -> Task scheduled
20:45:47.977 - pool-1-thread-1 -> Task is running
20:45:47.977 - pool-1-thread-1 -> Task is complete
20:45:49.973 - pool-1-thread-1 -> Task is running
20:45:49.974 - pool-1-thread-1 -> Task is complete
20:45:51.975 - pool-1-thread-1 -> Task is running
20:45:51.975 - pool-1-thread-1 -> Task is complete
20:45:53.973 - pool-1-thread-1 -> Task is running
20:45:53.973 - pool-1-thread-1 -> Task is complete
Shutdown executor service
Waiting for pending tasks to finish
Shutdown finish
*/

The following example creates a ScheduledThreadPoolExecutor with a single thread and schedules a Runnable task using scheduleWithFixedDelay, to be executed after an initial delay of 3 seconds and then, for subsequent executions, 2 seconds after the previous execution finishes. Note that each task execution takes 3 seconds (a manually added sleep).

Runnable task = () -> { 
    print("Task is running");
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    print("Task is complete");
};

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleWithFixedDelay(task, 3, 2, TimeUnit.SECONDS);
print("Task scheduled");
try {
    TimeUnit.SECONDS.sleep(18);
} catch (InterruptedException e) {
    e.printStackTrace();
}
shutdownExecutor(scheduledExecutorService);

/* 
--- Output ---
20:46:59.286 - main -> Task scheduled
20:47:02.195 - pool-1-thread-1 -> Task is running
20:47:05.196 - pool-1-thread-1 -> Task is complete
20:47:07.196 - pool-1-thread-1 -> Task is running
20:47:10.199 - pool-1-thread-1 -> Task is complete
20:47:12.201 - pool-1-thread-1 -> Task is running
20:47:15.203 - pool-1-thread-1 -> Task is complete
20:47:17.206 - pool-1-thread-1 -> Task is running
Shutdown executor service
Waiting for pending tasks to finish
20:47:20.208 - pool-1-thread-1 -> Task is complete
Shutdown finish
*/

Reference:
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html

Please share it and help others if you found this blog helpful. Feedback, questions and comments are always welcome.
