基于 Java 17 + Spring Boot 3.2 + Spring AI 1.1.4 的生产级 AI Agent 应用。 融合 ReAct 与 Plan-and-Solve 双范式,配备三层记忆体系、 混合检索 + 重排序 RAG 管道,以及 Prometheus / Langfuse 全链路可观测。
每个组件都有明确动机:性能、可观测、可替换。无 Spring AI 之外的额外编排框架,避免抽象冗余。
四层清晰:Controller → Service → Agent/RAG/Memory 协作 → Spring AI 抽象 LLM/Embedding/Tool。
┌──────────────────────────────────────────────────────────────────────────┐ │ REST API Layer │ │ ChatController · RagController · TopicController · AiInteractionCtrl │ │ SSE streaming via SseEmitter (Spring MVC) │ └────────────────────────────────┬─────────────────────────────────────────┘ │ ┌────────────────────────────────▼─────────────────────────────────────────┐ │ Service Layer │ │ ChatService · RagService · MemoryService · TopicQueryService │ └────┬──────────────────────┬──────────────────────────┬──────────────────┘ │ │ │ ┌────▼────────────┐ ┌──────▼─────────────┐ ┌────────▼────────────────┐ │ Agent Core │ │ RAG Pipeline │ │ Memory System │ │ ─────────────── │ │ ───────────────── │ │ ─────────────────────── │ │ AgentOrch. │ │ DocumentExtractor │ │ Working (Redis) │ │ TaskPlanner │ │ OverlapSplitter │ │ Summary (pgvector) │ │ ToolRegistry │ │ QueryRewriter+HyDE │ │ Long-term (reflection) │ │ StepCollector │ │ Hybrid Retrieval │ │ UserProfileService │ │ + AOP Aspect │ │ Dense ⊕ BM25/RRF │ │ Consolidator / Decay / │ │ Tools: ×3 │ │ 2-stage Rerank │ │ Eviction / Reflection │ └────┬────────────┘ └──────┬─────────────┘ └────────┬────────────────┘ │ │ │ └──────────────────────┴──────────────────────────┘ │ ┌────────────────────────────────▼─────────────────────────────────────────┐ │ Spring AI (1.1.4) · OpenAI-compatible │ │ ChatClient · EmbeddingModel · VectorStore · Function Tools │ └──────────────────────────────┬───────────────────────────────────────────┘ │ ┌────────────▼─────────────┐ │ oMLX local · or Cloud │ │ Qwen3.5-9B · bge-m3 │ └──────────────────────────┘
不是二选一。Plan 给骨架,ReAct 给灵活性:先一次性低温度规划,再用强制约束驱动 ReAct 严格执行。稳定与灵活的平衡点。
BeanOutputConverter 强制结构化输出 List<PlanStep>Calculator · Weather · KnowledgeSearch纯 ReAct 在复杂任务下容易"跑题"或绕过工具直接回答;纯 Plan 牺牲了响应当前上下文的灵活性。 混合方案让 LLM 先想清楚再做——Plan 像编译期检查,ReAct 像运行时执行。强制约束指令杜绝模型"拿训练知识糊弄"的退化路径。
围绕 AgentOrchestrator 的 8 步生命周期。StepCollector 用 ThreadLocal 收集,AOP 切面零侵入。
┌─ AgentOrchestrator.chat() / streamChat() ─────────────────────────────────┐ │ │ │ 1. StepCollector.init(maxSteps) // reset thread-local state │ │ 2. TaskPlanner.plan() // separate low-temp LLM call │ │ 3. buildSystemPrompt(plan, sessionId, topicId) │ │ ├─ baseSystemPrompt │ │ ├─ UserProfileService.formatForSystemPrompt() // 个性化注入 │ │ ├─ topicSection (若指定 topicId) │ │ ├─ formatPlan(plan) │ │ ├─ formatPlanEnforcement(plan) // 强制工具调用约束 │ │ └─ maxSteps 提示 │ │ 4. buildHistory(sessionId) // 从 MemoryService 拉取最近 20 条│ │ 5. chatClient.prompt().toolNames(...).call() / .stream() │ │ ↳ Spring AI 自动管理 ReAct 循环 │ │ ↳ ToolExecutionAspect (AOP) 拦截每次 apply() → StepCollector │ │ 6. recordTokenUsage() · recordRagMetrics() // Micrometer counters │ │ 7. memoryService.addMessage(user) · addMessage(assistant) │ │ 8. StepCollector.clear() // 防止 ThreadLocal 泄漏 │ │ │ └──────────────────────────────────────────────────────────────────────────────┘
递归下降解析器,支持 + − × ÷ 与括号。Sanitize 后只接受 [0-9+\-*/().] 字符。生产可换 exp4j。
Mock 数据演示 Function Calling 模式(Beijing/Shanghai/Shenzhen/Chengdu)。生产替换为真实 API + Resilience4j 熔断。
触发完整 RAG 管道:QueryRewriter → HyDE → Hybrid → Rerank。基于 StepCollector 做请求内查询去重,节省 LLM/检索调用。
单次对话工具调用硬上限
单次请求 RAG 调用上限
单次流式请求最大等待
JSON Schema 强制输出
从 Working → Summary → Long-term,事件驱动逐级蒸馏。配合衰减与淘汰,避免向量库无限膨胀。
当前会话的原始对话流。新消息 rpush,超过上限 lpop 进入待蒸馏队列。Redis 不可用时自动降级到 ConcurrentHashMap fallback。
每淘汰满 batch_size 条消息,触发 SummarizationRequestEvent → MemorySummarizer 异步蒸馏成段落,带 importance score 写入向量库供后续检索。
每累计 N 个 summary,触发 ReflectionRequestEvent → ReflectionWorker 跨会话归纳"用户的薄弱点 / 偏好"。配合 UserProfileService 反哺 system prompt。
| 组件 | 触发 | 职责 | 关键配置 |
|---|---|---|---|
MemorySummarizer | SummarizationRequestEvent | 对话片段 → 摘要 + importance score | batch-size=3 |
MemoryConsolidator | ConsolidationRequestEvent | 写入 VectorStore,达阈值触发反思 | reflection-threshold=3 |
ReflectionWorker | ReflectionRequestEvent | 归纳长期模式 → 写回 type=reflection | episode-threshold=4 |
ImportanceDecayManager | cron 0 30 3 * * ? | 30 天无访问 importance 减半 | half-life=30d · min=0.01 |
EvictionPolicyManager | cron 0 0 3 * * ? | 低重要度 / 过期文档物理淘汰 | threshold=0.1 · max-age=2d |
MemoryAccessUpdater | 每次检索命中 | 更新 lastAccessedAt 时间戳 | — |
UserProfileService | 每次 chat 入口 | 从 reflection 提取画像 → 注入 system prompt | — |
摄入与查询两条主链路。RetrievalRouter 启发式选择 DENSE / HYBRID / SPARSE,Rerank 支持启发式与 cross-encoder。
| chunk-size / overlap | 500 / 50 |
| similarity-threshold | 0.6 bge-m3 区间 0.4–0.65 |
| default-top-k | 5 |
| hybrid-enabled | true · BM25 + Dense |
| sparse FTS config | english |
| rerank-enabled / type | true · heuristic | cross-encoder |
| query-rewrite / hyde | true / true |
| max-calls-per-session | 3 |
// 自动选择策略 if (strategy != AUTO) return strategy; if (hasMetadataFilters()) return DENSE; // 短查询 / 引号 / 含数字 → keyword-like boolean keywordLike = tokens.size() <= 3 || query.contains("\"") || query.matches(".*\\d.*"); return keywordLike ? HYBRID : DENSE;
短查询(核心名词)走 BM25;自然语言长句走 Dense;带过滤器(如 topicId)始终用 Dense 以利用 metadata 过滤。
Metrics 与 LLM Tracing 两条独立线,docker-compose overlay 按需启用,互不依赖。生产可分别下沉到 VictoriaMetrics / Langfuse 自托管。
| Profile | 容器 | UI |
|---|---|---|
| base | app + postgres + redis | — |
| --profile metrics | + prometheus + grafana | :3000 |
| --profile observe | + langfuse-* + clickhouse + minio | :3001 |
| both | 全开 | :3000 + :3001 |
ai.agent.chat.duration | Agent 端到端延迟 |
ai.token.input / output | Token 成本追踪 |
ai.rag.calls_per_session | 每会话 RAG 调用分布 |
ai.rag.dedup.skipped | 请求内查询去重命中 |
ai.planner.result{status} | 规划成功 / 解析失败 |
agent.memory.redis.failure | Redis 读写失败计数 |
Docker Compose 不再设置 mem_limit / mem_reservation,Redis、Node 与 ClickHouse 也不再设置本地开发内存上限;各组件按宿主机 / Docker Desktop 配额自适应。
SSE + Reactor + 自定义线程池叠加,ThreadLocal 在三种切换中都要稳。项目用 Micrometer Context Propagation + ExecutorService 装饰器双管齐下。
提交线程 get() 抓快照,工作线程 finally 恢复原值。一次性任务用,频繁则失控。
SessionAwareExecutor 在 submit/invoke 入口统一 wrap,调用方无感知。覆盖自定义线程池。
注册 ThreadLocalAccessor + Hooks.enableAutomaticContextPropagation(),Reactor publishOn/subscribeOn 全自动传播。
用 ApplicationRunner 在所有 Bean 初始化完成、第一个真实请求之前的窗口注册 Accessor + 开启 Hook,
既覆盖后续所有 Reactor pipeline,又能享受 Spring 上下文。这是 "Servlet 任意线程 → Reactor 调度器线程" 的标准解法。
REST + SSE 双形态。Agent 通过 sessionId 持续记忆,通过 topicId 限定知识检索域。
| Method | Path | 说明 |
|---|---|---|
| POST | /api/v1/chat | 同步 Agent 对话(plan + react + memory + tools) |
| POST | /api/v1/chat/stream | SSE 流式对话:plan_thinking → plan → thinking → step → token → done |
| POST | /api/v1/rag/ingest | 文档入库(文本 / 文件 via Tika) |
| GET | /api/v1/rag/search | 向量检索(debug 用) |
| GET | /actuator/health | 健康检查(show-details: always) |
| GET | /actuator/prometheus | Prometheus 指标抓取端点 |