Skip to content

Commit

Permalink
Merge pull request #7 from gfieni/master
Browse files Browse the repository at this point in the history
Refactoring of jobs and deployment parts
  • Loading branch information
rouvoy authored Oct 15, 2016
2 parents b946bd4 + 02c30ae commit d5a44e1
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 151 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ You need to be connected to the Grid5000 VPN to create and access your Docker no
Do not forget to configure your DNS or use OpenVPN DNS auto-configuration.
Please follow the instructions on the [Grid5000 Wiki](https://www.grid5000.fr/mediawiki/index.php/VPN).

### Options
### Driver-specific Options
The driver needs a few options to create a machine. Here is a list of options:

| Option | Description | Default value | Required |
Expand All @@ -70,7 +70,7 @@ Error with pre-create check: "[G5K_api] request failed: 400 Bad Request."

More informations about usage of OAR properties are available on the [Grid5000 Wiki](https://www.grid5000.fr/mediawiki/index.php/Advanced_OAR#Other_examples_using_properties).

### Example
### Provisioning Examples
An example of node provisioning :

```bash
Expand Down
102 changes: 61 additions & 41 deletions api/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,19 @@ package api
import (
"encoding/json"
"fmt"
"io/ioutil"
"strings"
"time"

"github.com/docker/machine/libmachine/log"
)

// DeploymentRequest represents a new deployment request
type DeploymentRequest struct {
Nodes []string `json:"nodes"`
Environment string `json:"environment"`
Key string `json:"key"`
}

// Deployment represents a deployment response
type Deployment struct {
Nodes []string `json:"nodes"`
Site string `json:"site_uid"`
Expand All @@ -16,65 +24,77 @@ type Deployment struct {
Links []Link `json:"links"`
}

func (a *Api) DeployEnvironment(jobId int, SSHPublicKeyPath string) error {
// SubmitDeployment submits a new deployment request to g5k api
func (a *Api) SubmitDeployment(deploymentReq DeploymentRequest) (string, error) {
// create url for API call
urlDeploy := fmt.Sprintf("%s/sites/%s/deployments", G5kApiFrontend, a.Site)

job, err := a.GetJob(jobId)
// create deployment request json
deploymentArguments, err := json.Marshal(deploymentReq)
if err != nil {
return err
return "", err
}

// read ssh public key
sshPublicKey, err := a.readSSHPublicKey(SSHPublicKeyPath)
log.Infof("Submitting a new deployment... (image: '%s')", deploymentReq.Environment)

// send deployment request
resp, err := a.post(urlDeploy, string(deploymentArguments))
if err != nil {
return err
return "", err
}

// Wait for the nodes to be running
if !a.waitJobIsReady(job) {
return fmt.Errorf("[G5K_api] Job launching failed")
// unmarshal deployment response
var deployment Deployment
err = json.Unmarshal(resp, &deployment)
if err != nil {
return "", err
}

// Format arguments
nodesStrs := make([]string, 0)
for _, nodes := range job.Nodes {
nodesStrs = append(nodesStrs, `"`+nodes+`"`)
}
nodesJson := strings.Join(nodesStrs, ",")
log.Infof("Deployment submitted successfully (id: '%s')", deployment.UID)
return deployment.UID, nil
}

// Deploying
deploymentArguments := fmt.Sprintf(`{"nodes": [%s], "environment": %q, "key": %q}`, nodesJson, a.Environment, sshPublicKey)
var resp []byte
var deployment Deployment
// GetDeployment get the deployment from its id
func (a *Api) GetDeployment(deploymentID string) (*Deployment, error) {
// create url for API call
url := fmt.Sprintf("%s/sites/%s/deployments/%s", G5kApiFrontend, a.Site, deploymentID)

resp, err = a.post(urlDeploy, deploymentArguments)
// send request
resp, err := a.get(url)
if err != nil {
return err
return nil, err
}

// unmarshal json response
var deployment Deployment
err = json.Unmarshal(resp, &deployment)
if err != nil {
return nil, err
}

// Waiting the deployment finishes
for deployment.Status == "waiting" || deployment.Status == "processing" {
time.Sleep(10 * time.Second)
resp, err = a.get(urlDeploy + "/" + deployment.UID)
return &deployment, nil
}

// WaitUntilDeploymentIsFinished will wait until the deployment reach the 'terminated' state (no timeout)
func (a *Api) WaitUntilDeploymentIsFinished(deploymentID string) error {
log.Info("Waiting for deployment to finish, it will take a few minutes...")

// refresh deployment status
for deployment, err := a.GetDeployment(deploymentID); deployment.Status != "terminated"; deployment, err = a.GetDeployment(deploymentID) {
// check if GetDeployment returned an error
if err != nil {
return err
} else if err = json.Unmarshal(resp, &deployment); err != nil {
return err
}
}
if deployment.Status != "terminated" {
return fmt.Errorf("[G5K_api] Deployment failed: status is %s\n", deployment.Status)
}
return nil
}

// readSSHPublicKey read the ssh public key file and return the key in a string
func (a *Api) readSSHPublicKey(SSHPublicKeyPath string) (string, error) {
content, err := ioutil.ReadFile(SSHPublicKeyPath)
if err != nil {
return "", err
// stop if the deployment is in 'canceled' or 'error' state
if deployment.Status == "canceled" || deployment.Status == "error" {
return fmt.Errorf("Can't wait for a deployment in '%s' state", deployment.Status)
}

// wait 10 seconds before making another API call
time.Sleep(10 * time.Second)
}

return string(content), nil
log.Info("Deployment finished successfully")
return nil
}
139 changes: 70 additions & 69 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@ import (
"encoding/json"
"fmt"
"time"

"github.com/docker/machine/libmachine/log"
)

// JobRequest represents a new job submission
type JobRequest struct {
Resources string `json:"resources"`
Command string `json:"command"`
Properties string `json:"properties,omitempty"`
Types []string `json:"types"`
}

// Job represents an existing job
type Job struct {
Uid int `json:"uid"`
UID int `json:"uid"`
State string `json:"state"`
Timelife int `json:"walltime"`
Types []string `json:"types"`
Expand All @@ -23,107 +27,104 @@ type Job struct {
Nodes []string `json:"assigned_nodes"`
}

// convertDuration take a string "hh:mm:ss" and convert it in seconds
func convertDuration(t string) (int, error) {
var h, m, s int

if _, err := fmt.Sscanf(t, "%d:%d:%d", &h, &m, &s); err != nil {
return 0, err
}

return (h * 3600) + (m * 60) + s, nil
}

// Submit a job on G5K. Returns the job ID.
func (a *Api) SubmitJob(walltime, resourceProperties string) (int, error) {
urlSubmit := fmt.Sprintf("%s/sites/%s/jobs", G5kApiFrontend, a.Site)
// SubmitJob submit a new job on g5k api and return the job id
func (a *Api) SubmitJob(jobReq JobRequest) (int, error) {
// create url for API call
urlAPI := fmt.Sprintf("%s/sites/%s/jobs", G5kApiFrontend, a.Site)

seconds, err := convertDuration(walltime)
// create job request json
params, err := json.Marshal(jobReq)
if err != nil {
return 0, err
}

// create a new Job request (1 node)
params, err := json.Marshal(JobRequest{
Resources: fmt.Sprintf("nodes=1,walltime=%s", walltime),
Command: fmt.Sprintf("sleep %v", seconds),
Properties: resourceProperties,
Types: []string{"deploy"},
})
log.Info("Submitting a new job...")

// send job request
resp, err := a.post(urlAPI, string(params))
if err != nil {
return 0, err
}

// unmarshal json response
var job Job
var resp []byte

if resp, err = a.post(urlSubmit, string(params)); err != nil {
err = json.Unmarshal(resp, &job)
if err != nil {
return 0, err
} else {
err = json.Unmarshal(resp, &job)
return job.Uid, err
}

log.Infof("Job submitted successfully (id: '%v')", job.UID)
return job.UID, nil
}

// Refresh job's state
func (a *Api) GetJob(jobId int) (*Job, error) {
job := new(Job)
url := fmt.Sprintf("%s/sites/%s/jobs/%v", G5kApiFrontend, a.Site, jobId)
// GetJob get the job from its id
func (a *Api) GetJob(jobID int) (*Job, error) {
// create url for API call
url := fmt.Sprintf("%s/sites/%s/jobs/%v", G5kApiFrontend, a.Site, jobID)

if resp, err := a.get(url); err != nil {
return &Job{}, err
} else {
err = json.Unmarshal(resp, &job)
return job, err
// send request
resp, err := a.get(url)
if err != nil {
return nil, err
}
}

// Returns the job's current state
func (a *Api) GetJobState(jobId int) (string, error) {
if job, err := a.GetJob(jobId); err != nil {
return "", err
} else if a.jobIsOver(job) {
return "terminated", nil
} else {
return job.State, nil
// unmarshal json response
var job Job
err = json.Unmarshal(resp, &job)
if err != nil {
return nil, err
}

return &job, nil
}

// Returns true if the job expired, false otherwise
func (a *Api) jobIsOver(job *Job) bool {
currentTime := time.Now().Unix()
startTime := int64(job.StartTime)
timelife := int64(job.Timelife)
// GetJobState returns the current state of the job
func (a *Api) GetJobState(jobID int) (string, error) {
// get job from api
job, err := a.GetJob(jobID)
if err != nil {
return "", err
}

return (currentTime - startTime) >= timelife
return job.State, nil
}

// Free the nodes allocated to the jobs
func (a *Api) KillJob(jobId int) error {
url := fmt.Sprintf("%s/sites/%s/jobs/%v", G5kApiFrontend, a.Site, jobId)
// KillJob ask for deletion of a job
func (a *Api) KillJob(jobID int) error {
// create url for API call
url := fmt.Sprintf("%s/sites/%s/jobs/%v", G5kApiFrontend, a.Site, jobID)

// send delete request
_, err := a.del(url)

return err
}

func (a *Api) waitJobIsReady(job *Job) bool {
var err error
tmp_job := new(Job)
// WaitUntilJobIsReady wait until the job reach the 'running' state (no timeout)
func (a *Api) WaitUntilJobIsReady(jobID int) error {
log.Info("Waiting for job to run...")

for job.State == "waiting" || job.State == "tolaunch" || job.State == "launching" {
if tmp_job, err = a.GetJob(job.Uid); err != nil {
return false
// refresh job state
for job, err := a.GetJob(jobID); job.State != "running"; job, err = a.GetJob(jobID) {
// check if GetJob returned an error
if err != nil {
return err
}
*job = *tmp_job

// stop if the job is in 'error' or 'terminated' state
if job.State == "error" || job.State == "terminated" {
return fmt.Errorf("Can't wait for a job in '%s' state", job.State)
}

// warn if job is in 'hold' state
if job.State == "hold" {
log.Infof("Job '%s' is in hold state, dont forget to resume it")
}

// wait 3 seconds before making another API call
time.Sleep(3 * time.Second)
}

// If the launching failed
if job.State != "running" {
return false
} else {
return true
}
log.Info("Job is running")
return nil
}
14 changes: 14 additions & 0 deletions api/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package api

import "fmt"

// ConvertDuration take a string "hh:mm:ss" and convert it in seconds
func ConvertDuration(t string) (int, error) {
var h, m, s int

if _, err := fmt.Sscanf(t, "%d:%d:%d", &h, &m, &s); err != nil {
return 0, err
}

return (h * 3600) + (m * 60) + s, nil
}
Loading

0 comments on commit d5a44e1

Please sign in to comment.