欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

如何進(jìn)行ThreadPoolExecutor源碼解析

如何進(jìn)行ThreadPoolExecutor 源碼解析,針對(duì)這個(gè)問(wèn)題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問(wèn)題的小伙伴找到更簡(jiǎn)單易行的方法。

創(chuàng)新互聯(lián)主要從事成都做網(wǎng)站、成都網(wǎng)站制作、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)萬(wàn)載,十載網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專(zhuān)業(yè),歡迎來(lái)電咨詢(xún)建站服務(wù):18982081108

一、線程

  1. 線程是CPU 調(diào)度的最小操作單位,線程模型分為KLT 模型和ULT 模型,JVM 使用的是KLT 模型。

  2. 線程的狀態(tài) :NEW,RUNNABLE,BLOCKED,TERMINATED

二、線程池

1. 線程池解決的兩大核心問(wèn)題:
  • 在執(zhí)行大量異步運(yùn)算的時(shí)候,線程池用優(yōu)化系統(tǒng)性能,減少線程的反復(fù)創(chuàng)建所帶來(lái)的的系統(tǒng)開(kāi)銷(xiāo)

  • 提供了一種限制和管理資源的方法

2. 7 大核心參數(shù):
  1. corePoolSize :線程池中的核心線程數(shù),當(dāng)提交一個(gè)任務(wù)時(shí),線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到 阻塞隊(duì)列中,等待被執(zhí)行;如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會(huì) 提前創(chuàng)建并啟動(dòng)所有核心線程。

  2. maximumPoolSize :線程池中允許的最大線程數(shù)。如果當(dāng)前阻塞隊(duì)列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize;

  3. keepAliveTime :線程池維護(hù)線程所允許的空閑時(shí)間。當(dāng)線程池中的線程數(shù)量大corePoolSize的時(shí) 候,如果這時(shí)沒(méi)有新的任務(wù)提交,核心線程外的線程不會(huì)立即銷(xiāo)毀,而是會(huì)等待,直到等待 的時(shí)間超過(guò)了keepAliveTime;

  4. unit: keepAliveTime的單位;

  5. workQueue: 用來(lái)保存等待被執(zhí)行的任務(wù)的阻塞隊(duì)列,且任務(wù)必須實(shí)現(xiàn)Runable接口,在JDK中提供了如下阻塞隊(duì)列:

  • ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,按FIFO排序任務(wù);

  • LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene; -

  • SynchronousQuene:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于 LinkedBlockingQuene;

  • priorityBlockingQuene:具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列;

  1. threadFactory:它是ThreadFactory類(lèi)型的變量,用來(lái)創(chuàng)建新線程。默認(rèn)使用 Executors.defaultThreadFactory() 來(lái)創(chuàng)建線程。使用默認(rèn)ThreadFactory來(lái)創(chuàng)建線程 時(shí),會(huì)使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級(jí)并且是非守護(hù)線程,同時(shí)也設(shè) 置了線程的名稱(chēng)。

  2. handler: 線程池的飽和策略,當(dāng)阻塞隊(duì)列滿了,且沒(méi)有空閑的工作線程,如果繼續(xù)提交任務(wù),必 須采取一種策略處理該任務(wù),線程池提供了4種策略:

  • AbortPolicy:直接拋出異常,默認(rèn)策略;

  • CallerRunsPolicy:用調(diào)用者所在的線程來(lái)執(zhí)行任務(wù);

  • DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);

  • DiscardPolicy:直接丟棄任務(wù);

上面的4種策略都是ThreadPoolExecutor的內(nèi)部類(lèi)。當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如 記錄日志或持久化存儲(chǔ)不能處理的任務(wù)。

3. 線程池的生命周期狀態(tài) :
  • NEW

  • RUNNABLE

  • WATING

  • BLOCKED

  • TIMED_WATING

  • TERMINATED

