本篇博客主要介绍C++原生的多线程操作,包括启动线程、等待一个线程结束、如何给已启动的线程传递参数、线程的所有权转交。
在一切的一切开始之前,需要先 #include<thread>
。
启动线程
Cpp的多线程需要靠 std::thread
对象来实现,构造该类,只需要一个可调用(callable)类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| #include <iostream> #include <thread> #include <chrono>
void worker() { std::cout << "子线程开始工作..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << "子线程完成任务!" << std::endl; }
int main() { std::thread t(worker); t.detach();
std::cout << "主线程继续执行,不等待子线程。" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(3)); return 0; }
|
只要实例化一个新的 std::thread
对象,该线程就会开始运行。由于这里调用了 detach()
方法,所以两个线程会分离运行,主线程不会进行等待。
等待线程完成
想要在主线程中等待线程的完成,只需要对这个线程对象调用 join()
方法即可。join()
方法只是简单地等待一个线程完成,如果需要更灵活地控制或者等待,则需要使用条件变量或者期待(futrue)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| #include <iostream> #include <thread> #include <chrono>
void worker() { std::cout << "子线程开始工作..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << "子线程完成任务!" << std::endl; }
int main() { std::thread t(worker);
std::cout << "主线程正在等待子线程..." << std::endl; t.join();
std::cout << "主线程继续执行。" << std::endl; return 0; }
|
不等待线程完成
在前面的章节中,提到了使用 detach()
方法来不等待线程的运行,此时需要特别注意局部变量的访问问题。即在新线程中使用了局部变量的引用或者地址,然而主线程没有等待新线程,直接将局部变量释放,这会引发未定义的结果,并且很难发现错误:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| #include <iostream> #include <thread> #include <chrono>
void worker(int* value) { std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << "子线程读取数据: " << *value << std::endl; }
int main() { int data = 42; std::thread t(worker, &data); t.detach(); std::cout << "主线程结束,局部变量`data`已被释放。" << std::endl; return 0; }
|
为了防止这种错误发生的方法,应该在传递局部变量时尽量以值传递,实在不得已需要传递引用或指针时,需要保证线程在局部变量的作用域到达前结束。
RAII封装
新建一个线程时,必须调用 detach
或 join
指明是否等待,否则在 thread
对象生命周期到达时,就会立即停止,导致程序崩溃。
为了防止忘记调用 detach
或 join
,可以采用RAII封装的方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| class ThreadGuard { public: explicit ThreadGuard(std::thread& t) : t_(t) {} ~ThreadGuard() { if (t_.joinable()) { t_.join(); } } private: std::thread& t_; };
int main() { std::thread t(worker); ThreadGuard guard(t); return 0; }
|
向线程传递参数
默认按值拷贝对象
在实例化 std::thread
时,除了第一个参数外,还可以传递若干个参数作为新线程函数的参数(就像命令行一样,第0个参数是程序名字,第1以及之后的参数的真正的 argv
)。需要注意的是,当直接传入一个对象时,线程会生成一个副本,新线程操作的是副本,与原对象无关:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| #include <iostream> #include <thread>
void worker(int value) { value = 100; }
int main() { int data = 42; std::thread t(worker, data); t.join(); std::cout << "data = " << data << std::endl; return 0; }
|
如果希望向线程传递引用,需要显式使用 std::ref
来指定:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| #include <iostream> #include <thread>
void worker(int& value) { value = 100; }
int main() { int data = 42; std::thread t(worker, std::ref(data)); t.join(); std::cout << "data = " << data << std::endl; return 0; }
|
成员函数作为线程函数
前面提到,thread
接受所有 callable
类型作为参数,所以成员函数也可以作为线程函数,这是就要使用类似 __thiscall
的方式将对象的地址作为第二个参数传递给 thread
的构造函数:
1 2 3 4 5 6 7
| class X { public: void do_lengthy_work(); }; X my_x; std::thread t(&X::do_lengthy_work,&my_x);
|
以此类推,真正传递给 do_lengthy_work
的参数将从第三个参数开始。
转移线程所有权
这个概念听起来很玄乎,但是如果将线程的所有权与 unique_ptr
的所有权类比起来看,那么就很清晰了。thread
就像 unique_ptr
一样,不可以复制(很显然,复制是没有意义的),但是可以像 unique_ptr
一样调用 std::move
进行所有权的转移,语义也是一样的,转移之后,原有的资源就不能再使用了。
在实际编程常常使用 vector
等容器来统一管理线程,将线程装入这些容器使用的就是移动语义。
识别线程
就像进程有唯一标识符 pid
一样,线程也有标识符,可以通过以下两种方式获取:
- 在当前线程中,调用
std::this_thread::get_id()
;
- 在主线程中,通过
thread
对象的 get_id()
成员函数。
返回的类型是 std::thread::id
类型,该类型支持各种比较操作,标准库也提供了 std::hash<std::thread::id>
容器,因此可以作为无序容器的键值。
实践
下面是《C++并发编程实战》中给出的一个原生并行版 accumulate
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| template<typename Iterator,typename T> struct accumulate_block { void operator()(Iterator first,Iterator last,T& result) { result=std::accumulate(first,last,result); } };
template<typename Iterator,typename T> T parallel_accumulate(Iterator first,Iterator last,T init) { unsigned long const length=std::distance(first,last);
if(!length) return init;
unsigned long const min_per_thread=25; unsigned long const max_threads= (length+min_per_thread-1)/min_per_thread;
unsigned long const hardware_threads= std::thread::hardware_concurrency();
unsigned long const num_threads= std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size=length/num_threads;
std::vector<T> results(num_threads); std::vector<std::thread> threads(num_threads-1);
Iterator block_start=first; for(unsigned long i=0; i < (num_threads-1); ++i) { Iterator block_end=block_start; std::advance(block_end,block_size); threads[i]=std::thread( accumulate_block<Iterator,T>(), block_start,block_end,std::ref(results[i])); block_start=block_end; } accumulate_block<Iterator,T>()( block_start,last,results[num_threads-1]); std::for_each(threads.begin(),threads.end(), std::mem_fn(&std::thread::join));
return std::accumulate(results.begin(),results.end(),init); }
|