Cpp中还不完美的Coroutines


概述

Coroutines重要的不是它的性能,而是它带来的编程思维的改变,它能让你用同步的眼光去编写代码,也就是在对网络编程来说能消灭掉那些烦人的回调函数,仅此一点它的重要程度就不下于C++11,其次本篇文章为极客时间卢誉声老师的现代 C++20 实战高手课笔记,如果觉得本篇文章对你有帮助可以前去购买。

C++协程

首先,需要先纠正网络上盛传的观点“线程是轻量的进程,协程是更轻量的线程”,协程用一句话概括就是可以暂停恢复的函数。

而现代 C++20 标准下的协程,是一种可休眠、可恢复、不基于栈实现的函数。协程相较于函数来说是一个更加泛化的概念。函数只有“调用”和“返回”两个行为,而协程在这一基础上进行了扩展,增加了“休眠”和“恢复”。

同时,C++的协程属于无栈协程(与传统意义上的GO的协程不同),因此 C++ 并没有提供标准调度器,需要开发者按照规定的规则实现所需的接口约定(也就是需要在类或者结构体内部按照约定的名称命名函数,俗称硬编码),一般需要实现这么几个:

  • 调用者的传参方式。
  • 将值返回给调用者的方式。
  • 休眠与恢复执行的规则。
  • 异常处理方式。

协程执行

相较于函数,协程是一种可休眠、可恢复、不基于栈实现的函数。因此,它的生命周期管理自然与普通函数不同。也可以预见,C++ 中的协程是基于堆来实现的:

  1. 调用函数在堆上创建协程帧(coroutine frame),用于存储协程的各类数据。协程帧的结构属于内存模型,因此不同编译器可能会有不同实现。
  2. 调用被调协程,开始执行协程代码。
  3. 被调协程执行到某个特定点,通过 co_await/co_yield 将当前协程休眠,线程 1 恢复自身原本的执行流程继续执行剩余代码
  4. 线程 2 通过协程句柄 coroutine_handle 的恢复函数 resume 恢复协程执行。
  5. 协程在线程 2 上恢复执行,继续向下执行,直到协程执行结束为止。结束后回到线程 2 的原本执行流程继续执行代码。
  6. 最后,线程 2 负责通过协程句柄 coroutine_handle 销毁协程帧。

C++ 中的协程并不会像函数调用那样在栈上执行,它的状态储存在堆上。因此,我们只能在函数调用过程中,通过协程句柄 coroutine_handle 改变“部分”协程的状态——恢复或销毁。

图中浅蓝色部分的方法就是 Return_t 关联的 promise 对象的函数,浅红色部分就是 co_await 等待的 awaiter。这个流程的驱动是由编译器根据协程函数生成的代码驱动的,分成三部分:

  • 协程创建;
  • co_await awaiter 等待 task 完成;
  • 获取协程返回值和释放协程帧。

接口规定

C++ 的协程要求开发者实现大量接口约定,而我们很难通过阅读标准文档来编写实际的代码,所以我们有必要学习一下实现接口约定的实践方法,这对我们在 C++ 里熟练应用协程非常重要。

实践里,有两个重要的用户自定义类型 Promise 和 Awaitable。我们先来看看 Promise(这里的promise和std::future里的promise没有任何关系,是协程和开发者约定的数据类型),Promise允许我们通过这个对象提供值,这个值会在之后计算得出:

生成器 Generator 用来控制协程,包括协程调用、返回值的操作、co_await/co_yield 的具体行为以及 promise_type 的具体定义。

我们要从 coroutine_traits 这一 concept 开始说起。标准中提供的代码如下:

template<class, class...>
struct coroutine_traits {};

template<class R, class... Args>
requires requires { typename R::promise_type; }
struct coroutine_traits<R, Args...> {
  using promise_type = typename R::promise_type;
};

实际编程代码中特化的 coroutine_traits 必须定义一个公有的 promise_type 成员(见代码第 7 行)。否则,这就不是一个有效的协程类且属于未定义行为。也就是说,编译器会查找协程类中的 promise_type 作为 Promise,若 promise_type 不存在,就会发生编译时报错。

与此同时,一个协程类需要包含三个成员,分别是 promise_typecoroutine_handlecoroutine_state,这些都是硬编码,编译器不会给你任何提示(支持还不够好),其中:

