For https://github.com/influxdata/influxdb_iox, the future core of InfluxDB, we use https://kafka.apache.org/ to sequence data: Up until now, we have relied on https://github.com/fede1024/rust-rdkafka, which provides async binding for https://github.com/edenhill/librdkafka, which in turn is written in C. So why would we replace that? Since rust-rdkafka also only exposes a small subset of librdkafka’s functionality, we think that this might apply to other users as well.

29 // get a client for writing to a partition let partition_client = client .partition_client( topic.to_owned(), 0, // partition ) .await .unwrap(); // produce some data let record = Record { key: b"".to_vec(), value: b"hello kafka".to_vec(), headers: BTreeMap::from([ ("foo".to_owned(), b"bar".to_vec()), ]), timestamp: OffsetDateTime::now_utc(), }; partition_client.produce(vec![record]).await.unwrap(); // consume data let (records, high_watermark) = partition_client .fetch_records( 0, // offset 1..1_000_000, // min..max bytes 1_000, // max wait time

{ #[test] fn roundtrip_in16(orig: Int16) { let mut data = vec[]!; orig.write(&mut data).unwrap(); let restored = Int16::read(&mut Cursor::new(data)).unwrap(); assert_eq!(orig, restored); }

Related Articles