Ractor 是纯 Rust Actor框架。灵感来自Erlanggen_server,具有 Rust 的速度 + 性能!
Ractor 在RustConf'24上举办了一场会议,介绍了如何在 Meta 的 Rust Thrift 服务器中使用它进行分布式过载保护。幻灯片
Ractor 中的消息发送可以以两种方式进行,即先发后弃和等待回复。然而,它们的符号分别遵循 Erlang 的命名方案“cast”和“call”。
安装:
ractor通过将以下内容添加到 Cargo.toml 依赖项中进行安装
[dependencies] ractor = "0.9"
代码:
pub enum MyFirstActorMessage { /// Print's hello world PrintHelloWorld, }
use ractor::{Actor, ActorRef, ActorProcessingErr};
pub struct MyFirstActor;
#[async_trait::async_trait] impl Actor for MyFirstActor { type State = (); type Msg = MyFirstActorMessage; type Arguments = ();
async fn pre_start(&self, _myself: ActorRef<Self::Msg>, _arguments: Self::Arguments) -> Result<Self::State, ActorProcessingErr> { Ok(()) }
|
首先我们需要演员的结构类型,也就是我们所说的MyFirstActor。
然后我们定义我们的Actor行为,至少需要定义三种类型
- State- 参与者的“状态”,对于无状态参与者来说,这可以简单地()表示参与者没有可变的状态
- Msg- 演员的消息类型。
- Arguments- 启动参数,用于pre_start构造初始状态。这对于从 TCP 侦听器 Actor 生成的 TCP Actor 非常有用。侦听器需要将拥有的流传递给新 Actor,并Arguments在那里提供便利,以便其他 Actor 可以正确构建其状态,而无需clone()使用具有潜在副作用的结构。
最后,我们定义 Actor 的启动例程,pre_start在成功时发出 Actor 的初始状态。一旦运行,您的 Actor 就会处于活跃和健康的状态,只需等待接收消息即可!
如何打印出 hello world?
需要连接一个消息处理程序。让我们开始吧!
#[async_trait::async_trait] impl Actor for MyFirstActor { type State = (); type Msg = MyFirstActorMessage; type Arguments = ();
async fn pre_start(&self, _myself: ActorRef<Self::Msg>, _arguments: Self::Arguments) -> Result<Self::State, ActorProcessingErr> { Ok(()) }
async fn handle(&self, _myself: ActorRef<Self::Msg>, message: Self::Msg, _state: &mut Self::State) -> Result<(), ActorProcessingErr> { match message { MyFirstActorMessage::PrintHelloWorld => { println!("Hello world!"); } } Ok(()) } }
|
这里我们添加了消息处理程序handle()方法,它将针对队列中收到的每条消息执行。
其全部连接到一个适当的程序中。
#[tokio::main] async fn main() { // 建立一个 ActorRef 和一个 JoinHandle,这个 JoinHandle 将伴随 // actor的一生。 大多数情况下,我们会丢弃这个句柄,但在 // 主函数中,它可以用来等待干净的角色关闭(所有停止处理程序都将 // 完成)。 let (actor, actor_handle) = Actor::spawn(None, MyFirstActor, ()).await.expect("Actor failed to start"); for _i in 0..10 { // Sends a message, with no reply actor.cast(MyFirstActorMessage::PrintHelloWorld).expect("Failed to send message to actor"); }
//给一点时间打印所有信息 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// Cleanup actor.stop(None); actor_handle.await.unwrap(); }
|
添加状态
现在如果我们想向actor询问一些信息怎么办?比如它在生命周期中迄今为止打印的 hello-worlds 的数量,让我们看看它会是什么样子。
use ractor::{Actor, ActorRef, ActorProcessingErr, RpcReplyPort};
pub enum MyFirstActorMessage { /// Print's hello world PrintHelloWorld, /// Replies with how many hello worlds have occurred HowManyHelloWorlds(RpcReplyPort<u16>), }
pub struct MyFirstActor;
#[async_trait::async_trait] impl Actor for MyFirstActor { type State = u16; type Msg = MyFirstActorMessage; type Arguments = ();
async fn pre_start(&self, _myself: ActorRef<Self::Msg>, _arguments: Self::Arguments) -> Result<Self::State, ActorProcessingErr> { Ok(0) }
async fn handle(&self, _myself: ActorRef<Self::Msg>, message: Self::Msg, state: &mut Self::State) -> Result<(), ActorProcessingErr> { match message { MyFirstActorMessage::PrintHelloWorld => { println!("Hello world!"); *state += 1; } MyFirstActorMessage::HowManyHelloWorlds(reply) => { if reply.send(*state).is_err() { println!("Listener dropped their port before we could reply"); } } } Ok(()) } }
|
- 我们将 Actor::State 的类型改为 u16,这样演员就可以保持一些内部状态,即打印 "Hello world "的次数。
- 我们改变了 hello-world 消息的处理方式,每次打印时都会递增状态。 我们添加了一个新的消息类型 MyFirstActorMessage::HowManyHelloWorlds,它的参数类型为 RpcReplyPort。 这是角色通过远程过程调用进行相互通信的主要方式之一。 这种调用是一种提供响应通道("端口")作为参数的消息,因此接收者不需要知道是谁发出的请求。
- 我们为这种消息类型添加了一个递送器匹配臂,它会在收到请求时发送回复。
连接有状态的
与非有状态的示例非常相似,我们将这样连接!
在这个简短的例子中,我们让参与者发送 10 条消息,然后发送最后一条查询消息来读取当前计数并打印出来。我们还给它 100 毫秒的时间来执行(因此使用call_t!)或返回超时结果。
#[tokio::main] async fn main() { // 建立一个 ActorRef 和一个 JoinHandle,这个 JoinHandle 将伴随 // actor的一生。 大多数情况下,我们会丢弃这个句柄,但在 // 主函数中,它可以用来等待干净的角色关闭(所有停止处理程序都将 // 完成)。 let (actor, actor_handle) = Actor::spawn(None, MyFirstActor, ()) .await .expect("Actor failed to start"); for _i in 0..10 { // Sends a message, with no reply actor.cast(MyFirstActorMessage::PrintHelloWorld) .expect("Failed to send message to actor"); }
let hello_world_count = ractor::call_t!(actor, MyFirstActorMessage::HowManyHelloWorlds, 100) .expect("RPC failed"); println!("Actor replied with {} hello worlds!", hello_world_count);
// Cleanup actor.stop(None); actor_handle.await.unwrap(); }
|
什么是 call_t? 这是一个方便的宏,可以为我们构建 RPC 调用!
为方便开发,actor.cast(MESG)宏有三种变体!
- actor.cast(MESG)的别名,简单地向actor发送消息,非阻塞调用!
- actor.call(|reply|MESG(reply))的别名,它可以为我们构建消息,而无需提供一个 lambda 函数将回复端口作为参数来构建消息类型。 我们不需要在端口上构建和等待,RPC 功能会为我们完成这项工作。
- call_t! - 与 call_t! 相同,但有一个超时参数