深入线程池

在使用线程时,我们可以直接 new 一个线程,但是如果并发的线程很多,并且每个线程都是执行一个时间很短的任务就结束了,像这样频繁创建线程会大大降低系统的效率,创建线程和销毁线程都需要消耗系统资源,而线程池可以使线程在执行完一个任务后,并不是销毁,而是去执行其他任务,使线程得到复用。

概述

JDK 的线程池使用了 Executor 框架,这个框架是在 JDK 1.5 引入的。Executor 是一个异步执行框架,支持多种不同类型的任务策略,提供了一种标准的方法将任务的提交过程和任务的执行过程解耦,基于生产者-消费者模型,提交任务的线程相当于生产者,执行任务的线程相当于消费者,使用 Runnable 表示任务,同时还提供生命周期管理、统计信息收集、性能监视等功能。

executor

  • Executor
    一个接口,其定义了一个接收 Runnable 对象的方法 execute。

  • ExecutorService
    一个比 Executor 使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回 Future 的方法。

  • AbstractExecutorService
    ExecutorService 执行方法的默认实现。

  • ScheduledExecutorService
    一个可定时调度任务的接口。

  • ScheduledThreadPoolExecutor
    ScheduledExecutorService 的实现,一个可定时调度任务的线程池。

  • ThreadPoolExecutor
    线程池,可以通过调用 Executors 中的静态工厂方法来创建线程池并返回一个 ExecutorService 对象。

构造方法

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

核心池的大小(corePoolSize)
在创建了线程池后,默认情况下,线程池中并没有任何线程,当有任务提交时,就会创建一个线程去执行任务,即使核心池中存在其他空闲的能够执行新任务的线程,当线程池中的线程数目达到 corePoolSize 后,就会把之后提交的任务放到缓存队列当中。

如果在创建线程池之后调用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法,则会在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。

线程池最大线程数量(maximumPoolSize)
线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。

线程没有任务执行时的存活时间(keepAliveTime)
当工作线程数大于核心线程数时,线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

ThreadPoolExecutor 中额定的工作线程数量由 corePoolSize 决定,当任务数量超过额定线程数量时,将任务缓存在 BlockingQueue 之中,当发现如果连 queue 中也放不下时,ThreadPoolExecutor 会请求再加几个线程(但是不能无上限地加,设置 maximumPoolSize 为上限)。当到达最大上限后,接下来可能会发生两种情况:一种是不再有新任务提交,则 keepAliveTime;另一种是仍然有新任务提交,则执行 RejectedExecutionHandler。

存活时间的单位(unit)
可选的单位有天、小时、分钟、秒、毫秒、微秒和纳秒。

阻塞队列(workQueue)
用来存储等待执行的任务,当运行的线程数少于 corePoolSize 时,在有新任务时直接创建新线程来执行任务而无需进入队列排队,当运行的线程数等于或多于 corePoolSize 时,在有新任务提交时就会进入队列排队。当队列满时,有新任务提交再创建新线程,直到到达线程池的最大线程数量 maximumPoolSize。一般队列有以下几种选择:

阻塞队列 描述
ArrayBlockingQueue 一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。这是一个经典的“有界缓冲区”,队列中有一个固定大小的数组用来存放由生产者插入的元素,并由消费者提取。一旦创建,容量不能改变。尝试将元素放入一个满队列将导致操作阻塞;尝试从一个空队列中取一个元素也会被阻塞
LinkedBlockingQueue 一个基于链表结构的有界阻塞队列,此队列按 FIFO(先进先出)排序元素,新加入的元素放入尾部,默认长度为 Integer.MAX_VALUE,吞吐量通常要高于 ArrayBlockingQueue,但是在大多数并发应用中性能较差
SynchronousQueue 一个不存储元素的阻塞队列,没有任何内部容量。不能调用 peek 方法来查看队列中是否有元素,因为一个元素只有在你尝试取走时才会出现,不取走只想看一眼是不行的。只有另一个线程试图取出一个元素时才可以往队列中放入一个元素。因为不存储元素,所以也不能迭代。对于其他 Collection 方法,例如 contains,SynchronousQueue 充当空集合,该队列不允许空元素存在。它的吞吐量通常要高于 LinkedBlockingQueue
PriorityBlockingQueue 一个支持线程优先级排序的无界阻塞队列,默认按照字典顺序排序,也可以自定义实现 compareTo 方法来指定排序规则,不能保证同优先级元素的顺序

线程工厂(threadFactory)
线程池中使用 ThreadFactory 来创建新的线程,默认使用 defaultThreadFactory 创建线程。

