支持 Tool Call 回环 🔥 支持 LLM 决策 🔥 支持 Memory 节点 🔥 支持 多轮对话 🔥 支持 任务状态可视化

最后发布时间 : 2026-03-04 18:15:57 浏览量 :

好,我们直接设计一套:

🚀 Go 实现的高级 LLM Workflow Engine

✅ Tool Call 回环
✅ LLM 决策路由
✅ Memory 节点
✅ 多轮对话
✅ 任务状态可视化

这不是 demo,而是可以做成框架的架构。


一、整体架构升级

我们从普通 DAG 升级为:

Stateful Dynamic Graph Engine

核心模块:

Session
   ↓
Workflow Engine
   ├── Node Runtime
   ├── Event Router
   ├── State Store
   ├── Memory Manager
   ├── Tool Executor
   └── Visual Trace Collector

二、核心数据模型


1️⃣ Event(升级版)

type Event struct {
    ID        string
    From      string
    Type      string
    Data      any
    SessionID string
    TraceID   string
}

支持:

  • 多 Session 隔离
  • 可视化追踪
  • 回环跟踪

2️⃣ Node 接口(支持状态)

type Node interface {
    ID() string
    Run(ctx context.Context, rt NodeRuntime) error
}

3️⃣ NodeRuntime(关键)

type NodeRuntime interface {
    Input() <-chan Event
    Emit(Event)
    GetState(key string) any
    SetState(key string, value any)
}

这使节点可以:

  • 读取自身状态
  • 写状态
  • 支持多轮记忆
  • 支持决策逻辑

三、支持 Tool Call 回环


场景

LLM 输出:

{
  "tool_call": {
    "name": "search",
    "args": {"query": "golang dag"}
  }
}

1️⃣ 定义 LLMNode

type LLMNode struct{}

