Redpanda简介

在本教程中,我们将讨论一个名为Redpanda的强大事件流平台。这是对事实上的行业流媒体平台Kafka 的竞争,有趣的是,它还与 Kafka API 兼容。

我们将了解 Redpanda 的关键组件、功能和用例,创建用于将消息发布到 Redpanda 主题的 Java 程序,然后从中读取消息。

由于 Redpanda 的制作者声称自己是 Kafka 的竞争对手:

Redpanda 特点

  • 包括一个易于安装的二进制包
  • 不依赖JVM和第三方工具
  • 由于其每核线程编程模型,比 Kafka 快 10 倍
  • 用 C++ 编写
  • 每个核心可以处理 1 GB/秒的写入
  • 支持自动内核调整
  • p99999 延迟为 16ms
  • Redpanda Cloud 提供了一些开箱即用的托管连接器

Redpanda架构
Redpanda 的架构不仅简单而且非常容易掌握。有趣的是,它有一个易于安装的二进制安装包。这使开发人员能够快速领先,这也是其受欢迎的原因。此外,它还提供了一个具有极高吞吐量的高性能流媒体平台。

关键部件和特性
让我们深入了解 Redpanda 的关键组件和功能,这些组件和功能使其极其强大和高性能:

控制平面支持 Kafka API,用于管理代理、创建消息主题、发布和消费消息等等。因此,依赖 Kafka 的遗留系统可以轻松迁移到 Redpanda。但是,有一组不同的管理 API 用于管理和配置 Redpanda 集群。

Redpanda 支持分层存储。这意味着我们可以将其配置为将其数据日志从本地缓存卸载或存档到云中更便宜的对象存储。此外,根据消费者的需求,数据会实时从远程对象存储移回到本地缓存。

Redpanda 有一个Raft 共识算法实现层,可以跨节点复制主题分区数据。此功能可防止发生故障时数据丢失。自然也就保证了较高的数据安全性和容错性。

Redpanda 拥有强大的身份验证和授权支持。它可以使用 SASL、OAuth、OpenID Connect (OIDC)、基本身份验证、Kerberos 等方法对外部用户和应用程序进行身份验证。此外,它还可以通过基于角色的访问控制 (RBAC) 机制对其资源进行细粒度的访问控制。

模式对于定义 Redpanda 经纪人、消费者和生产者之间交换的数据至关重要。因此,集群有一个架构注册表。架构注册表 API 帮助注册和修改架构。

HTTP代理 (pandaproxy) API提供了一种与 Redpanda 交互的便捷方式,以进行基本数据操作,例如列出主题和代理、获取事件、生成事件等等。

最后,Redpanda为其监控提供了指标端点。这些可以在 Prometheus(监控工具)上进行配置,以提取重要指标并将其显示在Grafana 仪表板上。


单个二进制安装包
Redpanda 的安装包包含一个二进制文件,因此它的安装比 Kafka 简单得多。与 Kafka 不同,它不依赖于 JVM 或 Zookeeper 等集群管理器。由于这些因素,操作 Redpanda 非常容易。

它是用 C++ 开发的,具有引人注目的每核线程编程模型,有助于最佳地利用 CPU 核心、内存和网络。因此,其部署的硬件成本显着降低。该模型还可以实现低延迟和高吞吐量。

Redpanda 的集群由多个节点组成。每个节点可以是数据平面或控制平面。所有这些节点都需要安装一个具有适当配置的二进制包。如果节点具有高端计算能力,则它们可以同时扮演这两种角色,而不会出现性能瓶颈。

管理工具
Redpanda 提供了两种管理工具,一个Web 控制台和一个称为Redpanda Keeper (RPK)的 CLI 。控制台是一个用户友好的 Web 应用程序,集群管理员可以使用。
RPK 主要用于低级集群管理和调优。但是,控制台提供了数据流的可见性以及故障排除和管理集群的功能。


部署
Redpanda支持自托管和Redpanda云部署。

在自托管部署中,客户可以将 Redpanda 集群部署在其私有数据中心内或公有云的 VPC 中。它可以部署在物理机或虚拟机以及 Kubernetes 上。根据经验,每个代理都应该有其专用节点。目前支持RHEL/CentOS和Ubuntu操作系统。

此外,AWS Simple Storage Service (S3)、Azure Blob Storage (ABS) 和 Google Cloud Storage (GCS) 可用于支持分层存储。

有趣的是,客户还可以选择Redpanda Cloud来提供托管服务。他们可以将整个集群完全放在 Redpanda Cloud 上,也可以选择拥有在其私有数据中心或公共云帐户中运行的数据平面。控制平面保留在 Redpanda Cloud 上,监控、配置和升级都在其中完成。

关键用例
与 Kafka 不同,Redpanda 对于开发人员来说是一个极其强大的流媒体平台,因为它的架构简单且易于安装。让我们沿着同样的思路快速浏览一下用例:
一般来说,流媒体平台的参与者是:

  • 源系统生成提要
  • 源可以监视事件、指标、通知等
  • 集群中的代理管理主题
  • 生产者从源系统读取提要并将其发布到主题
  • 消费者不断对订阅的主题进行民意调查
  • 目标系统接收来自消费者的转换后的消息

