4-Layer Event-Driven Memory Architecture
Dawn AI 的记忆系统是一个四层事件驱动架构,为 AI Agent 提供从短期工作记忆到长期用户画像的完整记忆生命周期管理。核心设计理念:
Working → Pending → Episodic → Profile,每层有独立的存储引擎和生命周期策略,各司其职。
消息写入不阻塞响应。Summarization、Consolidation、Reflection 三阶段通过 Spring Event 解耦,任何一步失败不影响用户体验。
记忆重要性随时间指数衰减,被检索则刷新权重。低重要性记忆在 180 天后自动清除,reflection 类记忆豁免驱逐。
Dawn AI 采用四层分层架构:L1 Working Memory 处理实时对话,L2 Pending Queue 批量积累,L3 Episodic Memory 向量持久化,L4 User Profile 提炼用户人格。每层独立存储,逐层向上流动。
四层记忆各司其职,从实时对话缓存到长期人格画像,逐层提炼:
类似人类的「工作记忆」,存储当前会话中的实时对话历史。
存储引擎 Redis List (降级: ConcurrentHashMap) Key ai:session:{sessionId} 窗口大小 20 条消息滑动窗口 TTL 2 小时 原子性 Atomic RENAME 确保 pending 排空 写入时机 每次用户消息到达时实时写入
批量积累消息,达到阈值后触发 LLM 摘要,是连接 L1 和 L3 的桥梁。
SummarizationRequestEvent,LLM 将对话压缩为约 100 字的精炼摘要。存储引擎 Redis List (:pending 后缀) Key ai:session:{sessionId}:pending 批次大小 默认 5 条 (测试环境 3 条) 摘要输出 约 100 字精炼摘要 触发事件 SummarizationRequestEvent TTL 2 小时
Dawn AI 的核心长期记忆层——基于向量嵌入的语义记忆,支持模糊匹配和概念关联。
summary(重要性 0.5)来自 LLM 摘要,reflection(重要性 0.9)来自 ReflectionWorker 的高阶提炼。搜索「旅行偏好」能匹配到「喜欢去海边度假」,因为两者在语义空间中相近。存储引擎 PostgreSQL + pgvector 扩展 索引类型 HNSW (余弦距离) Embedding 1024 维向量 元数据 type / sessionId / importance / createdAt / lastAccessedAt 衰减公式 importance *= 0.5^(days/30),下限 0.01 驱逐条件 importance < 0.1 AND age > 180天 (reflection 豁免)
从长期记忆中提炼的用户人格特征,注入 System Prompt 让模型「记住」用户是谁。
存储引擎 Redis Hash Key ai:profile:{userId} TTL 30 天 数据来源 ReflectionWorker 从长期记忆中提取 输出格式 约 200 字用户人格特征 注入方式 结构化 key-value → System Prompt
当用户发送消息时,Dawn AI 内部执行以下7 步事件驱动处理流程:
用户消息到达,调用 addMessage 写入 Redis List。消息进入 L1 Working Memory 的 20 条滑动窗口,实时可读。
消息同时写入 L2 Pending Queue(:pending 后缀)。当积累达到 batch-size=5 条时,触发下一步摘要流程。
触发 SummarizationRequestEvent,MemorySummarizer 将 5 条消息压缩为约 100 字的精炼摘要。LLM 不可用时 importance 降级为 0.3。
MemoryConsolidator 将摘要写入 PGVector,生成 1024 维 embedding,建立 HNSW 索引。记忆类型标记为 summary,初始重要性 0.5。
使用 CAS(Compare-And-Swap)原子计数器追踪摘要次数。当计数达到 reflection-threshold 时,触发长期反思流程。
ReflectionWorker 从多条 summary 中提炼高阶认知,生成约 200 字的用户人格特征。类型标记为 reflection,重要性 0.9,永不淘汰。
用户人格特征写入 L4 Redis Hash,以结构化 key-value 对形式注入下次对话的 System Prompt。模型每次都能感知用户身份。
Dawn AI 的记忆不是「写了就忘」,而是有完整的生命周期管理——用进废退,模拟人类记忆的自然遗忘曲线。
importance *= 0.5^(days/30) —— 30 天衰减到 50%,60 天衰减到 25%,90 天衰减到 12.5%。下限 0.01,避免完全归零。lastAccessedAt,被检索的记忆权重重置。真正实现了「用进废退」的效果——常用的记忆保持活跃,不用的自然褪色。importance < 0.1 且 age > 180天 时,记忆被自动清除。每日 03:00 执行驱逐任务,03:30 执行衰减任务。type=reflection 的记忆永不淘汰。这类记忆代表 LLM 对用户的深层理解(importance=0.9),生成成本高、价值密度大、数量有限(由 CAS 计数器控制频率)。四层记忆各有独立的存储引擎,Redis 处理热数据,PGVector 承载长期语义记忆:
| Layer | 存储 | 技术 | TTL |
|---|---|---|---|
| L1 Working | Redis List | ai:session:{id} |
2h |
| L2 Pending | Redis List | ai:session:{id}:pending |
2h |
| L3 Episodic | PostgreSQL | pgvector HNSW, cosine, 1024d | 由驱逐管理 |
| L4 Profile | Redis Hash | ai:profile:{id} |
30d |
| 特性 | Dawn AI | mem0 | Hindsight |
|---|---|---|---|
| 记忆层数 | 4层 | 2层 | 4类 |
| 向量存储 | PGVector | Qdrant / Chroma | 专用向量DB |
| 热存储 | ✅ Redis | ❌ 无 | ❌ 无 |
| 事件驱动 | ✅ Spring Event | ⚠️ 同步API | ⚠️ 同步 |
| 重要性衰减 | ✅ 指数衰减 | ❌ 无 | ❌ 无 |
| 反思机制 | ✅ CAS计数触发 | ❌ 无 | ✅ 心智模型 |
| 知识图谱 | ❌ 无 | ✅ NLP实体链接 | ✅ 内置图谱 |
| 因果推理 | ❌ 无 | ❌ 无 | ✅ 因果链追踪 |
| 用户画像 | ✅ 独立L4 | ⚠️ 嵌入长期记忆 | ✅ 内置 |
| 技术栈 | Java / Spring | Python / FastAPI | Python |
在摘要阶段提取人物、地点、项目等实体,写入元数据。参考 mem0 新算法思路,无需图数据库即可支持实体级查询。
当前 summary 固定 0.5、LLM 降级固定 0.3,应改为基于内容的动态评分。情绪强烈的对话、包含决策的内容应获得更高权重。
在检索时考虑事件发生的时间顺序,支持「上周讨论的方案」「最近的决定」等时间敏感查询。
将单一向量检索扩展为语义 + 关键词 + 实体的多路召回,参考 mem0 的混合检索策略,提升查全率。
聚合多用户的记忆数据,识别通用行为模式和偏好趋势,用于产品层面的洞察。
当多条 summary 语义高度重叠时,自动合并以减少存储和检索噪音。需要引入相似度阈值和 LLM 合并策略。
每一个架构决策背后都有明确的权衡考量:
0.5^(days/30) 模拟人类记忆的遗忘曲线——30 天衰减到一半,60 天衰减到 25%。配合访问刷新机制,被检索的记忆权重重置,实现了「用进废退」的效果。@Async 注解足够解耦各阶段,且无需引入 Kafka/RabbitMQ 等外部依赖。对于中小规模部署,这是复杂度和可靠性的最优平衡点。// 消息到达时的完整处理流程 ON user_message: // Step 1-2: 写入 L1 + L2 L1.WorkingMemory.push(message) // Redis List, 20条窗口, 2h TTL L2.PendingQueue.push(message) // Redis List, 批量积累 // Step 3: 批次触发摘要 IF L2.PendingQueue.size() >= batch_size: PUBLISH SummarizationRequestEvent summary = LLM.summarize(L2.drain()) // ~100字压缩 // Step 4: 向量持久化 L3.EpisodicMemory.store( embedding = embed(summary), // 1024d, HNSW type = "summary", importance = 0.5 ) // Step 5-6: CAS 计数 → 反思 counter = CAS_increment(reflection_counter) IF counter >= reflection_threshold: profile = ReflectionWorker.reflect(recent_summaries) L3.store(profile, type="reflection", importance=0.9) // Step 7: 画像注入 L4.UserProfile.update(profile) // Redis Hash, 30d TTL // → 注入下次 System Prompt // 定时任务 CRON "0 0 3 * * *": evict(importance < 0.1 && age > 180d) CRON "0 30 3 * * *": decay(importance *= 0.5 ^ (days/30)) ON retrieval_hit: refresh(lastAccessedAt) // 用进废退