< 返回版块

c5soft 发表于 2021-09-03 18:27

Tags:axum,warp

最近花了点时间来学习axum, 并成功将一个用warp写的项目改用axum重写。axum太棒了,充分体现了rust这门语言的表达能力。

  1. 路由设计非常简洁,演示了Rust不用宏,也可以搞DSL的方法。
  2. Extractor与AddExtension极为灵活,简化了warp通过构建参数获取Request与环境数据的设计。
  3. 借用Tower生态提高了代码利用率。

axum非常稳定,压力测试中同时开15K并发妥妥的。在axum面世之前,warp是最棒的web框架,现在该是阿克苏姆担当主角了。由于两者都是基于hyper平台,从warp移植到axum也是分分钟的事。 下面贴出实战项目中两段代码main.rs与servce.rs。main.rs中演示了如何通过命令行参数切换,实现http与https两种服务,还演示了如何调用了静态文件服务功能。service.rs是放api的地方,演示了如何处理get与post请求,如何获取数据库中的数据,如何提供动态下载内容等功能。

//main.rs
mod addr;
mod base16;
mod bb8_tiberius;
mod ccb_gwk;
mod ccb_socket;
mod config;
mod context;
mod database;
mod json_helper;
mod json_value;
mod parse_exp;
mod parse_param;
mod service;
mod service_da;

use axum::{http::StatusCode, Router};
use tower_http::services::ServeDir;

use std::env::args;

use chrono::prelude::*;
use context::AppContext;
use json_helper::JsonHelper;
use json_value::JsonValue;

const VERSION: &str = "1.3.0";

#[tokio::main]
async fn main() {
    pretty_env_logger::init_timed();

    let is_https = args().nth(1).unwrap_or("http".into()) == "https";

    let context = AppContext::new().await;
    let ctx = context.clone();
    let config = &ctx.config;
    let server_config = &config["config"];
    let ctx = context.clone();
    let app = Router::new()
        .nest(
            "/",
            axum::service::get(ServeDir::new("D:/Js/OnlyOne/public")).handle_error(
                |error: std::io::Error| {
                    Ok::<_, std::convert::Infallible>((
                        StatusCode::INTERNAL_SERVER_ERROR,
                        format!("Unhandled internal error: {}", error),
                    ))
                },
            ),
        )
        .nest("/api", service::api(ctx));

    let addr = addr::Addr::new(server_config, is_https);
    let now = Local::now().to_string();
    let now = &now[0..19];
    println!(
        "{} HTTP{} Server V{} is starting at {:19}, {}",
        server_config["server_name"].string("W3"),
        if is_https { "S" } else { "" },
        VERSION,
        now,
        addr
    );

    let addr = addr.to_string_full();

    if is_https {
        axum_server::bind_rustls(addr)
            .private_key_file("key.pem")
            .certificate_file("cert.pem")
            .serve(app)
            .await
            .unwrap();
    } else {
        axum_server::bind(addr).serve(app).await.unwrap();
    }
}
//service.rs
use crate::base16;
use crate::ccb_gwk;
use crate::database;
use crate::parse_param;
use crate::service_da::{da_read_about, da_write_about, download_photos, DA_WEBP_DISABLE};
use crate::AppContext;
use crate::JsonHelper;
use crate::JsonValue;
use anyhow::{anyhow, Result};
use encoding::{all::GB18030, EncoderTrap, Encoding};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use tiberius::ToSql;
use tracing::info;

use axum::{
    extract::{Extension, Form, Query},
    response::Json,
    handler::{get, post},
    http::header::{HeaderMap, HeaderName, HeaderValue},
    routing::BoxRoute,
    AddExtensionLayer, Router,
};

fn string_to_gb18030bytes(string: &str) -> Result<Vec<u8>> {
    GB18030
        .encode(string, EncoderTrap::Strict)
        .map_err(|e| anyhow!("string_to_gb18030bytes failure: {:?}", e))
}

pub(crate) fn api(ctx: Arc<AppContext>) -> Router<BoxRoute> {
    Router::new()
        .route("/ask", get(ask))
        .route("/act", post(act))
        .layer(AddExtensionLayer::new(ctx))
        .boxed()
}