4. 線程池的重要屬性 :ctl
  1. ctl 是對(duì)線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量進(jìn)行控制的一個(gè)字段, 它包含兩 部分的信息: 線程池的運(yùn)行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount),這 里可以看到,使用了Integer類(lèi)型來(lái)保存,高3位保存runState,低29位保存 workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個(gè)1),這個(gè)常 量表示workerCount的上限值,大約是5億。

  2. runState 主要提供線程池生命周期的控制,主要值包括:

  • RUNNING

(1) 狀態(tài)說(shuō)明:線程池處在RUNNING狀態(tài)時(shí),能夠接收新任務(wù),以及對(duì)已添加的任務(wù)進(jìn)行 處理。

(2) 狀態(tài)切換:線程池的初始化狀態(tài)是RUNNING。換句話說(shuō),線程池被一旦被創(chuàng)建,就處 于RUNNING狀態(tài),并且線程池中的任務(wù)數(shù)為0!

  • SHUTDOWN

(1) 狀態(tài)說(shuō)明:線程池處在SHUTDOWN狀態(tài)時(shí),不接收新任務(wù),但能處理已添加的任務(wù)。

(2) 狀態(tài)切換:調(diào)用線程池的shutdown()接口時(shí),線程池由RUNNING -> -SHUTDOWN。

  • STOP

(1) 狀態(tài)說(shuō)明:線程池處在STOP狀態(tài)時(shí),不接收新任務(wù),不處理已添加的任務(wù),并且會(huì)中 斷正在處理的任務(wù)。

(2) 狀態(tài)切換:調(diào)用線程池的shutdownNow()接口時(shí),線程池由(RUNNING or SHUTDOWN ) -> STOP。

  • TIDYING

(1) 狀態(tài)說(shuō)明:當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,線程池會(huì)變?yōu)門(mén)IDYING 狀態(tài)。當(dāng)線程池變?yōu)門(mén)IDYING狀態(tài)時(shí),會(huì)執(zhí)行鉤子函數(shù)terminated()。terminated()在 ThreadPoolExecutor類(lèi)中是空的,若用戶想在線程池變?yōu)門(mén)IDYING時(shí),進(jìn)行相應(yīng)的處理; 可以通過(guò)重載terminated()函數(shù)來(lái)實(shí)現(xiàn)。

(2) 狀態(tài)切換:當(dāng)線程池在SHUTDOWN狀態(tài)下,阻塞隊(duì)列為空并且線程池中執(zhí)行的任務(wù)也 為空時(shí),就會(huì)由 SHUTDOWN -> TIDYING。 當(dāng)線程池在STOP狀態(tài)下,線程池中執(zhí)行的 任務(wù)為空時(shí),就會(huì)由STOP -> TIDYING。

  • TERMINATED

(1) 狀態(tài)說(shuō)明:線程池徹底終止,就變成TERMINATED狀態(tài)。

(2) 狀態(tài)切換:線程池處在TIDYING狀態(tài)時(shí),執(zhí)行完terminated()之后,就會(huì)由 TIDYING - > TERMINATED。 進(jìn)入TERMINATED的條件如下: 線程池不是RUNNING狀態(tài); 線程池狀態(tài)不是TIDYING狀態(tài)或TERMINATED狀態(tài); 如果線程池狀態(tài)是SHUTDOWN并且workerQueue為空; workerCount為0,設(shè)置TIDYING狀態(tài)成功。

如何進(jìn)行ThreadPoolExecutor 源碼解析

  1. ctl相關(guān) API

  • runStateOf():獲取運(yùn)行狀態(tài);

  • workerCountOf():獲取活動(dòng)線程數(shù);

  • ctlOf():獲取運(yùn)行狀態(tài)和活動(dòng)線程數(shù)的值。

5. 線程池的行為
  • execute(Runnable command):執(zhí)行Ruannable類(lèi)型的任務(wù)

  • submit(task):可用來(lái)提交Callable或Runnable任務(wù),并返回代表此任務(wù)的Future 對(duì)象

  • shutdown():在完成已提交的任務(wù)后封閉辦事,不再接管新任務(wù),

  • shutdownNow():停止所有正在履行的任務(wù)并封閉辦事。

  • isTerminated():測(cè)試是否所有任務(wù)都履行完畢了。

  • isShutdown():測(cè)試是否該ExecutorService已被關(guān)閉。

