-
Notifications
You must be signed in to change notification settings - Fork 11
/
slicer.go
89 lines (82 loc) · 2.36 KB
/
slicer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package ik
import (
"sync"
"unsafe"
)
// SlicerNewKeyEventListener is a callback registered on a Slicer through
// AddNewKeyEventListener. The Slicer calls it from Emit when a record maps to
// a key that is not yet present in the Slicer's key set. next is the journal
// obtained for that new key; last is the journal most recently resolved
// earlier in the same Emit call, or nil if there was none. A non-nil error
// returned by the callback is logged by the Slicer and otherwise ignored.
type SlicerNewKeyEventListener func(last, next Journal) error
// Slicer routes records into per-key journals. For every record passed to
// Emit it derives a key with keyGetter, serializes the record with packer and
// writes the result to the journal that journalGroup returns for that key.
// It also remembers which keys it has seen so that registered listeners can
// be told when a new key shows up.
type Slicer struct {
	// journalGroup supplies the journal for a given key (GetJournal) and the
	// initial set of known keys (GetJournalKeys, used by NewSlicer).
	journalGroup JournalGroup
	// keyGetter maps a record to the key of the journal it belongs to.
	keyGetter func(record FluentRecord) string
	// packer serializes a record into the data written to a journal.
	packer RecordPacker
	// logger receives errors returned by new-key listeners.
	logger Logger
	// keys is the set of keys seen so far; only presence is checked.
	keys map[string]bool
	// newKeyEventListeners holds the registered listeners, indexed by an
	// identity value derived from the func value (see AddNewKeyEventListener).
	newKeyEventListeners map[uintptr]SlicerNewKeyEventListener
	// mtx is held while keys is inspected/updated in Emit and while
	// newKeyEventListeners is modified in AddNewKeyEventListener.
	mtx sync.Mutex
}
// notifySlicerNewKeyEventListeners calls every registered new-key listener
// with the (last, next) journal pair. An error returned by a listener is
// logged through slicer.logger and does not stop the remaining listeners from
// being called.
//
// The method takes slicer.mtx itself, only for as long as it needs to copy
// the registered listeners, and runs the callbacks after releasing it. The
// caller must therefore NOT hold slicer.mtx (sync.Mutex is not reentrant).
// Copying under the lock keeps the map iteration from racing with
// AddNewKeyEventListener; invoking outside the lock lets a listener call back
// into the Slicer without deadlocking.
func (slicer *Slicer) notifySlicerNewKeyEventListeners(last Journal, next Journal) {
	slicer.mtx.Lock()
	listeners := make([]SlicerNewKeyEventListener, 0, len(slicer.newKeyEventListeners))
	for _, listener := range slicer.newKeyEventListeners {
		listeners = append(listeners, listener)
	}
	slicer.mtx.Unlock()

	for _, listener := range listeners {
		if err := listener(last, next); err != nil {
			slicer.logger.Error("error occurred during notifying flush event: %s", err.Error())
		}
	}
}
// AddNewKeyEventListener registers listener to be called when Emit meets a
// key the Slicer has not seen before.
//
// Go func values are not comparable, so the listener is stored under an
// identity derived from the func value itself: the first pointer-sized word
// of the listener variable, read through unsafe. Registering a func value
// that yields the same word again overwrites the earlier entry.
// NOTE(review): this depends on how the toolchain represents func values and
// is not guaranteed by the language specification.
func (slicer *Slicer) AddNewKeyEventListener(listener SlicerNewKeyEventListener) {
	// XXX hack: reinterpret the func value as a raw pointer to obtain a map key.
	funcWord := *(*unsafe.Pointer)(unsafe.Pointer(&listener))
	identity := uintptr(funcWord)

	slicer.mtx.Lock()
	defer slicer.mtx.Unlock()
	slicer.newKeyEventListeners[identity] = listener
}
// Emit writes every record of every record set to the journal selected by the
// record's key.
//
// For each record a FluentRecord is assembled from the set's tag and the
// record's Timestamp and Data, its key is obtained from keyGetter, and it is
// serialized with packer. The journal for a key is fetched from journalGroup
// at most once per call and reused for later records with the same key.
//
// When a journal is fetched for a key that is not yet in the Slicer's key
// set, the key is recorded and the new-key listeners are notified with the
// journal fetched immediately before in this call (nil if none) and the new
// journal. The key-set check and update happen under slicer.mtx, which is
// released before the listeners are notified.
//
// Emit returns the first error reported by packer.Pack or Journal.Write and
// stops at that point; nothing written earlier in the call is undone here.
func (slicer *Slicer) Emit(recordSets []FluentRecordSet) error {
	// Journals already fetched during this call, indexed by key.
	resolved := make(map[string]Journal)
	var lastJournal Journal

	for _, recordSet := range recordSets {
		for _, record := range recordSet.Records {
			fullRecord := FluentRecord{recordSet.Tag, record.Timestamp, record.Data}
			key := slicer.keyGetter(fullRecord)

			data, err := slicer.packer.Pack(fullRecord)
			if err != nil {
				return err
			}

			journal, cached := resolved[key]
			if !cached {
				journal = slicer.journalGroup.GetJournal(key)
				resolved[key] = journal

				// Test-and-set the key in one critical section so a new key
				// is reported only once.
				slicer.mtx.Lock()
				_, known := slicer.keys[key]
				slicer.keys[key] = true
				slicer.mtx.Unlock()

				if !known {
					slicer.notifySlicerNewKeyEventListeners(lastJournal, journal)
				}
				lastJournal = journal
			}

			if err := journal.Write(data); err != nil {
				return err
			}
		}
	}
	return nil
}
// NewSlicer builds a Slicer that writes into journals of journalGroup, using
// keyGetter to choose the journal key for a record, packer to serialize
// records and logger to report listener errors.
//
// The key set is seeded with the keys journalGroup.GetJournalKeys() reports,
// so those keys do not trigger new-key notifications in Emit. The listener
// registry starts empty.
func NewSlicer(journalGroup JournalGroup, keyGetter func(record FluentRecord) string, packer RecordPacker, logger Logger) *Slicer {
	existingKeys := journalGroup.GetJournalKeys()

	slicer := &Slicer{
		journalGroup:         journalGroup,
		keyGetter:            keyGetter,
		packer:               packer,
		logger:               logger,
		keys:                 make(map[string]bool, len(existingKeys)),
		newKeyEventListeners: make(map[uintptr]SlicerNewKeyEventListener),
		// mtx is left at its zero value, which is an unlocked mutex.
	}
	for _, key := range existingKeys {
		slicer.keys[key] = true
	}
	return slicer
}