Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements Command Watch Manager and Adds GET.WATCH Command Support #924

Merged
merged 15 commits into from
Oct 7, 2024
4 changes: 2 additions & 2 deletions integration_tests/commands/async/getex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ func TestGetEx(t *testing.T) {
Etime10 := strconv.FormatInt(time.Now().Unix()+10, 10)

testCases := []struct {
name string
commands []string
name string
commands []string
expected []interface{}
assertType []string
delay []time.Duration
Expand Down
10 changes: 5 additions & 5 deletions integration_tests/commands/async/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,8 +862,8 @@ func TestJsonNummultby(t *testing.T) {
invalidArgMessage := "ERR wrong number of arguments for 'json.nummultby' command"

testCases := []struct {
name string
commands []string
name string
commands []string
expected []interface{}
assertType []string
}{
Expand Down Expand Up @@ -1021,9 +1021,9 @@ func TestJSONNumIncrBy(t *testing.T) {
defer conn.Close()
invalidArgMessage := "ERR wrong number of arguments for 'json.numincrby' command"
testCases := []struct {
name string
setupData string
commands []string
name string
setupData string
commands []string
expected []interface{}
assertType []string
cleanUp []string
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/commands/async/qwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func setupQWATCHTest(t *testing.T) (net.Conn, []net.Conn, func()) {
subscribers := []net.Conn{getLocalConnection(), getLocalConnection(), getLocalConnection()}

cleanup := func() {
cleanupKeys(publisher)
cleanupQWATCHKeys(publisher)
if err := publisher.Close(); err != nil {
t.Errorf("Error closing publisher connection: %v", err)
}
Expand Down Expand Up @@ -342,7 +342,7 @@ func verifyJSONUpdates(t *testing.T, rp *clientio.RESPParser, tc JSONTestCase) {
}
}

func cleanupKeys(publisher net.Conn) {
func cleanupQWATCHKeys(publisher net.Conn) {
for _, tc := range qWatchTestCases {
FireCommand(publisher, fmt.Sprintf("DEL %s:%d", tc.key, tc.userID))
}
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/commands/async/set_data_cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func TestSetDataCommand(t *testing.T) {
defer conn.Close()

testCases := []struct {
name string
cmd []string
name string
cmd []string
expected []interface{}
assertType []string
delay []time.Duration
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/commands/async/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func RunTestServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
var err error
watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Server.WatchChanBufSize)
gec := make(chan error)
shardManager := shard.NewShardManager(1, watchChan, gec, opt.Logger)
shardManager := shard.NewShardManager(1, watchChan, nil, gec, opt.Logger)
// Initialize the AsyncServer
testServer := server.NewAsyncServer(shardManager, watchChan, opt.Logger)

Expand Down
4 changes: 2 additions & 2 deletions integration_tests/commands/async/touch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ func TestTouch(t *testing.T) {
defer conn.Close()

testCases := []struct {
name string
commands []string
name string
commands []string
expected []interface{}
assertType []string
delay []time.Duration
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/commands/http/getex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ func TestGetEx(t *testing.T) {
Etime10 := strconv.FormatInt(time.Now().Unix()+10, 10)

testCases := []struct {
name string
commands []HTTPCommand
name string
commands []HTTPCommand
expected []interface{}
assertType []string
delay []time.Duration
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/commands/http/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func RunHTTPServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption

globalErrChannel := make(chan error)
watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Server.WatchChanBufSize)
shardManager := shard.NewShardManager(1, watchChan, globalErrChannel, opt.Logger)
shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel, opt.Logger)
queryWatcherLocal := querymanager.NewQueryManager(opt.Logger)
config.HTTPPort = opt.Port
// Initialize the HTTPServer
Expand Down
80 changes: 80 additions & 0 deletions integration_tests/commands/resp/getwatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package resp

import (
"fmt"
"github.com/dicedb/dice/internal/clientio"
"gotest.tools/v3/assert"
"net"
"testing"
"time"
)

const getWatchKey = "getwatchkey"

type getWatchTestCase struct {
key string
val string
}

var getWatchTestCases = []getWatchTestCase{
{getWatchKey, "value1"},
{getWatchKey, "value2"},
{getWatchKey, "value3"},
{getWatchKey, "value4"},
}

func TestGETWATCH(t *testing.T) {
publisher := getLocalConnection()
subscribers := []net.Conn{getLocalConnection(), getLocalConnection(), getLocalConnection()}

defer func() {
if err := publisher.Close(); err != nil {
t.Errorf("Error closing publisher connection: %v", err)
}
for _, sub := range subscribers {
//FireCommand(sub, fmt.Sprintf("GET.UNWATCH %s", fingerprint))
time.Sleep(100 * time.Millisecond)
if err := sub.Close(); err != nil {
t.Errorf("Error closing subscriber connection: %v", err)
}
}
}()

// Fire a SET command to set a key
res := FireCommand(publisher, fmt.Sprintf("SET %s %s", getWatchKey, "value"))
assert.Equal(t, "OK", res)

respParsers := make([]*clientio.RESPParser, len(subscribers))
for i, subscriber := range subscribers {
rp := fireCommandAndGetRESPParser(subscriber, fmt.Sprintf("GET.WATCH %s", getWatchKey))
assert.Assert(t, rp != nil)
respParsers[i] = rp

v, err := rp.DecodeOne()
assert.NilError(t, err)
castedValue, ok := v.([]interface{})
if !ok {
t.Errorf("Type assertion to []interface{} failed for value: %v", v)
}
assert.Equal(t, 3, len(castedValue))
}

// Fire updates to the key using the publisher, then check if the subscribers receive the updates in the push-response form (i.e. array of three elements, with third element being the value)
for _, tc := range getWatchTestCases {
res := FireCommand(publisher, fmt.Sprintf("SET %s %s", tc.key, tc.val))
assert.Equal(t, "OK", res)

for _, rp := range respParsers {
v, err := rp.DecodeOne()
assert.NilError(t, err)
castedValue, ok := v.([]interface{})
if !ok {
t.Errorf("Type assertion to []interface{} failed for value: %v", v)
}
assert.Equal(t, 3, len(castedValue))
assert.Equal(t, "GET", castedValue[0])
assert.Equal(t, "1768826704", castedValue[1])
assert.Equal(t, tc.val, castedValue[2])
}
}
}
9 changes: 5 additions & 4 deletions integration_tests/commands/resp/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,13 @@ func RunTestServer(wg *sync.WaitGroup, opt TestServerOptions) {
config.DiceConfig.Server.Port = 9739
}

watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Server.KeysLimit)
queryWatchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Server.KeysLimit)
cmdWatchChan := make(chan dstore.CmdWatchEvent, config.DiceConfig.Server.KeysLimit)
gec := make(chan error)
shardManager := shard.NewShardManager(1, watchChan, gec, logr)
shardManager := shard.NewShardManager(1, queryWatchChan, cmdWatchChan, gec, logr)
workerManager := worker.NewWorkerManager(20000, shardManager)
// Initialize the REST Server
testServer := resp.NewServer(shardManager, workerManager, gec, logr)
// Initialize the RESP Server
testServer := resp.NewServer(shardManager, workerManager, cmdWatchChan, gec, logr)

ctx, cancel := context.WithCancel(context.Background())
fmt.Println("Starting the test server on port", config.DiceConfig.Server.Port)
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/commands/websocket/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO
// Initialize the WebsocketServer
globalErrChannel := make(chan error)
watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Server.WatchChanBufSize)
shardManager := shard.NewShardManager(1, watchChan, globalErrChannel, opt.Logger)
shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel, opt.Logger)
config.WebsocketPort = opt.Port
testServer := server.NewWebSocketServer(shardManager, watchChan, opt.Logger)

Expand Down
16 changes: 9 additions & 7 deletions internal/clientio/push_response.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package clientio

import (
"github.com/dicedb/dice/internal/sql"
const (
ResponseTypeRegular = iota
ResponseTypePush
)

// CreatePushResponse creates a push response. Push responses refer to messages that the server sends to clients without
// the client explicitly requesting them. These are typically seen in scenarios where the client has subscribed to some
// kind of event or data feed and is notified in real-time when changes occur
func CreatePushResponse(query *sql.DSQLQuery, result *[]sql.QueryResultRow) (response []interface{}) {
// kind of event or data feed and is notified in real-time when changes occur.
// `key` is the unique key that identifies the push response.
func CreatePushResponse[T any](cmd, key string, result T) (response []interface{}) {
response = make([]interface{}, 3)
response[0] = sql.Qwatch
response[1] = query.String()
response[2] = *result
response[0] = cmd
response[1] = key
response[2] = result
return
}
21 changes: 21 additions & 0 deletions internal/cmd/cmds.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package cmd

import (
"fmt"
"strings"

"github.com/dgryski/go-farm"
)

type DiceDBCmd struct {
RequestID uint32
Cmd string
Expand All @@ -10,3 +17,17 @@ type RedisCmds struct {
Cmds []*DiceDBCmd
RequestID uint32
}

// GetFingerprint returns a 32-bit fingerprint of the command and its arguments.
func (cmd *DiceDBCmd) GetFingerprint() uint32 {
return farm.Fingerprint32([]byte(fmt.Sprintf("%s-%s", cmd.Cmd, strings.Join(cmd.Args, " "))))
}

// GetKey Returns the key which the command operates on.
//
// TODO: This is a naive implementation which assumes that the first argument is the key.
// This is not true for all commands, however, for now this is only used by the watch manager,
// which as of now only supports a small subset of commands (all of which fit this implementation).
func (cmd *DiceDBCmd) GetKey() string {
return cmd.Args[0]
}
6 changes: 6 additions & 0 deletions internal/comm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import (
"github.com/dicedb/dice/internal/cmd"
)

type CmdWatchResponse struct {
ClientIdentifierID uint32
Result interface{}
Error error
}

type QwatchResponse struct {
ClientIdentifierID uint32
Result interface{}
Expand Down
4 changes: 2 additions & 2 deletions internal/eval/bloom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

func TestBloomFilter(t *testing.T) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)
// This test only contains some basic checks for all the bloom filter
// operations like BFINIT, BFADD, BFEXISTS. It assumes that the
// functions called in the main function are working correctly and
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestBloomFilter(t *testing.T) {
}

func TestGetOrCreateBloomFilter(t *testing.T) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)
// Create a key and default opts
key := "bf"
opts, _ := newBloomOpts([]string{}, true)
Expand Down
2 changes: 1 addition & 1 deletion internal/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -2128,7 +2128,7 @@ func EvalQWATCH(args []string, httpOp bool, client *comm.Client, store *dstore.S
}

// TODO: We should return the list of all queries being watched by the client.
return clientio.Encode(clientio.CreatePushResponse(&query, queryResult.Result), false)
return clientio.Encode(clientio.CreatePushResponse(sql.Qwatch, query.String(), *queryResult.Result), false)
}

// EvalQUNWATCH removes the specified key from the watch list for the caller client.
Expand Down
Loading