From 53e2254795bb11292f55049e22c09c8bd56aa0f2 Mon Sep 17 00:00:00 2001 From: atridadl Date: Tue, 6 Feb 2024 18:44:45 -0700 Subject: [PATCH] Added coloured logs and fixed a concurrency bug in the localpubsub adapter --- api/sse.go | 3 ++- api/ssedemosend.go | 3 ++- go.mod | 1 + go.sum | 2 ++ lib/localpubsub.go | 33 +++++++++++++------------- lib/logging.go | 27 +++++++++++++++++++++ lib/{pubsub.go => pubsub/interface.go} | 2 +- lib/redis.go | 7 +++--- lib/sse.go | 5 ++-- main.go | 3 ++- 10 files changed, 61 insertions(+), 25 deletions(-) create mode 100644 lib/logging.go rename lib/{pubsub.go => pubsub/interface.go} (95%) diff --git a/api/sse.go b/api/sse.go index 00eac85..e135a6b 100644 --- a/api/sse.go +++ b/api/sse.go @@ -7,9 +7,10 @@ import ( "github.com/labstack/echo/v4" "goth.stack/lib" + "goth.stack/lib/pubsub" ) -func SSE(c echo.Context, pubSub lib.PubSub) error { +func SSE(c echo.Context, pubSub pubsub.PubSub) error { if pubSub == nil { return errors.New("pubSub is nil") } diff --git a/api/ssedemosend.go b/api/ssedemosend.go index db4139f..9886434 100644 --- a/api/ssedemosend.go +++ b/api/ssedemosend.go @@ -5,9 +5,10 @@ import ( "github.com/labstack/echo/v4" "goth.stack/lib" + "goth.stack/lib/pubsub" ) -func SSEDemoSend(c echo.Context, pubSub lib.PubSub) error { +func SSEDemoSend(c echo.Context, pubSub pubsub.PubSub) error { channel := c.QueryParam("channel") if channel == "" { channel = "default" diff --git a/go.mod b/go.mod index 814988c..d21c971 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( require ( github.com/alecthomas/assert/v2 v2.4.1 + github.com/fatih/color v1.16.0 github.com/go-redis/redismock/v9 v9.2.0 github.com/joho/godotenv v1.5.1 github.com/labstack/echo/v4 v4.11.4 diff --git a/go.sum b/go.sum index 57a997f..ef9d6ad 100644 --- a/go.sum +++ b/go.sum @@ -21,6 +21,8 @@ github.com/dlclark/regexp2 v1.4.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55k github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0= github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= +github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-redis/redismock/v9 v9.2.0 h1:ZrMYQeKPECZPjOj5u9eyOjg8Nnb0BS9lkVIZ6IpsKLw= diff --git a/lib/localpubsub.go b/lib/localpubsub.go index fb54eaf..c3f8b9d 100644 --- a/lib/localpubsub.go +++ b/lib/localpubsub.go @@ -2,53 +2,54 @@ package lib import ( "context" - "log" "sync" "time" + + "goth.stack/lib/pubsub" ) type LocalPubSub struct { - subscribers map[string][]chan Message + subscribers map[string][]chan pubsub.Message lock sync.RWMutex } type LocalPubSubMessage struct { - messages <-chan Message + messages <-chan pubsub.Message } -func (ps *LocalPubSub) SubscribeToChannel(channel string) (PubSubMessage, error) { +func (ps *LocalPubSub) SubscribeToChannel(channel string) (pubsub.PubSubMessage, error) { ps.lock.Lock() defer ps.lock.Unlock() if ps.subscribers == nil { - ps.subscribers = make(map[string][]chan Message) + ps.subscribers = make(map[string][]chan pubsub.Message) } - ch := make(chan Message, 100) + ch := make(chan pubsub.Message, 100) ps.subscribers[channel] = append(ps.subscribers[channel], ch) - log.Printf("Subscribed to channel %s", channel) + LogInfo.Printf("[PUBSUB/LOCAL] Subscribed to channel %s", channel) return &LocalPubSubMessage{messages: ch}, nil } func (ps *LocalPubSub) PublishToChannel(channel string, message string) error { - ps.lock.RLock() - defer ps.lock.RUnlock() + ps.lock.Lock() // Changed from RLock to Lock + defer ps.lock.Unlock() // Changed from RUnlock to Unlock if subscribers, ok := ps.subscribers[channel]; ok { - log.Printf("Publishing message to channel %s: %s", channel, message) + LogInfo.Printf("[PUBSUB/LOCAL] Publishing message to channel %s: %s", channel, message) for _, ch := range subscribers { - ch <- Message{Payload: message} + ch <- pubsub.Message{Payload: message} } } else { - log.Printf("No subscribers for channel %s", channel) + LogWarning.Printf("[PUBSUB/LOCAL] No subscribers for channel %s", channel) } return nil } -func (m *LocalPubSubMessage) ReceiveMessage(ctx context.Context) (*Message, error) { +func (m *LocalPubSubMessage) ReceiveMessage(ctx context.Context) (*pubsub.Message, error) { for { select { case <-ctx.Done(): @@ -56,16 +57,16 @@ func (m *LocalPubSubMessage) ReceiveMessage(ctx context.Context) (*Message, erro return nil, ctx.Err() case msg := <-m.messages: // A message has been received. Send it to the client. - log.Printf("Received message: %s", msg.Payload) + LogInfo.Printf("[PUBSUB/LOCAL] Received message: %s", msg.Payload) return &msg, nil case <-time.After(30 * time.Second): // No message has been received for 30 seconds. Send a keep-alive message. - return &Message{Payload: "keep-alive"}, nil + return &pubsub.Message{Payload: "keep-alive"}, nil } } } -func (ps *LocalPubSub) UnsubscribeFromChannel(channel string, ch <-chan Message) { +func (ps *LocalPubSub) UnsubscribeFromChannel(channel string, ch <-chan pubsub.Message) { ps.lock.Lock() defer ps.lock.Unlock() diff --git a/lib/logging.go b/lib/logging.go new file mode 100644 index 0000000..4744899 --- /dev/null +++ b/lib/logging.go @@ -0,0 +1,27 @@ +package lib + +import "github.com/fatih/color" + +// Error logging +var red = color.New(color.FgRed) +var LogError = red.Add(color.Bold) + +// Info logging +var cyan = color.New(color.FgCyan) +var LogInfo = cyan.Add(color.Bold) + +// Success logging +var green = color.New(color.FgGreen) +var LogSuccess = green.Add(color.Bold) + +// Warning logging +var yellow = color.New(color.FgYellow) +var LogWarning = yellow.Add(color.Bold) + +// Debug logging +var magenta = color.New(color.FgMagenta) +var LogDebug = magenta.Add(color.Bold) + +// Custom logging +var white = color.New(color.FgWhite) +var LogCustom = white.Add(color.Bold) diff --git a/lib/pubsub.go b/lib/pubsub/interface.go similarity index 95% rename from lib/pubsub.go rename to lib/pubsub/interface.go index d91efde..b43da28 100644 --- a/lib/pubsub.go +++ b/lib/pubsub/interface.go @@ -1,4 +1,4 @@ -package lib +package pubsub import "context" diff --git a/lib/redis.go b/lib/redis.go index b32bda3..71a6e84 100644 --- a/lib/redis.go +++ b/lib/redis.go @@ -7,6 +7,7 @@ import ( "github.com/joho/godotenv" "github.com/redis/go-redis/v9" + "goth.stack/lib/pubsub" ) var RedisClient *redis.Client @@ -39,16 +40,16 @@ func NewRedisClient() *redis.Client { return RedisClient } -func (m *RedisPubSubMessage) ReceiveMessage(ctx context.Context) (*Message, error) { +func (m *RedisPubSubMessage) ReceiveMessage(ctx context.Context) (*pubsub.Message, error) { msg, err := m.pubsub.ReceiveMessage(ctx) if err != nil { return nil, err } - return &Message{Payload: msg.Payload}, nil + return &pubsub.Message{Payload: msg.Payload}, nil } -func (ps *RedisPubSub) SubscribeToChannel(channel string) (PubSubMessage, error) { +func (ps *RedisPubSub) SubscribeToChannel(channel string) (pubsub.PubSubMessage, error) { pubsub := ps.Client.Subscribe(context.Background(), channel) _, err := pubsub.Receive(context.Background()) if err != nil { diff --git a/lib/sse.go b/lib/sse.go index 544ba66..5b4d485 100644 --- a/lib/sse.go +++ b/lib/sse.go @@ -9,6 +9,7 @@ import ( "time" "github.com/labstack/echo/v4" + "goth.stack/lib/pubsub" ) type SSEServerType struct { @@ -58,7 +59,7 @@ func (s *SSEServerType) ClientCount(channel string) int { return len(s.clients[channel]) } -func SendSSE(ctx context.Context, messageBroker PubSub, channel string, message string) error { +func SendSSE(ctx context.Context, messageBroker pubsub.PubSub, channel string, message string) error { // Create a channel to receive an error from the goroutine errCh := make(chan error, 1) @@ -102,7 +103,7 @@ func CreateTickerAndKeepAlive(c echo.Context, duration time.Duration) *time.Tick return ticker } -func HandleIncomingMessages(c echo.Context, pubsub PubSubMessage, client chan string) { +func HandleIncomingMessages(c echo.Context, pubsub pubsub.PubSubMessage, client chan string) { for { select { case <-c.Request().Context().Done(): diff --git a/main.go b/main.go index 69fdeeb..c259291 100644 --- a/main.go +++ b/main.go @@ -13,6 +13,7 @@ import ( "goth.stack/api" "goth.stack/lib" + "goth.stack/lib/pubsub" "goth.stack/pages" ) @@ -27,7 +28,7 @@ func main() { _, err := lib.RedisClient.Ping(context.Background()).Result() // Initialize pubsub - var pubSub lib.PubSub + var pubSub pubsub.PubSub if err != nil { log.Printf("Failed to connect to Redis: %v", err) log.Println("Falling back to LocalPubSub")