Spring为Kafka带来了熟悉的Spring编程模型。它提供了KafkaTemplate用于发布记录和用于异步执行POJO侦听器的侦听器容器。Spring Boot自动配置连接了大部分基础架构,因此您可以专注于业务逻辑。
错误恢复
考虑这个简单的POJO侦听器方法:
@KafkaListener(id = "fooGroup", topics = "topic1") public void listen(String in) { logger.info("Received: " + in); if (in.startsWith("foo")) { throw new RuntimeException("failed"); } }
|
默认情况下,只记录失败的记录,然后我们继续下一个记录。但是,我们可以在侦听器容器中配置错误处理程序以执行其他操作。为此,我们使用我们自己的方法覆盖Spring Boot的自动配置容器工厂:@Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory); factory.setErrorHandler(new SeekToCurrentErrorHandler()); // <<<<<< return factory; }
|
请注意,我们仍然可以利用大部分自动配置。SeekToCurrentErrorHandler丢弃来自poll()剩下的记录,并执行seek操作实现消费者操作偏移offset复位,使丢弃记录在下一次轮询再取出。默认情况下,错误处理程序会跟踪失败的记录,在10次传递尝试后放弃并记录失败的记录。但是,我们也可以将失败的消息发送到另一个主题。我们称之为死信主题。
下面是合在一起代码:
@Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<Object, Object> template) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory); factory.setErrorHandler(new SeekToCurrentErrorHandler( new DeadLetterPublishingRecoverer(template), 3)); return factory; }
@KafkaListener(id = "fooGroup", topics = "topic1") public void listen(String in) { logger.info("Received: " + in); if (in.startsWith("foo")) { throw new RuntimeException("failed"); } }
@KafkaListener(id = "dltGroup", topics = "topic1.DLT") public void dltListen(String in) { logger.info("Received from DLT: " + in); }
|
反序列化错误
但是,在Spring获得记录之前发生的反序列化异常呢?使用ErrorHandlingDeserializer。此反序列化器包装委托反序列化器并捕获任何异常。然后将它们转发到侦听器容器,该容器将它们直接发送到错误处理程序。该异常包含源数据,因此您可以诊断问题。
领域对象并推断类型
请考虑以下示例:
@Bean public RecordMessageConverter converter() { return new StringJsonMessageConverter(); }
@KafkaListener(id = "fooGroup", topics = "topic1") public void listen(Foo2 foo) { logger.info("Received: " + foo); if (foo.getFoo().startsWith("fail")) { throw new RuntimeException("failed"); } }
@KafkaListener(id = "dltGroup", topics = "topic1.DLT") public void dltListen(Foo2 in) { logger.info("Received from DLT: " + in); }
|
请注意,我们现在正在使用类型的对象Foo2。消息转换器bean推断要转换为方法签名中的参数类型的类型。转换器自动“信任”该类型。Spring Boot自动将转换器配置到侦听器容器中。
在生产者方面,发送的对象可以是不同的类(只要它是类型兼容的):
@RestController public class Controller {
@Autowired private KafkaTemplate<Object, Object> template;
@PostMapping(path = "/send/foo/{what}") public void sendFoo(@PathVariable String what) { this.template.send("topic1", new Foo1(what)); }
}
|
配置:spring: kafka: producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
$ curl -X POST http://localhost:8080/send/foo/fail
|
在这里,我们使用StringDeserializer,并在消费者端使用一个“智能”消息转换器。多方法监听器
我们还可以使用单个侦听器容器并根据类型路由到特定方法。由于有多个方法,类型需要选择要调用的方法,因此这里我们就无法推断类型了。
相反,我们依赖于记录头中传递的类型信息来从源类型映射到目标类型。此外,由于我们不推断类型,我们需要配置消息转换器以“信任”包的映射类型。
在这种情况下,我们将在两侧使用消息转换器( StringSerializer和StringDeserializer 一起使用)。以下消费者侧转换器示例将它们放在一起:
@Bean public RecordMessageConverter converter() { StringJsonMessageConverter converter = new StringJsonMessageConverter(); DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID); typeMapper.addTrustedPackages("com.common"); Map<String, Class<?>> mappings = new HashMap<>(); mappings.put("foo", Foo2.class); mappings.put("bar", Bar2.class); typeMapper.setIdClassMapping(mappings); converter.setTypeMapper(typeMapper); return converter; }
|
在这里,我们将“foo”映射到类Foo2,将“bar” 映射到类Bar2。请注意,我们必须告诉它使用TYPE_ID标头来确定转换的类型。同样,Spring Boot会自动将消息转换器配置到容器中。下面是application.yml文件片段中的生产者端类型映射; 格式是以冒号分隔的token:FQCN列表:
spring: kafka: producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer properties: spring.json.type.mapping: foo:com.common.Foo1,bar:com.common.Bar1
|
此配置将类映射Foo1到“foo”,将类映射Bar1到“bar”。
监听器:
@Component @KafkaListener(id = "multiGroup", topics = { "foos", "bars" }) public class MultiMethods {
@KafkaHandler public void foo(Foo1 foo) { System.out.println("Received: " + foo); }
@KafkaHandler public void bar(Bar bar) { System.out.println("Received: " + bar); }
@KafkaHandler(isDefault = true) public void unknown(Object object) { System.out.println("Received unknown: " + object); }
}
|
生产者:
@RestController public class Controller {
@Autowired private KafkaTemplate<Object, Object> template;
@PostMapping(path = "/send/foo/{what}") public void sendFoo(@PathVariable String what) { this.template.send(new GenericMessage<>(new Foo1(what), Collections.singletonMap(KafkaHeaders.TOPIC, "foos"))); }
@PostMapping(path = "/send/bar/{what}") public void sendBar(@PathVariable String what) { this.template.send(new GenericMessage<>(new Bar(what), Collections.singletonMap(KafkaHeaders.TOPIC, "bars"))); }
@PostMapping(path = "/send/unknown/{what}") public void sendUnknown(@PathVariable String what) { this.template.send(new GenericMessage<>(what, Collections.singletonMap(KafkaHeaders.TOPIC, "bars"))); }
}
|
事务
通过在application.yml文件中设置transactional-id-prefix来启用事务:
spring: kafka: producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer transaction-id-prefix: tx. consumer: properties: isolation.level: read_committed
|
当使用spring-kafka 1.3.x或更高版本以及支持事务的kafka-clients版本(0.11或更高版本)时,方法中KafkaTemplate执行的任何操作@KafkaListener都将参与事务,并且侦听器容器将在提交之前将偏移发送到事务它。认识到我们还为消费者设置了隔离级别,使其无法查看未提交的记录。以下示例暂停侦听器,以便我们可以看到此效果:
@KafkaListener(id = "fooGroup2", topics = "topic2") public void listen(List foos) throws IOException { logger.info("Received: " + foos); foos.forEach(f -> kafkaTemplate.send("topic3", f.getFoo().toUpperCase())); logger.info("Messages sent, hit enter to commit tx"); System.in.read(); }
@KafkaListener(id = "fooGroup3", topics = "topic3") public void listen(String in) { logger.info("Received: " + in); }
|
此示例的生产者在单个事务中发送多个记录:@PostMapping(path = "/send/foos/{what}") public void sendFoo(@PathVariable String what) { this.template.executeInTransaction(kafkaTemplate -> { StringUtils.commaDelimitedListToSet(what).stream() .map(s -> new Foo1(s)) .forEach(foo -> kafkaTemplate.send("topic2", foo)); return null; }); }
curl -X POST http://localhost:8080/send/foos/a,b,c,d,e
Received: [Foo2 [foo=a], Foo2 [foo=b], Foo2 [foo=c], Foo2 [foo=d], Foo2 [foo=e]] Messages sent, hit Enter to commit tx
Received: [A, B, C, D, E]
|
更多Spring Cloud Stream对Kafka的支持点击kafka标签进入