Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor - Move command execution responsibility to CommandHandler from IOThread #1358

Merged
merged 28 commits into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
16ab8bd
refactored to separate commandHandler from ioThread
psrvere Dec 4, 2024
0ffb45d
consolidated write to io channel in one place
psrvere Dec 4, 2024
d54d4bb
fixed linter warnings
psrvere Dec 4, 2024
9b86beb
don't stop command handler at command level error
psrvere Dec 5, 2024
690430d
fixed type assertion for command handler
psrvere Dec 5, 2024
3e15eca
added an error channel between io thread and command handler to signa…
psrvere Dec 5, 2024
07ca2b7
bug fix
psrvere Dec 5, 2024
c89b688
merged with master
psrvere Dec 16, 2024
4aab95c
fix build fail
psrvere Dec 16, 2024
05ec942
merged with master
psrvere Dec 17, 2024
cfaff73
merged with master
psrvere Dec 21, 2024
de67803
bug fix
psrvere Dec 22, 2024
3b20026
Merge branch 'master' into refactor/commandhandler
psrvere Dec 22, 2024
4d0e763
remove unnecessary field maxCmdHandlers, use MaxClients instead
psrvere Dec 22, 2024
82c402a
rename Command Handler Manager to Registry
psrvere Dec 22, 2024
0c2c2c0
added ok idiom in cmdHandler type casting
psrvere Dec 22, 2024
48c21e7
moved RespAuth to command handler file
psrvere Dec 22, 2024
8fdb6b1
changed field order in BaseCommandHandler to achieve minimum memomory…
psrvere Dec 22, 2024
901433a
used config value in placed of hard coded max clients value
psrvere Dec 22, 2024
d3b436b
changed all decompose command functions to pointers functions of Base…
psrvere Dec 22, 2024
b80bb42
changed stringNil constant to Nil in httpServer.go
psrvere Dec 22, 2024
40a3fef
don't show missing command hanlder function for global functionserror…
psrvere Dec 22, 2024
de57154
use uint32 for client connection fields in command handler and io thread
psrvere Dec 22, 2024
8076e44
removed unnecessary checks from command handler registration
psrvere Dec 22, 2024
183a1f9
registration for both io thread and command handler is done before st…
psrvere Dec 22, 2024
7c21a78
changed id format for io thread and command handler
psrvere Dec 22, 2024
593c283
Merge branch 'master' into refactor/commandhandler
psrvere Jan 4, 2025
6070585
fix pfmerge build errors
psrvere Jan 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions integration_tests/commands/resp/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"testing"
"time"

