前面两篇我们聊了Cloud Agent的基础架构设计和工具调用链的实现,这一篇进入实打实的落地环节:怎么给Agent做一个能交互的Web界面,以及怎么把用户的对话历史、Agent的运行状态持久化存下来,不会重启就丢数据。
我们用的栈还是现在做AI Agent最顺手的组合:FastAPI做后端服务,Vue3做前端交互,SQLAlchemy+SQLite做基础持久化,不需要复杂的分布式存储,小项目本地就能跑起来,后续扩容也能无缝切到PostgreSQL。
第一步:先梳理清楚Web交互需要哪些核心能力
做Agent的Web交互,和普通的聊天框不一样,核心要解决三个特殊需求:
流式输出:Agent生成内容慢,不能等全生成完再返回,要像Claude、ChatGPT那样逐字吐出来,用户不用干等
工具调用可视化:Agent不是只输出文本,它还要调用工具、访问API、读文件,得让用户看到当前Agent正在做什么,调用了哪个工具,返回了什么结果
对话历史恢复:用户刷新页面、下次打开,能接着上一次的对话继续聊,不能重新开会话
基于这个需求,我们用SSE(Server-Sent Events)做流式传输,比WebSocket更轻量,后端适配FastAPI也简单,不需要额外维护连接状态,正好适配Agent单会话流式输出的场景。
后端:FastAPI对接SSE流式响应,处理Agent会话
先写核心的对话接口,核心逻辑是:接收用户的提问,把查询参数挂到请求里,然后逐次yield Agent的输出事件,区分普通文本、工具调用、结束三种事件类型,前端根据事件类型做不同渲染。
核心接口代码示例:
python
from fastapi import APIRouter, Request, Depends
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import json
from typing import AsyncGenerator
from agent.cloud_agent import CloudAgent
from persistence import get_db
from models import Conversation
router = APIRouter()
# 定义请求体
class ChatRequest(BaseModel):
conversation_id: str | None = None
message: str
user_id: str = "default"
# 定义SSE事件格式,前端好解析
class SSEEvent:
def __init__(self, event_type: str, data: dict):
self.event_type = event_type
self.data = data
def to_sse(self) -> str:
return f"event: {self.event_type}\ndata: {json.dumps(self.data, ensure_ascii=False)}\n\n"
# 核心流式聊天接口
@router.post("/chat/stream")
async def stream_chat(req: ChatRequest, db = Depends(get_db)) -> StreamingResponse:
# 1. 拉取已有对话历史,如果是新会话就创建
if req.conversation_id:
conv = db.query(Conversation).filter(Conversation.id == req.conversation_id).first()
history = conv.get_history()
else:
conv = Conversation(user_id=req.user_id, title=req.message[:20])
db.add(conv)
db.commit()
history = []
# 2. 初始化Agent,传入历史对话
agent = CloudAgent(history=history)
# 3. 定义流式生成器,逐次返回事件
async def event_generator() -> AsyncGenerator[str, None]:
# 工具调用开始事件
async for step in agent.run(req.message):
if step["type"] == "tool_start":
yield SSEEvent(
event_type="tool_start",
data={"tool_name": step["tool_name"], "params": step["params"]}
).to_sse()
elif step["type"] == "tool_end":
yield SSEEvent(
event_type="tool_end",
data={"result": step["result"]}
).to_sse()
elif step["type"] == "text_delta":
# 逐字返回文本增量
yield SSEEvent(
event_type="text_delta",
data={"delta": step["delta"]}
).to_sse()
# 对话结束,保存完整对话到数据库
full_history = agent.get_full_history()
conv.set_history(full_history)
db.commit()
# 发送结束事件,返回会话ID方便前端保存
yield SSEEvent(
event_type="done",
data={"conversation_id": conv.id}
).to_sse()
return StreamingResponse(event_generator(), media_type="text/event-stream")
这里有个非常容易踩的坑:FastAPI默认会对响应做gzip压缩,但SSE流式响应开了压缩之后,会把所有内容攒到buffer满了才返回,就变成了全量输出不是流式了,一定要在FastAPI的配置里给SSE接口关掉压缩:
python
# 可以用这个装饰器手动关,或者在中间件里判断路径处理
from fastapi.middleware.gzip import GZipMiddleware
app.add_middleware(GZipMiddleware, minimum_size=1000, exclude_paths=["/api/chat/stream"])
前端:Vue3实现带工具状态展示的流式聊天框
前端核心要处理SSE连接,根据不同的事件类型更新页面,我们用最简单的响应式变量维护消息列表,工具调用单独做一个状态展示,不用搞复杂的状态管理,小项目够用。
核心SSE连接逻辑:
vue
<script setup>
import { ref } from 'vue';
const messageInput = ref('');
// 消息列表:type区分text/tool
const messages = ref<Array<{
role: 'user' | 'agent',
content: string,
tool?: { name: string, result: any }
}>>([]);
const isConnecting = ref(false);
let currentConversationId: string | null = null;
// 发送消息,建立SSE连接
async function sendMessage() {
if (!messageInput.value.trim() || isConnecting.value) return;
// 先把用户消息加到列表
const userMsg = messageInput.value.trim();
messages.value.push({ role: 'user', content: userMsg });
messageInput.value = '';
isConnecting.value = true;
// 给Agent加一个空占位,后续增量更新
const agentMsgIndex = messages.value.length;
messages.value.push({ role: 'agent', content: '', tool: undefined });
// 建立SSE连接
const eventSource = new EventSource(`/api/chat/stream?message=${encodeURIComponent(userMsg)}&conversation_id=${currentConversationId || ''}`);
let currentTool: any = null;
// 处理不同事件
eventSource.addEventListener('tool_start', (e) => {
const data = JSON.parse(e.data);
currentTool = { name: data.tool_name, params: data.params };
// 把当前工具调用加到消息里展示
messages.value[agentMsgIndex].tool = currentTool;
});
eventSource.addEventListener('text_delta', (e) => {
const data = JSON.parse(e.data);
messages.value[agentMsgIndex].content += data.delta;
});
eventSource.addEventListener('done', (e) => {
const data = JSON.parse(e.data);
currentConversationId = data.conversation_id;
eventSource.close();
isConnecting.value = false;
currentTool = null;
});
eventSource.addEventListener('error', (e) => {
console.error('SSE error', e);
eventSource.close();
isConnecting.value = false;
messages.value[agentMsgIndex].content += '\n[连接出错,请重试]';
});
}
</script>
工具调用的展示直接做一个醒目的灰色块就行,不用太复杂:
vue
<template>
<div>
<div v-for="msg in messages" :key="index" :class="msg.role">
<div v-if="msg.tool">
<span>🔧 调用工具 {{ msg.tool.name }}</span>
</div>
<div>{{ msg.content }}</div>
</div>
<div>
<textarea v-model="messageInput" @keyup.enter="sendMessage"></textarea>
<button @click="sendMessage" :disabled="isConnecting">
{{ isConnecting ? '生成中...' : '发送' }}
</button>
</div>
</div>
</template>
<style scoped>
.tool-block {
background: #f5f5f5;
border-radius: 6px;
padding: 8px 12px;
margin-bottom: 8px;
}
.tool-badge {
font-size: 12px;
color: #666;
}
</style>
这样用户就能清晰看到Agent当前在做什么,不会对着空白屏幕不知道是不是卡住了。
数据持久化:用SQLAlchemy做会话和状态存储
我们需要存两类核心数据:用户的会话基本信息、每一轮的对话历史、Agent工具调用的中间结果。对于中小规模的Cloud Agent,用SQLite足够,配置简单,不需要额外装数据库服务,直接存在本地文件里,后续需要扩容直接把连接串改成PostgreSQL就行,代码不用改。
数据模型定义:
python
from sqlalchemy import create_engine, Column, String, Text, DateTime, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import json
from datetime import datetime
from config import settings
# 初始化连接,SQLite文件存在项目根目录
engine = create_engine(
settings.DB_URL, # 开发环境就是"sqlite:///./cloud_agent.db"
connect_args={"check_same_thread": False} # SQLite需要加这个参数
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# 对话表,核心模型
class Conversation(Base):
__tablename__ = "conversations"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(String(64), index=True)
title = Column(String(128))
# 对话历史存在json字段里,用TEXT类型存,兼容所有数据库
history_json = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 封装序列化/反序列化,方便调用
def get_history(self):
if not self.history_json:
return []
return json.loads(self.history_json)
def set_history(self, history):
self.history_json = json.dumps(history, ensure_ascii=False)
self.updated_at = datetime.utcnow()
# 建表,第一次运行自动创建
Base.metadata.create_all(bind=engine)
# 依赖注入,给FastAPI接口用
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
为什么不把每一条消息单独拆成表?
对于Cloud Agent这种单用户为主的小项目,把整个对话历史序列化存在一个json字段里足够用,查询一次就能拿到整个会话的所有内容,不需要连表查询,性能更好,代码也简单。如果后续要做消息检索、大数据量分析,再拆分成单独的消息表也不迟,现在先满足快速落地的需求。
如果需要存Agent的工具缓存、中间状态,比如Agent爬取过的云资源信息,也可以按照同样的逻辑加一张ToolCache表,存在数据库里,下次同一个会话不用重复调用云API,速度快很多,还能减少API调用次数省钱。
最后说两个踩过的坑,帮你省时间
SSE跨域问题:如果前端和后端端口不一样,跨域的时候一定要让后端CORS配置允许text/event-stream类型,并且要把Access-Control-Allow-Origin设对,不能用通配符,不然Chrome会拦截SSE响应。
对话历史丢上下文:每次保存对话的时候,一定要把系统提示词、用户消息、Agent回复、工具调用结果都完整存进去,不要只存用户和Agent的文本消息,不然下次恢复会话的时候,Agent不知道之前调用过什么工具,很容易出错。
SQLite并发问题:如果是多用户同时用的小服务,SQLite默认只支持单写,并发高了会报锁库错误,这时候直接换PostgreSQL就行,SQLAlchemy代码不用改,只换连接串就搞定了,我们一开始的设计已经留好了扩展性。
下一篇我们会聊Cloud Agent的核心能力:怎么给它加上云资源操作的权限控制,以及怎么处理不同云厂商的API兼容性,不会写死在代码里,方便后续加新厂商,感兴趣可以继续蹲~