C++ 协程的缺点


这篇博文旨在强调将代码库转向协程(例程)所带来的一些风险,我相信持续不良的协程使用可能会导致更不安全和更慢的程序。
即使没有多线程,协程也应该像编写多线程代码一样受到怀疑,它仍然是异步的。

普通函数和堆栈如何工作
当调用普通函数时,参数通常会使用寄存器传递到函数中,如果有足够的参数,它可能会使用堆栈传递参数,确切的寄存器和堆栈用法是在应用程序二进制接口 (ABI) 中定义的。

在函数内部,发生的第一件事是函数保留足够的堆栈空间来存储局部变量和临时变量,并非所有这些都会放置在堆栈上,有些可能位于寄存器中或完全优化掉。

最后,在函数结束时,最初保留的堆栈空间将重置为函数第一次调用发生时最初的位置,然后返回给调用者。

许多细节因不同的 ABI 而异,例如,有些可能具有易失性和非易失性寄存器,如果需要,可能需要保存和恢复这些寄存器。

存储这些参数、局部变量和临时变量的堆栈空间只是一个普通的内存块,通常在线程启动时保留,每个线程都有自己唯一的堆栈。

由于线程内的每个函数调用都大量使用它,因此 L1/L2 缓存中的温度通常相当高,因此使用起来非常快。

C++ 协程函数如何工作
就像使用寄存器和堆栈传递普通函数参数一样,协程使用与之前指定的相同的 ABI,但是代码的不同却有很大不同。

与普通函数不同,协程函数可以挂起,调用函数可以继续并稍后调用其他函数来恢复协程,甚至可以在不同的线程上恢复它。

由于局部变量、临时变量和参数不能存储在堆栈上,而必须存储在其他位置,这是通过将它们放在堆上来完成的。

编译器如何翻译代码的示例

原始协程代码:

task<void> func()
{
  int mylocal = 10;
  co_return;
}

编译器生成的代码:

struct func_frame
{
  task<void>::promise_type __promise;
  int __step = 0;

  decltype(__promise.initial_suspend()) __initial_suspend_awaiter;
  decltype(__promise.final_suspend()) __final_suspend_awaiter;

  struct 
  {
    // local and temp variables here
    int mylocal;
  } local_and_temps;

  void resume()
  {
    switch(__step)
    {
      case 0:
       
// co_await __promise.initial_suspend();
        __initial_suspend_awaiter = _promise.initial_suspend();
        if (!__initial_suspend_awaiter.await_ready())
        {
          __step = 1;
          __initial_suspend_awaiter.await_suspend();
          return;
        }
      case 1:
        __initial_suspend_awaiter.await_resume();
       
// .. func body
        mylocal = 10;
       
// co_return
        __promise.return_void();
       
// co_await __promise.final_suspend();
        __final_suspend_awaiter = _promise.final_suspend();
        if (!__final_suspend_awaiter.await_ready())
        {
          __final_suspend_awaiter.await_suspend();
          return;
        }
        delete this;
    }
  }
};

task<void> func()
{
  func_frame * frame = task<void>::promise_type::operator new(func_frame);
  task<void> ret = frame->__promise.get_return_object()
  frame->resume();
  return ret;
}

虽然这并不完全是它的工作原理,但希望能帮助你想象一下幕后的情况,即有一个隐藏的结构/内存块来存储 coroutine 的状态。

在这个小示例中,mylocal 可能会被优化掉,但在真正的例行程序中,如果局部变量或临时变量需要跨越恢复边界,就需要将其存储在 local_and_temps 中,而且所有这些变量都将就地初始化,但这只是示例代码。

通过 HALO(coroutine Heap Allocation eLision Optimization,堆分配优化),当编译器可以确定 coroutine 的确切生命周期并知道它的框架布局时,它可能会用 func_frame 的副本代替 func 的主体,作为调用者(它本身可能是一个普通函数或 coroutine)的局部变量。

有些人错误地认为,分配可以在第一个暂停点按需进行,但这是不可能的,因为这需要重新定位所有局部变量和临时变量以及对它们的引用,而编译器无法跨越 ABI 边界对它们进行跟踪。

我喜欢的看法是,协程(例程)只是将函数转化为一个结构体,其中包含局部和临时函数,然后再转化为一个恢复函数,分步执行函数。

你不能“像同步代码一样编写异步代码”
写异步代码就像写同步代码一样 "这句话绝对糟糕透顶,它给人一种虚假的安全感,任何写过大量异步代码的人都知道,事情没那么简单。

你可能会遇到一系列全新的错误,但你往往不会去想这些错误,因为你会忽略悬浮点,因为它们很容易编写,你只需输入 co_await,它就会被处理。

