package recording import ( "database/sql" "encoding/json" "log/slog" "strconv" "sync" "time" "github.com/gofiber/websocket/v2" "github.com/klauspost/compress/zstd" ) // PlayerCmd is a client→server playback control message. type PlayerCmd struct { Cmd string `json:"cmd"` Ms int64 `json:"ms,omitempty"` Value float64 `json:"value,omitempty"` } // Player streams a single recording over a websocket with seek/pause/speed // controls. Frames are streamed from SQLite in a sliding window so memory use // is bounded. type Player struct { db *sql.DB conn *websocket.Conn recordingID int64 totalFrames int durationMs int64 dec *zstd.Decoder mu sync.Mutex speed float64 playing bool positionMs int64 cmdCh chan PlayerCmd stopCh chan struct{} } // NewPlayer constructs a Player bound to a websocket connection. func NewPlayer(db *sql.DB, conn *websocket.Conn, id int64) (*Player, error) { dec, err := zstd.NewReader(nil) if err != nil { return nil, err } return &Player{ db: db, conn: conn, recordingID: id, dec: dec, speed: 1.0, playing: true, cmdCh: make(chan PlayerCmd, 16), stopCh: make(chan struct{}), }, nil } func (p *Player) sendJSON(v any) error { b, err := json.Marshal(v) if err != nil { return err } return p.conn.WriteMessage(websocket.TextMessage, b) } func (p *Player) sendErr(msg string) { _ = p.sendJSON(map[string]any{"error": msg}) } func (p *Player) sendState() { p.mu.Lock() playing, speed, pos := p.playing, p.speed, p.positionMs p.mu.Unlock() _ = p.sendJSON(map[string]any{ "type": "playback_state", "playing": playing, "speed": speed, "position_ms": pos, }) } // Run executes the playback loop. It returns when playback ends, the client // stops it, or the connection drops. func (p *Player) Run() { defer p.dec.Close() var name string if err := p.db.QueryRow(`SELECT name FROM recordings WHERE id = ?`, p.recordingID).Scan(&name); err != nil { p.sendErr("recording not found") return } if err := p.db.QueryRow(`SELECT COUNT(*), COALESCE(MAX(offset_ms),0) FROM frames WHERE recording_id = ?`, p.recordingID). Scan(&p.totalFrames, &p.durationMs); err != nil { p.sendErr("failed to read recording") return } if p.totalFrames == 0 { p.sendErr("no frames in recording") return } _ = p.sendJSON(map[string]any{ "type": "playback_start", "name": name, "frames": p.totalFrames, "duration_ms": p.durationMs, }) go p.readCommands() const prefetch = 256 type frame struct { ms int64 data []byte codec int } var buf []frame cursor := int64(-1) frameIdx := 0 refill := func(fromMs int64) error { buf = buf[:0] rows, err := p.db.Query( `SELECT offset_ms, data, codec FROM frames WHERE recording_id = ? AND offset_ms >= ? ORDER BY offset_ms LIMIT ?`, p.recordingID, fromMs, prefetch) if err != nil { return err } defer rows.Close() for rows.Next() { var f frame if err := rows.Scan(&f.ms, &f.data, &f.codec); err != nil { continue } buf = append(buf, f) } return nil } doSeek := func(targetMs int64) { if targetMs < 0 { targetMs = 0 } if targetMs > p.durationMs { targetMs = p.durationMs } p.mu.Lock() p.positionMs = targetMs p.mu.Unlock() cursor = targetMs - 1 buf = buf[:0] var idx int if err := p.db.QueryRow( `SELECT COUNT(*) FROM frames WHERE recording_id = ? AND offset_ms < ?`, p.recordingID, targetMs).Scan(&idx); err == nil { frameIdx = idx } p.sendState() } if err := refill(0); err != nil { p.sendErr("read error") return } p.sendState() anchorWall := time.Now() anchorPos := int64(0) timer := time.NewTimer(time.Hour) defer timer.Stop() armTimer := func() { p.mu.Lock() playing, speed, pos := p.playing, p.speed, p.positionMs p.mu.Unlock() if !playing || len(buf) == 0 { if !timer.Stop() { select { case <-timer.C: default: } } timer.Reset(time.Hour) return } next := buf[0] delta := time.Duration(float64(next.ms-pos)/speed) * time.Millisecond if delta < 0 { delta = 0 } if !timer.Stop() { select { case <-timer.C: default: } } timer.Reset(delta) } armTimer() stateBroadcast := time.NewTicker(250 * time.Millisecond) defer stateBroadcast.Stop() for { select { case <-p.stopCh: return case cmd := <-p.cmdCh: switch cmd.Cmd { case "play": p.mu.Lock() if !p.playing { p.playing = true anchorWall = time.Now() anchorPos = p.positionMs } p.mu.Unlock() p.sendState() armTimer() case "pause": p.mu.Lock() if p.playing { elapsed := time.Since(anchorWall) p.positionMs = anchorPos + int64(float64(elapsed.Milliseconds())*p.speed) if p.positionMs > p.durationMs { p.positionMs = p.durationMs } p.playing = false } p.mu.Unlock() p.sendState() armTimer() case "speed": p.mu.Lock() if cmd.Value > 0 && cmd.Value <= 16 { if p.playing { elapsed := time.Since(anchorWall) p.positionMs = anchorPos + int64(float64(elapsed.Milliseconds())*p.speed) } p.speed = cmd.Value anchorWall = time.Now() anchorPos = p.positionMs } p.mu.Unlock() p.sendState() armTimer() case "seek": doSeek(cmd.Ms) if err := refill(cmd.Ms); err != nil { p.sendErr("read error") return } p.mu.Lock() anchorWall = time.Now() anchorPos = p.positionMs p.mu.Unlock() armTimer() case "stop": return } case <-stateBroadcast.C: p.mu.Lock() if p.playing { elapsed := time.Since(anchorWall) p.positionMs = anchorPos + int64(float64(elapsed.Milliseconds())*p.speed) if p.positionMs > p.durationMs { p.positionMs = p.durationMs } } p.mu.Unlock() p.sendState() case <-timer.C: p.mu.Lock() if !p.playing { p.mu.Unlock() armTimer() continue } elapsed := time.Since(anchorWall) pos := anchorPos + int64(float64(elapsed.Milliseconds())*p.speed) p.positionMs = pos p.mu.Unlock() for len(buf) > 0 && buf[0].ms <= p.positionMs { f := buf[0] buf = buf[1:] data := f.data if f.codec == 1 { out, err := p.dec.DecodeAll(f.data, nil) if err != nil { slog.Warn("frame decode failed", "err", err) continue } data = out } frameIdx++ msg := append([]byte(nil), `{"type":"playback_frame","offset_ms":`...) msg = strconv.AppendInt(msg, f.ms, 10) msg = append(msg, `,"frame":`...) msg = strconv.AppendInt(msg, int64(frameIdx), 10) msg = append(msg, `,"data":`...) msg = append(msg, data...) msg = append(msg, '}') if err := p.conn.WriteMessage(websocket.TextMessage, msg); err != nil { return } cursor = f.ms } if len(buf) == 0 { if err := refill(cursor + 1); err != nil { p.sendErr("read error") return } if len(buf) == 0 { _ = p.sendJSON(map[string]any{"type": "playback_end"}) return } } armTimer() } } } func (p *Player) readCommands() { for { _, msg, err := p.conn.ReadMessage() if err != nil { close(p.stopCh) return } var cmd PlayerCmd if err := json.Unmarshal(msg, &cmd); err != nil { continue } select { case p.cmdCh <- cmd: default: } } }