6. 常用線程池的具體實(shí)現(xiàn)
ThreadPoolExecutor 默認(rèn)線程池 
ScheduledThreadPoolExecutor 定時(shí)線程池
7. 線程池監(jiān)控API
  • public long getTaskCount() //線程池已執(zhí)行與未執(zhí)行的任務(wù)總數(shù)

  • public long getCompletedTaskCount() //已完成的任務(wù)數(shù)

  • public int getPoolSize() //線程池當(dāng)前的線程數(shù)

  • public int getActiveCount() //線程池中正在執(zhí)行任務(wù)的線程數(shù)量

三、源碼解析

execute() 方法
//在將來(lái)的某個(gè)時(shí)間執(zhí)行給定的任務(wù)。任務(wù)可以是新起一個(gè)新線程或者復(fù)用現(xiàn)有池線程中的線程去執(zhí)行
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 執(zhí)行過(guò)程還是分為 3 步:
         *
         * 1.執(zhí)行任務(wù): 
         * 如果小于核心線程數(shù),嘗試創(chuàng)建一個(gè)新線程來(lái)執(zhí)行給定的任務(wù)。
         * 方法 addWorker() 就是真正的創(chuàng)建一個(gè)新線程來(lái)執(zhí)行任務(wù)的方法。
         * addWorker()方法會(huì)對(duì) runState 和 workerCount進(jìn)行原子檢查。
         * addWorker()方法會(huì)返回一個(gè) boolean 值,通過(guò)返回 false 值來(lái)防止在不應(yīng)該添加線程的情況下發(fā)出錯(cuò)誤警報(bào)
         * 
         *
         * 2.添加到阻塞隊(duì)列:
         * 未能滿足條件執(zhí)行完步驟 1 則添加到阻塞隊(duì)列。
         * 如果任務(wù)可以成功排隊(duì),會(huì)再次進(jìn)行檢查,檢查是否應(yīng)該添加線程(因?yàn)楝F(xiàn)有線程自上次檢查后就死了),
         * 或者自進(jìn)入此方法以來(lái)該池已關(guān)閉。因此,需要重新檢查狀態(tài),并在停止的情況下在必要時(shí)回滾隊(duì)列,如果沒(méi)有,則啟動(dòng)一個(gè)新線程。 
         *
         * 3.拒絕任務(wù): 
         * 如果無(wú)法將任務(wù)添加至阻塞隊(duì)列,最大線程數(shù)也未達(dá)到最大會(huì)嘗試添加一個(gè)新的線程。如果失敗,說(shuō)明線程池已關(guān)閉或處于飽和狀態(tài),因此拒絕該任務(wù)。
         */
         
         //clt記錄著runState和workerCount
        int c = ctl.get();
        /*
         * workerCountOf方法取出低29位的值,表示當(dāng)前活動(dòng)的線程數(shù);
         * 如果當(dāng)前活動(dòng)線程數(shù)小于corePoolSize,則新建一個(gè)線程放入線程池中;并把任務(wù)添加到該線程中。
         */
        if (workerCountOf(c) < corePoolSize) {
            /*
             * addWorker中的第二個(gè)參數(shù)表示限制添加線程的數(shù)量是根據(jù)corePoolSize來(lái)判斷還是maximumPoolSize來(lái)判斷
             * 如果為true,根據(jù)corePoolSize來(lái)判斷;
             * 如果為false,則根據(jù)maximumPoolSize來(lái)判斷
             */
            if (addWorker(command, true))
                return;
            //如果添加失敗,則重新獲取ctl值    
            c = ctl.get();
        }
        //執(zhí)行到此處說(shuō)明從核心線程里給當(dāng)前任務(wù)分配線程失敗
        //如果當(dāng)前線程池是運(yùn)行狀態(tài)并且任務(wù)添加到隊(duì)列成功
        if (isRunning(c) && workQueue.offer(command)) {
            //重新獲取ctl值。即使添加隊(duì)列成功也要再次檢查,如果不是運(yùn)行狀態(tài),由于之前已經(jīng)把任務(wù)添加到workerQueue 中了,所以要移除該任務(wù),執(zhí)行過(guò)后通過(guò)handler使用拒絕策略對(duì)該任務(wù)進(jìn)行處理,整個(gè)方法返回
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
                /*
                 * 獲取線程池中的有效線程數(shù),如果數(shù)量是0,則執(zhí)行addWorker方法 這里傳入的參數(shù)表示:
                 * 第一個(gè)參數(shù)為null,表示在線程池中創(chuàng)建一個(gè)線程,但不去啟動(dòng);
                 * 第二個(gè)參數(shù)為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPolSize,添加線程時(shí)根據(jù)maximumPoolSize來(lái)判斷;
                 * 如果判斷workerCount大于0,則直接返回,在workQueue中新增的comman 會(huì)在將來(lái)的某個(gè)時(shí)刻被執(zhí)行。
                 */
            //因?yàn)?nbsp;任務(wù)已經(jīng)被添加到workQueue中了,所以worker在執(zhí)行的時(shí)候,會(huì)直接從workQueue中 獲取任務(wù)。     
            else if (workerCountOf(recheck) == 0)
                //執(zhí)行到這里說(shuō)明任務(wù)已經(jīng)添加到阻塞隊(duì)列里了,最大線程數(shù)也未飽和,則創(chuàng)建一個(gè)新的線程去阻塞隊(duì)列里拿任務(wù)
                //這步操作也就是創(chuàng)建一個(gè)線程,但并沒(méi)有傳入任務(wù),因?yàn)槿蝿?wù)已經(jīng)被添加到workQueue中了,所以worker在執(zhí)行的時(shí)候,會(huì)直接從workQueue中 獲取任務(wù)。
                //為什么要這樣做呢?是為了保證線程池在RUNNING狀態(tài)下必須要有一個(gè)線程來(lái)執(zhí)行任務(wù)。
                addWorker(null, false);
        }
        /*
         * 如果執(zhí)行到這里,有兩種情況:
         * 1. 線程池已經(jīng)不是RUNNING狀態(tài);
         * 2. 線程池是RUNNING狀態(tài),但workerCount >= corePoolSize并且workQueue已滿。
         * 這時(shí),再次調(diào)用addWorker方法,但第二個(gè)參數(shù)傳入為false,將線程池的 有限線程數(shù)量的上限設(shè)置為maximumPoolSize;
         * 如果失敗則拒絕該任務(wù)
         */
        else if (!addWorker(command, false))
            reject(command);
    }
