在本文中,我们将讨论如何使用 Shuttle 将自己的 cron 作业编写为 Web 服务!
Cron 作业(或“计划任务”)对于很多事情都很有用。它们允许您自动执行以下操作:
- 自动数据备份。
- 添加每日提醒(例如,针对已注册您拥有的服务但尚未开始使用该服务的客户)。
- 创建/撰写报告。
如果您对最终代码感兴趣并希望快速部署,可以在此处找到该存储库。您可以通过两个简单的步骤来部署它:
- 运行cargo shuttle init --from shuttle-hq/shuttle-examples --subfolder shuttle-cron并按照提示操作(需要cargo-shuttle安装)
- 跑步cargo shuttle deploy。就是这样!
本项目依赖:
我们需要安装以下依赖项:
- serde- 允许我们反/序列化任务
- chrono- 允许我们使用时间戳
- apalis- 我们将使用的任务队列框架(下面代码有)
- sqlx- 允许您与您配置的数据库进行交互
- shuttle-shared-db- 从 Shuttle 为您提供数据库(本地通过 Docker,通过部署中的运行时)
- tower- 与 apalis 一起使用,以便可以托管 cron 作业。(下面代码有)
cargo add apalis -F cron,postgres,extensions,retry
cargo add chrono -F serde,clock
cargo add serde -F derive
cargo add shuttle-shared-db -F postgres
cargo add sqlx -F runtime-tokio-native-tls,postgres
cargo add tower
完整代码:
#[shuttle_runtime::async_trait] impl shuttle_runtime::Service for MyService { async fn bind( self, _addr: std::net::SocketAddr ) -> Result<(), shuttle_runtime::Error> { let storage = PostgresStorage::new(self.db.clone()); // set up storage storage.setup().await.expect("Unable to run migrations :("); let cron_service_ext = CronjobData { message: "Hello world".to_string() }; // 为 cronjob 创建服务生成器 let service = ServiceBuilder::new() .layer(RetryLayer::new(DefaultRetryPolicy)) .layer(Extension(cron_service_ext)) .service(job_fn(say_hello_world)); let schedule = Schedule::from_str("* * * * * *") .expect("Couldn't start the scheduler!"); //创建一个 Worker,使用 cronjob 创建的服务 let worker = WorkerBuilder::new("morning-cereal") .with_storage(storage.clone()) .stream(CronStream::new(schedule).timer(TokioTimer).to_stream()) .build(service); // 启动您的worker Monitor::new() .register(worker) .run() .await .expect("Unable to start worker"); Ok(()) } }
|
解释:
1、首先,配置一个数据库
让我们使用 Shuttle 运行时配置一个数据库。您需要将shuttle-shared-db注释添加到您的 main 函数中,如下所示并设置我们的 Postgres 连接池:
use sqlx::{PgPool, postgres::PgPoolOptions};
pub struct MyService { db: PgPool, }
#[shuttle_runtime::main] async fn shuttle_main( #[shuttle_shared_db::Postgres] conn_string: String, ) -> Result<MyService, shuttle_runtime::Error> { let db = PgPoolOptions::new() .min_connections(5) .max_connections(5) .connect(&conn_string) .await .unwrap();
Ok(MyService { db }) }
|
2、实现一个具体服务
我们需要设置PostgresStorage类型,以便我们可以将 Postgres 用于持久作业队列。如果没有持久的作业队列,如果我们的网络服务出现任何中断,我们的作业就会消失!
我们可以直接从PgPool主结构中存储的类型转换它,并为其运行迁移,如下所示:
#[shuttle_runtime::async_trait] impl shuttle_runtime::Service for MyService { async fn bind( self, _addr: std::net::SocketAddr ) -> Result<(), shuttle_runtime::Error> { // 设置 Postgres 支持的存储 let storage = PostgresStorage::new(self.db); // 设仓 storage.setup().await.expect("Unable to run migrations :("); Ok(()) } }
|
shuttle_runtime::Service 这个接口允许将您的自定义服务打包到 Shuttle 运行时可以使用的项目中。它还提供了一个 HTTP 地址,您可以选择将 HTTP 绑定服务绑定到该地址(例如,Web 服务器)。3、创建一个tower服务
tower是引入的库包依赖,现在我们可以创建一个tower服务来保存我们想要运行的工作。
use tower::ServiceBuilder; use apalis::layers::{DefaultRetryLayer, Extension, RetryLayer}; use apalis::prelude::job_fn;
// .. your previous code
let cron_service_ext = CronjobData { message: "Hello world".to_string(), };
// create a servicebuilder for the cronjob let service = ServiceBuilder::new() .layer(RetryLayer::new(DefaultRetryPolicy)) .layer(Extension(cron_service_ext)) .service(job_fn(say_hello_world));
|
这里几个关键:
- 实现 apalis::prelude::Job 的结构体
- 这个结构体,它可以作为共享数据扩展添加到我们的 cron 服务中,其函数将完成我们想要的实际工作
- 当需要完成工作时(根据 cronjob 计划),apalis 将调用一个函数
实现 apalis::prelude::Job 的结构体时:在 cronjob 上下文中使用 apalis 作业时,它们还必须实现 From<chrono::DateTime<chrono::Utc>>(这意味着它们不能包含任何其他字段):use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use apalis::prelude::Job;
#[derive(Default, Debug, Clone, Serialize, Deserialize)] struct Reminder(DateTime<Utc>);
impl From<DateTime<Utc>> for Reminder { fn from(t: DateTime<Utc>) -> Self { Reminder(t) } }
// set up an identifier for apalis impl Job for Reminder { const NAME: &'static str = "reminder::DailyReminder"; }
|
现在,我们要实现一个函数,当 apalis 调用该函数时,它将执行实际工作。我们可以将其设置为只说 "Hello world from say_hello_world()!":
use apalis::prelude::JobContext;
async fn say_hello_world(job: Reminder, ctx: JobContext) { println!("Hello world from send_reminder()!"); // 这样就可以使用存储在 CronjobData 结构中的变量 let svc = ctx.data_opt::<CronjobData>().unwrap(); // 执行 CronjobData::execute() svc.execute(job); }
|
上述代码CronjobData 结构的变量来自实例化该结构的结果,然后将其作为共享数据扩展添加到我们使用的 Tower 服务中。这样,apalis 就可以通过使用 JobContext 结构中的 .data_opt() 来使用这些数据。
4、构建worker
现在我们已经构建了服务,我们需要构建woker,然后最终创建一个apalis::Monitor将执行工作的流程:
use apalis::prelude::timer::TokioTimer; use apalis::cron::{Schedule, CronStream}; use apalis::prelude::{WorkerBuilder, Monitor};
// .. your previous code let schedule = Schedule::from_str("* * * * * *").expect("Couldn't start the scheduler!");
// create a worker that uses the service created from the cronjob let worker = WorkerBuilder::new("morning-cereal") .with_storage(storage.clone()) .stream( CronStream::new(schedule) .timer(TokioTimer) .to_stream() ) .build(service);
// start your worker up Monitor::new().register(worker).run().await.expect("Unable to start worker");
|
下面是MyService 内容
5、运行
需要运行 Docker 命令来启动它。在生产中,您还需要手动实例化和管理 Postgres 实例,或者依赖 Terraform 等 IaC(基础设施即代码)工具。
现在我们已经编写了所有内容,您需要做的就是cargo shuttle deploy(--allow-dirty如果在脏 Git 分支上工作,则使用该标志)。部署完成后,您将获得部署信息以及部署数据库 URL 字符串。