Skip to content

Commit

Permalink
[MM-53555] Support defining image registry through configuration (#50)
Browse files Browse the repository at this point in the history
* Support defining image registry through configuration

* Update docs
  • Loading branch information
streamer45 authored Jan 22, 2024
1 parent 94edfd5 commit 79c931a
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 24 deletions.
3 changes: 3 additions & 0 deletions config/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ max_concurrent_jobs = 2
# completion. A zero value means keeping failed jobs indefinitely.
# The supported units of time are "m" (minutes), "h" (hours) and "d" (days).
failed_jobs_retention_time = "30d"
# The image registry used to validate job runners. Defaults to the public
# Mattermost Docker registry (https://hub.docker.com/u/mattermost).
image_registry = "mattermost"

# Kubernetes API optionally supports definining resource limits and requests on
# a per job type basis. Example:
Expand Down
41 changes: 24 additions & 17 deletions public/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,7 @@ const (
MinSupportedTranscriberVersion = "0.1.0"
RecordingJobPrefix = "calls-recorder"
TranscribingJobPrefix = "calls-transcriber"
)

var (
recorderRunnerREs = []*regexp.Regexp{
regexp.MustCompile(`^mattermost/calls-recorder:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))$`),
regexp.MustCompile(`^mattermost/calls-recorder-daily:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))-dev$`),
}
transcriberRunnerREs = []*regexp.Regexp{
regexp.MustCompile(`^mattermost/calls-transcriber:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))$`),
regexp.MustCompile(`^mattermost/calls-transcriber-daily:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))-dev$`),
}
ImageRegistryDefault = "mattermost"
)

type ServiceConfig struct {
Expand All @@ -58,26 +48,43 @@ type Config struct {

type StopCb func(job Job, success bool) error

func (c ServiceConfig) IsValid() error {
func (c ServiceConfig) IsValid(registry string) error {
if len(c.Runners) == 0 {
return fmt.Errorf("invalid empty Runners")
}

if registry == "" {
return fmt.Errorf("registry should not be empty")
}

for _, runner := range c.Runners {
if err := RunnerIsValid(runner); err != nil {
if err := RunnerIsValid(runner, registry); err != nil {
return err
}
}
return nil
}

func RunnerIsValid(runner string) error {
func RunnerIsValid(runner, registry string) error {
if os.Getenv("DEV_MODE") == "true" || os.Getenv("TEST_MODE") == "true" {
return nil
}

if runner == "" {
return fmt.Errorf("should not be empty")
return fmt.Errorf("runner should not be empty")
}

if registry == "" {
return fmt.Errorf("registry should not be empty")
}

recorderRunnerREs := []*regexp.Regexp{
regexp.MustCompile(fmt.Sprintf(`^%s/%s:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))$`, registry, RecordingJobPrefix)),
regexp.MustCompile(fmt.Sprintf(`^%s/%s-daily:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))-dev$`, registry, RecordingJobPrefix)),
}
transcriberRunnerREs := []*regexp.Regexp{
regexp.MustCompile(fmt.Sprintf(`^%s/%s:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))$`, registry, TranscribingJobPrefix)),
regexp.MustCompile(fmt.Sprintf(`^%s/%s-daily:v((?:0|[1-9]\d*)\.(?:0|[1-9]\d*)\.(?:0|[1-9]\d*))-dev$`, registry, TranscribingJobPrefix)),
}

for _, re := range recorderRunnerREs {
Expand All @@ -95,12 +102,12 @@ func RunnerIsValid(runner string) error {
return fmt.Errorf("failed to validate runner %q", runner)
}

func (c Config) IsValid() error {
func (c Config) IsValid(registry string) error {
if c.Type == "" {
return fmt.Errorf("invalid Type value: should not be empty")
}

if err := RunnerIsValid(c.Runner); err != nil {
if err := RunnerIsValid(c.Runner, registry); err != nil {
return fmt.Errorf("invalid Runner value: %w", err)
}

Expand Down
44 changes: 41 additions & 3 deletions public/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestJobConfigIsValid(t *testing.T) {
tcs := []struct {
name string
cfg Config
registry string
expectedError string
}{
{
Expand All @@ -36,14 +37,24 @@ func TestJobConfigIsValid(t *testing.T) {
cfg: Config{
Type: TypeRecording,
},
expectedError: "invalid Runner value: should not be empty",
registry: ImageRegistryDefault,
expectedError: "invalid Runner value: runner should not be empty",
},
{
name: "empty registry",
cfg: Config{
Type: TypeRecording,
Runner: "testrepo/calls-recorder:v0.1.0",
},
expectedError: "invalid Runner value: registry should not be empty",
},
{
name: "invalid runner",
cfg: Config{
Type: TypeRecording,
Runner: "testrepo/calls-recorder:v0.1.0",
},
registry: ImageRegistryDefault,
expectedError: `invalid Runner value: failed to validate runner "testrepo/calls-recorder:v0.1.0"`,
},
{
Expand All @@ -52,6 +63,7 @@ func TestJobConfigIsValid(t *testing.T) {
Type: TypeRecording,
Runner: "testrepo/calls-recorder@sha256:abcde",
},
registry: ImageRegistryDefault,
expectedError: `invalid Runner value: failed to validate runner "testrepo/calls-recorder@sha256:abcde"`,
},
{
Expand All @@ -62,6 +74,7 @@ func TestJobConfigIsValid(t *testing.T) {
InputData: recorderCfg.ToMap(),
MaxDurationSec: -1,
},
registry: ImageRegistryDefault,
expectedError: "invalid MaxDurationSec value: should be positive",
},
{
Expand All @@ -71,6 +84,7 @@ func TestJobConfigIsValid(t *testing.T) {
Runner: "mattermost/calls-recorder:v" + MinSupportedRecorderVersion,
MaxDurationSec: 60,
},
registry: ImageRegistryDefault,
expectedError: "invalid Type value: \"invalid\"",
},
{
Expand All @@ -80,8 +94,20 @@ func TestJobConfigIsValid(t *testing.T) {
Runner: "mattermost/calls-recorder:v0.1.0",
InputData: recorderCfg.ToMap(),
},
registry: ImageRegistryDefault,
expectedError: fmt.Sprintf("invalid Runner value: actual version (0.1.0) is lower than minimum supported version (%s)", MinSupportedRecorderVersion),
},
{
name: "invalid registry",
cfg: Config{
Type: TypeRecording,
Runner: "mattermost/calls-recorder:v" + MinSupportedRecorderVersion,
InputData: recorderCfg.ToMap(),
MaxDurationSec: 60,
},
registry: "custom",
expectedError: fmt.Sprintf("invalid Runner value: failed to validate runner \"mattermost/calls-recorder:v%s\"", MinSupportedRecorderVersion),
},
{
name: "valid",
cfg: Config{
Expand All @@ -90,6 +116,7 @@ func TestJobConfigIsValid(t *testing.T) {
InputData: recorderCfg.ToMap(),
MaxDurationSec: 60,
},
registry: ImageRegistryDefault,
},
{
name: "valid daily",
Expand All @@ -99,12 +126,23 @@ func TestJobConfigIsValid(t *testing.T) {
InputData: recorderCfg.ToMap(),
MaxDurationSec: 60,
},
registry: ImageRegistryDefault,
},
{
name: "valid, non default registry",
cfg: Config{
Type: TypeRecording,
Runner: "custom/calls-recorder:v" + MinSupportedRecorderVersion,
InputData: recorderCfg.ToMap(),
MaxDurationSec: 60,
},
registry: "custom",
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
err := tc.cfg.IsValid()
err := tc.cfg.IsValid(tc.registry)
if tc.expectedError == "" {
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -138,7 +176,7 @@ func TestServiceConfigIsValid(t *testing.T) {

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
err := tc.cfg.IsValid()
err := tc.cfg.IsValid(ImageRegistryDefault)
if tc.err == "" {
require.NoError(t, err)
} else {
Expand Down
3 changes: 3 additions & 0 deletions service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/mattermost/calls-offloader/logger"
"github.com/mattermost/calls-offloader/public/job"
"github.com/mattermost/calls-offloader/service/api"
"github.com/mattermost/calls-offloader/service/auth"
"github.com/mattermost/calls-offloader/service/docker"
Expand Down Expand Up @@ -96,6 +97,7 @@ type JobsConfig struct {
APIType JobAPIType `toml:"api_type"`
MaxConcurrentJobs int `toml:"max_concurrent_jobs"`
FailedJobsRetentionTime RetentionTime `toml:"failed_jobs_retention_time" ignored:"true"`
ImageRegistry string `toml:"image_registry"`
Kubernetes kubernetes.JobServiceConfig `toml:"kubernetes"`
Docker docker.JobServiceConfig `toml:"docker"`
}
Expand Down Expand Up @@ -194,6 +196,7 @@ func (c *Config) SetDefaults() {
c.Store.DataSource = "/tmp/calls-offloader-db"
c.Jobs.APIType = JobAPITypeDocker
c.Jobs.MaxConcurrentJobs = 2
c.Jobs.ImageRegistry = job.ImageRegistryDefault
c.Logger.EnableConsole = true
c.Logger.ConsoleJSON = false
c.Logger.ConsoleLevel = "INFO"
Expand Down
3 changes: 2 additions & 1 deletion service/docker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var (
type JobServiceConfig struct {
MaxConcurrentJobs int
FailedJobsRetentionTime time.Duration
ImageRegistry string
}

func (c JobServiceConfig) IsValid() error {
Expand Down Expand Up @@ -240,7 +241,7 @@ func (s *JobService) updateJobRunner(runner string) error {
}

func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, error) {
if err := cfg.IsValid(); err != nil {
if err := cfg.IsValid(s.cfg.ImageRegistry); err != nil {
return job.Job{}, fmt.Errorf("invalid job config: %w", err)
}

Expand Down
2 changes: 2 additions & 0 deletions service/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/mattermost/calls-offloader/logger"
"github.com/mattermost/calls-offloader/public"
"github.com/mattermost/calls-offloader/public/job"
"github.com/mattermost/calls-offloader/service/api"
"github.com/mattermost/calls-offloader/service/auth"
"github.com/mattermost/calls-offloader/service/docker"
Expand Down Expand Up @@ -87,6 +88,7 @@ func MakeDefaultCfg(tb testing.TB) *Config {
Jobs: JobsConfig{
APIType: JobAPITypeDocker,
MaxConcurrentJobs: 2,
ImageRegistry: job.ImageRegistryDefault,
Docker: docker.JobServiceConfig{
MaxConcurrentJobs: 2,
},
Expand Down
4 changes: 2 additions & 2 deletions service/jobs_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (s *Service) handleCreateJob(w http.ResponseWriter, r *http.Request) {
return
}

if err := cfg.IsValid(); err != nil {
if err := cfg.IsValid(s.cfg.Jobs.ImageRegistry); err != nil {
data.err = err.Error()
data.code = http.StatusBadRequest
return
Expand Down Expand Up @@ -212,7 +212,7 @@ func (s *Service) handleInit(w http.ResponseWriter, r *http.Request) {
return
}

if err := cfg.IsValid(); err != nil {
if err := cfg.IsValid(s.cfg.Jobs.ImageRegistry); err != nil {
data.err = "invalid job service config: " + err.Error()
data.code = http.StatusBadRequest
return
Expand Down
2 changes: 2 additions & 0 deletions service/jobs_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ func NewJobService(cfg JobsConfig, log mlog.LoggerIFace) (JobService, error) {
case JobAPITypeDocker:
cfg.Docker.MaxConcurrentJobs = cfg.MaxConcurrentJobs
cfg.Docker.FailedJobsRetentionTime = time.Duration(cfg.FailedJobsRetentionTime)
cfg.Docker.ImageRegistry = cfg.ImageRegistry
log.Info("creating new job service", mlog.Any("apiType", cfg.APIType), mlog.String("config", fmt.Sprintf("%+v", cfg.Docker)))
return docker.NewJobService(log, cfg.Docker)
case JobAPITypeKubernetes:
cfg.Kubernetes.MaxConcurrentJobs = cfg.MaxConcurrentJobs
cfg.Kubernetes.FailedJobsRetentionTime = time.Duration(cfg.FailedJobsRetentionTime)
cfg.Kubernetes.ImageRegistry = cfg.ImageRegistry
log.Info("creating new job service", mlog.Any("apiType", cfg.APIType), mlog.String("config", fmt.Sprintf("%+v", cfg.Kubernetes)))
return kubernetes.NewJobService(log, cfg.Kubernetes)
default:
Expand Down
3 changes: 2 additions & 1 deletion service/kubernetes/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (r *JobsResourceRequirements) UnmarshalTOML(data interface{}) error {
type JobServiceConfig struct {
MaxConcurrentJobs int
FailedJobsRetentionTime time.Duration
ImageRegistry string
JobsResourceRequirements JobsResourceRequirements `toml:"jobs_resource_requirements"`
}

Expand Down Expand Up @@ -124,7 +125,7 @@ func (s *JobService) Init(_ job.ServiceConfig) error {
}

func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, error) {
if err := cfg.IsValid(); err != nil {
if err := cfg.IsValid(s.cfg.ImageRegistry); err != nil {
return job.Job{}, fmt.Errorf("invalid job config: %w", err)
}

Expand Down

0 comments on commit 79c931a

Please sign in to comment.