addWorker() 方法
/**
 * 檢查是否可以根據(jù)當(dāng)前線程池的狀態(tài)添加一個(gè)新的工作線程去執(zhí)行任務(wù)。
 * addWorker(runnable,true)表示從核心工作線程數(shù)中分配線程執(zhí)行傳進(jìn)來(lái)的任務(wù);
 * addWorker(null,false)表示從最大線程數(shù)中分配線程執(zhí)行阻塞隊(duì)列中的任務(wù)。
 * 線程池如果停止或者關(guān)閉則直接返回 false,如果線程池創(chuàng)建新線程失敗同樣也會(huì)返回 false。
 * 如果創(chuàng)建線程失敗,或者線程工廠返回 null,或者執(zhí)行當(dāng)前 addWorker()的線程拋出異常,(注意是當(dāng)前線程拋出異常,當(dāng)前線程拋出異常只與當(dāng)前任務(wù)有關(guān),并不影響其他任務(wù)的執(zhí)行),線程池的相關(guān)屬性會(huì)立即回滾
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 外層 for 循環(huán)就是為了能給任務(wù)分配線程做準(zhǔn)備,判斷狀態(tài)--> 原子遞增 workerCount
    // 直到線程池狀態(tài)不符合條件返回 false ,或者自增成功跳出 for 循環(huán)
    // 同樣的,getTask()從阻塞隊(duì)列中獲取任務(wù)的時(shí)候也是這么個(gè)邏輯,先對(duì) workerCount 原子遞減,再去執(zhí)行任務(wù)
    for (;;) {
        //可以看到,每一步操作都會(huì)對(duì)線程池的狀態(tài)參數(shù)做判斷
        int c = ctl.get();
        int rs = runStateOf(c);
        
        //也是對(duì)線程池狀態(tài),隊(duì)列狀態(tài)做檢查
        /**
         * 這里的狀態(tài)判斷也很好理解:
         * 線程池狀態(tài)為SHUTDOWN,不會(huì)再接受新的任務(wù)了,返回 false
         * 想城池狀態(tài)不為SHUTDOWN,傳進(jìn)來(lái)的任務(wù)為空,并且阻塞隊(duì)列里也沒(méi)任務(wù),那還執(zhí)行個(gè)錘子任務(wù),同樣返回 false 
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        //前面已經(jīng)判斷過(guò)滿足為任務(wù)分配一個(gè)線程去執(zhí)行任務(wù)
        //這個(gè) for 循環(huán)就是為了創(chuàng)建任務(wù)做準(zhǔn)備,先去原子性的遞增 workerCount,workerCount 遞增成功了才會(huì)去真正的為任務(wù)分配線程去執(zhí)行
        for (;;) {
            //當(dāng)前工作線程數(shù)
            int wc = workerCountOf(c);
            //當(dāng)前工作線程數(shù)大于corePoolSize 或者 maximumPoolSize (跟誰(shuí)比較就是根據(jù)傳進(jìn)來(lái)的參數(shù) core 判斷),
            //說(shuō)明也沒(méi)有分配的線程可以執(zhí)行任務(wù)了,返回 false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            /** 
             * 執(zhí)行到這里說(shuō)明滿足條件了,可以分配出來(lái)線程去執(zhí)行任務(wù)了
             * 嘗試增加workerCount,如果成功,則跳出第一個(gè)for循環(huán)
             * 這里是進(jìn)行 CAS 自增 ctl 的 workerCount(先把數(shù)量自增,再跳出 for 循環(huán)創(chuàng)建新的線程去執(zhí)行任務(wù))
             * 該方法內(nèi)部也是調(diào)用了原子類(lèi) AtomicInteger.compareAndSet()方法,保證原子遞增
             */    
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //如果嘗試添加新的工作線程失敗則會(huì)繼續(xù)判斷當(dāng)前線程池的狀態(tài),狀態(tài)滿足繼續(xù)嘗試為當(dāng)前線程分配工作線程    
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    //跳出 for 循環(huán)之后說(shuō)明線程池的工作線程數(shù) workerCount 已經(jīng)調(diào)節(jié)過(guò)了,接下來(lái)要做到就是真正的分配線程,執(zhí)行任務(wù)
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //Worker 對(duì)象是個(gè)內(nèi)部類(lèi),其實(shí)就是用threatFactory 生成一個(gè)新的線程
        //繼承 AQS 類(lèi),實(shí)現(xiàn)Runable 接口,重寫(xiě) run()方法,重寫(xiě)的 run()方法也很重要,后面會(huì)講
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //加鎖保證同步
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //還是先進(jìn)行一通的線程池狀態(tài)檢查
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //這個(gè) workers 是個(gè) HashSet,線程池也是通過(guò)維護(hù)這個(gè) workers 控制任務(wù)的執(zhí)行
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        //largestPoolSize記錄著線程池中出現(xiàn)過(guò)的最大線程數(shù)量
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //終于,調(diào)用線程的 start() 方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果創(chuàng)建線程失敗,就要回滾線程池的狀態(tài)了
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Worker 類(lèi)

