Skip to content

Commit

Permalink
keep example
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Aug 10, 2024
1 parent fb389c8 commit b8e6bdb
Show file tree
Hide file tree
Showing 16 changed files with 2,798 additions and 1 deletion.
136 changes: 136 additions & 0 deletions _examples/custom_engine_tarantool/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title></title>
<style type="text/css">
input[type="text"] { width: 300px; }
.muted {color: #CCCCCC; font-size: 10px;}
</style>
<script type="text/javascript" src="https://unpkg.com/centrifuge@^5/dist/centrifuge.js"></script>
<script type="text/javascript">
// helper functions to work with escaping html.
const tagsToReplace = {'&': '&amp;', '<': '&lt;', '>': '&gt;'};
function replaceTag(tag) {return tagsToReplace[tag] || tag;}
function safeTagsReplace(str) {return str.replace(/[&<>]/g, replaceTag);}

const channel = "chat:index";

window.addEventListener('load', function() {
const input = document.getElementById('input');
const container = document.getElementById('messages');

const centrifuge = new Centrifuge('ws://' + window.location.host + '/connection/websocket');

centrifuge.on('connecting', function(ctx){
drawText('Connecting: ' + ctx.reason);
input.setAttribute('disabled', 'true');
});

centrifuge.on('disconnected', function(ctx){
drawText('Disconnected: ' + ctx.reason);
input.setAttribute('disabled', 'true');
});

// bind listeners on centrifuge object instance events.
centrifuge.on('connected', function(ctx){
drawText('Connected with client ID ' + ctx.client + ' over ' + ctx.transport);
input.removeAttribute('disabled');
});

// subscribe on channel and bind various event listeners. Actual
// subscription request will be sent after client connects to
// a server.
const sub = centrifuge.newSubscription(channel);

sub.on("publication", handlePublication)
.on("join", handleJoin)
.on("leave", handleLeave)
.on("unsubscribed", handleUnsubscribed)
.on("subscribed", handleSubscribed)
.on("subscribing", handleSubscribing)
.on("error", handleSubscriptionError);

sub.subscribe();

// Trigger actual connection establishing with a server.
// At this moment actual client work starts - i.e. subscriptions
// defined start subscribing etc.
centrifuge.connect();

// show how many users currently in channel.
function showPresence(sub) {
sub.presence().then(function(result) {
let count = 0;
for (let key in result.clients){
count++;
}
drawText('Presence: now in this room – ' + count + ' clients');
}, function(err) {
drawText("Presence error: " + JSON.stringify(err));
});
}

function handleSubscribed(ctx) {
drawText('Subscribed on channel ' + ctx.channel + ': ' + JSON.stringify(ctx));
showPresence(sub);

centrifuge.rpc("getCurrentYear", {}).then(function(data){
drawText("RPC response data: " + JSON.stringify(data));
}, function(err) {
drawText("RPC error: " + JSON.stringify(err));
});
}

function handleUnsubscribed(ctx) {
drawText('Unsubscribed from channel ' + ctx.channel + ', ' + JSON.stringify(ctx));
}

function handleSubscribing(ctx) {
drawText('Subscribing on channel ' + ctx.channel + ', ' + JSON.stringify(ctx));
}

function handleSubscriptionError(ctx) {
drawText('Error subscribing on channel ' + JSON.stringify(ctx));
}

function handlePublication(message) {
let clientID;
if (message.info){
clientID = message.info.client;
} else {
clientID = null;
}
const inputText = message.data["input"].toString();
const text = safeTagsReplace(inputText) + ' <span class="muted">from ' + clientID + '</span>';
drawText(text);
}

function handleJoin(ctx) {
drawText('Client joined channel ' + this.channel + ' (uid ' + ctx.info["client"] + ', user '+ ctx.info["user"] +')');
}

function handleLeave(ctx) {
drawText('Client left channel ' + this.channel + ' (uid ' + ctx.info["client"] + ', user '+ ctx.info["user"] +')');
}

document.getElementById('form').addEventListener('submit', function(event) {
event.preventDefault();
sub.publish({"input": input.value}).then(function() {
console.log('message accepted by server');
}, function(err) {
console.log('error publishing message', err);
});
input.value = '';
});
});
</script>
</head>
<body>
<form id="form">
<input type="text" id="input" autocomplete="off" />
<input type="submit" id="submit" value="»">
</form>
<ul id="messages"></ul>
</body>
</html>
203 changes: 203 additions & 0 deletions _examples/custom_engine_tarantool/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package main

import (
"context"
"flag"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"

_ "net/http/pprof"

"github.com/centrifugal/centrifuge"
"github.com/centrifugal/centrifuge/_examples/custom_engine_tarantool/tntengine"
)

var (
port = flag.Int("port", 8000, "Port to bind app to")
sharded = flag.Bool("sharded", false, "Start sharded example")
ha = flag.Bool("ha", false, "Start high availability example")
raft = flag.Bool("raft", false, "Using Raft-based replication")
user = flag.String("user", "guest", "Connection user")
password = flag.String("password", "", "Connection password")
addresses = flag.String("addresses", "", "Configure Tarantool addresses (by default we use hardcoded here)")
)

func handleLog(e centrifuge.LogEntry) {
log.Printf("[centrifuge] %s: %v", e.Message, e.Fields)
}

func authMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx = centrifuge.SetCredentials(ctx, &centrifuge.Credentials{
UserID: "42",
Info: []byte(`{"name": "Alexander"}`),
})
r = r.WithContext(ctx)
h.ServeHTTP(w, r)
})
}

