Skip to content

Commit

Permalink
Merge pull request AliyunContainerService#6 from jzwlqx/refactor
Browse files Browse the repository at this point in the history
refactor and add regexp log format support
  • Loading branch information
jzwlqx authored Mar 14, 2017
2 parents 2149f14 + a7ad614 commit f241116
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 42 deletions.
7 changes: 6 additions & 1 deletion docker-images/fluentd.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
tag {{ $.containerId }}.{{ .Name }}
path {{ .HostDir }}/{{ .File }}
format {{ .Format }}
{{if .TimeFormat}} time_format {{ .TimeFormat }} {{end}}

{{if .FormatConfig}}
{{range $key, $value := .FormatConfig}}
{{ $key }} {{ $value }}
{{end}}
{{end}}
pos_file /pilot/pos/fluentd.pos
refresh_interval 5
</source>
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"io/ioutil"
log "github.com/sirupsen/logrus"
"os"
"github.com/jzwlqx/fluent-pilot/pilot"
"github.com/jzwlqx/fluentd-pilot/pilot"
)

func main() {
Expand Down
62 changes: 62 additions & 0 deletions pilot/format.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package pilot

import "fmt"

type FormatConverter func(info *LogInfoNode) (map[string]string, error)

var converters = make(map[string]FormatConverter)

func Register(format string, converter FormatConverter) {
converters[format] = converter
}

func Convert(info *LogInfoNode) (map[string]string, error) {
converter := converters[info.value]
if converter == nil {
return nil, fmt.Errorf("unsupported log format: %s", info.value)
}
return converter(info)
}

type SimpleConverter struct {
properties map[string]bool
}

func init() {

simpleConverter := func(properties []string) FormatConverter {
return func(info *LogInfoNode) (map[string]string, error) {
validProperties := make(map[string]bool)
for _, property := range properties {
validProperties[property] = true
}
ret := make(map[string]string)
for k, v := range info.children {
if _, ok := validProperties[k]; !ok {
return nil, fmt.Errorf("%s is not a valid properties for format %s", k, info.value)
}
ret[k] = v.value
}
return ret, nil
}
}

Register("none", simpleConverter([]string{}))
Register("csv", simpleConverter([]string{"time_key", "time_format", "keys"}))
Register("json", simpleConverter([]string{"time_key", "time_format"}))
Register("regexp", simpleConverter([]string{"time_key", "time_format", }))
Register("apache2", simpleConverter([]string{}))
Register("apache_error", simpleConverter([]string{}))
Register("nginx", simpleConverter([]string{}))
Register("regexp", func(info *LogInfoNode) (map[string]string, error) {
ret, err := simpleConverter([]string{"pattern", "time_format"})(info)
if err != nil {
return ret, err
}
if ret["pattern"] == "" {
return nil, fmt.Errorf("regex pattern can not be emtpy")
}
return ret, nil
})
}

125 changes: 94 additions & 31 deletions pilot/pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strings"
"sync"
"text/template"
"sort"
)

