// Source path: goth.stack/lib/sse.go

package lib
import (
"context"
"fmt"
"net/http"
"sync"
"atri.dad/lib/pubsub"
"github.com/labstack/echo/v4"
)
// SSEServerType is a registry of connected SSE clients grouped by channel
// name. Each client is identified by the chan string registered for it.
//
// Every method acquires mu before touching clients, so one instance can be
// shared between goroutines. The struct contains a sync.Mutex and therefore
// must not be copied after first use; always pass *SSEServerType.
type SSEServerType struct {
	// clients maps a channel name to the set of client channels registered
	// under it. The bool value is always true; only key presence matters.
	clients map[string]map[chan string]bool
	// mu guards clients.
	mu sync.Mutex
}

// SSEServer is the package-level client registry. It is initialised during
// package variable initialisation, so it is usable from any init function.
var SSEServer = NewSSEServer()

// NewSSEServer returns an empty client registry.
func NewSSEServer() *SSEServerType {
	return &SSEServerType{
		clients: make(map[string]map[chan string]bool),
	}
}

// AddClient registers client under channel. Registering the same client twice
// for the same channel is a no-op.
func (s *SSEServerType) AddClient(channel string, client chan string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Allocate lazily so that a zero-value SSEServerType does not panic on
	// its first AddClient (writing to a nil map panics in Go).
	if s.clients == nil {
		s.clients = make(map[string]map[chan string]bool)
	}

	subscribers, ok := s.clients[channel]
	if !ok {
		subscribers = make(map[chan string]bool)
		s.clients[channel] = subscribers
	}
	subscribers[client] = true
}

// RemoveClient unregisters client from channel. When the last client of a
// channel is removed, the channel entry itself is dropped. Removing a client
// or channel that is not registered is a no-op.
func (s *SSEServerType) RemoveClient(channel string, client chan string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// delete on a missing key (or a nil inner map) is safe and does nothing.
	delete(s.clients[channel], client)
	if len(s.clients[channel]) == 0 {
		delete(s.clients, channel)
	}
}

// ClientCount returns the number of clients currently registered under
// channel, or 0 if the channel is unknown.
func (s *SSEServerType) ClientCount(channel string) int {
	s.mu.Lock()
	defer s.mu.Unlock()

	return len(s.clients[channel])
}
func SendSSE(ctx context.Context, messageBroker pubsub.PubSub, channel string, message string) error {
2024-05-08 14:43:57 -06:00
LogInfo.Printf("Sending SSE message to channel %s", channel)
2023-05-18 20:04:55 -06:00
errCh := make(chan error, 1)
go func() {
select {
case <-ctx.Done():
errCh <- ctx.Err()
default:
err := messageBroker.PublishToChannel(channel, message)
2024-05-08 01:13:39 -06:00
errCh <- err
2023-05-18 20:04:55 -06:00
}
}()
err := <-errCh
if err != nil {
2024-05-08 14:43:57 -06:00
LogError.Printf("Error sending SSE message: %v", err)
2023-05-18 20:04:55 -06:00
return err
}
2024-05-08 14:43:57 -06:00
LogSuccess.Printf("SSE message sent successfully")
2023-05-18 20:04:55 -06:00
return nil
}
// SetSSEHeaders sets the response headers used for a server-sent event
// stream on c: Content-Type "text/event-stream", Connection "keep-alive" and
// Cache-Control "no-cache". Existing values for those headers are replaced.
func SetSSEHeaders(c echo.Context) {
	h := c.Response().Header()

	h.Set(echo.HeaderContentType, "text/event-stream")
	h.Set(echo.HeaderConnection, "keep-alive")
	h.Set(echo.HeaderCacheControl, "no-cache")
}
func HandleIncomingMessages(c echo.Context, pubsub pubsub.PubSubMessage, client chan string) {
2024-05-08 01:13:39 -06:00
if c.Response().Writer == nil {
2024-05-08 14:43:57 -06:00
LogError.Printf("Cannot handle incoming messages: ResponseWriter is nil")
2024-05-08 01:13:39 -06:00
return
}
2024-05-07 18:03:54 -06:00
var mutex sync.Mutex
2023-05-18 20:04:55 -06:00
for {
msg, err := pubsub.ReceiveMessage(c.Request().Context())
if err != nil {
if err == context.Canceled {
// The client has disconnected. Stop trying to send messages.
2024-05-09 01:28:11 -06:00
LogInfo.Printf("Client disconnected, stopping message forwarding")
return
}
LogError.Printf("Failed to receive message: %v", err)
2023-05-18 20:04:55 -06:00
return
}
data := fmt.Sprintf("data: %s\n\n", msg.Payload)
2024-05-08 14:43:57 -06:00
mutex.Lock()
if c.Response().Writer != nil {
_, err := c.Response().Write([]byte(data))
if err != nil {
LogError.Printf("Failed to write message: %v", err)
mutex.Unlock()
return
2023-05-18 20:04:55 -06:00
}
flusher, ok := c.Response().Writer.(http.Flusher)
if ok {
flusher.Flush()
2023-05-18 20:04:55 -06:00
} else {
LogError.Println("Failed to flush: ResponseWriter does not implement http.Flusher")
2023-05-18 20:04:55 -06:00
}
} else {
LogError.Println("Failed to write: ResponseWriter is nil")
2023-05-18 20:04:55 -06:00
}
mutex.Unlock()
2023-05-18 20:04:55 -06:00
}
}