本文介紹了協(xié)程的概念,并討論了 Tars Cpp 協(xié)程的實現(xiàn)原理和源碼分析。
一、前言
Tars 是 Linux 基金會的開源項目,它是基于名字服務(wù)使用 Tars 協(xié)議的高性能 RPC 開發(fā)框架,配套一體化的運(yùn)營管理平臺,并通過伸縮調(diào)度,實現(xiàn)運(yùn)維半托管服務(wù)。Tars 集可擴(kuò)展協(xié)議編解碼、高性能 RPC 通信框架、名字路由與發(fā)現(xiàn)、發(fā)布監(jiān)控、日志統(tǒng)計、配置管理等于一體,通過它可以快速用微服務(wù)的方式構(gòu)建自己的穩(wěn)定可靠的分布式應(yīng)用,并實現(xiàn)完整有效的服務(wù)治理。
Tars 目前支持 C++,Java,PHP,Nodejs,Go 語言,其中 TarsCpp 3.x 全面啟用對協(xié)程的支持,服務(wù)框架全面融合協(xié)程。本文基于TarsCpp-v3.0.0版本,討論了協(xié)程在TarsCpp服務(wù)框架的實現(xiàn)。
二、協(xié)程的介紹
2.1 什么是協(xié)程
協(xié)程的概念最早出現(xiàn)在Melvin Conway在1963年的論文("Design of a separable transition-diagram compiler"),協(xié)程認(rèn)為是“可以暫停和恢復(fù)執(zhí)行”的函數(shù)。

協(xié)程可以看成一種特殊的函數(shù),相比于函數(shù),協(xié)程最大的特點就是支持掛起(yield)和恢復(fù)(resume)的能力。如上圖所示:函數(shù)不能主動中斷執(zhí)行流;而協(xié)程支持主動掛起,中斷執(zhí)行流,并在一定時機(jī)恢復(fù)執(zhí)行。
協(xié)程的作用:
降低并發(fā)編碼的復(fù)雜度,尤其是異步編程(callback hell)。
協(xié)程在用戶態(tài)中實現(xiàn)調(diào)度,避免了陷入內(nèi)核,上下文切換開銷小。
2.2 進(jìn)程、線程和協(xié)程
我們可以簡單的認(rèn)為協(xié)程是用戶態(tài)的線程。協(xié)程和線程主要異同:
相同點:都可以實現(xiàn)上下文切換(保存和恢復(fù)執(zhí)行流)
不同點:線程的上下文切換在內(nèi)核實現(xiàn),切換的時機(jī)由內(nèi)核調(diào)度器控制。協(xié)程的上下文切換在用戶態(tài)實現(xiàn),切換的時機(jī)由調(diào)用方自身控制。
進(jìn)程、線程和協(xié)程的比較:

2.3 協(xié)程的分類
按控制傳遞(Control-transfer)機(jī)制分為:對稱(Symmetric)協(xié)程和非對稱(Asymmetric)協(xié)程。
對稱協(xié)程:協(xié)程之間相互獨(dú)立,調(diào)度權(quán)(CPU)可以在任意協(xié)程之間轉(zhuǎn)移。協(xié)程只有一種控制傳遞操作(yield)。對稱協(xié)程一般需要調(diào)度器支持,通過調(diào)度算法選擇下一個目標(biāo)協(xié)程。
非對稱協(xié)程:協(xié)程之間存在調(diào)用關(guān)系,協(xié)程讓出的調(diào)度權(quán)只能返回給調(diào)用者。協(xié)程有兩種控制操作:恢復(fù)(resume)和掛起(yield)。
下圖演示了對稱協(xié)程的調(diào)度權(quán)轉(zhuǎn)移流程,協(xié)程只有一個操作yield,表示讓出CPU,返回給調(diào)度器。

對稱協(xié)程示意圖
下圖演示了非對稱協(xié)程的調(diào)度權(quán)轉(zhuǎn)移流程。協(xié)程可以有兩個操作,即resume和yield。resume表示轉(zhuǎn)移CPU給被調(diào)用者,yield表示被調(diào)用者返回CPU給調(diào)用者。

