343 lines
7.2 KiB
Go
343 lines
7.2 KiB
Go
package recording
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"log/slog"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gofiber/websocket/v2"
|
|
"github.com/klauspost/compress/zstd"
|
|
)
|
|
|
|
// PlayerCmd is a playback control message sent by the client to the server
// as a JSON text frame.
type PlayerCmd struct {
	// Cmd names the requested action; Run handles "play", "pause", "speed",
	// "seek" and "stop".
	Cmd string `json:"cmd"`
	// Ms is the target offset in milliseconds for a "seek" command.
	Ms int64 `json:"ms,omitempty"`
	// Value is the requested playback rate for a "speed" command.
	Value float64 `json:"value,omitempty"`
}
|
|
|
|
// Player streams a single recording over a websocket with seek/pause/speed
|
|
// controls. Frames are streamed from SQLite in a sliding window so memory use
|
|
// is bounded.
|
|
type Player struct {
|
|
db *sql.DB
|
|
conn *websocket.Conn
|
|
recordingID int64
|
|
totalFrames int
|
|
durationMs int64
|
|
dec *zstd.Decoder
|
|
|
|
mu sync.Mutex
|
|
speed float64
|
|
playing bool
|
|
positionMs int64
|
|
|
|
cmdCh chan PlayerCmd
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
// NewPlayer constructs a Player bound to a websocket connection.
|
|
func NewPlayer(db *sql.DB, conn *websocket.Conn, id int64) (*Player, error) {
|
|
dec, err := zstd.NewReader(nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Player{
|
|
db: db,
|
|
conn: conn,
|
|
recordingID: id,
|
|
dec: dec,
|
|
speed: 1.0,
|
|
playing: true,
|
|
cmdCh: make(chan PlayerCmd, 16),
|
|
stopCh: make(chan struct{}),
|
|
}, nil
|
|
}
|
|
|
|
func (p *Player) sendJSON(v any) error {
|
|
b, err := json.Marshal(v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return p.conn.WriteMessage(websocket.TextMessage, b)
|
|
}
|
|
|
|
// sendErr reports msg to the client as {"error": msg}. Delivery is best
// effort: a failed write is ignored.
func (p *Player) sendErr(msg string) {
	report := map[string]any{"error": msg}
	_ = p.sendJSON(report)
}
|
|
|
|
func (p *Player) sendState() {
|
|
p.mu.Lock()
|
|
playing, speed, pos := p.playing, p.speed, p.positionMs
|
|
p.mu.Unlock()
|
|
_ = p.sendJSON(map[string]any{
|
|
"type": "playback_state", "playing": playing, "speed": speed, "position_ms": pos,
|
|
})
|
|
}
|
|
|
|
// Run executes the playback loop. It returns when playback ends, the client
|
|
// stops it, or the connection drops.
|
|
func (p *Player) Run() {
|
|
defer p.dec.Close()
|
|
|
|
var name string
|
|
if err := p.db.QueryRow(`SELECT name FROM recordings WHERE id = ?`, p.recordingID).Scan(&name); err != nil {
|
|
p.sendErr("recording not found")
|
|
return
|
|
}
|
|
if err := p.db.QueryRow(`SELECT COUNT(*), COALESCE(MAX(offset_ms),0) FROM frames WHERE recording_id = ?`, p.recordingID).
|
|
Scan(&p.totalFrames, &p.durationMs); err != nil {
|
|
p.sendErr("failed to read recording")
|
|
return
|
|
}
|
|
if p.totalFrames == 0 {
|
|
p.sendErr("no frames in recording")
|
|
return
|
|
}
|
|
|
|
_ = p.sendJSON(map[string]any{
|
|
"type": "playback_start", "name": name,
|
|
"frames": p.totalFrames, "duration_ms": p.durationMs,
|
|
})
|
|
|
|
go p.readCommands()
|
|
|
|
const prefetch = 256
|
|
type frame struct {
|
|
ms int64
|
|
data []byte
|
|
codec int
|
|
}
|
|
var buf []frame
|
|
cursor := int64(-1)
|
|
frameIdx := 0
|
|
|
|
refill := func(fromMs int64) error {
|
|
buf = buf[:0]
|
|
rows, err := p.db.Query(
|
|
`SELECT offset_ms, data, codec FROM frames WHERE recording_id = ? AND offset_ms >= ? ORDER BY offset_ms LIMIT ?`,
|
|
p.recordingID, fromMs, prefetch)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var f frame
|
|
if err := rows.Scan(&f.ms, &f.data, &f.codec); err != nil {
|
|
continue
|
|
}
|
|
buf = append(buf, f)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
doSeek := func(targetMs int64) {
|
|
if targetMs < 0 {
|
|
targetMs = 0
|
|
}
|
|
if targetMs > p.durationMs {
|
|
targetMs = p.durationMs
|
|
}
|
|
p.mu.Lock()
|
|
p.positionMs = targetMs
|
|
p.mu.Unlock()
|
|
cursor = targetMs - 1
|
|
buf = buf[:0]
|
|
var idx int
|
|
if err := p.db.QueryRow(
|
|
`SELECT COUNT(*) FROM frames WHERE recording_id = ? AND offset_ms < ?`,
|
|
p.recordingID, targetMs).Scan(&idx); err == nil {
|
|
frameIdx = idx
|
|
}
|
|
p.sendState()
|
|
}
|
|
|
|
if err := refill(0); err != nil {
|
|
p.sendErr("read error")
|
|
return
|
|
}
|
|
p.sendState()
|
|
|
|
anchorWall := time.Now()
|
|
anchorPos := int64(0)
|
|
|
|
timer := time.NewTimer(time.Hour)
|
|
defer timer.Stop()
|
|
|
|
armTimer := func() {
|
|
p.mu.Lock()
|
|
playing, speed, pos := p.playing, p.speed, p.positionMs
|
|
p.mu.Unlock()
|
|
|
|
if !playing || len(buf) == 0 {
|
|
if !timer.Stop() {
|
|
select {
|
|
case <-timer.C:
|
|
default:
|
|
}
|
|
}
|
|
timer.Reset(time.Hour)
|
|
return
|
|
}
|
|
next := buf[0]
|
|
delta := time.Duration(float64(next.ms-pos)/speed) * time.Millisecond
|
|
if delta < 0 {
|
|
delta = 0
|
|
}
|
|
if !timer.Stop() {
|
|
select {
|
|
case <-timer.C:
|
|
default:
|
|
}
|
|
}
|
|
timer.Reset(delta)
|
|
}
|
|
|
|
armTimer()
|
|
|
|
stateBroadcast := time.NewTicker(250 * time.Millisecond)
|
|
defer stateBroadcast.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-p.stopCh:
|
|
return
|
|
|
|
case cmd := <-p.cmdCh:
|
|
switch cmd.Cmd {
|
|
case "play":
|
|
p.mu.Lock()
|
|
if !p.playing {
|
|
p.playing = true
|
|
anchorWall = time.Now()
|
|
anchorPos = p.positionMs
|
|
}
|
|
p.mu.Unlock()
|
|
p.sendState()
|
|
armTimer()
|
|
case "pause":
|
|
p.mu.Lock()
|
|
if p.playing {
|
|
elapsed := time.Since(anchorWall)
|
|
p.positionMs = anchorPos + int64(float64(elapsed.Milliseconds())*p.speed)
|
|
if p.positionMs > p.durationMs {
|
|
p.positionMs = p.durationMs
|
|
}
|
|
p.playing = false
|
|
}
|
|
p.mu.Unlock()
|
|
p.sendState()
|
|
armTimer()
|
|
case "speed":
|
|
p.mu.Lock()
|
|
if cmd.Value > 0 && cmd.Value <= 16 {
|
|
if p.playing {
|
|
elapsed := time.Since(anchorWall)
|
|
p.positionMs = anchorPos + int64(float64(elapsed.Milliseconds())*p.speed)
|
|
}
|
|
p.speed = cmd.Value
|
|
anchorWall = time.Now()
|
|
anchorPos = p.positionMs
|
|
}
|
|
p.mu.Unlock()
|
|
p.sendState()
|
|
armTimer()
|
|
case "seek":
|
|
doSeek(cmd.Ms)
|
|
if err := refill(cmd.Ms); err != nil {
|
|
p.sendErr("read error")
|
|
return
|
|
}
|
|
p.mu.Lock()
|
|
anchorWall = time.Now()
|
|
anchorPos = p.positionMs
|
|
p.mu.Unlock()
|
|
armTimer()
|
|
case "stop":
|
|
return
|
|
}
|
|
|
|
case <-stateBroadcast.C:
|
|
p.mu.Lock()
|
|
if p.playing {
|
|
elapsed := time.Since(anchorWall)
|
|
p.positionMs = anchorPos + int64(float64(elapsed.Milliseconds())*p.speed)
|
|
if p.positionMs > p.durationMs {
|
|
p.positionMs = p.durationMs
|
|
}
|
|
}
|
|
p.mu.Unlock()
|
|
p.sendState()
|
|
|
|
case <-timer.C:
|
|
p.mu.Lock()
|
|
if !p.playing {
|
|
p.mu.Unlock()
|
|
armTimer()
|
|
continue
|
|
}
|
|
elapsed := time.Since(anchorWall)
|
|
pos := anchorPos + int64(float64(elapsed.Milliseconds())*p.speed)
|
|
p.positionMs = pos
|
|
p.mu.Unlock()
|
|
|
|
for len(buf) > 0 && buf[0].ms <= p.positionMs {
|
|
f := buf[0]
|
|
buf = buf[1:]
|
|
data := f.data
|
|
if f.codec == 1 {
|
|
out, err := p.dec.DecodeAll(f.data, nil)
|
|
if err != nil {
|
|
slog.Warn("frame decode failed", "err", err)
|
|
continue
|
|
}
|
|
data = out
|
|
}
|
|
frameIdx++
|
|
msg := append([]byte(nil), `{"type":"playback_frame","offset_ms":`...)
|
|
msg = strconv.AppendInt(msg, f.ms, 10)
|
|
msg = append(msg, `,"frame":`...)
|
|
msg = strconv.AppendInt(msg, int64(frameIdx), 10)
|
|
msg = append(msg, `,"data":`...)
|
|
msg = append(msg, data...)
|
|
msg = append(msg, '}')
|
|
if err := p.conn.WriteMessage(websocket.TextMessage, msg); err != nil {
|
|
return
|
|
}
|
|
cursor = f.ms
|
|
}
|
|
|
|
if len(buf) == 0 {
|
|
if err := refill(cursor + 1); err != nil {
|
|
p.sendErr("read error")
|
|
return
|
|
}
|
|
if len(buf) == 0 {
|
|
_ = p.sendJSON(map[string]any{"type": "playback_end"})
|
|
return
|
|
}
|
|
}
|
|
armTimer()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Player) readCommands() {
|
|
for {
|
|
_, msg, err := p.conn.ReadMessage()
|
|
if err != nil {
|
|
close(p.stopCh)
|
|
return
|
|
}
|
|
var cmd PlayerCmd
|
|
if err := json.Unmarshal(msg, &cmd); err != nil {
|
|
continue
|
|
}
|
|
select {
|
|
case p.cmdCh <- cmd:
|
|
default:
|
|
}
|
|
}
|
|
}
|