Skip to content

Commit

Permalink
Merge pull request #175 from bookingcom/grzkv/improving_testing_and_b…
Browse files Browse the repository at this point in the history
…ench_system

Rebuilding receiver and improving e2e testing system.
  • Loading branch information
grzkv authored May 31, 2022
2 parents e7e763e + d29d871 commit c192499
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 161 deletions.
2 changes: 1 addition & 1 deletion test/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ WORKDIR /nanotube

COPY . .

RUN apt-get -y update && apt-get -y install bzip2 jq
RUN apt-get -y update && apt-get -y install bzip2 bc

RUN make nanotube
RUN make test/sender/sender
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Description=Receiver fro NT test
After=network.target

[Service]
ExecStart={{ bin_location }} -ports {{ ports }} -prom-port {{ prom_port }} -profiler {{ pprof_port }}
ExecStart={{ bin_location }} -ports {{ ports }} -promPort {{ prom_port }} -profPort {{ pprof_port }}
LimitNOFILE=5000
User=nanotube
Group=nanotube
Expand Down
291 changes: 139 additions & 152 deletions test/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package main

import (
"bufio"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
Expand All @@ -17,22 +15,12 @@ import (
"strings"
"sync"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)

type status struct {
sync.Mutex
Ready bool
dataProcessed bool

timestampLastProcessed time.Time
IdleTimeMilliSecs int64
}

func parsePorts(portsStr string, lg *zap.Logger) []int {
var ports []int

Expand Down Expand Up @@ -63,66 +51,87 @@ func parsePorts(portsStr string, lg *zap.Logger) []int {
ports = append(ports, i)
}
default:
lg.Fatal("wrong ports argument")

lg.Fatal("invalid ports argument")
}
}

return ports
}

func setupStatusServer(localAPIPort int, currentStatus *status, lg *zap.Logger) {
http.HandleFunc("/status", func(w http.ResponseWriter, req *http.Request) {
currentStatus.Lock()
defer currentStatus.Unlock()
if currentStatus.dataProcessed {
currentStatus.IdleTimeMilliSecs = time.Since(currentStatus.timestampLastProcessed).Milliseconds()
}
data, err := json.Marshal(currentStatus)
if err != nil {
lg.Error("error when json marshaling status", zap.Any("status", currentStatus), zap.Error(err))
}
fmt.Fprint(w, string(data))
})
if localAPIPort != 0 {
go func() {
err := http.ListenAndServe(fmt.Sprintf(":%d", localAPIPort), nil)
if err != nil {
lg.Fatal("failed to start the status server", zap.Error(err))
}
lg.Info("status server running", zap.Int("port", localAPIPort))
}()
}

}

type metrics struct {
inRecs prometheus.Counter
inRecs prometheus.Counter
timeOfLastWrite prometheus.Gauge
nOpenPorts prometheus.Gauge
}

func setupMetrics() *metrics {
func setupMetrics(lg *zap.Logger) *metrics {
ns := "receiver"
ms := metrics{
inRecs: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "receiver",
Namespace: ns,
Name: "in_records_total",
Help: "Incoming records counter.",
}),
timeOfLastWrite: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "time_of_last_write",
Help: "Time of last write to the port dump file.",
}),
nOpenPorts: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "n_open_ports",
Help: "Number of opened ports.",
}),
}
err := prometheus.Register(ms.inRecs)
if err != nil {
log.Fatalf("error registering the in_records_total metric: %v", err)
lg.Fatal("error registering the metric", zap.String("metric", "in_records_total"),
zap.Error(err))
}
err = prometheus.Register(ms.timeOfLastWrite)
if err != nil {
lg.Fatal("error registering the metric", zap.String("metric", "time_of_last_write"),
zap.Error(err))
}
err = prometheus.Register(ms.nOpenPorts)
if err != nil {
lg.Fatal("error registering the metric", zap.String("metric", "n_open_ports"),
zap.Error(err))
}

return &ms
}