非對稱協(xié)程示意圖
根據(jù)協(xié)程是否有獨(dú)立的??臻g,協(xié)程分為有棧協(xié)程(stackful)和無棧協(xié)程(stackless)兩種。
有棧協(xié)程:每個協(xié)程有獨(dú)立的??臻g,保存獨(dú)立的上下文(執(zhí)行棧、寄存器等),協(xié)程的喚醒和掛起就是拷貝和切換上下文。優(yōu)點:協(xié)程調(diào)度可以嵌套,在內(nèi)存中的任意位置、任意時刻進(jìn)行。局限:協(xié)程數(shù)目增大,內(nèi)存開銷增大。
無棧協(xié)程:單個線程內(nèi)所有協(xié)程都共享同一個??臻g(共享棧),協(xié)程的切換就是簡單的函數(shù)調(diào)用和返回,無棧協(xié)程通常是基于狀態(tài)機(jī)或閉包來實現(xiàn)。優(yōu)點:減小內(nèi)存開銷。局限:協(xié)程調(diào)度產(chǎn)生的局部變量都在共享棧上, 一旦新的協(xié)程運(yùn)行后共享棧中的數(shù)據(jù)就會被覆蓋, 先前協(xié)程的局部變量也就不再有效, 進(jìn)而無法實現(xiàn)參數(shù)傳遞、嵌套調(diào)用等高級協(xié)程交互。
Golang 中的 goroutine、Lua 中的協(xié)程都是有棧協(xié)程;ES6的 await/async、Python 的 Generator、C++20 中的 cooroutine 都是無棧協(xié)程。
三、Tars 協(xié)程實現(xiàn)
實現(xiàn)協(xié)程的核心有兩點:
實現(xiàn)用戶態(tài)的上下文切換。
實現(xiàn)協(xié)程的調(diào)度。
Tars 協(xié)程的由下面幾個類實現(xiàn):
TC_CoroutineInfo 協(xié)程信息類:實現(xiàn)協(xié)程的上下文切換。每個協(xié)程對應(yīng)一個 TC_CoroutineInfo 對象,上下文切換基于boost.context實現(xiàn)。
TC_CoroutineScheduler 協(xié)程調(diào)度器類:實現(xiàn)了協(xié)程的管理和調(diào)度。
TC_Coroutine 協(xié)程類:繼承于線程類(TC_Thread),方便業(yè)務(wù)快速使用協(xié)程。
Tars 協(xié)程有幾個特點:
有棧協(xié)程。每個協(xié)程都分配了獨(dú)立的??臻g。
對稱協(xié)程。協(xié)程之間相互獨(dú)立,由調(diào)度器負(fù)責(zé)調(diào)度。
基于 epoll 實現(xiàn)協(xié)程調(diào)度,和網(wǎng)絡(luò)IO無縫結(jié)合。
3.1 用戶態(tài)上下文切換的實現(xiàn)方式
協(xié)程可以看成一種特殊的函數(shù),和普通函數(shù)不同,協(xié)程函數(shù)有掛起(yield)和恢復(fù)(resume)的能力,即可以中斷自己的執(zhí)行流,并且在合適的時候恢復(fù)執(zhí)行流,這也稱為上下文切換的能力。
協(xié)程執(zhí)行的過程,依賴兩個關(guān)鍵要素:協(xié)程棧和寄存器,協(xié)程的上下文環(huán)境其實就是寄存器和棧的狀態(tài)。實現(xiàn)上下文切換的核心就是實現(xiàn)保存并恢復(fù)當(dāng)前執(zhí)行環(huán)境的寄存器狀態(tài)的能力。
實現(xiàn)用戶態(tài)上下文切換一般有以下方式:

