Skip to content

Commit

Permalink
Namespace more stuff by workspaceName (#46)
Browse files Browse the repository at this point in the history
* namespace more stuff by workspace name

* namespace everything by workspace

---------

Co-authored-by: Luke Lombardi <[email protected]>
  • Loading branch information
luke-lombardi and Luke Lombardi authored Jan 11, 2024
1 parent 9daeb6d commit f73da7a
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 160 deletions.
28 changes: 16 additions & 12 deletions internal/abstractions/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (fs *RunCFunctionService) FunctionInvoke(in *pb.FunctionInvokeRequest, stre
}

go client.StreamLogs(ctx, task.ContainerId, outputChan)
return fs.handleStreams(ctx, stream, task.ExternalId, task.ContainerId, outputChan, keyEventChan)
return fs.handleStreams(ctx, stream, authInfo.Workspace.Name, task.ExternalId, task.ContainerId, outputChan, keyEventChan)
}

func (fs *RunCFunctionService) invoke(ctx context.Context, authInfo *auth.AuthInfo, stubId string, args []byte, keyEventChan chan common.KeyEvent) (*types.Task, error) {
Expand Down Expand Up @@ -124,7 +124,7 @@ func (fs *RunCFunctionService) invoke(ctx context.Context, authInfo *auth.AuthIn
return nil, err
}

err = fs.rdb.Set(ctx, Keys.FunctionArgs(taskId), args, functionArgsExpirationTimeout).Err()
err = fs.rdb.Set(ctx, Keys.FunctionArgs(authInfo.Workspace.Name, taskId), args, functionArgsExpirationTimeout).Err()
if err != nil {
return nil, errors.New("unable to store function args")
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func (fs *RunCFunctionService) createTask(ctx context.Context, authInfo *auth.Au

func (fs *RunCFunctionService) handleStreams(ctx context.Context,
stream pb.FunctionService_FunctionInvokeServer,
taskId string, containerId string,
workspaceName, taskId, containerId string,
outputChan chan common.OutputMsg, keyEventChan chan common.KeyEvent) error {

var lastMessage common.OutputMsg
Expand All @@ -207,7 +207,7 @@ _stream:
break _stream
}
case <-keyEventChan:
result, _ := fs.rdb.Get(stream.Context(), Keys.FunctionResult(taskId)).Bytes()
result, _ := fs.rdb.Get(stream.Context(), Keys.FunctionResult(workspaceName, taskId)).Bytes()
if err := stream.Send(&pb.FunctionInvokeResponse{TaskId: taskId, Done: true, Result: result, ExitCode: 0}); err != nil {
break
}
Expand All @@ -224,7 +224,9 @@ _stream:
}

func (fs *RunCFunctionService) FunctionGetArgs(ctx context.Context, in *pb.FunctionGetArgsRequest) (*pb.FunctionGetArgsResponse, error) {
value, err := fs.rdb.Get(ctx, Keys.FunctionArgs(in.TaskId)).Bytes()
authInfo, _ := auth.AuthInfoFromContext(ctx)

value, err := fs.rdb.Get(ctx, Keys.FunctionArgs(authInfo.Workspace.Name, in.TaskId)).Bytes()
if err != nil {
return &pb.FunctionGetArgsResponse{Ok: false, Args: nil}, nil
}
Expand All @@ -236,7 +238,9 @@ func (fs *RunCFunctionService) FunctionGetArgs(ctx context.Context, in *pb.Funct
}

func (fs *RunCFunctionService) FunctionSetResult(ctx context.Context, in *pb.FunctionSetResultRequest) (*pb.FunctionSetResultResponse, error) {
err := fs.rdb.Set(ctx, Keys.FunctionResult(in.TaskId), in.Result, functionResultExpirationTimeout).Err()
authInfo, _ := auth.AuthInfoFromContext(ctx)

err := fs.rdb.Set(ctx, Keys.FunctionResult(authInfo.Workspace.Name, in.TaskId), in.Result, functionResultExpirationTimeout).Err()
if err != nil {
return &pb.FunctionSetResultResponse{Ok: false}, nil
}
Expand All @@ -253,8 +257,8 @@ func (fs *RunCFunctionService) genContainerId(taskId string) string {
// Redis keys
var (
functionPrefix string = "function"
functionArgs string = "function:%s:args"
functionResult string = "function:%s:result"
functionArgs string = "function:%s:%s:args"
functionResult string = "function:%s:%s:result"
)

var Keys = &keys{}
Expand All @@ -265,10 +269,10 @@ func (k *keys) FunctionPrefix() string {
return functionPrefix
}

func (k *keys) FunctionArgs(taskId string) string {
return fmt.Sprintf(functionArgs, taskId)
func (k *keys) FunctionArgs(workspaceName, taskId string) string {
return fmt.Sprintf(functionArgs, workspaceName, taskId)
}

func (k *keys) FunctionResult(taskId string) string {
return fmt.Sprintf(functionResult, taskId)
func (k *keys) FunctionResult(workspaceName, taskId string) string {
return fmt.Sprintf(functionResult, workspaceName, taskId)
}
27 changes: 19 additions & 8 deletions internal/abstractions/map/map_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

"github.com/beam-cloud/beam/internal/auth"
"github.com/beam-cloud/beam/internal/common"
pb "github.com/beam-cloud/beam/proto"
)
Expand All @@ -23,7 +24,9 @@ func NewRedisMapService(rdb *common.RedisClient) (MapService, error) {

// Map service implementations
func (m *RedisMapService) MapSet(ctx context.Context, in *pb.MapSetRequest) (*pb.MapSetResponse, error) {
err := m.rdb.Set(context.TODO(), Keys.MapEntry(in.Name, in.Key), in.Value, 0).Err()
authInfo, _ := auth.AuthInfoFromContext(ctx)

err := m.rdb.Set(context.TODO(), Keys.MapEntry(authInfo.Workspace.Name, in.Name, in.Key), in.Value, 0).Err()
if err != nil {
return &pb.MapSetResponse{Ok: false}, err
}
Expand All @@ -32,7 +35,9 @@ func (m *RedisMapService) MapSet(ctx context.Context, in *pb.MapSetRequest) (*pb
}

func (m *RedisMapService) MapGet(ctx context.Context, in *pb.MapGetRequest) (*pb.MapGetResponse, error) {
value, err := m.rdb.Get(context.TODO(), Keys.MapEntry(in.Name, in.Key)).Bytes()
authInfo, _ := auth.AuthInfoFromContext(ctx)

value, err := m.rdb.Get(context.TODO(), Keys.MapEntry(authInfo.Workspace.Name, in.Name, in.Key)).Bytes()
if err != nil {
return &pb.MapGetResponse{Ok: false, Value: nil}, err
}
Expand All @@ -41,7 +46,9 @@ func (m *RedisMapService) MapGet(ctx context.Context, in *pb.MapGetRequest) (*pb
}

func (m *RedisMapService) MapDelete(ctx context.Context, in *pb.MapDeleteRequest) (*pb.MapDeleteResponse, error) {
err := m.rdb.Del(context.TODO(), Keys.MapEntry(in.Name, in.Key)).Err()
authInfo, _ := auth.AuthInfoFromContext(ctx)

err := m.rdb.Del(context.TODO(), Keys.MapEntry(authInfo.Workspace.Name, in.Name, in.Key)).Err()
if err != nil {
return &pb.MapDeleteResponse{Ok: false}, err
}
Expand All @@ -50,7 +57,9 @@ func (m *RedisMapService) MapDelete(ctx context.Context, in *pb.MapDeleteRequest
}

func (m *RedisMapService) MapCount(ctx context.Context, in *pb.MapCountRequest) (*pb.MapCountResponse, error) {
keys, err := m.rdb.Scan(context.TODO(), Keys.MapEntry(in.Name, "*"))
authInfo, _ := auth.AuthInfoFromContext(ctx)

keys, err := m.rdb.Scan(context.TODO(), Keys.MapEntry(authInfo.Workspace.Name, in.Name, "*"))
if err != nil {
return &pb.MapCountResponse{Ok: false, Count: 0}, err
}
Expand All @@ -59,7 +68,9 @@ func (m *RedisMapService) MapCount(ctx context.Context, in *pb.MapCountRequest)
}

func (m *RedisMapService) MapKeys(ctx context.Context, in *pb.MapKeysRequest) (*pb.MapKeysResponse, error) {
keys, err := m.rdb.Scan(context.TODO(), Keys.MapEntry(in.Name, "*"))
authInfo, _ := auth.AuthInfoFromContext(ctx)

keys, err := m.rdb.Scan(context.TODO(), Keys.MapEntry(authInfo.Workspace.Name, in.Name, "*"))
if err != nil {
return &pb.MapKeysResponse{Ok: false, Keys: []string{}}, err
}
Expand All @@ -75,7 +86,7 @@ func (m *RedisMapService) MapKeys(ctx context.Context, in *pb.MapKeysRequest) (*
// Redis keys
var (
mapPrefix string = "map"
mapEntry string = "map:%s:%s"
mapEntry string = "map:%s:%s:%s"
)

var Keys = &keys{}
Expand All @@ -86,6 +97,6 @@ func (k *keys) MapPrefix() string {
return mapPrefix
}

func (k *keys) MapEntry(name, key string) string {
return fmt.Sprintf(mapEntry, name, key)
func (k *keys) MapEntry(workspaceName, name, key string) string {
return fmt.Sprintf(mapEntry, workspaceName, name, key)
}
18 changes: 9 additions & 9 deletions internal/abstractions/queue/queue.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@ option go_package = "github.com/beam-cloud/beam/proto";
package simplequeue;

service SimpleQueueService {
rpc Put(SimpleQueuePutRequest) returns (SimpleQueuePutResponse) {}
rpc Pop(SimpleQueuePopRequest) returns (SimpleQueuePopResponse) {}
rpc Peek(SimpleQueueRequest) returns (SimpleQueuePeekResponse) {}
rpc Empty(SimpleQueueRequest) returns (SimpleQueueEmptyResponse) {}
rpc Size(SimpleQueueRequest) returns (SimpleQueueSizeResponse) {}
rpc SimpleQueuePut(SimpleQueuePutRequest) returns (SimpleQueuePutResponse) {}
rpc SimpleQueuePop(SimpleQueuePopRequest) returns (SimpleQueuePopResponse) {}
rpc SimpleQueuePeek(SimpleQueueRequest) returns (SimpleQueuePeekResponse) {}
rpc SimpleQueueEmpty(SimpleQueueRequest) returns (SimpleQueueEmptyResponse) {}
rpc SimpleQueueSize(SimpleQueueRequest) returns (SimpleQueueSizeResponse) {}
}

message SimpleQueuePutRequest {
string name = 1;
bytes value = 2;
string name = 1;
bytes value = 2;
}

message SimpleQueuePutResponse { bool ok = 1; }

message SimpleQueuePopRequest {
string name = 1;
bytes value = 2;
string name = 1;
bytes value = 2;
}

message SimpleQueuePopResponse {
Expand Down
10 changes: 5 additions & 5 deletions internal/abstractions/queue/simplequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (

type SimpleQueueService interface {
pb.SimpleQueueServiceServer
Put(ctx context.Context, in *pb.SimpleQueuePutRequest) (*pb.SimpleQueuePutResponse, error)
Pop(ctx context.Context, in *pb.SimpleQueuePopRequest) (*pb.SimpleQueuePopResponse, error)
Peek(ctx context.Context, in *pb.SimpleQueueRequest) (*pb.SimpleQueuePeekResponse, error)
Empty(ctx context.Context, in *pb.SimpleQueueRequest) (*pb.SimpleQueueEmptyResponse, error)
Size(ctx context.Context, in *pb.SimpleQueueRequest) (*pb.SimpleQueueSizeResponse, error)
SimpleQueuePut(ctx context.Context, in *pb.SimpleQueuePutRequest) (*pb.SimpleQueuePutResponse, error)
SimpleQueuePop(ctx context.Context, in *pb.SimpleQueuePopRequest) (*pb.SimpleQueuePopResponse, error)
SimpleQueuePeek(ctx context.Context, in *pb.SimpleQueueRequest) (*pb.SimpleQueuePeekResponse, error)
SimpleQueueEmpty(ctx context.Context, in *pb.SimpleQueueRequest) (*pb.SimpleQueueEmptyResponse, error)
SimpleQueueSize(ctx context.Context, in *pb.SimpleQueueRequest) (*pb.SimpleQueueSizeResponse, error)
}
30 changes: 18 additions & 12 deletions internal/abstractions/queue/simplequeue_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/beam-cloud/beam/internal/auth"
"github.com/beam-cloud/beam/internal/common"
pb "github.com/beam-cloud/beam/proto"
"github.com/redis/go-redis/v9"
Expand All @@ -22,8 +23,9 @@ func NewRedisSimpleQueueService(rdb *common.RedisClient) (SimpleQueueService, er
}

// Simple queue service implementations
func (s *RedisSimpleQueueService) Put(ctx context.Context, in *pb.SimpleQueuePutRequest) (*pb.SimpleQueuePutResponse, error) {
queueName := Keys.SimpleQueueName(in.Name)
func (s *RedisSimpleQueueService) SimpleQueuePut(ctx context.Context, in *pb.SimpleQueuePutRequest) (*pb.SimpleQueuePutResponse, error) {
authInfo, _ := auth.AuthInfoFromContext(ctx)
queueName := Keys.SimpleQueueName(authInfo.Workspace.Name, in.Name)

err := s.rdb.RPush(context.TODO(), queueName, in.Value).Err()
if err != nil {
Expand All @@ -37,8 +39,9 @@ func (s *RedisSimpleQueueService) Put(ctx context.Context, in *pb.SimpleQueuePut
}, nil
}

func (s *RedisSimpleQueueService) Pop(ctx context.Context, in *pb.SimpleQueuePopRequest) (*pb.SimpleQueuePopResponse, error) {
queueName := Keys.SimpleQueueName(in.Name)
func (s *RedisSimpleQueueService) SimpleQueuePop(ctx context.Context, in *pb.SimpleQueuePopRequest) (*pb.SimpleQueuePopResponse, error) {
authInfo, _ := auth.AuthInfoFromContext(ctx)
queueName := Keys.SimpleQueueName(authInfo.Workspace.Name, in.Name)

value, err := s.rdb.LPop(context.TODO(), queueName).Bytes()
if err == redis.Nil {
Expand All @@ -59,8 +62,9 @@ func (s *RedisSimpleQueueService) Pop(ctx context.Context, in *pb.SimpleQueuePop
}, nil
}

func (s *RedisSimpleQueueService) Peek(ctx context.Context, in *pb.SimpleQueueRequest) (*pb.SimpleQueuePeekResponse, error) {
queueName := Keys.SimpleQueueName(in.Name)
func (s *RedisSimpleQueueService) SimpleQueuePeek(ctx context.Context, in *pb.SimpleQueueRequest) (*pb.SimpleQueuePeekResponse, error) {
authInfo, _ := auth.AuthInfoFromContext(ctx)
queueName := Keys.SimpleQueueName(authInfo.Workspace.Name, in.Name)

res, err := s.rdb.LRange(context.TODO(), queueName, 0, 0)
if err != nil {
Expand All @@ -81,8 +85,9 @@ func (s *RedisSimpleQueueService) Peek(ctx context.Context, in *pb.SimpleQueueRe
}, nil
}

func (s *RedisSimpleQueueService) Empty(ctx context.Context, in *pb.SimpleQueueRequest) (*pb.SimpleQueueEmptyResponse, error) {
queueName := Keys.SimpleQueueName(in.Name)
func (s *RedisSimpleQueueService) SimpleQueueEmpty(ctx context.Context, in *pb.SimpleQueueRequest) (*pb.SimpleQueueEmptyResponse, error) {
authInfo, _ := auth.AuthInfoFromContext(ctx)
queueName := Keys.SimpleQueueName(authInfo.Workspace.Name, in.Name)

length, err := s.rdb.LLen(context.TODO(), queueName).Result()
if err != nil {
Expand All @@ -105,8 +110,9 @@ func (s *RedisSimpleQueueService) Empty(ctx context.Context, in *pb.SimpleQueueR
}, nil
}

func (s *RedisSimpleQueueService) Size(ctx context.Context, in *pb.SimpleQueueRequest) (*pb.SimpleQueueSizeResponse, error) {
queueName := Keys.SimpleQueueName(in.Name)
func (s *RedisSimpleQueueService) SimpleQueueSize(ctx context.Context, in *pb.SimpleQueueRequest) (*pb.SimpleQueueSizeResponse, error) {
authInfo, _ := auth.AuthInfoFromContext(ctx)
queueName := Keys.SimpleQueueName(authInfo.Workspace.Name, in.Name)

length, err := s.rdb.LLen(context.TODO(), queueName).Result()
if err != nil {
Expand Down Expand Up @@ -136,6 +142,6 @@ func (k *keys) SimpleQueuePrefix() string {
return queuePrefix
}

func (k *keys) SimpleQueueName(name string) string {
return fmt.Sprintf(queueName, name)
func (k *keys) SimpleQueueName(workspaceName, name string) string {
return fmt.Sprintf(queueName, workspaceName, name)
}
Loading

0 comments on commit f73da7a

Please sign in to comment.