context_merge 能否进一步优化 使用策略是否可行

最后发布时间 : 2026-03-20 15:14:49 浏览量 :

可以,这个方向是对的
而且我会直接说:context_merge 用“聚合启动策略 / 完成策略”来优化,比在节点内部手写 map[turnID]*mergeState 更接近生产级,前提是你的 DAG 引擎策略系统支持得足够强。

但这里有个关键区别:

你贴的这个 DBNode.StartPolicy() 适合的是 “等若干事件都到齐再启动一次节点”
context_merge 的需求其实更复杂一点,它不是单纯的固定事件聚合,而是:

  • route_ready 必到
  • rag_ready 条件性必到(只有 UseRAG=true 才需要)
  • 后面可能还有 tool_result_context 条件性到达
  • 并且这些事件必须按 同一 turn/session 聚合
  • 有些场景要支持多轮 tool loop 增量合并
  • 有些场景要支持超时 / 降级发射

所以答案分两层:


一、先给结论

1)能不能用策略?

能。

2)是否推荐?

推荐,但不要只靠静态 HasEvent("xxx") 这种固定规则。
context_merge 更适合的是:

  • “按 key 分桶聚合”
  • “条件式启动策略”
  • “启动后合并并清理状态”
  • 必要时支持 超时释放

也就是说,单纯这种:

dag.All(
    dag.HasEvent("route_ready"),
    dag.HasEvent("rag_ready"),
)

只适合“所有 turn 都必须有 rag”的情况。
你的场景里 RAG 是可选的,所以不能只靠死板静态策略。


二、你这个 DBNode 的策略是否合理?

你现在这个:

func (n *DBNode) StartPolicy() dag.NodeStartPolicy {
	return dag.AggregateStartPolicy{
		Required: dag.All(
			dag.Any(dag.HasEvent("text"), dag.HasEvent("asr_text")),
			dag.HasEvent("llm_chunk"),
		),
	}
}

它表达的是:

  • 必须先有一条用户输入:textasr_text
  • 同时要至少有一条 llm_chunk
  • 满足后 DBNode 才启动

这个语义上是合理的,但要看你的 DBNode 目标是什么。


如果 DBNode 的作用是“写入一轮完整问答记录”

那这个策略是合理的,因为它表示:

  • 没有用户输入,不写
  • 没有模型输出,不写

但它有一个潜在问题:

问题 1:首包 llm_chunk 到来之前,用户消息可能迟迟不落库

如果你想做“用户一发消息就立即入库”,这个策略会让 text/asr_text 被延迟处理,直到 llm_chunk 到达。

这意味着:

  • 用户发消息后,session 记录可能还没建好
  • 如果 LLM 出错、超时、被取消,这条用户输入可能永远不被 DBNode 处理

所以生产里通常更推荐把 DB 写拆成两类:

A. ConversationWriteNode / MessagePersistNode

  • text / asr_text 到来立刻写 user message

B. AnswerPersistNode / ChunkAggregatorNode

  • llm_chunk 流式聚合
  • llm_done 时写 assistant answer

而不是一个 DBNode 同时等两边。


三、把这个思路放到 context_merge 上,是否成立?

成立,但要升级。

你可以把 context_merge 看成一种“按 turnID 聚合事件的节点”。
如果引擎策略足够强,它确实可以写成策略驱动,而不是节点内部自己维护很多 if else。

但问题在于:

context_merge 和 DBNode 的本质差异

DBNode 是“固定输入模式”

例如:

  • text/asr_text
  • llm_chunk

基本固定。

context_merge 是“条件依赖模式”

例如:

  • 永远要 route_ready
  • UseRAG=true,还要 rag_ready
  • 若 tool loop 正在继续,还要等 tool_result_context
  • 若未来加 long memory recall,可能又要 memory_ready

这意味着 context_merge 不是简单的静态布尔表达式,而是:

Required events depend on already arrived data

也就是:

  • 先看到 route_ready
  • 再根据 route_ready.Data.Route.UseRAG 决定还要不要等 rag_ready

这就是动态依赖


四、所以真正推荐的是:策略 + 状态,而不是纯策略

最佳做法不是二选一,而是:

  • 用策略系统做第一层触发控制
  • context_merge 节点内部做第二层动态判断

也就是:

第一层:引擎负责按 key 聚合事件

例如同一个 turnID 的事件都送进同一个节点实例/桶里。

第二层:节点根据当前聚合状态决定是否 emit

