diff --git a/_examples/document_sync/index.html b/_examples/document_sync/index.html new file mode 100644 index 00000000..c90407d6 --- /dev/null +++ b/_examples/document_sync/index.html @@ -0,0 +1,50 @@ + + + + + + + + +
+ + + + + diff --git a/_examples/document_sync/main.go b/_examples/document_sync/main.go new file mode 100644 index 00000000..b812e53e --- /dev/null +++ b/_examples/document_sync/main.go @@ -0,0 +1,162 @@ +package main + +import ( + "context" + "encoding/json" + "log" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + _ "net/http/pprof" + + "github.com/centrifugal/centrifuge" +) + +// Counter is a document we sync here. For counter Version always matches Value – but +// that's not the rule for other more complex documents of course. +type Counter struct { + Version int `json:"version"` + Value int `json:"value"` +} + +var ( + counter Counter + counterLock sync.RWMutex +) + +func handleLog(e centrifuge.LogEntry) { + log.Printf("%s: %v", e.Message, e.Fields) +} + +const exampleChannel = "counter" + +// Check whether channel is allowed for subscribing. In real case permission +// check will probably be more complex than in this example. +func channelSubscribeAllowed(channel string) bool { + return channel == exampleChannel +} + +func authMiddleware(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + newCtx := centrifuge.SetCredentials(ctx, ¢rifuge.Credentials{ + UserID: "42", + }) + r = r.WithContext(newCtx) + h.ServeHTTP(w, r) + }) +} + +func waitExitSignal(n *centrifuge.Node) { + sigCh := make(chan os.Signal, 1) + done := make(chan bool, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + _ = n.Shutdown(context.Background()) + done <- true + }() + <-done +} + +func main() { + node, _ := centrifuge.New(centrifuge.Config{ + LogLevel: centrifuge.LogLevelDebug, + LogHandler: handleLog, + }) + + node.OnConnect(func(client *centrifuge.Client) { + transport := client.Transport() + log.Printf("user %s connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol()) + + client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) { + log.Printf("user %s subscribes on %s", client.UserID(), e.Channel) + + if !channelSubscribeAllowed(e.Channel) { + log.Printf("user %s disallowed to subscribe on %s", client.UserID(), e.Channel) + cb(centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied) + return + } + + cb(centrifuge.SubscribeReply{ + Options: centrifuge.SubscribeOptions{ + EnableRecovery: true, + }, + }, nil) + }) + + client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) { + log.Printf("user %s unsubscribed from %s", client.UserID(), e.Channel) + }) + + client.OnDisconnect(func(e centrifuge.DisconnectEvent) { + log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect) + }) + }) + + if err := node.Run(); err != nil { + log.Fatal(err) + } + + http.Handle("/connection/websocket", authMiddleware(centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{}))) + http.HandleFunc("/api/counter", getCounterHandler) + http.Handle("/", http.FileServer(http.Dir("./"))) + + counter = Counter{Version: 0, Value: 0} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start the counter increment simulation. + go simulateCounterIncrease(ctx, node) + + go func() { + if err := http.ListenAndServe(":8000", nil); err != nil { + log.Fatal(err) + } + }() + + waitExitSignal(node) + log.Println("bye!") +} + +func getCounterHandler(w http.ResponseWriter, _ *http.Request) { + counterLock.RLock() + defer counterLock.RUnlock() + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(counter) +} + +func simulateCounterIncrease(ctx context.Context, node *centrifuge.Node) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(50 * time.Millisecond): + counterLock.Lock() + counter.Version++ + counter.Value++ + // Publishing under the lock here which is generally not good, but we want + // to emulate transactional outbox or CDC guarantees. + err := publishToChannel(node) + if err != nil { + // Emulate transaction rollback. + log.Println("publish to channel error", err) + counter.Version-- + counter.Value-- + } + counterLock.Unlock() + } + } +} + +func publishToChannel(node *centrifuge.Node) error { + data, _ := json.Marshal(counter) + _, err := node.Publish(exampleChannel, data, + centrifuge.WithHistory(100, 10*time.Second)) + return err +} diff --git a/_examples/document_sync/readme.md b/_examples/document_sync/readme.md new file mode 100644 index 00000000..58e421bd --- /dev/null +++ b/_examples/document_sync/readme.md @@ -0,0 +1,17 @@ +This example demonstrates a real-time document synchronization. To achieve it we are using a helper class RealTimeDocument – we delegate Centrifuge Subscription to it, also a function to load the document and its version. Subscription channel must uniquely correlate with the document we want to sync. + +We also provide a couple of additional functions to apply real-time updates to the current document version and compare versions (to ensure we are not applying non-actual real-time updates to the state). + +To achieve the proper sync the version in the document should be incremental. In the real-world case this incremental version should be an incremental field in database. And we assume that instead of direct publishing to Centrifuge the backend is using transactional outbox or CDC approach – so that changes in document atomically saved and reliably exported to Centrifuge/Centrifugo. + +Keep in mind, that messages must be sent to Centrifugo channel in the correct version order since we rely on version checks on the frontend. + +The important thing here is that we should first subscribe to the channel, then load the document from the backend. This helps to not miss intermediary updates, happening between document load and subscription request. If we can guarantee that all updates eventually reach Centrifugo (and in case of transactional outbox or CDC we can) – then the sync will work properly. + +To start the example run the following command from the example directory: + +``` +go run main.go +``` + +Then go to http://localhost:8000 to see it in action. Open a couple of browser windows. diff --git a/_examples/document_sync/rtdocument.js b/_examples/document_sync/rtdocument.js new file mode 100644 index 00000000..190e9bd4 --- /dev/null +++ b/_examples/document_sync/rtdocument.js @@ -0,0 +1,158 @@ +class RealTimeDocument { + #subscription; + #channel; + #load; + #applyUpdate; + #compareVersion; + #onChange; + #messageBuffer = []; + #document = null; + #version = null; + #isLoaded = false; + #reSyncTimer = null; + #minReSyncDelay; + #maxReSyncDelay; + #reSyncAttempts = 0; + #debug = false; + + constructor({ + subscription, + load, + applyUpdate, + compareVersion, + onChange, + minReSyncDelay = 100, + maxReSyncDelay = 10000, + debug = false, + }) { + if (!subscription) throw new Error("subscription is required"); + if (!load) throw new Error("load function is required"); + if (!applyUpdate) throw new Error("applyUpdate function is required"); + if (!compareVersion) throw new Error("compareVersion function is required"); + if (!onChange) throw new Error("onChange function is required"); + + this.#subscription = subscription; + this.#channel = subscription.channel; + this.#load = load; + this.#applyUpdate = applyUpdate; + this.#compareVersion = compareVersion; + this.#onChange = onChange; + this.#minReSyncDelay = minReSyncDelay; + this.#maxReSyncDelay = maxReSyncDelay; + this.#debug = debug; + + this.#subscription.on('publication', (ctx) => { + if (!this.#isLoaded) { + // Buffer messages until initial state is loaded. + this.#messageBuffer.push(ctx); + return; + } + // Process new messages immediately if initial state is already loaded. + const newVersion = this.#compareVersion(ctx, this.#version); + if (newVersion === null) { + this.#debugLog("Skip real-time publication", ctx); + return; + } + this.#document = this.#applyUpdate(this.#document, ctx.data); + this.#version = newVersion; + this.#onChange(this.#document); + }).on('subscribed', (ctx) => { + if (ctx.wasRecovering) { + if (ctx.recovered) { + this.#debugLog("Successfully re-attached to a stream"); + } else { + this.#debugLog("Re-syncing due to failed recovery"); + this.#reSync(); + } + } else { + this.#debugLog("Load data for the first time"); + this.#loadDocumentApplyBuffered().catch(error => this.#debugLog("Unhandled error in loadDocumentApplyBuffered", error)); + } + }).on('unsubscribed', (ctx) => { + this.#debugLog("Subscription unsubscribed", ctx); + this.stopSync(); + }); + } + + startSync() { + if (!this.#subscription) { + this.#debugLog("Document already disposed", this.#channel); + return; + } + this.#subscription.subscribe(); + } + + stopSync() { + this.#stopReSync() + this.#subscription.unsubscribe(); + this.#debugLog("Stopped and unsubscribed from channel", this.#channel); + } + + #debugLog(...args) { + if (!this.#debug) { + return; + } + console.log(...args); + } + + #randomInt(min, max) { // min and max included + return Math.floor(Math.random() * (max - min + 1) + min); + } + + #backoff(step, min, max) { + // Full jitter technique, see: + // https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ + if (step > 31) { step = 31; } + const interval = this.#randomInt(0, Math.min(max, min * Math.pow(2, step))); + return Math.min(max, min + interval); + } + + #stopReSync() { + if (this.#reSyncTimer) { + clearTimeout(this.#reSyncTimer); + this.#reSyncTimer = null; + } + } + + async #loadDocumentApplyBuffered() { + try { + const result = await this.#load(); + this.#document = result.document; + this.#version = result.version; + this.#isLoaded = true; + this.#debugLog("Initial state loaded", this.#document, "version:", this.#version); + this.#processBufferedMessages(); + this.#reSyncAttempts = 0; + } catch (error) { + const delay = this.#backoff(this.#reSyncAttempts, this.#minReSyncDelay, this.#maxReSyncDelay); + this.#reSyncAttempts++; + this.#debugLog('Failed to load initial data', error, 'retry in', delay); + this.#reSyncTimer = setTimeout( () => { + this.#debugLog("Re-syncing due to failed load"); + this.#reSync(); + }, delay); + } + } + + #reSync() { + this.#isLoaded = false; // Reset the flag to collect new messages to the buffer. + this.#messageBuffer = []; + this.#loadDocumentApplyBuffered().catch( + error => this.#debugLog("Unhandled error in loadDocumentApplyBuffered", error)); + } + + #processBufferedMessages() { + this.#messageBuffer.forEach((msg) => { + const newVersion = this.#compareVersion(msg, this.#version); + if (newVersion) { + this.#document = this.#applyUpdate(this.#document, msg.data); + this.#version = newVersion; + } else { + this.#debugLog("Skip buffered publication", msg); + } + }); + // Clear the buffer after processing. + this.#messageBuffer = []; + this.#onChange(this.#document); + } +}