从零到上线:手把手教你用FastAPI + LangGraph打造一个带WebSocket流式输出和会话记忆的AI客服接口

张开发
2026/4/22 22:43:15 15 分钟阅读

分享文章

从零到上线:手把手教你用FastAPI + LangGraph打造一个带WebSocket流式输出和会话记忆的AI客服接口
从零到上线手把手教你用FastAPI LangGraph打造一个带WebSocket流式输出和会话记忆的AI客服接口想象一下当你需要为电商平台开发一个智能客服系统时用户期望的不仅是快速响应还需要对话连贯自然——就像和真人交流一样记住之前的聊天内容。这正是现代AI客服系统的核心挑战如何在高并发环境下实现多用户会话隔离、实时流式响应同时保持对话上下文的连贯性。本文将带你从零开始使用FastAPI作为高性能Web框架结合LangGraph构建有状态的对话图最终实现一个支持WebSocket流式输出和完整会话记忆的AI客服接口。不同于简单的问答机器人这个系统能自动总结长对话、管理用户专属的对话线程并提供历史记录查询API。无论你是想学习现代AI应用开发的全栈工程师还是希望升级现有对话系统的后端开发者都能从本文获得可直接落地的解决方案。1. 项目架构设计与技术选型在开始编码之前我们需要明确系统的核心需求和对应的技术方案。这个AI客服系统需要满足以下几个关键特性实时交互支持WebSocket协议实现逐字输出效果会话记忆每个用户的对话历史需要被完整保存和智能利用自动总结长对话时自动生成摘要避免上下文窗口爆炸状态管理多用户会话隔离与线程安全技术栈选择上我们采用以下组合技术组件作用优势说明FastAPIWeb框架高性能、自动文档、WebSocket支持LangGraph有状态对话图可视化流程、检查点机制MemorySaver对话状态持久化线程安全、支持多后端存储Websockets实时双向通信低延迟、全双工通信协议环境准备需要以下基础组件# 创建虚拟环境 python -m venv venv source venv/bin/activate # Linux/Mac venv\Scripts\activate # Windows # 安装核心依赖 pip install fastapi uvicorn websockets pip install langgraph langchain-core特别提醒生产环境建议使用PostgreSQL或Redis作为MemorySaver的后端存储本文为演示方便使用内存存储。2. 构建有状态的对话图LangGraph的核心价值在于将对话流程可视化为有向图其中节点代表处理步骤边代表控制流。我们先定义对话系统的状态结构from typing import Annotated, TypedDict from langchain_core.messages import AnyMessage from langgraph.graph.message import add_messages class ConversationState(TypedDict): messages: Annotated[list[AnyMessage], add_messages] summary: str user_metadata: dict这个状态对象包含三个关键部分messages对话消息列表自动处理消息追加summary动态生成的对话摘要user_metadata用户特征信息如偏好设置接下来实现对话图的核心节点from langgraph.graph import StateGraph from langchain_core.messages import HumanMessage, AIMessage # 初始化对话图 workflow StateGraph(ConversationState) # 定义模型调用节点 async def chat_node(state: ConversationState): last_message state[messages][-1] # 根据摘要调整系统提示词 system_prompt ( f之前的对话摘要{state.get(summary,无)}\n 请以专业客服的身份回答用户问题 ) # 构造完整对话上下文 chat_history [SystemMessage(contentsystem_prompt)] chat_history.extend(state[messages][-5:]) # 限制上下文长度 # 调用AI模型实际项目替换为你的LLM调用 response await mock_llm_call(chat_history) return {messages: [AIMessage(contentresponse)]} # 注册节点 workflow.add_node(chat, chat_node)智能总结节点的实现是关键创新点它会在对话轮次超过阈值时自动触发async def summarize_node(state: ConversationState): # 动态调整总结粒度 summary_depth min(10, len(state[messages])//2) recent_chat state[messages][-summary_depth:] # 生成增量式摘要 summary_prompt ( f当前摘要{state.get(summary,)}\n 请根据以下新对话内容扩展摘要\n f{recent_chat} ) new_summary await mock_llm_call(summary_prompt) # 清理旧消息但保留最近2条 retained_messages state[messages][-2:] if len(state[messages]) 2 else state[messages] return { summary: new_summary, messages: retained_messages } workflow.add_node(summarize, summarize_node)通过条件边实现智能流程控制from typing import Literal def route_decision(state: ConversationState) - Literal[summarize, __end__]: # 超过5轮对话或用户明确要求总结时触发 if len(state[messages]) 5 or any( 总结 in msg.content for msg in state[messages] if isinstance(msg, HumanMessage) ): return summarize return __end__ # 构建图关系 workflow.add_edge(chat, __end__) workflow.add_conditional_edges( chat, route_decision, {summarize: summarize, __end__: __end__} ) workflow.add_edge(summarize, __end__) # 最终编译 app workflow.compile()这个设计使得对话系统能够在常规对话时快速响应在对话过长时自动总结优化内存使用保持对话上下文的连贯性3. FastAPI集成与WebSocket实现FastAPI的双协议支持HTTP/WebSocket让我们能同时提供两种交互方式。首先实现WebSocket端点from fastapi import FastAPI, WebSocket from langgraph.checkpoint.memory import MemorySaver app FastAPI() memory MemorySaver() app.websocket(/ws/chat/{thread_id}) async def websocket_chat(websocket: WebSocket, thread_id: str): await websocket.accept() try: while True: # 接收用户消息 user_input await websocket.receive_text() # 构造对话状态 config {configurable: {thread_id: thread_id}} current_state memory.get(config) or { messages: [], summary: , user_metadata: {} } # 添加用户消息 current_state[messages].append(HumanMessage(contentuser_input)) # 流式处理响应 async for output in app.astream( current_state, configconfig, stream_modevalues ): if messages in output: last_msg output[messages][-1] # 逐词流式输出 for token in stream_tokens(last_msg.content): await websocket.send_text(token) # 更新对话状态 memory.put(config, output) except Exception as e: print(fWebSocket错误: {e}) finally: await websocket.close()同时提供传统的HTTP接口作为备选方案from fastapi import HTTPException from pydantic import BaseModel class ChatRequest(BaseModel): message: str thread_id: str default app.post(/api/chat) async def http_chat(request: ChatRequest): try: config {configurable: {thread_id: request.thread_id}} current_state memory.get(config) or { messages: [], summary: , user_metadata: {} } current_state[messages].append(HumanMessage(contentrequest.message)) # 同步调用非流式 result await app.ainvoke(current_state, configconfig) memory.put(config, result) return { response: result[messages][-1].content, summary: result.get(summary, ) } except Exception as e: raise HTTPException(status_code500, detailstr(e))历史记录查询接口的实现app.get(/api/history/{thread_id}) async def get_history(thread_id: str, limit: int 20): config {configurable: {thread_id: thread_id}} state memory.get(config) if not state: return {history: [], summary: } return { history: [ {role: user if isinstance(m, HumanMessage) else bot, content: m.content} for m in state[messages][-limit:] ], summary: state.get(summary, ) }4. 前端集成与性能优化虽然本文聚焦后端实现但提供一个简单的前端示例帮助测试WebSocket功能!DOCTYPE html html head titleAI客服测试/title style #chatbox { height: 300px; border: 1px solid #ccc; overflow-y: scroll; } .user { color: blue; } .bot { color: green; } /style /head body div idchatbox/div input typetext idmessage placeholder输入消息... button onclicksendMessage()发送/button button onclickconnectWS()连接/button script let ws; const threadId Date.now().toString(); // 用时间戳作为临时会话ID function connectWS() { ws new WebSocket(ws://localhost:8000/ws/chat/${threadId}); ws.onmessage (event) { const chatbox document.getElementById(chatbox); const lastDiv chatbox.lastElementChild; if (lastDiv lastDiv.classList.contains(bot) !lastDiv.dataset.completed) { lastDiv.textContent event.data; } else { const div document.createElement(div); div.className bot; div.textContent event.data; div.dataset.completed false; chatbox.appendChild(div); } }; ws.onclose () { const chatbox document.getElementById(chatbox); const div document.createElement(div); div.textContent 连接已关闭; chatbox.appendChild(div); }; } function sendMessage() { const input document.getElementById(message); const chatbox document.getElementById(chatbox); // 显示用户消息 const userDiv document.createElement(div); userDiv.className user; userDiv.textContent 用户: ${input.value}; chatbox.appendChild(userDiv); // 发送到服务器 if (ws ws.readyState WebSocket.OPEN) { ws.send(input.value); } else { alert(请先建立WebSocket连接); } input.value ; } /script /body /html性能优化建议使用连接池管理WebSocket连接对LangGraph的检查点存储使用Redis等外部存储实现消息批处理减少IO操作添加速率限制防止滥用# 示例添加FastAPI中间件实现基础速率限制 from fastapi import Request from fastapi.middleware import Middleware from slowapi import Limiter from slowapi.util import get_remote_address limiter Limiter(key_funcget_remote_address) middleware [Middleware(limiter)] app FastAPI(middlewaremiddleware) app.websocket(/ws/chat/{thread_id}) limiter.limit(10/minute) async def websocket_chat(request: Request, websocket: WebSocket, thread_id: str): ...5. 部署与监控当系统开发完成后我们需要考虑生产环境部署方案。以下是推荐的部署架构前端静态资源 ↑ CDN加速 ↑ 负载均衡器 (Nginx/Traefik) ↑ FastAPI应用集群 (Uvicorn/Gunicorn) ↓ Redis/MongoDB (会话存储) ↓ 监控系统 (Prometheus Grafana)使用Docker部署的示例配置# backend/Dockerfile FROM python:3.10-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . CMD [uvicorn, main:app, --host, 0.0.0.0, --port, 8000, --workers, 4]配套的docker-compose.ymlversion: 3.8 services: backend: build: ./backend ports: - 8000:8000 environment: - REDIS_URLredis://redis:6379/0 depends_on: - redis redis: image: redis:alpine ports: - 6379:6379 volumes: - redis_data:/data volumes: redis_data:监控指标建议收集WebSocket连接数平均响应延迟对话轮次分布总结触发频率错误率统计可以通过FastAPI的中间件集成Prometheus监控from prometheus_fastapi_instrumentator import Instrumentator app.on_event(startup) async def startup(): Instrumentator().instrument(app).expose(app)在实际项目中我们发现以下几个优化点能显著提升用户体验为WebSocket添加心跳机制保持长连接实现客户端重连逻辑对历史消息实现分页加载为总结功能添加手动触发选项

更多文章