diff --git a/.env.example b/.env.example index 796b526..15d8208 100644 --- a/.env.example +++ b/.env.example @@ -6,3 +6,4 @@ REFRESH_TOKEN= # LOG_LEVEL=DEBUG|WARN|ERROR # ALLOWED_ORIGINS=http://localhost:3000 # SERVER_PORT=3000 +# RT=true diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 3851826..ce222b8 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -16,7 +16,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: '1.22.4' + go-version: '1.25.1' - name: Build run: go build -v ./... diff --git a/.gitignore b/.gitignore index 1a2ebe2..8c5ef65 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .env** +.idea diff --git a/Dockerfile b/Dockerfile index d07bfd1..31feca2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,17 +1,16 @@ -FROM golang:1.24 AS builder +FROM golang:1.25.1 AS builder ENV CGO_ENABLED=0 GOOS=linux GOARCH=amd64 WORKDIR /app COPY go.mod go.sum ./ RUN go mod download COPY . . RUN go build -ldflags="-s -w" -o spotify-ws . -RUN go build -ldflags="-s -w" -o healthcheck ./healthcheck/healthcheck.go FROM gcr.io/distroless/static:nonroot WORKDIR /app COPY --from=builder --chown=nonroot:nonroot /app/spotify-ws . -COPY --from=builder --chown=nonroot:nonroot /app/healthcheck . + EXPOSE 3000 -HEALTHCHECK --interval=30s --timeout=5s --start-period=5s CMD ["./healthcheck"] +HEALTHCHECK --interval=30s --timeout=5s --start-period=5s CMD ["/spotify-ws", "-health"] USER nonroot:nonroot -CMD ["./spotify-ws"] +CMD ["/spotify-ws"] \ No newline at end of file diff --git a/docker-compose.dev.yaml b/docker-compose.dev.yaml index ada54b9..49382ed 100644 --- a/docker-compose.dev.yaml +++ b/docker-compose.dev.yaml @@ -14,3 +14,4 @@ services: #- LOG_LEVEL=DEBUG #- ALLOWED_ORIGINS=http://localhost:3000 #- SERVER_PORT=3000 + #- RT=true diff --git a/docker-compose.yaml b/docker-compose.yaml index 700d701..ea95745 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -10,5 +10,6 @@ services: - CLIENT_SECRET=${CLIENT_SECRET} - CLIENT_ID=${CLIENT_ID} #- LOG_LEVEL=DEBUG - # ALLOWED_ORIGINS=http://localhost:3000 - # SERVER_PORT=3000 + #- ALLOWED_ORIGINS=http://localhost:3000 + #- SERVER_PORT=3000 + #- RT=true diff --git a/go.mod b/go.mod index b7d3f28..3ed95e9 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,9 @@ -module skidoodle/spotify-ws +module spotify-ws -go 1.24.0 +go 1.25.1 require ( - github.com/gorilla/websocket v1.5.3 github.com/joho/godotenv v1.5.1 - github.com/sirupsen/logrus v1.9.3 - github.com/zmb3/spotify v1.3.0 - golang.org/x/oauth2 v0.27.0 + golang.org/x/net v0.44.0 + golang.org/x/oauth2 v0.31.0 ) - -require golang.org/x/sys v0.30.0 // indirect diff --git a/go.sum b/go.sum index 2c65065..c54a2f1 100644 --- a/go.sum +++ b/go.sum @@ -1,34 +1,6 @@ -cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= -github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= -github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/zmb3/spotify v1.3.0 h1:6Z2F1IMx0Hviq/dpf8nFwvKPppFEMXn8yfReSBVi16k= -github.com/zmb3/spotify v1.3.0/go.mod h1:GD7AAEMUJVYc2Z7p2a2S0E3/5f/KxM/vOnErNr4j+Tw= -golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= -golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M= -golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= -golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/oauth2 v0.31.0 h1:8Fq0yVZLh4j4YA47vHKFTa9Ew5XIrCP8LC6UeNZnLxo= +golang.org/x/oauth2 v0.31.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= diff --git a/healthcheck/go.mod b/healthcheck/go.mod deleted file mode 100644 index 8abade9..0000000 --- a/healthcheck/go.mod +++ /dev/null @@ -1,3 +0,0 @@ -module healthcheck - -go 1.24.0 diff --git a/healthcheck/healthcheck.go b/healthcheck/healthcheck.go deleted file mode 100644 index 97f2601..0000000 --- a/healthcheck/healthcheck.go +++ /dev/null @@ -1,24 +0,0 @@ -package main - -import ( - "fmt" - "log" - "net/http" - "os" -) - -func main() { - resp, err := http.Get("http://localhost:3000/health") - if err != nil { - log.Fatalf("Error performing health check: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - log.Printf("Health check failed: Status code %d", resp.StatusCode) - os.Exit(1) - } - - fmt.Println("OK") - os.Exit(0) -} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..c74dc2d --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,71 @@ +package config + +import ( + "fmt" + "log/slog" + "os" + "strconv" + "strings" + + "github.com/joho/godotenv" +) + +// Config holds the application configuration. +type Config struct { + ServerPort string + AllowedOrigins []string + LogLevel slog.Level + RT bool + Spotify struct { + ClientID string + ClientSecret string + RefreshToken string + } +} + +// Load loads the configuration from environment variables. +func Load() (*Config, error) { + if err := godotenv.Load(); err != nil { + slog.Warn("no .env file found, using environment variables") + } + + cfg := &Config{} + + cfg.Spotify.ClientID = os.Getenv("SPOTIFY_CLIENT_ID") + cfg.Spotify.ClientSecret = os.Getenv("SPOTIFY_CLIENT_SECRET") + cfg.Spotify.RefreshToken = os.Getenv("SPOTIFY_REFRESH_TOKEN") + + if cfg.Spotify.ClientID == "" || cfg.Spotify.ClientSecret == "" || cfg.Spotify.RefreshToken == "" { + return nil, fmt.Errorf("spotify credentials are not set") + } + + cfg.ServerPort = os.Getenv("SERVER_PORT") + if cfg.ServerPort == "" { + cfg.ServerPort = "3000" + } + + allowedOrigins := os.Getenv("ALLOWED_ORIGINS") + if allowedOrigins != "" { + cfg.AllowedOrigins = strings.Split(allowedOrigins, ",") + } + + rt, err := strconv.ParseBool(os.Getenv("RT")) + if err != nil { + cfg.RT = false + } else { + cfg.RT = rt + } + + switch strings.ToLower(os.Getenv("LOG_LEVEL")) { + case "debug": + cfg.LogLevel = slog.LevelDebug + case "warn": + cfg.LogLevel = slog.LevelWarn + case "error": + cfg.LogLevel = slog.LevelError + default: + cfg.LogLevel = slog.LevelInfo + } + + return cfg, nil +} diff --git a/internal/spotify/client.go b/internal/spotify/client.go new file mode 100644 index 0000000..a1ba997 --- /dev/null +++ b/internal/spotify/client.go @@ -0,0 +1,74 @@ +package spotify + +import ( + "context" + "encoding/json" + "log/slog" + "net/http" + + "golang.org/x/oauth2" +) + +const ( + tokenURL = "https://accounts.spotify.com/api/token" + currentlyPlayingURL = "https://api.spotify.com/v1/me/player/currently-playing" +) + +// Client is a thread-safe client for interacting with the Spotify API. +type Client struct { + httpClient *http.Client +} + +// NewClient creates a new Spotify API client using the refresh token flow. +// The returned client is safe for concurrent use. +func NewClient(ctx context.Context, clientID, clientSecret, refreshToken string) *Client { + conf := &oauth2.Config{ + ClientID: clientID, + ClientSecret: clientSecret, + Endpoint: oauth2.Endpoint{ + TokenURL: tokenURL, + }, + } + + token := &oauth2.Token{ + RefreshToken: refreshToken, + } + + // The TokenSource is concurrency-safe and handles token refreshes automatically. + tokenSource := conf.TokenSource(ctx, token) + + return &Client{ + httpClient: oauth2.NewClient(ctx, tokenSource), + } +} + +// CurrentlyPlaying fetches the user's currently playing track from the Spotify API. +func (c *Client) CurrentlyPlaying(ctx context.Context) (*CurrentlyPlaying, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, currentlyPlayingURL, nil) + if err != nil { + return nil, err + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer func() { + if err := resp.Body.Close(); err != nil { + slog.Warn("failed to close spotify api response body", "error", err) + } + }() + + // When nothing is playing, Spotify returns 204 No Content. + // We normalize this to a consistent struct response for the caller. + if resp.StatusCode == http.StatusNoContent { + return &CurrentlyPlaying{IsPlaying: false, Item: nil}, nil + } + + var currentlyPlaying CurrentlyPlaying + if err := json.NewDecoder(resp.Body).Decode(¤tlyPlaying); err != nil { + return nil, err + } + + return ¤tlyPlaying, nil +} diff --git a/internal/spotify/model.go b/internal/spotify/model.go new file mode 100644 index 0000000..9991ec2 --- /dev/null +++ b/internal/spotify/model.go @@ -0,0 +1,25 @@ +package spotify + +// TrackItem represents the track object from the Spotify API. +type TrackItem struct { + ID string `json:"id"` + Name string `json:"name"` + DurationMs int `json:"duration_ms"` + Artists []struct { + Name string `json:"name"` + } `json:"artists"` + Album struct { + Images []struct { + URL string `json:"url"` + } `json:"images"` + } `json:"album"` +} + +// CurrentlyPlaying represents the currently playing object from the Spotify API. +// The Item field is a pointer to handle cases where nothing is playing (item is null). +type CurrentlyPlaying struct { + IsPlaying bool `json:"is_playing"` + ProgressMs int `json:"progress_ms"` + Timestamp int64 `json:"timestamp"` + Item *TrackItem `json:"item"` +} diff --git a/internal/websocket/handler.go b/internal/websocket/handler.go new file mode 100644 index 0000000..4432bba --- /dev/null +++ b/internal/websocket/handler.go @@ -0,0 +1,47 @@ +package websocket + +import ( + "log/slog" + "net/http" + + "golang.org/x/net/websocket" +) + +// newWebsocketHandler creates a new WebSocket handler closure. +func (s *Server) newWebsocketHandler() websocket.Handler { + return func(ws *websocket.Conn) { + defer func() { + s.hub.unregister <- ws + if err := ws.Close(); err != nil { + slog.Warn("error while closing websocket connection", "error", err, "remoteAddr", ws.RemoteAddr()) + } + }() + + origin := ws.Config().Origin.String() + if !s.originChecker(origin) { + slog.Warn("origin not allowed, rejecting connection", "origin", origin) + return + } + + s.hub.register <- ws + + // Send the last known state immediately upon connection. + s.poller.SendLastState(ws) + + // Block by reading from the client to detect disconnection. + var msg string + for { + if err := websocket.Message.Receive(ws, &msg); err != nil { + break // Client has disconnected. + } + } + } +} + +// healthHandler responds to Docker health checks. +func healthHandler(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + if _, err := w.Write([]byte("OK")); err != nil { + slog.Warn("failed to write health check response", "error", err) + } +} diff --git a/internal/websocket/hub.go b/internal/websocket/hub.go new file mode 100644 index 0000000..7900299 --- /dev/null +++ b/internal/websocket/hub.go @@ -0,0 +1,94 @@ +package websocket + +import ( + "context" + "log/slog" + "spotify-ws/internal/spotify" + "sync" + + "golang.org/x/net/websocket" +) + +// Hub manages the set of active clients and broadcasts messages. +type Hub struct { + clients map[*websocket.Conn]struct{} + mu sync.RWMutex + realtime bool + register chan *websocket.Conn + unregister chan *websocket.Conn + broadcast chan *spotify.CurrentlyPlaying +} + +// NewHub creates a new Hub. +func NewHub(realtime bool) *Hub { + return &Hub{ + clients: make(map[*websocket.Conn]struct{}), + realtime: realtime, + register: make(chan *websocket.Conn), + unregister: make(chan *websocket.Conn), + broadcast: make(chan *spotify.CurrentlyPlaying), + } +} + +// Run starts the hub's event loop. It must be run in a separate goroutine. +func (h *Hub) Run(ctx context.Context) { + slog.Info("hub started") + defer slog.Info("hub stopped") + + for { + select { + case <-ctx.Done(): + h.closeAllConnections() + return + case client := <-h.register: + h.mu.Lock() + h.clients[client] = struct{}{} + h.mu.Unlock() + slog.Debug("client registered", "remoteAddr", client.RemoteAddr()) + case client := <-h.unregister: + h.mu.Lock() + if _, ok := h.clients[client]; ok { + delete(h.clients, client) + } + h.mu.Unlock() + slog.Debug("client unregistered", "remoteAddr", client.RemoteAddr()) + case state := <-h.broadcast: + h.broadcastState(state) + } + } +} + +// Broadcast sends a state update to all connected clients. +func (h *Hub) Broadcast(state *spotify.CurrentlyPlaying) { + h.broadcast <- state +} + +// broadcastState handles the actual message sending. +func (h *Hub) broadcastState(state *spotify.CurrentlyPlaying) { + h.mu.RLock() + defer h.mu.RUnlock() + + if len(h.clients) == 0 { + return + } + + clientPayload := newPlaybackState(state, h.realtime) + for client := range h.clients { + go func(c *websocket.Conn) { + if err := websocket.JSON.Send(c, clientPayload); err != nil { + slog.Warn("failed to broadcast message", "error", err, "remoteAddr", c.RemoteAddr()) + } + }(client) + } +} + +// closeAllConnections closes all active client connections during shutdown. +func (h *Hub) closeAllConnections() { + h.mu.Lock() + defer h.mu.Unlock() + for client := range h.clients { + if err := client.Close(); err != nil { + slog.Warn("error closing client connection during shutdown", "error", err, "remoteAddr", client.RemoteAddr()) + } + } +} diff --git a/internal/websocket/poller.go b/internal/websocket/poller.go new file mode 100644 index 0000000..413647b --- /dev/null +++ b/internal/websocket/poller.go @@ -0,0 +1,108 @@ +package websocket + +import ( + "context" + "log/slog" + "sync" + "time" + + "spotify-ws/internal/spotify" + + "golang.org/x/net/websocket" +) + +// Poller is responsible for fetching data from the Spotify API periodically. +type Poller struct { + client *spotify.Client + hub *Hub + lastState *spotify.CurrentlyPlaying + mu sync.RWMutex +} + +// NewPoller creates a new Poller. +func NewPoller(client *spotify.Client, hub *Hub) *Poller { + return &Poller{ + client: client, + hub: hub, + } +} + +// Run starts the polling loop. It must be run in a separate goroutine. +func (p *Poller) Run(ctx context.Context) { + slog.Info("poller started") + defer slog.Info("poller stopped") + + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + p.UpdateState(ctx) + } + } +} + +// UpdateState fetches the latest state, compares it, and broadcasts if needed. +func (p *Poller) UpdateState(ctx context.Context) { + current, err := p.client.CurrentlyPlaying(ctx) + if err != nil { + slog.Error("failed to get currently playing track", "error", err) + return + } + + p.mu.Lock() + hasChanged := p.hasStateChanged(current) + if hasChanged { + p.lastState = current + } + p.mu.Unlock() + + if hasChanged { + if !p.hub.realtime { + trackName := "Nothing" + if current.Item != nil { + trackName = current.Item.Name + } + slog.Info("state changed, broadcasting update", "isPlaying", current.IsPlaying, "track", trackName) + } + p.hub.Broadcast(current) + } +} + +// SendLastState sends the cached state to a single new client. +func (p *Poller) SendLastState(ws *websocket.Conn) { + p.mu.RLock() + defer p.mu.RUnlock() + + if p.lastState == nil { + return + } + clientPayload := newPlaybackState(p.lastState, p.hub.realtime) + if err := websocket.JSON.Send(ws, clientPayload); err != nil { + slog.Warn("failed to send initial state to client", "error", err, "remoteAddr", ws.RemoteAddr()) + } +} + +// hasStateChanged performs a robust comparison between the new and old states. +// This function must be called within a lock. +func (p *Poller) hasStateChanged(current *spotify.CurrentlyPlaying) bool { + if p.lastState == nil { + return true + } + if p.hub.realtime && current.IsPlaying && current.Item != nil { + return true + } + if p.lastState.IsPlaying != current.IsPlaying { + return true + } + if (p.lastState.Item == nil) != (current.Item == nil) { + return true + } + if p.lastState.Item != nil && current.Item != nil && p.lastState.Item.ID != current.Item.ID { + return true + } + return false +} diff --git a/internal/websocket/server.go b/internal/websocket/server.go new file mode 100644 index 0000000..cceca2a --- /dev/null +++ b/internal/websocket/server.go @@ -0,0 +1,106 @@ +package websocket + +import ( + "context" + "errors" + "log/slog" + "net/http" + "sync" + "time" + + "spotify-ws/internal/spotify" +) + +// Server is the main application orchestrator. It owns all components +// and manages the application's lifecycle. +type Server struct { + addr string + httpServer *http.Server + hub *Hub + poller *Poller + originChecker func(string) bool +} + +// NewServer creates a new, fully configured WebSocket server. +func NewServer(addr string, allowedOrigins []string, spotifyClient *spotify.Client, realtime bool) *Server { + hub := NewHub(realtime) + poller := NewPoller(spotifyClient, hub) + + // Create a closure for origin checking to keep the Server's dependencies clean. + originChecker := func(origin string) bool { + if len(allowedOrigins) == 0 { + return true // Allow all if not specified. + } + for _, allowedOrigin := range allowedOrigins { + if allowedOrigin == origin { + return true + } + } + return false + } + + return &Server{ + addr: addr, + hub: hub, + poller: poller, + originChecker: originChecker, + } +} + +// Run starts the server and its components. It blocks until the context is +// canceled and all components have shut down gracefully. +func (s *Server) Run(ctx context.Context) error { + // Do an initial state fetch before starting the server. + s.poller.UpdateState(ctx) + + mux := http.NewServeMux() + mux.Handle("/", s.newWebsocketHandler()) + mux.HandleFunc("/health", healthHandler) + + s.httpServer = &http.Server{ + Addr: s.addr, + Handler: mux, + } + + var wg sync.WaitGroup + wg.Add(2) // For the hub and the poller. + + go func() { + defer wg.Done() + s.hub.Run(ctx) + }() + + go func() { + defer wg.Done() + s.poller.Run(ctx) + }() + + // Start the HTTP server. + go func() { + if err := s.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + slog.Error("http server error", "error", err) + } + }() + + // Wait for shutdown signal. + <-ctx.Done() + slog.Info("shutdown signal received") + + // The hub and poller will stop automatically via the context. + // We just need to shut down the HTTP server and wait for goroutines to finish. + s.shutdown() + wg.Wait() + + return nil +} + +// shutdown gracefully shuts down the HTTP server. +func (s *Server) shutdown() { + slog.Info("shutting down http server") + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := s.httpServer.Shutdown(shutdownCtx); err != nil { + slog.Error("http server shutdown error", "error", err) + } +} diff --git a/internal/websocket/state.go b/internal/websocket/state.go new file mode 100644 index 0000000..db80432 --- /dev/null +++ b/internal/websocket/state.go @@ -0,0 +1,29 @@ +package websocket + +import "spotify-ws/internal/spotify" + +// PlaybackState is the client-facing data structure. It conditionally omits +// real-time data fields from JSON based on the server's mode. +type PlaybackState struct { + IsPlaying bool `json:"is_playing"` + ProgressMs int `json:"progress_ms,omitempty"` + Timestamp int64 `json:"timestamp,omitempty"` + Item *spotify.TrackItem `json:"item"` +} + +// newPlaybackState creates a client-facing PlaybackState from the internal Spotify data. +// It includes progress data only if the server is in real-time mode. +func newPlaybackState(data *spotify.CurrentlyPlaying, realtime bool) PlaybackState { + if data == nil { + return PlaybackState{IsPlaying: false} + } + state := PlaybackState{ + IsPlaying: data.IsPlaying, + Item: data.Item, + } + if realtime { + state.ProgressMs = data.ProgressMs + state.Timestamp = data.Timestamp + } + return state +} diff --git a/main.go b/main.go index f581a20..42d5d4d 100644 --- a/main.go +++ b/main.go @@ -3,364 +3,45 @@ package main import ( "context" "fmt" - "net/http" + "log/slog" "os" "os/signal" - "strings" - "sync" "syscall" - "time" - "github.com/gorilla/websocket" - "github.com/joho/godotenv" - "github.com/sirupsen/logrus" - "github.com/zmb3/spotify" - "golang.org/x/oauth2" -) - -// Configuration holds application settings -type Configuration struct { - ServerPort string - AllowedOrigins []string - LogLevel logrus.Level - Spotify struct { - ClientID string - ClientSecret string - RefreshToken string - } -} - -var ( - config Configuration - clients = make(map[*websocket.Conn]bool) - clientsMutex sync.RWMutex - broadcast = make(chan *spotify.CurrentlyPlaying) - connectChan = make(chan *websocket.Conn) - - upgrader = websocket.Upgrader{ - HandshakeTimeout: 10 * time.Second, - ReadBufferSize: 1024, - WriteBufferSize: 1024, - } - - spotifyClient spotify.Client - tokenSource oauth2.TokenSource - lastState struct { - sync.RWMutex - track *spotify.CurrentlyPlaying - playing bool - } -) - -const ( - defaultPort = ":3000" - tokenRefreshURL = "https://accounts.spotify.com/api/token" - apiRetryDelay = 3 * time.Second - heartbeatInterval = 3 * time.Second - writeTimeout = 10 * time.Second + "spotify-ws/internal/config" + "spotify-ws/internal/spotify" + "spotify-ws/internal/websocket" ) func main() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() - initializeApplication() - initializeSpotifyClient() - - router := http.NewServeMux() - router.HandleFunc("/", connectionHandler) - router.HandleFunc("/health", healthHandler) - - server := &http.Server{ - Addr: config.ServerPort, - Handler: router, - } - - go trackFetcher(ctx) - go messageHandler(ctx) - go connectionManager(ctx) - - startServer(server, ctx) - handleShutdown(server, cancel) -} - -func initializeApplication() { - logrus.SetFormatter(&logrus.TextFormatter{ - FullTimestamp: true, - TimestampFormat: time.RFC3339, - ForceColors: true, - }) - - if err := loadConfiguration(); err != nil { - logrus.Fatal(err) - } - - upgrader.CheckOrigin = func(r *http.Request) bool { - if len(config.AllowedOrigins) == 0 { - return true - } - origin := r.Header.Get("Origin") - for _, allowed := range config.AllowedOrigins { - if origin == allowed { - return true - } - } - return false + if err := run(ctx); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) } } -func loadConfiguration() error { - _ = godotenv.Load() - - required := map[string]*string{ - "CLIENT_ID": &config.Spotify.ClientID, - "CLIENT_SECRET": &config.Spotify.ClientSecret, - "REFRESH_TOKEN": &config.Spotify.RefreshToken, +func run(ctx context.Context) error { + cfg, err := config.Load() + if err != nil { + return fmt.Errorf("failed to load config: %w", err) } - for key, ptr := range required { - value := os.Getenv(key) - if value == "" { - return fmt.Errorf("missing required environment variable: %s", key) - } - *ptr = value + handlerOptions := &slog.HandlerOptions{Level: cfg.LogLevel} + logger := slog.New(slog.NewJSONHandler(os.Stdout, handlerOptions)) + slog.SetDefault(logger) + + spotifyClient := spotify.NewClient(ctx, cfg.Spotify.ClientID, cfg.Spotify.ClientSecret, cfg.Spotify.RefreshToken) + wsServer := websocket.NewServer(":"+cfg.ServerPort, cfg.AllowedOrigins, spotifyClient, cfg.RT) + + slog.Info("starting spotify-ws server", "port", cfg.ServerPort, "realtime", cfg.RT) + + if err := wsServer.Run(ctx); err != nil { + return fmt.Errorf("server runtime error: %w", err) } - config.ServerPort = defaultPort - if port := os.Getenv("SERVER_PORT"); port != "" { - config.ServerPort = ":" + strings.TrimLeft(port, ":") - } - - config.AllowedOrigins = strings.Split(os.Getenv("ALLOWED_ORIGINS"), ",") - - logLevel := strings.ToLower(os.Getenv("LOG_LEVEL")) - switch logLevel { - case "debug": - config.LogLevel = logrus.DebugLevel - case "warn": - config.LogLevel = logrus.WarnLevel - case "error": - config.LogLevel = logrus.ErrorLevel - default: - config.LogLevel = logrus.InfoLevel - } - logrus.SetLevel(config.LogLevel) - + slog.Info("server shut down gracefully") return nil } - -func initializeSpotifyClient() { - token := &oauth2.Token{RefreshToken: config.Spotify.RefreshToken} - oauthConfig := &oauth2.Config{ - ClientID: config.Spotify.ClientID, - ClientSecret: config.Spotify.ClientSecret, - Endpoint: oauth2.Endpoint{TokenURL: tokenRefreshURL}, - } - - tokenSource = oauthConfig.TokenSource(context.Background(), token) - spotifyClient = spotify.NewClient(oauth2.NewClient(context.Background(), tokenSource)) - - logrus.Info("Spotify client initialized successfully") -} - -func startServer(server *http.Server, _ context.Context) { - go func() { - logrus.Infof("Server starting on %s", config.ServerPort) - if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - logrus.Fatalf("Server failed to start: %v", err) - } - }() -} - -func handleShutdown(server *http.Server, cancel context.CancelFunc) { - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) - <-sigChan - - logrus.Info("Initiating graceful shutdown...") - cancel() - - ctx, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second) - defer cancelTimeout() - - clientsMutex.Lock() - for client := range clients { - client.Close() - } - clientsMutex.Unlock() - - if err := server.Shutdown(ctx); err != nil { - logrus.Errorf("Server shutdown error: %v", err) - } - logrus.Info("Server shutdown complete") -} - -func connectionHandler(w http.ResponseWriter, r *http.Request) { - w.Header().Set("X-Source", "github.com/skidoodle/spotify-ws") - ws, err := upgrader.Upgrade(w, r, nil) - if err != nil { - logrus.Errorf("WebSocket upgrade failed: %v", err) - return - } - - // Add client to the pool - clientsMutex.Lock() - clients[ws] = true - clientsMutex.Unlock() - - logrus.Debugf("New client connected: %s", ws.RemoteAddr()) - - // Send initial state if available - sendInitialState(ws) - - // Start monitoring the connection - go monitorConnection(ws) -} - -func connectionManager(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case client := <-connectChan: - // Add client to the pool - clientsMutex.Lock() - clients[client] = true - clientsMutex.Unlock() - - logrus.Debugf("New client connected: %s", client.RemoteAddr()) - - // Send initial state if available - sendInitialState(client) - - // Start monitoring the connection - go monitorConnection(client) - } - } -} - -func monitorConnection(ws *websocket.Conn) { - defer func() { - // Clean up the connection - clientsMutex.Lock() - delete(clients, ws) - clientsMutex.Unlock() - - // Close the WebSocket connection - ws.Close() - logrus.Debugf("Client disconnected: %s", ws.RemoteAddr()) - }() - - for { - // Set a read deadline to detect dead connections - ws.SetReadDeadline(time.Now().Add(30 * time.Second)) - - // Attempt to read a message (even though we don't expect any) - _, _, err := ws.NextReader() - if err != nil { - // Check if the error is a normal closure - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { - logrus.Debugf("Client disconnected unexpectedly: %v", err) - } - break - } - } -} - -func sendInitialState(client *websocket.Conn) { - lastState.RLock() - defer lastState.RUnlock() - - if lastState.track == nil { - logrus.Debug("No initial state available to send") - return - } - - if err := client.WriteJSON(lastState.track); err != nil { - logrus.Errorf("Failed to send initial state: %v", err) - client.Close() - } -} - -func messageHandler(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case msg := <-broadcast: - broadcastToClients(msg) - } - } -} - -func broadcastToClients(msg *spotify.CurrentlyPlaying) { - clientsMutex.RLock() - defer clientsMutex.RUnlock() - - for client := range clients { - client.SetWriteDeadline(time.Now().Add(writeTimeout)) - if err := client.WriteJSON(msg); err != nil { - logrus.Debugf("Broadcast failed: %v", err) - client.Close() - } - } -} - -func trackFetcher(ctx context.Context) { - ticker := time.NewTicker(heartbeatInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - fetchAndBroadcastState() - } - } -} - -func fetchAndBroadcastState() { - current, err := spotifyClient.PlayerCurrentlyPlaying() - if err != nil { - logrus.Errorf("Failed to fetch playback state: %v", err) - time.Sleep(apiRetryDelay) - return - } - - logrus.Debugf("Fetched playback state: %+v", current) - - updateState(current) -} - -func updateState(current *spotify.CurrentlyPlaying) { - lastState.Lock() - defer lastState.Unlock() - - if current == nil { - logrus.Warn("Received nil playback state from Spotify") - return - } - - if lastState.track == nil { - lastState.track = &spotify.CurrentlyPlaying{} - } - - stateChanged := lastState.track.Item == nil || - current.Item == nil || - lastState.track.Item.ID != current.Item.ID || - lastState.playing != current.Playing - - lastState.track = current - lastState.playing = current.Playing - - if stateChanged || current.Playing { - broadcast <- current - } -} - -func healthHandler(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/plain") - w.WriteHeader(http.StatusOK) - w.Write([]byte("OK")) -} diff --git a/readme.md b/readme.md index df829fe..da16ff2 100644 --- a/readme.md +++ b/readme.md @@ -42,6 +42,7 @@ services: #- LOG_LEVEL=DEBUG|WARN|ERROR #- ALLOWED_ORIGINS=http://localhost:3000 #- SERVER_PORT=3000 + #- RT=true ``` ### Docker run