< 返回版块

zhuxiujia 发表于 2026-05-10 15:07

Tags:Rbatis,Turso

Rbatis + Turso:在 Rust 生态中构建 AI Agent 与 RAG 应用

引言

随着大语言模型(LLM)的爆发式增长,AI Agent 和 RAG(Retrieval-Augmented Generation)架构已经成为构建智能应用的主流范式。RAG 的核心在于向量搜索——将文档转换为向量嵌入(Embedding),然后通过相似度检索找到最相关的内容,辅助 LLM 生成更准确的回答。

Rust 生态中,Rbatis(高性能 Rust ORM)和 Turso(用 Rust 重写的兼容SQLite的嵌入式数据库,支持原生向量搜索)的组合为 AI 应用提供了独特的优势:Rbatis 的编译时 SQL 生成带来极致的性能,Turso 原生的向量搜索能力避免了额外引入向量数据库的复杂性。本文将深入探讨如何将 Rbatis + Turso 应用于 AI Agent 和 RAG 业务中。


认识两个主角

Rbatis — 高性能 Rust ORM

Rbatis 是一个基于编译时代码生成的 Rust ORM 框架,核心特点包括:

  • 动态 SQL 编译为原生 Rust 代码,达到手写 SQL 的性能水平
  • 零运行时开销,所有 SQL 解析和优化在编译时完成
  • 支持多种数据库:MySQL、PostgreSQL、SQLite、Turso、DuckDB 等
  • 多 SQL 构建方式:py_sql、html_sql(类似 MyBatis)、原始 SQL
  • 完整的 CRUD 宏,一行代码自动生成增删改查
[dependencies]
rbatis = { version = "4.8" }
rbdc-turso = { version = "4" }
rbs = { version = "4" }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["full"] }

Turso — 用 Rust 重写的 SQLite,内置原生向量搜索

Turso 是对 SQLite 的从头重写(ground-up rewrite in Rust),100% 兼容 SQLite 的同时,增加了并发写入、向量搜索、云原生访问等能力。它的杀手级特性是原生向量搜索——无需任何扩展或插件,开箱即用。

Turso 支持多种向量类型:

向量类型 函数 每维精度 适用场景
Dense vector32 32-bit float 大多数 ML Embedding(OpenAI、Sentence Transformers)
Dense vector64 64-bit float 需要更高精度的应用
Sparse vector32_sparse 仅存储非零值 + 索引 TF-IDF、Bag-of-Words
Quantized vector8 1 字节 大规模搜索、4x 压缩
Binary vector1bit 1 bit 近似近邻搜索、32x 压缩

支持的相似度距离函数:

函数 说明 最佳场景
vector_distance_cos 余弦距离 Text Embedding、文档相似度
vector_distance_l2 欧几里得距离 图像 Embedding、空间数据
vector_distance_dot 负点积 归一化 Embedding、MIPS
vector_distance_jaccard Jaccard 距离 稀疏向量、TF-IDF

Rbatis + Turso 基础集成

连接配置

rbdc-turso 驱动支持三种连接方式:

use rbatis::RBatis;
use rbdc_turso::TursoDriver;

#[tokio::main]
async fn main() -> Result<(), rbatis::Error> {
    let rb = RBatis::new();

    // 方式 1:内存数据库
    // rb.init(TursoDriver {}, "turso://:memory:")?;

    // 方式 2:本地文件
    // rb.init(TursoDriver {}, "turso://target/rag.db")?;

    // 方式 3:远程 Turso 数据库(生产环境)
    let turso_url = std::env::var("TURSO_URL").unwrap_or_default();
    let turso_token = std::env::var("TURSO_TOKEN").unwrap_or_default();
    rb.init(
        TursoDriver {},
        &format!("turso://?url={}&token={}", turso_url, turso_token)
    )?;

    Ok(())
}

定义实体与表结构

对于 RAG 场景,我们需要存储文档内容和对应的向量 Embedding:

