Skip to content

Commit

Permalink
update gozmq example
Browse files Browse the repository at this point in the history
Signed-off-by: vsoch <[email protected]>
  • Loading branch information
vsoch committed Apr 4, 2024
1 parent 07c330c commit 5effde0
Show file tree
Hide file tree
Showing 7 changed files with 446 additions and 137 deletions.
127 changes: 61 additions & 66 deletions examples/distributed/gozmq/README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/distributed/gozmq/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ rank=${FLUX_TASK_RANK}
# Get the host name
host=$(hostname)
echo "Hello I'm host $host"
go run /code/main.go run --size ${workers} --prefix flux-sample --suffix "flux-service.default.svc.cluster.local" --port 5555 --rank ${rank}
go run /code/main.go run --size ${workers} --prefix flux-sample --suffix "flux-service.default.svc.cluster.local" --port 5555 --rank ${rank} --raw
14 changes: 14 additions & 0 deletions examples/distributed/gozmq/kind-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
kubeadmConfigPatches:
- |
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
node-labels: "ingress-ready=true"
- role: worker
- role: worker
- role: worker
- role: worker
264 changes: 197 additions & 67 deletions examples/distributed/gozmq/main.go.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,181 @@ package main
import (
"log"
"os"
"strings"
"sync"

"github.com/akamensky/argparse"
zmq "github.com/pebbe/zmq4"

"fmt"
"math/rand"
"time"
)

