Skip to content

Commit

Permalink
update deployment model to include stub_type (#43)
Browse files Browse the repository at this point in the history
Co-authored-by: Luke Lombardi <[email protected]>
  • Loading branch information
luke-lombardi and Luke Lombardi authored Jan 11, 2024
1 parent f2cecb0 commit 8c16ee0
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 19 deletions.
9 changes: 8 additions & 1 deletion internal/abstractions/taskqueue/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strconv"

"github.com/beam-cloud/beam/internal/auth"
"github.com/beam-cloud/beam/internal/types"
"github.com/labstack/echo/v4"
)

Expand All @@ -23,6 +24,12 @@ func registerTaskQueueRoutes(g *echo.Group, tq *RedisTaskQueue) *taskQueueGroup
}

func (g *taskQueueGroup) TaskQueuePut(ctx echo.Context) error {
/*
TODO: support three different unmarshalling strategies
- explicit args/kwargs (nested under {"args", "kwargs"})
- just kwargs (key/value)
- just args (in list)
*/
cc, _ := ctx.(*auth.HttpAuthContext)

stubId := ctx.Param("stubId")
Expand All @@ -37,7 +44,7 @@ func (g *taskQueueGroup) TaskQueuePut(ctx echo.Context) error {
})
}

deployment, err := g.tq.backendRepo.GetDeploymentByNameAndVersion(ctx.Request().Context(), cc.AuthInfo.Workspace.Id, deploymentName, uint(version))
deployment, err := g.tq.backendRepo.GetDeploymentByNameAndVersion(ctx.Request().Context(), cc.AuthInfo.Workspace.Id, deploymentName, uint(version), types.StubTypeTaskQueueDeployment)
if err != nil {
ctx.JSON(http.StatusBadRequest, map[string]interface{}{
"error": "invalid deployment",
Expand Down
4 changes: 2 additions & 2 deletions internal/gateway/services/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (gws *GatewayService) DeployStub(ctx context.Context, in *pb.DeployStubRequ
}, nil
}

lastestDeployment, err := gws.backendRepo.GetLatestDeploymentByName(ctx, authInfo.Workspace.Id, in.Name)
lastestDeployment, err := gws.backendRepo.GetLatestDeploymentByName(ctx, authInfo.Workspace.Id, in.Name, stub.Type)
if err != nil {
return &pb.DeployStubResponse{
Ok: false,
Expand All @@ -81,7 +81,7 @@ func (gws *GatewayService) DeployStub(ctx context.Context, in *pb.DeployStubRequ
version = lastestDeployment.Version + 1
}

deployment, err := gws.backendRepo.CreateDeployment(ctx, authInfo.Workspace.Id, in.Name, version, stub.Id)
deployment, err := gws.backendRepo.CreateDeployment(ctx, authInfo.Workspace.Id, in.Name, version, stub.Id, stub.Type)
if err != nil {
return &pb.DeployStubResponse{
Ok: false,
Expand Down
20 changes: 10 additions & 10 deletions internal/repository/backend_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,18 +410,18 @@ func (c *PostgresBackendRepository) GetOrCreateVolume(ctx context.Context, works

// Deployment

func (c *PostgresBackendRepository) GetLatestDeploymentByName(ctx context.Context, workspaceId uint, name string) (*types.Deployment, error) {
func (c *PostgresBackendRepository) GetLatestDeploymentByName(ctx context.Context, workspaceId uint, name string, stubType string) (*types.Deployment, error) {
var deployment types.Deployment

query := `
SELECT id, external_id, name, active, workspace_id, stub_id, version, created_at, updated_at
FROM deployment
WHERE workspace_id = $1 AND name = $2
WHERE workspace_id = $1 AND name = $2 AND stub_type = $3
ORDER BY version DESC
LIMIT 1;
`

err := c.client.GetContext(ctx, &deployment, query, workspaceId, name)
err := c.client.GetContext(ctx, &deployment, query, workspaceId, name, stubType)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil // Return nil if no deployment found
Expand All @@ -432,7 +432,7 @@ func (c *PostgresBackendRepository) GetLatestDeploymentByName(ctx context.Contex
return &deployment, nil
}

func (c *PostgresBackendRepository) GetDeploymentByNameAndVersion(ctx context.Context, workspaceId uint, name string, version uint) (*types.DeploymentWithRelated, error) {
func (c *PostgresBackendRepository) GetDeploymentByNameAndVersion(ctx context.Context, workspaceId uint, name string, version uint, stubType string) (*types.DeploymentWithRelated, error) {
var deploymentWithRelated types.DeploymentWithRelated

query := `
Expand All @@ -442,28 +442,28 @@ func (c *PostgresBackendRepository) GetDeploymentByNameAndVersion(ctx context.Co
FROM deployment d
JOIN workspace w ON d.workspace_id = w.id
JOIN stub s ON d.stub_id = s.id
WHERE d.workspace_id = $1 AND d.name = $2 AND d.version = $3
WHERE d.workspace_id = $1 AND d.name = $2 AND d.version = $3 AND d.stub_type = $4
LIMIT 1;
`

err := c.client.GetContext(ctx, &deploymentWithRelated, query, workspaceId, name, version)
err := c.client.GetContext(ctx, &deploymentWithRelated, query, workspaceId, name, version, stubType)
if err != nil {
return nil, err
}

return &deploymentWithRelated, nil
}

func (c *PostgresBackendRepository) CreateDeployment(ctx context.Context, workspaceId uint, name string, version uint, stubId uint) (*types.Deployment, error) {
func (c *PostgresBackendRepository) CreateDeployment(ctx context.Context, workspaceId uint, name string, version uint, stubId uint, stubType string) (*types.Deployment, error) {
var deployment types.Deployment

query := `
INSERT INTO deployment (name, active, workspace_id, stub_id, version)
VALUES ($1, true, $2, $3, $4)
INSERT INTO deployment (name, active, workspace_id, stub_id, version, stub_type)
VALUES ($1, true, $2, $3, $4, $5)
RETURNING id, external_id, name, active, workspace_id, stub_id, version, created_at, updated_at;
`

err := c.client.GetContext(ctx, &deployment, query, name, workspaceId, stubId, version)
err := c.client.GetContext(ctx, &deployment, query, name, workspaceId, stubId, version, stubType)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ func upCreateTables(tx *sql.Tx) error {
name VARCHAR(255) NOT NULL,
active BOOLEAN NOT NULL DEFAULT true,
workspace_id INT REFERENCES workspace(id),
stub_type stub_type NOT NULL,
stub_id INT REFERENCES stub(id),
version INTEGER NOT NULL DEFAULT 0 CHECK (version >= 0),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
UNIQUE (name, version, workspace_id)
UNIQUE (name, version, workspace_id, stub_type)
);`,

`CREATE TABLE IF NOT EXISTS task (
Expand Down
6 changes: 3 additions & 3 deletions internal/repository/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ type BackendRepository interface {
GetOrCreateStub(ctx context.Context, name, stubType string, config types.StubConfigV1, objectId, workspaceId uint, forceCreate bool) (types.Stub, error)
GetStubByExternalId(ctx context.Context, externalId string) (*types.StubWithRelated, error)
GetOrCreateVolume(ctx context.Context, workspaceId uint, name string) (*types.Volume, error)
GetLatestDeploymentByName(ctx context.Context, workspaceId uint, name string) (*types.Deployment, error)
GetDeploymentByNameAndVersion(ctx context.Context, workspaceId uint, name string, version uint) (*types.DeploymentWithRelated, error)
CreateDeployment(ctx context.Context, workspaceId uint, name string, version uint, stubId uint) (*types.Deployment, error)
GetLatestDeploymentByName(ctx context.Context, workspaceId uint, name string, stubType string) (*types.Deployment, error)
GetDeploymentByNameAndVersion(ctx context.Context, workspaceId uint, name string, version uint, stubType string) (*types.DeploymentWithRelated, error)
CreateDeployment(ctx context.Context, workspaceId uint, name string, version uint, stubId uint, stubType string) (*types.Deployment, error)
}

type BeamRepository interface {
Expand Down
7 changes: 5 additions & 2 deletions internal/types/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Deployment struct {
Active bool `db:"active"`
WorkspaceId uint `db:"workspace_id"` // Foreign key to Workspace
StubId uint `db:"stub_id"` // Foreign key to Stub
StubType string `db:"stub_type"`
Version uint `db:"version"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
Expand Down Expand Up @@ -167,8 +168,10 @@ type StubConfigV1 struct {
}

const (
StubTypeFunction string = "FUNCTION"
StubTypeTaskQueue string = "TASK_QUEUE"
StubTypeFunction string = "function"
StubTypeFunctionDeployment string = "function/deployment"
StubTypeTaskQueue string = "taskqueue"
StubTypeTaskQueueDeployment string = "taskqueue/deployment"
)

type Stub struct {
Expand Down

0 comments on commit 8c16ee0

Please sign in to comment.