use serde::{Deserialize, Serialize};
use rbatis::crud;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Document {
    pub id: Option<i64>,
    pub title: Option<String>,
    pub content: Option<String>,
    pub embedding: Option<Vec<u8>>,  // Turso 中 BLOB 类型存储向量
}

// 自动生成 CRUD 方法
crud!(Document{});

/// 创建文档表的 SQL
const CREATE_DOCUMENTS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS documents (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    title TEXT NOT NULL,
    content TEXT,
    embedding BLOB
);
";

AI Agent 场景:从知识库到智能决策

场景描述

假设我们正在构建一个开发者 AI 助手 Agent,它需要:

  1. 从技术文档库中检索相关信息
  2. 基于检索结果推理并回答开发者的问题
  3. 记忆对话历史中的上下文

第一步:构建向量知识库

use rbatis::RBatis;
use rbatis::rbdc::Error;

/// 初始化文档知识库表
pub async fn init_knowledge_base(rb: &RBatis) -> Result<(), Error> {
    rb.exec(CREATE_DOCUMENTS_TABLE, vec![]).await?;
    Ok(())
}

/// 插入文档及其向量 Embedding
pub async fn insert_document(
    rb: &RBatis,
    title: &str,
    content: &str,
    embedding: &[f32],  // 来自 Embedding 模型的输出
) -> Result<(), Error> {
    // 将 f32 切片转换为 Turso vector32 要求的 BLOB 格式
    // 在实际应用中,这里需要将 f32 数组序列化为字节
    let embedding_blob = embedding
        .iter()
        .flat_map(|f| f.to_le_bytes())
        .collect::<Vec<u8>>();

    let doc = Document {
        id: None,
        title: Some(title.to_string()),
        content: Some(content.to_string()),
        embedding: Some(embedding_blob),
    };

    Document::insert(rb, &doc).await?;
    Ok(())
}

注意:Turso 的 vector32() 函数接受 JSON 数组格式的字符串。你可以在 SQL 层面直接使用 vector32('[0.1, 0.2, ...]') 进行转换,或者从 Rust 侧序列化后写入 BLOB 列。

第二步:向量相似度搜索

Rbatis 支持原始 SQL 执行,可以充分利用 Turso 的向量搜索函数:

use rbatis::rbdc::types::Json;
use rbs::to_value;

/// 语义搜索:根据查询向量找到最相似的文档
pub async fn semantic_search(
    rb: &RBatis,
    query_embedding: &[f32],
    limit: u32,
) -> Result<Vec<Document>, Error> {
    // 构造查询向量字符串
    let query_vec_str = query_embedding
        .iter()
        .map(|f| f.to_string())
        .collect::<Vec<_>>()
        .join(",");

    let sql = format!(
        r#"
        SELECT
            id, title, content,
            vector_distance_cos(embedding, vector32('[{}]')) AS distance
        FROM documents
        ORDER BY distance
        LIMIT {}
        "#,
        query_vec_str, limit
    );

    // 使用 Rbatis 执行原始 SQL
    let docs: Vec<Document> = rb
        .fetch_decode(&sql, vec![])
        .await?;

    Ok(docs)
}

vector_distance_cos 返回 0-2 之间的值,越小表示越相似。你也可以使用 vector_distance_l2(欧几里得距离)或 vector_distance_dot(负点积)。

第三步:AI Agent 的完整工作流

