使用高速缓存Serde加速Kafka反序列化性能 - Kaszuba


Kafka内部世界是在字节级别上存储状态的,Serde负责在外部领域语言和Kafka世界之间进行翻译,但会造成一定的性能损失,因为读写需要“始终”通过Serde,尤其是在使用诸如Avro之类的重型Serdes时。
Kafka峰会上,彭博社的Lei Chen很好地解释了反序列化的性能问题。有几种解决此问题的方法。彭博社采取的方法是创建自定义状态存储。这是一个非常好的解决方案,并且可以在所有情况下使用,但实际上我很困惑为什么默认情况下框架没有这样的状态存储。如何执行此操作以及与此实现相关的问题可以在此处找到。但是还有一种更简单的方法可能足以满足您的用例,即缓存Serde。
如果您的数据在对状态存储的读取或写入时都不经常更改,则在Serde级别上进行缓存可能就足够了。它实现起来很简单,并且不需要对流api发生任何干扰。
我将只关注反序列化,但是相同的概念也适用于序列化。缓存反序列化器通过维护内存中缓存来工作。读取时,首先搜索缓存以查看该值是否已反序列化,如果已经返回,则返回该值;如果尚未返回,则调用内部反序列化器,并将反序列化的值存储在缓存中。如果达到高速缓存的最大大小,它将开始清除最早的条目。任何类型的缓存都可以用于此目的,下面介绍的一种使用LinkedHashMap,因为它的功能类似于堆栈,因此当达到一定大小的缓存时,可以删除最旧的条目。

package tkaszuba;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.Map;

public class CachingDeserializer<T> implements Deserializer<T> {
    private static final Logger logger = LoggerFactory.getLogger(CachingDeserializer.class);

    private final LinkedHashMap<Bytes, T> cache = new LinkedHashMap<>();
    private final Deserializer<T> inner;
    private final int cacheSize;

    public CachingDeserializer(Deserializer<T> inner) {
        this(500, inner);
    }

    public CachingDeserializer(int cacheSize, Deserializer<T> inner) {
        this.cacheSize = cacheSize;
        this.inner = inner;
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        Bytes key = Bytes.wrap(bytes);
        if (cache.containsKey(key)) {
            logger.debug("Taking deserialized value from cache");
            return cache.get(key);
        }

        if (cache.size() == cacheSize)
            removeOldestEntryFromCache();

        T value = inner.deserialize(s, bytes);

        logger.debug(
"Adding deserialized value to cache");
        cache.put(key, value);

        return value;
    }

    public Map<Bytes, T> getCache() {
        return cache;
    }

    private void removeOldestEntryFromCache() {
        logger.debug(
"Removing oldest deserialized value from cache");
        cache.remove(cache.entrySet().iterator().next().getKey());
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override
    public void close() {
        inner.close();
    }
}

CachingDeserializer使用Byte对象包装器作为键,而不是字节数组,因为它更易于使用。当达到缓存大小时,将使用迭代器删除列表中的第一项,因此无需进行昂贵的遍历。
使用反序列化器的方法如下:

@Test
void testCachingSerde() {
   KeyValue<Integer, String> keyValue1 = new KeyValue<>(1, "test");
   KeyValue<Integer, String> keyValue2 = new KeyValue<>(2,
"test");
   KeyValue<Integer, String> keyValue3 = new KeyValue<>(3,
"test");

   KeyValueSerializer<Integer, String> innerSerializer = new KeyValueSerializer<>(Serdes.Integer(), Serdes.String());
   KeyValueDeserializer<Integer, String> innerDeserializer = new KeyValueDeserializer<>(Serdes.Integer(), Serdes.String());

   try(CachingDeserializer<KeyValue<Integer, String>> deserializer = new CachingDeserializer<>(2, innerDeserializer)) {
      assertDoesNotThrow(() -> deserializer.configure(Collections.emptyMap(), false));

      byte[] value1 = innerSerializer.serialize(TOPIC, keyValue1);
      byte[] value2= innerSerializer.serialize(TOPIC, keyValue2);
      byte[] value3 = innerSerializer.serialize(TOPIC, keyValue3);

      assertEquals(keyValue1, deserializer.deserialize(TOPIC, value1));
      assertEquals(1, deserializer.getCache().size(),
"Should contain one item in the cache");

      assertEquals(keyValue1, deserializer.deserialize(TOPIC, value1));
      assertEquals(1, deserializer.getCache().size(),
"Should contain one item in the cache");
      assertEquals(keyValue1, deserializer.getCache().entrySet().iterator().next().getValue(),
"Should contain keyValue1");

      assertEquals(keyValue2, deserializer.deserialize(TOPIC, value2));
      assertEquals(2, deserializer.getCache().size(),
"Should contain two items in the cache");

      Iterator<Map.Entry<Bytes, KeyValue<Integer, String>>> iterator = deserializer.getCache().entrySet().iterator();
      assertEquals(keyValue1, iterator.next().getValue(),
"Should contain keyValue1");
      assertEquals(keyValue2, iterator.next().getValue(),
"Should contain keyValue2");

      assertEquals(keyValue3, deserializer.deserialize(TOPIC, value3));
      assertEquals(2, deserializer.getCache().size(),
"Should contain two items in the cache");

      iterator = deserializer.getCache().entrySet().iterator();
      assertEquals(keyValue2, iterator.next().getValue(),
"Should contain keyValue2");
      assertEquals(keyValue3, iterator.next().getValue(),
"Should contain keyValue3");

   }
}

注意:在缓存中存储复杂的可变对象(例如Avro)时,应克隆返回的对象,否则将修改缓存中的原始对象。直接从状态存储读取数据时,您无需担心,因为在反序列化过程中会克隆对象。您可能已将其包含在Serde中,具体取决于如何使用Serde。
因此,自然而然的问题是,为什么您不将所有内容都跳过而不直接将高速缓存保留在内存中,而无需在后台使用任何状态存储呢?我想这全都取决于您的用例,状态存储提供了容错能力并很好地处理了多个分区,但是确实引入了很多复杂性,这些复杂性有时很难处理。希望在新版本中,API将会增长,并且与它们一起使用将变得更加容易。