Skip to content

Commit

Permalink
feat: support async runs (#1312)
Browse files Browse the repository at this point in the history
* feat: support async preview

* feat: support async executions, max concurrent async count, max buffer zones, sets logs and results
  • Loading branch information
ffforest authored Nov 13, 2024
1 parent 21b0d49 commit ec0231b
Show file tree
Hide file tree
Showing 41 changed files with 1,867 additions and 187 deletions.
24 changes: 14 additions & 10 deletions pkg/cmd/server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ import (

func NewServerOptions() *ServerOptions {
return &ServerOptions{
Mode: DefaultMode,
Port: DefaultPort,
AuthEnabled: false,
AuthWhitelist: []string{},
AuthKeyType: DefaultAuthKeyType,
Database: DatabaseOptions{},
DefaultBackend: DefaultBackendOptions{},
DefaultSource: DefaultSourceOptions{},
MaxConcurrent: constant.MaxConcurrent,
LogFilePath: constant.DefaultLogFilePath,
Mode: DefaultMode,
Port: DefaultPort,
AuthEnabled: false,
AuthWhitelist: []string{},
AuthKeyType: DefaultAuthKeyType,
Database: DatabaseOptions{},
DefaultBackend: DefaultBackendOptions{},
DefaultSource: DefaultSourceOptions{},
MaxConcurrent: constant.MaxConcurrent,
MaxAsyncConcurrent: constant.MaxAsyncConcurrent,
MaxAsyncBuffer: constant.MaxAsyncBuffer,
LogFilePath: constant.DefaultLogFilePath,
}
}

Expand All @@ -37,6 +39,8 @@ func (o *ServerOptions) Config() (*server.Config, error) {
cfg.AuthWhitelist = o.AuthWhitelist
cfg.AuthKeyType = o.AuthKeyType
cfg.MaxConcurrent = o.MaxConcurrent
cfg.MaxAsyncConcurrent = o.MaxAsyncConcurrent
cfg.MaxAsyncBuffer = o.MaxAsyncBuffer
cfg.LogFilePath = o.LogFilePath
return cfg, nil
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (o *ServerOptions) AddServerFlags(cmd *cobra.Command) {
i18n.T("Specify the auth key type. Default to RSA"))
cmd.Flags().IntVarP(&o.MaxConcurrent, "max-concurrent", "", 10,
i18n.T("Maximum number of concurrent executions including preview, apply and destroy. Default to 10."))
cmd.Flags().IntVarP(&o.MaxAsyncBuffer, "max-async-buffer", "", 100,
i18n.T("Maximum number of buffer zones during concurrent async executions including generate, preview, apply and destroy. Default to 100."))
cmd.Flags().IntVarP(&o.MaxAsyncConcurrent, "max-async-concurrent", "", 10,
i18n.T("Maximum number of concurrent async executions including generate, preview, apply and destroy. Default to 10."))
cmd.Flags().StringVarP(&o.LogFilePath, "log-file-path", "", constant.DefaultLogFilePath,
i18n.T("File path to write logs to. Default to /home/admin/logs/kusion.log"))
o.Database.AddFlags(cmd.Flags())
Expand Down
22 changes: 12 additions & 10 deletions pkg/cmd/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import (
)

type ServerOptions struct {
Mode string
Port int
AuthEnabled bool
AuthWhitelist []string
AuthKeyType string
Database DatabaseOptions
DefaultBackend DefaultBackendOptions
DefaultSource DefaultSourceOptions
MaxConcurrent int
LogFilePath string
Mode string
Port int
AuthEnabled bool
AuthWhitelist []string
AuthKeyType string
Database DatabaseOptions
DefaultBackend DefaultBackendOptions
DefaultSource DefaultSourceOptions
MaxConcurrent int
MaxAsyncConcurrent int
MaxAsyncBuffer int
LogFilePath string
}

type Options interface {
Expand Down
25 changes: 16 additions & 9 deletions pkg/domain/constant/global.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package constant

import "time"

// These constants represent the possible states of a stack.
const (
DefaultUser = "test.user"
DefaultWorkspace = "default"
DefaultBackend = "default"
DefaultOrgOwner = "kusion"
DefaultSourceType = SourceProviderTypeGit
DefaultSourceDesc = "Default source"
DefaultSystemName = "kusion"
MaxConcurrent = 10
DefaultLogFilePath = "/home/admin/logs/kusion.log"
DefaultUser = "test.user"
DefaultWorkspace = "default"
DefaultBackend = "default"
DefaultOrgOwner = "kusion"
DefaultSourceType = SourceProviderTypeGit
DefaultSourceDesc = "Default source"
DefaultSystemName = "kusion"
DefaultReleaseNamespace = "server"
MaxConcurrent = 10
MaxAsyncConcurrent = 1
MaxAsyncBuffer = 100
DefaultLogFilePath = "/home/admin/logs/kusion.log"
RepoCacheTTL = 60 * time.Minute
RunTimeOut = 60 * time.Minute
)
61 changes: 61 additions & 0 deletions pkg/domain/constant/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package constant

import (
"fmt"
)

type (
RunType string
RunStatus string
)

const (
RunTypeGenerate RunType = "Generate"
RunTypePreview RunType = "Preview"
RunTypeApply RunType = "Apply"
RunTypeDestroy RunType = "Destroy"
RunStatusScheduling RunStatus = "Scheduling"
RunStatusInProgress RunStatus = "InProgress"
RunStatusFailed RunStatus = "Failed"
RunStatusSucceeded RunStatus = "Succeeded"
RunStatusCancelled RunStatus = "Cancelled"
RunStatusQueued RunStatus = "Queued"
)

// ParseRunType parses a string into a RunType.
// If the string is not a valid RunType, it returns an error.
func ParseRunType(s string) (RunType, error) {
switch s {
case string(RunTypeGenerate):
return RunTypeGenerate, nil
case string(RunTypePreview):
return RunTypePreview, nil
case string(RunTypeApply):
return RunTypeApply, nil
case string(RunTypeDestroy):
return RunTypeDestroy, nil
default:
return RunType(""), fmt.Errorf("invalid RunType: %q", s)
}
}

// ParseRunStatus parses a string into a RunStatus.
// If the string is not a valid RunStatus, it returns an error.
func ParseRunStatus(s string) (RunStatus, error) {
switch s {
case string(RunStatusScheduling):
return RunStatusScheduling, nil
case string(RunStatusInProgress):
return RunStatusInProgress, nil
case string(RunStatusFailed):
return RunStatusFailed, nil
case string(RunStatusSucceeded):
return RunStatusSucceeded, nil
case string(RunStatusCancelled):
return RunStatusCancelled, nil
case string(RunStatusQueued):
return RunStatusQueued, nil
default:
return RunStatus(""), fmt.Errorf("invalid RunType: %q", s)
}
}
72 changes: 72 additions & 0 deletions pkg/domain/entity/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package entity

import (
"fmt"
"time"

"kusionstack.io/kusion/pkg/domain/constant"
)

// Run represents the specific run, including type
// which should be a specific instance of the run provider.
type Run struct {
// ID is the id of the run.
ID uint `yaml:"id" json:"id"`
// RunType is the type of the run provider.
Type constant.RunType `yaml:"type" json:"type"`
// Stack is the stack of the run.
Stack *Stack `yaml:"stack" json:"stack"`
// Workspace is the target workspace of the run.
Workspace string `yaml:"workspace" json:"workspace"`
// Status is the status of the run.
Status constant.RunStatus `yaml:"status" json:"status"`
// Result is the result of the run.
Result string `yaml:"result" json:"result"`
// Result RunResult `yaml:"result" json:"result"`
// Logs is the logs of the run.
Logs string `yaml:"logs" json:"logs"`
// CreationTimestamp is the timestamp of the created for the run.
CreationTimestamp time.Time `yaml:"creationTimestamp,omitempty" json:"creationTimestamp,omitempty"`
// UpdateTimestamp is the timestamp of the updated for the run.
UpdateTimestamp time.Time `yaml:"updateTimestamp,omitempty" json:"updateTimestamp,omitempty"`
}

// RunResult represents the result of the run.
type RunResult struct {
// ExitCode is the exit code of the run.
ExitCode int `yaml:"exitCode" json:"exitCode"`
// Message is the message of the run.
Message string `yaml:"message" json:"message"`
// Old is the old state of the run.
Old string `yaml:"old" json:"old"`
// New is the new state of the run.
New string `yaml:"new" json:"new"`
}

type RunFilter struct {
ProjectID uint
StackID uint
Workspace string
}

// Validate checks if the run is valid.
// It returns an error if the run is not valid.
func (r *Run) Validate() error {
if r == nil {
return fmt.Errorf("run is nil")
}

if r.Type == "" {
return fmt.Errorf("run must have a run type")
}

if r.Workspace == "" {
return fmt.Errorf("run must have a target workspace")
}

return nil
}

func (r *Run) Summary() string {
return fmt.Sprintf("[%s][%s]", string(r.Type), string(r.Status))
}
6 changes: 6 additions & 0 deletions pkg/domain/entity/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
type Source struct {
// ID is the id of the source.
ID uint `yaml:"id" json:"id"`
// Name is the name of the source.
Name string `yaml:"name" json:"name"`
// SourceProvider is the type of the source provider.
SourceProvider constant.SourceProviderType `yaml:"sourceProvider" json:"sourceProvider"`
// Remote is the source URL, including scheme.
Expand All @@ -36,6 +38,10 @@ func (s *Source) Validate() error {
return fmt.Errorf("source is nil")
}

if s.Name == "" {
return fmt.Errorf("source must have a name")
}

if s.SourceProvider == "" {
return fmt.Errorf("source must have a source provider")
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/domain/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,18 @@ type ModuleRepository interface {
// List retrives all the existing modules.
List(ctx context.Context) ([]*entity.Module, error)
}

// RunRepository is an interface that defines the repository operations
// for runs. It follows the principles of domain-driven design (DDD).
type RunRepository interface {
// Create creates a new run.
Create(ctx context.Context, run *entity.Run) error
// Delete deletes a run by its ID.
Delete(ctx context.Context, id uint) error
// Update updates an existing run.
Update(ctx context.Context, run *entity.Run) error
// Get retrieves a run by its ID.
Get(ctx context.Context, id uint) (*entity.Run, error)
// List retrieves all existing run.
List(ctx context.Context, filter *entity.RunFilter) ([]*entity.Run, error)
}
21 changes: 21 additions & 0 deletions pkg/domain/request/execute_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,24 @@ type StackImportRequest struct {
func (payload *StackImportRequest) Decode(r *http.Request) error {
return decode(r, payload)
}

type CreateRunRequest struct {
Type string `json:"type"`
StackID uint `json:"stackID"`
Workspace string `json:"workspace"`
ImportedResources StackImportRequest `json:"importedResources"`
}

type UpdateRunRequest struct {
CreateRunRequest `json:",inline" yaml:",inline"`
}

type UpdateRunResultRequest struct {
Result string `json:"result"`
Status string `json:"status"`
Logs string `json:"logs"`
}

func (payload *CreateRunRequest) Decode(r *http.Request) error {
return decode(r, payload)
}
15 changes: 4 additions & 11 deletions pkg/domain/request/source_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import "net/http"
// CreateSourceRequest represents the create request structure for
// source.
type CreateSourceRequest struct {
// Name is the name of the source.
Name string `json:"name" binding:"required"`
// SourceProvider is the type of the source provider.
SourceProvider string `json:"sourceProvider" binding:"required"`
// Remote is the source URL, including scheme.
Expand All @@ -21,17 +23,8 @@ type CreateSourceRequest struct {
// source.
type UpdateSourceRequest struct {
// ID is the id of the source.
ID uint `json:"id" binding:"required"`
// SourceProvider is the type of the source provider.
SourceProvider string `json:"sourceProvider"`
// Remote is the source URL, including scheme.
Remote string `json:"remote"`
// Description is a human-readable description of the source.
Description string `json:"description"`
// Labels are custom labels associated with the source.
Labels []string `json:"labels"`
// Owners is a list of owners for the source.
Owners []string `json:"owners"`
ID uint `json:"id" binding:"required"`
CreateSourceRequest `json:",inline" yaml:",inline"`
}

func (payload *CreateSourceRequest) Decode(r *http.Request) error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/engine/api/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func Apply(
o *APIOptions,
storage release.Storage,
rel *apiv1.Release,
gph *apiv1.Graph,
changes *models.Changes,
out io.Writer,
) (*apiv1.Release, error) {
Expand Down Expand Up @@ -135,6 +136,7 @@ func Apply(
Stack: changes.Stack(),
},
Release: rel,
Graph: gph,
})
if v1.IsErr(st) {
return nil, fmt.Errorf("apply failed, status:\n%v", st)
Expand Down
7 changes: 3 additions & 4 deletions pkg/engine/api/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestApply(t *testing.T) {
changes := models.NewChanges(proj, stack, order)
o := &APIOptions{}
o.DryRun = true
_, err := Apply(context.TODO(), o, &releasestorages.LocalStorage{}, rel, changes, os.Stdout)
_, err := Apply(context.TODO(), o, &releasestorages.LocalStorage{}, rel, &apiv1.Graph{}, changes, os.Stdout)
assert.Nil(t, err)
})
mockey.PatchConvey("apply success", t, func() {
Expand All @@ -86,7 +86,7 @@ func TestApply(t *testing.T) {
}
changes := models.NewChanges(proj, stack, order)

_, err := Apply(context.TODO(), o, &releasestorages.LocalStorage{}, rel, changes, os.Stdout)
_, err := Apply(context.TODO(), o, &releasestorages.LocalStorage{}, rel, &apiv1.Graph{}, changes, os.Stdout)
assert.Nil(t, err)
})
mockey.PatchConvey("apply failed", t, func() {
Expand All @@ -105,8 +105,7 @@ func TestApply(t *testing.T) {
},
}
changes := models.NewChanges(proj, stack, order)

_, err := Apply(context.TODO(), o, &releasestorages.LocalStorage{}, rel, changes, os.Stdout)
_, err := Apply(context.TODO(), o, &releasestorages.LocalStorage{}, rel, &apiv1.Graph{}, changes, os.Stdout)
assert.NotNil(t, err)
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/infra/persistence/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestProjectRepository(t *testing.T) {
Name: "mockedProject",
Source: &entity.Source{
ID: 1,
Name: "mockedSource",
SourceProvider: constant.SourceProviderTypeGithub,
Remote: mockRemoteURL,
},
Expand Down
Loading

0 comments on commit ec0231b

Please sign in to comment.