例如:

  • 收到 route_ready
  • 发现 UseRAG=false
  • 直接 emit
  • UseRAG=true,继续等 rag_ready

这比“纯手写状态机”更干净,也比“纯静态策略”更灵活。


五、如果你的策略系统支持“按事件内容判断”,那就非常适合 context_merge

你现在展示的策略是:

dag.HasEvent("text")
dag.Any(...)
dag.All(...)

这还是事件名级别

如果你能扩展成类似下面这样,那 context_merge 就很舒服了:

dag.All(
    dag.HasEvent("route_ready"),
    dag.If(
        dag.EventFieldBool("route_ready", "Route.UseRAG"),
        dag.HasEvent("rag_ready"),
        dag.True(),
    ),
)

或者:

dag.When("route_ready", func(ev *dag.Event) bool {
    tc := ev.Data.(*dagtypes.TurnContext)
    return tc.Route != nil && tc.Route.UseRAG
}).Require("rag_ready")

这种就很适合生产。

如果你的引擎现在还不支持,我建议你加。


六、生产环境里 context_merge 最推荐的三种实现方式


方案 A:纯节点内 mergeState

就是我上一条给你的那个版本。

优点

  • 最直接
  • 不依赖引擎复杂特性
  • 好落地

缺点

  • 聚合逻辑分散在业务节点里
  • 状态生命周期不好统一治理
  • 做通用化比较累

适合先跑起来。


方案 B:引擎级 AggregateStartPolicy + 节点只做 merge

也就是你现在问的这个方向。

例如:

func (n *ContextMergeNode) StartPolicy() dag.NodeStartPolicy {
    return dag.AggregateStartPolicy{
        Required: dag.HasEvent("route_ready"),
    }
}

节点启动后读取当前 bucket 中的所有事件,自己判断:

  • 如果 UseRAG=false,直接 emit
  • 如果 UseRAG=truerag_ready 已存在,emit
  • 否则继续等

优点

  • 启动更可控
  • 聚合桶可以由引擎统一管理

缺点

  • 如果策略表达力不够,还是要节点内部判断

这是我最推荐的折中方案。


方案 C:引擎级通用 AggregatorNode

context_merge 做成引擎内置的通用节点:

dag.NewAggregatorNode(AggregatorConfig{
    KeyFunc: ExtractTurnID,
    MergeFunc: MergeTurnContext,
    ReadyFunc: func(state any) bool { ... },
    EmitFunc: func(state any) *dag.Event { ... },
})

优点

  • 最生产级
  • 所有“等待多个上游、按 key 聚合”的场景都能复用
  • DB 聚合、context 聚合、tool 聚合都能统一

缺点

  • 需要你重构引擎
  • 前期成本更高

如果你这个 DAG 框架是长期要做成通用库,我建议最终走这个。


七、你的 DBNode StartPolicy 在 context_merge 上怎么类比?

你这个 DB 策略可以类比成:

func (n *ContextMergeNode) StartPolicy() dag.NodeStartPolicy {
	return dag.AggregateStartPolicy{
		Required: dag.HasEvent("route_ready"),
	}
}

为什么不是:

dag.All(dag.HasEvent("route_ready"), dag.HasEvent("rag_ready"))

因为 rag_ready 不是总是必需。
所以最合理的是:

  • 先用 route_ready 启动
  • 节点读聚合桶中的现有事件
  • 再决定是否 emit / 继续等

也就是说:

context_merge 的策略适合“最小启动条件”,不适合“全部完成条件”写死。


八、推荐的 context_merge 优化版本

我给你一个更生产一点的思路。


1)引擎支持按 key 聚合

例如:

type AggregateKeyFunc func(ev *Event) string

context_merge

func TurnIDKey(ev *dag.Event) string {
    tc, ok := ev.Data.(*dagtypes.TurnContext)
    if !ok || tc == nil {
        return ""
    }
    return tc.TurnID
}

这样:

  • route_ready(turn1)
  • rag_ready(turn1)

会进同一个聚合桶。


2)ContextMergeNode 的 StartPolicy

func (n *ContextMergeNode) StartPolicy() dag.NodeStartPolicy {
	return dag.AggregateStartPolicy{
		Required: dag.HasEvent("route_ready"),
	}
}

意思是:

  • 至少拿到 route,才值得开始判断

3)ContextMergeNode 的 Ready 判断逻辑

这个放节点里,或者更进一步抽成策略函数:

func (n *ContextMergeNode) IsReady(events []*dag.Event) bool {
	var hasRoute bool
	var hasRAG bool
	var tc *dagtypes.TurnContext

	for _, ev := range events {
		switch ev.Type {
		case "route_ready":
			hasRoute = true
			t, _ := ev.Data.(*dagtypes.TurnContext)
			if t != nil {
				tc = t
			}
		case "rag_ready":
			hasRAG = true
		}
	}

	if !hasRoute || tc == nil || tc.Route == nil {
		return false
	}
	if tc.Route.UseRAG {
		return hasRAG
	}
	return true
}

这样就已经是“动态依赖”了。


九、ClosePolicy 能不能也用于 context_merge?

可以,但要小心。

你贴的 ClosePolicy 注释掉了:

func (n *DBNode) ClosePolicy() dag.NodeClosePolicy {
	return dag.AggregateClosePolicy{
		Required: dag.All(
			dag.Any(dag.HasEvent("llm_chunk")),
		),
	}
}

context_merge 来说,ClosePolicy 更适合控制:

  • emit 后是否清理这个 turn 的聚合桶
  • 是否等超时后关闭
  • tool loop 回流后是否继续保留

但它不太适合表达“是否可以 emit”,因为 emit 是业务逻辑,而 close 是资源生命周期逻辑。

建议

context_merge

  • StartPolicy: 最小启动条件
  • Ready/EmitPolicy: 是否达到可输出条件
  • ClosePolicy: emit 后清理 or timeout 清理

如果你的引擎现在只有 Start / Close,没有 Ready 概念,建议补一个:

type NodeEmitPolicy interface {
    CanEmit(events []*Event) bool
}

这个很有用。


十、对 DBNode,我反而建议不要这样聚合

说句实话,DBNode 用你这个策略虽然可行,但在生产上我更建议拆掉:


更推荐的 DB 设计

1. UserMessageDBNode

收到:

  • text
  • asr_text

立刻写 user message

2. AssistantMessageDBNode

收到:

  • llm_chunk
  • llm_done

聚合 chunk,llm_done 时落 assistant 完整消息

3. SessionDBNode

收到:

  • create_session

写 session 元数据


这样比一个大而全的 DBNode 更稳。

因为现在你那个 DBNode 一旦聚合条件不满足,可能会导致:

  • user message 不落库
  • partial response 丢失
  • 错误恢复困难

十一、我对 context_merge 的最终建议

推荐程度排序:

第一名:通用 Aggregator 方案

引擎支持:

  • AggregateKeyFunc
  • AggregateStartPolicy
  • AggregateReadyPolicy
  • AggregateClosePolicy

然后 context_merge 只是配置,不自己写太多状态机。

第二名:StartPolicy + 节点内动态判断

这是你现在最容易改的版本,性价比最高。

第三名:纯节点内部 map[turnID]*mergeState

能用,但长期看不够优雅。


十二、给你一个我认为最适合你的落地方案

你现在不用一步到位搞太重,建议这样改:


引擎层新增三个能力

1. 聚合 key

type AggregateKeyFunc func(ev *Event) string

2. ReadyPolicy

type NodeReadyPolicy interface {
	IsReady(events []*Event) bool
}

3. TTL / timeout 清理

避免某个 turn 永远等不到 rag_ready 卡住。


context_merge 的配置

StartPolicy

dag.AggregateStartPolicy{
    Required: dag.HasEvent("route_ready"),
}

ReadyPolicy

伪代码:

if !has(route_ready) return false
if route.UseRAG && !has(rag_ready) return false
return true

ClosePolicy

emit后立即清理

超时清理

十三、最简答案

你问的是:

context_merge 能否进一步优化,使用策略是否可行?

答案是:

可行,而且值得做。
context_merge 不能只用你现在这种静态 All/Any/HasEvent 策略,因为它有动态依赖
最好的做法是:

  • AggregateStartPolicy 做最小启动条件
  • 增加 ReadyPolicy 做条件式聚合完成判断
  • TurnID 聚合事件
  • emit 后清理 state
  • 加 timeout 防止卡死

如果你愿意,我下一条可以直接给你:

  1. AggregateStartPolicy + ReadyPolicy + AggregateKeyFunc 的完整 Go 接口设计
  2. context_merge 改造成“纯策略驱动聚合节点”的完整代码
  3. 顺手把你的 DBNode 也一起改成更合理的 UserMessageDBNode + AssistantMessageDBNode 版本。