Into the Future Thread - Part 2

Posted by BeCodeMonkey on March 4, 2016

This post continues the look at the execution flow of Futures and threads. See Part 1 if you have not read it yet. Here I would like to highlight when future.get() returns in different scenarios. I know it is not best practice to use Futures this way, but I think it is still important to understand how the internals work.

First, a basic async example using CompletableFuture.thenApplyAsync(); nothing special here.

package com.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class Test21Future {

  public static void main(String[] args) {

    Thread th1 = new Thread(() -> {
      try {
        task();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }, "master thread");
    th1.start();
  }

  public static void task() throws InterruptedException, ExecutionException {

    ExecutorService exeService = Executors.newFixedThreadPool(6, new ThreadFactory() {

      final AtomicInteger counter = new AtomicInteger();

      @Override
      public Thread newThread(Runnable r) {
        return new Thread(r, "PoolThread : " + counter.getAndIncrement());
      }
    });

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
      try {
        sleep(1000L);
      } catch (Exception e) {
        e.printStackTrace();
      }
      System.out.println("Future 1 Return in Thread [" + Thread.currentThread().getName() + "]");
      return "A";
    }, exeService);

    CompletableFuture<String> future2 = future1.thenApplyAsync(a -> {
      try {
        sleep(2000L);
      } catch (Exception e) {
        e.printStackTrace();
      }
      System.out.println("Future 2 Return in Thread [" + Thread.currentThread().getName() + "]");
      return a + "B";
    }, exeService);

    CompletableFuture<String> future3 = future2.thenApplyAsync(a -> {
      try {
        sleep(2000L);
      } catch (Exception e) {
        e.printStackTrace();
      }
      System.out.println("Future 3 Return in Thread [" + Thread.currentThread().getName() + "]");
      return a + "C";
    }, exeService);

    System.out.println("Starting the wait...");
    System.out.println("Future 1 Get : " + future1.get());
    System.out.println("Future 2 Get : " + future2.get());
    System.out.println("Future 3 Get : " + future3.get());
    exeService.shutdown(); // without this the non-daemon pool threads keep the JVM alive
  }

  private static void sleep(long time) {
    try {
      Thread.sleep(time);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still see the interruption
      Thread.currentThread().interrupt();
    }
  }
}

Below is the output from running the code above.

Starting the wait...
Future 1 Return in Thread [PoolThread : 0]
Future 1 Get : A
Future 2 Return in Thread [PoolThread : 1]
Future 2 Get : AB
Future 3 Return in Thread [PoolThread : 2]
Future 3 Get : ABC

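As you would expect, each get() stops blocking as soon as its Future completes, and the code moves on. Also note that each future runs on a different thread: as discussed in Part 1, the Async methods submit each stage to the executor as a separate task, and this fixed pool starts a new worker thread for each of its first six tasks.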
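To make the timing visible without relying on sleeps, here is a minimal sketch of the same idea. The class name AsyncGetDemo, the helper awaitQuietly and the CountDownLatch are my own additions for illustration and are not part of the listing above; the latch simply stands in for the sleep(2000L) in stage 2. It shows that get() on the first future returns while the async second stage is still running.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original listing.
class AsyncGetDemo {

  // Returns { future1 result, whether future2 was done at that moment, future2 result }.
  static String[] run() throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CountDownLatch release = new CountDownLatch(1); // stands in for sleep(2000L)
    try {
      CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A", pool);

      // Async stage: submitted to the pool as its own task once future1 completes.
      CompletableFuture<String> future2 = future1.thenApplyAsync(a -> {
        awaitQuietly(release); // stage 2 stays busy until the caller releases it
        return a + "B";
      }, pool);

      String first = future1.get();          // does not wait for stage 2
      boolean secondDone = future2.isDone(); // stage 2 is still held by the latch
      release.countDown();                   // now let stage 2 finish
      String second = future2.get();
      return new String[] { first, String.valueOf(secondDone), second };
    } finally {
      pool.shutdown();
    }
  }

  private static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(String.join(", ", run()));
  }
}
```

Running main prints A, false, AB: the first get() has already returned while future2 is not yet done.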

Things get interesting when you do not use the Async variants, that is, CompletableFuture.thenApply(). Here is an example.

package com.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class Test21Future {

  public static void main(String[] args) {

    Thread th1 = new Thread(() -> {
      try {
        task();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }, "master thread");
    th1.start();

  }

  public static void task() throws InterruptedException, ExecutionException {

    ExecutorService exeService = Executors.newFixedThreadPool(6, new ThreadFactory() {

      final AtomicInteger counter = new AtomicInteger();

      @Override
      public Thread newThread(Runnable r) {
        return new Thread(r, "PoolThread : " + counter.getAndIncrement());
      }
    });

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
      try {
        sleep(1000L);
      } catch (Exception e) {
        e.printStackTrace();
      }
      System.out.println("Future 1 Return in Thread [" + Thread.currentThread().getName() + "]");
      return "A";
    }, exeService);

    CompletableFuture<String> future2 = future1.thenApply(a -> {
      try {
        sleep(2000L);
      } catch (Exception e) {
        e.printStackTrace();
      }
      System.out.println("Future 2 Return in Thread [" + Thread.currentThread().getName() + "]");
      return a + "B";
    });

    CompletableFuture<String> future3 = future2.thenApply(a -> {
      try {
        sleep(2000L);
      } catch (Exception e) {
        e.printStackTrace();
      }
      System.out.println("Future 3 Return in Thread [" + Thread.currentThread().getName() + "]");
      return a + "C";
    });
    
    CompletableFuture<String> future4 = future3.thenApply(a -> {
      try {
        sleep(2000L);
      } catch (Exception e) {
        e.printStackTrace();
      }
      System.out.println("Future 4 Return in Thread [" + Thread.currentThread().getName() + "]");
      return a + "D";
    });

    System.out.println("Starting the wait...");
    System.out.println("Future 1 Get : " + future1.get());
    System.out.println("Future 2 Get : " + future2.get());
    System.out.println("Future 3 Get : " + future3.get());
    System.out.println("Future 4 Get : " + future4.get());
    exeService.shutdown(); // without this the non-daemon pool threads keep the JVM alive
  }

  private static void sleep(long time) {
    try {
      Thread.sleep(time);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still see the interruption
      Thread.currentThread().interrupt();
    }
  }
}

Below is the output:

Starting the wait...
Future 1 Return in Thread [PoolThread : 0]
Future 1 Get : A
Future 2 Return in Thread [PoolThread : 0]
Future 3 Return in Thread [PoolThread : 0]
Future 4 Return in Thread [PoolThread : 0]
Future 2 Get : AB
Future 3 Get : ABC
Future 4 Get : ABCD

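As you can see, the get() calls on Future 2, 3 and 4 all stay blocked until the last future (Future 4) completes. The get() on Future 1 unblocks as expected, since Future 1 was started with supplyAsync. Future 2, 3 and 4 are not async stages, so they all run back to back on PoolThread : 0, the thread that completed Future 1, and in this run the waiting thread was only released once that whole chain had finished. The API does not specify when a waiting get() is woken relative to the dependent stages, so treat this ordering as observed behavior rather than a guarantee.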
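The thread that runs a non-async stage can be checked directly. Below is a small sketch under my own assumptions: the class name SyncStageThreadDemo, the thread name pool-worker and the latch are hypothetical and only there to make the run repeatable. The latch guarantees that thenApply is registered before the source future completes. The second case uses an already completed future, where thenApply runs on the calling thread instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original listing.
class SyncStageThreadDemo {

  // Returns the names of the threads that ran the two thenApply functions.
  static String[] run() throws Exception {
    ExecutorService pool =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "pool-worker"));
    CountDownLatch registered = new CountDownLatch(1);
    try {
      // Case 1: the dependent stage is registered while future1 is still running.
      CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        awaitQuietly(registered); // hold future1 open until thenApply is attached
        return "A";
      }, pool);
      CompletableFuture<String> pending =
          future1.thenApply(a -> Thread.currentThread().getName());
      registered.countDown();
      String ranOnForPending = pending.get(); // observed: the completing pool thread

      // Case 2: the source is already complete, so thenApply runs in the caller.
      CompletableFuture<String> done = CompletableFuture.completedFuture("A");
      String ranOnForDone =
          done.thenApply(a -> Thread.currentThread().getName()).get();

      return new String[] { ranOnForPending, ranOnForDone };
    } finally {
      pool.shutdown();
    }
  }

  private static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(String.join(", ", run()));
  }
}
```

Run from main, this prints pool-worker, main. If you need get() on an intermediate future to return as soon as that stage finishes, the Async variants with an explicit executor, as in the first example, are the safer choice.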

It is a bit of an oddity worth noting about Futures.

