< 返回版块

三米前有蕉皮 发表于 2025-10-14 13:47

Tags:redis, worker,task-queue

https://github.com/emo-crab/asynq

Asynq 是一个用 Rust 编写的简单、可靠、高效的分布式任务队列库,基于 Redis 存储,灵感来自 hibiken/asynq

🔗 完全兼容 Go 版本 asynq: 本实现与 Go 版本的 hibiken/asynq 完全兼容,可以与 Go 服务无缝协作。

🌟 特性

  • 保证至少执行一次 - 任务不会丢失
  • 任务调度 - 支持延迟任务和定时任务
  • 🔄 自动重试 - 失败任务可配置重试策略
  • 🛡️ 故障恢复 - 工作者崩溃时自动恢复任务
  • 🎯 优先级队列 - 支持加权和严格优先级
  • 低延迟 - Redis 写入速度快,任务入队延迟低
  • 🔒 任务去重 - 支持唯一任务选项
  • ⏱️ 超时控制 - 每个任务支持超时和截止时间
  • 📦 任务聚合 - 支持批量处理多个任务
  • 🔌 灵活接口 - 支持中间件和自定义处理器
  • ⏸️ 队列暂停 - 可以暂停/恢复特定队列
  • 🕒 周期性任务 - 支持 cron 风格的定时任务
  • 🏠 高可用性 - 支持 Redis Cluster
  • 🖥️ Web UI - 提供队列和任务的 Web 管理界面
  • 🔄 Go 兼容 - 与 Go 版本 asynq 完全兼容,可混合部署
  • 🎯 宏支持 - 提供类似 actix-web 的属性宏,方便注册处理器(可选功能)

🚀 快速开始

添加依赖

在你的 Cargo.toml中添加:

[dependencies]
asynq = { version = "0.1", features = ["json"] }
## 启用宏支持(可选)
# asynq = { version = "0.1", features = ["json", "macros"] }
## or dev channel
#asynq = { git = "https://github.com/emo-crab/asynq", branch = "main" }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }

基本用法

生产者 (发送任务)

use asynq::{client::Client, task::Task, redis::RedisConfig};
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize)]
struct EmailPayload {
    to: String,
    subject: String,
    body: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建 Redis 配置
    let redis_config = RedisConfig::from_url("redis://127.0.0.1:6379")?;
    
    // 创建客户端
    let client = Client::new(redis_config).await?;
    
    // 创建任务
    let payload = EmailPayload {
        to: "user@example.com".to_string(),
        subject: "Welcome!".to_string(),
        body: "Welcome to our service!".to_string(),
    };

    let task = Task::new_with_json("email:send", &payload)?;
    
    // 发送任务到队列
    let task_info = client.enqueue(task).await?;
    println!("Task enqueued with ID: {}", task_info.id);

    Ok(())
}

消费者 (处理任务)

use asynq::{server::Server,server::Handler,task::Task, redis::RedisConfig, config::ServerConfig};
use async_trait::async_trait;
use serde::{Serialize, Deserialize};
use std::collections::HashMap;

#[derive(Serialize, Deserialize)]
struct EmailPayload {
    to: String,
    subject: String,
    body: String,
}

struct EmailProcessor;

