Spring Boot与运行在Kubernetes上的ksqlDB集成教程 - Piotr


在本文中,您将学习如何在 Kubernetes 上运行ksqlDB并将其与 Spring Boot 一起使用。您还将了解如何基于Strimzi运算符在 Kubernetes 上运行 Kafka。
为了将 Spring Boot 与 ksqlDB 服务器集成,我们将利用 ksqlDB 提供的轻量级 Java 客户端。此客户端支持拉取和推送查询。它还提供了用于插入行和创建表或流的 API。您可以在此处的 ksqlDB 文档中阅读有关它的更多信息。

我们的示例 Spring Boot 应用程序非常简单。我们将使用 Spring Cloud Stream Supplierbean 生成事件并将其发送到 Kafka 主题。有关使用 Spring Cloud Stream 的 Kafka 的更多信息,请参阅以下文章
另一方面,我们的应用程序使用 kSQL 查询从 Kafka 主题获取数据。它还KTable在启动时创建。

源代码
如果您想自己尝试一下,可以随时查看我的源代码。为此,您需要克隆我的 GitHub 存储库。然后进入transactions-service目录。之后,您应该按照我的指示进行操作。让我们开始。

先决条件
我们将使用几种工具。你需要有:

  • Kubernetes 集群——它可能是一个单节点的本地集群,例如 Minikube 或 Kind。就个人而言,我在 Docker 桌面上使用 Kubernetes
  • kubectlCLI – 与集群交互
  • Helm——我们将使用它在 Kubernetes 上安装 ksqlDB 服务器。如果您没有 Helm,则必须安装它

使用 Strimzi 在 Kubernetes 上运行 Kafka
当然,我们需要一个 Kafka 实例来执行我​​们的练习。有几种方法可以在 Kubernetes 上运行 Kafka。我将向您展示如何使用基于运算符的方法来实现。第一步,您需要在集群上安装 OLM(Operator Lifecycle Manager)。为此,您只需在 Kubernetes 上下文中执行以下命令:

$ curl -L https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.21.2/install.sh -o install.sh
$ chmod +x install.sh
$ ./install.sh v0.21.2

然后,您可以继续安装 Strimzi 操作员。这只是一个命令。

$ kubectl create -f https://operatorhub.io/install/stable/strimzi-kafka-operator.yaml

现在,我们可以在 Kubernetes 上创建一个 Kafka 集群。让我们从练习的专用命名空间开始:

$ kubectl create ns kafka

我假设你有一个单节点 Kubernetes 集群,所以我们还创建了一个单节点 Kafka。Kafka这是带有CRD的 YAML 清单。您可以在路径下的存储库中找到它k8s/cluster.yaml。

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    config:
      default.replication.factor: 1
      inter.broker.protocol.version: "3.2"
      min.insync.replicas: 1
      offsets.topic.replication.factor: 1
      transaction.state.log.min.isr: 1
      transaction.state.log.replication.factor: 1
    listeners:
      - name: plain
        port: 9092
        tls: false
        type: internal
      - name: tls
        port: 9093
        tls: true
        type: internal
    replicas: 1
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 30Gi
          deleteClaim: true
    version: 3.2.0
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true

让我们将它应用到命名空间中的 Kubernetes kafka:

$ kubectl apply -f k8s/cluster.yaml -n kafka

您应该会看到一个 Kafka 实例和一个 Zookeeper 实例。如果 pod 正在运行,则意味着您在 Kubernetes 上安装了 Kafka。

$ kubectl get pod -n kafka
NAME                                          READY   STATUS    RESTARTS  AGE
my-cluster-entity-operator-68cc6bc4d9-qs88p   3/3     Running   0         46m
my-cluster-kafka-0                            1/1     Running   0         48m
my-cluster-zookeeper-0                        1/1     Running   0         48m

Kafka 在集群内以 namemy-cluster-kafka-bootstrap和 port可用9092。

kubectl get svc -n kafka
NAME                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                               AGE
my-cluster-kafka-bootstrap    ClusterIP   10.108.109.255   <none>        9091/TCP,9092/TCP,9093/TCP            47m
my-cluster-kafka-brokers      ClusterIP   None             <none>        9090/TCP,9091/TCP,9092/TCP,9093/TCP   47m
my-cluster-zookeeper-client   ClusterIP   10.102.10.251    <none>        2181/TCP                              47m
my-cluster-zookeeper-nodes    ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP            47m

在 Kubernetes 上运行 KsqlDB 服务器
KsqlDB 服务器是 Confluent 平台的一部分。由于我们不是在 Kubernetes 上安装整个 Confluent Platform,而只是一个开源的 Kafka 集群,我们需要单独安装 KsqlDB Server。让我们用 Helm 来做。KSQL 服务器没有“官方”Helm 图表。因此,我们应该直接去 GitHub 上的 Confluent Helm 仓库:

$ git clone https://github.com/confluentinc/cp-helm-charts.git
$ cd cp-helm-charts

在这个存储库中,您可以为每个单独的 Confluent 组件找到单独的 Helm 图表,包括控制中心或 KSQL Server。我们的图表在存储库中的位置是charts/cp-ksql-server. 我们需要在安装过程中覆盖一些默认设置。首先,我们必须禁用无头模式。在无头模式下,KSQL Server 不公开 HTTP 端点并从输入脚本加载查询。我们的 Spring Boot 应用程序将通过 HTTP 连接到服务器。在下一步中,我们应该覆盖 Kafka 集群的默认地址和仍然6.1.0存在的 KSQL Server 的默认版本。我们将使用最新版本7.1.1。这是helm您应该在 Kubernetes 集群上运行的命令:

$ helm install cp-ksql-server \
    --set ksql.headless=false \
    --set kafka.bootstrapServers=my-cluster-kafka-bootstrap:9092 \
    --set imageTag=7.1.1 \
  charts/cp-ksql-server -n kafka


让我们验证 KSQL 是否在集群上运行:

$ kubectl get pod -n kafka | grep ksql
cp-ksql-server-679fc98889-hldfv               2/2     Running   0               2m11s

HTTP 端点可用于 namecp-ksql-server和 port下的其他应用程序8088:

$ kubectl get svc -n kafka | grep ksql
cp-ksql-server                ClusterIP   10.109.189.36    <none>        8088/TCP,5556/TCP                     3m25s

现在,我们的 Kubernetes 集群上运行着所需的全部人员。因此,我们可以继续进行 Spring Boot 应用程序的实现。

将 Spring Boot 与 ksqlDB 集成
我没有发现 Spring Boot 和 ksqlDB 之间有任何开箱即用的集成。因此,我们将ksqldb-api-client直接使用。首先,我们需要包含 ksqlDB Maven 存储库和一些依赖项:

<dependencies>
        ...

  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-api-client</artifactId>
    <version>0.26.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-udf</artifactId>
    <version>0.26.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-common</artifactId>
    <version>0.26.0</version>
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>ksqlDB</id>
    <name>ksqlDB</name>
    <url>https://ksqldb-maven.s3.amazonaws.com/maven/</url>
  </repository>
</repositories>

之后,我们可以定义一个@Bean返回 ksqlDBClient实现的 Spring。由于我们将在与 KSQL Server 相同的命名空间中运行我们的应用程序,因此我们需要提供 Kubernetes 服务名称作为主机名。

@Configuration
public class KSQLClientProducer {

    @Bean
    Client ksqlClient() {
        ClientOptions options = ClientOptions.create()
                .setHost("cp-ksql-server")
                .setPort(8088);
        return Client.create(options);
    }
}

我们的应用程序通过 HTTP 端点与 KSQL Server 交互。KTable它在启动时创建一个单曲。为此,我们需要调用executeStatementKSQL Clientbean 实例上的方法。我们正在创建 SOURCE 表以启用对其运行 拉取查询 。该表从transactions主题中获取数据。它期望传入事件中的 JSON 格式。

public class KTableCreateListener implements ApplicationListener<ContextRefreshedEvent> {

   private static final Logger LOG = LoggerFactory.getLogger(KTableCreateListener.class);
   private Client ksqlClient;

   public KTableCreateListener(Client ksqlClient) {
      this.ksqlClient = ksqlClient;
   }

   @Override
   public void onApplicationEvent(ContextRefreshedEvent event) {
      try {
         String sql = """
                 CREATE SOURCE TABLE IF NOT EXISTS transactions_view (
                   id BIGINT PRIMARY KEY,
                   sourceAccountId BIGINT,
                   targetAccountId BIGINT,
                   amount INT
                 ) WITH (
                   kafka_topic='transactions',
                   value_format='JSON'
                 );
                 """;
         ExecuteStatementResult result = ksqlClient.executeStatement(sql).get();
         LOG.info("Result: {}", result.queryId().orElse(null));
      } catch (ExecutionException | InterruptedException e) {
         LOG.error("Error: ", e);
      }
   }
}

创建表后,我们可以对其运行一些查询。有非常简单的查询。我们正在尝试查找与特定帐户相关的所有交易和所有交易。

