RabbitMQ中发布者通过确认机制确保消息发布

在本教程中,我们将学习如何使用发布者确认来确保将消息发布到RabbitMQ代理。然后,我们将了解如何使用消费者确认来告知代理我们已成功使用消息。

场景
在简单的应用程序中,我们在使用 RabbitMQ 时经常会忽略显式确认机制,而是依赖于向队列发布基本消息并在使用时自动确认消息。然而,尽管 RabbitMQ 拥有强大的基础架构,但仍可能出现错误,因此需要一种方法来仔细检查消息是否已传送到代理并确认消息是否已成功使用。这就是发布者确认和消费者确认发挥作用的地方,它们提供了安全网。

 等待发布者确认
即使我们的应用程序没有错误,发布的消息也可能丢失。例如,由于不明原因的网络错误,消息可能会在传输过程中丢失。为了避免这种情况,AMQP 提供了事务语义来保证消息不会丢失。然而,这需要付出巨大的代价。由于事务繁重,处理消息的时间可能会显著增加,尤其是在大量事务的情况下。

相反,我们将采用确认模式,尽管这会带来一些开销,但比事务更快。此模式指示客户端和代理启动消息计数。随后,客户端使用代理发回的带有相应数字的交付标签来验证此计数。此过程可确保消息的安全存储,以便随后分发给消费者。

要进入确认模式,我们需要在我们的频道上调用一次:

channel.confirmSelect();

确认可能需要一些时间,尤其是对于持久队列,因为存在 IO 延迟。因此,RabbitMQ 异步等待确认,但提供了在我们的应用程序中使用的同步方法:

  • Channel.waitForConfirms() —阻止执行,直到自上次调用以来的所有消息都被代理 ACK(确认)或 NACK(拒绝)。
  • Channel.waitForConfirms(timeout) —与上面的相同,但我们可以将等待时间限制为毫秒。否则,我们将收到TimeoutException。
  • Channel.waitForConfirmsOrDie() —如果自上次调用以来有任何消息被 NACK,则此方法会抛出异常。如果我们不能容忍任何消息丢失,则此方法很有用。
  • Channel.waitForConfirmsOrDie(timeout) —与上面相同,但有超时。

发布者设置
让我们从发布消息的常规类开始。我们只会接收要连接的频道和队列:

class UuidPublisher {
    private Channel channel;
    private String queue;
    public UuidPublisher(Channel channel, String queue) {
        this.channel = channel;
        this.queue = queue;
    }
}

然后,我们添加一个发布字符串消息的方法:

public void send(String message) throws IOException {
    channel.basicPublish("", queue, null, message.getBytes());
}

当我们以这种方式发送消息时,我们可能会在传输过程中丢失它们,因此让我们包含一些代码来确保代理安全地接收我们的消息。

在频道上启动确认模式
我们首先修改构造函数,最后在通道上调用confirmSelect() 。这是必要的,这样我们才能在通道上使用“wait”方法:

public UuidPublisher(Channel channel, String queue) throws IOException {
    // ...
    this.channel.confirmSelect();
}

如果我们尝试在不进入确认模式的情况下等待确认,我们将得到一个IllegalStateException。然后,我们将选择一种同步wait()方法,并在使用send()方法发布消息后调用它。让我们等待超时,这样我们就可以确保我们永远不会永远等待:

public boolean send(String message) throws Exception {
    channel.basicPublish("", queue, null, message.getBytes());
    return channel.waitForConfirms(1000);
}

返回true 表示代理已成功接收消息。如果我们要发送几条消息,这种方法很有效。

批量确认已发布的消息
由于确认消息需要时间,因此我们不应该在每次发布后等待确认。相反,我们应该在等待确认之前发送一堆消息。让我们修改我们的方法来接收消息列表,并且仅在发送完所有消息后等待:

public void sendAllOrDie(List<String> messages) throws Exception {
    for (String message : messages) {
        channel.basicPublish("", queue, null, message.getBytes());
    }
    channel.waitForConfirmsOrDie(1000);
}

这次,我们使用waitForConfirmsOrDie(),因为如果waitForConfirms()返回false,则意味着代理拒绝了未知数量的消息。虽然这确保了如果任何消息被拒绝,我们都会收到异常,但我们无法判断哪些消息失败了。

