Skip to content

Commit

Permalink
Fix Fossil delta construction in recovered publications (#415)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Oct 5, 2024
1 parent f3e8061 commit cb46d6b
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 10 deletions.
2 changes: 1 addition & 1 deletion _examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ require (
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/redis/rueidis v1.0.46 // indirect
github.com/redis/rueidis v1.0.47 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/segmentio/encoding v0.4.0 // indirect
github.com/shadowspore/fossil-delta v0.0.0-20240102155221-e3a8590b820b // indirect
Expand Down
4 changes: 2 additions & 2 deletions _examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM=
github.com/quic-go/quic-go v0.42.0/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M=
github.com/redis/rueidis v1.0.46 h1:D4XWUZU2ByZpWky+LfIB/AouuLCkIL44BDI2NlJN63E=
github.com/redis/rueidis v1.0.46/go.mod h1:by+34b0cFXndxtYmPAHpoTHO5NkosDlBvhexoTURIxM=
github.com/redis/rueidis v1.0.47 h1:41UdeXOo4eJuW+cfpUJuLtVGyO0QJY3A2rEYgJWlfHs=
github.com/redis/rueidis v1.0.47/go.mod h1:by+34b0cFXndxtYmPAHpoTHO5NkosDlBvhexoTURIxM=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
Expand Down
24 changes: 17 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/google/uuid"
"github.com/segmentio/encoding/json"
fdelta "github.com/shadowspore/fossil-delta"
"google.golang.org/protobuf/encoding/protojson"
)

// Empty Replies/Pushes for pings.
Expand Down Expand Up @@ -1017,24 +1018,33 @@ func (c *Client) traceInCmd(cmd *protocol.Command) {
c.mu.RLock()
user := c.user
c.mu.RUnlock()
jsonBytes, _ := json.Marshal(cmd)
c.node.logger.log(newLogEntry(LogLevelTrace, "<--", map[string]any{"client": c.ID(), "user": user, "command": string(jsonBytes)}))
jsonBytes, err := json.Marshal(cmd)
if err != nil {
jsonBytes, _ = protojson.Marshal(cmd)
}
c.node.logger.log(newLogEntry(LogLevelTrace, "<-in--", map[string]any{"client": c.ID(), "user": user, "command": string(jsonBytes)}))
}

func (c *Client) traceOutReply(rep *protocol.Reply) {
c.mu.RLock()
user := c.user
c.mu.RUnlock()
jsonBytes, _ := json.Marshal(rep)
c.node.logger.log(newLogEntry(LogLevelTrace, "-->", map[string]any{"client": c.ID(), "user": user, "reply": string(jsonBytes)}))
jsonBytes, err := json.Marshal(rep)
if err != nil {
jsonBytes, _ = protojson.Marshal(rep)
}
c.node.logger.log(newLogEntry(LogLevelTrace, "-out->", map[string]any{"client": c.ID(), "user": user, "reply": string(jsonBytes)}))
}

func (c *Client) traceOutPush(push *protocol.Push) {
c.mu.RLock()
user := c.user
c.mu.RUnlock()
jsonBytes, _ := json.Marshal(push)
c.node.logger.log(newLogEntry(LogLevelTrace, "-->", map[string]any{"client": c.ID(), "user": user, "push": string(jsonBytes)}))
jsonBytes, err := json.Marshal(push)
if err != nil {
jsonBytes, _ = protojson.Marshal(push)
}
c.node.logger.log(newLogEntry(LogLevelTrace, "-out->", map[string]any{"client": c.ID(), "user": user, "push": string(jsonBytes)}))
}

// Lock must be held outside.
Expand Down Expand Up @@ -3088,8 +3098,8 @@ func (c *Client) makeRecoveredPubsDeltaFossil(recoveredPubs []*protocol.Publicat
Tags: pub.Tags,
Delta: delta,
}
prevPub = recoveredPubs[i+1]
recoveredPubs[i+1] = deltaPub
prevPub = recoveredPubs[i]
}
}
return recoveredPubs
Expand Down
55 changes: 55 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/centrifugal/protocol"
fdelta "github.com/shadowspore/fossil-delta"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -1330,15 +1331,69 @@ func newTestClientCustomTransport(t testing.TB, ctx context.Context, node *Node,
}

func newTestClientV2(t testing.TB, node *Node, userID string) *Client {
return newTestClientV2Protocol(t, node, userID, ProtocolTypeJSON)
}

func newTestClientV2Protocol(t testing.TB, node *Node, userID string, protocol ProtocolType) *Client {
ctx, cancelFn := context.WithCancel(context.Background())
transport := newTestTransport(cancelFn)
transport.setProtocolVersion(ProtocolVersion2)
transport.setProtocolType(protocol)
newCtx := SetCredentials(ctx, &Credentials{UserID: userID})
client, err := newClient(newCtx, node, transport)
require.NoError(t, err)
return client
}

func TestFossilRecoveredPubs(t *testing.T) {
t.Parallel()
node := defaultNodeNoHandlers()
defer func() { _ = node.Shutdown(context.Background()) }()
client := newTestClientV2Protocol(t, node, "42", ProtocolTypeProtobuf)

pubs := client.makeRecoveredPubsDeltaFossil([]*protocol.Publication{})
require.Len(t, pubs, 0)

pubs = client.makeRecoveredPubsDeltaFossil([]*protocol.Publication{
{
Offset: 1,
Data: []byte("test"),
},
})
require.Len(t, pubs, 1)
require.False(t, pubs[0].Delta)

pubs = client.makeRecoveredPubsDeltaFossil([]*protocol.Publication{
{
Offset: 1,
Data: []byte("This is a message to test Fossil: I just subscribed to channel"),
},
{
Offset: 2,
Data: []byte("This is a message to test Fossil: Hi from Java"),
},
{
Offset: 3,
Data: []byte("This is a message to test Fossil: I just subscribed to channel"),
},
})
require.Len(t, pubs, 3)
require.False(t, pubs[0].Delta)
require.True(t, pubs[1].Delta)
require.True(t, pubs[2].Delta)

data := pubs[0].Data
require.Equal(t, []byte("This is a message to test Fossil: I just subscribed to channel"), []byte(data))

data1, err := fdelta.Apply(data, pubs[1].Data)
require.NoError(t, err)
require.Equal(t, []byte("This is a message to test Fossil: Hi from Java"), data1)

data2, err := fdelta.Apply(data1, pubs[2].Data)
require.NoError(t, err)
require.Equal(t, []byte("This is a message to test Fossil: I just subscribed to channel"), data2)
}

func TestClientUnsubscribeClientSide(t *testing.T) {
t.Parallel()
node := defaultNodeNoHandlers()
Expand Down

0 comments on commit cb46d6b

Please sign in to comment.