/// AI Agent 的问答工作流
pub async fn agent_answer(
    rb: &RBatis,
    user_question: &str,
    embedding_model: &dyn EmbeddingService,  // Embedding 服务接口
    llm_service: &dyn LLMService,            // LLM 服务接口
) -> Result<String, Error> {
    // 1. 将用户问题转为向量
    let query_embedding = embedding_model
        .embed(user_question)
        .await;

    // 2. 从知识库中检索最相关的文档
    let relevant_docs = semantic_search(rb, &query_embedding, 5).await?;

    // 3. 构建提示词上下文
    let context: String = relevant_docs
        .iter()
        .filter_map(|d| d.content.as_deref())
        .collect::<Vec<_>>()
        .join("\n---\n");

    // 4. 调用 LLM 生成回答
    let prompt = format!(
        "基于以下知识库内容回答问题:\n\n知识库:\n{}\n\n问题:{}\n\n回答:",
        context, user_question
    );

    let answer = llm_service
        .generate(&prompt)
        .await;

    Ok(answer)
}

RAG 实战:完整示例

项目结构

rag-demo/
├── Cargo.toml
└── src/
    └── main.rs

Cargo.toml

[package]
name = "rag-demo"
version = "0.1.0"
edition = "2021"

[dependencies]
rbatis = { version = "4.8" }
rbdc-turso = { version = "4" }
rbs = { version = "4" }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }  # 调用 Embedding API
serde_json = "1"
fast_log = "1.6"

完整代码

use rbatis::crud;
use rbatis::rbdc::Error;
use rbatis::RBatis;
use serde::{Deserialize, Serialize};

// ========== 实体定义 ==========

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Document {
    pub id: Option<i64>,
    pub title: Option<String>,
    pub content: Option<String>,
    pub embedding: Option<Vec<u8>>,
}

crud!(Document{});

// ========== Embedding 服务 ==========

/// 一个简单的 Embedding 服务接口(以 OpenAI API 为例)
pub struct OpenAIEmbedding {
    api_key: String,
    model: String,
}

impl OpenAIEmbedding {
    pub fn new(api_key: &str) -> Self {
        Self {
            api_key: api_key.to_string(),
            model: "text-embedding-3-small".to_string(),
        }
    }

    pub async fn embed(&self, text: &str) -> Vec<f32> {
        // 这里简化处理,实际需要 HTTP 调用
        // 生产环境请使用 reqwest 调用 OpenAI /embedding API
        println!("Embedding: {}", &text[..text.len().min(50)]);
        // 返回一个模拟的 1536 维向量(text-embedding-3-small 的维度)
        vec![0.1_f32; 1536]
    }

    pub async fn embed_batch(&self, texts: &[&str]) -> Vec<Vec<f32>> {
        let mut results = Vec::new();
        for text in texts {
            results.push(self.embed(text).await);
        }
        results
    }
}

// ========== 数据库操作 ==========

const CREATE_DOCUMENTS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS documents (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    title TEXT NOT NULL,
    content TEXT,
    embedding BLOB
);
";

pub async fn init_db(rb: &RBatis) -> Result<(), Error> {
    rb.exec(CREATE_DOCUMENTS_TABLE, vec![]).await?;
    println!("[OK] 文档表初始化完成");
    Ok(())
}

pub async fn add_document(
    rb: &RBatis,
    title: &str,
    content: &str,
    embedding: &[f32],
) -> Result<(), Error> {
    let blob = embedding
        .iter()
        .flat_map(|f| f.to_le_bytes())
        .collect::<Vec<u8>>();

    let doc = Document {
        id: None,
        title: Some(title.to_string()),
        content: Some(content.to_string()),
        embedding: Some(blob),
    };

    Document::insert(rb, &doc).await?;
    println!("[OK] 文档已入库: {}", title);
    Ok(())
}

/// 批量添加文档
pub async fn batch_add_documents(
    rb: &RBatis,
    docs: &[(&str, &str)],
    embed_service: &OpenAIEmbedding,
) -> Result<(), Error> {
    let titles: Vec<&str> = docs.iter().map(|(t, _)| *t).collect();
    let contents: Vec<&str> = docs.iter().map(|(_, c)| *c).collect();

    let embeddings = embed_service.embed_batch(&contents).await;

    for ((title, content), embedding) in docs.iter().zip(embeddings.iter()) {
        add_document(rb, title, content, embedding).await?;
    }
    Ok(())
}