func workerTask(toHost, fromHost string) {
// Global identity lookup of rank to socket identity and times
var (
// Lookup of rank to identity
ids = map[string]string{}

// Lookup of identity to rank
ranks = map[string]string{}
)

type Rank struct {
Number int
}

func (r *Rank) String() string {
return fmt.Sprintf("%s", r.Number)
}

// ElapsedTime holds a start, end and elapsed time
type ElapsedTime struct {
StartTime time.Time
EndTime time.Time
}

func (e *ElapsedTime) Start() {
e.StartTime = time.Now()
}
func (e *ElapsedTime) Stop() {
e.EndTime = time.Now()
}

func (e *ElapsedTime) Elapsed() time.Duration {
return e.EndTime.Sub(e.StartTime)
}

func (e *ElapsedTime) StartAsString() string {
return e.StartTime.Format(time.RFC3339Nano)
}

func (e *ElapsedTime) SetEndFromString(raw string) {
restored, err := time.Parse(time.RFC3339Nano, raw)
if err != nil {
log.Fatalf("Cannot convert time %s\n", err)
}
e.EndTime = restored
}

// brokerTask is receiving work (client or DEALER calls)
// and responding.
func brokerTask(
broker *zmq.Socket,
measurements int,
size int,
) {

// The total number of expected interactions we should have is
// the number of other workers * measurements
expected := measurements * (size - 1)
count := 0

// Keep going until we hit expected
for count < expected {
identity, err := broker.Recv(0)
if err != nil {
log.Fatalf("Error", err)
}

// Send back to the specific identity asking for more
// We check that the identity we receive at the worker is the one we sent
broker.Send(identity, zmq.SNDMORE)

// This is the envelope delimiter
// If you look at the string it is empty
broker.Recv(0)

// This is the response from the worker
fromIdentity, err := broker.Recv(0)
if fromIdentity != identity {
log.Fatalf("[broker] received message expecting %s got %s\n", identity, fromIdentity)
}
if err != nil {
log.Fatalf("Error broker receiving message", err)
}

// This is completing the round trip, it tells the worker to start
// the next loop and that this message round is finished (I think)
broker.Send("", zmq.SNDMORE)
broker.Send(fromIdentity, 0)
count += 1
}
}

// workerTask SENDS the message and responds
// raw indicates showing raw results instead of a mean
func workerTask(
fromHost, toHost string,
rank int,
raw bool,
wg *sync.WaitGroup,
measurements int,
) {

// Dealer sockets are the clients
worker, err := zmq.NewSocket(zmq.DEALER)
if err != nil {
log.Fatalf("Error", err)
}
defer worker.Close()
set_id(worker) // Set a printable identity
defer wg.Done()

// Set a printable identity and set for times
// This is a lookup of point to point send times
identity := setIdentifier(worker, rank)
worker.Connect(fmt.Sprintf("tcp://%s", toHost))

total := 0
for {
// The client (dealer) is sending and receiving,
// so we keep track of round trip here.
// I think if we time the broker, the broker can store
// messages in memory so the times are too fast.
// Each rank (fromHost ) keeps track of the times from itself
// to one other host (toHost)
times := []time.Duration{}

// Take m measurements
for m := 0; m < measurements; m++ {

// This is a request for work - I think it would
// encompass two messages
_, err := worker.Send("", zmq.SNDMORE)
if err != nil {
log.Fatalf("Error Send More", err)
}

// Tell the broker we're ready for work
worker.Send("", zmq.SNDMORE)
worker.Send("Ready to serve!", 0)

// Get workload from broker, until finished
worker.Recv(0) // Envelope delimiter
workload, _ := worker.Recv(0)
if workload == "Done" {
fmt.Printf("Completed: from %s to %s: %d tasks\n", fromHost, toHost, total)
break
t := ElapsedTime{}
t.Start()

// We send back the worker rank (based on identity) to check
// against the original identity sent to
_, err = worker.Send(identity, 0)
if err != nil {
log.Fatalf("Error Send Message", err)
}

_, err = worker.Recv(0)
if err != nil {
log.Fatalf("Error Receiving Envelope", err)
}
total++
receivedMessage, err := worker.Recv(0)

// Do some random work
time.Sleep(time.Duration(rand.Intn(500)+1) * time.Millisecond)
// This is thd end of the round trip
t.Stop()

if err != nil {
log.Fatalf("Error", err)
}

times = append(times, t.Elapsed())
if receivedMessage != identity {
log.Fatalf("[worker] received message expecting %s got %s\n", identity, receivedMessage)
}
}
if raw {
fmt.Printf(" ⭐️ Times for %d messages %s to %s: %s\n", measurements, fromHost, toHost, times)
} else {
meanTime := calculateMean(times)
fmt.Printf(" ⭐️ Mean times for %d messages %s to %s: %s\n", measurements, fromHost, toHost, meanTime)
}
}

Expand All @@ -49,7 +188,9 @@ func main() {
prefix := runCmd.String("p", "prefix", &argparse.Options{Help: "Hostname prefix (e.g., flux-sample)"})
size := runCmd.Int("s", "size", &argparse.Options{Help: "Number of hosts (count starts at 0)"})
rank := runCmd.Int("r", "rank", &argparse.Options{Help: "Rank of this host"})
tasks := runCmd.Int("t", "tasks", &argparse.Options{Help: "Number of tasks (workers) per node", Default: 1})

// This should only be set to 1 for this example
raw := runCmd.Flag("", "raw", &argparse.Options{Help: "Output raw times instead of mean", Default: false})
measurements := runCmd.Int("m", "measurements", &argparse.Options{Help: "Number of measurements to take (to average over)", Default: 10})
suffix := runCmd.String("", "suffix", &argparse.Options{Help: "Hostname suffix (e.g. .flux-service.default.svc.cluster.local)"})
port := runCmd.String("", "port", &argparse.Options{Help: "Port to use", Default: "5671"})
Expand All @@ -65,6 +206,9 @@ func main() {
// Start the broker on the host
thisHost := fmt.Sprintf("%s-%d.%s:%s", *prefix, *rank, *suffix, *port)

// This is the broker that will be a router on the rank it is running on
// We will ask the worker for a message, and then keep track of the
// round trip time
broker, err := zmq.NewSocket(zmq.ROUTER)
if err != nil {
log.Fatalf("Error", err)
Expand All @@ -74,74 +218,60 @@ func main() {
brokerHost := fmt.Sprintf("tcp://*:%s", *port)
broker.Bind(brokerHost)

// Run a client task for each host
// This will ensure the clients finish, and brokers as well
var wg sync.WaitGroup

// Step 1: launch all the worker tasks!
// We run a client task (worker) to send a message to every other host
// The workers are going to be the main driver to run some number of measurements
for i := 0; i < *size; i++ {

// Don't send to self?
// Don't send to self
if i == *rank {
//row[i+1] = fmt.Sprintf("0")
continue
}

host := fmt.Sprintf("%s-%d.%s:%s", *prefix, i, *suffix, *port)

// Note that we can run more than one worker task here,
// I'm choosing one to mimic(?) point to point (maybe)?
for w := 0; w < *tasks; w++ {
go workerTask(host, thisHost)
}
// We should only have one worker here for a point to point test
// This worker is from thisHost TO the other rank, which should
// also be running a broker. It will perform some number of
// tasks until it receives a Done message
wg.Add(1)
go workerTask(thisHost, host, i, *raw, &wg, *measurements)
}

// Keep matrix of elapsed times
times := make([]time.Duration, *measurements)
for m := 0; m < *measurements; m++ {

// Next message gives us least recently used worker
identity, err := broker.Recv(0)
if err != nil {
log.Fatalf("Error", err)
}
start := time.Now()
broker.Send(identity, zmq.SNDMORE)

// This is the envelope delimiter
broker.Recv(0)

// This is the response from the worker
// This is the round trip time
broker.Recv(0)
end := time.Now()
elapsed := end.Sub(start)

// Add the entry to our matrix
times[m] = elapsed
broker.Send("", zmq.SNDMORE)

// Workers need to keep going until experiment done
broker.Send("Keep going", 0)
}
// Step 2: Kick off workers to receive them. Keep going
// until both it's received all the expected pings (from other workers)
// AND our own workers are done.
wg.Add(1)
go brokerTask(broker, *measurements, *size)

// Tell the worker it's done
toHostPrefix := strings.Split(host, ".")
fromHostPrefix := strings.Split(thisHost, ".")
fmt.Printf(" ⭐️ Times for %s to %s: %s\n", fromHostPrefix[0], toHostPrefix[0], times)
}
// Wait for all workers to finish, and then for all brokers
// to have the number of interactions they expect
wg.Wait()

// Give some time for everyone to finish
time.Sleep(time.Second * 10)
broker.Send("Done", 0)
}
}

// calculateMean calculates the mean duration
// TODO get this working, units are weird
func calculateMean(times []time.Duration) time.Duration {
total := time.Duration(0)
total := time.Duration(0) * time.Nanosecond
for _, t := range times {
total += t
}
return (total / time.Duration(len(times))) * time.Nanosecond
return (total / time.Duration(len(times)) * time.Nanosecond)
}

// getIdentifier for a rank
func getIdentifier(rank int) string {
return fmt.Sprintf("rank-%d", rank)
}

func set_id(soc *zmq.Socket) {
identity := fmt.Sprintf("%04X-%04X", rand.Intn(0x10000), rand.Intn(0x10000))
// getIdentifier for a rank
func setIdentifier(soc *zmq.Socket, rank int) string {
identity := getIdentifier(rank)
soc.SetIdentity(identity)
return identity
}
Loading

0 comments on commit 5effde0

Please sign in to comment.