ractor:Rust中基于Tokio构建的Actor模型


Ractor 是纯 Rust Actor框架。灵感来自Erlanggen_server,具有 Rust 的速度 + 性能!

Ractor 在RustConf'24上举办了一场会议,介绍了如何在 Meta 的 Rust Thrift 服务器中使用它进行分布式过载保护。幻灯片

Ractor 中的消息发送可以以两种方式进行,即先发后弃和等待回复。然而,它们的符号分别遵循 Erlang 的命名方案“cast”和“call”。

安装:
ractor通过将以下内容添加到 Cargo.toml 依赖项中进行安装
[dependencies] ractor = "0.9"

代码:

pub enum MyFirstActorMessage {
    /// Print's hello world
    PrintHelloWorld,
}

use ractor::{Actor, ActorRef, ActorProcessingErr};

pub struct MyFirstActor;

#[async_trait::async_trait]
impl Actor for MyFirstActor {
    type State = ();
    type Msg = MyFirstActorMessage;
    type Arguments = ();

    async fn pre_start(&self, _myself: ActorRef<Self::Msg>, _arguments: Self::Arguments)
        -> Result<Self::State, ActorProcessingErr> 
    {
        Ok(())
    }

首先我们需要演员的结构类型,也就是我们所说的MyFirstActor。

然后我们定义我们的Actor行为,至少需要定义三种类型

  1. State- 参与者的“状态”,对于无状态参与者来说,这可以简单地()表示参与者没有可变的状态
  2. Msg- 演员的消息类型。
  3. Arguments- 启动参数,用于pre_start构造初始状态。这对于从 TCP 侦听器 Actor 生成的 TCP Actor 非常有用。侦听器需要将拥有的流传递给新 Actor,并Arguments在那里提供便利,以便其他 Actor 可以正确构建其状态,而无需clone()使用具有潜在副作用的结构。

最后,我们定义 Actor 的启动例程,pre_start在成功时发出 Actor 的初始状态。一旦运行,您的 Actor 就会处于活跃和健康的状态,只需等待接收消息即可!

如何打印出 hello world?
需要连接一个消息处理程序。让我们开始吧!

#[async_trait::async_trait]
impl Actor for MyFirstActor {
    type State = ();
    type Msg = MyFirstActorMessage;
    type Arguments = ();

    async fn pre_start(&self, _myself: ActorRef<Self::Msg>, _arguments: Self::Arguments)
        -> Result<Self::State, ActorProcessingErr>
    {
        Ok(())
    }

    async fn handle(&self, _myself: ActorRef<Self::Msg>, message: Self::Msg, _state: &mut Self::State) 
        -> Result<(), ActorProcessingErr>
    {
        match message {
            MyFirstActorMessage::PrintHelloWorld => {
                println!("Hello world!");
            }
        }
        Ok(())
    }
}

这里我们添加了消息处理程序handle()方法,它将针对队列中收到的每条消息执行。

其全部连接到一个适当的程序中。

#[tokio::main]
async fn main() {
   // 建立一个 ActorRef 和一个 JoinHandle,这个 JoinHandle 将伴随
  
// actor的一生。 大多数情况下,我们会丢弃这个句柄,但在
// 主函数中,它可以用来等待干净的角色关闭(所有停止处理程序都将 // 完成)。
    let (actor, actor_handle) = Actor::spawn(None, MyFirstActor, ()).await.expect(
"Actor failed to start");
    
    for _i in 0..10 {
       
// Sends a message, with no reply
        actor.cast(MyFirstActorMessage::PrintHelloWorld).expect(
"Failed to send message to actor");
    }

   
//给一点时间打印所有信息
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

   
// Cleanup
    actor.stop(None);
    actor_handle.await.unwrap();
}


添加状态
现在如果我们想向actor询问一些信息怎么办?比如它在生命周期中迄今为止打印的 hello-worlds 的数量,让我们看看它会是什么样子。

use ractor::{Actor, ActorRef, ActorProcessingErr, RpcReplyPort};

pub enum MyFirstActorMessage {
    /// Print's hello world
    PrintHelloWorld,
   
/// Replies with how many hello worlds have occurred
    HowManyHelloWorlds(RpcReplyPort<u16>),
}

pub struct MyFirstActor;

#[async_trait::async_trait]
impl Actor for MyFirstActor {
    type State = u16;
    type Msg = MyFirstActorMessage;
    type Arguments = ();

    async fn pre_start(&self, _myself: ActorRef<Self::Msg>, _arguments: Self::Arguments)
        -> Result<Self::State, ActorProcessingErr>
    {
        Ok(0)
    }

    async fn handle(&self, _myself: ActorRef<Self::Msg>, message: Self::Msg, state: &mut Self::State) 
        -> Result<(), ActorProcessingErr>
    {
        match message {
            MyFirstActorMessage::PrintHelloWorld => {
                println!(
"Hello world!");
                *state += 1;
            }
            MyFirstActorMessage::HowManyHelloWorlds(reply) => {
                if reply.send(*state).is_err() {
                    println!(
"Listener dropped their port before we could reply");
                }
            }
        }
        Ok(())
    }
}
  • 我们将 Actor::State 的类型改为 u16,这样演员就可以保持一些内部状态,即打印 "Hello world "的次数。
  • 我们改变了 hello-world 消息的处理方式,每次打印时都会递增状态。 我们添加了一个新的消息类型 MyFirstActorMessage::HowManyHelloWorlds,它的参数类型为 RpcReplyPort。 这是角色通过远程过程调用进行相互通信的主要方式之一。 这种调用是一种提供响应通道("端口")作为参数的消息,因此接收者不需要知道是谁发出的请求。
  • 我们为这种消息类型添加了一个递送器匹配臂,它会在收到请求时发送回复。

连接有状态的
与非有状态的示例非常相似,我们将这样连接!

在这个简短的例子中,我们让参与者发送 10 条消息,然后发送最后一条查询消息来读取当前计数并打印出来。我们还给它 100 毫秒的时间来执行(因此使用call_t!)或返回超时结果。

#[tokio::main]
async fn main() {
    //  建立一个 ActorRef 和一个 JoinHandle,这个 JoinHandle 将伴随
 
// actor的一生。 大多数情况下,我们会丢弃这个句柄,但在
// 主函数中,它可以用来等待干净的角色关闭(所有停止处理程序都将 // 完成)。
    let (actor, actor_handle) = 
        Actor::spawn(None, MyFirstActor, ())
            .await
            .expect(
"Actor failed to start");
    
    for _i in 0..10 {
       
// Sends a message, with no reply
        actor.cast(MyFirstActorMessage::PrintHelloWorld)
            .expect(
"Failed to send message to actor");
    }

    let hello_world_count = 
        ractor::call_t!(actor, MyFirstActorMessage::HowManyHelloWorlds, 100)
        .expect(
"RPC failed");
    
    println!(
"Actor replied with {} hello worlds!", hello_world_count);

   
// Cleanup
    actor.stop(None);
    actor_handle.await.unwrap();
}

什么是 call_t? 这是一个方便的宏,可以为我们构建 RPC 调用!

为方便开发,actor.cast(MESG)宏有三种变体!

  •  actor.cast(MESG)的别名,简单地向actor发送消息,非阻塞调用!
  •  actor.call(|reply|MESG(reply))的别名,它可以为我们构建消息,而无需提供一个 lambda 函数将回复端口作为参数来构建消息类型。 我们不需要在端口上构建和等待,RPC 功能会为我们完成这项工作。
  • call_t! - 与 call_t! 相同,但有一个超时参数