Apache Kafka中"主题"的综合指南

在本指南中,您将了解有关 Kafka 主题(Kafka 中数据的核心结构)的更多信息。
您将学习如何有效地创建、管理和利用 Kafka 主题。每个步骤都包含实际示例和简单的解释,确保清晰理解。

什么是 Apache Kafka 主题?
Kafka 主题Topic是 Kafka 应用程序生成和消费的消息的逻辑集合。消息是一段包含某些信息的数据,例如事件、记录或通知。例如,消息可以是产品创建事件,其中包含添加到在线商店的新产品的详细信息。

Kafka 主题有一个唯一的名称,用于在 Kafka 集群中标识它。Kafka 集群是一组称为代理的服务器,用于存储和管理主题和消息。Kafka 应用程序可以连接到 Kafka 集群并与主题和消息进行交互。

什么是主题分区?
Kafka 主题分为一个或多个分区,每个分区按有序序列存储消息子集。分区是 Kafka 代理中存储和处理的物理单元。每个分区都有一个唯一的标识符,称为分区 ID,由代理分配。

主题的分区数量在创建主题时确定,以后可以更改。分区的数量会影响主题的可扩展性、并行性、容错性和排序保证。

  • 可扩展性:主题的分区越多,它可以存储和处理的消息就越多。具有多个分区的主题可以分布在多个代理上,这增加了主题的存储容量和性能。
  • 并行性:主题的分区越多,可以同时与其交互的生产者和消费者就越多。具有多个分区的主题可以有多个生产者向不同分区发送消息,以及多个消费者从不同分区接收消息。这增加了主题的吞吐量和效率。
  • 容错性:主题的分区越多,对故障的恢复能力就越强。具有多个分区的主题可以有副本,这些副本是存储在不同代理上的分区的副本。如果代理发生故障,副本可以接管并继续提供消息。这增加了主题的可用性和可靠性。
  • 排序保证:主题的分区越多,排序保证就越不严格。具有多个分区的主题只能保证每个分区内消息的顺序,而不能保证跨分区的消息顺序。这意味着来自不同分区的消息可能会无序地传递给消费者。这可能是可接受的,也可能是不可接受的,具体取决于主题的用例。

如何创建主题?
要在 Kafka 集群中创建主题,我们可以使用 Kafka 提供的 kafka-topics.sh 命令行工具。该工具允许我们对主题执行各种操作,如创建、删除、列出和描述主题。

