// Package hub fans out live UDP frames to connected WebSocket clients. // // A slow or stalled client cannot block the broadcast path: each client owns // a buffered send channel and is dropped if the channel fills up. package hub import ( "encoding/json" "log/slog" "sync" "sync/atomic" "github.com/gofiber/websocket/v2" ) // Client is a connected live websocket viewer with its own send queue. type Client struct { conn *websocket.Conn send chan []byte closed atomic.Bool } func (c *Client) close() { if c.closed.CompareAndSwap(false, true) { close(c.send) _ = c.conn.Close() } } // Addr returns the client's remote address as a string. func (c *Client) Addr() string { return c.conn.RemoteAddr().String() } // Hub fans out the latest state to all connected live clients. type Hub struct { mu sync.RWMutex clients map[*Client]struct{} latest atomic.Pointer[[]byte] // most recent broadcast frame, for bootstrapping new clients events chan []byte // out-of-band server→client events } // New constructs a Hub and starts its event pump. func New() *Hub { h := &Hub{ clients: make(map[*Client]struct{}), events: make(chan []byte, 32), } go h.eventPump() return h } func (h *Hub) eventPump() { for msg := range h.events { h.Broadcast(msg) } } // PublishEvent JSON-encodes payload and best-effort fans it out to all clients. func (h *Hub) PublishEvent(payload any) { b, err := json.Marshal(payload) if err != nil { return } select { case h.events <- b: default: } } // Add registers a new client and immediately sends the latest snapshot. func (h *Hub) Add(conn *websocket.Conn) *Client { cl := &Client{conn: conn, send: make(chan []byte, 64)} go cl.writer() h.mu.Lock() h.clients[cl] = struct{}{} h.mu.Unlock() if latest := h.latest.Load(); latest != nil { select { case cl.send <- *latest: default: } } return cl } // Remove unregisters and closes a client. func (h *Hub) Remove(cl *Client) { h.mu.Lock() delete(h.clients, cl) h.mu.Unlock() cl.close() } // Broadcast publishes data with non-blocking sends; clients whose queues are // full are dropped. func (h *Hub) Broadcast(data []byte) { cp := make([]byte, len(data)) copy(cp, data) h.latest.Store(&cp) var drop []*Client h.mu.RLock() for cl := range h.clients { select { case cl.send <- data: default: drop = append(drop, cl) } } h.mu.RUnlock() if len(drop) == 0 { return } h.mu.Lock() for _, cl := range drop { delete(h.clients, cl) } h.mu.Unlock() for _, cl := range drop { slog.Warn("dropping slow client", "addr", cl.Addr()) cl.close() } } // Count returns the number of connected live clients. func (h *Hub) Count() int { h.mu.RLock() defer h.mu.RUnlock() return len(h.clients) } // writer drains the client's send queue, writing frames to its websocket. func (c *Client) writer() { for msg := range c.send { if err := c.conn.WriteMessage(websocket.TextMessage, msg); err != nil { c.closed.Store(true) _ = c.conn.Close() for range c.send { } return } } }