GoLang中后台作业终极指南

在这篇博文中,我们将探讨GoLang、Redis和 Gocraft的强大组合,以及它们如何将您的Web 开发项目提升到一个新的水平。

  • GoLang是一种流行的编程语言,以其速度、并发性和效率而闻名,通常需要后台进程在不阻塞主线程的情况下处理任务。
  • 后台作业是在主程序继续运行时在后台异步执行的任务。这确保了系统资源的有效利用,同时避免阻塞关键功能或用户界面。

本文强调三点:

  • 后台作业是在主程序继续运行时在后台异步执行的任务,可以用于发送电子邮件、处理大型数据集、上传文件等任务。
  • Gocraft和GoLang是一个强大的组合,可以提升Web开发项目的效率和可扩展性。
  • Gocraft/work库简化了使用Redis运行后台作业的过程,提供作业排队、作业参数、作业重试、作业日志、作业调度、作业并发和Web U.I.等功能。

后台作业的用途是什么?
后台作业通常用于以下任务:

  • 发送电子邮件
  • 处理大型数据集
  • 上传文件
  • 计划任务
  • 长时间运行的操作

它们具有以下几个优点:
  • 改进的响应能力:主程序在运行后台任务时保持响应。
  • 提高可扩展性:您可以快速扩展后台工作人员的数量以处理更高的工作负载。
  • 更好的资源利用率: 系统资源的使用效率更高,因为主程序不必等待后台任务完成。

Goraft /work是一个GoLang库,它简化了使用Redis运行后台作业的过程。它提供以下功能:

  • 作业排队: 将作业从Redis队列中入队和出队。
  • 作业参数:将参数传递给作业以进行自定义。
  • 作业重试:自动重试失败的作业指定的次数。
  • 作业日志:跟踪作业的执行状态和日志。
  • 作业调度:安排作业在特定时间或间隔运行。
  • 作业并发:控制可以同时运行的作业数量。
  • Web UI:通过Web 界面监控作业队列和工作线程性能。

Gocraft/work使用一个简单的架构,包含三个主要组件:

  1. Enqueuer:该组件将作业添加到Redis队列中。
  2. 工作池:这是一个工作进程池,用于从队列中出列并执行作业。
  3. Web UI:此可视化界面监视作业队列和工作性能。

以下是 Gocraft/work 工作原理的基本概述:

  1. 应用程序使用 Enqueuer将作业排入Redis队列。
  2. 工作进程从队列中获取作业并执行它们。
  3. 工作进程报告其状态并记录回Redis。
  4. Web UI 显示作业队列和工作器性能信息。

调度程序和Redis工作池
最好有一个调度程序和一个工作池来有效地管理后台作业。调度程序确定每个作业应何时执行,而工作池则忽略执行作业的工作进程池。

Redis是一种流行的内存数据存储,通常用作后台作业队列的后备存储。它具有高性能和可靠性,非常适合处理大量作业。Redis提供了多种安装选项,具体取决于您的操作系统和偏好。您可以选择预编译的二进制文件、从源代码构建或使用官方包存储库。

可以在此处找到每个平台的安装过程的快速概述 - https://redis.io/docs/install/install-redis/

Gocraft/工作入门
要开始使用 Gocraft/work,您可以按照以下步骤操作:

1、使用以下命令安装gocraft/work库:
go get github.com/gocraft/work

2、下载并安装Web UI的二进制文件:
go get github.com/gocraft/work/cmd/workwebui
go get github.com/gocraft/work/cmd/workwebui

3、要运行Web UI,您可以使用以下命令:
workwebui -redis="redis:6379" -ns="application_namespace" -listen=":5040"

  1. “-redis” — 此参数指定托管作业队列的Redis服务器的地址和端口。
  2. “-ns” — 该参数定义用于标识作业队列的命名空间。
  3. “-listen” — 此参数指定Web UI 将侦听连接的端口。

遵循这样的目录结构:
example
├── scheduler
│   ├── scheduler.go
├── processors
│   └── process.go
└── go.mod

排队作业
Redis池:

  1. redisPool :该变量定义Redis连接池。它确保与Redis 服务器的高效且可重用的连接。
  2. MaxActive:指定池中活动连接的最大数量(例如 5)。
  3. MaxIdle:这定义了池中空闲连接的最大数量(例如,5)。
  4. Wait:这控制当所有连接都繁忙时池的行为方式。
  5. 拨号:此函数定义池如何与地址“:6379”的Redis 服务器建立新连接。

入队:
  1. Enqueuer:该变量定义一个 enqueuer 对象,负责将作业排队到Redis 服务器。
  2. “application_namespace ”:指定用于标识属于该应用程序的作业的命名空间。
  3. redisPool :这为入队提供Redis连接池

// main file in scheduler package

