本篇內(nèi)容介紹了“什么是ThreadPoolExecutor”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
創(chuàng)新互聯(lián)建站堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都做網(wǎng)站、網(wǎng)站設(shè)計(jì)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的興文網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
ThreadPoolExecutor是一個(gè)通過(guò)使用可能幾個(gè)池線程之一來(lái)執(zhí)行每個(gè)提交任務(wù)的ExecutorService,這些線程池通常通過(guò)Executors工廠方法進(jìn)行配置。
ThreadPoolExecutor中的線程池處理了兩個(gè)不同的問(wèn)題:
1、由于減少了每個(gè)任務(wù)調(diào)用的開銷,在執(zhí)行大量的異步任務(wù)時(shí)它們通常提供改進(jìn)的性能;
2、它們提供了邊界和管理資源的一種手段,包括多線程,在執(zhí)行任務(wù)集合時(shí)的消耗。
每個(gè)ThreadPoolExecutor還維護(hù)一些基本的統(tǒng)計(jì)數(shù)據(jù),例如完成任務(wù)的數(shù)量。
AtomicInteger類型的ctl代表了ThreadPoolExecutor中的控制狀態(tài),它是一個(gè)復(fù)核類型的成員變量,是一個(gè)原子整數(shù),借助高低位包裝了兩個(gè)概念:
(1)workerCount:線程池中當(dāng)前活動(dòng)的線程數(shù)量,占據(jù)ctl的低29位;
(2)runState:線程池運(yùn)行狀態(tài),占據(jù)ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五種狀態(tài)。
//COUNT_BITS分割32位二進(jìn)制偏移量,Integer.SIZE即Integer類型長(zhǎng)度(32),COUNT_BITS=29,高3位保存線程池的狀態(tài),低29位用來(lái)計(jì)量對(duì)象池中工作線程數(shù) private static final int COUNT_BITS = Integer.SIZE - 3; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
AtomicInteger ctl的定義如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //異或運(yùn)算符 100100|111=100111 private static int ctlOf(int rs, int wc) { return rs | wc; }
線程池理想的最大工作線程數(shù)(上限):
//(1<<COUNT_BITS)-1=0x20000000-1=0x1fffffff private static final int CAPACITY = (1 << COUNT_BITS) - 1;
獲取線程池當(dāng)前的工作線程數(shù):
//通過(guò)(與)運(yùn)算符,示例 11001&1111=1001,CAPACITY=0x1fffffff,所以就是ctl的值(&)CAPACITY就是只獲取ctl低29位的值就是當(dāng)前線程池的工作線程數(shù) private static int workerCountOf(int c) { return c & CAPACITY; }
//用以下文中workers集合操作的鎖 private final ReentrantLock mainLock = new ReentrantLock(); //用于保存任務(wù)并傳遞給工作線程的隊(duì)列 private final BlockingQueue<Runnable> workQueue; /** * Set containing all worker threads in pool. Accessed only when * holding mainLock. * 保存線程池中所有工作線程的集合,僅在獲取mainLock鎖權(quán)限時(shí)可操作 */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * Wait condition to support awaitTermination 創(chuàng)建線程池的線程通過(guò)調(diào)用線程池引用.awaitTermination方法中通過(guò)termination實(shí)現(xiàn)持有鎖后釋放鎖掛起等待工作線程tryTerminate操作成功喚醒,或者超時(shí)自動(dòng)喚醒中斷失敗。 */ private final Condition termination = mainLock.newCondition(); /** * Tracks largest attained pool size. Accessed only under * mainLock. 獲取線程池工作集合歷史最大容量,需獲得鎖 */ private int largestPoolSize; /** * Counter for completed tasks. Updated only on termination of * worker threads. Accessed only under mainLock. 池完成任務(wù)數(shù),在processWorkerExit函數(shù)持鎖增量更新 */ private long completedTaskCount; //用以持鎖任務(wù)創(chuàng)建worker時(shí)創(chuàng)建線程的工廠類 private volatile ThreadFactory threadFactory; /** * Handler called when saturated or shutdown in execute. * 在線程非RUNNING狀態(tài)或者池容量和隊(duì)列容器容量滿載時(shí)拒絕處理對(duì)象 */ private volatile RejectedExecutionHandler handler; /** * Timeout in nanoseconds for idle threads waiting for work. * Threads use this timeout when there are more than corePoolSize * present or if allowCoreThreadTimeOut. Otherwise they wait * forever for new work. * 空閑線程等待工作的超時(shí)時(shí)間(以納秒為單位)。 * 當(dāng)超過(guò)corePoolSize工作線程書或allowCoreThreadTimeOut為true時(shí),線程將使用此超時(shí)。 * 否則,他們將永遠(yuǎn)等待新的工作 */ private volatile long keepAliveTime; /** * If false (default), core threads stay alive even when idle. * If true, core threads use keepAliveTime to time out waiting * for work. * 如果為false(默認(rèn)值為false),則即使處于空閑狀態(tài),核心線程也會(huì)保持活動(dòng)狀態(tài)。 * 如果為true,則活躍線程使用keepAliveTime來(lái)超時(shí)等待工作,達(dá)到閾值就會(huì)釋放線程 */ private volatile boolean allowCoreThreadTimeOut; /** * Core pool size is the minimum number of workers to keep alive * (and not allow to time out etc) unless allowCoreThreadTimeOut * is set, in which case the minimum is zero. * 除非設(shè)置allowCoreThreadTimeOut,否則核心池大小是保持活動(dòng)狀態(tài)(不允許超時(shí)等)的最低數(shù)量, * 在這種情況下,最小值為零 */ private volatile int corePoolSize; /** * Maximum pool size. Note that the actual maximum is internally * bounded by CAPACITY. * 線程池最大工作線程書數(shù),受CAPACITY約束,最大不會(huì)超過(guò)CAPACITY */ private volatile int maximumPoolSize; /** * The default rejected execution handler. * 默認(rèn)拒絕策略處理器 */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
RUNNING: Accept new tasks and process queued tasks(運(yùn)行狀態(tài),接受新任務(wù),并處理隊(duì)列任務(wù))
// 二進(jìn)制位 -1=0x8000001(取反碼后+1得補(bǔ)碼)=0xfffffffe+1=0xffffffff // 右移29位后=0xe0000000即 1110 0000 0000 0000 0000 0000 0000 0000 // -536870912 private static final int RUNNING = -1 << COUNT_BITS;
SHUTDOWN: Don't accept new tasks, but process queued tasks(停止運(yùn)行狀態(tài),不接受新任務(wù),但處理隊(duì)列中任務(wù))
//SHUTDOWN=0 private static final int SHUTDOWN = 0 << COUNT_BITS;
STOP: Don't accept new tasks, don't process queued tasks,and interrupt in-progress tasks(中斷線程池工作狀態(tài),不接受新任務(wù),不處理隊(duì)列中準(zhǔn)備彈出的任務(wù),但是會(huì)執(zhí)行完現(xiàn)有的工作任務(wù)(前提是在修改為STOP前,彈出隊(duì)列的任務(wù)已經(jīng)走過(guò)線程狀態(tài)判斷,執(zhí)行業(yè)務(wù)方法,若正好彈出準(zhǔn)備判斷線程狀態(tài),STOP扭轉(zhuǎn)成功,當(dāng)前任務(wù)也會(huì)被攔截))
// STOP=0x20000000=0010 0000 0000 0000 0000 0000 0000 0000 // 536870912 private static final int STOP = 1 << COUNT_BITS;
TIDYING:All tasks have terminated, workerCount is zero,the thread transitioning to state TIDYING.will run the terminated() hook method(任務(wù)處理結(jié)束狀態(tài),workcount=0,線程池運(yùn)行狀態(tài)修改為TIDYING,并且會(huì)執(zhí)行==terminated()==鉤子函數(shù))
// TIDYING=0x40000000=0100 0000 0000 0000 0000 0000 0000 0000 // 1073741824 private static final int TIDYING = 2 << COUNT_BITS;
TERMINATED: terminated() has completed() (terminated()執(zhí)行完成后,會(huì)修改成TERMINATED狀態(tài))
// TERMINATED=0x60000000=0110 0000 0000 0000 0000 0000 0000 0000 // 1610612736 private static final int TERMINATED = 3 << COUNT_BITS;
/** The numerical order among these values matters, to allow * ordered comparisons. The runState monotonically increases over * time, but need not hit each state. The transitions are: * * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() 顯式調(diào)用線程池showdown()方法,或者線程池對(duì)象不被引用,被GC回收時(shí)調(diào)用finalize()函數(shù),finalize()函數(shù)中調(diào)用shutdown() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() 顯式調(diào)用shutdownNow()扭轉(zhuǎn)狀態(tài),修改線程池中workers所有工作線程為中斷狀態(tài),讓接下來(lái)隊(duì)列彈出的任務(wù)都跳過(guò)執(zhí)行任務(wù) * SHUTDOWN -> TIDYING * When both queue and pool are empty 工作線程全部執(zhí)行完成且隊(duì)列也是空,則扭轉(zhuǎn)狀態(tài) * STOP -> TIDYING * When pool is empty 當(dāng)沒(méi)有任務(wù)時(shí),狀態(tài)扭轉(zhuǎn) * TIDYING -> TERMINATED * When the terminated() hook method has completed 執(zhí)行terminated(),try{terminated()}finally{扭轉(zhuǎn)狀態(tài)} */
// Packing and unpacking ctl //~CAPACITY 連同符號(hào)位反轉(zhuǎn)(即相反數(shù)-1,若不理解百度反碼和補(bǔ)碼) 得 0xe0000000 ,就是取高三位做位與計(jì)算 private static int runStateOf(int c) { return c & ~CAPACITY; }
public void execute(Runnable command) { //任務(wù)非空校驗(yàn) if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ //獲取當(dāng)前線程池計(jì)數(shù)器值 int c = ctl.get(); //判斷當(dāng)前工作線程池的活動(dòng)線程數(shù)是否<核心線程數(shù) if (workerCountOf(c) < corePoolSize) { //進(jìn)入addWorker函數(shù),參數(shù)true(標(biāo)識(shí)創(chuàng)建核心線程數(shù)工作線程,該函數(shù)中會(huì)對(duì)該標(biāo)識(shí)識(shí)別是當(dāng)前工作數(shù)量數(shù)比較核心線程數(shù)還是最大線程數(shù)),檢查是否可創(chuàng)建worker任務(wù)線程 if (addWorker(command, true)) return; //執(zhí)行此步,意味著進(jìn)入addWorker函數(shù),資源被其他線程爭(zhēng)奪,導(dǎo)致該任務(wù)沒(méi)有搶到創(chuàng)建核心工作線程的資源,二次獲取最新活動(dòng)線程數(shù) c = ctl.get(); } //檢查線程池狀態(tài)是否為RUNNING狀態(tài),并且任務(wù)隊(duì)列是否可追加該任務(wù) if (isRunning(c) && workQueue.offer(command)) { //重新獲取線程池ctl值 int recheck = ctl.get(); //檢查當(dāng)前線程池狀態(tài)為非RUNNING狀態(tài),且從隊(duì)列容器中回滾該任務(wù) if (! isRunning(recheck) && remove(command)) //拒絕加入任務(wù),實(shí)則調(diào)用上文中handlder的rejectedExecution()拋出異常(默認(rèn)AbortPolicy中止策略) reject(command); //獲取最后一次獲取的計(jì)量值,判斷是否工作線程均已完成任務(wù),因?yàn)楹苡锌赡苤霸?6行操作之前工作線程數(shù)已達(dá)最大線程數(shù)閾值,但是正好剛加入到隊(duì)列中后,線程已全部執(zhí)行完成,且釋放了,所以需要?jiǎng)?chuàng)建一個(gè)空任務(wù)的worker線程用以調(diào)用runWorker中從隊(duì)列中彈出任務(wù)去執(zhí)行(具體查看getTask()) else if (workerCountOf(recheck) == 0) addWorker(null, false); } //第三步則意味著第二步可能池狀態(tài)非RUNNING,當(dāng)然如果是非RUNNING狀態(tài),在addWorker判斷池狀態(tài)是否可接受新非核心任務(wù)。 //也有可能是隊(duì)列滿載,該任務(wù)會(huì)插隊(duì)嘗試創(chuàng)建非核心工作線程,如果創(chuàng)建失敗,會(huì)觸發(fā)拒絕策略異常 else if (!addWorker(command, false)) reject(command); } //拒絕任務(wù) final void reject(Runnable command) { //默認(rèn)拒絕策略Handler-AbortPolicy handler.rejectedExecution(command, this); }
流程圖:
2.創(chuàng)建任務(wù)線程(addWorker)(流程圖)
/** * firstTask:創(chuàng)建任務(wù)工作線程RunWorker-執(zhí)行的第一個(gè)任務(wù),也有可能是空任務(wù)(喚醒任務(wù)) * core:true(核心工作線程),false(非核心工作線程) 對(duì)應(yīng)的是比較corePoolSize和maxPoolSize條件。 **/ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); //STOP、TIDYING、TERMINATED狀態(tài)不接受新任務(wù),所以直接拒絕,創(chuàng)建任務(wù)失敗 //SHUTDOWN狀態(tài)下,僅允許創(chuàng)建task為null的喚醒任務(wù)(前提隊(duì)列中存在任務(wù)),因?yàn)殛?duì)列中有任務(wù),否則喚醒任務(wù)創(chuàng)建線程無(wú)意義不允許創(chuàng)建工作線程 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //根據(jù)core標(biāo)識(shí),對(duì)應(yīng)比較閾值,首先保證不能>=(1>>29)-1,否則不允許創(chuàng)建工作線程 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS當(dāng)前值+1替換當(dāng)前值,根絕替換返回值判斷是否替換成功,成功,直接跳出循環(huán) if (compareAndIncrementWorkerCount(c)) break retry; //意味著CAS替換失敗,重新取值,判斷最新池狀態(tài)是否還是RUNNING,RUNNNING狀態(tài)則繼續(xù)執(zhí)行該循環(huán)體,嘗試ctl+1操作 //否則直接跳入外循環(huán),進(jìn)行狀態(tài)判斷是否允許創(chuàng)建任務(wù)線程 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //此次任務(wù)工作的創(chuàng)建標(biāo)記以及對(duì)應(yīng)的線程啟動(dòng)標(biāo)記 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //創(chuàng)建任務(wù)工作線程,查看下文代碼的Worker源碼 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; //嘗試持有worker集合的權(quán)限獨(dú)占鎖 mainLock.lock(); try { //如果獲得鎖時(shí),線程池狀態(tài)非RUNNING或SHUTDOWN狀態(tài)TASK不為空,則不允許該任務(wù)工作對(duì)象加入集合,也不允許線程啟動(dòng) int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //檢查線程狀態(tài)是否可啟動(dòng) if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); //記錄worker集合在某一刻的長(zhǎng)度最大數(shù),按照配置來(lái)說(shuō),也就是同時(shí)存活存貨線程數(shù)最大也頂多就是MaxPoolSize if (s > largestPoolSize) largestPoolSize = s; //開啟任務(wù)工作對(duì)象加入集合成功標(biāo)記 workerAdded = true; } } finally { //釋放worker集合的權(quán)限獨(dú)占鎖,因?yàn)榭赡芡粫r(shí)刻有N個(gè)任務(wù)需要?jiǎng)?chuàng)建對(duì)象加入workers集合 mainLock.unlock(); } if (workerAdded) { //加入工作集合成功,則需要啟動(dòng)本次工作對(duì)象的內(nèi)置線程 t.start(); //工作對(duì)象線程啟動(dòng)成功標(biāo)記 workerStarted = true; } } } finally { if (! workerStarted) //添加任務(wù)工作對(duì)象失敗,看下文源碼 addWorkerFailed(w); } return workerStarted; } //持MainLock鎖,讓workers集合移除添加失敗的任務(wù),以及上文中ctl的cas自增操作回滾,嘗試中止線程,最終釋放鎖 private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); //遞歸循環(huán),直至降一操作成功 decrementWorkerCount(); //嘗試停止線程池,該處不詳做介紹,在runWorker中會(huì)有介紹 tryTerminate(); } finally { mainLock.unlock(); } } //ThreadPoolExectutor內(nèi)部類 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { //該任務(wù)工作對(duì)象內(nèi)置線程(用來(lái)處理該工作對(duì)象中的任務(wù),以及隊(duì)列中的任務(wù)),如果是ThreadFactory是異常的,則thread一定是null final Thread thread; //該任務(wù)工作對(duì)象的初始化任務(wù),有可能是NULL(喚醒任務(wù)) Runnable firstTask; //該工作任務(wù)執(zhí)行完成的任務(wù)次數(shù) volatile long completedTasks; /** * 構(gòu)造New實(shí)例的內(nèi)部變量 * state默認(rèn)為-1,在runWorker中執(zhí)行到持鎖修改state=1,才可以觸發(fā)線程中斷信號(hào),查看下文interruptIfStarted */ Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 將處理邏輯交給ThrearunWorker */ public void run() { runWorker(this); } //判斷該任務(wù)工作對(duì)象是否有線程持有鎖 protected boolean isHeldExclusively() { return getState() != 0; } //線程池修改為STOP狀態(tài)時(shí),會(huì)對(duì)worker集合中的工作對(duì)象內(nèi)置線程發(fā)送中斷信號(hào) void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } } //上面的任務(wù)工作創(chuàng)建失敗后的回滾工作線程數(shù)自增操作 private void decrementWorkerCount() { //可以看到是遞歸降一操作,循環(huán)降一操作,直至成功才退出循環(huán) do {} while (! compareAndDecrementWorkerCount(ctl.get())); }
流程圖:
3.任務(wù)工作線程執(zhí)行解析(runWorker)
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //獲取工作初始化對(duì)象時(shí)的任務(wù) Runnable task = w.firstTask; //然后置空,防止重復(fù)執(zhí)行 w.firstTask = null; //本處并不是釋放鎖,只是把默認(rèn)state(-1)修改為0,允許中斷。 w.unlock(); boolean completedAbruptly = true; try { //如果內(nèi)置任務(wù)是NULL,就會(huì)去從隊(duì)列中彈出任務(wù)處理,空隊(duì)列就會(huì)阻塞或者超時(shí)阻塞。 while (task != null || (task = getTask()) != null) { w.lock(); /** * 兩次檢查 * 第一次檢查 如果是>=STOP狀態(tài) * 第二次檢查 獲取當(dāng)前線程中斷信號(hào)(該靜態(tài)方法會(huì)清除中斷信號(hào))且判斷是否>=STOP狀態(tài) * 前兩次檢查任一滿足,則繼續(xù)檢查該線程是否中斷,未中斷將中斷該線程 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //子實(shí)現(xiàn)類執(zhí)行任務(wù)前的鉤子函數(shù) beforeExecute(wt, task); Throwable thrown = null; try { //執(zhí)行execute傳入的任務(wù),或者execute加入到隊(duì)列中的任務(wù) task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //子實(shí)現(xiàn)類執(zhí)行任務(wù)后的鉤子函數(shù) afterExecute(task, thrown); } } finally { task = null; //每個(gè)工作對(duì)象內(nèi)部會(huì)記錄worker對(duì)應(yīng)的線程處理了多少個(gè)任務(wù)(無(wú)論任務(wù)內(nèi)部工作是否有異常),但是如果遇到某個(gè)任務(wù)拋出異常后,該線程就會(huì)釋放 //比如隊(duì)列容量80個(gè),池工作最大工作線程數(shù)是20個(gè),然后隊(duì)列滿載的情況下,極有可能每個(gè)線程在執(zhí)行初始化的內(nèi)置任務(wù)zhi w.completedTasks++; w.unlock(); } } //如果任務(wù)執(zhí)行過(guò)程中出現(xiàn)異常,不會(huì)執(zhí)行此步 completedAbruptly = false; } finally { //線程釋放后的退出處理工作,會(huì)把此次執(zhí)行任務(wù)的結(jié)果和工作對(duì)象傳遞給該函數(shù)。 processWorkerExit(w, completedAbruptly); } } //用于子類實(shí)現(xiàn)類的runWorker的任務(wù)執(zhí)行前置鉤子函數(shù) protected void beforeExecute(Thread t, Runnable r) { } //用于子類實(shí)現(xiàn)類的runWorker的任務(wù)執(zhí)行后置鉤子函數(shù) protected void afterExecute(Thread t, Runnable r) { } private void processWorkerExit(Worker w, boolean completedAbruptly) { //未完成標(biāo)記,就先回滾數(shù)量,比如工作任務(wù)執(zhí)行異常,或者開啟核心線程超時(shí)配置,指定時(shí)間未收到隊(duì)列喚醒 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //持鎖,統(tǒng)計(jì)工作對(duì)象中完成的次數(shù),累加到線程池的累計(jì)變量 completedTaskCount += w.completedTasks; //集合移除該工作對(duì)象 workers.remove(w); } finally { mainLock.unlock(); } //嘗試中斷,RUNNING狀態(tài)或者SHUTDOWN狀態(tài)下隊(duì)列中有任務(wù),該操作無(wú)需理會(huì),下文有該方法詳細(xì)介紹 tryTerminate(); int c = ctl.get(); /** * 1.如果是STOP狀態(tài)即值以上的狀態(tài),該操作跳過(guò) * 2.該步操作主要是該線程處理任務(wù)結(jié)果來(lái)判斷,如果是異常退出,直接創(chuàng)建一個(gè)空任務(wù)的處理處理線程 * 3.如果正常線程處理完成釋放的線程,判斷是allowCoreThreadTimeOut是否是true,如果是且隊(duì)列是空,則有可能是線程超時(shí)未取到任務(wù)而釋放線程的,則所有線程return返回直接釋放,無(wú)需創(chuàng)建線程,否則則查看隊(duì)列中是否有任務(wù)未處理完(如果有任務(wù)則需要最少一個(gè)線程,如果是最后一個(gè)線程,需要再創(chuàng)建一個(gè)空任務(wù)處理線程,由隊(duì)列彈出任務(wù)來(lái)自旋處理),如果是allowCoreThreadTimeOut為默認(rèn)值false,判斷是否超過(guò)核心線程數(shù)如果超過(guò)就直接釋放線程,否則需要再創(chuàng)建一個(gè)空任務(wù)的處理線程 **/ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; } //創(chuàng)建一個(gè)非核心的空任務(wù)線程用來(lái)處理隊(duì)列中的任務(wù) addWorker(null, false); } } final void tryTerminate() { //自旋嘗試停止線程池,前提是非RUNNING狀態(tài)或非(SHUTDOWN狀態(tài)下隊(duì)列不為空)的情況之一,否則直接跳出循環(huán) for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //如果是活躍線程數(shù)>0,就會(huì)從工作者列表中從第一個(gè)開始取,直到?jīng)]有中斷的工作線程,然后對(duì)該線程發(fā)送中斷信號(hào) if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //在持有鎖之后,則把shutdown或者stopz狀態(tài)嘗試扭轉(zhuǎn)為tidying狀態(tài) if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); //喚醒嘗試tryTerminate過(guò)程中阻塞在condition隊(duì)列中的線程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // 繼續(xù)自旋嘗試該次操作 } }
“什么是ThreadPoolExecutor”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
文章題目:什么是ThreadPoolExecutor
網(wǎng)頁(yè)網(wǎng)址:http://chinadenli.net/article10/gpppgo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)網(wǎng)站制作、響應(yīng)式網(wǎng)站、外貿(mào)建站、營(yíng)銷型網(wǎng)站建設(shè)、自適應(yīng)網(wǎng)站、外貿(mào)網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)