package-tracking-app: Golang+RabbitMQ实时包裹跟踪应用


此应用程序使用车辆信息提供实时包裹位置信息,因为车辆携带包裹。所以它回答了我的包裹现在在哪里,它要去哪里?
架构:


Websocket 处理程序:
一旦客户端和服务器都发送了他们的握手,并且如果握手成功,那么数据传输部分就开始了。这是一个双向通信通道,每一方都可以独立于另一方随意发送数据。

func (p *PackageHandler) TrackByVehicleID(c echo.Context) error {
    wsConn, err := p.upgrader.Upgrade(c.Response(), c.Request(), nil)
    if err != nil {
        return err
    }

    ctx, cancelFunc := context.WithCancel(context.Background())

    go func() {
        _, _, err = wsConn.ReadMessage()
        if err != nil {
            cancelFunc()
        }
    }()

    for {
        select {
        case <-ctx.Done():
            wsConn.Close()
            return nil
        default:
            p, err := p.PUsecase.TrackByVehicleID(ctx, c.Param("vehicleId"))
            if err != nil {
                c.Logger().Error(err)
                continue
            }

            err = wsConn.WriteJSON(p)
            if err != nil {
                c.Logger().Error(err)
            }
        }
    }
}

upgrader.Upgrade将HTTP服务器连接升级为WebSocket协议。它检查握手过程;例如,它检查请求头是否正确,如 upgrade=Websocket 和 connection=Upgrade 等。

为了处理WebSocket的关闭握手,我使用wsConn.ReadMessage() 当客户端导航到另一个页面或类似ReadMessage()返回非零的错误。当错误不为零时,我就调用上下文的取消函数。在这样做的时候,上下文的完成通道被关闭,所以我们可以关闭底层的TCP连接,并从我们的处理程序中返回,如下图中的第一个选择情况。

在选择语句默认状态下,我们可以监听我们的package_status队列;当新的包裹状态到达时,我们可以将其信息传递给WebSocket。注意:p.PUseCase.TrackByVehicleID(ctx, vehicleID)这个方法是基于<-chan amqp.Delivery . 当一个新的消息到达我们的队列时,我们从这个通道获得包裹信息。

包裹用例

func (p *packageUsecase) TrackByVehicleID(ctx context.Context, id string) (*domain.Package, error) {
    bytes, err := p.pc.ConsumeByVehicleID(ctx, id)
    if err != nil {
        return nil, err
    }

    var res domain.Package
    err = json.Unmarshal(bytes, &res)
    return &res, err
}

在我们的用例中,那里没有特定的规则。因此,我们可以从 RabbitMQ 客户端获得字节消息,并将我们的包结构格式化。

RabbitMQ 客户端
我打开一个 TCP 连接和其中的一个通道(虚拟 AMQP 连接)。通道是全双工的,这意味着一个通道可以同时用于发布和消费消息。

我用声明关键字配置队列--如果不存在则创建,否则继续--并在其上注册消费者通道。

我在(c *rabbitmqClient) ConsumeByVehicleID方法中持续监听。我使用message_id属性来区分消息。

注意:我在main方法上使用(c *rabbitmqClient) Publish方法,只是为了测试。

package client

import (
    "context"
    
"errors"
    
"fmt"

    
"github.com/Abdulsametileri/package-tracking-app/domain"
    amqp
"github.com/rabbitmq/amqp091-go"
)

const (
    QueueName =
"package_status"
)

type rabbitmqClient struct {
    conn          *amqp.Connection
    ch            *amqp.Channel
    connString    string
    packageStatus <-chan amqp.Delivery
}

func NewRabbitMQClient(connectionString string) (*rabbitmqClient, error) {
    c := &rabbitmqClient{}
    var err error

    c.conn, err = amqp.Dial(connectionString)
    if err != nil {
        return nil, err
    }

    c.ch, err = c.conn.Channel()
    if err != nil {
        return nil, err
    }

    err = c.configureQueue()

    return c, err
}

func (c *rabbitmqClient) ConsumeByVehicleID(ctx context.Context, vehicleID string) (byte, error) {
    for msg := range c.packageStatus {
        if msg.MessageId == vehicleID {
            return msg.Body, nil
        }
    }
    return nil, errors.New(
"err when getting package status on channel")
}

func (c *rabbitmqClient) Publish(p domain.Package) {
    jsonStr := fmt.Sprintf(`{
"from": %q, "to": %q, "vehicleId": %q }`, p.From, p.To, p.VehicleID)

    _ = c.ch.Publish(
"", QueueName, false, false, amqp.Publishing{
        ContentType:
"application/json",
        MessageId:   p.VehicleID,
        Body:        byte(jsonStr),
    })
}

func (c *rabbitmqClient) Close() {
    c.ch.Close()
    c.conn.Close()
}

func (c *rabbitmqClient) configureQueue() error {
    _, err := c.ch.QueueDeclare(
        QueueName,
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    c.packageStatus, err = c.ch.Consume(
        QueueName,
        
"",
        true,
        false,
        false,
        false,
        nil,
    )
    return err
}

开源项目点击标题