用Rust编写后台调度计划任务Cronjob

在本文中,我们将讨论如何使用 Shuttle 将自己的 cron 作业编写为 Web 服务!
Cron 作业(或“计划任务”)对于很多事情都很有用。它们允许您自动执行以下操作:

  • 自动数据备份。
  • 添加每日提醒(例如,针对已注册您拥有的服务但尚未开始使用该服务的客户)。
  • 创建/撰写报告。

如果您对最终代码感兴趣并希望快速部署,可以在此处找到该存储库。您可以通过两个简单的步骤来部署它:
  1. 运行cargo shuttle init --from shuttle-hq/shuttle-examples --subfolder shuttle-cron并按照提示操作(需要cargo-shuttle安装)
  2. 跑步cargo shuttle deploy。就是这样!

本项目依赖:
我们需要安装以下依赖项:

  • serde- 允许我们反/序列化任务
  • chrono- 允许我们使用时间戳
  • apalis- 我们将使用的任务队列框架(下面代码有)
  • sqlx- 允许您与您配置的数据库进行交互
  • shuttle-shared-db- 从 Shuttle 为您提供数据库(本地通过 Docker,通过部署中的运行时)
  • tower- 与 apalis 一起使用,以便可以托管 cron 作业。(下面代码有)

cargo add apalis -F cron,postgres,extensions,retry
cargo add chrono -F serde,clock
cargo add serde -F derive
cargo add shuttle-shared-db -F postgres
cargo add sqlx -F runtime-tokio-native-tls,postgres
cargo add tower 


完整代码:

#[shuttle_runtime::async_trait]
impl shuttle_runtime::Service for MyService {
    async fn bind(
        self, 
        _addr: std::net::SocketAddr
    ) -> Result<(), shuttle_runtime::Error> {
        let storage = PostgresStorage::new(self.db.clone());
        // set up storage
        storage.setup().await.expect(
"Unable to run migrations :(");
    
        let cron_service_ext = CronjobData { 
            message:
"Hello world".to_string() 
        };
    
       
// 为 cronjob 创建服务生成器
        let service = ServiceBuilder::new()
            .layer(RetryLayer::new(DefaultRetryPolicy))
            .layer(Extension(cron_service_ext))
            .service(job_fn(say_hello_world));
 
        let schedule = Schedule::from_str(
"* * * * * *")
            .expect(
"Couldn't start the scheduler!");
    
       
//创建一个 Worker,使用 cronjob 创建的服务
        let worker = WorkerBuilder::new(
"morning-cereal")
            .with_storage(storage.clone())
            .stream(CronStream::new(schedule).timer(TokioTimer).to_stream())
            .build(service);
    
       
// 启动您的worker
        Monitor::new()
            .register(worker)
            .run()
            .await
            .expect(
"Unable to start worker");
    
            Ok(())
    }
}

解释:
1、首先,配置一个数据库
让我们使用 Shuttle 运行时配置一个数据库。您需要将shuttle-shared-db注释添加到您的 main 函数中,如下所示并设置我们的 Postgres 连接池:

use sqlx::{PgPool, postgres::PgPoolOptions};

pub struct MyService {
    db: PgPool,
}

#[shuttle_runtime::main]
async fn shuttle_main(
    #[shuttle_shared_db::Postgres] conn_string: String,
    ) -> Result<MyService, shuttle_runtime::Error> {
    let db = PgPoolOptions::new()
        .min_connections(5)
        .max_connections(5)
        .connect(&conn_string)
        .await
        .unwrap();

    Ok(MyService { db })
}

2、实现一个具体服务
我们需要设置PostgresStorage类型,以便我们可以将 Postgres 用于持久作业队列。如果没有持久的作业队列,如果我们的网络服务出现任何中断,我们的作业就会消失!

我们可以直接从PgPool主结构中存储的类型转换它,并为其运行迁移,如下所示:

