基本异步框架

seastar采用future/promise/continuation框架作为其基本异步模型。并在此基础上通过适配器整合/接入其他异步框架。

future_state

一个future/promise pair在逻辑上维护同一个状态,它包括了这个任务的状态(是否完成),结果值和可能的异常,在seastar中抽象为class future_state

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
struct future_state_base {
    // ...
    union any {
        // ...
        state st;
        std::exception_ptr ex;
    } _u;
    // ...
}
template <typename T>
struct future_state:  public future_state_base, private internal::uninitialized_wrapper<T> {
	// ...
}

internal::uninitialized_wrapper<T>类似std::optional<T>,用于存储结果值。

对于某个异步任务,有三个地方可能存储这个任务的future_state,但在任意时刻只有一个是有效的:

  • promise._local_state
  • future._state
  • 在调用了then()之后被存储到下一个任务中

promise

promise代表一个异步任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
class promise_base {
    future_base* _future = nullptr;
    future_state_base* _state; 
    // points to the future_state that is currently being used
    task* _task = nullptr; // 初始化为nullptr,用于存放可能的continuation
}
template <typename SEASTAR_ELLIPSIS T>
class promise : private internal::promise_base_with_type<T SEASTAR_ELLIPSIS> {
    future_state _local_state;
}
1
2
3
4
5
6
template <typename SEASTAR_ELLIPSIS T>
inline future<T SEASTAR_ELLIPSIS>
promise<T SEASTAR_ELLIPSIS>::get_future() noexcept {
    assert(!this->_future && this->_state && !this->_task);
    return future<T SEASTAR_ELLIPSIS>(this);
}

当异步任务完成后,通过调用promiseset_value()保存任务的结果值,再通过future_stateset()方法原地构造保存结果值到get_state()返回的promise_state,同时将其状态设为就绪。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
template <typename... A>
void set_value(A&&... a) noexcept {
    if (auto *s = get_state()) {
        s->set(std::forward<A>(a)...);
        make_ready<urgent::no>();
    }
}
template <typename... A>
void set(A&&... a) noexcept {
    assert(_u.st == state::future);
    new (this) future_state(ready_future_marker(), std::forward<A>(a)...);
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
template <promise_base::urgent Urgent>
void promise_base::make_ready() noexcept {
    if (_task) {
        if (Urgent == urgent::yes) {
            ::seastar::schedule_urgent(std::exchange(_task, nullptr));
        } else {
            ::seastar::schedule(std::exchange(_task, nullptr));
        }
    }
}

如果此时promise对象task_!=nullptr(即存在continuation),则将其交给seastar的reactor调度器中。

future

future代表一个可能未完成的异步任务的计算结果。

1
2
3
4
5
6
7
8
9
class future_base {
    promise_base* _promise;
    // ...
}
class SEASTAR_NODISCARD future : private internal::future_base {
    using future_state = seastar::future_state<internal::future_stored_type_t<T SEASTAR_ELLIPSIS>>;
    future_state _state;
    // ...
}

重点看.then()的实现,then()方法的调用链如下:

future::then()->call_then_impl::run()->future::then_impl()->future::then_impl_nrvo()

1
2
3
4
5
6
7
template <typename Func, typename Result = futurize_t<typename call_then_impl::template result_type<Func>>>
    SEASTAR_CONCEPT( requires std::invocable<Func, T SEASTAR_ELLIPSIS> || internal::CanInvokeWhenAllSucceed<Func, T SEASTAR_ELLIPSIS>)
    Result
    then(Func&& func) noexcept {
        // ...
        return call_then_impl::run(*this, std::move(func));
    }
1
2
3
4
5
6
7
template <typename... T>
struct call_then_impl<future<T...>> {
    // ...
    static result_type<Func> run(future<T...>& fut, Func&& func) noexcept {
        return fut.then_impl(std::forward<Func>(func));
    }
};

then_impl()中,如果任务发生异常,则返回异常。如果任务已完成,则直接就地运行then()中的函数而不将其放入调度器。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
bool available() const noexcept { return st == state::result || st >= state::exception_min; }
        bool failed() const noexcept { return __builtin_expect(st >= state::exception_min, false); }

template <typename Func, typename Result = futurize_t<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>>
    Result
    then_impl(Func&& func) noexcept {
#ifndef SEASTAR_DEBUG
        using futurator = futurize<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>;
        if (failed()) {
            return futurator::make_exception_future(static_cast<future_state_base&&>(get_available_state_ref()));
        } else if (available()) {
#if SEASTAR_API_LEVEL < 5
            return futurator::apply(std::forward<Func>(func), get_available_state_ref().take_value());
#else
            return futurator::invoke(std::forward<Func>(func), get_available_state_ref().take_value());
#endif
        }
#endif
        return then_impl_nrvo<Func, Result>(std::forward<Func>(func));
    }

then_impl_nrvo()首先创造一个新的future对象,并通过get_promise()在创建一个新的promise对象绑定到该future对象上,然后将该新创建的promise对象和需要调度的函数func以及一个用于处理前一个任务完成后额外逻辑的(异常处理,将前一个任务的值传入func运行)wrapper函数丢进future::schedule()中,并返回这个future对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
template <typename Func, typename Result>
    Result then_impl_nrvo(Func&& func) noexcept {
        using futurator = futurize<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>;
        typename futurator::type fut(future_for_get_promise_marker{});
        using pr_type = decltype(fut.get_promise());
        schedule(fut.get_promise(), std::move(func), [](pr_type&& pr, Func& func, future_state&& state) {
            if (state.failed()) {
                pr.set_exception(static_cast<future_state_base&&>(std::move(state)));
            } else {
                futurator::satisfy_with_result_of(std::move(pr), [&func, &state] {
#if SEASTAR_API_LEVEL < 5
                    return std::apply(func, std::move(state).get_value());
#else
                    // clang thinks that "state" is not used, below, for future<>.
                    // Make it think it is used to avoid an unused-lambda-capture warning.
                    (void)state;
                    return internal::future_invoke(func, std::move(state).get_value());
#endif
                });
            }
        });
        return fut;
    }

这里futurator::satisfy_with_result_of()根据调度任务func返回的类型进行处理当上述新创建的promise对象。当func()返回future对象时,通过future::forward_to()将其绑定的promise对象改为新创建的promise对象;当func()返回值时,将其设置到promise对象中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
template<typename T>
template<typename Func>
SEASTAR_CONCEPT( requires std::invocable<Func> )
void futurize<T>::satisfy_with_result_of(promise_base_with_type&& pr, Func&& func) {
    using ret_t = decltype(func());
    if constexpr (std::is_void_v<ret_t>) {
        func();
        pr.set_value();
    } else if constexpr (is_future<ret_t>::value) {
        func().forward_to(std::move(pr));
    } else {
        pr.set_value(func());
    }
}
void future::forward_to(internal::promise_base_with_type<T SEASTAR_ELLIPSIS>&& pr) noexcept {
    if (_state.available()) {
        pr.set_urgent_state(std::move(_state));
    } else {
        *detach_promise() = std::move(pr);
    }
}

future::schdule()用上述参数构造continuation对象并将future_state设置为invalid状态,然后调用future_base::schedule()

1
2
3
4
5
6
7
8
9
void future::schedule(Pr&& pr, Func&& func, Wrapper&& wrapper) noexcept {
	// ...
    auto tws = new continuation<Pr, Func, Wrapper, T SEASTAR_ELLIPSIS>(std::move(pr), std::move(func), std::move(wrapper));
    schedule(tws);
    _state._u.st = future_state_base::state::invalid;
}
void futuree::schedule(continuation_base<T SEASTAR_ELLIPSIS>* tws) noexcept {
    future_base::schedule(tws, &tws->_state);
}

这里解除初始任务的promise对象和其future对象的绑定,并将其task_成员绑定到刚刚创建的continuation对象上。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
void future_base::schedule(task* tws, future_state_base* state) noexcept {
    promise_base* p = detach_promise();
    p->_state = state;
    p->_task = tws;
}
promise_base* future_base::detach_promise() noexcept {
    _promise->_state = nullptr;
    _promise->_future = nullptr;
    return std::exchange(_promise, nullptr);
}

continuation

continuation对象只是使用then()所连接的任务的连接点的简单封装。promise对象在被调用set_value()后会将该continuation放入调度器中。前一个任务结束后的逻辑已经被写入了_wrapper函数中

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class continuation_base : public task {
	// ...
}
class continuation_base_with_promise : public continuation_base<T SEASTAR_ELLIPSIS> {
    // ...
    Promise _pr;
};
struct continuation final : continuation_base_with_promise<Promise, T SEASTAR_ELLIPSIS> {
    // ...
    virtual void run_and_dispose() noexcept override {
        try {
            _wrapper(std::move(this->_pr), _func, std::move(this->_state));
        } catch (...) {
            this->_pr.set_to_current_exception();
        }
        delete this;
    }
    Func _func;
    [[no_unique_address]] Wrapper _wrapper;
}

run_and_dispose()则是调度器调度任务的接口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void reactor::run_tasks(task_queue& tq) {
    // ...
    while (!tasks.empty()) {
        auto tsk = tasks.front();
        tasks.pop_front();
        // ...
        tsk->run_and_dispose();
        // ...
    }
    // ...
}

链式调用小结

通过上面的分析,我们可以清楚地看到seastar或中一个链式调用的实现如下所示:

  • 某个异步任务创建并返回一个future对象f1
  • 在该future对象上调用then(func)。在future::then_imple_nrvo()中临时创建新的future对象f2和与f2关联的promise对象p2。用p2func构造continuation对象c1
  • f1所关联的promise对象p1f1解绑,并将p1._task设为上述c1
  • f2作为then()的返回值。
  • 在之后的某个时刻,p1对象上的set_value()方法被调用,在promise_base::make_ready()p2._task(即c1)被放入全局调度器reactor对象中。
  • 全局调度器在之后的某个时刻调度到该任务c1,调用c1.run_and_dispose()运行,从而运行c1_wrapper函数(在future::then_impl_nrvo()中构建,用于处理两个任务交接的逻辑)。
  • _wrapper函数中处理异常或调用futurator::satisfy_with_result_of()
  • futurator::satisfy_with_result_of()中将前一个任务的结果转发到func()上运行,并根据其返回值设置p2的状态。

对C++20协程的适配