Added endpoints to replace ws for incoming comms
All checks were successful
Build Pogdark API / Build Pogdark API (push) Successful in 38s
All checks were successful
Build Pogdark API / Build Pogdark API (push) Successful in 38s
This commit is contained in:
parent
c077f41ce6
commit
7d4cfb6e05
1
go.mod
1
go.mod
@ -4,6 +4,7 @@ go 1.22
|
||||
|
||||
require (
|
||||
github.com/go-redis/redis/v8 v8.11.5
|
||||
github.com/gorilla/mux v1.8.1
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
gopkg.in/yaml.v2 v2.4.0
|
||||
)
|
||||
|
2
go.sum
2
go.sum
@ -6,6 +6,8 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo
|
||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
|
||||
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
|
||||
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
|
||||
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
|
||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
||||
|
156
main.go
156
main.go
@ -12,6 +12,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gorilla/websocket"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
@ -152,45 +153,6 @@ func (s *Server) handleConnections(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleMessages() {
|
||||
for {
|
||||
message := <-s.broadcast
|
||||
s.mu.Lock()
|
||||
|
||||
msgJSON, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
fmt.Printf("Error marshalling message: %v\n", err)
|
||||
continue
|
||||
}
|
||||
fmt.Printf("Message status %s\n", message.Status)
|
||||
if message.Status == "expired" {
|
||||
err = redisClient.Del(ctx, message.Id).Err()
|
||||
if err != nil {
|
||||
fmt.Printf("Error deleting message: %v\n", err)
|
||||
}
|
||||
broadcastExpiredRecord(s, message.Id)
|
||||
} else {
|
||||
err = redisClient.Set(ctx, message.Id, msgJSON, 20*time.Second).Err()
|
||||
if err != nil {
|
||||
log.Println("Failed to store payload in Redis:", err)
|
||||
return
|
||||
}
|
||||
|
||||
/*for client := range s.clients {
|
||||
err = client.conn.WriteMessage(websocket.TextMessage, msgJSON)
|
||||
if err != nil {
|
||||
fmt.Printf("Error writing message: %v\n", err)
|
||||
client.conn.Close()
|
||||
delete(s.clients, client)
|
||||
}
|
||||
}*/
|
||||
//
|
||||
broadcastAllRecords(s)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func broadcastAllRecords(s *Server) {
|
||||
allRecords, err := fetchAllRecords()
|
||||
if err != nil {
|
||||
@ -201,7 +163,7 @@ func broadcastAllRecords(s *Server) {
|
||||
for _, msgContent := range allRecords {
|
||||
err = json.Unmarshal([]byte(msgContent), &message)
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to marshal JSON due to %s", err)
|
||||
log.Println("Unable to marshal JSON due to: ", err)
|
||||
}
|
||||
fmt.Printf("Broadcasting %s,%s,%s,%s to %d clients\n", message.Id, message.Status, message.Name, message.Timestamp, len(s.clients))
|
||||
|
||||
@ -214,7 +176,7 @@ func broadcastAllRecords(s *Server) {
|
||||
}
|
||||
}
|
||||
|
||||
func broadcastExpiredRecord(s *Server, expired string) {
|
||||
func broadcastRemovedRecords(s *Server, expired string) {
|
||||
var message Message = Message{Id: expired, Name: "", Image: "", Status: "expired", Timestamp: ""}
|
||||
var msgJSON, err = json.Marshal(message)
|
||||
if err != nil {
|
||||
@ -240,7 +202,7 @@ func broadcastAllRecordsToClient(c *Client) {
|
||||
for _, msgContent := range allRecords {
|
||||
err = json.Unmarshal([]byte(msgContent), &message)
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to marshal JSON due to %s", err)
|
||||
log.Println("Unable to marshal JSON due to: ", err)
|
||||
}
|
||||
fmt.Printf("Broadcasting %s,%s,%s,%s\n", message.Id, message.Status, message.Name, message.Timestamp)
|
||||
if err := c.conn.WriteMessage(websocket.TextMessage, []byte(msgContent)); err != nil {
|
||||
@ -263,7 +225,7 @@ func (s *Server) listenForExpirationEvents() {
|
||||
expiredID := msg.Payload
|
||||
fmt.Println(expiredID)
|
||||
// Broadcast expiration event
|
||||
broadcastExpiredRecord(s, expiredID)
|
||||
broadcastRemovedRecords(s, expiredID)
|
||||
fmt.Printf("Done Broadcasting Expiration\n")
|
||||
}
|
||||
}
|
||||
@ -288,16 +250,118 @@ func fetchAllRecords() (map[string]string, error) {
|
||||
return records, nil
|
||||
}
|
||||
|
||||
func statusCheck(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
// Parse request body to extract ID
|
||||
var request struct {
|
||||
Id string `json:"Id"`
|
||||
}
|
||||
err := json.NewDecoder(r.Body).Decode(&request)
|
||||
if err != nil || request.Id == "" {
|
||||
http.Error(w, "Invalid request body", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if ID exists in Redis
|
||||
value, err := redisClient.Get(ctx, request.Id).Result()
|
||||
var message Message
|
||||
if errors.Is(err, redis.Nil) {
|
||||
message = Message{Id: request.Id, Name: "", Image: "", Status: "none", Timestamp: ""}
|
||||
fmt.Println("Returning: ", message)
|
||||
} else if err != nil {
|
||||
http.Error(w, "Redis error", http.StatusInternalServerError)
|
||||
return
|
||||
} else {
|
||||
err = json.Unmarshal([]byte(value), &message)
|
||||
fmt.Printf("Returning %s,%s,%s,%s\n", message.Id, message.Status, message.Name, message.Timestamp)
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to parse message data", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Send JSON response with full Message struct
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
err = json.NewEncoder(w).Encode(message)
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to encode message data", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func setState(w http.ResponseWriter, r *http.Request, s *Server) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
// Decode the incoming JSON to a Message struct
|
||||
var message Message
|
||||
err := json.NewDecoder(r.Body).Decode(&message)
|
||||
fmt.Println("Message: ", message)
|
||||
|
||||
if err != nil || message.Id == "" || message.Status == "" {
|
||||
http.Error(w, "Invalid request body", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
// If Status is "none", delete the record and broadcast expiration
|
||||
if message.Status == "none" {
|
||||
err = redisClient.Del(ctx, message.Id).Err()
|
||||
if err != nil {
|
||||
fmt.Printf("Error deleting message: %v\n", err)
|
||||
return
|
||||
}
|
||||
// Broadcast the expiration event to all clients
|
||||
broadcastRemovedRecords(s, message.Id)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
} else {
|
||||
// Otherwise, store the updated message in Redis
|
||||
msgJSON, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to encode message", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Set a timeout based on the status (can be customized as needed)
|
||||
timeout := 20 * time.Second
|
||||
err = redisClient.Set(ctx, message.Id, msgJSON, timeout).Err()
|
||||
if err != nil {
|
||||
log.Println("Failed to store payload in Redis:", err)
|
||||
http.Error(w, "Failed to store message in Redis", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Broadcast all records to all clients
|
||||
broadcastAllRecords(s)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
initRedis(*loadConfig())
|
||||
|
||||
server := newServer()
|
||||
http.HandleFunc("/ws", server.handleConnections)
|
||||
go server.handleMessages()
|
||||
router := mux.NewRouter()
|
||||
|
||||
// Register routes on the mux router
|
||||
router.HandleFunc("/ws", server.handleConnections).Methods("GET")
|
||||
router.HandleFunc("/status", statusCheck).Methods("POST")
|
||||
router.HandleFunc("/set", func(w http.ResponseWriter, r *http.Request) {
|
||||
setState(w, r, server)
|
||||
}).Methods("POST")
|
||||
|
||||
// Start server and other necessary goroutines
|
||||
go server.listenForExpirationEvents()
|
||||
|
||||
// Pass the mux router to ListenAndServe
|
||||
fmt.Println("Server started on :8080")
|
||||
err := http.ListenAndServe(":8080", nil)
|
||||
err := http.ListenAndServe(":8080", router)
|
||||
if err != nil {
|
||||
fmt.Printf("Server error: %v\n", err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user