22-07-13
banq
此应用程序使用车辆信息提供实时包裹位置信息,因为车辆携带包裹。所以它回答了我的包裹现在在哪里,它要去哪里?
架构:
- RabbitMQ与docker-compose
- 带有echo 框架的Websocket
- 基于Clean的架构模板
Websocket 处理程序:
一旦客户端和服务器都发送了他们的握手,并且如果握手成功,那么数据传输部分就开始了。这是一个双向通信通道,每一方都可以独立于另一方随意发送数据。
func (p *PackageHandler) TrackByVehicleID(c echo.Context) error { wsConn, err := p.upgrader.Upgrade(c.Response(), c.Request(), nil) if err != nil { return err } ctx, cancelFunc := context.WithCancel(context.Background()) go func() { _, _, err = wsConn.ReadMessage() if err != nil { cancelFunc() } }() for { select { case <-ctx.Done(): wsConn.Close() return nil default: p, err := p.PUsecase.TrackByVehicleID(ctx, c.Param("vehicleId")) if err != nil { c.Logger().Error(err) continue } err = wsConn.WriteJSON(p) if err != nil { c.Logger().Error(err) } } } } |
upgrader.Upgrade将HTTP服务器连接升级为WebSocket协议。它检查握手过程;例如,它检查请求头是否正确,如 upgrade=Websocket 和 connection=Upgrade 等。
为了处理WebSocket的关闭握手,我使用wsConn.ReadMessage() 当客户端导航到另一个页面或类似ReadMessage()返回非零的错误。当错误不为零时,我就调用上下文的取消函数。在这样做的时候,上下文的完成通道被关闭,所以我们可以关闭底层的TCP连接,并从我们的处理程序中返回,如下图中的第一个选择情况。
在选择语句默认状态下,我们可以监听我们的package_status队列;当新的包裹状态到达时,我们可以将其信息传递给WebSocket。注意:p.PUseCase.TrackByVehicleID(ctx, vehicleID)这个方法是基于<-chan amqp.Delivery . 当一个新的消息到达我们的队列时,我们从这个通道获得包裹信息。
包裹用例
func (p *packageUsecase) TrackByVehicleID(ctx context.Context, id string) (*domain.Package, error) { bytes, err := p.pc.ConsumeByVehicleID(ctx, id) if err != nil { return nil, err } var res domain.Package err = json.Unmarshal(bytes, &res) return &res, err } |
在我们的用例中,那里没有特定的规则。因此,我们可以从 RabbitMQ 客户端获得字节消息,并将我们的包结构格式化。
RabbitMQ 客户端
我打开一个 TCP 连接和其中的一个通道(虚拟 AMQP 连接)。通道是全双工的,这意味着一个通道可以同时用于发布和消费消息。
我用声明关键字配置队列--如果不存在则创建,否则继续--并在其上注册消费者通道。
我在(c *rabbitmqClient) ConsumeByVehicleID方法中持续监听。我使用message_id属性来区分消息。
注意:我在main方法上使用(c *rabbitmqClient) Publish方法,只是为了测试。
package client import ( "context" "errors" "fmt" "github.com/Abdulsametileri/package-tracking-app/domain" amqp "github.com/rabbitmq/amqp091-go" ) const ( QueueName = "package_status" ) type rabbitmqClient struct { conn *amqp.Connection ch *amqp.Channel connString string packageStatus <-chan amqp.Delivery } func NewRabbitMQClient(connectionString string) (*rabbitmqClient, error) { c := &rabbitmqClient{} var err error c.conn, err = amqp.Dial(connectionString) if err != nil { return nil, err } c.ch, err = c.conn.Channel() if err != nil { return nil, err } err = c.configureQueue() return c, err } func (c *rabbitmqClient) ConsumeByVehicleID(ctx context.Context, vehicleID string) (byte, error) { for msg := range c.packageStatus { if msg.MessageId == vehicleID { return msg.Body, nil } } return nil, errors.New("err when getting package status on channel") } func (c *rabbitmqClient) Publish(p domain.Package) { jsonStr := fmt.Sprintf(`{ "from": %q, "to": %q, "vehicleId": %q }`, p.From, p.To, p.VehicleID) _ = c.ch.Publish("", QueueName, false, false, amqp.Publishing{ ContentType: "application/json", MessageId: p.VehicleID, Body: byte(jsonStr), }) } func (c *rabbitmqClient) Close() { c.ch.Close() c.conn.Close() } func (c *rabbitmqClient) configureQueue() error { _, err := c.ch.QueueDeclare( QueueName, true, false, false, false, nil, ) if err != nil { return err } c.packageStatus, err = c.ch.Consume( QueueName, "", true, false, false, false, nil, ) return err } |
开源项目点击标题
猜你喜欢