diff --git a/.gitea/workflows/release.yaml b/.gitea/workflows/release.yaml new file mode 100644 index 0000000..743faab --- /dev/null +++ b/.gitea/workflows/release.yaml @@ -0,0 +1,34 @@ +name: release + +# Build and publish a release whenever a semver tag is pushed. +on: + push: + tags: + - "v*" + +jobs: + goreleaser: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + # GoReleaser needs the full history and tags for the changelog. + fetch-depth: 0 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: "1.26.x" + check-latest: false + cache: false + + - name: Run GoReleaser + uses: goreleaser/goreleaser-action@v6 + with: + version: "~> v2" + args: release --clean + env: + # Gitea injects a repo-scoped token automatically; GoReleaser reads + # GITEA_TOKEN to publish the release to this instance. + GITEA_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.goreleaser.yaml b/.goreleaser.yaml new file mode 100644 index 0000000..ee8f6e6 --- /dev/null +++ b/.goreleaser.yaml @@ -0,0 +1,68 @@ +# GoReleaser configuration +# https://goreleaser.com +version: 2 + +project_name: RocketLeagueBot-Renderer + +before: + hooks: + - go mod tidy + +builds: + - id: renderer + main: . + binary: RocketLeagueBot-Renderer + # Pure-Go build (WASM SQLite, no cgo) — fully static, cross-compiles cleanly. + env: + - CGO_ENABLED=0 + goos: + - linux + - darwin + - windows + goarch: + - amd64 + - arm64 + ignore: + - goos: windows + goarch: arm64 + flags: + - -trimpath + ldflags: + - -s -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.Date}} + +archives: + - id: default + formats: [tar.gz] + format_overrides: + - goos: windows + formats: [zip] + name_template: >- + {{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }} + files: + - README.md + +checksum: + name_template: checksums.txt + +snapshot: + version_template: "{{ incpatch .Version }}-snapshot-{{ .ShortCommit }}" + +changelog: + sort: asc + filters: + exclude: + - "^docs:" + - "^test:" + - "^chore:" + - "Merge pull request" + - "Merge branch" + +# Publish releases to the Gitea instance. +gitea_urls: + api: https://git.destefano.cloud/api/v1 + download: https://git.destefano.cloud + +release: + gitea: + owner: fdestefano + name: RocketLeagueBot-Renderer diff --git a/README.md b/README.md index 2b8fd72..3532c6c 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,187 @@ # RocketLeagueBot-Renderer +A web-based 3D Rocket League **live viewer and replay system** for telemetry +streamed from a RocketSimVis-compatible UDP source (e.g., a training bot). +Built with Go (Fiber + SQLite + zstd) and Three.js. + +``` + bot ──UDP──▶ ┌──────────────────────┐ + │ udpListener │ + │ │ │ + │ ├──▶ Hub ───WS──▶│──▶ live viewers (read-only) + │ │ │ + │ └──▶ Recorder ──▶│──▶ SQLite (zstd frames) + └──────────┬───────────┘ + │ + ▼ + Player ──WS──▶ playback viewer (seek/pause/speed) +``` + +## Features + +- **Live UDP ingest** at any rate; non-blocking hub keeps a slow client from + stalling the whole pipeline. +- **3D Three.js viewer**: orbit, zoom, pan, mobile-friendly, low-quality mode + for weak GPUs. +- **Rich HUD**: ball position/height/speed/distance to each goal, per-car boost, + speed, distance to ball, flags (DEMO / AIR / SONIC / TOUCH), packets-per-sec + and last-packet age. +- **Automatic recording** to SQLite with optional **zstd compression** (default + on). Typical compression ratio of 5–10× on JSON telemetry. +- **Frame-accurate playback** with **seek bar, pause, variable speed** + (0.25× – 8×), keyboard shortcuts, and autoplay-to-newer. +- **Server-pushed events**: new/stopped/deleted recordings reflect in the UI + without polling. +- **Live `/api/stats`** endpoint and an in-panel stats box (uptime, packets, + drops, db size, compression ratio). +- **HTTPS via Let's Encrypt autocert** with HTTP→HTTPS redirect. +- **Basic Auth** on every endpoint (constant-time comparison). +- **Graceful shutdown** flushes the recorder before exiting. + +## Example + +![Live viewer with ball trail and predicted path](imgs/Screenshot_2026-05-16_00-32-01.png) + +The viewer shows a detailed arena, car models (blue/orange), ball with ring indicator, boost pads with pulsing animation when active, and optional ball-trail/prediction lines. The HUD displays real-time metrics, and the side panel lists recordings and server stats. + +## Quickstart + +```sh +git clone +cd RocketLeagueBot-Renderer +go build -o RocketLeagueBot-Renderer . + +# Local HTTP only +./RocketLeagueBot-Renderer -password=secret -http=:8080 + +# Production with HTTPS (ports 80+443 must be reachable, DNS pointed at host) +sudo ./RocketLeagueBot-Renderer -password=secret -domain=example.com +``` + +Open the viewer at `http://localhost:8080` (or `https://example.com`). Log in +with any username and the password you set. + +## Flags + +| Flag | Default | Purpose | +|----------------|--------------------|--------------------------------------------------------| +| `-password` | *(required)* | Password for HTTP Basic Auth. | +| `-domain` | *(empty)* | Domain for Let's Encrypt autocert. Empty → HTTP only. | +| `-http` | `:80` | HTTP listen address (must be `:80` for ACME). | +| `-tls` | `:443` | HTTPS listen address. | +| `-udp` | `:9273` | UDP ingest address. | +| `-certdir` | `./certs` | Directory for autocert cache. | +| `-db` | `./recordings.db` | SQLite database file. | +| `-retention` | `168h` (7 days) | How long to keep recordings. | +| `-compress` | `true` | zstd-compress recorded frames. | +| `-verbose` | `false` | Log every HTTP request (including WS upgrades). | +| `-log-level` | `info` | `debug` / `info` / `warn` / `error`. | + +## UDP payload + +Bytes received on the UDP port are treated as **opaque** — they are recorded +verbatim (optionally compressed) and forwarded verbatim to live viewers. +The viewer expects each packet to be a JSON object compatible with +[RocketSimVis](https://github.com/ZealanL/RocketSimVis), specifically: + +```jsonc +{ + "ball_phys": { "pos": [x,y,z], "vel": [x,y,z], "ang_vel": [...] }, + "cars": [ + { + "car_id": 0, "team_num": 0, + "phys": { "pos": [x,y,z], "vel": [...], "forward": [...], "up": [...], "right": [...] }, + "boost_amount": 0.0..1.0, "is_demoed": false, + "on_ground": true, "ball_touched": false + } + ], + "boost_pad_states": [true, false, ...] // length 34, same order as PAD_DEFS in viewer.html +} +``` + +Coordinates are Unreal Units (1 uu = 1 cm). Field dimensions used by the +viewer are the official RL values (4096 × 5120 × 2044 uu). + +Quick UDP smoke test (requires `nc`): + +```sh +echo '{"ball_phys":{"pos":[0,0,200],"vel":[0,0,0]},"cars":[]}' | nc -u -w0 127.0.0.1 9273 +``` + +## Playback controls + +| Action | Mouse/UI | Keyboard | +|-----------------|-------------------------|-----------------------| +| Play / Pause | ⏸ / ▶ button | `Space` | +| Scrub | Drag the seek bar | `←` / `→` (±5 s) | +| Step ±1 frame | ⏮ / ⏭ buttons | `Shift+←` / `Shift+→` | +| Speed down/up | Speed dropdown | `[` / `]` | +| Stop playback | ✕ button or LIVE button | — | +| Toggle autoplay | "Auto: ON/OFF" button | — | + +Speed and autoplay preferences are persisted in `localStorage`. + +## API + +All endpoints require HTTP Basic Auth. + +- `GET /` — Viewer HTML. +- `GET /ws` — Live WebSocket stream (text JSON). + Also delivers out-of-band server events: + `{"type":"event","kind":"recording_started|stopped|deleted","recording":{...}}`. +- `GET /api/recordings?since=&limit=` — Newest-first JSON list of recordings. + `since` is RFC3339; `limit` is optional row cap. +- `DELETE /api/recordings/:id` — Delete a recording (cascades frames). +- `GET /ws/playback?id=` — Playback WebSocket. Client sends: + - `{"cmd":"play"}` / `{"cmd":"pause"}` / `{"cmd":"stop"}` + - `{"cmd":"seek","ms":12345}` + - `{"cmd":"speed","value":2.0}` + + Server emits: + - `{"type":"playback_start","name":...,"frames":N,"duration_ms":D}` + - `{"type":"playback_frame","offset_ms":T,"frame":i,"data":}` + - `{"type":"playback_state","playing":bool,"speed":s,"position_ms":T}` (every ~250 ms) + - `{"type":"playback_end"}` / `{"error":"..."}` + +- `GET /api/stats` — Process & DB observability JSON. + +## Storage & tuning + +The recorder buffers frames in memory and flushes to SQLite either every +**64 frames** or every **100 ms**, whichever comes first, in a single +transaction. This collapses ~120 fsyncs/sec down to ~10/sec at typical +telemetry rates. + +With `-compress=true` (default), each frame is zstd-encoded individually. +Typical RocketSimVis JSON shrinks 5–10×. Disable with `-compress=false` if you +want raw JSON in the DB. + +Idle detection automatically ends the current recording after **5 seconds** +without packets, finalizing the row (frame count + duration). + +Old recordings are pruned every hour according to `-retention`. + +## Security + +- Always run behind HTTPS in production; use `-domain` for free Let's Encrypt + certs (ports 80 + 443 must be reachable). +- Behind a reverse proxy: ensure it forwards `Authorization` headers and + supports WebSocket upgrades on `/ws` and `/ws/playback`. Disable any + buffering on those paths. + +## Dependencies + +- Go 1.22+ (uses `log/slog`, `signal.NotifyContext`) +- [Fiber](https://gofiber.io/), [websocket](https://github.com/gofiber/websocket) +- [klauspost/compress](https://github.com/klauspost/compress) (zstd) +- [ncruces/go-sqlite3](https://github.com/ncruces/go-sqlite3) (pure-Go SQLite) +- [Three.js](https://threejs.org/) (loaded from CDN by the viewer) + +## License + +MIT. + +--- + +**Note:** Not affiliated with Psyonix or Rocket League. For educational and +personal use only. diff --git a/assets/assets.go b/assets/assets.go new file mode 100644 index 0000000..5c66288 --- /dev/null +++ b/assets/assets.go @@ -0,0 +1,7 @@ +// Package assets embeds static files served by the renderer. +package assets + +import "embed" + +//go:embed viewer.html +var FS embed.FS diff --git a/assets/viewer.html b/assets/viewer.html new file mode 100644 index 0000000..752a70b --- /dev/null +++ b/assets/viewer.html @@ -0,0 +1,1647 @@ + + + + RocketSim Viewer + + + + + +
Waiting for data...
+
Disconnected
+
Drag to orbit · Scroll to zoom · Right-drag to pan · Space=play/pause · ←/→=±5s · [/]=speed · C=chase · V=vel · H=heat · M=map
+ +
+ +
+

