你问的核心是:
msg, err := sr.Recv()这种“流式接收”在 Go 里是怎么实现的?
这本质是一个 流式接口(stream abstraction),通常用:
✅ interface + channel
或
✅ interface + goroutine + 网络连接
来实现。
我分层给你讲清楚。
sr, err := chatModel.Stream(ctx, messages)
for {
msg, err := sr.Recv()
}
说明:
Stream() 返回一个 流对象Recv() 每次返回一段数据(delta)io.EOF 表示结束这和 gRPC streaming 的模式一模一样。
通常定义一个接口:
type Stream interface {
Recv() (*Message, error)
}
然后内部实现结构体:
type streamImpl struct {
ch chan *Message
err chan error
}
func (m *Model) Stream(ctx context.Context, msgs []*Message) (Stream, error) {
s := &streamImpl{
ch: make(chan *Message),
err: make(chan error, 1),
}
go s.start(ctx, msgs)
return s, nil
}
func (s *streamImpl) start(ctx context.Context, msgs []*Message) {
defer close(s.ch)
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
s.ch <- &Message{
Content: fmt.Sprintf("chunk %d", i),
}
}
}
func (s *streamImpl) Recv() (*Message, error) {
msg, ok := <-s.ch
if !ok {
return nil, io.EOF
}
return msg, nil
}
真实场景是:
结构图:
HTTP response body
↓
goroutine 读取
↓
解析 JSON
↓
写入 channel
↓
Recv() 读取
因为:
for {
msg, err := sr.Recv()
}
Recv() 本质是:
<-channel
这是阻塞操作:
Go 里流式结束的标准方式是:
return nil, io.EOF
这是 Go 官方约定。
比如:
全部用 EOF 表示结束。
如果底层是 HTTP 流:
func (s *streamImpl) Recv() (*Message, error) {
line, err := s.reader.ReadBytes('\n')
if err != nil {
return nil, err
}
var msg Message
json.Unmarshal(line, &msg)
return &msg, nil
}
这种实现:
但逻辑效果一样。
| 技术 | 用途 |
|---|---|
| interface | 抽象流 |
| goroutine | 后台读取 |
| channel | 传递数据 |
| io.EOF | 结束信号 |
| context | 控制取消 |
Stream() 创建流对象
↓
内部开启 goroutine
↓
不断接收 HTTP chunk
↓
写入 channel
↓
Recv() 从 channel 读取
↓
读完返回 io.EOF
标准结构是:
type ChatStream interface {
Recv() (*Message, error)
Close() error
}
配合:
我可以给你写一个:
你现在是想: