From 44df2533c8da76a20108aa72cd5170b13811f271 Mon Sep 17 00:00:00 2001 From: Atridad Lahiji Date: Thu, 9 May 2024 14:32:19 -0600 Subject: [PATCH] No really... this time for real now. --- lib/sse.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/lib/sse.go b/lib/sse.go index 8f2a1e1..09f5b28 100644 --- a/lib/sse.go +++ b/lib/sse.go @@ -98,28 +98,40 @@ func HandleIncomingMessages(c echo.Context, pubsub pubsub.PubSubMessage, client var mutex sync.Mutex for { + // Receive messages using the context of the request, which is cancelled when the client disconnects msg, err := pubsub.ReceiveMessage(c.Request().Context()) if err != nil { if err == context.Canceled { - // The client has disconnected. Stop trying to send messages. + // Log when the client disconnects and stop the message forwarding LogInfo.Printf("Client disconnected, stopping message forwarding") return } + // Log errors other than client disconnection LogError.Printf("Failed to receive message: %v", err) return } + // Skip processing if the message payload is empty + if msg.Payload == "" { + LogInfo.Printf("Received empty message, skipping processing") + continue + } + + // Prepare the data string to be sent as an SSE data := fmt.Sprintf("data: %s\n\n", msg.Payload) + // Locking before writing to the response writer to avoid concurrent write issues mutex.Lock() if c.Response().Writer != nil { _, err := c.Response().Write([]byte(data)) if err != nil { + // Log failure to write and unlock before returning LogError.Printf("Failed to write message: %v", err) mutex.Unlock() return } + // Flush the response if possible flusher, ok := c.Response().Writer.(http.Flusher) if ok { flusher.Flush() @@ -129,6 +141,7 @@ func HandleIncomingMessages(c echo.Context, pubsub pubsub.PubSubMessage, client } else { LogError.Println("Failed to write: ResponseWriter is nil") } + // Ensure the mutex is unlocked after processing each message mutex.Unlock() } }