This commit is contained in:
commit
75776d6ba3
43
.gitea/workflows/build.yaml
Normal file
43
.gitea/workflows/build.yaml
Normal file
@ -0,0 +1,43 @@
|
||||
name: Build Pogdark API
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
pull_request:
|
||||
branches:
|
||||
- master
|
||||
|
||||
jobs:
|
||||
build:
|
||||
name: Build Pogdark API
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v4
|
||||
with:
|
||||
go-version: '1.22' # Specify the desired Go version
|
||||
|
||||
- name: Download Dependencies
|
||||
run: |
|
||||
go mod download
|
||||
|
||||
- name: Build Go Binary
|
||||
run: |
|
||||
go build -o app .
|
||||
|
||||
- name: List Binary Output
|
||||
run: |
|
||||
ls -l app
|
||||
|
||||
- name: Build Docker Image
|
||||
run: |
|
||||
docker build -t localhost:5000/pogdark-api:latest .
|
||||
|
||||
- name: Push Docker Image to Local Registry
|
||||
run: |
|
||||
docker push localhost:5000/pogdark-api:latest
|
14
go.mod
Normal file
14
go.mod
Normal file
@ -0,0 +1,14 @@
|
||||
module pogdark-server
|
||||
|
||||
go 1.22
|
||||
|
||||
require (
|
||||
github.com/go-redis/redis/v8 v8.11.5
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
)
|
28
go.sum
Normal file
28
go.sum
Normal file
@ -0,0 +1,28 @@
|
||||
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
|
||||
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
|
||||
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
||||
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
||||
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
|
||||
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
|
||||
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
|
||||
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
|
||||
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
|
||||
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
|
||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
243
main.go
Normal file
243
main.go
Normal file
@ -0,0 +1,243 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
var (
|
||||
upgrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
return true
|
||||
},
|
||||
}
|
||||
redisClient *redis.Client
|
||||
ctx = context.Background()
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
Id string
|
||||
Name string
|
||||
Image string
|
||||
Status string
|
||||
Timestamp string
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
conn *websocket.Conn
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
clients map[*Client]bool
|
||||
broadcast chan Message
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func init() {
|
||||
redisClient = redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
})
|
||||
_, err := redisClient.Do(context.Background(), "CONFIG", "SET", "notify-keyspace-events", "KEA").Result()
|
||||
if err != nil {
|
||||
fmt.Printf("unable to set keyspace events %v", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func newServer() *Server {
|
||||
return &Server{
|
||||
clients: make(map[*Client]bool),
|
||||
broadcast: make(chan Message),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) addClient(client *Client) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.clients[client] = true
|
||||
broadcastAllRecordsToClient(client)
|
||||
}
|
||||
|
||||
func (s *Server) removeClient(client *Client) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if _, ok := s.clients[client]; ok {
|
||||
delete(s.clients, client)
|
||||
err := client.conn.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleConnections(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
fmt.Printf("Error upgrading connection: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
client := &Client{conn: conn}
|
||||
s.addClient(client)
|
||||
|
||||
defer s.removeClient(client)
|
||||
|
||||
for {
|
||||
_, msgContent, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
fmt.Printf("Error reading message: %v\n", err)
|
||||
break
|
||||
}
|
||||
|
||||
var message Message
|
||||
err = json.Unmarshal(msgContent, &message)
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to marshal JSON due to %s", err)
|
||||
}
|
||||
s.broadcast <- message
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleMessages() {
|
||||
for {
|
||||
message := <-s.broadcast
|
||||
s.mu.Lock()
|
||||
|
||||
msgJSON, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
fmt.Printf("Error marshalling message: %v\n", err)
|
||||
continue
|
||||
}
|
||||
//fmt.Printf("%s\n", string(msgJSON))
|
||||
|
||||
err = redisClient.Set(ctx, message.Id, msgJSON, 20*time.Second).Err()
|
||||
if err != nil {
|
||||
log.Println("Failed to store payload in Redis:", err)
|
||||
return
|
||||
}
|
||||
|
||||
/*for client := range s.clients {
|
||||
err = client.conn.WriteMessage(websocket.TextMessage, msgJSON)
|
||||
if err != nil {
|
||||
fmt.Printf("Error writing message: %v\n", err)
|
||||
client.conn.Close()
|
||||
delete(s.clients, client)
|
||||
}
|
||||
}*/
|
||||
broadcastAllRecords(s)
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func broadcastAllRecords(s *Server) {
|
||||
allRecords, err := fetchAllRecords()
|
||||
if err != nil {
|
||||
log.Println("Error fetching all records:", err)
|
||||
}
|
||||
if len(allRecords) > 0 {
|
||||
for _, message := range allRecords {
|
||||
fmt.Printf("Broadcasting %s to %d clients\n", string(message), len(s.clients))
|
||||
|
||||
for client := range s.clients {
|
||||
if err := client.conn.WriteMessage(websocket.TextMessage, []byte(message)); err != nil {
|
||||
log.Println("Failed to broadcast update:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func broadcastExpiredRecord(s *Server, expired string) {
|
||||
var message Message = Message{Id: expired, Name: "", Image: "", Status: "expired", Timestamp: ""}
|
||||
var msgJSON, err = json.Marshal(message)
|
||||
if err != nil {
|
||||
log.Println("Error marshalling json:", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("Broadcasting expiration: %s\n", string(msgJSON))
|
||||
for client := range s.clients {
|
||||
if err := client.conn.WriteMessage(websocket.TextMessage, msgJSON); err != nil {
|
||||
log.Println("Failed to broadcast update:", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func broadcastAllRecordsToClient(c *Client) {
|
||||
allRecords, err := fetchAllRecords()
|
||||
if err != nil {
|
||||
log.Println("Error fetching all records:", err)
|
||||
}
|
||||
fmt.Printf("Broadcasting %d records to client\n", len(allRecords))
|
||||
for _, message := range allRecords {
|
||||
fmt.Printf("Broadcasting %s\n", string(message))
|
||||
if err := c.conn.WriteMessage(websocket.TextMessage, []byte(message)); err != nil {
|
||||
log.Println("Failed to broadcast update:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) listenForExpirationEvents() {
|
||||
ps := redisClient.PSubscribe(ctx, "__keyevent@0__:expired")
|
||||
defer func(ps *redis.PubSub) {
|
||||
err := ps.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ps)
|
||||
|
||||
for msg := range ps.Channel() {
|
||||
// Extract expired payload ID
|
||||
expiredID := msg.Payload
|
||||
fmt.Println(expiredID)
|
||||
// Broadcast expiration event
|
||||
broadcastExpiredRecord(s, expiredID)
|
||||
fmt.Printf("Done Broadcasting Expiration\n")
|
||||
}
|
||||
}
|
||||
|
||||
func fetchAllRecords() (map[string]string, error) {
|
||||
allKeys, err := redisClient.Keys(ctx, "*").Result()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not fetch keys: %v", err)
|
||||
}
|
||||
|
||||
records := make(map[string]string)
|
||||
for _, key := range allKeys {
|
||||
value, err := redisClient.Get(ctx, key).Result()
|
||||
if errors.Is(err, redis.Nil) {
|
||||
continue // skip if key does not exist
|
||||
} else if err != nil {
|
||||
return nil, fmt.Errorf("could not fetch value for key %s: %v", key, err)
|
||||
}
|
||||
records[key] = value
|
||||
}
|
||||
|
||||
return records, nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
||||
redisClient.ConfigSet(ctx, "notify-keyspace-events", "Ex")
|
||||
|
||||
server := newServer()
|
||||
http.HandleFunc("/ws", server.handleConnections)
|
||||
go server.handleMessages()
|
||||
go server.listenForExpirationEvents()
|
||||
|
||||
fmt.Println("Server started on :8080")
|
||||
err := http.ListenAndServe(":8080", nil)
|
||||
if err != nil {
|
||||
fmt.Printf("Server error: %v\n", err)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user