-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #224 from flux-framework/add-gomq-example
example: gozmq for pair to pair
- Loading branch information
Showing
9 changed files
with
377 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
FROM golang:latest | ||
WORKDIR /code | ||
RUN apt-get update && apt-get install -y wget && \ | ||
echo "deb http://download.opensuse.org/repositories/network:/messaging:/zeromq:/release-stable/Debian_9.0/ ./" >> /etc/apt/sources.list && \ | ||
wget https://download.opensuse.org/repositories/network:/messaging:/zeromq:/release-stable/Debian_9.0/Release.key -O- | apt-key add && \ | ||
apt-get install -y libzmq3-dev | ||
|
||
COPY entrypoint.sh go.mod ./ | ||
COPY main.go.txt ./main.go | ||
RUN go mod tidy && go mod vendor | ||
ENTRYPOINT ["/entrypoint.sh"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
# ZeroMQ in Go Examples | ||
|
||
Note that we are going to try to use the [DEALER to ROUTER](https://zguide.zeromq.org/docs/chapter3/#The-DEALER-to-ROUTER-Combination) design here. | ||
|
||
Create the kind cluster. | ||
|
||
```bash | ||
kind create cluster --config ../../kind-config.yaml | ||
``` | ||
|
||
Install the flux operator | ||
|
||
```bash | ||
kubectl apply -f ../../dist/flux-operator.yaml | ||
``` | ||
|
||
## Local Test | ||
|
||
You can automate the entire thing: | ||
|
||
```bash | ||
./build.sh | ||
``` | ||
|
||
And then get logs: | ||
|
||
```console | ||
Hello I'm host flux-sample-1 | ||
Hello I'm host flux-sample-0 | ||
⭐️ Times for flux-sample-0 to flux-sample-1: [5.279µs 15.12µs 66.039µs 27.815µs 11.285µs 15.554µs 8.282µs 7.801µs 3.895µs 4.585µs] | ||
⭐️ Times for flux-sample-1 to flux-sample-0: [10.757µs 18.31µs 7.088µs 12.65µs 7.853µs 4.582µs 3.825µs 16.614µs 48.301µs 9.307µs] | ||
``` | ||
|
||
You can look at [build.sh](build.sh) for the build steps, and [entrypoint.sh](entrypoint.sh) for the start command, | ||
and [main.go](main.go) for the defaults and logic. | ||
|
||
## Google Cloud | ||
|
||
The times are probably fast since we are running on the same machine. Let's test on actual physical nodes on Google Cloud. | ||
|
||
```bash | ||
GOOGLE_PROJECT=myproject | ||
gcloud container clusters create test-cluster \ | ||
--threads-per-core=1 \ | ||
--placement-type=COMPACT \ | ||
--num-nodes=8 \ | ||
--region=us-central1-a \ | ||
--project=${GOOGLE_PROJECT} \ | ||
--machine-type=c2d-standard-8 | ||
``` | ||
|
||
Install the flux operator. | ||
|
||
```bash | ||
kubectl apply -f https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml | ||
``` | ||
|
||
We will build a container that the minicluster can actually pull. | ||
|
||
```bash | ||
docker build -t vanessa/gozmq:0 . | ||
docker push vanessa/gozmq:0 | ||
``` | ||
|
||
And then apply the GKE minicluster. | ||
|
||
```bash | ||
kubectl apply -f minicluster-gke.yaml | ||
kubectl logs flux-sample-0-2prsv -f | ||
``` | ||
```console | ||
Hello I'm host flux-sample-0 | ||
Hello I'm host flux-sample-5 | ||
Hello I'm host flux-sample-6 | ||
Hello I'm host flux-sample-3 | ||
Hello I'm host flux-sample-2 | ||
Hello I'm host flux-sample-1 | ||
Hello I'm host flux-sample-4 | ||
Hello I'm host flux-sample-7 | ||
⭐️ Times for flux-sample-0 to flux-sample-1: [3.01µs 2.8µs 3.82µs 2.72µs 3.21µs 2.6µs 2.6µs 3.3µs 3.37µs 3µs] | ||
⭐️ Times for flux-sample-0 to flux-sample-2: [3.03µs 3.01µs 3.04µs 2.711µs 3.02µs 3.331µs 2.98µs 2.5µs 2.33µs 3.56µs] | ||
⭐️ Times for flux-sample-0 to flux-sample-3: [2.37µs 3.37µs 2.44µs 2.4µs 3.43µs 2.65µs 2.6µs 2.94µs 2.3µs 3.35µs] | ||
⭐️ Times for flux-sample-0 to flux-sample-4: [3.16µs 2.55µs 8.07µs 2.78µs 2.55µs 2.95µs 2.54µs 2.48µs 2.84µs 3.13µs] | ||
⭐️ Times for flux-sample-0 to flux-sample-5: [3.74µs 2.42µs 2.92µs 3.33µs 3.36µs 4.41µs 3.41µs 5.29µs 3.09µs 2.95µs] | ||
⭐️ Times for flux-sample-0 to flux-sample-6: [2.67µs 3.07µs 3.1µs 2.66µs 3.78µs 2.011µs 3.21µs 2.72µs 6.44µs 3.9µs] | ||
⭐️ Times for flux-sample-0 to flux-sample-7: [4.46µs 9.2µs 3.11µs 8.29µs 3.54µs 3.15µs 3.43µs 2.48µs 2.37µs 2.75µs] | ||
⭐️ Times for flux-sample-3 to flux-sample-0: [3.42µs 3.89µs 3.111µs 3.6µs 3.03µs 3.36µs 3.02µs 3.09µs 4.31µs 4.42µs] | ||
⭐️ Times for flux-sample-1 to flux-sample-0: [5.649µs 3.15µs 3.18µs 3.86µs 4.18µs 3.19µs 4.19µs 3.31µs 4.159µs 3.74µs] | ||
⭐️ Times for flux-sample-2 to flux-sample-0: [4.05µs 3.35µs 5.8µs 4.5µs 2.89µs 3.22µs 3.92µs 3.1µs 4.26µs 4.46µs] | ||
⭐️ Times for flux-sample-6 to flux-sample-0: [3.02µs 2.91µs 3.66µs 3.19µs 2.77µs 2.89µs 4.02µs 4.969µs 3.08µs 3.04µs] | ||
⭐️ Times for flux-sample-4 to flux-sample-0: [3.3µs 4.12µs 3.429µs 3.78µs 3.24µs 5.89µs 3.52µs 3.26µs 6.6µs 4.88µs] | ||
⭐️ Times for flux-sample-1 to flux-sample-2: [5.96µs 9.9µs 3.82µs 3.011µs 3µs 3.68µs 3.18µs 2.77µs 5.73µs 4.24µs] | ||
⭐️ Times for flux-sample-5 to flux-sample-0: [3.44µs 2.84µs 3.43µs 3.88µs 4.14µs 3.73µs 4.34µs 5.431µs 4.329µs 3.02µs] | ||
⭐️ Times for flux-sample-7 to flux-sample-0: [2.95µs 3.269µs 2.88µs 3.14µs 2.491µs 3.84µs 2.86µs 3.529µs 2.9µs 2.93µs] | ||
⭐️ Times for flux-sample-1 to flux-sample-3: [2.94µs 3.04µs 4.31µs 3.81µs 2.78µs 3.33µs 3.28µs 2.99µs 2.78µs 10.02µs] | ||
⭐️ Times for flux-sample-1 to flux-sample-4: [2.66µs 14.87µs 3.04µs 3.14µs 2.98µs 2.7µs 2.7µs 2.86µs 2.55µs 3.61µs] | ||
⭐️ Times for flux-sample-1 to flux-sample-5: [3.21µs 20.709µs 3.52µs 3.45µs 3.431µs 3.12µs 2.57µs 3.31µs 3.15µs 2.55µs] | ||
⭐️ Times for flux-sample-3 to flux-sample-1: [9.88µs 5.01µs 4µs 4.65µs 3.471µs 3.22µs 3.29µs 3.25µs 3.11µs 4.04µs] | ||
⭐️ Times for flux-sample-2 to flux-sample-1: [3.99µs 2.78µs 4.02µs 3.62µs 3.39µs 3.42µs 3.67µs 4.26µs 3.08µs 2.91µs] | ||
⭐️ Times for flux-sample-1 to flux-sample-6: [2.87µs 3.13µs 3.33µs 2.89µs 3.15µs 2.53µs 3.06µs 3.1µs 11.16µs 3.05µs] | ||
⭐️ Times for flux-sample-5 to flux-sample-1: [3.64µs 7.3µs 3.81µs 16.24µs 3.86µs 3.42µs 2.96µs 3.19µs 3.85µs 3.03µs] | ||
⭐️ Times for flux-sample-2 to flux-sample-3: [3.78µs 2.87µs 3.54µs 3.43µs 2.9µs 9.64µs 2.38µs 2.9µs 2.58µs 2.58µs] | ||
⭐️ Times for flux-sample-4 to flux-sample-1: [3.29µs 6.68µs 3.85µs 4.15µs 6.09µs 3.8µs 3.14µs 3.02µs 4.05µs 3.57µs] | ||
⭐️ Times for flux-sample-1 to flux-sample-7: [3.13µs 3.04µs 4.04µs 3.19µs 2.99µs 2.611µs 2.94µs 4.34µs 3.82µs 9.65µs] | ||
⭐️ Times for flux-sample-7 to flux-sample-1: [3.009µs 3.4µs 3.26µs 3.91µs 2.85µs 2.84µs 3.151µs 3.16µs 3.58µs 3.21µs] | ||
⭐️ Times for flux-sample-6 to flux-sample-1: [3.43µs 4.65µs 3.38µs 3.02µs 2.95µs 3.03µs 3.08µs 3.071µs 3µs 2.71µs] | ||
⭐️ Times for flux-sample-3 to flux-sample-2: [3.109µs 3.68µs 4.38µs 3.309µs 3.27µs 3.04µs 2.54µs 3µs 3.16µs 4.08µs] | ||
⭐️ Times for flux-sample-2 to flux-sample-4: [2.75µs 2.99µs 3.25µs 2.25µs 2.81µs 3.24µs 5.2µs 4.08µs 4.14µs 2.94µs] | ||
⭐️ Times for flux-sample-4 to flux-sample-2: [4.169µs 3.32µs 3.09µs 3.26µs 3.28µs 3.8µs 13.37µs 2.41µs 3.6µs 2.94µs] | ||
⭐️ Times for flux-sample-2 to flux-sample-5: [3.28µs 3.16µs 2.46µs 2.88µs 2.9µs 3.3µs 4µs 3.24µs 3.49µs 2.87µs] | ||
⭐️ Times for flux-sample-5 to flux-sample-2: [3.66µs 2.689µs 2.96µs 4.02µs 3.02µs 21.68µs 2.66µs 3.27µs 3.209µs 3.16µs] | ||
⭐️ Times for flux-sample-3 to flux-sample-4: [3.77µs 2.67µs 3µs 2.74µs 3.75µs 2.76µs 3.77µs 3.09µs 2.92µs 5.22µs] | ||
⭐️ Times for flux-sample-2 to flux-sample-6: [3.01µs 3.19µs 2.88µs 2.73µs 3.1µs 3.829µs 2.57µs 4.54µs 4.25µs 3.78µs] | ||
⭐️ Times for flux-sample-4 to flux-sample-3: [3.87µs 3.66µs 3.2µs 10.11µs 3.54µs 2.95µs 2.31µs 2.93µs 2.5µs 3.76µs] | ||
⭐️ Times for flux-sample-5 to flux-sample-3: [3µs 3.18µs 2.44µs 3.03µs 3.02µs 6.33µs 3.389µs 3.18µs 2.82µs 3.2µs] | ||
⭐️ Times for flux-sample-6 to flux-sample-2: [3.06µs 3.53µs 3.47µs 2.85µs 2.62µs 3.07µs 3.07µs 2.93µs 2.689µs 3.18µs] | ||
⭐️ Times for flux-sample-3 to flux-sample-5: [3.12µs 4.04µs 3.509µs 3.15µs 3.1µs 2.98µs 2.77µs 2.71µs 3.11µs 2.869µs] | ||
⭐️ Times for flux-sample-7 to flux-sample-2: [2.63µs 3.47µs 3.08µs 12.26µs 3.15µs 2.82µs 2.891µs 5.41µs 3.26µs 2.98µs] | ||
⭐️ Times for flux-sample-2 to flux-sample-7: [4.45µs 3.42µs 2.56µs 4.74µs 3.75µs 3.14µs 1.78µs 3.39µs 3.37µs 3.24µs] | ||
⭐️ Times for flux-sample-5 to flux-sample-4: [3.7µs 3.02µs 3.23µs 4.611µs 3.22µs 4.38µs 3.25µs 2.47µs 7.02µs 3.29µs] | ||
⭐️ Times for flux-sample-3 to flux-sample-6: [2.99µs 3.1µs 3.08µs 3.07µs 2.81µs 3.92µs 2.97µs 3.8µs 2.47µs 2.77µs] | ||
⭐️ Times for flux-sample-4 to flux-sample-5: [2.99µs 6.609µs 3.59µs 3.591µs 3.43µs 3.31µs 3.16µs 3.611µs 3.6µs 2.94µs] | ||
⭐️ Times for flux-sample-6 to flux-sample-3: [2.6µs 2.93µs 3.08µs 3.02µs 2.78µs 6.36µs 3.02µs 2.79µs 3.089µs 3.26µs] | ||
⭐️ Times for flux-sample-3 to flux-sample-7: [2.86µs 3.529µs 3.41µs 2.8µs 2.91µs 2.78µs 2.43µs 3.08µs 5.46µs 2.84µs] | ||
⭐️ Times for flux-sample-5 to flux-sample-6: [2.41µs 2.28µs 4.59µs 3.27µs 13.72µs 3.54µs 2.79µs 3.73µs 4.37µs 2.651µs] | ||
⭐️ Times for flux-sample-7 to flux-sample-3: [3.8µs 2.98µs 2.73µs 5.53µs 6.36µs 3.5µs 3.22µs 2.62µs 3.04µs 3.07µs] | ||
⭐️ Times for flux-sample-4 to flux-sample-6: [2.83µs 3.13µs 2.95µs 3.29µs 2.66µs 2.65µs 3.7µs 3.07µs 2.71µs 2.31µs] | ||
⭐️ Times for flux-sample-6 to flux-sample-4: [7.01µs 3µs 3.02µs 2.81µs 2.99µs 2.95µs 3.43µs 3.17µs 2.991µs 2.81µs] | ||
⭐️ Times for flux-sample-4 to flux-sample-7: [3.83µs 2.65µs 3.76µs 3.09µs 3.6µs 3.29µs 2.38µs 3.66µs 3.35µs 2.97µs] | ||
⭐️ Times for flux-sample-5 to flux-sample-7: [3.58µs 4.73µs 4.58µs 3.16µs 3.21µs 4.66µs 4.55µs 2.42µs 3.16µs 3.4µs] | ||
⭐️ Times for flux-sample-6 to flux-sample-5: [2.85µs 3.44µs 2.48µs 2.4µs 2.88µs 2.6µs 2.98µs 9.98µs 2.96µs 3.08µs] | ||
⭐️ Times for flux-sample-7 to flux-sample-4: [2.87µs 2.57µs 2.531µs 3.53µs 3.14µs 2.56µs 2.58µs 2.74µs 2.39µs 3.16µs] | ||
⭐️ Times for flux-sample-6 to flux-sample-7: [2.96µs 2.86µs 3.01µs 2.89µs 3.02µs 2.4µs 2.911µs 2.81µs 2.86µs 2.96µs] | ||
⭐️ Times for flux-sample-7 to flux-sample-5: [2.88µs 2.6µs 2.88µs 2.68µs 2.48µs 3.81µs 3.27µs 14.84µs 3.66µs 3.13µs] | ||
⭐️ Times for flux-sample-7 to flux-sample-6: [3.41µs 2.64µs 3.11µs 3.23µs 3.09µs 3.1µs 2.65µs 3.689µs 3.13µs 3.83µs] | ||
``` | ||
|
||
Yes, they are running on different physical nodes: | ||
|
||
```bash | ||
$ kubectl get pods -o wide | ||
``` | ||
```console | ||
NAME READY STATUS RESTARTS AGE IP NODE | ||
flux-sample-0-tzc6n 0/1 Completed 0 114s 10.64.4.4 gke-test-cluster-default-pool-1bf80ee1-x73j | ||
flux-sample-1-74hg7 0/1 Completed 0 114s 10.64.6.4 gke-test-cluster-default-pool-1bf80ee1-6tv6 | ||
flux-sample-2-4mxf8 0/1 Completed 0 114s 10.64.2.4 gke-test-cluster-default-pool-1bf80ee1-9xst | ||
flux-sample-3-m4ks9 0/1 Completed 0 113s 10.64.5.9 gke-test-cluster-default-pool-1bf80ee1-676j | ||
flux-sample-4-grstb 0/1 Completed 0 113s 10.64.1.5 gke-test-cluster-default-pool-1bf80ee1-qwrl | ||
flux-sample-5-8djxs 0/1 Completed 0 113s 10.64.3.4 gke-test-cluster-default-pool-1bf80ee1-6jng | ||
flux-sample-6-67fdr 0/1 Completed 0 113s 10.64.7.5 gke-test-cluster-default-pool-1bf80ee1-h9sl | ||
flux-sample-7-7w2ds 0/1 Completed 0 113s 10.64.0.16 gke-test-cluster-default-pool-1bf80ee1-n0pp | ||
``` | ||
|
||
That should be a matrix size of times minus the diagonal (the process to itself, which we don't measure). | ||
When you are done, clean up: | ||
|
||
```bash | ||
gcloud container clusters delete test-cluster --region us-central1-a | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
#!/bin/bash | ||
|
||
docker build -t gozmq . | ||
kind load docker-image gozmq | ||
kubectl delete -f minicluster.yaml | ||
kubectl apply -f minicluster.yaml |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
#!/bin/bash | ||
|
||
workers="${1:-2}" | ||
rank=${FLUX_TASK_RANK} | ||
|
||
# Get the host name | ||
host=$(hostname) | ||
echo "Hello I'm host $host" | ||
go run /code/main.go run --size ${workers} --prefix flux-sample --suffix "flux-service.default.svc.cluster.local" --port 5555 --rank ${rank} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
module gozmq | ||
|
||
go 1.20 | ||
|
||
require github.com/pebbe/zmq4 v1.2.11 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q= | ||
github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
package main | ||
|
||
import ( | ||
"log" | ||
"os" | ||
"strings" | ||
|
||
"github.com/akamensky/argparse" | ||
zmq "github.com/pebbe/zmq4" | ||
|
||
"fmt" | ||
"math/rand" | ||
"time" | ||
) | ||
|
||
func workerTask(toHost, fromHost string) { | ||
worker, err := zmq.NewSocket(zmq.DEALER) | ||
if err != nil { | ||
log.Fatalf("Error", err) | ||
} | ||
defer worker.Close() | ||
set_id(worker) // Set a printable identity | ||
worker.Connect(fmt.Sprintf("tcp://%s", toHost)) | ||
|
||
total := 0 | ||
for { | ||
// Tell the broker we're ready for work | ||
worker.Send("", zmq.SNDMORE) | ||
worker.Send("Ready to serve!", 0) | ||
|
||
// Get workload from broker, until finished | ||
worker.Recv(0) // Envelope delimiter | ||
workload, _ := worker.Recv(0) | ||
if workload == "Done" { | ||
fmt.Printf("Completed: from %s to %s: %d tasks\n", fromHost, toHost, total) | ||
break | ||
} | ||
total++ | ||
|
||
// Do some random work | ||
time.Sleep(time.Duration(rand.Intn(500)+1) * time.Millisecond) | ||
} | ||
} | ||
|
||
func main() { | ||
|
||
parser := argparse.NewParser("gozmq", "Playing with ZeroMQ in Go") | ||
runCmd := parser.NewCommand("run", "Run the example") | ||
prefix := runCmd.String("p", "prefix", &argparse.Options{Help: "Hostname prefix (e.g., flux-sample)"}) | ||
size := runCmd.Int("s", "size", &argparse.Options{Help: "Number of hosts (count starts at 0)"}) | ||
rank := runCmd.Int("r", "rank", &argparse.Options{Help: "Rank of this host"}) | ||
tasks := runCmd.Int("t", "tasks", &argparse.Options{Help: "Number of tasks (workers) per node", Default: 1}) | ||
measurements := runCmd.Int("m", "measurements", &argparse.Options{Help: "Number of measurements to take (to average over)", Default: 10}) | ||
suffix := runCmd.String("", "suffix", &argparse.Options{Help: "Hostname suffix (e.g. .flux-service.default.svc.cluster.local)"}) | ||
port := runCmd.String("", "port", &argparse.Options{Help: "Port to use", Default: "5671"}) | ||
|
||
err := parser.Parse(os.Args) | ||
if err != nil { | ||
fmt.Println(parser.Usage(err)) | ||
return | ||
} | ||
|
||
if runCmd.Happened() { | ||
|
||
// Start the broker on the host | ||
thisHost := fmt.Sprintf("%s-%d.%s:%s", *prefix, *rank, *suffix, *port) | ||
|
||
broker, err := zmq.NewSocket(zmq.ROUTER) | ||
if err != nil { | ||
log.Fatalf("Error", err) | ||
} | ||
defer broker.Close() | ||
|
||
brokerHost := fmt.Sprintf("tcp://*:%s", *port) | ||
broker.Bind(brokerHost) | ||
|
||
// Run a client task for each host | ||
for i := 0; i < *size; i++ { | ||
|
||
// Don't send to self? | ||
if i == *rank { | ||
continue | ||
} | ||
|
||
host := fmt.Sprintf("%s-%d.%s:%s", *prefix, i, *suffix, *port) | ||
|
||
// Note that we can run more than one worker task here, | ||
// I'm choosing one to mimic(?) point to point (maybe)? | ||
for w := 0; w < *tasks; w++ { | ||
go workerTask(host, thisHost) | ||
} | ||
|
||
// Keep matrix of elapsed times | ||
times := make([]time.Duration, *measurements) | ||
for m := 0; m < *measurements; m++ { | ||
|
||
// Next message gives us least recently used worker | ||
identity, err := broker.Recv(0) | ||
if err != nil { | ||
log.Fatalf("Error", err) | ||
} | ||
start := time.Now() | ||
broker.Send(identity, zmq.SNDMORE) | ||
|
||
// This is the envelope delimiter | ||
broker.Recv(0) | ||
|
||
// This is the response from the worker | ||
// This is the round trip time | ||
broker.Recv(0) | ||
end := time.Now() | ||
elapsed := end.Sub(start) | ||
|
||
// Add the entry to our matrix | ||
times[m] = elapsed | ||
broker.Send("", zmq.SNDMORE) | ||
|
||
// Workers need to keep going until experiment done | ||
broker.Send("Keep going", 0) | ||
} | ||
|
||
// Tell the worker it's done | ||
toHostPrefix := strings.Split(host, ".") | ||
fromHostPrefix := strings.Split(thisHost, ".") | ||
fmt.Printf(" ⭐️ Times for %s to %s: %s\n", fromHostPrefix[0], toHostPrefix[0], times) | ||
} | ||
|
||
// Give some time for everyone to finish | ||
time.Sleep(time.Second * 10) | ||
broker.Send("Done", 0) | ||
} | ||
} | ||
|
||
// calculateMean calculates the mean duration | ||
// TODO get this working, units are weird | ||
func calculateMean(times []time.Duration) time.Duration { | ||
total := time.Duration(0) | ||
for _, t := range times { | ||
total += t | ||
} | ||
return (total / time.Duration(len(times))) * time.Nanosecond | ||
} | ||
|
||
func set_id(soc *zmq.Socket) { | ||
identity := fmt.Sprintf("%04X-%04X", rand.Intn(0x10000), rand.Intn(0x10000)) | ||
soc.SetIdentity(identity) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
apiVersion: flux-framework.org/v1alpha2 | ||
kind: MiniCluster | ||
metadata: | ||
name: flux-sample | ||
spec: | ||
# This will be one task per node | ||
size: 8 | ||
tasks: 8 | ||
logging: | ||
quiet: true | ||
flux: | ||
container: | ||
image: ghcr.io/converged-computing/flux-view-ubuntu:tag-jammy | ||
containers: | ||
- image: vanessa/gozmq:0 | ||
command: /bin/bash /code/entrypoint.sh 8 | ||
# c2d-standard-8 has 4 physical cores | ||
resources: | ||
limits: | ||
cpu: 3 | ||
requests: | ||
cpu: 3 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
apiVersion: flux-framework.org/v1alpha2 | ||
kind: MiniCluster | ||
metadata: | ||
name: flux-sample | ||
spec: | ||
size: 2 | ||
tasks: 2 | ||
logging: | ||
quiet: true | ||
flux: | ||
container: | ||
image: ghcr.io/converged-computing/flux-view-ubuntu:tag-jammy | ||
containers: | ||
- image: gozmq | ||
command: /bin/bash /code/entrypoint.sh |