#[async_trait]
impl Handler for EmailProcessor {
    async fn process_task(&self, task: Task) -> asynq::error::Result<()> {
        match task.get_type() {
            "email:send" => {
                let payload: EmailPayload = task.get_payload_with_json()?;
                println!("Sending email to: {}", payload.to);
                // 执行实际的邮件发送逻辑
                Ok(())
            }
            _ => {
                Err(asynq::error::Error::other("Unknown task type"))
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Redis 配置
    let redis_config = RedisConfig::from_url("redis://127.0.0.1:6379")?;
    
    // 配置队列
    let mut queues = HashMap::new();
    queues.insert("critical".to_string(), 6);
    queues.insert("default".to_string(), 3);
    queues.insert("low".to_string(), 1);
    
    // 服务器配置
    let config = ServerConfig::new()
        .concurrency(4)
        .queues(queues);
    
    // 创建服务器
    let mut server = Server::new(redis_config, config).await?;
    
    // 启动服务器
    server.run(EmailProcessor).await?;

    Ok(())
}

使用 ServeMux 路由任务

ServeMux 提供了类似 Go 版本的任务路由功能,可以根据任务类型自动路由到不同的处理器:

use asynq::{serve_mux::ServeMux, task::Task, redis::RedisConfig, config::ServerConfig, server::ServerBuilder};
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_config = RedisConfig::from_url("redis://127.0.0.1:6379")?;
    
    // 创建 ServeMux
    let mut mux = ServeMux::new();
    
    // 注册同步处理器
    mux.handle_func("email:send", |task: Task| {
        println!("Processing email:send {:?}",task);
        Ok(())
    });
    
    // 注册异步处理器
    mux.handle_async_func("image:resize", |task: Task| async move {
        println!("Processing image:resize {:?}",task);
        // 异步处理逻辑
        Ok(())
    });

    mux.handle_func("payment:process", |task: Task| {
        println!("Processing payment {:?}",task);
        Ok(())
    });
    
    // 配置服务器
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);
    let config = ServerConfig::new().concurrency(4).queues(queues);
    
    // 创建并运行服务器
    let mut server = ServerBuilder::new()
        .redis_config(redis_config)
        .server_config(config)
        .build()
        .await?;
    
    // ServeMux 实现了 Handler trait,可以直接传递给 server.run()
    server.run(mux).await?;

    Ok(())
}

特点:

  • 🎯 自动根据任务类型路由到对应的处理器
  • ⚡ 支持同步 (handle_func) 和异步 (handle_async_func) 处理器
  • 🔄 与 Go 版本的 ServeMux 完全兼容
  • 🛡️ 类型安全,编译时检查
  • 📝 简洁的 API,易于使用

更多示例请参考 examples/servemux_example.rs

任务处理器宏(可选功能)

启用 macros 功能后,可以使用类似 actix-web 路由宏的属性宏,更简洁地定义处理器:

use asynq::{
    serve_mux::ServeMux, 
    task::Task, 
    task_handler, 
    task_handler_async,
    register_handlers,
    register_async_handlers,
    redis::RedisConfig, 
    config::ServerConfig, 
    server::ServerBuilder
};
use std::collections::HashMap;

// 使用属性宏定义处理器
#[task_handler("email:send")]
fn handle_email(task: Task) -> asynq::error::Result<()> {
    println!("Processing email:send");
    Ok(())
}

#[task_handler_async("image:resize")]
async fn handle_image(task: Task) -> asynq::error::Result<()> {
    println!("Processing image:resize");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_config = RedisConfig::from_url("redis://127.0.0.1:6379")?;
    
    // 创建 ServeMux 并使用便捷宏注册处理器
    let mut mux = ServeMux::new();
    register_handlers!(mux, handle_email);
    register_async_handlers!(mux, handle_image);
    
    // 配置并运行服务器
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);
    let config = ServerConfig::new().concurrency(4).queues(queues);
    
    let mut server = ServerBuilder::new()
        .redis_config(redis_config)
        .server_config(config)
        .build()
        .await?;
    
    server.run(mux).await?;
    Ok(())
}

宏特性:

  • 🎯 声明式语法: 使用简洁的属性语法定义处理器
  • 📝 减少样板代码: 模式字符串与函数自动关联
  • 🔧 便捷注册: 使用 register_handlers!register_async_handlers!
  • 🌐 熟悉的模式: 类似 actix-web 的 #[get("/path")] 路由宏

完整示例请参考 examples/macro_example.rs

📚 高级用法

延迟任务

use std::time::Duration;
// 延迟 5 分钟执行
client.enqueue_in(task, Duration::from_secs(300)).await?;

