diff --git a/go.mod b/go.mod index 7a901fb..cdac06b 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.22 require ( github.com/go-redis/redis/v8 v8.11.5 + github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.3 gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index f723e2e..3270a05 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= diff --git a/main.go b/main.go index 054e0de..f4d6edb 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "time" "github.com/go-redis/redis/v8" + "github.com/gorilla/mux" "github.com/gorilla/websocket" "gopkg.in/yaml.v2" ) @@ -152,45 +153,6 @@ func (s *Server) handleConnections(w http.ResponseWriter, r *http.Request) { } } -func (s *Server) handleMessages() { - for { - message := <-s.broadcast - s.mu.Lock() - - msgJSON, err := json.Marshal(message) - if err != nil { - fmt.Printf("Error marshalling message: %v\n", err) - continue - } - fmt.Printf("Message status %s\n", message.Status) - if message.Status == "expired" { - err = redisClient.Del(ctx, message.Id).Err() - if err != nil { - fmt.Printf("Error deleting message: %v\n", err) - } - broadcastExpiredRecord(s, message.Id) - } else { - err = redisClient.Set(ctx, message.Id, msgJSON, 20*time.Second).Err() - if err != nil { - log.Println("Failed to store payload in Redis:", err) - return - } - - /*for client := range s.clients { - err = client.conn.WriteMessage(websocket.TextMessage, msgJSON) - if err != nil { - fmt.Printf("Error writing message: %v\n", err) - client.conn.Close() - delete(s.clients, client) - } - }*/ - // - broadcastAllRecords(s) - } - s.mu.Unlock() - } -} - func broadcastAllRecords(s *Server) { allRecords, err := fetchAllRecords() if err != nil { @@ -201,7 +163,7 @@ func broadcastAllRecords(s *Server) { for _, msgContent := range allRecords { err = json.Unmarshal([]byte(msgContent), &message) if err != nil { - log.Fatalf("Unable to marshal JSON due to %s", err) + log.Println("Unable to marshal JSON due to: ", err) } fmt.Printf("Broadcasting %s,%s,%s,%s to %d clients\n", message.Id, message.Status, message.Name, message.Timestamp, len(s.clients)) @@ -214,7 +176,7 @@ func broadcastAllRecords(s *Server) { } } -func broadcastExpiredRecord(s *Server, expired string) { +func broadcastRemovedRecords(s *Server, expired string) { var message Message = Message{Id: expired, Name: "", Image: "", Status: "expired", Timestamp: ""} var msgJSON, err = json.Marshal(message) if err != nil { @@ -240,7 +202,7 @@ func broadcastAllRecordsToClient(c *Client) { for _, msgContent := range allRecords { err = json.Unmarshal([]byte(msgContent), &message) if err != nil { - log.Fatalf("Unable to marshal JSON due to %s", err) + log.Println("Unable to marshal JSON due to: ", err) } fmt.Printf("Broadcasting %s,%s,%s,%s\n", message.Id, message.Status, message.Name, message.Timestamp) if err := c.conn.WriteMessage(websocket.TextMessage, []byte(msgContent)); err != nil { @@ -263,7 +225,7 @@ func (s *Server) listenForExpirationEvents() { expiredID := msg.Payload fmt.Println(expiredID) // Broadcast expiration event - broadcastExpiredRecord(s, expiredID) + broadcastRemovedRecords(s, expiredID) fmt.Printf("Done Broadcasting Expiration\n") } } @@ -288,16 +250,118 @@ func fetchAllRecords() (map[string]string, error) { return records, nil } +func statusCheck(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed) + return + } + + // Parse request body to extract ID + var request struct { + Id string `json:"Id"` + } + err := json.NewDecoder(r.Body).Decode(&request) + if err != nil || request.Id == "" { + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + + // Check if ID exists in Redis + value, err := redisClient.Get(ctx, request.Id).Result() + var message Message + if errors.Is(err, redis.Nil) { + message = Message{Id: request.Id, Name: "", Image: "", Status: "none", Timestamp: ""} + fmt.Println("Returning: ", message) + } else if err != nil { + http.Error(w, "Redis error", http.StatusInternalServerError) + return + } else { + err = json.Unmarshal([]byte(value), &message) + fmt.Printf("Returning %s,%s,%s,%s\n", message.Id, message.Status, message.Name, message.Timestamp) + if err != nil { + http.Error(w, "Failed to parse message data", http.StatusInternalServerError) + return + } + } + + // Send JSON response with full Message struct + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(message) + if err != nil { + http.Error(w, "Failed to encode message data", http.StatusInternalServerError) + return + } +} + +func setState(w http.ResponseWriter, r *http.Request, s *Server) { + if r.Method != http.MethodPost { + http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed) + return + } + + // Decode the incoming JSON to a Message struct + var message Message + err := json.NewDecoder(r.Body).Decode(&message) + fmt.Println("Message: ", message) + + if err != nil || message.Id == "" || message.Status == "" { + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + // If Status is "none", delete the record and broadcast expiration + if message.Status == "none" { + err = redisClient.Del(ctx, message.Id).Err() + if err != nil { + fmt.Printf("Error deleting message: %v\n", err) + return + } + // Broadcast the expiration event to all clients + broadcastRemovedRecords(s, message.Id) + w.WriteHeader(http.StatusOK) + return + } else { + // Otherwise, store the updated message in Redis + msgJSON, err := json.Marshal(message) + if err != nil { + http.Error(w, "Failed to encode message", http.StatusInternalServerError) + return + } + + // Set a timeout based on the status (can be customized as needed) + timeout := 20 * time.Second + err = redisClient.Set(ctx, message.Id, msgJSON, timeout).Err() + if err != nil { + log.Println("Failed to store payload in Redis:", err) + http.Error(w, "Failed to store message in Redis", http.StatusInternalServerError) + return + } + + // Broadcast all records to all clients + broadcastAllRecords(s) + w.WriteHeader(http.StatusOK) + return + } +} + func main() { initRedis(*loadConfig()) server := newServer() - http.HandleFunc("/ws", server.handleConnections) - go server.handleMessages() + router := mux.NewRouter() + + // Register routes on the mux router + router.HandleFunc("/ws", server.handleConnections).Methods("GET") + router.HandleFunc("/status", statusCheck).Methods("POST") + router.HandleFunc("/set", func(w http.ResponseWriter, r *http.Request) { + setState(w, r, server) + }).Methods("POST") + + // Start server and other necessary goroutines go server.listenForExpirationEvents() + // Pass the mux router to ListenAndServe fmt.Println("Server started on :8080") - err := http.ListenAndServe(":8080", nil) + err := http.ListenAndServe(":8080", router) if err != nil { fmt.Printf("Server error: %v\n", err) }