前言
創(chuàng)新互聯(lián)公司憑借在網(wǎng)站建設(shè)、網(wǎng)站推廣領(lǐng)域領(lǐng)先的技術(shù)能力和多年的行業(yè)經(jīng)驗(yàn),為客戶提供超值的營(yíng)銷(xiāo)型網(wǎng)站建設(shè)服務(wù),我們始終認(rèn)為:好的營(yíng)銷(xiāo)型網(wǎng)站就是好的業(yè)務(wù)員。我們已成功為企業(yè)單位、個(gè)人等客戶提供了做網(wǎng)站、網(wǎng)站建設(shè)服務(wù),以良好的商業(yè)信譽(yù),完善的服務(wù)及深厚的技術(shù)力量處于同行領(lǐng)先地位。
實(shí)現(xiàn)消息隊(duì)列的關(guān)鍵因素是考量不同線程訪問(wèn)消息隊(duì)列的同步問(wèn)題。本實(shí)現(xiàn)涉及到幾個(gè)知識(shí)點(diǎn)
std::lock_guard 介紹
std::lock_gurad 是 C++11 中定義的模板類(lèi)。定義如下:
template <class Mutex> class lock_guard;
lock_guard 對(duì)象通常用于管理某個(gè)鎖(Lock)對(duì)象,因此與 Mutex RAII 相關(guān),方便線程對(duì)互斥量上鎖,即在某個(gè) lock_guard 對(duì)象的聲明周期內(nèi),它所管理的鎖對(duì)象會(huì)一直保持上鎖狀態(tài);而 lock_guard 的生命周期結(jié)束之后,它所管理的鎖對(duì)象會(huì)被解鎖(注:類(lèi)似 shared_ptr 等智能指針管理動(dòng)態(tài)分配的內(nèi)存資源 )。
模板參數(shù) Mutex 代表互斥量類(lèi)型,例如 std::mutex 類(lèi)型,它應(yīng)該是一個(gè)基本的 BasicLockable 類(lèi)型,標(biāo)準(zhǔn)庫(kù)中定義幾種基本的 BasicLockable 類(lèi)型,分別 std::mutex, std::recursive_mutex, std::timed_mutex,std::recursive_timed_mutex 以及 std::unique_lock
std::unique_lock 介紹
lock_guard 最大的缺點(diǎn)也是簡(jiǎn)單,沒(méi)有給程序員提供足夠的靈活度,因此,C++11 標(biāo)準(zhǔn)中定義了另外一個(gè)與 Mutex RAII 相關(guān)類(lèi) unique_lock,該類(lèi)與 lock_guard 類(lèi)相似,也很方便線程對(duì)互斥量上鎖,但它提供了更好的上鎖和解鎖控制。
顧名思義,unique_lock 對(duì)象以獨(dú)占所有權(quán)的方式( unique owership)管理 mutex 對(duì)象的上鎖和解鎖操作,所謂獨(dú)占所有權(quán),就是沒(méi)有其他的 unique_lock 對(duì)象同時(shí)擁有某個(gè) mutex 對(duì)象的所有權(quán)。
新創(chuàng)建的 unique_lock 對(duì)象管理 Mutex 對(duì)象 m,并嘗試調(diào)用 m.lock() 對(duì) Mutex 對(duì)象進(jìn)行上鎖,如果此時(shí)另外某個(gè) unique_lock 對(duì)象已經(jīng)管理了該 Mutex 對(duì)象 m,則當(dāng)前線程將會(huì)被阻塞。
std::condition介紹
當(dāng) std::condition_variable 對(duì)象的某個(gè) wait 函數(shù)被調(diào)用的時(shí)候,它使用 std::unique_lock(通過(guò) std::mutex) 來(lái)鎖住當(dāng)前線程。當(dāng)前線程會(huì)一直被阻塞,直到另外一個(gè)線程在相同的 std::condition_variable 對(duì)象上調(diào)用了 notification 函數(shù)來(lái)喚醒當(dāng)前線程。
std::condition_variable 提供了兩種 wait() 函數(shù)。當(dāng)前線程調(diào)用 wait() 后將被阻塞(此時(shí)當(dāng)前線程應(yīng)該獲得了鎖(mutex),不妨設(shè)獲得鎖 lck),直到另外某個(gè)線程調(diào)用 notify_* 喚醒了當(dāng)前線程。
在線程被阻塞時(shí),該函數(shù)會(huì)自動(dòng)調(diào)用 lck.unlock() 釋放鎖,使得其他被阻塞在鎖競(jìng)爭(zhēng)上的線程得以繼續(xù)執(zhí)行。另外,一旦當(dāng)前線程獲得通知(notified,通常是另外某個(gè)線程調(diào)用 notify_* 喚醒了當(dāng)前線程),wait() 函數(shù)也是自動(dòng)調(diào)用 lck.lock(),使得 lck 的狀態(tài)和 wait 函數(shù)被調(diào)用時(shí)相同。
在第二種情況下(即設(shè)置了 Predicate),只有當(dāng) pred 條件為 false 時(shí)調(diào)用 wait() 才會(huì)阻塞當(dāng)前線程,并且在收到其他線程的通知后只有當(dāng) pred 為 true 時(shí)才會(huì)被解除阻塞。因此第二種情況類(lèi)似以下代碼:
while (!pred()) wait(lck);
std::function介紹
使用std::function可以將普通函數(shù),lambda表達(dá)式和函數(shù)對(duì)象類(lèi)統(tǒng)一起來(lái)。它們并不是相同的類(lèi)型,然而通過(guò)function模板類(lèi),可以轉(zhuǎn)化為相同類(lèi)型的對(duì)象(function對(duì)象),從而放入一個(gè)vector或其他容器里,方便回調(diào)。
代碼實(shí)現(xiàn):
#pragma once #ifndef MESSAGE_QUEUE_H #define MESSAGE_QUEUE_H #include <queue> #include <mutex> #include <condition_variable> template<class Type> class CMessageQueue { public: CMessageQueue& operator = (const CMessageQueue&) = delete; CMessageQueue(const CMessageQueue& mq) = delete; CMessageQueue() :_queue(), _mutex(), _condition(){} virtual ~CMessageQueue(){} void Push(Type msg){ std::lock_guard <std::mutex> lock(_mutex); _queue.push(msg); //當(dāng)使用阻塞模式從消息隊(duì)列中獲取消息時(shí),由condition在新消息到達(dá)時(shí)提醒等待線程 _condition.notify_one(); } //blocked定義訪問(wèn)方式是同步阻塞或者非阻塞模式 bool Pop(Type& msg, bool isBlocked = true){ if (isBlocked) { std::unique_lock <std::mutex> lock(_mutex); while (_queue.empty()) { _condition.wait(lock); } //注意這一段必須放在if語(yǔ)句中,因?yàn)閘ock的生命域僅僅在if大括號(hào)內(nèi) msg = std::move(_queue.front()); _queue.pop(); return true; } else { std::lock_guard<std::mutex> lock(_mutex); if (_queue.empty()) return false; msg = std::move(_queue.front()); _queue.pop(); return true; } } int32_t Size(){ std::lock_guard<std::mutex> lock(_mutex); return _queue.size(); } bool Empty(){ std::lock_guard<std::mutex> lock(_mutex); return _queue.empty(); } private: std::queue<Type> _queue;//存儲(chǔ)消息的隊(duì)列 mutable std::mutex _mutex;//同步鎖 std::condition_variable _condition;//實(shí)現(xiàn)同步式獲取消息 }; #endif//MESSAGE_QUEUE_H
線程池可以直接在構(gòu)造函數(shù)中構(gòu)造線程,并傳入回調(diào)函數(shù),也可以寫(xiě)一個(gè)Run函數(shù)顯示調(diào)用。這里我們選擇了第二種,對(duì)比:
#ifndef THREAD_POOL_H #define THREAD_POOL_H #include <functional> #include <vector> #include <thread> #include "MessageQueue.h" #define MIN_THREADS 1 template<class Type> class CThreadPool { CThreadPool& operator = (const CThreadPool&) = delete; CThreadPool(const CThreadPool& other) = delete; public: CThreadPool(int32_t threads, std::function<void(Type& record, CThreadPool<Type>* pSub)> handler); virtual ~CThreadPool(); void Run(); virtual void PreHandler(){} virtual void AfterHandler(){} void Submit(Type record); private: bool _shutdown; int32_t _threads; std::function<void(Type& record, CThreadPool<Type>* pSub)> _handler; std::vector<std::thread> _workers; CMessageQueue<Type> _tasks; }; template<class Type> CThreadPool<Type>::CThreadPool(int32_t threads, std::function<void(Type& record, CThreadPool<Type>* pSub)> handler) :_shutdown(false), _threads(threads), _handler(handler), _workers(), _tasks() { //第一種實(shí)現(xiàn)方案,注意這里的虛函數(shù)調(diào)用不正確 /*if (_threads < MIN_THREADS) _threads = MIN_THREADS; for (int32_t i = 0; i < _threads; i++) { _workers.emplace_back( [this]{ PreHandler(); while (!_shutdown){ Type record; _tasks.Pop(record, true); _handler(record, this); } AfterHandler(); } ); }*/ } //第二種實(shí)現(xiàn)方案 template<class Type> void CThreadPool<Type>::Run() { if (_threads < MIN_THREADS) _threads = MIN_THREADS; for (int32_t i = 0; i < _threads; i++) { _workers.emplace_back( [this]{ PreHandler(); while (!_shutdown){ Type record; _tasks.Pop(record, true); _handler(record, this); } AfterHandler(); } ); } } template<class Type> CThreadPool<Type>::~CThreadPool() { for (std::thread& worker : _workers) worker.join(); } template<class Type> void CThreadPool<Type>::Submit(Type record) { _tasks.Push(record); } #endif // !THREAD_POOL_H
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)創(chuàng)新互聯(lián)的支持。
新聞標(biāo)題:C++基于消息隊(duì)列的多線程實(shí)現(xiàn)示例代碼
瀏覽路徑:http://chinadenli.net/article10/jdpogo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供面包屑導(dǎo)航、服務(wù)器托管、網(wǎng)站內(nèi)鏈、小程序開(kāi)發(fā)、網(wǎng)站設(shè)計(jì)、自適應(yīng)網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)