3.2 基于boost.context實現(xiàn)上下文切換
Tars 協(xié)程是基于 boost.context 實現(xiàn),boost.context 提供了兩個接口(make_fcontext, jump_fcontext)實現(xiàn)協(xié)程的上下文切換。
代碼1:
/**
* @biref 執(zhí)行環(huán)境上下文
*/
typedef void* fcontext_t;
/**
* @biref 事件參數(shù)包裝
*/
struct transfer_t {
fcontext_t fctx; // 來源的執(zhí)行上下文。來源的上下文指的是從什么位置跳轉(zhuǎn)過來的
void* data; // 接口傳入的自定義的指針
};
/**
* @biref 初始化執(zhí)行環(huán)境上下文
* @param sp ??臻g地址
* @param size 棧空間的大小
* @param fn 入口函數(shù)
* @return 返回初始化完成后的執(zhí)行環(huán)境上下文
*/
extern "C" fcontext_t make_fcontext(void * stack, std::size_t stack_size, void (* fn)( transfer_t));
/**
* @biref 跳轉(zhuǎn)到目標(biāo)上下文
* @param to 目標(biāo)上下文
* @param vp 目標(biāo)上下文的附加參數(shù),會設(shè)置為transfer_t里的data成員
* @return 跳轉(zhuǎn)來源
*/
extern "C" transfer_t jump_fcontext(fcontext_t const to, void * vp);
(1)make_fcontext創(chuàng)建協(xié)程
接受三個參數(shù),stack是為協(xié)程分配的棧底,stack_size是棧的大小,fn是協(xié)程的入口函數(shù)
返回初始化完成后的執(zhí)行環(huán)境上下文
(2)jump_fcontext切換協(xié)程
接受兩個參數(shù),目標(biāo)上下文地址和參數(shù)指針
返回一個上下文,指向當(dāng)前上下文從哪個上下文跳轉(zhuǎn)過來

