-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy patherrorkernel.go
325 lines (275 loc) · 9.06 KB
/
errorkernel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
// The error kernel shall handle errors for a given process.
// These are cases where the process itself was unable
// to handle the error on its own, and we might need to
// restart the process, or send a message back to the operator
// that the action which the message was supposed to trigger
// failed, or that an event could not be processed.
package ctrl
import (
"context"
"fmt"
"log"
"os"
"time"
"golang.org/x/exp/slog"
)
// errorKernel is the structure that will hold all the error
// handling values and logic: the channel processes report errors on,
// the kernel's own cancellable context, and the metrics and
// configuration used when forwarding and logging those errors.
type errorKernel struct {
	// NOTE: The errorKernel should probably have a concept
	// of error-state which is a map of all the processes,
	// how many times a process have failed over the same
	// message etc...

	// errorCh is used to report errors from a process. start reads
	// from it; errSend and infoSend write to it.
	errorCh chan errorEvent
	// testCh is used within REQTest for receiving data for tests.
	testCh chan []byte
	// ctx is the kernel's own context; start returns once it is done.
	ctx context.Context
	// cancel cancels ctx; called by stop.
	cancel context.CancelFunc
	// metrics holds the counters incremented for every forwarded
	// error/info message.
	metrics *metrics
	// configuration supplies LogLevel and LogConsoleTimestamps for the
	// slog setup done in start.
	configuration *Configuration
}
// newErrorKernel creates an error kernel whose context is a cancellable
// child of ctx, so the kernel stops either when the parent is cancelled
// or when stop is called. The error channel is buffered with room for
// two events; the test channel is unbuffered.
func newErrorKernel(ctx context.Context, m *metrics, configuration *Configuration) *errorKernel {
	kernelCtx, cancelKernel := context.WithCancel(ctx)

	ek := errorKernel{
		ctx:           kernelCtx,
		cancel:        cancelKernel,
		metrics:       m,
		configuration: configuration,
	}
	ek.errorCh = make(chan errorEvent, 2)
	ek.testCh = make(chan []byte)

	return &ek
}
// logLevel names a logging severity. The string values are the same
// strings accepted by Configuration.LogLevel.
type logLevel string

