Github开源生信云平台 DEMO
llm_chunk
ConversationWriteNode
MessagePersistNode
AnswerPersistNode
ChunkAggregatorNode
context_merge
map[turnID]*mergeState
可以,这个方向是对的。而且我会直接说:context_merge 用“聚合启动策略 / 完成策略”来优化,比在节点内部手写 map[turnID]*mergeState 更接近生产级,前提是你的 DAG 引擎策略系统支持得足够强。
但这里有个关键区别:
你贴的这个 DBNode.StartPolicy() 适合的是 “等若干事件都到齐再启动一次节点”。而 context_merge 的需求其实更复杂一点,它不是单纯的固定事件聚合,而是:
DBNode.StartPolicy()
route_ready
rag_ready
UseRAG=true
tool_result_context
所以答案分两层:
能。
推荐,但不要只靠静态 HasEvent("xxx") 这种固定规则。context_merge 更适合的是:
HasEvent("xxx")
也就是说,单纯这种:
dag.All( dag.HasEvent("route_ready"), dag.HasEvent("rag_ready"), )
只适合“所有 turn 都必须有 rag”的情况。你的场景里 RAG 是可选的,所以不能只靠死板静态策略。
你现在这个:
func (n *DBNode) StartPolicy() dag.NodeStartPolicy { return dag.AggregateStartPolicy{ Required: dag.All( dag.Any(dag.HasEvent("text"), dag.HasEvent("asr_text")), dag.HasEvent("llm_chunk"), ), } }
它表达的是:
text
asr_text
这个语义上是合理的,但要看你的 DBNode 目标是什么。
那这个策略是合理的,因为它表示:
但它有一个潜在问题:
如果你想做“用户一发消息就立即入库”,这个策略会让 text/asr_text 被延迟处理,直到 llm_chunk 到达。
text/asr_text
这意味着:
所以生产里通常更推荐把 DB 写拆成两类:
llm_done
而不是一个 DBNode 同时等两边。
成立,但要升级。
你可以把 context_merge 看成一种“按 turnID 聚合事件的节点”。如果引擎策略足够强,它确实可以写成策略驱动,而不是节点内部自己维护很多 if else。
但问题在于:
例如:
基本固定。
memory_ready
这意味着 context_merge 不是简单的静态布尔表达式,而是:
Required events depend on already arrived data
也就是:
route_ready.Data.Route.UseRAG
这就是动态依赖。
最佳做法不是二选一,而是:
例如同一个 turnID 的事件都送进同一个节点实例/桶里。
turnID
UseRAG=false
这比“纯手写状态机”更干净,也比“纯静态策略”更灵活。
你现在展示的策略是:
dag.HasEvent("text") dag.Any(...) dag.All(...)
这还是事件名级别。
如果你能扩展成类似下面这样,那 context_merge 就很舒服了:
dag.All( dag.HasEvent("route_ready"), dag.If( dag.EventFieldBool("route_ready", "Route.UseRAG"), dag.HasEvent("rag_ready"), dag.True(), ), )
或者:
dag.When("route_ready", func(ev *dag.Event) bool { tc := ev.Data.(*dagtypes.TurnContext) return tc.Route != nil && tc.Route.UseRAG }).Require("rag_ready")
这种就很适合生产。
如果你的引擎现在还不支持,我建议你加。
就是我上一条给你的那个版本。
适合先跑起来。
也就是你现在问的这个方向。
func (n *ContextMergeNode) StartPolicy() dag.NodeStartPolicy { return dag.AggregateStartPolicy{ Required: dag.HasEvent("route_ready"), } }
节点启动后读取当前 bucket 中的所有事件,自己判断:
这是我最推荐的折中方案。
把 context_merge 做成引擎内置的通用节点:
dag.NewAggregatorNode(AggregatorConfig{ KeyFunc: ExtractTurnID, MergeFunc: MergeTurnContext, ReadyFunc: func(state any) bool { ... }, EmitFunc: func(state any) *dag.Event { ... }, })
如果你这个 DAG 框架是长期要做成通用库,我建议最终走这个。
你这个 DB 策略可以类比成:
为什么不是:
dag.All(dag.HasEvent("route_ready"), dag.HasEvent("rag_ready"))
因为 rag_ready 不是总是必需。所以最合理的是:
也就是说:
context_merge 的策略适合“最小启动条件”,不适合“全部完成条件”写死。
我给你一个更生产一点的思路。
type AggregateKeyFunc func(ev *Event) string
对 context_merge:
func TurnIDKey(ev *dag.Event) string { tc, ok := ev.Data.(*dagtypes.TurnContext) if !ok || tc == nil { return "" } return tc.TurnID }
这样:
route_ready(turn1)
rag_ready(turn1)
会进同一个聚合桶。
意思是:
这个放节点里,或者更进一步抽成策略函数:
func (n *ContextMergeNode) IsReady(events []*dag.Event) bool { var hasRoute bool var hasRAG bool var tc *dagtypes.TurnContext for _, ev := range events { switch ev.Type { case "route_ready": hasRoute = true t, _ := ev.Data.(*dagtypes.TurnContext) if t != nil { tc = t } case "rag_ready": hasRAG = true } } if !hasRoute || tc == nil || tc.Route == nil { return false } if tc.Route.UseRAG { return hasRAG } return true }
这样就已经是“动态依赖”了。
可以,但要小心。
你贴的 ClosePolicy 注释掉了:
func (n *DBNode) ClosePolicy() dag.NodeClosePolicy { return dag.AggregateClosePolicy{ Required: dag.All( dag.Any(dag.HasEvent("llm_chunk")), ), } }
对 context_merge 来说,ClosePolicy 更适合控制:
但它不太适合表达“是否可以 emit”,因为 emit 是业务逻辑,而 close 是资源生命周期逻辑。
StartPolicy
Ready/EmitPolicy
ClosePolicy
如果你的引擎现在只有 Start / Close,没有 Ready 概念,建议补一个:
type NodeEmitPolicy interface { CanEmit(events []*Event) bool }
这个很有用。
说句实话,DBNode 用你这个策略虽然可行,但在生产上我更建议拆掉:
DBNode
收到:
立刻写 user message
聚合 chunk,llm_done 时落 assistant 完整消息
create_session
写 session 元数据
这样比一个大而全的 DBNode 更稳。
因为现在你那个 DBNode 一旦聚合条件不满足,可能会导致:
引擎支持:
AggregateKeyFunc
AggregateStartPolicy
AggregateReadyPolicy
AggregateClosePolicy
然后 context_merge 只是配置,不自己写太多状态机。
这是你现在最容易改的版本,性价比最高。
能用,但长期看不够优雅。
你现在不用一步到位搞太重,建议这样改:
type NodeReadyPolicy interface { IsReady(events []*Event) bool }
避免某个 turn 永远等不到 rag_ready 卡住。
dag.AggregateStartPolicy{ Required: dag.HasEvent("route_ready"), }
伪代码:
if !has(route_ready) return false if route.UseRAG && !has(rag_ready) return false return true
emit后立即清理
或
超时清理
你问的是:
context_merge 能否进一步优化,使用策略是否可行?
答案是:
可行,而且值得做。但 context_merge 不能只用你现在这种静态 All/Any/HasEvent 策略,因为它有动态依赖。最好的做法是:
All/Any/HasEvent
ReadyPolicy
TurnID
如果你愿意,我下一条可以直接给你:
AggregateStartPolicy + ReadyPolicy + AggregateKeyFunc
UserMessageDBNode + AssistantMessageDBNode