Apache Kafka和Spring Integration的使用


Apache Kafka当然是最常用的JMS代理,它有Apache Hadoop等分布式系统用于数据入口。与其他JMS代理相比,Apache Kafka的关键特性(从我的观点来看):

  • Apache Kafka是无状态的,当您使用Kafka主题的消息时,它不会被删除。Kafka对已发布的消息有明确的保留政策。所以你的所有终端都必须是幂等的。
  • Apache Kafka是严格的发布 - 订阅 JMS代理。使用Kafka,您只能向主题发送消息。没有队列概念。
  • 在Apache Kafka中,消费者分为消费者群体。已发布的消息将分发到这些使用者组中,其中每个组中只有一个使用者获取该消息。
  • 在Apache Kafka中没有队列,但如果您只有一个消费者者组,那么您可以获得点对点消息传递的效果。

让我们看看如何将Apache Kafka与Spring Integration结合使用。我们将构建将消息生成到Kafka主题的简单演示。

卡夫卡安装和启动
您需要启动Apache ZooKeeper:
$KAFKA_HOME/bin/zookeeper-server-start.sh  $KAFKA_HOME/config/zookeeper.properties

启动Kafka:
$KAFKA_HOME/bin/kafka-server-start.sh  $KAFKA_HOME/config/server.properties

创建一个主题topic:
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic

列出Kafka创建的主题:
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181

基于Spring Integration构建Apache Kafka生成器
我们将构建简单的Spring Integration应用程序发布消息到Apache Kafka作为程序参数输入。
首先,让我们创建Spring Integration基础架构:

<int:channel id="inputToKafka">
        <int:queue/>
    </int:channel>

    <int-kafka:outbound-channel-adapter
            id=
"kafkaOutboundChannelAdapter"
            kafka-producer-context-ref=
"kafkaProducerContext"
            channel=
"inputToKafka">
        <int:poller fixed-delay=
"1000" time-unit="MILLISECONDS" 
        receive-timeout=
"0" task-executor="taskExecutor"/>
    </int-kafka:outbound-channel-adapter>

    <task:executor id=
"taskExecutor" pool-size="5" 
     keep-alive=
"120" queue-capacity="500"/>

在这个xml配置,我们创建了队列通道 “inputToKafka”,我们将推送消息到这个通道。ID为“ kafkaOutboundChannelAdapter ”的Bean 是一个出站通道适配器,具有已定义的异步轮询,执行从“inputToKafka”通道读取消息并将其推送到Apache Kafka。

现在Apache Kafka生产者配置:

<bean id="kafkaStringSerializer" 
          class=
"org.apache.kafka.common.serialization.StringSerializer" />

    <int-kafka:producer-context id=
"kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration 
                  broker-list=
"localhost:9092"
                  topic=
"test_topic"
                  key-class-type=
"java.lang.String"
                  value-class-type=
"java.lang.String"
                  key-serializer=
"kafkaStringSerializer"
                  value-serializer=
"kafkaStringSerializer"
                  />
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>

我们来看看生产者参数:

Apache Kafka broker cluster. Cluster where we're going to 
be publishing messages. Let's go with default configuration. 

broker-list="localhost:9092"

Name of topic for publishing messages.
topic="test_topic"
Type of the optional key associated with the message
key-class-type=
"java.lang.String"
Type of value sent in the message.
value-class-type=
"java.lang.String"
Reference to the key serializer, all keys and values has to be serialized before sending into Apache Kafka.
key-serializer=
"kafkaStringSerializer"
Same and key serializer.
value-serializer=
"kafkaStringSerializer"

启动消息流
要启动消息流,也就是消息生产者,我们将简单地创建SpringBoot CommandLineRunner,将命令行参数传递到提到的队列通道中:

@Component
@DependsOn(value="kafkaOutboundChannelAdapter")
public class MessageRunner implements CommandLineRunner {

    @Resource(name =
"inputToKafka")
    private MessageChannel messageChannel;

    @Override
    public void run(String... args) throws Exception {
        for (String arg1 : args) {
            messageChannel.send(
                    new GenericMessage<String>(arg1)
            );
        }
    }
}

如何测试应用程序
在maven应用程序的根目录中,运行:
mvn clean install

成功编译后,运行:
java -jar target / demo-0.0.1-SNAPSHOT.jar Test1 Test2 Test3

这将创建作为参数传递的String消息,并将它们推送到Apache Kafka代理中的test_topic中。
现在,如果您运行Apache Kafka消费者:

$KAFKA_HOME/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic --from-beginning

会得到:

$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic --from-beginning
Test1
Test2
Test3

太棒了,我们在Apache Kafka中创建了基于Spring Integration的简单消息生成器!