publicThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
// Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) returnfalse;
for (;;) { intwc= workerCountOf(c); /** * 如果活动线程数大于等于核心线程或最大线程大小,则不创建线程,直接返回 false **/ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) returnfalse; /** * workerCount 增加 1 **/ if (compareAndIncrementWorkerCount(c)) break retry; // Re-read ctl c = ctl.get(); if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
booleanworkerStarted=false; booleanworkerAdded=false; Workerw=null; try { /** * Worker 是 ThreadPoolExecutor 的一个内部类,类里持有一个 Runnable 和一个 Thread, * 当通过构造器传入一个任务时,类中的 Runnable 用来接收任务, * Thread 用来接收通过 ThreadFactory 将当前 worker 作为对象传入后创建的线程 * * Worker(Runnable firstTask) { * setState(-1); // inhibit interrupts until runWorker * this.firstTask = firstTask; * this.thread = getThreadFactory().newThread(this); * } **/ w = newWorker(firstTask); /** * 此时这个 t 就是通过 ThreadFactory 创建的线程,线程执行的代码就是 worker 中 run 方法的代码 **/ finalThreadt= w.thread; if (t != null) { finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. intrs= runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable thrownewIllegalThreadStateException(); /** * 将 worker 放入活动线程容器中 **/ workers.add(w); ints= workers.size(); if (s > largestPoolSize) //记录出现的最大线程数 largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { /** * 线程 start,会调用 worker 的 run 方法 **/ t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } /** * 尝试使用 CAS-increment 来保证不使用悲观锁(使用乐观锁)的情况下增加 workerCount */ privatebooleancompareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ privatestaticfinallongserialVersionUID=6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatilelong completedTasks;
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
/** Delegates main run loop to outer runWorker */ publicvoidrun() { runWorker(this); } } finalvoidrunWorker(Worker w) { Threadwt= Thread.currentThread(); Runnabletask= w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts booleancompletedAbruptly=true; try { /** * 循环取任务 * * 如果是第一次执行,会先执行创建线程时传入的任务,如果任务执行结束, * 会通过 getTask 从等待队列中取任务 **/ while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { /** * 任务执行前的一些处理,可以通过继承 ThreadPoolExecutor 后重写 **/ beforeExecute(wt, task); Throwablethrown=null; try { //实际执行的还是真正我们需要执行的任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; thrownewError(x); } finally { /** * 任务执行后的一些处理,如统计等,可以通过继承 ThreadPoolExecutor 重写 **/ afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } /** * 线程退出工作处理 **/ privatevoidprocessWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount();
run 方法中调用 runWorker 方法将 worker 实例传入,这里的重点是调用的是任务的 run 方法,第一次先执行一开始构造的第一个任务 firstTask,执行结束后,从等待队列中循环取新任务来执行。也就是说,新线程调用的 run 方法会通过循环取队列的方式来执行不同的任务,从而达到线程复用的目的。