#[shuttle_runtime::async_trait]
impl shuttle_runtime::Service for MyService {
    async fn bind(
        self, 
        _addr: std::net::SocketAddr
    ) -> Result<(), shuttle_runtime::Error> {
        // 设置 Postgres 支持的存储
        let storage = PostgresStorage::new(self.db);
       
// 设仓
        storage.setup().await.expect(
"Unable to run migrations :(");
    Ok(())
   }
}

shuttle_runtime::Service 这个接口允许将您的自定义服务打包到 Shuttle 运行时可以使用的项目中。它还提供了一个 HTTP 地址,您可以选择将 HTTP 绑定服务绑定到该地址(例如,Web 服务器)。

3、创建一个tower服务
tower是引入的库包依赖,现在我们可以创建一个tower服务来保存我们想要运行的工作。

use tower::ServiceBuilder;
use apalis::layers::{DefaultRetryLayer, Extension, RetryLayer};
use apalis::prelude::job_fn;

// .. your previous code

let cron_service_ext = CronjobData {
    message:
"Hello world".to_string(),
};

// create a servicebuilder for the cronjob
let service = ServiceBuilder::new()
    .layer(RetryLayer::new(DefaultRetryPolicy))
    .layer(Extension(cron_service_ext))
    .service(job_fn(say_hello_world)); 


这里几个关键:

  • 实现 apalis::prelude::Job 的结构体
  • 这个结构体,它可以作为共享数据扩展添加到我们的 cron 服务中,其函数将完成我们想要的实际工作
  • 当需要完成工作时(根据 cronjob 计划),apalis 将调用一个函数

实现 apalis::prelude::Job 的结构体时:在 cronjob 上下文中使用 apalis 作业时,它们还必须实现 From<chrono::DateTime<chrono::Utc>>(这意味着它们不能包含任何其他字段):

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use apalis::prelude::Job;

#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct Reminder(DateTime<Utc>);

impl From<DateTime<Utc>> for Reminder {
   fn from(t: DateTime<Utc>) -> Self {
       Reminder(t)
   }
}

// set up an identifier for apalis
impl Job for Reminder {
    const NAME: &'static str =
"reminder::DailyReminder";
}

现在,我们要实现一个函数,当 apalis 调用该函数时,它将执行实际工作。我们可以将其设置为只说 "Hello world from say_hello_world()!":

use apalis::prelude::JobContext;

async fn say_hello_world(job: Reminder, ctx: JobContext) {
     println!("Hello world from send_reminder()!");
   
// 这样就可以使用存储在 CronjobData 结构中的变量
    let svc = ctx.data_opt::<CronjobData>().unwrap();
   
// 执行 CronjobData::execute()
    svc.execute(job);
}

上述代码CronjobData 结构的变量来自实例化该结构的结果,然后将其作为共享数据扩展添加到我们使用的 Tower 服务中。这样,apalis 就可以通过使用 JobContext 结构中的 .data_opt() 来使用这些数据。

4、构建worker
现在我们已经构建了服务,我们需要构建woker,然后最终创建一个apalis::Monitor将执行工作的流程:

use apalis::prelude::timer::TokioTimer;
use apalis::cron::{Schedule, CronStream};
use apalis::prelude::{WorkerBuilder, Monitor};

// .. your previous code
let schedule = Schedule::from_str(
"* * * * * *").expect("Couldn't start the scheduler!");

// create a worker that uses the service created from the cronjob
let worker = WorkerBuilder::new(
"morning-cereal")
    .with_storage(storage.clone())
    .stream(
        CronStream::new(schedule)
        .timer(TokioTimer)
        .to_stream()
     )
    .build(service);

// start your worker up
Monitor::new().register(worker).run().await.expect(
"Unable to start worker");


下面是MyService 内容

5、运行

需要运行 Docker 命令来启动它。在生产中,您还需要手动实例化和管理 Postgres 实例,或者依赖 Terraform 等 IaC(基础设施即代码)工具。

现在我们已经编写了所有内容,您需要做的就是cargo shuttle deploy(--allow-dirty如果在脏 Git 分支上工作,则使用该标志)。部署完成后,您将获得部署信息以及部署数据库 URL 字符串。