如何使用Java、Apache Camel和Kafka产生/使用消息? - Dario

20-07-08 banq

如今,许多应用程序使用事件流和消息发布系统相互通信。Apache Kafka是一种分布式流媒体平台,该平台主要使发布和订阅主题变得简单,并通过并行化使用者来获得出色的性能。本文适用于希望使用Kafka的任何人,并且可能会利用另一个框架Apache Camel提供的抽象来做一个简单的基本用法。Apache Camel是一个企业集成框架(我称之为集成瑞士刀),它包含数百种现成的组件,用于与企业界和开放源代码库,框架和技术进行集成。

目标

我们的目标是为我们的微服务项目共享一种产生和使用来自每个Kafka主题的消息的方式,而不必担心消息的性质,并使用一种抽象模型来封装我们可以交换的所有可能消息。

完整代码: https://github.com/dariux2016/template-projects/tree/master/camel-kafka-producer

模型-MessageWrapper

首先,我们已经开始定义一个模型,目的是抽象出我们将交换的每条消息。考虑到这一点,我们定义了以下类:

import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

/**
 * Model the message which will be sent to (and will be consumed from) a Kafka topic.
 * Encapsulate a payload which is the real object published to the topic.
 * Wrap some utility and system informations such as the timestamp, the caller module and the message type.
 */
@Getter 
@Setter 
@ToString 
@EqualsAndHashCode
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageWrapper implements Serializable{

在MessageWrapper模型中,我们具有以下字段:

  • timestamp:存储将消息发布到Kafka主题的时间
  • callerModule:包含有关谁是消息的发布者的信息(如果需要)
  • messageType:自定义字段,用于根据我们想与Kafka主题交换的Java类来定义消息的类型
  • payload:具有JSON表示形式的字符串,封装了交换的真实消息

然后我们有MessageType枚举,定义如下:

/**
 * Type of payload exchanged through the Kafka topics
 */
public enum MessageType {
    
    STRING(String.class),
    ITEM(Item.class);
    
    private final Class value;
    
    private MessageType(Class value) {
        this.value = value;
    }
    
    public Class value() {
        return value;
    }
    
    public static MessageType typeValue(Class v) {
        for (MessageType mt: MessageType.values()) {
            if (mt.value.equals(v)) {
                return mt;
            }
        }
        throw new IllegalArgumentException(v.toString());
    }
}

通过这种枚举,我们说我们可以交换两种类型的消息:简单的字符串和项(更复杂的对象)。为了给您一个想法,Item类可以如下:

import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Getter 
@Setter 
@ToString
@EqualsAndHashCode
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Item implements Serializable {

    private static final long serialVersionUID = 1907569777241442102L;

    private String code;
    private String name;
}

ProducerService

现在,我们需要一个中心位置,将所有指向Kafka主题的消息封装在MessageWrapper中,这个位置是ProducerService 类。我们之所以称其为“生产者”,是因为仅在向Kafka生成消息时才调用它,而在消费消息时则不会调用它。该ProducerService类的实现如下:

import java.util.Date;

import org.apache.camel.ProducerTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.dariux2016.kafka.model.MessageType;
import com.github.dariux2016.kafka.model.MessageWrapper;

@Service
public class ProducerService {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerService.class);
    
    @Autowired
    private ProducerTemplate producerTemplate;
    
    private MessageWrapper encapsulateMessage(String payload, Class payloadType) {
        Date timestamp = new Date();
        MessageType messageType = MessageType.typeValue(payloadType);
        return MessageWrapper.builder()
                .timestamp(timestamp.getTime())
                .messageType(messageType)
                .payload(payload).build();
    }
    
    private String convertPayloadToJson(Object payload) {
        ObjectMapper objectMapper = new ObjectMapper();
        String json = null;
        try {
            json = objectMapper.writeValueAsString(payload);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Cannot convert payload to json");
        }
        
        return json;
    }
    
    /**
     * Send payload to the endpointUri
     * @param endpointUri
     * @param payload
     */
    public void sendBody(String endpointUri, Object payload) {
        LOGGER.info("Publishing a message on {}", endpointUri);
        String payloadJson = convertPayloadToJson(payload);
        MessageWrapper body = encapsulateMessage(payloadJson, payload.getClass());
        producerTemplate.sendBody(endpointUri, body);
    }
    
    /**
     * Send payload to the endpointUri without converting
     * @param endpointUri
     * @param payload
     */
    public void send(String endpointUri, Object payload) {
        producerTemplate.sendBody(endpointUri, payload);
    }

}

