在 Kubernetes 上使用Spring Boot+ActiveMQ


本文将教您如何在 Kubernetes 上运行 ActiveMQ,并通过 Spring Boot 将其与您的应用程序集成。我们将使用专门的操作员部署一个集群的 ActiveMQ 代理。然后我们将构建并运行两个 Spring Boot 应用程序:

  1. 第一个在多个实例中运行并从队列接收消息,
  2. 而第二个是向该队列发送消息。

为了测试 ActiveMQ 集群,我们将使用Kind。消费者应用程序使用几种不同的模式连接到集群。

ActiveMQ 也是一个非常流行的消息代理。例如,它支持最新版本的 AMQP 协议,而 Rabbit 则是基于它们对 AMQP 0.9 的扩展。

源代码:
GitHub 存储库。然后进入messaging目录。您将找到三个 Spring Boot 应用程序simple-producer:simple-consumer和simple-counter. 

ActiveMQ Artemis 是 Red Hat 提供的名为AMQ Broker的商业产品的基础。Red Hat 积极开发了一个用于 ActiveMQ 的 Spring Boot 启动器和一个在 Kubernetes 上运行它的操作符。为了访问 Spring Boot,您需要在pom.xml文件中包含 Red Hat Maven 存储库:

<repository>
  <id>red-hat-ga</id>
  <url>https://maven.repository.redhat.com/ga</url>
</repository>

之后,您可以在 Maven 中包含一个启动器pom.xml:

<dependency>
  <groupId>org.amqphub.spring</groupId>
  <artifactId>amqp-10-jms-spring-boot-starter</artifactId>
  <version>2.5.6</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>log4j-over-slf4j</artifactId>
    </exclusion>
  </exclusions>
</dependency>

然后,我们只需要使用@EnableJMS注解为我们的应用程序启用 JMS:

@SpringBootApplication
@EnableJms
public class SimpleConsumer {

   public static void main(String[] args) {
      SpringApplication.run(SimpleConsumer.class, args);
   }

}

我们的应用程序非常简单。它只是接收并打印传入的消息。接收消息的方法应该用 注释@JmsListener。该destination字段包含目标队列的名称。

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
      .getLogger(Listener.class);

   @JmsListener(destination = "test-1")
   public void processMsg(SimpleMessage message) {
      LOG.info(
"============= Received: " + message);
   }

}

这是代表我们信息的类:

public class SimpleMessage implements Serializable {

   private Long id;
   private String source;
   private String content;

   public SimpleMessage() {
   }

   public SimpleMessage(Long id, String source, String content) {
      this.id = id;
      this.source = source;
      this.content = content;
   }

   // ... GETTERS AND SETTERS

   @Override
   public String toString() {
      return
"SimpleMessage{" +
             
"id=" + id +
             
", source='" + source + '\'' +
             
", content='" + content + '\'' +
              '}';
   }
}

最后,我们需要设置连接配置设置。使用 AMQP Spring Boot 启动器非常简单。我们只需要设置属性amqphub.amqp10jms.remoteUrl。现在,我们将基于在 Kubernetes 级别设置的环境变量Deployment。

amqphub.amqp10jms.remoteUrl = ${ARTEMIS_URL}
生产者应用程序非常相似。我们使用 SpringJmsTemplate生成消息并将消息发送到目标队列,而不是用于接收消息的注解。发送消息的方法公开为 HTTPPOST /producer/send端点。

@RestController
@RequestMapping("/producer")
public class ProducerController {

   private static long id = 1;
   private final JmsTemplate jmsTemplate;
   @Value(
"${DESTINATION}")
   private String destination;

   public ProducerController(JmsTemplate jmsTemplate) {
      this.jmsTemplate = jmsTemplate;
   }

   @PostMapping(
"/send")
   public SimpleMessage send(@RequestBody SimpleMessage message) {
      if (message.getId() == null) {
          message.setId(id++);
      }
      jmsTemplate.convertAndSend(destination, message);
      return message;
   }
}

