使用 Java JMS 读取和写入 IBM MQ 队列

在本教程中,我们将学习如何使用 Java JMS(Java 消息服务)从 IBM MQ 队列读取和写入消息。

在本文中:

  • 我们探讨设置 JMS 连接、会话和消息生产者/接收者以与 IBM MQ 队列交互的过程。
  • 我们还介绍 IBM MQ 支持的几种消息类型。
  • 此外,我们还重点介绍自定义属性和标头如何增强消息处理。

设置环境
为了避免手动安装和配置的复杂性,我们可以在Docker容器内运行 IBM MQ。我们可以使用以下命令以基本配置运行容器:

docker run -d --name my-mq -e LICENSE=accept -e MQ_QMGR_NAME=QM1 MQ_QUEUE_NAME=QUEUE1 -p 1414:1414 -p 9443:9443 ibmcom/mq

接下来,我们需要在pom.xml文件中添加IBM MQ 客户端:

<dependency>
    <groupId>com.ibm.mq</groupId>
    <artifactId>com.ibm.mq.allclient</artifactId>
    <version>9.4.0.0</version>
</dependency>

配置JMS连接
首先,我们需要使用QueueConnectionFactory设置 JMS 连接,用于创建与队列管理器的连接:

public class JMSSetup {
    public QueueConnectionFactory createConnectionFactory() throws JMSException {
        MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
        factory.setHostName("localhost");
        factory.setPort(1414);
        factory.setQueueManager(
"QM1");
        factory.setChannel(
"SYSTEM.DEF.SVRCONN"); 
        
        return factory;
    }
}

我们首先创建MQQueueConnectionFactory的一个实例,该实例用于配置和创建与 IBM MQ 服务器的连接。我们将主机名设置为localhost,因为 MQ 服务器在 Docker 容器内本地运行。端口1414从 Docker 容器映射到主机。

然后我们使用默认通道SYSTEM.DEF.SVRCONN。这是客户端连接到 IBM MQ 的通用通道。

将消息写入 IBM MQ 队列
在本节中,我们将介绍向 IBM MQ 队列发送消息的过程。

建立 JMS 连接
首先,我们创建MessageSender类。该类负责建立与 IBM MQ 服务器的连接、管理会话以及处理消息发送操作。我们声明QueueConnectionFactory、QueueConnection、QueueSession和QueueSender的实例变量,这些变量将用于与 IBM MQ 服务器交互。

以下是 IBM MQ 连接设置、会话创建和消息发送的示例实现:

public class MessageSender {
    private QueueConnectionFactory factory;
    private QueueConnection connection;
    private QueueSession session;
    private QueueSender sender;
    public MessageSender() throws JMSException {
        factory = new JMSSetup().createConnectionFactory();
        connection = factory.createQueueConnection();
        session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("QUEUE1");
        sender = session.createSender(queue);
        connection.start();
    }
   
// ...
}

接下来,在MessageSender的构造函数中,我们使用JMSSetup类初始化QueueConnectionFactory。然后使用此工厂创建QueueConnection。此连接允许我们与 IBM MQ 服务器进行交互。

一旦建立连接,我们就使用createQueueSession()创建QueueSession。此会话允许我们与队列进行通信。在这里,我们传递false以指示会话是非事务性的,并传递 Session.AUTO_ACKNOWLEDGE以在收到消息时自动确认消息。

之后,我们定义特定的队列“ QUEUE1 ”,并创建一个QueueSender来处理发送消息。最后,我们启动连接以确保会话处于活动状态并准备好传输消息。

编写文本消息
现在我们已经建立了连接、创建了会话、定义了队列并创建了消息生产者,我们准备向队列发送文本消息:

public void sendMessage(String messageText) {
    try {
        TextMessage message = session.createTextMessage();
        message.setText(messageText);
        sender.send(message);
    } catch (JMSException e) {
        // handle exception
    } finally {
       
// close resources
    }
}

首先,我们创建一个带有messageText参数的sendMessage()方法。sendMessage ( )方法负责向队列发送文本消息。它创建一个TextMessage对象并使用setText()方法设置消息内容。

接下来,我们使用QueueSender对象的send()方法将消息发送到定义的队列。此设计允许高效的消息传输,因为只要MessageSender对象存在,连接和会话就会保持打开状态。

