三公机器人

牛牛机器人,三公撑船机器人,微信牛牛机器人

牛牛机器人 Cloud Agent 开发笔记(3):Web 交互与数据持久化



前面两篇我们聊了Cloud Agent的基础架构设计和工具调用链的实现,这一篇进入实打实的落地环节:怎么给Agent做一个能交互的Web界面,以及怎么把用户的对话历史、Agent的运行状态持久化存下来,不会重启就丢数据。


我们用的栈还是现在做AI Agent最顺手的组合:FastAPI做后端服务,Vue3做前端交互,SQLAlchemy+SQLite做基础持久化,不需要复杂的分布式存储,小项目本地就能跑起来,后续扩容也能无缝切到PostgreSQL。


第一步:先梳理清楚Web交互需要哪些核心能力


做Agent的Web交互,和普通的聊天框不一样,核心要解决三个特殊需求:


流式输出‌:Agent生成内容慢,不能等全生成完再返回,要像Claude、ChatGPT那样逐字吐出来,用户不用干等

工具调用可视化‌:Agent不是只输出文本,它还要调用工具、访问API、读文件,得让用户看到当前Agent正在做什么,调用了哪个工具,返回了什么结果

对话历史恢复‌:用户刷新页面、下次打开,能接着上一次的对话继续聊,不能重新开会话


基于这个需求,我们用SSE(Server-Sent Events)做流式传输,比WebSocket更轻量,后端适配FastAPI也简单,不需要额外维护连接状态,正好适配Agent单会话流式输出的场景。


后端:FastAPI对接SSE流式响应,处理Agent会话


先写核心的对话接口,核心逻辑是:接收用户的提问,把查询参数挂到请求里,然后逐次yield Agent的输出事件,区分普通文本、工具调用、结束三种事件类型,前端根据事件类型做不同渲染。


核心接口代码示例:

python

from fastapi import APIRouter, Request, Depends

from fastapi.responses import StreamingResponse

from pydantic import BaseModel

import json

from typing import AsyncGenerator


from agent.cloud_agent import CloudAgent

from persistence import get_db

from models import Conversation


router = APIRouter()


# 定义请求体

class ChatRequest(BaseModel):

    conversation_id: str | None = None

    message: str

    user_id: str = "default"


# 定义SSE事件格式,前端好解析

class SSEEvent:

    def __init__(self, event_type: str, data: dict):

        self.event_type = event_type

        self.data = data

    

    def to_sse(self) -> str:

        return f"event: {self.event_type}\ndata: {json.dumps(self.data, ensure_ascii=False)}\n\n"


# 核心流式聊天接口

@router.post("/chat/stream")

async def stream_chat(req: ChatRequest, db = Depends(get_db)) -> StreamingResponse:

    # 1. 拉取已有对话历史,如果是新会话就创建

    if req.conversation_id:

        conv = db.query(Conversation).filter(Conversation.id == req.conversation_id).first()

        history = conv.get_history()

    else:

        conv = Conversation(user_id=req.user_id, title=req.message[:20])

        db.add(conv)

        db.commit()

        history = []

    

    # 2. 初始化Agent,传入历史对话

    agent = CloudAgent(history=history)

    

    # 3. 定义流式生成器,逐次返回事件

    async def event_generator() -> AsyncGenerator[str, None]:

        # 工具调用开始事件

        async for step in agent.run(req.message):

            if step["type"] == "tool_start":

                yield SSEEvent(

                    event_type="tool_start",

                    data={"tool_name": step["tool_name"], "params": step["params"]}

                ).to_sse()

            elif step["type"] == "tool_end":

                yield SSEEvent(

                    event_type="tool_end",

                    data={"result": step["result"]}

                ).to_sse()

            elif step["type"] == "text_delta":

                # 逐字返回文本增量

                yield SSEEvent(

                    event_type="text_delta",

                    data={"delta": step["delta"]}

                ).to_sse()

        

        # 对话结束,保存完整对话到数据库

        full_history = agent.get_full_history()

        conv.set_history(full_history)

        db.commit()

        # 发送结束事件,返回会话ID方便前端保存

        yield SSEEvent(

            event_type="done",

            data={"conversation_id": conv.id}

        ).to_sse()

    

    return StreamingResponse(event_generator(), media_type="text/event-stream")



