// Step 1: obtain a client bound to a single partition of `topic`.
let partition_client = {
    let partition = 0;
    client
        .partition_client(topic.to_owned(), partition)
        .await
        .unwrap()
};

// Step 2: build one record (empty key, one header) and send it as a
// single-element batch.
let record = {
    let headers = BTreeMap::from([("foo".to_owned(), b"bar".to_vec())]);
    Record {
        key: b"".to_vec(),
        value: b"hello kafka".to_vec(),
        headers,
        timestamp: OffsetDateTime::now_utc(),
    }
};
partition_client.produce(vec![record]).await.unwrap();

// Step 3: read records back from the same partition, starting at offset 0.
// The call yields the fetched records together with the high watermark.
let (records, high_watermark) = {
    let start_offset = 0;
    // min..max number of bytes to fetch
    let byte_range = 1..1_000_000;
    // max wait time (NOTE(review): unit is presumably milliseconds — confirm
    // against the client library's documentation)
    let max_wait = 1_000;
    partition_client
        .fetch_records(start_offset, byte_range, max_wait)
        .await
        .unwrap()
};
|