mirror of
https://github.com/skidoodle/spotify-ws
synced 2025-10-09 05:22:43 +02:00
refactor
This commit is contained in:
47
internal/websocket/handler.go
Normal file
47
internal/websocket/handler.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package websocket
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"net/http"
|
||||
|
||||
"golang.org/x/net/websocket"
|
||||
)
|
||||
|
||||
// newWebsocketHandler creates a new WebSocket handler closure.
|
||||
func (s *Server) newWebsocketHandler() websocket.Handler {
|
||||
return func(ws *websocket.Conn) {
|
||||
defer func() {
|
||||
s.hub.unregister <- ws
|
||||
if err := ws.Close(); err != nil {
|
||||
slog.Warn("error while closing websocket connection", "error", err, "remoteAddr", ws.RemoteAddr())
|
||||
}
|
||||
}()
|
||||
|
||||
origin := ws.Config().Origin.String()
|
||||
if !s.originChecker(origin) {
|
||||
slog.Warn("origin not allowed, rejecting connection", "origin", origin)
|
||||
return
|
||||
}
|
||||
|
||||
s.hub.register <- ws
|
||||
|
||||
// Send the last known state immediately upon connection.
|
||||
s.poller.SendLastState(ws)
|
||||
|
||||
// Block by reading from the client to detect disconnection.
|
||||
var msg string
|
||||
for {
|
||||
if err := websocket.Message.Receive(ws, &msg); err != nil {
|
||||
break // Client has disconnected.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// healthHandler responds to Docker health checks.
|
||||
func healthHandler(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
if _, err := w.Write([]byte("OK")); err != nil {
|
||||
slog.Warn("failed to write health check response", "error", err)
|
||||
}
|
||||
}
|
||||
94
internal/websocket/hub.go
Normal file
94
internal/websocket/hub.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package websocket
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"spotify-ws/internal/spotify"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/net/websocket"
|
||||
)
|
||||
|
||||
// Hub manages the set of active clients and broadcasts messages.
|
||||
type Hub struct {
|
||||
clients map[*websocket.Conn]struct{}
|
||||
mu sync.RWMutex
|
||||
realtime bool
|
||||
register chan *websocket.Conn
|
||||
unregister chan *websocket.Conn
|
||||
broadcast chan *spotify.CurrentlyPlaying
|
||||
}
|
||||
|
||||
// NewHub creates a new Hub.
|
||||
func NewHub(realtime bool) *Hub {
|
||||
return &Hub{
|
||||
clients: make(map[*websocket.Conn]struct{}),
|
||||
realtime: realtime,
|
||||
register: make(chan *websocket.Conn),
|
||||
unregister: make(chan *websocket.Conn),
|
||||
broadcast: make(chan *spotify.CurrentlyPlaying),
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts the hub's event loop. It must be run in a separate goroutine.
|
||||
func (h *Hub) Run(ctx context.Context) {
|
||||
slog.Info("hub started")
|
||||
defer slog.Info("hub stopped")
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
h.closeAllConnections()
|
||||
return
|
||||
case client := <-h.register:
|
||||
h.mu.Lock()
|
||||
h.clients[client] = struct{}{}
|
||||
h.mu.Unlock()
|
||||
slog.Debug("client registered", "remoteAddr", client.RemoteAddr())
|
||||
case client := <-h.unregister:
|
||||
h.mu.Lock()
|
||||
if _, ok := h.clients[client]; ok {
|
||||
delete(h.clients, client)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
slog.Debug("client unregistered", "remoteAddr", client.RemoteAddr())
|
||||
case state := <-h.broadcast:
|
||||
h.broadcastState(state)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast sends a state update to all connected clients.
|
||||
func (h *Hub) Broadcast(state *spotify.CurrentlyPlaying) {
|
||||
h.broadcast <- state
|
||||
}
|
||||
|
||||
// broadcastState handles the actual message sending.
|
||||
func (h *Hub) broadcastState(state *spotify.CurrentlyPlaying) {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
if len(h.clients) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
clientPayload := newPlaybackState(state, h.realtime)
|
||||
for client := range h.clients {
|
||||
go func(c *websocket.Conn) {
|
||||
if err := websocket.JSON.Send(c, clientPayload); err != nil {
|
||||
slog.Warn("failed to broadcast message", "error", err, "remoteAddr", c.RemoteAddr())
|
||||
}
|
||||
}(client)
|
||||
}
|
||||
}
|
||||
|
||||
// closeAllConnections closes all active client connections during shutdown.
|
||||
func (h *Hub) closeAllConnections() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
for client := range h.clients {
|
||||
if err := client.Close(); err != nil {
|
||||
slog.Warn("error closing client connection during shutdown", "error", err, "remoteAddr", client.RemoteAddr())
|
||||
}
|
||||
}
|
||||
}
|
||||
108
internal/websocket/poller.go
Normal file
108
internal/websocket/poller.go
Normal file
@@ -0,0 +1,108 @@
|
||||
package websocket
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"spotify-ws/internal/spotify"
|
||||
|
||||
"golang.org/x/net/websocket"
|
||||
)
|
||||
|
||||
// Poller is responsible for fetching data from the Spotify API periodically.
|
||||
type Poller struct {
|
||||
client *spotify.Client
|
||||
hub *Hub
|
||||
lastState *spotify.CurrentlyPlaying
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// NewPoller creates a new Poller.
|
||||
func NewPoller(client *spotify.Client, hub *Hub) *Poller {
|
||||
return &Poller{
|
||||
client: client,
|
||||
hub: hub,
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts the polling loop. It must be run in a separate goroutine.
|
||||
func (p *Poller) Run(ctx context.Context) {
|
||||
slog.Info("poller started")
|
||||
defer slog.Info("poller stopped")
|
||||
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
p.UpdateState(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateState fetches the latest state, compares it, and broadcasts if needed.
|
||||
func (p *Poller) UpdateState(ctx context.Context) {
|
||||
current, err := p.client.CurrentlyPlaying(ctx)
|
||||
if err != nil {
|
||||
slog.Error("failed to get currently playing track", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
hasChanged := p.hasStateChanged(current)
|
||||
if hasChanged {
|
||||
p.lastState = current
|
||||
}
|
||||
p.mu.Unlock()
|
||||
|
||||
if hasChanged {
|
||||
if !p.hub.realtime {
|
||||
trackName := "Nothing"
|
||||
if current.Item != nil {
|
||||
trackName = current.Item.Name
|
||||
}
|
||||
slog.Info("state changed, broadcasting update", "isPlaying", current.IsPlaying, "track", trackName)
|
||||
}
|
||||
p.hub.Broadcast(current)
|
||||
}
|
||||
}
|
||||
|
||||
// SendLastState sends the cached state to a single new client.
|
||||
func (p *Poller) SendLastState(ws *websocket.Conn) {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
|
||||
if p.lastState == nil {
|
||||
return
|
||||
}
|
||||
clientPayload := newPlaybackState(p.lastState, p.hub.realtime)
|
||||
if err := websocket.JSON.Send(ws, clientPayload); err != nil {
|
||||
slog.Warn("failed to send initial state to client", "error", err, "remoteAddr", ws.RemoteAddr())
|
||||
}
|
||||
}
|
||||
|
||||
// hasStateChanged performs a robust comparison between the new and old states.
|
||||
// This function must be called within a lock.
|
||||
func (p *Poller) hasStateChanged(current *spotify.CurrentlyPlaying) bool {
|
||||
if p.lastState == nil {
|
||||
return true
|
||||
}
|
||||
if p.hub.realtime && current.IsPlaying && current.Item != nil {
|
||||
return true
|
||||
}
|
||||
if p.lastState.IsPlaying != current.IsPlaying {
|
||||
return true
|
||||
}
|
||||
if (p.lastState.Item == nil) != (current.Item == nil) {
|
||||
return true
|
||||
}
|
||||
if p.lastState.Item != nil && current.Item != nil && p.lastState.Item.ID != current.Item.ID {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
106
internal/websocket/server.go
Normal file
106
internal/websocket/server.go
Normal file
@@ -0,0 +1,106 @@
|
||||
package websocket
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"spotify-ws/internal/spotify"
|
||||
)
|
||||
|
||||
// Server is the main application orchestrator. It owns all components
|
||||
// and manages the application's lifecycle.
|
||||
type Server struct {
|
||||
addr string
|
||||
httpServer *http.Server
|
||||
hub *Hub
|
||||
poller *Poller
|
||||
originChecker func(string) bool
|
||||
}
|
||||
|
||||
// NewServer creates a new, fully configured WebSocket server.
|
||||
func NewServer(addr string, allowedOrigins []string, spotifyClient *spotify.Client, realtime bool) *Server {
|
||||
hub := NewHub(realtime)
|
||||
poller := NewPoller(spotifyClient, hub)
|
||||
|
||||
// Create a closure for origin checking to keep the Server's dependencies clean.
|
||||
originChecker := func(origin string) bool {
|
||||
if len(allowedOrigins) == 0 {
|
||||
return true // Allow all if not specified.
|
||||
}
|
||||
for _, allowedOrigin := range allowedOrigins {
|
||||
if allowedOrigin == origin {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
return &Server{
|
||||
addr: addr,
|
||||
hub: hub,
|
||||
poller: poller,
|
||||
originChecker: originChecker,
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts the server and its components. It blocks until the context is
|
||||
// canceled and all components have shut down gracefully.
|
||||
func (s *Server) Run(ctx context.Context) error {
|
||||
// Do an initial state fetch before starting the server.
|
||||
s.poller.UpdateState(ctx)
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/", s.newWebsocketHandler())
|
||||
mux.HandleFunc("/health", healthHandler)
|
||||
|
||||
s.httpServer = &http.Server{
|
||||
Addr: s.addr,
|
||||
Handler: mux,
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2) // For the hub and the poller.
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
s.hub.Run(ctx)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
s.poller.Run(ctx)
|
||||
}()
|
||||
|
||||
// Start the HTTP server.
|
||||
go func() {
|
||||
if err := s.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
slog.Error("http server error", "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for shutdown signal.
|
||||
<-ctx.Done()
|
||||
slog.Info("shutdown signal received")
|
||||
|
||||
// The hub and poller will stop automatically via the context.
|
||||
// We just need to shut down the HTTP server and wait for goroutines to finish.
|
||||
s.shutdown()
|
||||
wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// shutdown gracefully shuts down the HTTP server.
|
||||
func (s *Server) shutdown() {
|
||||
slog.Info("shutting down http server")
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := s.httpServer.Shutdown(shutdownCtx); err != nil {
|
||||
slog.Error("http server shutdown error", "error", err)
|
||||
}
|
||||
}
|
||||
29
internal/websocket/state.go
Normal file
29
internal/websocket/state.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package websocket
|
||||
|
||||
import "spotify-ws/internal/spotify"
|
||||
|
||||
// PlaybackState is the client-facing data structure. It conditionally omits
|
||||
// real-time data fields from JSON based on the server's mode.
|
||||
type PlaybackState struct {
|
||||
IsPlaying bool `json:"is_playing"`
|
||||
ProgressMs int `json:"progress_ms,omitempty"`
|
||||
Timestamp int64 `json:"timestamp,omitempty"`
|
||||
Item *spotify.TrackItem `json:"item"`
|
||||
}
|
||||
|
||||
// newPlaybackState creates a client-facing PlaybackState from the internal Spotify data.
|
||||
// It includes progress data only if the server is in real-time mode.
|
||||
func newPlaybackState(data *spotify.CurrentlyPlaying, realtime bool) PlaybackState {
|
||||
if data == nil {
|
||||
return PlaybackState{IsPlaying: false}
|
||||
}
|
||||
state := PlaybackState{
|
||||
IsPlaying: data.IsPlaying,
|
||||
Item: data.Item,
|
||||
}
|
||||
if realtime {
|
||||
state.ProgressMs = data.ProgressMs
|
||||
state.Timestamp = data.Timestamp
|
||||
}
|
||||
return state
|
||||
}
|
||||
Reference in New Issue
Block a user