-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdefault_collector_test.go
84 lines (66 loc) · 1.67 KB
/
default_collector_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package phonelab
import (
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// lineMapEntry is the record emitted by lineTimeMapProcessor: it pairs the
// LogcatToken of a log line with that line's TraceTime.
type lineMapEntry struct {
	// Logline is serialized under the JSON key "logline".
	Logline int64 `json:"logline"`

	// Timestamp is serialized under the JSON key "timestamp".
	Timestamp float64 `json:"timestamp"`
}
// lineTimeMapProcessor is a stateless handler that converts *Logline values
// into *lineMapEntry records.
type lineTimeMapProcessor struct{}

// Handle maps a non-nil *Logline to a *lineMapEntry carrying the line's
// LogcatToken and TraceTime. Any other input (a different dynamic type, or a
// nil *Logline) yields an untyped nil.
func (proc *lineTimeMapProcessor) Handle(data interface{}) interface{} {
	line, isLogline := data.(*Logline)
	if !isLogline || line == nil {
		return nil
	}

	entry := &lineMapEntry{
		Logline:   line.LogcatToken,
		Timestamp: line.TraceTime,
	}
	return entry
}

// Finish is a no-op; the processor holds no state to flush or release.
func (proc *lineTimeMapProcessor) Finish() {
}
// lineTimeMapGen is a processor generator: each call to GenerateProcessor
// produces a processor backed by a fresh lineTimeMapProcessor.
type lineTimeMapGen struct{}

// GenerateProcessor wraps the source's Processor and a new
// lineTimeMapProcessor via NewSimpleProcessor. kwargs is accepted but unused.
func (g *lineTimeMapGen) GenerateProcessor(source *PipelineSourceInstance,
	kwargs map[string]interface{}) Processor {

	handler := &lineTimeMapProcessor{}
	return NewSimpleProcessor(source.Processor, handler)
}
// TestBuildeDefaultrDataCollector builds a runner from a YAML configuration
// that selects the "default" data collector, registers lineTimeMapGen under
// the generator name "test", and runs the resulting pipeline end to end.
//
// The test verifies that the configuration parses, that the data collector
// section is present and named "default", and that a runner can be built from
// it. It makes no assertions about what the collector writes.
//
// TODO: Clean up or remove this test.
func TestBuildeDefaultrDataCollector(t *testing.T) {
	t.Parallel()

	// Shadowing the package names with bound instances is the usual testify
	// pattern; it avoids passing t to every call.
	require := require.New(t)
	assert := assert.New(t)

	env := NewEnvironment()
	env.Processors["test"] = &lineTimeMapGen{}

	// YAML is indentation-sensitive: the nesting below is what places name/args
	// under data_collector and the generator settings under the "main"
	// processor entry.
	confString := `
data_collector:
  name: "default"
  args:
    path: "file://test/default_collector_test"
    aggregate: true
    compress: true
source:
  type: files
  sources: ["./test/*.log"]
processors:
  - name: main
    generator: test
    has_logstream: true
sink:
  name: main
`

	conf, err := RunnerConfFromString(confString)
	require.NoError(err)
	require.NotNil(conf)
	require.NotNil(conf.DataCollector)
	assert.Equal("default", conf.DataCollector.Name)

	// Clean up the collector's output directory when the test returns.
	// Comment this out temporarily to inspect the generated output.
	defer func() {
		if rmErr := os.RemoveAll("test/default_collector_test"); rmErr != nil {
			t.Logf("cleanup of test/default_collector_test failed: %v", rmErr)
		}
	}()

	runner, err := conf.ToRunner(env)
	require.NoError(err)
	require.NotNil(runner)

	t.Log(runner.Source)

	// Run the pipeline; nothing is asserted on its output here.
	runner.Run()
}