Skip to content

Commit

Permalink
fix: ensure we clean up
Browse files Browse the repository at this point in the history
Signed-off-by: vsoch <[email protected]>
  • Loading branch information
vsoch committed Apr 4, 2024
1 parent 5effde0 commit 97343e7
Showing 1 changed file with 23 additions and 52 deletions.
75 changes: 23 additions & 52 deletions examples/distributed/gozmq/main.go.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,6 @@ import (
"time"
)

// Global identity lookup of rank to socket identity and times
var (
// Lookup of rank to identity
ids = map[string]string{}

// Lookup of identity to rank
ranks = map[string]string{}
)

type Rank struct {
Number int
}

func (r *Rank) String() string {
return fmt.Sprintf("%s", r.Number)
}

// ElapsedTime holds a start, end and elapsed time
type ElapsedTime struct {
Expand All @@ -46,18 +30,6 @@ func (e *ElapsedTime) Elapsed() time.Duration {
return e.EndTime.Sub(e.StartTime)
}

func (e *ElapsedTime) StartAsString() string {
return e.StartTime.Format(time.RFC3339Nano)
}

func (e *ElapsedTime) SetEndFromString(raw string) {
restored, err := time.Parse(time.RFC3339Nano, raw)
if err != nil {
log.Fatalf("Cannot convert time %s\n", err)
}
e.EndTime = restored
}

// brokerTask is receiving work (client or DEALER calls)
// and responding.
func brokerTask(
Expand Down Expand Up @@ -181,6 +153,28 @@ func workerTask(
}
}

// calculateMean calculates the mean duration
func calculateMean(times []time.Duration) time.Duration {
total := time.Duration(0) * time.Nanosecond
for _, t := range times {
total += t
}
return (total / time.Duration(len(times)) * time.Nanosecond)
}

// getIdentifier for a rank
func getIdentifier(rank int) string {
return fmt.Sprintf("rank-%d", rank)
}

// setIdentifier for a rank
// These need to be predictable between nodes
func setIdentifier(soc *zmq.Socket, rank int) string {
identity := getIdentifier(rank)
soc.SetIdentity(identity)
return identity
}

func main() {

parser := argparse.NewParser("gozmq", "Playing with ZeroMQ in Go")
Expand Down Expand Up @@ -245,33 +239,10 @@ func main() {
// Step 2: Kick off workers to receive them. Keep going
// until both it's received all the expected pings (from other workers)
// AND our own workers are done.
wg.Add(1)
go brokerTask(broker, *measurements, *size)

// Wait for all workers to finish, and then for all brokers
// to have the number of interactions they expect
wg.Wait()

}
}

// calculateMean calculates the mean duration
func calculateMean(times []time.Duration) time.Duration {
total := time.Duration(0) * time.Nanosecond
for _, t := range times {
total += t
}
return (total / time.Duration(len(times)) * time.Nanosecond)
}

// getIdentifier for a rank
func getIdentifier(rank int) string {
return fmt.Sprintf("rank-%d", rank)
}

// getIdentifier for a rank
func setIdentifier(soc *zmq.Socket, rank int) string {
identity := getIdentifier(rank)
soc.SetIdentity(identity)
return identity
}
}

0 comments on commit 97343e7

Please sign in to comment.