Worker 類(lèi)是用來(lái)干嘛的,存在的意義

回頭再看 Worker 類(lèi),線程池中的每一個(gè)線程被封裝成一個(gè)Worker對(duì)象,ThreadPool維護(hù)的其實(shí)就是一組 Worker對(duì)象(HashSet)。

Worker類(lèi)繼承了AQS,并實(shí)現(xiàn)了Runnable接口,注意其中的firstTask和thread屬 性:firstTask用它來(lái)保存?zhèn)魅氲娜蝿?wù);thread是在調(diào)用構(gòu)造方法時(shí)通過(guò)ThreadFactory來(lái)創(chuàng) 建的線程,是用來(lái)處理任務(wù)的線程。

Worker繼承了AQS,使用AQS來(lái)實(shí)現(xiàn)獨(dú)占鎖的功能。為什么不使用ReentrantLock來(lái) 實(shí)現(xiàn)呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的:

  1. lock方法一旦獲取了獨(dú)占鎖,表示當(dāng)前線程正在執(zhí)行任務(wù)中;

  2. 如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程;

  3. 如果該線程現(xiàn)在不是獨(dú)占鎖的狀態(tài),也就是空閑的狀態(tài),說(shuō)明它沒(méi)有在處理任務(wù), 這時(shí)可以對(duì)該線程進(jìn)行中斷;

  4. 線程池在執(zhí)行shutdown方法或tryTerminate方法時(shí)會(huì)調(diào)用interruptIdleWorkers 方法來(lái)中斷空閑的線程,interruptIdleWorkers方法會(huì)使用tryLock方法來(lái)判斷線程 池中的線程是否是空閑狀態(tài);

  5. 之所以設(shè)置為不可重入,是因?yàn)槲覀儾幌M蝿?wù)在調(diào)用像setCorePoolSize這樣的 線程池控制方法時(shí)重新獲取鎖。如果使用ReentrantLock,它是可重入的,這樣如果 在任務(wù)中調(diào)用了如setCorePoolSize這類(lèi)線程池控制的方法,會(huì)中斷正在運(yùn)行的線程。 所以,Worker繼承自AQS,用于判斷線程是否空閑以及是否可以被中斷。 此外,在構(gòu)造方法中執(zhí)行了setState(-1);,把state變量設(shè)置為-1,為什么這么做呢? 是因?yàn)锳QS中默認(rèn)的state是0,如果剛創(chuàng)建了一個(gè)Worker對(duì)象,還沒(méi)有執(zhí)行任務(wù)時(shí),這時(shí)就不應(yīng)該被中斷,看一下tryAquire方法:

