From 924906431fbbca6d2380e482238a4b785db74f77 Mon Sep 17 00:00:00 2001 From: Atridad Lahiji Date: Tue, 4 Jun 2024 15:40:29 -0600 Subject: [PATCH] Simplified the SSE system! --- api/sse.go | 48 +++++++++------ api/tools.sendsse.go | 6 +- go.mod | 17 +++--- go.sum | 38 +++++------- lib/logging.go | 28 +++++---- lib/pubsub/adapters/localpubsub.go | 86 -------------------------- lib/pubsub/adapters/redispubsub.go | 71 ---------------------- lib/pubsub/interface.go | 16 ----- lib/sse.go | 98 ++++-------------------------- main.go | 22 +------ 10 files changed, 83 insertions(+), 347 deletions(-) delete mode 100644 lib/pubsub/adapters/localpubsub.go delete mode 100644 lib/pubsub/adapters/redispubsub.go delete mode 100644 lib/pubsub/interface.go diff --git a/api/sse.go b/api/sse.go index 8fe77a9..e2bed48 100644 --- a/api/sse.go +++ b/api/sse.go @@ -1,19 +1,14 @@ package api import ( - "errors" "fmt" + "time" "atri.dad/lib" - "atri.dad/lib/pubsub" "github.com/labstack/echo/v4" ) -func SSE(c echo.Context, pubSub pubsub.PubSub) error { - if pubSub == nil { - return errors.New("pubSub is nil") - } - +func SSE(c echo.Context) error { channel := c.QueryParam("channel") if channel == "" { channel = "default" @@ -22,25 +17,44 @@ func SSE(c echo.Context, pubSub pubsub.PubSub) error { // Use the request context, which is cancelled when the client disconnects ctx := c.Request().Context() - pubsub, err := pubSub.SubscribeToChannel(channel) - if err != nil { - return fmt.Errorf("failed to subscribe to channel: %w", err) - } + c.Response().Header().Set(echo.HeaderContentType, "text/event-stream") + c.Response().Header().Set(echo.HeaderConnection, "keep-alive") + c.Response().Header().Set(echo.HeaderCacheControl, "no-cache") - lib.SetSSEHeaders(c) + // Create a channel to receive messages from the lib.SSEServer + clientChan := make(chan string) - // Create a client channel and add it to the SSE server - client := make(chan string) - lib.SSEServer.AddClient(channel, client) - defer lib.SSEServer.RemoveClient(channel, client) + // Add the client to the lib.SSEServer + lib.SSEServer.AddClient(channel, clientChan) - go lib.HandleIncomingMessages(c, pubsub, client) + defer func() { + // Remove the client from the lib.SSEServer when the connection is closed + lib.SSEServer.RemoveClient(channel, clientChan) + }() + + // Create a ticker that fires every 15 seconds + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() for { select { case <-ctx.Done(): // If the client has disconnected, stop the loop return nil + case <-ticker.C: + // Every 30 seconds, send a comment to keep the connection alive + if _, err := c.Response().Write([]byte(": keep-alive\n\n")); err != nil { + return err + } + c.Response().Flush() + case msg := <-clientChan: + // Handle incoming messages from the lib.SSEServer + data := fmt.Sprintf("data: %s\n\n", msg) + if _, err := c.Response().Write([]byte(data)); err != nil { + return err + } + + c.Response().Flush() } } } diff --git a/api/tools.sendsse.go b/api/tools.sendsse.go index ca31a95..95a4609 100644 --- a/api/tools.sendsse.go +++ b/api/tools.sendsse.go @@ -4,11 +4,10 @@ import ( "net/http" "atri.dad/lib" - "atri.dad/lib/pubsub" "github.com/labstack/echo/v4" ) -func SSEDemoSend(c echo.Context, pubSub pubsub.PubSub) error { +func SSEDemoSend(c echo.Context) error { channel := c.QueryParam("channel") if channel == "" { channel = "default" @@ -31,7 +30,8 @@ func SSEDemoSend(c echo.Context, pubSub pubsub.PubSub) error { return c.JSON(http.StatusBadRequest, map[string]string{"error": "message parameter is required"}) } - lib.SendSSE(c.Request().Context(), pubSub, "default", message) + // Send message + lib.SendSSE(channel, message) return c.JSON(http.StatusOK, map[string]string{"status": "message sent"}) } diff --git a/go.mod b/go.mod index 3c71d29..950d4b9 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,6 @@ go 1.22.0 require github.com/alecthomas/chroma/v2 v2.14.0 require ( - github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dlclark/regexp2 v1.11.0 // indirect github.com/go-jose/go-jose/v3 v3.0.3 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect @@ -16,16 +14,16 @@ require ( github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect - golang.org/x/image v0.16.0 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/oauth2 v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/image v0.17.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/oauth2 v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect ) require ( - github.com/aws/aws-sdk-go v1.53.15 + github.com/aws/aws-sdk-go v1.53.16 github.com/clerkinc/clerk-sdk-go v1.49.1 github.com/disintegration/imaging v1.6.2 github.com/fatih/color v1.17.0 @@ -34,12 +32,11 @@ require ( github.com/labstack/echo/v4 v4.12.0 github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/redis/go-redis/v9 v9.5.2 github.com/stripe/stripe-go/v76 v76.25.0 github.com/svix/svix-webhooks v1.24.0 github.com/yuin/goldmark v1.7.1 github.com/yuin/goldmark-highlighting/v2 v2.0.0-20230729083705-37449abec8cc - golang.org/x/crypto v0.23.0 // indirect - golang.org/x/sys v0.20.0 // indirect + golang.org/x/crypto v0.24.0 // indirect + golang.org/x/sys v0.21.0 // indirect gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 3cae5ad..9ca37c9 100644 --- a/go.sum +++ b/go.sum @@ -6,22 +6,14 @@ github.com/alecthomas/chroma/v2 v2.14.0/go.mod h1:QolEbTfmUHIMVpBqxeDnNBj2uoeI4E github.com/alecthomas/repr v0.0.0-20220113201626-b1b626ac65ae/go.mod h1:2kn6fqh/zIyPLmm3ugklbEi5hg5wS435eygvNfaDQL8= github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= -github.com/aws/aws-sdk-go v1.53.15 h1:FtZmkg7xM8RfP2oY6p7xdKBYrRgkITk9yve2QV7N938= -github.com/aws/aws-sdk-go v1.53.15/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/aws/aws-sdk-go v1.53.16 h1:8oZjKQO/ml1WLUZw5hvF7pvYjPf8o9f57Wldoy/q9Qc= +github.com/aws/aws-sdk-go v1.53.16/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= github.com/brianvoe/gofakeit/v6 v6.19.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8= -github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= -github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= -github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= -github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= -github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= -github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/clerkinc/clerk-sdk-go v1.49.1 h1:3YfEFuXrM7fg6+GYxXR0umbV3aboErNUlOcFMuR5rfY= github.com/clerkinc/clerk-sdk-go v1.49.1/go.mod h1:pejhMTTDAuw5aBpiHBEOOOHMAsxNfPvKfM5qexFJYlc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/disintegration/imaging v1.6.2 h1:w1LecBlG2Lnp8B3jk5zSuNqd7b4DXhcjwek1ei82L+c= github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4= github.com/dlclark/regexp2 v1.4.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= @@ -65,8 +57,6 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/redis/go-redis/v9 v9.5.2 h1:L0L3fcSNReTRGyZ6AqAEN0K56wYeYAwapBIhkvh0f3E= -github.com/redis/go-redis/v9 v9.5.2/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -93,11 +83,11 @@ golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= -golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= +golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= -golang.org/x/image v0.16.0 h1:9kloLAKhUufZhA12l5fwnx2NZW39/we1UhBesW433jw= -golang.org/x/image v0.16.0/go.mod h1:ugSZItdV4nOxyqp56HmXwH0Ry0nBCpjnZdpDaIHdoPs= +golang.org/x/image v0.17.0 h1:nTRVVdajgB8zCMZVsViyzhnMKPwYeroEERRC64JuLco= +golang.org/x/image v0.17.0/go.mod h1:4yyo5vMFQjVjUcVk4jEQcU9MGy/rulF5WvUILseCM2E= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -108,10 +98,10 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= -golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo= -golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= +golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -128,8 +118,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -144,8 +134,8 @@ golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/lib/logging.go b/lib/logging.go index 4744899..78884d3 100644 --- a/lib/logging.go +++ b/lib/logging.go @@ -1,27 +1,29 @@ package lib -import "github.com/fatih/color" +import ( + "github.com/fatih/color" +) // Error logging -var red = color.New(color.FgRed) -var LogError = red.Add(color.Bold) +var red = color.New(color.FgRed, color.Bold) +var LogError = red // Info logging -var cyan = color.New(color.FgCyan) -var LogInfo = cyan.Add(color.Bold) +var cyan = color.New(color.FgCyan, color.Bold) +var LogInfo = cyan // Success logging -var green = color.New(color.FgGreen) -var LogSuccess = green.Add(color.Bold) +var green = color.New(color.FgGreen, color.Bold) +var LogSuccess = green // Warning logging -var yellow = color.New(color.FgYellow) -var LogWarning = yellow.Add(color.Bold) +var yellow = color.New(color.FgYellow, color.Bold) +var LogWarning = yellow // Debug logging -var magenta = color.New(color.FgMagenta) -var LogDebug = magenta.Add(color.Bold) +var magenta = color.New(color.FgMagenta, color.Bold) +var LogDebug = magenta // Custom logging -var white = color.New(color.FgWhite) -var LogCustom = white.Add(color.Bold) +var white = color.New(color.FgWhite, color.Bold) +var LogCustom = white diff --git a/lib/pubsub/adapters/localpubsub.go b/lib/pubsub/adapters/localpubsub.go deleted file mode 100644 index 941cdc2..0000000 --- a/lib/pubsub/adapters/localpubsub.go +++ /dev/null @@ -1,86 +0,0 @@ -package adapters - -import ( - "context" - "sync" - - "atri.dad/lib" - "atri.dad/lib/pubsub" -) - -type LocalPubSub struct { - subscribers map[string][]chan pubsub.Message - lock sync.RWMutex -} - -type LocalPubSubMessage struct { - messages <-chan pubsub.Message -} - -func (ps *LocalPubSub) SubscribeToChannel(channel string) (pubsub.PubSubMessage, error) { - ps.lock.Lock() - defer ps.lock.Unlock() - - if ps.subscribers == nil { - ps.subscribers = make(map[string][]chan pubsub.Message) - } - - ch := make(chan pubsub.Message, 100) - ps.subscribers[channel] = append(ps.subscribers[channel], ch) - - lib.LogInfo.Printf("[PUBSUB/LOCAL] Subscribed to channel %s\n", channel) - - return &LocalPubSubMessage{messages: ch}, nil -} - -func (ps *LocalPubSub) PublishToChannel(channel string, message string) error { - subscribers, ok := ps.subscribers[channel] - if !ok { - lib.LogWarning.Printf("\n[PUBSUB/LOCAL] No subscribers for channel %s\n", channel) - return nil - } - - ps.lock.Lock() - defer ps.lock.Unlock() - - lib.LogInfo.Printf("\n[PUBSUB/LOCAL] Publishing message to channel %s: %s\n", channel, message) - for _, ch := range subscribers { - ch <- pubsub.Message{Payload: message} - } - - return nil -} - -func (ps *LocalPubSub) UnsubscribeFromChannel(channel string, ch <-chan pubsub.Message) { - ps.lock.Lock() - defer ps.lock.Unlock() - - subscribers := ps.subscribers[channel] - for i, subscriber := range subscribers { - if subscriber == ch { - // Remove the subscriber from the slice - subscribers = append(subscribers[:i], subscribers[i+1:]...) - break - } - } - - if len(subscribers) == 0 { - delete(ps.subscribers, channel) - } else { - ps.subscribers[channel] = subscribers - } -} - -func (m *LocalPubSubMessage) ReceiveMessage(ctx context.Context) (*pubsub.Message, error) { - for { - select { - case <-ctx.Done(): - // The client has disconnected. Stop trying to send messages. - return nil, ctx.Err() - case msg := <-m.messages: - // A message has been received. Send it to the client. - lib.LogInfo.Printf("\n[PUBSUB/LOCAL] Received message: %s\n", msg.Payload) - return &msg, nil - } - } -} diff --git a/lib/pubsub/adapters/redispubsub.go b/lib/pubsub/adapters/redispubsub.go deleted file mode 100644 index 0d3f2ec..0000000 --- a/lib/pubsub/adapters/redispubsub.go +++ /dev/null @@ -1,71 +0,0 @@ -package adapters - -import ( - "context" - "os" - - "atri.dad/lib" - "atri.dad/lib/pubsub" - "github.com/joho/godotenv" - "github.com/redis/go-redis/v9" -) - -var RedisClient *redis.Client - -type RedisPubSubMessage struct { - pubsub *redis.PubSub -} - -// RedisPubSub is a Redis implementation of the PubSub interface. -type RedisPubSub struct { - Client *redis.Client -} - -func NewRedisClient() (*redis.Client, error) { - if RedisClient != nil { - return RedisClient, nil - } - - godotenv.Load(".env") - redis_url := os.Getenv("REDIS_URL") - - opts, err := redis.ParseURL(redis_url) - - if err != nil { - return nil, err - } - - lib.LogInfo.Printf("\n[PUBSUB/REDIS]Connecting to Redis at %s\n", opts.Addr) - RedisClient = redis.NewClient(opts) - - return RedisClient, nil -} - -func (m *RedisPubSubMessage) ReceiveMessage(ctx context.Context) (*pubsub.Message, error) { - msg, err := m.pubsub.ReceiveMessage(ctx) - if err != nil { - return nil, err - } - lib.LogInfo.Printf("\n[PUBSUB/REDIS] Received message: %s\n", msg.Payload) - return &pubsub.Message{Payload: msg.Payload}, nil -} - -func (ps *RedisPubSub) SubscribeToChannel(channel string) (pubsub.PubSubMessage, error) { - pubsub := ps.Client.Subscribe(context.Background(), channel) - _, err := pubsub.Receive(context.Background()) - if err != nil { - return nil, err - } - lib.LogInfo.Printf("\n[PUBSUB/REDIS] Subscribed to channel %s\n", channel) - - return &RedisPubSubMessage{pubsub: pubsub}, nil -} - -func (r *RedisPubSub) PublishToChannel(channel string, message string) error { - err := r.Client.Publish(context.Background(), channel, message).Err() - if err != nil { - return err - } - lib.LogInfo.Printf("\n[PUBSUB/REDIS] Publishing message to channel %s: %s\n", channel, message) - return nil -} diff --git a/lib/pubsub/interface.go b/lib/pubsub/interface.go deleted file mode 100644 index b43da28..0000000 --- a/lib/pubsub/interface.go +++ /dev/null @@ -1,16 +0,0 @@ -package pubsub - -import "context" - -type Message struct { - Payload string -} - -type PubSubMessage interface { - ReceiveMessage(ctx context.Context) (*Message, error) -} - -type PubSub interface { - SubscribeToChannel(channel string) (PubSubMessage, error) - PublishToChannel(channel string, message string) error -} diff --git a/lib/sse.go b/lib/sse.go index 5b6e8eb..cd60300 100644 --- a/lib/sse.go +++ b/lib/sse.go @@ -1,14 +1,6 @@ package lib -import ( - "context" - "fmt" - "net/http" - "sync" - - "atri.dad/lib/pubsub" - "github.com/labstack/echo/v4" -) +import "sync" type SSEServerType struct { clients map[string]map[chan string]bool @@ -37,6 +29,8 @@ func (s *SSEServerType) AddClient(channel string, client chan string) { s.clients[channel] = make(map[chan string]bool) } s.clients[channel][client] = true + + LogInfo.Printf("\nClient connected to channel %s\n", channel) } func (s *SSEServerType) RemoveClient(channel string, client chan string) { @@ -47,6 +41,8 @@ func (s *SSEServerType) RemoveClient(channel string, client chan string) { if len(s.clients[channel]) == 0 { delete(s.clients, channel) } + + LogInfo.Printf("\nClient disconnected from channel %s\n", channel) } func (s *SSEServerType) ClientCount(channel string) int { @@ -56,87 +52,15 @@ func (s *SSEServerType) ClientCount(channel string) int { return len(s.clients[channel]) } -func SendSSE(ctx context.Context, messageBroker pubsub.PubSub, channel string, message string) error { - LogInfo.Printf("Sending SSE message to channel %s", channel) +func SendSSE(channel string, message string) error { + SSEServer.mu.Lock() + defer SSEServer.mu.Unlock() - errCh := make(chan error, 1) - - go func() { - select { - case <-ctx.Done(): - errCh <- ctx.Err() - default: - err := messageBroker.PublishToChannel(channel, message) - errCh <- err - } - }() - - err := <-errCh - if err != nil { - LogError.Printf("Error sending SSE message: %v", err) - - return err + for client := range SSEServer.clients[channel] { + client <- message } - LogSuccess.Printf("SSE message sent successfully") + LogDebug.Printf("\nMessage broadcast on channel %s: %s\n", channel, message) return nil } - -func SetSSEHeaders(c echo.Context) { - c.Response().Header().Set(echo.HeaderContentType, "text/event-stream") - c.Response().Header().Set(echo.HeaderConnection, "keep-alive") - c.Response().Header().Set(echo.HeaderCacheControl, "no-cache") - c.Response().Header().Set("X-Accel-Buffering", "no") -} - -func HandleIncomingMessages(c echo.Context, pubsub pubsub.PubSubMessage, client chan string) { - if c.Response().Writer == nil { - LogError.Printf("Cannot handle incoming messages: ResponseWriter is nil") - return - } - - var mutex sync.Mutex - - for { - // Receive messages using the context of the request, which is cancelled when the client disconnects - msg, err := pubsub.ReceiveMessage(c.Request().Context()) - if err != nil { - if err == context.Canceled { - // Log when the client disconnects and stop the message forwarding - LogInfo.Printf("Client disconnected, stopping message forwarding") - return - } - // Log errors other than client disconnection - LogError.Printf("Failed to receive message: %v", err) - return - } - - // Prepare the data string to be sent as an SSE - data := fmt.Sprintf("data: %s\n\n", msg.Payload) - - // Locking before writing to the response writer to avoid concurrent write issues - mutex.Lock() - if c.Response().Writer != nil { - _, err := c.Response().Write([]byte(data)) - if err != nil { - // Log failure to write and unlock before returning - LogError.Printf("Failed to write message: %v", err) - mutex.Unlock() - return - } - - // Flush the response if possible - flusher, ok := c.Response().Writer.(http.Flusher) - if ok { - flusher.Flush() - } else { - LogError.Println("Failed to flush: ResponseWriter does not implement http.Flusher") - } - } else { - LogError.Println("Failed to write: ResponseWriter is nil") - } - // Ensure the mutex is unlocked after processing each message - mutex.Unlock() - } -} diff --git a/main.go b/main.go index 18ec487..efb7851 100755 --- a/main.go +++ b/main.go @@ -9,8 +9,6 @@ import ( "atri.dad/api" "atri.dad/api/webhooks" "atri.dad/lib" - "atri.dad/lib/pubsub" - "atri.dad/lib/pubsub/adapters" "atri.dad/pages" "github.com/joho/godotenv" @@ -25,22 +23,6 @@ func main() { // Load environment variables godotenv.Load(".env") - // Initialize Redis client - redisClient, redisError := adapters.NewRedisClient() - - // Initialize pubsub - var pubSub pubsub.PubSub - if redisError != nil { - lib.LogWarning.Printf("\n[PUBSUB/INIT] Failed to connect to Redis: %v\n", redisError) - lib.LogWarning.Printf("\n[PUBSUB/INIT] Falling back to LocalPubSub\n") - pubSub = &adapters.LocalPubSub{} - } else { - adapters.RedisClient = redisClient - pubSub = &adapters.RedisPubSub{ - Client: adapters.RedisClient, - } - } - // Initialize Echo router e := echo.New() @@ -79,11 +61,11 @@ func main() { apiGroup.GET("/post/copy", api.PostCopy) apiGroup.GET("/sse", func(c echo.Context) error { - return api.SSE(c, pubSub) + return api.SSE(c) }) apiGroup.POST("/tools/sendsse", func(c echo.Context) error { - return api.SSEDemoSend(c, pubSub) + return api.SSEDemoSend(c) }) apiGroup.POST("/tools/resize", api.ResizeHandler)