如今,许多应用程序使用事件流和消息发布系统相互通信。Apache Kafka是一种分布式流媒体平台,该平台主要使发布和订阅主题变得简单,并通过并行化使用者来获得出色的性能。本文适用于希望使用Kafka的任何人,并且可能会利用另一个框架Apache Camel提供的抽象来做一个简单的基本用法。Apache Camel是一个企业集成框架(我称之为集成瑞士刀),它包含数百种现成的组件,用于与企业界和开放源代码库,框架和技术进行集成。
目标
我们的目标是为我们的微服务项目共享一种产生和使用来自每个Kafka主题的消息的方式,而不必担心消息的性质,并使用一种抽象模型来封装我们可以交换的所有可能消息。
完整代码: https://github.com/dariux2016/template-projects/tree/master/camel-kafka-producer
模型-MessageWrapper
首先,我们已经开始定义一个模型,目的是抽象出我们将交换的每条消息。考虑到这一点,我们定义了以下类:
import java.io.Serializable; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; /** * Model the message which will be sent to (and will be consumed from) a Kafka topic. * Encapsulate a payload which is the real object published to the topic. * Wrap some utility and system informations such as the timestamp, the caller module and the message type. */ @Getter @Setter @ToString @EqualsAndHashCode @Builder @AllArgsConstructor @NoArgsConstructor public class MessageWrapper implements Serializable{ |
在MessageWrapper模型中,我们具有以下字段:
- timestamp:存储将消息发布到Kafka主题的时间
- callerModule:包含有关谁是消息的发布者的信息(如果需要)
- messageType:自定义字段,用于根据我们想与Kafka主题交换的Java类来定义消息的类型
- payload:具有JSON表示形式的字符串,封装了交换的真实消息
然后我们有MessageType枚举,定义如下:
/** * Type of payload exchanged through the Kafka topics */ public enum MessageType { STRING(String.class), ITEM(Item.class); private final Class value; private MessageType(Class value) { this.value = value; } public Class value() { return value; } public static MessageType typeValue(Class v) { for (MessageType mt: MessageType.values()) { if (mt.value.equals(v)) { return mt; } } throw new IllegalArgumentException(v.toString()); } } |
通过这种枚举,我们说我们可以交换两种类型的消息:简单的字符串和项(更复杂的对象)。为了给您一个想法,Item类可以如下:
import java.io.Serializable; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; @Getter @Setter @ToString @EqualsAndHashCode @Builder @AllArgsConstructor @NoArgsConstructor public class Item implements Serializable { private static final long serialVersionUID = 1907569777241442102L; private String code; private String name; } |
ProducerService
现在,我们需要一个中心位置,将所有指向Kafka主题的消息封装在MessageWrapper中,这个位置是ProducerService 类。我们之所以称其为“生产者”,是因为仅在向Kafka生成消息时才调用它,而在消费消息时则不会调用它。该ProducerService类的实现如下:
import java.util.Date; import org.apache.camel.ProducerTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.dariux2016.kafka.model.MessageType; import com.github.dariux2016.kafka.model.MessageWrapper; @Service public class ProducerService { private static final Logger LOGGER = LoggerFactory.getLogger(ProducerService.class); @Autowired private ProducerTemplate producerTemplate; private MessageWrapper encapsulateMessage(String payload, Class payloadType) { Date timestamp = new Date(); MessageType messageType = MessageType.typeValue(payloadType); return MessageWrapper.builder() .timestamp(timestamp.getTime()) .messageType(messageType) .payload(payload).build(); } private String convertPayloadToJson(Object payload) { ObjectMapper objectMapper = new ObjectMapper(); String json = null; try { json = objectMapper.writeValueAsString(payload); } catch (JsonProcessingException e) { throw new RuntimeException("Cannot convert payload to json"); } return json; } /** * Send payload to the endpointUri * @param endpointUri * @param payload */ public void sendBody(String endpointUri, Object payload) { LOGGER.info("Publishing a message on {}", endpointUri); String payloadJson = convertPayloadToJson(payload); MessageWrapper body = encapsulateMessage(payloadJson, payload.getClass()); producerTemplate.sendBody(endpointUri, body); } /** * Send payload to the endpointUri without converting * @param endpointUri * @param payload */ public void send(String endpointUri, Object payload) { producerTemplate.sendBody(endpointUri, payload); } } |
基本上,核心组件是Camel框架的ProducerTemplate:这是一个通用组件,具有将对象(我们也称为有效负载)发布和发送到特定端点的功能。在我们的例子中,端点是指定Kafka主题的端点。
在ProducerService类中,我们有两个sendBody方法:最简单的方法(第47-49行)将负载直接发送到Camel端点;相反,另一个sendBody方法(第35-40行)对我们很有用,以便以标准形式交换消息并执行以下步骤:
- 接收有效载荷并将其转换为JSON格式
- 使用encapsulationMessage方法构建MessageWrapper对象
- 将MessageWrapper对象发送到相对于Kafka主题的Camel端点
因此,使用此类,我们可以获取任何类型的消息,将其封装在MessageWrapper对象中并发布到Kafka主题。
消息序列化和反序列化
在生成和使用有关Kafka主题的消息时,我们可以选择指定自定义序列化程序以及自定义反序列化程序。我们的目标是以可互换且独特的方式交换消息,因此使用自定义组件进行序列化和反序列化就是这种情况。
然后,我们实现了一个序列化程序MessageWrapperSerializer类,该类唯一负责通过serialize方法将JSON中的MessageWrapper转换为JSON字符串,如下所示:
import java.util.Map; import org.apache.kafka.common.serialization.Serializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.dariux2016.kafka.model.MessageWrapper; /** * Custom Kafka Serializer for MessageWrapper * */ public class MessageWrapperSerializer implements Serializer<MessageWrapper> { private static final Logger LOGGER = LoggerFactory.getLogger(MessageWrapperSerializer.class); @Override public void configure(Map<String, ?> map, boolean b) { } @Override public byte[] serialize(String arg0, MessageWrapper arg1) { LOGGER.info("Serializing message wrapper {}", arg1); byte[] retVal = null; ObjectMapper objectMapper = new ObjectMapper(); try { retVal = objectMapper.writeValueAsString(arg1).getBytes(); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } return retVal; } @Override public void close() { } } |
然后,我们实现了各自的反序列化器MessageWrapperDeserializer类,以获取从Kafka主题中使用的JSON,将其转换为MessageWrapper对象,并使用反序列化deserialize方法获取我们感兴趣的有效负载,如下所示:
import java.util.Map; import org.apache.kafka.common.serialization.Deserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.dariux2016.kafka.model.MessageWrapper; /** * Custom Kafka deserializer for MessageWrapper * */ public class MessageWrapperDeserializer implements Deserializer<Object> { private static final Logger LOGGER = LoggerFactory.getLogger(MessageWrapperDeserializer.class); @Override public void close() { } @Override public void configure(Map<String, ?> arg0, boolean arg1) { } @Override public Object deserialize(String topic, byte[] data) { LOGGER.info("Deserializing from topic {}",topic); ObjectMapper mapper = new ObjectMapper(); Object payload = null; try { MessageWrapper message = mapper.readValue(data, MessageWrapper.class); String payloadString = message.getPayload(); payload = mapper.readValue(payloadString, message.getMessageType().value()); LOGGER.info("Deserialized payload {}", payload); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } return payload; } } |
环境配置
为了加快环境配置,我将Docker与以下docker-compose.yml设置结合使用:
version: "3" services: zookeeper: image: wurstmeister/zookeeper ports: - 2181:2181 kafka: image: wurstmeister/kafka ports: - 9092:9092 - 29092:29092 depends_on: - zookeeper expose: - '9092' environment: KAFKA_LISTENERS: DOCKER://0.0.0.0:29092,LOCAL://0.0.0.0:9092 KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:29092,LOCAL://localhost:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,LOCAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" |
然后运行:
docker-compose up
Kafka 配置
除了环境之外,在应用程序级别,我们还必须配置参数以与Kafka正确通信。然后,我们创建了以下application.yml:
kafka configuration kafka: brokers: localhost:9092 serializer-class: com.github.dariux2016.kafka.converter.MessageWrapperSerializer deserializer-class: com.github.dariux2016.kafka.converter.MessageWrapperDeserializer reconnect-backoff-ms: 2000 max-block-ms: 5000 kafka consumer-producer configs kafkaProducerConsumerProperties: ?brokers={{kafka.brokers}}&valueDeserializer={{kafka.deserializer-class}}&serializerClass={{kafka.serializer-class}}&maxBlockMs={{kafka.max-block-ms}}&reconnectBackoffMaxMs={{kafka.reconnect-backoff-ms}} kafka-uri-base: kafka:EXAMPLE-TOPIC |
在此配置中,我们进行了一些设置:
- 与Kafka经纪人的联系
- 一些Kafka Producer和Consumer属性,例如我们自定义的消息序列化程序和反序列化程序
- Kafka URI是我们感兴趣的主题的基础,在这里我用EXAMPLE-TOPIC命名
Camel配置:
到目前为止,我们所看到的都是针对产生消息的(MessageWrapperDeserializer组件除外)。现在,我们了解如何设置路线,该路线使用我们通过ProducerService发布的Kafka主题中的MessageWrapper对象。
在文件夹src / main / resources内,我们可以创建一个文件夹“ camel”,在其中可以放置一些XML文件作为Camel路由加载。用于消耗的消息的Came路由路线简单如下所示:
<?xml version="1.0" encoding="UTF-8"?> <routes xmlns="http://camel.apache.org/schema/spring" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <route> <!-- uri properties from application.yml --> <from uri="{{kafka-uri-base}}{{kafkaProducerConsumerProperties}}"></from> <bean ref="consumerBean" method="consume"></bean> </route> </routes> |
使用application.yml中配置的属性启动路由时,将替换URI。从该端点进行消费时,它被称为组件,它是到达Kafka主题的消息的简单消费方:ConsumerBean。
消费者bean就是Spring组件类,如下所示:
import org.apache.camel.Body; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @Component public class ConsumerBean { private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerBean.class); /** * Get the input object from the current exchange * @param exchange */ public void consume(@Body Object body) { LOGGER.info("Consuming a message {}", body); } } |
测试代码:
@RunWith(SpringRunner.class) @ContextConfiguration(classes = Starter.class, initializers = ConfigFileApplicationContextInitializer.class) public class CamelKafkaProducerTest { private static final Logger LOGGER = LoggerFactory.getLogger(CamelKafkaProducerTest.class); @Value("${kafka-uri-base}") private String kafkaBaseUri; @Value("${kafkaProducerConsumerProperties}") private String kafkaProperties; @Autowired private ProducerService producerService; @Test public void test() throws Exception { String kafkaEndpoint = kafkaBaseUri + kafkaProperties; LOGGER.info("Test publishing a String"); producerService.sendBody(kafkaEndpoint, "hello world"); LOGGER.info("Test publishing an Item"); Item item = Item.builder().code("A").name("first item").build(); producerService.sendBody(kafkaEndpoint, item); //mantain the camel context up and running Thread.sleep(5000); } } |
通过此测试,我向Kafka主题发布了两条消息:
- 字符串“ hello world”
- 代码为“ A”且名称为“ first item”的商品
两者都封装到MessageWrapper中,并发布到Kafka。在Camel路由通过配置被开始使用之后,它启动MessageWrapperDeserializer,它接收有效负载并将其放入Camel Exchange的主体body部分。这样做,ConsumerBean将简单地获取并使用。