From bd7dad18a4238c92229e8abf748fd6206ad4e6a2 Mon Sep 17 00:00:00 2001 From: warjiang <1096409085@qq.com> Date: Mon, 9 Sep 2024 00:00:06 +0800 Subject: [PATCH 1/2] feat: add resource management for cronjob Signed-off-by: warjiang <1096409085@qq.com> --- pkg/resource/common/resourcechannels.go | 25 ++++ pkg/resource/cronjob/common.go | 69 +++++++++++ pkg/resource/cronjob/detail.go | 41 +++++++ pkg/resource/cronjob/events.go | 21 ++++ pkg/resource/cronjob/jobs.go | 147 ++++++++++++++++++++++++ pkg/resource/cronjob/list.go | 97 ++++++++++++++++ 6 files changed, 400 insertions(+) create mode 100644 pkg/resource/cronjob/common.go create mode 100644 pkg/resource/cronjob/detail.go create mode 100644 pkg/resource/cronjob/events.go create mode 100644 pkg/resource/cronjob/jobs.go create mode 100644 pkg/resource/cronjob/list.go diff --git a/pkg/resource/common/resourcechannels.go b/pkg/resource/common/resourcechannels.go index a726b98..1544a46 100644 --- a/pkg/resource/common/resourcechannels.go +++ b/pkg/resource/common/resourcechannels.go @@ -260,6 +260,31 @@ type CronJobListChannel struct { Error chan error } +// GetCronJobListChannel returns a pair of channels to a Cron Job list and errors that both must be read numReads times. +func GetCronJobListChannel(client client.Interface, nsQuery *NamespaceQuery, numReads int) CronJobListChannel { + channel := CronJobListChannel{ + List: make(chan *batch.CronJobList, numReads), + Error: make(chan error, numReads), + } + + go func() { + list, err := client.BatchV1().CronJobs(nsQuery.ToRequestParam()).List(context.TODO(), helpers.ListEverything) + var filteredItems []batch.CronJob + for _, item := range list.Items { + if nsQuery.Matches(item.ObjectMeta.Namespace) { + filteredItems = append(filteredItems, item) + } + } + list.Items = filteredItems + for i := 0; i < numReads; i++ { + channel.List <- list + channel.Error <- err + } + }() + + return channel +} + // ServiceListChannel is a list and error channels to Services. type ServiceListChannel struct { List chan *v1.ServiceList diff --git a/pkg/resource/cronjob/common.go b/pkg/resource/cronjob/common.go new file mode 100644 index 0000000..455d35c --- /dev/null +++ b/pkg/resource/cronjob/common.go @@ -0,0 +1,69 @@ +package cronjob + +import ( + "github.com/karmada-io/dashboard/pkg/dataselect" + "github.com/karmada-io/dashboard/pkg/resource/common" + batch "k8s.io/api/batch/v1" +) + +// The code below allows to perform complex data section on []batch.CronJob + +type CronJobCell batch.CronJob + +func (self CronJobCell) GetProperty(name dataselect.PropertyName) dataselect.ComparableValue { + switch name { + case dataselect.NameProperty: + return dataselect.StdComparableString(self.ObjectMeta.Name) + case dataselect.CreationTimestampProperty: + return dataselect.StdComparableTime(self.ObjectMeta.CreationTimestamp.Time) + case dataselect.NamespaceProperty: + return dataselect.StdComparableString(self.ObjectMeta.Namespace) + default: + // if name is not supported then just return a constant dummy value, sort will have no effect. + return nil + } +} + +func ToCells(std []batch.CronJob) []dataselect.DataCell { + cells := make([]dataselect.DataCell, len(std)) + for i := range std { + cells[i] = CronJobCell(std[i]) + } + return cells +} + +func FromCells(cells []dataselect.DataCell) []batch.CronJob { + std := make([]batch.CronJob, len(cells)) + for i := range std { + std[i] = batch.CronJob(cells[i].(CronJobCell)) + } + return std +} + +func getStatus(list *batch.CronJobList) common.ResourceStatus { + info := common.ResourceStatus{} + if list == nil { + return info + } + + for _, cronJob := range list.Items { + if cronJob.Spec.Suspend != nil && !(*cronJob.Spec.Suspend) { + info.Running++ + } else { + info.Failed++ + } + } + + return info +} + +func getContainerImages(cronJob *batch.CronJob) []string { + podSpec := cronJob.Spec.JobTemplate.Spec.Template.Spec + result := make([]string, 0) + + for _, container := range podSpec.Containers { + result = append(result, container.Image) + } + + return result +} diff --git a/pkg/resource/cronjob/detail.go b/pkg/resource/cronjob/detail.go new file mode 100644 index 0000000..17b76bd --- /dev/null +++ b/pkg/resource/cronjob/detail.go @@ -0,0 +1,41 @@ +package cronjob + +import ( + "context" + + batch "k8s.io/api/batch/v1" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sClient "k8s.io/client-go/kubernetes" +) + +// CronJobDetail contains Cron Job details. +type CronJobDetail struct { + // Extends list item structure. + CronJob `json:",inline"` + + ConcurrencyPolicy string `json:"concurrencyPolicy"` + StartingDeadLineSeconds *int64 `json:"startingDeadlineSeconds"` + + // List of non-critical errors, that occurred during resource retrieval. + Errors []error `json:"errors"` +} + +// GetCronJobDetail gets Cron Job details. +func GetCronJobDetail(client k8sClient.Interface, namespace, name string) (*CronJobDetail, error) { + + rawObject, err := client.BatchV1().CronJobs(namespace).Get(context.TODO(), name, metaV1.GetOptions{}) + if err != nil { + return nil, err + } + + cj := toCronJobDetail(rawObject) + return &cj, nil +} + +func toCronJobDetail(cj *batch.CronJob) CronJobDetail { + return CronJobDetail{ + CronJob: toCronJob(cj), + ConcurrencyPolicy: string(cj.Spec.ConcurrencyPolicy), + StartingDeadLineSeconds: cj.Spec.StartingDeadlineSeconds, + } +} diff --git a/pkg/resource/cronjob/events.go b/pkg/resource/cronjob/events.go new file mode 100644 index 0000000..1c56578 --- /dev/null +++ b/pkg/resource/cronjob/events.go @@ -0,0 +1,21 @@ +package cronjob + +import ( + "github.com/karmada-io/dashboard/pkg/dataselect" + "github.com/karmada-io/dashboard/pkg/resource/common" + "github.com/karmada-io/dashboard/pkg/resource/event" + client "k8s.io/client-go/kubernetes" +) + +// GetCronJobEvents gets events associated to cron job. +func GetCronJobEvents(client client.Interface, dsQuery *dataselect.DataSelectQuery, namespace, name string) ( + *common.EventList, error) { + + raw, err := event.GetEvents(client, namespace, name) + if err != nil { + return event.EmptyEventList, err + } + + events := event.CreateEventList(raw, dsQuery) + return &events, nil +} diff --git a/pkg/resource/cronjob/jobs.go b/pkg/resource/cronjob/jobs.go new file mode 100644 index 0000000..00ce128 --- /dev/null +++ b/pkg/resource/cronjob/jobs.go @@ -0,0 +1,147 @@ +package cronjob + +import ( + "context" + "github.com/karmada-io/dashboard/pkg/common/errors" + "github.com/karmada-io/dashboard/pkg/common/types" + "github.com/karmada-io/dashboard/pkg/dataselect" + "github.com/karmada-io/dashboard/pkg/resource/common" + "github.com/karmada-io/dashboard/pkg/resource/job" + + batch "k8s.io/api/batch/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + apimachinery "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/rand" + client "k8s.io/client-go/kubernetes" +) + +const ( + CronJobAPIVersion = "v1" + CronJobKindName = "cronjob" +) + +var emptyJobList = &job.JobList{ + Jobs: make([]job.Job, 0), + Errors: make([]error, 0), + ListMeta: types.ListMeta{ + TotalItems: 0, + }, +} + +// GetCronJobJobs returns list of jobs owned by cron job. +func GetCronJobJobs(client client.Interface, + dsQuery *dataselect.DataSelectQuery, namespace, name string, active bool) (*job.JobList, error) { + + cronJob, err := client.BatchV1().CronJobs(namespace).Get(context.TODO(), name, meta.GetOptions{}) + if err != nil { + return emptyJobList, err + } + + channels := &common.ResourceChannels{ + JobList: common.GetJobListChannel(client, common.NewSameNamespaceQuery(namespace), 1), + PodList: common.GetPodListChannel(client, common.NewSameNamespaceQuery(namespace), 1), + EventList: common.GetEventListChannel(client, common.NewSameNamespaceQuery(namespace), 1), + } + + jobs := <-channels.JobList.List + err = <-channels.JobList.Error + nonCriticalErrors, criticalError := errors.ExtractErrors(err) + if criticalError != nil { + return emptyJobList, nil + } + + pods := <-channels.PodList.List + err = <-channels.PodList.Error + nonCriticalErrors, criticalError = errors.AppendError(err, nonCriticalErrors) + if criticalError != nil { + return emptyJobList, criticalError + } + + events := <-channels.EventList.List + err = <-channels.EventList.Error + nonCriticalErrors, criticalError = errors.AppendError(err, nonCriticalErrors) + if criticalError != nil { + return emptyJobList, criticalError + } + + jobs.Items = filterJobsByOwnerUID(cronJob.UID, jobs.Items) + jobs.Items = filterJobsByState(active, jobs.Items) + + return job.ToJobList(jobs.Items, pods.Items, events.Items, nonCriticalErrors, dsQuery), nil +} + +// TriggerCronJob manually triggers a cron job and creates a new job. +func TriggerCronJob(client client.Interface, + namespace, name string) error { + + cronJob, err := client.BatchV1().CronJobs(namespace).Get(context.TODO(), name, meta.GetOptions{}) + + if err != nil { + return err + } + + annotations := make(map[string]string) + annotations["cronjob.kubernetes.io/instantiate"] = "manual" + + labels := make(map[string]string) + for k, v := range cronJob.Spec.JobTemplate.Labels { + labels[k] = v + } + + //job name cannot exceed DNS1053LabelMaxLength (52 characters) + var newJobName string + if len(cronJob.Name) < 42 { + newJobName = cronJob.Name + "-manual-" + rand.String(3) + } else { + newJobName = cronJob.Name[0:41] + "-manual-" + rand.String(3) + } + + jobToCreate := &batch.Job{ + ObjectMeta: meta.ObjectMeta{ + Name: newJobName, + Namespace: namespace, + Annotations: annotations, + Labels: labels, + OwnerReferences: []meta.OwnerReference{{ + APIVersion: CronJobAPIVersion, + Kind: CronJobKindName, + Name: cronJob.Name, + UID: cronJob.UID, + }}, + }, + Spec: cronJob.Spec.JobTemplate.Spec, + } + + _, err = client.BatchV1().Jobs(namespace).Create(context.TODO(), jobToCreate, meta.CreateOptions{}) + + if err != nil { + return err + } + + return nil +} + +func filterJobsByOwnerUID(UID apimachinery.UID, jobs []batch.Job) (matchingJobs []batch.Job) { + for _, j := range jobs { + for _, i := range j.OwnerReferences { + if i.UID == UID { + matchingJobs = append(matchingJobs, j) + break + } + } + } + return +} + +func filterJobsByState(active bool, jobs []batch.Job) (matchingJobs []batch.Job) { + for _, j := range jobs { + if active && j.Status.Active > 0 { + matchingJobs = append(matchingJobs, j) + } else if !active && j.Status.Active == 0 { + matchingJobs = append(matchingJobs, j) + } else { + //sup + } + } + return +} diff --git a/pkg/resource/cronjob/list.go b/pkg/resource/cronjob/list.go new file mode 100644 index 0000000..68a7eae --- /dev/null +++ b/pkg/resource/cronjob/list.go @@ -0,0 +1,97 @@ +package cronjob + +import ( + "github.com/karmada-io/dashboard/pkg/common/errors" + "github.com/karmada-io/dashboard/pkg/common/types" + "github.com/karmada-io/dashboard/pkg/dataselect" + "github.com/karmada-io/dashboard/pkg/resource/common" + "log" + + batch "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + client "k8s.io/client-go/kubernetes" +) + +// CronJobList contains a list of CronJobs in the cluster. +type CronJobList struct { + ListMeta types.ListMeta `json:"listMeta"` + Items []CronJob `json:"items"` + + // Basic information about resources status on the list. + Status common.ResourceStatus `json:"status"` + + // List of non-critical errors, that occurred during resource retrieval. + Errors []error `json:"errors"` +} + +// CronJob is a presentation layer view of Kubernetes Cron Job resource. +type CronJob struct { + ObjectMeta types.ObjectMeta `json:"objectMeta"` + TypeMeta types.TypeMeta `json:"typeMeta"` + Schedule string `json:"schedule"` + Suspend *bool `json:"suspend"` + Active int `json:"active"` + LastSchedule *metav1.Time `json:"lastSchedule"` + + // ContainerImages holds a list of the CronJob images. + ContainerImages []string `json:"containerImages"` +} + +// GetCronJobList returns a list of all CronJobs in the cluster. +func GetCronJobList(client client.Interface, nsQuery *common.NamespaceQuery, + dsQuery *dataselect.DataSelectQuery) (*CronJobList, error) { + log.Print("Getting list of all cron jobs in the cluster") + + channels := &common.ResourceChannels{ + CronJobList: common.GetCronJobListChannel(client, nsQuery, 1), + } + + return GetCronJobListFromChannels(channels, dsQuery) +} + +// GetCronJobListFromChannels returns a list of all CronJobs in the cluster reading required resource +// list once from the channels. +func GetCronJobListFromChannels(channels *common.ResourceChannels, dsQuery *dataselect.DataSelectQuery) (*CronJobList, error) { + + cronJobs := <-channels.CronJobList.List + err := <-channels.CronJobList.Error + nonCriticalErrors, criticalError := errors.ExtractErrors(err) + if criticalError != nil { + return nil, criticalError + } + + cronJobList := toCronJobList(cronJobs.Items, nonCriticalErrors, dsQuery) + cronJobList.Status = getStatus(cronJobs) + return cronJobList, nil +} + +func toCronJobList(cronJobs []batch.CronJob, nonCriticalErrors []error, dsQuery *dataselect.DataSelectQuery) *CronJobList { + + list := &CronJobList{ + Items: make([]CronJob, 0), + ListMeta: types.ListMeta{TotalItems: len(cronJobs)}, + Errors: nonCriticalErrors, + } + + cronJobCells, filteredTotal := dataselect.GenericDataSelectWithFilter(ToCells(cronJobs), dsQuery) + cronJobs = FromCells(cronJobCells) + list.ListMeta = types.ListMeta{TotalItems: filteredTotal} + + for _, cronJob := range cronJobs { + list.Items = append(list.Items, toCronJob(&cronJob)) + } + + return list +} + +func toCronJob(cj *batch.CronJob) CronJob { + return CronJob{ + ObjectMeta: types.NewObjectMeta(cj.ObjectMeta), + TypeMeta: types.NewTypeMeta(types.ResourceKindCronJob), + Schedule: cj.Spec.Schedule, + Suspend: cj.Spec.Suspend, + Active: len(cj.Status.Active), + LastSchedule: cj.Status.LastScheduleTime, + ContainerImages: getContainerImages(cj), + } +} From 090f4cf03c38d727f8f3f5f88fb57de07b0ccb04 Mon Sep 17 00:00:00 2001 From: warjiang <1096409085@qq.com> Date: Mon, 9 Sep 2024 00:00:42 +0800 Subject: [PATCH 2/2] feat: add api for cronjob Signed-off-by: warjiang <1096409085@qq.com> --- cmd/api/app/api.go | 1 + cmd/api/app/routes/cronjob/handler.go | 54 +++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 cmd/api/app/routes/cronjob/handler.go diff --git a/cmd/api/app/api.go b/cmd/api/app/api.go index 04d26bf..3ea3df9 100644 --- a/cmd/api/app/api.go +++ b/cmd/api/app/api.go @@ -19,6 +19,7 @@ import ( _ "github.com/karmada-io/dashboard/cmd/api/app/routes/cluster" _ "github.com/karmada-io/dashboard/cmd/api/app/routes/config" _ "github.com/karmada-io/dashboard/cmd/api/app/routes/configmap" + _ "github.com/karmada-io/dashboard/cmd/api/app/routes/cronjob" _ "github.com/karmada-io/dashboard/cmd/api/app/routes/daemonset" _ "github.com/karmada-io/dashboard/cmd/api/app/routes/deployment" _ "github.com/karmada-io/dashboard/cmd/api/app/routes/job" diff --git a/cmd/api/app/routes/cronjob/handler.go b/cmd/api/app/routes/cronjob/handler.go new file mode 100644 index 0000000..c519d8b --- /dev/null +++ b/cmd/api/app/routes/cronjob/handler.go @@ -0,0 +1,54 @@ +package deployment + +import ( + "github.com/gin-gonic/gin" + "github.com/karmada-io/dashboard/cmd/api/app/router" + "github.com/karmada-io/dashboard/cmd/api/app/types/common" + "github.com/karmada-io/dashboard/pkg/client" + "github.com/karmada-io/dashboard/pkg/resource/cronjob" + "github.com/karmada-io/dashboard/pkg/resource/event" +) + +func handleGetCronJob(c *gin.Context) { + namespace := common.ParseNamespacePathParameter(c) + dataSelect := common.ParseDataSelectPathParameter(c) + k8sClient := client.InClusterClientForKarmadaApiServer() + result, err := cronjob.GetCronJobList(k8sClient, namespace, dataSelect) + if err != nil { + common.Fail(c, err) + return + } + common.Success(c, result) +} + +func handleGetCronJobDetail(c *gin.Context) { + namespace := c.Param("namespace") + name := c.Param("statefulset") + k8sClient := client.InClusterClientForKarmadaApiServer() + result, err := cronjob.GetCronJobDetail(k8sClient, namespace, name) + if err != nil { + common.Fail(c, err) + return + } + common.Success(c, result) +} + +func handleGetCronJobEvents(c *gin.Context) { + namespace := c.Param("namespace") + name := c.Param("statefulset") + k8sClient := client.InClusterClientForKarmadaApiServer() + dataSelect := common.ParseDataSelectPathParameter(c) + result, err := event.GetResourceEvents(k8sClient, dataSelect, namespace, name) + if err != nil { + common.Fail(c, err) + return + } + common.Success(c, result) +} +func init() { + r := router.V1() + r.GET("/cronjob", handleGetCronJob) + r.GET("/cronjob/:namespace", handleGetCronJob) + r.GET("/cronjob/:namespace/:statefulset", handleGetCronJobDetail) + r.GET("/cronjob/:namespace/:statefulset/event", handleGetCronJobEvents) +}