Worker 類(lèi)中以及涉及到的重要的方法

  • tryAcquire(int unused) 方法

  /**
    * 用于判斷線程是否空閑以及是否可以被中斷
    */
   protected boolean tryAcquire(int unused) {
           //cas 修改狀態(tài),不可重入
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

tryAcquire方法是根據(jù)state是否是0來(lái)判斷的,所以,將state設(shè)置為-1是 為了禁止在執(zhí)行任務(wù)前對(duì)線程進(jìn)行中斷。

正因?yàn)槿绱?,在runWorker方法中會(huì)先調(diào)用Worker對(duì)象的unlock方法將state設(shè)置為 0。

  • runWorker(Worker w)方法

/**
 * Worker 類(lèi)實(shí)現(xiàn) Runnable 接口,重寫(xiě)的 run()方法
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //允許中斷
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //while 循環(huán)就是不斷的去執(zhí)行任務(wù),當(dāng)自己的任務(wù)(firstTask)執(zhí)行完之后依然會(huì)從阻塞隊(duì)列里拿任務(wù)去執(zhí)行,就這樣的操作保證了線程的重用
        //task 為空則從阻塞隊(duì)列中獲取任務(wù)
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            /**
             * 如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài),如果不是的話,則要保證當(dāng)前線程不是中斷狀態(tài)
             * 這里為什么要這么做呢?考慮在執(zhí)行該if語(yǔ)句期間可能也執(zhí)行了shutdownNow方法,shutdownNow方法會(huì) 把狀態(tài)設(shè)置為STOP,
             * 回顧一下STOP狀態(tài):不能接受新任務(wù),也不處理隊(duì)列中的任務(wù),會(huì)中斷正在處理任務(wù)的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時(shí),
             * 調(diào)用 shutdownNow() 方法會(huì)使線程池進(jìn)入到STOP狀態(tài)。
             * STOP狀態(tài)要中斷線程池中的所有線程,而這里使用Thread.interrupted()來(lái)判斷是否中斷是為了確保在 RUNNING或者SHUTDOWN狀態(tài)時(shí)線程是非中斷狀態(tài)的,
             * 因?yàn)門(mén)hread.interrupted()方法會(huì)重置中斷的狀態(tài)。
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

總結(jié)一下runWorker方法的執(zhí)行過(guò)程:

  1. while循環(huán)不斷地通過(guò)getTask()方法獲取任務(wù);

  2. getTask()方法從阻塞隊(duì)列中取任務(wù);

  3. 如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài),否則要保證當(dāng)前線程不是 中斷狀態(tài);

  4. 調(diào)用task.run()執(zhí)行任務(wù);

  5. 如果task為null則跳出循環(huán),執(zhí)行processWorkerExit()方法;

  6. runWorker方法執(zhí)行完畢,也代表著Worker中的run方法執(zhí)行完畢,銷(xiāo)毀線程。

  • getTask()方法

/**
 * 從阻塞隊(duì)列中獲取任務(wù),返回值是 Runnable
 * 線程池狀態(tài)不滿足執(zhí)行條件時(shí)直接返回 null
 */
