Skip to content

Commit

Permalink
update: worker public fields
Browse files Browse the repository at this point in the history
  • Loading branch information
amirhnajafiz committed Nov 16, 2023
1 parent a82d920 commit 0dfa475
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 96 deletions.
164 changes: 82 additions & 82 deletions internal/worker/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ const (

// master manages the workers of each resource
type master struct {
Cfg Config
CephDisable bool
Metrics Metrics
cfg Config
cephDisable bool
metrics Metrics
}

// create a yaml file from our object
Expand All @@ -53,8 +53,8 @@ func (m master) exportYaml(obj runtime.Object, name string, path string) error {
return fmt.Errorf("failed to create local file: %v", err)
}

if !m.CephDisable {
if err := m.Cfg.Storage.Upload(name, path); err != nil {
if !m.cephDisable {
if err := m.cfg.Storage.Upload(name, path); err != nil {
return fmt.Errorf("failed to upload file: %v", err)
}
}
Expand All @@ -67,16 +67,16 @@ func (m master) exportYaml(obj runtime.Object, name string, path string) error {
func (m master) newPodResource() *worker {
wo := newWorker(enum.PodResource)

if m.Cfg.Has(wo.Resource) {
wo.Status = enum.PendingStatus
wo.WatcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.Metrics.Observe(wo.GetResource())
if m.cfg.Has(wo.resource) {
wo.status = enum.PendingStatus
wo.watcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.metrics.Observe(wo.GetResource())

timeOut := int64(m.Cfg.Timeout)
timeOut := int64(m.cfg.Timeout)

return m.Cfg.Client.CoreV1().Pods(m.Cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
return m.cfg.Client.CoreV1().Pods(m.cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
}
wo.CallBack = func(event watch.Event) error {
wo.callBack = func(event watch.Event) error {
obj := event.Object.(*v12.Pod)
name := obj.GetName()
path := fmt.Sprintf("%s/%s.yaml", LocalDir, name)
Expand All @@ -91,16 +91,16 @@ func (m master) newPodResource() *worker {
func (m master) newDeploymentResource() *worker {
wo := newWorker(enum.DeploymentResource)

if m.Cfg.Has(wo.Resource) {
wo.Status = enum.PendingStatus
wo.WatcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.Metrics.Observe(wo.GetResource())
if m.cfg.Has(wo.resource) {
wo.status = enum.PendingStatus
wo.watcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.metrics.Observe(wo.GetResource())

timeOut := int64(m.Cfg.Timeout)
timeOut := int64(m.cfg.Timeout)

return m.Cfg.Client.AppsV1().Deployments(m.Cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
return m.cfg.Client.AppsV1().Deployments(m.cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
}
wo.CallBack = func(event watch.Event) error {
wo.callBack = func(event watch.Event) error {
obj := event.Object.(*v13.Deployment)
name := obj.GetName()
path := fmt.Sprintf("%s/%s.yaml", LocalDir, name)
Expand All @@ -115,16 +115,16 @@ func (m master) newDeploymentResource() *worker {
func (m master) newServiceResource() *worker {
wo := newWorker(enum.ServiceResource)

if m.Cfg.Has(wo.Resource) {
wo.Status = enum.PendingStatus
wo.WatcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.Metrics.Observe(wo.GetResource())
if m.cfg.Has(wo.resource) {
wo.status = enum.PendingStatus
wo.watcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.metrics.Observe(wo.GetResource())

timeOut := int64(m.Cfg.Timeout)
timeOut := int64(m.cfg.Timeout)

return m.Cfg.Client.CoreV1().Services(m.Cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
return m.cfg.Client.CoreV1().Services(m.cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
}
wo.CallBack = func(event watch.Event) error {
wo.callBack = func(event watch.Event) error {
obj := event.Object.(*v12.Service)
name := obj.GetName()
path := fmt.Sprintf("%s/%s.yaml", LocalDir, name)
Expand All @@ -139,16 +139,16 @@ func (m master) newServiceResource() *worker {
func (m master) newCronjobResource() *worker {
wo := newWorker(enum.CronjobResource)

if m.Cfg.Has(wo.Resource) {
wo.Status = enum.PendingStatus
wo.WatcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.Metrics.Observe(wo.GetResource())
if m.cfg.Has(wo.resource) {
wo.status = enum.PendingStatus
wo.watcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.metrics.Observe(wo.GetResource())

timeOut := int64(m.Cfg.Timeout)
timeOut := int64(m.cfg.Timeout)

return m.Cfg.Client.BatchV1().CronJobs(m.Cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
return m.cfg.Client.BatchV1().CronJobs(m.cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
}
wo.CallBack = func(event watch.Event) error {
wo.callBack = func(event watch.Event) error {
obj := event.Object.(*v1beta1.CronJob)
name := obj.GetName()
path := fmt.Sprintf("%s/%s.yaml", LocalDir, name)
Expand All @@ -163,16 +163,16 @@ func (m master) newCronjobResource() *worker {
func (m master) newConfigmapResource() *worker {
wo := newWorker(enum.ConfigMapResource)

if m.Cfg.Has(wo.Resource) {
wo.Status = enum.PendingStatus
wo.WatcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.Metrics.Observe(wo.GetResource())
if m.cfg.Has(wo.resource) {
wo.status = enum.PendingStatus
wo.watcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.metrics.Observe(wo.GetResource())

timeOut := int64(m.Cfg.Timeout)
timeOut := int64(m.cfg.Timeout)

return m.Cfg.Client.CoreV1().ConfigMaps(m.Cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
return m.cfg.Client.CoreV1().ConfigMaps(m.cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
}
wo.CallBack = func(event watch.Event) error {
wo.callBack = func(event watch.Event) error {
obj := event.Object.(*v12.ConfigMap)
name := obj.GetName()
path := fmt.Sprintf("%s/%s.yaml", LocalDir, name)
Expand All @@ -187,16 +187,16 @@ func (m master) newConfigmapResource() *worker {
func (m master) newSecretResource() *worker {
wo := newWorker(enum.SecretResource)

if m.Cfg.Has(wo.Resource) {
wo.Status = enum.PendingStatus
wo.WatcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.Metrics.Observe(wo.GetResource())
if m.cfg.Has(wo.resource) {
wo.status = enum.PendingStatus
wo.watcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.metrics.Observe(wo.GetResource())

timeOut := int64(m.Cfg.Timeout)
timeOut := int64(m.cfg.Timeout)

return m.Cfg.Client.CoreV1().Secrets(m.Cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
return m.cfg.Client.CoreV1().Secrets(m.cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
}
wo.CallBack = func(event watch.Event) error {
wo.callBack = func(event watch.Event) error {
obj := event.Object.(*v12.Secret)
name := obj.GetName()
path := fmt.Sprintf("%s/%s.yaml", LocalDir, name)
Expand All @@ -211,16 +211,16 @@ func (m master) newSecretResource() *worker {
func (m master) newServiceAccountResource() *worker {
wo := newWorker(enum.ServiceAccountResource)

if m.Cfg.Has(wo.Resource) {
wo.Status = enum.PendingStatus
wo.WatcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.Metrics.Observe(wo.GetResource())
if m.cfg.Has(wo.resource) {
wo.status = enum.PendingStatus
wo.watcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.metrics.Observe(wo.GetResource())

timeOut := int64(m.Cfg.Timeout)
timeOut := int64(m.cfg.Timeout)

return m.Cfg.Client.CoreV1().ServiceAccounts(m.Cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
return m.cfg.Client.CoreV1().ServiceAccounts(m.cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
}
wo.CallBack = func(event watch.Event) error {
wo.callBack = func(event watch.Event) error {
obj := event.Object.(*v12.ServiceAccount)
name := obj.GetName()
path := fmt.Sprintf("%s/%s.yaml", LocalDir, name)
Expand All @@ -235,16 +235,16 @@ func (m master) newServiceAccountResource() *worker {
func (m master) newStatefulResource() *worker {
wo := newWorker(enum.StatefulResource)

if m.Cfg.Has(wo.Resource) {
wo.Status = enum.PendingStatus
wo.WatcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.Metrics.Observe(wo.GetResource())
if m.cfg.Has(wo.resource) {
wo.status = enum.PendingStatus
wo.watcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.metrics.Observe(wo.GetResource())

timeOut := int64(m.Cfg.Timeout)
timeOut := int64(m.cfg.Timeout)

return m.Cfg.Client.AppsV1().StatefulSets(m.Cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
return m.cfg.Client.AppsV1().StatefulSets(m.cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
}
wo.CallBack = func(event watch.Event) error {
wo.callBack = func(event watch.Event) error {
obj := event.Object.(*v13.StatefulSet)
name := obj.GetName()
path := fmt.Sprintf("%s/%s.yaml", LocalDir, name)
Expand All @@ -259,16 +259,16 @@ func (m master) newStatefulResource() *worker {
func (m master) newHPAResource() *worker {
wo := newWorker(enum.HPAResource)

if m.Cfg.Has(wo.Resource) {
wo.Status = enum.PendingStatus
wo.WatcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.Metrics.Observe(wo.GetResource())
if m.cfg.Has(wo.resource) {
wo.status = enum.PendingStatus
wo.watcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.metrics.Observe(wo.GetResource())

timeOut := int64(m.Cfg.Timeout)
timeOut := int64(m.cfg.Timeout)

return m.Cfg.Client.AutoscalingV1().HorizontalPodAutoscalers(m.Cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
return m.cfg.Client.AutoscalingV1().HorizontalPodAutoscalers(m.cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
}
wo.CallBack = func(event watch.Event) error {
wo.callBack = func(event watch.Event) error {
obj := event.Object.(*v14.HorizontalPodAutoscaler)
name := obj.GetName()
path := fmt.Sprintf("%s/%s.yaml", LocalDir, name)
Expand All @@ -283,16 +283,16 @@ func (m master) newHPAResource() *worker {
func (m master) newIngressResource() *worker {
wo := newWorker(enum.IngressResource)

if m.Cfg.Has(wo.Resource) {
wo.Status = enum.PendingStatus
wo.WatcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.Metrics.Observe(wo.GetResource())
if m.cfg.Has(wo.resource) {
wo.status = enum.PendingStatus
wo.watcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.metrics.Observe(wo.GetResource())

timeOut := int64(m.Cfg.Timeout)
timeOut := int64(m.cfg.Timeout)

return m.Cfg.Client.NetworkingV1().Ingresses(m.Cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
return m.cfg.Client.NetworkingV1().Ingresses(m.cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
}
wo.CallBack = func(event watch.Event) error {
wo.callBack = func(event watch.Event) error {
obj := event.Object.(*v15.Ingress)
name := obj.GetName()
path := fmt.Sprintf("%s/%s.yaml", LocalDir, name)
Expand All @@ -307,16 +307,16 @@ func (m master) newIngressResource() *worker {
func (m master) newPVCResource() *worker {
wo := newWorker(enum.PVCResource)

if m.Cfg.Has(wo.Resource) {
wo.Status = enum.PendingStatus
wo.WatcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.Metrics.Observe(wo.GetResource())
if m.cfg.Has(wo.resource) {
wo.status = enum.PendingStatus
wo.watcherFunc = func(options v1.ListOptions) (watch.Interface, error) {
m.metrics.Observe(wo.GetResource())

timeOut := int64(m.Cfg.Timeout)
timeOut := int64(m.cfg.Timeout)

return m.Cfg.Client.CoreV1().PersistentVolumeClaims(m.Cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
return m.cfg.Client.CoreV1().PersistentVolumeClaims(m.cfg.Namespace).Watch(context.Background(), v1.ListOptions{TimeoutSeconds: &timeOut})
}
wo.CallBack = func(event watch.Event) error {
wo.callBack = func(event watch.Event) error {
obj := event.Object.(*v12.PersistentVolumeClaim)
name := obj.GetName()
path := fmt.Sprintf("%s/%s.yaml", LocalDir, name)
Expand Down
28 changes: 14 additions & 14 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ type Worker interface {
func Register(cfg Config, metrics Metrics, cephDisable bool) []Worker {
// create a new master
m := master{
Cfg: cfg,
CephDisable: cephDisable,
Metrics: metrics,
cfg: cfg,
cephDisable: cephDisable,
metrics: metrics,
}

return []Worker{
Expand All @@ -46,25 +46,25 @@ func Register(cfg Config, metrics Metrics, cephDisable bool) []Worker {

// each worker calls a watcher function to monitor resources
type worker struct {
WatcherFunc func(options v1.ListOptions) (watch.Interface, error)
CallBack func(event watch.Event) error
Status enum.Status
Resource enum.Resource
watcherFunc func(options v1.ListOptions) (watch.Interface, error)
callBack func(event watch.Event) error
status enum.Status
resource enum.Resource
}

// Watch a resource
func (w worker) Watch() error {
return func() error {
// disabled worker
if w.Status == enum.DisableStatus {
if w.status == enum.DisableStatus {
return nil
}

watcher, _ := toolsWatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: w.WatcherFunc})
watcher, _ := toolsWatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: w.watcherFunc})

for event := range watcher.ResultChan() {
if event.Type == watch.Added {
if err := w.CallBack(event); err != nil {
if err := w.callBack(event); err != nil {
log.Println(err)
}
}
Expand All @@ -76,18 +76,18 @@ func (w worker) Watch() error {

// GetStatus of a worker
func (w worker) GetStatus() string {
return w.Status.ToString()
return w.status.ToString()
}

// GetResource name of a worker
func (w worker) GetResource() string {
return w.Resource.ToString()
return w.resource.ToString()
}

// newWorker returns a raw worker with disabled status
func newWorker(resource enum.Resource) *worker {
return &worker{
Resource: resource,
Status: enum.DisableStatus,
resource: resource,
status: enum.DisableStatus,
}
}

0 comments on commit 0dfa475

Please sign in to comment.