构建一个纯Rust异步的Apache Kafka客户端 - influxdata


InfluxDB的未来核心是InfluxDB IOx, 使用的是Apache  Kafka 对数据进行排序:
到目前为止,我们一直依赖于 rust-rdkafka,它为librdkafka提供异步绑定,而 librdkafka又是用 C 编写的。
那么我们为什么要替换它呢?以下是一些原因:

  • 复杂性: librdkafka 是一个复杂的库,其中包含大量我们不需要或不想要的功能,它支持广泛的 Kafka 版本,而我们基本上运行的是“最新版本”。由于 rust-rdkafka 也只公开了 librdkafka 功能的一小部分,我们认为这也可能适用于其他用户。
  • 绑定:  rust-rdkafka 试图将 librdkafka 塞入 Rust 异步生态系统。这在一定程度上可行,但会导致一些问题,例如当从不同线程执行回调时,tokio 会感到困惑。绑定本身也有一些限制。
  • 缓冲/模块化: 我们对 librdkafka 中的缓冲和批处理工作方式的控制有限。这是跨语言库的固有问题。
  • 专业知识和见解: 错误和意外行为很难调试。我们对在生产中使用当前状态感到不舒服。
  • 可行性: 我们只使用非常有限的 Kafka 功能子集(例如,没有事务),为此 Kafka 协议相当简单。对于这个子集,编写一个新客户端实际上是可行的。

这就是为什么我们决定在 Rust 中启动一个简单、新鲜、完全异步的 Kafka 客户端:  RSKafka
 
这是一个快速使用示例。首先,我们设置一个客户端:

let connection = "localhost:9093".to_owned();
let client = ClientBuilder::new(vec![connection]).build().await.unwrap(); 

让我们创建一个主题:

let topic = "my_topic";
let controller_client = client.controller_client().await.unwrap();
controller_client.create_topic(
    topic,
    2,      
// partitions
    1,      
// replication factor
    5_000,  
// timeout (ms)
).await.unwrap(); 

然后我们生产和消费一些数据:

// get a client for writing to a partition
let partition_client = client
    .partition_client(
        topic.to_owned(),
        0,  
// partition
    )
    .await
    .unwrap();
 
// produce some data
let record = Record {
    key: b
"".to_vec(),
    value: b
"hello kafka".to_vec(),
    headers: BTreeMap::from([
        (
"foo".to_owned(), b"bar".to_vec()),
    ]),
    timestamp: OffsetDateTime::now_utc(),
};
partition_client.produce(vec![record]).await.unwrap();
 
// consume data
let (records, high_watermark) = partition_client
    .fetch_records(
        0,            
// offset
        1..1_000_000,  
// min..max bytes
        1_000,        
// max wait time
    )
    .await
    .unwrap(); 

您可能会直接跳到 源代码,但我也邀请您继续阅读并了解我们是如何构建它的,以及哪些实践通常适用于客户端库。
详细点击标题