Implement PyTorch job manager with job context. (#1458)
workingloong authored Feb 2, 2025
1 parent 190984c commit 6740f60
Showing 15 changed files with 253 additions and 78 deletions.
6 changes: 3 additions & 3 deletions go/master/main.go
@@ -39,10 +39,10 @@ func main() {
 
 	// Listen and serve on defined port
 	logger.Infof("The master starts with namespace %s, jobName %s, port %d", namespace, jobName, port)
-	var k8sClient *kubeutils.K8sClient
 	if k8sScheduling {
-		k8sClient = kubeutils.NewK8sClient("")
+		// Use in-cluster mode without kubeconfig.
+		kubeutils.NewGlobalK8sClient("", namespace)
 	}
-	master := master.NewJobMaster(namespace, jobName, k8sClient)
+	master := master.NewJobMaster(namespace, jobName)
 	master.Run()
 }
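
The change above replaces a *K8sClient handle threaded through constructors with a process-global client that is initialized once at startup. A minimal sketch of the resulting pattern, assuming an in-cluster deployment (the ctx and pod variables are illustrative):

	// Initialize the global client once; an empty kubeconfig path selects in-cluster mode.
	kubeutils.NewGlobalK8sClient("", namespace)
	// Any later caller reaches the cluster through the global client.
	err := kubeutils.GlobalK8sClient.CreatePod(ctx, pod)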
43 changes: 10 additions & 33 deletions go/master/pkg/batchscheduler/elastic.go
@@ -14,32 +14,31 @@
 package batchscheduler
 
 import (
-	"time"
+	"context"
 
 	"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"
 	"github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils"
 	logger "github.com/sirupsen/logrus"
-	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/errors"
 )
 
 // ElasticScheduler launches pods without waiting for all resources of a pod to be ready.
 type ElasticScheduler struct {
-	k8sClient     *kubeutils.K8sClient
-	toCreatePods  *common.Queue
+	KubeScheduler
 	SchedulerName string
 }
 
 // NewElasticScheduler creates an elastic scheduler.
-func NewElasticScheduler(k8sClient *kubeutils.K8sClient) *ElasticScheduler {
+func NewElasticScheduler() *ElasticScheduler {
 	return &ElasticScheduler{
-		k8sClient:     k8sClient,
-		toCreatePods:  common.NewQueue(),
+		KubeScheduler: KubeScheduler{
+			toCreatePods: common.NewQueue(),
+		},
 		SchedulerName: "elastic",
 	}
 }
 
 // Start starts a routine to launch Pods.
-func (scheduler *ElasticScheduler) Start(jobContext *common.JobContext) {
-	go scheduler.createPodLoop(jobContext.NameSpace)
+func (scheduler *ElasticScheduler) Start(ctx context.Context, jobContext *common.JobContext) {
+	go scheduler.LoopToLaunchPods(ctx)
 }

// DoScheduling creates/updates/deletes pods
@@ -52,7 +51,6 @@ func (scheduler *ElasticScheduler) DoScheduling(jobContext *common.JobContext, plan *SchedulingPlan) {
 				Number: spec.Replicas,
 				Rank:   i,
 			}
-
 			podConfig := &kubeutils.PodConfig{
 				Replica:      replicaConfig,
 				TemplateSpec: spec.Template.DeepCopy(),
@@ -62,24 +60,3 @@
 		}
 	}
 }
-
-func (scheduler *ElasticScheduler) createPodLoop(namespace string) {
-	for {
-		for scheduler.toCreatePods.Len() > 0 {
-			pod := scheduler.toCreatePods.PopFront().(*corev1.Pod)
-			err := scheduler.k8sClient.CreatePod(namespace, pod)
-			if errors.IsAlreadyExists(err) {
-				logger.Warnf("The pod %s already exists.", pod.ObjectMeta.Name)
-			} else if errors.IsTooManyRequests(err) || errors.IsTimeout(err) || errors.IsServerTimeout(err) {
-				logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err)
-				// Retry to create pod due to timeout.
-				scheduler.toCreatePods.PushFront(pod)
-				time.Sleep(5 * time.Second)
-			} else {
-				logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err)
-				panic(err.Error())
-			}
-		}
-		time.Sleep(1 * time.Second)
-	}
-}
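
With the launch loop hoisted into the embedded KubeScheduler (see scheduler.go below), ElasticScheduler is reduced to deciding what to enqueue. A hedged usage sketch, assuming jobContext and plan are already built and the global k8s client is initialized:

	scheduler := batchscheduler.NewElasticScheduler()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	scheduler.Start(ctx, jobContext)         // spawns LoopToLaunchPods(ctx) in a goroutine
	scheduler.DoScheduling(jobContext, plan) // enqueues one pod per replica; the loop drains them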
2 changes: 1 addition & 1 deletion go/master/pkg/batchscheduler/elastic_internal_test.go
@@ -50,7 +50,7 @@ var _ = Describe("Elastic", func() {
 			},
 		}
 		schedulingPlan := &SchedulingPlan{ReplicaSpecs: replicas}
-		scheduler := NewElasticScheduler(nil)
+		scheduler := NewElasticScheduler()
 		scheduler.DoScheduling(jobContext, schedulingPlan)
 		Expect(scheduler.toCreatePods.Len()).To(Equal(3))
 		for i := 0; i < 3; i++ {
51 changes: 50 additions & 1 deletion go/master/pkg/batchscheduler/scheduler.go
@@ -14,14 +14,22 @@
 package batchscheduler
 
 import (
+	"context"
+	"time"
+
 	elasticjob "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1"
 	commonv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/pkg/common/api/v1"
 	"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"
+	"github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils"
+	logger "github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
 )
 
 // BatchScheduler creates/updates/deletes the batch pods of an elastic job.
 type BatchScheduler interface {
-	DoScheduling(*SchedulingPlan)
+	Start(ctx context.Context, jobContext *common.JobContext)
+	DoScheduling(jobContext *common.JobContext, plan *SchedulingPlan)
 }
 
 // SchedulingPlan is the scheduling plan to notify the scheduler to CRUD pods.
@@ -38,3 +46,44 @@ type SchedulingPlan struct {
 	// OwnerJob specifies a job to scale.
 	OwnerJob *elasticjob.ElasticJob
 }
+
+// NewBatchScheduler creates a batch scheduler according to the scheduler name.
+func NewBatchScheduler(schedulerName string) BatchScheduler {
+	if schedulerName == "elastic" || schedulerName == "" {
+		scheduler := NewElasticScheduler()
+		return scheduler
+	}
+	return nil
+}
+
+// KubeScheduler is the base scheduler to create/update/remove pods.
+type KubeScheduler struct {
+	toCreatePods *common.Queue
+}
+
+// LoopToLaunchPods launches pods from the pod queue.
+func (scheduler *KubeScheduler) LoopToLaunchPods(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			logger.Infof("The loop to launch Pods exits.")
+			return
+		default:
+			for scheduler.toCreatePods.Len() > 0 {
+				pod := scheduler.toCreatePods.PopFront().(*corev1.Pod)
+				err := kubeutils.GlobalK8sClient.CreatePod(ctx, pod)
+				if errors.IsAlreadyExists(err) {
+					logger.Warnf("The pod %s already exists.", pod.ObjectMeta.Name)
+				} else if errors.IsTooManyRequests(err) || errors.IsTimeout(err) || errors.IsServerTimeout(err) {
+					logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err)
+					// Retry to create the pod after a timeout.
+					scheduler.toCreatePods.PushFront(pod)
+					time.Sleep(5 * time.Second)
+				} else if err != nil {
+					logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err)
+					panic(err.Error())
+				}
+			}
+		}
+		time.Sleep(1 * time.Second)
+	}
+}
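
LoopToLaunchPods handles creation errors in three tiers: IsAlreadyExists is treated as already done and dropped, throttling and timeout errors push the pod back onto the front of the queue and back off for five seconds, and any other error panics so the master fails fast rather than silently losing a pod. A short sketch of the factory and lifecycle, assuming a prepared jobContext:

	scheduler := batchscheduler.NewBatchScheduler("") // "" falls back to the elastic scheduler
	ctx, cancel := context.WithCancel(context.Background())
	scheduler.Start(ctx, jobContext) // the launch loop runs until the context is cancelled
	cancel()                         // the loop observes ctx.Done() and returns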
62 changes: 61 additions & 1 deletion go/master/pkg/common/context.go
@@ -13,6 +13,16 @@
 
 package common
 
+import (
+	"fmt"
+	"net"
+	"os"
+	"strconv"
+	"time"
+)
+
+const masterServicePort = 215000
+
 // JobContext stores the elastic job context.
 type JobContext struct {
 	// Namespace is the kubernetes namespace where the job runs.
@@ -22,5 +32,55 @@ type JobContext struct {
 	// MasterHost is the host of master service.
 	MasterHost string
 	// MasterPort is the port of the master service.
-	MasterPort int32
+	MasterPort int
 }
+
+// NewJobContext creates a job context.
+func NewJobContext(namespace string, name string) *JobContext {
+	host := fmt.Sprintf("elasticjob-%s-dlrover-master", name)
+	port := masterServicePort
+
+	if !checkAddressReachable(host, port) {
+		host = os.Getenv("MY_POD_IP")
+		freePort, err := getFreePort()
+		if err != nil {
+			panic(err.Error())
+		}
+		port = freePort
+	}
+
+	return &JobContext{
+		NameSpace:  namespace,
+		Name:       name,
+		MasterHost: host,
+		MasterPort: port,
+	}
+}
+
+func checkAddressReachable(host string, port int) bool {
+	timeout := time.Second
+	masterAddr := net.JoinHostPort(host, strconv.Itoa(port))
+	conn, err := net.DialTimeout("tcp", masterAddr, timeout)
+	if err != nil {
+		return false
+	}
+	if conn != nil {
+		defer conn.Close()
+	}
+	return true
+}
+
+func getFreePort() (int, error) {
+	addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
+	if err != nil {
+		return 0, err
+	}
+
+	l, err := net.ListenTCP("tcp", addr)
+	if err != nil {
+		return 0, err
+	}
+
+	defer l.Close()
+	return l.Addr().(*net.TCPAddr).Port, nil
+}
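
NewJobContext first probes the job's master Service name on the fixed service port with a one-second TCP dial; if that address is unreachable, it falls back to the pod IP from the MY_POD_IP environment variable and asks the kernel for an ephemeral port by listening on localhost:0 (getFreePort). A hedged sketch of the fallback path, with an illustrative pod IP (MY_POD_IP is normally injected through the downward API):

	os.Setenv("MY_POD_IP", "10.0.0.8")
	jobContext := common.NewJobContext("dlrover", "train-demo")
	// With elasticjob-train-demo-dlrover-master unreachable, the context binds
	// to the pod IP and a free ephemeral port.
	fmt.Println(jobContext.MasterHost, jobContext.MasterPort)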
31 changes: 31 additions & 0 deletions go/master/pkg/common/context_internal_test.go
@@ -0,0 +1,31 @@
+// Copyright 2025 The DLRover Authors. All rights reserved.
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package common
+
+import (
+	"os"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
+
+var _ = Describe("Context", func() {
+	It("New Job Context", func() {
+		os.Setenv("MY_POD_IP", "127.0.0.1")
+		jobContext := NewJobContext("dlrover", "train-demo")
+		Expect(jobContext.MasterHost).To(Equal("127.0.0.1"))
+		Expect(jobContext.MasterPort > 0).To(BeTrue())
+	})
+
+})
16 changes: 16 additions & 0 deletions go/master/pkg/jobmanager/manager.go
@@ -13,6 +13,16 @@
 
 package jobmanager
 
+import (
+	"context"
+
+	elasticjobv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1"
+	"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"
+)
+
+// JobManager is the interface to manage the job lifecycle.
+type JobManager interface {
+	Start(ctx context.Context, jobContext *common.JobContext)
+}
+
+// NewJobManager creates a job manager.
+func NewJobManager(elasticJob *elasticjobv1.ElasticJob) JobManager {
+	if elasticJob.Spec.DistributionStrategy == "pytorch" {
+		return NewPyTorchJobManager(elasticJob)
+	}
+	return nil
+}
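
NewJobManager is a factory keyed on the job's DistributionStrategy; only "pytorch" is wired up in this commit, and any other strategy yields nil, so callers must check the result. A minimal usage sketch with an illustrative job object:

	job := &elasticjobv1.ElasticJob{}
	job.Spec.DistributionStrategy = "pytorch"
	manager := jobmanager.NewJobManager(job)
	if manager == nil {
		panic("no job manager for the distribution strategy")
	}
	manager.Start(ctx, jobContext)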
32 changes: 32 additions & 0 deletions go/master/pkg/jobmanager/pytorch.go
@@ -12,3 +12,35 @@
 // limitations under the License.
 
 package jobmanager
+
+import (
+	"context"
+
+	elasticjobv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1"
+	"github.com/intelligent-machine-learning/dlrover/go/master/pkg/batchscheduler"
+	"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"
+)
+
+// PyTorchJobManager is the lifecycle manager of a PyTorch distributed training job.
+type PyTorchJobManager struct {
+	replicaSchedulers map[string]batchscheduler.BatchScheduler
+}
+
+// NewPyTorchJobManager creates a PyTorch distributed training job manager.
+func NewPyTorchJobManager(elasticJob *elasticjobv1.ElasticJob) *PyTorchJobManager {
+	schedulers := make(map[string]batchscheduler.BatchScheduler)
+	for replicaType, spec := range elasticJob.Spec.ReplicaSpecs {
+		scheduler := batchscheduler.NewBatchScheduler(spec.BatchScheduler)
+		schedulers[string(replicaType)] = scheduler
+	}
+	return &PyTorchJobManager{
+		replicaSchedulers: schedulers,
+	}
+}
+
+// Start starts the modules of the job manager.
+func (jobManager *PyTorchJobManager) Start(ctx context.Context, jobContext *common.JobContext) {
+	for _, scheduler := range jobManager.replicaSchedulers {
+		scheduler.Start(ctx, jobContext)
+	}
+}
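
Putting the pieces of this commit together, the master's startup path becomes: initialize the global k8s client, build a JobContext, pick a JobManager, and start it under a cancellable context. A sketch of the assumed wiring (the run function and its caller are illustrative, not part of this commit):

	func run(job *elasticjobv1.ElasticJob) {
		// The launch loops read kubeutils.GlobalK8sClient, so initialize it first.
		kubeutils.NewGlobalK8sClient("", "dlrover")
		jobContext := common.NewJobContext("dlrover", job.Name)
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel() // cancelling stops every scheduler's LoopToLaunchPods
		if manager := jobmanager.NewJobManager(job); manager != nil {
			manager.Start(ctx, jobContext)
		}
	}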
32 changes: 20 additions & 12 deletions go/master/pkg/kubeutils/client.go
@@ -27,8 +27,12 @@ import (
 	"k8s.io/client-go/tools/clientcmd"
 )
 
+// GlobalK8sClient is the global client to access a k8s cluster.
+var GlobalK8sClient *K8sClient
+
 // K8sClient contains the instance to access a k8s cluster.
 type K8sClient struct {
+	namespace     string
 	config        *rest.Config
 	clientset     *k8sApi.Clientset
 	dynamicClient *dynamic.DynamicClient
@@ -39,9 +43,16 @@ func GetGroupVersionResource(group, version, resource string) schema.GroupVersionResource {
 	return schema.GroupVersionResource{Group: group, Version: version, Resource: resource}
 }
 
+// NewGlobalK8sClient initializes the global k8s client.
+func NewGlobalK8sClient(kubeConfigPath string, namespace string) {
+	GlobalK8sClient = NewK8sClient(kubeConfigPath, namespace)
+}
+
 // NewK8sClient creates a k8s client instance.
-func NewK8sClient(kubeConfigPath string) *K8sClient {
-	client := &K8sClient{}
+func NewK8sClient(kubeConfigPath string, namespace string) *K8sClient {
+	client := &K8sClient{
+		namespace: namespace,
+	}
 
 	// creates the in-cluster config
 	if kubeConfigPath == "" {
@@ -74,14 +85,13 @@
 }
 
 // GetCustomResourceInstance gets a custom resource instance from a k8s cluster.
-func (client *K8sClient) GetCustomResourceInstance(
-	namespace string, name string, gvr schema.GroupVersionResource) (
+func (client *K8sClient) GetCustomResourceInstance(name string, gvr schema.GroupVersionResource) (
 	*unstructured.Unstructured, error,
 ) {
 	// Unstructured
 	utd, err := client.dynamicClient.
 		Resource(gvr).
-		Namespace(namespace).
+		Namespace(client.namespace).
 		Get(context.Background(), name, metav1.GetOptions{})
 	if err != nil {
 		logger.Infof("fail to get %s %s", gvr.String(), name)
@@ -90,12 +100,10 @@ func (client *K8sClient) GetCustomResourceInstance(
 }
 
 // CreatePod creates a Pod instance in the cluster.
-func (client *K8sClient) CreatePod(namespace string, pod *corev1.Pod) error {
-	_, err := client.clientset.CoreV1().Pods(namespace).Create(
-		context.Background(), pod, metav1.CreateOptions{},
-	)
-	if err != nil {
-		logger.Infof("fail to create a pod : %s", pod.ObjectMeta.Name)
-	}
+func (client *K8sClient) CreatePod(ctx context.Context, pod *corev1.Pod) error {
+	_, err := client.clientset.
+		CoreV1().
+		Pods(client.namespace).
+		Create(ctx, pod, metav1.CreateOptions{})
 	return err
 }
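
CreatePod now takes the caller's context and uses the namespace captured when the client was built, so one cancellation path covers both the launch loop and its in-flight API calls. A hedged sketch of direct client use, with an illustrative pod spec:

	client := kubeutils.NewK8sClient("", "dlrover") // empty path selects in-cluster config
	pod := &corev1.Pod{}
	pod.ObjectMeta.Name = "train-demo-worker-0"
	pod.Spec.Containers = []corev1.Container{{Name: "main", Image: "busybox"}}
	if err := client.CreatePod(context.Background(), pod); err != nil && !errors.IsAlreadyExists(err) {
		logger.Warnf("fail to create pod %s: %v", pod.ObjectMeta.Name, err)
	}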