Redpanda 保证将来自监控工具、合规性和安全平台、物联网设备等各种来源的实时信息传送到目标系统,平均延迟降低 10 倍。

它支持消费者和生产者模型来处理来自各种来源的实时提要或事件。生产者是从源系统读取数据并将其发布到 Redpanda 集群中的主题的应用程序。集群中的broker高度可靠、容错,保证消息的传递。

消费者应用程序订阅集群中的主题。最终,他们从主题中读取数据,并在进一步转换数据后,将其发送到各种目标系统,如分析平台、NoSQL 数据库、关系数据库或其他流平台。

在微服务架构中,Redpanda 通过促进微服务之间的异步通信来帮助解耦微服务。

因此,它可以在各行业的发展中发挥重要作用:

  • 用于事件和日志处理、报告、故障排除和自动修复的可观察性平台
  • 实时合规和欺诈检测系统
  • 实时分析仪表板和应用程序

使用 Kafka API 实现 Redpanda 客户端
值得注意的是,Redpanda 支持 Kafka API。因此,我们将使用 Kafka 客户端编写可以与 Redpanda Stream 交互的程序。

对于我们的示例,我们使用Java Testcontainers在 Windows 桌面上部署单节点 Redpanda。

此外,我们将探索涵盖主题创建、消息发布和消息消费的基本程序。这仅用于演示目的,因此,我们不会深入研究 Kafka API 概念。

先决条件
在开始之前,让我们导入Kafka 客户端库所需的Maven 依赖项:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>

创建主题
为了在 Redpanda 上创建主题,我们首先实例化Kafka 客户端库中的AdminClient类:

AdminClient createAdminClient() {
    Properties adminProps = new Properties();
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
    return KafkaAdminClient.create(adminProps);
}

为了设置AdminClient,我们获取了代理 URL 并将其传递给其静态create()方法。

现在,让我们看看如何创建主题:

void createTopic(String topicName) {
    try (AdminClient adminClient = createAdminClient()) {
        NewTopic topic = new NewTopic(topicName, 1, (short) 1);
        adminClient.createTopics(Collections.singleton(topic));
    } catch (Exception e) {
        LOGGER.error("Error occurred during topic creation:", e);
    }
}

AdminClient类的createTopics ()方法采用NewTopic对象作为创建主题的参数。

最后,让我们看一下createTopic()方法的实际操作:

@Test
void whenCreateTopic_thenSuccess() throws ExecutionException, InterruptedException {
    String topic = "test-topic";
    createTopic(topic);
    try(AdminClient adminClient = createAdminClient()) {
        assertTrue(adminClient.listTopics()
          .names()
          .get()
          .contains(topic));
    }
}

程序在Redpanda上成功创建了主题test-topic。我们还使用AdminClient类的listTopics()方法验证代理中主题是否存在。

向主题发布消息
可以理解的是,生产者应用程序最基本的要求是将消息发布到主题。为此,我们将使用KafkaProducer:

KafkaProducer<String, String> createProducer() {
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return new KafkaProducer<String, String>(producerProps);
}

我们通过向KafkaProducer构造函数提供代理 URL 和StringSerializer类等基本属性来实例化生产者。

现在,让我们使用生产者将消息发布到主题:

void publishMessage(String msgKey, String msg, String topic, KafkaProducer<String, String> producer)
    throws ExecutionException, InterruptedException {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, msgKey, msg);
    producer.send(record).get();
}

创建ProducerRecord对象后,我们将其传递给KafkaProducer对象中的send()方法来发布消息。send ()方法是异步操作的,因此,我们调用get()方法来确保在消息发布之前处于阻塞状态。

最后,现在让我们发布一条消息:

@Test
void givenTopic_whenPublishMsg_thenSuccess() {
    try (final KafkaProducer<String, String> producer = createProducer()) {
        assertDoesNotThrow(() -> publishMessage("test_msg_key_2", "Hello Redpanda!", "baeldung-topic", producer));
    }
}

首先,我们通过调用方法createProducer() 创建KafkaProducer对象。然后我们发布消息“Hello Redpanda!” 通过调用我们之前介绍过的方法publishMessage()到主题baeldung-topic 。

使用主题中的消息
下一步,我们首先创建一个KafkaConsumer,然后才能使用流中的消息:

KafkaConsumer<String, String> createConsumer() {
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,
"test-consumer-group");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return new KafkaConsumer<String, String>(consumerProps);
}

我们通过向KafkaConsumer构造函数提供代理 URL、 StringDeSerializer类等基本属性来实例化消费者。此外,我们确保消费者将从偏移量 0(“最早”)开始消费消息。

继续,让我们使用一些消息:

@Test
void givenTopic_whenConsumeMessage_thenSuccess() {
    try (KafkaConsumer<String, String> kafkaConsumer = createConsumer()) {
        kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
        while(true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
            if(records.count() == 0) {
                continue;
            }
            assertTrue(records.count() >= 1);
            break;
        }
    }
}

该方法在创建KafkaConsumer对象后订阅一个主题。然后,它每 1000 毫秒轮询一次以从中读取消息。在这里,为了演示,我们将退出循环,但在现实世界中,应用程序会不断轮询消息,然后进一步处理它们。