// Supported log levels.
const (
	logError   logLevel = "error"
	logInfo    logLevel = "info"
	logWarning logLevel = "warning"
	logDebug   logLevel = "debug"
	logNone    logLevel = "none"
)
// start configures the default slog logger from e.configuration and then
// runs the error kernel loop. The loop receives errorEvents reported by
// processes on e.errorCh and handles each one in its own goroutine,
// according to its errorType.
//
// For errTypeSendError and errTypeSendInfo events, a Message addressed to
// "errorCentral" is built and put on ringBufferBulkInCh.
//
// start blocks until e.ctx is done (see stop), then returns the error
// "info: stopping errorKernel". An unrecognised Configuration.LogLevel
// terminates the program with exit status 1.
//
// NOTE: Since a process will be locked while waiting to send the error
// on the errorCh maybe it makes sense to have a channel inside the
// processes error handling with a select so we can send back to the
// process if it should continue or not based on how severe the error
// was.
func (e *errorKernel) start(ringBufferBulkInCh chan<- Message) error {
	e.configureDefaultLogger()

	// sendErrorOrInfo forwards errEvent to errorCentral through the ring
	// buffer and mirrors it to the local log at the event's level. It
	// reports false when the kernel was stopped before the message could
	// be handed to the ring buffer.
	//
	// Built once, outside the loop, since it only captures loop-invariant
	// values.
	sendErrorOrInfo := func(errEvent errorEvent) bool {
		line := fmt.Sprintf("%v, node: %v, %v", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.err)

		m := Message{
			Directory:  "errorLog",
			ToNode:     "errorCentral",
			FromNode:   errEvent.process.node,
			FileName:   "error.log",
			Data:       []byte(line + "\n"),
			Method:     ErrorLog,
			ACKTimeout: errEvent.process.configuration.ErrorMessageTimeout,
			Retries:    errEvent.process.configuration.ErrorMessageRetries,
		}

		// Put the message on the channel to the ringbuffer, but don't
		// block forever (leaking this goroutine) once the kernel stops.
		sent := true
		select {
		case ringBufferBulkInCh <- m:
		case <-e.ctx.Done():
			sent = false
		}

		// Pass the text as the log message itself: handing a lone error
		// value to slog's key/value args yields a "!BADKEY" attribute
		// instead of a readable message.
		switch errEvent.logLevel {
		case logError:
			slog.Error(line)
		case logInfo:
			slog.Info(line)
		case logWarning:
			slog.Warn(line)
		case logDebug:
			slog.Debug(line)
		case logNone:
			// Do nothing for type logNone errors.
		}

		return sent
	}

	for {
		var errEvent errorEvent
		select {
		case errEvent = <-e.errorCh:
		case <-e.ctx.Done():
			return fmt.Errorf("info: stopping errorKernel")
		}

		// We should be able to handle each error individually and also
		// concurrently, so each handler is started in its own goroutine.
		// errEvent is declared inside the loop body, so every goroutine
		// gets its own copy.
		//
		// Here we should check the severity of the error, and also
		// possibly the error-state of the process that fails.
		switch errEvent.errorType {
		case errTypeSendError:
			// Log the error by creating a message and sending it to the
			// errorCentral log server.
			go func() {
				if sendErrorOrInfo(errEvent) {
					e.metrics.promErrorMessagesSentTotal.Inc()
				}
			}()
		case errTypeSendInfo:
			// Log the info by creating a message and sending it to the
			// errorCentral log server.
			go func() {
				if sendErrorOrInfo(errEvent) {
					e.metrics.promInfoMessagesSentTotal.Inc()
				}
			}()
		case errTypeWithAction:
			// Tell the process to continue. The process that sent the
			// error blocks until it receives an errorAction.
			go func() {
				log.Printf("TESTING, we received and error from the process, but we're telling the process back to continue\n")
				select {
				case errEvent.errorActionCh <- errActionContinue:
				case <-e.ctx.Done():
					log.Printf("info: errorKernel: got ctx.Done, will stop waiting for errAction\n")
					return
				}
				// We also want to log the error.
				e.errSend(errEvent.process, errEvent.message, errEvent.err, logWarning)
			}()
		default:
		}
	}
}

// configureDefaultLogger installs a slog text handler writing to stderr
// as the default logger. The handler's level comes from
// e.configuration.LogLevel, and timestamps are stripped unless
// e.configuration.LogConsoleTimestamps is set. An unknown level
// terminates the program with exit status 1.
func (e *errorKernel) configureDefaultLogger() {
	// levelNone is far above every level used in this package (the
	// highest is slog.LevelError), so a handler using it drops every
	// record. This makes "none" actually silence logging.
	const levelNone = slog.Level(1 << 20)

	var replaceFunc func(groups []string, a slog.Attr) slog.Attr
	if !e.configuration.LogConsoleTimestamps {
		replaceFunc = func(groups []string, a slog.Attr) slog.Attr {
			if a.Key == slog.TimeKey {
				return slog.Attr{}
			}
			return a
		}
	}

	var level slog.Level
	switch logLevel(e.configuration.LogLevel) {
	case logError:
		level = slog.LevelError
	case logInfo:
		level = slog.LevelInfo
	case logWarning:
		level = slog.LevelWarn
	case logDebug:
		level = slog.LevelDebug
	case logNone:
		level = levelNone
	default:
		log.Printf("error: not valid log level: %v\n", e.configuration.LogLevel)
		os.Exit(1)
	}

	opts := slog.HandlerOptions{Level: level, ReplaceAttr: replaceFunc}
	slog.SetDefault(slog.New(opts.NewTextHandler(os.Stderr)))
}
// stop shuts the error kernel down by cancelling its context. That makes
// start return and releases goroutines in start that are waiting on
// e.ctx.Done().
func (e *errorKernel) stop() {
	cancelKernel := e.cancel
	cancelKernel()
}
// errorEvent is what errSend, infoSend (and any other reporter) put on
// errorKernel.errorCh for the kernel loop in start to handle.
type errorEvent struct {
	// The actual error
	err error
	// Channel for communicating the action to take back to
	// the process who triggered the error. start only uses it for
	// errTypeWithAction events; errSend and infoSend leave it nil.
	errorActionCh chan errorAction
	// errorType decides how start handles the event (forward an
	// error, forward an info, or reply with an errorAction).
	errorType errorType
	// The process structure that belongs to a given process
	process process
	// The message that was in progress when the error occurred
	message Message
	// Level, the log level of the severity. start uses it to pick the
	// local slog call; the zero value matches no level and logs nothing.
	logLevel logLevel
}
// Error implements the error interface. The string includes the
// underlying err, which was previously omitted and made the event's
// message useless for diagnosis, plus the process and message that were
// involved.
func (e errorEvent) Error() string {
	return fmt.Sprintf("worker error: err = %v, proc = %#v, message = %#v", e.err, e.process, e.message)
}

