Skip to content

Commit

Permalink
Add volume management commands (#139)
Browse files Browse the repository at this point in the history
- Add deploy common command
- Add list volume mgmt command
- Add create volume mgmt command
- Fix error field names to be consistent
- Fix var names in task.py to be consistent

Resolve BE-1308
  • Loading branch information
nickpetrovic authored Apr 16, 2024
1 parent 125077a commit df99692
Show file tree
Hide file tree
Showing 14 changed files with 783 additions and 171 deletions.
97 changes: 90 additions & 7 deletions internal/abstractions/volume/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/beam-cloud/beta9/internal/repository"
"github.com/beam-cloud/beta9/internal/types"
pb "github.com/beam-cloud/beta9/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

type VolumeService interface {
Expand All @@ -36,18 +37,55 @@ func NewGlobalVolumeService(backendRepo repository.BackendRepository) (VolumeSer

func (vs *GlobalVolumeService) GetOrCreateVolume(ctx context.Context, in *pb.GetOrCreateVolumeRequest) (*pb.GetOrCreateVolumeResponse, error) {
authInfo, _ := auth.AuthInfoFromContext(ctx)
workspaceId := authInfo.Workspace.Id

volume, err := vs.backendRepo.GetOrCreateVolume(ctx, workspaceId, in.Name)
volume, err := vs.getOrCreateVolume(ctx, authInfo.Workspace, in.Name)
if err != nil {
return &pb.GetOrCreateVolumeResponse{
Ok: false,
Ok: false,
ErrMsg: "Unable to get or create volume",
}, nil
}

return &pb.GetOrCreateVolumeResponse{
VolumeId: volume.ExternalId,
Ok: true,
Ok: true,
Volume: &pb.VolumeInstance{
Id: volume.ExternalId,
Name: volume.Name,
CreatedAt: timestamppb.New(volume.CreatedAt),
UpdatedAt: timestamppb.New(volume.UpdatedAt),
WorkspaceId: authInfo.Workspace.ExternalId,
WorkspaceName: authInfo.Workspace.Name,
},
}, nil
}

func (vs *GlobalVolumeService) ListVolumes(ctx context.Context, in *pb.ListVolumesRequest) (*pb.ListVolumesResponse, error) {
authInfo, _ := auth.AuthInfoFromContext(ctx)

volumes, err := vs.listVolumes(ctx, authInfo.Workspace)
if err != nil {
return &pb.ListVolumesResponse{
Ok: false,
ErrMsg: err.Error(),
}, nil
}

vols := make([]*pb.VolumeInstance, len(volumes))
for i, v := range volumes {
vols[i] = &pb.VolumeInstance{
Id: v.ExternalId,
Name: v.Name,
Size: v.Size,
CreatedAt: timestamppb.New(v.CreatedAt),
UpdatedAt: timestamppb.New(v.UpdatedAt),
WorkspaceId: v.Workspace.ExternalId,
WorkspaceName: v.Workspace.Name,
}
}

return &pb.ListVolumesResponse{
Ok: true,
Volumes: vols,
}, nil
}

Expand Down Expand Up @@ -95,8 +133,8 @@ func (vs *GlobalVolumeService) CopyPathStream(stream pb.VolumeService_CopyPathSt

if err := vs.copyPathStream(ctx, ch, authInfo.Workspace); err != nil {
return stream.SendAndClose(&pb.CopyPathResponse{
Ok: false,
ErrorMsg: err.Error(),
Ok: false,
ErrMsg: err.Error(),
})
}

Expand All @@ -122,6 +160,40 @@ func (vs *GlobalVolumeService) DeletePath(ctx context.Context, in *pb.DeletePath

// Volume business logic

func (vs *GlobalVolumeService) getOrCreateVolume(ctx context.Context, workspace *types.Workspace, volumeName string) (*types.Volume, error) {
volume, err := vs.backendRepo.GetOrCreateVolume(ctx, workspace.Id, volumeName)
if err != nil {
return nil, err
}

volumePath := JoinVolumePath(workspace.Name, volume.ExternalId)
if _, err := os.Stat(volumePath); os.IsNotExist(err) {
os.MkdirAll(volumePath, os.FileMode(0755))
}

return volume, nil
}

func (vs *GlobalVolumeService) listVolumes(ctx context.Context, workspace *types.Workspace) ([]types.VolumeWithRelated, error) {
volumes, err := vs.backendRepo.ListVolumesWithRelated(ctx, workspace.Id)
if err != nil {
return nil, err
}

for i, v := range volumes {
path := JoinVolumePath(workspace.Name, v.ExternalId)

size, err := CalculateDirSize(path)
if err != nil {
size = 0
}

volumes[i].Size = size
}

return volumes, nil
}

type CopyPathContent struct {
Path string
Content []byte
Expand Down Expand Up @@ -281,3 +353,14 @@ func GetVolumePaths(workspaceName string, volumeExternalId string, volumePath st
func JoinVolumePath(workspaceName, volumeExternalId string, subPaths ...string) string {
return path.Join(append([]string{types.DefaultVolumesPath, workspaceName, volumeExternalId}, subPaths...)...)
}

func CalculateDirSize(path string) (uint64, error) {
var size uint64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if !info.IsDir() {
size += uint64(info.Size())
}
return err
})
return size, err
}
25 changes: 23 additions & 2 deletions internal/abstractions/volume/volume.proto
Original file line number Diff line number Diff line change
@@ -1,23 +1,36 @@
syntax = "proto3";

option go_package = "github.com/beam-cloud/beta9/proto";
import "google/protobuf/timestamp.proto";

package volume;

service VolumeService {
rpc GetOrCreateVolume(GetOrCreateVolumeRequest) returns (GetOrCreateVolumeResponse) {}
rpc ListVolumes(ListVolumesRequest) returns (ListVolumesResponse) {}
rpc ListPath(ListPathRequest) returns (ListPathResponse) {}
rpc DeletePath(DeletePathRequest) returns (DeletePathResponse) {}
rpc CopyPathStream(stream CopyPathRequest) returns (CopyPathResponse) {}
}

message VolumeInstance {
string id = 1;
string name = 2;
uint64 size = 3;
google.protobuf.Timestamp created_at = 4;
google.protobuf.Timestamp updated_at = 5;
string workspace_id = 6;
string workspace_name = 7;
}

message GetOrCreateVolumeRequest {
string name = 1;
}

message GetOrCreateVolumeResponse {
bool ok = 1;
string volume_id = 2;
string err_msg = 2;
VolumeInstance volume = 3;
}

message ListPathRequest {
Expand Down Expand Up @@ -49,5 +62,13 @@ message CopyPathRequest {
message CopyPathResponse {
bool ok = 1;
string object_id = 2;
string error_msg = 3;
string err_msg = 3;
}

message ListVolumesRequest {}

message ListVolumesResponse {
bool ok = 1;
string err_msg = 2;
repeated VolumeInstance volumes = 3;
}
35 changes: 26 additions & 9 deletions internal/repository/backend_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,10 +650,12 @@ func (r *PostgresBackendRepository) GetStubByExternalId(ctx context.Context, ext
return &stub, nil
}

// Volume

func (c *PostgresBackendRepository) GetVolume(ctx context.Context, workspaceId uint, name string) (*types.Volume, error) {
var volume types.Volume

queryGet := `SELECT id, external_id, name, workspace_id, created_at FROM volume WHERE name = $1 AND workspace_id = $2;`
queryGet := `SELECT id, external_id, name, workspace_id, created_at, updated_at FROM volume WHERE name = $1 AND workspace_id = $2;`

if err := c.client.GetContext(ctx, &volume, queryGet, name, workspaceId); err != nil {
return nil, err
Expand All @@ -663,22 +665,37 @@ func (c *PostgresBackendRepository) GetVolume(ctx context.Context, workspaceId u
}

func (c *PostgresBackendRepository) GetOrCreateVolume(ctx context.Context, workspaceId uint, name string) (*types.Volume, error) {
var volume *types.Volume
var err error

volume, err = c.GetVolume(ctx, workspaceId, name)
v, err := c.GetVolume(ctx, workspaceId, name)
if err == nil {
return volume, nil
return v, nil
}

queryCreate := `INSERT INTO volume (name, workspace_id) VALUES ($1, $2) RETURNING id, external_id, name, workspace_id, created_at;`
queryCreate := `INSERT INTO volume (name, workspace_id) VALUES ($1, $2) RETURNING id, external_id, name, workspace_id, created_at, updated_at;`

var volume types.Volume
err = c.client.GetContext(ctx, &volume, queryCreate, name, workspaceId)
if err != nil {
return &types.Volume{}, err
return nil, err
}

return &volume, nil
}

func (c *PostgresBackendRepository) ListVolumesWithRelated(ctx context.Context, workspaceId uint) ([]types.VolumeWithRelated, error) {
var volumes []types.VolumeWithRelated
query := `
SELECT v.id, v.external_id, v.name, v.workspace_id, v.created_at, v.updated_at, w.name as "workspace.name"
FROM volume v
JOIN workspace w ON v.workspace_id = w.id
WHERE v.workspace_id = $1;
`

err := c.client.SelectContext(ctx, &volumes, query, workspaceId)
if err != nil {
return nil, err
}

return volume, nil
return volumes, nil
}

// Deployment
Expand Down
1 change: 1 addition & 0 deletions internal/repository/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type BackendRepository interface {
GetStubByExternalId(ctx context.Context, externalId string) (*types.StubWithRelated, error)
GetVolume(ctx context.Context, workspaceId uint, name string) (*types.Volume, error)
GetOrCreateVolume(ctx context.Context, workspaceId uint, name string) (*types.Volume, error)
ListVolumesWithRelated(ctx context.Context, workspaceId uint) ([]types.VolumeWithRelated, error)
ListDeployments(ctx context.Context, filters types.DeploymentFilter) ([]types.DeploymentWithRelated, error)
ListDeploymentsPaginated(ctx context.Context, filters types.DeploymentFilter) (common.CursorPaginationInfo[types.DeploymentWithRelated], error)
GetLatestDeploymentByName(ctx context.Context, workspaceId uint, name string, stubType string) (*types.Deployment, error)
Expand Down
7 changes: 7 additions & 0 deletions internal/types/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,15 @@ type Volume struct {
Id uint `db:"id"`
ExternalId string `db:"external_id"`
Name string `db:"name"`
Size uint64 // Populated by volume abstraction
WorkspaceId uint `db:"workspace_id"` // Foreign key to Workspace
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
}

type VolumeWithRelated struct {
Volume
Workspace Workspace `db:"workspace" json:"workspace"`
}

type Deployment struct {
Expand Down
Loading

0 comments on commit df99692

Please sign in to comment.