使用 Rust、OpenAI 和 Qdrant 构建 Agentic RAG

在本文中,我们将讨论如何使用 Rust 构建代理 RAG 工作流!我们将构建一个代理,它可以获取 CSV 文件、对其进行解析并将其嵌入到 Qdrant 中,以及从 Qdrant 中检索相关嵌入以回答用户有关 CSV 文件内容的问题。

有兴趣部署还是只想看看最终的代码是什么样的?您可以在这里找到存储库。

什么是 Agentic RAG? Agentic RAG(即 Agentic Retrieval Augmented Generation)是将 AI 代理与 RAG 相结合的概念,以便能够生成比代理工作流程更能适应特定用例的工作流程。

本质上,此工作流程与常规代理工作流程之间的区别在于,每个代理都可以单独访问来自矢量数据库的嵌入,以便能够检索上下文相关的数据 - 从而在 AI 代理工作流程中获得更准确的答案!

入门 首先,使用cargo shuttle init创建一个新项目。

接下来,我们将使用 shell 代码片段添加所需的依赖项:

cargo add anyhow
cargo add async-openai
cargo add qdrant-client
cargo add serde -F derive
cargo add serde-json
cargo add shuttle-qdrant
cargo add uuid -F v4

我们还需要确保拥有 Qdrant URL 和 API 密钥以及 OpenAI API 密钥。Shuttle 通过SecretStore主函数中的宏使用环境变量,并且可以存储在Secrets.toml文件中: OPENAI_API_KEY = ""

接下来,我们将更新主函数以包含 Qdrant 宏和 secrets 宏。我们将遍历每个 secret 并将其设置为环境变量 - 这使我们能够全局使用 secret,而无需引用变量SecretStore:

#[shuttle_runtime::main]
async fn main(
    #[shuttle_qdrant::Qdrant] qdrant_client: QdrantClient,
    #[shuttle_runtime::Secrets] secrets: SecretStore,
) -> shuttle_axum::ShuttleAxum {
    secrets.into_iter().for_each(|x| {
        set_var(x.0, x.1);
    });

    let router = Router::new()
        .route("/", get(hello_world));

    Ok(router.into())
}

构建代理 RAG 工作流程 设置我们的代理 代理本身非常简单:它拥有一个 OpenAI 客户端和一个 Qdrant 客户端,以便能够搜索相关的文档嵌入。您还可以在此处添加其他字段,具体取决于您的代理所需的功能。

use async_openai::{config::OpenAIConfig, Client as OpenAIClient};
use qdrant_client::prelude::QdrantClient;

pub struct MyAgent {
    openai_client: OpenAIClient<OpenAIConfig>,
    qdrant_client: QdrantClient,
}

接下来,我们将创建一个辅助方法来创建代理,以及一个系统消息,稍后我们会将其输入到模型提示中。

static SYSTEM_MESSAGE: &str = "
        You are a world-class data analyst, specialising in analysing comma-delimited CSV files.

        Your job is to analyse some CSV snippets and determine what the results are for the question that the user is asking.

        You should aim to be concise. If you don't know something, don't make it up but say 'I don't know.'.
"

impl MyAgent {
    pub fn new(qdrant_client: QdrantClient) -> Self {
        let api_key = std::env::var("OPENAI_API_KEY").unwrap();
        let config = OpenAIConfig::new().with_api_key(api_key);

        let openai_client = OpenAIClient::with_config(config);

        Self {
            openai_client,
            qdrant_client,
        }
    }
}

文件解析并嵌入到 Qdrant 接下来,我们将实现一个File用于 CSV 文件解析的结构 - 它应该能够将文件路径、内容以及行保存为Vec(字符串数组,或更准确地说是字符串向量)。我们将行存储为的原因有几个Vec

  • 更小的区块可以提高检索的准确性,这是 RAG 面临的最大挑战之一。检索错误或不准确的文档会严重影响准确性。
  • 提高检索准确性可增强上下文相关性 - 这对于需要特定问题的复杂查询非常重要。
  • 处理和索引较小的块
pub struct File {
    pub path: String,
    pub contents: String,
    pub rows: Vec<String>,
}