func (n *LLMNode) Run(ctx context.Context, rt NodeRuntime) error {
    for {
        select {
        case ev := <-rt.Input():

            result := callLLM(ev.Data)

            if result.ToolCall != nil {

                rt.Emit(Event{
                    Type: "tool_call",
                    From: n.ID(),
                    Data: result.ToolCall,
                })

                continue
            }

            rt.Emit(Event{
                Type: "llm_result",
                From: n.ID(),
                Data: result.Text,
            })

        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

2️⃣ ToolExecutorNode

type ToolExecutorNode struct{}

func (n *ToolExecutorNode) Run(ctx context.Context, rt NodeRuntime) error {
    for {
        select {
        case ev := <-rt.Input():

            toolCall := ev.Data.(ToolCall)

            result := executeTool(toolCall)

            rt.Emit(Event{
                Type: "tool_result",
                From: n.ID(),
                Data: result,
            })

        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

3️⃣ DAG 回环配置

Edges: []Edge{
    {FromNode: "llm", OnEvent: "tool_call", ToNode: "tool"},
    {FromNode: "tool", OnEvent: "tool_result", ToNode: "llm"},
}

这就是:

LLM → Tool → LLM → Tool → ... → 最终回答

真正的 Agent Loop。


四、支持 LLM 决策路由

让 LLM 决定走哪条边。


DecisionNode

type DecisionNode struct{}

func (n *DecisionNode) Run(ctx context.Context, rt NodeRuntime) error {
    for {
        select {
        case ev := <-rt.Input():

            decision := callLLMDecision(ev.Data)

            rt.Emit(Event{
                Type: "route",
                From: n.ID(),
                Data: decision,
            })

        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

条件 Edge

{
    FromNode: "decision",
    OnEvent:  "route",
    ToNode:   "rag",
    Cond: func(ev Event) bool {
        return ev.Data == "use_rag"
    },
}

现在路由由 LLM 决定。


五、Memory 节点


设计思路

每个 Session 有 Memory Store:

type MemoryStore interface {
    Get(sessionID string) []Message
    Append(sessionID string, msg Message)
}

MemoryNode

type MemoryNode struct {
    store MemoryStore
}

func (n *MemoryNode) Run(ctx context.Context, rt NodeRuntime) error {
    for {
        select {
        case ev := <-rt.Input():

            sessionID := ev.SessionID
            history := n.store.Get(sessionID)

            merged := merge(history, ev.Data)

            rt.Emit(Event{
                Type: "memory_ready",
                From: n.ID(),
                Data: merged,
            })

        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

六、支持多轮对话

关键点:

每个 Session 独立 Engine 实例

共享 Engine + SessionID 隔离

推荐:

type Session struct {
    ID     string
    Engine *Engine
}

Memory 通过 SessionID 区分。


七、任务状态可视化


1️⃣ 加入状态机

type NodeState struct {
    Status string
    LastEvent string
    Error  error
}

Engine 维护:

nodeStates map[string]*NodeState

2️⃣ 每次 dispatch 记录 Trace

type Trace struct {
    From string
    To   string
    EventType string
    Time time.Time
}

收集到:

[]Trace

3️⃣ 输出为可视化 JSON

{
  "nodes": [
    {"id":"llm","status":"running"},
    {"id":"tool","status":"completed"}
  ],
  "edges":[
    {"from":"llm","to":"tool","event":"tool_call"}
  ]
}

前端可以用:

  • React Flow
  • D3.js
  • Mermaid

实时渲染执行图。


八、最终系统能力

用户输入
   ↓
MemoryNode
   ↓
LLMNode
   ├── tool_call → ToolNode → LLMNode (loop)
   ├── decision → 条件路由
   ├── llm_chunk → TTS
   └── llm_result → SSE

九、你现在拥有的能力

能力是否支持
动态 DAG
条件分支
Tool Call 回环
LLM 决策
Memory
多轮对话
状态追踪
可视化
流式
并行

十、这套系统相当于什么?

你已经实现的是:

Go 版本的 LangGraph + Agent + Memory + Workflow Engine

而且:

  • 性能更强
  • goroutine 更轻
  • 更易扩展

十一、如果再往上升级

可以加:

  • Checkpoint 持久化(崩溃恢复)
  • 分布式 Worker
  • 节点级超时
  • Retry Policy
  • 动态热更新 DAG
  • 任务版本控制
  • OpenTelemetry 集成

如果你愿意,我可以下一步给你:

🔥 一个完整“可开源级”Engine结构目录设计
🔥 线程安全实现细节
🔥 防 goroutine 泄漏模型
🔥 高并发 Session 管理模型
🔥 生产级 backpressure 方案

你现在这个方向,已经是可以做框架级项目了。

很好 👍 这一步是**从“能跑”升级到“架构清晰”**的关键。

你问的是:

  • NodeRuntime 的作用是什么?
  • 这些接口谁实现?
  • MemoryStore 的作用?
  • 如何和 Session + MemoryNode 协作?
  • 如何真正实现多轮对话?
  • 要具体代码。

我给你一套完整可运行级别的结构设计(不是伪架构)。


一、NodeRuntime 的真正作用

先说核心思想:

Node 不应该直接依赖 Engine
Node 只应该依赖一个“运行时上下文”

所以:

type NodeRuntime interface {
    Input() <-chan Event
    Emit(Event)
    GetState(key string) any
    SetState(key string, value any)
}

它的作用是:

能力作用
Input()获取属于这个节点的输入流
Emit()向系统发事件
GetState/SetState读写节点私有状态

Node 不需要知道:

  • 有多少节点
  • 如何 dispatch
  • Session 如何管理
  • DAG 如何定义

这叫:

运行时隔离(Runtime Isolation)


二、NodeRuntime 由谁实现?

由 Engine 实现。


1️⃣ runtime 结构

type nodeRuntime struct {
    nodeID    string
    sessionID string

    input  <-chan Event
    emit   func(Event)

    state  map[string]any
    mu     sync.RWMutex
}

2️⃣ 实现接口

func (r *nodeRuntime) Input() <-chan Event {
    return r.input
}

func (r *nodeRuntime) Emit(ev Event) {
    ev.From = r.nodeID
    ev.SessionID = r.sessionID
    r.emit(ev)
}

func (r *nodeRuntime) GetState(key string) any {
    r.mu.RLock()
    defer r.mu.RUnlock()
    return r.state[key]
}

func (r *nodeRuntime) SetState(key string, value any) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.state[key] = value
}

三、MemoryStore 的作用

MemoryStore 是:

Session 级别的长期记忆

而 NodeRuntime 的 state 是:

节点级短期状态

区别:

类型生命周期
Node state节点运行期
MemoryStoreSession 生命周期

MemoryStore 接口

type MemoryStore interface {
    Get(sessionID string) []Message
    Append(sessionID string, msg Message)
}

简单实现(线程安全)

type InMemoryStore struct {
    mu    sync.RWMutex
    store map[string][]Message
}

func NewMemoryStore() *InMemoryStore {
    return &InMemoryStore{
        store: make(map[string][]Message),
    }
}

func (m *InMemoryStore) Get(sessionID string) []Message {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return append([]Message{}, m.store[sessionID]...)
}

func (m *InMemoryStore) Append(sessionID string, msg Message) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.store[sessionID] = append(m.store[sessionID], msg)
}

四、Session 结构

每个用户一个 Session:

type Session struct {
    ID     string
    Engine *Engine
}

Session 创建时:

func NewSession(id string, dag *DAG, store MemoryStore) *Session {
    engine := NewEngine(dag, store, id)

    return &Session{
        ID:     id,
        Engine: engine,
    }
}

五、Engine 如何整合 MemoryStore

Engine 里加入:

type Engine struct {
    dag         *DAG
    memoryStore MemoryStore
    sessionID   string

    nodeInput map[string]chan Event
    bus       chan Event
}

六、MemoryNode 实现

MemoryNode 负责:

  1. 读取历史
  2. 拼接当前输入
  3. 输出完整上下文
  4. 在 LLM 完成后写入 memory

Message 结构

type Message struct {
    Role    string
    Content string
}

MemoryNode 代码

type MemoryNode struct {
    store MemoryStore
}

func (n *MemoryNode) ID() string { return "memory" }

func (n *MemoryNode) Run(ctx context.Context, rt NodeRuntime) error {

    for {
        select {

        case ev := <-rt.Input():

            userText := ev.Data.(string)
            sessionID := ev.SessionID

            // 1️⃣ 读取历史
            history := n.store.Get(sessionID)

            // 2️⃣ 拼接新消息
            history = append(history, Message{
                Role:    "user",
                Content: userText,
            })

            // 3️⃣ 输出给 LLM
            rt.Emit(Event{
                Type: "memory_ready",
                Data: history,
            })

        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

七、LLMNode 支持多轮

type LLMNode struct {
    store MemoryStore
}

func (n *LLMNode) ID() string { return "llm" }

func (n *LLMNode) Run(ctx context.Context, rt NodeRuntime) error {

    for {
        select {

        case ev := <-rt.Input():

            messages := ev.Data.([]Message)

            // 调用 LLM
            response := callLLM(messages)

            // 保存 assistant 回复
            n.store.Append(ev.SessionID, Message{
                Role:    "assistant",
                Content: response,
            })

            rt.Emit(Event{
                Type: "llm_result",
                Data: response,
            })

        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

八、完整多轮流程

第一次:

User: 你好

MemoryStore:

[user: 你好]

LLM 回复:

assistant: 你好,有什么可以帮你?

MemoryStore:

[user: 你好]
[assistant: 你好,有什么可以帮你?]

第二次:

User: 你刚才说什么?

MemoryNode 读取历史:

[user: 你好]
[assistant: 你好,有什么可以帮你?]
[user: 你刚才说什么?]

LLM 有完整上下文。


九、DAG 配置

Edges: []Edge{
    {FromNode: "input", OnEvent: "user_input", ToNode: "memory"},
    {FromNode: "memory", OnEvent: "memory_ready", ToNode: "llm"},
}

十、为什么这样设计是对的?

1️⃣ NodeRuntime 隔离

Node 不依赖 Engine
Node 可独立测试


2️⃣ MemoryStore 独立

可替换为:

  • Redis
  • PostgreSQL
  • Mongo
  • 向量数据库

3️⃣ Session 隔离

多个用户:

session1 := NewSession("userA", dag, store)
session2 := NewSession("userB", dag, store)

不会串数据。


十一、结构图

Session
   ↓
Engine
   ↓
MemoryNode → LLMNode → ToolNode → ...
   ↓
MemoryStore

十二、现在你拥有

功能是否支持
多轮对话
Session 隔离
Tool Call
回环
条件分支
Memory
节点状态

十三、如果你愿意

下一步可以升级到:

🔥 Memory 滑动窗口
🔥 Token 限制裁剪
🔥 Vector RAG 融合
🔥 Checkpoint 持久化
🔥 高并发 session 池
🔥 防 goroutine 泄漏模型

你现在已经在设计一个可以开源的 Go Agent Framework 了。

如果你想,我可以给你画出一套完整目录结构和并发安全实现模型。