C++ coroutine TS 概述
Coroutine TS提供了一种通用机制,使得我们可以通过实现符合特定接口的类型来灵活地定制协程的行为,利用这套机制,我们可以对协程的流程实现细粒度的控制(这也使得它对于使用者来说有些过于繁重)。我们实现特定接口并使用关键字使得函数成为协程,然后编译器在特定位置插入代码并对其进行转换并将协程体转化为状态机使得重入的效率更高。同时,它使得我们能够将业务逻辑从框架中抽离出来,使得代码结构更加清晰。简单来说,协程可以看做是编译器支持的回调包装器。
如果函数的定义进行了下列操作之一,那么它是协程:
- 用
co_await
运算符暂停执行,直到恢复。
- 用
co_yield
暂停执行并返回一个值。
- 用
co_return
完成执行并返回一个值。
C++ coroutine TS中有三个比较核心的概念:promise_type
,awaiter
和coroutine_handle
,这里先简要介绍一下他们各自的职责。
promise_type
指定了定制协程本身的方法。我们可以灵活地定制协程被调用时做什么,协程返回时做什么(包括一般意义上返回和异常之后返回),还可以定制协程中调用co_await
和co_yield
时的行为。它从内部操控协程,并且充当调用者和协程之间的通信信道,协程通过promise_type
对象提交结果到外部(类似promise/future
当中promise
的用法),协程中的异常也通过它来传递并处理。
awaiter
接口指定了控制co_wait
表达式语义的方法。当一个值被co_await
时,代码被转换成一系列对awaiter
对象上的方法的调用,这些调用允许它指定:是否挂起当前协程,在它挂起后执行一些逻辑来调度(/安排)协程以便稍后恢复,并在协程恢复后执行一些逻辑,以生成co_await
表达式的结果。
coroutine_handle
代表了协程帧的非拥有句柄,它可以用于恢复协程的执行或者销毁协程帧。协程帧包含了协程的状态,包括promise_type
对象、值传递的函数参数、挂起点的位置以及局部变量和生命周期跨越挂起点的临时变量。当协程被挂起时,这些在栈帧上对象被复制到堆上的协程帧中,而当协程恢复时,它们又会重新从协程帧中复制会栈帧上(该过程可能被编译器优化掉)。由于协程的状态是持续存在的,其生命周期是独立的的,所以需要一个额外的对象管理其生命周期,使其生命周期和执行过程解绑,而这正是coroutine_handle
的职责,它就是一个指向协程帧的类型擦除指针,给我们提供了从外部控制协程的能力。
co_await
,awaitable
和awaiter
任何包含使用co_await
运算符的函数体都将被编译为一个协程,支持co_await
运算符的类型被称为awaitable
类型。我么可以通过重载co_await
操作符或在promise_type
中实现await_transform
来定制co_await
的行为。
标准库定义了两个平凡的awaitable
类型:std::suspend_always
和 std::suspend_never
如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
class suspend_never{
public:
bool await_ready() {return true;}
void await_suspend(coroutine_handle<void>){}
void await_resume(){}
};
class suspend_always{
public:
bool await_ready() {return false;}
void await_suspend(coroutine_handle<void>){}
void await_resume(){}
};
|
对于一个co_await <exp>
表达式来说,首先会执行该表达式,然后用下列方式获取awaitable
对象
- 如果
<exp>
由初始暂停点、最终暂停点或 yield 表达式所产生,那么awaitable
是<exp>
本身。
- 否则,如果当前协程的
promise_type
拥有成员函数 await_transform()
,那么awaitable
是promise.await_transform(<exp>)
。
- 否则,
awaitable
是 <exp>
本身。
Note: 上述过程是在<exp>
本身执行完之后(控制流返回到此处)所发生的流程,如果<exp>
也是一个协程(函数体中包含co_await/co_yield/co_return
的函数),那么会先执行该协程,该协程返回(可能是在该协程中的co_await promise.initial_suspend()
返回,也可能在该协程中的co_await promise.final_suspend()
返回等)的对象即为awaitable
对象。
然后继续通过以下方式获得awaiter
对象:
- 如果针对
operator co_await
的重载决议给出某个最佳重载,那么awaiter
是该调用的结果。
- 否则,
awaiter
是awaitable
本身
而一个awaiter
需要实现以下三个接口:
await_ready()
: 用于通知协程是否需要挂起,如果其返回值为false
,则协程被挂起
await_suspend()
: 协程挂起前执行的操作
await_resume()
: 协程恢复时执行的操作
当获取到awaiter
对象后,co_await awaiter
被大致翻译为下述代码(详细的描述见cppreference):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
{
if (!awaiter.await_ready()){
using handle_t = std::experimental::coroutine_handle<P>;
using await_suspend_result_t =decltype(awaiter.await_suspend(handle_t::from_promise(p)));
<suspend-coroutine>
if constexpr (std::is_void_v<await_suspend_result_t>){
awaiter.await_suspend(handle_t::from_promise(p));
<return-to-caller-or-resumer>
}
else if(std::is_bool_v<await_suspend_result_t>){
if (awaiter.await_suspend(handle_t::from_promise(p))){
<return-to-caller-or-resumer>
}
}
else{
auto h = awaiter.await_suspend(handle_t::from_promise(p));
h.resume();
<return-to-caller-or-resumer>
}
}
__resume_point:
return awaiter.await_resume();
}
|
对于返回值为void
版本的await_suspend()
,当对await_suspend()
的调用返回时无条件转回执行协程的主调函数/调用resume
者。而返回值为bool
类型的版本则允许awaiter
对象有条件地不返回主调函数/resumer而立即恢复协程。
如果 await_suspend
返回某个其他协程的coroutine_handle
,那么恢复该句柄,注意这可以连锁进行,并最终导致当前协程恢复。
下面我们通过一个例子来深入理解co_await
的作用。
Example
在这个例子中,有多个并发执行的协程等待,直到某个线程调用set()
方法,此时任何等待的协程都将恢复。如果已经有某个线程调用过set()
方法,那么协程将会不挂起而继续执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
T value;
async_manual_reset_event event;
// A single call to produce a value
void producer(){
value = some_long_running_computation();
// Publish the value by setting the event.
event.set();
}
// Supports multiple concurrent consumers
task<> consumer(){
// Wait until the event is signalled by call to event.set()
// in the producer() function.
co_await event;
// Now it's safe to consume 'value'
// This is guaranteed to 'happen after' assignment to 'value'
std::cout << value << std::endl;
}
|
事件只可能处于set或not set状态,当其处于not set时,将会有一个等待协程列表,等待它变成set
。我们使用std::atomic<void*>
来表示事件状态。当其值为事件对象的this
指针时表示set状态,因为我们知道它不能与任何列表项的地址相同。否则,事件处于“not set”状态,并且该值是指向正在等待的协程的单链表的头结点的指针。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
class async_manual_reset_event{
public:
async_manual_reset_event(bool initiallySet = false) noexcept;
// No copying/moving
async_manual_reset_event(const async_manual_reset_event&) = delete;
async_manual_reset_event(async_manual_reset_event&&) = delete;
async_manual_reset_event& operator=(const async_manual_reset_event&) = delete;
async_manual_reset_event& operator=(async_manual_reset_event&&) = delete;
bool is_set() const noexcept;
struct awaiter;
awaiter operator co_await() const noexcept;
void set() noexcept;
void reset() noexcept;
private:
friend struct awaiter;
// - 'this' => set state
// - otherwise => not set, head of linked list of awaiter*.
mutable std::atomic<void*> m_state;
};
|
我们可以将节点存储在协程帧的awaiter
对象里,这样就可以避免在堆上为链表分配节点带来的额外的系统调用。
下面让我们定义awaiter
类型。首先,它需要知道它将等待哪个async_manual_reset_event
事件对象,因此它需要一个对事件的引用和一个构造函数来初始化它。它还需要充当awaiter值链表中的节点,因此需要持有指向链表中下一个awaiter
对象的指针。它还需要存储正在执行co_await
表达式的等待着的协程的coroutine_handle
句柄,以便事件在变为“set”时可以恢复协程。我们不关心协程的promise
类型,所以我们只使用协程句柄coroutine_handle<>
。awaiter的基本类接口如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
struct async_manual_reset_event::awaiter{
awaiter(const async_manual_reset_event& event) noexcept: m_event(event){}
bool await_ready() const noexcept;
bool await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine) noexcept;
void await_resume() noexcept {}
private:
const async_manual_reset_event& m_event;
std::experimental::coroutine_handle<> m_awaitingCoroutine;
awaiter* m_next;
};
bool async_manual_reset_event::awaiter::await_ready() const noexcept{
return m_event.is_set();
}
bool async_manual_reset_event::awaiter::await_suspend(
std::experimental::coroutine_handle<> awaitingCoroutine) noexcept{
// Special m_state value that indicates the event is in the 'set' state.
const void* const setState = &m_event;
// Remember the handle of the awaiting coroutine.
m_awaitingCoroutine = awaitingCoroutine;
// Try to atomically push this awaiter onto the front of the list.
void* oldValue = m_event.m_state.load(std::memory_order_acquire);
do{
// Resume immediately if already in 'set' state.
if (oldValue == setState) return false;
// Update linked list to point at current head.
m_next = static_cast<awaiter*>(oldValue);
// Finally, try to swap the old list head, inserting this awaiter
// as the new list head.
}while (!m_event.m_state.compare_exchange_weak(oldValue,this,std::memory_order_release,
std::memory_order_acquire));
// Successfully enqueued. Remain suspended.
return true;
}
|
在await_suspend()
方法中,它需要将等待着的协程的协程句柄存储到m_awaitingCoroutine
成员中,以便事件稍后可以对其调用resume()
。之后,我们尝试将awaiter
原子地进入到等待协程的链表。如果我们成功地将其入队,那么返回true以指示我们不希望立即恢复协程,否则如果我们发现事件已同时更改为set状态,那么我们返回false
以指示应立即恢复协程。
在加载旧状态时,我们使用acquire
内存顺序,这样,如果读取特殊的set值,就可以看到在调用set()
之前发生的写操作。如果compare-exchange
成功,则需要release
语义的内存顺序,以便随后对set()
的调用将看到我们对m_awaitingCoroutine
的写入以及对协程状态之前的写入。
下面是async_manua_reset_event
成员函数的实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
async_manual_reset_event::async_manual_reset_event(bool initiallySet) noexcept
: m_state(initiallySet ? this : nullptr){}
bool async_manual_reset_event::is_set() const noexcept{
return m_state.load(std::memory_order_acquire) == this;
}
void async_manual_reset_event::reset() noexcept{
void* oldValue = this;
m_state.compare_exchange_strong(oldValue, nullptr, std::memory_order_acquire);
}
void async_manual_reset_event::set() noexcept{
// Needs to be 'release' so that subsequent 'co_await' has visibility of our prior writes.
// Needs to be 'acquire' so that we have visibility of prior writes by awaiting coroutines.
void* oldValue = m_state.exchange(this, std::memory_order_acq_rel);
if (oldValue != this){
// Wasn't already in 'set' state.
// Treat old value as head of a linked-list of waiters
// which we have now acquired and need to resume.
auto* waiters = static_cast<awaiter*>(oldValue);
while (waiters != nullptr){
// Read m_next before resuming the coroutine as resuming
// the coroutine will likely destroy the awaiter object.
auto* next = waiters->m_next;
waiters->m_awaitingCoroutine.resume();
waiters = next;
}
}
}
async_manual_reset_event::awaiter async_manual_reset_event::operator co_await() const noexcept{
return awaiter{ *this };
}
|
在set()
方法中,我们希望通过将当前状态与特殊的“set”值——this
交换,从而转换到set状态,然后检查旧值是什么。如果有任何等待的协程,那么我们希望在返回之前依次恢复它们中的每一个。
promise_type
promise_type
从内部控制协程,它定义了协程本身的绝大多数行为,同时我们可以通过promise_type
跟踪并控制协程的状态。一个含有promise_type
的包装类可能有如下定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
template<typename T> struct generator {
struct promise_type;
using handle = std::coroutine_handle<promise_type>;
struct promise_type {
T *current_value;
static auto get_return_object_on_allocation_failure()noexcept{ return generator{nullptr}; }
auto get_return_object() { return generator{handle::from_promise(*this)}; }
auto initial_suspend() { return std::suspend_always{}; }
auto final_suspend() { return std::suspend_always{}; }
void unhandled_exception() { std::terminate(); }
void return_void() {}
auto yield_value(T &value)noexcept {
current_value = std::addressof(value);
return std::suspend_always{};
}
};
bool move_next() { return coro ? (coro.resume(), !coro.done()) : false; }
int current_value() { return coro.promise().current_value; }
generator(generator const&) = delete;
generator(generator && rhs) : coro(rhs.coro) { rhs.coro = nullptr; }
~generator() { if (coro) coro.destroy(); }
private:
generator(handle h) : coro(h) {}
handle coro;
};
|
当协程开始执行时,协程体将进行下列操作:
- 用
operator new
分配协程状态对象
- 将所有函数形参复制到协程状态中:按值传递的形参被移动或复制,按引用传递的参数保持为引用(如果在被指代对象的生存期结束后恢复协程,它可能变成悬垂引用)
- 调用承诺对象的构造函数。如果承诺类型拥有接收所有协程形参的构造函数,那么以复制后的协程实参调用该构造函数。否则调用其默认构造函数。
- 调用
promise.get_return_object()
并将其结果在局部变量中保持。该调用的结果将在协程首次暂停时返回给调用方。至此并包含这个步骤为止,任何抛出的异常均传播回调用方,而非置于承诺中。
- 调用
promise.initial_suspend()
并 co_await
其结果。
- 当
co_await promise.initial_suspend()
恢复时,开始协程体的执行。
用代码表达流程大致如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
auto example(Args... args) -> return_type{
using T = coroutine_traits<return_type, Args...>;
using promise_type = T::promise_type;
using frame_type=tuple<frame_prefix,promise_type,Args...>;
auto *frame=(frame_type*)promise_type::operator new(sizeof(frame_type));
promise_type *promise=addressof(get<1>(*frame));
promise_type promise{args...};
auto return_object = { p.get_return_object() };
co_await p.initial_suspend();
try {
// our code...
}
catch (...) {
p.unhandled_exception();
}
__final_suspend_point:
co_await p.final_suspend();
__destroy_point:
promise_type::operator delete(frame,sizeof(frame_type));
}
|
这里,(编译器插入的代码)首先会利用coroutine_traits<return_type,Args...>
根据协程的返回类型和参数列表(即协程的函数签名)来萃取出对应的promise_type
类型,然后根据promise_type
继续执行后续动作。于是我们可以通过模板特化/偏特化来定制不同协程的promise_type
以控制其流程。
我们可以通过重载promise_type
中的operator new
来定制协程帧分配的行为。同时,如果协程状态的生存期严格内嵌于调用方的生存期且协程帧的大小在调用点已知,那么编译器可以优化掉对operator new
的调用而将协程状态嵌入调用方栈帧中。
如果协程帧堆分配失败,那么协程抛出 std::bad_alloc
,如果promise
类型定义了成员函数get_return_object_on_allocation_failure()
,那么使用 operator new(size_t ,nothow_t )
的重载进行分配,而在分配失败时(返回nullptr
),协程会将 promise::get_return_object_on_allocation_failure()
获得的对象返回给调用方并且不抛出异常。
我们需要在启动协程函数体之前构造return-object
对象,是因为在调用coroutine_handle::resume()
返回之前,协程帧(以及promise
对象)可能在这个线程上或另一个线程上被破坏,因此,在开始执行协程函数体之后调用get_return_object()
是不安全的。
当协程函数到达它的第一个<return-to-caller-or-resumer>
点(或者没有到达这个点,协程已经执行完成),那么通过get_return_object()
构造的return-object
对象将返回给协程的主调函数。
当协程抵达co_return
语句时,它进行下列操作:
- 对下列情形调用
promise.return_void()
co_return
;
co_return <expr>
,其中 <expr>
具有 void
类型
- 控制流离开
void
的协程的末尾。此时如果promise_type
没有 promise_type::return_void()
成员函数,那么则行为未定义。
- 或对于
co_return <expr>
调用 promise.return_value(<expr>)
,其中expr
具有非void
类型
- 以创建的逆序销毁所有具有自动存储期的变量。
- 调用
promise.final_suspend()
并co_await
其结果。
如果协程因未捕捉的异常结束,那么它进行下列操作:
- 捕捉异常并在
catch
块内调用promise.unhandled_exception()
,
- 调用
promise.final_suspend()
并co_await
其结果(例如,以恢复某个继续或发布其结果)。此时开始恢复协程是未定义行为。
当经由 co_return
或未捕捉异常而终止协程导致协程状态被销毁,或经由其句柄而导致其被销毁时,它进行下列操作:
- 调用承诺对象的析构函数。
- 调用各个函数形参副本的析构函数。
- 调用
operator delete
以释放协程状态所用的内存。
- 转移执行回到调用方/恢复方。
而co_yield <exp>
表达式相当于co_await promise.yield_value(<exp>)
。正如前面所说的,promise
对象充当了我们和协程之间的信道,协程可以将值放入promise
对象中,之后我们在外部从promise
中再取出它,我们也可以做同样的操作。
下面是cppreference上的一个例子,展示了如何利用协程编写一个能够惰性求值的生成器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
#include <coroutine>
#include <iostream>
#include <optional>
template<std::movable T>
class Generator {
public:
struct promise_type {
Generator<T> get_return_object() {
return Generator{Handle::from_promise(*this)};
}
static std::suspend_always initial_suspend() noexcept {
return {};
}
static std::suspend_always final_suspend() noexcept {
return {};
}
std::suspend_always yield_value(T value) noexcept {
current_value = std::move(value);
return {};
}
// Disallow co_await in generator coroutines.
void await_transform() = delete;
[[noreturn]]
static void unhandled_exception() {
throw;
}
std::optional<T> current_value;
};
using Handle = std::coroutine_handle<promise_type>;
explicit Generator(const Handle coroutine) :
m_coroutine{coroutine}
{}
Generator() = default;
~Generator() {
if (m_coroutine) {
m_coroutine.destroy();
}
}
Generator(const Generator&) = delete;
Generator& operator=(const Generator&) = delete;
Generator(Generator&& other) noexcept :
m_coroutine{other.m_coroutine}
{
other.m_coroutine = {};
}
Generator& operator=(Generator&& other) noexcept {
if (this != &other) {
if (m_coroutine) {
m_coroutine.destroy();
}
m_coroutine = other.m_coroutine;
other.m_coroutine = {};
}
return *this;
}
// Range-based for loop support.
class Iter {
public:
void operator++() {
m_coroutine.resume();
}
const T& operator*() const {
return *m_coroutine.promise().current_value;
}
bool operator==(std::default_sentinel_t) const {
return !m_coroutine || m_coroutine.done();
}
explicit Iter(const Handle coroutine) : m_coroutine{coroutine} {}
private:
Handle m_coroutine;
};
Iter begin() {
if (m_coroutine) {
m_coroutine.resume();
}
return Iter{m_coroutine};
}
std::default_sentinel_t end() {
return {};
}
private:
Handle m_coroutine;
};
template<std::integral T>
Generator<T> range(T first, const T last) {
while (first < last) {
co_yield first++;
}
}
int main() {
for (const char i : range(65, 91)) {
std::cout << i << ' ';
}
std::cout << '\n';
return 0;
}
|
coroutine_handle
和协程帧
coroutine_handle
是协程帧的非拥有句柄,协程帧包含了必要的协程状态(promise_type
对象、值传递的函数参数、挂起点的位置以及局部变量和生命周期跨越挂起点的临时变量),于是我们可以通过coroutine_handle
来恢复协程的执行或者销毁协程帧。
下面是coruotine_handle
的接口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
template <typename PromiseType = void>
class coroutine_handle;
template <>
class coroutine_handle<void>{
protected:
prefix_t prefix;
static_assert(sizeof(prefix_t) == sizeof(void*));
public:
//Checks whether *this is non-null
operator bool() const;
//Resumes the execution of the coroutine to which *this refers
void resume();
void operator()()const; //same as resume()
void destroy();
// Query if the coroutine is suspended at the final_suspend point.
// Undefined behaviour if coroutine is not currently suspended.
bool done() const;
// Convert to/from a void* for passing into C-style interop functions.
constexpr void* address() const;
static constexpr coroutine_handle from_address(void*);
};
template <typename PromiseType>
class coroutine_handle : public coroutine_handle<void>{
public:
using promise_type = PromiseType;
using coroutine_handle<void>::coroutine_handle;
// Access to the coroutine's promise object.
auto promise() -> promise_type&;
// Reconstruct the coroutine handle from the promise object.
static coroutine_handle from_promise(promise_type& prom);
};
|
我们有两种方法获得一个协程的coroutine_handle
- 在
co_await
表达式中,它被传递给await_suspend()
作为参数
- 利用
coroutine_handle<proimse_type>::from(*promise)
从promise
对象重新构造
需要注意的是,coroutine_handle
不是RAII对象。我们必须手动调用destroy()
来销毁协程帧并释放其资源。可以把它看作是用于管理内存的void*
的等价物。这样设计是因为性能的原因:使它成为一个RAII对象会给coroutine增加额外的开销,例如需要引用计数。
例如,在下面的例子中,在main2
函数的第一行,counter2()
返回一个ReturnObject2
的临时对象,我们通过定制operator std::coroutine_handle<>()
来保存其中的coroutine_handle
至main2
中的h
里,在第二行开始之前,该临时对象销毁(但不会销毁其中h_
所"指向"的协程帧),由于我们已经在外部保存了协程帧指针,之后我们通过该"协程帧指针"(coroutine_handler<>)来恢复协程,最后我们调用h.destroy()
来销毁协程帧。如果我们在调用counter()
的时候忽略它的返回值,或是在最后没有调用h.destroy()
,那么就会发生内存泄漏(分配在堆上的协程帧无法被销毁)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
struct ReturnObject2 {
struct promise_type {
ReturnObject2 get_return_object() {
return {.h_ = std::coroutine_handle<promise_type>::from_promise(*this)};
// C++20 designated initializer syntax
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void unhandled_exception() {}
};
std::coroutine_handle<promise_type> h_;
operator std::coroutine_handle<promise_type>() const { return h_; }
// A coroutine_handle<promise_type> converts to coroutine_handle<>
operator std::coroutine_handle<>() const { return h_; }
};
ReturnObject2 counter2(){
for (unsigned i = 0;; ++i) {
co_await std::suspend_always{};
std::cout << "counter2: " << i << std::endl;
}
}
void main2(){
std::coroutine_handle<> h = counter2();
for (int i = 0; i < 3; ++i) {
std::cout << "In main2 function\n";
h(); //same as h.resume()
}
h.destroy();
}
|
如果我们将协程中某个在协程帧当中的对象的指针通过promise_type
传递给外部,在协程被挂起后,我们可以通过该指针访问该对象。例如
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
struct ReturnObject3 {
struct promise_type {
unsigned* value_;
ReturnObject3 get_return_object() {
return ReturnObject3 {
.h_ = std::coroutine_handle<promise_type>::from_promise(*this)
};
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void unhandled_exception() {}
};
std::coroutine_handle<promise_type> h_;
operator std::coroutine_handle<promise_type>() const { return h_; }
};
template<typename PromiseType>
struct GetPromise {
PromiseType *p_;
bool await_ready() { return false; } // says yes call await_suspend
bool await_suspend(std::coroutine_handle<PromiseType> h) {
p_ = &h.promise();
return false; // says no don't suspend coroutine after all
}
PromiseType *await_resume() { return p_; }
};
ReturnObject3 counter3(){
auto pp = co_await GetPromise<ReturnObject3::promise_type>{};
for (unsigned i = 0;; ++i) {
pp->value_ = &i;
co_await std::suspend_always{};
}
}
void main3(){
std::coroutine_handle<ReturnObject3::promise_type> h = counter3();
ReturnObject3::promise_type &promise = h.promise();
for (int i = 0; i < 3; ++i) {
std::cout << "counter3: " << *promise.value_ << std::endl;
h();
}
h.destroy();
}
|
在这个例子中,couter3()
的for
循环中的i
是该协程生命周期跨过挂起点的临时变量,在每次协程挂起前我们指向它的指针放入promise_type
中,之后再在主函数中访问它。正如前面所说,生命周期跨过挂起点的临时变量会放在堆上,所以知道我们调用h.destroy()
之前他们都是可以访问的。
其中,我们利用GetPromise
对象获得了counter3()
这个协程自身的promise
对象。之前我们说过,有两种方法可以获得到一个协程的协程帧。这里,GetPromise
临时对象调用await_suspend(coroutine_handle<>)
时,调用者协程(即counter3()
这个协程)的协程帧被作为实参传递给h
,接着我们通过该协程帧拿到其对应的promnise
对象,再通过await_resume()
将其返回给调用者协程。
协程在线程间切换时的数据安全问题
我们知道,co_await
运算符让我们能够在协程挂起和执行权返回主调函数/resumer之前执行额外代码。这允许一个awaiter对象在协程挂起后初始化一个异步操作,将挂起的协程的coroutine_handle
传递给后续能安全恢复(挂起的协程)的操作,该操作完成时就可以恢复协程(可能是在另一个线程中),并且这当中不需要额外的同步语句。同时由于 await_suspend()
在当前线程上持续执行, await_suspend()
应该把*this
当作已被销毁并且在柄被发布到其他线程后不再访问它。这是因为一旦我们将协程句柄发布给其它线程,那么其它线程可能会在await_suspend()
方法返回之前恢复协程,这可能会导致(其它线程中的)协程与(当前线程中的协程的)await_suspend()
方法的剩余部分并发地执行。
而协程恢复时第一件要做的事就是调用await_resume()
方法来获取结果,之后,它大概率会立即销毁awaiter
对象(也就是调用await_suspend()
的this
指针)。(在另一个线程上恢复的)协程接下来可能会在当前线程中的await_suspend()
返回之前会执行到结束,销毁协程和promise
对象。
所以,一旦有可能在另一个线程中并发恢复当前协程,我们需要保证在await_suspend()
方法中避免访问this
指针或协程的promise
对象,因为这些可能已经(在另外的线程中)被销毁。一般来说,在操作开始和协程被安排在未来某点恢复之后能安全访问的内容就是await_suspend()
内的局部变量,下面我们看一个具体的例子。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>
auto switch_to_new_thread(std::jthread& out) {
struct awaitable {
std::jthread* p_out;
bool await_ready() { return false; }
void await_suspend(std::coroutine_handle<> h) {
std::jthread& out = *p_out;
if (out.joinable())
throw std::runtime_error("Output jthread parameter not empty");
out = std::jthread([h] { h.resume(); });
// Potential undefined behavior: accessing potentially destroyed *this
// std::cout << "New thread ID: " << p_out->get_id() << '\n';
std::cout << "new thread ID:" << out.get_id() << "\n"; // this is ok
}
void await_resume() {}
};
return awaitable{&out};
}
struct task {
struct promise_type {
task get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
};
};
task resuming_on_new_thread(std::jthread& out) {
std::cout << "coroutine begins, thread ID:" << std::this_thread::get_id() << "\n";
co_await switch_to_new_thread(out);
// awatier destroyed here
std::cout << "coroutine resumes, thread ID:" << std::this_thread::get_id() << "\n";
}
int main() {
std::jthread out;
resuming_on_new_thread(out);
return 0;
}
|
在这个例子中,协程开始时在一个线程上开始执行,然后通过在await_suspend()
中将coroutine_handle
传递给新线程从而使协程在另一个线程上恢复。正如前面所说的,在这种情况下,await_suspend()
执行过程中能安全访问的内容应当是await_suspend()
内的局部变量。由于协程在第二个线程恢复后与第一个线程中await_suspend()
的剩余部分并发执行,其可能在第一个线程的await_suspend()
执行过程中就已经完成了协程帧的销毁,而awaiter
对象是在协程帧中的(其生命周期跨过了挂起点),也就是说awaiter
对象(连同其持有的jthread
指针)可能在第一个线程执行await_suspend()
的过程中随时被销毁,所以我们要在将当前协程的coroutine_handle
传递给第二个线程之前先用std::jthread& out = *p_out;
将其保留在第一个线程函数的栈帧上才能保证没有错误。
My tutorial and take on C++20 coroutines