impl File {
    pub fn new(path: PathBuf) -> Result<Self> {
        let contents = std::fs::read_to_string(&path)?;

        let path_as_str = format!("{}", path.display());

        let rows = contents
            .lines()
            .map(|x| x.to_owned())
            .collect::<Vec<String>>();

        Ok(Self {
            path: path_as_str,
            contents,
            rows
        })
    }
}
虽然上述解析方法很有用(将所有行收集到 中Vec),但请注意,这是一种简单的实现。根据您的 CSV 文件的分隔方式和/或是否有需要清理的脏数据,您可能需要准备数据以使其已经准备就绪,或者包括某种形式的数据清理或验证。这方面的一些示例可能是:
  • unicode-segmentation-用于拆分句子的库箱
  • csv_log_cleaner-用于清理 CSV 的二进制包
  • validator-用于验证结构/枚举字段的库包
接下来,我们将回到我们的代理并实现一种将文档嵌入到 Qdrant 中的方法,该方法将采用File我们定义的结构。

为此,我们需要执行以下操作:

  • 获取我们之前创建的行并将其添加为嵌入请求的输入。
  • 创建嵌入(使用 openAI)并创建有效载荷,用于与 Qdrant 中的嵌入一起存储。

请注意,虽然我们使用了uuid::Uuid唯一存储,但您也可以通过在结构中添加数字计数器并在插入嵌入后将其加 1 来轻松使用数字。 假设没有错误,返回Ok(())

use async_openai::types::{ CreateEmbeddingRequest, EmbeddingInput };
use async_openai::Embeddings;
use qdrant_client::prelude::{Payload, PointStruct};

static COLLECTION: &str = "my-collection";

// text-embedding-ada-002 is the model name from OpenAI that deals with embeddings
static EMBED_MODEL: &str = "text-embedding-ada-002";

impl MyAgent {
pub async fn embed_document(&self, file: File) -> Result<()> {
        if file.rows.is_empty() {
            return Err(anyhow::anyhow!("There's no rows to embed!"));
        }

        let request = CreateEmbeddingRequest {
            model: EMBED_MODEL.to_string(),
            input: EmbeddingInput::StringArray(file.rows.clone()),
            user: None,
            dimensions: Some(1536),
            ..Default::default()
        };

        let embeddings_result = Embeddings::new(&self.openai_client).create(request).await?;

        for embedding in embeddings_result.data {
            let payload: Payload = serde_json::json!({
                "id": file.path.clone(),
                "content": file.contents,
                "rows": file.rows
            })
            .try_into()
            .unwrap();

            println!("Embedded: {}", file.path);

            let vec = embedding.embedding;

            let points = vec![PointStruct::new(
                uuid::Uuid::new_v4().to_string(),
                vec,
                payload,
            )];
            self.qdrant_client
                .upsert_points(COLLECTION, None, points, None)
                .await?;
        }
        Ok(())
    }
}

文档搜索 现在我们已经嵌入了文档,我们需要一种方法来检查我们的嵌入是否与用户给出的提示具有上下文相关性。为此,我们将创建一个search_document执行以下操作的函数:

  • 使用嵌入提示CreateEmbeddingRequest并从结果中获取嵌入。我们将在文档搜索中使用此嵌入。因为我们只在这里添加了一个句子来嵌入(提示),所以它只会返回一个句子 - 因此我们可以从向量中创建一个迭代器并尝试找到第一个结果。
  • 通过结构体为我们的文档搜索创建一个参数列表SearchPoints(见下文)。在这里,我们需要设置集合名称、我们要搜索的向量(即输入)、如果有匹配项,我们希望返回多少个结果,以及有效载荷选择器。
  • 在数据库中搜索结果 - 如果没有结果,则返回错误;如果有结果,则返回结果。
use qdrant_client::qdrant::{
    with_payload_selector::SelectorOptions, SearchPoints, WithPayloadSelector,
};

