Skip to content

Commit

Permalink
feat: stop time in dagger
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishan Arya committed Oct 7, 2024
1 parent e346119 commit 4301fdc
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
6 changes: 6 additions & 0 deletions modules/dagger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/goto/entropy/core/module"
"github.com/goto/entropy/modules"
Expand Down Expand Up @@ -85,6 +86,10 @@ var (
validateConfig = validator.FromJSONSchema(configSchemaRaw)
)

type StartParams struct {
StopTime *time.Time `json:"stop_time"`
}

type UsageSpec struct {
CPU string `json:"cpu,omitempty" validate:"required"`
Memory string `json:"memory,omitempty" validate:"required"`
Expand Down Expand Up @@ -114,6 +119,7 @@ type Config struct {
State string `json:"state"`
JobState string `json:"job_state"`
ResetOffset string `json:"reset_offset"`
StopTime *time.Time `json:"stop_time,omitempty"`
}

type ChartValues struct {
Expand Down
18 changes: 14 additions & 4 deletions modules/dagger/driver_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import (

const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
const (
JobStateRunning = "running"
JobStateSuspended = "suspended"
StateDeployed = "DEPLOYED"
StateUserStopped = "USER_STOPPED"
JobStateRunning = "running"
JobStateSuspended = "suspended"
StateDeployed = "DEPLOYED"
StateUserStopped = "USER_STOPPED"
StateSystemStopped = "SYSTEM_STOPPED"
)

func (dd *daggerDriver) Plan(_ context.Context, exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
Expand Down Expand Up @@ -113,7 +114,16 @@ func (dd *daggerDriver) planChange(exr module.ExpandedResource, act module.Actio
curConf.State = StateDeployed
curConf.JobState = JobStateRunning

var startParams StartParams
if err := json.Unmarshal(act.Params, &startParams); err != nil {
return nil, errors.ErrInvalid.WithMsgf("invalid params for start action").WithCausef(err.Error())
}
if startParams.StopTime != nil {
curConf.StopTime = startParams.StopTime
}

}

immediately := dd.timeNow()

exr.Resource.Spec.Configs = modules.MustJSON(curConf)
Expand Down
10 changes: 10 additions & 0 deletions modules/dagger/driver_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ func (dd *daggerDriver) Sync(ctx context.Context, exr module.ExpandedResource) (
return &finalState, nil
}

finalState.NextSyncAt = conf.StopTime
if conf.StopTime != nil && conf.StopTime.Before(dd.timeNow()) {
conf.JobState = JobStateSuspended
conf.State = StateSystemStopped
if err := dd.releaseSync(ctx, exr.Resource, false, *conf, flinkOut.KubeCluster); err != nil {
return nil, err
}
finalState.NextSyncAt = nil
}

finalOut, err := dd.refreshOutput(ctx, exr.Resource, *conf, *out, flinkOut.KubeCluster)
if err != nil {
return nil, err
Expand Down

0 comments on commit 4301fdc

Please sign in to comment.