package api

import (
	"fmt"
	"net/http"
	"strings"
	"time"

	"atri.dad/lib"
	"github.com/labstack/echo/v4"
)

// formatSSEData encodes msg as a single Server-Sent Events "data" event.
//
// The event-stream format is line oriented: every line of the payload must
// carry its own "data: " prefix, and a blank line terminates the event. CRLF
// and bare CR are normalised to LF first, because the client treats all three
// as line terminators. A message without line breaks (including the empty
// string) encodes to exactly "data: <msg>\n\n".
func formatSSEData(msg string) string {
	msg = strings.ReplaceAll(msg, "\r\n", "\n")
	msg = strings.ReplaceAll(msg, "\r", "\n")

	var b strings.Builder
	for _, line := range strings.Split(msg, "\n") {
		fmt.Fprintf(&b, "data: %s\n", line)
	}
	b.WriteByte('\n') // blank line dispatches the event
	return b.String()
}

// SSE streams Server-Sent Events to the caller until the client disconnects.
//
// The subscription channel is taken from the "channel" query parameter and
// falls back to "default" when it is empty. The handler registers a fresh
// string channel with lib.SSEServer for that name, forwards every message it
// receives as a "data" event, and emits a ": ping" comment every 15 seconds
// to keep the connection alive. The client is removed from lib.SSEServer when
// the handler returns.
//
// It returns nil when the request context is cancelled or the client channel
// is closed, and the underlying write error if writing to the response fails.
//
// @Summary Server-Sent Events endpoint
// @Description Establishes a Server-Sent Events connection
// @Tags sse
// @Accept json
// @Produce text/event-stream
// @Param channel query string false "Channel name"
// @Success 200 {string} string "Event stream"
// @Router /sse [get]
func SSE(c echo.Context) error {
	channel := c.QueryParam("channel")
	if channel == "" {
		channel = "default"
	}

	// Use the request context, which is cancelled when the client disconnects.
	ctx := c.Request().Context()

	res := c.Response()
	hdr := res.Header()

	// Headers for SSE: event-stream content type, no caching anywhere, and a
	// hint to nginx-style reverse proxies not to buffer the stream.
	hdr.Set("Content-Type", "text/event-stream")
	hdr.Set("Cache-Control", "no-cache, no-store, must-revalidate")
	hdr.Set("Connection", "keep-alive")
	hdr.Set("Pragma", "no-cache")
	hdr.Set("Expires", "0")
	hdr.Set("X-Accel-Buffering", "no")

	// CORS: known origins are echoed back with credentials allowed.
	origin := c.Request().Header.Get("Origin")
	if origin == "https://atri.dad" || origin == "http://localhost:3000" {
		hdr.Set("Access-Control-Allow-Origin", origin)
		hdr.Set("Access-Control-Allow-Credentials", "true")
	} else {
		// Allow any origin as fallback for testing.
		hdr.Set("Access-Control-Allow-Origin", "*")
	}

	// Commit the status line and headers before any event is written.
	res.WriteHeader(http.StatusOK)
	res.Flush()

	// Register this connection with the SSE server and make sure it is
	// removed again however the handler exits.
	clientChan := make(chan string)
	lib.SSEServer.AddClient(channel, clientChan)
	defer lib.SSEServer.RemoveClient(channel, clientChan)

	// send writes one chunk and flushes it so the client sees it immediately.
	send := func(payload string) error {
		if _, err := res.Write([]byte(payload)); err != nil {
			return err
		}
		res.Flush()
		return nil
	}

	// Initial comment line so the client knows the stream is open.
	if err := send(": connected\n\n"); err != nil {
		return err
	}

	// Periodic keep-alive comments stop idle connections from being dropped.
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// Client disconnected.
			return nil

		case <-ticker.C:
			if err := send(": ping\n\n"); err != nil {
				return err
			}

		case msg, ok := <-clientChan:
			if !ok {
				// A closed channel would otherwise yield "" forever and spin
				// this loop; treat it as the end of the stream.
				return nil
			}
			if err := send(formatSSEData(msg)); err != nil {
				return err
			}
		}
	}
}