From dbe9e1a5a79dd6890a0cd7cfcb1de62f5c7ef9cd Mon Sep 17 00:00:00 2001 From: skidoodle Date: Fri, 28 Feb 2025 20:29:08 +0000 Subject: [PATCH] refactor: clean up and improve codebase --- .env.example | 7 +- Dockerfile | 7 +- docker-compose.dev.yaml | 16 ++ docker-compose.yaml | 14 +- main.go | 397 ++++++++++++++++++++++++++-------------- readme.md | 32 ++-- 6 files changed, 311 insertions(+), 162 deletions(-) create mode 100644 docker-compose.dev.yaml diff --git a/.env.example b/.env.example index a4e4a01..796b526 100644 --- a/.env.example +++ b/.env.example @@ -1,5 +1,8 @@ -# Required (Spotify API credentials) +# Required (Spotify API credentials, https://gist.github.com/skidoodle/9a9dc9c8802434f7fc0da94ebe4dba18) CLIENT_ID= CLIENT_SECRET= REFRESH_TOKEN= -# LOG_LEVEL=DEBUG (optional) +# Optional +# LOG_LEVEL=DEBUG|WARN|ERROR +# ALLOWED_ORIGINS=http://localhost:3000 +# SERVER_PORT=3000 diff --git a/Dockerfile b/Dockerfile index 7ba13c2..36c1e0f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,14 +1,15 @@ -FROM golang:alpine AS builder +FROM golang:1.22.4 AS builder ENV CGO_ENABLED=0 GOOS=linux GOARCH=amd64 WORKDIR /app COPY go.mod go.sum ./ RUN go mod download COPY . . -RUN go build -o spotify-ws . +RUN go build -ldflags="-s -w" -o spotify-ws . FROM gcr.io/distroless/static:nonroot WORKDIR /app -COPY --from=builder /app/spotify-ws . +COPY --from=builder --chown=nonroot:nonroot /app/spotify-ws . EXPOSE 3000 +HEALTHCHECK --interval=30s --timeout=5s --start-period=5s CMD curl --fail http://localhost:3000/health || exit 1 USER nonroot:nonroot CMD ["./spotify-ws"] diff --git a/docker-compose.dev.yaml b/docker-compose.dev.yaml new file mode 100644 index 0000000..ada54b9 --- /dev/null +++ b/docker-compose.dev.yaml @@ -0,0 +1,16 @@ +services: + spotify-ws: + build: + context: . + dockerfile: Dockerfile + container_name: spotify-ws + restart: unless-stopped + ports: + - "3000:3000" + environment: + - REFRESH_TOKEN=${REFRESH_TOKEN} + - CLIENT_SECRET=${CLIENT_SECRET} + - CLIENT_ID=${CLIENT_ID} + #- LOG_LEVEL=DEBUG + #- ALLOWED_ORIGINS=http://localhost:3000 + #- SERVER_PORT=3000 diff --git a/docker-compose.yaml b/docker-compose.yaml index ccbdf8b..4c22cef 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -6,7 +6,15 @@ services: ports: - "3000:3000" environment: - - REFRESH_TOKEN= - - CLIENT_SECRET= - - CLIENT_ID= + - REFRESH_TOKEN=${REFRESH_TOKEN} + - CLIENT_SECRET=${CLIENT_SECRET} + - CLIENT_ID=${CLIENT_ID} #- LOG_LEVEL=DEBUG + # ALLOWED_ORIGINS=http://localhost:3000 + # SERVER_PORT=3000 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:3000/health"] + interval: 30s + timeout: 5s + retries: 3 + start_period: 5s diff --git a/main.go b/main.go index 3d19351..f03041d 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "net/http" "os" "os/signal" @@ -17,216 +18,336 @@ import ( "golang.org/x/oauth2" ) -const ( - serverPort = ":3000" - tokenRefreshURL = "https://accounts.spotify.com/api/token" - apiRetryDelay = 3 * time.Second - heartbeatDelay = 3 * time.Second -) +// Configuration holds application settings +type Configuration struct { + ServerPort string + AllowedOrigins []string + LogLevel logrus.Level + Spotify struct { + ClientID string + ClientSecret string + RefreshToken string + } +} var ( + config Configuration clients = make(map[*websocket.Conn]bool) clientsMutex sync.RWMutex broadcast = make(chan *spotify.CurrentlyPlaying) - connect = make(chan *websocket.Conn) - disconnect = make(chan *websocket.Conn) + connectChan = make(chan *websocket.Conn) upgrader = websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - return true - }, HandshakeTimeout: 10 * time.Second, ReadBufferSize: 1024, WriteBufferSize: 1024, } - spotifyClient spotify.Client - tokenSource oauth2.TokenSource - config *oauth2.Config - lastPlayingState *bool - lastTrackState *spotify.CurrentlyPlaying + spotifyClient spotify.Client + tokenSource oauth2.TokenSource + lastState struct { + sync.RWMutex + track *spotify.CurrentlyPlaying + playing bool + } +) + +const ( + defaultPort = ":3000" + tokenRefreshURL = "https://accounts.spotify.com/api/token" + apiRetryDelay = 3 * time.Second + heartbeatInterval = 3 * time.Second + writeTimeout = 10 * time.Second ) func main() { - logrus.SetFormatter(&logrus.TextFormatter{ForceColors: true}) - loadEnv() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - config = &oauth2.Config{ - ClientID: os.Getenv("CLIENT_ID"), - ClientSecret: os.Getenv("CLIENT_SECRET"), - Endpoint: oauth2.Endpoint{ - TokenURL: tokenRefreshURL, - }, - } - token := &oauth2.Token{RefreshToken: os.Getenv("REFRESH_TOKEN")} - tokenSource = config.TokenSource(context.Background(), token) + initializeApplication() + initializeSpotifyClient() - httpClient := oauth2.NewClient(context.Background(), tokenSource) - spotifyClient = spotify.NewClient(httpClient) - - http.HandleFunc("/", connectionHandler) - http.HandleFunc("/health", healthHandler) - - stop := make(chan os.Signal, 1) - signal.Notify(stop, os.Interrupt, syscall.SIGTERM) - - go trackFetcher() - go messageHandler() - go connectionManager() + router := http.NewServeMux() + router.HandleFunc("/", connectionHandler) + router.HandleFunc("/health", healthHandler) server := &http.Server{ - Addr: serverPort, + Addr: config.ServerPort, + Handler: router, } + go trackFetcher(ctx) + go messageHandler(ctx) + go connectionManager(ctx) + + startServer(server, ctx) + handleShutdown(server, cancel) +} + +func initializeApplication() { + logrus.SetFormatter(&logrus.TextFormatter{ + FullTimestamp: true, + TimestampFormat: time.RFC3339, + ForceColors: true, + }) + + if err := loadConfiguration(); err != nil { + logrus.Fatal(err) + } + + upgrader.CheckOrigin = func(r *http.Request) bool { + if len(config.AllowedOrigins) == 0 { + return true + } + origin := r.Header.Get("Origin") + for _, allowed := range config.AllowedOrigins { + if origin == allowed { + return true + } + } + return false + } +} + +func loadConfiguration() error { + _ = godotenv.Load() + + required := map[string]*string{ + "CLIENT_ID": &config.Spotify.ClientID, + "CLIENT_SECRET": &config.Spotify.ClientSecret, + "REFRESH_TOKEN": &config.Spotify.RefreshToken, + } + + for key, ptr := range required { + value := os.Getenv(key) + if value == "" { + return fmt.Errorf("missing required environment variable: %s", key) + } + *ptr = value + } + + config.ServerPort = defaultPort + if port := os.Getenv("SERVER_PORT"); port != "" { + config.ServerPort = ":" + strings.TrimLeft(port, ":") + } + + config.AllowedOrigins = strings.Split(os.Getenv("ALLOWED_ORIGINS"), ",") + + logLevel := strings.ToLower(os.Getenv("LOG_LEVEL")) + switch logLevel { + case "debug": + config.LogLevel = logrus.DebugLevel + case "warn": + config.LogLevel = logrus.WarnLevel + case "error": + config.LogLevel = logrus.ErrorLevel + default: + config.LogLevel = logrus.InfoLevel + } + logrus.SetLevel(config.LogLevel) + + return nil +} + +func initializeSpotifyClient() { + token := &oauth2.Token{RefreshToken: config.Spotify.RefreshToken} + oauthConfig := &oauth2.Config{ + ClientID: config.Spotify.ClientID, + ClientSecret: config.Spotify.ClientSecret, + Endpoint: oauth2.Endpoint{TokenURL: tokenRefreshURL}, + } + + tokenSource = oauthConfig.TokenSource(context.Background(), token) + spotifyClient = spotify.NewClient(oauth2.NewClient(context.Background(), tokenSource)) +} + +func startServer(server *http.Server, _ context.Context) { go func() { - logrus.Infof("Server started on %s", serverPort) + logrus.Infof("Server starting on %s", config.ServerPort) if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - logrus.Fatalf("Server failed: %v", err) + logrus.Fatalf("Server failed to start: %v", err) } }() +} - <-stop - logrus.Info("Shutting down server...") +func handleShutdown(server *http.Server, cancel context.CancelFunc) { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + <-sigChan + + logrus.Info("Initiating graceful shutdown...") + cancel() + + ctx, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelTimeout() clientsMutex.Lock() for client := range clients { - _ = client.Close() + client.Close() } clientsMutex.Unlock() - if err := server.Shutdown(context.Background()); err != nil { - logrus.Fatalf("Server shutdown failed: %v", err) - } - logrus.Info("Server exited cleanly") -} - -func loadEnv() { - if err := godotenv.Load(); err != nil { - logrus.Warn("Could not load .env file, falling back to system environment") - } - - requiredVars := []string{"CLIENT_ID", "CLIENT_SECRET", "REFRESH_TOKEN"} - for _, v := range requiredVars { - if os.Getenv(v) == "" { - logrus.Fatalf("Missing required environment variable: %s", v) - } - } - - logLevel := strings.ToLower(os.Getenv("LOG_LEVEL")) - if logLevel == "debug" { - logrus.SetLevel(logrus.DebugLevel) - logrus.Info("Log level set to DEBUG") - } else { - logrus.SetLevel(logrus.InfoLevel) - logrus.Info("Log level set to INFO") + if err := server.Shutdown(ctx); err != nil { + logrus.Errorf("Server shutdown error: %v", err) } + logrus.Info("Server shutdown complete") } func connectionHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Source", "github.com/skidoodle/spotify-ws") ws, err := upgrader.Upgrade(w, r, nil) if err != nil { - logrus.Errorf("Failed to upgrade to WebSocket: %v", err) + logrus.Errorf("WebSocket upgrade failed: %v", err) return } - connect <- ws - clientsMutex.RLock() - if lastTrackState != nil { - if err := ws.WriteJSON(lastTrackState); err != nil { - logrus.Errorf("Failed to send initial state to client: %v", err) + // Add client to the pool + clientsMutex.Lock() + clients[ws] = true + clientsMutex.Unlock() + + logrus.Debugf("New client connected: %s", ws.RemoteAddr()) + + // Send initial state if available + sendInitialState(ws) + + // Start monitoring the connection + go monitorConnection(ws) +} + +func connectionManager(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case client := <-connectChan: + // Add client to the pool + clientsMutex.Lock() + clients[client] = true + clientsMutex.Unlock() + + logrus.Debugf("New client connected: %s", client.RemoteAddr()) + + // Send initial state if available + sendInitialState(client) + + // Start monitoring the connection + go monitorConnection(client) } } - clientsMutex.RUnlock() +} +func monitorConnection(ws *websocket.Conn) { defer func() { - disconnect <- ws + // Clean up the connection + clientsMutex.Lock() + delete(clients, ws) + clientsMutex.Unlock() + + // Close the WebSocket connection + ws.Close() + logrus.Debugf("Client disconnected: %s", ws.RemoteAddr()) }() for { - if _, _, err := ws.ReadMessage(); err != nil { + // Set a read deadline to detect dead connections + ws.SetReadDeadline(time.Now().Add(30 * time.Second)) + + // Attempt to read a message (even though we don't expect any) + _, _, err := ws.NextReader() + if err != nil { + // Check if the error is a normal closure + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + logrus.Debugf("Client disconnected unexpectedly: %v", err) + } break } } } -func connectionManager() { +func sendInitialState(client *websocket.Conn) { + lastState.RLock() + defer lastState.RUnlock() + + if lastState.track != nil { + if err := client.WriteJSON(lastState.track); err != nil { + logrus.Errorf("Failed to send initial state: %v", err) + client.Close() + } + } +} + +func messageHandler(ctx context.Context) { for { select { - case client := <-connect: - clientsMutex.Lock() - clients[client] = true - clientsMutex.Unlock() - logrus.Debugf("New client connected: %v", client.RemoteAddr()) - case client := <-disconnect: - clientsMutex.Lock() - if _, ok := clients[client]; ok { - delete(clients, client) - logrus.Debugf("Client disconnected: %v", client.RemoteAddr()) - } - clientsMutex.Unlock() + case <-ctx.Done(): + return + case msg := <-broadcast: + broadcastToClients(msg) } } } -func messageHandler() { - for msg := range broadcast { - clientsMutex.RLock() - for client := range clients { - if err := client.WriteJSON(msg); err != nil { - logrus.Errorf("Failed to send message to client: %v", err) - disconnect <- client - } +func broadcastToClients(msg *spotify.CurrentlyPlaying) { + clientsMutex.RLock() + defer clientsMutex.RUnlock() + + for client := range clients { + client.SetWriteDeadline(time.Now().Add(writeTimeout)) + if err := client.WriteJSON(msg); err != nil { + logrus.Debugf("Broadcast failed: %v", err) + client.Close() } - clientsMutex.RUnlock() } } -func trackFetcher() { +func trackFetcher(ctx context.Context) { + ticker := time.NewTicker(heartbeatInterval) + defer ticker.Stop() + for { - current, err := fetchCurrentlyPlaying() - if err != nil { - logrus.Errorf("Error fetching currently playing track: %v", err) - time.Sleep(apiRetryDelay) - continue + select { + case <-ctx.Done(): + return + case <-ticker.C: + fetchAndBroadcastState() } - - if current != nil { - clientsMutex.Lock() - lastTrackState = current - clientsMutex.Unlock() - - if lastPlayingState == nil || *lastPlayingState != current.Playing { - logrus.Debugf("Playback state changed: is_playing=%v", current.Playing) - broadcast <- current - lastPlayingState = ¤t.Playing - } - - if current.Playing { - broadcast <- current - time.Sleep(heartbeatDelay) - continue - } - } - time.Sleep(heartbeatDelay) } } -func fetchCurrentlyPlaying() (*spotify.CurrentlyPlaying, error) { +func fetchAndBroadcastState() { current, err := spotifyClient.PlayerCurrentlyPlaying() - if err != nil && err.Error() == "token expired" { - logrus.Warn("Spotify token expired, refreshing token...") - newToken, err := tokenSource.Token() - if err != nil { - return nil, err - } - httpClient := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(newToken)) - spotifyClient = spotify.NewClient(httpClient) - return spotifyClient.PlayerCurrentlyPlaying() + if err != nil { + logrus.Errorf("Failed to fetch playback state: %v", err) + time.Sleep(apiRetryDelay) + return + } + + updateState(current) +} + +func updateState(current *spotify.CurrentlyPlaying) { + lastState.Lock() + defer lastState.Unlock() + + if current == nil { + return + } + + stateChanged := lastState.track == nil || + lastState.track.Item.ID != current.Item.ID || + lastState.playing != current.Playing + + lastState.track = current + lastState.playing = current.Playing + + if stateChanged || current.Playing { + broadcast <- current } - return current, err } func healthHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain") w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte("OK")) + w.Write([]byte("OK")) } diff --git a/readme.md b/readme.md index 326cde2..844103e 100644 --- a/readme.md +++ b/readme.md @@ -20,7 +20,6 @@ docker run -p 3000:3000 spotify-ws:main ```sh git clone https://github.com/skidoodle/spotify-ws cd spotify-ws -go get go run main.go ``` @@ -30,17 +29,19 @@ go run main.go ```yaml services: - spotify-ws: - container_name: spotify-ws - image: 'ghcr.io/skidoodle/spotify-ws:main' - restart: unless-stopped - environment: - - REFRESH_TOKEN= - - CLIENT_SECRET= - - CLIENT_ID= - #- LOG_LEVEL=debug - ports: - - '3000:3000' + spotify-ws: + image: ghcr.io/skidoodle/spotify-ws:main + container_name: spotify-ws + restart: unless-stopped + ports: + - "3000:3000" + environment: + - REFRESH_TOKEN=${REFRESH_TOKEN} + - CLIENT_SECRET=${CLIENT_SECRET} + - CLIENT_ID=${CLIENT_ID} + #- LOG_LEVEL=DEBUG + #- ALLOWED_ORIGINS=http://localhost:3000 + #- SERVER_PORT=3000 ``` ### Docker run @@ -51,10 +52,9 @@ docker run \ --name=spotify-ws \ --restart=unless-stopped \ -p 3000:3000 \ - -e CLIENT_ID= \ - -e CLIENT_SECRET= \ - -e REFRESH_TOKEN= \ - #-e LOG_LEVEL=DEBUG \ + -e CLIENT_ID=${CLIENT_ID} \ + -e CLIENT_SECRET=${CLIENT_SECRET} \ + -e REFRESH_TOKEN=${REFRESH_TOKEN} \ ghcr.io/skidoodle/spotify-ws:main ```