private Runnable getTask() {
    //timeOut變量的值表示上次從阻塞隊(duì)列中取任務(wù)時(shí)是否超時(shí)
    boolean timedOut = false; // Did the last poll() time out?
    
    //這里兩個(gè) for 循環(huán)操作和 addWorker() 方法里的兩個(gè) for 循環(huán)操作思想一樣
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        //仍然檢查線程池狀態(tài)
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        //timed變量用于判斷是否需要進(jìn)行超時(shí)控制
        //allowCoreThreadTimeOut默認(rèn)是false,也就是核心線程不允許進(jìn)行超時(shí)
        //wc > corePoolSize,表示當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù)量
        //對(duì)于超過(guò)核心線程數(shù)量的這些線程,需要進(jìn)行超時(shí)控制
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //和 addWorker() 里流程一樣,也是先對(duì)線程池中 workerCount 進(jìn)行控制,再進(jìn)行后面的執(zhí)行任務(wù)操作
        //滿足條件則 workerCount 數(shù)量減一
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //根據(jù)timed來(lái)判斷,如果為true,則通過(guò)阻塞隊(duì)列的poll方法進(jìn)行超時(shí)控制,如果在keepAliveTime時(shí)間內(nèi)沒(méi)有獲取到任務(wù),則返回null
            //否則通過(guò)take方法,如果這時(shí)隊(duì)列為空,則take方法會(huì)阻塞直到隊(duì)列不為空
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 如果 r == null,說(shuō)明已經(jīng)超時(shí),timedOut設(shè)置為true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果獲取任務(wù)時(shí)當(dāng)前線程發(fā)生了中斷,則設(shè)置timedOut為false并返回
            timedOut = false;
        }
    }
}
processWorkerExit() 
/**
 * getTask方法返回null時(shí),在runWorker方法中會(huì)跳出while循環(huán),然后會(huì)執(zhí)行 processWorkerExit方法。
 * 做線程池的善后工作
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //如果completedAbruptly值為true,則說(shuō)明線程執(zhí)行時(shí)出現(xiàn)了異常,需要將workerCount減1
    //如果線程執(zhí)行時(shí)沒(méi)有出現(xiàn)異常,說(shuō)明在getTask()方法中已經(jīng)已經(jīng)對(duì)workerCount減1了,這里就不需要再減了
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //統(tǒng)計(jì)完成的任務(wù)數(shù)
        completedTaskCount += w.completedTasks;
        // 從workers中移除,也就表示著從線程池中移除了一個(gè)工作線程
        // workers 是前面提到的 HashSet,線程池就是通過(guò)維護(hù)這個(gè) worker()來(lái)保證線程池運(yùn)作的 
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 根據(jù)線程池狀態(tài)進(jìn)行判斷是否結(jié)束線程池
    tryTerminate();


    /**
     * 當(dāng)線程池是RUNNING或SHUTDOWN狀態(tài)時(shí),如果worker是異常結(jié)束,那么會(huì)直接addWorker;
     * 如果allowCoreThreadTimeOut=true,并且等待隊(duì)列有任務(wù),至少保留一個(gè)worker
     * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize
     */
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

