package main import ( "context" "encoding/json" "errors" "fmt" "log" "net/http" "os" "sync" "time" "github.com/go-redis/redis/v8" "github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/spf13/viper" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) var ( upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } redisClient *redis.Client ctx = context.Background() ) type Message struct { Id string `json:"Id"` Name string `json:"Name"` Image string `json:"Image"` Status string `json:"Status"` Theme string `json:"Theme"` Timestamp string `json:"Timestamp"` } type Config struct { Redis struct { Host string `yaml:"host"` Port string `yaml:"port"` } Mongo struct { Host string `yaml:"host"` Port string `yaml:"port"` User string `yaml:"user"` Pass string `yaml:"pass"` } } type Client struct { conn *websocket.Conn mu sync.Mutex } type Server struct { clients map[*Client]bool broadcast chan Message mu sync.Mutex } type MongoConn struct { conn *mongo.Client } func getConfig() (Config, error) { viper.SetConfigName("config") // name of config file (without extension) viper.SetConfigType("yaml") viper.AddConfigPath(".") // optionally look for config in the working directory err := viper.ReadInConfig() // Find and read the config file if err != nil { // Handle errors reading the config file panic(fmt.Errorf("fatal error config file: %w", err)) } var config Config err = viper.Unmarshal(&config) if err != nil { log.Fatalf("unable to decode into struct, %v", err) } return config, nil } func loadConfig() Config { cfg, err := getConfig() if err != nil { log.Fatal(err) } return cfg } func initRedis(cfg Config) { redisClient = redis.NewClient(&redis.Options{ Addr: cfg.Redis.Host + ":" + cfg.Redis.Port, }) _, err := redisClient.Do(context.Background(), "CONFIG", "SET", "notify-keyspace-events", "KEA").Result() if err != nil { fmt.Printf("unable to set keyspace events %v", err.Error()) os.Exit(1) } redisClient.ConfigSet(ctx, "notify-keyspace-events", "Ex") } func newServer() *Server { return &Server{ clients: make(map[*Client]bool), broadcast: make(chan Message), } } func newMongo(cfg Config) *MongoConn { clientOpts := options.Client().ApplyURI("mongodb://" + cfg.Mongo.Host + ":" + cfg.Mongo.Port) if !(cfg.Mongo.User == "" && cfg.Mongo.Pass == "") { log.Println("WARNING: MongoDB user/pass is set") credential := options.Credential{ Username: cfg.Mongo.User, Password: cfg.Mongo.Pass, } clientOpts.SetAuth(credential) } client, err := mongo.Connect(ctx, clientOpts) //mongoClient, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017")) if err != nil { log.Fatal(err) } return &MongoConn{ conn: client, } } func (s *Server) addClient(client *Client) { s.mu.Lock() defer s.mu.Unlock() s.clients[client] = true sendAllRecordsToClient(client) } func (s *Server) removeClient(client *Client) { s.mu.Lock() defer s.mu.Unlock() if _, ok := s.clients[client]; ok { delete(s.clients, client) err := client.conn.Close() if err != nil { return } } } func (s *Server) handleConnections(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { fmt.Printf("Error upgrading connection: %v\n", err) return } client := &Client{conn: conn} s.addClient(client) defer s.removeClient(client) for { _, msgContent, err := conn.ReadMessage() if err != nil { fmt.Printf("Error reading message: %v\n", err) break } var message Message err = json.Unmarshal(msgContent, &message) if err != nil { log.Fatalf("Unable to marshal JSON due to %s", err) } fmt.Printf("Broadcasting ws message %s to %d clients\n", message, len(s.clients)) s.broadcast <- message } } func broadcastSingleRecord(s *Server, message Message) { for client := range s.clients { sendMessageToClient(client, message) } } func sendMessageToClient(client *Client, message Message) { var msgJSON, err = json.Marshal(message) if err != nil { log.Println("Error marshalling json:", err) return } go sendToClient(client, msgJSON) } func sendToClient(client *Client, msgJSON []byte) { client.mu.Lock() defer client.mu.Unlock() if err := client.conn.WriteMessage(websocket.TextMessage, msgJSON); err != nil { log.Println("Failed to send message to client:", err) } } func sendAllRecordsToClient(client *Client) { allRecords, err := fetchAllRecords() if err != nil { log.Println("Error fetching all records:", err) } fmt.Printf("Broadcasting %d records to client\n", len(allRecords)) var message Message for _, msgContent := range allRecords { err = json.Unmarshal([]byte(msgContent), &message) if err != nil { log.Println("Unable to marshal JSON due to: ", err) } fmt.Printf("Broadcasting %s,%s,%s,%s,%s\n", message.Id, message.Status, message.Name, message.Theme, message.Timestamp) go sendToClient(client, []byte(msgContent)) } } func (s *Server) listenForExpirationEvents() { ps := redisClient.PSubscribe(ctx, "__keyevent@0__:expired") defer func(ps *redis.PubSub) { err := ps.Close() if err != nil { log.Println("Error closing redis pubsub:", err) } }(ps) for msg := range ps.Channel() { // Extract expired payload ID expiredID := msg.Payload fmt.Println(expiredID) // Broadcast expiration event var message = Message{Id: expiredID, Name: "", Image: "", Status: "removed", Theme: "", Timestamp: ""} fmt.Printf("Broadcasting expiration: %s to %d clients\n", message, len(s.clients)) broadcastSingleRecord(s, message) } } func fetchAllRecords() (map[string]string, error) { allKeys, err := redisClient.Keys(ctx, "*").Result() if err != nil { return nil, fmt.Errorf("could not fetch keys: %v", err) } records := make(map[string]string) for _, key := range allKeys { value, err := redisClient.Get(ctx, key).Result() if errors.Is(err, redis.Nil) { continue // skip if key does not exist } else if err != nil { return nil, fmt.Errorf("could not fetch value for key %s: %v", key, err) } records[key] = value } return records, nil } func getState(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed) return } fmt.Println("Checking Status") var request struct { Id string `json:"Id"` } err := json.NewDecoder(r.Body).Decode(&request) if err != nil { fmt.Println("Invalid JSON format") fmt.Println(r.Body) http.Error(w, "Invalid JSON format", http.StatusBadRequest) return } fmt.Println(request) if request.Id == "" { fmt.Println("Missing or empty Id field") http.Error(w, "Missing or empty Id field", http.StatusBadRequest) return } fmt.Println(request.Id) value, err := redisClient.Get(ctx, request.Id).Result() if errors.Is(err, redis.Nil) { message := Message{Id: request.Id, Status: "none", Timestamp: time.Now().Format(time.RFC3339)} w.Header().Set("Content-Type", "application/json") err := json.NewEncoder(w).Encode(message) if err != nil { return } return } else if err != nil { log.Printf("Redis error: %v", err) http.Error(w, "Error connecting to Redis", http.StatusInternalServerError) return } var message Message err = json.Unmarshal([]byte(value), &message) if err != nil { log.Printf("Failed to decode Redis data for Id: %s - %v", request.Id, err) http.Error(w, "Failed to parse data", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") err = json.NewEncoder(w).Encode(message) if err != nil { http.Error(w, "Failed to encode response", http.StatusInternalServerError) } } func setState(w http.ResponseWriter, r *http.Request, s *Server) { if r.Method != http.MethodPost { http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed) return } var message Message err := json.NewDecoder(r.Body).Decode(&message) if err != nil || message.Id == "" || message.Status == "" { http.Error(w, "Invalid request format", http.StatusBadRequest) return } if message.Status == "none" { if err := redisClient.Del(ctx, message.Id).Err(); err != nil { log.Printf("Error deleting key from Redis: %v", err) http.Error(w, "Failed to delete key from Redis", http.StatusInternalServerError) return } fmt.Printf("Broadcasting removal: %s,%s,%s,%s to %d clients\n", message.Id, message.Status, message.Name, message.Timestamp, len(s.clients)) } else { msgJSON, err := json.Marshal(message) if err != nil { http.Error(w, "Failed to encode message", http.StatusInternalServerError) return } timeout := 60 * time.Minute err = redisClient.Set(ctx, message.Id, msgJSON, timeout).Err() if err != nil { log.Printf("Failed to set key in Redis: %v", err) http.Error(w, "Failed to store data", http.StatusInternalServerError) return } fmt.Printf("Broadcasting message: %s,%s,%s,%s,%s to %d clients\n", message.Id, message.Status, message.Name, message.Theme, message.Timestamp, len(s.clients)) } broadcastSingleRecord(s, message) w.WriteHeader(http.StatusOK) } func getLocByZip(w http.ResponseWriter, r *http.Request, mongo *MongoConn) { var request struct { Zip string `json:"Zip"` } err := json.NewDecoder(r.Body).Decode(&request) if err != nil { http.Error(w, "Invalid JSON format", http.StatusBadRequest) } if r.Method != http.MethodPost { http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed) return } fmt.Println(request) if request.Zip == "" { fmt.Println("Missing or empty zip field") http.Error(w, "Missing or empty zip field", http.StatusBadRequest) return } var result struct { Location struct { Coordinates []float64 `bson:"coordinates"` } `bson:"location"` } err = mongo.conn.Database("geodb").Collection("zipcodes"). FindOne(ctx, bson.M{"zip": request.Zip}).Decode(&result) if err != nil { http.Error(w, "Failed to find zip field", http.StatusInternalServerError) } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) err = json.NewEncoder(w).Encode(result.Location.Coordinates) if err != nil { http.Error(w, "Failed to encode response", http.StatusInternalServerError) } } func parkLookup(w http.ResponseWriter, r *http.Request, mongo *MongoConn) { var request struct { Lat float32 `json:"Lat"` Lon float32 `json:"Lon"` Dist float32 `json:"Dist"` } if json.NewDecoder(r.Body).Decode(&request) != nil { http.Error(w, "Cannot decode request", http.StatusBadRequest) return } if r.Method != http.MethodPost { http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed) return } log.Println(request.Lat, request.Lon, request.Dist) cursor, err := mongo.conn.Database("geodb").Collection("parks"). Find(ctx, bson.D{{"location", bson.D{{"$near", bson.D{{"$geometry", bson.D{ {"type", "Point"}, {"coordinates", bson.A{request.Lat, request.Lon}}}}, {"$minDistance", 0}, {"$maxDistance", request.Dist * 1609.344}}}}}}) if err != nil || cursor.Err() != nil { http.Error(w, "Failed to find parks", http.StatusInternalServerError) } type Result struct { Id primitive.ObjectID `bson:"_id"` Name string `json:"name"` Address string `json:"address"` City string `json:"city"` State string `json:"state"` Zip string `json:"zip"` Location struct { Coordinates []float64 `json:"coordinates"` } `json:"location"` } var totalResult []Result if cursor != nil && cursor.RemainingBatchLength() > 0 { log.Printf("Found %d parks", cursor.RemainingBatchLength()) for cursor.Next(ctx) { var result Result //log.Printf(cursor.Current.String()) if err := cursor.Decode(&result); err != nil { log.Fatal(err) } fmt.Printf("%+v\n", result) totalResult = append(totalResult, result) } if err := cursor.Err(); err != nil { log.Fatal(err) } } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) err = json.NewEncoder(w).Encode(totalResult) if err != nil { http.Error(w, "Failed to encode response", http.StatusInternalServerError) } } func enableCORS(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") // Allow all origins w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") // Allow preflight requests for the OPTIONS method if r.Method == http.MethodOptions { w.WriteHeader(http.StatusOK) return } next.ServeHTTP(w, r) }) } func main() { config := loadConfig() initRedis(config) mongoConn := newMongo(config) server := newServer() router := mux.NewRouter() // Register routes on the mux router router.HandleFunc("/ws", server.handleConnections).Methods("GET") router.HandleFunc("/get", getState).Methods("POST") router.HandleFunc("/set", func(w http.ResponseWriter, r *http.Request) { setState(w, r, server) }).Methods("POST") router.HandleFunc("/zipLookup", func(w http.ResponseWriter, r *http.Request) { getLocByZip(w, r, mongoConn) }).Methods("POST") router.HandleFunc("/parkLookup", func(w http.ResponseWriter, r *http.Request) { parkLookup(w, r, mongoConn) }).Methods("POST") corsRouter := enableCORS(router) // Start server and other necessary goroutines go server.listenForExpirationEvents() // Pass the mux router to ListenAndServe fmt.Println("Server started on :8080") err := http.ListenAndServe(":8080", corsRouter) if err != nil { fmt.Printf("Server error: %v\n", err) } }