Go中使用Redis实现消息队列教程

Redis 和 Golang这对充满活力的组合将彻底改变我们处理消息系统的方式。

Redis 作为内存数据存储以其速度和多功能性而闻名,它与 Golang(一种以其简单性和高性能而闻名的语言)无缝协作,为构建弹性和可扩展的消息基础设施提供了引人注目的解决方案。

在本文中,我们将使用顶级消息代理 Redis 在 Go 应用程序中实现发布-订阅模式。这种模式增强了可扩展性,支持跨节点的繁重异步任务,并支持事件驱动架构、数据转换等。我们将使用 Docker 来轻松管理和部署。本文假设您已经在系统中安装了DockerGo,并且已经设置了 Go 项目。

设置 docker 和配置文件
第一步是将 go-redis 库合并到我们的主 Dockerfile 中,我们将通过包含命令来丰富它:RUN go get github.com/go-redis/redis。精炼后的 Dockerfile 将如下所示:

FROM golang:1.18-alpine

WORKDIR /redis_docker

# Copy everything from this project into the filesystem of the container.
COPY . .

# Obtain the package needed to run redis commands.
RUN go get github.com/go-redis/redis

RUN go mod tidy

# Compile the binary exe for our app.
RUN go build -o main .
# Start the application.
CMD ["./main"]

由于 Redis 是以独立服务的形式运行的,因此在我们的 docker-compose 文件中建立一个 Redis 服务以将其无缝集成到我们的 Docker 环境中是非常必要的。在服务部分,我们将引入以下键值对来协调 Redis 服务和 Go 应用程序。

services:
  redis:
    container_name: "redis"
    image: redis:alpine
    # Specify the redis.conf file to use
    command: redis-server /usr/local/etc/redis/redis.conf
    ports:
      -
"6379:6379"
  web:
    container_name:
"redisapi"
    build:
      # build the image using the Dockerfile we have in this project. Can use an image instead.
      context: .
    ports:
      -
"8080:8080"

显示 redis.conf 文件:

# Required
##########
# 将内存使用限制设置为指定的字节数。
# 当达到内存限制时,Redis 将尝试根据所选的驱逐策略(参见 maxmemory-policy)移除 键值。
# 根据所选的驱逐策略(请参阅 maxmemory-policy)删除。
maxmemory 41943040

Redis Pub-sub 服务
Redis Pub-Sub是Publish-Subscribe的缩写,是一种消息传递模式,在这种模式下,消息发送者(发布者)可以使用不同的通道向多个接收者(订阅者)分发消息,而无需在他们之间进行直接通信,这些通道将监听事件。

在我们的 Go 应用程序中,必须初始化 Redis 服务,以便无缝使用。请使用以下代码片段实现初始化:

import (
 "errors"
 
"github.com/go-redis/redis"
)

type Redis struct {
 RedisClient redis.Client
}

func NewRedis( env Env) Redis {

 var client = redis.NewClient(&redis.Options{
 
// Container name + port since we are using docker
  Addr:    
"redis:6379",
  Password: env.RedisPassword,
 })

 if client == nil {
    errors.New(
"Cannot run redis")
 }

 return Redis{
  RedisClient: *client,
 }
}

在前面介绍的 Redis 服务的基础上,我们现在将构建一个发布-订阅机制。首先,我们将创建一个接受新上下文、待处理消息和通道/队列名称作为参数的发布者函数。该函数将负责将事件分派到指定的通道或队列。

// MessagePublisher 是一个通用的发布器,用于发布不同类型的消息。
type MessagePublisher struct {
 redisClient infrastructure.Redis
}

func NewMessagePublisher(redisClient infrastructure.Redis) *MessagePublisher {
 return &MessagePublisher{redisClient}
}

// PublishMessages 向  channels 发布消息。
func (p *MessagePublisher) PublishMessages(ctx context.Context, message interface{}, queueName string) {

 serializedMessage, err := json.Marshal(message)
 if err != nil {
  log.Printf(
"[%s] Failed to serialize message: %v", queueName, err)
 }

 
// 使用发布操作的上下文
 err = p.redisClient.RedisClient.Publish(queueName, serializedMessage).Err()
 if err != nil {
  log.Printf(
"[%s] Failed to publish message: %v", queueName, err)
 }
}

上述发布者可在项目的任何地方使用,具体如下:

/*
 * 为 redis 发布器创建新的上下文。
 * 需要创建新的上下文,就像我们使用应用程序的当前上下文一样、
 * 如果应用程序关闭,它就会关闭,但我们不想关闭 redis 发布子服务。
*/

 ctx, cancel := context.WithCancel(context.Background())
 defer cancel()

 
