Template
1
0
Fork 0

Fixed SSE

This commit is contained in:
Atridad Lahiji 2024-05-14 22:03:39 -06:00
parent 6f47a15d29
commit cd3a539243
No known key found for this signature in database

View file

@ -99,34 +99,27 @@ func HandleIncomingMessages(c echo.Context, pubsub pubsub.PubSubMessage, client
var mutex sync.Mutex
for {
// Receive messages using the context of the request, which is cancelled when the client disconnects
msg, err := pubsub.ReceiveMessage(c.Request().Context())
if err != nil {
if err == context.Canceled {
// Log when the client disconnects and stop the message forwarding
LogInfo.Printf("Client disconnected, stopping message forwarding")
// The client has disconnected. Stop trying to send messages.
return
}
// Log errors other than client disconnection
LogError.Printf("Failed to receive message: %v", err)
return
}
// Prepare the data string to be sent as an SSE
data := fmt.Sprintf("data: %s\n\n", msg.Payload)
// Locking before writing to the response writer to avoid concurrent write issues
mutex.Lock()
if c.Response().Writer != nil {
_, err := c.Response().Write([]byte(data))
if err != nil {
// Log failure to write and unlock before returning
LogError.Printf("Failed to write message: %v", err)
mutex.Unlock()
return
}
// Flush the response if possible
flusher, ok := c.Response().Writer.(http.Flusher)
if ok {
flusher.Flush()
@ -136,7 +129,5 @@ func HandleIncomingMessages(c echo.Context, pubsub pubsub.PubSubMessage, client
} else {
LogError.Println("Failed to write: ResponseWriter is nil")
}
// Ensure the mutex is unlocked after processing each message
mutex.Unlock()
}
}