// Package hub fans out live UDP frames to connected WebSocket clients.
//
// A slow or stalled client cannot block the broadcast path: each client owns
// a buffered send channel and is dropped if the channel fills up.
package hub
|
|
|
|
import (
|
|
"encoding/json"
|
|
"log/slog"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/gofiber/websocket/v2"
|
|
)
|
|
|
|
// Client is a connected live websocket viewer with its own send queue.
|
|
type Client struct {
|
|
conn *websocket.Conn
|
|
send chan []byte
|
|
closed atomic.Bool
|
|
}
|
|
|
|
func (c *Client) close() {
|
|
if c.closed.CompareAndSwap(false, true) {
|
|
close(c.send)
|
|
_ = c.conn.Close()
|
|
}
|
|
}
|
|
|
|
// Addr returns the client's remote address as a string.
|
|
func (c *Client) Addr() string { return c.conn.RemoteAddr().String() }
|
|
|
|
// Hub fans out the latest state to all connected live clients.
|
|
type Hub struct {
|
|
mu sync.RWMutex
|
|
clients map[*Client]struct{}
|
|
latest atomic.Pointer[[]byte] // most recent broadcast frame, for bootstrapping new clients
|
|
events chan []byte // out-of-band server→client events
|
|
}
|
|
|
|
// New constructs a Hub and starts its event pump.
|
|
func New() *Hub {
|
|
h := &Hub{
|
|
clients: make(map[*Client]struct{}),
|
|
events: make(chan []byte, 32),
|
|
}
|
|
go h.eventPump()
|
|
return h
|
|
}
|
|
|
|
func (h *Hub) eventPump() {
|
|
for msg := range h.events {
|
|
h.Broadcast(msg)
|
|
}
|
|
}
|
|
|
|
// PublishEvent JSON-encodes payload and best-effort fans it out to all clients.
|
|
func (h *Hub) PublishEvent(payload any) {
|
|
b, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return
|
|
}
|
|
select {
|
|
case h.events <- b:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// Add registers a new client and immediately sends the latest snapshot.
|
|
func (h *Hub) Add(conn *websocket.Conn) *Client {
|
|
cl := &Client{conn: conn, send: make(chan []byte, 64)}
|
|
go cl.writer()
|
|
|
|
h.mu.Lock()
|
|
h.clients[cl] = struct{}{}
|
|
h.mu.Unlock()
|
|
|
|
if latest := h.latest.Load(); latest != nil {
|
|
select {
|
|
case cl.send <- *latest:
|
|
default:
|
|
}
|
|
}
|
|
return cl
|
|
}
|
|
|
|
// Remove unregisters and closes a client.
|
|
func (h *Hub) Remove(cl *Client) {
|
|
h.mu.Lock()
|
|
delete(h.clients, cl)
|
|
h.mu.Unlock()
|
|
cl.close()
|
|
}
|
|
|
|
// Broadcast publishes data with non-blocking sends; clients whose queues are
|
|
// full are dropped.
|
|
func (h *Hub) Broadcast(data []byte) {
|
|
cp := make([]byte, len(data))
|
|
copy(cp, data)
|
|
h.latest.Store(&cp)
|
|
|
|
var drop []*Client
|
|
h.mu.RLock()
|
|
for cl := range h.clients {
|
|
select {
|
|
case cl.send <- data:
|
|
default:
|
|
drop = append(drop, cl)
|
|
}
|
|
}
|
|
h.mu.RUnlock()
|
|
|
|
if len(drop) == 0 {
|
|
return
|
|
}
|
|
h.mu.Lock()
|
|
for _, cl := range drop {
|
|
delete(h.clients, cl)
|
|
}
|
|
h.mu.Unlock()
|
|
for _, cl := range drop {
|
|
slog.Warn("dropping slow client", "addr", cl.Addr())
|
|
cl.close()
|
|
}
|
|
}
|
|
|
|
// Count returns the number of connected live clients.
|
|
func (h *Hub) Count() int {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
return len(h.clients)
|
|
}
|
|
|
|
// writer drains the client's send queue, writing frames to its websocket.
|
|
func (c *Client) writer() {
|
|
for msg := range c.send {
|
|
if err := c.conn.WriteMessage(websocket.TextMessage, msg); err != nil {
|
|
c.closed.Store(true)
|
|
_ = c.conn.Close()
|
|
for range c.send {
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|