promise_type 在协程内操作时使用,它必须满足一定规则的类型,包含一系列约束的函数来支持休眠和恢复等功能,包括提交协程的返回值、提交协程异常。通常来说,编译器会通过 promise_type 的具体实现来判断协程是否合法。

coroutine_handle 在协程外部操作时使用,可供调用者使用来休眠协程。它的类型是标准库提供的模板类,封装了协程帧的指针以及恢复、销毁协程帧的接口。

coroutine_state 用于支持协程的生命周期,是运行时分配在堆(如果开启了编译器优化选项,则有可能会被优化使用寄存器)上的对象,目的是进一步规范说明协程执行时在堆上创建的数据,包括以下内容:

  • promise 对象
  • 参数(在协程创建时,会拷贝所有函数参数进入协程帧)
  • 当前休眠的状态(在运行时,供后续恢复或销毁协程帧使用)
  • 局部变量(在运行时,供协程帧使用)
  • 临时变量(在运行时,供协程帧使用,它的生命周期是整个协程帧的生命周期)

coroutine_state 参数还可以细分成这两种情况。

  • 值类型的参数会被移动或拷贝。
  • 引用类型的参数会拷贝引用,当被引用的内存被释放了,那么协程状态中的引用会变成一个野引用。

这三个成员里,coroutine_state 是比较特殊的抽象,是支持协程运行时的。

promise_type

对于 promise_type,它是实现协程的最关键一环。开发者需要自己来实现它,代码如下所示:

template<typename T>
struct promise;

template<typename T>
struct Generator : std::coroutine_handle<promise<T>> {
  using promise_type = promise<T>;
};

template<typename T>
struct promise {
  T _value; // 待计算的值
  std::exception_ptr _exception; // 待抛出的异常

  template<typename Ty>
  promise(Ty&& lambdaObj, T value) : _value(value) {}
  promise(T value) : _value(value) {}
  promise() {}

  Generator<T> get_return_object() { return { Generator<T>::from_promise(*this) }; }
  std::suspend_always initial_suspend() noexcept { return {}; }
  std::suspend_always final_suspend() noexcept { return {}; }
  // optional,但co_yield需要这一函数实现
  std::suspend_always yield_value(T value) {
      _value = value;
      return {};
  }
  // optional,但co_return需要这一函数实现或return_void
  std::suspend_always return_value(T value) {
      _value = value;
      return {};
  }
  void return_void() {}
  void unhandled_exception() { _exception = std::current_exception(); }
};

具体的调用流程和需要实现的函数如下:

coroutine_handle

看看标准库提供的 coroutine_handle 这一模版类的定义:

template<class Promise>
struct coroutine_handle {
  // 构造函数和赋值函数
  constexpr coroutine_handle() noexcept;
  constexpr coroutine_handle(nullptr_t) noexcept;
  static coroutine_handle from_promise(Promise&);
  coroutine_handle& operator=(nullptr_t) noexcept;

  // 导入和导出
  constexpr void* address() const noexcept; // 获取coroutine_handle内部数据的指针
  static constexpr coroutine_handle from_address(void* addr); // 将内部数据指针转换为对应的coroutine_handle对象,会创建一个新对象

  // 转换函数
  constexpr operator coroutine_handle<void>() const noexcept;

  // 查询协程状态
  constexpr explicit operator bool() const noexcept; // 用于确定coroutine_handle是否有效
  bool done() const; // 用于确定协程是否已经执行完成

  // 控制协程执行
  void operator()() const; // 行为同resume,用于唤醒协程
  void resume() const; // 用于唤醒协程
  void destroy() const; // 用于销毁协程

  // 访问Promise对象
  Promise& promise() const;

private:
  void* ptr;  // exposition only 
};

template<>
struct coroutine_handle<void> {
  // 构造函数和赋值函数
  constexpr coroutine_handle() noexcept;
  constexpr coroutine_handle(nullptr_t) noexcept;
  coroutine_handle& operator=(nullptr_t) noexcept;

  // 导入和导出
  constexpr void* address() const noexcept;
  static constexpr coroutine_handle from_address(void* addr);

  // 查询协程状态
  constexpr explicit operator bool() const noexcept;
  bool done() const;

  // 恢复协程执行
  void operator()() const;
  void resume() const;
  void destroy() const;

private:
  void* ptr;  // exposition only
};