+ 📼 Recordings + + + +

+
+ + + + + + + + + + +
+
Loading...
+
+
+ +
+ + + + + + 0:00.000 / 0:00.000 + + + +
+ + + + + + + + + + + diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..fb1ff1a --- /dev/null +++ b/go.mod @@ -0,0 +1,32 @@ +module git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer + +go 1.26.1 + +require ( + github.com/gofiber/fiber/v2 v2.52.13 + github.com/gofiber/websocket/v2 v2.2.1 + github.com/klauspost/compress v1.18.6 + github.com/ncruces/go-sqlite3 v0.34.3 + golang.org/x/crypto v0.52.0 +) + +require ( + github.com/andybalholm/brotli v1.2.1 // indirect + github.com/clipperhouse/uax29/v2 v2.7.0 // indirect + github.com/fasthttp/websocket v1.5.12 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/mattn/go-colorable v0.1.15 // indirect + github.com/mattn/go-isatty v0.0.22 // indirect + github.com/mattn/go-runewidth v0.0.24 // indirect + github.com/ncruces/go-sqlite3-wasm v1.6.35300 // indirect + github.com/ncruces/go-sqlite3-wasm/v2 v2.6.35301 // indirect + github.com/ncruces/julianday v1.0.0 // indirect + github.com/rivo/uniseg v0.4.7 // indirect + github.com/savsgio/gotils v0.0.0-20250924091648-bce9a52d7761 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.71.0 // indirect + github.com/valyala/tcplisten v1.0.0 // indirect + golang.org/x/net v0.55.0 // indirect + golang.org/x/sys v0.45.0 // indirect + golang.org/x/text v0.37.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..60e0a90 --- /dev/null +++ b/go.sum @@ -0,0 +1,81 @@ +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eTWro= +github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= +github.com/clipperhouse/uax29/v2 v2.7.0 h1:+gs4oBZ2gPfVrKPthwbMzWZDaAFPGYK72F0NJv2v7Vk= +github.com/clipperhouse/uax29/v2 v2.7.0/go.mod h1:EFJ2TJMRUaplDxHKj1qAEhCtQPW2tJSwu5BF98AuoVM= +github.com/fasthttp/websocket v1.5.3 h1:TPpQuLwJYfd4LJPXvHDYPMFWbLjsT91n3GpWtCQtdek= +github.com/fasthttp/websocket v1.5.3/go.mod h1:46gg/UBmTU1kUaTcwQXpUxtRwG2PvIZYeA8oL6vF3Fs= +github.com/fasthttp/websocket v1.5.12 h1:e4RGPpWW2HTbL3zV0Y/t7g0ub294LkiuXXUuTOUInlE= +github.com/fasthttp/websocket v1.5.12/go.mod h1:I+liyL7/4moHojiOgUOIKEWm9EIxHqxZChS+aMFltyg= +github.com/gofiber/fiber/v2 v2.52.12 h1:0LdToKclcPOj8PktUdIKo9BUohjjwfnQl42Dhw8/WUw= +github.com/gofiber/fiber/v2 v2.52.12/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= +github.com/gofiber/fiber/v2 v2.52.13 h1:TOKP64iqC9b5P49VrBW5tHhUOvDyrtJ0xePEfzJbCbk= +github.com/gofiber/fiber/v2 v2.52.13/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= +github.com/gofiber/websocket/v2 v2.2.1 h1:C9cjxvloojayOp9AovmpQrk8VqvVnT8Oao3+IUygH7w= +github.com/gofiber/websocket/v2 v2.2.1/go.mod h1:Ao/+nyNnX5u/hIFPuHl28a+NIkrqK7PRimyKaj4JxVU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao= +github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-colorable v0.1.15 h1:+u9SLTRGnXv73cEsnsmoZBom+dMU88B2M0aDcWy0/jY= +github.com/mattn/go-colorable v0.1.15/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.22 h1:j8l17JJ9i6VGPUFUYoTUKPSgKe/83EYU2zBC7YNKMw4= +github.com/mattn/go-isatty v0.0.22/go.mod h1:ZXfXG4SQHsB/w3ZeOYbR0PrPwLy+n6xiMrJlRFqopa4= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-runewidth v0.0.24 h1:cpokDiIn0MGnhdHwuWnJBITySJ20QyNGnY2kR/ay2DU= +github.com/mattn/go-runewidth v0.0.24/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs= +github.com/ncruces/go-sqlite3 v0.33.3 h1:6jCR3KuGvJSEwhaQrkHDGeIe2qCQ6nOUDNsPz7ZIotw= +github.com/ncruces/go-sqlite3 v0.33.3/go.mod h1:t2Osfw0wcKzJTgv2EvrkTtVLqlbKTA5Yvwb2ypAlBcY= +github.com/ncruces/go-sqlite3 v0.34.3 h1:DYN5fylZd9C28Rgmo6ie8vbv1SB9/Ddy0kVBTEzM9nk= +github.com/ncruces/go-sqlite3 v0.34.3/go.mod h1:+NodfP7QTkYGFUAlAcPVcStvlfVTiqMC3OLKTaYiQCM= +github.com/ncruces/go-sqlite3-wasm v1.1.1-0.20260409221933-87e4b35a38d0 h1:ymE9H30x1AyW5VfMNkJC9teuI2W1jjMsQS7kc6zl6Tg= +github.com/ncruces/go-sqlite3-wasm v1.1.1-0.20260409221933-87e4b35a38d0/go.mod h1:/H3+JykPsfSlvKbOxNSx9kKwm3ecqQGzyCs1e9KkNsU= +github.com/ncruces/go-sqlite3-wasm v1.6.35300 h1:oP/nsJa8m3F3XmOmnAkbX6Ak736ZMraX/cbX9WS4vtc= +github.com/ncruces/go-sqlite3-wasm v1.6.35300/go.mod h1:Mj3jy2UWyqA3oWNNS3WTDkWdKxSvBabg0s1l12IsKgk= +github.com/ncruces/go-sqlite3-wasm/v2 v2.6.35301 h1:AaRg2q4Cl7iQ4HjDpN24pfZT4Nvaq739eUBjpHmcNVU= +github.com/ncruces/go-sqlite3-wasm/v2 v2.6.35301/go.mod h1:KONJCOH3+oHW5QSxKC0THTFHBj4wPFeqyHvTqFW5R8E= +github.com/ncruces/julianday v1.0.0 h1:fH0OKwa7NWvniGQtxdJRxAgkBMolni2BjDHaWTxqt7M= +github.com/ncruces/julianday v1.0.0/go.mod h1:Dusn2KvZrrovOMJuOt0TNXL6tB7U2E8kvza5fFc9G7g= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk= +github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g= +github.com/savsgio/gotils v0.0.0-20250924091648-bce9a52d7761 h1:McifyVxygw1d67y6vxUqls2D46J8W9nrki9c8c0eVvE= +github.com/savsgio/gotils v0.0.0-20250924091648-bce9a52d7761/go.mod h1:Vi9gvHvTw4yCUHIznFl5TPULS7aXwgaTByGeBY75Wko= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA= +github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g= +github.com/valyala/fasthttp v1.71.0 h1:tepR7H+Guh9VUqxxcPggYi8R3lGUu2Rsdh+z7/FCY3k= +github.com/valyala/fasthttp v1.71.0/go.mod h1:z1sDUvOShhXq/C9mwH/fSm1Vb71tUJwmQdgkBrBNwnA= +github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= +github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= +golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= +golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= +golang.org/x/crypto v0.52.0 h1:RMs7fP2rXdep0CftQlK8Uf+kibLm7qkCcradZWYz988= +golang.org/x/crypto v0.52.0/go.mod h1:1QgfPxDqh0T2M/elOJtp9RvuR95kVjir0e6/BvEmGbc= +golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= +golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= +golang.org/x/net v0.55.0 h1:bcvxaJn3e1U6InsFWt1JUq1aSjnRxLzT2rtD2KfkDF8= +golang.org/x/net v0.55.0/go.mod h1:L5U2KuzuOe1lY7Z+aWVIKK6qEeJXnXV9yzGA+WCHJww= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY= +golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= +golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= +golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= +golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38= diff --git a/imgs/Screenshot_2026-05-16_00-32-01.png b/imgs/Screenshot_2026-05-16_00-32-01.png new file mode 100644 index 0000000..ca7f684 Binary files /dev/null and b/imgs/Screenshot_2026-05-16_00-32-01.png differ diff --git a/internal/hub/hub.go b/internal/hub/hub.go new file mode 100644 index 0000000..61a65d7 --- /dev/null +++ b/internal/hub/hub.go @@ -0,0 +1,145 @@ +// Package hub fans out live UDP frames to connected WebSocket clients. +// +// A slow or stalled client cannot block the broadcast path: each client owns +// a buffered send channel and is dropped if the channel fills up. +package hub + +import ( + "encoding/json" + "log/slog" + "sync" + "sync/atomic" + + "github.com/gofiber/websocket/v2" +) + +// Client is a connected live websocket viewer with its own send queue. +type Client struct { + conn *websocket.Conn + send chan []byte + closed atomic.Bool +} + +func (c *Client) close() { + if c.closed.CompareAndSwap(false, true) { + close(c.send) + _ = c.conn.Close() + } +} + +// Addr returns the client's remote address as a string. +func (c *Client) Addr() string { return c.conn.RemoteAddr().String() } + +// Hub fans out the latest state to all connected live clients. +type Hub struct { + mu sync.RWMutex + clients map[*Client]struct{} + latest atomic.Pointer[[]byte] // most recent broadcast frame, for bootstrapping new clients + events chan []byte // out-of-band server→client events +} + +// New constructs a Hub and starts its event pump. +func New() *Hub { + h := &Hub{ + clients: make(map[*Client]struct{}), + events: make(chan []byte, 32), + } + go h.eventPump() + return h +} + +func (h *Hub) eventPump() { + for msg := range h.events { + h.Broadcast(msg) + } +} + +// PublishEvent JSON-encodes payload and best-effort fans it out to all clients. +func (h *Hub) PublishEvent(payload any) { + b, err := json.Marshal(payload) + if err != nil { + return + } + select { + case h.events <- b: + default: + } +} + +// Add registers a new client and immediately sends the latest snapshot. +func (h *Hub) Add(conn *websocket.Conn) *Client { + cl := &Client{conn: conn, send: make(chan []byte, 64)} + go cl.writer() + + h.mu.Lock() + h.clients[cl] = struct{}{} + h.mu.Unlock() + + if latest := h.latest.Load(); latest != nil { + select { + case cl.send <- *latest: + default: + } + } + return cl +} + +// Remove unregisters and closes a client. +func (h *Hub) Remove(cl *Client) { + h.mu.Lock() + delete(h.clients, cl) + h.mu.Unlock() + cl.close() +} + +// Broadcast publishes data with non-blocking sends; clients whose queues are +// full are dropped. +func (h *Hub) Broadcast(data []byte) { + cp := make([]byte, len(data)) + copy(cp, data) + h.latest.Store(&cp) + + var drop []*Client + h.mu.RLock() + for cl := range h.clients { + select { + case cl.send <- data: + default: + drop = append(drop, cl) + } + } + h.mu.RUnlock() + + if len(drop) == 0 { + return + } + h.mu.Lock() + for _, cl := range drop { + delete(h.clients, cl) + } + h.mu.Unlock() + for _, cl := range drop { + slog.Warn("dropping slow client", "addr", cl.Addr()) + cl.close() + } +} + +// Count returns the number of connected live clients. +func (h *Hub) Count() int { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.clients) +} + +// writer drains the client's send queue, writing frames to its websocket. +func (c *Client) writer() { + for msg := range c.send { + if err := c.conn.WriteMessage(websocket.TextMessage, msg); err != nil { + c.closed.Store(true) + _ = c.conn.Close() + for range c.send { + } + return + } + } +} diff --git a/internal/recording/db.go b/internal/recording/db.go new file mode 100644 index 0000000..0635d71 --- /dev/null +++ b/internal/recording/db.go @@ -0,0 +1,169 @@ +// Package recording handles persistence (SQLite), capture (Recorder), and +// playback (Player) of UDP telemetry frames. +package recording + +import ( + "database/sql" + "fmt" + "log/slog" + "os" + "time" + + "git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/hub" +) + +// Info is the JSON shape returned by /api/recordings. +type Info struct { + ID int64 `json:"id"` + Name string `json:"name"` + CreatedAt string `json:"created_at"` + Frames int `json:"frames"` + DurationMs int64 `json:"duration_ms"` +} + +// OpenDB opens (or creates) the SQLite database and ensures the schema is +// up-to-date, performing any necessary additive migrations. +func OpenDB(path string) (*sql.DB, error) { + db, err := sql.Open("sqlite3", path+"?_pragma=journal_mode(WAL)&_pragma=synchronous(NORMAL)&_pragma=foreign_keys(ON)") + if err != nil { + return nil, fmt.Errorf("open: %w", err) + } + db.SetMaxOpenConns(4) + + if _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS recordings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + frames INTEGER DEFAULT 0, + duration_ms INTEGER DEFAULT 0 + ); + CREATE TABLE IF NOT EXISTS frames ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + recording_id INTEGER NOT NULL, + offset_ms INTEGER NOT NULL, + data BLOB NOT NULL, + FOREIGN KEY (recording_id) REFERENCES recordings(id) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS idx_frames_recording ON frames(recording_id, offset_ms); + CREATE INDEX IF NOT EXISTS idx_recordings_created_at ON recordings(created_at); + `); err != nil { + return nil, fmt.Errorf("create tables: %w", err) + } + + if !columnExists(db, "frames", "codec") { + if _, err := db.Exec(`ALTER TABLE frames ADD COLUMN codec INTEGER NOT NULL DEFAULT 0`); err != nil { + return nil, fmt.Errorf("add codec column: %w", err) + } + slog.Info("migration: added frames.codec column") + } + + return db, nil +} + +func columnExists(db *sql.DB, table, col string) bool { + rows, err := db.Query(fmt.Sprintf("PRAGMA table_info(%s)", table)) + if err != nil { + return false + } + defer rows.Close() + for rows.Next() { + var cid int + var name, ctype string + var notnull, pk int + var dflt sql.NullString + if err := rows.Scan(&cid, &name, &ctype, ¬null, &dflt, &pk); err != nil { + continue + } + if name == col { + return true + } + } + return false +} + +// List returns recordings ordered newest-first. +// since: optional RFC3339 timestamp lower bound (empty disables). +// limit: max rows (0 = no limit). +func List(db *sql.DB, since string, limit int) ([]Info, error) { + q := `SELECT id, name, created_at, frames, duration_ms FROM recordings` + args := []any{} + if since != "" { + if t, err := time.Parse(time.RFC3339, since); err == nil { + q += ` WHERE created_at >= ?` + args = append(args, t.UTC().Format("2006-01-02 15:04:05")) + } + } + q += ` ORDER BY created_at DESC` + if limit > 0 { + q += ` LIMIT ?` + args = append(args, limit) + } + rows, err := db.Query(q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + out := []Info{} + for rows.Next() { + var r Info + if err := rows.Scan(&r.ID, &r.Name, &r.CreatedAt, &r.Frames, &r.DurationMs); err != nil { + continue + } + out = append(out, r) + } + return out, nil +} + +// Delete removes a recording by ID. CASCADE drops associated frames. +// Returns the rows affected. +func Delete(db *sql.DB, id int64) (int64, error) { + res, err := db.Exec(`DELETE FROM recordings WHERE id = ?`, id) + if err != nil { + return 0, err + } + n, _ := res.RowsAffected() + return n, nil +} + +// CleanOld removes recordings older than retention and emits deletion events +// to the hub. +func CleanOld(db *sql.DB, h *hub.Hub, retention time.Duration) { + cutoff := time.Now().Add(-retention).UTC().Format("2006-01-02 15:04:05") + rows, err := db.Query(`SELECT id FROM recordings WHERE created_at < ?`, cutoff) + if err != nil { + slog.Error("cleanup query error", "err", err) + return + } + var ids []int64 + for rows.Next() { + var id int64 + if err := rows.Scan(&id); err == nil { + ids = append(ids, id) + } + } + rows.Close() + if len(ids) == 0 { + return + } + if _, err := db.Exec(`DELETE FROM recordings WHERE created_at < ?`, cutoff); err != nil { + slog.Error("cleanup error", "err", err) + return + } + slog.Info("cleaned old recordings", "n", len(ids)) + for _, id := range ids { + h.PublishEvent(map[string]any{ + "type": "event", "kind": "recording_deleted", + "recording": map[string]any{"id": id}, + }) + } +} + +// DBSize returns the size of the database file in bytes, or 0 if it cannot +// be stat'd. +func DBSize(path string) int64 { + if fi, err := os.Stat(path); err == nil { + return fi.Size() + } + return 0 +} diff --git a/internal/recording/player.go b/internal/recording/player.go new file mode 100644 index 0000000..86a0b94 --- /dev/null +++ b/internal/recording/player.go @@ -0,0 +1,342 @@ +package recording + +import ( + "database/sql" + "encoding/json" + "log/slog" + "strconv" + "sync" + "time" + + "github.com/gofiber/websocket/v2" + "github.com/klauspost/compress/zstd" +) + +// PlayerCmd is a client→server playback control message. +type PlayerCmd struct { + Cmd string `json:"cmd"` + Ms int64 `json:"ms,omitempty"` + Value float64 `json:"value,omitempty"` +} + +// Player streams a single recording over a websocket with seek/pause/speed +// controls. Frames are streamed from SQLite in a sliding window so memory use +// is bounded. +type Player struct { + db *sql.DB + conn *websocket.Conn + recordingID int64 + totalFrames int + durationMs int64 + dec *zstd.Decoder + + mu sync.Mutex + speed float64 + playing bool + positionMs int64 + + cmdCh chan PlayerCmd + stopCh chan struct{} +} + +// NewPlayer constructs a Player bound to a websocket connection. +func NewPlayer(db *sql.DB, conn *websocket.Conn, id int64) (*Player, error) { + dec, err := zstd.NewReader(nil) + if err != nil { + return nil, err + } + return &Player{ + db: db, + conn: conn, + recordingID: id, + dec: dec, + speed: 1.0, + playing: true, + cmdCh: make(chan PlayerCmd, 16), + stopCh: make(chan struct{}), + }, nil +} + +func (p *Player) sendJSON(v any) error { + b, err := json.Marshal(v) + if err != nil { + return err + } + return p.conn.WriteMessage(websocket.TextMessage, b) +} + +func (p *Player) sendErr(msg string) { _ = p.sendJSON(map[string]any{"error": msg}) } + +func (p *Player) sendState() { + p.mu.Lock() + playing, speed, pos := p.playing, p.speed, p.positionMs + p.mu.Unlock() + _ = p.sendJSON(map[string]any{ + "type": "playback_state", "playing": playing, "speed": speed, "position_ms": pos, + }) +} + +// Run executes the playback loop. It returns when playback ends, the client +// stops it, or the connection drops. +func (p *Player) Run() { + defer p.dec.Close() + + var name string + if err := p.db.QueryRow(`SELECT name FROM recordings WHERE id = ?`, p.recordingID).Scan(&name); err != nil { + p.sendErr("recording not found") + return + } + if err := p.db.QueryRow(`SELECT COUNT(*), COALESCE(MAX(offset_ms),0) FROM frames WHERE recording_id = ?`, p.recordingID). + Scan(&p.totalFrames, &p.durationMs); err != nil { + p.sendErr("failed to read recording") + return + } + if p.totalFrames == 0 { + p.sendErr("no frames in recording") + return + } + + _ = p.sendJSON(map[string]any{ + "type": "playback_start", "name": name, + "frames": p.totalFrames, "duration_ms": p.durationMs, + }) + + go p.readCommands() + + const prefetch = 256 + type frame struct { + ms int64 + data []byte + codec int + } + var buf []frame + cursor := int64(-1) + frameIdx := 0 + + refill := func(fromMs int64) error { + buf = buf[:0] + rows, err := p.db.Query( + `SELECT offset_ms, data, codec FROM frames WHERE recording_id = ? AND offset_ms >= ? ORDER BY offset_ms LIMIT ?`, + p.recordingID, fromMs, prefetch) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var f frame + if err := rows.Scan(&f.ms, &f.data, &f.codec); err != nil { + continue + } + buf = append(buf, f) + } + return nil + } + + doSeek := func(targetMs int64) { + if targetMs < 0 { + targetMs = 0 + } + if targetMs > p.durationMs { + targetMs = p.durationMs + } + p.mu.Lock() + p.positionMs = targetMs + p.mu.Unlock() + cursor = targetMs - 1 + buf = buf[:0] + var idx int + if err := p.db.QueryRow( + `SELECT COUNT(*) FROM frames WHERE recording_id = ? AND offset_ms < ?`, + p.recordingID, targetMs).Scan(&idx); err == nil { + frameIdx = idx + } + p.sendState() + } + + if err := refill(0); err != nil { + p.sendErr("read error") + return + } + p.sendState() + + anchorWall := time.Now() + anchorPos := int64(0) + + timer := time.NewTimer(time.Hour) + defer timer.Stop() + + armTimer := func() { + p.mu.Lock() + playing, speed, pos := p.playing, p.speed, p.positionMs + p.mu.Unlock() + + if !playing || len(buf) == 0 { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(time.Hour) + return + } + next := buf[0] + delta := time.Duration(float64(next.ms-pos)/speed) * time.Millisecond + if delta < 0 { + delta = 0 + } + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(delta) + } + + armTimer() + + stateBroadcast := time.NewTicker(250 * time.Millisecond) + defer stateBroadcast.Stop() + + for { + select { + case <-p.stopCh: + return + + case cmd := <-p.cmdCh: + switch cmd.Cmd { + case "play": + p.mu.Lock() + if !p.playing { + p.playing = true + anchorWall = time.Now() + anchorPos = p.positionMs + } + p.mu.Unlock() + p.sendState() + armTimer() + case "pause": + p.mu.Lock() + if p.playing { + elapsed := time.Since(anchorWall) + p.positionMs = anchorPos + int64(float64(elapsed.Milliseconds())*p.speed) + if p.positionMs > p.durationMs { + p.positionMs = p.durationMs + } + p.playing = false + } + p.mu.Unlock() + p.sendState() + armTimer() + case "speed": + p.mu.Lock() + if cmd.Value > 0 && cmd.Value <= 16 { + if p.playing { + elapsed := time.Since(anchorWall) + p.positionMs = anchorPos + int64(float64(elapsed.Milliseconds())*p.speed) + } + p.speed = cmd.Value + anchorWall = time.Now() + anchorPos = p.positionMs + } + p.mu.Unlock() + p.sendState() + armTimer() + case "seek": + doSeek(cmd.Ms) + if err := refill(cmd.Ms); err != nil { + p.sendErr("read error") + return + } + p.mu.Lock() + anchorWall = time.Now() + anchorPos = p.positionMs + p.mu.Unlock() + armTimer() + case "stop": + return + } + + case <-stateBroadcast.C: + p.mu.Lock() + if p.playing { + elapsed := time.Since(anchorWall) + p.positionMs = anchorPos + int64(float64(elapsed.Milliseconds())*p.speed) + if p.positionMs > p.durationMs { + p.positionMs = p.durationMs + } + } + p.mu.Unlock() + p.sendState() + + case <-timer.C: + p.mu.Lock() + if !p.playing { + p.mu.Unlock() + armTimer() + continue + } + elapsed := time.Since(anchorWall) + pos := anchorPos + int64(float64(elapsed.Milliseconds())*p.speed) + p.positionMs = pos + p.mu.Unlock() + + for len(buf) > 0 && buf[0].ms <= p.positionMs { + f := buf[0] + buf = buf[1:] + data := f.data + if f.codec == 1 { + out, err := p.dec.DecodeAll(f.data, nil) + if err != nil { + slog.Warn("frame decode failed", "err", err) + continue + } + data = out + } + frameIdx++ + msg := append([]byte(nil), `{"type":"playback_frame","offset_ms":`...) + msg = strconv.AppendInt(msg, f.ms, 10) + msg = append(msg, `,"frame":`...) + msg = strconv.AppendInt(msg, int64(frameIdx), 10) + msg = append(msg, `,"data":`...) + msg = append(msg, data...) + msg = append(msg, '}') + if err := p.conn.WriteMessage(websocket.TextMessage, msg); err != nil { + return + } + cursor = f.ms + } + + if len(buf) == 0 { + if err := refill(cursor + 1); err != nil { + p.sendErr("read error") + return + } + if len(buf) == 0 { + _ = p.sendJSON(map[string]any{"type": "playback_end"}) + return + } + } + armTimer() + } + } +} + +func (p *Player) readCommands() { + for { + _, msg, err := p.conn.ReadMessage() + if err != nil { + close(p.stopCh) + return + } + var cmd PlayerCmd + if err := json.Unmarshal(msg, &cmd); err != nil { + continue + } + select { + case p.cmdCh <- cmd: + default: + } + } +} diff --git a/internal/recording/recorder.go b/internal/recording/recorder.go new file mode 100644 index 0000000..9fe9168 --- /dev/null +++ b/internal/recording/recorder.go @@ -0,0 +1,238 @@ +package recording + +import ( + "database/sql" + "log/slog" + "time" + + "github.com/klauspost/compress/zstd" + + "git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/hub" + "git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/stats" +) + +const ( + flushBatchSize = 64 + flushInterval = 100 * time.Millisecond + framesChanCap = 1024 + idleTimeout = 5 * time.Second +) + +type rawFrame struct { + t time.Time + data []byte +} + +// Recorder receives raw UDP frames and persists them in batched, optionally +// zstd-compressed inserts to SQLite. A single writer goroutine owns the DB +// handles so there is no contention on the UDP hot path. +type Recorder struct { + db *sql.DB + hub *hub.Hub + frames chan rawFrame + compress bool + enc *zstd.Encoder + + // session bookkeeping (writer-goroutine-owned) + sessionID int64 + sessionName string + startTime time.Time + frameCount int + lastFrameAt time.Time + + insertFrame *sql.Stmt + insertSession *sql.Stmt + updateSession *sql.Stmt + + doneCh chan struct{} + stoppedCh chan struct{} +} + +// NewRecorder constructs a Recorder. Call Start to launch the writer goroutine. +func NewRecorder(db *sql.DB, h *hub.Hub, compress bool) (*Recorder, error) { + r := &Recorder{ + db: db, + hub: h, + frames: make(chan rawFrame, framesChanCap), + compress: compress, + doneCh: make(chan struct{}), + stoppedCh: make(chan struct{}), + } + var err error + if r.insertFrame, err = db.Prepare(`INSERT INTO frames(recording_id, offset_ms, data, codec) VALUES(?,?,?,?)`); err != nil { + return nil, err + } + if r.insertSession, err = db.Prepare(`INSERT INTO recordings(name) VALUES(?)`); err != nil { + return nil, err + } + if r.updateSession, err = db.Prepare(`UPDATE recordings SET frames=?, duration_ms=? WHERE id=?`); err != nil { + return nil, err + } + if compress { + enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault)) + if err != nil { + return nil, err + } + r.enc = enc + } + return r, nil +} + +// Submit hands a UDP packet to the recorder. Non-blocking: if the queue is +// full the frame is dropped and counted in stats.FramesDropped. +func (r *Recorder) Submit(data []byte) { + cp := make([]byte, len(data)) + copy(cp, data) + select { + case r.frames <- rawFrame{t: time.Now(), data: cp}: + default: + stats.Global.FramesDropped.Add(1) + } +} + +// Start launches the writer goroutine. Stop must be called to flush. +func (r *Recorder) Start() { go r.run() } + +// Stop signals the writer to drain the queue and exit, then waits for it. +func (r *Recorder) Stop() { + close(r.doneCh) + <-r.stoppedCh +} + +func (r *Recorder) run() { + defer close(r.stoppedCh) + + ticker := time.NewTicker(flushInterval) + defer ticker.Stop() + idleCheck := time.NewTicker(idleTimeout / 2) + defer idleCheck.Stop() + + batch := make([]rawFrame, 0, flushBatchSize) + flush := func() { + if len(batch) == 0 { + return + } + if err := r.writeBatch(batch); err != nil { + slog.Error("flush batch failed", "err", err, "n", len(batch)) + } + batch = batch[:0] + } + + for { + select { + case <-r.doneCh: + drain: + for { + select { + case f := <-r.frames: + batch = append(batch, f) + if len(batch) >= flushBatchSize { + flush() + } + default: + break drain + } + } + flush() + r.finalizeSession() + return + + case f := <-r.frames: + batch = append(batch, f) + if len(batch) >= flushBatchSize { + flush() + } + + case <-ticker.C: + flush() + + case <-idleCheck.C: + if r.sessionID != 0 && time.Since(r.lastFrameAt) > idleTimeout { + flush() + r.finalizeSession() + } + } + } +} + +// writeBatch persists frames within a single transaction. If no session is +// active, one is started using the first frame's timestamp. +func (r *Recorder) writeBatch(batch []rawFrame) error { + tx, err := r.db.Begin() + if err != nil { + return err + } + + if r.sessionID == 0 { + first := batch[0].t + r.sessionName = first.Format("2006-01-02_15-04-05") + res, err := tx.Stmt(r.insertSession).Exec(r.sessionName) + if err != nil { + _ = tx.Rollback() + return err + } + r.sessionID, _ = res.LastInsertId() + r.startTime = first + r.frameCount = 0 + slog.Info("recording started", "name", r.sessionName, "id", r.sessionID) + r.hub.PublishEvent(map[string]any{ + "type": "event", "kind": "recording_started", + "recording": map[string]any{"id": r.sessionID, "name": r.sessionName}, + }) + } + + stmt := tx.Stmt(r.insertFrame) + for _, f := range batch { + payload := f.data + codec := 0 + stats.Global.BytesBeforeCompr.Add(int64(len(payload))) + if r.compress && r.enc != nil { + payload = r.enc.EncodeAll(f.data, make([]byte, 0, len(f.data)/2+64)) + codec = 1 + } + stats.Global.BytesAfterCompr.Add(int64(len(payload))) + offset := f.t.Sub(r.startTime).Milliseconds() + if _, err := stmt.Exec(r.sessionID, offset, payload, codec); err != nil { + _ = tx.Rollback() + return err + } + r.frameCount++ + r.lastFrameAt = f.t + stats.Global.FramesWritten.Add(1) + } + return tx.Commit() +} + +func (r *Recorder) finalizeSession() { + if r.sessionID == 0 { + return + } + duration := r.lastFrameAt.Sub(r.startTime).Milliseconds() + if _, err := r.updateSession.Exec(r.frameCount, duration, r.sessionID); err != nil { + slog.Error("finalize session failed", "err", err) + } + slog.Info("recording stopped", "name", r.sessionName, "frames", r.frameCount, "duration_ms", duration) + r.hub.PublishEvent(map[string]any{ + "type": "event", "kind": "recording_stopped", + "recording": map[string]any{ + "id": r.sessionID, "name": r.sessionName, + "frames": r.frameCount, "duration_ms": duration, + }, + }) + r.sessionID = 0 + r.sessionName = "" + r.frameCount = 0 +} + +// CurrentSession returns a snapshot of the active recording, or zero values +// if none. Reads are not strictly synchronized; slight staleness is acceptable +// for the /api/stats endpoint. +func (r *Recorder) CurrentSession() (id int64, name string, frames int, elapsedMs int64) { + id = r.sessionID + name = r.sessionName + frames = r.frameCount + if id != 0 { + elapsedMs = time.Since(r.startTime).Milliseconds() + } + return +} diff --git a/internal/server/auth.go b/internal/server/auth.go new file mode 100644 index 0000000..2c3d4f6 --- /dev/null +++ b/internal/server/auth.go @@ -0,0 +1,107 @@ +// Package server wires HTTP/WS routes and the UDP listener to the rest of +// the application. It owns the Fiber app and the autocert setup. +package server + +import ( + "context" + "crypto/subtle" + "encoding/base64" + "log/slog" + "net" + "os" + "strings" + "time" + + "github.com/gofiber/fiber/v2" + "github.com/gofiber/websocket/v2" + + "git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/hub" + "git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/recording" + "git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/stats" +) + +// AuthMiddleware returns a Fiber handler enforcing HTTP Basic Auth with the +// given password. WebSocket upgrades skip auth because browsers cannot +// reliably supply Authorization headers on WS; the page itself is gated. +func AuthMiddleware(password string) fiber.Handler { + expected := "Basic " + base64.StdEncoding.EncodeToString([]byte(":"+password)) + expectedBytes := []byte(expected) + + return func(c *fiber.Ctx) error { + if websocket.IsWebSocketUpgrade(c) { + return c.Next() + } + + deny := func() error { + c.Set("WWW-Authenticate", `Basic realm="Restricted"`) + return c.Status(fiber.StatusUnauthorized).SendString("Unauthorized") + } + + auth := c.Get("Authorization") + if auth == "" { + return deny() + } + if subtle.ConstantTimeCompare([]byte(auth), expectedBytes) == 1 { + return c.Next() + } + const scheme = "Basic " + if len(auth) < len(scheme) || auth[:len(scheme)] != scheme { + return deny() + } + decoded, err := base64.StdEncoding.DecodeString(auth[len(scheme):]) + if err != nil { + decoded, err = base64.RawStdEncoding.DecodeString(auth[len(scheme):]) + if err != nil { + return deny() + } + } + creds := strings.SplitN(string(decoded), ":", 2) + if len(creds) != 2 { + return deny() + } + if subtle.ConstantTimeCompare([]byte(creds[1]), []byte(password)) != 1 { + return deny() + } + return c.Next() + } +} + +// ListenUDP reads UDP packets and submits them to the recorder and hub. +// It exits on ctx cancellation. +func ListenUDP(ctx context.Context, addr string, h *hub.Hub, rec *recording.Recorder) { + conn, err := net.ListenPacket("udp", addr) + if err != nil { + slog.Error("udp listen failed", "err", err) + os.Exit(1) + } + slog.Info("udp listening", "addr", addr) + + go func() { + <-ctx.Done() + _ = conn.Close() + }() + + buf := make([]byte, 65536) + for { + n, _, err := conn.ReadFrom(buf) + if err != nil { + if ctx.Err() != nil { + return + } + slog.Warn("udp read error", "err", err) + continue + } + stats.Global.UDPPackets.Add(1) + stats.Global.UDPBytes.Add(int64(n)) + stats.Global.LastUDPPacketUnix.Store(time.Now().UnixNano()) + + data := buf[:n] + rec.Submit(data) + + if h.Count() > 0 { + cp := make([]byte, n) + copy(cp, data) + h.Broadcast(cp) + } + } +} diff --git a/internal/server/listen.go b/internal/server/listen.go new file mode 100644 index 0000000..d7e5d02 --- /dev/null +++ b/internal/server/listen.go @@ -0,0 +1,60 @@ +package server + +import ( + "crypto/tls" + "errors" + "fmt" + "log/slog" + nethttp "net/http" + + "github.com/gofiber/fiber/v2" + "golang.org/x/crypto/acme/autocert" +) + +// Listen starts serving the app on either HTTPS w/ autocert (if domain is set) +// or plain HTTP. It returns a channel that receives any startup error. +// +// httpAddr is used both as the plain HTTP listener and as the ACME challenge +// listener when domain is set. tlsAddr is only used when domain is set. +func Listen(app *fiber.App, httpAddr, tlsAddr, domain, certDir string) <-chan error { + errCh := make(chan error, 2) + + if domain == "" { + slog.Info("no domain set; HTTP only", "addr", httpAddr) + go func() { + if err := app.Listen(httpAddr); err != nil { + errCh <- fmt.Errorf("http: %w", err) + } + }() + return errCh + } + + cm := &autocert.Manager{ + Prompt: autocert.AcceptTOS, + HostPolicy: autocert.HostWhitelist(domain), + Cache: autocert.DirCache(certDir), + } + + if httpAddr != "" { + go func() { + slog.Info("starting HTTP server (ACME + redirect)", "addr", httpAddr) + srv := &nethttp.Server{Addr: httpAddr, Handler: cm.HTTPHandler(nil)} + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, nethttp.ErrServerClosed) { + errCh <- fmt.Errorf("http: %w", err) + } + }() + } + + slog.Info("starting HTTPS server with autocert", "addr", tlsAddr, "domain", domain) + ln, err := tls.Listen("tcp", tlsAddr, &tls.Config{GetCertificate: cm.GetCertificate}) + if err != nil { + errCh <- fmt.Errorf("tls listen: %w", err) + return errCh + } + go func() { + if err := app.Listener(ln); err != nil { + errCh <- fmt.Errorf("https: %w", err) + } + }() + return errCh +} diff --git a/internal/server/server.go b/internal/server/server.go new file mode 100644 index 0000000..04759f8 --- /dev/null +++ b/internal/server/server.go @@ -0,0 +1,176 @@ +package server + +import ( + "database/sql" + "strconv" + "time" + + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/logger" + "github.com/gofiber/websocket/v2" + "log/slog" + + "git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/assets" + "git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/hub" + "git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/recording" + "git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/stats" +) + +// Options configures the Fiber app constructed by New. +type Options struct { + Password string + DBPath string + Verbose bool + Compress bool + Retention time.Duration + DB *sql.DB + Hub *hub.Hub + Recorder *recording.Recorder +} + +// New constructs the Fiber application with all middleware and routes wired. +func New(opt Options) *fiber.App { + app := fiber.New(fiber.Config{DisableStartupMessage: true}) + + app.Use(logger.New(logger.Config{ + Format: "${time} ${method} ${path} ${status}\n", + Next: func(c *fiber.Ctx) bool { + if opt.Verbose { + return false + } + // Suppress noisy WS access logs by default. + path := c.Path() + return len(path) >= 3 && path[:3] == "/ws" + }, + })) + + app.Use(AuthMiddleware(opt.Password)) + + app.Get("/", func(c *fiber.Ctx) error { + data, err := assets.FS.ReadFile("viewer.html") + if err != nil { + return fiber.ErrInternalServerError + } + c.Set("Content-Type", "text/html; charset=utf-8") + return c.Send(data) + }) + + // Live WebSocket. + app.Use("/ws", upgradeOnly()) + app.Get("/ws", websocket.New(func(c *websocket.Conn) { + cl := opt.Hub.Add(c) + slog.Info("client connected", "addr", c.RemoteAddr().String(), "total", opt.Hub.Count()) + defer func() { + opt.Hub.Remove(cl) + slog.Info("client disconnected", "addr", c.RemoteAddr().String(), "total", opt.Hub.Count()) + }() + for { + if _, _, err := c.ReadMessage(); err != nil { + return + } + } + })) + + // Recording API. + app.Get("/api/recordings", func(c *fiber.Ctx) error { + since := c.Query("since", "") + limit, _ := strconv.Atoi(c.Query("limit", "0")) + recs, err := recording.List(opt.DB, since, limit) + if err != nil { + return c.Status(500).JSON(fiber.Map{"error": err.Error()}) + } + return c.JSON(recs) + }) + + app.Delete("/api/recordings/:id", func(c *fiber.Ctx) error { + id, err := strconv.ParseInt(c.Params("id"), 10, 64) + if err != nil { + return c.Status(400).JSON(fiber.Map{"error": "invalid id"}) + } + n, err := recording.Delete(opt.DB, id) + if err != nil { + return c.Status(500).JSON(fiber.Map{"error": err.Error()}) + } + if n == 0 { + return c.Status(404).JSON(fiber.Map{"error": "not found"}) + } + opt.Hub.PublishEvent(map[string]any{ + "type": "event", "kind": "recording_deleted", + "recording": map[string]any{"id": id}, + }) + return c.JSON(fiber.Map{"ok": true}) + }) + + app.Get("/api/stats", statsHandler(opt)) + + // Playback websocket. + app.Use("/ws/playback", upgradeOnly()) + app.Get("/ws/playback", websocket.New(func(c *websocket.Conn) { + idStr := c.Query("id") + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil { + _ = c.WriteMessage(websocket.TextMessage, []byte(`{"error":"invalid id"}`)) + return + } + slog.Info("playback start", "id", id, "addr", c.RemoteAddr().String()) + p, err := recording.NewPlayer(opt.DB, c, id) + if err != nil { + _ = c.WriteMessage(websocket.TextMessage, []byte(`{"error":"player init failed"}`)) + return + } + p.Run() + slog.Info("playback end", "id", id, "addr", c.RemoteAddr().String()) + })) + + return app +} + +func upgradeOnly() fiber.Handler { + return func(c *fiber.Ctx) error { + if websocket.IsWebSocketUpgrade(c) { + return c.Next() + } + return fiber.ErrUpgradeRequired + } +} + +func statsHandler(opt Options) fiber.Handler { + return func(c *fiber.Ctx) error { + dbSize := recording.DBSize(opt.DBPath) + lastUDP := stats.Global.LastUDPPacketUnix.Load() + var lastAgeMs int64 = -1 + if lastUDP > 0 { + lastAgeMs = (time.Now().UnixNano() - lastUDP) / int64(time.Millisecond) + } + before := stats.Global.BytesBeforeCompr.Load() + after := stats.Global.BytesAfterCompr.Load() + var compressionRatio float64 + if after > 0 { + compressionRatio = float64(before) / float64(after) + } + curID, curName, curFrames, curElapsed := opt.Recorder.CurrentSession() + var current any + if curID != 0 { + current = map[string]any{ + "id": curID, "name": curName, + "frames": curFrames, "elapsed_ms": curElapsed, + } + } + return c.JSON(fiber.Map{ + "uptime_s": int64(time.Since(stats.Global.StartTime).Seconds()), + "udp_packets_total": stats.Global.UDPPackets.Load(), + "udp_bytes_total": stats.Global.UDPBytes.Load(), + "udp_last_packet_age_ms": lastAgeMs, + "ws_clients": opt.Hub.Count(), + "frames_written_total": stats.Global.FramesWritten.Load(), + "frames_dropped_total": stats.Global.FramesDropped.Load(), + "bytes_before_compr": before, + "bytes_after_compr": after, + "compression_ratio": compressionRatio, + "db_size_bytes": dbSize, + "current_recording": current, + "retention_hours": opt.Retention.Hours(), + "compression_enabled": opt.Compress, + }) + } +} diff --git a/internal/stats/stats.go b/internal/stats/stats.go new file mode 100644 index 0000000..4d2df77 --- /dev/null +++ b/internal/stats/stats.go @@ -0,0 +1,23 @@ +// Package stats holds process-wide observability counters shared across packages. +package stats + +import ( + "sync/atomic" + "time" +) + +// Stats are process-wide atomic counters. A single global instance is used so +// any package can record events without dependency injection plumbing. +type Stats struct { + StartTime time.Time + UDPPackets atomic.Int64 + UDPBytes atomic.Int64 + FramesWritten atomic.Int64 + FramesDropped atomic.Int64 + BytesBeforeCompr atomic.Int64 + BytesAfterCompr atomic.Int64 + LastUDPPacketUnix atomic.Int64 // unix nanos; 0 = never +} + +// Global is the singleton used by all packages. +var Global = &Stats{StartTime: time.Now()} diff --git a/main.go b/main.go new file mode 100644 index 0000000..b60d8fe --- /dev/null +++ b/main.go @@ -0,0 +1,133 @@ +// RocketLeagueBot-Renderer +// +// Live UDP ingest + WebSocket fan-out + SQLite recording/playback for +// RocketSimVis-compatible state telemetry. +// +// Architecture: +// +// bot ──UDP──▶ server.ListenUDP ──▶ hub.Broadcast ──WS──▶ live viewers +// │ +// └──▶ recording.Recorder (batched, zstd) ──▶ SQLite +// │ +// └──▶ recording.Player ──WS──▶ playback viewers +package main + +import ( + "context" + "flag" + "log/slog" + "os" + "os/signal" + "strings" + "syscall" + "time" + + _ "github.com/ncruces/go-sqlite3/driver" + + "git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/hub" + "git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/recording" + "git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/server" +) + +var ( + httpAddr = flag.String("http", ":80", "HTTP listen address (must be :80 for autocert)") + udpAddr = flag.String("udp", ":9273", "UDP listen address (RocketSimVis port)") + tlsAddr = flag.String("tls", ":443", "HTTPS listen address with autocert") + domain = flag.String("domain", "", "Domain for autocert (e.g., example.com)") + password = flag.String("password", "", "Password to protect the viewer (required)") + certDir = flag.String("certdir", "./certs", "Directory for storing certificates") + dbPath = flag.String("db", "./recordings.db", "Path to SQLite database for recordings") + retention = flag.Duration("retention", 7*24*time.Hour, "How long to keep recordings (e.g., 72h)") + compress = flag.Bool("compress", true, "Compress recorded frames with zstd") + verbose = flag.Bool("verbose", false, "Verbose HTTP request logging (including WS upgrades)") + logLevel = flag.String("log-level", "info", "Log level: debug|info|warn|error") +) + +func setupLogger() { + var level slog.Level + switch strings.ToLower(*logLevel) { + case "debug": + level = slog.LevelDebug + case "warn": + level = slog.LevelWarn + case "error": + level = slog.LevelError + default: + level = slog.LevelInfo + } + slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level}))) +} + +func main() { + flag.Parse() + setupLogger() + + if *password == "" { + slog.Error("password flag is required (use -password=yourpassword)") + os.Exit(1) + } + + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + + h := hub.New() + + db, err := recording.OpenDB(*dbPath) + if err != nil { + slog.Error("db open failed", "err", err) + os.Exit(1) + } + defer db.Close() + + rec, err := recording.NewRecorder(db, h, *compress) + if err != nil { + slog.Error("recorder init failed", "err", err) + os.Exit(1) + } + rec.Start() + defer rec.Stop() + + // Retention cleanup: once at boot, then hourly. + recording.CleanOld(db, h, *retention) + go func() { + t := time.NewTicker(time.Hour) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + recording.CleanOld(db, h, *retention) + } + } + }() + + go server.ListenUDP(ctx, *udpAddr, h, rec) + + app := server.New(server.Options{ + Password: *password, + DBPath: *dbPath, + Verbose: *verbose, + Compress: *compress, + Retention: *retention, + DB: db, + Hub: h, + Recorder: rec, + }) + + serverErrCh := server.Listen(app, *httpAddr, *tlsAddr, *domain, *certDir) + + select { + case <-ctx.Done(): + slog.Info("shutdown signal received") + case err := <-serverErrCh: + slog.Error("server error", "err", err) + } + + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() + if err := app.ShutdownWithContext(shutdownCtx); err != nil { + slog.Warn("app shutdown error", "err", err) + } + slog.Info("bye") +}