From 5effde0cfef80a0a03540ba2e80a31bfb2cd990c Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 4 Apr 2024 15:05:48 -0600 Subject: [PATCH] update gozmq example Signed-off-by: vsoch --- examples/distributed/gozmq/README.md | 127 ++++----- examples/distributed/gozmq/entrypoint.sh | 2 +- examples/distributed/gozmq/kind-config.yaml | 14 + examples/distributed/gozmq/main.go.txt | 264 +++++++++++++----- examples/distributed/gozmq/main.go.v1 | 169 +++++++++++ .../distributed/gozmq/minicluster-gke.yaml | 1 + examples/distributed/gozmq/minicluster.yaml | 6 +- 7 files changed, 446 insertions(+), 137 deletions(-) create mode 100644 examples/distributed/gozmq/kind-config.yaml create mode 100644 examples/distributed/gozmq/main.go.v1 diff --git a/examples/distributed/gozmq/README.md b/examples/distributed/gozmq/README.md index b53b1065..e4d4f620 100644 --- a/examples/distributed/gozmq/README.md +++ b/examples/distributed/gozmq/README.md @@ -1,11 +1,15 @@ # ZeroMQ in Go Examples +> Pair to Pair with DEALER to ROUTER + Note that we are going to try to use the [DEALER to ROUTER](https://zguide.zeromq.org/docs/chapter3/#The-DEALER-to-ROUTER-Combination) design here. +I implemented this in two ways, the current approach in [main.go.txt](main.go.txt) that builds into the container, and an (opposite) design +in [main.go.v1](main.go.v1). Create the kind cluster. ```bash -kind create cluster --config ../../kind-config.yaml +kind create cluster --config ./kind-config.yaml ``` Install the flux operator @@ -16,7 +20,7 @@ kubectl apply -f ../../dist/flux-operator.yaml ## Local Test -You can automate the entire thing: +You can automate the entire thing. Note that this first example has `--raw` added to the entrypoint to print the raw times. ```bash ./build.sh @@ -25,14 +29,51 @@ You can automate the entire thing: And then get logs: ```console +Defaulted container "flux-sample" out of: flux-sample, flux-view (init) +Hello I'm host flux-sample-0 +Hello I'm host flux-sample-3 +Hello I'm host flux-sample-2 Hello I'm host flux-sample-1 + ⭐️ Times for 10 messages flux-sample-3.flux-service.default.svc.cluster.local:5555 to flux-sample-1.flux-service.default.svc.cluster.local:5555: [2.022144ms 85.646µs 71.651µs 67.255µs 56.497µs 69.762µs 64.737µs 64.719µs 64.475µs 50.268µs] + ⭐️ Times for 10 messages flux-sample-1.flux-service.default.svc.cluster.local:5555 to flux-sample-3.flux-service.default.svc.cluster.local:5555: [187.388415ms 98.255µs 77.949µs 79.8µs 77.135µs 79.047µs 75.378µs 65.81µs 72.987µs 75.554µs] + ⭐️ Times for 10 messages flux-sample-2.flux-service.default.svc.cluster.local:5555 to flux-sample-1.flux-service.default.svc.cluster.local:5555: [1.910568ms 97.721µs 72.368µs 70.992µs 122.607µs 67.052µs 69.333µs 72.959µs 55.936µs 54.03µs] + ⭐️ Times for 10 messages flux-sample-2.flux-service.default.svc.cluster.local:5555 to flux-sample-3.flux-service.default.svc.cluster.local:5555: [2.766664ms 104.953µs 71.581µs 68.432µs 69.385µs 50.691µs 72.501µs 68.229µs 65.689µs 127.542µs] + ⭐️ Times for 10 messages flux-sample-3.flux-service.default.svc.cluster.local:5555 to flux-sample-2.flux-service.default.svc.cluster.local:5555: [312.753192ms 101.437µs 82.906µs 78.605µs 77.598µs 78.379µs 79.073µs 79.202µs 82.471µs 79.849µs] + ⭐️ Times for 10 messages flux-sample-0.flux-service.default.svc.cluster.local:5555 to flux-sample-3.flux-service.default.svc.cluster.local:5555: [1.627328ms 180.601µs 97.693µs 64.498µs 63.371µs 76.179µs 56.385µs 58.304µs 60.835µs 74.159µs] + ⭐️ Times for 10 messages flux-sample-0.flux-service.default.svc.cluster.local:5555 to flux-sample-2.flux-service.default.svc.cluster.local:5555: [1.748117ms 79.71µs 89.488µs 95.987µs 107.764µs 88.422µs 120.027µs 69.187µs 151.28µs 194.752µs] + ⭐️ Times for 10 messages flux-sample-0.flux-service.default.svc.cluster.local:5555 to flux-sample-1.flux-service.default.svc.cluster.local:5555: [2.39201ms 199.99µs 182.462µs 76.48µs 55.165µs 53.682µs 56.092µs 51.916µs 45.052µs 51.173µs] + ⭐️ Times for 10 messages flux-sample-1.flux-service.default.svc.cluster.local:5555 to flux-sample-2.flux-service.default.svc.cluster.local:5555: [532.644951ms 55.401µs 60.862µs 52.268µs 44.262µs 47.538µs 46.047µs 43.696µs 46.358µs 43.545µs] + ⭐️ Times for 10 messages flux-sample-2.flux-service.default.svc.cluster.local:5555 to flux-sample-0.flux-service.default.svc.cluster.local:5555: [127.942516ms 238.537µs 248.172µs 241.167µs 220.343µs 183.445µs 203.348µs 227.755µs 178.029µs 206.332µs] + ⭐️ Times for 10 messages flux-sample-3.flux-service.default.svc.cluster.local:5555 to flux-sample-0.flux-service.default.svc.cluster.local:5555: [451.017582ms 130.585µs 128.069µs 126.802µs 137.073µs 116.022µs 113.394µs 109.567µs 90.688µs 118.078µs] + ⭐️ Times for 10 messages flux-sample-1.flux-service.default.svc.cluster.local:5555 to flux-sample-0.flux-service.default.svc.cluster.local:5555: [661.313903ms 236.399µs 188.52µs 167.761µs 252.685µs 187.253µs 173.82µs 146.113µs 154.547µs 158.902µ +``` + +Note that we have 12 groups of 10 times that represent a matrix minus the diagonal, which would be a node to itself (which we don't record). So the above is 12 groups, which is 4x4 == 16 minus the diagonal of 4. +If you omit `--raw`, you'll get a matrix of mean times (over N=10 measurements). + +```console +Defaulted container "flux-sample" out of: flux-sample, flux-view (init) Hello I'm host flux-sample-0 - ⭐️ Times for flux-sample-0 to flux-sample-1: [5.279µs 15.12µs 66.039µs 27.815µs 11.285µs 15.554µs 8.282µs 7.801µs 3.895µs 4.585µs] - ⭐️ Times for flux-sample-1 to flux-sample-0: [10.757µs 18.31µs 7.088µs 12.65µs 7.853µs 4.582µs 3.825µs 16.614µs 48.301µs 9.307µs] +Hello I'm host flux-sample-2 +Hello I'm host flux-sample-3 +Hello I'm host flux-sample-1 + ⭐️ Mean times for 10 messages flux-sample-1.flux-service.default.svc.cluster.local:5555 to flux-sample-0.flux-service.default.svc.cluster.local:5555: 214.183µs + ⭐️ Mean times for 10 messages flux-sample-0.flux-service.default.svc.cluster.local:5555 to flux-sample-1.flux-service.default.svc.cluster.local:5555: 25.595208ms + ⭐️ Mean times for 10 messages flux-sample-2.flux-service.default.svc.cluster.local:5555 to flux-sample-1.flux-service.default.svc.cluster.local:5555: 324.54µs + ⭐️ Mean times for 10 messages flux-sample-2.flux-service.default.svc.cluster.local:5555 to flux-sample-0.flux-service.default.svc.cluster.local:5555: 324.835µs + ⭐️ Mean times for 10 messages flux-sample-0.flux-service.default.svc.cluster.local:5555 to flux-sample-2.flux-service.default.svc.cluster.local:5555: 32.615343ms + ⭐️ Mean times for 10 messages flux-sample-1.flux-service.default.svc.cluster.local:5555 to flux-sample-2.flux-service.default.svc.cluster.local:5555: 12.847121ms + ⭐️ Mean times for 10 messages flux-sample-3.flux-service.default.svc.cluster.local:5555 to flux-sample-2.flux-service.default.svc.cluster.local:5555: 252.283µs + ⭐️ Mean times for 10 messages flux-sample-3.flux-service.default.svc.cluster.local:5555 to flux-sample-0.flux-service.default.svc.cluster.local:5555: 239.2µs + ⭐️ Mean times for 10 messages flux-sample-3.flux-service.default.svc.cluster.local:5555 to flux-sample-1.flux-service.default.svc.cluster.local:5555: 279.983µs + ⭐️ Mean times for 10 messages flux-sample-2.flux-service.default.svc.cluster.local:5555 to flux-sample-3.flux-service.default.svc.cluster.local:5555: 12.813611ms + ⭐️ Mean times for 10 messages flux-sample-0.flux-service.default.svc.cluster.local:5555 to flux-sample-3.flux-service.default.svc.cluster.local:5555: 49.268451ms + ⭐️ Mean times for 10 messages flux-sample-1.flux-service.default.svc.cluster.local:5555 to flux-sample-3.flux-service.default.svc.cluster.local:5555: 33.067423ms ``` You can look at [build.sh](build.sh) for the build steps, and [entrypoint.sh](entrypoint.sh) for the start command, -and [main.go](main.go) for the defaults and logic. +and [main.go](main.go.txt) for the defaults and logic (we have to rename to .txt so the flux operator doesn't include +in its build). ## Google Cloud @@ -69,70 +110,24 @@ kubectl apply -f minicluster-gke.yaml kubectl logs flux-sample-0-2prsv -f ``` ```console -Hello I'm host flux-sample-0 -Hello I'm host flux-sample-5 -Hello I'm host flux-sample-6 +Defaulted container "flux-sample" out of: flux-sample, flux-view (init) +Hello I'm host flux-sample-7 Hello I'm host flux-sample-3 +Hello I'm host flux-sample-4 +Hello I'm host flux-sample-6 +Hello I'm host flux-sample-5 Hello I'm host flux-sample-2 Hello I'm host flux-sample-1 -Hello I'm host flux-sample-4 -Hello I'm host flux-sample-7 - ⭐️ Times for flux-sample-0 to flux-sample-1: [3.01µs 2.8µs 3.82µs 2.72µs 3.21µs 2.6µs 2.6µs 3.3µs 3.37µs 3µs] - ⭐️ Times for flux-sample-0 to flux-sample-2: [3.03µs 3.01µs 3.04µs 2.711µs 3.02µs 3.331µs 2.98µs 2.5µs 2.33µs 3.56µs] - ⭐️ Times for flux-sample-0 to flux-sample-3: [2.37µs 3.37µs 2.44µs 2.4µs 3.43µs 2.65µs 2.6µs 2.94µs 2.3µs 3.35µs] - ⭐️ Times for flux-sample-0 to flux-sample-4: [3.16µs 2.55µs 8.07µs 2.78µs 2.55µs 2.95µs 2.54µs 2.48µs 2.84µs 3.13µs] - ⭐️ Times for flux-sample-0 to flux-sample-5: [3.74µs 2.42µs 2.92µs 3.33µs 3.36µs 4.41µs 3.41µs 5.29µs 3.09µs 2.95µs] - ⭐️ Times for flux-sample-0 to flux-sample-6: [2.67µs 3.07µs 3.1µs 2.66µs 3.78µs 2.011µs 3.21µs 2.72µs 6.44µs 3.9µs] - ⭐️ Times for flux-sample-0 to flux-sample-7: [4.46µs 9.2µs 3.11µs 8.29µs 3.54µs 3.15µs 3.43µs 2.48µs 2.37µs 2.75µs] - ⭐️ Times for flux-sample-3 to flux-sample-0: [3.42µs 3.89µs 3.111µs 3.6µs 3.03µs 3.36µs 3.02µs 3.09µs 4.31µs 4.42µs] - ⭐️ Times for flux-sample-1 to flux-sample-0: [5.649µs 3.15µs 3.18µs 3.86µs 4.18µs 3.19µs 4.19µs 3.31µs 4.159µs 3.74µs] - ⭐️ Times for flux-sample-2 to flux-sample-0: [4.05µs 3.35µs 5.8µs 4.5µs 2.89µs 3.22µs 3.92µs 3.1µs 4.26µs 4.46µs] - ⭐️ Times for flux-sample-6 to flux-sample-0: [3.02µs 2.91µs 3.66µs 3.19µs 2.77µs 2.89µs 4.02µs 4.969µs 3.08µs 3.04µs] - ⭐️ Times for flux-sample-4 to flux-sample-0: [3.3µs 4.12µs 3.429µs 3.78µs 3.24µs 5.89µs 3.52µs 3.26µs 6.6µs 4.88µs] - ⭐️ Times for flux-sample-1 to flux-sample-2: [5.96µs 9.9µs 3.82µs 3.011µs 3µs 3.68µs 3.18µs 2.77µs 5.73µs 4.24µs] - ⭐️ Times for flux-sample-5 to flux-sample-0: [3.44µs 2.84µs 3.43µs 3.88µs 4.14µs 3.73µs 4.34µs 5.431µs 4.329µs 3.02µs] - ⭐️ Times for flux-sample-7 to flux-sample-0: [2.95µs 3.269µs 2.88µs 3.14µs 2.491µs 3.84µs 2.86µs 3.529µs 2.9µs 2.93µs] - ⭐️ Times for flux-sample-1 to flux-sample-3: [2.94µs 3.04µs 4.31µs 3.81µs 2.78µs 3.33µs 3.28µs 2.99µs 2.78µs 10.02µs] - ⭐️ Times for flux-sample-1 to flux-sample-4: [2.66µs 14.87µs 3.04µs 3.14µs 2.98µs 2.7µs 2.7µs 2.86µs 2.55µs 3.61µs] - ⭐️ Times for flux-sample-1 to flux-sample-5: [3.21µs 20.709µs 3.52µs 3.45µs 3.431µs 3.12µs 2.57µs 3.31µs 3.15µs 2.55µs] - ⭐️ Times for flux-sample-3 to flux-sample-1: [9.88µs 5.01µs 4µs 4.65µs 3.471µs 3.22µs 3.29µs 3.25µs 3.11µs 4.04µs] - ⭐️ Times for flux-sample-2 to flux-sample-1: [3.99µs 2.78µs 4.02µs 3.62µs 3.39µs 3.42µs 3.67µs 4.26µs 3.08µs 2.91µs] - ⭐️ Times for flux-sample-1 to flux-sample-6: [2.87µs 3.13µs 3.33µs 2.89µs 3.15µs 2.53µs 3.06µs 3.1µs 11.16µs 3.05µs] - ⭐️ Times for flux-sample-5 to flux-sample-1: [3.64µs 7.3µs 3.81µs 16.24µs 3.86µs 3.42µs 2.96µs 3.19µs 3.85µs 3.03µs] - ⭐️ Times for flux-sample-2 to flux-sample-3: [3.78µs 2.87µs 3.54µs 3.43µs 2.9µs 9.64µs 2.38µs 2.9µs 2.58µs 2.58µs] - ⭐️ Times for flux-sample-4 to flux-sample-1: [3.29µs 6.68µs 3.85µs 4.15µs 6.09µs 3.8µs 3.14µs 3.02µs 4.05µs 3.57µs] - ⭐️ Times for flux-sample-1 to flux-sample-7: [3.13µs 3.04µs 4.04µs 3.19µs 2.99µs 2.611µs 2.94µs 4.34µs 3.82µs 9.65µs] - ⭐️ Times for flux-sample-7 to flux-sample-1: [3.009µs 3.4µs 3.26µs 3.91µs 2.85µs 2.84µs 3.151µs 3.16µs 3.58µs 3.21µs] - ⭐️ Times for flux-sample-6 to flux-sample-1: [3.43µs 4.65µs 3.38µs 3.02µs 2.95µs 3.03µs 3.08µs 3.071µs 3µs 2.71µs] - ⭐️ Times for flux-sample-3 to flux-sample-2: [3.109µs 3.68µs 4.38µs 3.309µs 3.27µs 3.04µs 2.54µs 3µs 3.16µs 4.08µs] - ⭐️ Times for flux-sample-2 to flux-sample-4: [2.75µs 2.99µs 3.25µs 2.25µs 2.81µs 3.24µs 5.2µs 4.08µs 4.14µs 2.94µs] - ⭐️ Times for flux-sample-4 to flux-sample-2: [4.169µs 3.32µs 3.09µs 3.26µs 3.28µs 3.8µs 13.37µs 2.41µs 3.6µs 2.94µs] - ⭐️ Times for flux-sample-2 to flux-sample-5: [3.28µs 3.16µs 2.46µs 2.88µs 2.9µs 3.3µs 4µs 3.24µs 3.49µs 2.87µs] - ⭐️ Times for flux-sample-5 to flux-sample-2: [3.66µs 2.689µs 2.96µs 4.02µs 3.02µs 21.68µs 2.66µs 3.27µs 3.209µs 3.16µs] - ⭐️ Times for flux-sample-3 to flux-sample-4: [3.77µs 2.67µs 3µs 2.74µs 3.75µs 2.76µs 3.77µs 3.09µs 2.92µs 5.22µs] - ⭐️ Times for flux-sample-2 to flux-sample-6: [3.01µs 3.19µs 2.88µs 2.73µs 3.1µs 3.829µs 2.57µs 4.54µs 4.25µs 3.78µs] - ⭐️ Times for flux-sample-4 to flux-sample-3: [3.87µs 3.66µs 3.2µs 10.11µs 3.54µs 2.95µs 2.31µs 2.93µs 2.5µs 3.76µs] - ⭐️ Times for flux-sample-5 to flux-sample-3: [3µs 3.18µs 2.44µs 3.03µs 3.02µs 6.33µs 3.389µs 3.18µs 2.82µs 3.2µs] - ⭐️ Times for flux-sample-6 to flux-sample-2: [3.06µs 3.53µs 3.47µs 2.85µs 2.62µs 3.07µs 3.07µs 2.93µs 2.689µs 3.18µs] - ⭐️ Times for flux-sample-3 to flux-sample-5: [3.12µs 4.04µs 3.509µs 3.15µs 3.1µs 2.98µs 2.77µs 2.71µs 3.11µs 2.869µs] - ⭐️ Times for flux-sample-7 to flux-sample-2: [2.63µs 3.47µs 3.08µs 12.26µs 3.15µs 2.82µs 2.891µs 5.41µs 3.26µs 2.98µs] - ⭐️ Times for flux-sample-2 to flux-sample-7: [4.45µs 3.42µs 2.56µs 4.74µs 3.75µs 3.14µs 1.78µs 3.39µs 3.37µs 3.24µs] - ⭐️ Times for flux-sample-5 to flux-sample-4: [3.7µs 3.02µs 3.23µs 4.611µs 3.22µs 4.38µs 3.25µs 2.47µs 7.02µs 3.29µs] - ⭐️ Times for flux-sample-3 to flux-sample-6: [2.99µs 3.1µs 3.08µs 3.07µs 2.81µs 3.92µs 2.97µs 3.8µs 2.47µs 2.77µs] - ⭐️ Times for flux-sample-4 to flux-sample-5: [2.99µs 6.609µs 3.59µs 3.591µs 3.43µs 3.31µs 3.16µs 3.611µs 3.6µs 2.94µs] - ⭐️ Times for flux-sample-6 to flux-sample-3: [2.6µs 2.93µs 3.08µs 3.02µs 2.78µs 6.36µs 3.02µs 2.79µs 3.089µs 3.26µs] - ⭐️ Times for flux-sample-3 to flux-sample-7: [2.86µs 3.529µs 3.41µs 2.8µs 2.91µs 2.78µs 2.43µs 3.08µs 5.46µs 2.84µs] - ⭐️ Times for flux-sample-5 to flux-sample-6: [2.41µs 2.28µs 4.59µs 3.27µs 13.72µs 3.54µs 2.79µs 3.73µs 4.37µs 2.651µs] - ⭐️ Times for flux-sample-7 to flux-sample-3: [3.8µs 2.98µs 2.73µs 5.53µs 6.36µs 3.5µs 3.22µs 2.62µs 3.04µs 3.07µs] - ⭐️ Times for flux-sample-4 to flux-sample-6: [2.83µs 3.13µs 2.95µs 3.29µs 2.66µs 2.65µs 3.7µs 3.07µs 2.71µs 2.31µs] - ⭐️ Times for flux-sample-6 to flux-sample-4: [7.01µs 3µs 3.02µs 2.81µs 2.99µs 2.95µs 3.43µs 3.17µs 2.991µs 2.81µs] - ⭐️ Times for flux-sample-4 to flux-sample-7: [3.83µs 2.65µs 3.76µs 3.09µs 3.6µs 3.29µs 2.38µs 3.66µs 3.35µs 2.97µs] - ⭐️ Times for flux-sample-5 to flux-sample-7: [3.58µs 4.73µs 4.58µs 3.16µs 3.21µs 4.66µs 4.55µs 2.42µs 3.16µs 3.4µs] - ⭐️ Times for flux-sample-6 to flux-sample-5: [2.85µs 3.44µs 2.48µs 2.4µs 2.88µs 2.6µs 2.98µs 9.98µs 2.96µs 3.08µs] - ⭐️ Times for flux-sample-7 to flux-sample-4: [2.87µs 2.57µs 2.531µs 3.53µs 3.14µs 2.56µs 2.58µs 2.74µs 2.39µs 3.16µs] - ⭐️ Times for flux-sample-6 to flux-sample-7: [2.96µs 2.86µs 3.01µs 2.89µs 3.02µs 2.4µs 2.911µs 2.81µs 2.86µs 2.96µs] - ⭐️ Times for flux-sample-7 to flux-sample-5: [2.88µs 2.6µs 2.88µs 2.68µs 2.48µs 3.81µs 3.27µs 14.84µs 3.66µs 3.13µs] - ⭐️ Times for flux-sample-7 to flux-sample-6: [3.41µs 2.64µs 3.11µs 3.23µs 3.09µs 3.1µs 2.65µs 3.689µs 3.13µs 3.83µs] +Hello I'm host flux-sample-0 + ⭐️ Times for 10 messages flux-sample-0.flux-service.default.svc.cluster.local:5555 to flux-sample-2.flux-service.default.svc.cluster.local:5555: [2.27578ms 124.99µs 101.27µs 114.43µs 99.71µs 97.76µs 112.5µs 99.08µs 100.39µs 106.33µs] + ⭐️ Times for 10 messages flux-sample-2.flux-service.default.svc.cluster.local:5555 to flux-sample-0.flux-service.default.svc.cluster.local:5555: [509.558548ms 109.25µs 104.63µs 116.06µs 119.81µs 121.51µs 118.59µs 118.88µs 116.48µs 127.89µs] + ⭐️ Times for 10 messages flux-sample-2.flux-service.default.svc.cluster.local:5555 to flux-sample-1.flux-service.default.svc.cluster.local:5555: [1.088216243s 126.44µs 121.869µs 113.149µs 114.86µs 102.871µs 95.871µs 94.16µs 94.6µs 94.609µs] + ⭐️ Times for 10 messages flux-sample-1.flux-service.default.svc.cluster.local:5555 to flux-sample-7.flux-service.default.svc.cluster.local:5555: [291.263625ms 116.05µs 107.22µs 112.98µs 107.62µs 127.05µs 105.86µs 104.71µs 104.38µs 105.14µs] + ⭐️ Times for 10 messages flux-sample-2.flux-service.default.svc.cluster.local:5555 to flux-sample-3.flux-service.default.svc.cluster.local:5555: [1.37243942s 119.05µs 104.24µs 100.7µs 98.49µs 92.93µs 103.75µs 95.28µs 101.65µs 97.46µs] + ⭐️ Times for 10 messages flux-sample-3.flux-service.default.svc.cluster.local:5555 to flux-sample-6.flux-service.default.svc.cluster.local:5555: [188.336356ms 113.54µs 131.38µs 108.73µs 109.1µs 96.92µs 93.06µs 111.22µs 92.21µs 95.73µs] + ⭐️ Times for 10 messages flux-sample-6.flux-service.default.svc.cluster.local:5555 to flux-sample-5.flux-service.default.svc.cluster.local:5555: [617.644567ms 122.01µs 131.17µs 108.52µs 105.43µs 111.13µs 111.14µs 108.96µs 107.06µs 109.07µs] + ⭐️ Times for 10 messages flux-sample-2.flux-service.default.svc.cluster.local:5555 to flux-sample-4.flux-service.default.svc.cluster.local:5555: [2.335518399s 125.58µs 109.42µs 104.43µs 110.44µs 107.31µs 105.26µs 114.19µs 113.07µs 106.36µs] +... ``` Yes, they are running on different physical nodes: diff --git a/examples/distributed/gozmq/entrypoint.sh b/examples/distributed/gozmq/entrypoint.sh index 88a04195..1aa7fcca 100755 --- a/examples/distributed/gozmq/entrypoint.sh +++ b/examples/distributed/gozmq/entrypoint.sh @@ -6,4 +6,4 @@ rank=${FLUX_TASK_RANK} # Get the host name host=$(hostname) echo "Hello I'm host $host" -go run /code/main.go run --size ${workers} --prefix flux-sample --suffix "flux-service.default.svc.cluster.local" --port 5555 --rank ${rank} +go run /code/main.go run --size ${workers} --prefix flux-sample --suffix "flux-service.default.svc.cluster.local" --port 5555 --rank ${rank} --raw diff --git a/examples/distributed/gozmq/kind-config.yaml b/examples/distributed/gozmq/kind-config.yaml new file mode 100644 index 00000000..30ef3e08 --- /dev/null +++ b/examples/distributed/gozmq/kind-config.yaml @@ -0,0 +1,14 @@ +kind: Cluster +apiVersion: kind.x-k8s.io/v1alpha4 +nodes: +- role: control-plane + kubeadmConfigPatches: + - | + kind: InitConfiguration + nodeRegistration: + kubeletExtraArgs: + node-labels: "ingress-ready=true" +- role: worker +- role: worker +- role: worker +- role: worker \ No newline at end of file diff --git a/examples/distributed/gozmq/main.go.txt b/examples/distributed/gozmq/main.go.txt index f3dfdd08..a94b029a 100644 --- a/examples/distributed/gozmq/main.go.txt +++ b/examples/distributed/gozmq/main.go.txt @@ -3,42 +3,181 @@ package main import ( "log" "os" - "strings" + "sync" "github.com/akamensky/argparse" zmq "github.com/pebbe/zmq4" "fmt" - "math/rand" "time" ) -func workerTask(toHost, fromHost string) { +// Global identity lookup of rank to socket identity and times +var ( + // Lookup of rank to identity + ids = map[string]string{} + + // Lookup of identity to rank + ranks = map[string]string{} +) + +type Rank struct { + Number int +} + +func (r *Rank) String() string { + return fmt.Sprintf("%s", r.Number) +} + +// ElapsedTime holds a start, end and elapsed time +type ElapsedTime struct { + StartTime time.Time + EndTime time.Time +} + +func (e *ElapsedTime) Start() { + e.StartTime = time.Now() +} +func (e *ElapsedTime) Stop() { + e.EndTime = time.Now() +} + +func (e *ElapsedTime) Elapsed() time.Duration { + return e.EndTime.Sub(e.StartTime) +} + +func (e *ElapsedTime) StartAsString() string { + return e.StartTime.Format(time.RFC3339Nano) +} + +func (e *ElapsedTime) SetEndFromString(raw string) { + restored, err := time.Parse(time.RFC3339Nano, raw) + if err != nil { + log.Fatalf("Cannot convert time %s\n", err) + } + e.EndTime = restored +} + +// brokerTask is receiving work (client or DEALER calls) +// and responding. +func brokerTask( + broker *zmq.Socket, + measurements int, + size int, +) { + + // The total number of expected interactions we should have is + // the number of other workers * measurements + expected := measurements * (size - 1) + count := 0 + + // Keep going until we hit expected + for count < expected { + identity, err := broker.Recv(0) + if err != nil { + log.Fatalf("Error", err) + } + + // Send back to the specific identity asking for more + // We check that the identity we receive at the worker is the one we sent + broker.Send(identity, zmq.SNDMORE) + + // This is the envelope delimiter + // If you look at the string it is empty + broker.Recv(0) + + // This is the response from the worker + fromIdentity, err := broker.Recv(0) + if fromIdentity != identity { + log.Fatalf("[broker] received message expecting %s got %s\n", identity, fromIdentity) + } + if err != nil { + log.Fatalf("Error broker receiving message", err) + } + + // This is completing the round trip, it tells the worker to start + // the next loop and that this message round is finished (I think) + broker.Send("", zmq.SNDMORE) + broker.Send(fromIdentity, 0) + count += 1 + } +} + +// workerTask SENDS the message and responds +// raw indicates showing raw results instead of a mean +func workerTask( + fromHost, toHost string, + rank int, + raw bool, + wg *sync.WaitGroup, + measurements int, +) { + + // Dealer sockets are the clients worker, err := zmq.NewSocket(zmq.DEALER) if err != nil { log.Fatalf("Error", err) } defer worker.Close() - set_id(worker) // Set a printable identity + defer wg.Done() + + // Set a printable identity and set for times + // This is a lookup of point to point send times + identity := setIdentifier(worker, rank) worker.Connect(fmt.Sprintf("tcp://%s", toHost)) - total := 0 - for { + // The client (dealer) is sending and receiving, + // so we keep track of round trip here. + // I think if we time the broker, the broker can store + // messages in memory so the times are too fast. + // Each rank (fromHost ) keeps track of the times from itself + // to one other host (toHost) + times := []time.Duration{} + + // Take m measurements + for m := 0; m < measurements; m++ { + + // This is a request for work - I think it would + // encompass two messages + _, err := worker.Send("", zmq.SNDMORE) + if err != nil { + log.Fatalf("Error Send More", err) + } + // Tell the broker we're ready for work - worker.Send("", zmq.SNDMORE) - worker.Send("Ready to serve!", 0) - - // Get workload from broker, until finished - worker.Recv(0) // Envelope delimiter - workload, _ := worker.Recv(0) - if workload == "Done" { - fmt.Printf("Completed: from %s to %s: %d tasks\n", fromHost, toHost, total) - break + t := ElapsedTime{} + t.Start() + + // We send back the worker rank (based on identity) to check + // against the original identity sent to + _, err = worker.Send(identity, 0) + if err != nil { + log.Fatalf("Error Send Message", err) + } + + _, err = worker.Recv(0) + if err != nil { + log.Fatalf("Error Receiving Envelope", err) } - total++ + receivedMessage, err := worker.Recv(0) - // Do some random work - time.Sleep(time.Duration(rand.Intn(500)+1) * time.Millisecond) + // This is thd end of the round trip + t.Stop() + + if err != nil { + log.Fatalf("Error", err) + } + + times = append(times, t.Elapsed()) + if receivedMessage != identity { + log.Fatalf("[worker] received message expecting %s got %s\n", identity, receivedMessage) + } + } + if raw { + fmt.Printf(" ⭐️ Times for %d messages %s to %s: %s\n", measurements, fromHost, toHost, times) + } else { + meanTime := calculateMean(times) + fmt.Printf(" ⭐️ Mean times for %d messages %s to %s: %s\n", measurements, fromHost, toHost, meanTime) } } @@ -49,7 +188,9 @@ func main() { prefix := runCmd.String("p", "prefix", &argparse.Options{Help: "Hostname prefix (e.g., flux-sample)"}) size := runCmd.Int("s", "size", &argparse.Options{Help: "Number of hosts (count starts at 0)"}) rank := runCmd.Int("r", "rank", &argparse.Options{Help: "Rank of this host"}) - tasks := runCmd.Int("t", "tasks", &argparse.Options{Help: "Number of tasks (workers) per node", Default: 1}) + + // This should only be set to 1 for this example + raw := runCmd.Flag("", "raw", &argparse.Options{Help: "Output raw times instead of mean", Default: false}) measurements := runCmd.Int("m", "measurements", &argparse.Options{Help: "Number of measurements to take (to average over)", Default: 10}) suffix := runCmd.String("", "suffix", &argparse.Options{Help: "Hostname suffix (e.g. .flux-service.default.svc.cluster.local)"}) port := runCmd.String("", "port", &argparse.Options{Help: "Port to use", Default: "5671"}) @@ -65,6 +206,9 @@ func main() { // Start the broker on the host thisHost := fmt.Sprintf("%s-%d.%s:%s", *prefix, *rank, *suffix, *port) + // This is the broker that will be a router on the rank it is running on + // We will ask the worker for a message, and then keep track of the + // round trip time broker, err := zmq.NewSocket(zmq.ROUTER) if err != nil { log.Fatalf("Error", err) @@ -74,74 +218,60 @@ func main() { brokerHost := fmt.Sprintf("tcp://*:%s", *port) broker.Bind(brokerHost) - // Run a client task for each host + // This will ensure the clients finish, and brokers as well + var wg sync.WaitGroup + + // Step 1: launch all the worker tasks! + // We run a client task (worker) to send a message to every other host + // The workers are going to be the main driver to run some number of measurements for i := 0; i < *size; i++ { - // Don't send to self? + // Don't send to self if i == *rank { + //row[i+1] = fmt.Sprintf("0") continue } host := fmt.Sprintf("%s-%d.%s:%s", *prefix, i, *suffix, *port) - // Note that we can run more than one worker task here, - // I'm choosing one to mimic(?) point to point (maybe)? - for w := 0; w < *tasks; w++ { - go workerTask(host, thisHost) - } + // We should only have one worker here for a point to point test + // This worker is from thisHost TO the other rank, which should + // also be running a broker. It will perform some number of + // tasks until it receives a Done message + wg.Add(1) + go workerTask(thisHost, host, i, *raw, &wg, *measurements) + } - // Keep matrix of elapsed times - times := make([]time.Duration, *measurements) - for m := 0; m < *measurements; m++ { - - // Next message gives us least recently used worker - identity, err := broker.Recv(0) - if err != nil { - log.Fatalf("Error", err) - } - start := time.Now() - broker.Send(identity, zmq.SNDMORE) - - // This is the envelope delimiter - broker.Recv(0) - - // This is the response from the worker - // This is the round trip time - broker.Recv(0) - end := time.Now() - elapsed := end.Sub(start) - - // Add the entry to our matrix - times[m] = elapsed - broker.Send("", zmq.SNDMORE) - - // Workers need to keep going until experiment done - broker.Send("Keep going", 0) - } + // Step 2: Kick off workers to receive them. Keep going + // until both it's received all the expected pings (from other workers) + // AND our own workers are done. + wg.Add(1) + go brokerTask(broker, *measurements, *size) - // Tell the worker it's done - toHostPrefix := strings.Split(host, ".") - fromHostPrefix := strings.Split(thisHost, ".") - fmt.Printf(" ⭐️ Times for %s to %s: %s\n", fromHostPrefix[0], toHostPrefix[0], times) - } + // Wait for all workers to finish, and then for all brokers + // to have the number of interactions they expect + wg.Wait() - // Give some time for everyone to finish - time.Sleep(time.Second * 10) - broker.Send("Done", 0) } } // calculateMean calculates the mean duration -// TODO get this working, units are weird func calculateMean(times []time.Duration) time.Duration { - total := time.Duration(0) + total := time.Duration(0) * time.Nanosecond for _, t := range times { total += t } - return (total / time.Duration(len(times))) * time.Nanosecond + return (total / time.Duration(len(times)) * time.Nanosecond) +} + +// getIdentifier for a rank +func getIdentifier(rank int) string { + return fmt.Sprintf("rank-%d", rank) } -func set_id(soc *zmq.Socket) { - identity := fmt.Sprintf("%04X-%04X", rand.Intn(0x10000), rand.Intn(0x10000)) +// getIdentifier for a rank +func setIdentifier(soc *zmq.Socket, rank int) string { + identity := getIdentifier(rank) soc.SetIdentity(identity) + return identity } diff --git a/examples/distributed/gozmq/main.go.v1 b/examples/distributed/gozmq/main.go.v1 new file mode 100644 index 00000000..cbc94917 --- /dev/null +++ b/examples/distributed/gozmq/main.go.v1 @@ -0,0 +1,169 @@ +// +// Asynchronous client-to-server (DEALER to ROUTER). +// +// While this example runs in a single process, that is just to make +// it easier to start and stop the example. Each task has its own +// context and conceptually acts as a separate process. + +package main + +import ( + "os" + + "github.com/akamensky/argparse" + zmq "github.com/pebbe/zmq4" + + "fmt" + "log" + "math/rand" + "sync" + "time" +) + +// --------------------------------------------------------------------- +// This is our client task +// It connects to the server, and then sends a request once per second +// It collects responses as they arrive, and it prints them out. We will +// run several client tasks in parallel, each with a different random ID. + +func client_task(host, port string) { + var mu sync.Mutex + + client, _ := zmq.NewSocket(zmq.DEALER) + defer client.Close() + + hostname := fmt.Sprintf("tcp://%s:%s", host, port) + + // Set random identity to make tracing easier + set_id(client) + client.Connect(hostname) + + go func() { + for request_nbr := 1; true; request_nbr++ { + time.Sleep(time.Second) + mu.Lock() + client.SendMessage(fmt.Sprintf("request #%d", request_nbr)) + mu.Unlock() + } + }() + + for { + time.Sleep(10 * time.Millisecond) + mu.Lock() + msg, err := client.RecvMessage(zmq.DONTWAIT) + if err == nil { + id, _ := client.GetIdentity() + fmt.Println(msg[0], id) + } + mu.Unlock() + } +} + +// This is our server task. +// It uses the multithreaded server model to deal requests out to a pool +// of workers and route replies back to clients. One worker can handle +// one request at a time but one client can talk to multiple workers at +// once. + +func server_task(port string) { + + // Frontend socket talks to clients over TCP + frontend, _ := zmq.NewSocket(zmq.ROUTER) + defer frontend.Close() + address := fmt.Sprintf("tcp://*:%s", port) + frontend.Bind(address) + + // Backend socket talks to workers over inproc + backend, _ := zmq.NewSocket(zmq.DEALER) + defer backend.Close() + backend.Bind("inproc://backend") + + // Connect backend to frontend via a proxy + err := zmq.Proxy(frontend, backend, nil) + if err != nil { + log.Fatalln("Proxy interrupted:", err) + } + + // Launch pool of worker threads, precise number is not critical + for { + go server_worker() + } + +} + +// Each worker task works on one request at a time and sends a random number +// of replies back, with random delays between replies: + +func server_worker() { + + worker, _ := zmq.NewSocket(zmq.DEALER) + defer worker.Close() + worker.Connect("inproc://backend") + + for { + // The DEALER socket gives us the reply envelope and message + msg, _ := worker.RecvMessage(0) + identity, content := pop(msg) + + fmt.Println("Looking for replies") + // Send 0..4 replies back + replies := rand.Intn(5) + for reply := 0; reply < replies; reply++ { + // Sleep for some fraction of a second + time.Sleep(time.Duration(rand.Intn(1000)+1) * time.Millisecond) + worker.SendMessage(identity, content) + } + } +} + +// The main thread simply starts several clients, and a server, and then +// waits for the server to finish. + +func main() { + + parser := argparse.NewParser("gozmq", "Playing with ZeroMQ in Go") + runCmd := parser.NewCommand("run", "Run the example") + prefix := runCmd.String("p", "prefix", &argparse.Options{Help: "Hostname prefix (e.g., flux-sample)"}) + size := runCmd.Int("s", "size", &argparse.Options{Help: "Number of hosts (count starts at 0)"}) + rank := runCmd.Int("r", "rank", &argparse.Options{Help: "Rank of this host"}) + suffix := runCmd.String("", "suffix", &argparse.Options{Help: "Hostname suffix (e.g. .flux-service.default.svc.cluster.local)"}) + port := runCmd.String("", "port", &argparse.Options{Help: "Port to use", Default: "5555"}) + + err := parser.Parse(os.Args) + if err != nil { + fmt.Println(parser.Usage(err)) + return + } + + if runCmd.Happened() { + go server_task(*port) + + // Run a client task for each host + for i := 0; i < *size; i++ { + + // Don't send to self? + if i == *rank { + continue + } + host := fmt.Sprintf("%s-%d%.s", *prefix, i, *suffix) + go client_task(host, *port) + } + } +} + +// TODO change to flux task rank? +func set_id(soc *zmq.Socket) { + identity := fmt.Sprintf("%04X-%04X", rand.Intn(0x10000), rand.Intn(0x10000)) + soc.SetIdentity(identity) +} + +func pop(msg []string) (head, tail []string) { + if msg[1] == "" { + head = msg[:2] + tail = msg[2:] + } else { + head = msg[:1] + tail = msg[1:] + } + return +} diff --git a/examples/distributed/gozmq/minicluster-gke.yaml b/examples/distributed/gozmq/minicluster-gke.yaml index 1cdbcec5..493a6c7b 100644 --- a/examples/distributed/gozmq/minicluster-gke.yaml +++ b/examples/distributed/gozmq/minicluster-gke.yaml @@ -13,6 +13,7 @@ spec: image: ghcr.io/converged-computing/flux-view-ubuntu:tag-jammy containers: - image: vanessa/gozmq:0 + pullAlways: true command: /bin/bash /code/entrypoint.sh 8 # c2d-standard-8 has 4 physical cores resources: diff --git a/examples/distributed/gozmq/minicluster.yaml b/examples/distributed/gozmq/minicluster.yaml index 9008407c..8c7eff67 100755 --- a/examples/distributed/gozmq/minicluster.yaml +++ b/examples/distributed/gozmq/minicluster.yaml @@ -3,8 +3,8 @@ kind: MiniCluster metadata: name: flux-sample spec: - size: 2 - tasks: 2 + size: 4 + tasks: 4 logging: quiet: true flux: @@ -12,4 +12,4 @@ spec: image: ghcr.io/converged-computing/flux-view-ubuntu:tag-jammy containers: - image: gozmq - command: /bin/bash /code/entrypoint.sh \ No newline at end of file + command: /bin/bash /code/entrypoint.sh 4 \ No newline at end of file