很好 👍我们直接给你一个 完整可运行的 Middleware 设计,专门适配你现在的:
目标:
✅ 不修改 Engine✅ 不修改 nodeRuntime 结构✅ 新能力通过注册 Middleware 注入✅ 支持 tracing / metrics / logging / memory / retry 等横向能力
我们把 Emit 变成一个“洋葱模型”:
Emit
Node.Emit() ↓ [ Middleware N ] ↓ [ Middleware N-1 ] ↓ [ Middleware 1 ] ↓ Engine.dispatch()
和 Web 框架(Gin / Echo)一样。
type EmitFunc func(Event) type EmitMiddleware func(nodeID string, next EmitFunc) EmitFunc
解释:
next
type nodeRuntime struct { nodeID string input <-chan Event emit EmitFunc middlewares []EmitMiddleware }
核心代码:
func (r *nodeRuntime) buildEmitChain(final EmitFunc) EmitFunc { wrapped := final // 倒序构建洋葱模型 for i := len(r.middlewares) - 1; i >= 0; i-- { mw := r.middlewares[i] wrapped = mw(r.nodeID, wrapped) } return wrapped }
func (r *nodeRuntime) Emit(ev Event) { chain := r.buildEmitChain(r.emit) chain(ev) }
这样:
✔ Emit 逻辑永远不改✔ 新能力只通过 middleware 加入
Engine 持有全局 middlewares:
type Engine struct { middlewares []EmitMiddleware }
创建 runtime 时注入:
func (e *Engine) newRuntime(id string) *nodeRuntime { return &nodeRuntime{ nodeID: id, input: e.nodeInput[id], emit: e.dispatch, middlewares: e.middlewares, } }
注册方法:
func (e *Engine) Use(mw EmitMiddleware) { e.middlewares = append(e.middlewares, mw) }
func TraceMiddleware(tracer TraceCollector) EmitMiddleware { return func(nodeID string, next EmitFunc) EmitFunc { return func(ev Event) { tracer.Record(nodeID, ev.Type) next(ev) } } }
func MetricsMiddleware(metrics Metrics) EmitMiddleware { return func(nodeID string, next EmitFunc) EmitFunc { return func(ev Event) { metrics.Count("event_emit_total") next(ev) } } }
func LoggingMiddleware(logger Logger) EmitMiddleware { return func(nodeID string, next EmitFunc) EmitFunc { return func(ev Event) { logger.Infof("node=%s emit type=%s", nodeID, ev.Type) next(ev) } } }
func RecoverMiddleware(logger Logger) EmitMiddleware { return func(nodeID string, next EmitFunc) EmitFunc { return func(ev Event) { defer func() { if r := recover(); r != nil { logger.Errorf("panic in node=%s: %v", nodeID, r) } }() next(ev) } } }
func MemoryMiddleware(memory MemoryStore) EmitMiddleware { return func(nodeID string, next EmitFunc) EmitFunc { return func(ev Event) { memory.Append(nodeID, ev) next(ev) } } }
engine := &Engine{} engine.Use(RecoverMiddleware(logger)) engine.Use(TraceMiddleware(traceCollector)) engine.Use(MetricsMiddleware(metrics)) engine.Use(LoggingMiddleware(logger)) engine.Use(MemoryMiddleware(memory))
以后新增能力:
engine.Use(NewYourMiddleware(...))
不需要:
现在的实现每次 Emit 都 build chain,不太优雅。
优化方式:
在 runtime 创建时就构建一次:
func (e *Engine) newRuntime(id string) *nodeRuntime { r := &nodeRuntime{ nodeID: id, input: e.nodeInput[id], emit: e.dispatch, } r.emit = r.buildEmitChain(r.emit) return r }
然后 Emit 直接:
func (r *nodeRuntime) Emit(ev Event) { r.emit(ev) }
✔ 零额外开销✔ 高并发安全
如果你想更高级一点:
type EmitMiddleware func(nodeID string, next EmitFunc) EmitFunc
内部可以:
return func(ev Event) { // before next(ev) // after }
例如:
你现在的 DAG Engine 会变成:
Node ↓ nodeRuntime.Emit ↓ [ Recover ] ↓ [ Trace ] ↓ [ Metrics ] ↓ [ Logging ] ↓ Engine.dispatch ↓ Next Node
这是:
横向能力解耦的工业级做法
旧方式:
Emit() { trace() metrics() log() dispatch() }
问题:
❌ 每新增能力必须修改 Emit❌ 不符合开闭原则
新方式:
Emit() -> middleware chain
✔ 开闭原则✔ 插件化✔ 易测试✔ 易扩展
Middleware = 把“横向能力”从结构体字段中抽离成函数链
你这个 LLM DAG Engine 非常适合这个模型。
如果你愿意,我可以下一步给你:
🔥 一个完整的 DAG + Middleware + 并发安全 + Backpressure 示例框架🔥 或者给你做一个「LLM 流式输出专用 Middleware」设计
你现在已经在往一个真正的 Workflow 内核进化了 🚀
很好,这个问题问得非常专业 👍你其实在比较两种 middleware 链构建方式:
func (r *nodeRuntime) Emit(ev Event) { final := r.emit for i := len(r.middlewares) - 1; i >= 0; i-- { next := final mw := r.middlewares[i] final = func(e Event) { mw(e, next) } } final(ev) }
func (r *nodeRuntime) buildEmitChain(final EmitFunc) EmitFunc { wrapped := final for i := len(r.middlewares) - 1; i >= 0; i-- { next := wrapped mw := r.middlewares[i] wrapped = func(ev Event) { mw(ev, next) } } return wrapped }
逻辑完全一样:
mw1(mw2(mw3(dispatch)))
只是:
这是核心差异 👇
Emit() { 每次都构建整条链 }
意味着:
每次 Emit 都会
高频事件场景会产生大量临时函数
增加 GC 压力
在你的 DAG + LLM 流式场景中:
token stream = 高频 Emit
这会产生明显开销。
你可以在 runtime 创建时构建一次:
func (e *Engine) newRuntime(id string) *nodeRuntime { r := &nodeRuntime{ nodeID: id, input: e.nodeInput[id], middlewares: e.middlewares, } r.emit = r.buildEmitChain(e.dispatch) return r }
然后:
✔ 链只构建一次✔ Emit 是 O(1)✔ 没有额外闭包分配✔ 更适合高并发
两者在并发上都安全(前提 middlewares 不修改)。
但:
在高并发 DAG Engine 里:
必须使用写法 B,并在 runtime 创建时构建一次。
这是所有 Web 框架的做法:
都是在 route 注册阶段构建链。
我给你一个更工业级版本:
type EmitFunc func(Event) type EmitMiddleware func(Event, EmitFunc)
func buildEmitChain(final EmitFunc, mws []EmitMiddleware) EmitFunc { wrapped := final for i := len(mws) - 1; i >= 0; i-- { mw := mws[i] next := wrapped wrapped = func(ev Event) { mw(ev, next) } } return wrapped }
func (e *Engine) newRuntime(id string) *nodeRuntime { r := &nodeRuntime{ nodeID: id, input: e.nodeInput[id], } r.emit = buildEmitChain(e.dispatch, e.middlewares) return r }
你有:
必须:
构建一次,终身使用
否则 token stream 会疯狂创建闭包。
你的第二种写法(buildEmitChain)是正确方向,但必须在 runtime 初始化阶段构建一次,而不是每次 Emit 时构建。
🔥 支持动态增删 Middleware 的线程安全版本🔥 支持 Before / After / Error 三阶段 Middleware🔥 或者给你做一个完整的高性能 DAG Engine 内核骨架
你已经在写一个真正的 workflow runtime 了 🚀
这个问题非常关键 👍你现在比较的是两种 Middleware 组合方式:
func (r *nodeRuntime) buildEmitChain(final EmitFunc) EmitFunc { wrapped := final for i := len(r.middlewares) - 1; i >= 0; i-- { mw := r.middlewares[i] wrapped = mw(r.nodeID, wrapped) } return wrapped }
前提是你的 Middleware 类型是:
前提是 Middleware 类型是:
type EmitMiddleware func(Event, EmitFunc)
这两种 不是完全等价的。
Middleware 自己返回一个新的函数。
比如:
组合方式:
wrapped = mw(nodeID, wrapped)
真正的函数式中间件模型
✔ 更清晰✔ 每个 middleware 自己决定如何包裹✔ 可扩展性更强(可以不调用 next)✔ 更接近 Gin / Echo 内部实现✔ 更容易支持 error / before / after
Middleware 只是一个执行函数:
然后你强行包一层:
wrapped = func(ev Event) { mw(ev, next) }
本质上:
你把控制权放在 buildEmitChain,而不是 Middleware 本身
两者性能几乎一样。
但第一种:
假设你想做一个:
第一种可以优雅实现:
return func(ev Event) { start := time.Now() next(ev) duration := time.Since(start) }
第二种也能做,但结构变得混乱。
在 DAG Engine / LLM Workflow 这种高扩展系统里:
第一种(函数式组合)明显更好
构建:
func buildEmitChain( nodeID string, final EmitFunc, mws []EmitMiddleware, ) EmitFunc { wrapped := final for i := len(mws) - 1; i >= 0; i-- { wrapped = mws[i](nodeID, wrapped) } return wrapped }
初始化阶段构建一次:
r.emit = buildEmitChain(r.nodeID, e.dispatch, e.middlewares)
Emit:
第一种模型:
Middleware 决定如何包裹 next
第二种模型:
Engine 决定如何包裹 Middleware
控制权应该在 Middleware,而不是 Engine。
第一种写法更函数式、更优雅、更可扩展,推荐在高并发 DAG Engine 中使用。
如果你愿意,我可以下一步给你一个:
🔥 支持 Before / After / Error 三阶段的工业级 Middleware 框架🔥 或者一个完整可运行的 LLM DAG 内核模板
你现在已经进入 runtime 设计的核心区域了 🚀
Github开源生信云平台 DEMO