从一次 /api/v1/chat/stream 请求出发,拆解事件如何在 Agent 推理过程中实时产生、序列化、并逐字节推送到浏览器。
Agent 在回答前要先规划、再思考、再调用工具、最后逐字生成答案——这是一个长耗时、多阶段的过程。SSE 让服务端在过程中单向、持续地把中间态推给前端,用户无需等待整段答案。
POST /api/v1/chat/streamChatController.streamChat() 暴露,produces = text/event-stream,返回 SseEmitter。fetch() 读 res.body.getReader(),手工按 SSE 协议解析 event: / data: 行。ChatStreamEventevent / sessionId / seq / timestamp / data,由 Jackson 序列化为 data 行的 JSON。浏览器原生 EventSource 只支持 GET、无法携带请求体。而本接口需要 POST 一个 ChatRequest JSON(message / sessionId / topicId),因此放弃 EventSource,改用 fetch() 手工解析流。代价是:失去浏览器自带的断线自动重连——项目当前也确实没有实现重连机制。
控制器立即把 SseEmitter 交还给 Spring MVC,真正的处理跑在 chatStreamExecutor 线程池里。Orchestrator 通过一个 Consumer<ChatStreamEvent> sink 把事件回灌给 ChatService,再由它统一 emitter.send()。
全部由 ChatStreamEvent 的工厂方法构造。规范顺序:connected → plan_thinking* → plan? → thinking* → (step | sub_progress)* → token* → done | error。
点击播放,模拟一次真实请求中事件按 seq 递增、跨层产生并到达客户端的过程。左侧是产生事件的层,右侧是浏览器逐字拼装答案与解析日志。
从控制器入口到字节落地浏览器的端到端调用路径,对应源码真实方法。
chatService.streamChat(request),把返回的 SseEmitter 交还给 Spring MVC,连接立即挂起等待事件。SseEmitter(streamTimeoutMs)、seqCounter、cancelled 标志与 streamId,注册 onTimeout / onError / onCompletion 回调。connected,随后调用 agentOrchestrator.streamChat(...),并把一个 lambda sink 与 cancelled::get 传进去。StepCollector.init(maxSteps, stepEventPublisher) 注册 step 实时发布器;StreamSinkHolder.set(sink) 把 sink 暴露给 sub-agent。plan_thinking,计划非空则发 plan。.contextCapture() 快照 ThreadLocal 进 Reactor Context,.takeWhile(!isCancelled) 支持中途取消。每个 chunk:有 reasoning 发 thinking,有 delta 发 token;工具调用由 StepCollector 异步发 step。blockLast() 等流结束后,收集 steps、写入 memory,发出携带完整 answer / steps / planSummary 的 done;异常路径则发 error。finally 中 StepCollector.clear() + StreamSinkHolder.clear() 防 ThreadLocal 泄漏。synchronized(emitter):设置自增 seq,Jackson 序列化,emitter.send(name + id + data)。done/error 还会落库逻辑日志。最终 finally 里 emitter.complete()。reader.read() 增量解码,按 \n 切行解析 event: / data:,handleStreamEvent() 按类型驱动 UI:token 逐字拼接气泡,step 展开工具 trace,done 收尾。SseEmitter(streamTimeoutMs),默认 app.ai.stream.timeout-ms = 120000(120s)。onTimeout 触发时标记 cancelled、发 error(TIMEOUT) 并 complete()。onCompletion(含客户端断开)置 cancelled=true,经 cancelled::get 传入 Orchestrator,takeWhile(!isCancelled) 终止 Reactor 流,避免无谓的模型消耗。sendEvent() 里取 seqCounter.getAndIncrement(),并写入 SSE 帧的 id: 字段。整个流内单调递增,前端可据此判序。StreamSinkHolder 用 ThreadLocal 持有 sink;DispatchSubAgentTool 取出后为 sub-agent 构造进度监听器,每完成一步发 sub_progress。非流式请求 sink 为 null,退化为静默执行。sendEvent() 用 synchronized(emitter) 串行化 send 与 seq 自增,保证帧不交错、序号不竞态。error 事件下发:每个事件序列化为一帧:event: 取自 event 字段,id: 取自 seq,data: 是整个信封的 JSON(MediaType.APPLICATION_JSON)。