Skip to content

Commit

Permalink
Revert "Fix: Add a TTL on build containers for cleanup (#819)"
Browse files Browse the repository at this point in the history
This reverts commit b5f5bb7.
  • Loading branch information
jsun-m committed Jan 9, 2025
1 parent 4381d36 commit c9887e8
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 104 deletions.
28 changes: 1 addition & 27 deletions pkg/abstractions/image/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type Builder struct {
registry *common.ImageRegistry
containerRepo repository.ContainerRepository
tailscale *network.Tailscale
rdb *common.RedisClient
}

type BuildStep struct {
Expand Down Expand Up @@ -131,14 +130,13 @@ func (o *BuildOpts) addPythonRequirements() {
o.PythonPackages = append(filteredPythonPackages, baseRequirementsSlice...)
}

func NewBuilder(config types.AppConfig, registry *common.ImageRegistry, scheduler *scheduler.Scheduler, tailscale *network.Tailscale, containerRepo repository.ContainerRepository, rdb *common.RedisClient) (*Builder, error) {
func NewBuilder(config types.AppConfig, registry *common.ImageRegistry, scheduler *scheduler.Scheduler, tailscale *network.Tailscale, containerRepo repository.ContainerRepository) (*Builder, error) {
return &Builder{
config: config,
scheduler: scheduler,
tailscale: tailscale,
registry: registry,
containerRepo: containerRepo,
rdb: rdb,
}, nil
}

Expand Down Expand Up @@ -286,14 +284,6 @@ func (b *Builder) Build(ctx context.Context, opts *BuildOpts, outputChan chan co
return err
}

err = b.rdb.Set(ctx, Keys.imageBuildContainerTTL(containerId), "1", time.Duration(imageContainerTtlS)*time.Second).Err()
if err != nil {
outputChan <- common.OutputMsg{Done: true, Success: false, Msg: "Failed to connect to build container.\n"}
return err
}

go b.keepAlive(ctx, containerId, ctx.Done())

conn, err := network.ConnectToHost(ctx, hostname, time.Second*30, b.tailscale, b.config.Tailscale)
if err != nil {
outputChan <- common.OutputMsg{Done: true, Success: false, Msg: "Failed to connect to build container.\n"}
Expand Down Expand Up @@ -448,22 +438,6 @@ func (b *Builder) Exists(ctx context.Context, imageId string) bool {
return b.registry.Exists(ctx, imageId)
}

// keepAlive periodically rewrites the build-container TTL key for containerId
// until ctx is cancelled or done is signalled.
//
// Every buildContainerKeepAliveIntervalS seconds it calls b.rdb.Set on
// Keys.imageBuildContainerTTL(containerId) with the value "1" and a duration of
// imageContainerTtlS seconds (presumably the key's expiry; confirm against
// common.RedisClient).
//
// NOTE(review): the call site visible in Build passes ctx.Done() as done, so
// both exit cases watch the same channel there.
func (b *Builder) keepAlive(ctx context.Context, containerId string, done <-chan struct{}) {
ticker := time.NewTicker(time.Duration(buildContainerKeepAliveIntervalS) * time.Second)
// Release the ticker on every return path.
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-done:
return
case <-ticker.C:
// NOTE(review): the result of .Err() is discarded, so a failed refresh goes
// unnoticed here; confirm that best-effort behavior is intended.
b.rdb.Set(ctx, Keys.imageBuildContainerTTL(containerId), "1", time.Duration(imageContainerTtlS)*time.Second).Err()
}
}
}

var imageNamePattern = regexp.MustCompile(
`^` + // Assert position at the start of the string
`(?:(?P<Registry>(?:(?:localhost|[\w.-]+(?:\.[\w.-]+)+)(?::\d+)?)|[\w]+:\d+)\/)?` + // Optional registry, which can be localhost, a domain with optional port, or a simple registry with port
Expand Down
81 changes: 9 additions & 72 deletions pkg/abstractions/image/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package image
import (
"context"
"fmt"
"strings"

"github.com/beam-cloud/beta9/pkg/auth"
"github.com/beam-cloud/beta9/pkg/common"
Expand All @@ -24,12 +23,9 @@ type ImageService interface {

type RuncImageService struct {
pb.UnimplementedImageServiceServer
builder *Builder
config types.AppConfig
backendRepo repository.BackendRepository
rdb *common.RedisClient
keyEventChan chan common.KeyEvent
keyEventManager *common.KeyEventManager
builder *Builder
config types.AppConfig
backendRepo repository.BackendRepository
}

type ImageServiceOpts struct {
Expand All @@ -38,12 +34,8 @@ type ImageServiceOpts struct {
BackendRepo repository.BackendRepository
Scheduler *scheduler.Scheduler
Tailscale *network.Tailscale
RedisClient *common.RedisClient
}

// buildContainerKeepAliveIntervalS is the ticker period, in seconds, used by
// Builder.keepAlive between rewrites of the build-container TTL key.
const buildContainerKeepAliveIntervalS int = 10
// imageContainerTtlS is the duration, in seconds, passed to rdb.Set whenever
// the build-container TTL key is written (in Build and in keepAlive).
const imageContainerTtlS int = 60

func NewRuncImageService(
ctx context.Context,
opts ImageServiceOpts,
Expand All @@ -53,30 +45,16 @@ func NewRuncImageService(
return nil, err
}

builder, err := NewBuilder(opts.Config, registry, opts.Scheduler, opts.Tailscale, opts.ContainerRepo, opts.RedisClient)
if err != nil {
return nil, err
}

keyEventManager, err := common.NewKeyEventManager(opts.RedisClient)
builder, err := NewBuilder(opts.Config, registry, opts.Scheduler, opts.Tailscale, opts.ContainerRepo)
if err != nil {
return nil, err
}

is := RuncImageService{
builder: builder,
config: opts.Config,
backendRepo: opts.BackendRepo,
keyEventChan: make(chan common.KeyEvent),
keyEventManager: keyEventManager,
rdb: opts.RedisClient,
}

go is.monitorImageContainers(ctx)
go is.keyEventManager.ListenForPattern(ctx, Keys.imageBuildContainerTTL("*"), is.keyEventChan)
go is.keyEventManager.ListenForPattern(ctx, common.RedisKeys.SchedulerContainerState("*"), is.keyEventChan)

return &is, nil
return &RuncImageService{
builder: builder,
config: opts.Config,
backendRepo: opts.BackendRepo,
}, nil
}

func (is *RuncImageService) VerifyImageBuild(ctx context.Context, in *pb.VerifyImageBuildRequest) (*pb.VerifyImageBuildResponse, error) {
Expand Down Expand Up @@ -206,35 +184,6 @@ func (is *RuncImageService) retrieveBuildSecrets(ctx context.Context, secrets []
return buildSecrets, nil
}

// monitorImageContainers consumes key events from is.keyEventChan until ctx is
// done, and force-stops containers through is.builder.scheduler.Stop.
//
// Two event operations are handled:
//   - KeyOperationSet on a key containing the scheduler container-state prefix:
//     the container is stopped when no image build TTL key exists for it.
//   - KeyOperationExpired: the container named by the key is stopped.
//
// All other operations are ignored.
func (is *RuncImageService) monitorImageContainers(ctx context.Context) {
for {
select {
case event := <-is.keyEventChan:
switch event.Operation {
case common.KeyOperationSet:
if strings.Contains(event.Key, common.RedisKeys.SchedulerContainerState("")) {
// Derive the container ID by trimming the keyspace prefix and then the
// container-state key prefix (presumably what SchedulerContainerState("")
// returns; confirm against common.RedisKeys).
containerId := strings.TrimPrefix(is.keyEventManager.TrimKeyspacePrefix(event.Key), common.RedisKeys.SchedulerContainerState(""))

// NOTE(review): nothing here checks that the container is a build container;
// verify that state keys of other container kinds cannot reach this branch.
// NOTE(review): .Val() does not surface an error from Exists; confirm how a
// failed lookup should be treated.
if is.rdb.Exists(ctx, Keys.imageBuildContainerTTL(containerId)).Val() == 0 {
is.builder.scheduler.Stop(&types.StopContainerArgs{
ContainerId: containerId,
Force: true,
})
}
}
case common.KeyOperationExpired:
// NOTE(review): this branch does not check which pattern the key matched.
// Both the build-TTL and the container-state patterns feed is.keyEventChan
// (see NewRuncImageService); for a container-state key the second TrimPrefix
// would be a no-op.
containerId := strings.TrimPrefix(is.keyEventManager.TrimKeyspacePrefix(event.Key), Keys.imageBuildContainerTTL(""))
is.builder.scheduler.Stop(&types.StopContainerArgs{
ContainerId: containerId,
Force: true,
})
}
case <-ctx.Done():
return
}
}
}

func convertBuildSteps(buildSteps []*pb.BuildStep) []BuildStep {
steps := make([]BuildStep, len(buildSteps))
for i, s := range buildSteps {
Expand All @@ -245,15 +194,3 @@ func convertBuildSteps(buildSteps []*pb.BuildStep) []BuildStep {
}
return steps
}

var (
// imageBuildContainerTTL is the format string for the build-container TTL
// key; %s is replaced with the container ID.
imageBuildContainerTTL string = "image:build_container_ttl:%s"
)

// Keys is the package-level instance used to build key names.
var Keys = &keys{}

// keys is an empty struct whose methods format key names.
type keys struct{}

// imageBuildContainerTTL returns the imageBuildContainerTTL format string with
// containerId substituted. Passing "" yields the bare key prefix, and callers
// in this package pass "*" to form a listen pattern.
func (k *keys) imageBuildContainerTTL(containerId string) string {
return fmt.Sprintf(imageBuildContainerTTL, containerId)
}
8 changes: 4 additions & 4 deletions pkg/common/key_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ func NewKeyEventManager(rdb *RedisClient) (*KeyEventManager, error) {
return &KeyEventManager{rdb: rdb}, nil
}

// TrimKeyspacePrefix returns key with a leading keyspacePrefix removed. If key
// does not start with keyspacePrefix it is returned unchanged.
func (kem *KeyEventManager) TrimKeyspacePrefix(key string) string {
return strings.TrimPrefix(key, keyspacePrefix)
}

func (kem *KeyEventManager) fetchExistingKeys(patternPrefix string) ([]string, error) {
pattern := fmt.Sprintf("%s*", patternPrefix)

Expand All @@ -53,6 +49,10 @@ func (kem *KeyEventManager) fetchExistingKeys(patternPrefix string) ([]string, e
return trimmedKeys, nil
}

// TrimKeyspacePrefix returns key with a leading keyspacePrefix removed. If key
// does not start with keyspacePrefix it is returned unchanged.
func (kem *KeyEventManager) TrimKeyspacePrefix(key string) string {
return strings.TrimPrefix(key, keyspacePrefix)
}

func (kem *KeyEventManager) ListenForPattern(ctx context.Context, patternPrefix string, keyEventChan chan KeyEvent) error {
existingKeys, err := kem.fetchExistingKeys(patternPrefix)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ func (g *Gateway) registerServices() error {
Scheduler: g.Scheduler,
Tailscale: g.Tailscale,
BackendRepo: g.BackendRepo,
RedisClient: g.RedisClient,
})
if err != nil {
return err
Expand Down

0 comments on commit c9887e8

Please sign in to comment.