安全性:参数生命周期
说到普通函数,大多数人都习惯于这样的规则:如果传递成本低,则通过值传递;如果传递成本高,则使用常量引用;如果传递所有权,则通过值或 rvalue 引用传递。

这些规则已经深入许多开发人员的大脑,你可能想都不用想就会去做,但如果你在使用例程函数时也遵循这些规则,事情就会开始出错。

让我们来看看下面的情况:

task<void> async_insert(T && val);
task<void> async_find(const T &);
task<void> async_write(span<byte>);

您会在代码审查中发现这些吗?我知道我有可能会想念他们。

值得庆幸的是,使用 Promise 类型,您可以捕获参数类型,这将允许您创建静态断言列表并检测其中的一些情况。

安全性:迭代器和指针失效
因为函数可以暂停任何可能已更改的全局状态,所以看一下以下内容

task<void> send_all(string s)
{
  for (auto & source : m_sources)
  {
    co_await source.send(s);
  }
}

大多数人在编写这段代码时可能只考虑到发送使 m_sources 无效的代码,除非你有一些糟糕的耦合,否则这种情况可能不会发生。

现在试着想想如何解决这个问题,你会将所有发送排队然后等待吗?

task<void> send_all(string s)
{
  std::vector<task<void>> sends;
  sends.reserve(m_sources.size());
  for (auto & source : m_sources)
  {
    sends.push_back(source->send(s));
  }

  co_await wait_all( sends.begin(), sends.end() );
}

一开始看起来不错,对吧?但是,如果在 send_all 过程中有的源被删除了,怎么办?如何来保证所有发送源是都活着呢?

要做到这一点,可以通过让源发送来保持自己活着:

struct Source : std::enable_shared_from_this<Source>
{};

task<void> Source::send(string s)
{
  auto pSelf = this->shared_from_this();
  ...
}

或者在调用者中执行此操作

task<void> send_all(string s)
{
  std::vector<task<void>> sends;
  std::vector<shared_ptr<Source>> sources = m_sources;
  sends.reserve(sources.size());
  for (auto & source : sources)
  {
    sends.push_back(source->send(s));
  }

  co_await wait_all( sends.begin(), sends.end() );
}

另一种方法是跟踪未履行的promises 。

struct Source
{
  std::vector<std::coroutine_handle> dependent_coroutines;
  ~Source()
  {
    for (auto & coroutine : dependent_coroutines)
      coro.destroy();
  }

  task<void> send(string s)
  {
    auto corohandle = co_await get_current_coroutine{};
    dependent_coroutines.push_back( corohandle );
    ...
    dependent_coroutines.erase( dependent_coroutines.find( corohandle ) );
  }
}

还有一种方法是锁定和解锁数据。

task<void> send_all(string s)
{
  co_await m_sourcesLock.read_lock();
  std::vector<task<void>> sends;
  sends.reserve(m_sources.size());
  for (auto & source : m_sources)
  {
    sends.push_back(source->send(s));
  }

  co_await wait_all( sends.begin(), sends.end() );
  co_await m_sourcesLock.read_unlock();
}

有很多方法可以解决这个问题,每种方法都有自己的优点和缺点。

安全性:急切启动协程与惰性启动协程
对于无堆栈协程,当主体第一次开始执行时,通常会采取两种方法:

  1. eager 立即开始执行函数,直到第一个挂起点才停止;
  2. lazy 开始挂起,直到第一次恢复才开始。

这两种方法通常是根据调度决策来确定的,其中惰性协程可以更轻松地在不同线程上从开始到结束调度整个协程。

大多数主要的协程库都使用惰性启动协程,有一个提议的 Boost.Async 具有急切启动协程。

如果您知道函数始终仅与模式一起使用

co_await fetch_data("key");

您可能会认为它key是由调用者保持活动状态的,因为co_await您可以立即摆脱此实现。

eager_task<string> fetch_data(string_view key)
{
  auto it = cache.find(key)
  if (it != cache.end())
  {
    return it->second;
  }

  auto data = co_await fetch_remote(key);
  cache.emplace(key, data);
}

一两年后的某个时候,有人补充道

eager_task<void> fetch_mydata(string_view key)
{
  return fetch_data(std::format("mysystem/{}", key));
}

你运行你的测试,它工作正常,它开始得到广泛的使用,突然你开始看到fetch_data随机崩溃(没有key,因为它的数据消失了)

如果你幸运地发现了这个 fetch_mydata,但它可能在另一个系统中完全独立的地方,那么你就需要发现它没有执行 co_return co_await,而这又是大多数人没有想到的。

如果你有一个懒惰的例行程序,那么它就会经常出现故障,并希望能通过一些工具(如地址消毒器或其他任何可以检测免用程序的工具)来发现。

安全性:测试暂停点
从我提到的所有与安全相关的例子中,你已经看到函数的暂停和恢复会带来很多随机的错误。