/// 向量相似度搜索
pub async fn search_similar(
    rb: &RBatis,
    query_embedding: &[f32],
    limit: u32,
) -> Result<Vec<(Document, f64)>, Error> {
    let vec_str = query_embedding
        .iter()
        .map(|f| f.to_string())
        .collect::<Vec<_>>()
        .join(",");

    let sql = format!(
        r#"
        SELECT
            id, title, content, embedding,
            vector_distance_cos(embedding, vector32('[{}]')) AS distance
        FROM documents
        ORDER BY distance
        LIMIT {}
        "#,
        vec_str, limit
    );

    // 注意:这里的 fetch_decode 需要自定义 Result 类型来包含 distance
    // 为简化演示,这里直接使用 exec 并打印结果
    let result = rb.exec_decode::<serde_json::Value>(&sql, vec![]).await?;
    println!("搜索结果: {}", serde_json::to_string_pretty(&result).unwrap());

    Ok(vec![])
}

// ========== RAG 查询流程 ==========

pub async fn rag_query(
    rb: &RBatis,
    question: &str,
    embed_service: &OpenAIEmbedding,
) -> Result<(), Error> {
    println!("\n=== RAG 查询 ===");
    println!("问题: {}", question);

    // 1. 将问题转为向量
    let query_vec = embed_service.embed(question).await;
    println!("[1/3] 问题向量化完成");

    // 2. 检索相似文档
    let _results = search_similar(rb, &query_vec, 3).await?;
    println!("[2/3] 向量检索完成");

    // 3. 在实际应用中,将检索结果作为 context 传给 LLM
    println!("[3/3] 等待 LLM 生成回答...");
    println!("=== 结束 ===\n");

    Ok(())
}

// ========== AI Agent:带记忆的多轮对话 ==========

/// 对话消息实体 — crud! 宏自动生成 insert / insert_batch 等方法
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ConversationMessage {
    pub id: Option<i64>,
    pub session_id: Option<String>,
    pub turn_index: Option<i64>,
    pub role: Option<String>,
    pub content: Option<String>,
    pub created_at: Option<String>,
}

crud!(ConversationMessage{});

pub struct ConversationMemory {
    pub messages: Vec<ConversationMessage>,
}

impl ConversationMemory {
    pub fn new() -> Self {
        Self { messages: vec![] }
    }

    pub fn add(&mut self, session_id: &str, turn_index: i64, role: &str, content: &str) {
        self.messages.push(ConversationMessage {
            id: None,
            session_id: Some(session_id.to_string()),
            turn_index: Some(turn_index),
            role: Some(role.to_string()),
            content: Some(content.to_string()),
            created_at: None,
        });
    }

    /// 从记忆中获取最近 N 轮的对话摘要(用于构造 prompt)
    pub fn recent_context(&self, n: usize) -> String {
        self.messages
            .iter()
            .rev()
            .take(n * 2)
            .rev()
            .map(|m| format!("{}: {}", m.role.as_deref().unwrap_or(""), m.content.as_deref().unwrap_or("")))
            .collect::<Vec<_>>()
            .join("\n")
    }

    /// 将记忆持久化到 Turso — 使用 crud! 宏生成的 insert 方法
    pub async fn save_to_db(&self, rb: &RBatis) -> Result<(), Error> {
        // 建表(仅首次需要)
        rb.exec(
            "CREATE TABLE IF NOT EXISTS rbatis_conversation_message (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT,
                turn_index INTEGER,
                role TEXT,
                content TEXT,
                created_at TEXT DEFAULT (datetime('now'))
            )",
            vec![],
        )
        .await?;