相比于 void 类型的特化版本,如果开发者指定了 promise 类型,那么就会用通用版本的 coroutine_handle,这个类多了以下几个成员函数。

  • from_promise:获取 promise 对应的 coroutine_handle。实际行为会根据 promise 创建新的 coroutine_handle 对象。
  • operator coroutine_handle<>:将 promise 版本的 coroutine_handle 转换为 void 类型的 coroutine_handle。
  • promise:获取 coroutine_handle 内部的 promise 对象。

两个版本最后一行用“exposition only”标识出来的部分,就是 coroutine_handle 的内部存储内容,这部分只是为了说明标准做的示例,实际不同编译器可以根据自己的需求定义这里的实现。

协程调度

除了定义外,还需要学习如何对协程进行调度,包括协程休眠、控制权转移和待计算值的传递。对协程进行调度的关键在于 co_await 和 co_yield 操作符(关键字)。

co_await

co_await 是协程中必须要了解的与编译器的约定。只有了解它,我们才能知道如何通过 co_await 灵活处理线程的休眠与唤醒。而搞清楚 co_await 操作符的具体行为表现,是我们理解 Awaitable 的重点。co_await 操作符用于休眠协程,并将控制权返还给协程调用者,用法如下:

co_await 表达式;

与此同时,co_await 的表达式需要满足下列两个条件之一。

  • 表达式类型必须定义了 co_await 操作符重载。
  • 可以通过当前协程的 Promise::await_transform 转换为定义了 co_await 操作符的类型。

co_await 只能出现在函数体的执行表达式中,不能出现在异常处理、声明语句、简单声明表达式、默认参数和带 static 和 thread_local 的局部变量定义中。另外,co_await 的执行过程较为复杂,其中涉及到两个类型。

  • Awaitable:用于获取 Awaiter 对象。
  • Awaiter:用于控制实际的休眠操作细节。

我们需要将其分为编译时和运行时两个阶段来理解。先看编译时:

编译时,编译器通过以下方式,将表达式转换成 Awaitable 对象。

  • 如果表达式是通过初始休眠点、结束休眠点或 yield 产生的,那么表达式本身就是 Awaitable 对象。
  • 否则,如果当前协程的 promise 中包含 await_transform 函数,那么就会调用 promise.await_transform 将表达式转换为 Awaitable 对象。
  • 否则,表达式本身就是 Awaitable 对象。

接着,编译器就会通过以下操作获取 Awaiter 对象。

  • 如果 Awaitable 类型包含 co_await 操作符重载,那么就会将 co_await 重载的执行结果作为 Awaiter。
  • 如果没有找到 co_await 操作符重载,那么就会将 Awaitable 对象自身作为 Awaiter 对象。

再了解一下 co_await 在运行时的执行过程:

在运行时,代码会调用 Awaiter 对象的 await_ready 函数,如果该函数返回值为 false,那么就会执行以下行为:首先,将协程休眠;然后,使用当前协程的句柄,调用 Awaiter 对象的 await_suspend(handle) 函数。

我们可以在 await_suspend 中通过句柄获取到当前协程的各种信息,在自己编写的调度器中选择何时唤醒或者销毁这个协程。

在唤醒协程时,会调用 Awaiter 的 await_resume 函数,并使用该函数的返回值作为 co_await 表达式的值。其中,await_resume 函数的执行,会根据 await_ready 和 await_suspend 的执行结果有所不同。

这里可能会有一些关于多线程上执行协程的疑问。如果协程的执行涉及在不同线程上执行,会有线程安全问题吗?答案其实是不会。协程在进入 await_suspend 之前会休眠,因此 await_suspend 函数可以将协程句柄传递给任意的线程,而不需要考虑额外的线程同步问题。

举例来说,通过协程处理异步任务,await_suspend 函数是某个 Awaiter 类的成员函数,其 this 指针指向 Awaiter 对象。

我们将句柄存储在一个回调函数中(如图中的 lambda 表达式)。然后,在预先设定的线程池中完成异步任务。最后,调用回调函数利用协程句柄调度唤醒协程。

在这种情况下,代码块 2 依然会在本线程继续执行,回调函数中的代码则会在其他线程执行。由于其他线程的调度时序是未知的,因此本线程在执行代码块 2 时,协程可能已经被其他线程唤醒。这种多线程同时访问同一内存块上数据的情况,我们通常称为数据竞争问题。

