Server-Sent Events · Streaming Architecture

Dawn AI SSE 流式架构详解

从一次 /api/v1/chat/stream 请求出发,拆解事件如何在 Agent 推理过程中实时产生、序列化、并逐字节推送到浏览器。

Spring MVC · SseEmitter POST + fetch ReadableStream Reactor · ChatClient.stream() ThreadLocal Sink
01

为什么是 SSE,而且是 POST + fetch

Agent 在回答前要先规划、再思考、再调用工具、最后逐字生成答案——这是一个长耗时、多阶段的过程。SSE 让服务端在过程中单向、持续地把中间态推给前端,用户无需等待整段答案。

ENDPOINT
POST /api/v1/chat/stream
ChatController.streamChat() 暴露,produces = text/event-stream,返回 SseEmitter
TRANSPORT
fetch + ReadableStream
前端用 fetch()res.body.getReader(),手工按 SSE 协议解析 event: / data: 行。
ENVELOPE
ChatStreamEvent
统一事件信封:event / sessionId / seq / timestamp / data,由 Jackson 序列化为 data 行的 JSON。
设计决策

浏览器原生 EventSource 只支持 GET、无法携带请求体。而本接口需要 POST 一个 ChatRequest JSON(message / sessionId / topicId),因此放弃 EventSource,改用 fetch() 手工解析流。代价是:失去浏览器自带的断线自动重连——项目当前也确实没有实现重连机制。

02

分层架构:事件从哪里来,到哪里去

控制器立即把 SseEmitter 交还给 Spring MVC,真正的处理跑在 chatStreamExecutor 线程池里。Orchestrator 通过一个 Consumer<ChatStreamEvent> sink 把事件回灌给 ChatService,再由它统一 emitter.send()

浏览器app.js
fetch(POST)
提交 ChatRequest,读取 ReadableStream,按行解析 SSE 帧并驱动 UI。
sendMessageStream() · handleStreamEvent()
HTTP text/event-stream(长连接)
Controllerweb 层
ChatController
@PostMapping("/stream"),返回 SseEmitter 后立即把控制权还给容器。
streamChat(ChatRequest)
ChatServiceSSE 收口
SseEmitter
new SseEmitter(timeout),注册 onTimeout / onError / onCompletion 回调。
streamTimeoutMs = 120000
chatStreamExecutor
异步线程池执行处理;满载时抛 RejectedExecution → CAPACITY_EXCEEDED。
executor.execute(...)
sendEvent() sink
synchronized(emitter) 内设置 seq 自增、Jackson 序列化、emitter.send()。
seq = seqCounter.getAndIncrement()
Consumer<ChatStreamEvent> sink(事件回调)
Orchestrator事件源头
AgentOrchestrator
resolvePlan → chatClient.stream(),每个 chunk 产出 thinking / token 事件。
streamChat(..., sink, isCancelled)
StepCollector
工具调用步骤汇聚器,通过 stepEventPublisher 实时 emit step 事件。
init(maxSteps, publisher)
StreamSinkHolder
ThreadLocal 持有当前 sink,供 sub-agent 冒泡 sub_progress 心跳。
set(sink) / get() / clear()
03

九种事件类型

全部由 ChatStreamEvent 的工厂方法构造。规范顺序:connected → plan_thinking* → plan? → thinking* → (step | sub_progress)* → token* → done | error

connected
连接建立,最先发出
data: sessionId, streamId
plan_thinking
规划器的思考过程增量
data: content, accumulatedLength
plan
规划结果(计划存在时才发)
data: steps[], summary
thinking
主模型推理过程增量
data: content, accumulatedLength
step
一次工具调用步骤完成
data: AgentStep(toolName, input, output, durationMs, status, subSteps)
sub_progress
sub-agent 内部步骤进度心跳
data: parentToolName, subAgentType, subStep, currentTool
token
最终答案 token 增量(逐字)
data: content, accumulatedLength
done
流正常结束,含完整结果
data: answer, steps[], planSummary, totalSteps, durationMs, model
error
异常终止
data: code, message
04

时序演示:一次请求里事件如何流出

点击播放,模拟一次真实请求中事件按 seq 递增、跨层产生并到达客户端的过程。左侧是产生事件的层,右侧是浏览器逐字拼装答案与解析日志。

速度
seq · idle
Service · connected
Orchestrator · plan
Orchestrator · thinking
StepCollector · step
Orchestrator · token
Orchestrator · done
// 浏览器 · 答案拼装
等待 token…
// 解析日志(handleStreamEvent)
05

