在这篇博文中,我们将探讨GoLang、Redis和 Gocraft的强大组合,以及它们如何将您的Web 开发项目提升到一个新的水平。
- GoLang是一种流行的编程语言,以其速度、并发性和效率而闻名,通常需要后台进程在不阻塞主线程的情况下处理任务。
- 后台作业是在主程序继续运行时在后台异步执行的任务。这确保了系统资源的有效利用,同时避免阻塞关键功能或用户界面。
本文强调三点:
- 后台作业是在主程序继续运行时在后台异步执行的任务,可以用于发送电子邮件、处理大型数据集、上传文件等任务。
- Gocraft和GoLang是一个强大的组合,可以提升Web开发项目的效率和可扩展性。
- Gocraft/work库简化了使用Redis运行后台作业的过程,提供作业排队、作业参数、作业重试、作业日志、作业调度、作业并发和Web U.I.等功能。
后台作业的用途是什么?
后台作业通常用于以下任务:
- 发送电子邮件
- 处理大型数据集
- 上传文件
- 计划任务
- 长时间运行的操作
它们具有以下几个优点:
- 改进的响应能力:主程序在运行后台任务时保持响应。
- 提高可扩展性:您可以快速扩展后台工作人员的数量以处理更高的工作负载。
- 更好的资源利用率: 系统资源的使用效率更高,因为主程序不必等待后台任务完成。
Goraft /work是一个GoLang库,它简化了使用Redis运行后台作业的过程。它提供以下功能:
- 作业排队: 将作业从Redis队列中入队和出队。
- 作业参数:将参数传递给作业以进行自定义。
- 作业重试:自动重试失败的作业指定的次数。
- 作业日志:跟踪作业的执行状态和日志。
- 作业调度:安排作业在特定时间或间隔运行。
- 作业并发:控制可以同时运行的作业数量。
- Web UI:通过Web 界面监控作业队列和工作线程性能。
Gocraft/work使用一个简单的架构,包含三个主要组件:
- Enqueuer:该组件将作业添加到Redis队列中。
- 工作池:这是一个工作进程池,用于从队列中出列并执行作业。
- Web UI:此可视化界面监视作业队列和工作性能。
以下是 Gocraft/work 工作原理的基本概述:
- 应用程序使用 Enqueuer将作业排入Redis队列。
- 工作进程从队列中获取作业并执行它们。
- 工作进程报告其状态并记录回Redis。
- Web UI 显示作业队列和工作器性能信息。
调度程序和Redis工作池
最好有一个调度程序和一个工作池来有效地管理后台作业。调度程序确定每个作业应何时执行,而工作池则忽略执行作业的工作进程池。
Redis是一种流行的内存数据存储,通常用作后台作业队列的后备存储。它具有高性能和可靠性,非常适合处理大量作业。Redis提供了多种安装选项,具体取决于您的操作系统和偏好。您可以选择预编译的二进制文件、从源代码构建或使用官方包存储库。
可以在此处找到每个平台的安装过程的快速概述 - https://redis.io/docs/install/install-redis/
Gocraft/工作入门
要开始使用 Gocraft/work,您可以按照以下步骤操作:
1、使用以下命令安装gocraft/work库:
go get github.com/gocraft/work
2、下载并安装Web UI的二进制文件:
go get github.com/gocraft/work/cmd/workwebui
go get github.com/gocraft/work/cmd/workwebui
3、要运行Web UI,您可以使用以下命令:
workwebui -redis="redis:6379" -ns="application_namespace" -listen=":5040"
- “-redis” — 此参数指定托管作业队列的Redis服务器的地址和端口。
- “-ns” — 该参数定义用于标识作业队列的命名空间。
- “-listen” — 此参数指定Web UI 将侦听连接的端口。
遵循这样的目录结构:example ├── scheduler │ ├── scheduler.go ├── processors │ └── process.go └── go.mod
|
排队作业
Redis池:
- redisPool :该变量定义Redis连接池。它确保与Redis 服务器的高效且可重用的连接。
- MaxActive:指定池中活动连接的最大数量(例如 5)。
- MaxIdle:这定义了池中空闲连接的最大数量(例如,5)。
- Wait:这控制当所有连接都繁忙时池的行为方式。
- 拨号:此函数定义池如何与地址“:6379”的Redis 服务器建立新连接。
入队:- Enqueuer:该变量定义一个 enqueuer 对象,负责将作业排队到Redis 服务器。
- “application_namespace ”:指定用于标识属于该应用程序的作业的命名空间。
- redisPool :这为入队提供Redis连接池
// main file in scheduler package
package main import ( "github.com/gomodule/redigo/redis" "github.com/gocraft/work" ) // 创建 redis 池 var redisPool = &redis.Pool{ MaxActive: 5, MaxIdle: 5, Wait: true, Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") }, } //使用特定命名空间创建查询器 var enqueuer = work.NewEnqueuer("application_namespace", redisPool)
func main() { // 使用指定参数执行名为 "send_email "的任务。 _, err := enqueuer.Enqueue("send_welcome_email", work.Q{"email_address": "test@example.com", "user_id": 4}) if err != nil { log.Fatal(err) } }
|
- 在 enqueuer 对象上调用 Enqueue 方法。
- "send_welcome_email":该文本指定了需要添加到队列中的作业名称。
- work.Q:该映射存储作业的参数。
- "email_address"(电子邮件地址):此参数指定要发送到的电子邮件地址。
- "user_id":用户名:此参数保存有关客户的附加信息。
- Enqueue 方法会返回一个作业 ID 和一个错误对象。
- 如果在队列作业时发生错误,log.Fatal(err) 函数会记录错误信息并退出程序。
处理工作任务Process jobs
// main file in processor package
package main
import ( "github.com/gomodule/redigo/redis" "github.com/gocraft/work" "os" "os/signal" ) // Make a redis pool var redisPool = &redis.Pool{ MaxActive: 5, MaxIdle: 5, Wait: true, Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") }, } type Context struct{ email string userId int64 } func main() { // 创建一个新池。参数: // Context{} 是一个结构体,将作为请求的上下文。 // 10 是最大并发量 // "application_namespace" 是 Redis 命名空间 // redisPool 是一个 Redis 池。 pool := work.NewWorkerPool(Context{}, 10, "application_namespace", redisPool)
// 添加将为每个作业执行的中间件 pool.Middleware((*Context).Log) pool.Middleware((*Context).FindCustomer)
// Map the name of jobs to handler functions pool.Job("send_welcome_email", (*Context).SendWelcomeEmail)
// 开始处理工作 pool.Start()
// 等待退出信号: signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt, os.Kill) <-signalChan
// Stop the pool pool.Stop() }
func (c *Context) Log(job *work.Job, next work.NextMiddlewareFunc) error { fmt.Println("Starting job: ", job.Name) return next() }
func (c *Context) FindCustomer(job *work.Job, next work.NextMiddlewareFunc) error { // 如果有 user_id 参数,则在上下文中设置该参数,以便将来的中间件和处理程序使用。 if _, ok := job.Args["user_id"]; ok { c.userId = job.ArgInt64("user_id") c.email = job.ArgString("email_address") if err := job.ArgError(); err != nil { return err } } return next() }
func (c *Context) SendWelcomeEmail(job *work.Job) error { // Extract arguments: addr := job.ArgString("email_address") if err := job.ArgError(); err != nil { return err }
fmt.Println("Sending email") // 使用 net/smtp 库发送电子邮件 // (电子邮件应使用 gmail 域名,密码应使用应用程序密码创建) from, password := "xyz@gmail.com", "sthkxlpixjferhfd" to := []string{ addr, }
smtpHost := "smtp.gmail.com" smtpPort := "587" message := []byte("This is a Welcome email message.")
// 认证. auth := smtp.PlainAuth("", from, password, smtpHost)
//发送电子邮件。 err := smtp.SendMail(smtpHost+":"+smtpPort, auth, from, to, message) if err != nil { fmt.Println(err) return nil }
fmt.Println("Email Sent Successfully!") return nil }
|
计划作业
您可以安排未来要执行的工作。
enqueuer := work.NewEnqueuer("application_namespace", redisPool) secondsInTheFuture := 300 _, err := enqueuer.EnqueueIn("send_welcome_email", secondsInTheFuture, work.Q{"email_address": "test@example.com"})
|
- secondsInTheFuture(未来秒数):该变量存储未来执行任务的秒数。在本例中,值为 300 秒(5 分钟)。
- EnqueueIn:该方法在 enqueuer 对象上调用,以安排作业的延迟执行。
唯一作业
您可以为唯一作业建立队列,这样队列中就只能同时存在一个具有给定名称/参数的作业。
enqueuer := work.NewEnqueuer("application_namespace", redisPool) job, err := enqueuer.EnqueueUnique("send_welcome_email", work.Q{"email_address": "xyz@gmail.com"}) // job returned job, err = enqueuer.EnqueueUnique("send_welcome_email", work.Q{"email_address": "xyz@gmail.com"}) // job == nil -- this duplicate job isn't enqueued. job, err = enqueuer.EnqueueUniqueIn("send_welcome_email", 300, work.Q{"email_address": "abc@gmail.com"}) // job != nil (diff email_address)
|
或者,您也可以提供自己的键,以确保作业的唯一性。当另一个作业以与队列中已存在的作业相同的键排队时,它只会更新参数。
enqueuer := work.NewEnqueuer("application_namespace", redisPool) job, err := enqueuer.EnqueueUniqueByKey("send_welcome_email", work.Q{"email_address": "xyz@gmail.com"}, map[string]interface{}{"my_key": "586"}) job, err = enqueuer.EnqueueUniqueInByKey("send_welcome_email", 120, work.Q{"email_address": "abc@gmail.com"}, map[string]interface{}{"my_key": "586"})
|
定期排队(Cron)
您可以使用工作池定期在 Gocraft/工作集群上排队作业。
pool := work.NewWorkerPool(Context{}, 10, "application_namespace", redisPool) pool.PeriodicallyEnqueue("0 * * * * *", "send_welcome_email") // 这将每分钟触发一次 "send_welcome_email "任务 pool.Job("send_welcome_email", (*Context).SendWelcomeEmail) // 仍然需要为这项工作单独注册一个处理程序
|
作业并发
您可以使用控制作业并发性JobOptions{MaxConcurrency: }。与 WorkerPool 并发不同,它控制单个 Redis 实例中可同时处于活动状态的该类型作业的数量限制。这是通过在排队函数上设置一个前提条件来实现的,这意味着如果我们达到或超过作业的限制,则不会安排新的作业MaxConcurrency。Redis 键(请参阅 参考资料redis.go::redisKeyJobsLock)用作计数信号量,以便跟踪每种作业类型的作业并发性。默认值为0,表示“作业并发数没有限制”。
注意:如果您想“单线程”运行作业,那么您可以进行MaxConcurrency相应的设置:
worker_pool.JobWithOptions(jobName, JobOptions{MaxConcurrency: 1}, (*Context).WorkFxn)
以下是Gocraft /work的关键概念和功能的简化细分:
作业队列:
- 每个作业都有以其自身命名的队列。
- Redis 中的队列是基于列表的。
- 使用 LPUSH 将作业排队到相应的队列中。
作业重点:
- 每个队列可以有一个优先级 (1–100000)。
- 优先级越高的队列被工作人员选择的机会就越高。
- 工作人员根据他们的相对偏好概率选择队列。
工作处理:
- 工作者从所选队列中获取作业。
- 该作业会通过 Lua 脚本自动移至“进行中”队列。
- Worker 执行作业并增加作业锁。
- 如果成功,该作业将从“进行中”队列中删除。
- 如果不成功,作业将根据剩余的重试次数移至“重试”或“死亡”队列。
作业重试和预定作业:
- “重试”和“计划作业”队列被实现为 Redis z 集。
- 分数是执行的时间戳。
- “重新排队器”定期检查这些队列并将就绪作业移至常用队列。
工作者worker和worker池:
- Worker 池管理 Worker 并提供公共 API。
- 启动/停止池、附加作业/中间件以及设置并发限制。
- 每个池根据并发设置启动 N 个工作协程。
- 每个worker独立地从Redis获取作业、执行它们并循环。
独特且定期的工作:
- 独特的作业确保每个队列仅存在一个具有特定参数的实例。
- 定期作业根据 cron 计划使用“定期排队器”进行排队。
其他组件:
- Reaper:如果工作线程崩溃,则重新排队“正在进行”队列中的作业。
- 暂停的作业:可以通过设置 Redis 键来暂停队列中的作业。
- 死作业:超过重试限制的作业将移至死队列。
- 术语:澄清“工作者worker”、“队列queue”和“作业类型job”等术语。
结论
这个深入的指南探索了Gocraft库的复杂性,为您提供了有效管理 Go 应用程序中后台作业的知识。我们解释了作业队列、优先级、重试和工作池等关键概念,并探索了周期性作业、独特作业和死信等强大功能。
通过使用Gocraft,您可以利用后台处理的强大功能来提高应用程序的响应能力、可扩展性和整体用户体验。