ReadPump会一直阻塞在ReadMessage 是否需要 c.ctx.Done(): c.Session.ctx.Done()

最后发布时间:2026-03-02 11:36:10 浏览量:
func (c *Client) ReadPump() error {
	defer func() {
		// On exit, unregister the client and close the connection
		// send c to unregister channel
		// c.session.Close()
		// close(c.send)
		// close the session
		// close the connection
		// c.conn.Close()
		log.Println("ws-conn: close readPump!")
	}()

	c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
	c.Conn.SetPongHandler(func(appData string) error {
		c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
		return nil
	})

	c.Session.LLMConsumer()
	c.Session.MonitorSubSize()

	for {
		select {
		// case <-c.session.Done:
		// 	log.Println("Session processing done, exiting readPump.")
		// 	return
		case <-c.ctx.Done():
			log.Println("ws-conn: ws-conn: Client context done, exiting readPump.")
			return c.ctx.Err()
		case <-c.Session.ctx.Done():
			log.Println("Session is closed, exiting readPump.")
			return nil
		default:
			// broadcast the received message to all clients
			// c.hub.broadcast <- message
		}
		msgType, message, err := c.Conn.ReadMessage()
		
}

对,你观察得很准确:

ReadPump() 里的核心调用:

msgType, message, err := c.Conn.ReadMessage()

阻塞的。也就是说,如果没有数据到来,它会一直卡在这一行。


1️⃣ c.ctx.Done()c.Session.ctx.Done() 的作用

  • select 里的 <-c.ctx.Done()<-c.Session.ctx.Done() 永远不会触发,直到 ReadMessage() 返回或者设置了 ReadDeadline
  • 因为 selectdefault 分支后才会执行 ReadMessage(),所以实际上你的阻塞调用会让 <-ctx.Done() 没有机会立即生效。

2️⃣ 解决方案

方法一:使用 SetReadDeadline + PongHandler

c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.Conn.SetPongHandler(func(string) error {
    c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
    return nil
})
  • 你已经做了这一点
  • ctx.Done() 被调用时,你可以 Close() 连接,这样 ReadMessage() 会返回错误 use of closed network connection
  • goroutine 退出

方法二:用 goroutine + channel 组合

ReadMessage() 放到独立 goroutine,然后 select 等待:

readCh := make(chan struct {
    msgType int
    msg     []byte
    err     error
})

go func() {
    mt, msg, err := c.Conn.ReadMessage()
    readCh <- struct {
        msgType int
        msg     []byte
        err     error
    }{mt, msg, err}
}()

select {
case <-c.ctx.Done():
    c.Conn.Close() // 中断阻塞
    return c.ctx.Err()
case <-c.Session.ctx.Done():
    c.Conn.Close()
    return nil
case r := <-readCh:
    if r.err != nil {
        return r.err
    }
    // 处理 r.msg
}

这样就不会被阻塞的 ReadMessage() 卡住了。


方法三:使用 SetReadDeadline + Close

最简单、也是 Go WebSocket 通常做法:

c.Conn.SetReadDeadline(time.Now().Add(time.Second)) // 每秒检查
for {
    select {
    case <-c.ctx.Done():
        c.Conn.Close() // 立即中断 ReadMessage
        return c.ctx.Err()
    default:
        _, _, err := c.Conn.ReadMessage()
        if err != nil {
            return err
        }
    }
}
  • 缺点:会触发很多超时,需要合理设置间隔

✅ 总结

  • 阻塞在 ReadMessage() 时,select <-ctx.Done() 不会立即生效

  • 必须

    1. 调用 Conn.Close() 来中断阻塞
    2. 或使用 SetReadDeadline + PongHandler 配合心跳
  • 直接依赖 <-ctx.Done() 不足以立即退出


如果你需要,我可以帮你改写你的 ReadPump(),做到:

  • c.ctx.Done()c.Session.ctx.Done() 调用时立即退出
  • 阻塞的 ReadMessage() 可以安全中断
  • 保证 WritePump/ReadPump 与 Hub 协同安全退出

你希望我帮你写吗?