完整调用链

从控制器入口到字节落地浏览器的端到端调用路径,对应源码真实方法。

1
Controller 转发ChatController.streamChat()
校验请求体后直接调用 chatService.streamChat(request),把返回的 SseEmitter 交还给 Spring MVC,连接立即挂起等待事件。
2
建立 Emitter 与状态ChatService.streamChat()
创建 SseEmitter(streamTimeoutMs)seqCountercancelled 标志与 streamId,注册 onTimeout / onError / onCompletion 回调。
3
异步执行 + 首帧chatStreamExecutor.execute()
在线程池中先发 connected,随后调用 agentOrchestrator.streamChat(...),并把一个 lambda sink 与 cancelled::get 传进去。
4
初始化事件源AgentOrchestrator.streamChat()
StepCollector.init(maxSteps, stepEventPublisher) 注册 step 实时发布器;StreamSinkHolder.set(sink) 把 sink 暴露给 sub-agent。
5
规划阶段resolvePlan()
规划器返回 reasoning 与 steps:有思考则发 plan_thinking,计划非空则发 plan
6
流式推理chatClient.prompt().stream()
.contextCapture() 快照 ThreadLocal 进 Reactor Context,.takeWhile(!isCancelled) 支持中途取消。每个 chunk:有 reasoning 发 thinking,有 delta 发 token;工具调用由 StepCollector 异步发 step
7
收尾StepCollector.collect() → done
blockLast() 等流结束后,收集 steps、写入 memory,发出携带完整 answer / steps / planSummary 的 done;异常路径则发 error。finally 中 StepCollector.clear() + StreamSinkHolder.clear() 防 ThreadLocal 泄漏。
8
统一下发ChatService.sendEvent()
sink 每收到事件即进入 synchronized(emitter):设置自增 seq,Jackson 序列化,emitter.send(name + id + data)done/error 还会落库逻辑日志。最终 finally 里 emitter.complete()
9
前端消费app.js · sendMessageStream()
reader.read() 增量解码,按 \n 切行解析 event: / data:handleStreamEvent() 按类型驱动 UI:token 逐字拼接气泡,step 展开工具 trace,done 收尾。
06

关键机制与边界处理

超时
SseEmitter(streamTimeoutMs),默认 app.ai.stream.timeout-ms = 120000(120s)。onTimeout 触发时标记 cancelled、发 error(TIMEOUT)complete()
取消 / 断线
onCompletion(含客户端断开)置 cancelled=true,经 cancelled::get 传入 Orchestrator,takeWhile(!isCancelled) 终止 Reactor 流,避免无谓的模型消耗。
#
序号 seq
每个事件在 sendEvent() 里取 seqCounter.getAndIncrement(),并写入 SSE 帧的 id: 字段。整个流内单调递增,前端可据此判序。
sub-agent 心跳冒泡
StreamSinkHolder 用 ThreadLocal 持有 sink;DispatchSubAgentTool 取出后为 sub-agent 构造进度监听器,每完成一步发 sub_progress。非流式请求 sink 为 null,退化为静默执行。
🔒
并发安全
事件可能来自主流与 sub-agent 等不同线程,sendEvent()synchronized(emitter) 串行化 send 与 seq 自增,保证帧不交错、序号不竞态。
!
错误码体系
异常被分类成统一错误码后随 error 事件下发:
TIMEOUT CAPACITY_EXCEEDED MAX_STEPS_EXCEEDED AI_NOT_CONFIGURED INTERNAL_ERROR
07

线缆上的真实报文

每个事件序列化为一帧:event: 取自 event 字段,id: 取自 seqdata: 是整个信封的 JSON(MediaType.APPLICATION_JSON)。

// seq=0 连接建立 event: connected id: 0 data: {"event":"connected","sessionId":"a1b2…","seq":0,"data":{"streamId":"…"}} // seq=1 规划结果 event: plan id: 1 data: {"event":"plan","seq":1,"data":{"steps":[…],"summary":"步骤1: … → 步骤2: 完成"}} // seq=N 答案 token(逐字,多帧) event: token id: 7 data: {"event":"token","seq":7,"data":{"content":"你","accumulatedLength":1}} // seq=last 结束帧 event: done id: 12 data: {"event":"done","seq":12,"data":{"answer":"…","totalSteps":2,"durationMs":8421,"model":"…"}}