diff --git a/.github/release.yml b/.github/release.yml new file mode 100644 index 0000000..a10d620 --- /dev/null +++ b/.github/release.yml @@ -0,0 +1,11 @@ +changelog: + categories: + - title: 🌱 What's new! + labels: + - feature + - title: 🐆 Optimizations + labels: + - performance + - title: 🛠️ Bugfixes + labels: + - bug diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..bad3266 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,42 @@ +# This workflow will build a golang project +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go + +name: flowd + +on: + push: + branches: [ main ] + tags: + - '*' + pull_request: + branches: [ main ] + +jobs: + + build: + runs-on: + labels: ubuntu-22.04-8c-32g + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: "1.20" + cache: true + + - name: Start containers + run: docker-compose -f "docker-compose.yml" up -d + + - name: Test + run: make test + + - name: Fmtcheck + run: make fmtcheck + + - name: Lint + run: make lint + + - name: Stop containers + run: docker-compose -f "docker-compose.yml" down -v + if: always() diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..4190e3c --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,34 @@ +name: release + +on: + push: + # run only against tags + tags: + - '*' + +permissions: + contents: write + # packages: write + # issues: write + +jobs: + public: + runs-on: macos-latest + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + - run: git fetch --force --tags + - uses: actions/setup-go@v4 + with: + go-version: stable + # More assembly might be required: Docker logins, GPG, etc. It all depends + # on your needs. + - uses: goreleaser/goreleaser-action@v4 + with: + # either 'goreleaser' (default) or 'goreleaser-pro': + distribution: goreleaser + version: latest + args: release --debug --clean -f .goreleaser.public.yaml + env: + GITHUB_TOKEN: ${{ secrets.GO_RELEASER }} diff --git a/.gitignore b/.gitignore index 223cec9..cf95cf7 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .coverprofile +dist diff --git a/.goreleaser.public.yaml b/.goreleaser.public.yaml new file mode 100644 index 0000000..aadfb29 --- /dev/null +++ b/.goreleaser.public.yaml @@ -0,0 +1,58 @@ +# ⚠️ Find delightfully thorough documentation at https://goreleaser.com + +before: + hooks: + # You may remove this if you don't use go modules. + - go mod tidy + # you may remove this if you don't need go generate + # - go generate ./... +builds: + - main: ./cmd/flowd + id: flowd + binary: flowd + goos: + - linux + - darwin + - windows + +archives: + - format: tar.gz + id: flowd + # this name template makes the OS and Arch compatible with the results of uname. + name_template: >- + flowd- + {{- .Os }}- + {{- if eq .Arch "386" }}i386 + {{- else }}{{ .Arch }}{{ end }} + {{- if .Arm }}v{{ .Arm }}{{ end }} + # use zip for windows archives + builds: [flowd] + format_overrides: + - goos: windows + format: zip + # only embed binaries for now + # https://goreleaser.com/customization/archive/?h=archives#packaging-only-the-binaries + files: + - none* + +changelog: + use: github-native + +# https://goreleaser.com/customization/release/ +release: + # Repo in which the release will be created. + # Default is extracted from the origin remote URL or empty if its private hosted. + github: + owner: runreveal + name: flow + +checksum: + name_template: 'checksums.txt' + +snapshot: + name_template: "{{ incpatch .Version }}-{{ .ShortCommit }}-{{ .Branch }}" + +# The lines beneath this are called `modelines`. See `:help modeline` +# Feel free to remove those if you don't want/use them. +# yaml-language-server: $schema=https://goreleaser.com/static/schema.json +# vim: set ts=2 sw=2 tw=0 fo=cnqoj diff --git a/Makefile b/Makefile index 1b94e76..55a2df4 100644 --- a/Makefile +++ b/Makefile @@ -1,21 +1,34 @@ -VERSION := $(shell git describe --tags --always --dirty="-dev") +VERSION := $(shell git describe --tags --always --dirty="_dev") LDFLAGS := -ldflags='-X "main.version=$(VERSION)"' -# GCFLAGS := -gcflags='-G=3' -GO = go -Q = @ - -GOTESTFLAGS = -race $(GCFLAGS) -ifndef Q - GOTESTFLAGS += -v -endif +GOTESTFLAGS = -race GOTAGS = testing +GO ?= $(shell which go) + +export GOEXPERIMENT=nocoverageredesign + .PHONY: test -test: vet - $Q$(GO) test -vet=off -tags='$(GOTAGS)' $(GOTESTFLAGS) -coverpkg="./..." -coverprofile=.coverprofile ./... - $Qgrep -v 'cmd' < .coverprofile > .covprof && mv .covprof .coverprofile - $Q$(GO) tool cover -func=.coverprofile +test: compose + $(GO) test -vet=off -tags='$(GOTAGS)' $(GOTESTFLAGS) -coverpkg="./..." -coverprofile=.coverprofile ./... + grep -v 'cmd' < .coverprofile > .covprof && mv .covprof .coverprofile + $(GO) tool cover -func=.coverprofile + +.PHONY: coverage +coverage: + $(GO) tool cover -html=.coverprofile + +.PHONY: version +version: + @echo $(VERSION) + +.PHONY: dist +dist: + goreleaser release --config .goreleaser.public.yaml --clean --snapshot + +.PHONY: compose +compose: + docker-compose up -d .PHONY: lint lint: $(GOPATH)/bin/golangci-lint @@ -24,19 +37,17 @@ lint: $(GOPATH)/bin/golangci-lint $(GOPATH)/bin/golangci-lint: $(GO) install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.2 -.PHONY: build -build: vet - $Q$(GO) build $(LDFLAGS) $(GCFLAGS) -o ./build/$(NAME) ./... - -.PHONY: vet -vet: - $Q$(GO) vet ./... +$(GOPATH)/bin/golines: + $(GO) install github.com/segmentio/golines@latest .PHONY: fmtcheck -fmtchk: - $Qexit $(shell goimports -l . | grep -v '^vendor' | wc -l) +fmtcheck: $(GOPATH)/bin/golines + exit $(shell golines -m 128 -l . | wc -l) .PHONY: fmtfix -fmtfix: - $Qgoimports -w $(shell find . -iname '*.go' | grep -v vendor) +fmtfix: $(GOPATH)/bin/golines + golines -m 128 -w . +.PHONY: clean +clean: + rm -rf dist diff --git a/README.md b/README.md index 907a99a..dc93c30 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,29 @@ +# flow +flow is an opinionated framework for scalable, reliable stream processing. -# Support Wishlist & Priorities +# flowd -Kafka -redis +flowd is a daemon for collecting system logs and metrics which makes is powered +by flow. -socket -http +# Installation -amqp -NATS +TODO + +# TODO + +Ensure that consumers of flow aren't subject to all the dependencies of flowd. +Consider breaking apart the library from the daemon. + +# Source Wishlist +Kafka +redis +NATS +amqp pubsub Kinesis -SQS - memcache? zmq? NSQ? -NATS? - diff --git a/cmd/flowd/config.go b/cmd/flowd/config.go new file mode 100644 index 0000000..7091319 --- /dev/null +++ b/cmd/flowd/config.go @@ -0,0 +1,84 @@ +package main + +import ( + "os" + + "github.com/runreveal/flow" + "github.com/runreveal/flow/internal/destinations" + "github.com/runreveal/flow/internal/destinations/runreveal" + "github.com/runreveal/flow/internal/sources" + "github.com/runreveal/flow/internal/sources/journald" + "github.com/runreveal/flow/internal/sources/syslog" + "github.com/runreveal/flow/internal/types" + "github.com/runreveal/lib/loader" + "golang.org/x/exp/slog" + // We could register and configure these in a separate package + // using the init() function. + // That would make it easy to "dynamically" enable and disable them at + // compile time since it would simply be updating the import list. +) + +func init() { + loader.Register("scanner", func() loader.Builder[flow.Source[types.Event]] { + return &ScannerConfig{} + }) + loader.Register("syslog", func() loader.Builder[flow.Source[types.Event]] { + return &SyslogConfig{} + }) + loader.Register("journald", func() loader.Builder[flow.Source[types.Event]] { + return &JournaldConfig{} + }) + + loader.Register("printer", func() loader.Builder[flow.Destination[types.Event]] { + return &PrinterConfig{} + }) + loader.Register("runreveal", func() loader.Builder[flow.Destination[types.Event]] { + return &RunRevealConfig{} + }) +} + +type ScannerConfig struct { +} + +func (c *ScannerConfig) Configure() (flow.Source[types.Event], error) { + slog.Info("configuring scanner") + return sources.NewScanner(os.Stdin), nil +} + +type SyslogConfig struct { + Addr string `json:"addr"` +} + +func (c *SyslogConfig) Configure() (flow.Source[types.Event], error) { + slog.Info("configuring syslog") + return syslog.NewSyslogSource(syslog.SyslogCfg{ + Addr: c.Addr, + }), nil +} + +type PrinterConfig struct { +} + +func (c *PrinterConfig) Configure() (flow.Destination[types.Event], error) { + slog.Info("configuring printer") + return destinations.NewPrinter(os.Stdout), nil +} + +type RunRevealConfig struct { + WebhookURL string `json:"webhookURL"` +} + +func (c *RunRevealConfig) Configure() (flow.Destination[types.Event], error) { + slog.Info("configuring runreveal") + return runreveal.New( + runreveal.WithWebhookURL(c.WebhookURL), + ), nil +} + +type JournaldConfig struct { +} + +func (c *JournaldConfig) Configure() (flow.Source[types.Event], error) { + slog.Info("configuring journald") + return journald.New(), nil +} diff --git a/cmd/flowd/main.go b/cmd/flowd/main.go new file mode 100644 index 0000000..ae9485f --- /dev/null +++ b/cmd/flowd/main.go @@ -0,0 +1,139 @@ +package main + +import ( + "context" + "fmt" + "os" + "path" + "path/filepath" + + "github.com/runreveal/flow" + "github.com/runreveal/flow/internal/queue" + "github.com/runreveal/flow/internal/types" + "github.com/runreveal/lib/loader" + "github.com/spf13/cobra" + "golang.org/x/exp/slog" +) + +var ( + version = "dev" +) + +func init() { + replace := func(groups []string, a slog.Attr) slog.Attr { + // Remove the directory from the source's filename. + if a.Key == slog.SourceKey { + source := a.Value.Any().(*slog.Source) + source.File = filepath.Base(source.File) + } + return a + } + level := slog.LevelInfo + if _, ok := os.LookupEnv("FLOW_DEBUG"); ok { + level = slog.LevelDebug + } + + h := slog.NewTextHandler( + os.Stderr, + &slog.HandlerOptions{ + Level: level, + AddSource: true, + ReplaceAttr: replace, + }, + ) + + slogger := slog.New(h) + slog.SetDefault(slogger) +} + +func main() { + slog.Info(fmt.Sprintf("starting %s", path.Base(os.Args[0])), "version", version) + rootCmd := NewRootCommand() + flowdCmd := NewRunCommand() + rootCmd.AddCommand(flowdCmd) + + if err := rootCmd.Execute(); err != nil { + slog.Error(fmt.Sprintf("%+v", err)) + os.Exit(1) + } +} + +// Build the cobra command that handles our command line tool. +func NewRootCommand() *cobra.Command { + rootCmd := &cobra.Command{ + Use: path.Base(os.Args[0]), + Short: `flowd is an all-in-one event ingestion daemon`, + Long: `flowd is an all-in-one event ingestion daemon. +It is designed to be a single binary that can be deployed to a server and +configured to receive events from a variety of sources and send them to a +variety of destinations.`, + RunE: func(cmd *cobra.Command, args []string) error { + return cmd.Help() + }, + } + return rootCmd +} + +type Config struct { + Sources []loader.Loader[flow.Source[types.Event]] `json:"sources"` + Destinations []loader.Loader[flow.Destination[types.Event]] `json:"destinations"` + + PProfAddr string `json:"pprof"` +} + +// Build the cobra command that handles our command line tool. +func NewRunCommand() *cobra.Command { + // Use configuration defined outside the main package 🎉 + var config Config + var configFile string + + cmd := &cobra.Command{ + Use: "run", + Short: "run the all-in-one event ingestion daemon", + RunE: func(cmd *cobra.Command, args []string) error { + bts, err := os.ReadFile(configFile) + if err != nil { + return err + } + err = loader.LoadConfig(bts, &config) + if err != nil { + return err + } + + slog.Info(fmt.Sprintf("config: %+v", config)) + + ctx := context.Background() + srcs := []flow.Source[types.Event]{} + for _, v := range config.Sources { + src, err := v.Configure() + if err != nil { + return err + } + srcs = append(srcs, src) + } + + dsts := []flow.Destination[types.Event]{} + for _, v := range config.Destinations { + dst, err := v.Configure() + if err != nil { + return err + } + dsts = append(dsts, dst) + } + + q := queue.New(queue.WithSources(srcs), queue.WithDestinations(dsts)) + + err = q.Run(ctx) + slog.Error(fmt.Sprintf("closing: %+v", err)) + return err + }, + } + + cmd.Flags().StringVar(&configFile, "config", "config.json", "where to load the configuration from") + err := cmd.MarkFlagRequired("config") + if err != nil { + panic(err) + } + + return cmd +} diff --git a/go.mod b/go.mod index 8ea2802..1ed33b6 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,28 @@ module github.com/runreveal/flow go 1.18 require ( + github.com/carlmjohnson/requests v0.23.4 github.com/pkg/errors v0.9.1 - github.com/stretchr/testify v1.8.0 + github.com/runreveal/lib/await v0.0.0-20230712051640-49bc0f55bb3f + github.com/runreveal/lib/loader v0.0.0-20230712051640-49bc0f55bb3f + github.com/spf13/cobra v1.7.0 + github.com/stretchr/testify v1.8.4 + golang.org/x/exp v0.0.0-20230725012225-302865e7556b + gopkg.in/mcuadros/go-syslog.v2 v2.3.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/segmentio/asm v1.2.0 // indirect + github.com/segmentio/encoding v0.3.6 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a // indirect + github.com/tidwall/gjson v1.14.4 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.1 // indirect + golang.org/x/net v0.7.0 // indirect + golang.org/x/sys v0.10.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8bde76e..34763fa 100644 --- a/go.sum +++ b/go.sum @@ -1,17 +1,50 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/carlmjohnson/requests v0.23.4 h1:AxcvapfB9RPXLSyvAHk9YJoodQ43ZjzNHj6Ft3tQGdg= +github.com/carlmjohnson/requests v0.23.4/go.mod h1:Qzp6tW4DQyainPP+tGwiJTzwxvElTIKm0B191TgTtOA= +github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/runreveal/lib/await v0.0.0-20230712051640-49bc0f55bb3f h1:cSl3g4SyiXQFkFuASrw1HUqRta3tPQYwJ4EBfeymiNc= +github.com/runreveal/lib/await v0.0.0-20230712051640-49bc0f55bb3f/go.mod h1:Gyj90y+175aa23yMbbml4zvGuIZj32GOe3qx7wMRgoo= +github.com/runreveal/lib/loader v0.0.0-20230712051640-49bc0f55bb3f h1:r5nZa0tgkJIwXUMXKTBlBRdj36AKuYTx3xyrUK3xGGk= +github.com/runreveal/lib/loader v0.0.0-20230712051640-49bc0f55bb3f/go.mod h1:E9M3xUL9yIghVm+AuwcQZZim3JbiDveZX8ldXEwfg4g= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/segmentio/asm v1.1.3/go.mod h1:Ld3L4ZXGNcSLRg4JBsZ3//1+f/TjYl0Mzen/DQy1EJg= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= +github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= +github.com/segmentio/encoding v0.3.6 h1:E6lVLyDPseWEulBmCmAKPanDd3jiyGDo5gMcugCRwZQ= +github.com/segmentio/encoding v0.3.6/go.mod h1:n0JeuIqEQrQoPDGsjo8UNd1iA0U8d8+oHAA4E3G3OxM= +github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= +github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a h1:SJy1Pu0eH1C29XwJucQo73FrleVK6t4kYz4NVhp34Yw= +github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a/go.mod h1:DFSS3NAGHthKo1gTlmEcSBiZrRJXi28rLNd/1udP1c8= +github.com/tidwall/gjson v1.14.4 h1:uo0p8EbA09J7RQaflQ1aBRffTR7xedD2bcIVSYxLnkM= +github.com/tidwall/gjson v1.14.4/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +golang.org/x/exp v0.0.0-20230725012225-302865e7556b h1:tK7yjGqVRzYdXsBcfD2MLhFAhHfDgGLm2rY1ub7FA9k= +golang.org/x/exp v0.0.0-20230725012225-302865e7556b/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/sys v0.0.0-20211110154304-99a53858aa08/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/mcuadros/go-syslog.v2 v2.3.0 h1:kcsiS+WsTKyIEPABJBJtoG0KkOS6yzvJ+/eZlhD79kk= +gopkg.in/mcuadros/go-syslog.v2 v2.3.0/go.mod h1:l5LPIyOOyIdQquNg+oU6Z3524YwrcqEm0aKH+5zpt2U= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/destinations/printer.go b/internal/destinations/printer.go new file mode 100644 index 0000000..506a10d --- /dev/null +++ b/internal/destinations/printer.go @@ -0,0 +1,31 @@ +package destinations + +import ( + "context" + "fmt" + "io" + + "github.com/runreveal/flow" + "github.com/runreveal/flow/internal/types" +) + +type Printer struct { + writer io.Writer +} + +func NewPrinter(writer io.Writer) *Printer { + return &Printer{writer: writer} +} + +func (p *Printer) Send(ctx context.Context, ack func(), msg ...flow.Message[types.Event]) error { + for _, m := range msg { + _, err := fmt.Fprintf(p.writer, "%s\n", m.Value.RawLog) + if err != nil { + return err + } + } + if ack != nil { + ack() + } + return nil +} diff --git a/internal/destinations/runreveal/runreveal.go b/internal/destinations/runreveal/runreveal.go new file mode 100644 index 0000000..aa105ec --- /dev/null +++ b/internal/destinations/runreveal/runreveal.go @@ -0,0 +1,95 @@ +package runreveal + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "time" + + "github.com/carlmjohnson/requests" + "github.com/runreveal/flow" + "github.com/runreveal/flow/internal/types" + batch "github.com/runreveal/flow/x/batcher" + "golang.org/x/exp/slog" +) + +type Option func(*RunReveal) + +func WithWebhookURL(url string) Option { + return func(r *RunReveal) { + r.webhookURL = url + } +} + +func WithHTTPClient(httpc *http.Client) Option { + return func(r *RunReveal) { + r.httpc = httpc + } +} + +type RunReveal struct { + httpc *http.Client + batcher *batch.Destination[types.Event] + + webhookURL string + reqConf requests.Config +} + +func New(opts ...Option) *RunReveal { + ret := &RunReveal{ + httpc: http.DefaultClient, + } + for _, o := range opts { + o(ret) + } + ret.batcher = batch.NewDestination[types.Event](ret, + batch.FlushLength(25), + batch.FlushFrequency(5*time.Second), + ) + return ret +} + +func (r *RunReveal) Run(ctx context.Context) error { + if r.webhookURL == "" { + return errors.New("missing webhook url") + } + + r.reqConf = func(rb *requests.Builder) { + rb. + UserAgent("flowd"). + Accept("application/json"). + BaseURL(r.webhookURL). + Header("Content-Type", "application/json") + } + + return r.batcher.Run(ctx) +} + +func (r *RunReveal) Send(ctx context.Context, ack func(), msgs ...flow.Message[types.Event]) error { + return r.batcher.Send(ctx, ack, msgs...) +} + +func (r *RunReveal) newReq() *requests.Builder { + return requests.New(r.reqConf) +} + +// Flush sends the given messages of type flow.Message[type.Event] to the RunReveal api +func (r *RunReveal) Flush(ctx context.Context, msgs []flow.Message[types.Event]) error { + batch := make([]json.RawMessage, len(msgs)) + var err error + for i, msg := range msgs { + batch[i], err = json.Marshal(msg.Value) + if err != nil { + slog.Error("error marshalling event", "err", err) + continue + } + } + // Send events to the webhookURL using POST + err = r.newReq().BodyJSON(batch).Fetch(ctx) + if err != nil { + return err + } + // TODO: retries + return nil +} diff --git a/internal/queue/queue.go b/internal/queue/queue.go new file mode 100644 index 0000000..17c3916 --- /dev/null +++ b/internal/queue/queue.go @@ -0,0 +1,103 @@ +package queue + +import ( + "context" + "errors" + + "github.com/runreveal/flow" + "github.com/runreveal/flow/internal/types" + "github.com/runreveal/flow/x/multi" + "github.com/runreveal/lib/await" + "golang.org/x/exp/slog" +) + +type Option func(*Queue) + +func WithSources(srcs []flow.Source[types.Event]) Option { + return func(q *Queue) { + q.Sources = srcs + } +} + +func WithDestinations(dsts []flow.Destination[types.Event]) Option { + return func(q *Queue) { + q.Destinations = dsts + } +} + +type Queue struct { + Sources []flow.Source[types.Event] + Destinations []flow.Destination[types.Event] +} + +var ( + ErrNoSources = errors.New("no sources configured") + ErrNoDestinations = errors.New("no destinations configured") +) + +func (q *Queue) Validate() error { + if len(q.Sources) == 0 { + return ErrNoSources + } + if len(q.Destinations) == 0 { + return ErrNoDestinations + } + return nil +} + +func New(opts ...Option) *Queue { + var q Queue + + for _, opt := range opts { + opt(&q) + } + + return &q +} + +func (q *Queue) Run(ctx context.Context) error { + if err := q.Validate(); err != nil { + return err + } + + w := await.New(await.WithSignals) + + for _, s := range q.Sources { + if r, ok := s.(interface { + Run(context.Context) error + }); ok { + w.Add(r.Run) + } + } + + for _, s := range q.Destinations { + if r, ok := s.(interface { + Run(context.Context) error + }); ok { + w.Add(r.Run) + } + } + + multiDst := multi.NewMultiDestination(q.Destinations) + // w.Add(multiDst.Run) + + multiSrc := multi.NewMultiSource(q.Sources) + w.Add(multiSrc.Run) + + p, err := flow.New(flow.Config[types.Event, types.Event]{ + Source: multiSrc, + Destination: multiDst, + Handler: flow.Pipe[types.Event](), + // NOTE(alan): don't increase parallelism on this processor until we've + // verified thread safety thread-safe story. + }, flow.Parallelism(1)) + if err != nil { + return err + } + w.Add(p.Run) + + slog.Info("running queue") + err = w.Run(ctx) + slog.Error("await error", "error", err) + return err +} diff --git a/internal/sources/journald/journald.go b/internal/sources/journald/journald.go new file mode 100644 index 0000000..d66bc91 --- /dev/null +++ b/internal/sources/journald/journald.go @@ -0,0 +1,230 @@ +package journald + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "strconv" + "strings" + "sync" + "time" + + "github.com/runreveal/flow" + "github.com/runreveal/flow/internal/types" + "golang.org/x/exp/slog" +) + +type Journald struct { + msgC chan msgAck +} + +type msgAck struct { + msg flow.Message[types.Event] + ack func() +} + +func New() *Journald { + return &Journald{ + msgC: make(chan msgAck), + } +} + +func (s *Journald) Run(ctx context.Context) error { + return s.recvLoop(ctx) +} + +func (s *Journald) recvLoop(ctx context.Context) error { + // Open file to check and save high watermark + hwmFile, err := os.OpenFile("/tmp/flowd-journald-hwm", os.O_RDWR|os.O_CREATE, os.FileMode(0644)) + if err != nil { + return err + } + defer hwmFile.Close() + + // Read high watermark from file + bts, err := io.ReadAll(hwmFile) + if err != nil { + return err + } + + // Save high watermark to file + ack := func(cursor string) { + var err error + defer func() { + if err != nil { + slog.Error(fmt.Sprintf("writing high watermark: %+v", err)) + } + }() + err = hwmFile.Truncate(0) + if err != nil { + return + } + _, err = hwmFile.Seek(0, 0) + if err != nil { + return + } + _, err = hwmFile.WriteString(cursor) + if err != nil { + return + } + } + + args := []string{ + "journalctl", "-b", "-af", "-o", "json", + } + if len(bts) > 0 { + // Resume reading from the location of a previous invocation + args = append(args, "--after-cursor", string(bts)) + } else { + // Read all logs for this boot + args = append(args, "--since", "1970-01-01 00:00:00") + } + slog.Debug(fmt.Sprintf("running: `%s`", strings.Join(args, " "))) + + cmd := exec.Command(args[0], args[1:]...) + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + if err := cmd.Start(); err != nil { + return err + } + scanner := bufio.NewScanner(stdout) + var wg sync.WaitGroup + + for scanner.Scan() { + bts := make([]byte, len(scanner.Bytes())) + copy(bts, scanner.Bytes()) + + // Parse timestamp from log + log := autoGeneratedJournal{} + var ts time.Time + if err := json.Unmarshal(bts, &log); err != nil { + slog.Error(fmt.Sprintf("unmarshaling: %+v", err)) + continue + } else { + ts, err = parseUnixMicroseconds(log.RealtimeTimestamp) + if err != nil { + slog.Error(fmt.Sprintf("parsing timestamp: %+v", err)) + } + } + + wg.Add(1) + select { + case s.msgC <- msgAck{ + msg: flow.Message[types.Event]{ + Value: types.Event{ + Timestamp: ts, + SourceType: "journald", + RawLog: bts, + }, + }, + ack: func() { + ack(log.Cursor) + wg.Done() + }, + }: + case <-ctx.Done(): + return ctx.Err() + } + } + + c := make(chan struct{}) + go func() { + wg.Wait() + close(c) + }() + select { + case <-c: + case <-ctx.Done(): + return ctx.Err() + } + if err := scanner.Err(); err != nil { + return fmt.Errorf("scanning: %+w", err) + } + return cmd.Wait() +} + +func (s *Journald) Recv(ctx context.Context) (flow.Message[types.Event], func(), error) { + select { + case <-ctx.Done(): + return flow.Message[types.Event]{}, nil, ctx.Err() + case pass := <-s.msgC: + return pass.msg, pass.ack, nil + } +} + +func parseUnixMicroseconds(s string) (time.Time, error) { + microseconds, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return time.Time{}, err + } + + // Convert microseconds to seconds and remainder microseconds + sec := microseconds / 1e6 + nsec := (microseconds % 1e6) * 1e3 + + // Create a new time.Time value + return time.Unix(sec, nsec), nil +} + +type journalMsg []byte + +func (jm *journalMsg) UnmarshalJSON(b []byte) error { + if len(b) == 0 { + return errors.New("unexpected end of JSON input for journalMsg") + } + var err error + switch b[0] { + case '"': + var s string + err = json.Unmarshal(b, &s) + if err != nil { + return err + } + *jm = []byte(s) + case '[': + var bts []byte + err = json.Unmarshal(b, &bts) + if err != nil { + return err + } + *jm = bts + default: + err = fmt.Errorf("unexpected character in journalMsg: %s. expecting string or list", string(b[0])) + } + return err +} + +// There are other fields, but these should be on just about every journald event +type autoGeneratedJournal struct { + Message journalMsg `json:"MESSAGE"` + // Unix Timestamp in Microseconds since epoch as string + RealtimeTimestamp string `json:"__REALTIME_TIMESTAMP"` + SyslogIdentifier string `json:"SYSLOG_IDENTIFIER"` + Hostname string `json:"_HOSTNAME"` + Cursor string `json:"__CURSOR"` + + // BootID string `json:"_BOOT_ID"` + // CapEffective string `json:"_CAP_EFFECTIVE"` + // Cmdline string `json:"_CMDLINE"` + // Comm string `json:"_COMM"` + // Exe string `json:"_EXE"` + // Gid string `json:"_GID"` + // MachineID string `json:"_MACHINE_ID"` + // MonotonicTimestamp string `json:"__MONOTONIC_TIMESTAMP"` + // Pid string `json:"_PID"` + // Priority string `json:"PRIORITY"` + // SelinuxContext string `json:"_SELINUX_CONTEXT"` + // SyslogFacility string `json:"SYSLOG_FACILITY"` + // SystemdCgroup string `json:"_SYSTEMD_CGROUP"` + // SystemdSlice string `json:"_SYSTEMD_SLICE"` + // SystemdUnit string `json:"_SYSTEMD_UNIT"` + // Transport string `json:"_TRANSPORT"` + // UID string `json:"_UID"` +} diff --git a/internal/sources/scanner.go b/internal/sources/scanner.go new file mode 100644 index 0000000..bc3c4fe --- /dev/null +++ b/internal/sources/scanner.go @@ -0,0 +1,89 @@ +package sources + +import ( + "bufio" + "context" + "fmt" + "io" + "sync" + "time" + + "github.com/runreveal/flow" + "github.com/runreveal/flow/internal/types" +) + +type Scanner struct { + reader io.Reader + msgC chan msgAck +} + +type msgAck struct { + msg flow.Message[types.Event] + ack func() +} + +func NewScanner(reader io.Reader) *Scanner { + return &Scanner{ + reader: reader, + msgC: make(chan msgAck), + } +} + +func (s *Scanner) Run(ctx context.Context) error { + return s.recvLoop(ctx) +} + +func (s *Scanner) recvLoop(ctx context.Context) error { + scanner := bufio.NewScanner(s.reader) + var wg sync.WaitGroup + + for scanner.Scan() { + str := scanner.Text() + // NOTE: what does it mean to acknolwedge a message was successfully + // processed in the context of a file source? There isn't really an + // upstream to communicate with when consuming an io.Reader. + wg.Add(1) + select { + case s.msgC <- msgAck{ + msg: flow.Message[types.Event]{ + Value: types.Event{ + Timestamp: time.Now(), + SourceType: "reader", + RawLog: []byte(str), + }, + }, + ack: func() { + wg.Done() + }, + }: + case <-ctx.Done(): + return ctx.Err() + } + } + + c := make(chan struct{}) + go func() { + wg.Wait() + close(c) + }() + select { + case <-c: + case <-ctx.Done(): + return ctx.Err() + } + + if err := scanner.Err(); err != nil { + return fmt.Errorf("scanning: %+w", err) + } + + return nil +} + +func (s *Scanner) Recv(ctx context.Context) (flow.Message[types.Event], func(), error) { + select { + case <-ctx.Done(): + return flow.Message[types.Event]{}, nil, ctx.Err() + case pass := <-s.msgC: + return pass.msg, pass.ack, nil + } +} diff --git a/internal/sources/syslog/syslog.go b/internal/sources/syslog/syslog.go new file mode 100644 index 0000000..b33a295 --- /dev/null +++ b/internal/sources/syslog/syslog.go @@ -0,0 +1,89 @@ +package syslog + +import ( + "context" + "fmt" + "time" + + "github.com/runreveal/flow" + "github.com/runreveal/flow/internal/types" + "golang.org/x/exp/slog" + "gopkg.in/mcuadros/go-syslog.v2" +) + +type SyslogCfg struct { + Addr string `json:"addr"` +} + +type SyslogSource struct { + cfg SyslogCfg + server *syslog.Server + syslogPartsC syslog.LogPartsChannel +} + +func NewSyslogSource(cfg SyslogCfg) *SyslogSource { + server := syslog.NewServer() + channel := make(syslog.LogPartsChannel) + handler := syslog.NewChannelHandler(channel) + server.SetFormat(syslog.RFC3164) + server.SetHandler(handler) + return &SyslogSource{ + cfg: cfg, + server: server, + syslogPartsC: channel, + } +} + +func (s *SyslogSource) Run(ctx context.Context) error { + slog.Info(fmt.Sprintf("starting syslog server on socket %s", s.cfg.Addr)) + err := s.server.ListenUDP(s.cfg.Addr) + if err != nil { + return err + } + err = s.server.Boot() + if err != nil { + return err + } + done := make(chan struct{}) + go func() { + s.server.Wait() + close(done) + }() + select { + case <-ctx.Done(): + err := s.server.Kill() + return err + case <-done: + } + return nil +} + +func (s *SyslogSource) Recv(ctx context.Context) (flow.Message[types.Event], func(), error) { + select { + case logParts := <-s.syslogPartsC: + if content, ok := logParts["content"]; ok { + rawLog := []byte(content.(string)) + + ts := time.Now().UTC() + if timestamp, ok := logParts["timestamp"]; ok { + if ts, ok = timestamp.(time.Time); !ok { + ts = time.Now().UTC() + } + } + + msg := flow.Message[types.Event]{ + Value: types.Event{ + Timestamp: ts, + SourceType: "syslog", + RawLog: rawLog, + }, + } + return msg, nil, nil + } else { + fmt.Println("warn: found syslog without 'content' key") + } + case <-ctx.Done(): + return flow.Message[types.Event]{}, nil, ctx.Err() + } + panic("unreachable!") +} diff --git a/internal/types/flowd.go b/internal/types/flowd.go new file mode 100644 index 0000000..da17363 --- /dev/null +++ b/internal/types/flowd.go @@ -0,0 +1,9 @@ +package types + +import "time" + +type Event struct { + Timestamp time.Time `json:"ts"` + SourceType string `json:"sourceType"` + RawLog []byte `json:"rawLog"` +} diff --git a/x/batcher/batcher_test.go b/x/batcher/batcher_test.go index 3a5344d..9bac0bd 100644 --- a/x/batcher/batcher_test.go +++ b/x/batcher/batcher_test.go @@ -54,9 +54,9 @@ func TestBatcher(t *testing.T) { }(ctx, errc) writeMsgs := []flow.Message[string]{ - flow.Message[string]{Value: "hi"}, - flow.Message[string]{Value: "hello"}, - flow.Message[string]{Value: "bonjour"}, + {Value: "hi"}, + {Value: "hello"}, + {Value: "bonjour"}, } done := make(chan struct{}) @@ -97,8 +97,8 @@ func TestBatchFlushTimeout(t *testing.T) { }(ctx, errc) writeMsgs := []flow.Message[string]{ - flow.Message[string]{Value: "hi"}, - flow.Message[string]{Value: "hello"}, + {Value: "hi"}, + {Value: "hello"}, } done := make(chan struct{}) @@ -152,25 +152,20 @@ func TestBatcherErrors(t *testing.T) { t.Run("cancellation works", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - go func(c context.Context, ec chan error) { ec <- bat.Run(c) }(ctx, errc) cancel() - select { - case err := <-errc: - assert.EqualError(t, err, "context canceled") - } + err := <-errc + assert.EqualError(t, err, "context canceled") }) t.Run("cancellation works in deadlock", func(t *testing.T) { var ff = func(c context.Context, msgs []flow.Message[string]) error { - select { - case <-c.Done(): - return c.Err() - } + <-c.Done() + return c.Err() } bat := NewDestination[string](FlushFunc[string](ff), FlushLength(1)) @@ -196,9 +191,7 @@ func TestBatcherErrors(t *testing.T) { assert.NoError(t, err) cancel() - select { - case err := <-errc: - assert.EqualError(t, err, "context canceled") - } + err = <-errc + assert.EqualError(t, err, "context canceled") }) }