基于Spring Integration的Apache Kafka JMS消费者


让我们总结一下从Apache Kafka编写JMS消费者时需要知道的基本功能:

  • Kafka没有跟踪哪些消息被消费而言,Apache Kafka是无状态的。这是JMS消费者的责任。这种简单性使Kafka非常快。
  • Apache Kafka消息的JMS消费者者需要保持消息偏移以便能够跟踪他消费使用的消息。偏移也使他有机会回到原来特定位置并再次重新阅读消息。
  • 在将消息写入Kafka时,消息将写入分区的末尾。这样可以节省磁盘,这也是Apache Kafka的另一个性能提升。

使用Spring Integration编写Apache Kafka的JMS消费者,有两种选择:

  • 使用基于高级 Apache Kafka API的入站通道Inbound Channel 适配器。
  • 使用基于Apache Kafka的SimpleConsumer的消息驱动适配器。

有什么区别?使用入站通道适配器, 您无权访问主题分区中使用的偏移量。偏移量通过Zookeeper进行管理,这意味着您无法回放到主题分区中的旧位置以重新读取消息。如果你没问题,那么入站通道适配器就是你的朋友。

另一方面,使用Kafka的SimpleConsumer的消息驱动适配器为我们提供了org.springframework.integration.kafka.listener.OffsetManager来跟踪偏移量。

在本演示中,我们将测试通过InboundChannel Adapter获取消息。因为大多数时候这个选项就足够了。

我们将构建简单的基于Spring-Boot的应用程序,并集成了Spring Integration依赖项。主类看起来像这样:

@SpringBootApplication
@EnableIntegration
@ImportResource("applicationContext.xml")
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

Spring集成部分:

<int:channel id="inputFromKafka">
        <int:queue />
    </int:channel>

    <bean id=
"messageProcessor" class="com.example.processor.MessageProcessor" />

    <int:service-activator input-channel=
"inputFromKafka"
        ref=
"messageProcessor">
        <int:poller fixed-delay=
"10" />
    </int:service-activator>

我们在这里简单地定义了queue-pollable channel inputFromKafka,然后将该通道中的消息移交给com.example.processor.MessageProcessor。轮询间隔为10毫秒。在这个处理器中,让我们打印出消息。

public class MessageProcessor {
   public void handleMessage(Message<?> message) {
         System.out.println("**************************************************");
System.out.println(
"Spring integration received:"+message.getPayload());
System.out.println(
"**************************************************");
    }
}

非常重要的是要记住,来自Apache Kafka JMS消息转换的Spring Integration消息的有效负载结构如下:

Map<String, Map<Integer, List<Object>>>

第一个映射的键是主题的ID,第二个映射的键是主题分区的ID,List <Object>是有效负载的列表。

现在Spring Integration Kafka配置:

<bean id="consumerProperties"
        class=
"org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name=
"properties">
            <props>
                <prop key=
"auto.offset.reset">smallest</prop>
                <prop key=
"socket.receive.buffer.bytes">10485760</prop> 
                <prop key=
"fetch.message.max.bytes">5242880</prop>
                <prop key=
"auto.commit.interval.ms">10</prop>
            </props>
        </property>
    </bean>


    <int-kafka:zookeeper-connect id=
"zookeeperConnect"
        zk-connect=
"localhost:2181" zk-connection-timeout="6000"
        zk-session-timeout=
"6000" zk-sync-time="2000" />


    <int-kafka:inbound-channel-adapter
        kafka-consumer-context-ref=
"consumerContext" 
        auto-startup=
"true"
        channel=
"inputFromKafka" 
        id=
"kafka-inbound-channel-adapter">
        <int:poller fixed-delay=
"10" 
                    time-unit=
"MILLISECONDS"
                    receive-timeout=
"0" />
    </int-kafka:inbound-channel-adapter>

    <bean id=
"kafkaStringSerializer" 
          class=
"org.springframework.integration.kafka.serializer.common.StringDecoder" />

    <int-kafka:consumer-context id=
"consumerContext"
        consumer-timeout=
"4000" zookeeper-connect="zookeeperConnect"
        consumer-properties=
"consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                group-id=
"tomask79_consumer" 
                max-messages=
"50" 
                value-decoder=
"kafkaStringSerializer">
                <int-kafka:topic id=
"test_topic" streams="1" />
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>


你可以发现inboundchannel-adapter入站通道适配器用来自Kafka的消息放入inputToKafka 通道,检查下面属性:
auto.commit.interval.ms
此属性表示消费者偏移提交给zookeeper的频率(以毫秒为单位)。这有助于Kafka跟踪哪些消息是新的或已经读过的消息。

auto.offset.reset
当在zookeper中的偏移超出实际范围时该做什么。此设置说明在主题分区中切换移动位置到哪里才能读取到消息。

源码Git
让我们构建并启动我们的Spring-Boot maven应用程序,运行:
mvn clean install

启动:
java -jar target/receiver-0.0.1-SNAPSHOT.jar

现在让我们测试我们的消费者。首先,我们将启动Apache Kafka控制台生产者向主题发送一些消息。

kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

此命令将启动标准命令输入,如果我们键入例如“Hello Spring Integration kafka”,那么这个消息也应该由演示应用程序显示出来,如下所示:

**************************************************
Spring integration received: {test_topic={0=[Hello Spring Integration kafka]}}
**************************************************

大多数时候使用Spring Integration的高级Kafka API就足够了