好,我帮你写一个可直接运行的 Go WebSocket 生产级模板,支持以下功能:Hub 统一管理连接的注册与注销、消息广播、ping/pong 心跳保活,以及带缓冲的发送队列。
下面是完整示例:
package main
import (
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
)
// ==================== WebSocket client ====================

// Client ties a single WebSocket connection to the Hub that tracks it.
type Client struct {
	// hub is the Hub this client unregisters from and broadcasts through.
	hub *Hub

	// conn is the underlying WebSocket connection.
	conn *websocket.Conn

	// send queues outbound messages. writePump drains it; the Hub closes it
	// when the client is removed.
	send chan []byte
}
// readPump reads messages from the WebSocket connection and forwards each one
// to the hub's broadcast channel until a read fails.
//
// Incoming messages are limited to 512 bytes, and every read must complete
// before a 60-second deadline that is pushed forward each time a pong arrives.
// On exit the client is sent to the hub's unregister channel and the
// connection is closed.
func (c *Client) readPump() {
	const (
		maxMessageSize = 512
		pongWait       = 60 * time.Second
	)

	defer func() {
		c.hub.unregister <- c
		c.conn.Close()
	}()

	// extendDeadline gives the peer another pongWait to produce traffic.
	extendDeadline := func() {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
	}

	c.conn.SetReadLimit(maxMessageSize)
	extendDeadline()
	c.conn.SetPongHandler(func(string) error {
		extendDeadline()
		return nil
	})

	for {
		_, payload, readErr := c.conn.ReadMessage()
		if readErr == nil {
			// Hand the received message to the hub for broadcasting.
			c.hub.broadcast <- payload
			continue
		}

		// Log only the close errors the library reports as unexpected, i.e.
		// anything other than going-away or abnormal-closure.
		if websocket.IsUnexpectedCloseError(readErr, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
			log.Printf("read error: %v", readErr)
		}
		return
	}
}
// writePump delivers queued messages from c.send to the WebSocket connection
// and sends a ping every 50 seconds.
//
// Every write is given a 10-second deadline. The pump stops when a write
// fails or when c.send is closed; in the latter case it first sends a close
// frame. On exit the ticker is stopped and the connection is closed.
func (c *Client) writePump() {
	const (
		pingInterval = 50 * time.Second
		writeWait    = 10 * time.Second
	)

	heartbeat := time.NewTicker(pingInterval)
	defer func() {
		heartbeat.Stop()
		c.conn.Close()
	}()

	// deliver arms the write deadline and then writes a single frame.
	deliver := func(messageType int, data []byte) error {
		c.conn.SetWriteDeadline(time.Now().Add(writeWait))
		return c.conn.WriteMessage(messageType, data)
	}

	for {
		select {
		case payload, open := <-c.send:
			if !open {
				// The hub closed the channel: say goodbye (best effort) and stop.
				deliver(websocket.CloseMessage, []byte{})
				return
			}
			if err := deliver(websocket.TextMessage, payload); err != nil {
				log.Println("write error:", err)
				return
			}

		case <-heartbeat.C:
			if err := deliver(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}
// ==================== Hub management ====================

// Hub keeps the set of connected clients and the channels through which that
// set is changed and messages are fanned out.
type Hub struct {
	// clients is the set of currently registered clients.
	clients map[*Client]bool

	// broadcast carries messages to be offered to every registered client.
	broadcast chan []byte

	// register and unregister carry clients to be added to or removed from
	// the set.
	register, unregister chan *Client
}
// newHub returns a Hub with an empty client set and unbuffered broadcast,
// register and unregister channels.
func newHub() *Hub {
	h := new(Hub)
	h.clients = map[*Client]bool{}
	h.broadcast = make(chan []byte)
	h.register = make(chan *Client)
	h.unregister = make(chan *Client)
	return h
}
// run is the hub's event loop; it never returns.
//
// Each iteration handles exactly one event:
//   - register: the client is added to the set.
//   - unregister: a known client is removed and its send channel is closed.
//   - broadcast: the message is offered to every client without blocking; a
//     client whose send channel cannot accept it is closed and removed.
func (h *Hub) run() {
	for {
		select {
		case joining := <-h.register:
			h.clients[joining] = true
			log.Println("新客户端加入,当前在线:", len(h.clients))

		case leaving := <-h.unregister:
			// A client already dropped by the broadcast branch has had its
			// send channel closed; skipping it here avoids a second close.
			if _, known := h.clients[leaving]; !known {
				continue
			}
			delete(h.clients, leaving)
			close(leaving.send)
			log.Println("客户端离开,当前在线:", len(h.clients))

		case payload := <-h.broadcast:
			for peer := range h.clients {
				select {
				case peer.send <- payload:
				default:
					// The send queue cannot take the message: drop the client.
					close(peer.send)
					delete(h.clients, peer)
				}
			}
		}
	}
}
// ==================== HTTP / WebSocket ====================

// upgrader turns HTTP requests into WebSocket connections using 1 KiB read
// and write buffers.
//
// NOTE(review): CheckOrigin accepts every origin, so any web page can open a
// socket to this server from a visitor's browser. Add an origin allow-list or
// token check here before exposing this in production.
var upgrader = websocket.Upgrader{
	CheckOrigin:     func(*http.Request) bool { return true },
	ReadBufferSize:  1 << 10,
	WriteBufferSize: 1 << 10,
}
// serveWs upgrades the request to a WebSocket connection, registers a new
// Client for it with hub, and starts that client's write and read pumps in
// their own goroutines.
//
// If the upgrade fails, the error is logged and nothing is registered.
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
	wsConn, upgradeErr := upgrader.Upgrade(w, r, nil)
	if upgradeErr != nil {
		log.Println("升级 WebSocket 失败:", upgradeErr)
		return
	}

	c := &Client{
		hub:  hub,
		conn: wsConn,
		send: make(chan []byte, 256), // room for 256 queued outbound messages
	}

	// Registration blocks until the hub's run loop accepts the client.
	hub.register <- c

	go c.writePump()
	go c.readPump()
}
// main starts the hub's event loop, exposes the WebSocket endpoint at /ws on
// the default mux, and serves HTTP on :8080 until the server fails.
func main() {
	hub := newHub()
	go hub.run()

	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		serveWs(hub, w, r)
	})

	// Use an explicit http.Server rather than http.ListenAndServe so a header
	// read timeout can be set; without one a client can hold a connection
	// open indefinitely by trickling request headers. Only the header phase
	// is bounded, so established WebSocket connections are unaffected.
	srv := &http.Server{
		Addr:              ":8080",
		ReadHeaderTimeout: 10 * time.Second,
	}

	log.Println("WebSocket 服务器启动: :8080")
	if err := srv.ListenAndServe(); err != nil {
		// log.Fatal adds no space between a string operand and the error,
		// so the separator has to be part of the label.
		log.Fatal("ListenAndServe: ", err)
	}
}
要点说明:Hub 通过 register / unregister / broadcast 三个 channel 统一管理客户端。readPump 负责接收消息并广播,每次收到 pong 时会自动刷新 readDeadline。writePump 负责发送消息,并定时发送心跳 ping。client.send 使用 buffered channel,防止发送阻塞。这个模板可以直接用作生产基础(上线前请先在 CheckOrigin 中补上跨域或 token 校验)。
我可以帮你进一步升级:加入分布式支持、基于 Redis Pub/Sub 的跨实例广播以及限流,这样就可以支撑高并发和集群环境。
你想我帮你加上这个功能吗?