From 59d390f1e5a60bbccc02002822bbde3f21f26245 Mon Sep 17 00:00:00 2001 From: Alan Braithwaite Date: Thu, 27 Jul 2023 11:07:29 -0700 Subject: [PATCH] =?UTF-8?q?=E4=BB=8A=E6=97=A5=E3=81=AF,=20=E5=B7=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/ci.yml | 2 +- .goreleaser.public.yaml | 14 ++++---- cmd/cheetahd/config.go | 34 ++++++++++---------- cmd/cheetahd/main.go | 14 ++++---- go.mod | 2 +- internal/destinations/printer.go | 6 ++-- internal/destinations/runreveal/runreveal.go | 12 +++---- internal/queue/queue.go | 20 ++++++------ internal/sources/journald/journald.go | 14 ++++---- internal/sources/scanner.go | 12 +++---- internal/sources/syslog/syslog.go | 10 +++--- processor.go | 4 +-- processor_test.go | 14 ++++---- test/stream_test.go | 16 ++++----- types.go | 2 +- x/batcher/batcher.go | 16 ++++----- x/batcher/batcher_test.go | 20 ++++++------ x/memory/memory.go | 10 +++--- x/multi/multidest.go | 8 ++--- x/multi/multisrc.go | 14 ++++---- 20 files changed, 122 insertions(+), 122 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0474e31..7043797 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,7 +1,7 @@ # This workflow will build a golang project # For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go -name: cheetahd +name: kawa on: push: diff --git a/.goreleaser.public.yaml b/.goreleaser.public.yaml index 285940f..5f0ef9a 100644 --- a/.goreleaser.public.yaml +++ b/.goreleaser.public.yaml @@ -7,9 +7,9 @@ before: # you may remove this if you don't need go generate # - go generate ./... builds: - - main: ./cmd/cheetahd - id: cheetahd - binary: cheetahd + - main: ./cmd/kawa + id: kawa + binary: kawa goos: - linux - darwin @@ -17,16 +17,16 @@ builds: archives: - format: tar.gz - id: cheetahd + id: kawa # this name template makes the OS and Arch compatible with the results of uname. name_template: >- - cheetahd- + kawa- {{- .Os }}- {{- if eq .Arch "386" }}i386 {{- else }}{{ .Arch }}{{ end }} {{- if .Arm }}v{{ .Arm }}{{ end }} # use zip for windows archives - builds: [cheetahd] + builds: [kawa] format_overrides: - goos: windows format: zip @@ -44,7 +44,7 @@ release: # Default is extracted from the origin remote URL or empty if its private hosted. github: owner: runreveal - name: cheetah + name: kawa checksum: name_template: 'checksums.txt' diff --git a/cmd/cheetahd/config.go b/cmd/cheetahd/config.go index e7c216a..94f9f02 100644 --- a/cmd/cheetahd/config.go +++ b/cmd/cheetahd/config.go @@ -3,13 +3,13 @@ package main import ( "os" - "github.com/runreveal/chta" - "github.com/runreveal/chta/internal/destinations" - "github.com/runreveal/chta/internal/destinations/runreveal" - "github.com/runreveal/chta/internal/sources" - "github.com/runreveal/chta/internal/sources/journald" - "github.com/runreveal/chta/internal/sources/syslog" - "github.com/runreveal/chta/internal/types" + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/internal/destinations" + "github.com/runreveal/kawa/internal/destinations/runreveal" + "github.com/runreveal/kawa/internal/sources" + "github.com/runreveal/kawa/internal/sources/journald" + "github.com/runreveal/kawa/internal/sources/syslog" + "github.com/runreveal/kawa/internal/types" "github.com/runreveal/lib/loader" "golang.org/x/exp/slog" // We could register and configure these in a separate package @@ -19,20 +19,20 @@ import ( ) func init() { - loader.Register("scanner", func() loader.Builder[chta.Source[types.Event]] { + loader.Register("scanner", func() loader.Builder[kawa.Source[types.Event]] { return &ScannerConfig{} }) - loader.Register("syslog", func() loader.Builder[chta.Source[types.Event]] { + loader.Register("syslog", func() loader.Builder[kawa.Source[types.Event]] { return &SyslogConfig{} }) - loader.Register("journald", func() loader.Builder[chta.Source[types.Event]] { + loader.Register("journald", func() loader.Builder[kawa.Source[types.Event]] { return &JournaldConfig{} }) - loader.Register("printer", func() loader.Builder[chta.Destination[types.Event]] { + loader.Register("printer", func() loader.Builder[kawa.Destination[types.Event]] { return &PrinterConfig{} }) - loader.Register("runreveal", func() loader.Builder[chta.Destination[types.Event]] { + loader.Register("runreveal", func() loader.Builder[kawa.Destination[types.Event]] { return &RunRevealConfig{} }) } @@ -40,7 +40,7 @@ func init() { type ScannerConfig struct { } -func (c *ScannerConfig) Configure() (chta.Source[types.Event], error) { +func (c *ScannerConfig) Configure() (kawa.Source[types.Event], error) { slog.Info("configuring scanner") return sources.NewScanner(os.Stdin), nil } @@ -49,7 +49,7 @@ type SyslogConfig struct { Addr string `json:"addr"` } -func (c *SyslogConfig) Configure() (chta.Source[types.Event], error) { +func (c *SyslogConfig) Configure() (kawa.Source[types.Event], error) { slog.Info("configuring syslog") return syslog.NewSyslogSource(syslog.SyslogCfg{ Addr: c.Addr, @@ -59,7 +59,7 @@ func (c *SyslogConfig) Configure() (chta.Source[types.Event], error) { type PrinterConfig struct { } -func (c *PrinterConfig) Configure() (chta.Destination[types.Event], error) { +func (c *PrinterConfig) Configure() (kawa.Destination[types.Event], error) { slog.Info("configuring printer") return destinations.NewPrinter(os.Stdout), nil } @@ -68,7 +68,7 @@ type RunRevealConfig struct { WebhookURL string `json:"webhookURL"` } -func (c *RunRevealConfig) Configure() (chta.Destination[types.Event], error) { +func (c *RunRevealConfig) Configure() (kawa.Destination[types.Event], error) { slog.Info("configuring runreveal") return runreveal.New( runreveal.WithWebhookURL(c.WebhookURL), @@ -78,7 +78,7 @@ func (c *RunRevealConfig) Configure() (chta.Destination[types.Event], error) { type JournaldConfig struct { } -func (c *JournaldConfig) Configure() (chta.Source[types.Event], error) { +func (c *JournaldConfig) Configure() (kawa.Source[types.Event], error) { slog.Info("configuring journald") return journald.New(), nil } diff --git a/cmd/cheetahd/main.go b/cmd/cheetahd/main.go index f05dd21..ad2b8c3 100644 --- a/cmd/cheetahd/main.go +++ b/cmd/cheetahd/main.go @@ -7,9 +7,9 @@ import ( "path" "path/filepath" - "github.com/runreveal/chta" - "github.com/runreveal/chta/internal/queue" - "github.com/runreveal/chta/internal/types" + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/internal/queue" + "github.com/runreveal/kawa/internal/types" "github.com/runreveal/lib/loader" "github.com/spf13/cobra" "golang.org/x/exp/slog" @@ -75,8 +75,8 @@ variety of destinations.`, } type Config struct { - Sources []loader.Loader[chta.Source[types.Event]] `json:"sources"` - Destinations []loader.Loader[chta.Destination[types.Event]] `json:"destinations"` + Sources []loader.Loader[kawa.Source[types.Event]] `json:"sources"` + Destinations []loader.Loader[kawa.Destination[types.Event]] `json:"destinations"` PProfAddr string `json:"pprof"` } @@ -103,7 +103,7 @@ func NewRunCommand() *cobra.Command { slog.Info(fmt.Sprintf("config: %+v", config)) ctx := context.Background() - srcs := []chta.Source[types.Event]{} + srcs := []kawa.Source[types.Event]{} for _, v := range config.Sources { src, err := v.Configure() if err != nil { @@ -112,7 +112,7 @@ func NewRunCommand() *cobra.Command { srcs = append(srcs, src) } - dsts := []chta.Destination[types.Event]{} + dsts := []kawa.Destination[types.Event]{} for _, v := range config.Destinations { dst, err := v.Configure() if err != nil { diff --git a/go.mod b/go.mod index 0d1a466..44b4f50 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/runreveal/chta +module github.com/runreveal/kawa go 1.18 diff --git a/internal/destinations/printer.go b/internal/destinations/printer.go index 9e85baa..f537d9c 100644 --- a/internal/destinations/printer.go +++ b/internal/destinations/printer.go @@ -5,8 +5,8 @@ import ( "fmt" "io" - "github.com/runreveal/chta" - "github.com/runreveal/chta/internal/types" + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/internal/types" ) type Printer struct { @@ -17,7 +17,7 @@ func NewPrinter(writer io.Writer) *Printer { return &Printer{writer: writer} } -func (p *Printer) Send(ctx context.Context, ack func(), msg ...chta.Message[types.Event]) error { +func (p *Printer) Send(ctx context.Context, ack func(), msg ...kawa.Message[types.Event]) error { for _, m := range msg { _, err := fmt.Fprintf(p.writer, "%s\n", m.Value.RawLog) if err != nil { diff --git a/internal/destinations/runreveal/runreveal.go b/internal/destinations/runreveal/runreveal.go index b7361f3..48cbf6f 100644 --- a/internal/destinations/runreveal/runreveal.go +++ b/internal/destinations/runreveal/runreveal.go @@ -8,9 +8,9 @@ import ( "time" "github.com/carlmjohnson/requests" - "github.com/runreveal/chta" - "github.com/runreveal/chta/internal/types" - batch "github.com/runreveal/chta/x/batcher" + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/internal/types" + batch "github.com/runreveal/kawa/x/batcher" "golang.org/x/exp/slog" ) @@ -66,7 +66,7 @@ func (r *RunReveal) Run(ctx context.Context) error { return r.batcher.Run(ctx) } -func (r *RunReveal) Send(ctx context.Context, ack func(), msgs ...chta.Message[types.Event]) error { +func (r *RunReveal) Send(ctx context.Context, ack func(), msgs ...kawa.Message[types.Event]) error { return r.batcher.Send(ctx, ack, msgs...) } @@ -74,8 +74,8 @@ func (r *RunReveal) newReq() *requests.Builder { return requests.New(r.reqConf) } -// Flush sends the given messages of type chta.Message[type.Event] to the RunReveal api -func (r *RunReveal) Flush(ctx context.Context, msgs []chta.Message[types.Event]) error { +// Flush sends the given messages of type kawa.Message[type.Event] to the RunReveal api +func (r *RunReveal) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error { batch := make([]json.RawMessage, len(msgs)) var err error for i, msg := range msgs { diff --git a/internal/queue/queue.go b/internal/queue/queue.go index e4090b1..ed7f21e 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -4,30 +4,30 @@ import ( "context" "errors" - "github.com/runreveal/chta" - "github.com/runreveal/chta/internal/types" - "github.com/runreveal/chta/x/multi" + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/internal/types" + "github.com/runreveal/kawa/x/multi" "github.com/runreveal/lib/await" "golang.org/x/exp/slog" ) type Option func(*Queue) -func WithSources(srcs []chta.Source[types.Event]) Option { +func WithSources(srcs []kawa.Source[types.Event]) Option { return func(q *Queue) { q.Sources = srcs } } -func WithDestinations(dsts []chta.Destination[types.Event]) Option { +func WithDestinations(dsts []kawa.Destination[types.Event]) Option { return func(q *Queue) { q.Destinations = dsts } } type Queue struct { - Sources []chta.Source[types.Event] - Destinations []chta.Destination[types.Event] + Sources []kawa.Source[types.Event] + Destinations []kawa.Destination[types.Event] } var ( @@ -84,13 +84,13 @@ func (q *Queue) Run(ctx context.Context) error { multiSrc := multi.NewMultiSource(q.Sources) w.Add(multiSrc.Run) - p, err := chta.New(chta.Config[types.Event, types.Event]{ + p, err := kawa.New(kawa.Config[types.Event, types.Event]{ Source: multiSrc, Destination: multiDst, - Handler: chta.Pipe[types.Event](), + Handler: kawa.Pipe[types.Event](), // NOTE(alan): don't increase parallelism on this processor until we've // verified thread safety thread-safe story. - }, chta.Parallelism(1)) + }, kawa.Parallelism(1)) if err != nil { return err } diff --git a/internal/sources/journald/journald.go b/internal/sources/journald/journald.go index 14c921c..ee2b85b 100644 --- a/internal/sources/journald/journald.go +++ b/internal/sources/journald/journald.go @@ -14,8 +14,8 @@ import ( "sync" "time" - "github.com/runreveal/chta" - "github.com/runreveal/chta/internal/types" + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/internal/types" "golang.org/x/exp/slog" ) @@ -24,7 +24,7 @@ type Journald struct { } type msgAck struct { - msg chta.Message[types.Event] + msg kawa.Message[types.Event] ack func() } @@ -40,7 +40,7 @@ func (s *Journald) Run(ctx context.Context) error { func (s *Journald) recvLoop(ctx context.Context) error { // Open file to check and save high watermark - hwmFile, err := os.OpenFile("/tmp/chtad-journald-hwm", os.O_RDWR|os.O_CREATE, os.FileMode(0644)) + hwmFile, err := os.OpenFile("/tmp/kawad-journald-hwm", os.O_RDWR|os.O_CREATE, os.FileMode(0644)) if err != nil { return err } @@ -117,7 +117,7 @@ func (s *Journald) recvLoop(ctx context.Context) error { wg.Add(1) select { case s.msgC <- msgAck{ - msg: chta.Message[types.Event]{ + msg: kawa.Message[types.Event]{ Value: types.Event{ Timestamp: ts, SourceType: "journald", @@ -150,10 +150,10 @@ func (s *Journald) recvLoop(ctx context.Context) error { return cmd.Wait() } -func (s *Journald) Recv(ctx context.Context) (chta.Message[types.Event], func(), error) { +func (s *Journald) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) { select { case <-ctx.Done(): - return chta.Message[types.Event]{}, nil, ctx.Err() + return kawa.Message[types.Event]{}, nil, ctx.Err() case pass := <-s.msgC: return pass.msg, pass.ack, nil } diff --git a/internal/sources/scanner.go b/internal/sources/scanner.go index b592607..279d80f 100644 --- a/internal/sources/scanner.go +++ b/internal/sources/scanner.go @@ -8,8 +8,8 @@ import ( "sync" "time" - "github.com/runreveal/chta" - "github.com/runreveal/chta/internal/types" + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/internal/types" ) type Scanner struct { @@ -18,7 +18,7 @@ type Scanner struct { } type msgAck struct { - msg chta.Message[types.Event] + msg kawa.Message[types.Event] ack func() } @@ -45,7 +45,7 @@ func (s *Scanner) recvLoop(ctx context.Context) error { wg.Add(1) select { case s.msgC <- msgAck{ - msg: chta.Message[types.Event]{ + msg: kawa.Message[types.Event]{ Value: types.Event{ Timestamp: time.Now(), SourceType: "reader", @@ -79,10 +79,10 @@ func (s *Scanner) recvLoop(ctx context.Context) error { return nil } -func (s *Scanner) Recv(ctx context.Context) (chta.Message[types.Event], func(), error) { +func (s *Scanner) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) { select { case <-ctx.Done(): - return chta.Message[types.Event]{}, nil, ctx.Err() + return kawa.Message[types.Event]{}, nil, ctx.Err() case pass := <-s.msgC: return pass.msg, pass.ack, nil } diff --git a/internal/sources/syslog/syslog.go b/internal/sources/syslog/syslog.go index 3d0b2c2..0ad3371 100644 --- a/internal/sources/syslog/syslog.go +++ b/internal/sources/syslog/syslog.go @@ -5,8 +5,8 @@ import ( "fmt" "time" - "github.com/runreveal/chta" - "github.com/runreveal/chta/internal/types" + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/internal/types" "golang.org/x/exp/slog" "gopkg.in/mcuadros/go-syslog.v2" ) @@ -58,7 +58,7 @@ func (s *SyslogSource) Run(ctx context.Context) error { return nil } -func (s *SyslogSource) Recv(ctx context.Context) (chta.Message[types.Event], func(), error) { +func (s *SyslogSource) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) { select { case logParts := <-s.syslogPartsC: if content, ok := logParts["content"]; ok { @@ -71,7 +71,7 @@ func (s *SyslogSource) Recv(ctx context.Context) (chta.Message[types.Event], fun } } - msg := chta.Message[types.Event]{ + msg := kawa.Message[types.Event]{ Value: types.Event{ Timestamp: ts, SourceType: "syslog", @@ -83,7 +83,7 @@ func (s *SyslogSource) Recv(ctx context.Context) (chta.Message[types.Event], fun fmt.Println("warn: found syslog without 'content' key") } case <-ctx.Done(): - return chta.Message[types.Event]{}, nil, ctx.Err() + return kawa.Message[types.Event]{}, nil, ctx.Err() } panic("unreachable!") } diff --git a/processor.go b/processor.go index 649b2cc..8c02f32 100644 --- a/processor.go +++ b/processor.go @@ -1,4 +1,4 @@ -package chta +package kawa import ( "context" @@ -39,7 +39,7 @@ func New[T1, T2 any](c Config[T1, T2], opts ...Option) (*Processor[T1, T2], erro return nil, errors.New("both Source and Destination required") } if c.Handler == nil { - return nil, errors.New("handler required. Have you considered chta.Pipe?") + return nil, errors.New("handler required. Have you considered kawa.Pipe?") } p := &Processor[T1, T2]{ src: c.Source, diff --git a/processor_test.go b/processor_test.go index f10eb67..c37fcad 100644 --- a/processor_test.go +++ b/processor_test.go @@ -1,12 +1,12 @@ -package chta_test +package kawa_test import ( "context" "fmt" "testing" - "github.com/runreveal/chta" - "github.com/runreveal/chta/x/memory" + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/x/memory" ) type BinString string @@ -29,12 +29,12 @@ func TestProcessor(t *testing.T) { MsgC: outC, } - countMessages := chta.HandlerFunc[*BinString, *BinString]( - func(c context.Context, m chta.Message[*BinString]) ([]chta.Message[*BinString], error) { - return []chta.Message[*BinString]{m}, nil + countMessages := kawa.HandlerFunc[*BinString, *BinString]( + func(c context.Context, m kawa.Message[*BinString]) ([]kawa.Message[*BinString], error) { + return []kawa.Message[*BinString]{m}, nil }) - p, _ := chta.New[*BinString, *BinString](chta.Config[*BinString, *BinString]{ + p, _ := kawa.New[*BinString, *BinString](kawa.Config[*BinString, *BinString]{ Source: memSrc, Destination: memDst, Handler: (countMessages), diff --git a/test/stream_test.go b/test/stream_test.go index 26f1421..264cfd8 100644 --- a/test/stream_test.go +++ b/test/stream_test.go @@ -1,4 +1,4 @@ -package chta +package kawa import ( "context" @@ -6,8 +6,8 @@ import ( "sync" "testing" - "github.com/runreveal/chta" - "github.com/runreveal/chta/x/memory" + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/x/memory" ) type BinString string @@ -31,7 +31,7 @@ func TestSmokeHappyPath(t *testing.T) { go func() { for i := 0; i < 10; i++ { x := BinString(fmt.Sprintf("hi-%d", i)) - err := dst.Send(context.TODO(), nil, chta.Message[*BinString]{Value: &x}) + err := dst.Send(context.TODO(), nil, kawa.Message[*BinString]{Value: &x}) if err != nil { t.Log(err) } @@ -64,7 +64,7 @@ func TestWow(t *testing.T) { go func() { for i := 0; i < 10; i++ { x := BinString(fmt.Sprintf("wow-%d", i)) - err := dst.Send(context.TODO(), nil, chta.Message[*BinString]{Value: &x}) + err := dst.Send(context.TODO(), nil, kawa.Message[*BinString]{Value: &x}) if err != nil { t.Log(err) } @@ -94,11 +94,11 @@ func NewMemSource(in <-chan *BinString) MemorySource { } } -func (ms MemorySource) Recv(ctx context.Context) (chta.Message[*BinString], func(), error) { +func (ms MemorySource) Recv(ctx context.Context) (kawa.Message[*BinString], func(), error) { select { case <-ctx.Done(): - return chta.Message[*BinString]{}, nil, ctx.Err() + return kawa.Message[*BinString]{}, nil, ctx.Err() case v := <-ms.MsgC: - return chta.Message[*BinString]{Value: v}, nil, nil + return kawa.Message[*BinString]{Value: v}, nil, nil } } diff --git a/types.go b/types.go index 546b00d..d303412 100644 --- a/types.go +++ b/types.go @@ -1,4 +1,4 @@ -package chta +package kawa import ( "context" diff --git a/x/batcher/batcher.go b/x/batcher/batcher.go index f2f8dcd..f9bdd96 100644 --- a/x/batcher/batcher.go +++ b/x/batcher/batcher.go @@ -5,16 +5,16 @@ import ( "sync" "time" - "github.com/runreveal/chta" + "github.com/runreveal/kawa" ) type Flusher[T any] interface { - Flush(context.Context, []chta.Message[T]) error + Flush(context.Context, []kawa.Message[T]) error } -type FlushFunc[T any] func(context.Context, []chta.Message[T]) error +type FlushFunc[T any] func(context.Context, []kawa.Message[T]) error -func (ff FlushFunc[T]) Flush(c context.Context, msgs []chta.Message[T]) error { +func (ff FlushFunc[T]) Flush(c context.Context, msgs []kawa.Message[T]) error { return ff(c, msgs) } @@ -87,16 +87,16 @@ func NewDestination[T any](f Flusher[T], opts ...OptFunc) *Destination[T] { } type msgAck[T any] struct { - msg chta.Message[T] + msg kawa.Message[T] ack func() } -// Send satisfies the chta.Destination interface and accepts messages to be +// Send satisfies the kawa.Destination interface and accepts messages to be // buffered for flushing after the FlushLength limit is reached or the // FlushFrequency timer fires, whichever comes first. // // Messages will not be acknowledged until they have been flushed successfully. -func (d *Destination[T]) Send(ctx context.Context, ack func(), msgs ...chta.Message[T]) error { +func (d *Destination[T]) Send(ctx context.Context, ack func(), msgs ...kawa.Message[T]) error { if len(msgs) < 1 { return nil } @@ -184,7 +184,7 @@ func (d *Destination[T]) flush(ctx context.Context) error { } func (d *Destination[T]) doflush(ctx context.Context, msgs []msgAck[T]) { - chtaMsgs := make([]chta.Message[T], 0, len(msgs)) + chtaMsgs := make([]kawa.Message[T], 0, len(msgs)) for _, m := range msgs { chtaMsgs = append(chtaMsgs, m.msg) } diff --git a/x/batcher/batcher_test.go b/x/batcher/batcher_test.go index 250b629..7c1a1fd 100644 --- a/x/batcher/batcher_test.go +++ b/x/batcher/batcher_test.go @@ -8,7 +8,7 @@ import ( "time" "github.com/pkg/errors" - "github.com/runreveal/chta" + "github.com/runreveal/kawa" "github.com/stretchr/testify/assert" ) @@ -27,7 +27,7 @@ func TestAckChu(t *testing.T) { } } -// func flushTest[T any](c context.Context, msgs []chta.Message[T]) { +// func flushTest[T any](c context.Context, msgs []kawa.Message[T]) { // for _, msg := range msgs { // fmt.Println(msg.Value) // } @@ -36,7 +36,7 @@ func TestAckChu(t *testing.T) { func TestBatcher(t *testing.T) { - var ff = func(c context.Context, msgs []chta.Message[string]) error { + var ff = func(c context.Context, msgs []kawa.Message[string]) error { for _, msg := range msgs { fmt.Println(msg.Value) } @@ -53,7 +53,7 @@ func TestBatcher(t *testing.T) { ec <- bat.Run(c) }(ctx, errc) - writeMsgs := []chta.Message[string]{ + writeMsgs := []kawa.Message[string]{ {Value: "hi"}, {Value: "hello"}, {Value: "bonjour"}, @@ -76,7 +76,7 @@ func TestBatchFlushTimeout(t *testing.T) { hMu := sync.Mutex{} handled := false - var ff = func(c context.Context, msgs []chta.Message[string]) error { + var ff = func(c context.Context, msgs []kawa.Message[string]) error { for _, msg := range msgs { fmt.Println(msg.Value) } @@ -96,7 +96,7 @@ func TestBatchFlushTimeout(t *testing.T) { ec <- bat.Run(c) }(ctx, errc) - writeMsgs := []chta.Message[string]{ + writeMsgs := []kawa.Message[string]{ {Value: "hi"}, {Value: "hello"}, } @@ -121,7 +121,7 @@ func TestBatchFlushTimeout(t *testing.T) { func TestBatcherErrors(t *testing.T) { flushErr := errors.New("flush error") - var ff = func(c context.Context, msgs []chta.Message[string]) error { + var ff = func(c context.Context, msgs []kawa.Message[string]) error { return flushErr } bat := NewDestination[string](FlushFunc[string](ff), FlushLength(1)) @@ -134,7 +134,7 @@ func TestBatcherErrors(t *testing.T) { ec <- bat.Run(c) }(ctx, errc) - writeMsgs := []chta.Message[string]{ + writeMsgs := []kawa.Message[string]{ {Value: "hi"}, } @@ -163,7 +163,7 @@ func TestBatcherErrors(t *testing.T) { t.Run("cancellation works in deadlock", func(t *testing.T) { - var ff = func(c context.Context, msgs []chta.Message[string]) error { + var ff = func(c context.Context, msgs []kawa.Message[string]) error { <-c.Done() return c.Err() } @@ -177,7 +177,7 @@ func TestBatcherErrors(t *testing.T) { ec <- bat.Run(c) }(ctx, errc) - writeMsgs := []chta.Message[string]{ + writeMsgs := []kawa.Message[string]{ // will be blocked flushing {Value: "hi"}, // will be stuck waiting for flush slot diff --git a/x/memory/memory.go b/x/memory/memory.go index c27845b..b6b260a 100644 --- a/x/memory/memory.go +++ b/x/memory/memory.go @@ -4,7 +4,7 @@ import ( "context" "errors" - "github.com/runreveal/chta" + "github.com/runreveal/kawa" ) type MemorySource[T any] struct { @@ -17,12 +17,12 @@ func NewMemSource[T any](in <-chan T) MemorySource[T] { } } -func (ms MemorySource[T]) Recv(ctx context.Context) (chta.Message[T], func(), error) { +func (ms MemorySource[T]) Recv(ctx context.Context) (kawa.Message[T], func(), error) { select { case <-ctx.Done(): - return chta.Message[T]{}, nil, ctx.Err() + return kawa.Message[T]{}, nil, ctx.Err() case v := <-ms.MsgC: - return chta.Message[T]{Value: v}, nil, nil + return kawa.Message[T]{Value: v}, nil, nil } } @@ -36,7 +36,7 @@ func NewMemDestination[T any](out chan<- T) MemoryDestination[T] { } } -func (ms MemoryDestination[T]) Send(ctx context.Context, ack func(), msgs ...chta.Message[T]) error { +func (ms MemoryDestination[T]) Send(ctx context.Context, ack func(), msgs ...kawa.Message[T]) error { for _, msg := range msgs { select { case <-ctx.Done(): diff --git a/x/multi/multidest.go b/x/multi/multidest.go index 9e728c6..612b122 100644 --- a/x/multi/multidest.go +++ b/x/multi/multidest.go @@ -3,21 +3,21 @@ package multi import ( "context" - "github.com/runreveal/chta" + "github.com/runreveal/kawa" ) type MultiDestination[T any] struct { - wrapped []chta.Destination[T] + wrapped []kawa.Destination[T] } // TODO: options for ack behavior? -func NewMultiDestination[T any](dests []chta.Destination[T]) MultiDestination[T] { +func NewMultiDestination[T any](dests []kawa.Destination[T]) MultiDestination[T] { return MultiDestination[T]{ wrapped: dests, } } -func (md MultiDestination[T]) Send(ctx context.Context, ack func(), msgs ...chta.Message[T]) error { +func (md MultiDestination[T]) Send(ctx context.Context, ack func(), msgs ...kawa.Message[T]) error { if ack != nil { ack = ackFn(ack, len(md.wrapped)) } diff --git a/x/multi/multisrc.go b/x/multi/multisrc.go index 21d48b7..e85b940 100644 --- a/x/multi/multisrc.go +++ b/x/multi/multisrc.go @@ -4,11 +4,11 @@ import ( "context" "sync" - "github.com/runreveal/chta" + "github.com/runreveal/kawa" ) type msgAck[T any] struct { - msg chta.Message[T] + msg kawa.Message[T] ack func() } @@ -16,12 +16,12 @@ type msgAck[T any] struct { // quickly and may have some performance issues. It definitely needs some work // on proper error handling, and concurrency issues on closing. type MultiSource[T any] struct { - wrapped []chta.Source[T] + wrapped []kawa.Source[T] msgAckC chan msgAck[T] } // TODO: options for ack behavior? -func NewMultiSource[T any](sources []chta.Source[T]) MultiSource[T] { +func NewMultiSource[T any](sources []kawa.Source[T]) MultiSource[T] { return MultiSource[T]{ wrapped: sources, msgAckC: make(chan msgAck[T]), @@ -41,7 +41,7 @@ func (ms MultiSource[T]) Run(ctx context.Context) error { for _, src := range ms.wrapped { wg.Add(1) - go func(src chta.Source[T]) { + go func(src kawa.Source[T]) { defer wg.Done() for { msg, ack, err := src.Recv(ctx) @@ -71,11 +71,11 @@ func (ms MultiSource[T]) Run(ctx context.Context) error { return err } -func (ms MultiSource[T]) Recv(ctx context.Context) (chta.Message[T], func(), error) { +func (ms MultiSource[T]) Recv(ctx context.Context) (kawa.Message[T], func(), error) { select { case ma := <-ms.msgAckC: return ma.msg, ma.ack, nil case <-ctx.Done(): - return chta.Message[T]{}, nil, ctx.Err() + return kawa.Message[T]{}, nil, ctx.Err() } }