@RestController
@RequestMapping("/transactions")
public class TransactionResource {

   private static final Logger LOG = LoggerFactory.getLogger(TransactionResource.class);
   Client ksqlClient;

   public TransactionResource(Client ksqlClient) {
      this.ksqlClient = ksqlClient;
   }

   @GetMapping
   public List<Transaction> getTransactions() throws ExecutionException, InterruptedException {
      StreamedQueryResult sqr = ksqlClient
            .streamQuery("SELECT * FROM transactions_view;")
            .get();
      Row row;
      List<Transaction> l = new ArrayList<>();
      while ((row = sqr.poll()) != null) {
         l.add(mapRowToTransaction(row));
      }
      return l;
   }

   @GetMapping("/target/{accountId}")
   public List<Transaction> getTransactionsByTargetAccountId(@PathVariable("accountId") Long accountId)
            throws ExecutionException, InterruptedException {
      StreamedQueryResult sqr = ksqlClient
            .streamQuery("SELECT * FROM transactions_view WHERE sourceAccountId=" + accountId + ";")
            .get();
      Row row;
      List<Transaction> l = new ArrayList<>();
      while ((row = sqr.poll()) != null) {
         l.add(mapRowToTransaction(row));
      }
      return l;
   }

   private Transaction mapRowToTransaction(Row row) {
      Transaction t = new Transaction();
      t.setId(row.getLong("ID"));
      t.setSourceAccountId(row.getLong("SOURCEACCOUNTID"));
      t.setTargetAccountId(row.getLong("TARGETACCOUNTID"));
      t.setAmount(row.getInteger("AMOUNT"));
      return t;
   }

}

使用 Spring Cloud Stream 向主题发送事件
最后,我们可以进行练习的最后一部分。我们需要生成测试数据并将其发送到 Kafkatransactions主题。实现它的最简单方法是使用 Spring Cloud Stream Kafka 模块。首先,让我们添加以下 Maven 依赖项:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

然后,我们可以创建一个基于 Spring Supplierbean 的生产者。Supplierbean 不断生成新事件并将其发送到目标通道。默认情况下,它每秒重复一次该操作。

@Configuration
public class KafkaEventProducer {

   private static long transactionId = 0;
   private static final Random r = new Random();

   @Bean
   public Supplier<Message<Transaction>> transactionsSupplier() {
      return () -> {
          Transaction t = new Transaction();
          t.setId(++transactionId);
          t.setSourceAccountId(r.nextLong(1, 100));
          t.setTargetAccountId(r.nextLong(1, 100));
          t.setAmount(r.nextInt(1, 10000));
          Message<Transaction> o = MessageBuilder
                .withPayload(t)
                .setHeader(KafkaHeaders.MESSAGE_KEY, new TransactionKey(t.getId()))
                .build();
          return o;
      };
   }
}

当然,我们还需要提供我们的 Kafka 集群的地址和频道的目标主题名称。Kafka的地址是在部署阶段注入的。

spring.kafka.bootstrap-servers = ${KAFKA_URL}
spring.cloud.stream.bindings.transactionsSupplier-out-0.destination = transactions

最后,让我们在 Kubernetes 上部署我们的 Spring Boot。这是包含 KubernetesDeployment和Service定义的 YAML 清单:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: transactions
spec:
  selector:
    matchLabels:
      app: transactions
  template:
    metadata:
      labels:
        app: transactions
    spec:
      containers:
      - name: transactions
        image: piomin/transactions-service
        env:
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap:9092
        ports:
        - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: transactions
spec:
  type: ClusterIP
  selector:
    app: transactions
  ports:
    - port: 8080

让我们在命名空间中部署应用程序kafka:

$ kubectl apply -f k8s/deployment.yaml -n kafka

在 Kubernetes 上测试 ksqlDB
在 Kubernetes 上部署应用程序后,让我们启用port-forward在本地端口上对其进行测试:

$ kubectl port-forward service/transactions 8080:8080
现在,我们可以测试我们的两个 HTTP 端点。让我们从搜索所有事务的端点开始:

$ curl http://localhost:8080/transactions
然后,您可以调用端点来搜索与 相关的所有事务targetAccountId,例如:

$ curl http://localhost:8080/transactions/target/10

最后的想法
在本文中,我想展示如何在 Kubernetes 上使用 ksqlDB。我们使用 Spring Boot 和 Spring Cloud Stream 等框架与 Kafka 和 ksqlDB 进行交互。您可以了解如何使用 Strimzi 运算符在 Kubernetes 上运行 Kafka 集群,或者如何直接从 Helm 存储库部署 KSQL Server。