Python中异步模式
Python 的 asyncio 库专为高效异步编程而设计:
import asyncio |
通过使用 async def 定义函数并使用 await 和 asyncio.sleep(1),我们创建了可以暂停和恢复的 coroutines,允许程序执行其他任务,而不是空闲等待。
这种方法尤其适用于 IO 绑定任务,如 API 调用,在这种情况下,事件循环可以并发管理多个任务,而不会阻塞。
我们调用 asyncio.run(run()),将我们的主函数启动到事件循环中。这正是 asyncio 的优势所在,它可以无缝地协调众多 coroutines 的执行。
让我们尝试一下 asyncio 中的常见模式。让我们创建一堆作业,将它们放在事件循环中,等待它们完成。
内存饥饿事件循环
# ... mock_api_request |
- 我们从 mock_api_request 例程中创建一个任务。
- 我们将所有任务添加到列表任务中。(如果有一百万个项目,可能会耗尽内存)
- 如果内存没有耗尽,Python 将在调用 await asyncio.wait(tasks) 时开始工作。
- 每个任务都会进入事件循环。事件循环抓取一个任务并开始工作。
- Python 将同步运行任务,直到看到 await 关键字。
- 一旦看到 await 关键字,事件循环就会放弃,并切换到队列中的下一个任务。
- 重复第 5 步和第 6 步,偶尔检查等待的任务是否完成。
- 当等待的函数完成 IO 工作后,它将继续执行,直到下一个等待或函数结束。
- 这样一直持续到列表中的每个项目都完成为止。
酷,这就是事件循环的大致作用。
那么,我们如何加快同步-异步程序的运行速度呢?
Batching
# ... mock_api_request |
至少在内存使用方面,这要好得多。不过,速度不一定比memory-inefficient的版本快。
你会注意到,在分批运行时,每次等待 asyncio.wait(tasks),你都必须等最后一个任务完成后才能开始下一批。这可不行。
生产者/消费者模式
在下一次迭代中,我们将引入 asyncio.Queue()。
# ... mock_api_request |
当我们运行这个程序时,我们会注意到事情或多或少又恢复了同步。现在的情况是,我们通过生产者将所有项目放入队列,但消费者每次只抓取队列中的一个项目。
让我们把生产者/消费者模式与之前的批处理版本结合起来。
多个消费者
让我们再次修改运行函数。
async def run(): |
现在我们创建一个列表,并将其分配给变量消费者。然后,我们创建 10 个 asyncio 任务,等待生产者和所有 10 个消费者完成工作。现在,只要你运行它,消费者就会随时从队列中拉出,你就能获得源源不断的工作!
当生产者完成工作,队列为空时,消费者就会挂起,等待永远不会到来的任务。进程永远不会停止。如果你的生产者连接的是一个永远不会结束的源,那么这种情况可能还不错。为此,我们需要某种方法来告诉消费者退出。
使用 `None` 发送信号
async def consumer(queue: asyncio.Queue): |
我们在消费者中添加了一个条件,用于检查队列中是否传递了 None。我们可以将此作为退出的信号,因为通常我们可以假设项目不应该是 None。
我们还应该更新生产者以发送此信号。
async def producer(queue: asyncio.Queue, number_of_consumers: int): |
请注意我们是如何在生产者的函数签名中添加 number_of_consumers 的。我们需要在队列中放入与 Worker 数量相同的 None,因为每个 None 都需要自己的信号。只要记住在运行方法中更新生产者任务即可。
producer_task = asyncio.create_task(producer(queue, number_of_consumers))
有时,"None "是一个有效值,或者您需要更复杂的信号。Asyncio 可以满足您的需求。
Asyncio 事件
让我们添加一个 asyncio.Event() 变量,用作我们的信号。
async def run(): |
我们还需要更新消费者和生产者。
async def producer(queue: asyncio.Queue, stop_event: asyncio.Event): |
如果您使用 number_of_consumers 在队列中放入了大量 None 值,则需要从生产者的函数签名中删除该值。
在消费者中,重要的是 if stop event.is_set() 和 queue.empty():。这将确保在实际退出之前首先完成所有任务。这可能会根据你的需要而有所不同。例如,您可能需要一个单独的 asyncio.Event 用于在不耗尽队列的情况下终止函数。
总结
现在您应该掌握了在 Python 中优化和提高 IO 绑定调用吞吐量的优化路径和各种模式。
您还可以做一些事情来提高速度。例如,您可以改变消费者的数量。您还可以将 "队列 "部分卸载到专用队列(如 RabbitMQ 或 NATS),这样您就可以扩展和/或分离生产者和消费者。