Skip to content

Commit

Permalink
(wip)
Browse files Browse the repository at this point in the history
  • Loading branch information
rjeczalik committed Jun 30, 2024
1 parent 23e6677 commit 2455e85
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 18 deletions.
51 changes: 42 additions & 9 deletions pkg/plugin/builtin/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package event // import "hookt.dev/cmd/pkg/plugin/builtin/event"

import (
"context"
"fmt"
"log/slog"
"strconv"
"time"

"hookt.dev/cmd/pkg/check"
Expand All @@ -16,14 +18,14 @@ type Plugin struct {
wire.Config

p *proto.P
c map[chan proto.Message]struct{}
c map[chan proto.Message]string
mux chan proto.Message
stop chan (chan proto.Message)
}

func New(opts ...func(*Plugin)) *Plugin {
p := &Plugin{
c: make(map[chan proto.Message]struct{}),
c: make(map[chan proto.Message]string),
mux: make(chan proto.Message),
stop: make(chan chan proto.Message),
}
Expand Down Expand Up @@ -81,22 +83,30 @@ wire:
}

func (p *Plugin) process() {
type indexer interface {
Index() int
}

for {
select {
case c := <-p.stop:
delete(p.c, c)
close(c)
case msg := <-p.mux:
for c := range p.c {
c <- msg
}
go func() {
for c, id := range p.c {
fmt.Printf("\nDEBUG: MUXING EVENT TO %q: seq=%d\n\n", id, msg.(indexer).Index())
c <- msg
fmt.Printf("\nDEBUG: MUXED EVENT TO %q: seq=%d\n\n", id, msg.(indexer).Index())
}
}()
}
}
}

func (p *Plugin) Step(ctx context.Context) any {
c := make(chan proto.Message)
p.c[c] = struct{}{}
p.c[c] = trace.Get(ctx, "step")
it, _ := time.ParseDuration(p.Config.InactiveTimeout)
return &Step{
p: p,
Expand All @@ -118,6 +128,10 @@ func group(ctx context.Context, name string) context.Context {
}

func (s *Step) Run(ctx context.Context, c *check.S) error {
type indexer interface {
Index() int
}

slog.Debug("event: run",
"match", s.Match,
"pass", s.Pass,
Expand All @@ -144,7 +158,13 @@ func (s *Step) Run(ctx context.Context, c *check.S) error {
inactive := time.NewTimer(s.it)
defer inactive.Stop()

step := trace.Get(ctx, "step")

defer fmt.Printf("\nDEBUG: RUN DONE IN %q\n\n", step)

for {
fmt.Printf("\nDEBUG: WAITING FOR EVENT IN %q\n\n", step)

select {
case <-inactive.C:
c.Fail()
Expand All @@ -156,9 +176,19 @@ func (s *Step) Run(ctx context.Context, c *check.S) error {
}
inactive.Reset(s.it)

ctxt := ctx

if i, ok := msg.(indexer); ok {
ctxt = trace.With(ctxt, "event-seq", strconv.Itoa(i.Index()))

fmt.Printf("\nDEBUG: EVENT RECEIVED IN %q: seq=%d\n\n", step, i.Index())
} else {
fmt.Printf("\nDEBUG: EVENT RECEIVED IN %q\n\n", step)
}

obj := msg.Object()

match, err := match.Match(group(ctx, "match"), obj)
match, err := match.Match(group(ctxt, "match"), obj)
if err != nil {
return errors.New("failed to match on pattern: %w", err)
}
Expand All @@ -167,7 +197,7 @@ func (s *Step) Run(ctx context.Context, c *check.S) error {
continue
}

fail, err := fail.Match(group(ctx, "fail"), obj)
fail, err := fail.Match(group(ctxt, "fail"), obj)
if err != nil {
return errors.New("failed to match fail pattern: %w", err)
}
Expand All @@ -176,7 +206,7 @@ func (s *Step) Run(ctx context.Context, c *check.S) error {
return errors.New("failure pattern matched")
}

ok, err := pass.Match(group(ctx, "match"), obj)
ok, err := pass.Match(group(ctxt, "pass"), obj)
if err != nil {
return errors.New("failed to match ok pattern: %w", err)
}
Expand All @@ -191,8 +221,11 @@ func (s *Step) Run(ctx context.Context, c *check.S) error {
}

func (s *Step) Stop() {
fmt.Printf("\nDEBUG: STOPPING STEP\n\n")
s.p.stop <- s.c
fmt.Printf("\nDEBUG: STOPPED STEP\n\n")
s.drain()
fmt.Printf("\nDEBUG: DRAINED STEP\n\n")
}

func (s *Step) drain() {
Expand Down
15 changes: 11 additions & 4 deletions pkg/plugin/builtin/inline/inline.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"os"
Expand Down Expand Up @@ -75,10 +76,13 @@ func (p *Plugin) publish(f *os.File) {

dec := json.NewDecoder(f)

for {
for index := 0; ; index++ {
var raw json.RawMessage

err := dec.Decode(&raw)

fmt.Printf("\nDEBUG: EVENT DECODED: seq=%d: %#v\n\n", index, err)

if isEOF(err) {
return
}
Expand All @@ -97,7 +101,9 @@ func (p *Plugin) publish(f *os.File) {
"bytes", len(raw),
)

p.c <- protowire.MakeMessage(raw)
p.c <- &protowire.Message{P: raw, I: index}

fmt.Printf("\nDEBUG: EVENT SENT: seq=%d\n\n", index)
case '[':
var msgs []json.RawMessage

Expand All @@ -109,12 +115,13 @@ func (p *Plugin) publish(f *os.File) {
return
}

for i := range msgs {
for i := 0; i < len(msgs); i, index = i+1, index+1 {
slog.Debug("inline: publish",
"bytes", len(msgs[i]),
)

p.c <- protowire.MakeMessage(msgs[i])
p.c <- &protowire.Message{P: msgs[i], I: index}
fmt.Printf("\nDEBUG: EVENT SENT: seq=%d\n\n", index)
}
default:
err = errors.New("unexpected JSON input")
Expand Down
5 changes: 2 additions & 3 deletions pkg/proto/wire/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ type Job struct {
}

type Message struct {
I int `json:"index,omitempty"`
P []byte `json:"bytes,omitempty"`
}

func MakeMessage(p json.RawMessage) *Message {
return &Message{P: p}
}
func (m *Message) Index() int { return m.I }

func (m *Message) Bytes() []byte { return m.P }

Expand Down
13 changes: 11 additions & 2 deletions pkg/trace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,14 @@ func LogPattern() PatternTrace {
},
EqualMatch: func(ctx context.Context, want any, got any, ok bool) {
tags := append(attrs(ctx),
"want", fmt.Sprintf("%+[1]v (%[1]T)", want),
"got", fmt.Sprintf("%+[1]v (%[1]T)", got),
slog.Group("want",
"value", want,
"type", fmt.Sprintf("%T", want),
),
slog.Group("got",
"value", got,
"type", fmt.Sprintf("%T", got),
),
)
if !ok {
slog.Error("trace: EqualMatch", tags...)
Expand All @@ -128,6 +134,9 @@ func LogPattern() PatternTrace {
func attrs(ctx context.Context) []any {
var attrs []any

if seq := Get(ctx, "event-seq"); seq != "" {
attrs = append(attrs, "event-seq", seq)
}
if job := Get(ctx, "job"); job != "" {
attrs = append(attrs, "job", job)
}
Expand Down

0 comments on commit 2455e85

Please sign in to comment.