Simplified the SSE system!

This commit is contained in:
Atridad Lahiji 2024-06-04 15:40:29 -06:00
parent b1d82ca609
commit 924906431f
No known key found for this signature in database
10 changed files with 83 additions and 347 deletions

View file

@ -1,19 +1,14 @@
package api
import (
"errors"
"fmt"
"time"
"atri.dad/lib"
"atri.dad/lib/pubsub"
"github.com/labstack/echo/v4"
)
func SSE(c echo.Context, pubSub pubsub.PubSub) error {
if pubSub == nil {
return errors.New("pubSub is nil")
}
func SSE(c echo.Context) error {
channel := c.QueryParam("channel")
if channel == "" {
channel = "default"
@ -22,25 +17,44 @@ func SSE(c echo.Context, pubSub pubsub.PubSub) error {
// Use the request context, which is cancelled when the client disconnects
ctx := c.Request().Context()
pubsub, err := pubSub.SubscribeToChannel(channel)
if err != nil {
return fmt.Errorf("failed to subscribe to channel: %w", err)
}
c.Response().Header().Set(echo.HeaderContentType, "text/event-stream")
c.Response().Header().Set(echo.HeaderConnection, "keep-alive")
c.Response().Header().Set(echo.HeaderCacheControl, "no-cache")
lib.SetSSEHeaders(c)
// Create a channel to receive messages from the lib.SSEServer
clientChan := make(chan string)
// Create a client channel and add it to the SSE server
client := make(chan string)
lib.SSEServer.AddClient(channel, client)
defer lib.SSEServer.RemoveClient(channel, client)
// Add the client to the lib.SSEServer
lib.SSEServer.AddClient(channel, clientChan)
go lib.HandleIncomingMessages(c, pubsub, client)
defer func() {
// Remove the client from the lib.SSEServer when the connection is closed
lib.SSEServer.RemoveClient(channel, clientChan)
}()
// Create a ticker that fires every 15 seconds
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
// If the client has disconnected, stop the loop
return nil
case <-ticker.C:
// Every 30 seconds, send a comment to keep the connection alive
if _, err := c.Response().Write([]byte(": keep-alive\n\n")); err != nil {
return err
}
c.Response().Flush()
case msg := <-clientChan:
// Handle incoming messages from the lib.SSEServer
data := fmt.Sprintf("data: %s\n\n", msg)
if _, err := c.Response().Write([]byte(data)); err != nil {
return err
}
c.Response().Flush()
}
}
}

View file

@ -4,11 +4,10 @@ import (
"net/http"
"atri.dad/lib"
"atri.dad/lib/pubsub"
"github.com/labstack/echo/v4"
)
func SSEDemoSend(c echo.Context, pubSub pubsub.PubSub) error {
func SSEDemoSend(c echo.Context) error {
channel := c.QueryParam("channel")
if channel == "" {
channel = "default"
@ -31,7 +30,8 @@ func SSEDemoSend(c echo.Context, pubSub pubsub.PubSub) error {
return c.JSON(http.StatusBadRequest, map[string]string{"error": "message parameter is required"})
}
lib.SendSSE(c.Request().Context(), pubSub, "default", message)
// Send message
lib.SendSSE(channel, message)
return c.JSON(http.StatusOK, map[string]string{"status": "message sent"})
}

17
go.mod
View file

@ -5,8 +5,6 @@ go 1.22.0
require github.com/alecthomas/chroma/v2 v2.14.0
require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dlclark/regexp2 v1.11.0 // indirect
github.com/go-jose/go-jose/v3 v3.0.3 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
@ -16,16 +14,16 @@ require (
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
golang.org/x/image v0.16.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/image v0.17.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)
require (
github.com/aws/aws-sdk-go v1.53.15
github.com/aws/aws-sdk-go v1.53.16
github.com/clerkinc/clerk-sdk-go v1.49.1
github.com/disintegration/imaging v1.6.2
github.com/fatih/color v1.17.0
@ -34,12 +32,11 @@ require (
github.com/labstack/echo/v4 v4.12.0
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/redis/go-redis/v9 v9.5.2
github.com/stripe/stripe-go/v76 v76.25.0
github.com/svix/svix-webhooks v1.24.0
github.com/yuin/goldmark v1.7.1
github.com/yuin/goldmark-highlighting/v2 v2.0.0-20230729083705-37449abec8cc
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/sys v0.21.0 // indirect
gopkg.in/yaml.v2 v2.4.0
)

38
go.sum
View file

@ -6,22 +6,14 @@ github.com/alecthomas/chroma/v2 v2.14.0/go.mod h1:QolEbTfmUHIMVpBqxeDnNBj2uoeI4E
github.com/alecthomas/repr v0.0.0-20220113201626-b1b626ac65ae/go.mod h1:2kn6fqh/zIyPLmm3ugklbEi5hg5wS435eygvNfaDQL8=
github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc=
github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/aws/aws-sdk-go v1.53.15 h1:FtZmkg7xM8RfP2oY6p7xdKBYrRgkITk9yve2QV7N938=
github.com/aws/aws-sdk-go v1.53.15/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go v1.53.16 h1:8oZjKQO/ml1WLUZw5hvF7pvYjPf8o9f57Wldoy/q9Qc=
github.com/aws/aws-sdk-go v1.53.16/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/brianvoe/gofakeit/v6 v6.19.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/clerkinc/clerk-sdk-go v1.49.1 h1:3YfEFuXrM7fg6+GYxXR0umbV3aboErNUlOcFMuR5rfY=
github.com/clerkinc/clerk-sdk-go v1.49.1/go.mod h1:pejhMTTDAuw5aBpiHBEOOOHMAsxNfPvKfM5qexFJYlc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/disintegration/imaging v1.6.2 h1:w1LecBlG2Lnp8B3jk5zSuNqd7b4DXhcjwek1ei82L+c=
github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4=
github.com/dlclark/regexp2 v1.4.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc=
@ -65,8 +57,6 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.5.2 h1:L0L3fcSNReTRGyZ6AqAEN0K56wYeYAwapBIhkvh0f3E=
github.com/redis/go-redis/v9 v9.5.2/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@ -93,11 +83,11 @@ golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.16.0 h1:9kloLAKhUufZhA12l5fwnx2NZW39/we1UhBesW433jw=
golang.org/x/image v0.16.0/go.mod h1:ugSZItdV4nOxyqp56HmXwH0Ry0nBCpjnZdpDaIHdoPs=
golang.org/x/image v0.17.0 h1:nTRVVdajgB8zCMZVsViyzhnMKPwYeroEERRC64JuLco=
golang.org/x/image v0.17.0/go.mod h1:4yyo5vMFQjVjUcVk4jEQcU9MGy/rulF5WvUILseCM2E=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@ -108,10 +98,10 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo=
golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -128,8 +118,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@ -144,8 +134,8 @@ golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View file

@ -1,27 +1,29 @@
package lib
import "github.com/fatih/color"
import (
"github.com/fatih/color"
)
// Error logging
var red = color.New(color.FgRed)
var LogError = red.Add(color.Bold)
var red = color.New(color.FgRed, color.Bold)
var LogError = red
// Info logging
var cyan = color.New(color.FgCyan)
var LogInfo = cyan.Add(color.Bold)
var cyan = color.New(color.FgCyan, color.Bold)
var LogInfo = cyan
// Success logging
var green = color.New(color.FgGreen)
var LogSuccess = green.Add(color.Bold)
var green = color.New(color.FgGreen, color.Bold)
var LogSuccess = green
// Warning logging
var yellow = color.New(color.FgYellow)
var LogWarning = yellow.Add(color.Bold)
var yellow = color.New(color.FgYellow, color.Bold)
var LogWarning = yellow
// Debug logging
var magenta = color.New(color.FgMagenta)
var LogDebug = magenta.Add(color.Bold)
var magenta = color.New(color.FgMagenta, color.Bold)
var LogDebug = magenta
// Custom logging
var white = color.New(color.FgWhite)
var LogCustom = white.Add(color.Bold)
var white = color.New(color.FgWhite, color.Bold)
var LogCustom = white

View file

@ -1,86 +0,0 @@
package adapters
import (
"context"
"sync"
"atri.dad/lib"
"atri.dad/lib/pubsub"
)
type LocalPubSub struct {
subscribers map[string][]chan pubsub.Message
lock sync.RWMutex
}
type LocalPubSubMessage struct {
messages <-chan pubsub.Message
}
func (ps *LocalPubSub) SubscribeToChannel(channel string) (pubsub.PubSubMessage, error) {
ps.lock.Lock()
defer ps.lock.Unlock()
if ps.subscribers == nil {
ps.subscribers = make(map[string][]chan pubsub.Message)
}
ch := make(chan pubsub.Message, 100)
ps.subscribers[channel] = append(ps.subscribers[channel], ch)
lib.LogInfo.Printf("[PUBSUB/LOCAL] Subscribed to channel %s\n", channel)
return &LocalPubSubMessage{messages: ch}, nil
}
func (ps *LocalPubSub) PublishToChannel(channel string, message string) error {
subscribers, ok := ps.subscribers[channel]
if !ok {
lib.LogWarning.Printf("\n[PUBSUB/LOCAL] No subscribers for channel %s\n", channel)
return nil
}
ps.lock.Lock()
defer ps.lock.Unlock()
lib.LogInfo.Printf("\n[PUBSUB/LOCAL] Publishing message to channel %s: %s\n", channel, message)
for _, ch := range subscribers {
ch <- pubsub.Message{Payload: message}
}
return nil
}
func (ps *LocalPubSub) UnsubscribeFromChannel(channel string, ch <-chan pubsub.Message) {
ps.lock.Lock()
defer ps.lock.Unlock()
subscribers := ps.subscribers[channel]
for i, subscriber := range subscribers {
if subscriber == ch {
// Remove the subscriber from the slice
subscribers = append(subscribers[:i], subscribers[i+1:]...)
break
}
}
if len(subscribers) == 0 {
delete(ps.subscribers, channel)
} else {
ps.subscribers[channel] = subscribers
}
}
func (m *LocalPubSubMessage) ReceiveMessage(ctx context.Context) (*pubsub.Message, error) {
for {
select {
case <-ctx.Done():
// The client has disconnected. Stop trying to send messages.
return nil, ctx.Err()
case msg := <-m.messages:
// A message has been received. Send it to the client.
lib.LogInfo.Printf("\n[PUBSUB/LOCAL] Received message: %s\n", msg.Payload)
return &msg, nil
}
}
}

View file

@ -1,71 +0,0 @@
package adapters
import (
"context"
"os"
"atri.dad/lib"
"atri.dad/lib/pubsub"
"github.com/joho/godotenv"
"github.com/redis/go-redis/v9"
)
var RedisClient *redis.Client
type RedisPubSubMessage struct {
pubsub *redis.PubSub
}
// RedisPubSub is a Redis implementation of the PubSub interface.
type RedisPubSub struct {
Client *redis.Client
}
func NewRedisClient() (*redis.Client, error) {
if RedisClient != nil {
return RedisClient, nil
}
godotenv.Load(".env")
redis_url := os.Getenv("REDIS_URL")
opts, err := redis.ParseURL(redis_url)
if err != nil {
return nil, err
}
lib.LogInfo.Printf("\n[PUBSUB/REDIS]Connecting to Redis at %s\n", opts.Addr)
RedisClient = redis.NewClient(opts)
return RedisClient, nil
}
func (m *RedisPubSubMessage) ReceiveMessage(ctx context.Context) (*pubsub.Message, error) {
msg, err := m.pubsub.ReceiveMessage(ctx)
if err != nil {
return nil, err
}
lib.LogInfo.Printf("\n[PUBSUB/REDIS] Received message: %s\n", msg.Payload)
return &pubsub.Message{Payload: msg.Payload}, nil
}
func (ps *RedisPubSub) SubscribeToChannel(channel string) (pubsub.PubSubMessage, error) {
pubsub := ps.Client.Subscribe(context.Background(), channel)
_, err := pubsub.Receive(context.Background())
if err != nil {
return nil, err
}
lib.LogInfo.Printf("\n[PUBSUB/REDIS] Subscribed to channel %s\n", channel)
return &RedisPubSubMessage{pubsub: pubsub}, nil
}
func (r *RedisPubSub) PublishToChannel(channel string, message string) error {
err := r.Client.Publish(context.Background(), channel, message).Err()
if err != nil {
return err
}
lib.LogInfo.Printf("\n[PUBSUB/REDIS] Publishing message to channel %s: %s\n", channel, message)
return nil
}

View file

@ -1,16 +0,0 @@
package pubsub
import "context"
type Message struct {
Payload string
}
type PubSubMessage interface {
ReceiveMessage(ctx context.Context) (*Message, error)
}
type PubSub interface {
SubscribeToChannel(channel string) (PubSubMessage, error)
PublishToChannel(channel string, message string) error
}

View file

@ -1,14 +1,6 @@
package lib
import (
"context"
"fmt"
"net/http"
"sync"
"atri.dad/lib/pubsub"
"github.com/labstack/echo/v4"
)
import "sync"
type SSEServerType struct {
clients map[string]map[chan string]bool
@ -37,6 +29,8 @@ func (s *SSEServerType) AddClient(channel string, client chan string) {
s.clients[channel] = make(map[chan string]bool)
}
s.clients[channel][client] = true
LogInfo.Printf("\nClient connected to channel %s\n", channel)
}
func (s *SSEServerType) RemoveClient(channel string, client chan string) {
@ -47,6 +41,8 @@ func (s *SSEServerType) RemoveClient(channel string, client chan string) {
if len(s.clients[channel]) == 0 {
delete(s.clients, channel)
}
LogInfo.Printf("\nClient disconnected from channel %s\n", channel)
}
func (s *SSEServerType) ClientCount(channel string) int {
@ -56,87 +52,15 @@ func (s *SSEServerType) ClientCount(channel string) int {
return len(s.clients[channel])
}
func SendSSE(ctx context.Context, messageBroker pubsub.PubSub, channel string, message string) error {
LogInfo.Printf("Sending SSE message to channel %s", channel)
func SendSSE(channel string, message string) error {
SSEServer.mu.Lock()
defer SSEServer.mu.Unlock()
errCh := make(chan error, 1)
go func() {
select {
case <-ctx.Done():
errCh <- ctx.Err()
default:
err := messageBroker.PublishToChannel(channel, message)
errCh <- err
}
}()
err := <-errCh
if err != nil {
LogError.Printf("Error sending SSE message: %v", err)
return err
for client := range SSEServer.clients[channel] {
client <- message
}
LogSuccess.Printf("SSE message sent successfully")
LogDebug.Printf("\nMessage broadcast on channel %s: %s\n", channel, message)
return nil
}
func SetSSEHeaders(c echo.Context) {
c.Response().Header().Set(echo.HeaderContentType, "text/event-stream")
c.Response().Header().Set(echo.HeaderConnection, "keep-alive")
c.Response().Header().Set(echo.HeaderCacheControl, "no-cache")
c.Response().Header().Set("X-Accel-Buffering", "no")
}
func HandleIncomingMessages(c echo.Context, pubsub pubsub.PubSubMessage, client chan string) {
if c.Response().Writer == nil {
LogError.Printf("Cannot handle incoming messages: ResponseWriter is nil")
return
}
var mutex sync.Mutex
for {
// Receive messages using the context of the request, which is cancelled when the client disconnects
msg, err := pubsub.ReceiveMessage(c.Request().Context())
if err != nil {
if err == context.Canceled {
// Log when the client disconnects and stop the message forwarding
LogInfo.Printf("Client disconnected, stopping message forwarding")
return
}
// Log errors other than client disconnection
LogError.Printf("Failed to receive message: %v", err)
return
}
// Prepare the data string to be sent as an SSE
data := fmt.Sprintf("data: %s\n\n", msg.Payload)
// Locking before writing to the response writer to avoid concurrent write issues
mutex.Lock()
if c.Response().Writer != nil {
_, err := c.Response().Write([]byte(data))
if err != nil {
// Log failure to write and unlock before returning
LogError.Printf("Failed to write message: %v", err)
mutex.Unlock()
return
}
// Flush the response if possible
flusher, ok := c.Response().Writer.(http.Flusher)
if ok {
flusher.Flush()
} else {
LogError.Println("Failed to flush: ResponseWriter does not implement http.Flusher")
}
} else {
LogError.Println("Failed to write: ResponseWriter is nil")
}
// Ensure the mutex is unlocked after processing each message
mutex.Unlock()
}
}

22
main.go
View file

@ -9,8 +9,6 @@ import (
"atri.dad/api"
"atri.dad/api/webhooks"
"atri.dad/lib"
"atri.dad/lib/pubsub"
"atri.dad/lib/pubsub/adapters"
"atri.dad/pages"
"github.com/joho/godotenv"
@ -25,22 +23,6 @@ func main() {
// Load environment variables
godotenv.Load(".env")
// Initialize Redis client
redisClient, redisError := adapters.NewRedisClient()
// Initialize pubsub
var pubSub pubsub.PubSub
if redisError != nil {
lib.LogWarning.Printf("\n[PUBSUB/INIT] Failed to connect to Redis: %v\n", redisError)
lib.LogWarning.Printf("\n[PUBSUB/INIT] Falling back to LocalPubSub\n")
pubSub = &adapters.LocalPubSub{}
} else {
adapters.RedisClient = redisClient
pubSub = &adapters.RedisPubSub{
Client: adapters.RedisClient,
}
}
// Initialize Echo router
e := echo.New()
@ -79,11 +61,11 @@ func main() {
apiGroup.GET("/post/copy", api.PostCopy)
apiGroup.GET("/sse", func(c echo.Context) error {
return api.SSE(c, pubSub)
return api.SSE(c)
})
apiGroup.POST("/tools/sendsse", func(c echo.Context) error {
return api.SSEDemoSend(c, pubSub)
return api.SSEDemoSend(c)
})
apiGroup.POST("/tools/resize", api.ResizeHandler)