Go WebSocket 生产级模板

最后发布时间:2026-02-04 14:41:52 浏览量:

好,我帮你写一个 可直接运行的 Go WebSocket 生产级模板,支持以下功能:

  • 多客户端管理(注册、注销)
  • 广播消息
  • 每个客户端独立读写 goroutine
  • ping/pong 心跳检测
  • 异步消息发送,防止阻塞
  • 简单日志打印

下面是完整示例:

package main

import (
    "log"
    "net/http"
    "time"

    "github.com/gorilla/websocket"
)

// ==================== WebSocket 客户端 ====================
type Client struct {
    hub  *Hub
    conn *websocket.Conn
    send chan []byte
}

func (c *Client) readPump() {
    defer func() {
        c.hub.unregister <- c
        c.conn.Close()
    }()
    c.conn.SetReadLimit(512)
    c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
    c.conn.SetPongHandler(func(string) error {
        c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
        return nil
    })

    for {
        _, message, err := c.conn.ReadMessage()
        if err != nil {
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
                log.Printf("read error: %v", err)
            }
            break
        }
        // 广播收到的消息
        c.hub.broadcast <- message
    }
}

func (c *Client) writePump() {
    ticker := time.NewTicker(50 * time.Second)
    defer func() {
        ticker.Stop()
        c.conn.Close()
    }()

    for {
        select {
        case message, ok := <-c.send:
            c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if !ok {
                // Hub 关闭 channel
                c.conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }
            if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
                log.Println("write error:", err)
                return
            }
        case <-ticker.C:
            c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

// ==================== Hub 管理 ====================
type Hub struct {
    clients    map[*Client]bool
    broadcast  chan []byte
    register   chan *Client
    unregister chan *Client
}

func newHub() *Hub {
    return &Hub{
        clients:    make(map[*Client]bool),
        broadcast:  make(chan []byte),
        register:   make(chan *Client),
        unregister: make(chan *Client),
    }
}

func (h *Hub) run() {
    for {
        select {
        case client := <-h.register:
            h.clients[client] = true
            log.Println("新客户端加入,当前在线:", len(h.clients))
        case client := <-h.unregister:
            if _, ok := h.clients[client]; ok {
                delete(h.clients, client)
                close(client.send)
                log.Println("客户端离开,当前在线:", len(h.clients))
            }
        case message := <-h.broadcast:
            for client := range h.clients {
                select {
                case client.send <- message:
                default:
                    close(client.send)
                    delete(h.clients, client)
                }
            }
        }
    }
}

// ==================== HTTP/WebSocket ====================
var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool {
        return true // 生产中可做跨域或 token 检查
    },
}

func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println("升级 WebSocket 失败:", err)
        return
    }
    client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
    client.hub.register <- client

    go client.writePump()
    go client.readPump()
}

func main() {
    hub := newHub()
    go hub.run()

    http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        serveWs(hub, w, r)
    })

    log.Println("WebSocket 服务器启动: :8080")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatal("ListenAndServe:", err)
    }
}

✅ 特性说明

  1. Hub 管理所有客户端
  • register / unregister / broadcast 统一管理。
  1. 每个客户端独立读写 goroutine
  • readPump 负责接收消息并广播。
  • writePump 负责发送消息和心跳 ping。
  1. 心跳检测
  • writePump 定时发送 ping。
  • readPump pong 自动刷新 readDeadline。
  1. 非阻塞发送
  • client.send 使用 buffered channel 防止发送阻塞。
  1. 安全退出
  • 客户端异常断开,Hub 自动注销并关闭 channel。

这个模板可以直接用作生产基础。

我可以帮你进一步升级,做 分布式支持 + Redis Pub/Sub 跨实例广播 + 限流,这样就可以支撑高并发和集群环境。

你想我帮你加上这个功能吗?

快捷入口
Go 思维导图 浏览PDF 下载PDF
分享到:
标签