为了避免出现这种数据竞争问题,将协程句柄传递给其他线程后,await_suspend 后续代码(示例中代码块 2)必须假定 *this(也就是调用 await_suspend 的 Awaiter 对象)已经被销毁,并且再也不对其进行访问。

这是一种典型的使用异步 I/O 处理的场景。

生成器和 co_yield

除了 co_await,在协程的上下文中还有一个常见操作符(关键字)——co_yield,它本质上是 co_await 的语法糖,一般用在生成器这种协程的常见场景中。那么,什么是生成器呢?

生成器是基于协程的异步编程中常见的一种编程模式。最常见的应用场景就是,通过生成或其他数据源来获取某种序列。

生成器的核心思路是让协程的调用者和被调用的协程进行协同调度,其中被调用的协程就是生成器。

这个协同调度过程是这样的:首先,调用者唤醒生成器,生成器返回一个值,接着就会主动进入休眠状态;然后,调用者使用这个值来执行相应代码逻辑,然后重新唤醒生成器……这个过程如此往复,直到调用者从生成器获取了所需的值为止。

从图中可以看出,在生成器这种模式下,主要就是两个操作。

  • 调用者作为调度方恢复协程执行。
  • 协程将获取或生成的值返回给调用者并继续休眠,等待调用者恢复执行。

而其中的关键就是 co_yield 关键字,用法是这样。

co_yield 表达式;

协程可以通过该关键字将表达式的结果传回给调用方并自动休眠。代码等价于:

co_await promise.yield_value(表达式);

可以看出,调用 co_yield 的本质就是调用了 promise 的 yield_value 函数,并通过 co_await 将自身休眠。

#include <coroutine>
#include <iostream>
#include <cstdint>

struct CountGenerator {
    struct promise_type {
        int32_t _value{ 0 };

        ~promise_type() {
            std::cout << "promise_type 对象销毁" << std::endl;
        }
        CountGenerator get_return_object() {
            return {
              ._handle = std::coroutine_handle<promise_type>::from_promise(*this)
            };
        }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void unhandled_exception() {}
        std::suspend_always yield_value(int32_t value) {
            _value = value;
            return {};
        }
        void return_void() {}
    };

    std::coroutine_handle<promise_type> _handle;
};

CountGenerator doCount() {
    for (int32_t i = 0; i < 3; ++i) {
        co_yield i;
    }
}

int main() {
    auto h = doCount()._handle;
    auto& promise = h.promise();
    while (!h.done()) {
        std::cout << "计数: " << promise._value << std::endl;
        h();
    }
    h.destroy();

    return 0;
}

这里定义了一个协程类 CountGenerator,比较特殊的是,这个类定义了一个 yield_value 成员函数,用于把 co_yield 的表达式值存储到 promise 对象内部,调用者通过这一方式来获取值。

由于调用方不知道协程什么时候结束执行,所以通过 coroutine_handle 中的 done 函数获取运行时状态。如果协程尚未结束执行,就获取相应的值并继续,否则就销毁协程并退出程序。

示例

#include <coroutine>
#include <iostream>
#include <thread>

namespace Coroutine {
  struct task {
    struct promise_type {
      promise_type() {
        std::cout << "1.create promie object\n";
      }
      task get_return_object() {
        std::cout << "2.create coroutine return object, and the coroutine is created now\n";
        return {std::coroutine_handle<task::promise_type>::from_promise(*this)};
      }
      std::suspend_never initial_suspend() {
        std::cout << "3.do you want to susupend the current coroutine?\n";
        std::cout << "4.don't suspend because return std::suspend_never, so continue to execute coroutine body\n";
        return {};
      }
      std::suspend_never final_suspend() noexcept {
        std::cout << "13.coroutine body finished, do you want to susupend the current coroutine?\n";
        std::cout << "14.don't suspend because return std::suspend_never, and the continue will be automatically destroyed, bye\n";
        return {};
      }
      void return_void() {
        std::cout << "12.coroutine don't return value, so return_void is called\n";
      }
      void unhandled_exception() {}
    };

    std::coroutine_handle<task::promise_type> handle_;
  };

