Python中异步模式

Python 的 asyncio 库专为高效异步编程而设计:

import asyncio

async def mock_api_request(i):
    print(f"API request started {i}")
    await asyncio.sleep(1)  # 这可能是一个 API 调用,或其他一些 IO 绑定任务
    print(f
"API request completed {i}")

async def run():
    for i in range(1_000_000):
        await mock_api_request(i)

asyncio.run(run())

通过使用 async def 定义函数并使用 await 和 asyncio.sleep(1),我们创建了可以暂停和恢复的 coroutines,允许程序执行其他任务,而不是空闲等待。

这种方法尤其适用于 IO 绑定任务,如 API 调用,在这种情况下,事件循环可以并发管理多个任务,而不会阻塞。

我们调用 asyncio.run(run()),将我们的主函数启动到事件循环中。这正是 asyncio 的优势所在,它可以无缝地协调众多 coroutines 的执行。

让我们尝试一下 asyncio 中的常见模式。让我们创建一堆作业,将它们放在事件循环中,等待它们完成。

内存饥饿事件循环

# ... mock_api_request

async def run():
    tasks = []
    for i in range(1_000_000):
        tasks.append(asyncio.create_task(mock_api_request(i)))
    await asyncio.wait(tasks)

# ... everything else

  1. 我们从 mock_api_request 例程中创建一个任务。
  2. 我们将所有任务添加到列表任务中。(如果有一百万个项目,可能会耗尽内存)
  3. 如果内存没有耗尽,Python 将在调用 await asyncio.wait(tasks) 时开始工作。
  4. 每个任务都会进入事件循环。事件循环抓取一个任务并开始工作。
  5. Python 将同步运行任务,直到看到 await 关键字。
  6. 一旦看到 await 关键字,事件循环就会放弃,并切换到队列中的下一个任务。
  7. 重复第 5 步和第 6 步,偶尔检查等待的任务是否完成。
  8. 当等待的函数完成 IO 工作后,它将继续执行,直到下一个等待或函数结束。
  9. 这样一直持续到列表中的每个项目都完成为止。

酷,这就是事件循环的大致作用。

那么,我们如何加快同步-异步程序的运行速度呢?

Batching

# ... mock_api_request

async def run():
    tasks = []
    batch_size = 100
    for i in range(1_000_000):
        tasks.append(asyncio.create_task(mock_api_request(i)))
        if len(tasks) >= batch_size:
            await asyncio.wait(tasks)
            tasks = []
    
    if tasks:  # 如果还有剩余任务,请等待它们完成
        await asyncio.wait(tasks)

# ... everything else

至少在内存使用方面,这要好得多。不过,速度不一定比memory-inefficient的版本快。

你会注意到,在分批运行时,每次等待 asyncio.wait(tasks),你都必须等最后一个任务完成后才能开始下一批。这可不行。

生产者/消费者模式
在下一次迭代中,我们将引入 asyncio.Queue()。

# ... mock_api_request

async def producer(queue: asyncio.Queue):
    for i in range(1_000_000):
        await queue.put(i)

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        await mock_api_request(item)

async def run():
    queue = asyncio.Queue(maxsize=100)
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))
    await asyncio.wait([producer_task, consumer_task])


当我们运行这个程序时,我们会注意到事情或多或少又恢复了同步。现在的情况是,我们通过生产者将所有项目放入队列,但消费者每次只抓取队列中的一个项目。

让我们把生产者/消费者模式与之前的批处理版本结合起来。

多个消费者
让我们再次修改运行函数。

async def run():
    queue = asyncio.Queue(maxsize=100)
    number_of_consumers = 10
    producer_task = asyncio.create_task(producer(queue))
    consumers = []
    for _ in range(number_of_consumers):
        consumer_task = asyncio.create_task(consumer(queue))
        consumers.append(consumer_task)
    await asyncio.wait([producer_task, *consumers])

现在我们创建一个列表,并将其分配给变量消费者。然后,我们创建 10 个 asyncio 任务,等待生产者和所有 10 个消费者完成工作。现在,只要你运行它,消费者就会随时从队列中拉出,你就能获得源源不断的工作!

当生产者完成工作,队列为空时,消费者就会挂起,等待永远不会到来的任务。进程永远不会停止。如果你的生产者连接的是一个永远不会结束的源,那么这种情况可能还不错。为此,我们需要某种方法来告诉消费者退出。

使用 `None` 发送信号

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        await mock_api_request(item)


我们在消费者中添加了一个条件,用于检查队列中是否传递了 None。我们可以将此作为退出的信号,因为通常我们可以假设项目不应该是 None。

我们还应该更新生产者以发送此信号。

async def producer(queue: asyncio.Queue, number_of_consumers: int):
    for i in range(1_000_000):
        await queue.put(i)
    for _ in range(number_of_consumers):
        await queue.put(None)

请注意我们是如何在生产者的函数签名中添加 number_of_consumers 的。我们需要在队列中放入与 Worker 数量相同的 None,因为每个 None 都需要自己的信号。只要记住在运行方法中更新生产者任务即可。

producer_task = asyncio.create_task(producer(queue, number_of_consumers))

有时,"None  "是一个有效值,或者您需要更复杂的信号。Asyncio 可以满足您的需求。

Asyncio 事件
让我们添加一个 asyncio.Event() 变量,用作我们的信号。

async def run():
    queue = asyncio.Queue(maxsize=100)
    stop_event = asyncio.Event()
    producer_task = asyncio.create_task(producer(queue, stop_event))
    consumers = []
    number_of_consumers = 10
    for _ in range(number_of_consumers):
        consumer_task = asyncio.create_task(consumer(queue, stop_event))
        consumers.append(consumer_task)
    await asyncio.wait([producer_task, *consumers])

我们还需要更新消费者和生产者。

async def producer(queue: asyncio.Queue, stop_event: asyncio.Event):
    for i in range(1_000_000):
        await queue.put(i)
    stop_event.set()


async def consumer(queue: asyncio.Queue, stop_event: asyncio.Event):
    while True:
        if queue.empty() and stop_event.is_set():
            break
        item = await queue.get()
        await mock_api_request(item)

如果您使用 number_of_consumers 在队列中放入了大量 None 值,则需要从生产者的函数签名中删除该值。

在消费者中,重要的是 if stop event.is_set() 和 queue.empty():。这将确保在实际退出之前首先完成所有任务。这可能会根据你的需要而有所不同。例如,您可能需要一个单独的 asyncio.Event 用于在不耗尽队列的情况下终止函数。

总结
现在您应该掌握了在 Python 中优化和提高 IO 绑定调用吞吐量的优化路径和各种模式。

您还可以做一些事情来提高速度。例如,您可以改变消费者的数量。您还可以将 "队列 "部分卸载到专用队列(如 RabbitMQ 或 NATS),这样您就可以扩展和/或分离生产者和消费者。