Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

今日は, 川 #7

Merged
merged 1 commit into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: cheetahd
name: kawa

on:
push:
Expand Down
14 changes: 7 additions & 7 deletions .goreleaser.public.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ before:
# you may remove this if you don't need go generate
# - go generate ./...
builds:
- main: ./cmd/cheetahd
id: cheetahd
binary: cheetahd
- main: ./cmd/kawa
id: kawa
binary: kawa
goos:
- linux
- darwin
- windows

archives:
- format: tar.gz
id: cheetahd
id: kawa
# this name template makes the OS and Arch compatible with the results of uname.
name_template: >-
cheetahd-
kawa-
{{- .Os }}-
{{- if eq .Arch "386" }}i386
{{- else }}{{ .Arch }}{{ end }}
{{- if .Arm }}v{{ .Arm }}{{ end }}
# use zip for windows archives
builds: [cheetahd]
builds: [kawa]
format_overrides:
- goos: windows
format: zip
Expand All @@ -44,7 +44,7 @@ release:
# Default is extracted from the origin remote URL or empty if its private hosted.
github:
owner: runreveal
name: cheetah
name: kawa

checksum:
name_template: 'checksums.txt'
Expand Down
34 changes: 17 additions & 17 deletions cmd/cheetahd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package main
import (
"os"

"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/destinations"
"github.com/runreveal/chta/internal/destinations/runreveal"
"github.com/runreveal/chta/internal/sources"
"github.com/runreveal/chta/internal/sources/journald"
"github.com/runreveal/chta/internal/sources/syslog"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/destinations"
"github.com/runreveal/kawa/internal/destinations/runreveal"
"github.com/runreveal/kawa/internal/sources"
"github.com/runreveal/kawa/internal/sources/journald"
"github.com/runreveal/kawa/internal/sources/syslog"
"github.com/runreveal/kawa/internal/types"
"github.com/runreveal/lib/loader"
"golang.org/x/exp/slog"
// We could register and configure these in a separate package
Expand All @@ -19,28 +19,28 @@ import (
)

func init() {
loader.Register("scanner", func() loader.Builder[chta.Source[types.Event]] {
loader.Register("scanner", func() loader.Builder[kawa.Source[types.Event]] {
return &ScannerConfig{}
})
loader.Register("syslog", func() loader.Builder[chta.Source[types.Event]] {
loader.Register("syslog", func() loader.Builder[kawa.Source[types.Event]] {
return &SyslogConfig{}
})
loader.Register("journald", func() loader.Builder[chta.Source[types.Event]] {
loader.Register("journald", func() loader.Builder[kawa.Source[types.Event]] {
return &JournaldConfig{}
})

loader.Register("printer", func() loader.Builder[chta.Destination[types.Event]] {
loader.Register("printer", func() loader.Builder[kawa.Destination[types.Event]] {
return &PrinterConfig{}
})
loader.Register("runreveal", func() loader.Builder[chta.Destination[types.Event]] {
loader.Register("runreveal", func() loader.Builder[kawa.Destination[types.Event]] {
return &RunRevealConfig{}
})
}

type ScannerConfig struct {
}

func (c *ScannerConfig) Configure() (chta.Source[types.Event], error) {
func (c *ScannerConfig) Configure() (kawa.Source[types.Event], error) {
slog.Info("configuring scanner")
return sources.NewScanner(os.Stdin), nil
}
Expand All @@ -49,7 +49,7 @@ type SyslogConfig struct {
Addr string `json:"addr"`
}

func (c *SyslogConfig) Configure() (chta.Source[types.Event], error) {
func (c *SyslogConfig) Configure() (kawa.Source[types.Event], error) {
slog.Info("configuring syslog")
return syslog.NewSyslogSource(syslog.SyslogCfg{
Addr: c.Addr,
Expand All @@ -59,7 +59,7 @@ func (c *SyslogConfig) Configure() (chta.Source[types.Event], error) {
type PrinterConfig struct {
}

func (c *PrinterConfig) Configure() (chta.Destination[types.Event], error) {
func (c *PrinterConfig) Configure() (kawa.Destination[types.Event], error) {
slog.Info("configuring printer")
return destinations.NewPrinter(os.Stdout), nil
}
Expand All @@ -68,7 +68,7 @@ type RunRevealConfig struct {
WebhookURL string `json:"webhookURL"`
}

func (c *RunRevealConfig) Configure() (chta.Destination[types.Event], error) {
func (c *RunRevealConfig) Configure() (kawa.Destination[types.Event], error) {
slog.Info("configuring runreveal")
return runreveal.New(
runreveal.WithWebhookURL(c.WebhookURL),
Expand All @@ -78,7 +78,7 @@ func (c *RunRevealConfig) Configure() (chta.Destination[types.Event], error) {
type JournaldConfig struct {
}

func (c *JournaldConfig) Configure() (chta.Source[types.Event], error) {
func (c *JournaldConfig) Configure() (kawa.Source[types.Event], error) {
slog.Info("configuring journald")
return journald.New(), nil
}
14 changes: 7 additions & 7 deletions cmd/cheetahd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"path"
"path/filepath"

"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/queue"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/queue"
"github.com/runreveal/kawa/internal/types"
"github.com/runreveal/lib/loader"
"github.com/spf13/cobra"
"golang.org/x/exp/slog"
Expand Down Expand Up @@ -75,8 +75,8 @@ variety of destinations.`,
}

type Config struct {
Sources []loader.Loader[chta.Source[types.Event]] `json:"sources"`
Destinations []loader.Loader[chta.Destination[types.Event]] `json:"destinations"`
Sources []loader.Loader[kawa.Source[types.Event]] `json:"sources"`
Destinations []loader.Loader[kawa.Destination[types.Event]] `json:"destinations"`

PProfAddr string `json:"pprof"`
}
Expand All @@ -103,7 +103,7 @@ func NewRunCommand() *cobra.Command {
slog.Info(fmt.Sprintf("config: %+v", config))

ctx := context.Background()
srcs := []chta.Source[types.Event]{}
srcs := []kawa.Source[types.Event]{}
for _, v := range config.Sources {
src, err := v.Configure()
if err != nil {
Expand All @@ -112,7 +112,7 @@ func NewRunCommand() *cobra.Command {
srcs = append(srcs, src)
}

dsts := []chta.Destination[types.Event]{}
dsts := []kawa.Destination[types.Event]{}
for _, v := range config.Destinations {
dst, err := v.Configure()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/runreveal/chta
module github.com/runreveal/kawa

go 1.18

Expand Down
6 changes: 3 additions & 3 deletions internal/destinations/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"io"

"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/types"
)

type Printer struct {
Expand All @@ -17,7 +17,7 @@ func NewPrinter(writer io.Writer) *Printer {
return &Printer{writer: writer}
}

func (p *Printer) Send(ctx context.Context, ack func(), msg ...chta.Message[types.Event]) error {
func (p *Printer) Send(ctx context.Context, ack func(), msg ...kawa.Message[types.Event]) error {
for _, m := range msg {
_, err := fmt.Fprintf(p.writer, "%s\n", m.Value.RawLog)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions internal/destinations/runreveal/runreveal.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"time"

"github.com/carlmjohnson/requests"
"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/types"
batch "github.com/runreveal/chta/x/batcher"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/types"
batch "github.com/runreveal/kawa/x/batcher"
"golang.org/x/exp/slog"
)

Expand Down Expand Up @@ -66,16 +66,16 @@ func (r *RunReveal) Run(ctx context.Context) error {
return r.batcher.Run(ctx)
}

func (r *RunReveal) Send(ctx context.Context, ack func(), msgs ...chta.Message[types.Event]) error {
func (r *RunReveal) Send(ctx context.Context, ack func(), msgs ...kawa.Message[types.Event]) error {
return r.batcher.Send(ctx, ack, msgs...)
}

func (r *RunReveal) newReq() *requests.Builder {
return requests.New(r.reqConf)
}

// Flush sends the given messages of type chta.Message[type.Event] to the RunReveal api
func (r *RunReveal) Flush(ctx context.Context, msgs []chta.Message[types.Event]) error {
// Flush sends the given messages of type kawa.Message[type.Event] to the RunReveal api
func (r *RunReveal) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error {
batch := make([]json.RawMessage, len(msgs))
var err error
for i, msg := range msgs {
Expand Down
20 changes: 10 additions & 10 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,30 @@ import (
"context"
"errors"

"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/chta/x/multi"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/types"
"github.com/runreveal/kawa/x/multi"
"github.com/runreveal/lib/await"
"golang.org/x/exp/slog"
)

type Option func(*Queue)

func WithSources(srcs []chta.Source[types.Event]) Option {
func WithSources(srcs []kawa.Source[types.Event]) Option {
return func(q *Queue) {
q.Sources = srcs
}
}

func WithDestinations(dsts []chta.Destination[types.Event]) Option {
func WithDestinations(dsts []kawa.Destination[types.Event]) Option {
return func(q *Queue) {
q.Destinations = dsts
}
}

type Queue struct {
Sources []chta.Source[types.Event]
Destinations []chta.Destination[types.Event]
Sources []kawa.Source[types.Event]
Destinations []kawa.Destination[types.Event]
}

var (
Expand Down Expand Up @@ -84,13 +84,13 @@ func (q *Queue) Run(ctx context.Context) error {
multiSrc := multi.NewMultiSource(q.Sources)
w.Add(multiSrc.Run)

p, err := chta.New(chta.Config[types.Event, types.Event]{
p, err := kawa.New(kawa.Config[types.Event, types.Event]{
Source: multiSrc,
Destination: multiDst,
Handler: chta.Pipe[types.Event](),
Handler: kawa.Pipe[types.Event](),
// NOTE(alan): don't increase parallelism on this processor until we've
// verified thread safety thread-safe story.
}, chta.Parallelism(1))
}, kawa.Parallelism(1))
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions internal/sources/journald/journald.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"sync"
"time"

"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/types"
"golang.org/x/exp/slog"
)

Expand All @@ -24,7 +24,7 @@ type Journald struct {
}

type msgAck struct {
msg chta.Message[types.Event]
msg kawa.Message[types.Event]
ack func()
}

Expand All @@ -40,7 +40,7 @@ func (s *Journald) Run(ctx context.Context) error {

func (s *Journald) recvLoop(ctx context.Context) error {
// Open file to check and save high watermark
hwmFile, err := os.OpenFile("/tmp/chtad-journald-hwm", os.O_RDWR|os.O_CREATE, os.FileMode(0644))
hwmFile, err := os.OpenFile("/tmp/kawad-journald-hwm", os.O_RDWR|os.O_CREATE, os.FileMode(0644))
if err != nil {
return err
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func (s *Journald) recvLoop(ctx context.Context) error {
wg.Add(1)
select {
case s.msgC <- msgAck{
msg: chta.Message[types.Event]{
msg: kawa.Message[types.Event]{
Value: types.Event{
Timestamp: ts,
SourceType: "journald",
Expand Down Expand Up @@ -150,10 +150,10 @@ func (s *Journald) recvLoop(ctx context.Context) error {
return cmd.Wait()
}

func (s *Journald) Recv(ctx context.Context) (chta.Message[types.Event], func(), error) {
func (s *Journald) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) {
select {
case <-ctx.Done():
return chta.Message[types.Event]{}, nil, ctx.Err()
return kawa.Message[types.Event]{}, nil, ctx.Err()
case pass := <-s.msgC:
return pass.msg, pass.ack, nil
}
Expand Down
12 changes: 6 additions & 6 deletions internal/sources/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"sync"
"time"

"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/types"
)

type Scanner struct {
Expand All @@ -18,7 +18,7 @@ type Scanner struct {
}

type msgAck struct {
msg chta.Message[types.Event]
msg kawa.Message[types.Event]
ack func()
}

Expand All @@ -45,7 +45,7 @@ func (s *Scanner) recvLoop(ctx context.Context) error {
wg.Add(1)
select {
case s.msgC <- msgAck{
msg: chta.Message[types.Event]{
msg: kawa.Message[types.Event]{
Value: types.Event{
Timestamp: time.Now(),
SourceType: "reader",
Expand Down Expand Up @@ -79,10 +79,10 @@ func (s *Scanner) recvLoop(ctx context.Context) error {
return nil
}

func (s *Scanner) Recv(ctx context.Context) (chta.Message[types.Event], func(), error) {
func (s *Scanner) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) {
select {
case <-ctx.Done():
return chta.Message[types.Event]{}, nil, ctx.Err()
return kawa.Message[types.Event]{}, nil, ctx.Err()
case pass := <-s.msgC:
return pass.msg, pass.ack, nil
}
Expand Down
Loading