pub(crate) async fn ask(
    Query(qs): Query<HashMap<String, String>>,
    Extension(context): Extension<Arc<AppContext>>,
) -> (HeaderMap, Vec<u8>) {
    let mut headers = HeaderMap::new();
    //let qs=format!("{:?}",qs);
    //let bytes=Vec::from(json!({"ask":qs}).to_string());

    let empty = String::from("");
    let dbs = context.dbs.clone();
    let ask = qs.get("ask").unwrap_or(&empty).clone();
    let params = base16::base16_decode(qs.get("params").unwrap_or(&empty)).unwrap();
    let params: Value = serde_json::from_str(&params).unwrap();
    info!("ask={} params={}", ask, params);
    //let content_type = "application/json";
    let p1: JsonValue;
    let p2: JsonValue;
    let p3: JsonValue;
    let p4: JsonValue;
    let p5: JsonValue;
    let pof = |id| JsonValue::of(&params[id]);
    let static_path = context.config["config"]["static_path"].string("wwwroot");
    let da_webp_active = context.config["config"]["da_webp_active"].bool(false);
    let accept_webp = params["acceptWebp"].bool(false);
    let da_webp_quality = if da_webp_active && accept_webp {
        context.config["config"]["da_webp_quality"].i64(20) as i8
    } else {
        DA_WEBP_DISABLE
    };
    let mut sql: String = "".into();
    let mut sql_params: Vec<&dyn ToSql> = Vec::new();
    let mut pending = true;
    let mut result: String = "null".into();
    let mut about_file: String = "".into();
    let mut voucher_id: &str = &empty;
    let mut attach: String = "".into();

    if ask == "@login" {
        sql = r"EXEC TM_OnlyOneLogin @P1,@P2".into();
        p1 = pof("userId");
        p2 = pof("password");
        sql_params = vec![&p1, &p2];
    } else if ask == "workload" {
        sql = "EXEC TM_WorkLoad @P1,@P2,@P3".into();
        p1 = pof("userName");
        p2 = pof("year");
        let more_where = if params["limitMonth"].bool(false) {
            let month_from = params["monthFrom"].i64(1);
            let month_to = params["monthTo"].i64(13);
            let month_to = if month_to < month_from {
                month_from
            } else {
                month_to
            };
            format!(
                " AND z.kjqj BETWEEN '{:02}' AND '{:02}'",
                month_from, month_to
            )
        } else {
            "".to_string()
        };
        //println!("moreWhere:{}",more_where);
        p3 = JsonValue::new(json!(more_where));
        sql_params = vec![&p1, &p2, &p3];
    } else if ask == "wujinFH" {
        sql = "EXEC dbo.TM_UpdateOracleWSZZ4WujinFH @P1,@P2".into();
        p1 = pof("p1");
        p2 = pof("p2");
        sql_params = vec![&p1, &p2];
    } else if ask == "salaryVoucher" {
        sql = "EXEC dbo.TM_MakeSalaryVoucher @P1,@P2".into();
        p1 = pof("period");
        p2 = pof("personType");
        sql_params = vec![&p1, &p2];
    } else if ask == "salaryVoucherBank" {
        sql = "EXEC dbo.TM_GetSalaryBankDetail @P1,@P2".into();
        p1 = pof("period");
        p2 = pof("personType");
        sql_params = vec![&p1, &p2];
    } else if ask == "salaryVoucherSheet" {
        sql = "EXEC dbo.TM_GetSalaryVoucher @P1".into();
        p1 = pof("batch");
        sql_params = vec![&p1];
    } else if ask == "checkncye" {
        sql = "EXEC dbo.TM_CheckNCYE @P1,@P2".into();
        p1 = pof("year");
        p2 = pof("tblname");
        sql_params = vec![&p1, &p2];
    } else if ask == "py2code" {
        sql = "EXEC dbo.TM_PY2Code @P1,@P2".into();
        p1 = pof("type");
        p2 = pof("code");
        sql_params = vec![&p1, &p2];
    } else if ask == "@aboutvoucher" {
        sql = "EXEC dbo.TM_AboutVoucher @P1".into();
        p1 = pof("pznm");
        sql_params = vec![&p1];
    } else if ask == "@voucherphotos" {
        voucher_id = params["voucherId"].str("");
        match da_read_about(voucher_id, &static_path, da_webp_quality).await {
            Ok((about_file_exists, read_result, about_file_name)) => {
                about_file = about_file_name;
                if about_file_exists {
                    result = format!("{{\"msg\":\"ok\", \"data\":{}}}", read_result);
                    pending = false;
                }
            }
            Err(e) => {
                result = format!("{{\"msg\":\"{:?}\"}}", e);
                pending = false;
            }
        }
        if pending {
            sql = "EXEC dbo.TM_VoucherPhotos @P1".into();
            p1 = pof("voucherId");
            sql_params = vec![&p1];
        }
    } else if ask == "@aboutreceipt" {
        sql = "EXEC dbo.TM_AboutReceipt @P1,@P2,@P3,@P4,@P5".into();
        p1 = pof("id");
        p2 = pof("checkSum");
        p3 = pof("datePaid");
        p4 = pof("amount");
        p5 = pof("checker");
        sql_params = vec![&p1, &p2, &p3, &p4, &p5];
    } else if ask == "payee" {
        sql = "EXEC dbo.TM_QueryPayee @P1,@P2".into();
        p1 = pof("bankName");
        p2 = pof("bankAcct");
        sql_params = vec![&p1, &p2];
    } else if ask == "ledger" || ask == "voucher" {
        // CREATE PROCEDURE dbo.TM_QueryLedgerExt
        // @起始年 INT,@终止年 INT,@查询条件 VARCHAR(4096),@排序 VARCHAR(80)='日期,凭证号,笔号',
        // @借贷对冲 BIT=0,@隐藏负值 BIT=0,@Select VARCHAR(250)='*'
        let params = parse_param::params_convert(&context.config, &ask, &params);
        let pof = |id| JsonValue::of(&params[id]);
        let only_sum_line = if ask == "ledger" { ",1" } else { ",0" };
        sql = "EXEC dbo.TM_QueryLedgerExt @P1,@P2,@P3,@P4,0,0,@P5".to_string() + only_sum_line;
        p1 = pof("yearFrom");
        p2 = pof("yearTo");
        p3 = pof("filter");
        p4 = pof("orderby");
        p5 = pof("select");
        sql_params = vec![&p1, &p2, &p3, &p4, &p5];
    } else if ask == "balance" {
        // CREATE PROCEDURE dbo.TM_QueryBalanceExt
        // @起始年 INT,@终止年 INT,@查询条件 VARCHAR(4096),@期初条件 VARCHAR(4096)=NULL,
        // @年初条件 VARCHAR(4096)=NULL,@余额条件 VARCHAR(250)=NULL,
        // @顶层 VARCHAR(10)='科目1级',@底层 VARCHAR(10)='科目4级',
        // @合并 INT=NULL,@合计 BIT=0,@仅底层 BIT=0,@仅编码 BIT=0,@倍率 INT=1,@查项目余额 BIT=0
        let params = parse_param::params_convert(&context.config, &ask, &params);
        let pof = |id| JsonValue::of(&params[id]);
        sql = format!(
            "EXEC dbo.TM_QueryBalanceExt @P1,@P2,@P3,@P4,@P5{}",
            params["params_in_sql"].str("")
        );
        p1 = pof("yearFrom");
        p2 = pof("yearTo");
        p3 = pof("filter");
        p4 = pof("filter_qc");
        p5 = pof("filter_nc");
        sql_params = vec![&p1, &p2, &p3, &p4, &p5];
    }
    if pending {
        let result_json = if !sql.is_empty() {
            let row_is_obj = ask.starts_with('@');
            let result = database::query(dbs, &sql, &sql_params, row_is_obj).await;
            match result {
                Ok(result) => {
                    if ask == "ledger" {
                        let params =
                            parse_param::params_convert(&context.config, "voucher", &params);
                        json!({ "msg":"ok","voucherColDefs":params["select"], "data":result})
                    } else if ask == "@voucherphotos" {
                        let row_count = result["rowCount"].u64(0);
                        if row_count > 0 {
                            let result = download_photos(context, result, da_webp_quality).await;
                            match da_write_about(&about_file, &result).await {
                                Ok(_) => json!({ "msg":"ok", "data":result}),
                                Err(e) => json!({ "msg": format!("{:?}", e) }),
                            }
                        } else {
                            json!({
                                "msg": format!("没有找到凭证{}的影像资料", voucher_id)
                            })
                        }
                    } else if ask == "salaryVoucherSheet" {
                        let empty_vec: Vec<Value> = Vec::new();
                        let sheet = result["rows"].as_array().unwrap_or(&empty_vec);
                        let sheet = sheet
                            .iter()
                            .map(|x| x.get(0).unwrap_or(&Value::Null).string(""))
                            .fold("".to_string(), |lines, line| lines + &line + "\r\n");
                        attach = sheet;
                        json!("attachment")
                    } else {
                        json!({ "msg":"ok", "data":result})
                    }
                }
                Err(err) => {
                    json!({ "msg": format!("{:?}", err) })
                }
            }
        } else if ask == "checkgwk" {
            let check_all = params["checkAll"].bool(false);
            ccb_gwk::check_gwk(&context.config, check_all)
                .await
                .unwrap()
        } else {
            json!({ "msg": format!("unknown ask {} params:{}", ask, params.to_string()) })
        };
        result = format!("{}", result_json);
    }
    if ask == "salaryVoucherSheet" {
        let file_name = params["fileName"].str("凭证");
        let value = format!("attachment;filename={}.txt", file_name);
        let bytes: Vec<u8> = string_to_gb18030bytes(&attach).unwrap_or_default();
        //reply::with_header(bytes, "Content-disposition", value)
        headers.insert(
            HeaderName::from_static("content-type"),
            HeaderValue::from_static("text/plain"),
        );
        headers.insert(
            HeaderName::from_static("content-disposition"),
            HeaderValue::from_str(&value).unwrap(),
        );
        (headers, bytes)
    } else {
        let bytes: Vec<u8> = result.into_bytes();
        //reply::with_header(bytes, "content-type", content_type)
        headers.insert(
            HeaderName::from_static("content-type"),
            HeaderValue::from_static("application/json"),
        );
        (headers, bytes)
    }
}

