Skip to content

Commit

Permalink
Improve request concurrency (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickpetrovic authored Jan 24, 2024
1 parent b151892 commit 59c6373
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 19 deletions.
5 changes: 5 additions & 0 deletions internal/common/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (

// Redis key templates for worker-scoped data. Every template except
// workerPrefix carries two %s verbs that are filled in with fmt.Sprintf.
var (
	// workerPrefix is the leading segment shared by the templates below.
	workerPrefix = "worker"
	// workerImageLock is formatted with a worker ID and an image ID.
	workerImageLock = "worker:%s:image:%s:lock"
	// workerContainerRequest takes two %s arguments
	// (presumably worker ID and container ID — confirm against its accessor).
	workerContainerRequest = "worker:%s:container:%s:request"
	// workerContainerResourceUsage is formatted with a worker ID and a container ID.
	workerContainerResourceUsage = "worker:%s:container:%s:resource_usage"
)
Expand Down Expand Up @@ -127,6 +128,10 @@ func (rk *redisKeys) WorkerContainerResourceUsage(workerId string, containerId s
return fmt.Sprintf(workerContainerResourceUsage, workerId, containerId)
}

// WorkerImageLock returns the Redis key obtained by substituting workerId and
// imageId, in that order, into the workerImageLock template.
func (rk *redisKeys) WorkerImageLock(workerId string, imageId string) string {
	key := fmt.Sprintf(workerImageLock, workerId, imageId)
	return key
}

// WorkerPool keys
func (rk *redisKeys) WorkerPoolLock(poolId string) string {
return fmt.Sprintf(workerPoolLock, poolId)
Expand Down
2 changes: 2 additions & 0 deletions internal/repository/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type WorkerRepository interface {
AddContainerRequestToWorker(workerId string, containerId string, request *types.ContainerRequest) error
RemoveContainerRequestFromWorker(workerId string, containerId string) error
SetContainerResourceValues(workerId string, containerId string, usage types.ContainerResourceUsage) error
SetImagePullLock(workerId, imageId string) error
RemoveImagePullLock(workerId, imageId string) error
}

type ContainerRepository interface {
Expand Down
2 changes: 1 addition & 1 deletion internal/repository/container_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (cr *ContainerRedisRepository) SetContainerState(containerId string, info *
}

func (cr *ContainerRedisRepository) SetContainerExitCode(containerId string, exitCode int) error {
err := cr.lock.Acquire(context.TODO(), common.RedisKeys.SchedulerContainerLock(containerId), common.RedisLockOptions{TtlS: 10, Retries: 0})
err := cr.lock.Acquire(context.TODO(), common.RedisKeys.SchedulerContainerLock(containerId), common.RedisLockOptions{TtlS: 10, Retries: 1})
if err != nil {
return err
}
Expand Down
9 changes: 9 additions & 0 deletions internal/repository/worker_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log"
"strings"
"time"

"github.com/beam-cloud/beta9/internal/common"
"github.com/beam-cloud/beta9/internal/types"
Expand Down Expand Up @@ -355,3 +356,11 @@ func (r *WorkerRedisRepository) SetContainerResourceValues(workerId string, cont

return nil
}

// SetImagePullLock writes the image-lock key for the given worker and image
// with the value true and a five-minute expiry, returning any error reported
// by the Redis command.
//
// NOTE(review): this issues a plain Set, so it does not appear to fail when
// the key already exists — confirm whether callers rely on mutual exclusion.
func (r *WorkerRedisRepository) SetImagePullLock(workerId, imageId string) error {
	const ttl = 5 * time.Minute

	key := common.RedisKeys.WorkerImageLock(workerId, imageId)
	cmd := r.rdb.Set(context.TODO(), key, true, ttl)
	return cmd.Err()
}

// RemoveImagePullLock deletes the image-lock key for the given worker and
// image, returning any error reported by the Redis command.
func (r *WorkerRedisRepository) RemoveImagePullLock(workerId, imageId string) error {
	key := common.RedisKeys.WorkerImageLock(workerId, imageId)
	cmd := r.rdb.Del(context.TODO(), key)
	return cmd.Err()
}
25 changes: 15 additions & 10 deletions internal/worker/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"log"
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"syscall"
"time"

"github.com/beam-cloud/beta9/internal/abstractions/image"
common "github.com/beam-cloud/beta9/internal/common"
"github.com/beam-cloud/beta9/internal/repository"
types "github.com/beam-cloud/beta9/internal/types"
"github.com/beam-cloud/clip/pkg/clip"
clipCommon "github.com/beam-cloud/clip/pkg/common"
Expand Down Expand Up @@ -46,9 +46,11 @@ type ImageClient struct {
Debug bool
Creds string
config types.ImageServiceConfig
workerId string
workerRepo repository.WorkerRepository
}

func NewImageClient(config types.ImageServiceConfig) (*ImageClient, error) {
func NewImageClient(config types.ImageServiceConfig, workerId string, workerRepo repository.WorkerRepository) (*ImageClient, error) {
var provider CredentialProvider // Configure image registry credentials

switch config.RegistryCredentialProviderName {
Expand Down Expand Up @@ -78,8 +80,10 @@ func NewImageClient(config types.ImageServiceConfig) (*ImageClient, error) {
}
}

baseImagePath := filepath.Join(imageCachePath)
os.MkdirAll(baseImagePath, os.ModePerm)
err = os.MkdirAll(imageCachePath, os.ModePerm)
if err != nil {
return nil, err
}

creds, err := provider.GetAuthString()
if err != nil {
Expand All @@ -90,11 +94,13 @@ func NewImageClient(config types.ImageServiceConfig) (*ImageClient, error) {
config: config,
registry: registry,
cacheClient: cacheClient,
ImagePath: baseImagePath,
ImagePath: imageCachePath,
PullCommand: imagePullCommand,
CommandTimeout: -1,
Debug: false,
Creds: creds,
workerId: workerId,
workerRepo: workerRepo,
}, nil
}

Expand All @@ -121,13 +127,12 @@ func (c *ImageClient) PullLazy(imageId string) error {
return nil
}

// Attempt to acquire the lock
fileLock := NewFileLock(path.Join(imagePath, fmt.Sprintf("%s_%s", imageId, imageMountLockFilename)))
if err := fileLock.Acquire(); err != nil {
fmt.Printf("Unable to acquire mount lock: %v\n", err)
// Get lock on image mount
err = c.workerRepo.SetImagePullLock(c.workerId, imageId)
if err != nil {
return err
}
defer fileLock.Release()
defer c.workerRepo.RemoveImagePullLock(c.workerId, imageId)

startServer, _, err := clip.MountArchive(*mountOptions)
if err != nil {
Expand Down
11 changes: 5 additions & 6 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ func NewWorker() (*Worker, error) {
return nil, err
}

imageClient, err := NewImageClient(config.ImageService)
containerRepo := repo.NewContainerRedisRepository(redisClient)
workerRepo := repo.NewWorkerRedisRepository(redisClient)
statsdRepo := repo.NewMetricsStatsdRepository()

imageClient, err := NewImageClient(config.ImageService, workerId, workerRepo)
if err != nil {
return nil, err
}
Expand All @@ -137,11 +141,6 @@ func NewWorker() (*Worker, error) {
}

ctx, cancel := context.WithCancel(context.Background())

containerRepo := repo.NewContainerRedisRepository(redisClient)
workerRepo := repo.NewWorkerRedisRepository(redisClient)
statsdRepo := repo.NewMetricsStatsdRepository()

workerMetrics := NewWorkerMetrics(ctx, podHostName, statsdRepo, workerRepo, repo.NewMetricsStreamRepository(ctx, config.Metrics))

return &Worker{
Expand Down
5 changes: 3 additions & 2 deletions sdk/src/beta9/abstractions/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, Callable, Iterator, List, Optional, Sequence, Union

import cloudpickle

from beta9 import terminal
from beta9.abstractions.base.runner import (
FUNCTION_DEPLOYMENT_STUB_TYPE,
Expand Down Expand Up @@ -101,7 +102,7 @@ async def _call_remote(self, *args, **kwargs) -> Any:
)

terminal.header("Running function")
last_response: Union[None, FunctionInvokeResponse] = None
last_response: Optional[FunctionInvokeResponse] = None

async for r in self.parent.function_stub.function_invoke(
stub_id=self.parent.stub_id,
Expand All @@ -114,7 +115,7 @@ async def _call_remote(self, *args, **kwargs) -> Any:
last_response = r
break

if not last_response.done or last_response.exit_code != 0:
if last_response is None or not last_response.done or last_response.exit_code != 0:
terminal.error("Function failed ☠️")
return None

Expand Down

0 comments on commit 59c6373

Please sign in to comment.