1 /*
2 * Copyright 2002-2009 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.springframework.core.task;
18
19 import java.io.Serializable;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.Future;
22 import java.util.concurrent.FutureTask;
23 import java.util.concurrent.ThreadFactory;
24
25 import org.springframework.util.Assert;
26 import org.springframework.util.ConcurrencyThrottleSupport;
27 import org.springframework.util.CustomizableThreadCreator;
28
29 /**
30 * {@link TaskExecutor} implementation that fires up a new Thread for each task,
31 * executing it asynchronously.
32 *
33 * <p>Supports limiting concurrent threads through the "concurrencyLimit"
34 * bean property. By default, the number of concurrent threads is unlimited.
35 *
36 * <p><b>NOTE: This implementation does not reuse threads!</b> Consider a
37 * thread-pooling TaskExecutor implementation instead, in particular for
38 * executing a large number of short-lived tasks.
39 *
40 * @author Juergen Hoeller
41 * @since 2.0
42 * @see #setConcurrencyLimit
43 * @see SyncTaskExecutor
44 * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
45 * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
46 */
47 public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncTaskExecutor, Serializable {
48
49 /**
50 * Permit any number of concurrent invocations: that is, don't throttle concurrency.
51 */
52 public static final int UNBOUNDED_CONCURRENCY = ConcurrencyThrottleSupport.UNBOUNDED_CONCURRENCY;
53
54 /**
55 * Switch concurrency 'off': that is, don't allow any concurrent invocations.
56 */
57 public static final int NO_CONCURRENCY = ConcurrencyThrottleSupport.NO_CONCURRENCY;
58
59
60 /** Internal concurrency throttle used by this executor */
61 private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();
62
63 private ThreadFactory threadFactory;
64
65
66 /**
67 * Create a new SimpleAsyncTaskExecutor with default thread name prefix.
68 */
69 public SimpleAsyncTaskExecutor() {
70 super();
71 }
72
73 /**
74 * Create a new SimpleAsyncTaskExecutor with the given thread name prefix.
75 * @param threadNamePrefix the prefix to use for the names of newly created threads
76 */
77 public SimpleAsyncTaskExecutor(String threadNamePrefix) {
78 super(threadNamePrefix);
79 }
80
81 /**
82 * Create a new SimpleAsyncTaskExecutor with the given external thread factory.
83 * @param threadFactory the factory to use for creating new Threads
84 */
85 public SimpleAsyncTaskExecutor(ThreadFactory threadFactory) {
86 this.threadFactory = threadFactory;
87 }
88
89
90 /**
91 * Specify an external factory to use for creating new Threads,
92 * instead of relying on the local properties of this executor.
93 * <p>You may specify an inner ThreadFactory bean or also a ThreadFactory reference
94 * obtained from JNDI (on a Java EE 6 server) or some other lookup mechanism.
95 * @see #setThreadNamePrefix
96 * @see #setThreadPriority
97 */
98 public void setThreadFactory(ThreadFactory threadFactory) {
99 this.threadFactory = threadFactory;
100 }
101
102 /**
103 * Return the external factory to use for creating new Threads, if any.
104 */
105 public final ThreadFactory getThreadFactory() {
106 return this.threadFactory;
107 }
108
109 /**
110 * Set the maximum number of parallel accesses allowed.
111 * -1 indicates no concurrency limit at all.
112 * <p>In principle, this limit can be changed at runtime,
113 * although it is generally designed as a config time setting.
114 * NOTE: Do not switch between -1 and any concrete limit at runtime,
115 * as this will lead to inconsistent concurrency counts: A limit
116 * of -1 effectively turns off concurrency counting completely.
117 * @see #UNBOUNDED_CONCURRENCY
118 */
119 public void setConcurrencyLimit(int concurrencyLimit) {
120 this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
121 }
122
123 /**
124 * Return the maximum number of parallel accesses allowed.
125 */
126 public final int getConcurrencyLimit() {
127 return this.concurrencyThrottle.getConcurrencyLimit();
128 }
129
130 /**
131 * Return whether this throttle is currently active.
132 * @return <code>true</code> if the concurrency limit for this instance is active
133 * @see #getConcurrencyLimit()
134 * @see #setConcurrencyLimit
135 */
136 public final boolean isThrottleActive() {
137 return this.concurrencyThrottle.isThrottleActive();
138 }
139
140
141 /**
142 * Executes the given task, within a concurrency throttle
143 * if configured (through the superclass's settings).
144 * @see #doExecute(Runnable)
145 */
146 public void execute(Runnable task) {
147 execute(task, TIMEOUT_INDEFINITE);
148 }
149
150 /**
151 * Executes the given task, within a concurrency throttle
152 * if configured (through the superclass's settings).
153 * <p>Executes urgent tasks (with 'immediate' timeout) directly,
154 * bypassing the concurrency throttle (if active). All other
155 * tasks are subject to throttling.
156 * @see #TIMEOUT_IMMEDIATE
157 * @see #doExecute(Runnable)
158 */
159 public void execute(Runnable task, long startTimeout) {
160 Assert.notNull(task, "Runnable must not be null");
161 if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
162 this.concurrencyThrottle.beforeAccess();
163 doExecute(new ConcurrencyThrottlingRunnable(task));
164 }
165 else {
166 doExecute(task);
167 }
168 }
169
170 public Future<?> submit(Runnable task) {
171 FutureTask<Object> future = new FutureTask<Object>(task, null);
172 execute(future, TIMEOUT_INDEFINITE);
173 return future;
174 }
175
176 public <T> Future<T> submit(Callable<T> task) {
177 FutureTask<T> future = new FutureTask<T>(task);
178 execute(future, TIMEOUT_INDEFINITE);
179 return future;
180 }
181
182 /**
183 * Template method for the actual execution of a task.
184 * <p>The default implementation creates a new Thread and starts it.
185 * @param task the Runnable to execute
186 * @see #setThreadFactory
187 * @see #createThread
188 * @see java.lang.Thread#start()
189 */
190 protected void doExecute(Runnable task) {
191 Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
192 thread.start();
193 }
194
195
196 /**
197 * Subclass of the general ConcurrencyThrottleSupport class,
198 * making <code>beforeAccess()</code> and <code>afterAccess()</code>
199 * visible to the surrounding class.
200 */
201 private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {
202
203 @Override
204 protected void beforeAccess() {
205 super.beforeAccess();
206 }
207
208 @Override
209 protected void afterAccess() {
210 super.afterAccess();
211 }
212 }
213
214
215 /**
216 * This Runnable calls <code>afterAccess()</code> after the
217 * target Runnable has finished its execution.
218 */
219 private class ConcurrencyThrottlingRunnable implements Runnable {
220
221 private final Runnable target;
222
223 public ConcurrencyThrottlingRunnable(Runnable target) {
224 this.target = target;
225 }
226
227 public void run() {
228 try {
229 this.target.run();
230 }
231 finally {
232 concurrencyThrottle.afterAccess();
233 }
234 }
235 }
236
237 }