我們知道,多線程是Android開發(fā)中必現(xiàn)的場景,很多原生API和開源項目都有多線程的內(nèi)容,這里簡單總結(jié)和探討一下常見的多線程切換方式。

創(chuàng)新互聯(lián)公司自2013年起,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項目成都網(wǎng)站設(shè)計、成都做網(wǎng)站網(wǎng)站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元酒泉做網(wǎng)站,已為上家服務(wù),為酒泉各地企業(yè)和個人服務(wù),聯(lián)系電話:18980820575
我們先回顧一下Java多線程的幾個基礎(chǔ)內(nèi)容,然后再分析總結(jié)一些經(jīng)典代碼中對于線程切換的實現(xiàn)方式。
幾點基礎(chǔ)
多線程切換,大概可以切分為這樣幾個內(nèi)容:如何開啟多個線程,如何定義每個線程的任務(wù),如何在線程之間互相通信。
Thread
Thread可以解決開啟多個線程的問題。
Thread是Java中實現(xiàn)多線程的線程類,每個Thread對象都可以啟動一個新的線程,注意是可以啟動,也可以不啟動新線程:
thread.run();//不啟動新線程,在當(dāng)前線程執(zhí)行 thread.start();//啟動新線程。
另外就是Thread存在線程優(yōu)先級問題,如果為Thread設(shè)置較高的線程優(yōu)先級,就有機(jī)會獲得更多的CPU資源,注意這里也是有機(jī)會,優(yōu)先級高的Thread不是必然會先于其他Thread執(zhí)行,只是系統(tǒng)會傾向于給它分配更多的CPU。
默認(rèn)情況下,新建的Thread和當(dāng)前Thread的線程優(yōu)先級一致。
設(shè)置線程優(yōu)先級有兩種方式:
thread.setPriority(Thread.MAX_PRIORITY);//1~10,通過線程設(shè)置 Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);//-20~19,通過進(jìn)程設(shè)置
這兩種設(shè)置方式是相對獨立的,在Android中,一般建議通過Process進(jìn)程設(shè)置優(yōu)先級。
ThreadPool
Thread本身是需要占用內(nèi)存的,開啟/銷毀過量的工作線程會造成過量的資源損耗,這種場景我們一般會通過對資源的復(fù)用來進(jìn)行優(yōu)化,針對IO資源我們會做IO復(fù)用(例如Http的KeepAlive),針對內(nèi)存我們會做內(nèi)存池復(fù)用(例如Fresco的內(nèi)存池),針對CPU資源,我們一般會做線程復(fù)用,也就是線程池。
所以,在Android開發(fā)中,一般不會直接開啟大量的Thread,而是會使用ThreadPool來復(fù)用線程。
Runnable
Runnable主要解決如何定義每個線程的工作任務(wù)的問題。
Runnable是Java中實現(xiàn)多線程的接口,相對Thread而言,Runnable接口更容易擴(kuò)展(不需要單繼承),而且,Thread本身也是一種Runnable:
public class Thread implements Runnable {相比Thread而言,Runnable不關(guān)注如何調(diào)度線程,只關(guān)心如何定義要執(zhí)行的工作任務(wù),所以在實際開發(fā)中,多使用Runnable接口完成多線程開發(fā)。
Callable
Callable和Runnable基本類似,但是Callable可以返回執(zhí)行結(jié)果。
線程間通信
Thread和Runnable能實現(xiàn)切換到另一個線程工作(Runnable需要額外指派工作線程),但它們完成任務(wù)后就會退出,并不注重如何在線程間實現(xiàn)通信,所以切換線程時,還需要在線程間通信,這就需要一些線程間通信機(jī)制。
Future
一般來說,如果要做簡單的通信,我們最常用的是通過接口回調(diào)來實現(xiàn)。
Future就是這樣一種接口,它可以部分地解決線程通信的問題,F(xiàn)uture接口定義了done、canceled等回調(diào)函數(shù),當(dāng)工作線程的任務(wù)完成時,它會(在工作線程中)通過回調(diào)告知我們,我們再采用其他手段通知其他線程。
mFuture = new FutureTask<MyBizClass>(runnable) {
@Override
protected void done() {
...//還是在工作線程里
}
};
Condition
Condition其實是和Lock一起使用的,但如果把它視為一種線程間通信的工具,也說的通。
因為,Condition本身定位就是一種多線程間協(xié)調(diào)通信的工具,Condition可以在某些條件下,喚醒等待線程。
Lock lock = new ReentrantLock();
Condition notFull = lock.newCondition(); //定義Lock的Condition
...
while (count == items.length)
notFull.await();//等待condition的狀態(tài)
...
notFull.signal();//達(dá)到condition的狀態(tài)Handler
其實,最完整的線程間通信機(jī)制,也是我們最熟悉的線程間通信機(jī)制,莫過于Handler通信機(jī)制,Handler利用線程封閉的ThreadLocal維持一個消息隊列,Handler的核心是通過這個消息隊列來傳遞Message,從而實現(xiàn)線程間通信。
AsyncTask的多線程切換
回顧完多線程的幾個基礎(chǔ)概念,先來看看簡單的多線程切換,Android自帶的AsyncTask。
AsyncTask主要在doInBackground函數(shù)中定義工作線程的工作內(nèi)容,在其他函數(shù)中定義主線程的工作內(nèi)容,例如
onPostExecute,這里面必然涉及兩個問題:
1.如何實現(xiàn)把doInBackground拋給工作線程
2.如何實現(xiàn)把onPostExecute拋給主線程
其實非常簡單,我們先看第一個
1.如何實現(xiàn)把doInBackground拋給工作線程
在使用AsyncTask時,我們一般會創(chuàng)建一個基于AsyncTask的擴(kuò)展類或匿名類,在其中實現(xiàn)幾個抽象函數(shù),例如:
private class MyTask extends AsyncTask<String, Object, Long> {
@Override
protected void onPreExecute() {... }
@Override
protected Long doInBackground(String... params) {... }
@Override
protected void onProgressUpdate(Object... values) {... }
@Override
protected void onPostExecute(Long aLong) {... }
@Override
protected void onCancelled() {... }
然后,我們會實例化這個AsyncTask:
MyTask mTask = new MyTask();
在AsyncTask源碼中,我們看到,構(gòu)造函數(shù)里會創(chuàng)建一個WorkerRunnable:
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {//這是一個Callable
public Result call() throws Exception {
...
result = doInBackground(mParams);//在工作線程中執(zhí)行
...WorkerRunnable實際上是一個Callable對象,所以,doInBackground是被包在一個Callable對象中了,這個Callable還會被繼續(xù)包裝,最終被交給一個線程池去執(zhí)行:
Runnable mActive;
...
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);//交給線程池執(zhí)行
}梳理一下,大致過程為:
定義doInBackground-->被一個Callable調(diào)用-->層層包為一個Runnable-->交給線程池執(zhí)行。
這樣就解決了第一個問題,如何實現(xiàn)把doInBackground拋給工作線程。
我們再來看第二個問題。
2.如何實現(xiàn)把onPostExecute拋給主線程
首先,我們要知道工作任務(wù)何時執(zhí)行完畢,就需要在工作完成時觸發(fā)一個接口回調(diào),也就是前面說過的Future,還是看AsyncTask源碼:
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
...
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {//Future的回調(diào)
try {
postResultIfNotInvoked(get());//get()是FutureTask接口函數(shù)
...
}
};
}
這樣,我們就知道可以處理onPostExecute函數(shù)了,但是,我們還需要把它拋給主線程,主要源碼如下:
//mWorker、mFuture和都會指向postResult函數(shù)
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
//getHandler()會指向InternalHandler
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper());//指向MainLooper
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);//通過handler機(jī)制,回到主線程,調(diào)用finish函數(shù)
...
}
//在Handler中,最終會在主線程中調(diào)用finish
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);//調(diào)用onPostExecute接口函數(shù)
}
mStatus = Status.FINISHED;
}從源碼可以看到,其實AsyncTask還是通過Handler機(jī)制,把任務(wù)拋給了主線程。
總體來說,AsyncTask的多線程任務(wù)是通過線程池實現(xiàn)的工作線程,在完成任務(wù)后利用Future的done回調(diào)來通知任務(wù)完成,并通過handler機(jī)制通知主線程去執(zhí)行onPostExecute等回調(diào)函數(shù)。
EventBus的多線程切換
EventBus會為每個訂閱事件注冊一個目標(biāo)線程,所以需要從發(fā)布事件的線程中,根據(jù)注冊信息,實時切換到目標(biāo)線程中,所以,這是個很典型的多線程切換場景。
根據(jù)EventBus源碼,多線程切換的主要判斷代碼如下:
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);//直接在當(dāng)前線程執(zhí)行
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);//在當(dāng)前主線程執(zhí)行
} else {
mainThreadPoster.enqueue(subscription, event);//當(dāng)然不是主線程,交給主線程執(zhí)行
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);//當(dāng)前線程為主線程,交給工作線程
} else {
invokeSubscriber(subscription, event);//直接在當(dāng)前工作線程執(zhí)行
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);//異步執(zhí)行
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
所以,在EventBus里,如果需要做線程間切換,主要是拋給不同的任務(wù)隊列,實現(xiàn)線程間切換。
從任務(wù)隊列判斷,切換目標(biāo)包括主線程Poster、backgroundPoster和asyncPoster這樣三種。
我們先看任務(wù)隊列的設(shè)計:
任務(wù)隊列
因為EventBus不能判斷有哪些任務(wù)會并行,所以它采用了隊列的設(shè)計,多線程任務(wù)(EventBus的事件)會先進(jìn)入隊列,然后再處理隊列中的工作任務(wù),這是典型的生產(chǎn)--消費場景。
主線程Poster、backgroundPoster和asyncPoster都是任務(wù)隊列的不同實現(xiàn)。
主線程Poster
負(fù)責(zé)處理主線程的mainThreadPoster是Handler的子類:
final class HandlerPoster extends Handler {
...
void enqueue(Subscription subscription, Object event) {
...
synchronized (this) {//因為主線程只有一個,需要線程安全
queue.enqueue(pendingPost);
...
if (!sendMessage(obtainMessage())) {//作為handler發(fā)送消息
...
//在主線程中處理消息
@Override
public void handleMessage(Message msg) {
...
}
從源碼可以看出,這個Poster其實是一個Handler,它采用了哪個線程的消息隊列,就決定了它能和哪個線程通信,我們確認(rèn)一下:
EventBus(EventBusBuilder builder) {
...
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);//獲取主線程的MainLooper所以,EventBus是擴(kuò)展了一個Handler,作為主線程的Handler,通過Handler消息機(jī)制實現(xiàn)的多線程切換。
當(dāng)然,這個Handler本事,又多了一層queue。
backgroundPoster和asyncPoster
backgroundPoster和asyncPoster其實都是使用了EventBus的線程池,默認(rèn)是個緩存線程池:
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
所以,backgroundPoster和asyncPoster都是把任務(wù)交給線程池處理,這樣實現(xiàn)的多線程切換。
不過,backgroundPoster和asyncPoster也有一些不同,我們知道,在newCachedThreadPool中,最大線程數(shù)就是Integer的最大值,相當(dāng)于不設(shè)上限,所以可以盡可能多的啟動線程,asyncPoster就是這樣做的,enqueu和run都沒做同步,為每個事件單獨開啟新線程處理。
而在backgroundPoster中,可以盡量復(fù)用線程,主要方法是在run的時候,做個1秒的等待:
@Override
public void run() {
...
PendingPost pendingPost = queue.poll(1000);//允許等待1秒因為做了這一秒的掛起等待,在enqueue和run時,都需要用synchronized (this) 來確保線程安全。
另外,其實這里面還有個很重要的用法,就是Executors.newCachedThreadPool()中的SynchronousQueue:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());//用于輔助線程切換的阻塞隊列
}這個SynchronousQueue,在OkHttp中也使用了:
//okhttp3.Dispatcher源碼
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));//用于輔助線程切換的阻塞隊列
}
return executorService;
}SynchronousQueue與普通隊列不同,不是數(shù)據(jù)等線程,而是線程等數(shù)據(jù),這樣每次向SynchronousQueue里傳入數(shù)據(jù)時,都會立即交給一個線程執(zhí)行,這樣可以提高數(shù)據(jù)得到處理的速度。
總的來看,EventBus還是采用線程池實現(xiàn)工作線程,采用handler機(jī)制通知到主線程。不同在于,它采用的queue的隊列方式來管理所有的跨線程請求,而且它利用了SynchronousQueue阻塞隊列來輔助實現(xiàn)線程切換。
RxJava的多線程切換
其實在多線程管理這方面,RxJava的線程管理能力是非常令人贊嘆的。
RxJava的主要概念是工作流,它可以把一序列工作流定義在一個線程類型上:
myWorkFlow.getActResponse(myParam)
.subscribeOn(Schedulers.io())//指定線程
.xxx//其他操作這個構(gòu)建工作流的過程其實挺復(fù)雜的,不過如果我們只看線程操作這部分,其實流程非常清晰,我們追蹤一下subscribeOn的源碼(RxJava2):
//進(jìn)入subscribeOn
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return subscribeOn(scheduler, !(this instanceof FlowableCreate));
}
//繼續(xù)進(jìn)入subscribeOn
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
}然后,進(jìn)入FlowableSubscribeOn類
//進(jìn)入FlowableSubscribeOn類
public FlowableSubscribeOn(Flowable<T> source, Scheduler scheduler, boolean nonScheduledRequests) {
...
this.scheduler = scheduler;
...
}
@Override
public void subscribeActual(final Subscriber<? super T> s) {
Scheduler.Worker w = scheduler.createWorker();//根據(jù)參數(shù)值,如Schedulers.io()創(chuàng)建worker
final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);//根據(jù)worker創(chuàng)建SubscribeOnSubscriber
s.onSubscribe(sos);
w.schedule(sos);
}
這個SubscribeOnSubscriber是個內(nèi)部類:
SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean requestOn) {
...
this.worker = worker;
...
}
...
void requestUpstream(final long n, final Subscription s) {
...
worker.schedule(new Request(s, n));//worker會安排如何執(zhí)行runnable(Request是一個runnable)
...
}而這個worker,其實就是我們輸入的線程參數(shù),如Schedulers.io(),這個io是這樣定義的:
//io.reactivex.schedulers.Schedulers源碼
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
...
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
static final class SingleTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return SingleHolder.DEFAULT;
}
}
static final class ComputationTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return ComputationHolder.DEFAULT;
}
}
...
static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}
static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
這里的IO,最終會指向一個Scheduler,如IoScheduler:
//io.reactivex.internal.schedulers.IoScheduler源碼
...
static final class EventLoopWorker extends Scheduler.Worker {//Scheduler.Worker的實現(xiàn)類
...
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);//交給線程池
}
這樣,Scheculer中的具體任務(wù)就交給了某個線程池來處理。
需要特別說明的是,RxJava中調(diào)用Android主線程(AndroidSchedulers.mainThread),其實還是使用了Handler機(jī)制:
public final class AndroidSchedulers {
...
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
這個HandlerScheduler其實就是實現(xiàn)了Scheduler和Scheduler.Worker內(nèi)部類。
···
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
private static final class HandlerWorker extends Worker {
...
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
...
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
···總的來看,RxJava的多線程切換其實是利用了Scheculer.Worker這個內(nèi)部類,把任務(wù)交給Scheculer的Worker去做,而這個Scheculer的Worker是根據(jù)定義的線程來實現(xiàn)了不同的線程池,其實還是交給線程池去處理了。
至于主線程,RxJava也是使用了Handler機(jī)制。
總結(jié)
小小總結(jié)一下,基本上來說,Android中的多線程切換,主要使用Runnable和Callable來定義工作內(nèi)容,使用線程池來實現(xiàn)異步并行,使用Handler機(jī)制來通知主線程,有些場景下會視情況需要,使用Future的接口回調(diào),使用SynchronousQueue阻塞隊列等。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持創(chuàng)新互聯(lián)。
文章題目:淺談Android中多線程切換的幾種方法
本文URL:http://chinadenli.net/article32/ihhspc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供用戶體驗、軟件開發(fā)、動態(tài)網(wǎng)站、網(wǎng)站收錄、網(wǎng)站內(nèi)鏈、企業(yè)網(wǎng)站制作
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)