Skip to content

Commit

Permalink
kafka, redis, fluentd and tcpclient loggers: no flush occurred on spe…
Browse files Browse the repository at this point in the history
…cific conditions for on connection attempts (#552)

* Fix flush buffer properly
* Add test conn attempt
* Update README.md
  • Loading branch information
dmachard authored Jan 20, 2024
1 parent 4b76235 commit d8ef081
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<p align="center">
<img src="https://goreportcard.com/badge/github.com/dmachard/go-dns-collector" alt="Go Report"/>
<img src="https://img.shields.io/badge/go%20version-min%201.20-green" alt="Go version"/>
<img src="https://img.shields.io/badge/go%20tests-383-green" alt="Go tests"/>
<img src="https://img.shields.io/badge/go%20tests-384-green" alt="Go tests"/>
<img src="https://img.shields.io/badge/go%20lines-37290-green" alt="Go lines"/>
</p>

Expand Down
1 change: 0 additions & 1 deletion loggers/fluentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ PROCESS_LOOP:
case <-flushTimer.C:
if !fc.writerReady {
bufferDm = nil
continue
}

if len(bufferDm) > 0 {
Expand Down
2 changes: 0 additions & 2 deletions loggers/kafkaproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,7 @@ PROCESS_LOOP:
// flush the buffer
case <-flushTimer.C:
if !k.kafkaConnected {
k.LogInfo("buffer cleared!")
bufferDm = nil
continue
}

if len(bufferDm) > 0 {
Expand Down
2 changes: 0 additions & 2 deletions loggers/redispub.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,7 @@ PROCESS_LOOP:
// flush the buffer
case <-flushTimer.C:
if !c.writerReady {
c.LogInfo("Buffer cleared!")
bufferDm = nil
continue
}

if len(bufferDm) > 0 {
Expand Down
2 changes: 0 additions & 2 deletions loggers/tcpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,9 +378,7 @@ PROCESS_LOOP:
// flush the buffer
case <-flushTimer.C:
if !c.writerReady {
c.LogInfo("buffer cleared!")
bufferDm = nil
continue
}

if len(bufferDm) > 0 {
Expand Down
58 changes: 58 additions & 0 deletions loggers/tcpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,61 @@ func Test_TcpClientRun(t *testing.T) {
})
}
}

func Test_TcpClient_ConnectionAttempt(t *testing.T) {
// init logger
cfg := pkgconfig.GetFakeConfig()
cfg.Loggers.TCPClient.FlushInterval = 1
cfg.Loggers.TCPClient.Mode = pkgconfig.ModeText
cfg.Loggers.TCPClient.RemoteAddress = "127.0.0.1"
cfg.Loggers.TCPClient.RemotePort = 9999
cfg.Loggers.TCPClient.ConnectTimeout = 1
cfg.Loggers.TCPClient.RetryInterval = 2

g := NewTCPClient(cfg, logger.New(true), "test")

// start the logger
go g.Run()

// just way to get connect attempt
time.Sleep(time.Second * 3)

// start receiver
fakeRcvr, err := net.Listen(netlib.SocketTCP, ":9999")
if err != nil {
t.Fatal(err)
}
defer fakeRcvr.Close()

// accept conn from logger
conn, err := fakeRcvr.Accept()
if err != nil {
return
}
defer conn.Close()

// wait connection on logger
time.Sleep(time.Second)

// send fake dns message to logger
dm := dnsutils.GetFakeDNSMessage()
g.GetInputChannel() <- dm

// read data on server side and decode-it
reader := bufio.NewReader(conn)
line, err := reader.ReadString('\n')
if err != nil {
t.Error(err)
return
}

pattern := regexp.MustCompile("dns.collector")
if !pattern.MatchString(line) {
t.Errorf("tcp error want dns.collector, got: %s", line)
}

// stop all
fakeRcvr.Close()
g.Stop()

}

0 comments on commit d8ef081

Please sign in to comment.