基本异步框架
seastar采用future/promise/continuation框架作为其基本异步模型。并在此基础上通过适配器整合/接入其他异步框架。
future_state
一个future/promise pair
在逻辑上维护同一个状态,它包括了这个任务的状态(是否完成),结果值和可能的异常,在seastar中抽象为class future_state
。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
struct future_state_base {
// ...
union any {
// ...
state st;
std::exception_ptr ex;
} _u;
// ...
}
template <typename T>
struct future_state: public future_state_base, private internal::uninitialized_wrapper<T> {
// ...
}
|
internal::uninitialized_wrapper<T>
类似std::optional<T>
,用于存储结果值。
对于某个异步任务,有三个地方可能存储这个任务的future_state
,但在任意时刻只有一个是有效的:
promise._local_state
future._state
- 在调用了
then()
之后被存储到下一个任务中
promise
promise
代表一个异步任务
1
2
3
4
5
6
7
8
9
10
|
class promise_base {
future_base* _future = nullptr;
future_state_base* _state;
// points to the future_state that is currently being used
task* _task = nullptr; // 初始化为nullptr,用于存放可能的continuation
}
template <typename SEASTAR_ELLIPSIS T>
class promise : private internal::promise_base_with_type<T SEASTAR_ELLIPSIS> {
future_state _local_state;
}
|
1
2
3
4
5
6
|
template <typename SEASTAR_ELLIPSIS T>
inline future<T SEASTAR_ELLIPSIS>
promise<T SEASTAR_ELLIPSIS>::get_future() noexcept {
assert(!this->_future && this->_state && !this->_task);
return future<T SEASTAR_ELLIPSIS>(this);
}
|
当异步任务完成后,通过调用promise
的set_value()
保存任务的结果值,再通过future_state
的set()
方法原地构造保存结果值到get_state()
返回的promise
的_state
,同时将其状态设为就绪。
1
2
3
4
5
6
7
8
9
10
11
12
|
template <typename... A>
void set_value(A&&... a) noexcept {
if (auto *s = get_state()) {
s->set(std::forward<A>(a)...);
make_ready<urgent::no>();
}
}
template <typename... A>
void set(A&&... a) noexcept {
assert(_u.st == state::future);
new (this) future_state(ready_future_marker(), std::forward<A>(a)...);
}
|
1
2
3
4
5
6
7
8
9
10
|
template <promise_base::urgent Urgent>
void promise_base::make_ready() noexcept {
if (_task) {
if (Urgent == urgent::yes) {
::seastar::schedule_urgent(std::exchange(_task, nullptr));
} else {
::seastar::schedule(std::exchange(_task, nullptr));
}
}
}
|
如果此时promise
对象task_!=nullptr
(即存在continuation
),则将其交给seastar
的reactor调度器中。
future
future
代表一个可能未完成的异步任务的计算结果。
1
2
3
4
5
6
7
8
9
|
class future_base {
promise_base* _promise;
// ...
}
class SEASTAR_NODISCARD future : private internal::future_base {
using future_state = seastar::future_state<internal::future_stored_type_t<T SEASTAR_ELLIPSIS>>;
future_state _state;
// ...
}
|
重点看.then()
的实现,then()
方法的调用链如下:
future::then()
->call_then_impl::run()
->future::then_impl()
->future::then_impl_nrvo()
1
2
3
4
5
6
7
|
template <typename Func, typename Result = futurize_t<typename call_then_impl::template result_type<Func>>>
SEASTAR_CONCEPT( requires std::invocable<Func, T SEASTAR_ELLIPSIS> || internal::CanInvokeWhenAllSucceed<Func, T SEASTAR_ELLIPSIS>)
Result
then(Func&& func) noexcept {
// ...
return call_then_impl::run(*this, std::move(func));
}
|
1
2
3
4
5
6
7
|
template <typename... T>
struct call_then_impl<future<T...>> {
// ...
static result_type<Func> run(future<T...>& fut, Func&& func) noexcept {
return fut.then_impl(std::forward<Func>(func));
}
};
|
在then_impl()
中,如果任务发生异常,则返回异常。如果任务已完成,则直接就地运行then()
中的函数而不将其放入调度器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
bool available() const noexcept { return st == state::result || st >= state::exception_min; }
bool failed() const noexcept { return __builtin_expect(st >= state::exception_min, false); }
template <typename Func, typename Result = futurize_t<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>>
Result
then_impl(Func&& func) noexcept {
#ifndef SEASTAR_DEBUG
using futurator = futurize<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>;
if (failed()) {
return futurator::make_exception_future(static_cast<future_state_base&&>(get_available_state_ref()));
} else if (available()) {
#if SEASTAR_API_LEVEL < 5
return futurator::apply(std::forward<Func>(func), get_available_state_ref().take_value());
#else
return futurator::invoke(std::forward<Func>(func), get_available_state_ref().take_value());
#endif
}
#endif
return then_impl_nrvo<Func, Result>(std::forward<Func>(func));
}
|
then_impl_nrvo()
首先创造一个新的future
对象,并通过get_promise()
在创建一个新的promise
对象绑定到该future
对象上,然后将该新创建的promise
对象和需要调度的函数func
以及一个用于处理前一个任务完成后额外逻辑的(异常处理,将前一个任务的值传入func运行)wrapper函数丢进future::schedule()
中,并返回这个future
对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
template <typename Func, typename Result>
Result then_impl_nrvo(Func&& func) noexcept {
using futurator = futurize<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>;
typename futurator::type fut(future_for_get_promise_marker{});
using pr_type = decltype(fut.get_promise());
schedule(fut.get_promise(), std::move(func), [](pr_type&& pr, Func& func, future_state&& state) {
if (state.failed()) {
pr.set_exception(static_cast<future_state_base&&>(std::move(state)));
} else {
futurator::satisfy_with_result_of(std::move(pr), [&func, &state] {
#if SEASTAR_API_LEVEL < 5
return std::apply(func, std::move(state).get_value());
#else
// clang thinks that "state" is not used, below, for future<>.
// Make it think it is used to avoid an unused-lambda-capture warning.
(void)state;
return internal::future_invoke(func, std::move(state).get_value());
#endif
});
}
});
return fut;
}
|
这里futurator::satisfy_with_result_of()
根据调度任务func
返回的类型进行处理当上述新创建的promise
对象。当func()
返回future
对象时,通过future::forward_to()
将其绑定的promise
对象改为新创建的promise
对象;当func()
返回值时,将其设置到promise
对象中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
template<typename T>
template<typename Func>
SEASTAR_CONCEPT( requires std::invocable<Func> )
void futurize<T>::satisfy_with_result_of(promise_base_with_type&& pr, Func&& func) {
using ret_t = decltype(func());
if constexpr (std::is_void_v<ret_t>) {
func();
pr.set_value();
} else if constexpr (is_future<ret_t>::value) {
func().forward_to(std::move(pr));
} else {
pr.set_value(func());
}
}
void future::forward_to(internal::promise_base_with_type<T SEASTAR_ELLIPSIS>&& pr) noexcept {
if (_state.available()) {
pr.set_urgent_state(std::move(_state));
} else {
*detach_promise() = std::move(pr);
}
}
|
future::schdule()
用上述参数构造continuation对象
并将future_state
设置为invalid
状态,然后调用future_base::schedule()
。
1
2
3
4
5
6
7
8
9
|
void future::schedule(Pr&& pr, Func&& func, Wrapper&& wrapper) noexcept {
// ...
auto tws = new continuation<Pr, Func, Wrapper, T SEASTAR_ELLIPSIS>(std::move(pr), std::move(func), std::move(wrapper));
schedule(tws);
_state._u.st = future_state_base::state::invalid;
}
void futuree::schedule(continuation_base<T SEASTAR_ELLIPSIS>* tws) noexcept {
future_base::schedule(tws, &tws->_state);
}
|
这里解除初始任务的promise
对象和其future
对象的绑定,并将其task_
成员绑定到刚刚创建的continuation
对象上。
1
2
3
4
5
6
7
8
9
10
|
void future_base::schedule(task* tws, future_state_base* state) noexcept {
promise_base* p = detach_promise();
p->_state = state;
p->_task = tws;
}
promise_base* future_base::detach_promise() noexcept {
_promise->_state = nullptr;
_promise->_future = nullptr;
return std::exchange(_promise, nullptr);
}
|
continuation
continuation
对象只是使用then()
所连接的任务的连接点的简单封装。promise
对象在被调用set_value()
后会将该continuation
放入调度器中。前一个任务结束后的逻辑已经被写入了_wrapper
函数中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
class continuation_base : public task {
// ...
}
class continuation_base_with_promise : public continuation_base<T SEASTAR_ELLIPSIS> {
// ...
Promise _pr;
};
struct continuation final : continuation_base_with_promise<Promise, T SEASTAR_ELLIPSIS> {
// ...
virtual void run_and_dispose() noexcept override {
try {
_wrapper(std::move(this->_pr), _func, std::move(this->_state));
} catch (...) {
this->_pr.set_to_current_exception();
}
delete this;
}
Func _func;
[[no_unique_address]] Wrapper _wrapper;
}
|
run_and_dispose()
则是调度器调度任务的接口。
1
2
3
4
5
6
7
8
9
10
11
|
void reactor::run_tasks(task_queue& tq) {
// ...
while (!tasks.empty()) {
auto tsk = tasks.front();
tasks.pop_front();
// ...
tsk->run_and_dispose();
// ...
}
// ...
}
|
链式调用小结
通过上面的分析,我们可以清楚地看到seastar或中一个链式调用的实现如下所示:
- 某个异步任务创建并返回一个
future
对象f1
。
- 在该
future
对象上调用then(func)
。在future::then_imple_nrvo()
中临时创建新的future
对象f2
和与f2
关联的promise
对象p2
。用p2
和func
构造continuation
对象c1
。
- 将
f1
所关联的promise
对象p1
与f1
解绑,并将p1._task
设为上述c1
。
- 将
f2
作为then()
的返回值。
- 在之后的某个时刻,
p1
对象上的set_value()
方法被调用,在promise_base::make_ready()
中p2._task
(即c1
)被放入全局调度器reactor
对象中。
- 全局调度器在之后的某个时刻调度到该任务
c1
,调用c1.run_and_dispose()
运行,从而运行c1
的_wrapper
函数(在future::then_impl_nrvo()
中构建,用于处理两个任务交接的逻辑)。
- 在
_wrapper
函数中处理异常或调用futurator::satisfy_with_result_of()
。
- 在
futurator::satisfy_with_result_of()
中将前一个任务的结果转发到func()
上运行,并根据其返回值设置p2
的状态。
对C++20协程的适配