使用 Nginx Ingress 创建 Kind 集群
我们的示例应用程序已准备就绪。在部署它们之前,我们需要准备本地 Kubernetes 集群。我们将在那里部署由三个代理组成的 ActiveMQ 集群。因此,我们的 Kubernetes 集群也将由三个节点组成。因此,在 Kubernetes 上运行了三个消费者应用程序实例。它们通过 AMQP 协议连接到 ActiveMQ 代理。还有一个生产者应用程序实例可以按需发送消息。

为了在本地运行多节点 Kubernetes 集群,我们将使用 Kind。我们不仅会测试通过 AMQP 协议的通信,还会通过 HTTP 公开 ActiveMQ 管理控制台。因为 ActiveMQ 使用无头服务来公开 Web 控制台,所以我们必须在 Kind 上创建和配置 Ingress 才能访问它。让我们开始。

第一步,我们将创建一个 Kind 集群。它由一个控制平面和三个工作人员组成。必须正确准备配置才能运行 Nginx 入口控制器。我们应该将ingress-ready标签添加到单个工作节点并公开端口80和443. 这是 Kind 配置文件的最终版本:

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
  - role: control-plane
  - role: worker
    kubeadmConfigPatches:
    - |
      kind: JoinConfiguration
      nodeRegistration:
        kubeletExtraArgs:
          node-labels: "ingress-ready=true"
    extraPortMappings:
    - containerPort: 80
      hostPort: 80
      protocol: TCP
    - containerPort: 443
      hostPort: 443
      protocol: TCP  
  - role: worker
  - role: worker

现在,让我们通过执行以下命令创建一个 Kind 集群:

$ kind create cluster --config kind-config.yaml
如果您的集群已成功创建。

之后,让我们安装 Nginx Ingress Controller。它只是一个命令:

$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/main/deploy/static/provider/kind/deploy.yaml

在 Kubernetes 上安装 ActiveMQ Artemis
最后,我们可以继续安装 ActiveMQ Artemis。首先,让我们安装所需的 CRD。您可以在 GitHub 上的操作员存储库中找到所有 YAML 清单。

$ git clone https://github.com/artemiscloud/activemq-artemis-operator.git
$ cd activemq-artemis-operator

带有 CRD 的清单位于deploy/crds目录中:

$ kubectl create -f ./deploy/crds

之后,我们可以安装操作符:

$ kubectl create -f ./deploy/service_account.yaml
$ kubectl create -f ./deploy/role.yaml
$ kubectl create -f ./deploy/role_binding.yaml
$ kubectl create -f ./deploy/election_role.yaml
$ kubectl create -f ./deploy/election_role_binding.yaml
$ kubectl create -f ./deploy/operator_config.yaml
$ kubectl create -f ./deploy/operator.yaml

为了创建集群,我们必须创建ActiveMQArtemis对象。它包含许多作为集群(1)一部分的代理。我们还应该设置访问器,以在每个代理 pod (2)之外公开 AMQP 端口。当然,我们也会暴露管理控制台(3)。

apiVersion: broker.amq.io/v1beta1
kind: ActiveMQArtemis
metadata:
  name: ex-aao
spec:
  deploymentPlan:
    size: 3 # (1)
    image: placeholder
    messageMigration: true
    resources:
      limits:
        cpu: "500m"
        memory:
"1024Mi"
      requests:
        cpu:
"250m"
        memory:
"512Mi"
  acceptors: # (2)
    - name: amqp
      protocols: amqp
      port: 5672
      connectionsAllowed: 5
  console: # (3)
    expose: true

创建完成ActiveMQArtemis后,操作员将开始部署过程。它创建StatefulSet对象:

$ kubectl get statefulset
NAME        READY   AGE
ex-aao-ss   3/3     1m

它按顺序使用代理启动所有三个 pod:

$ kubectl get pod -l application=ex-aao-app
NAME          READY   STATUS    RESTARTS    AGE
ex-aao-ss-0   1/1     Running   0           5m
ex-aao-ss-1   1/1     Running   0           3m
ex-aao-ss-2   1/1     Running   0           1m

后面更详细步骤点击标题