Skip to content

Commit

Permalink
recovery mode cache example
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed May 15, 2024
1 parent 09b64ec commit 137092d
Show file tree
Hide file tree
Showing 6 changed files with 317 additions and 2 deletions.
6 changes: 4 additions & 2 deletions _examples/compression_playground/readme.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
This is a sample simulation of football match where the entire state is sent into WebSocket connection upon every
match event.
match event. The example is not very idiomatic because we try to simulate various modes thus several different
files were required. In practice, you will have JSON or Protobuf case only, and there is no need to tweak behaviour
over URL params like we do here.

The goal is to compare different compression strategies for WebSocket data transfer. Please note, that results
The goal was to compare different compression strategies for WebSocket data transfer. Please note, that results
depend a lot on the data you send. You may get absolutely different results for your data. Still we hope this
example gives some insights on how to choose the best compression strategy and what to expect from Centrifuge.

Expand Down
3 changes: 3 additions & 0 deletions _examples/compression_playground/templates/json.html
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@
debug: true,
});

// We are using cache recovery feature in the example, passing since: {} allows to trigger recovery
// on initial subscribe. For cache recovery the previous stream position does not matter - server
// just tries to extract latest publication from channel history and send it to client.
let subOptions = {
since: {},
}
Expand Down
7 changes: 7 additions & 0 deletions _examples/recovery_mode_cache/centrifuge.js

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions _examples/recovery_mode_cache/d3.v7.min.js

Large diffs are not rendered by default.

135 changes: 135 additions & 0 deletions _examples/recovery_mode_cache/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script src="d3.v7.min.js"></script>
<script type="text/javascript" src="centrifuge.js"></script>
<title>Speedometer Visualization</title>
<style>
body {
font-family: 'Arial Black', sans-serif;
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
margin: 0;
background-color: #1e1e1e;
}
#speedometer {
position: relative;
}
.speedometer {
font: 14px 'Arial Black', sans-serif;
}
.speedometer circle {
fill: #333;
stroke: #ff4500;
stroke-width: 6;
}
.speedometer line {
stroke: #ff4500;
stroke-width: 6;
stroke-linecap: round;
}
.speedometer line {
stroke: #ff4500;
stroke-width: 6;
stroke-linecap: round;
}
.speedometer .tick {
fill: #fcd8d4;
font-weight: bold;
font-size: 1.2em;
}
.speedometer .circle-background {
fill: #ff4500;
}
</style>
</head>
<body>
<div id="speedometer"></div>
<script type="text/javascript">
const width = 400, height = 400, radius = Math.min(width, height) / 2 - 3;
const needleLength = radius * 0.9;
const speedRange = 180; // Speed range in degrees (from -90 to 90)

const speedometer = d3.select("#speedometer").append("svg")
.attr("width", width)
.attr("height", height)
.append("g")
.attr("transform", `translate(${width / 2},${height / 2})`)
.attr("class", "speedometer");

speedometer.append("circle")
.attr("r", radius)
.attr("class", "circle");

const scale = d3.scaleLinear()
.domain([0, 200]) // Assuming the speed range is 0 to 200
.range([-speedRange / 2, speedRange / 2]);

speedometer.selectAll(".tick")
.data(scale.ticks(10))
.enter().append("text")
.attr("class", "tick")
.attr("x", d => Math.cos((scale(d) - 90) * Math.PI / 180) * (radius - 30))
.attr("y", d => Math.sin((scale(d) - 90) * Math.PI / 180) * (radius - 30))
.attr("text-anchor", "middle")
.attr("alignment-baseline", "middle")
.text(d => d);

let lineRendered = false;
const line = speedometer.append("line")
.attr("x1", 0)
.attr("y1", 0)
.attr("x2", 0)
.attr("y2", -needleLength)
.attr("class", "line")
.style("opacity", 0); // Initially set the needle to be invisible

// Add a circle at the base of the needle
speedometer.append("circle")
.attr("cx", 0)
.attr("cy", 0)
.attr("r", 10)
.attr("stroke-width", 2);

