diff --git a/config/config.sample.toml b/config/config.sample.toml index 1ded883..818423a 100644 --- a/config/config.sample.toml +++ b/config/config.sample.toml @@ -36,6 +36,9 @@ max_concurrent_jobs = 2 # completion. A zero value means keeping failed jobs indefinitely. # The supported units of time are "m" (minutes), "h" (hours) and "d" (days). failed_jobs_retention_time = "30d" +# The image registry used to validate job runners. Defaults to the public +# Mattermost Docker registry (https://hub.docker.com/u/mattermost). +image_registry = "mattermost" # Kubernetes API optionally supports definining resource limits and requests on # a per job type basis. Example: diff --git a/public/job/job.go b/public/job/job.go index dd3379d..406ed36 100644 --- a/public/job/job.go +++ b/public/job/job.go @@ -24,17 +24,7 @@ const ( MinSupportedTranscriberVersion = "0.1.0" RecordingJobPrefix = "calls-recorder" TranscribingJobPrefix = "calls-transcriber" -) - -var ( - recorderRunnerREs = []*regexp.Regexp{ - regexp.MustCompile(`^mattermost/calls-recorder:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))$`), - regexp.MustCompile(`^mattermost/calls-recorder-daily:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))-dev$`), - } - transcriberRunnerREs = []*regexp.Regexp{ - regexp.MustCompile(`^mattermost/calls-transcriber:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))$`), - regexp.MustCompile(`^mattermost/calls-transcriber-daily:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))-dev$`), - } + ImageRegistryDefault = "mattermost" ) type ServiceConfig struct { @@ -58,26 +48,43 @@ type Config struct { type StopCb func(job Job, success bool) error -func (c ServiceConfig) IsValid() error { +func (c ServiceConfig) IsValid(registry string) error { if len(c.Runners) == 0 { return fmt.Errorf("invalid empty Runners") } + if registry == "" { + return fmt.Errorf("registry should not be empty") + } + for _, runner := range c.Runners { - if err := RunnerIsValid(runner); err != nil { + if err := RunnerIsValid(runner, registry); err != nil { return err } } return nil } -func RunnerIsValid(runner string) error { +func RunnerIsValid(runner, registry string) error { if os.Getenv("DEV_MODE") == "true" || os.Getenv("TEST_MODE") == "true" { return nil } if runner == "" { - return fmt.Errorf("should not be empty") + return fmt.Errorf("runner should not be empty") + } + + if registry == "" { + return fmt.Errorf("registry should not be empty") + } + + recorderRunnerREs := []*regexp.Regexp{ + regexp.MustCompile(fmt.Sprintf(`^%s/%s:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))$`, registry, RecordingJobPrefix)), + regexp.MustCompile(fmt.Sprintf(`^%s/%s-daily:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))-dev$`, registry, RecordingJobPrefix)), + } + transcriberRunnerREs := []*regexp.Regexp{ + regexp.MustCompile(fmt.Sprintf(`^%s/%s:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))$`, registry, TranscribingJobPrefix)), + regexp.MustCompile(fmt.Sprintf(`^%s/%s-daily:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))-dev$`, registry, TranscribingJobPrefix)), } for _, re := range recorderRunnerREs { @@ -95,12 +102,12 @@ func RunnerIsValid(runner string) error { return fmt.Errorf("failed to validate runner %q", runner) } -func (c Config) IsValid() error { +func (c Config) IsValid(registry string) error { if c.Type == "" { return fmt.Errorf("invalid Type value: should not be empty") } - if err := RunnerIsValid(c.Runner); err != nil { + if err := RunnerIsValid(c.Runner, registry); err != nil { return fmt.Errorf("invalid Runner value: %w", err) } diff --git a/public/job/job_test.go b/public/job/job_test.go index b922674..29c2261 100644 --- a/public/job/job_test.go +++ b/public/job/job_test.go @@ -24,6 +24,7 @@ func TestJobConfigIsValid(t *testing.T) { tcs := []struct { name string cfg Config + registry string expectedError string }{ { @@ -36,7 +37,16 @@ func TestJobConfigIsValid(t *testing.T) { cfg: Config{ Type: TypeRecording, }, - expectedError: "invalid Runner value: should not be empty", + registry: ImageRegistryDefault, + expectedError: "invalid Runner value: runner should not be empty", + }, + { + name: "empty registry", + cfg: Config{ + Type: TypeRecording, + Runner: "testrepo/calls-recorder:v0.1.0", + }, + expectedError: "invalid Runner value: registry should not be empty", }, { name: "invalid runner", @@ -44,6 +54,7 @@ func TestJobConfigIsValid(t *testing.T) { Type: TypeRecording, Runner: "testrepo/calls-recorder:v0.1.0", }, + registry: ImageRegistryDefault, expectedError: `invalid Runner value: failed to validate runner "testrepo/calls-recorder:v0.1.0"`, }, { @@ -52,6 +63,7 @@ func TestJobConfigIsValid(t *testing.T) { Type: TypeRecording, Runner: "testrepo/calls-recorder@sha256:abcde", }, + registry: ImageRegistryDefault, expectedError: `invalid Runner value: failed to validate runner "testrepo/calls-recorder@sha256:abcde"`, }, { @@ -62,6 +74,7 @@ func TestJobConfigIsValid(t *testing.T) { InputData: recorderCfg.ToMap(), MaxDurationSec: -1, }, + registry: ImageRegistryDefault, expectedError: "invalid MaxDurationSec value: should be positive", }, { @@ -71,6 +84,7 @@ func TestJobConfigIsValid(t *testing.T) { Runner: "mattermost/calls-recorder:v" + MinSupportedRecorderVersion, MaxDurationSec: 60, }, + registry: ImageRegistryDefault, expectedError: "invalid Type value: \"invalid\"", }, { @@ -80,8 +94,20 @@ func TestJobConfigIsValid(t *testing.T) { Runner: "mattermost/calls-recorder:v0.1.0", InputData: recorderCfg.ToMap(), }, + registry: ImageRegistryDefault, expectedError: fmt.Sprintf("invalid Runner value: actual version (0.1.0) is lower than minimum supported version (%s)", MinSupportedRecorderVersion), }, + { + name: "invalid registry", + cfg: Config{ + Type: TypeRecording, + Runner: "mattermost/calls-recorder:v" + MinSupportedRecorderVersion, + InputData: recorderCfg.ToMap(), + MaxDurationSec: 60, + }, + registry: "custom", + expectedError: fmt.Sprintf("invalid Runner value: failed to validate runner \"mattermost/calls-recorder:v%s\"", MinSupportedRecorderVersion), + }, { name: "valid", cfg: Config{ @@ -90,6 +116,7 @@ func TestJobConfigIsValid(t *testing.T) { InputData: recorderCfg.ToMap(), MaxDurationSec: 60, }, + registry: ImageRegistryDefault, }, { name: "valid daily", @@ -99,12 +126,23 @@ func TestJobConfigIsValid(t *testing.T) { InputData: recorderCfg.ToMap(), MaxDurationSec: 60, }, + registry: ImageRegistryDefault, + }, + { + name: "valid, non default registry", + cfg: Config{ + Type: TypeRecording, + Runner: "custom/calls-recorder:v" + MinSupportedRecorderVersion, + InputData: recorderCfg.ToMap(), + MaxDurationSec: 60, + }, + registry: "custom", }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - err := tc.cfg.IsValid() + err := tc.cfg.IsValid(tc.registry) if tc.expectedError == "" { require.NoError(t, err) } else { @@ -138,7 +176,7 @@ func TestServiceConfigIsValid(t *testing.T) { for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - err := tc.cfg.IsValid() + err := tc.cfg.IsValid(ImageRegistryDefault) if tc.err == "" { require.NoError(t, err) } else { diff --git a/service/config.go b/service/config.go index 2dd5d65..3865eec 100644 --- a/service/config.go +++ b/service/config.go @@ -11,6 +11,7 @@ import ( "time" "github.com/mattermost/calls-offloader/logger" + "github.com/mattermost/calls-offloader/public/job" "github.com/mattermost/calls-offloader/service/api" "github.com/mattermost/calls-offloader/service/auth" "github.com/mattermost/calls-offloader/service/docker" @@ -96,6 +97,7 @@ type JobsConfig struct { APIType JobAPIType `toml:"api_type"` MaxConcurrentJobs int `toml:"max_concurrent_jobs"` FailedJobsRetentionTime RetentionTime `toml:"failed_jobs_retention_time" ignored:"true"` + ImageRegistry string `toml:"image_registry"` Kubernetes kubernetes.JobServiceConfig `toml:"kubernetes"` Docker docker.JobServiceConfig `toml:"docker"` } @@ -194,6 +196,7 @@ func (c *Config) SetDefaults() { c.Store.DataSource = "/tmp/calls-offloader-db" c.Jobs.APIType = JobAPITypeDocker c.Jobs.MaxConcurrentJobs = 2 + c.Jobs.ImageRegistry = job.ImageRegistryDefault c.Logger.EnableConsole = true c.Logger.ConsoleJSON = false c.Logger.ConsoleLevel = "INFO" diff --git a/service/docker/service.go b/service/docker/service.go index 0a76f2e..e19aea7 100644 --- a/service/docker/service.go +++ b/service/docker/service.go @@ -42,6 +42,7 @@ var ( type JobServiceConfig struct { MaxConcurrentJobs int FailedJobsRetentionTime time.Duration + ImageRegistry string } func (c JobServiceConfig) IsValid() error { @@ -240,7 +241,7 @@ func (s *JobService) updateJobRunner(runner string) error { } func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, error) { - if err := cfg.IsValid(); err != nil { + if err := cfg.IsValid(s.cfg.ImageRegistry); err != nil { return job.Job{}, fmt.Errorf("invalid job config: %w", err) } diff --git a/service/helper_test.go b/service/helper_test.go index 1f5a735..ea358d8 100644 --- a/service/helper_test.go +++ b/service/helper_test.go @@ -10,6 +10,7 @@ import ( "github.com/mattermost/calls-offloader/logger" "github.com/mattermost/calls-offloader/public" + "github.com/mattermost/calls-offloader/public/job" "github.com/mattermost/calls-offloader/service/api" "github.com/mattermost/calls-offloader/service/auth" "github.com/mattermost/calls-offloader/service/docker" @@ -87,6 +88,7 @@ func MakeDefaultCfg(tb testing.TB) *Config { Jobs: JobsConfig{ APIType: JobAPITypeDocker, MaxConcurrentJobs: 2, + ImageRegistry: job.ImageRegistryDefault, Docker: docker.JobServiceConfig{ MaxConcurrentJobs: 2, }, diff --git a/service/jobs_api.go b/service/jobs_api.go index 1f2f04e..863508f 100644 --- a/service/jobs_api.go +++ b/service/jobs_api.go @@ -36,7 +36,7 @@ func (s *Service) handleCreateJob(w http.ResponseWriter, r *http.Request) { return } - if err := cfg.IsValid(); err != nil { + if err := cfg.IsValid(s.cfg.Jobs.ImageRegistry); err != nil { data.err = err.Error() data.code = http.StatusBadRequest return @@ -212,7 +212,7 @@ func (s *Service) handleInit(w http.ResponseWriter, r *http.Request) { return } - if err := cfg.IsValid(); err != nil { + if err := cfg.IsValid(s.cfg.Jobs.ImageRegistry); err != nil { data.err = "invalid job service config: " + err.Error() data.code = http.StatusBadRequest return diff --git a/service/jobs_service.go b/service/jobs_service.go index 36f7cd8..49d33ca 100644 --- a/service/jobs_service.go +++ b/service/jobs_service.go @@ -32,11 +32,13 @@ func NewJobService(cfg JobsConfig, log mlog.LoggerIFace) (JobService, error) { case JobAPITypeDocker: cfg.Docker.MaxConcurrentJobs = cfg.MaxConcurrentJobs cfg.Docker.FailedJobsRetentionTime = time.Duration(cfg.FailedJobsRetentionTime) + cfg.Docker.ImageRegistry = cfg.ImageRegistry log.Info("creating new job service", mlog.Any("apiType", cfg.APIType), mlog.String("config", fmt.Sprintf("%+v", cfg.Docker))) return docker.NewJobService(log, cfg.Docker) case JobAPITypeKubernetes: cfg.Kubernetes.MaxConcurrentJobs = cfg.MaxConcurrentJobs cfg.Kubernetes.FailedJobsRetentionTime = time.Duration(cfg.FailedJobsRetentionTime) + cfg.Kubernetes.ImageRegistry = cfg.ImageRegistry log.Info("creating new job service", mlog.Any("apiType", cfg.APIType), mlog.String("config", fmt.Sprintf("%+v", cfg.Kubernetes))) return kubernetes.NewJobService(log, cfg.Kubernetes) default: diff --git a/service/kubernetes/service.go b/service/kubernetes/service.go index fccd7df..989012a 100644 --- a/service/kubernetes/service.go +++ b/service/kubernetes/service.go @@ -56,6 +56,7 @@ func (r *JobsResourceRequirements) UnmarshalTOML(data interface{}) error { type JobServiceConfig struct { MaxConcurrentJobs int FailedJobsRetentionTime time.Duration + ImageRegistry string JobsResourceRequirements JobsResourceRequirements `toml:"jobs_resource_requirements"` } @@ -124,7 +125,7 @@ func (s *JobService) Init(_ job.ServiceConfig) error { } func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, error) { - if err := cfg.IsValid(); err != nil { + if err := cfg.IsValid(s.cfg.ImageRegistry); err != nil { return job.Job{}, fmt.Errorf("invalid job config: %w", err) }