async fn act(
    Form(qs): Form<HashMap<String, String>>,
    Extension(context): Extension<Arc<AppContext>>,
) -> Json<Value> {
    let empty = String::from("");
    let dbs = context.dbs.clone();
    let act = qs.get("act").unwrap_or(&empty).clone();
    let params = base16::base16_decode(qs.get("params").unwrap_or(&empty)).unwrap();
    let params: Value = serde_json::from_str(&params).unwrap_or(Value::Null);
    info!("act={} params={}", act, params);
    //let content_type = "application/json";
    let sql: String;
    let p1: JsonValue;
    let p2: JsonValue;
    let p3: JsonValue;
    let p4: JsonValue;
    let p5: JsonValue;
    let sql_params: Vec<&dyn ToSql>;
    let pof = |id| JsonValue::of(&params[id]);
    if act == "exam" {
        sql = r"EXEC dbo.TM_Exam @P1,@P2,@P3,@P4".into();
        p1 = pof("ids");
        p2 = pof("fhr");
        p3 = pof("fhrId");
        p4 = pof("isUndo");
        sql_params = vec![&p1, &p2, &p3, &p4];
    } else if act == "changepayee" {
        sql = "EXEC dbo.TM_ChangePayee @P1,@P2,@P3,@P4,@P5".into();
        p1 = pof("bankName");
        p2 = pof("bankAcct");
        p3 = pof("unitCode");
        p4 = pof("updateDate");
        p5 = pof("mark");
        sql_params = vec![&p1, &p2, &p3, &p4, &p5];
    } else if act == "execsql" || act == "@execsql" {
        sql = params["sql"].string("");
        sql_params = Vec::new();
    } else {
        sql = "".into();
        sql_params = Vec::new();
    }
    let result = if !sql.is_empty() {
        let row_is_obj = act.starts_with('@');
        let result = database::query(dbs, &sql, &sql_params, row_is_obj).await;
        match result {
            Ok(result) => {
                json!({ "msg":"ok", "data":result})
            }
            Err(err) => {
                json!({ "msg": format!("{:?}", err) })
            }
        }
    } else {
        json!({ "msg": format!("unknown act {} params:{}", act, params.to_string()) })
    };
    Json(result)
}

评论区

写评论
jmy10241024 2021-09-09 17:57

存储过程...........

douchuan 2021-09-04 11:58

cool more axum in production is welcome

sunli829 2021-09-03 20:02

虽然看不太懂,不过代码写得不错,挺工整。👍🏿

1 共 3 条评论, 1 页