From 1b95e05d56197c042331a23ba3a38682b869056e Mon Sep 17 00:00:00 2001 From: FZambia Date: Thu, 11 Jul 2024 21:12:47 +0300 Subject: [PATCH] cleanup --- _examples/chat_json/main.go | 98 +++++++++---------------------------- node.go | 4 -- 2 files changed, 23 insertions(+), 79 deletions(-) diff --git a/_examples/chat_json/main.go b/_examples/chat_json/main.go index d8440b09..c460c47a 100644 --- a/_examples/chat_json/main.go +++ b/_examples/chat_json/main.go @@ -5,9 +5,8 @@ import ( "encoding/json" "errors" "flag" - "fmt" + "io" "log" - "math/rand" "net/http" "os" "os/signal" @@ -28,10 +27,6 @@ type clientMessage struct { Input string `json:"input"` } -func init() { - log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds) -} - func handleLog(e centrifuge.LogEntry) { log.Printf("%s: %v", e.Message, e.Fields) } @@ -74,33 +69,11 @@ func channelSubscribeAllowed(channel string) bool { func main() { node, _ := centrifuge.New(centrifuge.Config{ - LogLevel: centrifuge.LogLevelError, + LogLevel: centrifuge.LogLevelInfo, LogHandler: handleLog, HistoryMetaTTL: 24 * time.Hour, }) - redisShardConfigs := []centrifuge.RedisShardConfig{ - {Address: "localhost:6379"}, - //{Address: "localhost:6380"}, - } - var redisShards []*centrifuge.RedisShard - for _, redisConf := range redisShardConfigs { - redisShard, err := centrifuge.NewRedisShard(node, redisConf) - if err != nil { - log.Fatal(err) - } - redisShards = append(redisShards, redisShard) - } - - broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{ - // And configure a couple of shards to use. - Shards: redisShards, - }) - if err != nil { - log.Fatal(err) - } - node.SetBroker(broker) - node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) { cred, _ := centrifuge.GetCredentials(ctx) return centrifuge.ConnectReply{ @@ -110,54 +83,33 @@ func main() { "#" + cred.UserID: { EnableRecovery: true, EmitPresence: true, - //EmitJoinLeave: true, - //PushJoinLeave: true, + EmitJoinLeave: true, + PushJoinLeave: true, }, }, }, nil }) - go func() { - for { - time.Sleep(1000 * time.Millisecond) - fmt.Println("NUM CLIENT-SIDE SUBSCRIBERS", node.Hub().NumSubscribers(exampleChannel)) - fmt.Println("NUM SERVER-SIDE SUBSCRIBERS", node.Hub().NumSubscribers("#42")) - fmt.Println("NUM CLIENTS", node.Hub().NumClients()) - } - }() - - go func() { - for { - time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) - err := node.Unsubscribe("42", exampleChannel, centrifuge.WithCustomUnsubscribe(centrifuge.Unsubscribe{ - Code: centrifuge.UnsubscribeCodeInsufficient, - })) - if err != nil { - log.Printf("error unsubscribing from channel: %s", err) - } - } - }() - - go func() { - for { - time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) - err := node.Unsubscribe("42", "#42", centrifuge.WithCustomUnsubscribe(centrifuge.Unsubscribe{ - Code: centrifuge.UnsubscribeCodeInsufficient, - })) - if err != nil { - log.Printf("error unsubscribing from channel: %s", err) - } - } - }() - node.OnConnect(func(client *centrifuge.Client) { transport := client.Transport() log.Printf("[user %s] connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol()) + // Event handler should not block, so start separate goroutine to + // periodically send messages to client. go func() { - if rand.Intn(100) > 50 { - time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) - client.Disconnect(centrifuge.DisconnectForceReconnect) + for { + select { + case <-client.Context().Done(): + return + case <-time.After(5 * time.Second): + err := client.Send([]byte(`{"time": "` + strconv.FormatInt(time.Now().Unix(), 10) + `"}`)) + if err != nil { + if err == io.EOF { + return + } + log.Printf("error sending message: %s", err) + } + } } }() @@ -170,24 +122,20 @@ func main() { }) client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) { - log.Printf("client %s [user %s] subscribes on %s", client.ID(), client.UserID(), e.Channel) + log.Printf("[user %s] subscribes on %s", client.UserID(), e.Channel) if !channelSubscribeAllowed(e.Channel) { cb(centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied) return } - time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) - - log.Printf("client %s [user %s] allowed to subscribe on %s", client.ID(), client.UserID(), e.Channel) - cb(centrifuge.SubscribeReply{ Options: centrifuge.SubscribeOptions{ EnableRecovery: true, EmitPresence: true, - //EmitJoinLeave: true, - //PushJoinLeave: true, - Data: []byte(`{"msg": "welcome"}`), + EmitJoinLeave: true, + PushJoinLeave: true, + Data: []byte(`{"msg": "welcome"}`), }, }, nil) }) diff --git a/node.go b/node.go index eab1f0bd..8dc63af1 100644 --- a/node.go +++ b/node.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "hash/fnv" - "math/rand" "os" "strconv" "strings" @@ -977,9 +976,6 @@ func (n *Node) removeClient(c *Client) error { // addSubscription registers subscription of connection on channel in both // Hub and Broker. func (n *Node) addSubscription(ch string, sub subInfo) error { - if rand.Intn(100) > 50 { - return errors.New("boom") - } n.metrics.incActionCount("add_subscription") mu := n.subLock(ch) mu.Lock()