diff --git a/lib/pubsub/adapters/localpubsub.go b/lib/pubsub/adapters/localpubsub.go index d4c089b..1c38388 100644 --- a/lib/pubsub/adapters/localpubsub.go +++ b/lib/pubsub/adapters/localpubsub.go @@ -35,38 +35,23 @@ func (ps *LocalPubSub) SubscribeToChannel(channel string) (pubsub.PubSubMessage, } func (ps *LocalPubSub) PublishToChannel(channel string, message string) error { + subscribers, ok := ps.subscribers[channel] + if !ok { + lib.LogWarning.Printf("\n[PUBSUB/LOCAL] No subscribers for channel %s\n", channel) + return nil + } + ps.lock.Lock() defer ps.lock.Unlock() - if subscribers, ok := ps.subscribers[channel]; ok { - lib.LogInfo.Printf("\n[PUBSUB/LOCAL] Publishing message to channel %s: %s\n", channel, message) - for _, ch := range subscribers { - ch <- pubsub.Message{Payload: message} - } - } else { - lib.LogWarning.Printf("\n[PUBSUB/LOCAL] No subscribers for channel %s\n", channel) + lib.LogInfo.Printf("\n[PUBSUB/LOCAL] Publishing message to channel %s: %s\n", channel, message) + for _, ch := range subscribers { + ch <- pubsub.Message{Payload: message} } return nil } -func (m *LocalPubSubMessage) ReceiveMessage(ctx context.Context) (*pubsub.Message, error) { - for { - select { - case <-ctx.Done(): - // The client has disconnected. Stop trying to send messages. - return nil, ctx.Err() - case msg := <-m.messages: - // A message has been received. Send it to the client. - lib.LogInfo.Printf("\n[PUBSUB/LOCAL] Received message: %s\n", msg.Payload) - return &msg, nil - case <-time.After(30 * time.Second): - // No message has been received for 30 seconds. Send a keep-alive message. - return &pubsub.Message{Payload: "keep-alive"}, nil - } - } -} - func (ps *LocalPubSub) UnsubscribeFromChannel(channel string, ch <-chan pubsub.Message) { ps.lock.Lock() defer ps.lock.Unlock() @@ -86,3 +71,20 @@ func (ps *LocalPubSub) UnsubscribeFromChannel(channel string, ch <-chan pubsub.M ps.subscribers[channel] = subscribers } } + +func (m *LocalPubSubMessage) ReceiveMessage(ctx context.Context) (*pubsub.Message, error) { + for { + select { + case <-ctx.Done(): + // The client has disconnected. Stop trying to send messages. + return nil, ctx.Err() + case msg := <-m.messages: + // A message has been received. Send it to the client. + lib.LogInfo.Printf("\n[PUBSUB/LOCAL] Received message: %s\n", msg.Payload) + return &msg, nil + case <-time.After(30 * time.Second): + // No message has been received for 30 seconds. Send a keep-alive message. + return &pubsub.Message{Payload: "keep-alive"}, nil + } + } +}