Condition Variable in C++ 一個簡單的範例
- 2023-06-25
- Liu, An-Chi 劉安齊
¶ 前言
我第一次聽到 Condition Variable(條件變數) 是在台大資工作業系統課上,老實說當時我對電腦概念還非常模糊,糊里糊塗修完課後就忘記了這玩意(當時覺得老師教不好,為啥就是聽不懂,也許是慧根不夠)。
在交大資工所即使當了平行計算助教,很意外的還是沒用過,我現在也覺得挺意外的(?)題外話,平行計算本來就跟並行運算有些區別就是了,另外就是這兩門學問都博大精深,我到現在都不敢說自己「會」,即便我有一篇跟平行計算有關的論文上 LNCS,連工作面試時別人隨便考個平行計算都被電,常常讓我感慨學無止盡且艱難。
扯遠了,這幾天工作上要寫一個很經典的生產者消費者問題(Producer-Consumer Problem),問了一下同事我要處理的問題,他就說「阿這不就是 Condition Variable 嗎,去看一下應該會有幫助」,當然他是用英文跟我講啦。我覺得滿滿的慚愧,好歹學電腦也好幾年了,竟然到現在才實際去寫 Condition Variable,查了一下怎麼寫算是滿快就搞定,巧的是短期就有機會用了兩次。
廢話講完了,這篇文章不打算詳細解釋 Condition Variable 是什麼,可以詳閱恐龍書(作業系統概論)或是 Wikipedia 看一下。我也不會教很高深的實作技巧,這篇就是一個簡單的範例告訴你 C++ std::condition_variable
可以這樣用。
皆さん、始めましょう!
(長野地獄谷雪猴)
¶ Condition Variable 範例
說到 Condition Variable,簡單來說他的概念是讓多執行緒的程式中,使一個或多個執行緒去等待一個共享記憶體的資料變化,比方說 A 執行緒改變全域變數 FLAG
從 0 變 1,B 執行緒發現 FLAG
變成 1 的時候就開始執行任務。
前面提到 Condition Variable 可以用在生產者消費者問題上,這個情況下,生產者會產生一堆「事件」或「資料」,而消費者則會在「事件發生」或是「資料產生」時去作對應的事情。
要用 C++ 實作這個問題,首先我們會需要以下兩個變數放在全域:
std::condition_variable g_cond;
std::mutex g_mutex;
在此範例中我簡單把他放在全域變數,但實作上只需要讓生產者 P 和消費者 Q 都能存取的就好,例如 P 和 Q 都在同一個 Class,上面兩個變數可以是 Class 的成員變數即可。
¶ 生產者
先來看看生產者的程式碼:
void run_producer_thread(std::queue<std::string> &queue) {
for (;;) {
auto word = generateRandomString(); // 任意資料
std::lock_guard<std::mutex> lock{g_mutex}; // 鎖住 queue 以免同時被 consumer 讀取
queue.push(word);
g_cond.notify_one(); // 通知 consumer
}
}
在這個例子中,生產者把資料丟入佇列(Queue),這邊我們用到了 g_mutex
來確保 queue
不會同時被生產者讀取。
g_cond.notify_one
會負責發送一個「我好了!」的訊號給消費者。
¶ 消費者
再來看看消費者的程式碼:
void run_consumer_thread(std::queue<std::string> &queue) {
for (;;) {
std::unique_lock<std::mutex> lock(g_mutex); // 這邊的 lock 還不會真的上鎖
g_cond.wait(lock, [&] { return !queue.empty(); }); // 當進行檢查 queue 不為空時,lock 會上鎖
auto word = queue.front(); // 把資料拷貝出來,才能盡快解鎖
queue.pop();
lock.unlock(); // 解除 g_mutex 的鎖
}
}
注意這邊用到 std::unique_lock
,他的功能很多,包含以下「deferred locking, time-constrained attempts at locking, recursive locking, transfer of lock ownership, and use with condition variables」(抱歉我懶 :P)。
在這個範例下,白話解釋就是「晚一點才會用上你」的一種鎖,所以宣告當下並不會真的上鎖。
g_cond.wait
會一直做等待(或稱做阻塞,Blocking),並且當收到 notify_one
時,觸發進行條件檢查。這邊的檢查是確認 queue
是否不為空(亦即有資料進來)。如果 g_cond
的檢查得到 true
,則取消阻塞,反之繼續進行等待。
在每次做檢查之前,lock
的所有權會被取得,才能進行條件檢查(換句話說,lock
先上鎖,才檢查 !queue.empty()
)
檢查完如果未達成條件 lock
就會解鎖,反之就會繼續保持上鎖狀態,程式接著執行 g_cond.wait
之後的程式碼。
接者生產者就可以把資料從 queue
中拿出來。
然後進入下一個 for 循環,g_cond.wait
觸發開始等待。
值得注意的是,在實際事件發生之前,
g_cond.wait
可能會因為作業系統底層的機制而多次解除阻塞,稱做虛假喚醒(Spurious Wakeup)。在每次喚醒時,無論是虛驚一場或是真的來自notify_one
通知,lock
都會上鎖來做檢查條件。
¶ 完整範例程式碼
完整範例程式碼如下,強烈建議大家實際跑跑看,看看結果。
// g++ example.cpp -std=c++17 -lpthread
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <string>
#include <thread>
std::condition_variable g_cond; // 用來做通知
std::mutex g_mutex; // 用來保護資料
// 產生隨機字串,只是個 Helper 函數,不是本範例重點
std::string generateRandomString() {
std::string characters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
std::random_device rd;
std::mt19937 generator(rd());
std::uniform_int_distribution<int> distribution(0, characters.length() - 1);
std::string result;
int randomLength = distribution(generator);
for (int i = 0; i < randomLength; ++i) {
int randomIndex = distribution(generator);
result += characters[randomIndex];
}
return result;
}
void run_producer_thread(std::queue<std::string> &queue) {
for (;;) {
std::cout << "P: === This is producer thread ===" << std::endl << std::flush;
auto word = generateRandomString();
std::lock_guard<std::mutex> lock{g_mutex}; // 鎖住 queue 以免同時被 consumer 讀取
queue.push(word);
g_cond.notify_one(); // 通知 consumer
std::cout << "P: producer just push a word: `" << word << "`" << std::endl << std::flush;
std::cout << "P: producer sleep for 1500 ms" << std::endl << std::flush;
std::this_thread::sleep_for(std::chrono::milliseconds(1500)); // 睡 1500 ms 方便觀察
}
}
void run_consumer_thread(std::queue<std::string> &queue) {
for (;;) {
std::cout << "C: == This is consumer thread ==" << std::endl << std::flush;
std::unique_lock<std::mutex> lock(g_mutex); // 這邊的 lock 用來下一行檢查 queue 狀態
g_cond.wait(lock, [&] { return !queue.empty(); }); // 當 queue 不為空時
auto word = queue.front(); // 把資料拷貝出來,才能盡快解鎖
queue.pop();
lock.unlock(); // 解除 g_mutex 的鎖
std::cout << "C: Consumer get a word `" << word << "`" << std::endl << std::flush;
// 可以試試看取消註解來看影響,有什麼特別的情況,為什麼會這樣?
// std::cout << "C: consumer sleep for 1500 ms" << std::endl << std::flush;
// std::this_thread::sleep_for(std::chrono::milliseconds(1500)); // 睡 1500 ms 方便觀察
}
}
int main() {
std::queue<std::string> queue;
auto producer_thread = std::thread([&queue]() { run_producer_thread(queue); });
auto consumer_thread = std::thread([&queue]() { run_consumer_thread(queue); });
producer_thread.join();
consumer_thread.join();
}
¶ 結論
總結,Condition Variable 是多執行緒程式開發中確保執行緒之間的同步和資料傳遞。使用時要小心,上鎖的方式與時機,一不小心可能就會 Dead Lock。實際應用場景包含生產者消費者問題、讀者寫者問題(Readers-Writers Problem)、執行緒同步、任務協調、工作佇列等等。