通過之前的一篇文章,我們總結了Executor框架。而在Executor框架中,ThreadPoolExecutor 是最核心的類。
ThreadPoolExecutor 看字面意思,是線程池的執(zhí)行器。我們本篇文章就基于ThreadPoolExecutor 這個類來展開總結線程池。
下篇文章會從源碼的角度解析ThreadPoolExecutor原理。
構造方法源碼如下:
public ThreadPoolExecutor(int corePoolSize,//核心線程數
int maximumPoolSize,//大線程數,可同時運行的大線程數
long keepAliveTime,//除核心線程數之外的,空閑下來的線程的存活時間
TimeUnit unit,//時間單位
BlockingQueueworkQueue,//裝任務的隊列,存儲等待執(zhí)行的任務
ThreadFactory threadFactory,//線程工廠,可自定義一個產生線程的類
RejectedExecutionHandler handler) {//拒絕策略
if (corePoolSize< 0 ||
maximumPoolSize<= 0 ||
maximumPoolSize< corePoolSize ||
keepAliveTime< 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
我們做個詳解:
corePoolSize : 核心線程數,就是可同時運行的最小數量,不會被銷毀的線程數,空閑時不還給操作系統(tǒng)。
maximumPoolSize:大線程數,任務有一個等待隊列,當這個隊列滿了,會啟動除核心線程之外的線程,而當前可啟動的大線程數就是這個參數值了。
workQueue:任務隊列,如果新來了任務,先判斷當前運行的線程數量是否達到核心線程數,沒達到就執(zhí)行,如果達到的話,新任務就會被存放在隊列中。
keepAliveTime :當線程池中的線程數量大于核心線程數的時候,如果這時沒有新的任務提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了這個時間才會被回收銷毀;
unit:keepAliveTime的時間單位;
threadFactory:生產線程的類;
handler:拒絕策略,如果當前同時運行的線程數量達到大線程數量并且隊列也已經被放滿了任務時,ThreadPoolExecutor會員拒絕策略。
拒絕策略有以下四種:
2、ThreadPoolExecutor使用AbortPolicy:拋出 RejectedExecutionException來拒絕新任務的處理。(拋異常拒絕)
DiscardPolicy :不處理新任務,直接丟棄掉。(不處理)
DiscardOldestPolicy:丟掉最早的未處理的任務。(丟隊列最前端的任務)
CallerRunsPolicy:調用執(zhí)行自己的線程運行任務,也就是直接在調用execute方法的線程中運行(run)被拒絕的任務,如果執(zhí)行程序已關閉,則會丟棄該任務。因此這種策略會降低對于新任務提交速度,影響程序的整體性能。如果您的應用程序可以承受此延遲并且你要求任何一個任務請求都要被執(zhí)行的話,你可以選擇這個策略。(執(zhí)行此任務)當大池被填滿時,此策略為我們提供可伸縮隊列。
示例代碼如下:
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {//創(chuàng)建任務
Callablecallable = () ->Thread.currentThread().getName();
//線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
10,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy());
//提交并執(zhí)行任務
for (int i = 0; i< 10; i++) {Futuresubmit = executor.submit(callable);
String s = submit.get(3, TimeUnit.SECONDS);
System.out.println(s);
}
//關閉線程池
executor.shutdown();
}
運行結果如下:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
二、常見線程池
1、FixedThreadPool(指定線程數)其構造函數源碼如下:
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
從源碼可以看出,FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被設置為我們自己傳遞的nThreads參數,所以,除核心線程外沒有多余的線程。
《Java 并發(fā)編程的藝術》圖片如下:
線程數小于nThreads時,來新任務就創(chuàng)建新線程,等于nThreads時,就加入隊列等待,然后線程閑下來就立刻從隊列中取任務執(zhí)行。
為什么不推薦使用FixedThreadPool?
因為maximumPoolSize無效,而LinkedBlockingQueue隊列的大值是 Integer.MAX_VALUE,運行中的線程池會一直接受任務,直到隊列滿了還會接受,極端情況下會造成OOM。
構造函數源碼如下:
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(),
threadFactory));
}
就核心線程和大線程就是1。
《Java 并發(fā)編程的藝術》圖片如下:
和FixedThreadPool一樣,但是就1個線程,任務來了就加入隊列等待。
為什么不推薦使用SingleThreadExecutor ?
和FixedThreadPool一樣,而LinkedBlockingQueue隊列的大值是 Integer.MAX_VALUE,運行中的線程池會一直接受任務,直到隊列滿了還會接受,極端情況下會造成OOM。
構造函數代碼如下:
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue(),
threadFactory);
}
從代碼可以看出,其沒有核心線程,來一個任務就創(chuàng)建一個線程(如果之前的線程不閑下來的話),一直創(chuàng)建到Integer.MAX_VALUE為止。極端情況下,這樣會導致耗盡 cpu 和內存資源。
《Java 并發(fā)編程的藝術》圖片如下:
1.首先執(zhí)行 SynchronousQueue.offer(Runnable task) 提交任務到任務隊列。如果當前 maximumPool 中有閑線程正在執(zhí)行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主線程執(zhí)行 offer 操作與空閑線程執(zhí)行的 poll 操作配對成功,主線程把任務交給空閑線程執(zhí)行,execute()方法執(zhí)行完成,否則執(zhí)行下面的步驟 2;(有空閑線程就執(zhí)行任務)
2.當初始 maximumPool 為空,或者 maximumPool 中沒有空閑線程時,將沒有線程執(zhí)行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這種情況下,步驟 1 將失敗,此時 CachedThreadPool 會創(chuàng)建新線程執(zhí)行任務,execute 方法執(zhí)行完成;(沒有空閑線程就創(chuàng)建線程執(zhí)行任務)
為什么不推薦使用CachedThreadPool?
允許創(chuàng)建的線程數量為 Integer.MAX_VALUE ,可能會創(chuàng)建大量線程,從而導致 OOM。
主要用來在給定的延遲后運行任務,或者定期執(zhí)行任務。
構造函數代碼如下:
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
ScheduledThreadPoolExecutor 使用的任務隊列 DelayQueue 封裝了一個 PriorityQueue,PriorityQueue會對隊列中的任務進行排序,執(zhí)行所需時間短的放在前面先被執(zhí)行(ScheduledFutureTask 的 time變量小的先執(zhí)行),如果執(zhí)行所需時間相同則先提交的任務將被先執(zhí)行(ScheduledFutureTask 的 squenceNumber 變量小的先執(zhí)行)。
運行機制圖片如下:
1、當調用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者 scheduleWithFixedDelay() 方法時,會向 ScheduledThreadPoolExecutor 的 DelayQueue 添加一個實現了 RunnableScheduledFuture 接口的 ScheduledFutureTask 。
2、線程池中的線程從 DelayQueue 中獲取 ScheduledFutureTask,然后執(zhí)行任務。
為什么不推薦使用ScheduledThreadPoolExecutor ?
允許創(chuàng)建的線程數量為 Integer.MAX_VALUE ,可能會創(chuàng)建大量線程,從而導致 OOM。而且在實際項目中應用較少,了解即可。
《阿里開發(fā)手冊》華山版的并發(fā)編程這一節(jié),有如下規(guī)定,參考:
多線程編程中一般線程的個數都大于 CPU 核心的個數,而一個 CPU 核心在任意時刻只能被一個線程使用,為了讓這些線程都能得到有效執(zhí)行,CPU 采取的策略是為每個線程分配時間片并輪轉的形式。當一個線程的時間片用完的時候就會重新處于就緒狀態(tài)讓給其他線程使用,這個過程就屬于一次上下文切換。
概括來說就是:當前任務在執(zhí)行完 CPU 時間片切換到另一個任務之前會先保存自己的狀態(tài),以便下次再切換回這個任務時,可以再加載這個任務的狀態(tài)。
任務從保存到再加載的過程就是一次上下文切換。
簡單的公式:
CPU 密集型任務(N+1): 這種任務消耗的主要是 CPU 資源,可以將線程數設置為 N(CPU 核心數)+1,比 CPU 核心數多出來的一個線程是為了防止線程偶發(fā)的缺頁中斷,或者其它原因導致的任務暫停而帶來的影響。一旦任務暫停,CPU 就會處于空閑狀態(tài),而在這種情況下多出來的一個線程就可以充分利用 CPU 的空閑時間。
I/O 密集型任務(2N): 這種任務應用起來,系統(tǒng)會用大部分的時間來處理 I/O 交互,而線程在處理 I/O 的時間段內不會占用 CPU 來處理,這時就可以將 CPU 交出給其它線程使用。因此在 I/O 密集型任務的應用中,我們可以多配置一些線程,具體的計算方法是 2N。
你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網查看詳情吧
新聞標題:多線程與高并發(fā)(15)——線程池詳解(非源碼層面)-創(chuàng)新互聯(lián)
轉載源于:http://chinadenli.net/article6/deejog.html
成都網站建設公司_創(chuàng)新互聯(lián),為您提供Google、網站收錄、網站營銷、手機網站建設、移動網站建設、虛擬主機
聲明:本網站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)