-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlogger.go
300 lines (263 loc) · 7.97 KB
/
logger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
// Package logger provides a hook for Logrus to send log entries to VictoriaLogs.
// To use this package, initialize it with the desired configurations, and then use
// the provided global logger instance for logging.
//
// Example:
//
// logger.Init("http://my.victoria.logs", "production", "my-service", 2*time.Second, 100, 3, 1*time.Second)
// defer logger.Close() // Call this before exiting to ensure all logs are sent
// logger.Log.Info("This is an example log")
package logger
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/segmentio/encoding/json"
retry_v4 "github.com/avast/retry-go/v4"
"github.com/sirupsen/logrus"
)
// VictoriaLogsHook is a Logrus hook designed to forward log entries to VictoriaLogs.
type victoriaLogsHook struct {
URL string // The URL endpoint for VictoriaLogs
streamFields string
batchSize int // Number of logs to batch before sending
maxRetries int // Max number of retries for sending logs
retryDelay time.Duration // Delay between retries
client *http.Client // HTTP client
}
var (
flushSignalCh chan bool
once sync.Once
stopCh chan bool // Channel to signal stop
ticker *time.Ticker // Ticker for flush intervals
wg sync.WaitGroup // WaitGroup to ensure all logs are sent
ch chan *logrus.Entry // Channel for log entries
closed int32
)
func init() {
baseLogger := logrus.New()
Log = baseLogger.WithContext(context.Background())
}
// Init initializes the global logger with the VictoriaLogsHook.
//
// url: Endpoint for VictoriaLogs without path.
// env: Deployment environment (e.g. "production").
// service: Name of the service/application.
// flushPeriod: Duration to periodically flush logs.
// batchSize: Number of logs to batch before sending.
// maxRetries: Max number of retries for sending logs.
// retryDelay: Delay between retries.
func Init(ctx context.Context, url string, flushInterval time.Duration, batchSize, maxRetries int, retryDelay time.Duration, streams map[string]interface{}) error {
var initErr error
once.Do(func() { // Ensure initialization only happens once
baseLogger := logrus.New()
if ctx == nil {
ctx = context.Background()
}
Log = baseLogger.WithContext(ctx)
if url == "" {
initErr = errors.New("url cannot be empty")
}
if flushInterval <= 0 {
initErr = errors.New("flushInterval must be greater than 0")
}
if batchSize <= 0 {
initErr = errors.New("batchSize must be greater than 0")
}
if maxRetries < 0 {
initErr = errors.New("maxRetries cannot be negative")
}
if retryDelay <= 0 {
initErr = errors.New("retryDelay must be greater than 0")
}
if len(streams) == 0 {
initErr = errors.New("streams map cannot be empty")
}
Log = baseLogger.WithFields(streams)
// Perform a health check to ensure VictoriaLogs is operational
if err := healthCheck(url); err != nil {
initErr = fmt.Errorf("health check failed: %w", err)
}
client := &http.Client{
Timeout: 5 * time.Second,
}
// use string builder to build the stream fields
streamsFieldsBuilder := strings.Builder{}
for key, _ := range streams {
streamsFieldsBuilder.WriteString(key)
streamsFieldsBuilder.WriteString(",")
}
// remove the last comma
streamsFields := strings.TrimSuffix(streamsFieldsBuilder.String(), ",")
vlHook := &victoriaLogsHook{
URL: url + "/insert/jsonline",
streamFields: streamsFields,
batchSize: batchSize,
maxRetries: maxRetries,
retryDelay: retryDelay,
client: client,
}
baseLogger.AddHook(vlHook)
flushSignalCh = make(chan bool)
stopCh = make(chan bool)
ticker = time.NewTicker(flushInterval)
ch = make(chan *logrus.Entry, batchSize)
baseLogger.WithContext(ctx).WithFields(streams)
wg = sync.WaitGroup{}
wg.Add(1)
go vlHook.flusher()
})
return initErr // Return nil on successful initialization
}
// Levels returns all log levels to ensure the hook is used for all log levels.
func (hook *victoriaLogsHook) Levels() []logrus.Level {
return logrus.AllLevels
}
// Fire queues the log entry for batching.
func (hook *victoriaLogsHook) Fire(entry *logrus.Entry) error {
if atomic.LoadInt32(&closed) == 1 {
logrus.Warn("Attempted to log after logger has been closed. Log entry dropped.")
return nil
}
select {
case ch <- entry:
// Successfully sent the entry to the channel
default:
// Channel is full, signal the flusher to flush
flushSignalCh <- true
ch <- entry // You might want to handle the case where this blocks too
}
return nil
}
// flusher is a background goroutine that batches log entries and flushes them to VictoriaLogs.
func (hook *victoriaLogsHook) flusher() {
defer wg.Done()
batch := make([]*logrus.Entry, 0, hook.batchSize)
for {
select {
case entry, ok := <-ch:
if !ok {
hook.flush(batch)
return
}
batch = append(batch, entry)
if len(batch) >= hook.batchSize {
hook.flush(batch)
batch = batch[:0]
}
case <-ticker.C:
hook.flush(batch)
batch = batch[:0]
case <-flushSignalCh:
hook.flush(batch)
batch = batch[:0]
case <-stopCh:
hook.flush(batch)
return
}
}
}
// flush sends the batched log entries to VictoriaLogs.
func (hook *victoriaLogsHook) flush(batch []*logrus.Entry) {
if len(batch) == 0 {
return
}
var buffer bytes.Buffer
for _, entry := range batch {
// Get a logData map from the pool
logData := make(map[string]interface{}, len(entry.Data)+3)
logData["_msg"] = entry.Message
logData["_time"] = entry.Time.Format(time.RFC3339Nano)
logData["level"] = entry.Level.String()
for k, v := range entry.Data {
logData[k] = v
}
jsonEntry, err := json.Marshal(logData)
if err != nil {
logrus.Errorf("Error converting log entry to string: %v", err)
continue
}
buffer.Write(jsonEntry)
buffer.Write([]byte("\n"))
}
err := retry_v4.Do(
func() error {
req, err := http.NewRequest(
"POST",
hook.URL+fmt.Sprintf("?_stream_fields=%s", hook.streamFields),
&buffer,
)
if err != nil {
logrus.Error("Failed to create request:", err)
return err
}
req.Header.Set("Content-Type", "application/stream+json")
resp, err := hook.client.Do(req)
if err != nil {
logrus.Errorf("Failed to send logs: %v", err)
return err
}
defer resp.Body.Close() // Always close the response body
if resp.StatusCode != http.StatusOK {
errMsg := fmt.Sprintf("Failed to send logs with status: %s", resp.Status)
logrus.Error(errMsg)
return errors.New(errMsg) // Return this error instead of the previous one
}
return nil
},
retry_v4.Delay(hook.retryDelay),
retry_v4.Attempts(uint(hook.maxRetries)),
retry_v4.Delay(hook.retryDelay*time.Second),
retry_v4.MaxDelay(5*time.Second*hook.retryDelay),
retry_v4.MaxJitter(1*time.Second),
)
buffer.Reset()
if err != nil {
logrus.Errorln("Failed to send logs after retries:", err)
}
}
// Close should be called to gracefully shut down the logger.
// It ensures that all logs are sent before exiting.
func Close() {
atomic.StoreInt32(&closed, 1)
ticker.Stop()
close(stopCh)
close(ch)
wg.Wait()
}
// Log is the global logger instance.
var Log *logrus.Entry
func healthCheck(url string) error {
resp, err := http.Get(url + "/metrics")
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return errors.New("VictoriaLogs is not operational")
}
return nil
}
// Define a private type to prevent context key collisions
type logContextKey struct{}
// logKey is the key for storing the logger in the context
var logKey = logContextKey{}
// WithLog embeds the logg into the context
func WithLog(ctx context.Context, logger *logrus.Entry) context.Context {
return context.WithValue(ctx, logKey, logger)
}
// LogFromContext retrieves the log from the context
func LogFromContext(ctx context.Context) *logrus.Entry {
logger, ok := ctx.Value(logKey).(*logrus.Entry)
if !ok {
// Return a default logger if none is found in the context
return Log
}
return logger
}