Skip to content

Commit

Permalink
add tests to check and avoid deadlocks (#571)
Browse files Browse the repository at this point in the history
* add test to check deadlocks
* Update README.md
  • Loading branch information
dmachard authored Jan 27, 2024
1 parent 83be5c3 commit 8fab14e
Show file tree
Hide file tree
Showing 23 changed files with 461 additions and 234 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/testing-go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ jobs:

- id: count_tests
run: |
data=$(sudo go test -v ./collectors ./processors ./dnsutils ./netlib ./loggers ./transformers ./pkgconfig ././ 2>&1 | grep -c RUN)
data=$(sudo go test -v ./collectors ./processors ./dnsutils ./netlib ./loggers ./transformers ./pkgconfig ./pkglinker ./pkgutils ././ 2>&1 | grep -c RUN)
echo "Count of Tests: $data"
echo "data=$data" >> $GITHUB_OUTPUT
Expand Down
9 changes: 4 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,13 @@ lint:
tests: check-go
@go test -race -cover -v
@go test ./pkgconfig/ -race -cover -v
@go test ./pkgutils/ -race -cover -v
@go test ./pkglinker/ -race -cover -v
@go test ./dnsutils/ -race -cover -v
@go test ./netlib/ -race -cover -v
@go test -timeout 60s ./transformers/ -race -cover -v
@go test -timeout 60s ./collectors/ -race -cover -v
@go test -timeout 90s ./dnsutils/ -race -cover -v
@go test -timeout 90s ./transformers/ -race -cover -v
@go test -timeout 90s ./collectors/ -race -cover -v
@go test -timeout 90s ./loggers/ -race -cover -v
@go test -timeout 60s ./processors/ -race -cover -v
@go test -timeout 90s ./processors/ -race -cover -v

# Cleans the project using go clean.
clean: check-go
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<p align="center">
<img src="https://goreportcard.com/badge/github.com/dmachard/go-dns-collector" alt="Go Report"/>
<img src="https://img.shields.io/badge/go%20version-min%201.20-green" alt="Go version"/>
<img src="https://img.shields.io/badge/go%20tests-384-green" alt="Go tests"/>
<img src="https://img.shields.io/badge/go%20lines-37290-green" alt="Go lines"/>
<img src="https://img.shields.io/badge/go%20tests-398-green" alt="Go tests"/>
<img src="https://img.shields.io/badge/go%20lines-37763-green" alt="Go lines"/>
</p>

<p align="center">
Expand Down
133 changes: 104 additions & 29 deletions collectors/dnsmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"reflect"
"regexp"
"strings"
"time"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/pkgconfig"
Expand All @@ -30,31 +31,37 @@ type MatchSource struct {
}

type DNSMessage struct {
doneRun chan bool
doneMonitor chan bool
stopRun chan bool
stopMonitor chan bool
config *pkgconfig.Config
configChan chan *pkgconfig.Config
inputChan chan dnsutils.DNSMessage
logger *logger.Logger
name string
RoutingHandler pkgutils.RoutingHandler
doneRun chan bool
doneMonitor chan bool
stopRun chan bool
stopMonitor chan bool
config *pkgconfig.Config
configChan chan *pkgconfig.Config
inputChan chan dnsutils.DNSMessage
logger *logger.Logger
name string
// RoutingHandler pkgutils.RoutingHandler
droppedRoutes []pkgutils.Worker
defaultRoutes []pkgutils.Worker
dropped chan string
droppedCount map[string]int
}

func NewDNSMessage(loggers []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *DNSMessage {
logger.Info("[%s] collector=dnsmessage - enabled", name)
s := &DNSMessage{
doneRun: make(chan bool),
doneMonitor: make(chan bool),
stopRun: make(chan bool),
stopMonitor: make(chan bool),
config: config,
configChan: make(chan *pkgconfig.Config),
inputChan: make(chan dnsutils.DNSMessage, config.Collectors.DNSMessage.ChannelBufferSize),
logger: logger,
name: name,
RoutingHandler: pkgutils.NewRoutingHandler(config, logger, name),
doneRun: make(chan bool),
doneMonitor: make(chan bool),
stopRun: make(chan bool),
stopMonitor: make(chan bool),
config: config,
configChan: make(chan *pkgconfig.Config),
inputChan: make(chan dnsutils.DNSMessage, config.Collectors.DNSMessage.ChannelBufferSize),
logger: logger,
name: name,
// RoutingHandler: pkgutils.NewRoutingHandler(config, logger, name),
dropped: make(chan string),
droppedCount: map[string]int{},
}
s.ReadConfig()
return s
Expand All @@ -63,11 +70,13 @@ func NewDNSMessage(loggers []pkgutils.Worker, config *pkgconfig.Config, logger *
func (c *DNSMessage) GetName() string { return c.name }

func (c *DNSMessage) AddDroppedRoute(wrk pkgutils.Worker) {
c.RoutingHandler.AddDroppedRoute(wrk)
// c.RoutingHandler.AddDroppedRoute(wrk)
c.droppedRoutes = append(c.droppedRoutes, wrk)
}

func (c *DNSMessage) AddDefaultRoute(wrk pkgutils.Worker) {
c.RoutingHandler.AddDefaultRoute(wrk)
// c.RoutingHandler.AddDefaultRoute(wrk)
c.defaultRoutes = append(c.defaultRoutes, wrk)
}

// deprecated function
Expand Down Expand Up @@ -217,26 +226,37 @@ func (c *DNSMessage) LogError(msg string, v ...interface{}) {
}

func (c *DNSMessage) Stop() {
c.LogInfo("stopping routing handler...")
c.RoutingHandler.Stop()
// c.LogInfo("stopping routing handler...")
// c.RoutingHandler.Stop()

// read done channel and block until run is terminated
c.LogInfo("stopping run...")
c.stopRun <- true
<-c.doneRun

c.LogInfo("stopping monitor...")
c.stopMonitor <- true
<-c.doneMonitor
}

func (c *DNSMessage) Run() {
c.LogInfo("starting collector...")
var err error

// prepare next channels
defaultRoutes, defaultNames := c.RoutingHandler.GetDefaultRoutes()
droppedRoutes, droppedNames := c.RoutingHandler.GetDroppedRoutes()
// defaultRoutes, defaultNames := c.RoutingHandler.GetDefaultRoutes()
// droppedRoutes, droppedNames := c.RoutingHandler.GetDroppedRoutes()
defaultRoutes, defaultNames := pkgutils.GetRoutes(c.defaultRoutes)
droppedRoutes, droppedNames := pkgutils.GetRoutes(c.droppedRoutes)

// prepare transforms
subprocessors := transformers.NewTransforms(&c.config.IngoingTransformers, c.logger, c.name, defaultRoutes, 0)

// start goroutine to count dropped messsages
go c.MonitorNextStanzas()

// read incoming dns message
c.LogInfo("waiting dns message to process...")
RUN_LOOP:
for {
select {
Expand Down Expand Up @@ -290,22 +310,77 @@ RUN_LOOP:
if matched {
subprocessors.InitDNSMessageFormat(&dm)
if subprocessors.ProcessMessage(&dm) == transformers.ReturnDrop {
c.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
// c.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
for i := range droppedRoutes {
select {
case droppedRoutes[i] <- dm:
default:
c.dropped <- droppedNames[i]
}
}
continue
}
}

// drop packet ?
if !matched {
c.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
// c.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
for i := range droppedRoutes {
select {
case droppedRoutes[i] <- dm:
default:
c.dropped <- droppedNames[i]
}
}
continue
}

// send to next
c.RoutingHandler.SendTo(defaultRoutes, defaultNames, dm)
// c.RoutingHandler.SendTo(defaultRoutes, defaultNames, dm)
for i := range defaultRoutes {
select {
case defaultRoutes[i] <- dm:
default:
c.dropped <- defaultNames[i]
}
}

}

}
c.LogInfo("run terminated")
}

func (c *DNSMessage) MonitorNextStanzas() {
watchInterval := 10 * time.Second
bufferFull := time.NewTimer(watchInterval)
FOLLOW_LOOP:
for {
select {
case <-c.stopMonitor:
close(c.dropped)
bufferFull.Stop()
c.doneMonitor <- true
break FOLLOW_LOOP

case loggerName := <-c.dropped:
if _, ok := c.droppedCount[loggerName]; !ok {
c.droppedCount[loggerName] = 1
} else {
c.droppedCount[loggerName]++
}

case <-bufferFull.C:

for v, k := range c.droppedCount {
if k > 0 {
c.LogError("stanza[%s] buffer is full, %d dnsmessage(s) dropped", v, k)
c.droppedCount[v] = 0
}
}
bufferFull.Reset(watchInterval)

}
}
c.LogInfo("monitor terminated")
}
80 changes: 80 additions & 0 deletions collectors/dnsmessage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package collectors

import (
"fmt"
"regexp"
"testing"
"time"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/pkgconfig"
"github.com/dmachard/go-dnscollector/pkgutils"
"github.com/dmachard/go-dnscollector/processors"
"github.com/dmachard/go-logger"
)

func Test_DnsMessage_BufferLoggerIsFull(t *testing.T) {
// redirect stdout output to bytes buffer
logsChan := make(chan logger.LogEntry, 10)
lg := logger.New(true)
lg.SetOutputChannel((logsChan))

// init the collector and run-it
config := pkgconfig.GetFakeConfig()
c := NewDNSMessage(nil, config, lg, "test")

// init next logger with a buffer of one element
nxt := pkgutils.NewFakeLoggerWithBufferSize(1)
c.AddDefaultRoute(nxt)

// run collector
go c.Run()

// add a shot of dnsmessages to collector
dmIn := dnsutils.GetFakeDNSMessage()
for i := 0; i < 512; i++ {
c.GetInputChannel() <- dmIn
}

// waiting monitor to run in consumer
time.Sleep(12 * time.Second)

for entry := range logsChan {
fmt.Println(entry)
pattern := regexp.MustCompile(processors.ExpectedBufferMsg511)
if pattern.MatchString(entry.Message) {
break
}
}

// read dnsmessage from next logger
dmOut := <-nxt.GetInputChannel()
if dmOut.DNS.Qname != processors.ExpectedQname2 {
t.Errorf("invalid qname in dns message: %s", dmOut.DNS.Qname)
}

// send second shot of packets to consumer
for i := 0; i < 1024; i++ {
c.GetInputChannel() <- dmIn
}

// waiting monitor to run in consumer
time.Sleep(12 * time.Second)

for entry := range logsChan {
fmt.Println(entry)
pattern := regexp.MustCompile(processors.ExpectedBufferMsg1023)
if pattern.MatchString(entry.Message) {
break
}
}
// read dnsmessage from next logger
dm2 := <-nxt.GetInputChannel()
if dm2.DNS.Qname != processors.ExpectedQname2 {
t.Errorf("invalid qname in dns message: %s", dm2.DNS.Qname)
}

// stop all
c.Stop()
nxt.Stop()
}
Loading

0 comments on commit 8fab14e

Please sign in to comment.