diff --git a/go/master/main.go b/go/master/main.go index 0f07295c0..b6904e050 100644 --- a/go/master/main.go +++ b/go/master/main.go @@ -39,10 +39,10 @@ func main() { // Listen and serve on defined port logger.Infof("The master starts with namespece %s, jobName %s, port %d", namespace, jobName, port) - var k8sClient *kubeutils.K8sClient if k8sScheduling { - k8sClient = kubeutils.NewK8sClient("") + // Use incluster mode without kubeconfig. + kubeutils.NewGlobalK8sClient("", namespace) } - master := master.NewJobMaster(namespace, jobName, k8sClient) + master := master.NewJobMaster(namespace, jobName) master.Run() } diff --git a/go/master/pkg/batchscheduler/elastic.go b/go/master/pkg/batchscheduler/elastic.go index 6101b78e5..fbd6d37d4 100644 --- a/go/master/pkg/batchscheduler/elastic.go +++ b/go/master/pkg/batchscheduler/elastic.go @@ -14,32 +14,31 @@ package batchscheduler import ( - "time" + "context" "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils" - logger "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" ) // ElasticScheduler launches pods without waiting for all resouces of pod are ready type ElasticScheduler struct { - k8sClient *kubeutils.K8sClient - toCreatePods *common.Queue + KubeScheduler + SchedulerName string } // NewElasticScheduler creates an elastic scheduler. -func NewElasticScheduler(k8sClient *kubeutils.K8sClient) *ElasticScheduler { +func NewElasticScheduler() *ElasticScheduler { return &ElasticScheduler{ - k8sClient: k8sClient, - toCreatePods: common.NewQueue(), + KubeScheduler: KubeScheduler{ + toCreatePods: common.NewQueue(), + }, + SchedulerName: "elastic", } } // Start starts a routine to launch Pods. -func (scheduler *ElasticScheduler) Start(jobContext *common.JobContext) { - go scheduler.createPodLoop(jobContext.NameSpace) +func (scheduler *ElasticScheduler) Start(ctx context.Context, jobContext *common.JobContext) { + go scheduler.LoopToLaunchPods(ctx) } // DoScheduling creates/updates/deletes pods @@ -52,7 +51,6 @@ func (scheduler *ElasticScheduler) DoScheduling(jobContext *common.JobContext, p Number: spec.Replicas, Rank: i, } - podConfig := &kubeutils.PodConfig{ Replica: replicaConfig, TemplateSpec: spec.Template.DeepCopy(), @@ -62,24 +60,3 @@ func (scheduler *ElasticScheduler) DoScheduling(jobContext *common.JobContext, p } } } - -func (scheduler *ElasticScheduler) createPodLoop(namespace string) { - for { - for scheduler.toCreatePods.Len() > 0 { - pod := scheduler.toCreatePods.PopFront().(*corev1.Pod) - err := scheduler.k8sClient.CreatePod(namespace, pod) - if errors.IsAlreadyExists(err) { - logger.Warnf("The pod %s already exists.", pod.ObjectMeta.Name) - } else if errors.IsTooManyRequests(err) || errors.IsTimeout(err) || errors.IsServerTimeout(err) { - logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err) - // Retry to create pod due to timeout. - scheduler.toCreatePods.PushFront(pod) - time.Sleep(5 * time.Second) - } else { - logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err) - panic(err.Error()) - } - } - time.Sleep(1 * time.Second) - } -} diff --git a/go/master/pkg/batchscheduler/elastic_internal_test.go b/go/master/pkg/batchscheduler/elastic_internal_test.go index fe66f4ccb..6c8202bd8 100644 --- a/go/master/pkg/batchscheduler/elastic_internal_test.go +++ b/go/master/pkg/batchscheduler/elastic_internal_test.go @@ -50,7 +50,7 @@ var _ = Describe("Elastic", func() { }, } schedulingPlan := &SchedulingPlan{ReplicaSpecs: replicas} - scheduler := NewElasticScheduler(nil) + scheduler := NewElasticScheduler() scheduler.DoScheduling(jobContext, schedulingPlan) Expect(scheduler.toCreatePods.Len()).To(Equal(3)) for i := 0; i < 3; i++ { diff --git a/go/master/pkg/batchscheduler/scheduler.go b/go/master/pkg/batchscheduler/scheduler.go index cc4a9cf1b..6d26f5d8f 100644 --- a/go/master/pkg/batchscheduler/scheduler.go +++ b/go/master/pkg/batchscheduler/scheduler.go @@ -14,14 +14,22 @@ package batchscheduler import ( + "context" + "time" + elasticjob "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1" commonv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/pkg/common/api/v1" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils" + logger "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" ) // BatchScheduler creates/updates/deletes the batch pods of an elastic job. type BatchScheduler interface { - DoScheduling(*SchedulingPlan) + Start(ctx context.Context, jobContext *common.JobContext) + DoScheduling(jobContext *common.JobContext, plan *SchedulingPlan) } // SchedulingPlan is the scheduling plan to notify the scheduler CURD pods. @@ -38,3 +46,44 @@ type SchedulingPlan struct { // OwnerJob specifies a job to scale. OwnerJob *elasticjob.ElasticJob } + +// NewBatchScheduler creates a batch scheduler according to the scheduler name. +func NewBatchScheduler(schedulerName string) BatchScheduler { + if schedulerName == "elastic" || schedulerName == "" { + scheduler := NewElasticScheduler() + return scheduler + } + return nil +} + +// KubeScheduler is the base scheduler to create/update/remove pods. +type KubeScheduler struct { + toCreatePods *common.Queue +} + +// LoopToLaunchPods launches pods from the pod queue. +func (scheduler *KubeScheduler) LoopToLaunchPods(ctx context.Context) { + for { + select { + case <-ctx.Done(): + logger.Infof("The loop to launch Pod exists.") + default: + for scheduler.toCreatePods.Len() > 0 { + pod := scheduler.toCreatePods.PopFront().(*corev1.Pod) + err := kubeutils.GlobalK8sClient.CreatePod(ctx, pod) + if errors.IsAlreadyExists(err) { + logger.Warnf("The pod %s already exists.", pod.ObjectMeta.Name) + } else if errors.IsTooManyRequests(err) || errors.IsTimeout(err) || errors.IsServerTimeout(err) { + logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err) + // Retry to create pod due to timeout. + scheduler.toCreatePods.PushFront(pod) + time.Sleep(5 * time.Second) + } else { + logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err) + panic(err.Error()) + } + } + } + time.Sleep(1 * time.Second) + } +} diff --git a/go/master/pkg/common/context.go b/go/master/pkg/common/context.go index 80a083422..dfb61ffbd 100644 --- a/go/master/pkg/common/context.go +++ b/go/master/pkg/common/context.go @@ -13,6 +13,16 @@ package common +import ( + "fmt" + "net" + "os" + "strconv" + "time" +) + +const masterServicePort = 215000 + // JobContext stores the elastic job context. type JobContext struct { // Namespace is the kubernetes namespace where the job runs. @@ -22,5 +32,55 @@ type JobContext struct { // MasterHost is the host of master service. MasterHost string // MasterPort is the host of master port. - MasterPort int32 + MasterPort int +} + +// NewJobContext creates a job context. +func NewJobContext(namespace string, name string) *JobContext { + host := fmt.Sprintf("elasticjob-%s-dlrover-master", name) + port := masterServicePort + + if !checkAddressReachable(host, port) { + host = os.Getenv("MY_POD_IP") + freePort, err := getFreePort() + if err != nil { + panic(err.Error()) + } + port = freePort + } + + return &JobContext{ + NameSpace: namespace, + Name: name, + MasterHost: host, + MasterPort: port, + } +} + +func checkAddressReachable(host string, port int) bool { + timeout := time.Second + masterAddr := net.JoinHostPort(host, strconv.Itoa(port)) + conn, err := net.DialTimeout("tcp", masterAddr, timeout) + if err != nil { + return false + } + if conn != nil { + defer conn.Close() + } + return true +} + +func getFreePort() (int, error) { + addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + if err != nil { + return 0, err + } + + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return 0, err + } + + defer l.Close() + return l.Addr().(*net.TCPAddr).Port, nil } diff --git a/go/master/pkg/common/context_internal_test.go b/go/master/pkg/common/context_internal_test.go new file mode 100644 index 000000000..7671fe4ca --- /dev/null +++ b/go/master/pkg/common/context_internal_test.go @@ -0,0 +1,31 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "os" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Context", func() { + It("New Job Context", func() { + os.Setenv("MY_POD_IP", "127.0.0.1") + jobContext := NewJobContext("dlrover", "train-demo") + Expect(jobContext.MasterHost).To(Equal("127.0.0.1")) + Expect(jobContext.MasterPort > 0).To(BeTrue()) + }) + +}) diff --git a/go/master/pkg/jobmanager/manager.go b/go/master/pkg/jobmanager/manager.go index 86a61d2a3..d5569db28 100644 --- a/go/master/pkg/jobmanager/manager.go +++ b/go/master/pkg/jobmanager/manager.go @@ -13,6 +13,22 @@ package jobmanager +import ( + "context" + + elasticjobv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" +) + // JobManager is the interface to manager job lifecycle. type JobManager interface { + Start(ctx context.Context, jobContext *common.JobContext) +} + +// NewJobManager creates a job manager. +func NewJobManager(elasticJob *elasticjobv1.ElasticJob) JobManager { + if elasticJob.Spec.DistributionStrategy == "pytorch" { + return NewPyTorchJobManager(elasticJob) + } + return nil } diff --git a/go/master/pkg/jobmanager/pytorch.go b/go/master/pkg/jobmanager/pytorch.go index 69e790164..240bc7432 100644 --- a/go/master/pkg/jobmanager/pytorch.go +++ b/go/master/pkg/jobmanager/pytorch.go @@ -12,3 +12,35 @@ // limitations under the License. package jobmanager + +import ( + "context" + + elasticjobv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/batchscheduler" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" +) + +// PyTorchJobManager is the lifecycle manager of a PyTorch distributed training job. +type PyTorchJobManager struct { + replicaSchedulers map[string]batchscheduler.BatchScheduler +} + +// NewPyTorchJobManager creates PyTorch distributed training job manager. +func NewPyTorchJobManager(elasticJob *elasticjobv1.ElasticJob) *PyTorchJobManager { + schedulers := make(map[string]batchscheduler.BatchScheduler) + for replicaType, spec := range elasticJob.Spec.ReplicaSpecs { + scheduler := batchscheduler.NewBatchScheduler(spec.BatchScheduler) + schedulers[string(replicaType)] = scheduler + } + return &PyTorchJobManager{ + replicaSchedulers: schedulers, + } +} + +// Start starts the modules of the job manager. +func (jobManager *PyTorchJobManager) Start(ctx context.Context, jobContext *common.JobContext) { + for _, scheduler := range jobManager.replicaSchedulers { + scheduler.Start(ctx, jobContext) + } +} diff --git a/go/master/pkg/kubeutils/client.go b/go/master/pkg/kubeutils/client.go index a5195474d..db64d2681 100644 --- a/go/master/pkg/kubeutils/client.go +++ b/go/master/pkg/kubeutils/client.go @@ -27,8 +27,12 @@ import ( "k8s.io/client-go/tools/clientcmd" ) +// GlobalK8sClient is the global client to access a k8s cluster. +var GlobalK8sClient *K8sClient + // K8sClient contains the instance to access a k8s cluster. type K8sClient struct { + namespace string config *rest.Config clientset *k8sApi.Clientset dynamicClient *dynamic.DynamicClient @@ -39,9 +43,16 @@ func GetGroupVersionResource(group, version, resource string) schema.GroupVersio return schema.GroupVersionResource{Group: group, Version: version, Resource: resource} } +// NewGlobalK8sClient initialize the global k8s client. +func NewGlobalK8sClient(kubeConfigPath string, namespace string) { + GlobalK8sClient = NewK8sClient(kubeConfigPath, namespace) +} + // NewK8sClient creates a k8s client instance. -func NewK8sClient(kubeConfigPath string) *K8sClient { - client := &K8sClient{} +func NewK8sClient(kubeConfigPath string, namespace string) *K8sClient { + client := &K8sClient{ + namespace: namespace, + } // creates the in-cluster config if kubeConfigPath == "" { @@ -74,14 +85,13 @@ func NewK8sClient(kubeConfigPath string) *K8sClient { } // GetCustomResourceInstance gets a custom resource instance from a k8s cluster. -func (client *K8sClient) GetCustomResourceInstance( - namespace string, name string, gvr schema.GroupVersionResource) ( +func (client *K8sClient) GetCustomResourceInstance(name string, gvr schema.GroupVersionResource) ( *unstructured.Unstructured, error, ) { // Unstructured utd, err := client.dynamicClient. Resource(gvr). - Namespace(namespace). + Namespace(client.namespace). Get(context.Background(), name, metav1.GetOptions{}) if err != nil { logger.Infof("fail to get %s %s", gvr.String(), name) @@ -90,12 +100,10 @@ func (client *K8sClient) GetCustomResourceInstance( } // CreatePod creates a Pod instance in the cluster -func (client *K8sClient) CreatePod(namespace string, pod *corev1.Pod) error { - _, err := client.clientset.CoreV1().Pods(namespace).Create( - context.Background(), pod, metav1.CreateOptions{}, - ) - if err != nil { - logger.Infof("fail to create a pod : %s", pod.ObjectMeta.Name) - } +func (client *K8sClient) CreatePod(ctx context.Context, pod *corev1.Pod) error { + _, err := client.clientset. + CoreV1(). + Pods(client.namespace). + Create(ctx, pod, metav1.CreateOptions{}) return err } diff --git a/go/master/pkg/kubeutils/elasticjob.go b/go/master/pkg/kubeutils/elasticjob.go index bff45b237..1539834d1 100644 --- a/go/master/pkg/kubeutils/elasticjob.go +++ b/go/master/pkg/kubeutils/elasticjob.go @@ -27,10 +27,9 @@ const ( ) // GetElasticJobInstance gets an elasticjob instance. -func GetElasticJobInstance(client *K8sClient, namespace string, jobName string) *elasticjob.ElasticJob { - +func GetElasticJobInstance(jobName string) *elasticjob.ElasticJob { gvr := GetGroupVersionResource(GROUP, VERSION, "elasticjobs") - utd, err := client.GetCustomResourceInstance(namespace, jobName, gvr) + utd, err := GlobalK8sClient.GetCustomResourceInstance(jobName, gvr) if err != nil { return nil } diff --git a/go/master/pkg/kubeutils/elasticjob_internal_test.go b/go/master/pkg/kubeutils/elasticjob_internal_test.go index 45a4ce06b..4da2b5d09 100644 --- a/go/master/pkg/kubeutils/elasticjob_internal_test.go +++ b/go/master/pkg/kubeutils/elasticjob_internal_test.go @@ -24,8 +24,8 @@ var _ = Describe("Elasticjob", func() { It("Get an elasticjob instance", func() { kubeConfigPath := os.Getenv("KUBENETES_CLUSTER_CONFIG") if kubeConfigPath != "" { - k8sClient := NewK8sClient(kubeConfigPath) - job := GetElasticJobInstance(k8sClient, "dlrover", "torch-mnist") + NewGlobalK8sClient(kubeConfigPath, "dlrover") + job := GetElasticJobInstance("torch-mnist") Expect(job.Name).To(Equal("torch-minst")) } }) diff --git a/go/master/pkg/kubeutils/pod.go b/go/master/pkg/kubeutils/pod.go index 4fcbe2529..b307ee3cf 100644 --- a/go/master/pkg/kubeutils/pod.go +++ b/go/master/pkg/kubeutils/pod.go @@ -86,7 +86,7 @@ func BuildPod(jobContext *common.JobContext, podConfig *PodConfig) *corev1.Pod { return pod } -func insertJobMasterAddrEnv(container *corev1.Container, host string, port int32) { +func insertJobMasterAddrEnv(container *corev1.Container, host string, port int) { jobMasterServiceEnv := corev1.EnvVar{ Name: envMasterAddr, Value: fmt.Sprintf("%s:%d", host, port), diff --git a/go/master/pkg/kubeutils/pod_internal_test.go b/go/master/pkg/kubeutils/pod_internal_test.go index 181a4e854..9cc5e8e4e 100644 --- a/go/master/pkg/kubeutils/pod_internal_test.go +++ b/go/master/pkg/kubeutils/pod_internal_test.go @@ -14,6 +14,7 @@ package kubeutils import ( + "context" "errors" "fmt" "os" @@ -68,17 +69,17 @@ var _ = Describe("Pod", func() { Skip(fmt.Sprintf("The config file %s is not exist.", configPath)) } - k8sClient := NewK8sClient(configPath) + k8sClient := NewK8sClient(configPath, "dlrover") pod.ObjectMeta.Namespace = "no-namspace" - err := k8sClient.CreatePod("dlrover", pod) + err := k8sClient.CreatePod(context.Background(), pod) Expect(kubeerrors.IsBadRequest(err)).To(BeTrue()) pod.ObjectMeta.Namespace = "dlrover" - err = k8sClient.CreatePod("dlrover", pod) + err = k8sClient.CreatePod(context.Background(), pod) Expect(kubeerrors.IsAlreadyExists(err)).To(BeTrue()) pod.ObjectMeta.Name = "" - err = k8sClient.CreatePod("dlrover", pod) + err = k8sClient.CreatePod(context.Background(), pod) Expect(kubeerrors.IsInvalid(err)).To(BeTrue()) }) }) diff --git a/go/master/pkg/master.go b/go/master/pkg/master.go index 1b13fceae..651026ed0 100644 --- a/go/master/pkg/master.go +++ b/go/master/pkg/master.go @@ -14,37 +14,39 @@ package master import ( + "context" "time" - elasticjob "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/jobmanager" "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils" logger "github.com/sirupsen/logrus" ) // JobMaster is the master of an elasticjob. type JobMaster struct { - Namespace string - JobName string - K8sClient *kubeutils.K8sClient - Job *elasticjob.ElasticJob + jobContext *common.JobContext + jobManager jobmanager.JobManager } // NewJobMaster creates the master for an elasticjob. -func NewJobMaster(namespace string, jobName string, k8sClient *kubeutils.K8sClient) *JobMaster { - master := &JobMaster{ - Namespace: namespace, - JobName: jobName, - } - if k8sClient != nil { - job := kubeutils.GetElasticJobInstance(k8sClient, namespace, jobName) - master.K8sClient = k8sClient - master.Job = job +func NewJobMaster(namespace string, jobName string) *JobMaster { + master := &JobMaster{} + if kubeutils.GlobalK8sClient != nil { + elasticjob := kubeutils.GetElasticJobInstance(jobName) + master.jobManager = jobmanager.NewJobManager(elasticjob) } + master.jobContext = common.NewJobContext(namespace, jobName) logger.Infof("create a master of job %s.", jobName) return master } // Run starts the master instance. func (master *JobMaster) Run() { + ctx, cancel := context.WithCancel(context.Background()) + if master.jobManager != nil { + master.jobManager.Start(ctx, master.jobContext) + } + defer cancel() time.Sleep(10 * time.Hour) } diff --git a/go/master/pkg/master_internal_test.go b/go/master/pkg/master_internal_test.go index d651f513a..573ddc91d 100644 --- a/go/master/pkg/master_internal_test.go +++ b/go/master/pkg/master_internal_test.go @@ -20,8 +20,8 @@ import ( var _ = Describe("Master", func() { It("Create a master", func() { - master := NewJobMaster("dlrover", "test-master", nil) - Expect(master.Namespace).To(Equal("dlrover")) - Expect(master.JobName).To(Equal("test-master")) + master := NewJobMaster("dlrover", "test-master") + Expect(master.jobContext.NameSpace).To(Equal("dlrover")) + Expect(master.jobContext.Name).To(Equal("test-master")) }) })