package main
import (
 
"github.com/gomodule/redigo/redis"
 
"github.com/gocraft/work"
)
// 创建 redis 池
var redisPool = &redis.Pool{
 MaxActive: 5,
 MaxIdle: 5,
 Wait: true,
 Dial: func() (redis.Conn, error) {
  return redis.Dial(
"tcp", ":6379")
 },
}
//使用特定命名空间创建查询器
var enqueuer = work.NewEnqueuer(
"application_namespace", redisPool)

func main() {
 
// 使用指定参数执行名为 "send_email "的任务。
 _, err := enqueuer.Enqueue(
"send_welcome_email", work.Q{"email_address": "test@example.com", "user_id": 4})
 if err != nil {
  log.Fatal(err)
 }
}

  • 在 enqueuer 对象上调用 Enqueue 方法。
  • "send_welcome_email":该文本指定了需要添加到队列中的作业名称。
  • work.Q:该映射存储作业的参数。
  • "email_address"(电子邮件地址):此参数指定要发送到的电子邮件地址。
  • "user_id":用户名:此参数保存有关客户的附加信息。
  • Enqueue 方法会返回一个作业 ID 和一个错误对象。
  • 如果在队列作业时发生错误,log.Fatal(err) 函数会记录错误信息并退出程序。

处理工作任务Process jobs

// main file in processor package

package main

import (
 
"github.com/gomodule/redigo/redis"
 
"github.com/gocraft/work"
 
"os"
 
"os/signal"
)
// Make a redis pool
var redisPool = &redis.Pool{
 MaxActive: 5,
 MaxIdle: 5,
 Wait: true,
 Dial: func() (redis.Conn, error) {
  return redis.Dial(
"tcp", ":6379")
 },
}
type Context struct{
    email string
    userId int64
}
func main() {
 
// 创建一个新池。参数:
 
// Context{} 是一个结构体,将作为请求的上下文。
 
// 10 是最大并发量
 
// "application_namespace" 是 Redis 命名空间
 
// redisPool 是一个 Redis 池。
 pool := work.NewWorkerPool(Context{}, 10,
"application_namespace", redisPool)

 
// 添加将为每个作业执行的中间件
 pool.Middleware((*Context).Log)
 pool.Middleware((*Context).FindCustomer)

 
// Map the name of jobs to handler functions
 pool.Job(
"send_welcome_email", (*Context).SendWelcomeEmail)

 
// 开始处理工作
 pool.Start()

 
// 等待退出信号:
 signalChan := make(chan os.Signal, 1)
 signal.Notify(signalChan, os.Interrupt, os.Kill)
 <-signalChan

 
// Stop the pool
 pool.Stop()
}

func (c *Context) Log(job *work.Job, next work.NextMiddlewareFunc) error {
 fmt.Println(
"Starting job: ", job.Name)
 return next()
}


func (c *Context) FindCustomer(job *work.Job, next work.NextMiddlewareFunc) error {
 
// 如果有 user_id 参数,则在上下文中设置该参数,以便将来的中间件和处理程序使用。
 if _, ok := job.Args[
"user_id"]; ok {
  c.userId = job.ArgInt64(
"user_id")
  c.email = job.ArgString(
"email_address")
  if err := job.ArgError(); err != nil {
   return err
  }
 }
 return next()
}


func (c *Context) SendWelcomeEmail(job *work.Job) error {
 
// Extract arguments:
 addr := job.ArgString(
"email_address")
 if err := job.ArgError(); err != nil {
  return err
 }

 fmt.Println(
"Sending email")
 
// 使用 net/smtp 库发送电子邮件
 
// (电子邮件应使用 gmail 域名,密码应使用应用程序密码创建)
 from, password :=
"xyz@gmail.com", "sthkxlpixjferhfd"
 to := []string{
   addr,
 }

 smtpHost :=
"smtp.gmail.com"
 smtpPort :=
"587"
 message := []byte(
"This is a Welcome email message.")

 
// 认证.
 auth := smtp.PlainAuth(
"", from, password, smtpHost)

 
//发送电子邮件。
 err := smtp.SendMail(smtpHost+
":"+smtpPort, auth, from, to, message)
 if err != nil {
  fmt.Println(err)
  return nil
 }

 fmt.Println(
"Email Sent Successfully!")
 return nil
}

计划作业
您可以安排未来要执行的工作。

enqueuer := work.NewEnqueuer("application_namespace", redisPool)
secondsInTheFuture := 300
_, err := enqueuer.EnqueueIn(
"send_welcome_email", secondsInTheFuture, work.Q{"email_address": "test@example.com"})

  • secondsInTheFuture(未来秒数):该变量存储未来执行任务的秒数。在本例中,值为 300 秒(5 分钟)。
  • EnqueueIn:该方法在 enqueuer 对象上调用,以安排作业的延迟执行。

唯一作业
您可以为唯一作业建立队列,这样队列中就只能同时存在一个具有给定名称/参数的作业。

enqueuer := work.NewEnqueuer("application_namespace", redisPool)
job, err := enqueuer.EnqueueUnique(
"send_welcome_email", work.Q{"email_address": "xyz@gmail.com"}) // job returned
job, err = enqueuer.EnqueueUnique(
"send_welcome_email", work.Q{"email_address": "xyz@gmail.com"}) // job == nil -- this duplicate job isn't enqueued.
job, err = enqueuer.EnqueueUniqueIn(
"send_welcome_email", 300, work.Q{"email_address": "abc@gmail.com"}) // job != nil (diff email_address)

或者,您也可以提供自己的键,以确保作业的唯一性。当另一个作业以与队列中已存在的作业相同的键排队时,它只会更新参数。

enqueuer := work.NewEnqueuer("application_namespace", redisPool)
job, err := enqueuer.EnqueueUniqueByKey(
"send_welcome_email", work.Q{"email_address": "xyz@gmail.com"}, map[string]interface{}{"my_key": "586"})
job, err = enqueuer.EnqueueUniqueInByKey(
"send_welcome_email", 120, work.Q{"email_address": "abc@gmail.com"}, map[string]interface{}{"my_key": "586"})

定期排队(Cron)
您可以使用工作池定期在 Gocraft/工作集群上排队作业。

pool := work.NewWorkerPool(Context{}, 10, "application_namespace", redisPool)
pool.PeriodicallyEnqueue(
"0 * * * * *", "send_welcome_email") // 这将每分钟触发一次 "send_welcome_email "任务
pool.Job(
"send_welcome_email", (*Context).SendWelcomeEmail) // 仍然需要为这项工作单独注册一个处理程序


作业并发
您可以使用控制作业并发性JobOptions{MaxConcurrency: <num>}。与 WorkerPool 并发不同,它控制单个 Redis 实例中可同时处于活动状态的该类型作业的数量限制。这是通过在排队函数上设置一个前提条件来实现的,这意味着如果我们达到或超过作业的限制,则不会安排新的作业MaxConcurrency。Redis 键(请参阅 参考资料redis.go::redisKeyJobsLock)用作计数信号量,以便跟踪每种作业类型的作业并发性。默认值为0,表示“作业并发数没有限制”。

注意:如果您想“单线程”运行作业,那么您可以进行MaxConcurrency相应的设置:

worker_pool.JobWithOptions(jobName, JobOptions{MaxConcurrency: 1}, (*Context).WorkFxn)

以下是Gocraft /work的关键概念和功能的简化细分:

作业队列:

  • 每个作业都有以其自身命名的队列。
  • Redis 中的队列是基于列表的。
  • 使用 LPUSH 将作业排队到相应的队列中。

作业重点:

  • 每个队列可以有一个优先级 (1–100000)。
  • 优先级越高的队列被工作人员选择的机会就越高。
  • 工作人员根据他们的相对偏好概率选择队列。

工作处理:

  1. 工作者从所选队列中获取作业。
  2. 该作业会通过 Lua 脚本自动移至“进行中”队列。
  3. Worker 执行作业并增加作业锁。
  4. 如果成功,该作业将从“进行中”队列中删除。
  5. 如果不成功,作业将根据剩余的重试次数移至“重试”或“死亡”队列。

作业重试和预定作业:

  • “重试”和“计划作业”队列被实现为 Redis z 集。
  • 分数是执行的时间戳。
  • “重新排队器”定期检查这些队列并将就绪作业移至常用队列。

工作者worker和worker池:

  • Worker 池管理 Worker 并提供公共 API。
  • 启动/停止池、附加作业/中间件以及设置并发限制。
  • 每个池根据并发设置启动 N 个工作协程。
  • 每个worker独立地从Redis获取作业、执行它们并循环。

独特且定期的工作:

  • 独特的作业确保每个队列仅存在一个具有特定参数的实例。
  • 定期作业根据 cron 计划使用“定期排队器”进行排队。

其他组件:

  • Reaper:如果工作线程崩溃,则重新排队“正在进行”队列中的作业。
  • 暂停的作业:可以通过设置 Redis 键来暂停队列中的作业。
  • 死作业:超过重试限制的作业将移至死队列。
  • 术语:澄清“工作者worker”、“队列queue”和“作业类型job”等术语。

结论
这个深入的指南探索了Gocraft库的复杂性,为您提供了有效管理 Go 应用程序中后台作业的知识。我们解释了作业队列、优先级、重试和工作池等关键概念,并探索了周期性作业、独特作业和死信等强大功能。
通过使用Gocraft,您可以利用后台处理的强大功能来提高应用程序的响应能力、可扩展性和整体用户体验。