impl MyAgent {
    async fn search_document(&self, prompt: String) -> Result<String> {
        let request = CreateEmbeddingRequest {
            model: EMBED_MODEL.to_string(),
            input: EmbeddingInput::String(prompt),
            user: None,
            dimensions: Some(1536),
            ..Default::default()
        };

        let embeddings_result = Embeddings::new(&self.openai_client).create(request).await?;

        let embedding = &embeddings_result.data.first().unwrap().embedding;

        let payload_selector = WithPayloadSelector {
            selector_options: Some(SelectorOptions::Enable(true)),
        };

        // set parameters for search
        let search_points = SearchPoints {
            collection_name: COLLECTION.to_string(),
            vector: embedding.to_owned(),
            limit: 1,
            with_payload: Some(payload_selector),
            ..Default::default()
        };

        // if the search is successful
        // attempt to iterate through the results vector and find a result
        let search_result = self.qdrant_client.search_points(&search_points).await?;
        let result = search_result.result.into_iter().next();

        match result {
            Some(res) => Ok(res.payload.get("contents").unwrap().to_string()),
            None => Err(anyhow::anyhow!("There were no results that matched :(")),
        }
    }
}
现在我们已经设置好了有效使用代理所需的一切,我们可以设置一个提示功能!
use async_openai::types::{
    ChatCompletionRequestMessage, ChatCompletionRequestSystemMessageArgs,
    ChatCompletionRequestUserMessageArgs, CreateChatCompletionRequestArgs,
};

static PROMPT_MODEL: &str = "gpt-4o";

impl MyAgent {
    pub async fn prompt(&self, prompt: &str) -> anyhow::Result<String> {
        let context = self.search_document(prompt.to_owned()).await?;
        let input = format!(
            "{prompt}

            Provided context:
            {}
            ",
            context // this is the payload from Qdrant
        );

        let res = self
            .openai_client
            .chat()
            .create(
                CreateChatCompletionRequestArgs::default()
                    .model(PROMPT_MODEL)
                    .messages(vec![
                        //First we add the system message to define what the Agent does
                        ChatCompletionRequestMessage::System(
                            ChatCompletionRequestSystemMessageArgs::default()
                                .content(SYSTEM_MESSAGE)
                                .build()?,
                        ),
                        //Then we add our prompt
                        ChatCompletionRequestMessage::User(
                            ChatCompletionRequestUserMessageArgs::default()
                                .content(input)
                                .build()?,
                        ),
                    ])
                    .build()?,
            )
            .await
            .map(|res| {
                //We extract the first one
                res.choices[0].message.content.clone().unwrap()
            })?;

        println!("Retrieved result from prompt: {res}");

        Ok(res)
    }
}

将代理连接到我们的 Web 服务 因为我们将代理逻辑与 Web 服务逻辑分开了,所以我们只需要将各个部分连接在一起就可以完成了!

首先,我们将创建几个结构 -Prompt采用 JSON 提示的结构和AppState充当 Axum Web 服务器中的共享应用程序状态的函数。

#[derive(Deserialize)]
pub struct Prompt {
    prompt: String,
}

#[derive(Clone)]
pub struct AppState {
    agent: MyAgent,
}

我们还将在这里介绍我们的提示处理程序端点:

async fn prompt(
    State(state): State<AppState>,
    Json(json): Json<Prompt>,
) -> Result<impl IntoResponse> {
    let prompt_response = state.agent.prompt(&json.prompt).await?;

    Ok((StatusCode::OK, prompt_response))
}

然后我们需要在主函数中解析我们的 CSV 文件,创建AppState并嵌入 CSV,以及设置我们的路由器:

#[shuttle_runtime::main]
async fn main(
    #[shuttle_qdrant::Qdrant] qdrant_client: QdrantClient,
    #[shuttle_runtime::Secrets] secrets: SecretStore,
) -> shuttle_axum::ShuttleAxum {
    secrets.into_iter().for_each(|x| {
        set_var(x.0, x.1);
    });

    // note that this already assumes you have a file called "test.csv"
    // in your project root
    let file = File::new("test.csv".into())?;

    let state = AppState {
        agent: MyAgent::new(qdrant_client),
    };

    state.agent.embed_document(file).await?;

    let router = Router::new()
        .route("/", get(hello_world))
        .route("/prompt", post(prompt))
        .with_state(state);

    Ok(router.into())
}

部署 要部署,您需要做的就是使用cargo shuttle deploy(--ad如果在具有未提交更改的 Git 分支上则使用标志),然后坐下来观看奇迹发生!

总结 通过结合 AI 代理和 RAG 的力量,我们可以创建强大的工作流程,以满足许多不同的用例。借助 Rust,我们可以利用性能优势,以较低的内存占用安全地运行我们的工作流程。