Categories

  • articles

Tags

  • java

Something I have now seen a few times in projects is people not being aware of how ThreadPoolExecutor works. When creating a ThreadPoolExecutor you have arguments to set corePoolSize, maximumPoolSize, keepAliveTime and the workQueue. The gotcha is in when the ThreadPoolExecutor decides to create a new thread for execution, especially when using an unbounded queue. For example, when looking at this ThreadPoolExecutor constructor:

new ThreadPoolExecutor(coreThreads, maximumThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

You would expect the TPE (ThreadPoolExecutor) to spawn new threads up to the maximumThreads limit as long as there are tasks in the provided queue. The problem is that the ThreadPoolExecutor will not create more threads than coreThreads, due to the way the TPE is implemented. If you read the JavaDoc carefully you will see:

Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:

  • If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  • If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
  • If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

As you can see, the TPE prefers to queue a request rather than create more threads than corePoolSize. It will only create more threads, up to the maximumPoolSize limit, if the queue's offer returns false.

Now for the little hack/solution I use for this problem. First you need to create a TPE-aware queue.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

public class TpeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

	private static final long serialVersionUID = 9119326121182411879L;

	private ThreadPoolExecutor executor;

	public TpeLinkedBlockingQueue() {
		super();
	}

	public TpeLinkedBlockingQueue(int capacity) {
		super(capacity);
	}

	public void setExecutor(ThreadPoolExecutor executor) {
		this.executor = executor;
	}

	@Override
	public boolean offer(final E e) {
		final int poolSize = executor.getPoolSize();
		final int maximumPoolSize = executor.getMaximumPoolSize();
		// Only accept the task onto the queue if the pool is already at its maximum,
		// or if there is at least one idle thread that can pick the task up.
		if (poolSize >= maximumPoolSize || poolSize > executor.getActiveCount()) {
			return super.offer(e);
		}
		// Otherwise pretend the queue is full so the TPE creates a new thread.
		return false;
	}
}

What I have done here is override the queue's offer to return false (the queue pretends to be "full") whenever the TPE's pool size is still below maximumPoolSize and there are no idle threads. This changes the behaviour of the TPE so that it creates more threads while there is work to do.

One last thing you must not forget: since this will be running in a multi-threaded environment there is a race condition here, where at the time of the queue offer the TPE is not yet at maximumPoolSize, but by the time it tries to create the new thread it is. This is where the RejectedExecutionHandler comes in. Here is a simple handler that just re-queues the task in that situation.

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class ForceReQueuePolicy implements RejectedExecutionHandler {

	@Override
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		try {
			// force the task back onto the queue, bypassing the overridden offer
			executor.getQueue().put(r);
		} catch (InterruptedException e) {
			// should never happen since put on an unbounded queue never waits
			throw new RejectedExecutionException(e);
		}
	}
}

That is it! Now the TPE will create more threads, up to maximumPoolSize, as long as there is more work to do. Here is just an example of basic usage:

TpeLinkedBlockingQueue<Runnable> queue = new TpeLinkedBlockingQueue<Runnable>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 50, 1L, TimeUnit.SECONDS, queue,
				new ForceReQueuePolicy());
queue.setExecutor(executor);