基本上,核心组件是Camel框架的ProducerTemplate:这是一个通用组件,具有将对象(我们也称为有效负载)发布和发送到特定端点的功能。在我们的例子中,端点是指定Kafka主题的端点。

在ProducerService类中,我们有两个sendBody方法:最简单的方法(第47-49行)将负载直接发送到Camel端点;相反,另一个sendBody方法(第35-40行)对我们很有用,以便以标准形式交换消息并执行以下步骤:

  • 接收有效载荷并将其转换为JSON格式
  • 使用encapsulationMessage方法构建MessageWrapper对象
  • 将MessageWrapper对象发送到相对于Kafka主题的Camel端点

因此,使用此类,我们可以获取任何类型的消息,将其封装在MessageWrapper对象中并发布到Kafka主题。

消息序列化和反序列化

在生成和使用有关Kafka主题的消息时,我们可以选择指定自定义序列化程序以及自定义反序列化程序。我们的目标是以可互换且独特的方式交换消息,因此使用自定义组件进行序列化和反序列化就是这种情况。

然后,我们实现了一个序列化程序MessageWrapperSerializer类,该类唯一负责通过serialize方法将JSON中的MessageWrapper转换为JSON字符串,如下所示:

import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.dariux2016.kafka.model.MessageWrapper;

/**
 * Custom Kafka Serializer for MessageWrapper
 *
 */
public class MessageWrapperSerializer implements Serializer<MessageWrapper> {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageWrapperSerializer.class);

    @Override
    public void configure(Map<String, ?> map, boolean b) {    }

    @Override
    public byte[] serialize(String arg0, MessageWrapper arg1) {
        LOGGER.info("Serializing message wrapper {}", arg1);
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            retVal = objectMapper.writeValueAsString(arg1).getBytes();
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        return retVal;
    }

    @Override
    public void close() {    }
}

然后,我们实现了各自的反序列化器MessageWrapperDeserializer类,以获取从Kafka主题中使用的JSON,将其转换为MessageWrapper对象,并使用反序列化deserialize方法获取我们感兴趣的有效负载,如下所示:

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.dariux2016.kafka.model.MessageWrapper;

/**
 * Custom Kafka deserializer for MessageWrapper
 *
 */
public class MessageWrapperDeserializer implements Deserializer<Object> {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageWrapperDeserializer.class);
    
    @Override
    public void close() {    }
    
    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {    }
    
    @Override
    public Object deserialize(String topic, byte[] data) {
        LOGGER.info("Deserializing from topic {}",topic);
        ObjectMapper mapper = new ObjectMapper();
        Object payload = null;
        try {
            MessageWrapper message = mapper.readValue(data, MessageWrapper.class);
            String payloadString = message.getPayload();
            payload = mapper.readValue(payloadString, message.getMessageType().value());
            LOGGER.info("Deserialized payload {}", payload);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        return payload;
    }
}

环境配置

为了加快环境配置,我将Docker与以下docker-compose.yml设置结合使用:

version: "3"

services:
  zookeeper:
     image: wurstmeister/zookeeper
     ports:
      - 2181:2181
      
  kafka:
     image: wurstmeister/kafka
     ports:
      - 9092:9092
      - 29092:29092
     depends_on:
     - zookeeper
     expose:
     - '9092'
     environment:
        KAFKA_LISTENERS: DOCKER://0.0.0.0:29092,LOCAL://0.0.0.0:9092
        KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:29092,LOCAL://localhost:9092
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,LOCAL:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