function updateSpeedometer(speed) {
const degrees = scale(speed);

let duration = 110;
if (!lineRendered) {
// On first render move to the desired speed without animation duration.
duration = 0;
}

line
.transition()
.duration(duration)
.attr("transform", `rotate(${degrees})`);

if (!lineRendered) {
line.style("opacity", 1);
lineRendered = true;
}
}

const centrifuge = new Centrifuge('ws://localhost:8000/connection/websocket', {});

// We are using cache recovery feature in the example, passing since: {} allows to trigger recovery
// on initial subscribe. For cache recovery the previous stream position does not matter - server
// just tries to extract latest publication from channel history and send it to client.
const sub = centrifuge.newSubscription("speed", {
since: {},
});

sub.on("publication", (ctx) => {
updateSpeedometer(ctx.data.speed);
})

sub.subscribe();
centrifuge.connect();
</script>
</body>
</html>
166 changes: 166 additions & 0 deletions _examples/recovery_mode_cache/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"

_ "net/http/pprof"

"github.com/centrifugal/centrifuge"
)

var port = flag.Int("port", 8000, "Port to bind app to")

func handleLog(e centrifuge.LogEntry) {
log.Printf("%s: %v", e.Message, e.Fields)
}

func authMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
newCtx := centrifuge.SetCredentials(ctx, &centrifuge.Credentials{
UserID: "",
})
r = r.WithContext(newCtx)
h.ServeHTTP(w, r)
})
}

func waitExitSignal(n *centrifuge.Node, s *http.Server) {
sigCh := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_ = n.Shutdown(ctx)
_ = s.Shutdown(ctx)
done <- true
}()
<-done
}

const exampleChannel = "speed"

// Check whether channel is allowed for subscribing. In real case permission
// check will probably be more complex than in this example.
func channelSubscribeAllowed(channel string) bool {
return channel == exampleChannel
}

func main() {
node, _ := centrifuge.New(centrifuge.Config{
LogLevel: centrifuge.LogLevelInfo,
LogHandler: handleLog,
HistoryMetaTTL: 24 * time.Hour,
})

node.OnConnect(func(client *centrifuge.Client) {
transport := client.Transport()
log.Printf("[user %s] connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol())

client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
log.Printf("[user %s] subscribes on %s", client.UserID(), e.Channel)

if !channelSubscribeAllowed(e.Channel) {
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied)
return
}

cb(centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
EnableRecovery: true,
RecoveryMode: centrifuge.RecoveryModeCache,
},
}, nil)
})

client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) {
log.Printf("[user %s] unsubscribed from %s: %s", client.UserID(), e.Channel, e.Reason)
})

client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
log.Printf("[user %s] disconnected: %s", client.UserID(), e.Reason)
})
})

if err := node.Run(); err != nil {
log.Fatal(err)
}

go func() {
const (
accelerationRate = 2.0 // Speed increment per 100 ms
brakingRate = 10.0 // Speed decrement per 100 ms
maxSpeed = 190.0
minSpeed = 50.0
)

speed := 0.0
increasing := true

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if increasing {
speed += accelerationRate
if speed >= maxSpeed {
increasing = false
}
} else {
speed -= brakingRate
if speed <= minSpeed {
increasing = true
}
}
_, err := node.Publish(
exampleChannel,
[]byte(`{"speed": `+fmt.Sprint(speed)+`}`),
centrifuge.WithHistory(1, time.Minute),
)
if err != nil {
log.Printf("error publishing to personal channel: %s", err)
}
}
}
}()

mux := http.DefaultServeMux

websocketHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{
ReadBufferSize: 1024,
UseWriteBufferPool: true,
})
mux.Handle("/connection/websocket", authMiddleware(websocketHandler))
mux.Handle("/", http.FileServer(http.Dir("./")))

server := &http.Server{
Handler: mux,
Addr: "127.0.0.1:" + strconv.Itoa(*port),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}

log.Print("Starting server, visit http://localhost:8000")
go func() {
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatal(err)
}
}()

waitExitSignal(node, server)
log.Println("bye!")
}

0 comments on commit 137092d

Please sign in to comment.