-
-
Notifications
You must be signed in to change notification settings - Fork 94
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add example of real-time document sync
- Loading branch information
Showing
4 changed files
with
387 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
<!DOCTYPE html> | ||
<html> | ||
<head> | ||
<meta charset="utf-8"> | ||
<title></title> | ||
<style> | ||
body, html {height: 100%;margin: 0;display: flex;justify-content: center;align-items: center;background-color: #f0f8ff;font-family: Roboto, Arial, sans-serif;} | ||
#counter {font-size: 160px;font-weight: bold;color: #ff6347;} | ||
</style> | ||
</head> | ||
<body> | ||
<div id="counter"></div> | ||
<script type="text/javascript" src="https://unpkg.com/[email protected]/dist/centrifuge.js"></script> | ||
<script type="text/javascript" src="rtdocument.js"></script> | ||
<script type="text/javascript"> | ||
window.addEventListener('load', function() { | ||
const counterContainer = document.getElementById("counter"); | ||
|
||
const client = new Centrifuge('ws://localhost:8000/connection/websocket', {}); | ||
const subscription = client.newSubscription('counter', {}); | ||
|
||
const realTimeDocument = new RealTimeDocument({ | ||
subscription, | ||
load: async () => { | ||
const response = await fetch('/api/counter'); | ||
const result = await response.json(); | ||
return { document: result.value, version: result.version }; | ||
}, | ||
applyUpdate: (document, update) => { | ||
console.log("Applying update", update, "to", document); | ||
return update.value; | ||
}, | ||
compareVersion: (publication, currentVersion) => { | ||
const newVersion = publication.data.version; | ||
return newVersion > currentVersion ? newVersion : null; | ||
}, | ||
onChange: (document) => { | ||
console.log("Counter changed", document); | ||
counterContainer.textContent = document; | ||
}, | ||
debug: true, | ||
}); | ||
client.connect(); | ||
|
||
// Note – we can call sync even before connect. | ||
realTimeDocument.startSync(); | ||
}); | ||
</script> | ||
</body> | ||
</html> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"log" | ||
"net/http" | ||
"os" | ||
"os/signal" | ||
"sync" | ||
"syscall" | ||
"time" | ||
|
||
_ "net/http/pprof" | ||
|
||
"github.com/centrifugal/centrifuge" | ||
) | ||
|
||
// Counter is a document we sync here. For counter Version always matches Value – but | ||
// that's not the rule for other more complex documents of course. | ||
type Counter struct { | ||
Version int `json:"version"` | ||
Value int `json:"value"` | ||
} | ||
|
||
var ( | ||
counter Counter | ||
counterLock sync.RWMutex | ||
) | ||
|
||
func handleLog(e centrifuge.LogEntry) { | ||
log.Printf("%s: %v", e.Message, e.Fields) | ||
} | ||
|
||
const exampleChannel = "counter" | ||
|
||
// Check whether channel is allowed for subscribing. In real case permission | ||
// check will probably be more complex than in this example. | ||
func channelSubscribeAllowed(channel string) bool { | ||
return channel == exampleChannel | ||
} | ||
|
||
func authMiddleware(h http.Handler) http.Handler { | ||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
ctx := r.Context() | ||
newCtx := centrifuge.SetCredentials(ctx, ¢rifuge.Credentials{ | ||
UserID: "42", | ||
}) | ||
r = r.WithContext(newCtx) | ||
h.ServeHTTP(w, r) | ||
}) | ||
} | ||
|
||
func waitExitSignal(n *centrifuge.Node) { | ||
sigCh := make(chan os.Signal, 1) | ||
done := make(chan bool, 1) | ||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) | ||
go func() { | ||
<-sigCh | ||
_ = n.Shutdown(context.Background()) | ||
done <- true | ||
}() | ||
<-done | ||
} | ||
|
||
func main() { | ||
node, _ := centrifuge.New(centrifuge.Config{ | ||
LogLevel: centrifuge.LogLevelDebug, | ||
LogHandler: handleLog, | ||
}) | ||
|
||
node.OnConnect(func(client *centrifuge.Client) { | ||
transport := client.Transport() | ||
log.Printf("user %s connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol()) | ||
|
||
client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) { | ||
log.Printf("user %s subscribes on %s", client.UserID(), e.Channel) | ||
|
||
if !channelSubscribeAllowed(e.Channel) { | ||
log.Printf("user %s disallowed to subscribe on %s", client.UserID(), e.Channel) | ||
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied) | ||
return | ||
} | ||
|
||
cb(centrifuge.SubscribeReply{ | ||
Options: centrifuge.SubscribeOptions{ | ||
EnableRecovery: true, | ||
}, | ||
}, nil) | ||
}) | ||
|
||
client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) { | ||
log.Printf("user %s unsubscribed from %s", client.UserID(), e.Channel) | ||
}) | ||
|
||
client.OnDisconnect(func(e centrifuge.DisconnectEvent) { | ||
log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect) | ||
}) | ||
}) | ||
|
||
if err := node.Run(); err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
http.Handle("/connection/websocket", authMiddleware(centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{}))) | ||
http.HandleFunc("/api/counter", getCounterHandler) | ||
http.Handle("/", http.FileServer(http.Dir("./"))) | ||
|
||
counter = Counter{Version: 0, Value: 0} | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
// Start the counter increment simulation. | ||
go simulateCounterIncrease(ctx, node) | ||
|
||
go func() { | ||
if err := http.ListenAndServe(":8000", nil); err != nil { | ||
log.Fatal(err) | ||
} | ||
}() | ||
|
||
waitExitSignal(node) | ||
log.Println("bye!") | ||
} | ||
|
||
func getCounterHandler(w http.ResponseWriter, _ *http.Request) { | ||
counterLock.RLock() | ||
defer counterLock.RUnlock() | ||
w.Header().Set("Content-Type", "application/json") | ||
_ = json.NewEncoder(w).Encode(counter) | ||
} | ||
|
||
func simulateCounterIncrease(ctx context.Context, node *centrifuge.Node) { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-time.After(50 * time.Millisecond): | ||
counterLock.Lock() | ||
counter.Version++ | ||
counter.Value++ | ||
// Publishing under the lock here which is generally not good, but we want | ||
// to emulate transactional outbox or CDC guarantees. | ||
err := publishToChannel(node) | ||
if err != nil { | ||
// Emulate transaction rollback. | ||
log.Println("publish to channel error", err) | ||
counter.Version-- | ||
counter.Value-- | ||
} | ||
counterLock.Unlock() | ||
} | ||
} | ||
} | ||
|
||
func publishToChannel(node *centrifuge.Node) error { | ||
data, _ := json.Marshal(counter) | ||
_, err := node.Publish(exampleChannel, data, | ||
centrifuge.WithHistory(100, 10*time.Second)) | ||
return err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
This example demonstrates a real-time document synchronization. To achieve it we are using a helper class RealTimeDocument – we delegate Centrifuge Subscription to it, also a function to load the document and its version. Subscription channel must uniquely correlate with the document we want to sync. | ||
|
||
We also provide a couple of additional functions to apply real-time updates to the current document version and compare versions (to ensure we are not applying non-actual real-time updates to the state). | ||
|
||
To achieve the proper sync the version in the document should be incremental. In the real-world case this incremental version should be an incremental field in database. And we assume that instead of direct publishing to Centrifuge the backend is using transactional outbox or CDC approach – so that changes in document atomically saved and reliably exported to Centrifuge/Centrifugo. | ||
|
||
Keep in mind, that messages must be sent to Centrifugo channel in the correct version order since we rely on version checks on the frontend. | ||
|
||
The important thing here is that we should first subscribe to the channel, then load the document from the backend. This helps to not miss intermediary updates, happening between document load and subscription request. If we can guarantee that all updates eventually reach Centrifugo (and in case of transactional outbox or CDC we can) – then the sync will work properly. | ||
|
||
To start the example run the following command from the example directory: | ||
|
||
``` | ||
go run main.go | ||
``` | ||
|
||
Then go to http://localhost:8000 to see it in action. Open a couple of browser windows. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
class RealTimeDocument { | ||
#subscription; | ||
#channel; | ||
#load; | ||
#applyUpdate; | ||
#compareVersion; | ||
#onChange; | ||
#messageBuffer = []; | ||
#document = null; | ||
#version = null; | ||
#isLoaded = false; | ||
#reSyncTimer = null; | ||
#minReSyncDelay; | ||
#maxReSyncDelay; | ||
#reSyncAttempts = 0; | ||
#debug = false; | ||
|
||
constructor({ | ||
subscription, | ||
load, | ||
applyUpdate, | ||
compareVersion, | ||
onChange, | ||
minReSyncDelay = 100, | ||
maxReSyncDelay = 10000, | ||
debug = false, | ||
}) { | ||
if (!subscription) throw new Error("subscription is required"); | ||
if (!load) throw new Error("load function is required"); | ||
if (!applyUpdate) throw new Error("applyUpdate function is required"); | ||
if (!compareVersion) throw new Error("compareVersion function is required"); | ||
if (!onChange) throw new Error("onChange function is required"); | ||
|
||
this.#subscription = subscription; | ||
this.#channel = subscription.channel; | ||
this.#load = load; | ||
this.#applyUpdate = applyUpdate; | ||
this.#compareVersion = compareVersion; | ||
this.#onChange = onChange; | ||
this.#minReSyncDelay = minReSyncDelay; | ||
this.#maxReSyncDelay = maxReSyncDelay; | ||
this.#debug = debug; | ||
|
||
this.#subscription.on('publication', (ctx) => { | ||
if (!this.#isLoaded) { | ||
// Buffer messages until initial state is loaded. | ||
this.#messageBuffer.push(ctx); | ||
return; | ||
} | ||
// Process new messages immediately if initial state is already loaded. | ||
const newVersion = this.#compareVersion(ctx, this.#version); | ||
if (newVersion === null) { | ||
this.#debugLog("Skip real-time publication", ctx); | ||
return; | ||
} | ||
this.#document = this.#applyUpdate(this.#document, ctx.data); | ||
this.#version = newVersion; | ||
this.#onChange(this.#document); | ||
}).on('subscribed', (ctx) => { | ||
if (ctx.wasRecovering) { | ||
if (ctx.recovered) { | ||
this.#debugLog("Successfully re-attached to a stream"); | ||
} else { | ||
this.#debugLog("Re-syncing due to failed recovery"); | ||
this.#reSync(); | ||
} | ||
} else { | ||
this.#debugLog("Load data for the first time"); | ||
this.#loadDocumentApplyBuffered().catch(error => this.#debugLog("Unhandled error in loadDocumentApplyBuffered", error)); | ||
} | ||
}).on('unsubscribed', (ctx) => { | ||
this.#debugLog("Subscription unsubscribed", ctx); | ||
this.stopSync(); | ||
}); | ||
} | ||
|
||
startSync() { | ||
if (!this.#subscription) { | ||
this.#debugLog("Document already disposed", this.#channel); | ||
return; | ||
} | ||
this.#subscription.subscribe(); | ||
} | ||
|
||
stopSync() { | ||
this.#stopReSync() | ||
this.#subscription.unsubscribe(); | ||
this.#debugLog("Stopped and unsubscribed from channel", this.#channel); | ||
} | ||
|
||
#debugLog(...args) { | ||
if (!this.#debug) { | ||
return; | ||
} | ||
console.log(...args); | ||
} | ||
|
||
#randomInt(min, max) { // min and max included | ||
return Math.floor(Math.random() * (max - min + 1) + min); | ||
} | ||
|
||
#backoff(step, min, max) { | ||
// Full jitter technique, see: | ||
// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ | ||
if (step > 31) { step = 31; } | ||
const interval = this.#randomInt(0, Math.min(max, min * Math.pow(2, step))); | ||
return Math.min(max, min + interval); | ||
} | ||
|
||
#stopReSync() { | ||
if (this.#reSyncTimer) { | ||
clearTimeout(this.#reSyncTimer); | ||
this.#reSyncTimer = null; | ||
} | ||
} | ||
|
||
async #loadDocumentApplyBuffered() { | ||
try { | ||
const result = await this.#load(); | ||
this.#document = result.document; | ||
this.#version = result.version; | ||
this.#isLoaded = true; | ||
this.#debugLog("Initial state loaded", this.#document, "version:", this.#version); | ||
this.#processBufferedMessages(); | ||
this.#reSyncAttempts = 0; | ||
} catch (error) { | ||
const delay = this.#backoff(this.#reSyncAttempts, this.#minReSyncDelay, this.#maxReSyncDelay); | ||
this.#reSyncAttempts++; | ||
this.#debugLog('Failed to load initial data', error, 'retry in', delay); | ||
this.#reSyncTimer = setTimeout( () => { | ||
this.#debugLog("Re-syncing due to failed load"); | ||
this.#reSync(); | ||
}, delay); | ||
} | ||
} | ||
|
||
#reSync() { | ||
this.#isLoaded = false; // Reset the flag to collect new messages to the buffer. | ||
this.#messageBuffer = []; | ||
this.#loadDocumentApplyBuffered().catch( | ||
error => this.#debugLog("Unhandled error in loadDocumentApplyBuffered", error)); | ||
} | ||
|
||
#processBufferedMessages() { | ||
this.#messageBuffer.forEach((msg) => { | ||
const newVersion = this.#compareVersion(msg, this.#version); | ||
if (newVersion) { | ||
this.#document = this.#applyUpdate(this.#document, msg.data); | ||
this.#version = newVersion; | ||
} else { | ||
this.#debugLog("Skip buffered publication", msg); | ||
} | ||
}); | ||
// Clear the buffer after processing. | ||
this.#messageBuffer = []; | ||
this.#onChange(this.#document); | ||
} | ||
} |