mirror of
https://github.com/skidoodle/spotify-ws.git
synced 2025-02-15 06:09:14 +01:00
improved function
This commit is contained in:
parent
cdec626875
commit
a12ebd132f
8 changed files with 177 additions and 138 deletions
271
main.go
271
main.go
|
@ -2,202 +2,231 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/joho/godotenv"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/zmb3/spotify"
|
||||
"golang.org/x/oauth2"
|
||||
)
|
||||
|
||||
const (
|
||||
serverPort = ":3000"
|
||||
tokenRefreshURL = "https://accounts.spotify.com/api/token"
|
||||
apiRetryDelay = 3 * time.Second
|
||||
heartbeatDelay = 3 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
clients = make(map[*websocket.Conn]bool) // Map to keep track of connected clients
|
||||
clientsMutex sync.Mutex // Mutex to protect access to clients map
|
||||
broadcast = make(chan *spotify.CurrentlyPlaying) // Channel for broadcasting currently playing track
|
||||
connect = make(chan *websocket.Conn) // Channel for managing new connections
|
||||
disconnect = make(chan *websocket.Conn) // Channel for managing client disconnections
|
||||
upgrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool { return true }, // Allow all origins
|
||||
HandshakeTimeout: 10 * time.Second, // Timeout for WebSocket handshake
|
||||
ReadBufferSize: 1024, // Buffer size for reading incoming messages
|
||||
WriteBufferSize: 1024, // Buffer size for writing outgoing messages
|
||||
Subprotocols: []string{"binary"}, // Supported subprotocols
|
||||
clients = make(map[*websocket.Conn]bool)
|
||||
clientsMutex sync.RWMutex
|
||||
broadcast = make(chan *spotify.CurrentlyPlaying)
|
||||
connect = make(chan *websocket.Conn)
|
||||
disconnect = make(chan *websocket.Conn)
|
||||
|
||||
upgrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
return true
|
||||
},
|
||||
HandshakeTimeout: 10 * time.Second,
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
}
|
||||
spotifyClient spotify.Client // Spotify API client
|
||||
tokenSource oauth2.TokenSource // OAuth2 token source
|
||||
config *oauth2.Config // OAuth2 configuration
|
||||
|
||||
spotifyClient spotify.Client
|
||||
tokenSource oauth2.TokenSource
|
||||
config *oauth2.Config
|
||||
lastPlayingState *bool
|
||||
lastTrackState *spotify.CurrentlyPlaying
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Load environment variables from .env file if not already set
|
||||
if os.Getenv("CLIENT_ID") == "" || os.Getenv("CLIENT_SECRET") == "" || os.Getenv("REFRESH_TOKEN") == "" {
|
||||
if err := godotenv.Load(); err != nil {
|
||||
log.Fatalf("Error loading .env file: %v", err)
|
||||
}
|
||||
}
|
||||
logrus.SetFormatter(&logrus.TextFormatter{ForceColors: true})
|
||||
loadEnv()
|
||||
|
||||
clientID := os.Getenv("CLIENT_ID")
|
||||
clientSecret := os.Getenv("CLIENT_SECRET")
|
||||
refreshToken := os.Getenv("REFRESH_TOKEN")
|
||||
|
||||
// Setup OAuth2 configuration for Spotify API
|
||||
config = &oauth2.Config{
|
||||
ClientID: clientID,
|
||||
ClientSecret: clientSecret,
|
||||
ClientID: os.Getenv("CLIENT_ID"),
|
||||
ClientSecret: os.Getenv("CLIENT_SECRET"),
|
||||
Endpoint: oauth2.Endpoint{
|
||||
TokenURL: "https://accounts.spotify.com/api/token",
|
||||
TokenURL: tokenRefreshURL,
|
||||
},
|
||||
}
|
||||
|
||||
token := &oauth2.Token{RefreshToken: refreshToken}
|
||||
token := &oauth2.Token{RefreshToken: os.Getenv("REFRESH_TOKEN")}
|
||||
tokenSource = config.TokenSource(context.Background(), token)
|
||||
|
||||
// Create an OAuth2 HTTP client
|
||||
httpClient := oauth2.NewClient(context.Background(), tokenSource)
|
||||
spotifyClient = spotify.NewClient(httpClient)
|
||||
|
||||
// Handle WebSocket connections at the root endpoint
|
||||
http.HandleFunc("/", ConnectionHandler)
|
||||
http.HandleFunc("/", connectionHandler)
|
||||
http.HandleFunc("/health", healthHandler)
|
||||
|
||||
// Log server start-up and initialize background tasks
|
||||
log.Println("Server started on :3000")
|
||||
go TrackFetcher() // Periodically fetch currently playing track from Spotify
|
||||
go MessageHandler() // Broadcast messages to connected clients
|
||||
go ConnectionManager() // Manage client connections
|
||||
stop := make(chan os.Signal, 1)
|
||||
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
|
||||
|
||||
// Start the HTTP server
|
||||
if err := http.ListenAndServe(":3000", nil); err != nil {
|
||||
log.Fatalf("Error starting server: %v", err)
|
||||
go trackFetcher()
|
||||
go messageHandler()
|
||||
go connectionManager()
|
||||
|
||||
server := &http.Server{
|
||||
Addr: serverPort,
|
||||
}
|
||||
|
||||
go func() {
|
||||
logrus.Infof("Server started on %s", serverPort)
|
||||
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
logrus.Fatalf("Server failed: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
<-stop
|
||||
logrus.Info("Shutting down server...")
|
||||
|
||||
clientsMutex.Lock()
|
||||
for client := range clients {
|
||||
_ = client.Close()
|
||||
}
|
||||
clientsMutex.Unlock()
|
||||
|
||||
if err := server.Shutdown(context.Background()); err != nil {
|
||||
logrus.Fatalf("Server shutdown failed: %v", err)
|
||||
}
|
||||
logrus.Info("Server exited cleanly")
|
||||
}
|
||||
|
||||
func loadEnv() {
|
||||
if err := godotenv.Load(); err != nil {
|
||||
logrus.Warn("Could not load .env file, falling back to system environment")
|
||||
}
|
||||
|
||||
requiredVars := []string{"CLIENT_ID", "CLIENT_SECRET", "REFRESH_TOKEN"}
|
||||
for _, v := range requiredVars {
|
||||
if os.Getenv(v) == "" {
|
||||
logrus.Fatalf("Missing required environment variable: %s", v)
|
||||
}
|
||||
}
|
||||
|
||||
logLevel := strings.ToLower(os.Getenv("LOG_LEVEL"))
|
||||
if logLevel == "debug" {
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
logrus.Info("Log level set to DEBUG")
|
||||
} else {
|
||||
logrus.SetLevel(logrus.InfoLevel)
|
||||
logrus.Info("Log level set to INFO")
|
||||
}
|
||||
}
|
||||
|
||||
// ConnectionHandler upgrades HTTP connections to WebSocket and handles communication with clients
|
||||
func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
func connectionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ws, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to upgrade to WebSocket: %v", err)
|
||||
return
|
||||
}
|
||||
connect <- ws
|
||||
|
||||
clientsMutex.RLock()
|
||||
if lastTrackState != nil {
|
||||
if err := ws.WriteJSON(lastTrackState); err != nil {
|
||||
logrus.Errorf("Failed to send initial state to client: %v", err)
|
||||
}
|
||||
}
|
||||
clientsMutex.RUnlock()
|
||||
|
||||
defer func() {
|
||||
disconnect <- ws
|
||||
err := ws.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
// Immediately send the current track to the newly connected client
|
||||
currentTrack, err := spotifyClient.PlayerCurrentlyPlaying()
|
||||
if err != nil {
|
||||
log.Printf("Error getting currently playing track: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Send the current track information to the client
|
||||
err = ws.WriteJSON(currentTrack)
|
||||
if err != nil {
|
||||
log.Printf("Error sending current track to client: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Keep the connection open to listen for incoming messages (heartbeat)
|
||||
for {
|
||||
_, _, err := ws.ReadMessage()
|
||||
if err != nil {
|
||||
disconnect <- ws
|
||||
if _, _, err := ws.ReadMessage(); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ConnectionManager manages client connections and disconnections using channels
|
||||
func ConnectionManager() {
|
||||
func connectionManager() {
|
||||
for {
|
||||
select {
|
||||
case client := <-connect:
|
||||
clientsMutex.Lock()
|
||||
clients[client] = true
|
||||
clientsMutex.Unlock()
|
||||
logrus.Debugf("New client connected: %v", client.RemoteAddr())
|
||||
case client := <-disconnect:
|
||||
clientsMutex.Lock()
|
||||
if _, ok := clients[client]; ok {
|
||||
delete(clients, client)
|
||||
err := client.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
logrus.Debugf("Client disconnected: %v", client.RemoteAddr())
|
||||
}
|
||||
clientsMutex.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MessageHandler continuously listens for messages on the broadcast channel and sends them to all connected clients
|
||||
func MessageHandler() {
|
||||
func messageHandler() {
|
||||
for msg := range broadcast {
|
||||
clientsMutex.Lock()
|
||||
clientsMutex.RLock()
|
||||
for client := range clients {
|
||||
err := client.WriteJSON(msg)
|
||||
if err != nil {
|
||||
err := client.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
delete(clients, client)
|
||||
if err := client.WriteJSON(msg); err != nil {
|
||||
logrus.Errorf("Failed to send message to client: %v", err)
|
||||
disconnect <- client
|
||||
}
|
||||
}
|
||||
clientsMutex.Unlock()
|
||||
clientsMutex.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
// TrackFetcher periodically fetches the currently playing track from the Spotify API and broadcasts it to clients
|
||||
func TrackFetcher() {
|
||||
var playing bool
|
||||
func trackFetcher() {
|
||||
for {
|
||||
// Fetch the currently playing track
|
||||
current, err := spotifyClient.PlayerCurrentlyPlaying()
|
||||
current, err := fetchCurrentlyPlaying()
|
||||
if err != nil {
|
||||
log.Printf("Error getting currently playing track: %v", err)
|
||||
// Refresh the access token if it has expired
|
||||
if err.Error() == "token expired" {
|
||||
log.Println("Token expired, refreshing token...")
|
||||
newToken, err := tokenSource.Token()
|
||||
if err != nil {
|
||||
log.Fatalf("Couldn't refresh token: %v", err)
|
||||
}
|
||||
httpClient := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(newToken))
|
||||
spotifyClient = spotify.NewClient(httpClient)
|
||||
}
|
||||
// Wait before retrying to avoid overwhelming the API
|
||||
time.Sleep(30 * time.Minute)
|
||||
logrus.Errorf("Error fetching currently playing track: %v", err)
|
||||
time.Sleep(apiRetryDelay)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the track is playing
|
||||
switch {
|
||||
case current.Playing:
|
||||
broadcast <- current
|
||||
playing = true
|
||||
// Send updates every 3 seconds while playing
|
||||
for current.Playing {
|
||||
time.Sleep(3 * time.Second)
|
||||
current, err = spotifyClient.PlayerCurrentlyPlaying()
|
||||
if err != nil {
|
||||
log.Printf("Error getting currently playing track: %v", err)
|
||||
break
|
||||
}
|
||||
broadcast <- current
|
||||
}
|
||||
case !current.Playing && playing:
|
||||
playing = false
|
||||
}
|
||||
if current != nil {
|
||||
clientsMutex.Lock()
|
||||
lastTrackState = current
|
||||
clientsMutex.Unlock()
|
||||
|
||||
// Wait before checking again to avoid overwhelming the API
|
||||
time.Sleep(3 * time.Second)
|
||||
if lastPlayingState == nil || *lastPlayingState != current.Playing {
|
||||
logrus.Debugf("Playback state changed: is_playing=%v", current.Playing)
|
||||
broadcast <- current
|
||||
lastPlayingState = ¤t.Playing
|
||||
}
|
||||
|
||||
if current.Playing {
|
||||
broadcast <- current
|
||||
time.Sleep(heartbeatDelay)
|
||||
continue
|
||||
}
|
||||
}
|
||||
time.Sleep(heartbeatDelay)
|
||||
}
|
||||
}
|
||||
|
||||
func fetchCurrentlyPlaying() (*spotify.CurrentlyPlaying, error) {
|
||||
current, err := spotifyClient.PlayerCurrentlyPlaying()
|
||||
if err != nil && err.Error() == "token expired" {
|
||||
logrus.Warn("Spotify token expired, refreshing token...")
|
||||
newToken, err := tokenSource.Token()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
httpClient := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(newToken))
|
||||
spotifyClient = spotify.NewClient(httpClient)
|
||||
return spotifyClient.PlayerCurrentlyPlaying()
|
||||
}
|
||||
return current, err
|
||||
}
|
||||
|
||||
func healthHandler(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte("OK"))
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue