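RustRover: a standalone Rust IDE from JetBrains
"When will there be a Rust IDE?" Users have asked this question for years (eight years; do you know how I got through those eight years?), and JetBrains has now announced that the day has come: RustRover, JetBrains' standalone Rust IDE.
For years, JetBrains supported Rust mainly through plugins. This time, in response to community requests for an IDE dedicated to Rust and its ecosystem, JetBrains has launched a Rust IDE whose feature set is meant to be on par with the existing JetBrains IDEs. A standalone IDE is also expected to improve development efficiency considerably.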
- https://blog.jetbrains.com/rust/2023/09/13/introducing-rustrover-a-standalone-rust-ide-by-jetbrains/
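rust-kafka: a Kafka client library for Rust
A fully asynchronous, futures-based Kafka client library for Rust, built on librdkafka. Its main features currently include:
- Support for all Kafka versions since 0.8.x. For more information about broker compatibility options, see the librdkafka documentation.
- Consuming from one or more topics.
- Automatic consumer rebalancing.
- Customizable rebalancing, with pre-rebalance and post-rebalance callbacks.
- Synchronous or asynchronous message production.
- Customizable offset commits.
- Creating and deleting topics, and adding and editing partitions.
- Altering broker and topic configurations.
- Access to cluster metadata (lists of topic partitions, replicas, brokers, and so on).
- Access to group metadata (listing groups, listing group members, hostnames, and so on).
- Access to producer and consumer metrics, errors, and callbacks.
- Exactly-once semantics (EOS) via idempotent and transactional producers and read-committed consumers.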
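The last feature in the list is driven mostly by configuration. The following is a minimal sketch, not the library's own API: it only collects the librdkafka configuration properties that are commonly associated with idempotent/transactional producers and read-committed consumers. The function names and the transactional id are hypothetical, and the sketch uses only the standard library; in a real program each pair would be passed to `ClientConfig::set`, as in the example further down, and the transaction calls themselves are not shown.

```rust
use std::collections::BTreeMap;

/// Settings for an idempotent, transactional producer.
/// `transactional_id` is a hypothetical, application-chosen identifier.
fn eos_producer_settings(brokers: &str, transactional_id: &str) -> BTreeMap<&'static str, String> {
    let mut cfg = BTreeMap::new();
    cfg.insert("bootstrap.servers", brokers.to_owned());
    // librdkafka property that turns on the idempotent producer.
    cfg.insert("enable.idempotence", "true".to_owned());
    // librdkafka property that identifies the transactional producer.
    cfg.insert("transactional.id", transactional_id.to_owned());
    cfg
}

/// Settings for a consumer that only reads committed transactional messages.
fn eos_consumer_settings(brokers: &str, group_id: &str) -> BTreeMap<&'static str, String> {
    let mut cfg = BTreeMap::new();
    cfg.insert("bootstrap.servers", brokers.to_owned());
    cfg.insert("group.id", group_id.to_owned());
    // librdkafka property selecting read-committed isolation.
    cfg.insert("isolation.level", "read_committed".to_owned());
    cfg
}

fn main() {
    for (key, value) in eos_producer_settings("localhost:9092", "example-tx-id") {
        println!("producer: {} = {}", key, value);
    }
    for (key, value) in eos_consumer_settings("localhost:9092", "example_consumer_group_id") {
        println!("consumer: {} = {}", key, value);
    }
}
```

Keeping the settings in a plain map makes them easy to inspect or log before a client is created; whether that indirection is worthwhile depends on the application.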
rust-kafka can be combined with tokio for asynchronous production. The dependency is declared as follows:
[dependencies]
rdkafka = { version = "0.25", features = ["cmake-build"] }
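Example (in addition to rdkafka, the code uses the clap, futures, log, rand, and tokio crates, plus a local example_utils module):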
use std::thread;
use std::time::Duration;
use clap::{value_t, App, Arg};
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use log::info;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::Consumer;
use rdkafka::message::{BorrowedMessage, OwnedMessage};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::Message;
use crate::example_utils::setup_logger;
mod example_utils;

async fn record_borrowed_message_receipt(msg: &BorrowedMessage<'_>) {
    // Simulate some work that must be done in the same order as messages are
    // received; i.e., before truly parallel processing can begin.
    info!("Message received: {}", msg.offset());
}

async fn record_owned_message_receipt(_msg: &OwnedMessage) {
    // Like `record_borrowed_message_receipt`, but takes an `OwnedMessage`
    // instead, as in a real-world use case an `OwnedMessage` might be more
    // convenient than a `BorrowedMessage`.
}

// Emulates an expensive, synchronous computation.
fn expensive_computation<'a>(msg: OwnedMessage) -> String {
    info!("Starting expensive computation on message {}", msg.offset());
    thread::sleep(Duration::from_millis(rand::random::<u64>() % 5000));
    info!(
        "Expensive computation completed on message {}",
        msg.offset()
    );
    match msg.payload_view::<str>() {
        Some(Ok(payload)) => format!("Payload len for {} is {}", payload, payload.len()),
        Some(Err(_)) => "Message payload is not a string".to_owned(),
        None => "No payload".to_owned(),
    }
}

// Creates all the resources and runs the event loop. The event loop will:
//   1) receive a stream of messages from the `StreamConsumer`.
//   2) filter out eventual Kafka errors.
//   3) send the message to a thread pool for processing.
//   4) produce the result to the output topic.
// `tokio::spawn` is used to handle IO-bound tasks in parallel (e.g., producing
// the messages), while `tokio::task::spawn_blocking` is used to handle the
// simulated CPU-bound task.
async fn run_async_processor(
    brokers: String,
    group_id: String,
    input_topic: String,
    output_topic: String,
) {
    // Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", &group_id)
        .set("bootstrap.servers", &brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed");

    consumer
        .subscribe(&[&input_topic])
        .expect("Can't subscribe to specified topic");

    // Create the `FutureProducer` to produce asynchronously.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", &brokers)
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");

    // Create the outer pipeline on the message stream.
    let stream_processor = consumer.stream().try_for_each(|borrowed_message| {
        let producer = producer.clone();
        let output_topic = output_topic.to_string();
        async move {
            // Process each message
            record_borrowed_message_receipt(&borrowed_message).await;
            // Borrowed messages can't outlive the consumer they are received from, so they need to
            // be owned in order to be sent to a separate thread.
            let owned_message = borrowed_message.detach();
            record_owned_message_receipt(&owned_message).await;
            tokio::spawn(async move {
                // The body of this block will be executed on the main thread pool,
                // but we perform `expensive_computation` on a separate thread pool
                // for CPU-intensive tasks via `tokio::task::spawn_blocking`.
                let computation_result =
                    tokio::task::spawn_blocking(|| expensive_computation(owned_message))
                        .await
                        .expect("failed to wait for expensive computation");
                let produce_future = producer.send(
                    FutureRecord::to(&output_topic)
                        .key("some key")
                        .payload(&computation_result),
                    Duration::from_secs(0),
                );
                match produce_future.await {
                    Ok(delivery) => println!("Sent: {:?}", delivery),
                    Err((e, _)) => println!("Error: {:?}", e),
                }
            });
            Ok(())
        }
    });

    info!("Starting event loop");
    stream_processor.await.expect("stream processing failed");
    info!("Stream processing terminated");
}

#[tokio::main]
async fn main() {
    let matches = App::new("Async example")
        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
        .about("Asynchronous computation example")
        .arg(
            Arg::with_name("brokers")
                .short("b")
                .long("brokers")
                .help("Broker list in kafka format")
                .takes_value(true)
                .default_value("localhost:9092"),
        )
        .arg(
            Arg::with_name("group-id")
                .short("g")
                .long("group-id")
                .help("Consumer group id")
                .takes_value(true)
                .default_value("example_consumer_group_id"),
        )
        .arg(
            Arg::with_name("log-conf")
                .long("log-conf")
                .help("Configure the logging format (example: 'rdkafka=trace')")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("input-topic")
                .long("input-topic")
                .help("Input topic")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("output-topic")
                .long("output-topic")
                .help("Output topic")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("num-workers")
                .long("num-workers")
                .help("Number of workers")
                .takes_value(true)
                .default_value("1"),
        )
        .get_matches();

    setup_logger(true, matches.value_of("log-conf"));

    let brokers = matches.value_of("brokers").unwrap();
    let group_id = matches.value_of("group-id").unwrap();
    let input_topic = matches.value_of("input-topic").unwrap();
    let output_topic = matches.value_of("output-topic").unwrap();
    let num_workers = value_t!(matches, "num-workers", usize).unwrap();

    (0..num_workers)
        .map(|_| {
            tokio::spawn(run_async_processor(
                brokers.to_owned(),
                group_id.to_owned(),
                input_topic.to_owned(),
                output_topic.to_owned(),
            ))
        })
        .collect::<FuturesUnordered<_>>()
        .for_each(|_| async { () })
        .await
}
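
The example above turns each borrowed message into an owned one before handing it to another thread. The same idea can be sketched with the standard library alone, without Kafka or tokio. This is an illustration under stated assumptions, not part of the library: `describe_payload` and `process_all` are hypothetical names, plain `&str` slices stand in for borrowed messages, and `std::thread::spawn` stands in for `tokio::task::spawn_blocking`.

```rust
use std::sync::mpsc;
use std::thread;

// Stand-in for the CPU-bound step; it mirrors the string produced by
// `expensive_computation` for a valid UTF-8 payload.
fn describe_payload(payload: String) -> String {
    format!("Payload len for {} is {}", payload, payload.len())
}

// Copies every borrowed payload into an owned `String`, processes each copy
// on its own worker thread, and returns the results in input order.
fn process_all(borrowed: &[&str]) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();

    for (index, payload) in borrowed.iter().enumerate() {
        // A spawned thread may outlive the borrow, so the data must be owned
        // before it is moved into the closure (the role `detach()` plays above).
        let owned = payload.to_string();
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            tx.send((index, describe_payload(owned)))
                .expect("receiver dropped");
        }));
    }
    // Drop the original sender so the receiver stops once all workers finish.
    drop(tx);

    let mut results: Vec<(usize, String)> = rx.iter().collect();
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    // Workers may finish in any order; restore the input order.
    results.sort_by_key(|(index, _)| *index);
    results.into_iter().map(|(_, text)| text).collect()
}

fn main() {
    for line in process_all(&["a", "bcd"]) {
        println!("{}", line);
    }
}
```

Unlike the tokio version, this sketch spawns one OS thread per message, which is acceptable for a handful of items but not for a continuous stream; a bounded pool such as the one behind `spawn_blocking` is the more realistic choice.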

From the Daily News team: 侯盛鑫