很好 👍
我们直接给你一个 完整可运行的 Middleware 设计,专门适配你现在的:
目标:
✅ 不修改 Engine
✅ 不修改 nodeRuntime 结构
✅ 新能力通过注册 Middleware 注入
✅ 支持 tracing / metrics / logging / memory / retry 等横向能力
我们把 Emit 变成一个“洋葱模型”:
Node.Emit()
↓
[ Middleware N ]
↓
[ Middleware N-1 ]
↓
[ Middleware 1 ]
↓
Engine.dispatch()
和 Web 框架(Gin / Echo)一样。
type EmitFunc func(Event)
type EmitMiddleware func(nodeID string, next EmitFunc) EmitFunc
解释:
next 是下一层函数type nodeRuntime struct {
nodeID string
input <-chan Event
emit EmitFunc
middlewares []EmitMiddleware
}
核心代码:
func (r *nodeRuntime) buildEmitChain(final EmitFunc) EmitFunc {
wrapped := final
// 倒序构建洋葱模型
for i := len(r.middlewares) - 1; i >= 0; i-- {
mw := r.middlewares[i]
wrapped = mw(r.nodeID, wrapped)
}
return wrapped
}
func (r *nodeRuntime) Emit(ev Event) {
chain := r.buildEmitChain(r.emit)
chain(ev)
}
这样:
✔ Emit 逻辑永远不改
✔ 新能力只通过 middleware 加入
Engine 持有全局 middlewares:
type Engine struct {
middlewares []EmitMiddleware
}
创建 runtime 时注入:
func (e *Engine) newRuntime(id string) *nodeRuntime {
return &nodeRuntime{
nodeID: id,
input: e.nodeInput[id],
emit: e.dispatch,
middlewares: e.middlewares,
}
}
注册方法:
func (e *Engine) Use(mw EmitMiddleware) {
e.middlewares = append(e.middlewares, mw)
}
func TraceMiddleware(tracer TraceCollector) EmitMiddleware {
return func(nodeID string, next EmitFunc) EmitFunc {
return func(ev Event) {
tracer.Record(nodeID, ev.Type)
next(ev)
}
}
}
func MetricsMiddleware(metrics Metrics) EmitMiddleware {
return func(nodeID string, next EmitFunc) EmitFunc {
return func(ev Event) {
metrics.Count("event_emit_total")
next(ev)
}
}
}
func LoggingMiddleware(logger Logger) EmitMiddleware {
return func(nodeID string, next EmitFunc) EmitFunc {
return func(ev Event) {
logger.Infof("node=%s emit type=%s", nodeID, ev.Type)
next(ev)
}
}
}
func RecoverMiddleware(logger Logger) EmitMiddleware {
return func(nodeID string, next EmitFunc) EmitFunc {
return func(ev Event) {
defer func() {
if r := recover(); r != nil {
logger.Errorf("panic in node=%s: %v", nodeID, r)
}
}()
next(ev)
}
}
}
func MemoryMiddleware(memory MemoryStore) EmitMiddleware {
return func(nodeID string, next EmitFunc) EmitFunc {
return func(ev Event) {
memory.Append(nodeID, ev)
next(ev)
}
}
}
engine := &Engine{}
engine.Use(RecoverMiddleware(logger))
engine.Use(TraceMiddleware(traceCollector))
engine.Use(MetricsMiddleware(metrics))
engine.Use(LoggingMiddleware(logger))
engine.Use(MemoryMiddleware(memory))
以后新增能力:
engine.Use(NewYourMiddleware(...))
不需要:
现在的实现每次 Emit 都 build chain,不太优雅。
优化方式:
在 runtime 创建时就构建一次:
func (e *Engine) newRuntime(id string) *nodeRuntime {
r := &nodeRuntime{
nodeID: id,
input: e.nodeInput[id],
emit: e.dispatch,
}
r.emit = r.buildEmitChain(r.emit)
return r
}
然后 Emit 直接:
func (r *nodeRuntime) Emit(ev Event) {
r.emit(ev)
}
✔ 零额外开销
✔ 高并发安全
如果你想更高级一点:
type EmitMiddleware func(nodeID string, next EmitFunc) EmitFunc
内部可以:
return func(ev Event) {
// before
next(ev)
// after
}
例如:
你现在的 DAG Engine 会变成:
Node
↓
nodeRuntime.Emit
↓
[ Recover ]
↓
[ Trace ]
↓
[ Metrics ]
↓
[ Logging ]
↓
Engine.dispatch
↓
Next Node
这是:
横向能力解耦的工业级做法
旧方式:
Emit() {
trace()
metrics()
log()
dispatch()
}
问题:
❌ 每新增能力必须修改 Emit
❌ 不符合开闭原则
新方式:
Emit() -> middleware chain
✔ 开闭原则
✔ 插件化
✔ 易测试
✔ 易扩展
Middleware = 把“横向能力”从结构体字段中抽离成函数链
你这个 LLM DAG Engine 非常适合这个模型。
如果你愿意,我可以下一步给你:
🔥 一个完整的 DAG + Middleware + 并发安全 + Backpressure 示例框架
🔥 或者给你做一个「LLM 流式输出专用 Middleware」设计
你现在已经在往一个真正的 Workflow 内核进化了 🚀
很好,这个问题问得非常专业 👍
你其实在比较两种 middleware 链构建方式:
func (r *nodeRuntime) Emit(ev Event) {
final := r.emit
for i := len(r.middlewares) - 1; i >= 0; i-- {
next := final
mw := r.middlewares[i]
final = func(e Event) {
mw(e, next)
}
}
final(ev)
}
func (r *nodeRuntime) buildEmitChain(final EmitFunc) EmitFunc {
wrapped := final
for i := len(r.middlewares) - 1; i >= 0; i-- {
next := wrapped
mw := r.middlewares[i]
wrapped = func(ev Event) {
mw(ev, next)
}
}
return wrapped
}
逻辑完全一样:
mw1(mw2(mw3(dispatch)))
只是:
这是核心差异 👇
Emit() {
每次都构建整条链
}
意味着:
每次 Emit 都会
高频事件场景会产生大量临时函数
增加 GC 压力
在你的 DAG + LLM 流式场景中:
token stream = 高频 Emit
这会产生明显开销。
你可以在 runtime 创建时构建一次:
func (e *Engine) newRuntime(id string) *nodeRuntime {
r := &nodeRuntime{
nodeID: id,
input: e.nodeInput[id],
middlewares: e.middlewares,
}
r.emit = r.buildEmitChain(e.dispatch)
return r
}
然后:
func (r *nodeRuntime) Emit(ev Event) {
r.emit(ev)
}
这样:
✔ 链只构建一次
✔ Emit 是 O(1)
✔ 没有额外闭包分配
✔ 更适合高并发
两者在并发上都安全(前提 middlewares 不修改)。
但:
在高并发 DAG Engine 里:
必须使用写法 B,并在 runtime 创建时构建一次。
这是所有 Web 框架的做法:
都是在 route 注册阶段构建链。
我给你一个更工业级版本:
type EmitFunc func(Event)
type EmitMiddleware func(Event, EmitFunc)
func buildEmitChain(final EmitFunc, mws []EmitMiddleware) EmitFunc {
wrapped := final
for i := len(mws) - 1; i >= 0; i-- {
mw := mws[i]
next := wrapped
wrapped = func(ev Event) {
mw(ev, next)
}
}
return wrapped
}
func (e *Engine) newRuntime(id string) *nodeRuntime {
r := &nodeRuntime{
nodeID: id,
input: e.nodeInput[id],
}
r.emit = buildEmitChain(e.dispatch, e.middlewares)
return r
}
func (r *nodeRuntime) Emit(ev Event) {
r.emit(ev)
}
| 项目 | 写法 A | 写法 B |
|---|---|---|
| 可读性 | 中 | 高 |
| 性能 | ❌ 每次构建 | ✅ 构建一次 |
| GC 压力 | 高 | 低 |
| 并发适配 | 一般 | 优秀 |
| 工业实践 | 不推荐 | 推荐 |
你有:
必须:
构建一次,终身使用
否则 token stream 会疯狂创建闭包。
你的第二种写法(buildEmitChain)是正确方向,
但必须在 runtime 初始化阶段构建一次,而不是每次 Emit 时构建。
如果你愿意,我可以下一步给你:
🔥 支持动态增删 Middleware 的线程安全版本
🔥 支持 Before / After / Error 三阶段 Middleware
🔥 或者给你做一个完整的高性能 DAG Engine 内核骨架
你已经在写一个真正的 workflow runtime 了 🚀
这个问题非常关键 👍
你现在比较的是两种 Middleware 组合方式:
func (r *nodeRuntime) buildEmitChain(final EmitFunc) EmitFunc {
wrapped := final
for i := len(r.middlewares) - 1; i >= 0; i-- {
mw := r.middlewares[i]
wrapped = mw(r.nodeID, wrapped)
}
return wrapped
}
前提是你的 Middleware 类型是:
type EmitMiddleware func(nodeID string, next EmitFunc) EmitFunc
func (r *nodeRuntime) buildEmitChain(final EmitFunc) EmitFunc {
wrapped := final
for i := len(r.middlewares) - 1; i >= 0; i-- {
next := wrapped
mw := r.middlewares[i]
wrapped = func(ev Event) {
mw(ev, next)
}
}
return wrapped
}
前提是 Middleware 类型是:
type EmitMiddleware func(Event, EmitFunc)
这两种 不是完全等价的。
type EmitMiddleware func(nodeID string, next EmitFunc) EmitFunc
Middleware 自己返回一个新的函数。
比如:
func TraceMiddleware(tracer TraceCollector) EmitMiddleware {
return func(nodeID string, next EmitFunc) EmitFunc {
return func(ev Event) {
tracer.Record(nodeID, ev.Type)
next(ev)
}
}
}
组合方式:
wrapped = mw(nodeID, wrapped)
这是:
真正的函数式中间件模型
✔ 更清晰
✔ 每个 middleware 自己决定如何包裹
✔ 可扩展性更强(可以不调用 next)
✔ 更接近 Gin / Echo 内部实现
✔ 更容易支持 error / before / after
Middleware 只是一个执行函数:
type EmitMiddleware func(Event, EmitFunc)
然后你强行包一层:
wrapped = func(ev Event) {
mw(ev, next)
}
本质上:
你把控制权放在 buildEmitChain,而不是 Middleware 本身
两者性能几乎一样。
但第一种:
假设你想做一个:
第一种可以优雅实现:
return func(ev Event) {
start := time.Now()
next(ev)
duration := time.Since(start)
}
第二种也能做,但结构变得混乱。
在 DAG Engine / LLM Workflow 这种高扩展系统里:
第一种(函数式组合)明显更好
type EmitFunc func(Event)
type EmitMiddleware func(nodeID string, next EmitFunc) EmitFunc
构建:
func buildEmitChain(
nodeID string,
final EmitFunc,
mws []EmitMiddleware,
) EmitFunc {
wrapped := final
for i := len(mws) - 1; i >= 0; i-- {
wrapped = mws[i](nodeID, wrapped)
}
return wrapped
}
初始化阶段构建一次:
r.emit = buildEmitChain(r.nodeID, e.dispatch, e.middlewares)
Emit:
func (r *nodeRuntime) Emit(ev Event) {
r.emit(ev)
}
第一种模型:
Middleware 决定如何包裹 next
第二种模型:
Engine 决定如何包裹 Middleware
控制权应该在 Middleware,而不是 Engine。
第一种写法更函数式、更优雅、更可扩展,推荐在高并发 DAG Engine 中使用。
如果你愿意,我可以下一步给你一个:
🔥 支持 Before / After / Error 三阶段的工业级 Middleware 框架
🔥 或者一个完整可运行的 LLM DAG 内核模板
你现在已经进入 runtime 设计的核心区域了 🚀