Kafka没有跟踪哪些消息被消费而言,Apache Kafka是无状态的。这是JMS消费者的责任。这种简单性使Kafka非常快。
 Apache Kafka消息的JMS消费者者需要保持消息偏移以便能够跟踪他消费使用的消息。偏移也使他有机会回到原来特定位置并再次重新阅读消息。
 在将消息写入Kafka时,消息将写入分区的末尾。这样可以节省磁盘,这也是Apache Kafka的另一个性能提升。
  
使用Spring Integration编写Apache Kafka的JMS消费者,有两种选择:
使用基于高级 Apache Kafka API的入站通道Inbound Channel 适配器。
 使用基于Apache Kafka的SimpleConsumer的消息驱动适配器。
  有什么区别?使用入站通道适配器, 您无权访问主题分区中使用的偏移量。偏移量通过Zookeeper进行管理,这意味着您无法回放到主题分区中的旧位置以重新读取消息。如果你没问题,那么入站通道适配器就是你的朋友。
另一方面,使用Kafka的SimpleConsumer的消息驱动适配器为我们提供了org.springframework.integration.kafka.listener.OffsetManager来跟踪偏移量。
在本演示中,我们将测试通过InboundChannel Adapter获取消息。因为大多数时候这个选项就足够了。
我们将构建简单的基于Spring-Boot的应用程序,并集成了Spring Integration依赖项。主类看起来像这样:
@SpringBootApplication"applicationContext.xml" )public  class  DemoApplication {public  static  void  main(String[] args) {class , args);  
Spring集成部分:
<int :channel id="inputFromKafka" >int :queue />int :channel> "messageProcessor"  class = "com.example.processor.MessageProcessor"  />int :service-activator input-channel= "inputFromKafka" "messageProcessor" >int :poller fixed-delay= "10"  />int :service-activator>  
我们在这里简单地定义了queue-pollable channel inputFromKafka,然后将该通道中的消息移交给com.example.processor.MessageProcessor。轮询间隔为10毫秒。在这个处理器中,让我们打印出消息。
public  class  MessageProcessor {public  void  handleMessage(Message<?> message) {"<strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong>" ); "Spring integration received:" +message.getPayload()); "<strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong>" ); 
非常重要的是要记住,来自Apache Kafka JMS消息转换的Spring Integration消息的有效负载结构如下:
Map>> 
第一个映射的键是主题的ID,第二个映射的键是主题分区的ID,List 是有效负载的列表。 
现在Spring Integration Kafka配置:
<bean id="consumerProperties" class ="org.springframework.beans.factory.config.PropertiesFactoryBean" > "properties" > "auto.offset.reset" >smallest</prop> "socket.receive.buffer.bytes" >10485760</prop>  "fetch.message.max.bytes" >5242880</prop> "auto.commit.interval.ms" >10</prop>int -kafka:zookeeper-connect id= "zookeeperConnect" "localhost:2181"  zk-connection-timeout= "6000" "6000"  zk-sync-time= "2000"  />int -kafka:inbound-channel-adapter "consumerContext"   "true" "inputFromKafka"   "kafka-inbound-channel-adapter" >int :poller fixed-delay= "10"   "MILLISECONDS" "0"  />int -kafka:inbound-channel-adapter> "kafkaStringSerializer"  class = "org.springframework.integration.kafka.serializer.common.StringDecoder"  />int -kafka:consumer-context id= "consumerContext" "4000"  zookeeper-connect= "zookeeperConnect" "consumerProperties" >int -kafka:consumer-configurations>int -kafka:consumer-configuration "tomask79_consumer"   "50"   "kafkaStringSerializer" >int -kafka:topic id= "test_topic"  streams= "1"  />int -kafka:consumer-configuration>int -kafka:consumer-configurations>int -kafka:consumer-context>  
auto.offset.reset
源码Git 
启动:
现在让我们测试我们的消费者。首先,我们将启动Apache Kafka控制台生产者向主题发送一些消息。
kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic 
此命令将启动标准命令输入,如果我们键入例如“Hello Spring Integration kafka”,那么这个消息也应该由演示应用程序显示出来,如下所示:
<strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong><strong>*</strong> 
大多数时候使用Spring Integration的高级Kafka API就足够了