Skip to content

Commit

Permalink
Task status, heavy write fix and aws ips (#322)
Browse files Browse the repository at this point in the history
* Add task script status reporting logic

* Update task/common/machine/script.go

* Add support for status reporting

* Reduces S3 writes

* Restyled by gofmt

* Sort rclone on startup script

* Fix IP address propagation

:sweat_smile: It happened because DescribeInstances with
InstanceIds = [] was the same as describing every imaginable
instance in the account.

* Remove dead code

Co-authored-by: Restyled.io <[email protected]>
Co-authored-by: DavidGOrtega <[email protected]>
  • Loading branch information
3 people authored Jan 9, 2022
1 parent d242ce5 commit caa45bb
Show file tree
Hide file tree
Showing 14 changed files with 122 additions and 54 deletions.
6 changes: 5 additions & 1 deletion iterative/resource_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,11 @@ func resourceTaskRead(ctx context.Context, d *schema.ResourceData, m interface{}
}
d.Set("events", events)

d.Set("status", task.Status(ctx))
status, err := task.Status(ctx)
if err != nil {
return diagnostic(diags, err, diag.Warning)
}
d.Set("status", status)

logs, err := task.Logs(ctx)
if err != nil {
Expand Down
42 changes: 23 additions & 19 deletions task/aws/resources/resource_auto_scaling_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,28 +113,32 @@ func (a *AutoScalingGroup) Read(ctx context.Context) error {
return common.NotFoundError
}

var instancesInput ec2.DescribeInstancesInput
for _, instance := range groups.AutoScalingGroups[0].Instances {
instancesInput.InstanceIds = append(instancesInput.InstanceIds, aws.ToString(instance.InstanceId))
}

a.Attributes.Addresses = []net.IP{}
a.Attributes.Status = common.Status{common.StatusCodeRunning: 0}
for instancesPaginator := ec2.NewDescribeInstancesPaginator(a.Client.Services.EC2, &instancesInput); instancesPaginator.HasMorePages(); {
page, err := instancesPaginator.NextPage(ctx)
if err != nil {
return err
a.Attributes.Status = common.Status{common.StatusCodeActive: 0}
if len(groups.AutoScalingGroups[0].Instances) > 0 {
var instancesInput ec2.DescribeInstancesInput
for _, instance := range groups.AutoScalingGroups[0].Instances {
instancesInput.InstanceIds = append(instancesInput.InstanceIds, aws.ToString(instance.InstanceId))
}

for _, reservation := range page.Reservations {
for _, instance := range reservation.Instances {
status := string(instance.State.Name)
if instance.StateReason != nil {
status += " " + aws.ToString(instance.StateReason.Message)
}
a.Attributes.Status[common.StatusCode(status)]++
if address := net.ParseIP(aws.ToString(instance.PublicIpAddress)); address != nil {
a.Attributes.Addresses = append(a.Attributes.Addresses, address)
for instancesPaginator := ec2.NewDescribeInstancesPaginator(a.Client.Services.EC2, &instancesInput); instancesPaginator.HasMorePages(); {
page, err := instancesPaginator.NextPage(ctx)
if err != nil {
return err
}

for _, reservation := range page.Reservations {
for _, instance := range reservation.Instances {
status := string(instance.State.Name)
if instance.StateReason != nil {
status += " " + aws.ToString(instance.StateReason.Message)
}
if status == "running" {
a.Attributes.Status[common.StatusCodeActive]++
}
if address := net.ParseIP(aws.ToString(instance.PublicIpAddress)); address != nil {
a.Attributes.Addresses = append(a.Attributes.Addresses, address)
}
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions task/aws/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,12 @@ func (t *Task) Events(ctx context.Context) []common.Event {
return t.Attributes.Events
}

func (t *Task) Status(ctx context.Context) common.Status {
return t.Attributes.Status
func (t *Task) Status(ctx context.Context) (common.Status, error) {
if err := t.Read(ctx); err != nil {
return nil, err
}

return machine.Status(ctx, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"], t.Attributes.Status)
}

func (t *Task) GetKeyPair(ctx context.Context) (*ssh.DeterministicSSHKeyPair, error) {
Expand Down
5 changes: 2 additions & 3 deletions task/az/resources/resource_virtual_machine_scale_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,17 +224,16 @@ func (v *VirtualMachineScaleSet) Read(ctx context.Context) error {
}

v.Attributes.Events = []common.Event{}
v.Attributes.Status = common.Status{common.StatusCodeRunning: 0}
v.Attributes.Status = common.Status{common.StatusCodeActive: 0}
scaleSetView, err := v.Client.Services.VirtualMachineScaleSets.GetInstanceView(ctx, v.Dependencies.ResourceGroup.Identifier, v.Identifier)
if err != nil {
return err
}
if scaleSetView.VirtualMachine.StatusesSummary != nil {
for _, status := range *scaleSetView.VirtualMachine.StatusesSummary {
code := to.String(status.Code)
v.Attributes.Status[common.StatusCode(code)] = int(to.Int32(status.Count))
if code == "ProvisioningState/succeeded" {
v.Attributes.Status[common.StatusCodeRunning] = int(to.Int32(status.Count))
v.Attributes.Status[common.StatusCodeActive] = int(to.Int32(status.Count))
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions task/az/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,12 @@ func (t *Task) Events(ctx context.Context) []common.Event {
return t.Attributes.Events
}

func (t *Task) Status(ctx context.Context) common.Status {
return t.Attributes.Status
func (t *Task) Status(ctx context.Context) (common.Status, error) {
if err := t.Read(ctx); err != nil {
return nil, err
}

return machine.Status(ctx, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"], t.Attributes.Status)
}

func (t *Task) GetKeyPair(ctx context.Context) (*ssh.DeterministicSSHKeyPair, error) {
Expand Down
18 changes: 13 additions & 5 deletions task/common/machine/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,17 @@ while IFS= read -rd $'\0' variable; do
export "$(perl -0777p -e 's/\\"/"/g;' -e 's/(.+?)="(.+)"/$1=$2/sg' <<< "$variable")"
done < <(perl -0777pe 's/\n*(.+?=".*?((?<!\\)"|\\\\"))\n*/$1\x00/sg' /tmp/tpi-environment)
TPI_MACHINE_IDENTITY="$(uuidgen)"
TPI_LOG_DIRECTORY="$(mktemp --directory)"
TPI_DATA_DIRECTORY="/tmp/tpi-task"
sudo tee /etc/systemd/system/tpi-task.service > /dev/null <<END
[Unit]
After=default.target
[Service]
Type=simple
ExecStart=/usr/bin/tpi-task
ExecStop=/bin/bash -c 'systemctl is-system-running | grep stopping || echo "{\\\\"result\\\\": \\\\"\$SERVICE_RESULT\\\\", \\\\"code\\\\": \\\\"\$EXIT_STATUS\\\\", \\\\"status\\\\": \\\\"\$EXIT_CODE\\\\"}" > "$TPI_LOG_DIRECTORY/status-$TPI_MACHINE_IDENTITY" && rclone copy "$TPI_LOG_DIRECTORY" "$RCLONE_REMOTE/reports"'
ExecStopPost=/usr/bin/tpi-task-shutdown
Environment=HOME=/root
EnvironmentFile=/tmp/tpi-environment
Expand Down Expand Up @@ -98,17 +103,20 @@ rclone copy "$RCLONE_REMOTE/data" /tmp/tpi-task
sudo systemctl daemon-reload
sudo systemctl enable tpi-task.service --now
TPI_MACHINE_IDENTITY="$(uuidgen)"
TPI_LOG_DIRECTORY="$(mktemp --directory)"
while sleep 5; do
journalctl > "$TPI_LOG_DIRECTORY/machine-$TPI_MACHINE_IDENTITY"
journalctl --unit tpi-task > "$TPI_LOG_DIRECTORY/task-$TPI_MACHINE_IDENTITY"
rclone copy "$TPI_LOG_DIRECTORY" "$RCLONE_REMOTE/log"
if [[ "$(stat -t "$TPI_LOG_DIRECTORY")" != "$TPI_LOG_DIRECTORY_HASH" ]]; then
TPI_LOG_DIRECTORY_HASH="$(md5sum "$TPI_LOG_DIRECTORY"/*)"
rclone copy "$TPI_LOG_DIRECTORY" "$RCLONE_REMOTE/reports"
fi
done &
while sleep 10; do
rclone copy /tmp/tpi-task "$RCLONE_REMOTE/data"
if [[ "$(stat -t "$TPI_DATA_DIRECTORY")" != "$TPI_DATA_DIRECTORY_EPOCH" ]]; then
TPI_DATA_DIRECTORY_EPOCH="$(find "$TPI_DATA_DIRECTORY" -printf "%%T@\n" | sort | tail -1)"
rclone copy "$TPI_DATA_DIRECTORY" "$RCLONE_REMOTE/data"
fi
done &
`,
base64.StdEncoding.EncodeToString([]byte(script)),
Expand Down
41 changes: 38 additions & 3 deletions task/common/machine/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package machine
import (
"bytes"
"context"
"encoding/json"
"io"
"path/filepath"
"strings"
Expand All @@ -14,23 +15,31 @@ import (

"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/sync"

"terraform-provider-iterative/task/common"
)

func Logs(ctx context.Context, remote string) ([]string, error) {
type StatusReport struct {
Result string
Status string
Code string
}

func Reports(ctx context.Context, remote, prefix string) ([]string, error) {
remoteFileSystem, err := fs.NewFs(ctx, remote)
if err != nil {
return nil, err
}

entries, err := remoteFileSystem.List(ctx, "/log")
entries, err := remoteFileSystem.List(ctx, "/reports")
if err != nil {
return nil, err
}

var logs []string
for _, entry := range entries {
path := entry.Remote()
if base := filepath.Base(path); !strings.HasPrefix(base, "task-") {
if base := filepath.Base(path); !strings.HasPrefix(base, prefix+"-") {
continue
}

Expand All @@ -53,6 +62,32 @@ func Logs(ctx context.Context, remote string) ([]string, error) {
return logs, nil
}

func Logs(ctx context.Context, remote string) ([]string, error) {
return Reports(ctx, remote, "task")
}

func Status(ctx context.Context, remote string, initialStatus common.Status) (common.Status, error) {
reports, err := Reports(ctx, remote, "status")
if err != nil {
return initialStatus, err
}

for _, report := range reports {
var statusReport StatusReport
if err := json.Unmarshal([]byte(report), &statusReport); err != nil {
return initialStatus, err
}
if statusReport.Code != "" {
if statusReport.Code == "0" {
initialStatus[common.StatusCodeSucceeded] += 1
} else {
initialStatus[common.StatusCodeFailed] += 1
}
}
}
return initialStatus, nil
}

func Transfer(ctx context.Context, source, destination string) error {
sourceFileSystem, err := fs.NewFs(ctx, source)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion task/common/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ type Status map[StatusCode]int
type StatusCode string

const (
StatusCodeRunning StatusCode = "running"
StatusCodeActive StatusCode = "running"
StatusCodeSucceeded StatusCode = "succeeded"
StatusCodeFailed StatusCode = "failed"
)

type Size struct {
Expand Down
5 changes: 2 additions & 3 deletions task/gcp/resources/resource_instance_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,8 @@ func (i *InstanceGroupManager) Read(ctx context.Context) error {
}

i.Attributes.Addresses = []net.IP{}
i.Attributes.Status = common.Status{common.StatusCodeRunning: 0}
i.Attributes.Status = common.Status{common.StatusCodeActive: 0}
for _, groupInstance := range groupInstances.Items {
i.Attributes.Status[common.StatusCode(groupInstance.Status)]++
if groupInstance.Status == "RUNNING" {
instance, err := i.Client.Services.Compute.Instances.Get(i.Client.Credentials.ProjectID, i.Client.Region, filepath.Base(groupInstance.Instance)).Do()
if err != nil {
Expand All @@ -86,7 +85,7 @@ func (i *InstanceGroupManager) Read(ctx context.Context) error {
if address := net.ParseIP(instance.NetworkInterfaces[0].AccessConfigs[0].NatIP); address != nil {
i.Attributes.Addresses = append(i.Attributes.Addresses, address)
}
i.Attributes.Status[common.StatusCodeRunning]++
i.Attributes.Status[common.StatusCodeActive]++
}
}

Expand Down
8 changes: 6 additions & 2 deletions task/gcp/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,12 @@ func (t *Task) Events(ctx context.Context) []common.Event {
return t.Attributes.Events
}

func (t *Task) Status(ctx context.Context) common.Status {
return t.Attributes.Status
func (t *Task) Status(ctx context.Context) (common.Status, error) {
if err := t.Read(ctx); err != nil {
return nil, err
}

return machine.Status(ctx, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"], t.Attributes.Status)
}

func (t *Task) GetKeyPair(ctx context.Context) (*ssh.DeterministicSSHKeyPair, error) {
Expand Down
7 changes: 3 additions & 4 deletions task/k8s/resources/resource_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,9 @@ func (j *Job) Read(ctx context.Context) error {
})
}
j.Attributes.Status = common.Status{
common.StatusCodeRunning: int(job.Status.Active),
common.StatusCode("active"): int(job.Status.Active),
common.StatusCode("succeeded"): int(job.Status.Succeeded),
common.StatusCode("failed"): int(job.Status.Failed),
common.StatusCodeActive: int(job.Status.Active),
common.StatusCodeSucceeded: int(job.Status.Succeeded),
common.StatusCodeFailed: int(job.Status.Failed),
}
j.Resource = job
return nil
Expand Down
4 changes: 2 additions & 2 deletions task/k8s/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ func (t *Task) Pull(ctx context.Context, destination string) error {
return nil
}

func (t *Task) Status(ctx context.Context) common.Status {
return t.Attributes.Status
func (t *Task) Status(ctx context.Context) (common.Status, error) {
return t.Attributes.Status, nil
}

func (t *Task) Events(ctx context.Context) []common.Event {
Expand Down
2 changes: 1 addition & 1 deletion task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Task interface {
Push(ctx context.Context, source string) error
Pull(ctx context.Context, destination string) error

Status(ctx context.Context) common.Status
Status(ctx context.Context) (common.Status, error)
Events(ctx context.Context) []common.Event
Logs(ctx context.Context) ([]string, error)

Expand Down
18 changes: 12 additions & 6 deletions task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,23 @@ func TestTask(t *testing.T) {
require.Nil(t, newTask.Stop(ctx))
require.Nil(t, newTask.Stop(ctx))

for assert.Nil(t, newTask.Read(ctx)) &&
newTask.Status(ctx)[common.StatusCodeRunning] > 0 {
continue
for assert.Nil(t, newTask.Read(ctx)) {
status, err := newTask.Status(ctx)
require.Nil(t, err)
if status[common.StatusCodeActive] > 0 {
break
}
}

require.Nil(t, newTask.Start(ctx))
require.Nil(t, newTask.Start(ctx))

for assert.Nil(t, newTask.Read(ctx)) &&
newTask.Status(ctx)[common.StatusCodeRunning] == 0 {
continue
for assert.Nil(t, newTask.Read(ctx)) {
status, err := newTask.Status(ctx)
require.Nil(t, err)
if status[common.StatusCodeActive] == 0 {
break
}
}
}

Expand Down

0 comments on commit caa45bb

Please sign in to comment.