From b8e6bdb5bc0ccdb17b60034d1b2bee0b46b869b7 Mon Sep 17 00:00:00 2001
From: FZambia
Date: Sat, 10 Aug 2024 16:58:03 +0300
Subject: [PATCH] keep example

---
 _examples/custom_engine_tarantool/index.html  | 136 +++
 _examples/custom_engine_tarantool/main.go     | 203 +++++
 _examples/custom_engine_tarantool/readme.md   | 111 +++
 .../tntengine/broker.go                       | 837 ++++++++++++++++++
 .../tntengine/broker_test.go                  | 445 ++++++++++
 .../tntengine/multi_conn.go                   | 185 ++++
 .../tntengine/presence.go                     | 181 ++++
 .../tntengine/shard.go                        | 133 +++
 .../custom_engine_tarantool/tntengine/util.go | 109 +++
 .../tntserver/centrifuge.lua                  | 316 +++++++
 .../tntserver/docker-compose.yaml             |  26 +
 .../custom_engine_tarantool/tntserver/ha.lua  |  58 ++
 .../tntserver/init.lua                        |  31 +
 .../tntserver/readme.md                       |  17 +
 _examples/go.mod                              |   5 +-
 _examples/go.sum                              |   6 +
 16 files changed, 2798 insertions(+), 1 deletion(-)
 create mode 100644 _examples/custom_engine_tarantool/index.html
 create mode 100644 _examples/custom_engine_tarantool/main.go
 create mode 100644 _examples/custom_engine_tarantool/readme.md
 create mode 100644 _examples/custom_engine_tarantool/tntengine/broker.go
 create mode 100644 _examples/custom_engine_tarantool/tntengine/broker_test.go
 create mode 100644 _examples/custom_engine_tarantool/tntengine/multi_conn.go
 create mode 100644 _examples/custom_engine_tarantool/tntengine/presence.go
 create mode 100644 _examples/custom_engine_tarantool/tntengine/shard.go
 create mode 100644 _examples/custom_engine_tarantool/tntengine/util.go
 create mode 100644 _examples/custom_engine_tarantool/tntserver/centrifuge.lua
 create mode 100644 _examples/custom_engine_tarantool/tntserver/docker-compose.yaml
 create mode 100644 _examples/custom_engine_tarantool/tntserver/ha.lua
 create mode 100644 _examples/custom_engine_tarantool/tntserver/init.lua
 create mode 100644 _examples/custom_engine_tarantool/tntserver/readme.md

diff --git a/_examples/custom_engine_tarantool/index.html b/_examples/custom_engine_tarantool/index.html
new file mode 100644
index 00000000..7d927857
--- /dev/null
+++ b/_examples/custom_engine_tarantool/index.html
@@ -0,0 +1,136 @@
+<!-- index.html: browser chat client markup (136 lines); the HTML tags were lost during extraction and are not reconstructed here -->
diff --git a/_examples/custom_engine_tarantool/main.go b/_examples/custom_engine_tarantool/main.go
new file mode 100644
index 00000000..b448b58c
--- /dev/null
+++ b/_examples/custom_engine_tarantool/main.go
@@ -0,0 +1,203 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"log"
+	"net/http"
+	"os"
+	"os/signal"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	_ "net/http/pprof"
+
+	"github.com/centrifugal/centrifuge"
+	"github.com/centrifugal/centrifuge/_examples/custom_engine_tarantool/tntengine"
+)
+
+var (
+	port      = flag.Int("port", 8000, "Port to bind app to")
+	sharded   = flag.Bool("sharded", false, "Start sharded example")
+	ha        = flag.Bool("ha", false, "Start high availability example")
+	raft      = flag.Bool("raft", false, "Use Raft-based replication")
+	user      = flag.String("user", "guest", "Connection user")
+	password  = flag.String("password", "", "Connection password")
+	addresses = flag.String("addresses", "", "Tarantool addresses (hardcoded defaults are used when empty)")
+)
+
+func handleLog(e centrifuge.LogEntry) {
+	log.Printf("[centrifuge] %s: %v", e.Message, e.Fields)
+}
+
+func authMiddleware(h http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		ctx := r.Context()
+		ctx = centrifuge.SetCredentials(ctx, &centrifuge.Credentials{
+			UserID: "42",
+			Info:   []byte(`{"name": "Alexander"}`),
+		})
+		r = r.WithContext(ctx)
+		h.ServeHTTP(w, r)
+	})
+}
+
+func waitExitSignal(n *centrifuge.Node) {
+	sigCh := make(chan os.Signal, 1)
+	done := make(chan bool, 1)
+	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
+	go func() {
+		<-sigCh
+		_ = n.Shutdown(context.Background())
+		done <- true
+	}()
+	<-done
+}
+
+func main() {
+	flag.Parse()
+
+	node, _ := centrifuge.New(centrifuge.Config{
+		LogLevel:   centrifuge.LogLevelDebug,
+		LogHandler: handleLog,
+	})
+
+	node.OnConnect(func(client *centrifuge.Client) {
+		transport := client.Transport()
+		log.Printf("user %s connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol())
+
+		client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
+			log.Printf("user %s subscribes on %s", client.UserID(), e.Channel)
+			cb(centrifuge.SubscribeReply{
+				Options: centrifuge.SubscribeOptions{
+					EmitPresence:   true,
+					EmitJoinLeave:  true,
+					PushJoinLeave:  true,
+					EnableRecovery: true,
+				},
+			}, nil)
+		})
+
+		client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) {
+			log.Printf("user %s unsubscribed from %s", client.UserID(), e.Channel)
+		})
+
+		client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
+			log.Printf("user %s publishes into channel %s: %s", client.UserID(), e.Channel, string(e.Data))
+			cb(centrifuge.PublishReply{
+				Options: centrifuge.PublishOptions{
+					HistorySize: 10,
+					HistoryTTL:  10 * time.Minute,
+				},
+			}, nil)
+		})
+
+		client.OnPresence(func(e centrifuge.PresenceEvent, cb centrifuge.PresenceCallback) {
+			log.Printf("user %s calls presence on %s", client.UserID(), e.Channel)
+			if !client.IsSubscribed(e.Channel) {
+				cb(centrifuge.PresenceReply{}, centrifuge.ErrorPermissionDenied)
+				return
+			}
+			cb(centrifuge.PresenceReply{}, nil)
+		})
+
+		client.OnPresenceStats(func(e centrifuge.PresenceStatsEvent, cb centrifuge.PresenceStatsCallback) {
+			log.Printf("user %s calls presence stats on %s", client.UserID(), e.Channel)
+			if !client.IsSubscribed(e.Channel) {
+				cb(centrifuge.PresenceStatsReply{}, centrifuge.ErrorPermissionDenied)
+				return
+			}
+			cb(centrifuge.PresenceStatsReply{}, nil)
+		})
+
+		client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
+			log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect)
+		})
+	})
+
+	// Single Tarantool.
+	mode := tntengine.ConnectionModeSingleInstance
+	shardAddresses := [][]string{
+		{"127.0.0.1:3301"},
+	}
+
+	if *ha {
+		if *raft {
+			// Single Tarantool replica set with automatic leader election via Raft (Tarantool >= 2.7.0).
+			shardAddresses = [][]string{
+				{"127.0.0.1:3301", "127.0.0.1:3302", "127.0.0.1:3303"},
+			}
+			mode = tntengine.ConnectionModeLeaderFollowerRaft
+		} else {
+			// Single Tarantool replica set with automatic leader election (e.g. in Cartridge).
+			shardAddresses = [][]string{
+				{"127.0.0.1:3301", "127.0.0.1:3302"},
+			}
+			mode = tntengine.ConnectionModeLeaderFollower
+		}
+	} else if *sharded {
+		// Client-side sharding between two Tarantool instances (without HA).
+		shardAddresses = [][]string{
+			{"127.0.0.1:3301"},
+			{"127.0.0.1:3302"},
+		}
+	}
+
+	if *addresses != "" {
+		var customShardAddresses [][]string
+		shardParts := strings.Split(*addresses, " ")
+		for _, shardPart := range shardParts {
+			customShardAddresses = append(customShardAddresses, strings.Split(shardPart, ","))
+		}
+		shardAddresses = customShardAddresses
+	}
+
+	var shards []*tntengine.Shard
+	for _, addresses := range shardAddresses {
+		shard, err := tntengine.NewShard(tntengine.ShardConfig{
+			Addresses:      addresses,
+			User:           *user,
+			Password:       *password,
+			ConnectionMode: mode,
+		})
+		if err != nil {
+			log.Fatal(err)
+		}
+		shards = append(shards, shard)
+	}
+
+	broker, err := tntengine.NewBroker(node, tntengine.BrokerConfig{
+		UsePolling: false,
+		Shards:     shards,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	node.SetBroker(broker)
+
+	presenceManager, err := tntengine.NewPresenceManager(node, tntengine.PresenceManagerConfig{
+		Shards: shards,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	node.SetPresenceManager(presenceManager)
+
+	if err := node.Run(); err != nil {
+		log.Fatal(err)
+	}
+
+	http.Handle("/connection/websocket", authMiddleware(centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{})))
+	http.Handle("/", http.FileServer(http.Dir("./")))
+
+	go func() {
+		if err := http.ListenAndServe(":"+strconv.Itoa(*port), nil); err != nil {
+			log.Fatal(err)
+		}
+	}()
+
+	waitExitSignal(node)
+	log.Println("bye!")
+}
diff --git a/_examples/custom_engine_tarantool/readme.md b/_examples/custom_engine_tarantool/readme.md
new file mode 100644
index 00000000..4a256e14
--- /dev/null
+++ b/_examples/custom_engine_tarantool/readme.md
@@ -0,0 +1,111 @@
+This example shows how to use a custom engine implementation based on [Tarantool](https://www.tarantool.io/en/): i.e. it provides both a `Broker` and a `PresenceManager`.
+
+Tarantool provides faster history and presence operations than Redis (up to 10x), while being on par in subscribe and publish performance.
+
+**Important limitation to know**: the Tarantool Broker uses channels that start with `__` (two underscores) for internal needs and does not allow subscribing to them from the outside.
+
+Tarantool Broker PUB/SUB can be configured to work over PUSH or POLL.
+
+**Since this is just an example – we do not guarantee any stability here**. This implementation has not been tested in a production environment.
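+
+For orientation, the wiring performed by `main.go` boils down to the following sketch (assuming a single local Tarantool on `127.0.0.1:3301`; set `UsePolling` to `true` to switch the PUB/SUB loop from PUSH to POLL):
+
+```
+shard, err := tntengine.NewShard(tntengine.ShardConfig{Addresses: []string{"127.0.0.1:3301"}})
+if err != nil {
+	log.Fatal(err)
+}
+broker, err := tntengine.NewBroker(node, tntengine.BrokerConfig{
+	UsePolling: false, // PUSH mode; true switches to POLL.
+	Shards:     []*tntengine.Shard{shard},
+})
+if err != nil {
+	log.Fatal(err)
+}
+node.SetBroker(broker)
+
+presenceManager, err := tntengine.NewPresenceManager(node, tntengine.PresenceManagerConfig{
+	Shards: []*tntengine.Shard{shard},
+})
+if err != nil {
+	log.Fatal(err)
+}
+node.SetPresenceManager(presenceManager)
+```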
+
+## Single Tarantool instance example
+
+Go to the `tntserver` dir and install the `indexpiration` rock dependency:
+
+```
+tarantoolctl rocks install https://raw.githubusercontent.com/moonlibs/indexpiration/master/rockspecs/indexpiration-scm-1.rockspec
+```
+
+Create a directory for Tarantool `snap` and `xlog` files:
+
+```
+mkdir tnt1
+```
+
+Start Tarantool:
+
+```
+tarantool init.lua 1
+```
+
+In another terminal, start the chat application by running the following command from the example directory:
+
+```
+go run main.go
+```
+
+Go to http://localhost:8000. You will see a simple chat app; try writing a chat message in one browser tab – you should see it appear in another tab.
+
+## Client-side sharding
+
+Go to the `tntserver` dir and create directories for Tarantool files:
+
+```
+mkdir tnt1
+mkdir tnt2
+```
+
+Start the first Tarantool instance:
+
+```
+tarantool init.lua 1
+```
+
+Then the second one:
+
+```
+tarantool init.lua 2
+```
+
+Now run the first application instance on port 8000:
+
+```
+go run main.go --port 8000 --sharded
+```
+
+And the second one on port 8001:
+
+```
+go run main.go --port 8001 --sharded
+```
+
+Go to http://localhost:8000 or http://localhost:8001 – nodes are connected over Tarantool, and data is consistently sharded between the two Tarantool instances (by channel).
+
+## Tarantool high availability example
+
+Run Tarantool in a leader-follower setup with Cartridge (using `127.0.0.1:3301` and `127.0.0.1:3302`). Then start the application example:
+
+```
+go run main.go --port 8000 --ha
+```
+
+## Tarantool Raft high availability example
+
+**Requires Tarantool 2.6.1+ since the example uses Raft-based replication**.
+
+Create directories for Tarantool files inside the `tntserver` folder:
+
+```
+mkdir ha_tnt1 ha_tnt2 ha_tnt3
+```
+
+Run the Tarantool cluster:
+
+```
+docker-compose up
+```
+
+Then start the application:
+
+```
+go run main.go --port 8000 --ha --raft
+```
+
+At this point you can temporarily stop/start one of the Tarantool instances using:
+
+```
+docker-compose pause tnt1
+docker-compose unpause tnt1
+```
+
+Note that the chat application continues to work after a short downtime.
diff --git a/_examples/custom_engine_tarantool/tntengine/broker.go b/_examples/custom_engine_tarantool/tntengine/broker.go
new file mode 100644
index 00000000..097532a1
--- /dev/null
+++ b/_examples/custom_engine_tarantool/tntengine/broker.go
@@ -0,0 +1,837 @@
+package tntengine
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"runtime"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/FZambia/tarantool"
+	"github.com/centrifugal/centrifuge"
+	"github.com/centrifugal/protocol"
+	"github.com/google/uuid"
+	"github.com/vmihailenco/msgpack/v5"
+)
+
+const internalChannelPrefix = "__"
+
+const (
+	// tarantoolControlChannel is a name for control channel.
+	tarantoolControlChannel = internalChannelPrefix + "control"
+	// tarantoolNodeChannelPrefix is a prefix for node channel.
+	tarantoolNodeChannelPrefix = internalChannelPrefix + "node."
+)
+
+// Broker uses Tarantool to implement centrifuge.Broker functionality.
+type Broker struct {
+	controlRound uint64 // Keep atomic on struct top for 32-bit architectures.
+	node         *centrifuge.Node
+	sharding     bool
+	config       BrokerConfig
+	shards       []*Shard
+	nodeChannel  string
+}
+
+var _ centrifuge.Broker = (*Broker)(nil)
+
+// BrokerConfig is a config for Tarantool Broker.
+type BrokerConfig struct {
+	// HistoryMetaTTL sets a time of stream meta key expiration in Tarantool. Stream
+	// meta is kept in the Tarantool 'meta' space and contains top offset in channel and epoch value.
+ // By default, stream meta keys do not expire. + HistoryMetaTTL time.Duration + + // UsePolling allows turning on polling mode instead of push. + UsePolling bool + + // Shards is a list of Tarantool instances to shard data by channel. + Shards []*Shard +} + +// NewBroker initializes Tarantool Broker. +func NewBroker(n *centrifuge.Node, config BrokerConfig) (*Broker, error) { + if len(config.Shards) == 0 { + return nil, errors.New("no Tarantool shards provided in configuration") + } + if len(config.Shards) > 1 { + n.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, fmt.Sprintf("Tarantool sharding enabled: %d shards", len(config.Shards)))) + } + e := &Broker{ + node: n, + shards: config.Shards, + config: config, + sharding: len(config.Shards) > 1, + nodeChannel: nodeChannel(n.ID()), + } + return e, nil +} + +// Run runs broker after node initialized. +func (b *Broker) Run(h centrifuge.BrokerEventHandler) error { + for _, shard := range b.shards { + if err := b.runShard(shard, h); err != nil { + return err + } + } + return nil +} + +func (b *Broker) runForever(fn func(), minDelay time.Duration) { + for { + started := time.Now() + fn() + elapsed := time.Since(started) + if elapsed < minDelay { + // Sleep for a while to prevent busy loop when reconnecting. + // If elapsed >= minDelay then fn will be restarted right away – this is + // intentional for fast reconnect in case of one random error. + time.Sleep(minDelay - elapsed) + } + } +} + +const pubSubRoutineMinDelay = 300 * time.Millisecond + +// Run Tarantool shard. +func (b *Broker) runShard(s *Shard, h centrifuge.BrokerEventHandler) error { + go b.runForever(func() { + b.runPubSub(s, h) + }, pubSubRoutineMinDelay) + go b.runForever(func() { + b.runControlPubSub(s, h) + }, pubSubRoutineMinDelay) + return nil +} + +type pubRequest struct { + MsgType string + Channel string + Data string + Info string + HistoryTTL int + HistorySize int + HistoryMetaTTL int +} + +type pubResponse struct { + Offset uint64 + Epoch string +} + +func (m *pubResponse) DecodeMsgpack(d *msgpack.Decoder) error { + var err error + var l int + if l, err = d.DecodeArrayLen(); err != nil { + return err + } + if l != 2 { + return fmt.Errorf("malformed array len: %d", l) + } + if m.Offset, err = d.DecodeUint64(); err != nil { + return err + } + if m.Epoch, err = d.DecodeString(); err != nil { + return err + } + return nil +} + +// Publish - see centrifuge.Broker interface description. +func (b *Broker) Publish(ch string, data []byte, opts centrifuge.PublishOptions) (centrifuge.StreamPosition, bool, error) { + s := consistentShard(ch, b.shards) + pr := &pubRequest{ + MsgType: "p", + Channel: ch, + Data: string(data), + Info: b.clientInfoString(opts.ClientInfo), + HistoryTTL: int(opts.HistoryTTL.Seconds()), + HistorySize: opts.HistorySize, + HistoryMetaTTL: int(b.config.HistoryMetaTTL.Seconds()), + } + var resp pubResponse + err := s.ExecTyped(tarantool.Call("centrifuge.publish", pr), &resp) + if err != nil { + return centrifuge.StreamPosition{}, false, err + } + return centrifuge.StreamPosition{Offset: resp.Offset, Epoch: resp.Epoch}, false, err +} + +// PublishJoin - see centrifuge.Broker interface description. +func (b *Broker) PublishJoin(ch string, info *centrifuge.ClientInfo) error { + s := consistentShard(ch, b.shards) + pr := pubRequest{ + MsgType: "j", + Channel: ch, + Info: b.clientInfoString(info), + } + _, err := s.Exec(tarantool.Call("centrifuge.publish", pr)) + return err +} + +// PublishLeave - see centrifuge.Broker interface description. 
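+// Join and leave messages only pass through the PUB/SUB layer and are not
+// appended to history, which is why the request below carries no
+// HistoryTTL/HistorySize, unlike Publish.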
+func (b *Broker) PublishLeave(ch string, info *centrifuge.ClientInfo) error { + s := consistentShard(ch, b.shards) + pr := pubRequest{ + MsgType: "l", + Channel: ch, + Info: b.clientInfoString(info), + } + _, err := s.Exec(tarantool.Call("centrifuge.publish", pr)) + return err +} + +func (b *Broker) clientInfoString(clientInfo *centrifuge.ClientInfo) string { + var info string + if clientInfo != nil { + byteMessage, err := infoToProto(clientInfo).MarshalVT() + if err != nil { + return info + } + info = string(byteMessage) + } + return info +} + +// PublishControl - see centrifuge.Broker interface description. +func (b *Broker) PublishControl(data []byte, nodeID, _ string) error { + currentRound := atomic.AddUint64(&b.controlRound, 1) + index := currentRound % uint64(len(b.shards)) + var channel string + if nodeID != "" { + channel = nodeChannel(nodeID) + } else { + channel = b.controlChannel() + } + pr := pubRequest{ + MsgType: "c", + Channel: channel, + Data: string(data), + } + _, err := b.shards[index].Exec(tarantool.Call("centrifuge.publish", pr)) + return err +} + +func (b *Broker) controlChannel() string { + return tarantoolControlChannel +} + +func nodeChannel(nodeID string) string { + return tarantoolNodeChannelPrefix + nodeID +} + +// Subscribe - see centrifuge.Broker interface description. +func (b *Broker) Subscribe(ch string) error { + if strings.HasPrefix(ch, internalChannelPrefix) { + return centrifuge.ErrorBadRequest + } + if b.node.LogEnabled(centrifuge.LogLevelDebug) { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "subscribe node on channel", map[string]any{"channel": ch})) + } + r := newSubRequest([]string{ch}, true) + s := b.shards[consistentIndex(ch, len(b.shards))] + return b.sendSubscribe(s, r) +} + +// Unsubscribe - see centrifuge.Broker interface description. 
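+// The request goes through the same batching pipeline (shard.subCh, drained by
+// the subscriber goroutine in runPubSub) as Subscribe, just with
+// subscribe=false, so subscribes and unsubscribes for a shard are serialized.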
+func (b *Broker) Unsubscribe(ch string) error { + if b.node.LogEnabled(centrifuge.LogLevelDebug) { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "unsubscribe node from channel", map[string]any{"channel": ch})) + } + r := newSubRequest([]string{ch}, false) + s := b.shards[consistentIndex(ch, len(b.shards))] + return b.sendSubscribe(s, r) +} + +var errOpTimeout = errors.New("operation timed out") + +func (b *Broker) sendSubscribe(shard *Shard, r subRequest) error { + select { + case shard.subCh <- r: + default: + timer := AcquireTimer(defaultRequestTimeout) + defer ReleaseTimer(timer) + select { + case shard.subCh <- r: + case <-timer.C: + return errOpTimeout + } + } + return r.result() +} + +type historyRequest struct { + Channel string + Offset uint64 + Limit int + Reverse bool + IncludePubs bool + HistoryMetaTTL int +} + +type historyResponse struct { + Offset uint64 + Epoch string + Pubs []*centrifuge.Publication +} + +func (m *historyResponse) DecodeMsgpack(d *msgpack.Decoder) error { + var err error + var l int + if l, err = d.DecodeArrayLen(); err != nil { + return err + } + if l != 3 { + return fmt.Errorf("malformed array len: %d", l) + } + if m.Offset, err = d.DecodeUint64(); err != nil { + return err + } + if m.Epoch, err = d.DecodeString(); err != nil { + return err + } + if l, err = d.DecodeArrayLen(); err != nil { + return err + } + if l == -1 { + return nil + } + + pubs := make([]*centrifuge.Publication, 0, l) + + for i := 0; i < l; i++ { + var pub centrifuge.Publication + var l int + if l, err = d.DecodeArrayLen(); err != nil { + return err + } + if l != 6 { + return fmt.Errorf("malformed array len: %d", l) + } + if _, err = d.DecodeUint64(); err != nil { + return err + } + if _, err = d.DecodeString(); err != nil { + return err + } + if pub.Offset, err = d.DecodeUint64(); err != nil { + return err + } + if _, err = d.DecodeFloat64(); err != nil { + return err + } + if data, err := d.DecodeString(); err != nil { + return err + } else { + if len(data) > 0 { + pub.Data = []byte(data) + } + } + if info, err := d.DecodeString(); err != nil { + return err + } else { + if len(info) > 0 { + var i protocol.ClientInfo + if err = i.UnmarshalVT([]byte(info)); err != nil { + return err + } + pub.Info = infoFromProto(&i) + } + } + pubs = append(pubs, &pub) + } + m.Pubs = pubs + return nil +} + +// History - see centrifuge.Broker interface description. 
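+// Note that filter.Since is exclusive: for forward iteration the Tarantool call
+// starts from Since.Offset+1, for reverse iteration from Since.Offset-1. A zero
+// Limit returns only the current stream position, without publications.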
+func (b *Broker) History(ch string, opts centrifuge.HistoryOptions) ([]*centrifuge.Publication, centrifuge.StreamPosition, error) { + filter := opts.Filter + var includePubs = true + var offset uint64 + if filter.Since != nil { + if filter.Reverse { + offset = filter.Since.Offset - 1 + } else { + offset = filter.Since.Offset + 1 + } + } + var limit int + if filter.Limit == 0 { + includePubs = false + } + if filter.Limit > 0 { + limit = filter.Limit + } + historyMetaTTLSeconds := int(b.config.HistoryMetaTTL.Seconds()) + s := consistentShard(ch, b.shards) + req := historyRequest{ + Channel: ch, + Offset: offset, + Limit: limit, + Reverse: filter.Reverse, + IncludePubs: includePubs, + HistoryMetaTTL: historyMetaTTLSeconds, + } + var resp historyResponse + err := s.ExecTyped(tarantool.Call("centrifuge.history", req), &resp) + if err != nil { + return nil, centrifuge.StreamPosition{}, err + } + streamPosition := centrifuge.StreamPosition{Offset: resp.Offset, Epoch: resp.Epoch} + return resp.Pubs, streamPosition, nil +} + +type removeHistoryRequest struct { + Channel string +} + +// RemoveHistory - see centrifuge.Broker interface description. +func (b *Broker) RemoveHistory(ch string) error { + s := consistentShard(ch, b.shards) + _, err := s.Exec(tarantool.Call("centrifuge.remove_history", removeHistoryRequest{Channel: ch})) + return err +} + +const ( + // tarantoolPubSubWorkerChannelSize sets buffer size of channel to which we send all + // messages received from Tarantool PUB/SUB connection to process in separate goroutine. + tarantoolPubSubWorkerChannelSize = 512 + // tarantoolSubscribeBatchLimit is a maximum number of channels to include in a single + // batch subscribe call. + tarantoolSubscribeBatchLimit = 512 +) + +func (b *Broker) getShard(channel string) *Shard { + if !b.sharding { + return b.shards[0] + } + return b.shards[consistentIndex(channel, len(b.shards))] +} + +type pollRequest struct { + ConnID string + UsePolling bool + Timeout int +} + +type subscribeRequest struct { + ConnID string + Channels []string +} + +type pubSubMessage struct { + Type string + Channel string + Offset uint64 + Epoch string + Data []byte + Info []byte +} + +func (m *pubSubMessage) DecodeMsgpack(d *msgpack.Decoder) error { + var err error + var l int + if l, err = d.DecodeArrayLen(); err != nil { + return err + } + if l != 6 { + return fmt.Errorf("wrong array len: %d", l) + } + if m.Type, err = d.DecodeString(); err != nil { + return err + } + if m.Channel, err = d.DecodeString(); err != nil { + return err + } + if m.Offset, err = d.DecodeUint64(); err != nil { + return err + } + if m.Epoch, err = d.DecodeString(); err != nil { + return err + } + if data, err := d.DecodeString(); err != nil { + return err + } else { + m.Data = []byte(data) + } + if info, err := d.DecodeString(); err != nil { + return err + } else { + m.Info = []byte(info) + } + return nil +} + +func (b *Broker) runPubSub(s *Shard, eventHandler centrifuge.BrokerEventHandler) { + logError := func(errString string) { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "restart pub/sub", map[string]any{"error": errString})) + } + + u, err := uuid.NewRandom() + if err != nil { + logError(err.Error()) + return + } + connID := u.String() + + conn, cancel, err := s.pubSubConn() + if err != nil { + logError(err.Error()) + return + } + defer cancel() + defer func() { _ = conn.Close() }() + + // Register poller with unique ID. 
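+	// Timeout 0 here only registers connID on the Tarantool side; actual message
+	// consumption happens in the waitPubSubMessages loop below, which calls
+	// centrifuge.get_messages again with a 25 second timeout.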
+ result, err := conn.Exec(tarantool.Call("centrifuge.get_messages", pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 0})) + if err != nil { + logError(err.Error()) + return + } + if result.Error != "" { + logError(result.Error) + return + } + + numWorkers := runtime.NumCPU() + + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, fmt.Sprintf("running Tarantool PUB/SUB, num workers: %d", numWorkers))) + defer func() { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "stopping Tarantool PUB/SUB")) + }() + + done := make(chan struct{}) + var doneOnce sync.Once + closeDoneOnce := func() { + doneOnce.Do(func() { + close(done) + _ = conn.Close() + }) + } + defer closeDoneOnce() + + // Run subscriber goroutine. + go func(conn *tarantool.Connection) { + for { + select { + case <-done: + return + case r := <-s.subCh: + isSubscribe := r.subscribe + channelBatch := []subRequest{r} + + chIDs := r.channels + + var otherR *subRequest + + loop: + for len(chIDs) < tarantoolSubscribeBatchLimit { + select { + case r := <-s.subCh: + if r.subscribe != isSubscribe { + // We can not mix subscribe and unsubscribe request into one batch + // so must stop here. As we consumed a subRequest value from channel + // we should take care of it later. + otherR = &r + break loop + } + channelBatch = append(channelBatch, r) + for _, ch := range r.channels { + chIDs = append(chIDs, ch) + } + default: + break loop + } + } + + var opErr error + if isSubscribe { + _, err = conn.Exec(tarantool.Call("centrifuge.subscribe", subscribeRequest{ConnID: connID, Channels: chIDs})) + opErr = err + } else { + _, err = conn.Exec(tarantool.Call("centrifuge.unsubscribe", subscribeRequest{ConnID: connID, Channels: chIDs})) + opErr = err + } + + if opErr != nil { + for _, r := range channelBatch { + r.done(opErr) + } + if otherR != nil { + otherR.done(opErr) + } + // Close conn, this should cause Receive to return with err below + // and whole runPubSub method to restart. + closeDoneOnce() + return + } + for _, r := range channelBatch { + r.done(nil) + } + if otherR != nil { + chIDs := otherR.channels + var opErr error + if otherR.subscribe { + _, err = conn.Exec(tarantool.Call("centrifuge.subscribe", subscribeRequest{ConnID: connID, Channels: chIDs})) + opErr = err + } else { + _, err = conn.Exec(tarantool.Call("centrifuge.unsubscribe", subscribeRequest{ConnID: connID, Channels: chIDs})) + opErr = err + } + if opErr != nil { + otherR.done(opErr) + // Close conn, this should cause Receive to return with err below + // and whole runPubSub method to restart. + closeDoneOnce() + return + } + otherR.done(nil) + } + } + } + }(conn) + + // Run workers to spread received message processing work over worker goroutines. 
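+	// A message is routed to its worker by hashing the channel name (see
+	// index(msg.Channel, numWorkers) in processPubSubMessages below), so order
+	// is preserved per channel while different channels proceed in parallel.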
+ workers := make(map[int]chan pubSubMessage) + for i := 0; i < numWorkers; i++ { + workerCh := make(chan pubSubMessage, tarantoolPubSubWorkerChannelSize) + workers[i] = workerCh + go func(ch chan pubSubMessage) { + for { + select { + case <-done: + return + case n := <-ch: + err := b.handleMessage(eventHandler, n) + if err != nil { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error handling client message", map[string]any{"error": err.Error()})) + continue + } + } + } + }(workerCh) + } + + go func() { + var chIDs []string + + channels := b.node.Hub().Channels() + for i := 0; i < len(channels); i++ { + if b.getShard(channels[i]) == s { + chIDs = append(chIDs, channels[i]) + } + } + + batch := make([]string, 0) + + for i, ch := range chIDs { + if len(batch) > 0 && i%tarantoolSubscribeBatchLimit == 0 { + r := newSubRequest(batch, true) + err := b.sendSubscribe(s, r) + if err != nil { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error subscribing", map[string]any{"error": err.Error()})) + closeDoneOnce() + return + } + batch = nil + } + batch = append(batch, ch) + } + if len(batch) > 0 { + r := newSubRequest(batch, true) + err := b.sendSubscribe(s, r) + if err != nil { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error subscribing", map[string]any{"error": err.Error()})) + closeDoneOnce() + return + } + } + }() + + processPubSubMessages := func(messages []pubSubMessage) { + for _, msg := range messages { + // Add message to worker channel preserving message order - i.e. messages + // from the same channel will be processed in the same worker. + workers[index(msg.Channel, numWorkers)] <- msg + } + } + + for { + err := b.waitPubSubMessages(conn, connID, processPubSubMessages) + if err != nil { + logError(err.Error()) + return + } + } +} + +func (b *Broker) waitPubSubMessages(conn *tarantool.Connection, connID string, cb func([]pubSubMessage)) error { + if !b.config.UsePolling { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + _, err := conn.ExecContext(ctx, tarantool.Call( + "centrifuge.get_messages", + pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 25}, + ).WithPushTyped(func(decode func(any) error) { + var m [][]pubSubMessage + if err := decode(&m); err != nil { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding push", map[string]any{"error": err.Error()})) + return + } + if len(m) == 1 { + cb(m[0]) + } + })) + if err != nil { + return err + } + } else { + var m [][]pubSubMessage + err := conn.ExecTyped(tarantool.Call( + "centrifuge.get_messages", + pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 25}), + &m, + ) + if err != nil { + return err + } + if len(m) == 1 { + cb(m[0]) + } + } + return nil +} + +func (b *Broker) handleMessage(eventHandler centrifuge.BrokerEventHandler, msg pubSubMessage) error { + switch msg.Type { + case "p": + pub := ¢rifuge.Publication{ + Offset: msg.Offset, + Data: msg.Data, + } + if len(msg.Info) > 0 { + var info protocol.ClientInfo + err := info.UnmarshalVT(msg.Info) + if err == nil { + pub.Info = infoFromProto(&info) + } + } + _ = eventHandler.HandlePublication( + msg.Channel, pub, centrifuge.StreamPosition{Offset: msg.Offset, Epoch: msg.Epoch}, nil) + case "j": + var info protocol.ClientInfo + err := info.UnmarshalVT(msg.Info) + if err == nil { + _ = eventHandler.HandleJoin(msg.Channel, infoFromProto(&info)) + } + case "l": + var info protocol.ClientInfo + err := info.UnmarshalVT(msg.Info) + if 
err == nil { + _ = eventHandler.HandleLeave(msg.Channel, infoFromProto(&info)) + } + } + return nil +} + +func (b *Broker) runControlPubSub(s *Shard, eventHandler centrifuge.BrokerEventHandler) { + logError := func(errString string) { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "restart control pub/sub", map[string]any{"error": errString})) + } + + u, err := uuid.NewRandom() + if err != nil { + logError(err.Error()) + return + } + connID := u.String() + + conn, cancel, err := s.pubSubConn() + if err != nil { + logError(err.Error()) + return + } + defer cancel() + defer func() { _ = conn.Close() }() + + // Register poller with unique ID. + result, err := conn.Exec(tarantool.Call("centrifuge.get_messages", pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 0})) + if err != nil { + logError(err.Error()) + return + } + if result.Error != "" { + logError(result.Error) + return + } + + numWorkers := runtime.NumCPU() + + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, fmt.Sprintf("running Tarantool control PUB/SUB, num workers: %d", numWorkers))) + defer func() { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "stopping Tarantool control PUB/SUB")) + }() + + done := make(chan struct{}) + var doneOnce sync.Once + closeDoneOnce := func() { + doneOnce.Do(func() { + close(done) + _ = conn.Close() + }) + } + defer closeDoneOnce() + + // Run workers to spread message processing work over worker goroutines. + workCh := make(chan pubSubMessage) + for i := 0; i < numWorkers; i++ { + go func() { + for { + select { + case <-done: + return + case n := <-workCh: + err := eventHandler.HandleControl(n.Data) + if err != nil { + b.node.Log( + centrifuge.NewLogEntry( + centrifuge.LogLevelError, "error handling control message", + map[string]any{"error": err.Error()}, + ), + ) + continue + } + } + } + }() + } + + controlChannel := b.controlChannel() + result, err = conn.Exec(tarantool.Call("centrifuge.subscribe", subscribeRequest{ConnID: connID, Channels: []string{controlChannel, b.nodeChannel}})) + if err != nil || result.Error != "" { + if err != nil { + logError(err.Error()) + } else { + logError(result.Error) + } + return + } + + processPubSubMessages := func(messages []pubSubMessage) { + for _, msg := range messages { + workCh <- msg + } + } + + for { + err := b.waitPubSubMessages(conn, connID, processPubSubMessages) + if err != nil { + logError(err.Error()) + return + } + } +} diff --git a/_examples/custom_engine_tarantool/tntengine/broker_test.go b/_examples/custom_engine_tarantool/tntengine/broker_test.go new file mode 100644 index 00000000..b3ebc7b2 --- /dev/null +++ b/_examples/custom_engine_tarantool/tntengine/broker_test.go @@ -0,0 +1,445 @@ +package tntengine + +import ( + "context" + "log" + "strconv" + "testing" + "time" + + "github.com/centrifugal/centrifuge" + "github.com/centrifugal/protocol" + "github.com/stretchr/testify/require" +) + +func newTestTarantoolEngine(tb testing.TB) (*Broker, *PresenceManager) { + n, _ := centrifuge.New(centrifuge.Config{}) + var shards []*Shard + for _, port := range []string{"3301"} { + shard, err := NewShard(ShardConfig{Addresses: []string{"127.0.0.1:" + port}}) + if err != nil { + log.Fatal(err) + } + shards = append(shards, shard) + } + + broker, err := NewBroker(n, BrokerConfig{ + UsePolling: false, + Shards: shards, + }) + if err != nil { + tb.Fatal(err) + } + + presenceManager, err := NewPresenceManager(n, PresenceManagerConfig{ + Shards: shards, + }) + if err != nil { + tb.Fatal(err) + } + + 
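+	// Attach the Tarantool broker and presence manager before Run so the node
+	// starts with them instead of the default in-memory implementations.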
n.SetBroker(broker) + n.SetPresenceManager(presenceManager) + err = n.Run() + if err != nil { + tb.Fatal(err) + } + return broker, presenceManager +} + +type recoverTest struct { + Name string + HistorySize int + HistoryLifetime int + NumPublications int + SinceOffset uint64 + NumRecovered int + Sleep int + Limit int + Recovered bool +} + +var recoverTests = []recoverTest{ + {"empty_stream", 10, 60, 0, 0, 0, 0, 0, true}, + {"from_position", 10, 60, 10, 8, 2, 0, 0, true}, + {"from_position_limited", 10, 60, 10, 5, 2, 0, 2, false}, + {"from_position_with_server_limit", 10, 60, 10, 5, 1, 0, 1, false}, + {"from_position_that_already_gone", 10, 60, 20, 8, 10, 0, 0, false}, + {"from_position_that_not_exist_yet", 10, 60, 20, 108, 0, 0, 0, false}, + {"same_position_no_pubs_expected", 10, 60, 7, 7, 0, 0, 0, true}, + {"empty_position_recover_expected", 10, 60, 4, 0, 4, 0, 0, true}, + {"from_position_in_expired_stream", 10, 1, 10, 8, 0, 3, 0, false}, + {"from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true}, +} + +func TestTarantoolClientSubscribeRecover(t *testing.T) { + for _, tt := range recoverTests { + t.Run(tt.Name, func(t *testing.T) { + testTarantoolClientSubscribeRecover(t, tt) + }) + } +} + +func nodeWithTarantoolBroker(tb testing.TB) *centrifuge.Node { + c := centrifuge.Config{} + return nodeWithTarantoolBrokerWithConfig(tb, c) +} + +func nodeWithTarantoolBrokerWithConfig(tb testing.TB, c centrifuge.Config) *centrifuge.Node { + n, err := centrifuge.New(c) + if err != nil { + tb.Fatal(err) + } + e, _ := newTestTarantoolEngine(tb) + n.SetBroker(e) + err = n.Run() + if err != nil { + tb.Fatal(err) + } + return n +} + +func pubToProto(pub *centrifuge.Publication) *protocol.Publication { + if pub == nil { + return nil + } + return &protocol.Publication{ + Offset: pub.Offset, + Data: pub.Data, + Info: infoToProto(pub.Info), + } +} + +func isRecovered(historyResult centrifuge.HistoryResult, cmdOffset uint64, cmdEpoch string) ([]*protocol.Publication, bool) { + latestOffset := historyResult.Offset + latestEpoch := historyResult.Epoch + + recoveredPubs := make([]*protocol.Publication, 0, len(historyResult.Publications)) + for _, pub := range historyResult.Publications { + protoPub := pubToProto(pub) + recoveredPubs = append(recoveredPubs, protoPub) + } + + nextOffset := cmdOffset + 1 + var recovered bool + if len(recoveredPubs) == 0 { + recovered = latestOffset == cmdOffset && latestEpoch == cmdEpoch + } else { + recovered = recoveredPubs[0].Offset == nextOffset && + recoveredPubs[len(recoveredPubs)-1].Offset == latestOffset && + latestEpoch == cmdEpoch + } + + return recoveredPubs, recovered +} + +// recoverHistory recovers publications since StreamPosition last seen by client. 
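+// A non-positive maxPublicationLimit means no limit: centrifuge.NoLimit is
+// passed to History in that case.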
+func recoverHistory(node *centrifuge.Node, ch string, since centrifuge.StreamPosition, maxPublicationLimit int) (centrifuge.HistoryResult, error) { + limit := centrifuge.NoLimit + if maxPublicationLimit > 0 { + limit = maxPublicationLimit + } + return node.History(ch, centrifuge.WithLimit(limit), centrifuge.WithSince(&since)) +} + +func testTarantoolClientSubscribeRecover(t *testing.T, tt recoverTest) { + node := nodeWithTarantoolBroker(t) + defer func() { _ = node.Shutdown(context.Background()) }() + + channel := "test_recovery_tarantool_" + tt.Name + + for i := 1; i <= tt.NumPublications; i++ { + _, err := node.Publish(channel, []byte(`{"n": `+strconv.Itoa(i)+`}`), centrifuge.WithHistory(tt.HistorySize, time.Duration(tt.HistoryLifetime)*time.Second)) + require.NoError(t, err) + } + + time.Sleep(time.Duration(tt.Sleep) * time.Second) + + res, err := node.History(channel) + require.NoError(t, err) + streamTop := res.StreamPosition + + historyResult, err := recoverHistory(node, channel, centrifuge.StreamPosition{Offset: tt.SinceOffset, Epoch: streamTop.Epoch}, tt.Limit) + require.NoError(t, err) + recoveredPubs, recovered := isRecovered(historyResult, tt.SinceOffset, streamTop.Epoch) + require.Equal(t, tt.NumRecovered, len(recoveredPubs)) + require.Equal(t, tt.Recovered, recovered) +} + +func BenchmarkTarantoolPublish_OneChannel(b *testing.B) { + broker, _ := newTestTarantoolEngine(b) + rawData := []byte(`{"bench": true}`) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _, err := broker.Publish("channel", rawData, centrifuge.PublishOptions{}) + if err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkTarantoolPublish_OneChannel_Parallel(b *testing.B) { + broker, _ := newTestTarantoolEngine(b) + rawData := []byte(`{"bench": true}`) + b.SetParallelism(128) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, _, err := broker.Publish("channel", rawData, centrifuge.PublishOptions{}) + if err != nil { + b.Fatal(err) + } + } + }) +} + +func BenchmarkTarantoolSubscribe(b *testing.B) { + broker, _ := newTestTarantoolEngine(b) + j := 0 + b.ResetTimer() + for i := 0; i < b.N; i++ { + j++ + err := broker.Subscribe("subscribe" + strconv.Itoa(j)) + if err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkTarantoolSubscribe_Parallel(b *testing.B) { + broker, _ := newTestTarantoolEngine(b) + i := 0 + b.SetParallelism(128) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + i++ + err := broker.Subscribe("subscribe" + strconv.Itoa(i)) + if err != nil { + b.Fatal(err) + } + } + }) +} + +func BenchmarkTarantoolRecover_OneChannel_Parallel(b *testing.B) { + broker, _ := newTestTarantoolEngine(b) + rawData := []byte("{}") + numMessages := 1000 + numMissing := 5 + for i := 1; i <= numMessages; i++ { + _, _, err := broker.Publish("channel", rawData, centrifuge.PublishOptions{HistorySize: numMessages, HistoryTTL: 300 * time.Second}) + require.NoError(b, err) + } + _, sp, err := broker.History("channel", centrifuge.HistoryOptions{}) + require.NoError(b, err) + b.ResetTimer() + b.SetParallelism(128) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + pubs, _, err := broker.History("channel", centrifuge.HistoryOptions{ + Filter: centrifuge.HistoryFilter{ + Limit: -1, + Since: ¢rifuge.StreamPosition{Offset: sp.Offset - uint64(numMissing), Epoch: ""}, + }, + }) + if err != nil { + b.Fatal(err) + } + if len(pubs) != numMissing { + b.Fatalf("len pubs: %d, expected: %d", len(pubs), numMissing) + } + } + }) +} + +func BenchmarkTarantoolPresence_OneChannel(b 
*testing.B) { + _, pm := newTestTarantoolEngine(b) + _ = pm.AddPresence("channel", "uid", ¢rifuge.ClientInfo{}) + b.ResetTimer() + for i := 0; i < b.N; i++ { + p, err := pm.Presence("channel") + if err != nil { + b.Fatal(err) + } + if len(p) != 1 { + b.Fatal("wrong presence len") + } + } +} + +func BenchmarkTarantoolPresence_OneChannel_Parallel(b *testing.B) { + _, pm := newTestTarantoolEngine(b) + b.SetParallelism(128) + _ = pm.AddPresence("channel", "uid", ¢rifuge.ClientInfo{}) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p, err := pm.Presence("channel") + if err != nil { + b.Fatal(err) + } + if len(p) != 1 { + b.Fatal("wrong presence len") + } + } + }) +} + +func BenchmarkTarantoolAddPresence_OneChannel(b *testing.B) { + _, pm := newTestTarantoolEngine(b) + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := pm.AddPresence("channel", "uid", ¢rifuge.ClientInfo{}) + if err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkTarantoolAddPresence_OneChannel_Parallel(b *testing.B) { + _, pm := newTestTarantoolEngine(b) + b.SetParallelism(128) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + err := pm.AddPresence("channel", "uid", ¢rifuge.ClientInfo{}) + if err != nil { + b.Fatal(err) + } + } + }) +} + +func BenchmarkTarantoolPresenceStats_OneChannel_Parallel(b *testing.B) { + _, pm := newTestTarantoolEngine(b) + _ = pm.AddPresence("channel", "uid", ¢rifuge.ClientInfo{}) + b.SetParallelism(128) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p, err := pm.PresenceStats("channel") + if err != nil { + b.Fatal(err) + } + if p.NumClients != 1 { + b.Fatal("wrong presence stats") + } + } + }) +} + +const historyIterationChannel = "test" + +type historyIterationTest struct { + NumMessages int + IterateBy int +} + +func (it *historyIterationTest) prepareHistoryIteration(t testing.TB, node *centrifuge.Node) centrifuge.StreamPosition { + numMessages := it.NumMessages + + channel := historyIterationChannel + + historyResult, err := node.History(channel) + require.NoError(t, err) + startPosition := historyResult.StreamPosition + + for i := 1; i <= numMessages; i++ { + _, err := node.Publish(channel, []byte(`{}`), centrifuge.WithHistory(numMessages, time.Hour)) + require.NoError(t, err) + } + + historyResult, err = node.History(channel, centrifuge.WithLimit(centrifuge.NoLimit)) + require.NoError(t, err) + require.Equal(t, numMessages, len(historyResult.Publications)) + return startPosition +} + +func (it *historyIterationTest) testHistoryIteration(t testing.TB, node *centrifuge.Node, startPosition centrifuge.StreamPosition) { + var ( + n int + offset = startPosition.Offset + epoch = startPosition.Epoch + iterateBy = it.IterateBy + ) + for { + res, err := node.History( + historyIterationChannel, + centrifuge.WithSince(¢rifuge.StreamPosition{Offset: offset, Epoch: epoch}), + centrifuge.WithLimit(iterateBy), + ) + if err != nil { + t.Fatal(err) + } + offset += uint64(iterateBy) + if len(res.Publications) == 0 { + break + } + n += len(res.Publications) + } + if n != it.NumMessages { + t.Fatal("num messages mismatch") + } +} + +func (it *historyIterationTest) testHistoryIterationReverse(t testing.TB, node *centrifuge.Node, startPosition centrifuge.StreamPosition) { + var ( + n int + offset = startPosition.Offset + epoch = startPosition.Epoch + iterateBy = it.IterateBy + ) + var since *centrifuge.StreamPosition +outer: + for { + res, err := node.History( + historyIterationChannel, + centrifuge.WithSince(since), + 
centrifuge.WithLimit(iterateBy), + centrifuge.WithReverse(true), + ) + if err != nil { + t.Fatal(err) + } + var checkOffset uint64 + loop: + for _, pub := range res.Publications { + n += 1 + if pub.Offset == startPosition.Offset+1 { + break outer + } + if checkOffset == 0 { + checkOffset = pub.Offset + continue loop + } + if pub.Offset > checkOffset { + t.Fatal("incorrect order") + } + checkOffset = pub.Offset + } + if len(res.Publications) == 0 || len(res.Publications) < iterateBy { + break + } + earliestPub := res.Publications[len(res.Publications)-1] + offset = earliestPub.Offset + since = ¢rifuge.StreamPosition{Offset: offset, Epoch: epoch} + } + if n != it.NumMessages { + t.Fatalf("num messages mismatch, expected %d, got %d", it.NumMessages, n) + } +} + +func TestMemoryBrokerHistoryIteration(t *testing.T) { + e, _ := newTestTarantoolEngine(t) + it := historyIterationTest{10000, 100} + startPosition := it.prepareHistoryIteration(t, e.node) + it.testHistoryIteration(t, e.node, startPosition) +} + +func TestMemoryBrokerHistoryIterationReverse(t *testing.T) { + e, _ := newTestTarantoolEngine(t) + it := historyIterationTest{10000, 100} + startPosition := it.prepareHistoryIteration(t, e.node) + it.testHistoryIterationReverse(t, e.node, startPosition) +} diff --git a/_examples/custom_engine_tarantool/tntengine/multi_conn.go b/_examples/custom_engine_tarantool/tntengine/multi_conn.go new file mode 100644 index 00000000..e10c863c --- /dev/null +++ b/_examples/custom_engine_tarantool/tntengine/multi_conn.go @@ -0,0 +1,185 @@ +package tntengine + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/FZambia/tarantool" +) + +type ConnectionMode int + +const ( + ConnectionModeSingleInstance ConnectionMode = 0 + ConnectionModeLeaderFollower ConnectionMode = 1 + ConnectionModeLeaderFollowerRaft ConnectionMode = 2 +) + +type MultiConnection struct { + opts MultiOpts + leaderMu sync.RWMutex + leaderAddr string + conns map[string]*tarantool.Connection + closeCh chan struct{} + closeOnce sync.Once +} + +type MultiOpts struct { + ConnectionMode ConnectionMode + LeaderCheckInterval time.Duration +} + +func Connect(addrs []string, opts tarantool.Opts, multiOpts MultiOpts) (*MultiConnection, error) { + conns, err := getConns(addrs, opts) + if err != nil { + return nil, err + } + mc := &MultiConnection{ + opts: multiOpts, + conns: conns, + closeCh: make(chan struct{}), + } + leaderFound := mc.checkLeaderOnce() + if !leaderFound { + return nil, ErrNoLeader + } + go mc.checkLeader() + return mc, nil +} + +var ErrNoLeader = errors.New("no leader") + +func (c *MultiConnection) NewLeaderConn(opts tarantool.Opts) (*tarantool.Connection, error) { + c.leaderMu.RLock() + if c.leaderAddr == "" { + c.leaderMu.RUnlock() + return nil, ErrNoLeader + } + c.leaderMu.RUnlock() + return tarantool.Connect(c.leaderAddr, opts) +} + +func (c *MultiConnection) LeaderChanged() { + if c.opts.ConnectionMode == ConnectionModeSingleInstance { + return + } + c.leaderMu.Lock() + defer c.leaderMu.Unlock() + c.leaderAddr = "" +} + +func (c *MultiConnection) LeaderConn() (*tarantool.Connection, error) { + c.leaderMu.RLock() + defer c.leaderMu.RUnlock() + if c.leaderAddr != "" { + return c.conns[c.leaderAddr], nil + } + return nil, ErrNoLeader +} + +func getConns(addrs []string, opts tarantool.Opts) (map[string]*tarantool.Connection, error) { + conns := map[string]*tarantool.Connection{} + var wg sync.WaitGroup + var connsMu sync.Mutex + var firstErr error + var numErrors int + wg.Add(len(addrs)) + for _, addr := range addrs { + 
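+		// Dial every address concurrently; individual failures are tolerated and
+		// getConns errors out only when all addresses fail
+		// (numErrors == len(addrs)), returning the first error seen.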
go func(addr string) { + defer wg.Done() + conn, err := tarantool.Connect(addr, opts) + if err != nil { + connsMu.Lock() + if firstErr == nil { + firstErr = err + } + numErrors++ + connsMu.Unlock() + return + } + connsMu.Lock() + conns[addr] = conn + connsMu.Unlock() + }(addr) + + } + wg.Wait() + if numErrors == len(addrs) { + return nil, firstErr + } + return conns, nil +} + +func (c *MultiConnection) checkLeader() { + if c.opts.ConnectionMode == ConnectionModeSingleInstance { + return + } + checkInterval := c.opts.LeaderCheckInterval + if checkInterval == 0 { + checkInterval = time.Second + } + for { + select { + case <-c.closeCh: + return + case <-time.After(checkInterval): + c.checkLeaderOnce() + } + } +} + +func (c *MultiConnection) IsLeader(conn *tarantool.Connection) (bool, error) { + if c.opts.ConnectionMode == ConnectionModeSingleInstance { + return true, nil + } + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + leaderCheck := "return box.info.ro == false" + if c.opts.ConnectionMode == ConnectionModeLeaderFollowerRaft { + leaderCheck = "return box.info.election.state == 'leader'" + } + resp, err := conn.ExecContext(ctx, tarantool.Eval(leaderCheck, []any{})) + if err != nil { + return false, err + } + if len(resp.Data) < 1 { + return false, errors.New("unexpected leader check result") + } + isLeader, ok := resp.Data[0].(bool) + if !ok { + return false, errors.New("malformed leader check result") + } + return isLeader, nil +} + +func (c *MultiConnection) checkLeaderOnce() bool { + for addr, conn := range c.conns { + if len(c.conns) == 1 { + c.leaderAddr = addr + return true + } + isLeader, err := c.IsLeader(conn) + if err != nil { + continue + } + if isLeader { + c.leaderMu.Lock() + c.leaderAddr = addr + c.leaderMu.Unlock() + return true + } + } + return false +} + +func (c *MultiConnection) Close() error { + c.closeOnce.Do(func() { + close(c.closeCh) + for _, conn := range c.conns { + _ = conn.Close() + } + }) + return nil +} diff --git a/_examples/custom_engine_tarantool/tntengine/presence.go b/_examples/custom_engine_tarantool/tntengine/presence.go new file mode 100644 index 00000000..488fb8ba --- /dev/null +++ b/_examples/custom_engine_tarantool/tntengine/presence.go @@ -0,0 +1,181 @@ +package tntengine + +import ( + "errors" + "fmt" + "time" + + "github.com/FZambia/tarantool" + + "github.com/centrifugal/centrifuge" + "github.com/vmihailenco/msgpack/v5" +) + +// DefaultPresenceTTL is a default value for presence TTL in Tarantool. +const DefaultPresenceTTL = 60 * time.Second + +// PresenceManagerConfig is a config for Tarantool-based PresenceManager. +type PresenceManagerConfig struct { + // PresenceTTL is an interval how long to consider presence info + // valid after receiving presence update. This allows to automatically + // clean up unnecessary presence entries after TTL passed. + PresenceTTL time.Duration + + // Shards is a list of Tarantool instances to shard data by channel. + Shards []*Shard +} + +// NewPresenceManager initializes Tarantool-based centrifuge.PresenceManager. 
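+// With more than one shard, presence data is partitioned by channel using the
+// same consistent hashing (consistentShard) as the Broker, so presence for a
+// channel lives on the same shard as its publications.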
+func NewPresenceManager(n *centrifuge.Node, config PresenceManagerConfig) (*PresenceManager, error) { + if len(config.Shards) == 0 { + return nil, errors.New("no Tarantool shards provided in configuration") + } + if len(config.Shards) > 1 { + n.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, fmt.Sprintf("Tarantool sharding enabled: %d shards", len(config.Shards)))) + } + e := &PresenceManager{ + node: n, + shards: config.Shards, + config: config, + sharding: len(config.Shards) > 1, + } + return e, nil +} + +var _ centrifuge.PresenceManager = (*PresenceManager)(nil) + +// PresenceManager uses Tarantool to implement centrifuge.PresenceManager functionality. +type PresenceManager struct { + node *centrifuge.Node + sharding bool + config PresenceManagerConfig + shards []*Shard +} + +type presenceRequest struct { + Channel string +} + +func (m PresenceManager) Presence(ch string) (map[string]*centrifuge.ClientInfo, error) { + s := consistentShard(ch, m.shards) + res, err := s.Exec(tarantool.Call("centrifuge.presence", presenceRequest{Channel: ch})) + if err != nil { + return nil, err + } + if len(res.Data) == 0 { + return nil, errors.New("malformed presence result") + } + presenceInterfaceSlice, ok := res.Data[0].([]any) + if !ok { + return nil, errors.New("malformed presence format: map expected") + } + presence := make(map[string]*centrifuge.ClientInfo, len(presenceInterfaceSlice)) + for _, v := range presenceInterfaceSlice { + presenceRow, ok := v.([]any) + if !ok { + return nil, errors.New("malformed presence format: tuple expected") + } + clientID, ok := presenceRow[1].(string) + if !ok { + return nil, errors.New("malformed presence format: string client id expected") + } + userID, ok := presenceRow[2].(string) + if !ok { + return nil, errors.New("malformed presence format: string user id expected") + } + connInfo, ok := presenceRow[3].(string) + if !ok { + return nil, errors.New("malformed presence format: string conn info expected") + } + chanInfo, ok := presenceRow[4].(string) + if !ok { + return nil, errors.New("malformed presence format: string chan info expected") + } + ci := ¢rifuge.ClientInfo{ + ClientID: clientID, + UserID: userID, + } + if len(connInfo) > 0 { + ci.ConnInfo = []byte(connInfo) + } + if len(chanInfo) > 0 { + ci.ChanInfo = []byte(chanInfo) + } + presence[clientID] = ci + } + return presence, nil +} + +type presenceStatsRequest struct { + Channel string +} + +type presenceStatsResponse struct { + NumClients uint32 + NumUsers uint32 +} + +func (m *presenceStatsResponse) DecodeMsgpack(d *msgpack.Decoder) error { + var err error + var l int + if l, err = d.DecodeArrayLen(); err != nil { + return err + } + if l != 2 { + return fmt.Errorf("array len doesn't match: %d", l) + } + if m.NumClients, err = d.DecodeUint32(); err != nil { + return err + } + if m.NumUsers, err = d.DecodeUint32(); err != nil { + return err + } + return nil +} + +func (m PresenceManager) PresenceStats(ch string) (centrifuge.PresenceStats, error) { + s := consistentShard(ch, m.shards) + var resp presenceStatsResponse + err := s.ExecTyped(tarantool.Call("centrifuge.presence_stats", presenceStatsRequest{Channel: ch}), &resp) + if err != nil { + return centrifuge.PresenceStats{}, err + } + return centrifuge.PresenceStats{NumClients: int(resp.NumClients), NumUsers: int(resp.NumUsers)}, err +} + +type addPresenceRequest struct { + Channel string + TTL int + ClientID string + UserID string + ConnInfo string + ChanInfo string +} + +func (m PresenceManager) AddPresence(ch string, clientID string, info 
*centrifuge.ClientInfo) error { + s := consistentShard(ch, m.shards) + ttl := DefaultPresenceTTL + if m.config.PresenceTTL > 0 { + ttl = m.config.PresenceTTL + } + _, err := s.Exec(tarantool.Call("centrifuge.add_presence", addPresenceRequest{ + Channel: ch, + TTL: int(ttl.Seconds()), + ClientID: clientID, + UserID: info.UserID, + ConnInfo: string(info.ConnInfo), + ChanInfo: string(info.ChanInfo), + })) + return err +} + +type removePresenceRequest struct { + Channel string + ClientID string +} + +func (m PresenceManager) RemovePresence(ch string, clientID string, _ string) error { + s := consistentShard(ch, m.shards) + _, err := s.Exec(tarantool.Call("centrifuge.remove_presence", removePresenceRequest{Channel: ch, ClientID: clientID})) + return err +} diff --git a/_examples/custom_engine_tarantool/tntengine/shard.go b/_examples/custom_engine_tarantool/tntengine/shard.go new file mode 100644 index 00000000..649c65ef --- /dev/null +++ b/_examples/custom_engine_tarantool/tntengine/shard.go @@ -0,0 +1,133 @@ +package tntengine + +import ( + "context" + "fmt" + "time" + + "github.com/FZambia/tarantool" +) + +const ( + defaultConnectTimeout = time.Second + defaultRequestTimeout = time.Second + defaultReadTimeout = 5 * time.Second + defaultWriteTimeout = time.Second +) + +// Shard represents single Tarantool instance. +type Shard struct { + config ShardConfig + subCh chan subRequest + mc *MultiConnection +} + +// ShardConfig allows providing options to connect to Tarantool. +type ShardConfig struct { + // Addresses of Tarantool instances. + Addresses []string + // User for auth. + User string + // Password for auth. + Password string + // ConnectionMode for shard. + ConnectionMode ConnectionMode +} + +func NewShard(c ShardConfig) (*Shard, error) { + shard := &Shard{ + config: c, + subCh: make(chan subRequest), + } + + mc, err := Connect(c.Addresses, tarantool.Opts{ + ConnectTimeout: defaultConnectTimeout, + RequestTimeout: defaultRequestTimeout, + ReadTimeout: defaultReadTimeout, + WriteTimeout: defaultWriteTimeout, + ReconnectDelay: 50 * time.Millisecond, + User: c.User, + Password: c.Password, + SkipSchema: true, + }, MultiOpts{ + ConnectionMode: c.ConnectionMode, + }) + if err != nil { + return nil, fmt.Errorf("error creating req connection to %#v: %w", c.Addresses, err) + } + shard.mc = mc + return shard, nil +} + +func (s *Shard) Exec(request *tarantool.Request) (*tarantool.Response, error) { + conn, err := s.mc.LeaderConn() + if err != nil { + return nil, err + } + return conn.Exec(request) +} + +func (s *Shard) ExecTyped(request *tarantool.Request, result any) error { + conn, err := s.mc.LeaderConn() + if err != nil { + return err + } + return conn.ExecTyped(request, result) +} + +func (s *Shard) pubSubConn() (*tarantool.Connection, func(), error) { + conn, err := s.mc.NewLeaderConn(tarantool.Opts{ + ConnectTimeout: defaultConnectTimeout, + RequestTimeout: 5 * time.Second, + ReadTimeout: defaultReadTimeout, + WriteTimeout: defaultWriteTimeout, + ReconnectDelay: 0, + User: s.config.User, + Password: s.config.Password, + SkipSchema: true, + }) + if err != nil { + return nil, nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second): + ok, err := s.mc.IsLeader(conn) + if err != nil || !ok { + s.mc.LeaderChanged() + _ = conn.Close() + } + } + } + }() + return conn, cancel, nil +} + +// subRequest is an internal request to subscribe or unsubscribe from one or more channels +type 
subRequest struct {
+	channels  []string
+	subscribe bool
+	err       chan error
+}
+
+// newSubRequest creates a new request to subscribe or unsubscribe from a channel.
+func newSubRequest(chIDs []string, subscribe bool) subRequest {
+	return subRequest{
+		channels:  chIDs,
+		subscribe: subscribe,
+		err:       make(chan error, 1),
+	}
+}
+
+// done should only be called once for subRequest.
+func (sr *subRequest) done(err error) {
+	sr.err <- err
+}
+
+func (sr *subRequest) result() error {
+	return <-sr.err
+}
diff --git a/_examples/custom_engine_tarantool/tntengine/util.go b/_examples/custom_engine_tarantool/tntengine/util.go
new file mode 100644
index 00000000..2873ccc3
--- /dev/null
+++ b/_examples/custom_engine_tarantool/tntengine/util.go
@@ -0,0 +1,109 @@
+package tntengine
+
+import (
+	"hash/fnv"
+	"sync"
+	"time"
+
+	"github.com/centrifugal/centrifuge"
+	"github.com/centrifugal/protocol"
+)
+
+// index chooses bucket number in range [0, numBuckets).
+func index(s string, numBuckets int) int {
+	if numBuckets == 1 {
+		return 0
+	}
+	hash := fnv.New64a()
+	_, _ = hash.Write([]byte(s))
+	return int(hash.Sum64() % uint64(numBuckets))
+}
+
+// consistentIndex is an adapted function from https://github.com/dgryski/go-jump
+// package by Damian Gryski. It consistently chooses a hash bucket number in the
+// range [0, numBuckets) for the given string. numBuckets must be >= 1.
+func consistentIndex(s string, numBuckets int) int {
+	hash := fnv.New64a()
+	_, _ = hash.Write([]byte(s))
+	key := hash.Sum64()
+
+	var (
+		b int64 = -1
+		j int64
+	)
+
+	for j < int64(numBuckets) {
+		b = j
+		key = key*2862933555777941757 + 1
+		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
+	}
+
+	return int(b)
+}
+
+func consistentShard(ch string, shards []*Shard) *Shard {
+	if len(shards) == 1 {
+		return shards[0]
+	}
+	return shards[consistentIndex(ch, len(shards))]
+}
+
+func infoToProto(v *centrifuge.ClientInfo) *protocol.ClientInfo {
+	if v == nil {
+		return nil
+	}
+	info := &protocol.ClientInfo{
+		Client: v.ClientID,
+		User:   v.UserID,
+	}
+	if len(v.ConnInfo) > 0 {
+		info.ConnInfo = v.ConnInfo
+	}
+	if len(v.ChanInfo) > 0 {
+		info.ChanInfo = v.ChanInfo
+	}
+	return info
+}
+
+func infoFromProto(v *protocol.ClientInfo) *centrifuge.ClientInfo {
+	if v == nil {
+		return nil
+	}
+	info := &centrifuge.ClientInfo{
+		ClientID: v.GetClient(),
+		UserID:   v.GetUser(),
+	}
+	if len(v.ConnInfo) > 0 {
+		info.ConnInfo = v.ConnInfo
+	}
+	if len(v.ChanInfo) > 0 {
+		info.ChanInfo = v.ChanInfo
+	}
+	return info
+}
+
+var timerPool sync.Pool
+
+// AcquireTimer from pool.
+func AcquireTimer(d time.Duration) *time.Timer {
+	v := timerPool.Get()
+	if v == nil {
+		return time.NewTimer(d)
+	}
+
+	tm := v.(*time.Timer)
+	if tm.Reset(d) {
+		panic("Received an active timer from the pool!")
+	}
+	return tm
+}
+
+// ReleaseTimer to pool.
+func ReleaseTimer(tm *time.Timer) {
+	if !tm.Stop() {
+		// Do not reuse timer that has been already stopped. 
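+		// A timer that already fired leaves a value in its channel, which would
+		// immediately (and wrongly) wake the next goroutine acquiring it from the pool.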
+ // See https://groups.google.com/forum/#!topic/golang-nuts/-8O3AknKpwk + return + } + timerPool.Put(tm) +} diff --git a/_examples/custom_engine_tarantool/tntserver/centrifuge.lua b/_examples/custom_engine_tarantool/tntserver/centrifuge.lua new file mode 100644 index 00000000..aebfb3b2 --- /dev/null +++ b/_examples/custom_engine_tarantool/tntserver/centrifuge.lua @@ -0,0 +1,316 @@ +local clock = require 'clock' +local fiber = require 'fiber' +local log = require 'log' +local json = require 'json'.new() +local indexpiration = require 'indexpiration' + +--================================================================================ +-- Centrifuge Tarantool module, provides Broker and PresenceManager functionality. +--================================================================================ + +local centrifuge = {} + +centrifuge.init = function(opts) + if not opts then opts = {} end + log.info("Centrifuge init with opts: %s", json.encode(opts)) + + local pubs_temporary = opts.pubs_temporary or false + local meta_temporary = opts.meta_temporary or false + local presence_temporary = opts.presence_temporary or false + + box.schema.create_space('pubs', {if_not_exists = true; temporary = pubs_temporary}) + box.space.pubs:format( { + {name = 'id'; type = 'unsigned'}, + {name = 'channel'; type = 'string'}, + {name = 'offset'; type = 'unsigned'}, + {name = 'exp'; type = 'number'}, + {name = 'data'; type = 'string'}, + {name = 'info'; type = 'string'}, + }); + box.space.pubs:create_index('primary', { + parts = {{field='id', type='unsigned'}}; + if_not_exists = true; + }) + box.space.pubs:create_index('channel', { + parts = {{field='channel', type='string'}, {field='offset', type='unsigned'}}; + if_not_exists = true; + }) + box.space.pubs:create_index('exp', { + parts = {{field='exp', type='number'}, {field='id', type='unsigned'}}; + if_not_exists = true; + }) + + box.schema.create_space('meta', {if_not_exists = true; temporary = meta_temporary}) + box.space.meta:format({ + {name = 'channel'; type = 'string'}, + {name = 'offset'; type = 'unsigned'}, + {name = 'epoch'; type = 'string'}, + {name = 'exp'; type = 'number'}, + }); + box.space.meta:create_index('primary', { + parts = {{field='channel', type='string'}}; + if_not_exists = true; + }) + box.space.meta:create_index('exp', { + parts = {{field='exp', type='number'}, {field='channel', type='string'}}; + if_not_exists = true; + }) + + box.schema.create_space('presence', {if_not_exists = true; temporary = presence_temporary}) + box.space.presence:format({ + {name = 'channel'; type = 'string'}, + {name = 'client_id'; type = 'string'}, + {name = 'user_id'; type = 'string'}, + {name = 'conn_info'; type = 'string'}, + {name = 'chan_info'; type = 'string'}, + {name = 'exp'; type = 'number'}, + }); + box.space.presence:create_index('primary', { + parts = {{field='channel', type='string'}, {field='client_id', type='string'}}; + if_not_exists = true; + }) + box.space.presence:create_index('exp', { + parts = {{field='exp', type='number'}}; + if_not_exists = true; + }) + + indexpiration(box.space.pubs, { + field = 'exp'; + kind = 'time'; + precise = true; + on_delete = function(t) end + }) + + indexpiration(box.space.meta, { + field = 'exp'; + kind = 'time'; + precise = true; + on_delete = function(t) end + }) + + indexpiration(box.space.presence, { + field = 'exp'; + kind = 'time'; + precise = true; + on_delete = function(t) end + }) +end + +centrifuge.id_to_channels = {} +centrifuge.channel_to_ids = {} +centrifuge.id_to_messages = {} 
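+-- In-memory PUB/SUB registries (not persisted, cleaned up on disconnect):
+-- id_to_channels and channel_to_ids track subscriptions in both directions,
+-- id_to_messages buffers publication tuples per subscriber connection, and
+-- id_to_fiber keeps a fiber channel per subscriber used to wake up a waiting reader.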
+centrifuge.id_to_fiber = {}
+
+box.session.on_connect(function() end)
+
+box.session.on_disconnect(function()
+    local id = box.session.storage.subscriber_id
+    if id then
+        local channelsById = centrifuge.id_to_channels[id]
+        if channelsById then
+            while next(channelsById) do
+                for key, _ in pairs(channelsById) do
+                    centrifuge.channel_to_ids[key][id] = nil
+                    if next(centrifuge.channel_to_ids[key]) == nil then
+                        centrifuge.channel_to_ids[key] = nil
+                    end
+                    channelsById[key] = nil
+                end
+            end
+            centrifuge.id_to_channels[id] = nil
+        end
+        centrifuge.id_to_fiber[id]:close()
+        centrifuge.id_to_fiber[id] = nil
+        centrifuge.id_to_messages[id] = nil
+    end
+end)
+
+function centrifuge.get_messages(id, use_polling, timeout)
+    if not box.session.storage.subscriber_id then
+        -- register poller connection. Connection will use this id
+        -- to register or remove subscriptions.
+        box.session.storage.subscriber_id = id
+        centrifuge.id_to_fiber[id] = fiber.channel()
+        return
+    end
+    box.session.storage.subscriber_id = id
+    if not timeout then timeout = 0 end
+    local now = fiber.time()
+    while true do
+        local messages = centrifuge.id_to_messages[id]
+        centrifuge.id_to_messages[id] = nil
+        if messages then
+            if use_polling then
+                return messages
+            else
+                local ok = box.session.push(messages)
+                if ok ~= true then
+                    error("write error")
+                end
+            end
+        else
+            local left = (now + timeout) - fiber.time()
+            if left <= 0 then
+                -- timed out, poller will call get_messages again.
+                return
+            end
+            centrifuge.id_to_fiber[id]:get(left)
+        end
+    end
+end
+
+function centrifuge.subscribe(id, channels)
+    for _, v in pairs(channels) do
+        local idChannels = centrifuge.id_to_channels[id] or {}
+        idChannels[v] = true
+        centrifuge.id_to_channels[id] = idChannels
+
+        local channelIds = centrifuge.channel_to_ids[v] or {}
+        channelIds[id] = true
+        centrifuge.channel_to_ids[v] = channelIds
+    end
+end
+
+function centrifuge.unsubscribe(id, channels)
+    for _, v in pairs(channels) do
+        if centrifuge.id_to_channels[id] then
+            centrifuge.id_to_channels[id][v] = nil
+        end
+        if centrifuge.channel_to_ids[v] then
+            centrifuge.channel_to_ids[v][id] = nil
+        end
+        -- Drop empty tables so the registries do not leak keys.
+        if centrifuge.id_to_channels[id] and next(centrifuge.id_to_channels[id]) == nil then
+            centrifuge.id_to_channels[id] = nil
+        end
+        if centrifuge.channel_to_ids[v] and next(centrifuge.channel_to_ids[v]) == nil then
+            centrifuge.channel_to_ids[v] = nil
+        end
+    end
+end
+
+local function publish_to_subscribers(channel, message_tuple)
+    local channelIds = centrifuge.channel_to_ids[channel]
+    if channelIds then
+        for id, _ in pairs(channelIds) do
+            local messages = centrifuge.id_to_messages[id] or {}
+            table.insert(messages, message_tuple)
+            centrifuge.id_to_messages[id] = messages
+        end
+    end
+end
+
+local function wake_up_subscribers(channel)
+    local ids = centrifuge.channel_to_ids[channel]
+    if ids then
+        for id, _ in pairs(ids) do
+            local fiber_channel = centrifuge.id_to_fiber[id]
+            if fiber_channel:has_readers() then fiber_channel:put(true, 0) end
+        end
+    end
+end
+
+function centrifuge.publish(msg_type, channel, data, info, ttl, size, meta_ttl)
+    if not ttl then ttl = 0 end
+    if not size then size = 0 end
+    if not meta_ttl then meta_ttl = 0 end
+    local epoch = ""
+    local offset = 0
+    box.begin()
+    if ttl > 0 and size > 0 then
+        local now = clock.realtime()
+        local meta_exp = 0
+        if meta_ttl > 0 then
+            meta_exp = now + meta_ttl
+        end
+        local stream_meta = box.space.meta:get(channel)
+        if stream_meta then
+            offset = stream_meta[2] + 1
+            epoch = stream_meta[3]
+        else
+            epoch = tostring(now)
+            offset = 1
+        end
+        
box.space.meta:upsert({channel, offset, epoch, meta_exp}, {{'=', 'channel', channel}, {'+', 'offset', 1}, {'=', 'exp', meta_exp}})
+        box.space.pubs:auto_increment{channel, offset, clock.realtime() + tonumber(ttl), data, info}
+        local max_offset_to_keep = offset - size
+        if max_offset_to_keep > 0 then
+            -- Trim retained history to the configured size. The LE iterator can
+            -- walk past this channel's tuples, so stop at the channel boundary
+            -- (the same guard the history function uses below).
+            for _, v in box.space.pubs.index.channel:pairs({channel, max_offset_to_keep}, {iterator = box.index.LE}) do
+                if v.channel ~= channel then break end
+                box.space.pubs:delete{v.id}
+            end
+        end
+    end
+    publish_to_subscribers(channel, {msg_type, channel, offset, epoch, data, info})
+    wake_up_subscribers(channel)
+    box.commit()
+    return offset, epoch
+end
+
+function centrifuge.history(channel, since_offset, limit, include_pubs, meta_ttl)
+    if not meta_ttl then meta_ttl = 0 end
+    local meta_exp = 0
+    local now = clock.realtime()
+    if meta_ttl > 0 then
+        meta_exp = now + meta_ttl
+    end
+    local epoch = tostring(now)
+    box.begin()
+    box.space.meta:upsert({channel, 0, epoch, meta_exp}, {{'=', 'channel', channel}, {'=', 'exp', meta_exp}})
+    local stream_meta = box.space.meta:get(channel)
+    if not include_pubs then
+        box.commit()
+        return stream_meta[2], stream_meta[3], nil
+    end
+    if stream_meta[2] == since_offset - 1 then
+        box.commit()
+        return stream_meta[2], stream_meta[3], nil
+    end
+    local num_entries = 0
+    local pubs = box.space.pubs.index.channel:pairs({channel, since_offset}, {iterator = box.index.GE}):take_while(function(x)
+        num_entries = num_entries + 1
+        return x.channel == channel and (limit < 1 or num_entries < limit + 1)
+    end):totable()
+    box.commit()
+    return stream_meta[2], stream_meta[3], pubs
+end
+
+function centrifuge.remove_history(channel)
+    box.begin()
+    for _, v in box.space.pubs.index.channel:pairs{channel} do
+        box.space.pubs:delete{v.id}
+    end
+    box.commit()
+end
+
+function centrifuge.add_presence(channel, ttl, client_id, user_id, conn_info, chan_info)
+    if not ttl then ttl = 0 end
+    if not conn_info then conn_info = "" end
+    if not chan_info then chan_info = "" end
+    local exp = clock.realtime() + ttl
+    box.space.presence:put({channel, client_id, user_id, conn_info, chan_info, exp})
+end
+
+function centrifuge.remove_presence(channel, client_id)
+    -- Primary key is (channel, client_id), so a single delete is enough.
+    box.space.presence:delete{channel, client_id}
+end
+
+function centrifuge.presence(channel)
+    return box.space.presence:select{channel}
+end
+
+function centrifuge.presence_stats(channel)
+    local users = {}
+    local num_clients = 0
+    local num_users = 0
+    for _, v in box.space.presence:pairs({channel}, {iterator = box.index.EQ}) do
+        num_clients = num_clients + 1
+        if not users[v.user_id] then
+            num_users = num_users + 1
+            users[v.user_id] = true
+        end
+    end
+    return num_clients, num_users
+end
+
+return centrifuge
diff --git a/_examples/custom_engine_tarantool/tntserver/docker-compose.yaml b/_examples/custom_engine_tarantool/tntserver/docker-compose.yaml
new file mode 100644
index 00000000..22b2477a
--- /dev/null
+++ b/_examples/custom_engine_tarantool/tntserver/docker-compose.yaml
@@ -0,0 +1,26 @@
+version: "3.9"
+services:
+  tnt1:
+    container_name: tnt1
+    image: tarantool/tarantool:2.7.0
+    volumes:
+      - .:/opt/tarantool/
+    command: tarantool ha.lua 1
+    ports:
+      - 3301:3301
+  tnt2:
+    container_name: tnt2
+    image: tarantool/tarantool:2.7.0
+    volumes:
+      - .:/opt/tarantool/
+    command: tarantool ha.lua 2
+    ports:
+      - 3302:3302
+  tnt3:
+    container_name: tnt3
+    image: tarantool/tarantool:2.7.0
+    volumes:
+      - .:/opt/tarantool/
+    command: tarantool ha.lua 3
+    ports:
+      - 3303:3303 
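+# Note: `docker-compose up` starts three Tarantool instances running ha.lua
+# (shown below), which form a replicated cluster with Raft leader election;
+# ports 3301-3303 are published so clients on the host machine can connect.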
diff --git a/_examples/custom_engine_tarantool/tntserver/ha.lua b/_examples/custom_engine_tarantool/tntserver/ha.lua
new file mode 100644
index 00000000..b7ad4368
--- /dev/null
+++ b/_examples/custom_engine_tarantool/tntserver/ha.lua
@@ -0,0 +1,58 @@
+require 'strict'.on()
+fiber = require 'fiber'
+
+local instance_id = string.match(arg[1], '^%d+$')
+assert(instance_id, 'malformed instance id')
+
+local port = 3300 + instance_id
+local workdir = 'ha_tnt'..instance_id
+
+box.cfg{
+    listen = '0.0.0.0:'..port,
+    wal_dir = workdir,
+    memtx_dir = workdir,
+    readahead = 10 * 1024 * 1024, -- to keep up with benchmark load.
+    net_msg_max = 1024, -- to keep up with benchmark load.
+
+    instance_uuid='aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa'..instance_id,
+
+    replication = {
+        'tnt1:'..3301,
+        'tnt2:'..3302,
+        'tnt3:'..3303,
+    },
+    replication_connect_quorum=0,
+
+    -- The instance is set to candidate, so it may become leader itself
+    -- as well as vote for other instances.
+    --
+    -- Alternative: set one of the three instances to `voter`, so that it
+    -- never becomes a leader but still votes for one of its peers and helps
+    -- it reach election quorum (2 in our case).
+    election_mode='candidate',
+    -- Quorum for both synchronous transactions and
+    -- leader election votes.
+    replication_synchro_quorum=2,
+    -- Synchronous replication timeout. The transaction will be
+    -- rolled back if no quorum is achieved during 1 second.
+    replication_synchro_timeout=1,
+    -- Heartbeat timeout. A leader is considered dead if it doesn't
+    -- send heartbeats for 4 * replication_timeout (1 second in our case).
+    -- Once the leader is dead, remaining instances start a new election round.
+    replication_timeout=0.25,
+    -- Timeout between elections. Needed to restart elections when no leader
+    -- emerges soon enough.
+    election_timeout=0.25,
+}
+
+box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })
+
+centrifuge = require 'centrifuge'
+
+box.once("centrifuge:schema:1", function()
+    centrifuge.init({
+        presence_temporary=true
+    })
+end)
+
+require'console'.start()
diff --git a/_examples/custom_engine_tarantool/tntserver/init.lua b/_examples/custom_engine_tarantool/tntserver/init.lua
new file mode 100644
index 00000000..1cbc4fc5
--- /dev/null
+++ b/_examples/custom_engine_tarantool/tntserver/init.lua
@@ -0,0 +1,31 @@
+require 'strict'.on()
+fiber = require 'fiber'
+
+local instance_id = string.match(arg[1], '^%d+$')
+assert(instance_id, 'malformed instance id')
+
+local port = 3300 + instance_id
+local workdir = 'tnt'..instance_id
+
+box.cfg{
+    listen = '0.0.0.0:'..port,
+    wal_mode = 'none',
+    wal_dir = workdir, -- not actually used since wal_mode is 'none' above.
+    memtx_dir = workdir,
+    readahead = 10 * 1024 * 1024, -- to keep up with benchmark load.
+    net_msg_max = 1024, -- to keep up with benchmark load. 
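+    -- Note: no replication or election settings here, this is the standalone
+    -- (non-HA) variant; see ha.lua for the three-node Raft setup.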
+}
+box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })
+
+centrifuge = require 'centrifuge'
+
+box.once("centrifuge:schema:1", function()
+    centrifuge.init({
+        presence_temporary=true
+    })
+end)
+
+if not fiber.self().storage.console then
+    require'console'.start()
+    os.exit()
+end
diff --git a/_examples/custom_engine_tarantool/tntserver/readme.md b/_examples/custom_engine_tarantool/tntserver/readme.md
new file mode 100644
index 00000000..b9da8246
--- /dev/null
+++ b/_examples/custom_engine_tarantool/tntserver/readme.md
@@ -0,0 +1,17 @@
+## Tarantool-based Centrifugo/Centrifuge Broker and PresenceManager
+
+What's inside:
+
+* PUB/SUB implementation to scale Centrifuge/Centrifugo nodes (PULL or PUSH based)
+* Message history inside channels with retention (TTL and size) to survive mass reconnects and prevent message loss
+* Presence support – to answer the question of who is currently subscribed to a certain channel
+
+### The underlying spaces
+
+The module creates several spaces internally:
+
+* `pubs` to keep publication history for channels (for channels with history enabled)
+* `meta` to keep history metadata (the current max offset and epoch of a channel)
+* `presence` to keep channel presence information (advice: make it temporary)
+
+These spaces are created automatically when you initialize the module by calling `centrifuge.init()`.
diff --git a/_examples/go.mod b/_examples/go.mod
index db42b6b3..a3d5f096 100644
--- a/_examples/go.mod
+++ b/_examples/go.mod
@@ -5,6 +5,7 @@ go 1.21
 replace github.com/centrifugal/centrifuge => ../
 
 require (
+	github.com/FZambia/tarantool v0.2.2
 	github.com/centrifugal/centrifuge v0.8.2
 	github.com/centrifugal/protocol v0.13.4
 	github.com/cristalhq/jwt/v5 v5.4.0
@@ -12,6 +13,7 @@ require (
 	github.com/gin-contrib/sessions v0.0.3
 	github.com/gin-gonic/gin v1.10.0
 	github.com/gobwas/ws v1.3.2
+	github.com/google/uuid v1.6.0
 	github.com/gorilla/mux v1.8.1
 	github.com/gorilla/sessions v1.3.0
 	github.com/gorilla/websocket v1.5.0
@@ -20,6 +22,7 @@ require (
 	github.com/prometheus/client_golang v1.19.1
 	github.com/quic-go/quic-go v0.42.0
 	github.com/stretchr/testify v1.9.0
+	github.com/vmihailenco/msgpack/v5 v5.3.1
 	golang.org/x/oauth2 v0.22.0
 	google.golang.org/grpc v1.65.0
 	google.golang.org/protobuf v1.34.2
@@ -49,7 +52,6 @@ require (
 	github.com/gobwas/pool v0.2.1 // indirect
 	github.com/goccy/go-json v0.10.2 // indirect
 	github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect
-	github.com/google/uuid v1.6.0 // indirect
 	github.com/gorilla/context v1.1.1 // indirect
 	github.com/gorilla/securecookie v1.1.2 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
@@ -79,6 +81,7 @@ require (
 	github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
 	github.com/ugorji/go/codec v1.2.12 // indirect
 	github.com/valyala/bytebufferpool v1.0.0 // indirect
+	github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
 	go.uber.org/mock v0.4.0 // indirect
 	golang.org/x/arch v0.8.0 // indirect
 	golang.org/x/crypto v0.23.0 // indirect
diff --git a/_examples/go.sum b/_examples/go.sum
index 250c387b..da8e5242 100644
--- a/_examples/go.sum
+++ b/_examples/go.sum
@@ -2,6 +2,8 @@ cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2Qx
 cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
 github.com/FZambia/eagle v0.1.0 h1:9gyX6x+xjoIfglgyPTcYm7dvY7FJ93us1QY5De4CyXA=
 github.com/FZambia/eagle v0.1.0/go.mod h1:YjGSPVkQTNcVLfzEUQJNgW9ScPR0K4u/Ky0yeFa4oDA=
+github.com/FZambia/tarantool v0.2.2 
h1:uC4clbBxkpvILYcHj4dktyYwUs57BeODbY/yWgH67pU= +github.com/FZambia/tarantool v0.2.2/go.mod h1:MSuWem4S/t7G+qxg8PZk8Mn25UfoXLYf+UxYFIfEydM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/boj/redistore v0.0.0-20180917114910-cd5dcc76aeff/go.mod h1:+RTT1BOk5P97fT2CiHkbFQwkK3mjsFAP6zCYV2aXtjw= @@ -184,6 +186,10 @@ github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65E github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/vmihailenco/msgpack/v5 v5.3.1 h1:0i85a4dsZh8mC//wmyyTEzidDLPQfQAxZIOLtafGbFY= +github.com/vmihailenco/msgpack/v5 v5.3.1/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=