唯一任务 (去重)

use std::time::Duration;

// 在 1 小时内保持唯一性
let unique_task = Task::new_with_json("report:daily", &payload)?;
client.enqueue_unique(unique_task, Duration::from_secs(3600)).await?;

任务组 (批处理)

// 将任务添加到组中进行聚合
for i in 1..=10 {
    let item_task = Task::new_with_json("batch:process", &serde_json::json!({"item": i}))?;
    client.add_to_group(item_task, "daily_batch").await?;
}

任务选项

let task = Task::new_with_json("image:resize", &payload)?
    .with_queue("image_processing")     // 指定队列
    .with_max_retry(5)                  // 最大重试次数
    .with_timeout(Duration::from_secs(300)) // 超时时间
    .with_unique_ttl(Duration::from_secs(3600)); // 唯一性TTL

优先级队列

let mut queues = HashMap::new();
queues.insert("critical".to_string(), 6);  // 最高优先级
queues.insert("default".to_string(), 3);   // 中等优先级
queues.insert("low".to_string(), 1);       // 低优先级

let config = ServerConfig::new()
    .queues(queues)
    .strict_priority(true); // 严格优先级模式

🏗️ 架构设计

Asynq 采用模块化设计,主要组件包括:

asynq/
├── src/
│   ├── lib.rs              # 库入口和公共API
│   ├── client.rs           # 客户端实现
│   ├── server.rs           # 服务器实现
│   ├── serve_mux.rs         # ServeMux 路由实现 (兼容 Go servemux.go)
│   ├── processor.rs        # 处理器实现 (兼容 Go processor.go)
│   ├── task.rs             # 任务相关数据结构
│   ├── error.rs            # 错误处理
│   ├── config.rs           # 配置管理
│   ├── redis.rs            # Redis连接管理
│   ├── inspector.rs        # 队列检查器
│   └── broker/             # 存储后端抽象层
│       ├── mod.rs          # Broker特征定义
│       └── redis_broker.rs # Redis实现
├── proto/
│   └── asynq.proto         # Protocol Buffer定义
└── examples/
    ├── producer.rs         # 生产者示例
    ├── consumer.rs         # 消费者示例
    ├── servemux_example.rs # ServeMux 使用示例
    └── processor_example.rs # 处理器示例

核心组件

  • Client: 负责将任务加入队列
  • Server: 负责从队列中取出任务并处理
  • ServeMux: 任务路由多路复用器,根据任务类型路由到不同处理器(兼容 Go servemux.go)
  • Processor: 任务处理器核心,负责并发控制和任务执行(兼容 Go asynq processor.go)
  • Aggregator: 任务聚合器,将同组任务聚合为批处理任务(兼容 Go asynq aggregator.go)
  • Broker: 存储后端抽象层,目前支持Redis
  • Task: 任务数据结构,包含类型、负载和选项
  • Handler: 任务处理器特征,用户需要实现此接口
  • Inspector: 队列和任务的检查和管理工具

Processor 特性

Processor 模块实现了与 Go asynq processor.go 兼容的任务处理架构:

  • 信号量并发控制: 使用 Tokio Semaphore 精确控制并发工作者数量
  • 队列优先级: 支持严格优先级和加权优先级两种模式
  • 任务超时: 支持任务级别和全局超时设置
  • 优雅关闭: 等待所有活跃工作者完成后再关闭
  • 自动重试: 失败任务自动重试,支持指数退避策略
  • 任务归档: 达到最大重试次数后自动归档任务

GroupAggregator 特性

GroupAggregator 模块实现了与 Go asynq aggregator.go 兼容的任务聚合功能:

  • 任务分组: 通过 with_group() 为任务设置组标签
  • 批量聚合: 自动将同组任务聚合成单个批处理任务
  • 灵活触发: 支持宽限期、最大组大小、最大延迟三种触发条件
  • 自定义聚合: 通过 GroupAggregator trait 自定义聚合逻辑
  • 函数式接口: 使用 GroupAggregatorFunc 快速创建聚合器