        // 使用 crud! 宏的 insert_batch 批量写入,无需手写 SQL
        ConversationMessage::insert_batch(rb, &self.messages, 50).await?;
        Ok(())
    }

    /// 从数据库加载历史对话 — 仍然使用 crud! 宏的 select_by_map
    pub async fn load_from_db(rb: &RBatis, session_id: &str) -> Result<Self, Error> {
        let messages = ConversationMessage::select_by_map(
            rb,
            value! {"session_id": session_id},
        )
        .await?;
        Ok(Self { messages })
    }
}

pub async fn agent_chat(
    rb: &RBatis,
    session_id: &str,
    turn: i64,
    question: &str,
    memory: &mut ConversationMemory,
    embed_service: &OpenAIEmbedding,
) -> Result<String, Error> {
    // 1. 检索知识库
    let query_vec = embed_service.embed(question).await;
    let docs = search_similar(rb, &query_vec, 3).await?;

    // 2. 构建上下文(检索结果 + 对话记忆)
    let knowledge_context: String = /* 从 docs 中提取 */ "Rbatis 是一个高性能 Rust ORM...".to_string();
    let memory_context = memory.recent_context(3);

    // 3. 在实际应用中,这里构造最终 Prompt 并发给 LLM
    let _final_prompt = format!(
        "知识库上下文:\n{}\n\n对话历史:\n{}\n\n用户问题:{}",
        knowledge_context, memory_context, question
    );

    // 4. 模拟 LLM 回答
    let answer = format!(
        "基于知识库,关于「{}」的回答是:这是一个很好的问题,建议参考 Rbatis 官方文档获取详细信息。",
        question
    );

    // 5. 使用 crud! 宏更新对话记忆
    memory.add(session_id, turn * 2, "user", question);
    memory.add(session_id, turn * 2 + 1, "assistant", &answer);

    Ok(answer)
}

// ========== 主程序 ==========

#[tokio::main]
async fn main() -> Result<(), Error> {
    fast_log::init(fast_log::Config::new().console()).expect("日志初始化失败");

    // 初始化 Rbatis + Turso
    let rb = RBatis::new();
    rb.init(rbdc_turso::TursoDriver {}, "turso://target/rag_demo.db")?;
    println!("[OK] 连接 Turso 数据库成功");

    // 初始化表结构
    init_db(&rb).await?;

    // 初始化 Embedding 服务
    let embed = OpenAIEmbedding::new("sk-your-api-key");

    // ===== 构建知识库 =====
    let documents = vec![
        (
            "Rbatis ORM 入门",
            "Rbatis 是一个基于编译时代码生成的高性能 Rust ORM 框架,支持 MySQL、PostgreSQL、SQLite、Turso 等多种数据库。"
        ),
        (
            "Rbatis 动态 SQL",
            "Rbatis 支持 py_sql 和 html_sql 两种动态 SQL 构建方式,其中 html_sql 类似 MyBatis 的 XML 模板风格。"
        ),
        (
            "Turso 向量搜索",
            "Turso 原生支持向量搜索,提供 vector32、vector64、vector8 等类型,支持余弦距离、欧几里得距离等相似度函数。"
        ),
        (
            "Rbatis + Turso 集成",
            "通过 rbdc-turso 驱动,Rbatis 可以无缝对接 Turso 数据库,同时利用 Turso 的向量搜索能力构建 RAG 应用。"
        ),
        (
            "AI Agent 架构",
            "AI Agent 通过 ReAct 模式进行推理和行动,结合 RAG 技术可以从知识库中检索相关信息辅助决策。"
        ),
    ];

    batch_add_documents(&rb, &documents, &embed).await?;

    // ===== RAG 查询演示 =====
    rag_query(&rb, "如何在 Rust 中使用 Rbatis 进行向量搜索?", &embed).await?;

    // ===== AI Agent 多轮对话演示 =====
    let session_id = "session-001";
    let mut memory = ConversationMemory::new();

    let answer1 = agent_chat(
        &rb,
        session_id,
        1,
        "Rbatis 支持哪些数据库?",
        &mut memory,
        &embed,
    ).await?;
    println!("Agent: {}", answer1);

    let answer2 = agent_chat(
        &rb,
        session_id,
        2,
        "如何使用 Turso 的向量搜索功能?",
        &mut memory,
        &embed,
    ).await?;
    println!("Agent: {}", answer2);

    // 持久化对话记忆 — 全部由 crud! 宏处理,无需手写 SQL
    memory.save_to_db(&rb).await?;
    println!("[OK] 对话历史已保存到 Turso(使用 crud! 宏)");

    Ok(())
}

