Skip to content

Commit

Permalink
messager: add consistent log prefix w/ table name (#15973)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Perkins <[email protected]>
  • Loading branch information
derekperkins authored May 22, 2024
1 parent d20cb57 commit baff809
Showing 1 changed file with 19 additions and 19 deletions.
38 changes: 19 additions & 19 deletions go/vt/vttablet/tabletserver/messager/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,37 +366,37 @@ func (mm *messageManager) Open() {

// Close stops the messageManager service.
func (mm *messageManager) Close() {
log.Infof("messageManager - started execution of Close")
log.Infof("messageManager (%v) - started execution of Close", mm.name)
mm.pollerTicks.Stop()
mm.purgeTicks.Stop()
log.Infof("messageManager - stopped the ticks. Acquiring mu Lock")
log.Infof("messageManager (%v) - stopped the ticks. Acquiring mu Lock", mm.name)

mm.mu.Lock()
log.Infof("messageManager - acquired mu Lock")
log.Infof("messageManager (%v) - acquired mu Lock", mm.name)
if !mm.isOpen {
log.Infof("messageManager - manager is not open")
log.Infof("messageManager (%v) - manager is not open", mm.name)
mm.mu.Unlock()
return
}
mm.isOpen = false
log.Infof("messageManager - cancelling all receivers")
log.Infof("messageManager (%v) - cancelling all receivers", mm.name)
for _, rcvr := range mm.receivers {
rcvr.receiver.cancel()
}
mm.receivers = nil
MessageStats.Set([]string{mm.name.String(), "ClientCount"}, 0)
log.Infof("messageManager - clearing cache")
log.Infof("messageManager (%v) - clearing cache", mm.name)
mm.cache.Clear()
log.Infof("messageManager - sending a broadcast")
log.Infof("messageManager (%v) - sending a broadcast", mm.name)
// This broadcast will cause runSend to exit.
mm.cond.Broadcast()
log.Infof("messageManager - stopping VStream")
log.Infof("messageManager (%v) - stopping VStream", mm.name)
mm.stopVStream()
mm.mu.Unlock()

log.Infof("messageManager - Waiting for the wait group")
log.Infof("messageManager (%v) - Waiting for the wait group", mm.name)
mm.wg.Wait()
log.Infof("messageManager - closed")
log.Infof("messageManager (%v) - closed", mm.name)
}

// Subscribe registers the send function as a receiver of messages
Expand All @@ -414,7 +414,7 @@ func (mm *messageManager) Subscribe(ctx context.Context, send func(*sqltypes.Res
}

if err := receiver.Send(mm.fieldResult); err != nil {
log.Errorf("Terminating connection due to error sending field info: %v", err)
log.Errorf("messageManager (%v) - Terminating connection due to error sending field info: %v", mm.name, err)
receiver.cancel()
return done
}
Expand Down Expand Up @@ -578,7 +578,7 @@ func (mm *messageManager) runSend() {
go func() {
err := mm.send(context.Background(), receiver, &sqltypes.Result{Rows: rows}) // calls the offsetting mm.wg.Done()
if err != nil {
log.Errorf("messageManager - send failed: %v", err)
log.Errorf("messageManager (%v) - send failed: %v", mm.name, err)
}
}()
}
Expand Down Expand Up @@ -621,7 +621,7 @@ func (mm *messageManager) send(ctx context.Context, receiver *receiverWithStatus
// Log the error, but we still want to postpone the message.
// Otherwise, if this is a chronic failure like "message too
// big", we'll end up spamming non-stop.
log.Errorf("Error sending messages: %v: %v", qr, err)
log.Errorf("messageManager (%v) - Error sending messages: %v: %v", mm.name, qr, err)
}
return mm.postpone(ctx, mm.tsv, mm.ackWaitTime, ids)
}
Expand Down Expand Up @@ -652,7 +652,7 @@ func (mm *messageManager) startVStream() {
}

func (mm *messageManager) stopVStream() {
log.Infof("messageManager - calling stream cancel")
log.Infof("messageManager (%v) - calling stream cancel", mm.name)
if mm.streamCancel != nil {
mm.streamCancel()
mm.streamCancel = nil
Expand All @@ -664,12 +664,12 @@ func (mm *messageManager) runVStream(ctx context.Context) {
err := mm.runOneVStream(ctx)
select {
case <-ctx.Done():
log.Info("Context canceled, exiting vstream")
log.Info("messageManager (%v) - Context canceled, exiting vstream", mm.name)
return
default:
}
MessageStats.Add([]string{mm.name.String(), "VStreamFailed"}, 1)
log.Infof("VStream ended: %v, retrying in 5 seconds", err)
log.Infof("messageManager (%v) - VStream ended: %v, retrying in 5 seconds", mm.name, err)
time.Sleep(5 * time.Second)
}
}
Expand Down Expand Up @@ -815,7 +815,7 @@ func (mm *messageManager) runPoller() {
mr, err := BuildMessageRow(row)
if err != nil {
mm.tsv.Stats().InternalErrors.Add("Messages", 1)
log.Errorf("Error reading message row: %v", err)
log.Errorf("messageManager (%v) - Error reading message row: %v", mm.name, err)
continue
}
if !mm.cache.Add(mr) {
Expand All @@ -836,7 +836,7 @@ func (mm *messageManager) runPurge() {
count, err := mm.tsv.PurgeMessages(ctx, nil, mm, time.Now().Add(-mm.purgeAfter).UnixNano())
if err != nil {
MessageStats.Add([]string{mm.name.String(), "PurgeFailed"}, 1)
log.Errorf("Unable to delete messages: %v", err)
log.Errorf("messageManager (%v) - Unable to delete messages: %v", mm.name, err)
} else {
MessageStats.Add([]string{mm.name.String(), "Purged"}, count)
}
Expand Down Expand Up @@ -939,7 +939,7 @@ func (mm *messageManager) readPending(ctx context.Context, bindVars map[string]*
query, err := mm.readByPriorityAndTimeNext.GenerateQuery(bindVars, nil)
if err != nil {
mm.tsv.Stats().InternalErrors.Add("Messages", 1)
log.Errorf("Error reading rows from message table: %v", err)
log.Errorf("messageManager (%v) - Error reading rows from message table: %v", mm.name, err)
return nil, err
}
qr := &sqltypes.Result{}
Expand Down

0 comments on commit baff809

Please sign in to comment.