这里有个非常容易踩的坑:FastAPI默认会对响应做gzip压缩,但SSE流式响应开了压缩之后,会把所有内容攒到buffer满了才返回,就变成了全量输出不是流式了,一定要在FastAPI的配置里给SSE接口关掉压缩:


python

# 可以用这个装饰器手动关,或者在中间件里判断路径处理

from fastapi.middleware.gzip import GZipMiddleware

app.add_middleware(GZipMiddleware, minimum_size=1000, exclude_paths=["/api/chat/stream"])


前端:Vue3实现带工具状态展示的流式聊天框


前端核心要处理SSE连接,根据不同的事件类型更新页面,我们用最简单的响应式变量维护消息列表,工具调用单独做一个状态展示,不用搞复杂的状态管理,小项目够用。


核心SSE连接逻辑:

vue

<script setup>

import { ref } from 'vue';


const messageInput = ref('');

// 消息列表:type区分text/tool

const messages = ref<Array<{

  role: 'user' | 'agent',

  content: string,

  tool?: { name: string, result: any }

}>>([]);

const isConnecting = ref(false);

let currentConversationId: string | null = null;


// 发送消息,建立SSE连接

async function sendMessage() {

  if (!messageInput.value.trim() || isConnecting.value) return;

  

  // 先把用户消息加到列表

  const userMsg = messageInput.value.trim();

  messages.value.push({ role: 'user', content: userMsg });

  messageInput.value = '';

  isConnecting.value = true;

  

  // 给Agent加一个空占位,后续增量更新

  const agentMsgIndex = messages.value.length;

  messages.value.push({ role: 'agent', content: '', tool: undefined });

  

  // 建立SSE连接

  const eventSource = new EventSource(`/api/chat/stream?message=${encodeURIComponent(userMsg)}&conversation_id=${currentConversationId || ''}`);

  

  let currentTool: any = null;

  

  // 处理不同事件

  eventSource.addEventListener('tool_start', (e) => {

    const data = JSON.parse(e.data);

    currentTool = { name: data.tool_name, params: data.params };

    // 把当前工具调用加到消息里展示

    messages.value[agentMsgIndex].tool = currentTool;

  });

  

  eventSource.addEventListener('text_delta', (e) => {

    const data = JSON.parse(e.data);

    messages.value[agentMsgIndex].content += data.delta;

  });

  

  eventSource.addEventListener('done', (e) => {

    const data = JSON.parse(e.data);

    currentConversationId = data.conversation_id;

    eventSource.close();

    isConnecting.value = false;

    currentTool = null;

  });

  

  eventSource.addEventListener('error', (e) => {

    console.error('SSE error', e);

    eventSource.close();

    isConnecting.value = false;

    messages.value[agentMsgIndex].content += '\n[连接出错,请重试]';

  });

}

</script>



工具调用的展示直接做一个醒目的灰色块就行,不用太复杂:


vue

<template>

  <div>

    <div v-for="msg in messages" :key="index" :class="msg.role">

      <div v-if="msg.tool">

        <span>🔧 调用工具 {{ msg.tool.name }}</span>

      </div>

      <div>{{ msg.content }}</div>

    </div>

    <div>

      <textarea v-model="messageInput" @keyup.enter="sendMessage"></textarea>

      <button @click="sendMessage" :disabled="isConnecting">

        {{ isConnecting ? '生成中...' : '发送' }}

      </button>

    </div>

  </div>

</template>


<style scoped>