// Publish the message(context, message, queue Name)
 redisPublisher.PublishMessages(ctx,
"Test Message", "Test")

接下来,让我们利用 Redis 服务建立一个订阅者。这个熟练的订阅者会监听 "测试 "通道上的事件,有效处理通过 PublishMessages 函数发送的数据。

//消费者是不同信息类型的通用消费者
type MessageConsumer struct {
 redisClient  infrastructure.Redis
 subscription *redis.PubSub
}

// NewMessageConsumer 创建 MessageConsumer 的新实例。
func NewMessageConsumer(redis infrastructure.Redis) *MessageConsumer {
 return &MessageConsumer{
  redisClient: redis,
 }
}

//该函数接收数组中的队列名称,并使用切换语句来执行队列所需的逻辑
func (c *MessageConsumer) ConsumerMessages(ctx context.Context, queueNames []string) {
 for _, queueName := range queueNames {
  switch queueName {
  case
"Test":
   
// We will handle the go routines in the custom function
   go c.handleCustomType1Logic(ctx, queueName)
  default:
   log.Printf(
"[%s] Unsupported message type: %+v\n", queueName, queueName)
  }
 }
}

// handleCustomType1Logic 启动一个 goroutine 来处理来自指定队列的消息。
func (c *MessageConsumer) handleCustomType1Logic(ctx context.Context, queueName string) {

// 创建取消上下文,优雅地停止 goroutine
 consumerCtx, cancel := context.WithCancel(context.Background())
 defer cancel()

 log.Printf(
"[%s] Consumer started listening...\n", queueName)

 
//订阅指定的 Redis 频道
 c.subscription = c.redisClient.RedisClient.Subscribe(queueName)
 defer c.subscription.Close()

//获取接收信息的通道
 channel := c.subscription.Channel()

 for {
  select {
// C如果取消主上下文以停止执行程序,则会被置疑
  case <-consumerCtx.Done():
   log.Printf(
"[%s] Consumer stopped listening...\n", queueName)
   return
// 监听频道上的来电信息
  case msg := <-channel:
   var messageObj interface{}
// 反序列化报文有效载荷
   err := json.Unmarshal([]byte(msg.Payload), &messageObj)
   if err != nil {
    log.Printf(
"[%s] Failed to deserialize message: %v", queueName, err)
    continue
   }

   
// 继续你的逻辑:

   fmt.Printf(
"[%s] Received message: %+v\n", queueName, messageObj)
  }
 }
}

引入 MessageConsumer 结构,该结构旨在处理各种类型的消息,它连接到 Redis 并订阅特定队列。ConsumerMessages 方法由队列名称数组驱动,为每个队列执行定制的逻辑。例如,遇到 "Test "队列会触发一个并发例程,以处理与该消息类型相关的自定义逻辑。

handleCustomType1Logic 方法订阅 "测试 "队列,处理传入的消息,并在数据上无缝执行自定义逻辑。您可以随意添加自己的逻辑。

这种周到的订阅者结构,强调明确的关注点分离和有效的错误处理,为在我们的 Go 应用程序中创建一个可扩展的、反应灵敏的消息传递系统奠定了坚实的基础。此外,在创建新的逻辑处理程序时,请务必使用选择语句,以防止应用程序中的程序泄漏。

最终,我们的目标是让订阅者持续监控发送到通道的事件。为了实现这一目标,我们将在 Go 应用程序启动时设置通道。

lifecycle.Append(fx.Hook{
 OnStart: func(ctx context.Context) error {

  go func() {
   // Add your queues here
   consumer.ConsumerMessages(ctx, []string{
"Test"})

   if env.ServerPort ==
"" {
    _ = handler.Gin.Run()
   } else {
    _ = handler.Gin.Run(
":" + env.ServerPort)
   }
  }()

  return nil
 },
})

不要忘记在 OnStart 函数内的 ConsumerMessages 方法中向队列数组添加新队列。

结论
总而言之,本文探讨了 Redis 和 Golang 如何协同工作以实现更好的消息传递。我们使用 Docker 来简化设置。在我们的 Go 应用程序中开始工作时,我们确保它一直在监听事件。我们创建的组织良好的消息处理程序就像一个可靠的工作者,确保消息得到顺利处理。这说明了 Redis 和 Golang 如何以其简洁性成为解决通信难题和构建强大消息系统的最佳搭档。