"github.com/dicedb/dice/internal/commandhandler"
"github.com/dicedb/dice/internal/iothread"
"github.com/dicedb/dice/internal/server/resp"
"github.com/dicedb/dice/internal/wal"
Expand Down Expand Up @@ -211,10 +212,12 @@ func RunTestServer(wg *sync.WaitGroup, opt TestServerOptions) {
cmdWatchSubscriptionChan := make(chan watchmanager.WatchSubscription)
gec := make(chan error)
shardManager := shard.NewShardManager(1, cmdWatchChan, gec)
ioThreadManager := iothread.NewManager(20000, shardManager)
ioThreadManager := iothread.NewManager(20000)
psrvere marked this conversation as resolved.
Show resolved Hide resolved
cmdHandlerManager := commandhandler.NewRegistry(20000, shardManager)

// Initialize the RESP Server
wl, _ := wal.NewNullWAL()
testServer := resp.NewServer(shardManager, ioThreadManager, cmdWatchSubscriptionChan, cmdWatchChan, gec, wl)
testServer := resp.NewServer(shardManager, ioThreadManager, cmdHandlerManager, cmdWatchSubscriptionChan, cmdWatchChan, gec, wl)

ctx, cancel := context.WithCancel(context.Background())
fmt.Println("Starting the test server on port", config.DiceConfig.RespServer.Port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package iothread
package commandhandler

import (
"math"
Expand All @@ -24,7 +24,7 @@ import (
"github.com/dicedb/dice/internal/ops"
)

// This file contains functions used by the IOThread to handle and process responses
// This file contains functions used by the CommandHandler to handle and process responses
// from multiple shards during distributed operations. For commands that are executed
// across several shards, such as MultiShard commands, dedicated functions are responsible
// for aggregating and managing the results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package iothread
package commandhandler

import (
"fmt"
Expand All @@ -28,7 +28,7 @@ import (

// RespAuth returns with an encoded "OK" if the user is authenticated
// If the user is not authenticated, it returns with an encoded error message
func (t *BaseIOThread) RespAuth(args []string) interface{} {
func (h *BaseCommandHandler) RespAuth(args []string) interface{} {
psrvere marked this conversation as resolved.
Show resolved Hide resolved
// Check for incorrect number of arguments (arity error).
if len(args) < 1 || len(args) > 2 {
return diceerrors.ErrWrongArgumentCount("AUTH")
Expand All @@ -47,7 +47,7 @@ func (t *BaseIOThread) RespAuth(args []string) interface{} {
username, password = args[0], args[1]
}

if err := t.Session.Validate(username, password); err != nil {
if err := h.Session.Validate(username, password); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package iothread
package commandhandler

import (
"context"
Expand All @@ -28,7 +28,7 @@ import (
"github.com/dicedb/dice/internal/store"
)

// This file is utilized by the IOThread to decompose commands that need to be executed
// This file is utilized by the CommandHandler to decompose commands that need to be executed
// across multiple shards. For commands that operate on multiple keys or necessitate
// distribution across shards (e.g., MultiShard commands), a Breakup function is invoked
// to transform the original command into multiple smaller commands, each directed at
Expand All @@ -41,13 +41,13 @@ import (
// decomposeRename breaks down the RENAME command into separate DELETE and SET commands.
// It first waits for the result of a GET command from shards. If successful, it removes
// the old key using a DEL command and sets the new key with the retrieved value using a SET command.
func decomposeRename(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeRename(ctx context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
// Waiting for GET command response
var val string
select {
case <-ctx.Done():
slog.Error("IOThread timed out waiting for response from shards", slog.String("id", thread.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-thread.preprocessingChan:
slog.Error("CommandHandler timed out waiting for response from shards", slog.String("id", h.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-h.preprocessingChan:
if ok {
evalResp := preProcessedResp.EvalResponse
if evalResp.Error != nil {
Expand Down Expand Up @@ -85,13 +85,13 @@ func decomposeRename(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCm
// decomposeCopy breaks down the COPY command into a SET command that copies a value from
// one key to another. It first retrieves the value of the original key from shards, then
// sets the value to the destination key using a SET command.
func decomposeCopy(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeCopy(ctx context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
psrvere marked this conversation as resolved.
Show resolved Hide resolved
// Waiting for GET command response
var resp *ops.StoreResponse
select {
case <-ctx.Done():
slog.Error("IOThread timed out waiting for response from shards", slog.String("id", thread.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-thread.preprocessingChan:
slog.Error("CommandHandler timed out waiting for response from shards", slog.String("id", h.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-h.preprocessingChan:
if ok {
resp = preProcessedResp
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func decomposeCopy(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd)
// decomposeMSet decomposes the MSET (Multi-set) command into individual SET commands.
// It expects an even number of arguments (key-value pairs). For each pair, it creates
// a separate SET command to store the value at the given key.
func decomposeMSet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeMSet(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args)%2 != 0 {
return nil, diceerrors.ErrWrongArgumentCount("MSET")
}
Expand All @@ -148,7 +148,7 @@ func decomposeMSet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cm
// decomposeMGet decomposes the MGET (Multi-get) command into individual GET commands.
// It expects a list of keys, and for each key, it creates a separate GET command to
// retrieve the value associated with that key.
func decomposeMGet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeMGet(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 1 {
return nil, diceerrors.ErrWrongArgumentCount("MGET")
}
Expand All @@ -164,7 +164,7 @@ func decomposeMGet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cm
return decomposedCmds, nil
}

func decomposeSInter(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeSInter(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 1 {
return nil, diceerrors.ErrWrongArgumentCount("SINTER")
}
Expand All @@ -180,7 +180,7 @@ func decomposeSInter(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*
return decomposedCmds, nil
}

func decomposeSDiff(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeSDiff(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 1 {
return nil, diceerrors.ErrWrongArgumentCount("SDIFF")
}
Expand All @@ -196,7 +196,7 @@ func decomposeSDiff(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*c
return decomposedCmds, nil
}

func decomposeJSONMget(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeJSONMget(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 2 {
return nil, diceerrors.ErrWrongArgumentCount("JSON.MGET")
}
Expand All @@ -215,7 +215,7 @@ func decomposeJSONMget(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([
return decomposedCmds, nil
}

func decomposeTouch(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeTouch(_ context.Context, _ *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) == 0 {
return nil, diceerrors.ErrWrongArgumentCount("TOUCH")
}
Expand All @@ -232,13 +232,13 @@ func decomposeTouch(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*c
return decomposedCmds, nil
}

func decomposeDBSize(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeDBSize(_ context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) > 0 {
return nil, diceerrors.ErrWrongArgumentCount("DBSIZE")
}

decomposedCmds := make([]*cmd.DiceDBCmd, 0, len(cd.Args))
for i := uint8(0); i < uint8(thread.shardManager.GetShardCount()); i++ {
for i := uint8(0); i < uint8(h.shardManager.GetShardCount()); i++ {
decomposedCmds = append(decomposedCmds,
&cmd.DiceDBCmd{
Cmd: store.SingleShardSize,
Expand All @@ -249,13 +249,13 @@ func decomposeDBSize(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd)
return decomposedCmds, nil
}

func decomposeKeys(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeKeys(_ context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) != 1 {
return nil, diceerrors.ErrWrongArgumentCount("KEYS")
}

decomposedCmds := make([]*cmd.DiceDBCmd, 0, len(cd.Args))
for i := uint8(0); i < uint8(thread.shardManager.GetShardCount()); i++ {
for i := uint8(0); i < uint8(h.shardManager.GetShardCount()); i++ {
decomposedCmds = append(decomposedCmds,
&cmd.DiceDBCmd{
Cmd: store.SingleShardKeys,
Expand All @@ -266,13 +266,13 @@ func decomposeKeys(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) (
return decomposedCmds, nil
}

func decomposeFlushDB(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func decomposeFlushDB(_ context.Context, h *BaseCommandHandler, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) > 1 {
return nil, diceerrors.ErrWrongArgumentCount("FLUSHDB")
}

decomposedCmds := make([]*cmd.DiceDBCmd, 0, len(cd.Args))
for i := uint8(0); i < uint8(thread.shardManager.GetShardCount()); i++ {
for i := uint8(0); i < uint8(h.shardManager.GetShardCount()); i++ {
decomposedCmds = append(decomposedCmds,
&cmd.DiceDBCmd{
Cmd: store.FlushDB,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package iothread
package commandhandler

import (
"context"
Expand Down Expand Up @@ -214,12 +214,12 @@ const (

type CmdMeta struct {
CmdType
Cmd string
IOThreadHandler func([]string) []byte
Cmd string
CmdHandlerFunction func([]string) []byte

// decomposeCommand is a function that takes a DiceDB command and breaks it down into smaller,
// manageable DiceDB commands for each shard processing. It returns a slice of DiceDB commands.
decomposeCommand func(ctx context.Context, thread *BaseIOThread, DiceDBCmd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error)
decomposeCommand func(ctx context.Context, h *BaseCommandHandler, DiceDBCmd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error)
psrvere marked this conversation as resolved.
Show resolved Hide resolved

// composeResponse is a function that combines multiple responses from the execution of commands
// into a single response object. It accepts a variadic parameter of EvalResponse objects
Expand All @@ -234,10 +234,10 @@ type CmdMeta struct {

// preProcessResponse is a function that handles the preprocessing of a DiceDB command by
// preparing the necessary operations (e.g., fetching values from shards) before the command
// is executed. It takes the io-thread and the original DiceDB command as parameters and
// is executed. It takes the CommandHandler and the original DiceDB command as parameters and
// ensures that any required information is retrieved and processed in advance. Use this when set
// preProcessingReq = true.
preProcessResponse func(thread *BaseIOThread, DiceDBCmd *cmd.DiceDBCmd) error
preProcessResponse func(h *BaseCommandHandler, DiceDBCmd *cmd.DiceDBCmd) error
}

var CommandsMeta = map[string]CmdMeta{
Expand Down Expand Up @@ -695,8 +695,8 @@ func init() {
func validateCmdMeta(c string, meta CmdMeta) error {
switch meta.CmdType {
case Global:
if meta.IOThreadHandler == nil {
return fmt.Errorf("global command %s must have IOThreadHandler function", c)
if meta.CmdHandlerFunction == nil {
return fmt.Errorf("global command %s must have CmdHandlerFunction function", c)
psrvere marked this conversation as resolved.
Show resolved Hide resolved
}
case MultiShard, AllShard:
if meta.decomposeCommand == nil || meta.composeResponse == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package iothread
package commandhandler

import (
"github.com/dicedb/dice/internal/cmd"
Expand All @@ -25,13 +25,13 @@ import (
// preProcessRename prepares the RENAME command for preprocessing by sending a GET command
// to retrieve the value of the original key. The retrieved value is used later in the
// decomposeRename function to delete the old key and set the new key.
func preProcessRename(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
func preProcessRename(h *BaseCommandHandler, diceDBCmd *cmd.DiceDBCmd) error {
if len(diceDBCmd.Args) < 2 {
return diceerrors.ErrWrongArgumentCount("RENAME")
}

key := diceDBCmd.Args[0]
sid, rc := thread.shardManager.GetShardInfo(key)
sid, rc := h.shardManager.GetShardInfo(key)

preCmd := cmd.DiceDBCmd{
Cmd: "RENAME",
Expand All @@ -42,7 +42,7 @@ func preProcessRename(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
SeqID: 0,
RequestID: GenerateUniqueRequestID(),
Cmd: &preCmd,
IOThreadID: thread.id,
CmdHandlerID: h.id,
ShardID: sid,
Client: nil,
PreProcessing: true,
Expand All @@ -54,12 +54,12 @@ func preProcessRename(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
// preProcessCopy prepares the COPY command for preprocessing by sending a GET command
// to retrieve the value of the original key. The retrieved value is used later in the
// decomposeCopy function to copy the value to the destination key.
func customProcessCopy(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
func customProcessCopy(h *BaseCommandHandler, diceDBCmd *cmd.DiceDBCmd) error {
psrvere marked this conversation as resolved.
Show resolved Hide resolved
if len(diceDBCmd.Args) < 2 {
return diceerrors.ErrWrongArgumentCount("COPY")
}

sid, rc := thread.shardManager.GetShardInfo(diceDBCmd.Args[0])
sid, rc := h.shardManager.GetShardInfo(diceDBCmd.Args[0])

preCmd := cmd.DiceDBCmd{
Cmd: "COPY",
Expand All @@ -71,7 +71,7 @@ func customProcessCopy(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
SeqID: 0,
RequestID: GenerateUniqueRequestID(),
Cmd: &preCmd,
IOThreadID: thread.id,
CmdHandlerID: h.id,
ShardID: sid,
Client: nil,
PreProcessing: true,
Expand Down
Loading
Loading