消息类型
除了TextMessage之外,IBM MQ 还支持多种其他消息类型,以满足不同的用例。例如,我们可以发送以下内容:

  • BytesMessage:消息以字节的形式保存原始二进制数据。
  • ObjectMessage:消息携带序列化的 Java 对象。
  • MapMessage:包含键值对的消息。
  • StreamMessage:消息包含原始数据类型的流。

从 IBM MQ 队列读取消息
现在我们已经向队列发送了消息,让我们探索如何从队列中读取消息。

建立 JMS 连接并创建会话
首先,我们需要建立连接并创建会话,类似于发送消息时所做的操作。我们首先创建一个MessageReceiver类。该类处理与 IBM MQ 服务器的连接并设置消息使用所需的组件:

public class MessageReceiver {
    private QueueConnectionFactory factory;
    private QueueConnection connection;
    private QueueSession session;
    private QueueReceiver receiver;
    public MessageReceiver() throws JMSException {
        factory = new JMSSetup().createConnectionFactory();
        connection = factory.createQueueConnection();
        session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("QUEUE1");
        receiver = session.createReceiver(queue);
        connection.start();
    }
   
// ...
}

在这个类中,我们首先创建一个QueueConnectionFactory来设置与 IBM MQ 服务器的连接。然后我们使用此连接创建一个QueueSession,它允许我们与队列进行交互。

最后,我们定义特定的队列“ QUEUE1 ”,并创建一个QueueReceiver来处理来自队列的传入消息。

读消息
一旦设置了连接、会话和接收器,我们就可以开始从队列接收消息。我们使用QueueReceiver的accept()方法从指定队列中提取消息:

public void receiveMessage() {
    try {
        Message message = receiver.receive(1000);
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
        } else {
            // ...
        }
    } catch (JMSException e) {
       
// handle exception
    } finally {
       
// close resources
    }
}

在receiveMessage()方法中,我们使用receive()函数等待队列中的消息,超时时间为1000毫秒。收到消息后,我们检查它是否为TextMessage类型。

如果是,我们可以使用getText()方法检索实际的消息内容,该方法以字符串形式返回文本内容。

消息属性和标头
在本节中,我们将讨论在发送或接收消息时可以使用的一些常用消息属性和标头。

消息属性
消息属性可用于存储和检索消息正文以外的其他信息。这对于过滤消息或向消息添加上下文数据特别有用。以下是我们在发送消息时设置自定义属性的方法:

TextMessage message = session.createTextMessage();
message.setText(messageText);
message.setStringProperty("OrderID", "12345");

接下来,我们可以在收到消息时检索属性:

Message message = receiver.receive(1000);
if (message instanceof TextMessage) {
    TextMessage textMessage = (TextMessage) message;
    String orderID = message.getStringProperty("OrderID");

 消息头
邮件标头提供包含有关邮件的元数据的预定义字段。一些常用的邮件标头包括:

  • JMSMessageID:JMS 提供商为每条消息分配的唯一标识符。我们可以使用此 ID 来跟踪和记录消息。
  • JMSExpiration:定义消息过期时间(毫秒)。如果消息未在此时间内送达,则会被丢弃。
  • JMSTimestamp:消息发送的时间。
  • JMSPriority:消息的优先级。
让我们看看如何在接收消息时检索消息头:

Message message = receiver.receive(1000);
if (message instanceof TextMessage) {
    TextMessage textMessage = (TextMessage) message;
    String messageId = message.getJMSMessageID();
    long timestamp = message.getJMSTimestamp();
    long expiration = message.getJMSExpiration();
    int priority = message.getJMSPriority();
}

使用 Mockito 进行模拟测试
在本节中,我们将使用Mockito模拟依赖项并验证MessageSender和MessageReceiver类的交互。我们首先使用@Mock注释创建依赖项的模拟实例。

接下来,我们验证sendMessage()方法是否与模拟的QueueSender正确交互。我们模拟QueueSender的send()方法,并验证TextMessage是否正确创建:

String messageText = "Hello Baeldung! Nice to meet you!";
doNothing().when(sender).send(any(TextMessage.class));
messageSender.sendMessage(messageText);
verify(sender).send(any(TextMessage.class));
verify(textMessage).setText(messageText);

最后,我们验证了receiveMessage()方法是否与模拟的QueueReceiver正确交互。我们模拟receive()方法以返回预定义的TextMessage,并且消息文本按预期检索:

when(receiver.receive(anyLong())).thenReturn(textMessage);
when(textMessage.getText()).thenReturn("Hello Baeldung! Nice to meet you!");
messageReceiver.receiveMessage();
verify(textMessage).getText();