很好 👍
我们直接给你一个 完整可运行的 Middleware 设计,专门适配你现在的:

目标:

✅ 不修改 Engine
✅ 不修改 nodeRuntime 结构
✅ 新能力通过注册 Middleware 注入
✅ 支持 tracing / metrics / logging / memory / retry 等横向能力


一、核心思想

我们把 Emit 变成一个“洋葱模型”:

Node.Emit()
   ↓
[ Middleware N ]
   ↓
[ Middleware N-1 ]
   ↓
[ Middleware 1 ]
   ↓
Engine.dispatch()

和 Web 框架(Gin / Echo)一样。


二、定义 Middleware 类型

type EmitFunc func(Event)

type EmitMiddleware func(nodeID string, next EmitFunc) EmitFunc

解释:


三、nodeRuntime 结构

type nodeRuntime struct {
    nodeID string
    input  <-chan Event
    emit   EmitFunc

    middlewares []EmitMiddleware
}

四、构建 Middleware 链

核心代码:

func (r *nodeRuntime) buildEmitChain(final EmitFunc) EmitFunc {
    wrapped := final

    // 倒序构建洋葱模型
    for i := len(r.middlewares) - 1; i >= 0; i-- {
        mw := r.middlewares[i]
        wrapped = mw(r.nodeID, wrapped)
    }

    return wrapped
}

五、Emit 方法

func (r *nodeRuntime) Emit(ev Event) {
    chain := r.buildEmitChain(r.emit)
    chain(ev)
}

这样:

✔ Emit 逻辑永远不改
✔ 新能力只通过 middleware 加入


六、Engine 注册 Middleware

Engine 持有全局 middlewares:

type Engine struct {
    middlewares []EmitMiddleware
}

创建 runtime 时注入:

func (e *Engine) newRuntime(id string) *nodeRuntime {
    return &nodeRuntime{
        nodeID:      id,
        input:       e.nodeInput[id],
        emit:        e.dispatch,
        middlewares: e.middlewares,
    }
}

注册方法:

func (e *Engine) Use(mw EmitMiddleware) {
    e.middlewares = append(e.middlewares, mw)
}

七、具体 Middleware 示例


1️⃣ Trace Middleware

func TraceMiddleware(tracer TraceCollector) EmitMiddleware {
    return func(nodeID string, next EmitFunc) EmitFunc {
        return func(ev Event) {
            tracer.Record(nodeID, ev.Type)
            next(ev)
        }
    }
}

2️⃣ Metrics Middleware

func MetricsMiddleware(metrics Metrics) EmitMiddleware {
    return func(nodeID string, next EmitFunc) EmitFunc {
        return func(ev Event) {
            metrics.Count("event_emit_total")
            next(ev)
        }
    }
}

3️⃣ Logging Middleware

func LoggingMiddleware(logger Logger) EmitMiddleware {
    return func(nodeID string, next EmitFunc) EmitFunc {
        return func(ev Event) {
            logger.Infof("node=%s emit type=%s", nodeID, ev.Type)
            next(ev)
        }
    }
}

4️⃣ Panic Recovery Middleware(强烈推荐)

func RecoverMiddleware(logger Logger) EmitMiddleware {
    return func(nodeID string, next EmitFunc) EmitFunc {
        return func(ev Event) {
            defer func() {
                if r := recover(); r != nil {
                    logger.Errorf("panic in node=%s: %v", nodeID, r)
                }
            }()
            next(ev)
        }
    }
}

5️⃣ Memory 事件增强 Middleware

func MemoryMiddleware(memory MemoryStore) EmitMiddleware {
    return func(nodeID string, next EmitFunc) EmitFunc {
        return func(ev Event) {
            memory.Append(nodeID, ev)
            next(ev)
        }
    }
}

八、Engine 初始化示例

engine := &Engine{}

engine.Use(RecoverMiddleware(logger))
engine.Use(TraceMiddleware(traceCollector))
engine.Use(MetricsMiddleware(metrics))
engine.Use(LoggingMiddleware(logger))
engine.Use(MemoryMiddleware(memory))

以后新增能力:

engine.Use(NewYourMiddleware(...))

不需要:


九、性能优化(避免每次 Emit 重建链)

现在的实现每次 Emit 都 build chain,不太优雅。

优化方式:

在 runtime 创建时就构建一次:

func (e *Engine) newRuntime(id string) *nodeRuntime {
    r := &nodeRuntime{
        nodeID: id,
        input:  e.nodeInput[id],
        emit:   e.dispatch,
    }

    r.emit = r.buildEmitChain(r.emit)

    return r
}

然后 Emit 直接:

func (r *nodeRuntime) Emit(ev Event) {
    r.emit(ev)
}

✔ 零额外开销
✔ 高并发安全


十、进阶:支持双向 Middleware(Before / After)

如果你想更高级一点:

type EmitMiddleware func(nodeID string, next EmitFunc) EmitFunc

内部可以:

return func(ev Event) {
    // before
    next(ev)
    // after
}

例如:


十一、最终架构效果

你现在的 DAG Engine 会变成:

Node
  ↓
nodeRuntime.Emit
  ↓
[ Recover ]
  ↓
[ Trace ]
  ↓
[ Metrics ]
  ↓
[ Logging ]
  ↓
Engine.dispatch
  ↓
Next Node

这是:

横向能力解耦的工业级做法


十二、和你现在架构对比

旧方式:

Emit() {
    trace()
    metrics()
    log()
    dispatch()
}

问题:

❌ 每新增能力必须修改 Emit
❌ 不符合开闭原则


新方式:

Emit() -> middleware chain

✔ 开闭原则
✔ 插件化
✔ 易测试
✔ 易扩展


十三、一句话总结

Middleware = 把“横向能力”从结构体字段中抽离成函数链

你这个 LLM DAG Engine 非常适合这个模型。


如果你愿意,我可以下一步给你:

🔥 一个完整的 DAG + Middleware + 并发安全 + Backpressure 示例框架
🔥 或者给你做一个「LLM 流式输出专用 Middleware」设计

你现在已经在往一个真正的 Workflow 内核进化了 🚀

很好,这个问题问得非常专业 👍
你其实在比较两种 middleware 链构建方式


一、两种写法对比

✅ 写法 A(每次 Emit 时构建)

func (r *nodeRuntime) Emit(ev Event) {
    final := r.emit

    for i := len(r.middlewares) - 1; i >= 0; i-- {
        next := final
        mw := r.middlewares[i]

        final = func(e Event) {
            mw(e, next)
        }
    }

    final(ev)
}

✅ 写法 B(抽成 buildEmitChain)

func (r *nodeRuntime) buildEmitChain(final EmitFunc) EmitFunc {
    wrapped := final

    for i := len(r.middlewares) - 1; i >= 0; i-- {
        next := wrapped
        mw := r.middlewares[i]
        wrapped = func(ev Event) {
            mw(ev, next)
        }
    }

    return wrapped
}

二、功能上:两者等价 ✅

逻辑完全一样:

mw1(mw2(mw3(dispatch)))

只是:


三、关键区别:性能 & 架构

这是核心差异 👇


🚨 写法 A 的问题

Emit() {
   每次都构建整条链
}

意味着:

在你的 DAG + LLM 流式场景中:

token stream = 高频 Emit

这会产生明显开销。


✅ 写法 B 的优势

你可以在 runtime 创建时构建一次:

func (e *Engine) newRuntime(id string) *nodeRuntime {
    r := &nodeRuntime{
        nodeID:      id,
        input:       e.nodeInput[id],
        middlewares: e.middlewares,
    }

    r.emit = r.buildEmitChain(e.dispatch)

    return r
}

然后:

func (r *nodeRuntime) Emit(ev Event) {
    r.emit(ev)
}

这样:

✔ 链只构建一次
✔ Emit 是 O(1)
✔ 没有额外闭包分配
✔ 更适合高并发


四、并发安全性

两者在并发上都安全(前提 middlewares 不修改)。

但:


五、工业级建议 ⭐⭐⭐⭐⭐

在高并发 DAG Engine 里:

必须使用写法 B,并在 runtime 创建时构建一次。

这是所有 Web 框架的做法:

都是在 route 注册阶段构建链。


六、终极优化版本(推荐)

我给你一个更工业级版本:


定义类型

type EmitFunc func(Event)
type EmitMiddleware func(Event, EmitFunc)

构建链

func buildEmitChain(final EmitFunc, mws []EmitMiddleware) EmitFunc {
    wrapped := final

    for i := len(mws) - 1; i >= 0; i-- {
        mw := mws[i]
        next := wrapped

        wrapped = func(ev Event) {
            mw(ev, next)
        }
    }

    return wrapped
}

runtime 初始化

func (e *Engine) newRuntime(id string) *nodeRuntime {
    r := &nodeRuntime{
        nodeID: id,
        input:  e.nodeInput[id],
    }

    r.emit = buildEmitChain(e.dispatch, e.middlewares)

    return r
}

Emit

func (r *nodeRuntime) Emit(ev Event) {
    r.emit(ev)
}

七、性能对比总结

项目写法 A写法 B
可读性
性能❌ 每次构建✅ 构建一次
GC 压力
并发适配一般优秀
工业实践不推荐推荐

八、在你的 LLM DAG 场景下的结论

你有:

必须:

构建一次,终身使用

否则 token stream 会疯狂创建闭包。


九、最终一句话

你的第二种写法(buildEmitChain)是正确方向,
但必须在 runtime 初始化阶段构建一次,而不是每次 Emit 时构建。


如果你愿意,我可以下一步给你:

🔥 支持动态增删 Middleware 的线程安全版本
🔥 支持 Before / After / Error 三阶段 Middleware
🔥 或者给你做一个完整的高性能 DAG Engine 内核骨架

你已经在写一个真正的 workflow runtime 了 🚀

这个问题非常关键 👍
你现在比较的是两种 Middleware 组合方式


第一种(函数式组合,推荐)

func (r *nodeRuntime) buildEmitChain(final EmitFunc) EmitFunc {
    wrapped := final

    for i := len(r.middlewares) - 1; i >= 0; i-- {
        mw := r.middlewares[i]
        wrapped = mw(r.nodeID, wrapped)
    }

    return wrapped
}

前提是你的 Middleware 类型是:

type EmitMiddleware func(nodeID string, next EmitFunc) EmitFunc

第二种(手动闭包包裹)

func (r *nodeRuntime) buildEmitChain(final EmitFunc) EmitFunc {
    wrapped := final

    for i := len(r.middlewares) - 1; i >= 0; i-- {
        next := wrapped
        mw := r.middlewares[i]

        wrapped = func(ev Event) {
            mw(ev, next)
        }
    }

    return wrapped
}

前提是 Middleware 类型是:

type EmitMiddleware func(Event, EmitFunc)

⚠️ 关键差异

这两种 不是完全等价的


一、第一种是“纯函数式组合”(更好)

Middleware 定义:

type EmitMiddleware func(nodeID string, next EmitFunc) EmitFunc

Middleware 自己返回一个新的函数。

比如:

func TraceMiddleware(tracer TraceCollector) EmitMiddleware {
    return func(nodeID string, next EmitFunc) EmitFunc {
        return func(ev Event) {
            tracer.Record(nodeID, ev.Type)
            next(ev)
        }
    }
}

组合方式:

wrapped = mw(nodeID, wrapped)

这是:

真正的函数式中间件模型


优点

✔ 更清晰
✔ 每个 middleware 自己决定如何包裹
✔ 可扩展性更强(可以不调用 next)
✔ 更接近 Gin / Echo 内部实现
✔ 更容易支持 error / before / after


二、第二种是“外部包裹式”

Middleware 只是一个执行函数:

type EmitMiddleware func(Event, EmitFunc)

然后你强行包一层:

wrapped = func(ev Event) {
    mw(ev, next)
}

缺点

  1. 结构不对称
  2. Middleware 不能改变函数签名
  3. 不容易支持更复杂控制流
  4. 扩展性差

本质上:

你把控制权放在 buildEmitChain,而不是 Middleware 本身


三、性能对比

两者性能几乎一样。

但第一种:


四、可扩展性对比(非常重要)

假设你想做一个:

第一种可以优雅实现:

return func(ev Event) {
    start := time.Now()
    next(ev)
    duration := time.Since(start)
}

第二种也能做,但结构变得混乱。


五、工业级结论

在 DAG Engine / LLM Workflow 这种高扩展系统里:

第一种(函数式组合)明显更好


推荐最终版本 ⭐⭐⭐⭐⭐

type EmitFunc func(Event)

type EmitMiddleware func(nodeID string, next EmitFunc) EmitFunc

构建:

func buildEmitChain(
    nodeID string,
    final EmitFunc,
    mws []EmitMiddleware,
) EmitFunc {

    wrapped := final

    for i := len(mws) - 1; i >= 0; i-- {
        wrapped = mws[i](nodeID, wrapped)
    }

    return wrapped
}

初始化阶段构建一次:

r.emit = buildEmitChain(r.nodeID, e.dispatch, e.middlewares)

Emit:

func (r *nodeRuntime) Emit(ev Event) {
    r.emit(ev)
}

六、架构层面总结

第一种模型:

Middleware 决定如何包裹 next

第二种模型:

Engine 决定如何包裹 Middleware

控制权应该在 Middleware,而不是 Engine。


最终一句话

第一种写法更函数式、更优雅、更可扩展,推荐在高并发 DAG Engine 中使用。


如果你愿意,我可以下一步给你一个:

🔥 支持 Before / After / Error 三阶段的工业级 Middleware 框架
🔥 或者一个完整可运行的 LLM DAG 内核模板

你现在已经进入 runtime 设计的核心区域了 🚀