典型 Go 流式接口设计

最后发布时间:2026-03-02 09:56:26 浏览量:

你问的核心是:

msg, err := sr.Recv() 这种“流式接收”在 Go 里是怎么实现的?

这本质是一个 流式接口(stream abstraction),通常用:

✅ interface + channel

✅ interface + goroutine + 网络连接

来实现。

我分层给你讲清楚。


一、你这段代码本质在做什么?

sr, err := chatModel.Stream(ctx, messages)

for {
    msg, err := sr.Recv()
}

说明:

  • Stream() 返回一个 流对象
  • Recv() 每次返回一段数据(delta)
  • 直到 io.EOF 表示结束

这和 gRPC streaming 的模式一模一样。


二、典型 Go 流式接口设计

通常定义一个接口:

type Stream interface {
    Recv() (*Message, error)
}

然后内部实现结构体:

type streamImpl struct {
    ch   chan *Message
    err  chan error
}

三、最常见实现方式:channel + goroutine

1️⃣ 创建流

func (m *Model) Stream(ctx context.Context, msgs []*Message) (Stream, error) {
    s := &streamImpl{
        ch:  make(chan *Message),
        err: make(chan error, 1),
    }

    go s.start(ctx, msgs)

    return s, nil
}

2️⃣ goroutine 负责不断写入 channel

func (s *streamImpl) start(ctx context.Context, msgs []*Message) {
    defer close(s.ch)

    for i := 0; i < 5; i++ {
        time.Sleep(time.Second)

        s.ch <- &Message{
            Content: fmt.Sprintf("chunk %d", i),
        }
    }
}

3️⃣ Recv() 从 channel 读

func (s *streamImpl) Recv() (*Message, error) {
    msg, ok := <-s.ch
    if !ok {
        return nil, io.EOF
    }
    return msg, nil
}

四、如果是调用 LLM(如 Qwen / OpenAI)

真实场景是:

  1. 建立 HTTP streaming 连接
  2. 服务器持续返回数据
  3. 后台 goroutine 读取 HTTP body
  4. 解析 chunk
  5. 写入 channel
  6. Recv() 从 channel 读

结构图:

HTTP response body
        ↓
goroutine 读取
        ↓
解析 JSON
        ↓
写入 channel
        ↓
Recv() 读取

五、为什么能循环 Recv()?

因为:

for {
    msg, err := sr.Recv()
}

Recv() 本质是:

<-channel

这是阻塞操作:

  • 没数据 → 阻塞
  • 有数据 → 返回
  • channel 关闭 → 返回 ok=false → 转成 io.EOF

六、为什么用 io.EOF?

Go 里流式结束的标准方式是:

return nil, io.EOF

这是 Go 官方约定。

比如:

  • 文件读取
  • 网络流
  • gRPC streaming
  • bufio.Reader

全部用 EOF 表示结束。


七、可能的另一种实现(更高级)

如果底层是 HTTP 流:

func (s *streamImpl) Recv() (*Message, error) {
    line, err := s.reader.ReadBytes('\n')
    if err != nil {
        return nil, err
    }

    var msg Message
    json.Unmarshal(line, &msg)

    return &msg, nil
}

这种实现:

  • 没用 channel
  • 直接每次从 socket 读

但逻辑效果一样。


八、总结:Recv 的本质

技术用途
interface抽象流
goroutine后台读取
channel传递数据
io.EOF结束信号
context控制取消

九、你这段代码的运行机制(简化版)

Stream() 创建流对象
    ↓
内部开启 goroutine
    ↓
不断接收 HTTP chunk
    ↓
写入 channel
    ↓
Recv() 从 channel 读取
    ↓
读完返回 io.EOF

十、如果你想自己实现一个 LLM Streaming Client

标准结构是:

type ChatStream interface {
    Recv() (*Message, error)
    Close() error
}

配合:

  • context 控制取消
  • select 监听 ctx.Done()
  • channel 传递数据

如果你愿意

我可以给你写一个:

  • 完整可运行的“模拟 LLM 流式返回”示例
  • 或者画一个 goroutine + channel + Recv 的执行时序图
  • 或解析 CloudWeGo 的 qwen 实现结构

你现在是想:

  • 理解原理?
  • 还是想自己实现一个 streaming 模型客户端?