利用确认模式保证批量发布
使用确认模式时,还可以在我们的频道上注册一个ConfirmListener 。此侦听器需要两个回调处理程序:一个用于成功交付,另一个用于代理失败。这样,我们可以实现一种机制来确保没有消息遗漏。我们将从将此侦听器添加到我们的频道的方法开始:

private void createConfirmListener() {
    this.channel.addConfirmListener(
      (tag, multiple) -> {
        // ...
      }, 
      (tag, multiple) -> {
       
// ...
      }
    );
}

在回调中,tag参数指的是消息的顺序投递标签,而multiple表示这是否确认了多条消息。在这种情况下, tag参数将指向最新确认的标签。相反,如果最后一个回调是 NACK,则所有投递标签大于最新 NACK 回调标签的消息也将被确认。

为了协调这些回调,我们将未确认的消息保存在ConcurrentSkipListMap中。我们将使用其标签号作为键将待处理的消息放在那里。这样,我们可以调用headMap()并获取到我们现在收到的标签之前的所有先前消息的视图:

private ConcurrentNavigableMap<Long, PendingMessage> pendingDelivery = new ConcurrentSkipListMap<>();

已确认消息的回调将从我们的地图中删除所有标记的消息:

(tag, multiple) -> {
    ConcurrentNavigableMap<Long, PendingMessage> confirmed = pendingDelivery.headMap(tag, true);
    confirmed.clear();
}

如果multiple为false , headMap ()将包含单个项目,否则将包含多个项目。因此,我们不需要检查是否收到了多条消息的确认。

实现被拒绝消息的重试机制
我们将为被拒绝消息的回调实现重试机制。此外,我们将包含最大重试次数,以避免永远重试的情况。让我们从一个保存消息当前尝试次数的类开始,以及一个增加此计数器的简单方法:

public class PendingMessage {
    private int tries;
    private String body;
    public PendingMessage(String body) {
        this.body = body;
    }
    public int incrementTries() {
        return ++this.tries;
    }
    // standard getters
}

现在,让我们使用它来实现回调。我们首先获取被拒绝的消息的视图,然后删除已超过最大尝试次数的任何项目:

(tag, multiple) -> {
    ConcurrentNavigableMap<Long, PendingMessage> failed = pendingDelivery.headMap(tag, true);
    failed.values().removeIf(pending -> {
        return pending.incrementTries() >= MAX_TRIES;
    });
    // ...
}

然后,如果我们仍有待处理的消息,我们会再次发送它们。这次,如果我们的应用程序发生意外错误,我们还会删除该消息:

if (!pendingDelivery.isEmpty()) {
    pendingDelivery.values().removeIf(message -> {
        try {
            channel.basicPublish("", queue, null, message.getBody().getBytes());
            return false;
        } catch (IOException e) {
            return true;
        }
    });
}

综合起来
最后,我们可以创建一个新方法,该方法可以批量发送消息,但可以检测被拒绝的消息并尝试再次发送。我们必须在通道上调用getNextPublishSeqNo()来找出我们的消息标签:

public void sendOrRetry(List<String> messages) throws IOException {
    createConfirmListener();
    for (String message : messages) {
        long tag = channel.getNextPublishSeqNo();
        pendingDelivery.put(tag, new PendingMessage(message));
        channel.basicPublish("", queue, null, message.getBytes());
    }
}

我们在发布消息之前创建监听器;否则,我们将不会收到确认。这将创建一个接收回调的循环,直到我们成功发送或重试所有消息。

发送消费者发货确认消息
在研究手动确认之前,让我们先看一个没有手动确认的示例。使用自动确认时,只要代理将消息发送给消费者,即认为该消息已成功送达。让我们看一个简单的示例:

public class UuidConsumer {
    private String queue;
    private Channel channel;
    // all-args constructor
    public void consume() throws IOException {
        channel.basicConsume(queue, true, (consumerTag, delivery) -> {
           
// processing...
        }, cancelledTag -> {
           
// logging...
        });
    }
}

通过autoAck参数将true传递 给basicConsume()时,将激活自动确认。尽管这快速而直接,但它并不安全,因为代理会在我们处理消息之前丢弃它。因此,最安全的选择是停用它,并在通道上使用basickAck()发送手动确认,保证消息在退出队列之前得到成功处理:

channel.basicConsume(queue, false, (consumerTag, delivery) -> {
    long deliveryTag = delivery.getEnvelope().getDeliveryTag();
    // processing...
    channel.basicAck(deliveryTag, false);
}, cancelledTag -> {
   
// logging...
});