任务被拒时的处理策略(handler)
默认使用 ThreadPoolExecutor.AbortPolicy,任务被拒绝时将抛出 RejectExecutorException 异常。还有以下几种策略:

策略 描述
ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出 RejectedExecutionException 异常
ThreadPoolExecutor.DiscardPolicy 也是丢弃任务,但是不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务

线程池状态

线程池内部使用一个变量 ctl 来维护两个值:线程池的运行状态(runState)和线程数量(workerCount)。ctl 是一个 AtomicInteger 类型的变量,它的值当中,高 3 位用来保存 runState,低 29 位用于保存 workerCount,使用一个变量存储两个值可以避免在做相关决策的时候出现不一致的情况,不必为了维护两个状态的一致而使用锁。

运行状态

运行状态 描述
RUNNING 创建线程池后的初始状态,可以接受新的任务,也能够处理阻塞队列中的任务
SHUTDOWN 不再接受新提交的任务,但是还会继续处理阻塞队列中的任务(当然正在执行的任务也会继续)
STOP 不再接受新提交的任务,同时也不处理队列中的任务,并且会尝试中断正在执行的任务
TIDYING 所有的任务都终止了,workCount 的数量为 0(线程池和等待队列都为空)
TERMINATED 当线程池处于 TIDYING 状态,并且 terminated 钩子函数执行完毕后
运行状态转换 描述
RUNNING -> SHUTDOWN 调用 shutdown 方法,则线程池处于 SHUTDOWN 状态
RUNNING/SHUTDOWN -> STOP 调用 shutdownNow 方法,则线程池处于 STOP 状态
SHUTDOWN -> TIDYING 当线程池处于 SHUTDOWN 状态,并且线程池和等待队列都为空时,线程池状态为 TIDYING
STOP -> TIDYING 当线程池处于 STOP 状态,并且线程池为空时(等待队列肯定为空),线程池状态为 TIDYING
TIDYING -> TERMINATED 当线程池处于 TIDYING 状态,并且 terminate() 钩子方法完成后,线程池被设置为 TERMINATED 状态

runState

其实状态的变化基本上是通过 shutdown 和 shutdownNow 这两个方法来触发的,总的来说就是:这两个方法分别将运行状态设置为 SHUTDOWN 和 STOP,接下来或等待剩下的任务执行完成,或尝试中断正在执行的任务,最终它们都会调用 tryTerminate 方法,当线程池和等待队列都为空时,状态会变成 TIDYING,然后会调用 terminated 钩子函数(该方法是一个 protected 方法,由子类实现),最终将状态设置为 TERMINATED,并唤醒所有调用了 awaitTermination 方法,在 termination 条件上等待的线程。

提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
/**
* 如果活动线程数小于核心线程大小,新建线程
**/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/**
* 如果活动线程数大于核心线程大小,将任务放入等待队列
**/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
* 执行到这里,说明队列已满,此时尝试新建线程,如果失败,则执行任务拒接策略
**/
else if (!addWorker(command, false))
reject(command);
}

execute() 方法是 Executor 中声明的方法,在 ThreadPoolExecutor 进行了具体的实现,这个方法是 ThreadPoolExecutor 的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

submit() 方法是在 ExecutorService 中声明的方法,在 AbstractExecutorService 中实现,在 ThreadPoolExecutor 中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和 execute() 方法不同,它能够返回任务执行的结果,查看 submit() 方法的实现,会发现它实际上还是调用的 execute() 方法,只不过它利用了 Future 模式来获取任务执行结果。

任务的执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 在 java 中,++i 和 i++ 操作并不是线程安全的,在使用的时候,不可避免的会用到 synchroized 关键字,
* 而 AtomicInteger 是一个提供原子操作的 Integer 类,通过线程安全的方式操作加减
* ctlOf(RUNNING, 0) = -536870912 | 0 = -536870912
**/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* Integer.SIZE = 32
**/
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 536870911
**/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

/**
* -1: 10000001(原) -> 11111111(补) -> 左移一位 11111110(补) -> 10000010(原) = -2
* 左移 29 位,结果为-536870912
**/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* 0: 00000000(原) -> 00000000(补) -> 左移一位 000000000(补) -> 00000000(原) = 0
* 左移 29 位,结果为 0
**/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* 536870912
**/
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

