在上一篇文章C++使用socket實現(xiàn)與微信小程序通信(下)中,小懵白就給大家簡要地講解了線程池的原理。
今天呢,小懵白就給大家繼續(xù)講解C++如何實現(xiàn)封裝線程池類。
第一步
首先,我們需要定義生產(chǎn)者、消費者的存儲容器類型。
在這里呢,消費者的容器是通過使用SLT中的vector容器來實現(xiàn)的:std::vector<std::thread> pool;
而任務(wù)生產(chǎn)者容器則是由SLT的queue容器來實現(xiàn):std::queue<Task>task ;
定義完生產(chǎn)者、消費者的容器類型后,接著就是要構(gòu)建消費者容器:往pool里添加一定數(shù)量N的阻塞線程。
簡要代碼如下:
bool Init_ThreadPool(){
for(int i=0;i<max_thread;i++) //max_thread為初始化線程最大化量
pool.push_back(std::thread(&ThreadPool::runtask,this));
return true;
}
在初始化中,每一個線程都綁定了runtask函數(shù),也就是線程要執(zhí)行的函數(shù),該函數(shù)是用來實現(xiàn)獲取task的。
簡要代碼如下:
void runtask(){
while(!stop){
std::unique_lock<std::mutex>lk(_mutex);
//當(dāng)任務(wù)隊列為空或者stop為假時,阻塞當(dāng)前線程,直到條件變量喚醒
condition.wait(lk,[this]{return (!task.empty()||stop);});
/**********************************************
std::move是將對象的狀態(tài)或者所有權(quán)從一個對象轉(zhuǎn)移到另一個對象,
只是轉(zhuǎn)移,沒有內(nèi)存的搬遷或者內(nèi)存拷貝。
***********************************************/
Task ta=move(task.front());// 取一個 task
task.pop(); //從隊列移除正在執(zhí)行的任務(wù)
lk.unlock();
ta(); //執(zhí)行task任務(wù)
condition.notify_one(); //通知等待一個線程
}
//此處補充銷毀線程代碼
}
在該代碼中,bool型的stop變量是用來判定線程是否終止的,每當(dāng)一次while循環(huán)后進行判斷stop,若為假則繼續(xù)阻塞線程然后獲取任務(wù),否則的話結(jié)束線程。
消費者實現(xiàn)后,接下來就是實現(xiàn)生產(chǎn)者了。在此之前,還需要做一件事情:同一規(guī)劃任務(wù)變量。
using Task = std::function<void()>;
這是由于每一個task函數(shù)的返回類型都不可能是統(tǒng)一的,有的是void類型的,有的是bool類型的,更還有的是指針類型的。
為了能夠?qū)崿F(xiàn)代碼的高效性,這里采用了std::function<void()>思想,定義了Task類型。
接著就是實現(xiàn)生產(chǎn)者了:
bool Add_task(const Task&t){
if(task.size()==max_queue)Warn_LOG("添加任務(wù),任務(wù)隊列已滿,準(zhǔn)備阻塞");
if(thread_run==max_thread)Warn_LOG("添加任務(wù),但線程已經(jīng)分配完");
std::unique_lock<std::mutex>lk(_mutex);
while(task.size()==max_queue||stop){ //要是任務(wù)數(shù)量到了最大,就等待處理完再添加
condition.wait(lk);
}
if(stop)return false;
task.push(t);//給隊列中添加任務(wù)
task_numble=task.size();
std::cout<<"添加成功。"<<std::endl<<std::endl;
condition.notify_one(); //通知等待一個線程
return true;
}
為此,到這里的時候,生產(chǎn)者和消費者的線程池初步構(gòu)建完成了
第二步:解說
在第一步中,我們通過使用STL中的vector容器存儲了已初始化的線程后,那么接下來是如何實現(xiàn)生產(chǎn)者-消費者模式呢。
在生產(chǎn)者pool容器中,每一個線程都是綁定了runtask函數(shù),也就是說每一個線程都是在執(zhí)行runtask函數(shù)。
而在runtask函數(shù)代碼塊里,首先是判斷!stop條件是否滿足,若不滿足,則結(jié)束線程。
否則的話,添加互斥鎖,若任務(wù)隊列為空或者stop為假時,使用條件鎖std::condition_variable阻塞當(dāng)前線程。
直到條件變量喚醒后,才從任務(wù)隊列中獲取一個task,解鎖互斥鎖并執(zhí)行task,直至完成并通知等待一個線程,然后重新循環(huán)判斷。
第三步:封裝
實現(xiàn)完上面的功能后,接下來就是對功能進行封裝了。代碼如下:
class ThreadPool{
using Task = std::function<void()>;
public:
ThreadPool();
~ThreadPool();
bool Init_ThreadPool(); //初始化線程
bool Add_task(const Task&t); //添加任務(wù)
void End_threadpool(); //銷毀線程
int thread_num; //當(dāng)前的線程數(shù)量
int task_numble; //任務(wù)隊列
protected:
void runtask();
void Expand_thread();
bool Destroy_thread();
int id;
private:
int max_thread;//初始化線程數(shù)量
int max_queue;//初始化 任務(wù)隊列數(shù)量
std::vector<std::thread> pool;// 線程池、任務(wù)隊列
std::queue<Task>task ;
std::mutex _mutex ;//互斥鎖條件變量
std::condition_variable condition;
bool stop; //停止標(biāo)志位
int key; //目前正在執(zhí)行任務(wù)的線程數(shù)量
};
其中,~ThreadPool()的代碼實現(xiàn)如下:
ThreadPool::~ThreadPool(){
End_threadpool();
}
void ThreadPool::End_threadpool(){
stop=true; //停止讀取任務(wù)
while(thread_run){}//等待線程還沒有執(zhí)行完的任務(wù)
for(int i =0 ;i<pool.size();i++) //銷毀線程
pool[i].join() ;
std::queue<Task> empty;
swap(empty,task);//清空任務(wù)隊列
pool.clear(); //清空線程池
}
至此,線程池類封裝就實現(xiàn)了,其余功能函數(shù)在這就不多講了,有興趣的同學(xué)可自行完成。
好了,今天關(guān)于C++實現(xiàn)線程池的話題,就講到這里了,關(guān)于源代碼的問題,我整理好后會放在公眾號(小懵白生活小趣談)后臺里面。