func waitExitSignal(n *centrifuge.Node) {
sigCh := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
_ = n.Shutdown(context.Background())
done <- true
}()
<-done
}

func main() {
flag.Parse()

node, _ := centrifuge.New(centrifuge.Config{
LogLevel: centrifuge.LogLevelDebug,
LogHandler: handleLog,
})

node.OnConnect(func(client *centrifuge.Client) {
transport := client.Transport()
log.Printf("user %s connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol())

client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
log.Printf("user %s subscribes on %s", client.UserID(), e.Channel)
cb(centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
EmitPresence: true,
EmitJoinLeave: true,
PushJoinLeave: true,
EnableRecovery: true,
},
}, nil)
})

client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) {
log.Printf("user %s unsubscribed from %s", client.UserID(), e.Channel)
})

client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
log.Printf("user %s publishes into channel %s: %s", client.UserID(), e.Channel, string(e.Data))
cb(centrifuge.PublishReply{
Options: centrifuge.PublishOptions{
HistorySize: 10,
HistoryTTL: 10 * time.Minute,
},
}, nil)
})

client.OnPresence(func(e centrifuge.PresenceEvent, cb centrifuge.PresenceCallback) {
log.Printf("user %s calls presence on %s", client.UserID(), e.Channel)
if !client.IsSubscribed(e.Channel) {
cb(centrifuge.PresenceReply{}, centrifuge.ErrorPermissionDenied)
return
}
cb(centrifuge.PresenceReply{}, nil)
})

client.OnPresenceStats(func(e centrifuge.PresenceStatsEvent, cb centrifuge.PresenceStatsCallback) {
log.Printf("user %s calls presence stats on %s", client.UserID(), e.Channel)
if !client.IsSubscribed(e.Channel) {
cb(centrifuge.PresenceStatsReply{}, centrifuge.ErrorPermissionDenied)
return
}
cb(centrifuge.PresenceStatsReply{}, nil)
})

client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect)
})
})

// Single Tarantool.
mode := tntengine.ConnectionModeSingleInstance
shardAddresses := [][]string{
{"127.0.0.1:3301"},
}

if *ha {
if *raft {
// Single Tarantool RS with automatic leader election with Raft (Tarantool >= 2.7.0).
shardAddresses = [][]string{
{"127.0.0.1:3301", "127.0.0.1:3302", "127.0.0.1:3303"},
}
mode = tntengine.ConnectionModeLeaderFollowerRaft
} else {
// Single Tarantool RS with automatic leader election (ex. in Cartridge).
shardAddresses = [][]string{
{"127.0.0.1:3301", "127.0.0.1:3302"},
}
mode = tntengine.ConnectionModeLeaderFollower
}
} else if *sharded {
// Client-side sharding between two Tarantool instances (without HA).
shardAddresses = [][]string{
{"127.0.0.1:3301"},
{"127.0.0.1:3302"},
}
}

if *addresses != "" {
var customShardAddresses [][]string
shardParts := strings.Split(*addresses, " ")
for _, shardPart := range shardParts {
customShardAddresses = append(customShardAddresses, strings.Split(shardPart, ","))
}
shardAddresses = customShardAddresses
}

var shards []*tntengine.Shard
for _, addresses := range shardAddresses {
shard, err := tntengine.NewShard(tntengine.ShardConfig{
Addresses: addresses,
User: *user,
Password: *password,
ConnectionMode: mode,
})
if err != nil {
log.Fatal(err)
}
shards = append(shards, shard)
}

broker, err := tntengine.NewBroker(node, tntengine.BrokerConfig{
UsePolling: false,
Shards: shards,
})
if err != nil {
log.Fatal(err)
}
node.SetBroker(broker)

presenceManager, err := tntengine.NewPresenceManager(node, tntengine.PresenceManagerConfig{
Shards: shards,
})
if err != nil {
log.Fatal(err)
}
node.SetPresenceManager(presenceManager)

if err := node.Run(); err != nil {
log.Fatal(err)
}

http.Handle("/connection/websocket", authMiddleware(centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{})))
http.Handle("/", http.FileServer(http.Dir("./")))

go func() {
if err := http.ListenAndServe(":"+strconv.Itoa(*port), nil); err != nil {
log.Fatal(err)
}
}()

waitExitSignal(node)
log.Println("bye!")
}
Loading

0 comments on commit b8e6bdb

Please sign in to comment.