架构总览

以下是 Rbatis + Turso 在 AI Agent / RAG 系统中的完整架构:

┌─────────────────────────────────────────────────────┐
│                    用户应用                           │
│  ┌──────────┐  ┌──────────┐  ┌───────────────────┐ │
│  │ AI Agent │  │  RAG 查询 │  │  多轮对话管理     │ │
│  └────┬─────┘  └────┬─────┘  └────────┬──────────┘ │
└───────┼──────────────┼────────────────┼────────────┘
        │              │                │
        ▼              ▼                ▼
┌─────────────────────────────────────────────────────┐
│                   Rbatis ORM                         │
│  编译时 SQL 生成 · CRUD 宏 · 动态 SQL · 连接池        │
└──────────────────────┬──────────────────────────────┘
                       │
                       ▼
┌─────────────────────────────────────────────────────┐
│              rbdc-turso 驱动                          │
│    Turso 原生协议 · Rust SQLite 重写 · 类型映射          │
└──────────────────────┬──────────────────────────────┘
                       │
                       ▼
┌─────────────────────────────────────────────────────┐
│                  Turso 数据库                         │
│  ┌────────────┐  ┌────────────┐  ┌────────────────┐ │
│  │ 文档表      │  │ 向量搜索    │  │ 对话记忆表     │ │
│  │ documents  │  │ vector32   │  │ conversations │ │
│  │ · content  │  │ · cos/L2   │  │ · session_id  │ │
│  │ · embed    │  │ · dot/Jacc │  │ · content     │ │
│  └────────────┘  └────────────┘  └────────────────┘ │
└─────────────────────────────────────────────────────┘

最佳实践与注意事项

1. 向量维度管理

  • OpenAI text-embedding-3-small 输出 1536 维向量
  • OpenAI text-embedding-3-large 输出 3072 维向量
  • Turso 支持最大 65536 维,完全满足需求
  • 对于 1536 维向量,每条记录约 6 KB 存储

2. Embedding 生成策略

  • 文档入库时:将文档分块(Chunk),每块分别生成 Embedding
  • 查询时:将用户问题实时生成 Embedding
  • 建议分块大小:256-512 tokens,重叠 20-50 tokens

3. 性能优化

  • Turso 的向量搜索当前使用线性扫描(无索引)
  • 对于大规模数据,使用 WHERE 子句预过滤(如按分类、标签等)
  • 考虑使用 vector8 量化类型,压缩 4 倍且精度损失可接受
  • Rbatis 的编译时 SQL 确保了查询本身的零开销

4. 生产环境注意事项

  • 使用环境变量管理 Turso URL 和 Token
  • 对话记忆建议增加 TTL 或定期清理
  • 考虑引入缓存层减少 Embedding API 调用
  • 使用连接池(FastPool 默认)管理数据库连接
// 生产环境连接配置
rb.init(
    rbdc_turso::TursoDriver {},
    &format!(
        "turso://?url={}&token={}",
        std::env::var("TURSO_URL").expect("TURSO_URL 未设置"),
        std::env::var("TURSO_TOKEN").expect("TURSO_TOKEN 未设置")
    )
)?;

总结

Rbatis + Turso 的组合为 Rust 生态中的 AI 应用带来了独特的价值:

