Skip to content

Commit

Permalink
Polished receiver.
Browse files Browse the repository at this point in the history
  • Loading branch information
grzkv committed May 31, 2022
1 parent 4759689 commit d29d871
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Description=Receiver fro NT test
After=network.target

[Service]
ExecStart={{ bin_location }} -ports {{ ports }} -prom-port {{ prom_port }} -profiler {{ pprof_port }}
ExecStart={{ bin_location }} -ports {{ ports }} -promPort {{ prom_port }} -profPort {{ pprof_port }}
LimitNOFILE=5000
User=nanotube
Group=nanotube
Expand Down
19 changes: 11 additions & 8 deletions test/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func main() {
portsStr := flag.String("ports", "", `List of the ports to listen on. Has to be supplied in the from "XXXX YYYY ZZZZ AAAA-BBBB" in quotes.`)
outPrefix := flag.String("prefix", "", "Prefix for the output files.")
outDir := flag.String("outdir", "", "Output directory. Absolute path. Optional.")
profPort := flag.String("profPort", "", "Where should the profiler listen?")
profPort := flag.Int("profPort", 0, "Where should the profiler listen?")
promPort := flag.Int("promPort", 0, "Prometheus port. If unset, Prometheus metrics are not exposed.")

flag.Parse()
Expand All @@ -146,15 +146,18 @@ func main() {
go promListen(*promPort, lg)
}

if *profPort != "" {
if *profPort != 0 {
go func() {
lg.Info("profiler server exited", zap.Error(http.ListenAndServe(*profPort, nil)))
lg.Info("profiler server exited", zap.Error(http.ListenAndServe(fmt.Sprintf(":%d", *profPort), nil)))
}()
}

ports := parsePorts(*portsStr, lg)
fs := openFiles(*outDir, *outPrefix, ports, lg)
defer closeFiles(fs, lg)
var fs map[int]*os.File
if *outDir != "" {
fs = openFiles(*outDir, *outPrefix, ports, lg)
defer closeFiles(fs, lg)
}
ls := openPorts(ports, lg)

ms.nOpenPorts.Set(float64(len(ls)))
Expand All @@ -165,7 +168,7 @@ func main() {
for _, p := range ports {
portsWG.Add(1)

go listen(ls[p], p, *outDir, stop, &portsWG, fs, ms, lg)
go listen(ls[p], p, stop, &portsWG, fs, ms, lg)
}

sgn := make(chan os.Signal, 1)
Expand Down Expand Up @@ -205,7 +208,7 @@ func openPorts(ports []int, lg *zap.Logger) map[int]net.Listener {
return ls
}

func listen(lst net.Listener, prt int, outDir string, stop chan struct{}, portsWG *sync.WaitGroup, fs map[int]*os.File, ms *metrics, lg *zap.Logger) {
func listen(lst net.Listener, prt int, stop chan struct{}, portsWG *sync.WaitGroup, fs map[int]*os.File, ms *metrics, lg *zap.Logger) {
defer portsWG.Done()
var connectionWG sync.WaitGroup
out:
Expand Down Expand Up @@ -233,7 +236,7 @@ out:
lg.Fatal("connection close failed", zap.Error(err))
}
}()
if outDir == "" {
if fs == nil {
scanner := bufio.NewScanner(conn)
scanner.Buffer(make([]byte, bufio.MaxScanTokenSize*100), bufio.MaxScanTokenSize)
for scanner.Scan() {
Expand Down

0 comments on commit d29d871

Please sign in to comment.