@@ -15,13 +15,14 @@ package evnet
15
15
16
16
import (
17
17
"errors"
18
- "log"
19
18
"math/rand"
20
19
"net"
21
20
"os"
22
21
"runtime"
23
22
"sync"
24
23
24
+ log "github.com/sirupsen/logrus"
25
+
25
26
. "github.com/bruce2233/evnet/socket"
26
27
27
28
"golang.org/x/sys/unix"
@@ -91,7 +92,7 @@ func (mr *MainReactor) Init(proto, addr string) error {
91
92
return err
92
93
}
93
94
mr .poller = p
94
- log .Println ( "debug: init main poller: " , p )
95
+ log .Debug ( " init main poller: " , p )
95
96
96
97
//init subReactor
97
98
for i := 0 ; i < SubReactorsNum ; i ++ {
@@ -111,7 +112,7 @@ func (mr *MainReactor) Init(proto, addr string) error {
111
112
listenerFd , _ , err := TcpSocket (proto , addr , true )
112
113
mr .listener .Fd = listenerFd
113
114
if err != nil {
114
- log .Println ("error: " , err )
115
+ log .Error ("error: " , err )
115
116
}
116
117
117
118
//add listenerFd epoll
@@ -156,22 +157,22 @@ func setEventHandler(mr *MainReactor, eh EventHandler) {
156
157
157
158
func (mainReactor * MainReactor ) Loop () {
158
159
159
- log .Println ( "debug: " , "mainReactor create" , "poller: " , mainReactor .poller , "linstenerFd is " , mainReactor .listener .Fd )
160
+ log .Debug ( "mainReactor create" , "poller: " , mainReactor .poller , "linstenerFd is " , mainReactor .listener .Fd )
160
161
161
162
for {
162
- log .Println ( "debug: " , "main reactor start polling" )
163
+ log .Debug ( "main reactor start polling" )
163
164
eventsList := mainReactor .poller .Polling ()
164
165
//working
165
166
for _ , v := range eventsList {
166
167
nfd , raddr , err := AcceptSocket (int (v .Fd ))
167
- // log.Println("debug: ", nfd, raddr.Network(), raddr.String())
168
+ // log.Debug( nfd, raddr.Network(), raddr.String())
168
169
os .NewSyscallError ("AcceptSocket Err" , err )
169
170
//convert from nfd, tcpAddr to a socket
170
171
// err = mainReactor.subReactors[0].poller.AddPollRead(int(v.Fd))
171
172
idx := rand .Intn (2 )
172
173
laddr := mainReactor .listener .addr
173
174
c , _ := NewConn (nfd , laddr , raddr )
174
- log .Println ( "debug: " , "mainReactor accept connection " , raddr , "to subReactor " , idx )
175
+ log .Debug ( "mainReactor accept connection " , raddr , "to subReactor " , idx )
175
176
sr := mainReactor .subReactors [idx ]
176
177
registerConn (sr , c )
177
178
}
@@ -197,13 +198,13 @@ func (mr *MainReactor) stop() {
197
198
198
199
func (sr * SubReactor ) Loop () error {
199
200
for {
200
- log .Println ( "debug: " , "SubReactor start polling" , sr .poller .Fd )
201
+ log .Debug ( "SubReactor start polling" , sr .poller .Fd )
201
202
eventList := sr .poller .Polling ()
202
203
for _ , event := range eventList {
203
- log .Println ( "debug: " , "events: " , event .Events )
204
+ log .Debug ( "events: " , event .Events )
204
205
if event .Events & OutEvents != 0 {
205
206
c := sr .connections [int (event .Fd )]
206
- log .Println ( "debug: " , "subReactor OutEvents from: " , c .RemoteAddr ())
207
+ log .Debug ( "subReactor OutEvents from: " , c .RemoteAddr ())
207
208
sr .write (c )
208
209
}
209
210
@@ -243,7 +244,7 @@ func polling(epfd int) []unix.EpollEvent {
243
244
var err error
244
245
for {
245
246
n , err = unix .EpollWait (epfd , eventsList , - 1 )
246
- log .Println ( "debug: " , "epoll trigger" )
247
+ log .Debug ( "epoll trigger" )
247
248
// if return EINTR, EpollWait again
248
249
// debugging will trigger unix.EINTR error
249
250
if n < 0 && err == unix .EINTR {
@@ -260,14 +261,14 @@ func polling(epfd int) []unix.EpollEvent {
260
261
261
262
func (sr * SubReactor ) read (c * conn ) error {
262
263
n , err := unix .Read (c .Fd (), sr .buffer )
263
- log .Println ( "debug: " , "sr read n:" , n )
264
+ log .Debug ( "sr read n:" , n )
264
265
// log.Println("sr.buffer len():", len(sr.buffer))
265
266
if n == 0 {
266
267
sr .closeConn (c )
267
268
}
268
269
if err != nil {
269
270
if err == unix .EAGAIN {
270
- log .Println ( "err: " , unix .EAGAIN )
271
+ log .Warn ( unix .EAGAIN )
271
272
return unix .EAGAIN
272
273
}
273
274
return unix .ECONNRESET
@@ -276,13 +277,13 @@ func (sr *SubReactor) read(c *conn) error {
276
277
err = (* * sr .eventHandlerPP ).OnTraffic (c )
277
278
if err != nil {
278
279
if err == ErrClose {
279
- log .Println ( "debug: ErrClose" )
280
+ log .Info ( ErrClose )
280
281
sr .closeConn (c )
281
282
}
282
- if err .Error () == "Shutdown" {
283
- log .Println ("debug: ErrShutdonw" )
284
- return err
283
+ if err == ErrShutdown {
284
+ log .Info (ErrShutdown )
285
285
}
286
+ return err
286
287
}
287
288
return nil
288
289
}
@@ -294,8 +295,13 @@ func (sr *SubReactor) write(c *conn) error {
294
295
295
296
n , err := unix .Write (c .Fd (), c .outboundBuffer )
296
297
if err != nil {
297
- log .Println ("error: " , "subReactor Write error" )
298
+ log .Warn ("subReactor Write error" )
299
+ if n == - 1 {
300
+ log .Warn ("subReactor try to write a closed conn" )
301
+ }
302
+ sr .closeConn (c )
298
303
}
304
+
299
305
if n == buffedLen {
300
306
//the c.outbound data belongs to the peek of queue
301
307
cur := c .asyncTaskQueue .Dequeue ()
@@ -360,7 +366,7 @@ func AcceptSocket(fd int) (int, net.Addr, error) {
360
366
case * unix.SockaddrInet4 :
361
367
sa4 , ok := sa .(* unix.SockaddrInet4 )
362
368
if ! ok {
363
- log .Println ("sa4 asset error" )
369
+ log .Error ("sa4 assertion error" )
364
370
}
365
371
return nfd , & net.TCPAddr {IP : sa4 .Addr [:], Port : sa4 .Port }, err
366
372
}
0 commit comments