.tool-block {

  background: #f5f5f5;

  border-radius: 6px;

  padding: 8px 12px;

  margin-bottom: 8px;

}

.tool-badge {

  font-size: 12px;

  color: #666;

}

</style>



这样用户就能清晰看到Agent当前在做什么,不会对着空白屏幕不知道是不是卡住了。


数据持久化:用SQLAlchemy做会话和状态存储


我们需要存两类核心数据:用户的会话基本信息、每一轮的对话历史、Agent工具调用的中间结果。对于中小规模的Cloud Agent,用SQLite足够,配置简单,不需要额外装数据库服务,直接存在本地文件里,后续需要扩容直接把连接串改成PostgreSQL就行,代码不用改。


数据模型定义:

python

from sqlalchemy import create_engine, Column, String, Text, DateTime, Integer

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy.orm import sessionmaker

import json

from datetime import datetime

from config import settings


# 初始化连接,SQLite文件存在项目根目录

engine = create_engine(

    settings.DB_URL, # 开发环境就是"sqlite:///./cloud_agent.db"

    connect_args={"check_same_thread": False} # SQLite需要加这个参数

)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()


# 对话表,核心模型

class Conversation(Base):

    __tablename__ = "conversations"

    

    id = Column(Integer, primary_key=True, index=True)

    user_id = Column(String(64), index=True)

    title = Column(String(128))

    # 对话历史存在json字段里,用TEXT类型存,兼容所有数据库

    history_json = Column(Text)

    created_at = Column(DateTime, default=datetime.utcnow)

    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    

    # 封装序列化/反序列化,方便调用

    def get_history(self):

        if not self.history_json:

            return []

        return json.loads(self.history_json)

    

    def set_history(self, history):

        self.history_json = json.dumps(history, ensure_ascii=False)

        self.updated_at = datetime.utcnow()


# 建表,第一次运行自动创建

Base.metadata.create_all(bind=engine)


# 依赖注入,给FastAPI接口用

def get_db():

    db = SessionLocal()

    try:

        yield db

    finally:

        db.close()


为什么不把每一条消息单独拆成表?


对于Cloud Agent这种单用户为主的小项目,把整个对话历史序列化存在一个json字段里足够用,查询一次就能拿到整个会话的所有内容,不需要连表查询,性能更好,代码也简单。如果后续要做消息检索、大数据量分析,再拆分成单独的消息表也不迟,现在先满足快速落地的需求。


如果需要存Agent的工具缓存、中间状态,比如Agent爬取过的云资源信息,也可以按照同样的逻辑加一张ToolCache表,存在数据库里,下次同一个会话不用重复调用云API,速度快很多,还能减少API调用次数省钱。


最后说两个踩过的坑,帮你省时间

SSE跨域问题‌:如果前端和后端端口不一样,跨域的时候一定要让后端CORS配置允许text/event-stream类型,并且要把Access-Control-Allow-Origin设对,不能用通配符,不然Chrome会拦截SSE响应。

对话历史丢上下文‌:每次保存对话的时候,一定要把系统提示词、用户消息、Agent回复、工具调用结果都完整存进去,不要只存用户和Agent的文本消息,不然下次恢复会话的时候,Agent不知道之前调用过什么工具,很容易出错。

SQLite并发问题‌:如果是多用户同时用的小服务,SQLite默认只支持单写,并发高了会报锁库错误,这时候直接换PostgreSQL就行,SQLAlchemy代码不用改,只换连接串就搞定了,我们一开始的设计已经留好了扩展性。


下一篇我们会聊Cloud Agent的核心能力:怎么给它加上云资源操作的权限控制,以及怎么处理不同云厂商的API兼容性,不会写死在代码里,方便后续加新厂商,感兴趣可以继续蹲~


Powered By Z-BlogPHP 1.7.3

三公机器人,牛牛机器人,三公撑船机器人,微信牛牛机器人