diff --git a/conn.go b/conn.go index f4180a4d..df943290 100644 --- a/conn.go +++ b/conn.go @@ -65,9 +65,9 @@ type Conn struct { delegate ConnDelegate - logger logger + logger []logger logLvl LogLevel - logFmt string + logFmt []string logGuard sync.RWMutex r io.Reader @@ -103,6 +103,9 @@ func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn { msgResponseChan: make(chan *msgResponse), exitChan: make(chan int), drainReady: make(chan int), + + logger: make([]logger, LogLevelMax+1), + logFmt: make([]string, LogLevelMax+1), } } @@ -122,19 +125,44 @@ func (c *Conn) SetLogger(l logger, lvl LogLevel, format string) { c.logGuard.Lock() defer c.logGuard.Unlock() - c.logger = l - c.logLvl = lvl - c.logFmt = format - if c.logFmt == "" { - c.logFmt = "(%s)" + for level := range c.logger { + c.logger[level] = l + c.logFmt[level] = format + if c.logFmt[level] == "" { + c.logFmt[level] = "(%s)" + } } + c.logLvl = lvl +} + +func (c *Conn) SetLoggerForLevel(l logger, lvl LogLevel, format string) { + c.logGuard.Lock() + defer c.logGuard.Unlock() + + c.logger[lvl] = l + c.logFmt[lvl] = format +} + +// SetLoggerLevel sets the package logging level. +func (c *Conn) SetLoggerLevel(lvl LogLevel) { + c.logGuard.Lock() + defer c.logGuard.Unlock() + + c.logLvl = lvl +} + +func (c *Conn) getLogger(lvl LogLevel) (logger, LogLevel, string) { + c.logGuard.RLock() + defer c.logGuard.RUnlock() + + return c.logger[lvl], c.logLvl, c.logFmt[lvl] } -func (c *Conn) getLogger() (logger, LogLevel, string) { +func (c *Conn) getLogLevel() LogLevel { c.logGuard.RLock() defer c.logGuard.RUnlock() - return c.logger, c.logLvl, c.logFmt + return c.logLvl } // Connect dials and bootstraps the nsqd connection @@ -718,7 +746,7 @@ func (c *Conn) onMessageTouch(m *Message) { } func (c *Conn) log(lvl LogLevel, line string, args ...interface{}) { - logger, logLvl, logFmt := c.getLogger() + logger, logLvl, logFmt := c.getLogger(lvl) if logger == nil { return diff --git a/consumer.go b/consumer.go index e5438184..eae55d67 100644 --- a/consumer.go +++ b/consumer.go @@ -95,7 +95,7 @@ type Consumer struct { mtx sync.RWMutex - logger logger + logger []logger logLvl LogLevel logGuard sync.RWMutex @@ -166,7 +166,7 @@ func NewConsumer(topic string, channel string, config *Config) (*Consumer, error channel: channel, config: *config, - logger: log.New(os.Stderr, "", log.Flags()), + logger: make([]logger, LogLevelMax+1), logLvl: LogLevelInfo, maxInFlight: int32(config.MaxInFlight), @@ -183,6 +183,13 @@ func NewConsumer(topic string, channel string, config *Config) (*Consumer, error StopChan: make(chan int), exitChan: make(chan int), } + + // Set default logger for all log levels + l := log.New(os.Stderr, "", log.Flags()) + for index := range r.logger { + r.logger[index] = l + } + r.wg.Add(1) go r.rdyLoop() return r, nil @@ -219,10 +226,21 @@ func (r *Consumer) SetLogger(l logger, lvl LogLevel) { r.logGuard.Lock() defer r.logGuard.Unlock() - r.logger = l + for level := range r.logger { + r.logger[level] = l + } r.logLvl = lvl } +// SetLoggerForLevel assigns the same logger for specified `level`. +func (r *Consumer) SetLoggerForLevel(l logger, lvl LogLevel) { + r.logGuard.Lock() + defer r.logGuard.Unlock() + + r.logger[lvl] = l +} + +// SetLoggerLevel sets the package logging level. func (r *Consumer) SetLoggerLevel(lvl LogLevel) { r.logGuard.Lock() defer r.logGuard.Unlock() @@ -230,11 +248,18 @@ func (r *Consumer) SetLoggerLevel(lvl LogLevel) { r.logLvl = lvl } -func (r *Consumer) getLogger() (logger, LogLevel) { +func (r *Consumer) getLogger(lvl LogLevel) (logger, LogLevel) { r.logGuard.RLock() defer r.logGuard.RUnlock() - return r.logger, r.logLvl + return r.logger[lvl], r.logLvl +} + +func (r *Consumer) getLogLevel() LogLevel { + r.logGuard.RLock() + defer r.logGuard.RUnlock() + + return r.logLvl } // SetBehaviorDelegate takes a type implementing one or more @@ -530,12 +555,12 @@ func (r *Consumer) ConnectToNSQD(addr string) error { atomic.StoreInt32(&r.connectedFlag, 1) - logger, logLvl := r.getLogger() - conn := NewConn(addr, &r.config, &consumerConnDelegate{r}) - conn.SetLogger(logger, logLvl, - fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel)) - + conn.SetLoggerLevel(r.getLogLevel()) + for index := range r.logger { + conn.SetLoggerForLevel(r.logger[index], LogLevel(index), + fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel)) + } r.mtx.Lock() _, pendingOk := r.pendingConnections[addr] _, ok := r.connections[addr] @@ -1156,7 +1181,7 @@ func (r *Consumer) exit() { } func (r *Consumer) log(lvl LogLevel, line string, args ...interface{}) { - logger, logLvl := r.getLogger() + logger, logLvl := r.getLogger(lvl) if logger == nil { return diff --git a/delegates.go b/delegates.go index 2ccaec77..aca72529 100644 --- a/delegates.go +++ b/delegates.go @@ -15,6 +15,7 @@ const ( LogLevelInfo LogLevelWarning LogLevelError + LogLevelMax = iota - 1 // convenience - match highest log level ) // String returns the string form for a given LogLevel diff --git a/producer.go b/producer.go index ab8232ef..c4995dd9 100644 --- a/producer.go +++ b/producer.go @@ -12,6 +12,8 @@ import ( type producerConn interface { String() string SetLogger(logger, LogLevel, string) + SetLoggerLevel(LogLevel) + SetLoggerForLevel(logger, LogLevel, string) Connect() (*IdentifyResponse, error) Close() error WriteCommand(*Command) error @@ -28,7 +30,7 @@ type Producer struct { conn producerConn config Config - logger logger + logger []logger logLvl LogLevel logGuard sync.RWMutex @@ -80,7 +82,7 @@ func NewProducer(addr string, config *Config) (*Producer, error) { addr: addr, config: *config, - logger: log.New(os.Stderr, "", log.Flags()), + logger: make([]logger, int(LogLevelMax+1)), logLvl: LogLevelInfo, transactionChan: make(chan *ProducerTransaction), @@ -88,6 +90,12 @@ func NewProducer(addr string, config *Config) (*Producer, error) { responseChan: make(chan []byte), errorChan: make(chan []byte), } + + // Set default logger for all log levels + l := log.New(os.Stderr, "", log.Flags()) + for index, _ := range p.logger { + p.logger[index] = l + } return p, nil } @@ -119,15 +127,40 @@ func (w *Producer) SetLogger(l logger, lvl LogLevel) { w.logGuard.Lock() defer w.logGuard.Unlock() - w.logger = l + for level := range w.logger { + w.logger[level] = l + } w.logLvl = lvl } -func (w *Producer) getLogger() (logger, LogLevel) { +// SetLoggerForLevel assigns the same logger for specified `level`. +func (w *Producer) SetLoggerForLevel(l logger, lvl LogLevel) { + w.logGuard.Lock() + defer w.logGuard.Unlock() + + w.logger[lvl] = l +} + +// SetLoggerLevel sets the package logging level. +func (w *Producer) SetLoggerLevel(lvl LogLevel) { + w.logGuard.Lock() + defer w.logGuard.Unlock() + + w.logLvl = lvl +} + +func (w *Producer) getLogger(lvl LogLevel) (logger, LogLevel) { + w.logGuard.RLock() + defer w.logGuard.RUnlock() + + return w.logger[lvl], w.logLvl +} + +func (w *Producer) getLogLevel() LogLevel { w.logGuard.RLock() defer w.logGuard.RUnlock() - return w.logger, w.logLvl + return w.logLvl } // String returns the address of the Producer @@ -273,10 +306,11 @@ func (w *Producer) connect() error { w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr) - logger, logLvl := w.getLogger() - w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w}) - w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id)) + w.conn.SetLoggerLevel(w.getLogLevel()) + for index := range w.logger { + w.conn.SetLoggerForLevel(w.logger[index], LogLevel(index), fmt.Sprintf("%3d (%%s)", w.id)) + } _, err := w.conn.Connect() if err != nil { @@ -369,7 +403,7 @@ func (w *Producer) transactionCleanup() { } func (w *Producer) log(lvl LogLevel, line string, args ...interface{}) { - logger, logLvl := w.getLogger() + logger, logLvl := w.getLogger(lvl) if logger == nil { return diff --git a/producer_test.go b/producer_test.go index 9be29006..2b6f89ba 100755 --- a/producer_test.go +++ b/producer_test.go @@ -305,6 +305,10 @@ func (m *mockProducerConn) String() string { func (m *mockProducerConn) SetLogger(logger logger, level LogLevel, prefix string) {} +func (m *mockProducerConn) SetLoggerLevel(lvl LogLevel) {} + +func (m *mockProducerConn) SetLoggerForLevel(logger logger, level LogLevel, format string) {} + func (m *mockProducerConn) Connect() (*IdentifyResponse, error) { return &IdentifyResponse{}, nil }