最简单的形式是,我们在处理完每条消息后确认它。我们使用收到的相同交付标签来确认消费。最重要的是,要发出单独确认信号,我们必须将false传递给basicAck()。这可能非常慢,所以让我们看看如何改进它。

定义频道上的基本 QoS
通常,RabbitMQ 会在消息可用时立即推送消息。我们将在频道上设置必要的服务质量设置以避免这种情况。因此,让我们在构造函数中包含一个batchSize参数,并将其传递给频道上的basicQos(),这样只会预取此数量的消息:

public class UuidConsumer {
    // ...
    private int batchSize;
    public UuidConsumer(Channel channel, String queue, int batchSize) throws IOException {
       
// ...
        this.batchSize = batchSize;
        channel.basicQos(batchSize);
    }
}

这有助于在我们处理能够处理的消息的同时,让其他消费者能够获取消息。

定义确认策略
我们不必向处理的每条消息发送 ACK,而是在每次达到批处理大小时发送一个 ACK​​,从而提高性能。为了实现更完整的场景,我们引入一个简单的处理方法。如果我们可以将消息解析为 UUID,则认为该消息已处理:

private boolean process(String message) {
    try {
        UUID.fromString(message);
        return true;
    } catch (IllegalArgumentException e) {
        return false;
    }
}

现在,让我们用一个用于发送批量确认的基本框架来修改我们的consume()方法:

channel.basicConsume(queue, false, (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    long deliveryTag = delivery.getEnvelope().getDeliveryTag();
    if (!process(message)) {
       
// ...
    } else if (deliveryTag % batchSize == 0) {
       
// ...
    } else {
       
// ...
    }
}

如果无法处理该消息,我们将 NACK 该消息,并检查是否已达到批处理大小以 ACK 待处理的消息。否则,我们将存储待处理 ACK 的交付标签,以便在以后的迭代中发送。我们将把它存储在类变量中:

private AtomicLong pendingTag = new AtomicLong();

拒绝消息
如果我们不想要或无法处理消息,我们会拒绝它们;拒绝后,我们可以重新排队。例如,如果我们超出容量并希望另一个消费者接收它而不是告诉代理丢弃它,重新排队很有用。我们有两种方法可以实现这一点:

  • channel.basicReject(deliveryTag, requeue)  —拒绝单条消息,并可选择重新排队或丢弃。
  • channel.basicNack(deliveryTag, multiple, requeue) — 与上面相同,但可以选择批量拒绝。将true传递给multiple将拒绝自上次 ACK 到当前传递标签的所有消息。

由于我们要逐条拒绝消息,因此我们将使用第一个选项。如果有待处理的 ACK,我们将发送它并重置变量。最后,我们拒绝该消息:

if (!process(message, deliveryTag)) {
    if (pendingTag.get() != 0) {
        channel.basicAck(pendingTag.get(), true);
        pendingTag.set(0);
    }
    channel.basicReject(deliveryTag, false);
}

批量确认消息
由于交付标签是连续的,我们可以使用模数运算符来检查是否已达到批处理大小。 如果已达到,我们将发送 ACK 并重置未决标签。 这次,将true传递给“ multiple”参数至关重要,以便代理知道我们已成功处理了包括当前交付标签在内的所有消息:

else if (deliveryTag % batchSize == 0) {
    channel.basicAck(deliveryTag, true);
    pendingTag.set(0);
} else {
    pendingTag.set(deliveryTag);
}

否则,我们只需设置待处理标签以在另一次迭代中检查它。此外,为同一标签发送多个确认将导致RabbitMQ出现“ PRECONDITION_FAILED - 未知交付标签”错误。

需要注意的是,当使用多个标志发送 ACK 时,我们必须考虑由于没有更多消息需要处理而永远无法达到批处理大小的情况。一种选择是保留一个观察线程,定期检查是否有待处理的 ACK 需要发送。

结论
在本文中,我们探讨了 RabbitMQ 中发布者确认和消费者确认的功能,这些功能对于确保分布式系统中的数据安全性和稳健性至关重要。

发布者确认使我们能够验证消息是否已成功传输到 RabbitMQ 代理,从而降低消息丢失的风险。消费者确认通过确认消息消费来实现受控且有弹性的消息处理。