Skip to content

Commit

Permalink
linter
Browse files Browse the repository at this point in the history
  • Loading branch information
JyotinderSingh committed Oct 6, 2024
1 parent 9efd40b commit 81d7240
Show file tree
Hide file tree
Showing 13 changed files with 64 additions and 66 deletions.
2 changes: 1 addition & 1 deletion integration_tests/commands/async/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func RunTestServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
var err error
watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Server.KeysLimit)
gec := make(chan error)
shardManager := shard.NewShardManager(1, watchChan, gec, opt.Logger)
shardManager := shard.NewShardManager(1, watchChan, nil, gec, opt.Logger)
// Initialize the AsyncServer
testServer := server.NewAsyncServer(shardManager, watchChan, opt.Logger)

Expand Down
2 changes: 1 addition & 1 deletion integration_tests/commands/http/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func RunHTTPServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption

globalErrChannel := make(chan error)
watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Server.KeysLimit)
shardManager := shard.NewShardManager(1, watchChan, globalErrChannel, opt.Logger)
shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel, opt.Logger)
queryWatcherLocal := querymanager.NewQueryManager(opt.Logger)
config.HTTPPort = opt.Port
// Initialize the HTTPServer
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/commands/websocket/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO
// Initialize the WebsocketServer
globalErrChannel := make(chan error)
watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Server.KeysLimit)
shardManager := shard.NewShardManager(1, watchChan, globalErrChannel, opt.Logger)
shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel, opt.Logger)
config.WebsocketPort = opt.Port
testServer := server.NewWebSocketServer(shardManager, watchChan, opt.Logger)

Expand Down
3 changes: 2 additions & 1 deletion internal/cmd/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package cmd

import (
"fmt"
"github.com/dgryski/go-farm"
"strings"

"github.com/dgryski/go-farm"
)

type DiceDBCmd struct {
Expand Down
4 changes: 2 additions & 2 deletions internal/eval/bloom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

func TestBloomFilter(t *testing.T) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)
// This test only contains some basic checks for all the bloom filter
// operations like BFINIT, BFADD, BFEXISTS. It assumes that the
// functions called in the main function are working correctly and
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestBloomFilter(t *testing.T) {
}

func TestGetOrCreateBloomFilter(t *testing.T) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)
// Create a key and default opts
key := "bf"
opts, _ := newBloomOpts([]string{}, true)
Expand Down
10 changes: 0 additions & 10 deletions internal/eval/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -1051,16 +1051,6 @@ var (
Arity: -4,
KeySpecs: KeySpecs{BeginIndex: 1},
}
getWatchCmdMeta = DiceCmdMeta{
Name: "GET.WATCH",
Info: `GET.WATCH key
Returns the value of the key and starts watching for changes in the key's value. Note that some update
deliveries may be missed in case of high write rate on the given key. However, the values being delivered will
always be monotonically consistent.`,
Arity: 2,
KeySpecs: KeySpecs{BeginIndex: 1},
CmdEquivalent: "GET",
}
geoAddCmdMeta = DiceCmdMeta{
Name: "GEOADD",
Info: `Adds one or more members to a geospatial index. The key is created if it doesn't exist.`,
Expand Down
36 changes: 18 additions & 18 deletions internal/eval/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func setupTest(store *dstore.Store) *dstore.Store {
}

func TestEval(t *testing.T) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)

