package api import ( "fmt" "net/http" "time" "atri.dad/lib" "github.com/labstack/echo/v4" ) // SSE godoc // @Summary Server-Sent Events endpoint // @Description Establishes a Server-Sent Events connection // @Tags sse // @Accept json // @Produce text/event-stream // @Param channel query string false "Channel name" // @Success 200 {string} string "Event stream" // @Router /sse [get] func SSE(c echo.Context) error { channel := c.QueryParam("channel") if channel == "" { channel = "default" } // Use the request context, which is cancelled when the client disconnects ctx := c.Request().Context() // Standard SSE headers c.Response().Header().Set("Content-Type", "text/event-stream") c.Response().Header().Set("Cache-Control", "no-cache") c.Response().Header().Set("Connection", "keep-alive") c.Response().Header().Set("X-Accel-Buffering", "no") // CORS headers origin := c.Request().Header.Get("Origin") if origin == "https://atri.dad" || origin == "http://localhost:3000" { c.Response().Header().Set("Access-Control-Allow-Origin", origin) } else { c.Response().Header().Set("Access-Control-Allow-Origin", "*") } // Prepare the response c.Response().WriteHeader(http.StatusOK) c.Response().Flush() // Client channel for SSE messages clientChan := make(chan string) lib.SSEServer.AddClient(channel, clientChan) defer lib.SSEServer.RemoveClient(channel, clientChan) // Write initial message if _, err := c.Response().Write([]byte("data: Connected to SSE server\n\n")); err != nil { return err } c.Response().Flush() // Keep-alive ticker ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() // Event loop for { select { case <-ctx.Done(): return nil case <-ticker.C: // Send keep-alive comment if _, err := c.Response().Write([]byte(": ping\n\n")); err != nil { return err } c.Response().Flush() case msg := <-clientChan: // Format as standard SSE message data := fmt.Sprintf("data: %s\n\n", msg) if _, err := c.Response().Write([]byte(data)); err != nil { return err } c.Response().Flush() } } }