feat(opencurve#27): supports a common way for deployment waiting
Signed-off-by: Anur Ijuokarukas <[email protected]>
anurnomeru committed Apr 19, 2023
1 parent ebde98e commit 2f697e2
Showing 3 changed files with 127 additions and 28 deletions.
8 changes: 6 additions & 2 deletions pkg/chunkserver/chunkserver.go
@@ -6,6 +6,7 @@ import (

"github.com/coreos/pkg/capnslog"
"github.com/pkg/errors"
v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/types"

curvev1 "github.com/opencurve/curve-operator/api/v1"
@@ -106,14 +107,17 @@ func (c *Cluster) Start(nodeNameIP map[string]string) error {
logger.Info("create physical pool successed")

// 3. startChunkServers start all chunkservers for each device of every node
err = c.startChunkServers()
var chunkServers []*v1.Deployment
chunkServers, err = c.startChunkServers()
if err != nil {
return errors.Wrap(err, "failed to start chunkserver")
}

// 4. wait all chunkservers online before create logical pool
logger.Info("starting all chunkserver")
time.Sleep(30 * time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// block until every chunkserver deployment has started or the 30s timeout expires
if ok := <-k8sutil.WaitForDeploymentsToStart(ctx, c.context.Clientset, 3*time.Second, chunkServers...); !ok {
logger.Error("failed to wait for all chunkservers to start")
}

// 5. create logical pool
_, err = c.runCreatePoolJob(nodeNameIP, "logical_pool")
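The hunk above fans the wait out over all chunkserver deployments at once. For a single deployment the same pattern works with the singular helper added below in pkg/k8sutil/deployment.go; a minimal sketch, where clientset and d (a *v1.Deployment) are assumed to be in scope:

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // poll every 3s until d is ready, the Get call fails, or the context expires
    if ok := <-k8sutil.WaitForDeploymentToStart(ctx, clientset, 3*time.Second, d); !ok {
        logger.Error("deployment did not become ready in time")
    }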
53 changes: 27 additions & 26 deletions pkg/chunkserver/spec.go
@@ -6,7 +6,8 @@ import (

"github.com/pkg/errors"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -15,20 +16,21 @@ import (
)

// startChunkServers start all chunkservers for each device of every node
func (c *Cluster) startChunkServers() error {
func (c *Cluster) startChunkServers() ([]*v1.Deployment, error) {
results := make([]*v1.Deployment, 0)
if len(job2DeviceInfos) == 0 {
logger.Errorf("no job to format device and provision chunk file")
return nil
return results, nil
}

if len(chunkserverConfigs) == 0 {
logger.Errorf("no device need to start chunkserver")
return nil
return results, nil
}

if len(job2DeviceInfos) != len(chunkserverConfigs) {
logger.Errorf("no device need to start chunkserver")
return errors.New("failed to start chunkserver because of job numbers is not equal with chunkserver config")
return results, errors.New("failed to start chunkserver because the number of jobs does not equal the number of chunkserver configs")
}

_ = c.createStartCSConfigMap()
@@ -41,18 +43,18 @@ func (c *Cluster) startChunkServers() error {

err := c.createConfigMap(csConfig)
if err != nil {
return errors.Wrapf(err, "failed to create chunkserver configmap for %v", config.ChunkserverConfigMapName)
return results, errors.Wrapf(err, "failed to create chunkserver configmap for %v", config.ChunkserverConfigMapName)
}

d, err := c.makeDeployment(&csConfig)
if err != nil {
return errors.Wrap(err, "failed to create chunkserver Deployment")
return results, errors.Wrap(err, "failed to create chunkserver Deployment")
}

newDeployment, err := c.context.Clientset.AppsV1().Deployments(c.namespacedName.Namespace).Create(d)
if err != nil {
if !kerrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "failed to create chunkserver deployment %s", csConfig.ResourceName)
return results, errors.Wrapf(err, "failed to create chunkserver deployment %s", csConfig.ResourceName)
}
logger.Infof("deployment for chunkserver %s already exists. updating if needed", csConfig.ResourceName)

@@ -63,12 +65,11 @@ func (c *Cluster) startChunkServers() error {
} else {
logger.Infof("Deployment %s has been created , waiting for startup", newDeployment.GetName())
// TODO:wait for the new deployment
// deploymentsToWaitFor = append(deploymentsToWaitFor, newDeployment)
results = append(results, newDeployment)
}
// update condition type and phase etc.
}

return nil
return results, nil
}

// createCSClientConfigMap create cs_client configmap
@@ -95,7 +96,7 @@ func (c *Cluster) createCSClientConfigMap() error {
config.CSClientConfigMapDataKey: replacedCsClientData,
}

cm := &v1.ConfigMap{
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: config.CSClientConfigMapName,
Namespace: c.namespacedName.Namespace,
@@ -146,7 +147,7 @@ func (c *Cluster) CreateS3ConfigMap() error {
config.S3ConfigMapDataKey: configMapData,
}

cm := &v1.ConfigMap{
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: config.S3ConfigMapName,
Namespace: c.namespacedName.Namespace,
@@ -175,7 +176,7 @@ func (c *Cluster) createStartCSConfigMap() error {
startChunkserverScriptFileDataKey: script.START,
}

cm := &v1.ConfigMap{
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: startChunkserverConfigMapName,
Namespace: c.namespacedName.Namespace,
@@ -227,7 +228,7 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
config.ChunkserverConfigMapDataKey: replacedChunkServerData,
}

cm := &v1.ConfigMap{
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: csConfig.CurrentConfigMapName,
Namespace: c.namespacedName.Namespace,
@@ -254,19 +255,19 @@ func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*apps.Deployment,
vols, _ := c.createTopoAndToolVolumeAndMount()
volumes = append(volumes, vols...)

podSpec := v1.PodTemplateSpec{
podSpec := corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: csConfig.ResourceName,
Labels: c.getChunkServerPodLabels(csConfig),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
c.makeCSDaemonContainer(csConfig),
},
NodeName: csConfig.NodeName,
RestartPolicy: v1.RestartPolicyAlways,
RestartPolicy: corev1.RestartPolicyAlways,
HostNetwork: true,
DNSPolicy: v1.DNSClusterFirstWithHostNet,
DNSPolicy: corev1.DNSClusterFirstWithHostNet,
Volumes: volumes,
},
}
@@ -301,7 +302,7 @@ func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*apps.Deployment,
}

// makeCSDaemonContainer create chunkserver container
func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Container {
func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) corev1.Container {

privileged := true
runAsUser := int64(0)
@@ -321,7 +322,7 @@ func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Containe
argsChunkserverPort := strconv.Itoa(csConfig.Port)
argsConfigFileMountPath := path.Join(config.ChunkserverConfigMapMountPathDir, config.ChunkserverConfigMapDataKey)

container := v1.Container{
container := corev1.Container{
Name: "chunkserver",
Command: []string{
"/bin/bash",
@@ -339,16 +340,16 @@ func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Containe
Image: c.spec.CurveVersion.Image,
ImagePullPolicy: c.spec.CurveVersion.ImagePullPolicy,
VolumeMounts: volMounts,
Ports: []v1.ContainerPort{
Ports: []corev1.ContainerPort{
{
Name: "listen-port",
ContainerPort: int32(csConfig.Port),
HostPort: int32(csConfig.Port),
Protocol: v1.ProtocolTCP,
Protocol: corev1.ProtocolTCP,
},
},
Env: []v1.EnvVar{{Name: "TZ", Value: "Asia/Hangzhou"}},
SecurityContext: &v1.SecurityContext{
Env: []corev1.EnvVar{{Name: "TZ", Value: "Asia/Hangzhou"}},
SecurityContext: &corev1.SecurityContext{
Privileged: &privileged,
RunAsUser: &runAsUser,
RunAsNonRoot: &runAsNonRoot,
94 changes: 94 additions & 0 deletions pkg/k8sutil/deployment.go
@@ -0,0 +1,94 @@
package k8sutil

import (
"context"
"time"

v1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// WaitForDeploymentsToStart waits for the given deployments to start, and returns a channel
// that reports whether all of them started
//
// tickDuration is the interval at which each deployment's status is checked
// objectMetas are the deployments to wait for
//
// a buffered hub channel collects the result for each deployment: true is sent on the
// returned channel once every deployment has started, and false as soon as any one fails.
// This design lets WaitForDeploymentToStart and WaitForDeploymentsToStart be consumed in
// the same way
func WaitForDeploymentsToStart(ctx context.Context, clientSet kubernetes.Interface, tickDuration time.Duration,
objectMetas ...*v1.Deployment) chan bool {
length := len(objectMetas)
hub := make(chan bool, length)
for i := range objectMetas {
objectMeta := objectMetas[i]
go func() {
// always forward the result, success or failure, so the collector
// below receives exactly one value per deployment and never blocks
hub <- <-WaitForDeploymentToStart(ctx, clientSet, tickDuration, objectMeta)
}()
}

chn := make(chan bool)
go func() {
defer close(chn)
for i := 0; i < length; i++ {
if succeed := <-hub; !succeed {
chn <- false
return
}
}
chn <- true
}()
return chn
}

// WaitForDeploymentToStart waits for the deployment to start, and returns a channel
// that reports whether the deployment started
//
// tickDuration is the interval at which the deployment status is checked
// objectMeta is the deployment to wait for
func WaitForDeploymentToStart(ctx context.Context, clientSet kubernetes.Interface, tickDuration time.Duration,
objectMeta *v1.Deployment) chan bool {
chn := make(chan bool)
go func() {
defer close(chn)
// create and stop the ticker inside the goroutine so it keeps firing
// for the entire polling loop
ticker := time.NewTicker(tickDuration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
deployment, err := clientSet.AppsV1().Deployments(objectMeta.GetNamespace()).Get(objectMeta.GetName(),
metav1.GetOptions{})
if err != nil {
// TODO: is returning the failure reason required?
logger.Errorf("failed to get deployment %s in cluster", objectMeta.GetName())
chn <- false
return
}
logger.Infof("waiting for deployment %s starting", deployment.Name)
// started once the controller has observed a generation newer than the
// one recorded when the deployment was created, and at least one replica
// is updated and ready
if deployment.Status.ObservedGeneration != objectMeta.Status.ObservedGeneration &&
deployment.Status.UpdatedReplicas > 0 &&
deployment.Status.ReadyReplicas > 0 {
logger.Infof("deployment %s has been started", deployment.Name)
chn <- true
return
}

// TODO: should we log the unready reason (e.g. conditions) to help debugging?
case <-ctx.Done():
chn <- false
logger.Infof("stop waiting for deployment %s to start due to context is done", objectMeta.GetName())
return
}
}
}()
return chn
}
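As a usage sketch, the helpers can be exercised against a fake clientset from k8s.io/client-go/kubernetes/fake; the test name, namespace, and field values below are illustrative, and the generation values mirror the readiness check above:

    package k8sutil

    import (
        "context"
        "testing"
        "time"

        v1 "k8s.io/api/apps/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes/fake"
    )

    func TestWaitForDeploymentToStart(t *testing.T) {
        // the deployment as recorded at creation time
        d := &v1.Deployment{
            ObjectMeta: metav1.ObjectMeta{Name: "chunkserver-a", Namespace: "curve"},
            Status:     v1.DeploymentStatus{ObservedGeneration: 1},
        }

        // the state the API server now reports: a newer observed generation
        // with updated and ready replicas, satisfying the readiness check
        started := d.DeepCopy()
        started.Status = v1.DeploymentStatus{ObservedGeneration: 2, UpdatedReplicas: 1, ReadyReplicas: 1}
        clientset := fake.NewSimpleClientset(started)

        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        if ok := <-WaitForDeploymentToStart(ctx, clientset, 10*time.Millisecond, d); !ok {
            t.Fatal("expected the deployment to be reported as started")
        }
    }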
