Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jul 11, 2024
1 parent 7665a0a commit 1b95e05
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 79 deletions.
98 changes: 23 additions & 75 deletions _examples/chat_json/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"log"
"math/rand"
"net/http"
"os"
"os/signal"
Expand All @@ -28,10 +27,6 @@ type clientMessage struct {
Input string `json:"input"`
}

func init() {
log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
}

func handleLog(e centrifuge.LogEntry) {
log.Printf("%s: %v", e.Message, e.Fields)
}
Expand Down Expand Up @@ -74,33 +69,11 @@ func channelSubscribeAllowed(channel string) bool {

func main() {
node, _ := centrifuge.New(centrifuge.Config{
LogLevel: centrifuge.LogLevelError,
LogLevel: centrifuge.LogLevelInfo,
LogHandler: handleLog,
HistoryMetaTTL: 24 * time.Hour,
})

redisShardConfigs := []centrifuge.RedisShardConfig{
{Address: "localhost:6379"},
//{Address: "localhost:6380"},
}
var redisShards []*centrifuge.RedisShard
for _, redisConf := range redisShardConfigs {
redisShard, err := centrifuge.NewRedisShard(node, redisConf)
if err != nil {
log.Fatal(err)
}
redisShards = append(redisShards, redisShard)
}

broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{
// And configure a couple of shards to use.
Shards: redisShards,
})
if err != nil {
log.Fatal(err)
}
node.SetBroker(broker)

node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
cred, _ := centrifuge.GetCredentials(ctx)
return centrifuge.ConnectReply{
Expand All @@ -110,54 +83,33 @@ func main() {
"#" + cred.UserID: {
EnableRecovery: true,
EmitPresence: true,
//EmitJoinLeave: true,
//PushJoinLeave: true,
EmitJoinLeave: true,
PushJoinLeave: true,
},
},
}, nil
})

go func() {
for {
time.Sleep(1000 * time.Millisecond)
fmt.Println("NUM CLIENT-SIDE SUBSCRIBERS", node.Hub().NumSubscribers(exampleChannel))
fmt.Println("NUM SERVER-SIDE SUBSCRIBERS", node.Hub().NumSubscribers("#42"))
fmt.Println("NUM CLIENTS", node.Hub().NumClients())
}
}()

go func() {
for {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
err := node.Unsubscribe("42", exampleChannel, centrifuge.WithCustomUnsubscribe(centrifuge.Unsubscribe{
Code: centrifuge.UnsubscribeCodeInsufficient,
}))
if err != nil {
log.Printf("error unsubscribing from channel: %s", err)
}
}
}()

go func() {
for {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
err := node.Unsubscribe("42", "#42", centrifuge.WithCustomUnsubscribe(centrifuge.Unsubscribe{
Code: centrifuge.UnsubscribeCodeInsufficient,
}))
if err != nil {
log.Printf("error unsubscribing from channel: %s", err)
}
}
}()

node.OnConnect(func(client *centrifuge.Client) {
transport := client.Transport()
log.Printf("[user %s] connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol())

// Event handler should not block, so start separate goroutine to
// periodically send messages to client.
go func() {
if rand.Intn(100) > 50 {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
client.Disconnect(centrifuge.DisconnectForceReconnect)
for {
select {
case <-client.Context().Done():
return
case <-time.After(5 * time.Second):
err := client.Send([]byte(`{"time": "` + strconv.FormatInt(time.Now().Unix(), 10) + `"}`))
if err != nil {
if err == io.EOF {
return
}
log.Printf("error sending message: %s", err)
}
}
}
}()

Expand All @@ -170,24 +122,20 @@ func main() {
})

client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
log.Printf("client %s [user %s] subscribes on %s", client.ID(), client.UserID(), e.Channel)
log.Printf("[user %s] subscribes on %s", client.UserID(), e.Channel)

if !channelSubscribeAllowed(e.Channel) {
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied)
return
}

time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

log.Printf("client %s [user %s] allowed to subscribe on %s", client.ID(), client.UserID(), e.Channel)

cb(centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
EnableRecovery: true,
EmitPresence: true,
//EmitJoinLeave: true,
//PushJoinLeave: true,
Data: []byte(`{"msg": "welcome"}`),
EmitJoinLeave: true,
PushJoinLeave: true,
Data: []byte(`{"msg": "welcome"}`),
},
}, nil)
})
Expand Down
4 changes: 0 additions & 4 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"hash/fnv"
"math/rand"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -977,9 +976,6 @@ func (n *Node) removeClient(c *Client) error {
// addSubscription registers subscription of connection on channel in both
// Hub and Broker.
func (n *Node) addSubscription(ch string, sub subInfo) error {
if rand.Intn(100) > 50 {
return errors.New("boom")
}
n.metrics.incActionCount("add_subscription")
mu := n.subLock(ch)
mu.Lock()
Expand Down

0 comments on commit 1b95e05

Please sign in to comment.