然后运行:

docker-compose up

Kafka 配置

除了环境之外,在应用程序级别,我们还必须配置参数以与Kafka正确通信。然后,我们创建了以下application.yml:

#kafka configuration
kafka:
  brokers: localhost:9092
  serializer-class: com.github.dariux2016.kafka.converter.MessageWrapperSerializer
  deserializer-class: com.github.dariux2016.kafka.converter.MessageWrapperDeserializer
  reconnect-backoff-ms: 2000
  max-block-ms: 5000
  
#kafka consumer-producer configs
kafkaProducerConsumerProperties: ?brokers={{kafka.brokers}}&valueDeserializer={{kafka.deserializer-class}}&serializerClass={{kafka.serializer-class}}&maxBlockMs={{kafka.max-block-ms}}&reconnectBackoffMaxMs={{kafka.reconnect-backoff-ms}}

kafka-uri-base: kafka:EXAMPLE-TOPIC

在此配置中,我们进行了一些设置:

  • 与Kafka经纪人的联系
  • 一些Kafka Producer和Consumer属性,例如我们自定义的消息序列化程序和反序列化程序
  • Kafka URI是我们感兴趣的主题的基础,在这里我用EXAMPLE-TOPIC命名

Camel配置:

到目前为止,我们所看到的都是针对产生消息的(MessageWrapperDeserializer组件除外)。现在,我们了解如何设置路线,该路线使用我们通过ProducerService发布的Kafka主题中的MessageWrapper对象。

在文件夹src / main / resources内,我们可以创建一个文件夹“ camel”,在其中可以放置一些XML文件作为Camel路由加载。用于消耗的消息的Came路由路线简单如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<routes xmlns="http://camel.apache.org/schema/spring"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
           http://www.springframework.org/schema/beans 
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://camel.apache.org/schema/spring 
           http://camel.apache.org/schema/spring/camel-spring.xsd">
    <route>
        <!-- uri properties from application.yml -->
        <from uri="{{kafka-uri-base}}{{kafkaProducerConsumerProperties}}"></from>
        <bean ref="consumerBean" method="consume"></bean>
    </route>
</routes>

使用application.yml中配置的属性启动路由时,将替换URI。从该端点进行消费时,它被称为组件,它是到达Kafka主题的消息的简单消费方:ConsumerBean。

消费者bean就是Spring组件类,如下所示:

import org.apache.camel.Body;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class ConsumerBean {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerBean.class);

    /**
     * Get the input object from the current exchange
     * @param exchange
     */
    public void consume(@Body Object body) {
        LOGGER.info("Consuming a message {}", body);
    }
}

测试代码:

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = Starter.class, initializers = ConfigFileApplicationContextInitializer.class)
public class CamelKafkaProducerTest {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(CamelKafkaProducerTest.class);
    
    @Value("${kafka-uri-base}")
    private String kafkaBaseUri;
    
    @Value("${kafkaProducerConsumerProperties}")
    private String kafkaProperties;
    
    @Autowired
    private ProducerService producerService;
    
    @Test
    public void test() throws Exception {
        String kafkaEndpoint = kafkaBaseUri + kafkaProperties;
        
        LOGGER.info("Test publishing a String");
        producerService.sendBody(kafkaEndpoint, "hello world");
        
        LOGGER.info("Test publishing an Item");
        Item item = Item.builder().code("A").name("first item").build();
        producerService.sendBody(kafkaEndpoint, item);

        //mantain the camel context up and running
        Thread.sleep(5000);
    }
}

通过此测试,我向Kafka主题发布了两条消息:

  • 字符串“ hello world”
  • 代码为“ A”且名称为“ first item”的商品

两者都封装到MessageWrapper中,并发布到Kafka。在Camel路由通过配置被开始使用之后,它启动MessageWrapperDeserializer,它接收有效负载并将其放入Camel Exchange的主体body部分。这样做,ConsumerBean将简单地获取并使用。

 

         

猜你喜欢