diff --git a/lib/sse.go b/lib/sse.go index 16eb0a1..f848311 100644 --- a/lib/sse.go +++ b/lib/sse.go @@ -58,27 +58,26 @@ func (s *SSEServerType) ClientCount(channel string) int { } func SendSSE(ctx context.Context, messageBroker pubsub.PubSub, channel string, message string) error { - // Create a channel to receive an error from the goroutine + log.Printf("Sending SSE message to channel %s", channel) errCh := make(chan error, 1) - // Use a goroutine to send the message asynchronously go func() { select { case <-ctx.Done(): - // The client has disconnected, so return an error errCh <- ctx.Err() default: err := messageBroker.PublishToChannel(channel, message) - errCh <- err // Send the error to the channel + errCh <- err } }() - // Wait for the goroutine to finish and check for errors err := <-errCh if err != nil { + log.Printf("Error sending SSE message: %v", err) return err } + log.Println("SSE message sent successfully") return nil } @@ -89,6 +88,11 @@ func SetSSEHeaders(c echo.Context) { } func HandleIncomingMessages(c echo.Context, pubsub pubsub.PubSubMessage, client chan string) { + if c.Response().Writer == nil { + log.Println("Cannot handle incoming messages: ResponseWriter is nil") + return + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -113,12 +117,11 @@ func HandleIncomingMessages(c echo.Context, pubsub pubsub.PubSubMessage, client data := fmt.Sprintf("data: %s\n\n", msg.Payload) mutex.Lock() - defer mutex.Unlock() - if c.Response().Writer != nil { _, err = c.Response().Write([]byte(data)) if err != nil { log.Printf("Failed to write message: %v", err) + mutex.Unlock() return } @@ -130,8 +133,8 @@ func HandleIncomingMessages(c echo.Context, pubsub pubsub.PubSubMessage, client } } else { log.Println("Failed to write: ResponseWriter is nil") - return } + mutex.Unlock() } } }