至此,processWorkerExit執(zhí)行完之后,工作線程被銷(xiāo)毀,以上就是整個(gè)工作線程的生 命周期,從execute方法開(kāi)始,Worker使用ThreadFactory創(chuàng)建新的工作線程, runWorker通過(guò)getTask獲取任務(wù),然后執(zhí)行任務(wù),如果getTask返回null,進(jìn)入 processWorkerExit方法,整個(gè)線程結(jié)束

四、思考

  1. 線程池如何實(shí)現(xiàn)線程重用的?

就是在重寫(xiě)的 run()方法里,通過(guò) while 循環(huán),執(zhí)行完 firstTask 之后依然從阻塞隊(duì)列里獲取任務(wù)去執(zhí)行。

  1. 線程超時(shí)怎么處理?

當(dāng)前面任務(wù)拋出異常,后面的線程還會(huì)執(zhí)行嗎? 答案是會(huì)。也是 while 循環(huán)里找答案,當(dāng)前線程拋出異常只會(huì)對(duì)當(dāng)前線程產(chǎn)生影響,對(duì)線程池里其他任務(wù)不會(huì)有影響。

  1. 什么時(shí)候會(huì)銷(xiāo)毀?

是runWorker方法執(zhí)行完之后,也就是Worker中的run方法執(zhí)行完,由JVM自動(dòng)回收。

  1. 阻塞隊(duì)列選?。吭贘DK中提供了如下阻塞隊(duì)列:

  • ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,按FIFO排序任務(wù);

  • LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene;

  • SynchronousQuene:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于 LinkedBlockingQuene;

  • PriorityBlockingQuene:具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列;

  1. 丟棄策略選???線程池提供了4種策略:

  • AbortPolicy:直接拋出異常,默認(rèn)策略;

  • CallerRunsPolicy:用調(diào)用者所在的線程來(lái)執(zhí)行任務(wù);

  • DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);

  • DiscardPolicy:直接丟棄任務(wù);

  1. 線程數(shù)如何設(shè)置?

  • 一般設(shè)法是會(huì)根據(jù)我們?nèi)蝿?wù)的類(lèi)型去設(shè)置,簡(jiǎn)單分為: CPU 密集型 :CPU 核數(shù) + 1 IO 密集型:2*CPU 核數(shù) + 1

《Java并發(fā)編程實(shí)戰(zhàn)》中最原始的公式是這樣的: Nthreads=Ncpu?Ucpu?(1+WC)Nthreads=Ncpu?Ucpu?(1+CW);

  • Ncpu代表CPU的個(gè)數(shù),

  • Ucpu代表CPU利用率的期望值(0<Ucpu<10<Ucpu<1),

  • WCCW仍然是等待時(shí)間與計(jì)算時(shí)間的比例。

上面提供的公式相當(dāng)于目標(biāo)CPU利用率為100%。 通常系統(tǒng)中不止一個(gè)線程池,所以實(shí)際配置線程數(shù)應(yīng)該將目標(biāo)CPU利用率計(jì)算進(jìn)去。

關(guān)于如何進(jìn)行ThreadPoolExecutor 源碼解析問(wèn)題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒(méi)有解開(kāi),可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。

新聞名稱(chēng):如何進(jìn)行ThreadPoolExecutor源碼解析
當(dāng)前路徑:http://chinadenli.net/article48/gjsiep.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站導(dǎo)航、微信公眾號(hào)電子商務(wù)、ChatGPTApp開(kāi)發(fā)、品牌網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

小程序開(kāi)發(fā)