要创建一个主题,我们需要指定以下参数:

  • --bootstrap-server:我们要连接的 Kafka 集群中一个或多个代理的地址。例如,localhost:9092。
  • --create(创建表示我们要创建主题的标志。
  • --topic:主题:我们要创建的主题名称。例如,product-created-events-topic。
  • --分区:我们要为主题创建的分区数量。例如,3。
  • --复制因子:我们要为主题的每个分区创建的副本数量。例如,2。

例如,下面的命令在 Kafka 集群中创建了一个名为 product-created-events-topic 的主题,该主题有 3 个分区和 2 个副本,代理的地址是 localhost:9092:

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic product-created-events-topic --partitions 3 --replication-factor 2

如果主题创建成功,工具会打印一条确认信息,例如
Created topic product-created-events-topic.

如果主题已经存在,或者出现错误,工具将打印错误信息,例如
Topic 'product-created-events-topic' already exists.

如何用分区创建主题?
正如我在上一节所解释的,一个 Kafka 主题被划分为一个或多个分区,每个分区按有序顺序存储一个消息子集。一个主题的分区数量会影响其可扩展性、并行性、容错性和排序保证。

要创建带分区的主题,可以使用 kafka-topics.sh 命令行工具,我在上一节中介绍过该工具。该工具允许你对主题执行各种操作,如创建、删除、列出和描述主题。

要创建带分区的主题,需要指定以下参数:

  • --bootstrap-server(启动服务器):您要连接的 Kafka 集群中一个或多个代理的地址。例如,localhost:9092。
  • --create(创建表示要创建主题的标志。
  • --topic:主题:要创建的主题名称。例如,product-created-events-topic。
  • --分区:您要为主题创建的分区数量。例如,3。

例如,下面的命令在 Kafka 集群中创建了一个名为 product-created-events-topic 的主题,该主题有 3 个分区,代理的地址是 localhost:9092:
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic product-created-events-topic --partitions 3

如果主题创建成功,工具会打印一条确认信息,例如
Created topic product-created-events-topic.

如果主题已经存在,或者出现错误,工具将打印错误信息,例如
Topic 'product-created-events-topic' already exists.

如何更新主题?
有时,你可能想更改现有主题的配置和属性,比如更改分区数量、复制因子、保留策略和其他参数。你可以使用我在前面章节中介绍的 kafka-topics.sh 命令行工具。该工具允许你对主题执行各种操作,如创建、删除、列出和描述主题。

要更新主题,需要指定以下参数:

  • --bootstrap-server(启动服务器):您要连接的 Kafka 集群中一个或多个代理的地址。例如,localhost:9092。
  • --alter(更改表示要更新主题的标志。
  • --topic:主题:要更新的主题名称。例如,product-created-events-topic。
  • --config:配置:以逗号分隔的 key=value 对列表,其中 key 是要更新的参数名称,value 是要为其分配的新值。例如,retention.ms=43200000。

例如,下面的命令会将 Kafka 集群中名为 product-created-events-topic 的主题的保留策略更新为 12 小时,Kafka 集群的代理位于 localhost:9092:
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic product-created-events-topic --config retention.ms=43200000

您可以更新主题的许多参数,如分区数量、复制因子、清理策略、压缩类型等。

如何更新主题分区?
主题的分区数决定了主题被划分为多少个消息子集。分区数会影响主题的可扩展性、并行性、容错性和排序保证。

要更新主题的分区数,可以在 kafka-topics.sh 命令行工具中使用 --partitions 选项,并指定要分配给主题的新分区数。例如,在一个 Kafka 集群中,broker 的地址是 localhost:9092,下面的命令会将名为 product-created-events-topic 的主题的分区数增加到 6:
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic product-created-events-topic --partitions 6

请注意,只能增加主题的分区数,而不能减少分区数。因为减少分区数会导致数据丢失和不一致。如果要减少主题的分区数,必须删除主题,然后用所需的分区数重新创建。

如何更新主题复制因子?
主题的复制因子决定了每个分区存储在不同代理服务器上的副本数量。复制因子会影响主题的容错性和可用性。

要更新主题的复制因子,可以在 kafka-topics.sh 命令行工具中使用 --replication-factor 选项,并指定要分配给主题的新复制因子。例如,下面的命令将 Kafka 集群中名为 product-created-events-topic 的主题的复制因子提高到 3,该集群的代理位于 localhost:9092:
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic product-created-events-topic --replication-factor 3

请注意,只能增加主题的复制因子,不能减少。因为降低复制因子会导致数据丢失和不一致。如果要降低某个主题的复制因子,必须删除该主题,然后用所需的复制因子重新创建。

如何更新保留策略?
主题的保留策略决定了主题中的邮件在被删除前要保留多长时间。保留策略会影响主题的存储容量和性能。

要更新主题的保留策略,可以在 kafka-topics.sh 命令行工具中使用 --config 选项,并指定要分配给主题的新保留策略。为主题指定保留策略有两种方法:按时间或按大小。

要按时间指定保留策略,可以使用 retention.ms 参数,以毫秒为单位指定消息被删除前在主题中的最长保留时间。例如,在一个 Kafka 集群中,代理的地址是 localhost:9092,下面的命令将名为 product-created-events-topic 的主题的保留策略设置为 12 小时:
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic product-created-events-topic --config retention.ms=43200000

要按大小指定保留策略,可以使用 retention.bytes 参数,并以字节为单位指定在删除最旧消息之前,主题所能占用的最大大小。例如,下面的命令将 Kafka 集群中名为 product-created-events-topic 的主题的保留策略设置为 1 GB,Kafka 集群的代理地址为 localhost:9092:
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic product-created-events-topic --config retention.bytes=1073741824

您也可以同时使用这两个参数,当满足其中一个条件时,主题就会被删除。

如何删除主题?
有时,你可能想从 Kafka 集群中删除一个主题,要么是因为不再需要它,要么是因为想用不同的设置重新创建它。你可以使用我在前面章节中介绍的 kafka-topics.sh 命令行工具来实现这一目的。该工具允许你对主题执行各种操作,例如创建、更新、列出和描述主题。

要删除一个主题,需要指定以下参数:

  • --bootstrap-server(启动服务器):您要连接的 Kafka 集群中一个或多个代理的地址。例如,localhost:9092。
  • --delete(删除表示要删除主题的标志。
  • --topic:主题:要删除的主题名称。例如,product-created-events-topic。

例如,以下命令将从 Kafka 集群中删除名为 product-created-events-topic 的主题,该集群的代理位于 localhost:9092:
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic product-created-events-topic

如果主题删除成功,工具会打印一条确认信息,例如
Topic product-created-events-topic is marked for deletion.

如果主题不存在或出现错误,工具将打印错误信息,例如
Topic 'product-created-events-topic' does not exist.

如何从主题中读取消息?
Kafka 消费者是一个从 Kafka 集群中的一个或多个主题接收消息的应用程序。消费者可以订阅一个或多个主题,并以流式或批处理的方式从中消费消息。消费者还可以指定偏移量或位置,以便从主题开始消费消息。例如,消费者可以从主题的开始、结束或特定偏移量开始。

要从一个主题读取消息,你可以使用我在前面章节中介绍的 kafka-console-consumer.sh 命令行工具。该工具允许你从主题中读取消息,并将其打印到控制台(标准输出)。默认情况下,它输出的是消息中没有格式化的原始字节(使用默认格式化器)。

要从主题中读取信息,需要指定以下参数:

  • --bootstrap-server(启动服务器):要连接的 Kafka 集群中一个或多个代理的地址。例如,localhost:9092。
  • --topic(主题要从其中读取消息的主题名称。例如,product-created-events-topic。
  • --from-beginning:从开始:表示要从主题开头读取信息的标志。如果省略此标记,工具将从主题的末尾或最新偏移量读取消息。
  • --属性:以逗号分隔的 key=value 对列表,其中 key 是要设置的属性名称,value 是要为其赋值的值。例如,print.key=true,key.separator=-。你可以使用该选项自定义信息的输出格式,如显示键和值,或更改它们之间的分隔符。

例如,下面的命令从名为 product-created-events-topic 的主题开始读取主题中的消息,并显示每个消息的键和值,中间用破折号隔开,Kafka 集群的代理服务器是 localhost:9092:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic product-created-events-topic --from-beginning --property print.key=true,key.separator=-

如果工具成功连接到主题并开始读取信息,它就会将信息打印到控制台。

结论
在本教程中,您已经了解了 Apache Kafka Topic,它是 Kafka 应用程序产生和消费的消息的逻辑集合。您还学习了如何使用 kafka-topics.sh 和 kafka-console-consumer.sh 命令行工具从 Kafka 主题创建、更新、删除和读取消息。您还探索了 Kafka 主题的一些关键概念和属性,例如分区、复制、保留和压缩。