From c077f41ce6e25f844914e1cc2c309c6bf5006cd2 Mon Sep 17 00:00:00 2001 From: whysman Date: Sun, 10 Nov 2024 16:28:42 -0500 Subject: [PATCH] Removed the init function, mod the broadcast to make it more readable and broadcast expiration on status clear --- main.go | 86 ++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 54 insertions(+), 32 deletions(-) diff --git a/main.go b/main.go index 8dbbde6..054e0de 100644 --- a/main.go +++ b/main.go @@ -27,11 +27,11 @@ var ( ) type Message struct { - Id string - Name string - Image string - Status string - Timestamp string + Id string `json:"Id"` + Name string `json:"Name"` + Image string `json:"Image"` + Status string `json:"Status"` + Timestamp string `json:"Timestamp"` } type Config struct { @@ -51,7 +51,7 @@ type Server struct { mu sync.Mutex } -func GetConfig(configPath string) (*Config, error) { +func getConfig(configPath string) (*Config, error) { // Create config structure config := &Config{} @@ -78,19 +78,24 @@ func GetConfig(configPath string) (*Config, error) { return config, nil } -func init() { - cfg, err := GetConfig("config.yaml") +func loadConfig() *Config { + cfg, err := getConfig("config.yaml") if err != nil { log.Fatal(err) } + return cfg +} + +func initRedis(cfg Config) { redisClient = redis.NewClient(&redis.Options{ Addr: cfg.Redis.Host + ":" + cfg.Redis.Port, }) - _, err = redisClient.Do(context.Background(), "CONFIG", "SET", "notify-keyspace-events", "KEA").Result() + _, err := redisClient.Do(context.Background(), "CONFIG", "SET", "notify-keyspace-events", "KEA").Result() if err != nil { fmt.Printf("unable to set keyspace events %v", err.Error()) os.Exit(1) } + redisClient.ConfigSet(ctx, "notify-keyspace-events", "Ex") } func newServer() *Server { @@ -157,23 +162,31 @@ func (s *Server) handleMessages() { fmt.Printf("Error marshalling message: %v\n", err) continue } - //fmt.Printf("%s\n", string(msgJSON)) - - err = redisClient.Set(ctx, message.Id, msgJSON, 20*time.Second).Err() - if err != nil { - log.Println("Failed to store payload in Redis:", err) - return - } - - /*for client := range s.clients { - err = client.conn.WriteMessage(websocket.TextMessage, msgJSON) + fmt.Printf("Message status %s\n", message.Status) + if message.Status == "expired" { + err = redisClient.Del(ctx, message.Id).Err() if err != nil { - fmt.Printf("Error writing message: %v\n", err) - client.conn.Close() - delete(s.clients, client) + fmt.Printf("Error deleting message: %v\n", err) } - }*/ - broadcastAllRecords(s) + broadcastExpiredRecord(s, message.Id) + } else { + err = redisClient.Set(ctx, message.Id, msgJSON, 20*time.Second).Err() + if err != nil { + log.Println("Failed to store payload in Redis:", err) + return + } + + /*for client := range s.clients { + err = client.conn.WriteMessage(websocket.TextMessage, msgJSON) + if err != nil { + fmt.Printf("Error writing message: %v\n", err) + client.conn.Close() + delete(s.clients, client) + } + }*/ + // + broadcastAllRecords(s) + } s.mu.Unlock() } } @@ -184,11 +197,16 @@ func broadcastAllRecords(s *Server) { log.Println("Error fetching all records:", err) } if len(allRecords) > 0 { - for _, message := range allRecords { - fmt.Printf("Broadcasting %s to %d clients\n", string(message), len(s.clients)) + var message Message + for _, msgContent := range allRecords { + err = json.Unmarshal([]byte(msgContent), &message) + if err != nil { + log.Fatalf("Unable to marshal JSON due to %s", err) + } + fmt.Printf("Broadcasting %s,%s,%s,%s to %d clients\n", message.Id, message.Status, message.Name, message.Timestamp, len(s.clients)) for client := range s.clients { - if err := client.conn.WriteMessage(websocket.TextMessage, []byte(message)); err != nil { + if err := client.conn.WriteMessage(websocket.TextMessage, []byte(msgContent)); err != nil { log.Println("Failed to broadcast update:", err) } } @@ -218,9 +236,14 @@ func broadcastAllRecordsToClient(c *Client) { log.Println("Error fetching all records:", err) } fmt.Printf("Broadcasting %d records to client\n", len(allRecords)) - for _, message := range allRecords { - fmt.Printf("Broadcasting %s\n", string(message)) - if err := c.conn.WriteMessage(websocket.TextMessage, []byte(message)); err != nil { + var message Message + for _, msgContent := range allRecords { + err = json.Unmarshal([]byte(msgContent), &message) + if err != nil { + log.Fatalf("Unable to marshal JSON due to %s", err) + } + fmt.Printf("Broadcasting %s,%s,%s,%s\n", message.Id, message.Status, message.Name, message.Timestamp) + if err := c.conn.WriteMessage(websocket.TextMessage, []byte(msgContent)); err != nil { log.Println("Failed to broadcast update:", err) } } @@ -266,8 +289,7 @@ func fetchAllRecords() (map[string]string, error) { } func main() { - - redisClient.ConfigSet(ctx, "notify-keyspace-events", "Ex") + initRedis(*loadConfig()) server := newServer() http.HandleFunc("/ws", server.handleConnections)