testEvalMSET(t, store)
testEvalECHO(t, store)
Expand Down Expand Up @@ -1108,7 +1108,7 @@ func testEvalJSONOBJLEN(t *testing.T, store *dstore.Store) {

func BenchmarkEvalJSONOBJLEN(b *testing.B) {
sizes := []int{0, 10, 100, 1000, 10000, 100000} // Various sizes of JSON objects
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)

for _, size := range sizes {
b.Run(fmt.Sprintf("JSONObjectSize_%d", size), func(b *testing.B) {
Expand Down Expand Up @@ -2747,13 +2747,13 @@ func runEvalTests(t *testing.T, tests map[string]evalTestCase, evalFunc func([]s
func BenchmarkEvalMSET(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)
evalMSET([]string{"KEY", "VAL", "KEY2", "VAL2"}, store)
}
}

func BenchmarkEvalHSET(b *testing.B) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)
for i := 0; i < b.N; i++ {
evalHSET([]string{"KEY", fmt.Sprintf("FIELD_%d", i), fmt.Sprintf("VALUE_%d", i)}, store)
}
Expand Down Expand Up @@ -2888,7 +2888,7 @@ func testEvalHKEYS(t *testing.T, store *dstore.Store) {
}

func BenchmarkEvalHKEYS(b *testing.B) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)

for i := 0; i < b.N; i++ {
evalHSET([]string{"KEY", fmt.Sprintf("FIELD_%d", i), fmt.Sprintf("VALUE_%d", i)}, store)
Expand All @@ -2899,7 +2899,7 @@ func BenchmarkEvalHKEYS(b *testing.B) {
}
}
func BenchmarkEvalPFCOUNT(b *testing.B) {
store := *dstore.NewStore(nil)
store := *dstore.NewStore(nil, nil)

// Helper function to create and insert HLL objects
createAndInsertHLL := func(key string, items []string) {
Expand Down Expand Up @@ -3247,7 +3247,7 @@ func testEvalHLEN(t *testing.T, store *dstore.Store) {

func BenchmarkEvalHLEN(b *testing.B) {
sizes := []int{0, 10, 100, 1000, 10000, 100000}
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)

for _, size := range sizes {
b.Run(fmt.Sprintf("HashSize_%d", size), func(b *testing.B) {
Expand Down Expand Up @@ -3489,7 +3489,7 @@ func testEvalTYPE(t *testing.T, store *dstore.Store) {
}

func BenchmarkEvalTYPE(b *testing.B) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)

// Define different types of objects to benchmark
objectTypes := map[string]func(){
Expand Down Expand Up @@ -3680,7 +3680,7 @@ func testEvalJSONOBJKEYS(t *testing.T, store *dstore.Store) {

func BenchmarkEvalJSONOBJKEYS(b *testing.B) {
sizes := []int{0, 10, 100, 1000, 10000, 100000} // Various sizes of JSON objects
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)

for _, size := range sizes {
b.Run(fmt.Sprintf("JSONObjectSize_%d", size), func(b *testing.B) {
Expand Down Expand Up @@ -3848,7 +3848,7 @@ func testEvalGETRANGE(t *testing.T, store *dstore.Store) {
}

func BenchmarkEvalGETRANGE(b *testing.B) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)
store.Put("BENCHMARK_KEY", store.NewObj("Hello World", maxExDuration, object.ObjTypeString, object.ObjEncodingRaw))

inputs := []struct {
Expand All @@ -3873,7 +3873,7 @@ func BenchmarkEvalGETRANGE(b *testing.B) {
}

func BenchmarkEvalHSETNX(b *testing.B) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)
for i := 0; i < b.N; i++ {
evalHSETNX([]string{"KEY", fmt.Sprintf("FIELD_%d", i/2), fmt.Sprintf("VALUE_%d", i)}, store)
}
Expand Down Expand Up @@ -3935,15 +3935,15 @@ func testEvalHSETNX(t *testing.T, store *dstore.Store) {
}

func TestMSETConsistency(t *testing.T) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)
evalMSET([]string{"KEY", "VAL", "KEY2", "VAL2"}, store)

assert.Equal(t, "VAL", store.Get("KEY").Value)
assert.Equal(t, "VAL2", store.Get("KEY2").Value)
}

func BenchmarkEvalHINCRBY(b *testing.B) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)

// creating new fields
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -4193,7 +4193,7 @@ func testEvalSETEX(t *testing.T, store *dstore.Store) {
}

func BenchmarkEvalSETEX(b *testing.B) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -4368,7 +4368,7 @@ func testEvalINCRBYFLOAT(t *testing.T, store *dstore.Store) {
}

func BenchmarkEvalINCRBYFLOAT(b *testing.B) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)
store.Put("key1", store.NewObj("1", maxExDuration, object.ObjTypeString, object.ObjEncodingEmbStr))
store.Put("key2", store.NewObj("1.2", maxExDuration, object.ObjTypeString, object.ObjEncodingEmbStr))

Expand Down Expand Up @@ -4483,7 +4483,7 @@ func testEvalBITOP(t *testing.T, store *dstore.Store) {
}

func BenchmarkEvalBITOP(b *testing.B) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)

// Setup initial data for benchmarking
store.Put("key1", store.NewObj(&ByteArray{data: []byte{0x01, 0x02, 0xff}}, maxExDuration, object.ObjTypeByteArray, object.ObjEncodingByteArray))
Expand Down Expand Up @@ -4777,7 +4777,7 @@ func testEvalAPPEND(t *testing.T, store *dstore.Store) {
}

func BenchmarkEvalAPPEND(b *testing.B) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)
for i := 0; i < b.N; i++ {
evalAPPEND([]string{"key", fmt.Sprintf("val_%d", i)}, store)
}
Expand Down Expand Up @@ -5250,7 +5250,7 @@ func testEvalHINCRBYFLOAT(t *testing.T, store *dstore.Store) {
}

func BenchmarkEvalHINCRBYFLOAT(b *testing.B) {
store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)

// Setting initial fields with some values
store.Put("key1", store.NewObj(HashMap{"field1": "1.0", "field2": "1.2"}, maxExDuration, object.ObjTypeHashMap, object.ObjEncodingHashMap))
Expand Down
2 changes: 1 addition & 1 deletion internal/eval/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestMain(m *testing.M) {
l := logger.New(logger.Opts{WithTimestamp: false})
slog.SetDefault(l)

store := dstore.NewStore(nil)
store := dstore.NewStore(nil, nil)
store.ResetStore()

exitCode := m.Run()
Expand Down
5 changes: 3 additions & 2 deletions internal/server/resp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import (
"context"
"errors"
"fmt"
dstore "github.com/dicedb/dice/internal/store"
"github.com/dicedb/dice/internal/watchmanager"
"log/slog"
"net"
"sync"
"sync/atomic"
"syscall"
"time"

dstore "github.com/dicedb/dice/internal/store"
"github.com/dicedb/dice/internal/watchmanager"

"github.com/dicedb/dice/config"
"github.com/dicedb/dice/internal/clientio/iohandler/netconn"
respparser "github.com/dicedb/dice/internal/clientio/requestparser/resp"
Expand Down
2 changes: 1 addition & 1 deletion internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (store *Store) notifyQueryManager(k, operation string, obj object.Obj) {
store.queryWatchChan <- QueryWatchEvent{k, operation, obj}
}

func (store *Store) notifyWatchManager(cmd string, affectedKey string) {
func (store *Store) notifyWatchManager(cmd, affectedKey string) {
store.cmdWatchChan <- CmdWatchEvent{cmd, affectedKey}
}

Expand Down
35 changes: 18 additions & 17 deletions internal/watchmanager/watch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,25 @@ package watchmanager

import (
"context"
"github.com/dicedb/dice/internal/cmd"
dstore "github.com/dicedb/dice/internal/store"
"log/slog"
"sync"

"github.com/dicedb/dice/internal/cmd"
dstore "github.com/dicedb/dice/internal/store"
)

type (
WatchSubscription struct {
Subscribe bool // Subscribe is true for subscribe, false for unsubscribe. Required.
AdhocReqChan chan *cmd.RedisCmd // AdhocReqChan is the channel to send adhoc requests to the worker. Required.
WatchCmd *cmd.RedisCmd // WatchCmd Represents a unique key for each watch artifact, only populated for subscriptions.
Fingerprint uint32 // Fingerprint is a unique identifier for each watch artifact, only populated for unsubscriptions.
Subscribe bool // Subscribe is true for subscribe, false for unsubscribe. Required.
AdhocReqChan chan *cmd.DiceDBCmd // AdhocReqChan is the channel to send adhoc requests to the worker. Required.
WatchCmd *cmd.DiceDBCmd // WatchCmd Represents a unique key for each watch artifact, only populated for subscriptions.
Fingerprint uint32 // Fingerprint is a unique identifier for each watch artifact, only populated for unsubscriptions.
}

Manager struct {
querySubscriptionMap map[string]map[uint32]struct{} // querySubscriptionMap is a map of Key -> [fingerprint1, fingerprint2, ...]
tcpSubscriptionMap map[uint32]map[chan *cmd.RedisCmd]struct{} // tcpSubscriptionMap is a map of fingerprint -> [client1Chan, client2Chan, ...]
fingerprintCmdMap map[uint32]*cmd.RedisCmd // fingerprintCmdMap is a map of fingerprint -> RedisCmd
querySubscriptionMap map[string]map[uint32]struct{} // querySubscriptionMap is a map of Key -> [fingerprint1, fingerprint2, ...]
tcpSubscriptionMap map[uint32]map[chan *cmd.DiceDBCmd]struct{} // tcpSubscriptionMap is a map of fingerprint -> [client1Chan, client2Chan, ...]
fingerprintCmdMap map[uint32]*cmd.DiceDBCmd // fingerprintCmdMap is a map of fingerprint -> DiceDBCmd
logger *slog.Logger
}
)
Expand All @@ -36,8 +37,8 @@ func NewManager(logger *slog.Logger) *Manager {
CmdWatchSubscriptionChan = make(chan WatchSubscription)
return &Manager{
querySubscriptionMap: make(map[string]map[uint32]struct{}),
tcpSubscriptionMap: make(map[uint32]map[chan *cmd.RedisCmd]struct{}),
fingerprintCmdMap: make(map[uint32]*cmd.RedisCmd),
tcpSubscriptionMap: make(map[uint32]map[chan *cmd.DiceDBCmd]struct{}),
fingerprintCmdMap: make(map[uint32]*cmd.DiceDBCmd),
logger: logger,
}
}
Expand Down Expand Up @@ -85,12 +86,12 @@ func (m *Manager) handleSubscription(sub WatchSubscription) {
}
m.querySubscriptionMap[key][fingerprint] = struct{}{}

// Add RedisCmd to fingerprintCmdMap
// Add DiceDBCmd to fingerprintCmdMap
m.fingerprintCmdMap[fingerprint] = sub.WatchCmd

// Add client channel to tcpSubscriptionMap
if _, exists := m.tcpSubscriptionMap[fingerprint]; !exists {
m.tcpSubscriptionMap[fingerprint] = make(map[chan *cmd.RedisCmd]struct{})
m.tcpSubscriptionMap[fingerprint] = make(map[chan *cmd.DiceDBCmd]struct{})
}
m.tcpSubscriptionMap[fingerprint][sub.AdhocReqChan] = struct{}{}
}
Expand All @@ -116,8 +117,8 @@ func (m *Manager) handleUnsubscription(sub WatchSubscription) {
}

// Remove fingerprint from querySubscriptionMap
if redisCmd, ok := m.fingerprintCmdMap[fingerprint]; ok {
key := redisCmd.GetKey()
if diceDBCmd, ok := m.fingerprintCmdMap[fingerprint]; ok {
key := diceDBCmd.GetKey()
if fingerprints, ok := m.querySubscriptionMap[key]; ok {
// Remove the fingerprint from the list of fingerprints listening to this key
delete(fingerprints, fingerprint)
Expand Down Expand Up @@ -161,7 +162,7 @@ func (m *Manager) handleWatchEvent(event dstore.CmdWatchEvent) {
}

// notifyClients sends cmd to all clients listening to this fingerprint, so that they can execute it.
func (m *Manager) notifyClients(fingerprint uint32, cmd *cmd.RedisCmd) {
func (m *Manager) notifyClients(fingerprint uint32, diceDBCmd *cmd.DiceDBCmd) {
clients, exists := m.tcpSubscriptionMap[fingerprint]
if !exists {
m.logger.Warn("No clients found for fingerprint",
Expand All @@ -170,6 +171,6 @@ func (m *Manager) notifyClients(fingerprint uint32, cmd *cmd.RedisCmd) {
}

for clientChan := range clients {
clientChan <- cmd
clientChan <- diceDBCmd
}
}
Loading

0 comments on commit 81d7240

Please sign in to comment.