Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: comply with lambdo definition #18

Merged
merged 5 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# ROOT_FS_STORAGE_DSN=
LAMBDO_URL="http://localhost:3000"
VM_STATE_URL=redis://localhost:6379
FUNCTION_STATE_STORAGE_DSN='host=localhost user=postgres password=postgres dbname=postgres port=5432 sslmode=disable TimeZone=Asia/Shanghai'
JWT_SECRET="I am so secret! Hopefully someone doesn't commit me.."
Expand Down
11 changes: 6 additions & 5 deletions database/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
)

type Function struct {
ID uuid.UUID `json:"id" ,gorm:"primarykey;type:uuid;default:gen_random_uuid()"`
Name string `json:"name"`
Description string `json:"description"`
Language string `json:"language"`
Built bool `json:"built"` // The builder has built the image for this function
ID uuid.UUID `json:"id" ,gorm:"primarykey;type:uuid;default:gen_random_uuid()"`
Name string `json:"name"`
Description string `json:"description"`
Language string `json:"language"`
Built bool `json:"built"` // The builder has built the image for this function
BuildTimestamp int64 `json:"build_timestamp"` // The timestamp of the last build in Unix time

OwnerID int `json:"owner_id"`
Owner User `json:"-"`
Expand Down
11 changes: 11 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package main

import (
"context"
"fmt"
"log"
"time"

"github.com/do4-2022/grobuzin/database"
"github.com/do4-2022/grobuzin/objectStorage"
"github.com/do4-2022/grobuzin/routes"
"github.com/do4-2022/grobuzin/scheduler"

Expand All @@ -19,6 +21,7 @@ import (

type Config struct {
// rootFsStorageDSN string `env:"ROOT_FS_STORAGE_DSN,notEmpty"`
LambdoURL string `env:"LAMBDO_URL,notEmpty"`
VMStateURL string `env:"VM_STATE_URL,notEmpty"`
FuntionStateStorageDSN string `env:"FUNCTION_STATE_STORAGE_DSN,notEmpty" envDefault:"host=localhost user=postgres password=postgres dbname=postgres port=5432 sslmode=disable TimeZone=Asia/Shanghai"`
JWTSecret string `env:"JWT_SECRET,notEmpty"`
Expand All @@ -41,6 +44,14 @@ func main() {
s := &scheduler.Scheduler{
Redis: redis,
Context: &ctx,
Lambdo: &scheduler.LambdoService{
URL: cfg.LambdoURL,
BucketURL: fmt.Sprint(
cfg.MinioEndpoint,
"/",
objectStorage.BucketName,
),
},
}
//Now inject the scheduler into the routes that need it!

Expand Down
22 changes: 12 additions & 10 deletions objectStorage/codeStorageService.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"github.com/minio/minio-go/v7"
)

// Constants starting with a capital letter are exported
const (
bucketName = "functions"
BucketName = "functions"
codeFileSuffix = "/code.json"
RooFSFile = "/rootfs.ext4"
location = "eu-west-1"
)

Expand All @@ -24,17 +26,17 @@ type CodeStorageService struct {
func (service *CodeStorageService) Init() {

ctx := context.Background()
err := service.MinioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: location})
err := service.MinioClient.MakeBucket(ctx, BucketName, minio.MakeBucketOptions{Region: location})
if err != nil {
// Check to see if we already own this bucket (which happens if you run this twice)
exists, errBucketExists := service.MinioClient.BucketExists(ctx, bucketName)
exists, errBucketExists := service.MinioClient.BucketExists(ctx, BucketName)
if errBucketExists == nil && exists {
log.Printf("We already own %s\n", bucketName)
log.Printf("We already own %s\n", BucketName)
} else {
log.Fatalln(err)
}
} else {
log.Printf("Successfully created %s\n", bucketName)
log.Printf("Successfully created %s\n", BucketName)
}
}
func (service *CodeStorageService) PutCode(id uuid.UUID, files map[string]string) {
Expand All @@ -50,7 +52,7 @@ func (service *CodeStorageService) PutCode(id uuid.UUID, files map[string]string

reader := bytes.NewReader([]byte(jsonFiles))

_, err = service.MinioClient.PutObject(ctx, bucketName, filePath, reader, -1, minio.PutObjectOptions{ContentType: contentType})
_, err = service.MinioClient.PutObject(ctx, BucketName, filePath, reader, -1, minio.PutObjectOptions{ContentType: contentType})
if err != nil {
log.Fatalln(err)
}
Expand All @@ -60,7 +62,7 @@ func (service *CodeStorageService) GetCode(id uuid.UUID) (map[string]string, err
ctx := context.Background()
filePath := id.String() + codeFileSuffix

object, err := service.MinioClient.GetObject(ctx, bucketName, filePath, minio.GetObjectOptions{})
object, err := service.MinioClient.GetObject(ctx, BucketName, filePath, minio.GetObjectOptions{})
if err != nil {
return nil, err
}
Expand All @@ -84,7 +86,7 @@ func (service *CodeStorageService) DeleteCode(id uuid.UUID) error {
ctx := context.Background()
filePath := id.String() + codeFileSuffix

err := service.MinioClient.RemoveObject(ctx, bucketName, filePath, minio.RemoveObjectOptions{})
err := service.MinioClient.RemoveObject(ctx, BucketName, filePath, minio.RemoveObjectOptions{})
if err != nil {
return err
}
Expand All @@ -96,9 +98,9 @@ func (service *CodeStorageService) DeleteCode(id uuid.UUID) error {
func (service *CodeStorageService) DeleteRootFs(id uuid.UUID) error {
ctx := context.Background()

filePath := fmt.Sprintf("function/%s/rootfs.ext4", id)
filePath := fmt.Sprintf("function/%s/%s", id, RooFSFile)

err := service.MinioClient.RemoveObject(ctx, bucketName, filePath, minio.RemoveObjectOptions{})
err := service.MinioClient.RemoveObject(ctx, BucketName, filePath, minio.RemoveObjectOptions{})
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion routes/function/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (c *Controller) RunFunction(ctx *gin.Context) {

// if the function does not have an instance, we create ask the scheduler to create one
if errors.Is(err, scheduler.ErrRecordNotFound) {
res, err := c.Scheduler.SpawnVM(fnID)
res, err := c.Scheduler.SpawnVM(fn)

if err != nil {
ctx.AbortWithStatusJSON(500, gin.H{"error": err.Error()})
Expand Down
40 changes: 31 additions & 9 deletions scheduler/lambdoService.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,74 @@ package scheduler

import (
"fmt"
"log"

"github.com/google/uuid"

"github.com/do4-2022/grobuzin/database"
"github.com/do4-2022/grobuzin/objectStorage"

"bytes"
"encoding/json"
"io"
"net/http"
)

const (
IDTimestampSeparator = "_"
)

type LambdoService struct {
URL string
MinioURL string
BucketURL string
}

type RootFSInfo struct {
ID string `json:"id"`
Location string `json:"location"`
}

type LambdoSpawnRequest struct {
// URL to the rootfs of the function
RootfsURL string `json:"rootfs"`
Rootfs RootFSInfo `json:"rootfs"`
// Ports that the virtual machine needs to be exposed
// right now we only support one port
RequestedPorts []uint16 `json:"requestedPorts"`
}

type LambdoSpawnResponse struct {
ID string `json:"ID"`
ID string `json:"id"`
// Ports mapped by lambdo, leading to the requested ports
Ports []uint16 `json:"ports"`
// this a tuple under the form [host_port, vm_port]
// for now we only support one port
Ports [][2]uint16 `json:"port_mapping"`
}

func (service *LambdoService) SpawnVM(function_id uuid.UUID) (data LambdoSpawnResponse, err error) {
func (service *LambdoService) SpawnVM(function database.Function) (data LambdoSpawnResponse, err error) {
var res *http.Response
defer func() {
if res != nil {
res.Body.Close()
}
}()

log.Println("Spawning VM for function", function.ID.String())

RootfsURL := fmt.Sprint(service.BucketURL, "/", function.ID.String(), objectStorage.RooFSFile)

body, err := json.Marshal(&LambdoSpawnRequest{
RootfsURL: fmt.Sprintf("%s/%s", service.MinioURL, function_id),
})
Rootfs: RootFSInfo{
ID: fmt.Sprint(function.ID.String(), IDTimestampSeparator, function.BuildTimestamp),
Location: RootfsURL,
},
RequestedPorts: []uint16{8080}, // for now there's only a gin gonic instance for the agent is serving on 8080
})

if err != nil {
return
}

res, err = http.Post(
fmt.Sprintf(service.URL, "/spawn"),
fmt.Sprint(service.URL, "/spawn"),
"application/json",
bytes.NewReader(body),
)
Expand Down
8 changes: 4 additions & 4 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,19 @@ func (s *Scheduler) LookForReadyInstance(functionId uuid.UUID, cursor uint64) (f
return fnState, 0, ErrRecordNotFound // we did not find anything thus, id is empty
}

func (s *Scheduler) SpawnVM(functionId uuid.UUID) (fnState database.FunctionState, err error) {
res, err := s.Lambdo.SpawnVM(functionId)
func (s *Scheduler) SpawnVM(function database.Function) (fnState database.FunctionState, err error) {
res, err := s.Lambdo.SpawnVM(function)

if (err != nil) {
return
}

stateID := fmt.Sprintf(functionId.String(), ":", res.ID)
stateID := fmt.Sprintf(function.ID.String(), ":", res.ID)

fnState = database.FunctionState{
ID: res.ID,
Address: s.Lambdo.URL,
Port: res.Ports[0],
Port: res.Ports[0][0],
Status: database.FnReady,
LastUsed: "never",
}
Expand Down
Loading