/**
Expand Down Expand Up @@ -104,7 +105,7 @@ type LogConfig struct {
HostDir string
ContainerDir string
Format string
TimeFormat string
FormatConfig map[string]string
File string
Tags map[string]string
}
Expand Down Expand Up @@ -261,8 +262,8 @@ func (p *Pilot) hostDirOf(path string, mounts map[string]types.MountPoint) strin
if point, ok := mounts[path]; ok {
return point.Source
}
path = filepath.Base(path)
if path == "/" {
path = filepath.Dir(path)
if path == "/" || path == "."{
break
}
}
Expand All @@ -289,80 +290,142 @@ func (p *Pilot) parseTags(tags string) (map[string]string, error) {
tagMap[key] = value
}
return tagMap, nil

}

func (p *Pilot) parseLogConfig(prefix string, jsonLogPath string, mounts map[string]types.MountPoint, labels map[string]string) (*LogConfig, error) {
path := labels[prefix]

func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath string, mounts map[string]types.MountPoint) (*LogConfig, error) {
path := info.value
if path == "" {
return nil, fmt.Errorf("label %s is empty or not exist.", prefix)
return nil, fmt.Errorf("path for %s is empty", name)
}

format := labels[prefix+".format"]
if format == "" {
format = "none"
}

tags := labels[prefix+".tags"]
tags := info.get("tags")
tagMap, err := p.parseTags(tags)

if err != nil {
return nil, fmt.Errorf("parse tags in %s error: %v", prefix+".tags", err)
return nil, fmt.Errorf("parse tags for %s error: %v", name, err)
}

if path == "stdout" {
return &LogConfig{
Name: strings.Split(prefix, ".")[2],
Name: name,
HostDir: filepath.Join(p.base, filepath.Dir(jsonLogPath)),
Format: "json",
TimeFormat: "%Y-%m-%dT%H:%M:%S.%NZ",
File: filepath.Base(jsonLogPath),
Tags: tagMap,
FormatConfig: map[string]string{"time_format":"%Y-%m-%dT%H:%M:%S.%NZ"},
}, nil
}

if !filepath.IsAbs(path) {
return nil, fmt.Errorf("%s must be absolute path, in label %s", path, prefix)
return nil, fmt.Errorf("%s must be absolute path, for %s", path, name)
}
containerDir := filepath.Dir(path)
file := filepath.Base(path)
if file == "" {
return nil, fmt.Errorf("%s must be a file path, not directory, in label %s", path, prefix)
return nil, fmt.Errorf("%s must be a file path, not directory, for %s", path, name)
}

hostDir := p.hostDirOf(containerDir, mounts)
if hostDir == "" {
return nil, fmt.Errorf("%s is not mount on host, in label %s", path, prefix)
return nil, fmt.Errorf("in log %s: %s is not mount on host", name, path)
}

format := info.children["format"]
if format == nil {
format = newLogInfoNode("none")
}

formatConfig, err := Convert(format)
if err != nil {
return nil, fmt.Errorf("in log %s: format error: %v", name, err)
}

//特殊处理regex
if format.value == "regexp" {
format.value = fmt.Sprintf("/%s/", formatConfig["pattern"])
delete(formatConfig, "pattern")
}

return &LogConfig{
Name: strings.Split(prefix, ".")[2],
Name: name,
ContainerDir: containerDir,
Format: format,
Format: format.value,
File: file,
Tags: tagMap,
HostDir: filepath.Join(p.base, hostDir),
FormatConfig: formatConfig,
}, nil
}

func (p *Pilot) getLogConfigs(jsonLogPath string, mounts []types.MountPoint, labels map[string]string) ([]LogConfig, error) {
var ret []LogConfig
type LogInfoNode struct {
value string
children map[string]*LogInfoNode
}

func newLogInfoNode(value string) *LogInfoNode {
return &LogInfoNode{
value: value,
children: make(map[string]*LogInfoNode),
}
}

func (node *LogInfoNode) insert(keys []string, value string) error {
if len(keys) == 0 {
return nil
}
key := keys[0]
if len(keys) > 1 {
if child, ok := node.children[key]; ok {
child.insert(keys[1:], value)
} else {
return fmt.Errorf("%s has no parent node", key)
}
} else {
child := newLogInfoNode(value)
node.children[key] = child
}
return nil
}

func (node *LogInfoNode) get(key string) string {
if child, ok := node.children[key]; ok {
return child.value
}
return ""
}

func (p *Pilot) getLogConfigs(jsonLogPath string, mounts []types.MountPoint, labels map[string]string) ([]*LogConfig, error) {
var ret []*LogConfig

mountsMap := make(map[string]types.MountPoint)
for _, mount := range mounts {
mountsMap[mount.Destination] = mount
}

var labelNames []string
//sort keys
for k, _ := range labels {
if strings.HasPrefix(k, LABEL_SERVICE_LOGS) && strings.Count(k, ".") == 2 {
config, err := p.parseLogConfig(k, jsonLogPath, mountsMap, labels)
if err != nil {
return nil, err
}
labelNames = append(labelNames, k)
}

sort.Strings(labelNames)
root := newLogInfoNode("")
for _, k := range labelNames {
if !strings.HasPrefix(k, LABEL_SERVICE_LOGS) || strings.Count(k, ".") == 1 {
continue
}
logLabel := strings.TrimPrefix(k, LABEL_SERVICE_LOGS)
if err := root.insert(strings.Split(logLabel, "."), labels[k]); err != nil {
return nil, err
}
}

ret = append(ret, *config)
for name, node := range root.children {
logConfig, err := p.parseLogConfig(name, node, jsonLogPath, mountsMap)
if err != nil {
return nil, err
}
ret = append(ret, logConfig)
}
return ret, nil
}
Expand All @@ -374,7 +437,7 @@ func (p *Pilot) exists(containId string) bool {
return true
}

func (p *Pilot) render(containerId string, source Source, configList []LogConfig) (string, error) {
func (p *Pilot) render(containerId string, source Source, configList []*LogConfig) (string, error) {
log.Infof("logs: %v", configList)
var buf bytes.Buffer

Expand Down
45 changes: 36 additions & 9 deletions pilot/pilot_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package pilot

import (
"fmt"
log "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
"os"
"testing"
"github.com/docker/docker/api/types"
)

func Test(t *testing.T) {
Expand All @@ -21,18 +21,46 @@ var _ = check.Suite(&PilotSuite{})
func (p *PilotSuite) TestGetLogConfigs(c *check.C) {
pilot := &Pilot{}
labels := map[string]string{}
configs := pilot.getLogConfigs(labels)
configs, err := pilot.getLogConfigs("/path/to/json.log", []types.MountPoint{}, labels)
c.Assert(err, check.IsNil)
c.Assert(configs, check.HasLen, 0)

labels = map[string]string{
"aliyun.logs.hello": "/var/log/hello.log",
"aliyun.logs.hello.format": "json",
"aliyun.logs.hello.tags": "name=hello,stage=test",
"aliyun.logs.hello.format.time_format": "%Y-%m-%d",
}

//no mount
configs, err = pilot.getLogConfigs("/path/to/json.log", []types.MountPoint{}, labels)
c.Assert(err, check.NotNil)

mounts := []types.MountPoint{
{
Source: "/host",
Destination: "/var/log",
},
}
configs = pilot.getLogConfigs(labels)
configs, err = pilot.getLogConfigs("/path/to/json.log", mounts, labels)
c.Assert(err, check.IsNil)
c.Assert(configs, check.HasLen, 1)
c.Assert(configs[0].Format, check.Equals, "json")
c.Assert(configs[0].ContainerDir, check.Equals, "/var/log")
c.Assert(configs[0].File, check.Equals, "hello.log")
c.Assert(configs[0].Tags, check.HasLen, 2)
c.Assert(configs[0].FormatConfig, check.HasLen, 1)

//Test regex format
labels = map[string]string{
"aliyun.logs.hello": "/var/log/hello.log",
"aliyun.logs.hello.format": "regexp",
"aliyun.logs.hello.tags": "name=hello,stage=test",
"aliyun.logs.hello.format.pattern": "(?=name:hello).*",
}
configs, err = pilot.getLogConfigs("/path/to/json.log", mounts, labels)
c.Assert(err, check.IsNil)
c.Assert(configs[0].Format, check.Equals, "/(?=name:hello).*/")
}

func (p *PilotSuite) TestRender(c *check.C) {
Expand All @@ -48,21 +76,20 @@ func (p *PilotSuite) TestRender(c *check.C) {
{{end}}
`

configs := []LogConfig{
LogConfig{
configs := []*LogConfig{
&LogConfig{
Name: "hello",
HostDir: "/path/to/hello",
File: "hello.log",
},
LogConfig{
&LogConfig{
Name: "world",
File: "world.log",
HostDir: "/path/to/world",
},
}
pilot, err := New(template)
pilot, err := New(template, "/")
c.Assert(err, check.IsNil)
result, err := pilot.render("id-1111", configs)
_, err = pilot.render("id-1111", Source{}, configs)
c.Assert(err, check.IsNil)
fmt.Print(result)
}

0 comments on commit f241116

Please sign in to comment.