func openFiles(outDir string, outPrefix string, ports []int, lg *zap.Logger) map[int]*os.File {
fs := make(map[int]*os.File)

for _, p := range ports {
fPath := fmt.Sprintf("%s/%s%d", outDir, outPrefix, p)
f, err := os.Create(fPath)
if err != nil {
lg.Fatal("failed to create file", zap.String("path", fPath), zap.Error(err))
}
fs[p] = f
}

return fs
}

func closeFiles(fs map[int]*os.File, lg *zap.Logger) {
for p, f := range fs {
err := f.Close()
if err != nil {
lg.Error("could not close file for port", zap.Int("port", p), zap.Error(err))
}
}
}

func main() {
portsStr := flag.String("ports", "", `List of the ports to listen on. Has to be supplied in the from "XXXX YYYY ZZZZ AAAA-BBBB" in quotes.`)
outPrefix := flag.String("prefix", "", "Prefix for the output files.")
outDir := flag.String("outdir", "", "Output directory. Absolute path. Optional.")
profiler := flag.String("profiler", "", "Where should the profiler listen?")
localAPIPort := flag.Int("local-api-port", 0, "specify which port the local HTTP API should be listening on")
promPort := flag.Int("prom-port", 0, "Prometheus port. If unset, Prometheus metrics are not exposed.")
profPort := flag.Int("profPort", 0, "Where should the profiler listen?")
promPort := flag.Int("promPort", 0, "Prometheus port. If unset, Prometheus metrics are not exposed.")

flag.Parse()

Expand All @@ -131,59 +140,62 @@ func main() {
log.Fatal("failed to create logger: ", err)
}

if *portsStr == "" {
lg.Fatal("please supply the ports argument")
}

ms := setupMetrics()
ms := setupMetrics(lg)

if *promPort != 0 {
go func() {
l, err := net.Listen("tcp", fmt.Sprintf(":%d", *promPort))
if err != nil {
lg.Error("opening TCP port for Prometheus failed", zap.Error(err))
}
err = http.Serve(l, promhttp.Handler())
if err != nil {
lg.Error("prometheus server failed", zap.Error(err))
}
}()
go promListen(*promPort, lg)
}

if *profiler != "" {
if *profPort != 0 {
go func() {
lg.Info("profiler server exited", zap.Error(http.ListenAndServe(*profiler, nil)))
lg.Info("profiler server exited", zap.Error(http.ListenAndServe(fmt.Sprintf(":%d", *profPort), nil)))
}()
}

ports := parsePorts(*portsStr, lg)
var fs map[int]*os.File
if *outDir != "" {
fs = openFiles(*outDir, *outPrefix, ports, lg)
defer closeFiles(fs, lg)
}
ls := openPorts(ports, lg)

currentStatus := &status{sync.Mutex{}, false, false, time.Now(), 0}
ms.nOpenPorts.Set(float64(len(ls)))

stop := make(chan struct{})

var portsWG sync.WaitGroup
for _, p := range ports {
portsWG.Add(1)

if *localAPIPort != 0 {
setupStatusServer(*localAPIPort, currentStatus, lg)
go listen(ls[p], p, stop, &portsWG, fs, ms, lg)
}

fs := make(map[int]io.Writer)
sgn := make(chan os.Signal, 1)
signal.Notify(sgn, os.Interrupt, syscall.SIGTERM)
<-sgn

for _, p := range ports {
fs[p] = ioutil.Discard
if *outDir != "" {
fPath := fmt.Sprintf("%s/%s%d", *outDir, *outPrefix, p)
f, err := os.Create(fPath)
if err != nil {
lg.Fatal("failed to create file", zap.String("path", fPath), zap.Error(err))
}
defer func(p int) {
err := f.Close()
if err != nil {
lg.Error("could not close file for port", zap.Int("port", p), zap.Error(err))
}
// start termination sequence
close(stop)

}(p)
fs[p] = f
}
if *outDir == "" {
os.Exit(0)
}
portsWG.Wait()
}

func promListen(promPort int, lg *zap.Logger) {
l, err := net.Listen("tcp", fmt.Sprintf(":%d", promPort))
if err != nil {
lg.Error("opening TCP port for Prometheus failed", zap.Error(err))
}
err = http.Serve(l, promhttp.Handler())
if err != nil {
lg.Error("prometheus server failed", zap.Error(err))
}
}

func openPorts(ports []int, lg *zap.Logger) map[int]net.Listener {
ls := make(map[int]net.Listener)
for _, p := range ports {
l, err := net.Listen("tcp", fmt.Sprintf(":%d", p))
Expand All @@ -193,80 +205,55 @@ func main() {
ls[p] = l
}

stop := make(chan struct{})
return ls
}

var portsWG sync.WaitGroup
for _, p := range ports {
portsWG.Add(1)
func listen(lst net.Listener, prt int, stop chan struct{}, portsWG *sync.WaitGroup, fs map[int]*os.File, ms *metrics, lg *zap.Logger) {
defer portsWG.Done()
var connectionWG sync.WaitGroup
out:
for {
connCh := make(chan net.Conn)
go func() {
conn, err := lst.Accept()
if err != nil {
lg.Fatal("failed to accept connection on addr %s: %v", zap.String("address", lst.Addr().String()), zap.Error(err))
}
connCh <- conn
}()

go func(lst net.Listener, prt int, stop chan struct{}) {
defer portsWG.Done()
var connectionWG sync.WaitGroup
out:
for {
connCh := make(chan net.Conn)
go func() {
conn, err := lst.Accept()
select {
case <-stop:
break out
case conn := <-connCh:
connectionWG.Add(1)

go func(conn net.Conn) {
defer connectionWG.Done()
defer func() {
err := conn.Close()
if err != nil {
lg.Fatal("failed to accept connection on addr %s: %v", zap.String("address", lst.Addr().String()), zap.Error(err))
lg.Fatal("connection close failed", zap.Error(err))
}
connCh <- conn
}()

select {
case <-stop:
break out
case conn := <-connCh:
connectionWG.Add(1)

go func(conn net.Conn) {
defer connectionWG.Done()
defer func() {
err := conn.Close()
if err != nil {
lg.Fatal("connection close failed", zap.Error(err))
}
}()
scanner := bufio.NewScanner(conn)
scanner.Buffer(make([]byte, bufio.MaxScanTokenSize*100), bufio.MaxScanTokenSize)
if *outDir == "" {
for scanner.Scan() {
ms.inRecs.Inc()
}
if err := scanner.Err(); err != nil {
lg.Info("failed scan when reading data", zap.Error(err))
}
} else {
_, err := io.Copy(fs[prt], conn)
if err != nil {
lg.Error("failed when dumping data", zap.Error(err))
}
}

currentStatus.Lock()
currentStatus.dataProcessed = true
currentStatus.timestampLastProcessed = time.Now()
currentStatus.Unlock()
}(conn)
if fs == nil {
scanner := bufio.NewScanner(conn)
scanner.Buffer(make([]byte, bufio.MaxScanTokenSize*100), bufio.MaxScanTokenSize)
for scanner.Scan() {
ms.inRecs.Inc()
}
if err := scanner.Err(); err != nil {
lg.Info("failed scan when reading data", zap.Error(err))
}
} else {
_, err := io.Copy(fs[prt], conn)
if err != nil {
lg.Error("failed when dumping data", zap.Error(err))
}
ms.timeOfLastWrite.SetToCurrentTime()
}
}
connectionWG.Wait()
}(ls[p], p, stop)
}

currentStatus.Lock()
currentStatus.Ready = true
currentStatus.Unlock()

sgn := make(chan os.Signal, 1)
signal.Notify(sgn, os.Interrupt, syscall.SIGTERM)
<-sgn

// start termination sequence
close(stop)

if *outDir == "" {
os.Exit(0)
}(conn)
}
}
portsWG.Wait()
connectionWG.Wait()
}
Loading

0 comments on commit c192499

Please sign in to comment.