Files
fdestefano d5e65fbb03
release / goreleaser (push) Failing after 51s
Release v1.0.0 #major
2026-06-02 23:12:36 -05:00

146 lines
3.0 KiB
Go

// Package hub fans out live UDP frames to connected WebSocket clients.
//
// A slow or stalled client cannot block the broadcast path: each client owns
// a buffered send channel and is dropped if the channel fills up.
package hub
import (
"encoding/json"
"log/slog"
"sync"
"sync/atomic"
"github.com/gofiber/websocket/v2"
)
// Client is a connected live websocket viewer with its own send queue.
type Client struct {
conn *websocket.Conn
send chan []byte
closed atomic.Bool
}
func (c *Client) close() {
if c.closed.CompareAndSwap(false, true) {
close(c.send)
_ = c.conn.Close()
}
}
// Addr returns the client's remote address as a string.
func (c *Client) Addr() string { return c.conn.RemoteAddr().String() }
// Hub fans out the latest state to all connected live clients.
type Hub struct {
mu sync.RWMutex
clients map[*Client]struct{}
latest atomic.Pointer[[]byte] // most recent broadcast frame, for bootstrapping new clients
events chan []byte // out-of-band server→client events
}
// New constructs a Hub and starts its event pump.
func New() *Hub {
h := &Hub{
clients: make(map[*Client]struct{}),
events: make(chan []byte, 32),
}
go h.eventPump()
return h
}
func (h *Hub) eventPump() {
for msg := range h.events {
h.Broadcast(msg)
}
}
// PublishEvent JSON-encodes payload and best-effort fans it out to all clients.
func (h *Hub) PublishEvent(payload any) {
b, err := json.Marshal(payload)
if err != nil {
return
}
select {
case h.events <- b:
default:
}
}
// Add registers a new client and immediately sends the latest snapshot.
func (h *Hub) Add(conn *websocket.Conn) *Client {
cl := &Client{conn: conn, send: make(chan []byte, 64)}
go cl.writer()
h.mu.Lock()
h.clients[cl] = struct{}{}
h.mu.Unlock()
if latest := h.latest.Load(); latest != nil {
select {
case cl.send <- *latest:
default:
}
}
return cl
}
// Remove unregisters and closes a client.
func (h *Hub) Remove(cl *Client) {
h.mu.Lock()
delete(h.clients, cl)
h.mu.Unlock()
cl.close()
}
// Broadcast publishes data with non-blocking sends; clients whose queues are
// full are dropped.
func (h *Hub) Broadcast(data []byte) {
cp := make([]byte, len(data))
copy(cp, data)
h.latest.Store(&cp)
var drop []*Client
h.mu.RLock()
for cl := range h.clients {
select {
case cl.send <- data:
default:
drop = append(drop, cl)
}
}
h.mu.RUnlock()
if len(drop) == 0 {
return
}
h.mu.Lock()
for _, cl := range drop {
delete(h.clients, cl)
}
h.mu.Unlock()
for _, cl := range drop {
slog.Warn("dropping slow client", "addr", cl.Addr())
cl.close()
}
}
// Count returns the number of connected live clients.
func (h *Hub) Count() int {
h.mu.RLock()
defer h.mu.RUnlock()
return len(h.clients)
}
// writer drains the client's send queue, writing frames to its websocket.
func (c *Client) writer() {
for msg := range c.send {
if err := c.conn.WriteMessage(websocket.TextMessage, msg); err != nil {
c.closed.Store(true)
_ = c.conn.Close()
for range c.send {
}
return
}
}
}