Redis 和 Golang这对充满活力的组合将彻底改变我们处理消息系统的方式。
Redis 作为内存数据存储以其速度和多功能性而闻名,它与 Golang(一种以其简单性和高性能而闻名的语言)无缝协作,为构建弹性和可扩展的消息基础设施提供了引人注目的解决方案。
在本文中,我们将使用顶级消息代理 Redis 在 Go 应用程序中实现发布-订阅模式。这种模式增强了可扩展性,支持跨节点的繁重异步任务,并支持事件驱动架构、数据转换等。我们将使用 Docker 来轻松管理和部署。本文假设您已经在系统中安装了Docker和Go,并且已经设置了 Go 项目。
设置 docker 和配置文件
第一步是将 go-redis 库合并到我们的主 Dockerfile 中,我们将通过包含命令来丰富它:RUN go get github.com/go-redis/redis。精炼后的 Dockerfile 将如下所示:
FROM golang:1.18-alpine
WORKDIR /redis_docker
# Copy everything from this project into the filesystem of the container. COPY . .
# Obtain the package needed to run redis commands. RUN go get github.com/go-redis/redis
RUN go mod tidy
# Compile the binary exe for our app. RUN go build -o main . # Start the application. CMD ["./main"]
|
由于 Redis 是以独立服务的形式运行的,因此在我们的 docker-compose 文件中建立一个 Redis 服务以将其无缝集成到我们的 Docker 环境中是非常必要的。在服务部分,我们将引入以下键值对来协调 Redis 服务和 Go 应用程序。
services: redis: container_name: "redis" image: redis:alpine # Specify the redis.conf file to use command: redis-server /usr/local/etc/redis/redis.conf ports: - "6379:6379" web: container_name: "redisapi" build: # build the image using the Dockerfile we have in this project. Can use an image instead. context: . ports: - "8080:8080"
|
显示 redis.conf 文件:
# Required ########## # 将内存使用限制设置为指定的字节数。 # 当达到内存限制时,Redis 将尝试根据所选的驱逐策略(参见 maxmemory-policy)移除 键值。 # 根据所选的驱逐策略(请参阅 maxmemory-policy)删除。 maxmemory 41943040
|
Redis Pub-sub 服务
Redis Pub-Sub是Publish-Subscribe的缩写,是一种消息传递模式,在这种模式下,消息发送者(发布者)可以使用不同的通道向多个接收者(订阅者)分发消息,而无需在他们之间进行直接通信,这些通道将监听事件。
在我们的 Go 应用程序中,必须初始化 Redis 服务,以便无缝使用。请使用以下代码片段实现初始化:
import ( "errors" "github.com/go-redis/redis" )
type Redis struct { RedisClient redis.Client }
func NewRedis( env Env) Redis {
var client = redis.NewClient(&redis.Options{ // Container name + port since we are using docker Addr: "redis:6379", Password: env.RedisPassword, })
if client == nil { errors.New("Cannot run redis") }
return Redis{ RedisClient: *client, } }
|
在前面介绍的 Redis 服务的基础上,我们现在将构建一个发布-订阅机制。首先,我们将创建一个接受新上下文、待处理消息和通道/队列名称作为参数的发布者函数。该函数将负责将事件分派到指定的通道或队列。
// MessagePublisher 是一个通用的发布器,用于发布不同类型的消息。 type MessagePublisher struct { redisClient infrastructure.Redis }
func NewMessagePublisher(redisClient infrastructure.Redis) *MessagePublisher { return &MessagePublisher{redisClient} }
// PublishMessages 向 channels 发布消息。 func (p *MessagePublisher) PublishMessages(ctx context.Context, message interface{}, queueName string) {
serializedMessage, err := json.Marshal(message) if err != nil { log.Printf("[%s] Failed to serialize message: %v", queueName, err) }
// 使用发布操作的上下文 err = p.redisClient.RedisClient.Publish(queueName, serializedMessage).Err() if err != nil { log.Printf("[%s] Failed to publish message: %v", queueName, err) } }
|
上述发布者可在项目的任何地方使用,具体如下:
/* * 为 redis 发布器创建新的上下文。 * 需要创建新的上下文,就像我们使用应用程序的当前上下文一样、 * 如果应用程序关闭,它就会关闭,但我们不想关闭 redis 发布子服务。 */ ctx, cancel := context.WithCancel(context.Background()) defer cancel()
// Publish the message(context, message, queue Name) redisPublisher.PublishMessages(ctx, "Test Message", "Test")
|
接下来,让我们利用 Redis 服务建立一个订阅者。这个熟练的订阅者会监听 "测试 "通道上的事件,有效处理通过 PublishMessages 函数发送的数据。
//消费者是不同信息类型的通用消费者 type MessageConsumer struct { redisClient infrastructure.Redis subscription *redis.PubSub }
// NewMessageConsumer 创建 MessageConsumer 的新实例。 func NewMessageConsumer(redis infrastructure.Redis) *MessageConsumer { return &MessageConsumer{ redisClient: redis, } }
//该函数接收数组中的队列名称,并使用切换语句来执行队列所需的逻辑 func (c *MessageConsumer) ConsumerMessages(ctx context.Context, queueNames []string) { for _, queueName := range queueNames { switch queueName { case "Test": // We will handle the go routines in the custom function go c.handleCustomType1Logic(ctx, queueName) default: log.Printf("[%s] Unsupported message type: %+v\n", queueName, queueName) } } }
// handleCustomType1Logic 启动一个 goroutine 来处理来自指定队列的消息。 func (c *MessageConsumer) handleCustomType1Logic(ctx context.Context, queueName string) {
// 创建取消上下文,优雅地停止 goroutine consumerCtx, cancel := context.WithCancel(context.Background()) defer cancel()
log.Printf("[%s] Consumer started listening...\n", queueName)
//订阅指定的 Redis 频道 c.subscription = c.redisClient.RedisClient.Subscribe(queueName) defer c.subscription.Close()
//获取接收信息的通道 channel := c.subscription.Channel()
for { select { // C如果取消主上下文以停止执行程序,则会被置疑 case <-consumerCtx.Done(): log.Printf("[%s] Consumer stopped listening...\n", queueName) return // 监听频道上的来电信息 case msg := <-channel: var messageObj interface{} // 反序列化报文有效载荷 err := json.Unmarshal([]byte(msg.Payload), &messageObj) if err != nil { log.Printf("[%s] Failed to deserialize message: %v", queueName, err) continue }
// 继续你的逻辑:
fmt.Printf("[%s] Received message: %+v\n", queueName, messageObj) } } }
|
引入 MessageConsumer 结构,该结构旨在处理各种类型的消息,它连接到 Redis 并订阅特定队列。ConsumerMessages 方法由队列名称数组驱动,为每个队列执行定制的逻辑。例如,遇到 "Test "队列会触发一个并发例程,以处理与该消息类型相关的自定义逻辑。
handleCustomType1Logic 方法订阅 "测试 "队列,处理传入的消息,并在数据上无缝执行自定义逻辑。您可以随意添加自己的逻辑。
这种周到的订阅者结构,强调明确的关注点分离和有效的错误处理,为在我们的 Go 应用程序中创建一个可扩展的、反应灵敏的消息传递系统奠定了坚实的基础。此外,在创建新的逻辑处理程序时,请务必使用选择语句,以防止应用程序中的程序泄漏。
最终,我们的目标是让订阅者持续监控发送到通道的事件。为了实现这一目标,我们将在 Go 应用程序启动时设置通道。
lifecycle.Append(fx.Hook{ OnStart: func(ctx context.Context) error {
go func() { // Add your queues here consumer.ConsumerMessages(ctx, []string{"Test"})
if env.ServerPort == "" { _ = handler.Gin.Run() } else { _ = handler.Gin.Run(":" + env.ServerPort) } }()
return nil }, })
|
不要忘记在 OnStart 函数内的 ConsumerMessages 方法中向队列数组添加新队列。
结论
总而言之,本文探讨了 Redis 和 Golang 如何协同工作以实现更好的消息传递。我们使用 Docker 来简化设置。在我们的 Go 应用程序中开始工作时,我们确保它一直在监听事件。我们创建的组织良好的消息处理程序就像一个可靠的工作者,确保消息得到顺利处理。这说明了 Redis 和 Golang 如何以其简洁性成为解决通信难题和构建强大消息系统的最佳搭档。