  struct awaiter {
    bool await_ready() {
      std::cout << "6.do you want to suspend current coroutine?\n";
      std::cout << "7.yes, suspend becase awaiter.await_ready() return false\n";
      return false;
    }
    void await_suspend(
      std::coroutine_handle<task::promise_type> handle) {
      std::cout << "8.execute awaiter.await_suspend()\n";
      std::thread([handle]() mutable { handle(); }).detach();
      std::cout << "9.a new thread lauched, and will return back to caller\n";
    }
    void await_resume() {}
  };

  task test() {
    std::cout << "5.begin to execute coroutine body, the thread id=" << std::this_thread::get_id() << "\n";//#1
    co_await awaiter{};
    std::cout << "11.coroutine resumed, continue execcute coroutine body now, the thread id=" << std::this_thread::get_id() << "\n";//#3
  }
}// namespace Coroutine

int main() {
  Coroutine::test();
  std::cout << "10.come back to caller becuase of co_await awaiter\n";
  std::this_thread::sleep_for(std::chrono::seconds(1));

  return 0;
}

输出:

1.create promie object
2.create coroutine return object, and the coroutine is created now
3.do you want to susupend the current coroutine?
4.don't suspend because return std::suspend_never, so continue to execute coroutine body
5.begin to execute coroutine body, the thread id=0x10e1c1dc0
6.do you want to suspend current coroutine?
7.yes, suspend becase awaiter.await_ready() return false
8.execute awaiter.await_suspend()
9.a new thread lauched, and will return back to caller
10.come back to caller becuase of co_await awaiter
11.coroutine resumed, continue execcute coroutine body now, the thread id=0x700001dc7000
12.coroutine don't return value, so return_void is called
13.coroutine body finished, do you want to susupend the current coroutine?
14.don't suspend because return std::suspend_never, and the continue will be automatically destroyed, bye

小结

协程是彻底迈向现代编程语言的关键标志之一,一个协程类(Generator 类)包含 promise_type、coroutine_handle、coroutine_state。

但是 C++20 的协程缺乏具体实现,接口约定都需要开发者来实现,换句话说目前只提供了跟编译器沟通的协议,相信以后的标准肯定会支持得更加完美。

// ============= Promise的Concept定义 ===================
// PromiseType是Promise的类型,ValueType是协程中待计算的值的类型
template<typename PromiseType, typename ValueType>
concept Promise = requires(PromiseType promise) {
  { promise.get_return_object() } -> Coroutine<PromiseType>;
  { promise.initial_suspend() } -> Awaiter;
  { promise.final_suspend() } -> Awaiter;

  requires (requires(ValueType value) { promise.return_value(value); } || { promise.return_void(); })
  { promise.unhandled_exception() };
};

// ============= Awaiter的Concept定义 ===================
// AwaitSuspendResult约束了await_suspend的返回值类型
// AwaiterType是Awaiter的类型,Promise是协程的Promise类型,下同
template <typename ResultType, typename Promise>
concept AwaitSuspendResult = std::same_as<ResultType, void> ||
  std::same_as<ResultType, bool> ||
  std::same_as<ResultType, std::coroutine_handle<Promise>>;

// Awaiter约束定义,Awaiter类型必须满足requires中的所有接口约定
template <typename AwaiterType, typename Promise>
concept Awaiter = requires(AwaiterType awaiter, std::coroutine_handle<Promise> h) {
    awaiter.await_resume();
    { awaiter.await_ready() } -> std::same_as<bool>;
    { awaiter.await_suspend(h) } -> AwaitSuspendResult<Promise>;
};

// ============= Awaitable的Concept定义 ===================
// ValidCoAwait约束用于判断对于AwaitableType是否存在正确的co_await操作符重载
// co_await可以重载为成员函数或者非成员函数,约束中都需要判断
// AwaitableType是Awaitable的类型,Promise是协程的Promise类型,下同
template <typename AwaitableType, typename Promise>
concept ValidCoAwait = requires(AwaitableType awaitable) {
    { awaitable.operator co_await() } -> Awaiter<Promise>;
} || requires(AwaitableType awaitable) {
    { operator co_await(static_cast<AwaitableType&&>(awaitable)) } -> Awaiter<Promise>;
};

// Awaitable约束定义
// Awaitable必须存在正确的co_await操作符重载,或者自身是一个Awaiter
template <typename AwaitableType, typename Promise>
concept Awaitable = ValidCoAwait<AwaitableType, Promise> ||
  Awaiter<AwaitableType, Promise>;

评论
  目录