From cb46d6b15f5a0ee3b2c193531fd00e39f0f242af Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Sat, 5 Oct 2024 14:52:40 +0300 Subject: [PATCH] Fix Fossil delta construction in recovered publications (#415) --- _examples/go.mod | 2 +- _examples/go.sum | 4 ++-- client.go | 24 +++++++++++++++------ client_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 10 deletions(-) diff --git a/_examples/go.mod b/_examples/go.mod index 1ca0232a..764e44c8 100644 --- a/_examples/go.mod +++ b/_examples/go.mod @@ -75,7 +75,7 @@ require ( github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/quic-go/qpack v0.4.0 // indirect - github.com/redis/rueidis v1.0.46 // indirect + github.com/redis/rueidis v1.0.47 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/segmentio/encoding v0.4.0 // indirect github.com/shadowspore/fossil-delta v0.0.0-20240102155221-e3a8590b820b // indirect diff --git a/_examples/go.sum b/_examples/go.sum index 72c500d1..e44aaf6e 100644 --- a/_examples/go.sum +++ b/_examples/go.sum @@ -156,8 +156,8 @@ github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo= github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A= github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM= github.com/quic-go/quic-go v0.42.0/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M= -github.com/redis/rueidis v1.0.46 h1:D4XWUZU2ByZpWky+LfIB/AouuLCkIL44BDI2NlJN63E= -github.com/redis/rueidis v1.0.46/go.mod h1:by+34b0cFXndxtYmPAHpoTHO5NkosDlBvhexoTURIxM= +github.com/redis/rueidis v1.0.47 h1:41UdeXOo4eJuW+cfpUJuLtVGyO0QJY3A2rEYgJWlfHs= +github.com/redis/rueidis v1.0.47/go.mod h1:by+34b0cFXndxtYmPAHpoTHO5NkosDlBvhexoTURIxM= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= diff --git a/client.go b/client.go index 2eb84099..7863b9f1 100644 --- a/client.go +++ b/client.go @@ -19,6 +19,7 @@ import ( "github.com/google/uuid" "github.com/segmentio/encoding/json" fdelta "github.com/shadowspore/fossil-delta" + "google.golang.org/protobuf/encoding/protojson" ) // Empty Replies/Pushes for pings. @@ -1017,24 +1018,33 @@ func (c *Client) traceInCmd(cmd *protocol.Command) { c.mu.RLock() user := c.user c.mu.RUnlock() - jsonBytes, _ := json.Marshal(cmd) - c.node.logger.log(newLogEntry(LogLevelTrace, "<--", map[string]any{"client": c.ID(), "user": user, "command": string(jsonBytes)})) + jsonBytes, err := json.Marshal(cmd) + if err != nil { + jsonBytes, _ = protojson.Marshal(cmd) + } + c.node.logger.log(newLogEntry(LogLevelTrace, "<-in--", map[string]any{"client": c.ID(), "user": user, "command": string(jsonBytes)})) } func (c *Client) traceOutReply(rep *protocol.Reply) { c.mu.RLock() user := c.user c.mu.RUnlock() - jsonBytes, _ := json.Marshal(rep) - c.node.logger.log(newLogEntry(LogLevelTrace, "-->", map[string]any{"client": c.ID(), "user": user, "reply": string(jsonBytes)})) + jsonBytes, err := json.Marshal(rep) + if err != nil { + jsonBytes, _ = protojson.Marshal(rep) + } + c.node.logger.log(newLogEntry(LogLevelTrace, "-out->", map[string]any{"client": c.ID(), "user": user, "reply": string(jsonBytes)})) } func (c *Client) traceOutPush(push *protocol.Push) { c.mu.RLock() user := c.user c.mu.RUnlock() - jsonBytes, _ := json.Marshal(push) - c.node.logger.log(newLogEntry(LogLevelTrace, "-->", map[string]any{"client": c.ID(), "user": user, "push": string(jsonBytes)})) + jsonBytes, err := json.Marshal(push) + if err != nil { + jsonBytes, _ = protojson.Marshal(push) + } + c.node.logger.log(newLogEntry(LogLevelTrace, "-out->", map[string]any{"client": c.ID(), "user": user, "push": string(jsonBytes)})) } // Lock must be held outside. @@ -3088,8 +3098,8 @@ func (c *Client) makeRecoveredPubsDeltaFossil(recoveredPubs []*protocol.Publicat Tags: pub.Tags, Delta: delta, } + prevPub = recoveredPubs[i+1] recoveredPubs[i+1] = deltaPub - prevPub = recoveredPubs[i] } } return recoveredPubs diff --git a/client_test.go b/client_test.go index 9a7f3720..a0fbafed 100644 --- a/client_test.go +++ b/client_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/centrifugal/protocol" + fdelta "github.com/shadowspore/fossil-delta" "github.com/stretchr/testify/require" ) @@ -1330,15 +1331,69 @@ func newTestClientCustomTransport(t testing.TB, ctx context.Context, node *Node, } func newTestClientV2(t testing.TB, node *Node, userID string) *Client { + return newTestClientV2Protocol(t, node, userID, ProtocolTypeJSON) +} + +func newTestClientV2Protocol(t testing.TB, node *Node, userID string, protocol ProtocolType) *Client { ctx, cancelFn := context.WithCancel(context.Background()) transport := newTestTransport(cancelFn) transport.setProtocolVersion(ProtocolVersion2) + transport.setProtocolType(protocol) newCtx := SetCredentials(ctx, &Credentials{UserID: userID}) client, err := newClient(newCtx, node, transport) require.NoError(t, err) return client } +func TestFossilRecoveredPubs(t *testing.T) { + t.Parallel() + node := defaultNodeNoHandlers() + defer func() { _ = node.Shutdown(context.Background()) }() + client := newTestClientV2Protocol(t, node, "42", ProtocolTypeProtobuf) + + pubs := client.makeRecoveredPubsDeltaFossil([]*protocol.Publication{}) + require.Len(t, pubs, 0) + + pubs = client.makeRecoveredPubsDeltaFossil([]*protocol.Publication{ + { + Offset: 1, + Data: []byte("test"), + }, + }) + require.Len(t, pubs, 1) + require.False(t, pubs[0].Delta) + + pubs = client.makeRecoveredPubsDeltaFossil([]*protocol.Publication{ + { + Offset: 1, + Data: []byte("This is a message to test Fossil: I just subscribed to channel"), + }, + { + Offset: 2, + Data: []byte("This is a message to test Fossil: Hi from Java"), + }, + { + Offset: 3, + Data: []byte("This is a message to test Fossil: I just subscribed to channel"), + }, + }) + require.Len(t, pubs, 3) + require.False(t, pubs[0].Delta) + require.True(t, pubs[1].Delta) + require.True(t, pubs[2].Delta) + + data := pubs[0].Data + require.Equal(t, []byte("This is a message to test Fossil: I just subscribed to channel"), []byte(data)) + + data1, err := fdelta.Apply(data, pubs[1].Data) + require.NoError(t, err) + require.Equal(t, []byte("This is a message to test Fossil: Hi from Java"), data1) + + data2, err := fdelta.Apply(data1, pubs[2].Data) + require.NoError(t, err) + require.Equal(t, []byte("This is a message to test Fossil: I just subscribed to channel"), data2) +} + func TestClientUnsubscribeClientSide(t *testing.T) { t.Parallel() node := defaultNodeNoHandlers()