Removed the init function, mod the broadcast to make it more readable and broadcast expiration on status clear
All checks were successful
Build Pogdark API / Build Pogdark API (push) Successful in 25s
All checks were successful
Build Pogdark API / Build Pogdark API (push) Successful in 25s
This commit is contained in:
parent
0afe8af553
commit
c077f41ce6
86
main.go
86
main.go
@ -27,11 +27,11 @@ var (
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
Id string
|
||||
Name string
|
||||
Image string
|
||||
Status string
|
||||
Timestamp string
|
||||
Id string `json:"Id"`
|
||||
Name string `json:"Name"`
|
||||
Image string `json:"Image"`
|
||||
Status string `json:"Status"`
|
||||
Timestamp string `json:"Timestamp"`
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
@ -51,7 +51,7 @@ type Server struct {
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func GetConfig(configPath string) (*Config, error) {
|
||||
func getConfig(configPath string) (*Config, error) {
|
||||
// Create config structure
|
||||
config := &Config{}
|
||||
|
||||
@ -78,19 +78,24 @@ func GetConfig(configPath string) (*Config, error) {
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
cfg, err := GetConfig("config.yaml")
|
||||
func loadConfig() *Config {
|
||||
cfg, err := getConfig("config.yaml")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
|
||||
func initRedis(cfg Config) {
|
||||
redisClient = redis.NewClient(&redis.Options{
|
||||
Addr: cfg.Redis.Host + ":" + cfg.Redis.Port,
|
||||
})
|
||||
_, err = redisClient.Do(context.Background(), "CONFIG", "SET", "notify-keyspace-events", "KEA").Result()
|
||||
_, err := redisClient.Do(context.Background(), "CONFIG", "SET", "notify-keyspace-events", "KEA").Result()
|
||||
if err != nil {
|
||||
fmt.Printf("unable to set keyspace events %v", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
redisClient.ConfigSet(ctx, "notify-keyspace-events", "Ex")
|
||||
}
|
||||
|
||||
func newServer() *Server {
|
||||
@ -157,23 +162,31 @@ func (s *Server) handleMessages() {
|
||||
fmt.Printf("Error marshalling message: %v\n", err)
|
||||
continue
|
||||
}
|
||||
//fmt.Printf("%s\n", string(msgJSON))
|
||||
|
||||
err = redisClient.Set(ctx, message.Id, msgJSON, 20*time.Second).Err()
|
||||
if err != nil {
|
||||
log.Println("Failed to store payload in Redis:", err)
|
||||
return
|
||||
}
|
||||
|
||||
/*for client := range s.clients {
|
||||
err = client.conn.WriteMessage(websocket.TextMessage, msgJSON)
|
||||
fmt.Printf("Message status %s\n", message.Status)
|
||||
if message.Status == "expired" {
|
||||
err = redisClient.Del(ctx, message.Id).Err()
|
||||
if err != nil {
|
||||
fmt.Printf("Error writing message: %v\n", err)
|
||||
client.conn.Close()
|
||||
delete(s.clients, client)
|
||||
fmt.Printf("Error deleting message: %v\n", err)
|
||||
}
|
||||
}*/
|
||||
broadcastAllRecords(s)
|
||||
broadcastExpiredRecord(s, message.Id)
|
||||
} else {
|
||||
err = redisClient.Set(ctx, message.Id, msgJSON, 20*time.Second).Err()
|
||||
if err != nil {
|
||||
log.Println("Failed to store payload in Redis:", err)
|
||||
return
|
||||
}
|
||||
|
||||
/*for client := range s.clients {
|
||||
err = client.conn.WriteMessage(websocket.TextMessage, msgJSON)
|
||||
if err != nil {
|
||||
fmt.Printf("Error writing message: %v\n", err)
|
||||
client.conn.Close()
|
||||
delete(s.clients, client)
|
||||
}
|
||||
}*/
|
||||
//
|
||||
broadcastAllRecords(s)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}
|
||||
@ -184,11 +197,16 @@ func broadcastAllRecords(s *Server) {
|
||||
log.Println("Error fetching all records:", err)
|
||||
}
|
||||
if len(allRecords) > 0 {
|
||||
for _, message := range allRecords {
|
||||
fmt.Printf("Broadcasting %s to %d clients\n", string(message), len(s.clients))
|
||||
var message Message
|
||||
for _, msgContent := range allRecords {
|
||||
err = json.Unmarshal([]byte(msgContent), &message)
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to marshal JSON due to %s", err)
|
||||
}
|
||||
fmt.Printf("Broadcasting %s,%s,%s,%s to %d clients\n", message.Id, message.Status, message.Name, message.Timestamp, len(s.clients))
|
||||
|
||||
for client := range s.clients {
|
||||
if err := client.conn.WriteMessage(websocket.TextMessage, []byte(message)); err != nil {
|
||||
if err := client.conn.WriteMessage(websocket.TextMessage, []byte(msgContent)); err != nil {
|
||||
log.Println("Failed to broadcast update:", err)
|
||||
}
|
||||
}
|
||||
@ -218,9 +236,14 @@ func broadcastAllRecordsToClient(c *Client) {
|
||||
log.Println("Error fetching all records:", err)
|
||||
}
|
||||
fmt.Printf("Broadcasting %d records to client\n", len(allRecords))
|
||||
for _, message := range allRecords {
|
||||
fmt.Printf("Broadcasting %s\n", string(message))
|
||||
if err := c.conn.WriteMessage(websocket.TextMessage, []byte(message)); err != nil {
|
||||
var message Message
|
||||
for _, msgContent := range allRecords {
|
||||
err = json.Unmarshal([]byte(msgContent), &message)
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to marshal JSON due to %s", err)
|
||||
}
|
||||
fmt.Printf("Broadcasting %s,%s,%s,%s\n", message.Id, message.Status, message.Name, message.Timestamp)
|
||||
if err := c.conn.WriteMessage(websocket.TextMessage, []byte(msgContent)); err != nil {
|
||||
log.Println("Failed to broadcast update:", err)
|
||||
}
|
||||
}
|
||||
@ -266,8 +289,7 @@ func fetchAllRecords() (map[string]string, error) {
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
||||
redisClient.ConfigSet(ctx, "notify-keyspace-events", "Ex")
|
||||
initRedis(*loadConfig())
|
||||
|
||||
server := newServer()
|
||||
http.HandleFunc("/ws", server.handleConnections)
|
||||
|
Loading…
Reference in New Issue
Block a user