示例用法:

use asynq::components::aggregator::GroupAggregatorFunc;

// 定义聚合函数
let aggregator = GroupAggregatorFunc::new(|group, tasks| {
    // 将多个任务合并为一个批处理任务
    let combined = tasks.iter()
        .map(|t| t.get_payload())
        .collect::<Vec<_>>()
        .join(&b"\n"[..]);
    Task::new("batch:process", &combined)
});

// 设置到服务器
server.set_group_aggregator(aggregator);

🛠️ 配置选项

服务器配置

use asynq::config::ServerConfig;
use std::time::Duration;

let config = ServerConfig::new()
    .concurrency(8)                                          // 并发工作者数量
    .task_check_interval(Duration::from_secs(1))            // 任务检查间隔
    .delayed_task_check_interval(Duration::from_secs(5))    // 延迟任务检查间隔
    .shutdown_timeout(Duration::from_secs(10))              // 关闭超时
    .health_check_interval(Duration::from_secs(15))         // 健康检查间隔
    .group_grace_period(Duration::from_secs(60))?           // 组聚合宽限期
    .group_max_delay(Duration::from_secs(300))              // 组最大延迟
    .group_max_size(100)                                    // 组最大大小
    .janitor_interval(Duration::from_secs(8))               // 清理间隔
    .janitor_batch_size(50);                                // 清理批量大小

Redis 配置

use asynq::redis::{RedisConfig, PoolConfig};
use std::time::Duration;

// 基本配置
let redis_config = RedisConfig::from_url("redis://127.0.0.1:6379")?;

// 高级配置
let pool_config = PoolConfig {
    max_size: 20,
    min_idle: Some(5),
    connection_timeout: Duration::from_secs(30),
    idle_timeout: Some(Duration::from_secs(600)),
    max_lifetime: Some(Duration::from_secs(1800)),
};

let redis_config = RedisConfig::from_url("redis://127.0.0.1:6379")?
    .with_pool_config(pool_config);

📊 监控和管理

队列检查器

use asynq::base::keys::TaskState;
use asynq::inspector::Inspector;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let inspector = Inspector::new(broker);

    // Get queue statistics
    let stats = inspector.get_queue_stats("default").await?;
    println!("Pending: {}, Active: {}", stats.pending, stats.active);

    // List tasks
    let tasks = inspector.list_tasks("default", TaskState::Pending, 1, 10).await?;

    // Requeue archived task
    inspector.requeue_archived_task("default", "task-id").await?;

    // Pause queue
    inspector.pause_queue("default").await?;

    Ok(())
}

🔧 开发指南

本地开发

  1. 克隆仓库:
git clone https://github.com/emo-crab/asynq.git
cd asynq
  1. 安装依赖:
cargo build
  1. 启动 Redis:
docker run -d -p 6379:6379 redis:alpine
  1. 运行示例:
# 终端1: 启动消费者
cargo run --example consumer

# 终端2: 运行生产者
cargo run --example producer

运行测试

# 单元测试
cargo test

# 集成测试(需要Redis)
cargo test --features integration-tests

🤝 贡献

我们欢迎各种形式的贡献!请阅读 CONTRIBUTING.md 了解详细信息。

开发原则

  • 使用 Rust 的特性和最佳实践
  • 保持API的简洁和易用
  • 提供完整的中文文档
  • 确保代码质量和测试覆盖率
  • 遵循语义化版本

📝 许可证

本项目采用MIT License OR GPL License

🙏 致谢

  • 感谢 hibiken/asynq 提供的设计灵感
  • 感谢 Rust 社区提供的优秀库支持

📞 联系

如果你有任何问题或建议,请:


⭐ 如果这个项目对你有帮助,请给我们一个 star!


Ext Link: https://github.com/emo-crab/asynq

评论区

写评论

还没有评论

1 共 0 条评论, 1 页