Skip to content

Commit

Permalink
Task directory_out (#340)
Browse files Browse the repository at this point in the history
* Task directory_out

* UpdateContext

* integration

* nested storage

* remove integrations

* Restyled by gofmt (#346)

Co-authored-by: Restyled.io <[email protected]>

* directory_out must be empty or not exists

* Restyled by gofmt (#351)

Co-authored-by: Restyled.io <[email protected]>

* workdir

* docs: workdir

* docs: minify HCL syntax

Co-authored-by: restyled-io[bot] <32688539+restyled-io[bot]@users.noreply.github.com>
Co-authored-by: Restyled.io <[email protected]>
Co-authored-by: Casper da Costa-Luis <[email protected]>
  • Loading branch information
4 people authored Jan 10, 2022
1 parent caa45bb commit be9e0cd
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 53 deletions.
8 changes: 1 addition & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,9 @@ Create a file named `main.tf` in an empty directory with the following contents:

```hcl
terraform {
required_providers {
iterative = {
source = "github.com/iterative/iterative"
}
}
required_providers { iterative = { source = "iterative/iterative" } }
}
provider "iterative" {}
# ... other resource blocks ...
```

Expand Down
21 changes: 8 additions & 13 deletions docs/guides/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,16 @@ In the project root directory:

```hcl
terraform {
required_providers {
iterative = {
source = "iterative/iterative"
}
}
required_providers { iterative = { source = "iterative/iterative" } }
}
provider "iterative" {}
resource "iterative_task" "example" {
resource "iterative_task" "task" {
name = "example"
cloud = "aws" # or any of: gcp, az, k8s
directory = "${path.root}/shared"
script = <<-END
workdir {
input = "${path.root}/shared"
}
script = <<-END
#!/bin/bash
echo "Hello World!" > greeting.txt
END
Expand Down Expand Up @@ -70,7 +65,7 @@ $ terraform apply
This command will:

1. Create all the required cloud resources.
2. Upload the specified shared `directory` to the cloud.
2. Upload the specified shared `input` working directory to the cloud.
3. Launch the task `script`.

## Viewing Task Statuses
Expand All @@ -95,7 +90,7 @@ $ terraform destroy

This command will:

1. Download the specified shared `directory` from the cloud.
1. Download the specified shared working directory from the cloud.
2. Delete all the cloud resources created by `terraform apply`.

## Viewing Task Results
Expand Down
8 changes: 1 addition & 7 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,9 @@ Use the Iterative Provider to launch resource-intensive tasks in popular cloud p

```hcl
terraform {
required_providers {
iterative = {
source = "iterative/iterative"
}
}
required_providers { iterative = { source = "iterative/iterative" } }
}
provider "iterative" {}
resource "iterative_task" "task" {
name = "example"
cloud = "aws"
Expand Down
17 changes: 11 additions & 6 deletions docs/resources/task.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
This resource will:

1. Create cloud resources (machines and storage) for the task.
2. Upload the given `directory` to the cloud storage, if specified.
3. Run the given `script` until completion or `timeout` in the cloud machine.
2. Upload the given `workdir.input` to the cloud storage.
3. Run the given `script` on the cloud machine until completion or `timeout`.
4. Download results to the given `workdir.output`.

## Example Usage

Expand All @@ -14,8 +15,11 @@ resource "iterative_task" "task" {
cloud = "aws"
environment = { GREETING = "Hello, world!" }
directory = "${path.root}/shared"
script = <<-END
workdir {
input = "${path.root}/shared"
output = "${path.root}/results"
}
script = <<-END
#!/bin/bash
echo "$GREETING" | tee $(uuidgen)
END
Expand All @@ -38,7 +42,8 @@ resource "iterative_task" "task" {
- `spot` - (Optional) Spot instance price. `-1`: disabled, `0`: automatic price, any other positive number: fixed price.
- `image` - (Optional) [Machine image](#machine-images) to run the task with.
- `parallelism` - (Optional) Number of machines to be launched in parallel.
- `directory` - (Optional) Local directory to synchronize.
- `workdir.input` - (Optional) Local working directory to upload.
- `workdir.output` - (Optional) Local directory to download results to (default: `workdir.input`).
- `environment` - (Optional) Map of environment variable names and values for the task script. Empty string values are replaced with local environment values. Empty values may also be combined with a [glob](<https://en.wikipedia.org/wiki/Glob_(programming)>) name to import all matching variables.
- `timeout` - (Optional) Maximum number of seconds to run before termination.

Expand Down Expand Up @@ -205,7 +210,7 @@ Setting the `region` attribute results in undefined behaviour.

#### Directory storage

Unlike public cloud providers, Kubernetes does not offer any portable way of persisting and sharing storage between pods. When specified, the `directory` attribute will create a `PersistentVolumeClaim` of the default `StorageClass`, with the same lifecycle as the task and the specified `disk_size`.
Unlike public cloud providers, Kubernetes does not offer any portable way of persisting and sharing storage between pods. When specified, the `workdir.input` attribute will create a `PersistentVolumeClaim` of the default `StorageClass`, with the same lifecycle as the task and the specified `disk_size`.

~> **Warning:** Access mode will be `ReadWriteOnce` if `parallelism=1` or `ReadWriteMany` otherwise.

Expand Down
66 changes: 57 additions & 9 deletions iterative/resource_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"strings"
"time"

Expand All @@ -19,6 +21,7 @@ func resourceTask() *schema.Resource {
CreateContext: resourceTaskCreate,
DeleteContext: resourceTaskDelete,
ReadContext: resourceTaskRead,
UpdateContext: resourceTaskRead,
Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Expand Down Expand Up @@ -103,11 +106,25 @@ func resourceTask() *schema.Resource {
ForceNew: true,
Required: true,
},
"directory": {
Type: schema.TypeString,
ForceNew: true,
"workdir": {
Optional: true,
Default: "",
Type: schema.TypeSet,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"input": {
Type: schema.TypeString,
ForceNew: true,
Optional: true,
Default: "",
},
"output": {
Type: schema.TypeString,
ForceNew: false,
Optional: true,
Default: "",
},
},
},
},
"parallelism": {
Type: schema.TypeInt,
Expand Down Expand Up @@ -260,17 +277,34 @@ func resourceTaskBuild(ctx context.Context, d *schema.ResourceData, m interface{
},
}

directory := ""
directory_out := ""
if d.Get("workdir").(*schema.Set).Len() > 0 {
storage := d.Get("workdir").(*schema.Set).List()[0].(map[string]interface{})
directory = storage["input"].(string)

directory_out = storage["output"].(string)
if directory_out == "" {
directory_out = directory
}
}

if directory_out != "" && !isOutputValid(directory_out) {
return nil, errors.New("output directory " + directory_out + " is not empty!")
}

t := common.Task{
Size: common.Size{
Machine: d.Get("machine").(string),
Storage: d.Get("disk_size").(int),
},
Environment: common.Environment{
Image: d.Get("image").(string),
Script: d.Get("script").(string),
Variables: v,
Directory: d.Get("directory").(string),
Timeout: time.Duration(d.Get("timeout").(int)) * time.Second,
Image: d.Get("image").(string),
Script: d.Get("script").(string),
Variables: v,
Directory: directory,
DirectoryOut: directory_out,
Timeout: time.Duration(d.Get("timeout").(int)) * time.Second,
},
Firewall: common.Firewall{
Ingress: common.FirewallRule{
Expand All @@ -291,3 +325,17 @@ func diagnostic(diags diag.Diagnostics, err error, severity diag.Severity) diag.
Summary: err.Error(),
})
}

func isOutputValid(path string) bool {
f, err := os.Open(path)
if err != nil {
return true
}
defer f.Close()

_, err = f.Readdir(1)
if err == io.EOF {
return true
}
return false
}
4 changes: 2 additions & 2 deletions task/aws/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ func (t *Task) Read(ctx context.Context) error {

func (t *Task) Delete(ctx context.Context) error {
log.Println("[INFO] Downloading Directory...")
if t.Attributes.Environment.Directory != "" && t.Read(ctx) == nil {
if err := t.Pull(ctx, t.Attributes.Environment.Directory); err != nil && err != common.NotFoundError {
if t.Attributes.Environment.DirectoryOut != "" && t.Read(ctx) == nil {
if err := t.Pull(ctx, t.Attributes.Environment.DirectoryOut); err != nil && err != common.NotFoundError {
return err
}
}
Expand Down
4 changes: 2 additions & 2 deletions task/az/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ func (t *Task) Read(ctx context.Context) error {

func (t *Task) Delete(ctx context.Context) error {
log.Println("[INFO] Downloading Directory...")
if t.Attributes.Environment.Directory != "" && t.Read(ctx) == nil {
if err := t.Pull(ctx, t.Attributes.Environment.Directory); err != nil && err != common.NotFoundError {
if t.Attributes.Environment.DirectoryOut != "" && t.Read(ctx) == nil {
if err := t.Pull(ctx, t.Attributes.Environment.DirectoryOut); err != nil && err != common.NotFoundError {
return err
}
}
Expand Down
5 changes: 3 additions & 2 deletions task/common/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ type Environment struct {
Image string
Script string
Variables
Timeout time.Duration
Directory string
Timeout time.Duration
Directory string
DirectoryOut string
}

type Variables map[string]*string
Expand Down
4 changes: 2 additions & 2 deletions task/gcp/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ func (t *Task) Read(ctx context.Context) error {

func (t *Task) Delete(ctx context.Context) error {
log.Println("[INFO] Downloading Directory...")
if t.Attributes.Environment.Directory != "" && t.Read(ctx) == nil {
if err := t.Pull(ctx, t.Attributes.Environment.Directory); err != nil && err != common.NotFoundError {
if t.Attributes.Environment.DirectoryOut != "" && t.Read(ctx) == nil {
if err := t.Pull(ctx, t.Attributes.Environment.DirectoryOut); err != nil && err != common.NotFoundError {
return err
}
}
Expand Down
12 changes: 9 additions & 3 deletions task/k8s/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func New(ctx context.Context, cloud common.Cloud, identifier common.Identifier,
t.Identifier = identifier
t.Attributes.Task = task
t.Attributes.Directory = persistentVolumeDirectory
t.Attributes.DirectoryOut = persistentVolumeDirectory
if task.Environment.DirectoryOut != "" {
t.Attributes.DirectoryOut = task.Environment.DirectoryOut
}

t.Resources.ConfigMap = resources.NewConfigMap(
t.Client,
t.Identifier,
Expand Down Expand Up @@ -80,7 +85,8 @@ type Task struct {
Identifier common.Identifier
Attributes struct {
common.Task
Directory string
Directory string
DirectoryOut string
}
DataSources struct{}
Resources struct {
Expand Down Expand Up @@ -156,7 +162,7 @@ func (t *Task) Read(ctx context.Context) error {
}

func (t *Task) Delete(ctx context.Context) error {
if t.Attributes.Directory != "" && t.Read(ctx) == nil {
if t.Attributes.DirectoryOut != "" && t.Read(ctx) == nil {
os.Setenv("TPI_TRANSFER_MODE", "true")
os.Setenv("TPI_PULL_MODE", "true")
defer os.Unsetenv("TPI_TRANSFER_MODE")
Expand All @@ -171,7 +177,7 @@ func (t *Task) Delete(ctx context.Context) error {
return err
}
log.Println("[INFO] Downloading Directory...")
if err := t.Pull(ctx, t.Attributes.Directory); err != nil {
if err := t.Pull(ctx, t.Attributes.DirectoryOut); err != nil {
return err
}

Expand Down

0 comments on commit be9e0cd

Please sign in to comment.