Github开源生信云平台 DEMO
好,我们直接设计一套:
🚀 Go 实现的高级 LLM Workflow Engine ✅ Tool Call 回环✅ LLM 决策路由✅ Memory 节点✅ 多轮对话✅ 任务状态可视化
🚀 Go 实现的高级 LLM Workflow Engine
✅ Tool Call 回环✅ LLM 决策路由✅ Memory 节点✅ 多轮对话✅ 任务状态可视化
这不是 demo,而是可以做成框架的架构。
我们从普通 DAG 升级为:
Stateful Dynamic Graph Engine
核心模块:
Session ↓ Workflow Engine ├── Node Runtime ├── Event Router ├── State Store ├── Memory Manager ├── Tool Executor └── Visual Trace Collector
type Event struct { ID string From string Type string Data any SessionID string TraceID string }
支持:
type Node interface { ID() string Run(ctx context.Context, rt NodeRuntime) error }
type NodeRuntime interface { Input() <-chan Event Emit(Event) GetState(key string) any SetState(key string, value any) }
这使节点可以:
LLM 输出:
{ "tool_call": { "name": "search", "args": {"query": "golang dag"} } }
type LLMNode struct{} func (n *LLMNode) Run(ctx context.Context, rt NodeRuntime) error { for { select { case ev := <-rt.Input(): result := callLLM(ev.Data) if result.ToolCall != nil { rt.Emit(Event{ Type: "tool_call", From: n.ID(), Data: result.ToolCall, }) continue } rt.Emit(Event{ Type: "llm_result", From: n.ID(), Data: result.Text, }) case <-ctx.Done(): return ctx.Err() } } }
type ToolExecutorNode struct{} func (n *ToolExecutorNode) Run(ctx context.Context, rt NodeRuntime) error { for { select { case ev := <-rt.Input(): toolCall := ev.Data.(ToolCall) result := executeTool(toolCall) rt.Emit(Event{ Type: "tool_result", From: n.ID(), Data: result, }) case <-ctx.Done(): return ctx.Err() } } }
Edges: []Edge{ {FromNode: "llm", OnEvent: "tool_call", ToNode: "tool"}, {FromNode: "tool", OnEvent: "tool_result", ToNode: "llm"}, }
这就是:
LLM → Tool → LLM → Tool → ... → 最终回答
真正的 Agent Loop。
让 LLM 决定走哪条边。
type DecisionNode struct{} func (n *DecisionNode) Run(ctx context.Context, rt NodeRuntime) error { for { select { case ev := <-rt.Input(): decision := callLLMDecision(ev.Data) rt.Emit(Event{ Type: "route", From: n.ID(), Data: decision, }) case <-ctx.Done(): return ctx.Err() } } }
{ FromNode: "decision", OnEvent: "route", ToNode: "rag", Cond: func(ev Event) bool { return ev.Data == "use_rag" }, }
现在路由由 LLM 决定。
每个 Session 有 Memory Store:
type MemoryStore interface { Get(sessionID string) []Message Append(sessionID string, msg Message) }
type MemoryNode struct { store MemoryStore } func (n *MemoryNode) Run(ctx context.Context, rt NodeRuntime) error { for { select { case ev := <-rt.Input(): sessionID := ev.SessionID history := n.store.Get(sessionID) merged := merge(history, ev.Data) rt.Emit(Event{ Type: "memory_ready", From: n.ID(), Data: merged, }) case <-ctx.Done(): return ctx.Err() } } }
关键点:
每个 Session 独立 Engine 实例或共享 Engine + SessionID 隔离
推荐:
type Session struct { ID string Engine *Engine }
Memory 通过 SessionID 区分。
type NodeState struct { Status string LastEvent string Error error }
Engine 维护:
nodeStates map[string]*NodeState
type Trace struct { From string To string EventType string Time time.Time }
收集到:
[]Trace
{ "nodes": [ {"id":"llm","status":"running"}, {"id":"tool","status":"completed"} ], "edges":[ {"from":"llm","to":"tool","event":"tool_call"} ] }
前端可以用:
实时渲染执行图。
用户输入 ↓ MemoryNode ↓ LLMNode ├── tool_call → ToolNode → LLMNode (loop) ├── decision → 条件路由 ├── llm_chunk → TTS └── llm_result → SSE
你已经实现的是:
Go 版本的 LangGraph + Agent + Memory + Workflow Engine
而且:
可以加:
如果你愿意,我可以下一步给你:
🔥 一个完整“可开源级”Engine结构目录设计🔥 线程安全实现细节🔥 防 goroutine 泄漏模型🔥 高并发 Session 管理模型🔥 生产级 backpressure 方案
你现在这个方向,已经是可以做框架级项目了。
很好 👍 这一步是**从“能跑”升级到“架构清晰”**的关键。
你问的是:
我给你一套完整可运行级别的结构设计(不是伪架构)。
先说核心思想:
Node 不应该直接依赖 EngineNode 只应该依赖一个“运行时上下文”
所以:
它的作用是:
Node 不需要知道:
这叫:
运行时隔离(Runtime Isolation)
由 Engine 实现。
type nodeRuntime struct { nodeID string sessionID string input <-chan Event emit func(Event) state map[string]any mu sync.RWMutex }
func (r *nodeRuntime) Input() <-chan Event { return r.input } func (r *nodeRuntime) Emit(ev Event) { ev.From = r.nodeID ev.SessionID = r.sessionID r.emit(ev) } func (r *nodeRuntime) GetState(key string) any { r.mu.RLock() defer r.mu.RUnlock() return r.state[key] } func (r *nodeRuntime) SetState(key string, value any) { r.mu.Lock() defer r.mu.Unlock() r.state[key] = value }
MemoryStore 是:
Session 级别的长期记忆
而 NodeRuntime 的 state 是:
节点级短期状态
区别:
type InMemoryStore struct { mu sync.RWMutex store map[string][]Message } func NewMemoryStore() *InMemoryStore { return &InMemoryStore{ store: make(map[string][]Message), } } func (m *InMemoryStore) Get(sessionID string) []Message { m.mu.RLock() defer m.mu.RUnlock() return append([]Message{}, m.store[sessionID]...) } func (m *InMemoryStore) Append(sessionID string, msg Message) { m.mu.Lock() defer m.mu.Unlock() m.store[sessionID] = append(m.store[sessionID], msg) }
每个用户一个 Session:
Session 创建时:
func NewSession(id string, dag *DAG, store MemoryStore) *Session { engine := NewEngine(dag, store, id) return &Session{ ID: id, Engine: engine, } }
Engine 里加入:
type Engine struct { dag *DAG memoryStore MemoryStore sessionID string nodeInput map[string]chan Event bus chan Event }
MemoryNode 负责:
type Message struct { Role string Content string }
type MemoryNode struct { store MemoryStore } func (n *MemoryNode) ID() string { return "memory" } func (n *MemoryNode) Run(ctx context.Context, rt NodeRuntime) error { for { select { case ev := <-rt.Input(): userText := ev.Data.(string) sessionID := ev.SessionID // 1️⃣ 读取历史 history := n.store.Get(sessionID) // 2️⃣ 拼接新消息 history = append(history, Message{ Role: "user", Content: userText, }) // 3️⃣ 输出给 LLM rt.Emit(Event{ Type: "memory_ready", Data: history, }) case <-ctx.Done(): return ctx.Err() } } }
type LLMNode struct { store MemoryStore } func (n *LLMNode) ID() string { return "llm" } func (n *LLMNode) Run(ctx context.Context, rt NodeRuntime) error { for { select { case ev := <-rt.Input(): messages := ev.Data.([]Message) // 调用 LLM response := callLLM(messages) // 保存 assistant 回复 n.store.Append(ev.SessionID, Message{ Role: "assistant", Content: response, }) rt.Emit(Event{ Type: "llm_result", Data: response, }) case <-ctx.Done(): return ctx.Err() } } }
第一次:
User: 你好
MemoryStore:
[user: 你好]
LLM 回复:
assistant: 你好,有什么可以帮你?
[user: 你好] [assistant: 你好,有什么可以帮你?]
第二次:
User: 你刚才说什么?
MemoryNode 读取历史:
[user: 你好] [assistant: 你好,有什么可以帮你?] [user: 你刚才说什么?]
LLM 有完整上下文。
Edges: []Edge{ {FromNode: "input", OnEvent: "user_input", ToNode: "memory"}, {FromNode: "memory", OnEvent: "memory_ready", ToNode: "llm"}, }
Node 不依赖 EngineNode 可独立测试
可替换为:
多个用户:
session1 := NewSession("userA", dag, store) session2 := NewSession("userB", dag, store)
不会串数据。
Session ↓ Engine ↓ MemoryNode → LLMNode → ToolNode → ... ↓ MemoryStore
下一步可以升级到:
🔥 Memory 滑动窗口🔥 Token 限制裁剪🔥 Vector RAG 融合🔥 Checkpoint 持久化🔥 高并发 session 池🔥 防 goroutine 泄漏模型
你现在已经在设计一个可以开源的 Go Agent Framework 了。
如果你想,我可以给你画出一套完整目录结构和并发安全实现模型。