public class LimitingTaskExecutor extends AbstractExecutorService
This class is a utility class implementing ExecutorService which can sit on top of any ExecutorService like ThreadPoolExecutor and limit the number of concurrent tasks submitted to it using a configured limit.
It sort of provides a fixed size view of a global ThreadPool of higher capacity instead of creating a new Fixed Size Thread Pool. It prevents eating into threads of the underlying Thread Pool when the concurrent tasks exceed the specified limit by queuing the tasks.
It will be useful for purposes like providing a small size ThreadPool for message subscription from a global Thread Pool.
Notes:
Each task executed is run by an instance of LimitingTaskExecutor.TaskWrapper
. The TaskWrapper which
is submitted to the underlying executorService re-uses the same run method for running
any tasks in LimitingTaskExecutors queue. So the competedTasks count of the underlying
executorService may not indicate the correct number of tasks processed from the LimitingTaskExecutor.
When the underlying executorService rejects a task it is queued if there is at least one
outstanding task submitted successfully to de-queue it. Otherwise the task is rejected
with the LimitingExecutor's configured RejectedTaskHandler. Rejections by the underlying
executorSerice are detected by handling the RejectedExecutionException
in case
of the default configuration or by using a ThreadLocal in case of ThreadPoolExecutor.CallerRunsPolicy
.
Note that you should not use the ThreadPoolExecutor.DiscardPolicy
handler because LimitingTaskExecutor cannot detect task rejection in that case. If you want to configure
a discard policy for the underlying executorService configure your own RejectedExecutionHandler
and include a call to LimitingTaskExecutor.TaskWrapper.rejectExecution()
in it as below:
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r instanceof TaskWrapper) {
((TaskWrapper) r).rejectExecution();
}
}
TaskRejection can also happen when submitting a task when the max allowed limit of tasks
are already submitted and the queue is full. The default queue is configured with a limit
of 150. A task is also rejected when the LimitingTaskExecutor or the underlying executorService
is shutdown. In all cases the Configured RejectedTaskHandler
's rejectedExecution method is called.
The default handler is ABORT_POLICY which throws a RejectedExecutionException
.
The number of outstanding tasks submitted to the underlying executorService is usually within the specified limit though in some corner cases it could exceed the limit.
Modifier and Type | Class and Description |
---|---|
class |
LimitingTaskExecutor.TaskWrapper
TaskWrapper responsible for running the tasks which is submitted to the underlying
ExecutorService.
|
Modifier and Type | Field and Description |
---|---|
static RejectedTaskHandler |
ABORT_POLICY
RejectedTaskHandler which Aborts the rejected task by throwing a RejectedExecutionException.
|
static RejectedTaskHandler |
DISCARD_POLICY
RejectedTaskHandler which discards the rejected task by doing nothing.
|
static RejectedTaskHandler |
RUN_IN_PLACE_POLICY
RejectedTaskHandler which runs the rejected task in place by calling the run method
of the submitted task so that the task runs in the same thread as the caller effectively
slowing down the caller thread.
|
Constructor and Description |
---|
LimitingTaskExecutor(ExecutorService pexecutorService,
int ptaskLimit)
Creates a Limiting TaskExecutor with ABORT_POLICY as the RejectedExecutionPolicy.
|
LimitingTaskExecutor(ExecutorService pexecutorService,
int ptaskLimit,
BlockingQueue<Runnable> pwaitingTasks)
Creates a Limiting TaskExecutor with ABORT_POLICY as the RejectedExecutionPolicy.
|
LimitingTaskExecutor(ExecutorService pexecutorService,
int ptaskLimit,
BlockingQueue<Runnable> pwaitingTasks,
RejectedTaskHandler prejectedTaskHandler)
Creates a Limiting TaskExecutor.
|
Modifier and Type | Method and Description |
---|---|
boolean |
awaitTermination(long timeout,
TimeUnit unit)
Calls awaitTermination on the underlying Executor Service.
|
void |
execute(Runnable command)
Executes the command by passing it on to the underlying executorService or queues it
for later execution if the number of outstanding tasks submitted to the executorService
is already the specified limit.
|
int |
getNumCompetedTasks()
Returns the number of tasks that have been completed so far by this LimitingTaskExecutor
which is a continously increasing value.
|
int |
getNumRunningTasks()
Returns the number of outstanding tasks submitted to the underlying executorService.
|
int |
getTaskLimit()
Returns the Task Limit which specifies the max no.
|
BlockingQueue<Runnable> |
getWaitingTaskQueue()
Returns the queue used to enqueue tasks for later processing.
|
boolean |
isShutdown()
Return true if shutdown has been called on this LimitingTaskExecutor or the underlying
executorService is shutdown.
|
boolean |
isTerminated()
Checks if this LimitingTaskExecutor is terminated after a shutdown call or a shutdown
of the underlying executorService.
|
void |
setTaskLimit(int ptaskLimit)
Sets a new value of the task limit which should be between 1 and 1000.
|
void |
shutdown()
Shuts down this LimitingTaskExecutor.
|
List<Runnable> |
shutdownNow()
In addition to shut down the task queue is emptied and returned.
|
String |
toString()
Returns a string identifying this executor, as well as its state,
including indications of run state and estimated worker and
task counts.
|
invokeAll, invokeAll, invokeAny, invokeAny, newTaskFor, newTaskFor, submit, submit, submit
public static final RejectedTaskHandler ABORT_POLICY
public static final RejectedTaskHandler RUN_IN_PLACE_POLICY
public static final RejectedTaskHandler DISCARD_POLICY
public LimitingTaskExecutor(ExecutorService pexecutorService, int ptaskLimit, BlockingQueue<Runnable> pwaitingTasks, RejectedTaskHandler prejectedTaskHandler)
pexecutorService
- Underlying Executor Service used to submit the tasks toptaskLimit
- Task Limit. When this many tasks are running (i.e submitted but not completed) on
the underlying Executor Service, new tasks are added to the waiting tasks QpwaitingTasks
- The Q to use for adding waiting tasksprejectedTaskHandler
- Specifies what to do with tasks submitted when the max number of tasks
are running and the waiting tasks Q is also full.public LimitingTaskExecutor(ExecutorService pexecutorService, int ptaskLimit, BlockingQueue<Runnable> pwaitingTasks)
pexecutorService
- Underlying Executor Service used to submit the tasks toptaskLimit
- Task Limit. When this many tasks are running (i.e submitted but not completed) on
the underlying Executor Service, new tasks are added to the waiting tasks QpwaitingTasks
- The Q to use for adding waiting taskspublic LimitingTaskExecutor(ExecutorService pexecutorService, int ptaskLimit)
pexecutorService
- Underlying Executor Service used to submit the tasks toptaskLimit
- Task Limit. When this many tasks are running (i.e submitted but not completed) on
the underlying Executor Service, new tasks are added to the waiting tasks Qpublic void shutdown()
ExecutorService.shutdown()
public List<Runnable> shutdownNow()
ExecutorService.shutdownNow()
public boolean isShutdown()
ExecutorService.isShutdown()
public boolean isTerminated()
ExecutorService.isTerminated()
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
public void execute(Runnable command)
Executor.execute(java.lang.Runnable)
public String toString()
public int getTaskLimit()
public void setTaskLimit(int ptaskLimit)
ptaskLimit
- New value of task limitpublic BlockingQueue<Runnable> getWaitingTaskQueue()
public int getNumRunningTasks()
public int getNumCompetedTasks()
Copyright © 2013- Suresh Mahalingam. All Rights Reserved.