维度 优势
性能 Rbatis 编译时 SQL 生成 + Turso 原生向量搜索,真正的零开销抽象
简洁性 无需引入额外的向量数据库(如 Pinecone、Milvus),数据和向量在一起
统一性 ORM 和向量搜索使用同一数据库,事务、ACID 保障天然支持
可维护性 纯 Rust 实现,类型安全,编译期检查,减少运行时错误
成本 Turso 提供免费额度,适合原型验证和小规模生产

对于 Rust 开发者来说,这意味着可以用最少的依赖和最熟悉的方式,快速构建具备语义理解和检索能力的 AI Agent 与 RAG 应用。Rbatis 的 ORM 能力处理结构化数据,Turso 的向量搜索处理非结构化语义检索,两者携手为 AI 应用提供了坚实的数据基座。


额外话题:rbatis-py — Python 场景下的 Rbatis

如果你们的 AI Agent 技术栈用了 Python(如 LangChain、LlamaIndex 等 LLM 框架),但又想享受 Rbatis 的高性能和 Turso 的向量搜索,可以用 rbatis-py

rbatis-py 是 Rbatis 的 Python 绑定库(基于 PyO3 / Maturin 构建),让你在 Python 中也能使用相同的 ORM 能力。

安装

pip install rbatis-py

要求 Python ≥ 3.8。

什么时候必须用 rbatis-py?

场景 推荐方案 原因
纯 Rust 项目 直接使用 rbatis crate 零额外开销
Python AI 框架 + Rust 性能需求 rbatis-py LangChain/LlamaIndex 生态在 Python
快速原型 / 数据科学 rbatis-py Python 胶水代码更灵活
混合技术栈(Python 编排 + Rust 核心) rbatis-py 统一 ORM 层,减少心智负担

Python 版 RAG 查询示例

import asyncio
import json
from rbatis_py import RBatis, Model

class Document(Model):
    __table__ = "documents"
    id: int | None = None
    title: str | None = None
    content: str | None = None
    embedding: bytes | None = None  # Turso BLOB

async def semantic_search(db: RBatis, query_vec: list[float], limit: int = 3):
    """使用 Turso 向量搜索 — 手写 SQL 调用 vector_distance_cos"""
    vec_str = ",".join(str(v) for v in query_vec)
    sql = f"""
        SELECT id, title, content,
               vector_distance_cos(embedding, vector32('[{vec_str}]')) AS distance
        FROM documents
        ORDER BY distance
        LIMIT {limit}
    """
    return await db.exec_decode(sql)

async def main():
    db = RBatis()
    # 连接 Turso 远程数据库
    await db.link("libsql://your-db.turso.io?token=YOUR_TOKEN")

    # --- 常规操作:使用 crud! 宏等价物(内置 Model 方法)---
    await Document.insert(db, {
        "title": "Rbatis Python 绑定",
        "content": "rbatis-py 让 Python 也能用上 Rbatis 的性能...",
        "embedding": None  # 这里应填入向量 BLOB
    })

    rows = await Document.select_by_map(db, {"title": "Rbatis Python 绑定"})
    print(rows)

    # --- 向量搜索:和 Rust 端一样,必须手写 SQL ---
    fake_vec = [0.1] * 1536
    results = await semantic_search(db, fake_vec)

    for row in results:
        print(f"  [{row['distance']:.4f}] {row['title']}")

    await db.close()

asyncio.run(main())

可以看到,即使换成 Python,核心模式没变:

  • 常规 CRUD → 用 Model.insert() / Model.select_by_map()(等价于 Rust 的 crud! 宏)
  • 向量搜索 → 同样需要手写 SQL 调用 vector_distance_cos()
  • 事务支持async with db.begin_defer() 自动管理

如果你正在用 Python 做 AI Agent 编排,但数据库层想用 Rbatis 的性能优势,rbatis-py 是理想的桥梁。安装方式:pip install rbatis-py,项目地址:https://github.com/rbatis/rbatis-py


参考资料


Ext Link: https://rbatis.github.io/rbatis.io

评论区

写评论

还没有评论

1 共 0 条评论, 1 页