这就是为什么在您的例行程序中定期测试挂起点变得非常重要的原因,而要做到这一点并不容易,因为等待程序的调用者并不能决定它是否挂起。

这意味着您需要在每个 awaitable 中构建支持,或者开始使用 await_transform,而这两者都不容易,尤其是当您想通过依赖注入来添加这种行为时。

性能:内存分配
当调用一个尚未取消分配的例程函数时,它将为例程框架/状态分配内存。

此外,由于试图克服参数生命周期问题和迭代器/指针失效问题,你可以获得更多的内存分配。

你可能希望像 glibc 中的线程本地 tcache(它有 7 个大小相等的分配,存储在 64 个 24-1024 字节不等的分区中)那样,能让这些分配变得更快。

但是,如果回过头来看 send_all 函数,该函数使用字符串,那么它调用 send 的每一个源都会有另外两个分配(例程帧和字符串参数),每个分配的大小都相同,因此如果有 7 个源,就一定会用完 tcache,如果字符串大小的例程帧都放在同一个桶中,那么只需要 4 个就能用完一个 tcache 桶。

性能:HALO 堆栈膨胀
使用 HALO(协程堆分配 eLision Optimization)协程帧不需要放置在堆上,可以是本地的(在堆栈上或调用协程的一部分)

如果你没有太多巨大的堆栈框架,这看起来似乎很容易,但如果你设法将很多东西组合在一起,那么最终可能会产生一些非常大的框架。

例如以下

task<void> post_metric(string name, int value)
{
  char sendbuffer[16*1024];
  ...
}

task<void> post_metrics()
{
  std::array metrics { 
    post_metric("a", ...);
    post_metric(
"b", ...);
    post_metric(
"c", ...);
    ...
  };
  co_await when_all(begin(metrics), end(metrics));
}

每个请求至少需要 16kb,post_metric并且可能会post_metrics同时进行多个并行调用,因此您可能会得到更大的数据。

现在我们还考虑以下问题

task<void> func()
{
  if (cond)
  {
    co_await func1()
  }
  else
  {
    co_await func2();
    co_await post_metrics();
  }
}

在这种情况下,func 也可以包括 post_metrics 的大小,即使在 cond 为真的情况下,这也是递归的,因此 func1 和 func2 都可以计入 func 的大小。

此外,优化可能会根据基本块概率来决定对哪些分支执行 HALO,这可能意味着一个编译器会对一个分支执行 HALO,而另一个编译器则会对另一个分支执行 HALO,从而导致性能非常不一致。

性能:调试构建
即使是最基本的协程,也有很多内部函数调用。

task<void> func()
{
  co_return;
}
func()
promise_type::operator new()
promise_type::promise_type()
promise_type::get_return_value()
promise_type::initial_suspend()
initial_suspend::initial_suspend()
initial_suspend::await_ready()
initial_suspend::await_suspend() [opt]
initial_suspend::await_resume()
promise_type::return_void()
final_suspend::final_suspend()
final_suspend::await_ready()
final_suspend::await_suspend() [opt]
promise_type::operator delete()

因此,现在一个函数实际上是 13 个函数,在幕后优化,其中许多函数将被内联,这样您就不会产生这些函数调用开销。

但是,当您在调试版本中运行时呢?那么所有这些功能最终都可能以未优化的状态存在。

设计:无堆栈协程的病毒式传播
C++ 协程是无堆栈的,这意味着挂起仅发生在单个函数上,而不是整个调用堆栈上。

因为您通常希望允许挂起传播到调用堆栈以允许更多函数并行执行操作,或者因为它希望传递到调度程序中,所以您必须使调用堆栈上的所有调用者也进入协程。

这种情况可能发生在越来越广泛的范围内,所需要的只是一些深度嵌套的函数调用,这些函数调用希望成为协程,以便它能够快速传播到大部分代码库。

避免这种情况的一种方法是更愿意实际等待,而不是进一步挂起调用树,但这可能会限制并行性。

替代方案:堆栈协程如何堆叠
堆栈协程也称为纤程,它提供用户模式线程机制,需要预先生成新线程,然后像正常一样继续调用其他函数,也可以在纤程之间切换。

使用堆栈协程,您仍然可能会遇到一些类似的安全问题,但是生命周期问题的范围会缩小,因为您有一个实际的堆栈。

性能问题有很大不同,您不再有每个函数调用的开销可能是另一个内存分配或一堆额外的函数,但是您错过了优化器在一切都可以完全完成时将事物一路推送到纳米协程的可能性内联。

堆栈协程本质上不是病毒式的,因此如果某些东西想要在协程之间切换,则可以在不需要整个调用堆栈接受该切换的情况下完成。另一方面,这意味着有人可能会在您不知情的情况下暂停您的函数。