基本异步框架
seastar采用future/promise/continuation框架作为其基本异步模型。并在此基础上通过适配器整合/接入其他异步框架。
future_state
future_state是一个任务相关的future/promise pair在逻辑上维护的同一个状态,它包括了这个任务的状态(是否完成),结果值和可能的异常,在seastar中抽象为class future_state。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
struct future_state_base {
// ...
union any {
// ...
state st;
std::exception_ptr ex;
} _u;
// ...
}
template <typename T>
struct future_state: public future_state_base, private internal::uninitialized_wrapper<T> {
// ...
}
|
internal::uninitialized_wrapper<T>类似std::optional<T>,用于存储结果值。
对于某个异步任务,有三个地方可能存储这个任务的future_state,但在任意时刻只有一个是有效的:
promise._local_state
future._state
- 在调用了
then()之后被存储到下一个任务中
promise
promise代表一个异步任务
1
2
3
4
5
6
7
8
9
10
|
class promise_base {
future_base* _future = nullptr;
future_state_base* _state;
// points to the future_state that is currently being used
task* _task = nullptr; // 初始化为nullptr,用于存放可能的continuation
}
template <typename SEASTAR_ELLIPSIS T>
class promise : private internal::promise_base_with_type<T SEASTAR_ELLIPSIS> {
future_state _local_state;
}
|
1
2
3
4
5
6
|
template <typename SEASTAR_ELLIPSIS T>
inline future<T SEASTAR_ELLIPSIS>
promise<T SEASTAR_ELLIPSIS>::get_future() noexcept {
assert(!this->_future && this->_state && !this->_task);
return future<T SEASTAR_ELLIPSIS>(this);
}
|
当异步任务完成后,通过调用promise的set_value()保存任务的结果值,promise对象会通过future_state的set()方法原地构造保存结果值到get_state()返回的promise的_state,同时将其状态设为就绪。如果在构造过程中发生异常则捕获并改变future的对应状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
class promise {
template <typename... A>
void set_value(A&&... a) noexcept {
internal::promise_base_with_type<T SEASTAR_ELLIPSIS>::set_value(std::forward<A>(a)...);
}
}
class promise_base_with_type {
template <typename... A>
void set_value(A&&... a) noexcept {
if (auto *s = get_state()) {
s->set(std::forward<A>(a)...);
make_ready<urgent::no>();
}
}
}
class future_state {
template <typename... A>
void set(A&&... a) noexcept {
assert(_u.st == state::future);
new (this) future_state(ready_future_marker(), std::forward<A>(a)...);
}
template <typename... A>
future_state(ready_future_marker, A&&... a) noexcept : future_state_base(state::result) {
try {
this->uninitialized_set(std::forward<A>(a)...);
} catch (...) {
new (this) future_state(current_exception_future_marker());
}
}
}
|
1
2
3
4
5
6
7
8
9
10
|
template <promise_base::urgent Urgent>
void promise_base::make_ready() noexcept {
if (_task) {
if (Urgent == urgent::yes) {
::seastar::schedule_urgent(std::exchange(_task, nullptr));
} else {
::seastar::schedule(std::exchange(_task, nullptr));
}
}
}
|
如果此时该promise对象task_!=nullptr(即存在continuation),则将其交给seastar的reactor调度器中进行调度。
future
future代表一个可能未完成的异步任务的计算结果。
1
2
3
4
5
6
7
8
9
|
class future_base {
promise_base* _promise;
// ...
}
class SEASTAR_NODISCARD future : private internal::future_base {
using future_state = seastar::future_state<internal::future_stored_type_t<T SEASTAR_ELLIPSIS>>;
future_state _state;
// ...
}
|
重点看.then()的实现,then()方法的调用链如下:
future::then()->call_then_impl::run()->future::then_impl()->future::then_impl_nrvo()
1
2
3
4
5
6
7
|
template <typename Func, typename Result = futurize_t<typename call_then_impl::template result_type<Func>>>
SEASTAR_CONCEPT( requires std::invocable<Func, T SEASTAR_ELLIPSIS> || internal::CanInvokeWhenAllSucceed<Func, T SEASTAR_ELLIPSIS>)
Result
then(Func&& func) noexcept {
// ...
return call_then_impl::run(*this, std::move(func));
}
|
1
2
3
4
5
6
7
|
template <typename... T>
struct call_then_impl<future<T...>> {
// ...
static result_type<Func> run(future<T...>& fut, Func&& func) noexcept {
return fut.then_impl(std::forward<Func>(func));
}
};
|
在then_impl()中,如果任务发生异常,则返回异常。如果任务已完成,则直接就地运行then()中的函数而不将其放入调度器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
bool available() const noexcept { return st == state::result || st >= state::exception_min; }
bool failed() const noexcept { return __builtin_expect(st >= state::exception_min, false); }
template <typename Func, typename Result = futurize_t<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>>
Result
then_impl(Func&& func) noexcept {
#ifndef SEASTAR_DEBUG
using futurator = futurize<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>;
if (failed()) {
return futurator::make_exception_future(static_cast<future_state_base&&>(get_available_state_ref()));
} else if (available()) {
#if SEASTAR_API_LEVEL < 5
return futurator::apply(std::forward<Func>(func), get_available_state_ref().take_value());
#else
return futurator::invoke(std::forward<Func>(func), get_available_state_ref().take_value());
#endif
}
#endif
return then_impl_nrvo<Func, Result>(std::forward<Func>(func));
}
|
then_impl_nrvo()首先创造一个新的future对象,并通过get_promise()在创建一个新的promise对象绑定到该future对象上,然后将该新创建的promise对象和需要调度的函数func以及一个构造continuation对象所需的用于处理前一个任务完成后额外逻辑的(异常处理,将前一个任务的值传入func运行)的回调函数wrapper丢进future::schedule()中,并以这个future对象作为返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
template <typename Func, typename Result>
Result then_impl_nrvo(Func&& func) noexcept {
using futurator = futurize<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>;
typename futurator::type fut(future_for_get_promise_marker{});
using pr_type = decltype(fut.get_promise());
schedule(fut.get_promise(), std::move(func), [](pr_type&& pr, Func& func, future_state&& state) {
if (state.failed()) {
pr.set_exception(static_cast<future_state_base&&>(std::move(state)));
} else {
futurator::satisfy_with_result_of(std::move(pr), [&func, &state] {
#if SEASTAR_API_LEVEL < 5
return std::apply(func, std::move(state).get_value());
#else
// clang thinks that "state" is not used, below, for future<>.
// Make it think it is used to avoid an unused-lambda-capture warning.
(void)state;
return internal::future_invoke(func, std::move(state).get_value());
#endif
});
}
});
return fut;
}
|
这里futurator::satisfy_with_result_of()根据调度任务func返回的类型进行处理当上述新创建的promise对象。当func()返回future对象时,通过future::forward_to()将其绑定的promise对象改为新创建的promise对象;当func()返回值时,将其设置到promise对象中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
template<typename T>
template<typename Func>
SEASTAR_CONCEPT( requires std::invocable<Func> )
void futurize<T>::satisfy_with_result_of(promise_base_with_type&& pr, Func&& func) {
using ret_t = decltype(func());
if constexpr (std::is_void_v<ret_t>) {
func();
pr.set_value();
} else if constexpr (is_future<ret_t>::value) {
func().forward_to(std::move(pr));
} else {
pr.set_value(func());
}
}
void future::forward_to(internal::promise_base_with_type<T SEASTAR_ELLIPSIS>&& pr) noexcept {
if (_state.available()) {
pr.set_urgent_state(std::move(_state));
} else {
*detach_promise() = std::move(pr);
}
}
|
future::schdule()用上述参数构造continuation对象并将future_state设置为invalid状态,然后调用future_base::schedule()。
1
2
3
4
5
6
7
8
9
|
void future::schedule(Pr&& pr, Func&& func, Wrapper&& wrapper) noexcept {
// ...
auto tws = new continuation<Pr, Func, Wrapper, T SEASTAR_ELLIPSIS>(std::move(pr), std::move(func), std::move(wrapper));
schedule(tws);
_state._u.st = future_state_base::state::invalid;
}
void future::schedule(continuation_base<T SEASTAR_ELLIPSIS>* tws) noexcept {
future_base::schedule(tws, &tws->_state);
}
|
这里解除初始任务的promise对象和其future对象的绑定,并将其task_成员绑定到刚刚创建的continuation对象上。
1
2
3
4
5
6
7
8
9
10
|
void future_base::schedule(task* tws, future_state_base* state) noexcept {
promise_base* p = detach_promise();
p->_state = state;
p->_task = tws;
}
promise_base* future_base::detach_promise() noexcept {
_promise->_state = nullptr;
_promise->_future = nullptr;
return std::exchange(_promise, nullptr);
}
|
continuation
continuation对象只是使用then()所连接的任务的连接点的简单封装。promise对象在被调用set_value()后会将该continuation放入调度器中。前一个任务结束后的逻辑已经被写入了_wrapper函数中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
class continuation_base : public task {
// ...
}
class continuation_base_with_promise : public continuation_base<T SEASTAR_ELLIPSIS> {
// ...
Promise _pr;
};
struct continuation final : continuation_base_with_promise<Promise, T SEASTAR_ELLIPSIS> {
// ...
virtual void run_and_dispose() noexcept override {
try {
_wrapper(std::move(this->_pr), _func, std::move(this->_state));
} catch (...) {
this->_pr.set_to_current_exception();
}
delete this;
}
Func _func;
[[no_unique_address]] Wrapper _wrapper;
}
|
run_and_dispose()则是调度器调度task对象的接口。
1
2
3
4
5
6
7
8
9
10
11
|
void reactor::run_tasks(task_queue& tq) {
// ...
while (!tasks.empty()) {
auto tsk = tasks.front();
tasks.pop_front();
// ...
tsk->run_and_dispose();
// ...
}
// ...
}
|
链式调用小结
通过上面的分析,我们可以清楚地看到seastar或中一个链式调用的实现如下所示:
- 某个异步任务创建并返回一个
future对象f1。
- 在该
future对象上调用then(func)。在future::then_imple_nrvo()中临时创建新的future对象f2和与f2关联的promise对象p2。用p2和func构造continuation对象c1。
- 将
f1所关联的promise对象p1与f1解绑,并将p1._task设为上述c1。
- 将
f2作为then()的返回值。
- 在之后的某个时刻,
p1对象上的set_value()方法被调用,在promise_base::make_ready()中p2._task(即c1)被放入全局调度器reactor对象中。
- 全局调度器在之后的某个时刻调度到该任务
c1,调用c1.run_and_dispose()运行,从而运行c1的_wrapper函数(在future::then_impl_nrvo()中构建,用于处理两个任务交接的逻辑)。
- 在
_wrapper函数中处理异常或调用futurator::satisfy_with_result_of()。
- 在
futurator::satisfy_with_result_of()中将前一个任务的结果转发到func()上运行,并根据其返回值设置p2的状态。
对C++20协程的适配
不同于上述基于future/promise的回调方案,协程是异步操作同步化的另一套解决方案,C++20的协程为我们提供了高度定制化的能力。要接入C++20协程,需要提供相应的promise_type和awaiter并实现相应的接口。Seastar采用future作为协程返回类型,并偏特化coroutine_traits<>来设置用于控制协程的promise_type
1
2
|
template<typename... T, typename... Args>
class coroutine_traits<seastar::future<T...>, Args...> : public seastar::internal::coroutine_traits_base<T...> {};
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
template <typename T = void>
class coroutine_traits_base {
public:
class promise_type final : public seastar::task {
seastar::promise<T> _promise;
public:
promise_type() = default;
promise_type(promise_type&&) = delete;
promise_type(const promise_type&) = delete;
template<typename... U>
void return_value(U&&... value) {
_promise.set_value(std::forward<U>(value)...);
}
void return_value(coroutine::exception ce) noexcept {
_promise.set_exception(std::move(ce.eptr));
}
[[deprecated("Forwarding coroutine returns are deprecated as too dangerous. Use 'co_return co_await ...' until explicit syntax is available.")]]
void return_value(future<T>&& fut) noexcept {
fut.forward_to(std::move(_promise));
}
void unhandled_exception() noexcept {
_promise.set_exception(std::current_exception());
}
seastar::future<T> get_return_object() noexcept {
return _promise.get_future();
}
SEASTAR_INTERNAL_COROUTINE_NAMESPACE::suspend_never initial_suspend() noexcept { return { }; }
SEASTAR_INTERNAL_COROUTINE_NAMESPACE::suspend_never final_suspend() noexcept { return { }; }
virtual void run_and_dispose() noexcept override {
auto handle = SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<promise_type>::from_promise(*this);
handle.resume();
}
task* waiting_task() noexcept override { return _promise.waiting_task(); }
scheduling_group set_scheduling_group(scheduling_group sg) noexcept {
return std::exchange(this->_sg, sg);
}
};
};
|
这里将initial_suspend()和final_suspend()的返回值设为suspend_never类型不做额外的逻辑。在set_value()中将结果保存至promise对象中(对返回类型为void做了特化)。同时promise_type作为一个可调度对象,需要继承task并在run_and_dispose()中将控制流转移到对应的协程上。
对于以future为返回类型的协程,其awaiter如下(同样对返回tuple,单个值和void做了相应特化,下面只展示一个):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
template<typename... T>
auto operator co_await(future<T...> f) noexcept {
return internal::awaiter<true, T...>(std::move(f));
}
template<bool CheckPreempt, typename... T>
struct awaiter {
seastar::future<T...> _future;
public:
explicit awaiter(seastar::future<T...>&& f) noexcept : _future(std::move(f)) { }
awaiter(const awaiter&) = delete;
awaiter(awaiter&&) = delete;
bool await_ready() const noexcept {
return _future.available() && (!CheckPreempt || !need_preempt());
}
template<typename U>
void await_suspend(SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<U> hndl) noexcept {
if (!CheckPreempt || !_future.available()) {
_future.set_coroutine(hndl.promise());
} else {
schedule(&hndl.promise());
}
}
std::tuple<T...> await_resume() { return _future.get(); }
};
void schedule(task* t) noexcept {
engine().add_task(t);
}
void internal::future_base::set_coroutine(task& coroutine) noexcept {
assert(_promise);
_promise->_task = &coroutine;
}
|
默认情况下co_await一个future会进行抢占检查。在await_ready()中检查,如果任务已经完成且不需要抢占,则不挂当前协程直接返回结果。否则,在await_suspend()中将当前协程设为future_对应异步任务的后继任务,然后挂起当前协程,并将控制流返回到更外层(通常是seastar调度器)。否则如果任务已完成,则直接将当前协程加入到调度器中。
此外seastar还提供了一些辅助类用于更好地使用协程。
maybe_yield: 类似于用于让出CPU时间片的shced_yield(),用于防止长任务长时间占据调度器时间片而阻止调度器调度其他任务和reacotr进行I/O操作,用法类型于yield()。
all: 用于同步多个协程,当其对应任务均完成后返回到调用方。
swich_to:用于将当前协程切换到另一个调度组上执行。