func (c *Client) ReadPump() error {
defer func() {
// On exit, unregister the client and close the connection
// send c to unregister channel
// c.session.Close()
// close(c.send)
// close the session
// close the connection
// c.conn.Close()
log.Println("ws-conn: close readPump!")
}()
c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.Conn.SetPongHandler(func(appData string) error {
c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
c.Session.LLMConsumer()
c.Session.MonitorSubSize()
for {
select {
// case <-c.session.Done:
// log.Println("Session processing done, exiting readPump.")
// return
case <-c.ctx.Done():
log.Println("ws-conn: ws-conn: Client context done, exiting readPump.")
return c.ctx.Err()
case <-c.Session.ctx.Done():
log.Println("Session is closed, exiting readPump.")
return nil
default:
// broadcast the received message to all clients
// c.hub.broadcast <- message
}
msgType, message, err := c.Conn.ReadMessage()
}
对,你观察得很准确:
ReadPump() 里的核心调用:
msgType, message, err := c.Conn.ReadMessage()
是 阻塞的。也就是说,如果没有数据到来,它会一直卡在这一行。
c.ctx.Done() 或 c.Session.ctx.Done() 的作用select 里的 <-c.ctx.Done() 或 <-c.Session.ctx.Done() 永远不会触发,直到 ReadMessage() 返回或者设置了 ReadDeadline。select 在 default 分支后才会执行 ReadMessage(),所以实际上你的阻塞调用会让 <-ctx.Done() 没有机会立即生效。c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.Conn.SetPongHandler(func(string) error {
c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
ctx.Done() 被调用时,你可以 Close() 连接,这样 ReadMessage() 会返回错误 use of closed network connection把 ReadMessage() 放到独立 goroutine,然后 select 等待:
readCh := make(chan struct {
msgType int
msg []byte
err error
})
go func() {
mt, msg, err := c.Conn.ReadMessage()
readCh <- struct {
msgType int
msg []byte
err error
}{mt, msg, err}
}()
select {
case <-c.ctx.Done():
c.Conn.Close() // 中断阻塞
return c.ctx.Err()
case <-c.Session.ctx.Done():
c.Conn.Close()
return nil
case r := <-readCh:
if r.err != nil {
return r.err
}
// 处理 r.msg
}
这样就不会被阻塞的 ReadMessage() 卡住了。
最简单、也是 Go WebSocket 通常做法:
c.Conn.SetReadDeadline(time.Now().Add(time.Second)) // 每秒检查
for {
select {
case <-c.ctx.Done():
c.Conn.Close() // 立即中断 ReadMessage
return c.ctx.Err()
default:
_, _, err := c.Conn.ReadMessage()
if err != nil {
return err
}
}
}
阻塞在 ReadMessage() 时,select <-ctx.Done() 不会立即生效
必须:
Conn.Close() 来中断阻塞SetReadDeadline + PongHandler 配合心跳直接依赖 <-ctx.Done() 不足以立即退出
如果你需要,我可以帮你改写你的 ReadPump(),做到:
c.ctx.Done() 或 c.Session.ctx.Done() 调用时立即退出ReadMessage() 可以安全中断你希望我帮你写吗?