/**
* 等待队列
**/
private final BlockingQueue<Runnable> workQueue;
/**
* 线程池主要状态锁,对线程池的主要状态的改变都需要使用该锁
**/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 活动线程容器,只有在持有 mainLock 时才可以访问
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* 记录曾经出现过的最大线程数量,只有在持有 mainLock 时才可以访问
*/
private int largestPoolSize;
/**
* 如果为 false(默认),则核心线程即使空闲也会保持活动状态。
* 如果为 true,则核心线程使用 keepAliveTime 超时等待工作。
*/
private volatile boolean allowCoreThreadTimeOut;

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
/**
* 如果活动线程数大于等于核心线程或最大线程大小,则不创建线程,直接返回 false
**/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/**
* workerCount 增加 1
**/
if (compareAndIncrementWorkerCount(c))
break retry;
// Re-read ctl
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
/**
* Worker 是 ThreadPoolExecutor 的一个内部类,类里持有一个 Runnable 和一个 Thread,
* 当通过构造器传入一个任务时,类中的 Runnable 用来接收任务,
* Thread 用来接收通过 ThreadFactory 将当前 worker 作为对象传入后创建的线程
*
* Worker(Runnable firstTask) {
* setState(-1); // inhibit interrupts until runWorker
* this.firstTask = firstTask;
* this.thread = getThreadFactory().newThread(this);
* }
**/
w = new Worker(firstTask);
/**
* 此时这个 t 就是通过 ThreadFactory 创建的线程,线程执行的代码就是 worker 中 run 方法的代码
**/
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
/**
* 将 worker 放入活动线程容器中
**/
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
//记录出现的最大线程数
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
/**
* 线程 start,会调用 worker 的 run 方法
**/
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

/**
* 尝试使用 CAS-increment 来保证不使用悲观锁(使用乐观锁)的情况下增加 workerCount
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
/**
* 循环取任务
*
* 如果是第一次执行,会先执行创建线程时传入的任务,如果任务执行结束,
* 会通过 getTask 从等待队列中取任务
**/
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
/**
* 任务执行前的一些处理,可以通过继承 ThreadPoolExecutor 后重写
**/
beforeExecute(wt, task);
Throwable thrown = null;
try {
//实际执行的还是真正我们需要执行的任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
/**
* 任务执行后的一些处理,如统计等,可以通过继承 ThreadPoolExecutor 重写
**/
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

/**
* 线程退出工作处理
**/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

/**
* 从等待队列中取任务
**/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

/**
* Are workers subject to culling?
*
* case 1 timed=false
* allowCoreThreadTimeOut=false wc>corePoolSize=false
* 线程即使空闲也会保持活动,并且活动线程数没有超过核心线程数
* case 2 timed=true
* allowCoreThreadTimeOut=false wc>corePoolSize=true
* 虽然线程设置为即使空闲也会活动,但是活动线程数已经超过核心线程数
* case 3 timed=true
* allowCoreThreadTimeOut=true wc>corePoolSize=false
* 线程设置为空闲一定时间后销毁
* case 4 timed=true
* allowCoreThreadTimeout=true wc>corePoolSize=true
* 线程设置为空闲一定时间后销毁
**/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
/**
* poll 方法获取并删除队列头部元素,并且该方法会等待一段时间,如果超过时间则返回 null
* take 方法获取并删除队列头部元素,该方法会阻塞,直到元素可以获取
**/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
}

线程池中线程初始化

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务(execute 或 submit)之后才会创建线程。但是如果需要线程池创建之后立即创建线程,可以使用以下两种方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 初始化一个核心线程
* @return {@code true} if a thread was started
*/
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}

/**
* 初始化所有核心线程
* @return the number of threads started
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}

线程池监控

方法 描述
getTaskCount 线程池需要执行的任务总数,由于任务和线程的状态在计算过程中可能会变化,因此这只是一个近似值
getCompletedTaskCount 线程池在运行过程中已完成的任务数量,小于或等于 taskCount,这是一个近似值
getLargestPoolSize 线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了
getPoolSize 线程池的线程数量
getActiveCount 获取活动(正在执行任务)的线程数,这是一个近似值

线程池的关闭

ThreadPoolExecutor 提供了两个方法用于线程池的关闭,分别是 shutdown 和 shutdownNow。

方法 描述
shutdown 不会立即终止线程池,而是等待任务缓存队列中的任务都执行完后才终止,但不会接受新提交的任务
shutdownNow 立即终止线程池,并尝试中断正在执行的任务(通过 Thread.interrupt 方法实现,如果线程中没有使用 sleep、wait、Condition 相关的等一些可以响应中断的方法时,则无法中断线程),并且清空任务缓存队列,返回尚未执行的任务集合

整体分析

整体流程

首先创建线程池 ThreadPoolExecutor,设置初始化参数。新创建的线程池中并没有线程,需要通过 execute 方法或 submit 方法提交任务才会创建线程。

接着提交任务,任务提交后,线程池会执行以下操作。首先判断 corePoolSize 即核心线程池是否已满,没有满就新建线程,满了会再去判断 workQueue 即等待队列是否已满,没有满就放入等待队列,满了会再去判断线程池中线程数是否超过 maximumPoolSize 即最大线程数,没有就新建线程,如果超过了,执行拒绝策略。

这里所说的新建线程其实省略了很多必要的过程,首先执行的是 addWorker() 方法,该方法会创建一个 Worker 实例,Worker 是 ThreadPoolExecutor 的内部类,继承了 AQS,因此它是一个同步器,它的内部有一个 Thread 成员变量和一个 Runnable 成员变量。构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}

线程申请

firstTask 是初始化的任务,如果它的值是非空的,那么线程会在启动初期立即执行这个任务;如果它的值为 null,那么就会创建一个线程去等待队列中获取任务执行。getThreadFactory().newThread(this) 方法将 Worker 的实例传入来创建一个新的线程,任务对象就是 worker 实例。创建完 Worker 实例后,将该实例放入活动线程容器 workers 中,然后调用 workder 的 thread 属性的 start 方法启动该线程,即调用 worker 的 run 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void run() {
runWorker(this);
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

run 方法中调用 runWorker 方法将 worker 实例传入,这里的重点是调用的是任务的 run 方法,第一次先执行一开始构造的第一个任务 firstTask,执行结束后,从等待队列中循环取新任务来执行。也就是说,新线程调用的 run 方法会通过循环取队列的方式来执行不同的任务,从而达到线程复用的目的。

Executors

使用 Executors 可以使用 JDK 预置的一些线程池。

newFixedThreadPool

1
2
3
4
5
6
7
8
9
10
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

newFixedThreadPool 设置的 corePoolSize 和 maximumPoolSize 大小相同,阻塞队列使用的是 LinkedBlockingQueue,由于它没有给定一个初始化的队列容量,因此理论上该线程池永远不会拒绝任务,因此 maximumPoolSize 和 keepAliveTime 的设置都是无效的。

newSingleThreadExecutor

1
2
3
4
5
6
7
8
9
10
11
12
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

newSingleThreadExecutor 设置的 corePoolSize 和 maximumPoolSize 大小都为 1,阻塞队列使用的是 LinkedBlockingQueue。

newCachedThreadPool

1
2
3
4
5
6
7
8
9
10
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

将 corePoolSize 设置为 0,将 maximumPoolSize 设置为 Integer.MAX_VALUE,使用的是 SynchronousQueue,由于这个队列没有存储空间,这就意味着只要有任务就必须找到一个线程来处理,如果当前没有空闲的线程,那么就会创建一个新的线程来运行,当线程空闲超过 60 秒,就销毁线程。因此它比较适合来处理执行时间比较短的任务。

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(1);

executor.execute(() -> {
System.out.println("当前线程 ID=" + Thread.currentThread().getId()
+ " 线程名称=" + Thread.currentThread().getName());
Arrays.asList(1, 2, 3, 4, 5).forEach((e) -> System.out.println(e));
});

executor.execute(() -> System.out.println("当前线程 ID=" + Thread.currentThread().getId()
+ " 线程名称=" + Thread.currentThread().getName()));

executor.shutdown();
}

建议

阿里巴巴的 Java 开发手册中提到过,使用 Executors 的静态方法来创建线程池可能会导致 OOM。newFixedThreadPool 底层使用的是 LinkedBlockingQueue,这是一个通过链表实现的有界阻塞队列,容量可以指定,如果不指定就会变成一个无边界的阻塞队列,最大的长度为 Integer.MAX_VALUE。由于 newFixedThreadPool 在创建 LinkedBlockingQueue 时没有指定容量,所以可以不断地向队列中添加任务,这就可能出现因为任务过多而导致内存溢出。

同理,newCachedThreadPool 和 newScheduledThreadPool 使用的虽然不是 LinkedBlockingQueue,但是因为最大线程数指定为 Integer.MAX_VALUE,还是可能会出现 OOM。所以一般不推荐使用 JDK 提供的 Executors 来创建线程池,正确的做法是直接调用 ThreadPoolExecutor 的构造方法来创建线程池,在创建时,指定阻塞队列的容量。

1
2
private static ExecutorService executor =
new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(10));

这种情况下一旦提交的线程数超过当前可用线程数时,就会抛出 RejectedExecutionException,但是异常(Exception)总比发生错误(Error)要好。除了直接调用 ThreadPoolExecutor 的构造方法以外,还可以使用第三方的类库,比如 apache 或者 guava。

参考

http://www.cnblogs.com/dolphin0520/p/3932921.html

Java 线程池实现原理及其在美团业务中的实践