你问的核心是:
msg, err := sr.Recv() 这种“流式接收”在 Go 里是怎么实现的?
msg, err := sr.Recv()
这本质是一个 流式接口(stream abstraction),通常用:
✅ interface + channel或✅ interface + goroutine + 网络连接
来实现。
我分层给你讲清楚。
sr, err := chatModel.Stream(ctx, messages) for { msg, err := sr.Recv() }
说明:
Stream()
Recv()
io.EOF
这和 gRPC streaming 的模式一模一样。
通常定义一个接口:
type Stream interface { Recv() (*Message, error) }
然后内部实现结构体:
type streamImpl struct { ch chan *Message err chan error }
func (m *Model) Stream(ctx context.Context, msgs []*Message) (Stream, error) { s := &streamImpl{ ch: make(chan *Message), err: make(chan error, 1), } go s.start(ctx, msgs) return s, nil }
func (s *streamImpl) start(ctx context.Context, msgs []*Message) { defer close(s.ch) for i := 0; i < 5; i++ { time.Sleep(time.Second) s.ch <- &Message{ Content: fmt.Sprintf("chunk %d", i), } } }
func (s *streamImpl) Recv() (*Message, error) { msg, ok := <-s.ch if !ok { return nil, io.EOF } return msg, nil }
真实场景是:
结构图:
HTTP response body ↓ goroutine 读取 ↓ 解析 JSON ↓ 写入 channel ↓ Recv() 读取
因为:
for { msg, err := sr.Recv() }
Recv() 本质是:
<-channel
这是阻塞操作:
Go 里流式结束的标准方式是:
return nil, io.EOF
这是 Go 官方约定。
比如:
全部用 EOF 表示结束。
如果底层是 HTTP 流:
func (s *streamImpl) Recv() (*Message, error) { line, err := s.reader.ReadBytes('\n') if err != nil { return nil, err } var msg Message json.Unmarshal(line, &msg) return &msg, nil }
这种实现:
但逻辑效果一样。
Stream() 创建流对象 ↓ 内部开启 goroutine ↓ 不断接收 HTTP chunk ↓ 写入 channel ↓ Recv() 从 channel 读取 ↓ 读完返回 io.EOF
标准结构是:
type ChatStream interface { Recv() (*Message, error) Close() error }
配合:
我可以给你写一个:
你现在是想:
Github开源生信云平台 DEMO