Added coloured logs and fixed a concurrency bug in the localpubsub adapter
This commit is contained in:
parent
e44c858ab3
commit
53e2254795
10 changed files with 61 additions and 25 deletions
|
@ -7,9 +7,10 @@ import (
|
||||||
|
|
||||||
"github.com/labstack/echo/v4"
|
"github.com/labstack/echo/v4"
|
||||||
"goth.stack/lib"
|
"goth.stack/lib"
|
||||||
|
"goth.stack/lib/pubsub"
|
||||||
)
|
)
|
||||||
|
|
||||||
func SSE(c echo.Context, pubSub lib.PubSub) error {
|
func SSE(c echo.Context, pubSub pubsub.PubSub) error {
|
||||||
if pubSub == nil {
|
if pubSub == nil {
|
||||||
return errors.New("pubSub is nil")
|
return errors.New("pubSub is nil")
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,9 +5,10 @@ import (
|
||||||
|
|
||||||
"github.com/labstack/echo/v4"
|
"github.com/labstack/echo/v4"
|
||||||
"goth.stack/lib"
|
"goth.stack/lib"
|
||||||
|
"goth.stack/lib/pubsub"
|
||||||
)
|
)
|
||||||
|
|
||||||
func SSEDemoSend(c echo.Context, pubSub lib.PubSub) error {
|
func SSEDemoSend(c echo.Context, pubSub pubsub.PubSub) error {
|
||||||
channel := c.QueryParam("channel")
|
channel := c.QueryParam("channel")
|
||||||
if channel == "" {
|
if channel == "" {
|
||||||
channel = "default"
|
channel = "default"
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -28,6 +28,7 @@ require (
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/alecthomas/assert/v2 v2.4.1
|
github.com/alecthomas/assert/v2 v2.4.1
|
||||||
|
github.com/fatih/color v1.16.0
|
||||||
github.com/go-redis/redismock/v9 v9.2.0
|
github.com/go-redis/redismock/v9 v9.2.0
|
||||||
github.com/joho/godotenv v1.5.1
|
github.com/joho/godotenv v1.5.1
|
||||||
github.com/labstack/echo/v4 v4.11.4
|
github.com/labstack/echo/v4 v4.11.4
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -21,6 +21,8 @@ github.com/dlclark/regexp2 v1.4.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55k
|
||||||
github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
|
github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
|
||||||
github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0=
|
github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0=
|
||||||
github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
|
github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
|
||||||
|
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
|
||||||
|
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
|
||||||
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
||||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||||
github.com/go-redis/redismock/v9 v9.2.0 h1:ZrMYQeKPECZPjOj5u9eyOjg8Nnb0BS9lkVIZ6IpsKLw=
|
github.com/go-redis/redismock/v9 v9.2.0 h1:ZrMYQeKPECZPjOj5u9eyOjg8Nnb0BS9lkVIZ6IpsKLw=
|
||||||
|
|
|
@ -2,53 +2,54 @@ package lib
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"log"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"goth.stack/lib/pubsub"
|
||||||
)
|
)
|
||||||
|
|
||||||
type LocalPubSub struct {
|
type LocalPubSub struct {
|
||||||
subscribers map[string][]chan Message
|
subscribers map[string][]chan pubsub.Message
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type LocalPubSubMessage struct {
|
type LocalPubSubMessage struct {
|
||||||
messages <-chan Message
|
messages <-chan pubsub.Message
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *LocalPubSub) SubscribeToChannel(channel string) (PubSubMessage, error) {
|
func (ps *LocalPubSub) SubscribeToChannel(channel string) (pubsub.PubSubMessage, error) {
|
||||||
ps.lock.Lock()
|
ps.lock.Lock()
|
||||||
defer ps.lock.Unlock()
|
defer ps.lock.Unlock()
|
||||||
|
|
||||||
if ps.subscribers == nil {
|
if ps.subscribers == nil {
|
||||||
ps.subscribers = make(map[string][]chan Message)
|
ps.subscribers = make(map[string][]chan pubsub.Message)
|
||||||
}
|
}
|
||||||
|
|
||||||
ch := make(chan Message, 100)
|
ch := make(chan pubsub.Message, 100)
|
||||||
ps.subscribers[channel] = append(ps.subscribers[channel], ch)
|
ps.subscribers[channel] = append(ps.subscribers[channel], ch)
|
||||||
|
|
||||||
log.Printf("Subscribed to channel %s", channel)
|
LogInfo.Printf("[PUBSUB/LOCAL] Subscribed to channel %s", channel)
|
||||||
|
|
||||||
return &LocalPubSubMessage{messages: ch}, nil
|
return &LocalPubSubMessage{messages: ch}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *LocalPubSub) PublishToChannel(channel string, message string) error {
|
func (ps *LocalPubSub) PublishToChannel(channel string, message string) error {
|
||||||
ps.lock.RLock()
|
ps.lock.Lock() // Changed from RLock to Lock
|
||||||
defer ps.lock.RUnlock()
|
defer ps.lock.Unlock() // Changed from RUnlock to Unlock
|
||||||
|
|
||||||
if subscribers, ok := ps.subscribers[channel]; ok {
|
if subscribers, ok := ps.subscribers[channel]; ok {
|
||||||
log.Printf("Publishing message to channel %s: %s", channel, message)
|
LogInfo.Printf("[PUBSUB/LOCAL] Publishing message to channel %s: %s", channel, message)
|
||||||
for _, ch := range subscribers {
|
for _, ch := range subscribers {
|
||||||
ch <- Message{Payload: message}
|
ch <- pubsub.Message{Payload: message}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Printf("No subscribers for channel %s", channel)
|
LogWarning.Printf("[PUBSUB/LOCAL] No subscribers for channel %s", channel)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *LocalPubSubMessage) ReceiveMessage(ctx context.Context) (*Message, error) {
|
func (m *LocalPubSubMessage) ReceiveMessage(ctx context.Context) (*pubsub.Message, error) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -56,16 +57,16 @@ func (m *LocalPubSubMessage) ReceiveMessage(ctx context.Context) (*Message, erro
|
||||||
return nil, ctx.Err()
|
return nil, ctx.Err()
|
||||||
case msg := <-m.messages:
|
case msg := <-m.messages:
|
||||||
// A message has been received. Send it to the client.
|
// A message has been received. Send it to the client.
|
||||||
log.Printf("Received message: %s", msg.Payload)
|
LogInfo.Printf("[PUBSUB/LOCAL] Received message: %s", msg.Payload)
|
||||||
return &msg, nil
|
return &msg, nil
|
||||||
case <-time.After(30 * time.Second):
|
case <-time.After(30 * time.Second):
|
||||||
// No message has been received for 30 seconds. Send a keep-alive message.
|
// No message has been received for 30 seconds. Send a keep-alive message.
|
||||||
return &Message{Payload: "keep-alive"}, nil
|
return &pubsub.Message{Payload: "keep-alive"}, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *LocalPubSub) UnsubscribeFromChannel(channel string, ch <-chan Message) {
|
func (ps *LocalPubSub) UnsubscribeFromChannel(channel string, ch <-chan pubsub.Message) {
|
||||||
ps.lock.Lock()
|
ps.lock.Lock()
|
||||||
defer ps.lock.Unlock()
|
defer ps.lock.Unlock()
|
||||||
|
|
||||||
|
|
27
lib/logging.go
Normal file
27
lib/logging.go
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
package lib
|
||||||
|
|
||||||
|
import "github.com/fatih/color"
|
||||||
|
|
||||||
|
// Error logging
|
||||||
|
var red = color.New(color.FgRed)
|
||||||
|
var LogError = red.Add(color.Bold)
|
||||||
|
|
||||||
|
// Info logging
|
||||||
|
var cyan = color.New(color.FgCyan)
|
||||||
|
var LogInfo = cyan.Add(color.Bold)
|
||||||
|
|
||||||
|
// Success logging
|
||||||
|
var green = color.New(color.FgGreen)
|
||||||
|
var LogSuccess = green.Add(color.Bold)
|
||||||
|
|
||||||
|
// Warning logging
|
||||||
|
var yellow = color.New(color.FgYellow)
|
||||||
|
var LogWarning = yellow.Add(color.Bold)
|
||||||
|
|
||||||
|
// Debug logging
|
||||||
|
var magenta = color.New(color.FgMagenta)
|
||||||
|
var LogDebug = magenta.Add(color.Bold)
|
||||||
|
|
||||||
|
// Custom logging
|
||||||
|
var white = color.New(color.FgWhite)
|
||||||
|
var LogCustom = white.Add(color.Bold)
|
|
@ -1,4 +1,4 @@
|
||||||
package lib
|
package pubsub
|
||||||
|
|
||||||
import "context"
|
import "context"
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
"github.com/joho/godotenv"
|
"github.com/joho/godotenv"
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
|
"goth.stack/lib/pubsub"
|
||||||
)
|
)
|
||||||
|
|
||||||
var RedisClient *redis.Client
|
var RedisClient *redis.Client
|
||||||
|
@ -39,16 +40,16 @@ func NewRedisClient() *redis.Client {
|
||||||
return RedisClient
|
return RedisClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *RedisPubSubMessage) ReceiveMessage(ctx context.Context) (*Message, error) {
|
func (m *RedisPubSubMessage) ReceiveMessage(ctx context.Context) (*pubsub.Message, error) {
|
||||||
msg, err := m.pubsub.ReceiveMessage(ctx)
|
msg, err := m.pubsub.ReceiveMessage(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Message{Payload: msg.Payload}, nil
|
return &pubsub.Message{Payload: msg.Payload}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *RedisPubSub) SubscribeToChannel(channel string) (PubSubMessage, error) {
|
func (ps *RedisPubSub) SubscribeToChannel(channel string) (pubsub.PubSubMessage, error) {
|
||||||
pubsub := ps.Client.Subscribe(context.Background(), channel)
|
pubsub := ps.Client.Subscribe(context.Background(), channel)
|
||||||
_, err := pubsub.Receive(context.Background())
|
_, err := pubsub.Receive(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/labstack/echo/v4"
|
"github.com/labstack/echo/v4"
|
||||||
|
"goth.stack/lib/pubsub"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SSEServerType struct {
|
type SSEServerType struct {
|
||||||
|
@ -58,7 +59,7 @@ func (s *SSEServerType) ClientCount(channel string) int {
|
||||||
return len(s.clients[channel])
|
return len(s.clients[channel])
|
||||||
}
|
}
|
||||||
|
|
||||||
func SendSSE(ctx context.Context, messageBroker PubSub, channel string, message string) error {
|
func SendSSE(ctx context.Context, messageBroker pubsub.PubSub, channel string, message string) error {
|
||||||
// Create a channel to receive an error from the goroutine
|
// Create a channel to receive an error from the goroutine
|
||||||
errCh := make(chan error, 1)
|
errCh := make(chan error, 1)
|
||||||
|
|
||||||
|
@ -102,7 +103,7 @@ func CreateTickerAndKeepAlive(c echo.Context, duration time.Duration) *time.Tick
|
||||||
return ticker
|
return ticker
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleIncomingMessages(c echo.Context, pubsub PubSubMessage, client chan string) {
|
func HandleIncomingMessages(c echo.Context, pubsub pubsub.PubSubMessage, client chan string) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-c.Request().Context().Done():
|
case <-c.Request().Context().Done():
|
||||||
|
|
3
main.go
3
main.go
|
@ -13,6 +13,7 @@ import (
|
||||||
|
|
||||||
"goth.stack/api"
|
"goth.stack/api"
|
||||||
"goth.stack/lib"
|
"goth.stack/lib"
|
||||||
|
"goth.stack/lib/pubsub"
|
||||||
"goth.stack/pages"
|
"goth.stack/pages"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -27,7 +28,7 @@ func main() {
|
||||||
_, err := lib.RedisClient.Ping(context.Background()).Result()
|
_, err := lib.RedisClient.Ping(context.Background()).Result()
|
||||||
|
|
||||||
// Initialize pubsub
|
// Initialize pubsub
|
||||||
var pubSub lib.PubSub
|
var pubSub pubsub.PubSub
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Failed to connect to Redis: %v", err)
|
log.Printf("Failed to connect to Redis: %v", err)
|
||||||
log.Println("Falling back to LocalPubSub")
|
log.Println("Falling back to LocalPubSub")
|
||||||
|
|
Loading…
Add table
Reference in a new issue