Template
1
0
Fork 0

No really... this time for real now.

This commit is contained in:
Atridad Lahiji 2024-05-09 14:32:19 -06:00
parent 3374ed3bac
commit 44df2533c8
No known key found for this signature in database

View file

@ -98,28 +98,40 @@ func HandleIncomingMessages(c echo.Context, pubsub pubsub.PubSubMessage, client
var mutex sync.Mutex
for {
// Receive messages using the context of the request, which is cancelled when the client disconnects
msg, err := pubsub.ReceiveMessage(c.Request().Context())
if err != nil {
if err == context.Canceled {
// The client has disconnected. Stop trying to send messages.
// Log when the client disconnects and stop the message forwarding
LogInfo.Printf("Client disconnected, stopping message forwarding")
return
}
// Log errors other than client disconnection
LogError.Printf("Failed to receive message: %v", err)
return
}
// Skip processing if the message payload is empty
if msg.Payload == "" {
LogInfo.Printf("Received empty message, skipping processing")
continue
}
// Prepare the data string to be sent as an SSE
data := fmt.Sprintf("data: %s\n\n", msg.Payload)
// Locking before writing to the response writer to avoid concurrent write issues
mutex.Lock()
if c.Response().Writer != nil {
_, err := c.Response().Write([]byte(data))
if err != nil {
// Log failure to write and unlock before returning
LogError.Printf("Failed to write message: %v", err)
mutex.Unlock()
return
}
// Flush the response if possible
flusher, ok := c.Response().Writer.(http.Flusher)
if ok {
flusher.Flush()
@ -129,6 +141,7 @@ func HandleIncomingMessages(c echo.Context, pubsub pubsub.PubSubMessage, client
} else {
LogError.Println("Failed to write: ResponseWriter is nil")
}
// Ensure the mutex is unlocked after processing each message
mutex.Unlock()
}
}