From a7ad614905d6252e9aabe3afb8230c0fbc26c91b Mon Sep 17 00:00:00 2001 From: Jizhong Jiang Date: Tue, 14 Mar 2017 13:42:58 +0800 Subject: [PATCH] refactor and add regexp log format support 1. code refactor 2. add regexp log format 3. add aliyun.logs.name.format.xxx extend config 4. add time_key, time_format, keys configure to csv format 4. add time_key, time_format configure to json format --- docker-images/fluentd.tpl | 7 ++- main.go | 2 +- pilot/format.go | 62 +++++++++++++++++++ pilot/pilot.go | 125 ++++++++++++++++++++++++++++---------- pilot/pilot_test.go | 45 +++++++++++--- 5 files changed, 199 insertions(+), 42 deletions(-) create mode 100644 pilot/format.go diff --git a/docker-images/fluentd.tpl b/docker-images/fluentd.tpl index 7d80a238..c8a6a53f 100644 --- a/docker-images/fluentd.tpl +++ b/docker-images/fluentd.tpl @@ -4,7 +4,12 @@ tag {{ $.containerId }}.{{ .Name }} path {{ .HostDir }}/{{ .File }} format {{ .Format }} - {{if .TimeFormat}} time_format {{ .TimeFormat }} {{end}} + +{{if .FormatConfig}} +{{range $key, $value := .FormatConfig}} +{{ $key }} {{ $value }} +{{end}} +{{end}} pos_file /pilot/pos/fluentd.pos refresh_interval 5 diff --git a/main.go b/main.go index b004572c..489f3c96 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,7 @@ import ( "io/ioutil" log "github.com/sirupsen/logrus" "os" - "github.com/jzwlqx/fluent-pilot/pilot" + "github.com/jzwlqx/fluentd-pilot/pilot" ) func main() { diff --git a/pilot/format.go b/pilot/format.go new file mode 100644 index 00000000..7b94bab0 --- /dev/null +++ b/pilot/format.go @@ -0,0 +1,62 @@ +package pilot + +import "fmt" + +type FormatConverter func(info *LogInfoNode) (map[string]string, error) + +var converters = make(map[string]FormatConverter) + +func Register(format string, converter FormatConverter) { + converters[format] = converter +} + +func Convert(info *LogInfoNode) (map[string]string, error) { + converter := converters[info.value] + if converter == nil { + return nil, fmt.Errorf("unsupported log format: %s", info.value) + } + return converter(info) +} + +type SimpleConverter struct { + properties map[string]bool +} + +func init() { + + simpleConverter := func(properties []string) FormatConverter { + return func(info *LogInfoNode) (map[string]string, error) { + validProperties := make(map[string]bool) + for _, property := range properties { + validProperties[property] = true + } + ret := make(map[string]string) + for k, v := range info.children { + if _, ok := validProperties[k]; !ok { + return nil, fmt.Errorf("%s is not a valid properties for format %s", k, info.value) + } + ret[k] = v.value + } + return ret, nil + } + } + + Register("none", simpleConverter([]string{})) + Register("csv", simpleConverter([]string{"time_key", "time_format", "keys"})) + Register("json", simpleConverter([]string{"time_key", "time_format"})) + Register("regexp", simpleConverter([]string{"time_key", "time_format", })) + Register("apache2", simpleConverter([]string{})) + Register("apache_error", simpleConverter([]string{})) + Register("nginx", simpleConverter([]string{})) + Register("regexp", func(info *LogInfoNode) (map[string]string, error) { + ret, err := simpleConverter([]string{"pattern", "time_format"})(info) + if err != nil { + return ret, err + } + if ret["pattern"] == "" { + return nil, fmt.Errorf("regex pattern can not be emtpy") + } + return ret, nil + }) +} + diff --git a/pilot/pilot.go b/pilot/pilot.go index 4d5b6350..71c3d9d3 100644 --- a/pilot/pilot.go +++ b/pilot/pilot.go @@ -16,6 +16,7 @@ import ( "strings" "sync" "text/template" + "sort" ) /** @@ -104,7 +105,7 @@ type LogConfig struct { HostDir string ContainerDir string Format string - TimeFormat string + FormatConfig map[string]string File string Tags map[string]string } @@ -261,8 +262,8 @@ func (p *Pilot) hostDirOf(path string, mounts map[string]types.MountPoint) strin if point, ok := mounts[path]; ok { return point.Source } - path = filepath.Base(path) - if path == "/" { + path = filepath.Dir(path) + if path == "/" || path == "."{ break } } @@ -289,80 +290,142 @@ func (p *Pilot) parseTags(tags string) (map[string]string, error) { tagMap[key] = value } return tagMap, nil - } -func (p *Pilot) parseLogConfig(prefix string, jsonLogPath string, mounts map[string]types.MountPoint, labels map[string]string) (*LogConfig, error) { - path := labels[prefix] - +func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath string, mounts map[string]types.MountPoint) (*LogConfig, error) { + path := info.value if path == "" { - return nil, fmt.Errorf("label %s is empty or not exist.", prefix) + return nil, fmt.Errorf("path for %s is empty", name) } - format := labels[prefix+".format"] - if format == "" { - format = "none" - } - - tags := labels[prefix+".tags"] + tags := info.get("tags") tagMap, err := p.parseTags(tags) if err != nil { - return nil, fmt.Errorf("parse tags in %s error: %v", prefix+".tags", err) + return nil, fmt.Errorf("parse tags for %s error: %v", name, err) } if path == "stdout" { return &LogConfig{ - Name: strings.Split(prefix, ".")[2], + Name: name, HostDir: filepath.Join(p.base, filepath.Dir(jsonLogPath)), Format: "json", - TimeFormat: "%Y-%m-%dT%H:%M:%S.%NZ", File: filepath.Base(jsonLogPath), Tags: tagMap, + FormatConfig: map[string]string{"time_format":"%Y-%m-%dT%H:%M:%S.%NZ"}, }, nil } if !filepath.IsAbs(path) { - return nil, fmt.Errorf("%s must be absolute path, in label %s", path, prefix) + return nil, fmt.Errorf("%s must be absolute path, for %s", path, name) } containerDir := filepath.Dir(path) file := filepath.Base(path) if file == "" { - return nil, fmt.Errorf("%s must be a file path, not directory, in label %s", path, prefix) + return nil, fmt.Errorf("%s must be a file path, not directory, for %s", path, name) } hostDir := p.hostDirOf(containerDir, mounts) if hostDir == "" { - return nil, fmt.Errorf("%s is not mount on host, in label %s", path, prefix) + return nil, fmt.Errorf("in log %s: %s is not mount on host", name, path) + } + + format := info.children["format"] + if format == nil { + format = newLogInfoNode("none") + } + + formatConfig, err := Convert(format) + if err != nil { + return nil, fmt.Errorf("in log %s: format error: %v", name, err) + } + + //特殊处理regex + if format.value == "regexp" { + format.value = fmt.Sprintf("/%s/", formatConfig["pattern"]) + delete(formatConfig, "pattern") } return &LogConfig{ - Name: strings.Split(prefix, ".")[2], + Name: name, ContainerDir: containerDir, - Format: format, + Format: format.value, File: file, Tags: tagMap, HostDir: filepath.Join(p.base, hostDir), + FormatConfig: formatConfig, }, nil } -func (p *Pilot) getLogConfigs(jsonLogPath string, mounts []types.MountPoint, labels map[string]string) ([]LogConfig, error) { - var ret []LogConfig +type LogInfoNode struct { + value string + children map[string]*LogInfoNode +} + +func newLogInfoNode(value string) *LogInfoNode { + return &LogInfoNode{ + value: value, + children: make(map[string]*LogInfoNode), + } +} + +func (node *LogInfoNode) insert(keys []string, value string) error { + if len(keys) == 0 { + return nil + } + key := keys[0] + if len(keys) > 1 { + if child, ok := node.children[key]; ok { + child.insert(keys[1:], value) + } else { + return fmt.Errorf("%s has no parent node", key) + } + } else { + child := newLogInfoNode(value) + node.children[key] = child + } + return nil +} + +func (node *LogInfoNode) get(key string) string { + if child, ok := node.children[key]; ok { + return child.value + } + return "" +} + +func (p *Pilot) getLogConfigs(jsonLogPath string, mounts []types.MountPoint, labels map[string]string) ([]*LogConfig, error) { + var ret []*LogConfig mountsMap := make(map[string]types.MountPoint) for _, mount := range mounts { mountsMap[mount.Destination] = mount } + var labelNames []string + //sort keys for k, _ := range labels { - if strings.HasPrefix(k, LABEL_SERVICE_LOGS) && strings.Count(k, ".") == 2 { - config, err := p.parseLogConfig(k, jsonLogPath, mountsMap, labels) - if err != nil { - return nil, err - } + labelNames = append(labelNames, k) + } + + sort.Strings(labelNames) + root := newLogInfoNode("") + for _, k := range labelNames { + if !strings.HasPrefix(k, LABEL_SERVICE_LOGS) || strings.Count(k, ".") == 1 { + continue + } + logLabel := strings.TrimPrefix(k, LABEL_SERVICE_LOGS) + if err := root.insert(strings.Split(logLabel, "."), labels[k]); err != nil { + return nil, err + } + } - ret = append(ret, *config) + for name, node := range root.children { + logConfig, err := p.parseLogConfig(name, node, jsonLogPath, mountsMap) + if err != nil { + return nil, err } + ret = append(ret, logConfig) } return ret, nil } @@ -374,7 +437,7 @@ func (p *Pilot) exists(containId string) bool { return true } -func (p *Pilot) render(containerId string, source Source, configList []LogConfig) (string, error) { +func (p *Pilot) render(containerId string, source Source, configList []*LogConfig) (string, error) { log.Infof("logs: %v", configList) var buf bytes.Buffer diff --git a/pilot/pilot_test.go b/pilot/pilot_test.go index cbdcbb29..6acf98b8 100644 --- a/pilot/pilot_test.go +++ b/pilot/pilot_test.go @@ -1,11 +1,11 @@ package pilot import ( - "fmt" log "github.com/sirupsen/logrus" check "gopkg.in/check.v1" "os" "testing" + "github.com/docker/docker/api/types" ) func Test(t *testing.T) { @@ -21,18 +21,46 @@ var _ = check.Suite(&PilotSuite{}) func (p *PilotSuite) TestGetLogConfigs(c *check.C) { pilot := &Pilot{} labels := map[string]string{} - configs := pilot.getLogConfigs(labels) + configs, err := pilot.getLogConfigs("/path/to/json.log", []types.MountPoint{}, labels) + c.Assert(err, check.IsNil) c.Assert(configs, check.HasLen, 0) labels = map[string]string{ "aliyun.logs.hello": "/var/log/hello.log", "aliyun.logs.hello.format": "json", + "aliyun.logs.hello.tags": "name=hello,stage=test", + "aliyun.logs.hello.format.time_format": "%Y-%m-%d", + } + + //no mount + configs, err = pilot.getLogConfigs("/path/to/json.log", []types.MountPoint{}, labels) + c.Assert(err, check.NotNil) + + mounts := []types.MountPoint{ + { + Source: "/host", + Destination: "/var/log", + }, } - configs = pilot.getLogConfigs(labels) + configs, err = pilot.getLogConfigs("/path/to/json.log", mounts, labels) + c.Assert(err, check.IsNil) c.Assert(configs, check.HasLen, 1) c.Assert(configs[0].Format, check.Equals, "json") c.Assert(configs[0].ContainerDir, check.Equals, "/var/log") c.Assert(configs[0].File, check.Equals, "hello.log") + c.Assert(configs[0].Tags, check.HasLen, 2) + c.Assert(configs[0].FormatConfig, check.HasLen, 1) + + //Test regex format + labels = map[string]string{ + "aliyun.logs.hello": "/var/log/hello.log", + "aliyun.logs.hello.format": "regexp", + "aliyun.logs.hello.tags": "name=hello,stage=test", + "aliyun.logs.hello.format.pattern": "(?=name:hello).*", + } + configs, err = pilot.getLogConfigs("/path/to/json.log", mounts, labels) + c.Assert(err, check.IsNil) + c.Assert(configs[0].Format, check.Equals, "/(?=name:hello).*/") } func (p *PilotSuite) TestRender(c *check.C) { @@ -48,21 +76,20 @@ func (p *PilotSuite) TestRender(c *check.C) { {{end}} ` - configs := []LogConfig{ - LogConfig{ + configs := []*LogConfig{ + &LogConfig{ Name: "hello", HostDir: "/path/to/hello", File: "hello.log", }, - LogConfig{ + &LogConfig{ Name: "world", File: "world.log", HostDir: "/path/to/world", }, } - pilot, err := New(template) + pilot, err := New(template, "/") c.Assert(err, check.IsNil) - result, err := pilot.render("id-1111", configs) + _, err = pilot.render("id-1111", Source{}, configs) c.Assert(err, check.IsNil) - fmt.Print(result) }