fcontext的結(jié)構(gòu)
boost context 是通過 fcontext_t結(jié)構(gòu)體來保存協(xié)程狀態(tài)。相對于其它匯編實現(xiàn)的協(xié)程庫,boost的context和stack是一起的,棧底指針就是context,切換context就是切換stack。
3.3 Tars協(xié)程信息類
TC_CoroutineInfo 協(xié)程信息類,包裝了 boost.context 提供的接口,表示一個 TARS 協(xié)程。
其中,TC_CoroutineInfo::registerFunc 定義了協(xié)程的創(chuàng)建。
代碼2:
void TC_CoroutineInfo::registerFunc(const std::function& callback) { _callback = callback; _init_func.coroFunc = TC_CoroutineInfo::corotineProc; _init_func.args = this; fcontext_t ctx = make_fcontext(_stack_ctx.sp, _stack_ctx.size, TC_CoroutineInfo::corotineEntry); // 創(chuàng)建協(xié)程 transfer_t tf = jump_fcontext(ctx, this); // context 切換 //實際的ctx this->setCtx(tf.fctx); } void TC_CoroutineInfo::corotineEntry(transfer_t tf) { TC_CoroutineInfo * coro = static_cast< TC_CoroutineInfo * >(tf.data); // this auto func = coro->_init_func.coroFunc; void* args = coro->_init_func.args; transfer_t t = jump_fcontext(tf.fctx, NULL); //拿到自己的協(xié)程堆棧, 當(dāng)前協(xié)程結(jié)束以后, 好跳轉(zhuǎn)到main coro->_scheduler->setMainCtx(t.fctx); //再跳轉(zhuǎn)到具體函數(shù) func(args, t); }
TC_CoroutineInfo::switchCoro 定義了協(xié)程切換。
代碼3
void TC_CoroutineScheduler::switchCoro(TC_CoroutineInfo *to)
{
//跳轉(zhuǎn)到to協(xié)程
_currentCoro = to;
transfer_t t = jump_fcontext(to->getCtx(), NULL);
//并保存協(xié)程堆棧
to->setCtx(t.fctx);
}
四、Tars 協(xié)程調(diào)度器
基于 boost.context 的 TC_CoroutineInfo 類實現(xiàn)了協(xié)程的上下文切換,協(xié)程的管理和調(diào)度,則是由 TC_CoroutineScheduler 協(xié)程調(diào)度器類來負(fù)責(zé),分管理和調(diào)度兩個方面來說明 TC_CoroutineScheduler 調(diào)度類。
協(xié)程管理:目的是需要合理的數(shù)據(jù)結(jié)構(gòu)來組織協(xié)程(TC_CoroutineInfo),方便調(diào)度的實現(xiàn)。
協(xié)程調(diào)度:目的是控制協(xié)程的啟動、休眠和喚醒,實現(xiàn)了 yield, sleep 等功能,本質(zhì)就是實現(xiàn)協(xié)程的狀態(tài)機(jī),完成協(xié)程的狀態(tài)切換。Tars 協(xié)程分為 5 個狀態(tài):FREE, ACTIVE, AVAIL, INACTIVE, TIMEOUT
代碼4:
/**
* 協(xié)程的狀態(tài)信息
*/
enum CORO_STATUS
{
CORO_FREE = 0,
CORO_ACTIVE = 1,
CORO_AVAIL = 2,
CORO_INACTIVE = 3,
CORO_TIMEOUT = 4
};
4.1 Tars 協(xié)程的管理
TC_CoroutineScheduler 主要通過以下方法管理協(xié)程:
TC_CoroutineScheduler::create()
創(chuàng)建TC_CoroutineScheduler對象
TC_CoroutineScheduler::init()初始化,分配協(xié)程棧內(nèi)存
TC_CoroutineScheduler::run()啟動調(diào)度
TC_CoroutineScheduler::terminate()停止調(diào)度
TC_CoroutineScheduler::destroy()資源銷毀,釋放協(xié)程棧內(nèi)存
我們可以通過 TC_CoroutineScheduler::init()看到數(shù)據(jù)結(jié)構(gòu)的初始化過程。
代碼5:
void TC_CoroutineScheduler::init()
{
... ....
createCoroutineInfo(_poolSize); // _all_coro = new TC_CoroutineInfo*[_poolSize+1];
TC_CoroutineInfo::CoroutineHeadInit(&_active);
TC_CoroutineInfo::CoroutineHeadInit(&_avail);
TC_CoroutineInfo::CoroutineHeadInit(&_inactive);
TC_CoroutineInfo::CoroutineHeadInit(&_timeout);
TC_CoroutineInfo::CoroutineHeadInit(&_free);
int iSucc = 0;
for(size_t i = 0; i < _currentSize; ++i)
{
//iId=0不使用, 給mainCoro使用!!!!
uint32_t iId = generateId();
stack_context s_ctx = stack_traits::allocate(_stackSize); // 分配協(xié)程棧內(nèi)存
TC_CoroutineInfo *coro = new TC_CoroutineInfo(this, iId, s_ctx);
_all_coro[iId] = coro;
TC_CoroutineInfo::CoroutineAddTail(coro, &_free);
++iSucc;
}
_currentSize = iSucc;
_mainCoro.setUid(0);
_mainCoro.setStatus(TC_CoroutineInfo::CORO_FREE);
_currentCoro = &_mainCoro;
}
通過下面的 TC_CoroutineScheduler 調(diào)度類數(shù)據(jù)結(jié)構(gòu)圖,可以更清楚的看到協(xié)程的組織方式:

Tars調(diào)度類數(shù)據(jù)結(jié)構(gòu)
使用協(xié)程之前,需要在協(xié)程數(shù)組(_all_coro),創(chuàng)建指定數(shù)量的協(xié)程對象,并為每個協(xié)程分配協(xié)程棧內(nèi)存。
通過鏈表的方式管理協(xié)程,每個狀態(tài)都有一個鏈表。協(xié)程狀態(tài)切換,對應(yīng)協(xié)程在不同狀態(tài)鏈表的轉(zhuǎn)移。
4.2Tars 協(xié)程的調(diào)度
Tars 調(diào)度是基于epoll實現(xiàn),在 epoll 循環(huán)里檢查是否有需要執(zhí)行的協(xié)程, 有則執(zhí)行之, 沒有則等待在epoll對象上, 直到有喚醒或者超時。使用 epoll 實現(xiàn)的好處是可以和網(wǎng)絡(luò)IO無縫粘合, 當(dāng)有數(shù)據(jù)發(fā)送/接收時, 喚醒epoll對象, 從而完成協(xié)程的切換。
Tars協(xié)程調(diào)度的核心邏輯是
TC_CoroutineScheduler::run()
代碼6:
void TC_CoroutineScheduler::run()
{
... ...
while(!_epoller->isTerminate())
{
if(_activeCoroQueue.empty() && TC_CoroutineInfo::CoroutineHeadEmpty(&_avail) && TC_CoroutineInfo::CoroutineHeadEmpty(&_active))
{
_epoller->done(1000); // epoll_wait(..., 1000ms) 先處理epoll的網(wǎng)絡(luò)事件
}
//喚醒需要激活的協(xié)程
wakeup();
//喚醒sleep的協(xié)程
wakeupbytimeout();
//喚醒yield的協(xié)程
wakeupbyself();
int iLoop = 100;
//執(zhí)行active協(xié)程, 每次執(zhí)行100個, 避免占滿cpu
while(iLoop > 0 && !TC_CoroutineInfo::CoroutineHeadEmpty(&_active))
{
TC_CoroutineInfo *coro = _active._next;
switchCoro(coro);
--iLoop;
}
//執(zhí)行available協(xié)程, 每次執(zhí)行1個
if(!TC_CoroutineInfo::CoroutineHeadEmpty(&_avail))
{
TC_CoroutineInfo *coro = _avail._next;
switchCoro(coro);
}
}
... ...
}
下圖可以更清楚得看到協(xié)程調(diào)度和狀態(tài)轉(zhuǎn)移的過程。

Tars協(xié)程調(diào)度狀態(tài)轉(zhuǎn)移圖
TC_CoroutineScheduler 提供了下面四種方法實現(xiàn)協(xié)程的調(diào)度:
(1) TC_CoroutineScheduler:啟動協(xié)程。
(2)TC_CoroutineScheduler:當(dāng)前協(xié)程放棄繼續(xù)執(zhí)行。并提供了兩種方式,支持不同的喚醒策略。
yield(true):會自動喚醒(等到下次協(xié)程調(diào)度,都會再激活當(dāng)前線程)
yield(false):不再自動喚醒,除非自己調(diào)度該協(xié)程(比如put到調(diào)度器中)
(3)TC_CoroutineScheduler:當(dāng)前協(xié)程休眠iSleepTime時間(單位:毫秒),然后會被喚醒繼續(xù)執(zhí)行。
(4)TC_CoroutineScheduler:放入需要喚醒的協(xié)程, 將協(xié)程放入到調(diào)度器中, 馬上會被調(diào)度器調(diào)度。
五、總結(jié)
本文介紹了協(xié)程的概念,并討論了 Tars Cpp 協(xié)程的實現(xiàn)原理和源碼分析。
審核編輯:劉清
-
RPC
+關(guān)注
關(guān)注
0文章
114瀏覽量
12289 -
C++語言
+關(guān)注
關(guān)注
0文章
147瀏覽量
7774 -
Lua語言
+關(guān)注
關(guān)注
0文章
9瀏覽量
1657 -
調(diào)度器
+關(guān)注
關(guān)注
0文章
99瀏覽量
5718
原文標(biāo)題:Tars-Cpp協(xié)程實現(xiàn)分析
文章出處:【微信號:OSC開源社區(qū),微信公眾號:OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
談?wù)?b class='flag-5'>協(xié)程的那些事兒
協(xié)程和線程有什么區(qū)別
怎樣使用C語言去實現(xiàn)Linux系統(tǒng)協(xié)程呢
Tars在ARM平臺上的移植是如何去實現(xiàn)的
Tars移植到ARM64平臺上的過程實現(xiàn)
Python后端項目的協(xié)程是什么
Python協(xié)程與JavaScript協(xié)程的對比及經(jīng)驗技巧
使用channel控制協(xié)程數(shù)量
詳解Linux線程、線程與異步編程、協(xié)程與異步
協(xié)程的概念及協(xié)程的掛起函數(shù)介紹
FreeRTOS任務(wù)與協(xié)程介紹
協(xié)程的作用、結(jié)構(gòu)及原理
基于TarsCpp-v3.0.0討論協(xié)程在TarsCpp服務(wù)框架的實現(xiàn)
評論