// Unwrap returns the underlying error so errors.Is and errors.As can
// see through an errorEvent.
func (e errorEvent) Unwrap() error {
	return e.err
}
// errSend sends an error to the error kernel, which forwards it to
// errorCentral, and also logs it locally at the given level.
//
// As input arguments it takes:
//   - proc: the process where the error was generated.
//   - msg: the message being handled, or Message{} if no message should
//     be associated with the error.
//   - err: the error to report.
//   - level: the log level; logNone (or any unknown level) skips the
//     local log line.
//
// If the error kernel has been stopped, the event is dropped rather than
// blocking the caller forever. The local log line is still written.
func (e *errorKernel) errSend(proc process, msg Message, err error, level logLevel) {
	ev := errorEvent{
		err:       err,
		errorType: errTypeSendError,
		process:   proc,
		message:   msg,
		logLevel:  level,
		// No errorActionCh: plain error reports don't expect an action
		// back.
	}

	select {
	case e.errorCh <- ev:
	case <-e.ctx.Done():
	}

	if err == nil {
		// Nothing to log locally, and err.Error() would panic.
		return
	}

	switch level {
	case logError:
		e.logError(err.Error())
	case logInfo:
		e.logInfo(err.Error())
	case logWarning:
		e.logWarn(err.Error())
	case logDebug:
		e.logDebug(err.Error())
	}
}
// infoSend sends an informational event to the error kernel, which
// forwards it to errorCentral. The event carries no log level, so it is
// not logged locally. If the error kernel has been stopped, the event is
// dropped rather than blocking the caller forever.
func (e *errorKernel) infoSend(proc process, msg Message, err error) {
	ev := errorEvent{
		err:       err,
		errorType: errTypeSendInfo,
		process:   proc,
		message:   msg,
		// No errorActionCh: info reports don't expect an action back.
	}

	select {
	case e.errorCh <- ev:
	case <-e.ctx.Done():
	}
}
// The helpers below log through the default slog logger installed by
// start. msg is the log message and args are slog key/value pairs.

// logError logs msg at error level.
func (e *errorKernel) logError(msg string, args ...any) { slog.Error(msg, args...) }

// logInfo logs msg at info level.
func (e *errorKernel) logInfo(msg string, args ...any) { slog.Info(msg, args...) }

// logWarn logs msg at warning level.
func (e *errorKernel) logWarn(msg string, args ...any) { slog.Warn(msg, args...) }

// logDebug logs msg at debug level.
func (e *errorKernel) logDebug(msg string, args ...any) { slog.Debug(msg, args...) }
// errorAction tells the process that reported an error, through the
// event's errorActionCh, what it should do next. The reporting process
// has to block and wait for this response.
type errorAction int

// errActionContinue means the process can carry on without any special
// handling. It is the zero value of errorAction.
const errActionContinue errorAction = 0

// TODO NOT IMPLEMENTED YET:
// errActionKill should log the error, and e.g. stop the current work and
// restart from the beginning.
// errorType selects how the error kernel handles an errorEvent.
type errorType int

const (
	// errTypeSendError forwards the error to the central error logger.
	errTypeSendError errorType = iota
	// errTypeSendInfo forwards an informational event to the central
	// error logger.
	errTypeSendInfo
	// errTypeWithAction replies to the reporting process on its
	// errorActionCh and then logs the error.
	errTypeWithAction
)