数据Redpanda平台简介

Redpanda 是一个开源流媒体平台,旨在快速、可扩展且可靠,满足现代数据密集型应用程序的需求。它提供了与 Apache Kafka 兼容的 API,可以轻松迁移现有应用程序。本文将探讨 RedPanda,了解其基础知识,并演示如何使用 Java 来利用其功能。

Redpanda 是什么?
Redpanda是一个高性能的流数据平台,专为存储和处理实时数据流而设计。它有助于构建事件驱动的架构,其中应用程序通过交换消息(事件)进行通信。 Redpanda 将生产者(数据发布者)与消费者(数据订阅者)解耦,从而实现异步通信和可扩展性。

Redpanda的关键组件

  • 存储引擎:Redpanda 采用针对现代硬件优化的高性能存储引擎,实现海量数据的高效存储和检索。
  • 流处理:它提供强大的流处理功能,允许用户使用Kafka Streams或KSQL等框架实时处理和分析数据。
  • 分布式共识:Redpanda 采用名为 Raft 的分布式共识协议,确保集群中节点之间的强一致性和容错能力。
  • 与 Kafka 的兼容性:Redpanda 完全兼容 Apache Kafka 协议,使得 Kafka 用户可以无缝迁移或与 Redpanda 集成。

Redpanda的特点

  • 简单性: Redpanda 是轻量级的,不需要外部依赖。它作为单个二进制文件运行,简化了部署和管理。
  • 操作简单:Redpanda 提供用户友好的 Web 控制台和命令行界面 (Redpanda Keeper),用于集群管理和监控。 Redpanda 还通过其直观的管理界面和自动化工具简化了部署和管理,减少了 DevOps 团队的运营开销。
  • 高性能:Redpanda 专为高吞吐量和低延迟 而设计,使其适合要求苛刻的实时应用程序,例如实时分析、欺诈检测和金融交易。
  • 可扩展性:它可以轻松水平扩展,允许用户无缝扩展其集群以适应不断增长的工作负载和数据量。
  • 可靠性:凭借分布式架构和内置容错机制,Redpanda 即使在节点故障的情况下也能确保数据的持久性和可用性。
  • Kafka API 兼容性: Redpanda 由于兼容 Kafka API,与现有 Kafka 工具和应用程序无缝集成,使 Kafka 用户更容易过渡。

安装与设置
作为设置 RedPanda 的一部分,了解broker代理/经纪人的概念至关重要。在 RedPanda 中,代理是负责管理数据流的存储和处理的基本组件。每个 RedPanda 节点都充当集群内的代理。

1 broker代理/经纪人
RedPanda 中的经纪人在促进数据生产者和消费者之间的沟通方面发挥着至关重要的作用。它充当中介者,从生产者接收数据,将其持久存储在磁盘上,并可供消费者使用。此外,代理还处理复制、分区和消息路由等各种任务,以确保系统的容错性、高可用性和可扩展性。

从本质上讲,RedPanda 中的代理的功能类似于一个集中式枢纽,负责编排流平台内的数据流。通过在集群中的多个代理之间分配数据,RedPanda 确保了针对故障的恢复能力和资源的高效利用。

2 安装
在继续之前,请确保Docker已在您的计算机上安装并运行。

要仅使用一个代理启动 RedPanda 集群,最简单的方法是使用rpk( rpk container start -n 1)。创建此命令行实用程序是为了配置和管理 RedPanda 集群。

docker-compose.yml我们还可以通过从RedPanda Docs网站下载文件到本地计算机来启动 RedPanda 集群,并使用docker-compose up -d在保存文件的目录中启动集群 docker-compose.yml 。

发布和消费消息的Java程序
下面是简单的 Java 程序,演示如何向 Redpanda 主题发布消息,然后从中读取消息。

1 Java 客户端设置
我们将使用 Kafka Java 客户端库从 Java 应用程序与 RedPanda 进行交互。您可以使用 Maven 或 Gradle 将其添加到您的项目中。以下是 Maven 的依赖项:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.0</version>
</dependency>

2 使用AdminClient创建主题
除了发布和使用消息之外,管理主题是使用 Redpanda 的一个重要方面。 Kafka 客户端库提供了一个AdminClient类,允许用户以编程方式创建、删除和管理主题。

下面是一个示例 Java 程序,演示如何使用AdminClientKafka 客户端库中的类创建主题:

public class TopicCreator {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Kafka broker address
        String bootstrapServers =
"localhost:9092";
       
// Topic name
        String topicName =
"new-topic";
       
// Number of partitions for the topic
        int numPartitions = 3;
       
// Replication factor for the topic
        short replicationFactor = 1;

       
// Configure AdminClient properties
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

       
// Create AdminClient
        try (AdminClient adminClient = AdminClient.create(properties)) {
           
// Create a new topic
            NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
            System.out.println(
"Topic created successfully: " + topicName);
        } catch (Exception e) {
            System.err.println(
"Error creating topic: " + e.getMessage());
        }
    }
}

在这个程序中:

  • localhost:9092是 Redpanda 经纪人的地址。
  • AdminClient是使用提供的配置属性创建的。
  • NewTopic使用指定的主题名称、分区数和复制因子来实例化对象。
  • createTopics调用的方法来AdminClient创建主题。

3 向 Redpanda Topic 发布消息(Producer 示例)
以下代码演示了如何创建生产者并向 RedPanda 中的主题发送消息:

public class RedpandaProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Redpanda broker address
        String bootstrapServers =
"localhost:9092";
       
// Topic name
        String topic =
"new-topic";

       
// Configure producer properties
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

       
// Create KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);


       
// Publish messages to the topic
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic,
"Message " + i);
            producer.send(record).get();
        }

       
// Close the producer
        producer.close();
    }
}

此代码定义了一个名为 的 Java 类RedpandaProducer,它充当 Kafka 生产者,用于将消息发布到 Redpanda 主题。我们来分解并解释一下代码块:

配置:用于配置生产者的属性是使用Properties对象设置的。关键配置包括:

  • BOOTSTRAP_SERVERS_CONFIG:指定用于建立与 Redpanda 的初始连接的主机/端口对列表。在本例中,它设置为localhost:9092。
  • KEY_SERIALIZER_CLASS_CONFIGand VALUE_SERIALIZER_CLASS_CONFIG:指定键和值的序列化器类。在这里,两者都设置为,StringSerializer因为我们正在处理字符串。

生产者初始化KafkaProducer:使用配置的属性创建的实例。

消息发布:在循环内,生产者将消息发布到指定主题。该循环进行迭代,并且对于每次迭代,ProducerRecord都会使用主题名称和消息创建一个新的迭代。该producer.send(record)方法将记录异步发送到主题。

4 读取Redpanda主题消息(消费者示例)
以下是如何使用 RedPanda Java 客户端使用主题中的消息:

public class RedpandaConsumer {

    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092"; // Redpanda broker address
        String topic =
"new-topic"; // Topic name

       
// Configure the consumer
        Properties props = new Properties();
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
"consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

       
// Subscribe to the topic
        consumer.subscribe(Collections.singletonList(topic));

       
// Poll for new messages
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(
"Received message: " + record.value());
            }
        }
    }
}

此代码定义了一个名为 的 Java 类RedpandaConsumer,它充当 Kafka 使用者,用于从 Redpanda 主题读取消息。以下是代码的细分:

配置:用于配置消费者的属性是使用Properties对象设置的。关键配置包括:

  • AUTO_OFFSET_RESET_CONFIG:在这里,它设置为“最早”,这意味着消费者将从最早的可用偏移量开始读取。
  • BOOTSTRAP_SERVERS_CONFIG:指定用于建立与 Redpanda 的初始连接的主机/端口对列表。在本例中,它设置为localhost:9092。
  • GROUP_ID_CONFIG:指定消费者组id。组中的每个消费者必须有一个唯一的组 ID。这里,它被设置为“ consumer-group ”。
  • KEY_DESERIALIZER_CLASS_CONFIGand VALUE_DESERIALIZER_CLASS_CONFIG:指定键和值的反序列化器类。在这里,两者都设置为,StringDeserializer因为我们正在处理字符串。

消费者初始化KafkaConsumer:使用配置的属性创建实例。

主题订阅:消费者使用 订阅指定的主题consumer.subscribe(Collections.singletonList(topic))。

消息消费:在连续while循环内,消费者轮询来自订阅主题的新消息。它用于consumer.poll(Duration.ofMillis(100))获取记录,其中Duration.ofMillis(100)指定如果没有立即可用的记录,则等待记录的最长时间。一旦检索到记录,使用者就会迭代它们,将每条消息的值打印到控制台。

RedPanda 的一些常见用例

  • 实时分析:Redpanda 可以为实时分析平台提供支持,使组织能够从物联网设备、传感器和应用程序日志等流数据源中获得可操作的见解。
  • 日志聚合:Redpanda可以作为集中式日志聚合平台,收集和存储来自分布式应用程序和系统的日志,以进行监控、故障排除和分析。
  • 微服务通信:Redpanda 促进现代云原生架构中微服务之间的事件驱动通信,从而实现服务的无缝集成和解耦。