Skip to content

Commit

Permalink
Improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
majst01 committed Feb 11, 2025
1 parent d485570 commit 3d0aa8e
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 29 deletions.
87 changes: 79 additions & 8 deletions pkg/tx/tx-store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"log/slog"
"strings"
"time"

"github.com/redis/go-redis/v9"
)

const (
stream = "metal-tx"
group = "txStore"
stream = "metal:tx"
group = "apiserver"
)

type (
Expand All @@ -21,24 +24,38 @@ type (
client *redis.Client
messageIdToStart string
actionFns actionFns
processErrors []error
}

actionFns map[Action]actionFn
actionFn func(id string) error
)

func NewTxStore(ctx context.Context, log *slog.Logger, client *redis.Client, actions actionFns) (*txStore, error) {
// Check if group exists
infoResult := client.XInfoGroups(ctx, stream)
if infoResult.Err() != nil && !errors.Is(infoResult.Err(), redis.Nil) {
return nil, fmt.Errorf("xinfo group: %w", infoResult.Err())
}

result := client.XGroupCreateMkStream(ctx, stream, group, "$")
if result.Err() != nil {
return nil, result.Err()
if result.Err() != nil && !strings.Contains(result.Err().Error(), "BUSYGROUP") {
return nil, fmt.Errorf("xgroup create: %w", result.Err())
}

return &txStore{
store := &txStore{
log: log,
client: client,
actionFns: actions,
messageIdToStart: "0-0", // Start from beginning on startup, if set to ">" it starts with new unprocessed entries
}, nil
}
go func() {
err := store.Process(ctx)
if err != nil {
panic(err)
}
}()
return store, nil
}

func (t *txStore) AddTx(ctx context.Context, tx *Tx) error {
Expand Down Expand Up @@ -87,6 +104,7 @@ func (t *txStore) Process(ctx context.Context) error {
}).Result()
if err != nil {
t.log.Error("unable to receive from stream", "error", err)
t.processErrors = append(t.processErrors, err)
}

///we have received the data we should loop it and queue the messages
Expand All @@ -97,25 +115,29 @@ func (t *txStore) Process(ctx context.Context) error {
txJson, err := base64.StdEncoding.DecodeString(txString.(string))
if err != nil {
t.log.Error("unable to decode tx to json bytes", "tx reference", txReference, "error", err)
t.processErrors = append(t.processErrors, err)
continue
}

var tx Tx
err = json.Unmarshal(txJson, &tx)
if err != nil {
t.log.Error("unable to unmarshal tx to json", "tx reference", txReference, "error", err)
t.processErrors = append(t.processErrors, err)
continue
}

err = t.processTx(tx)
if err != nil {
t.log.Error("unable to process tx", "tx reference", txReference, "error", err)
t.processErrors = append(t.processErrors, err)
continue
}

acked := t.client.XAck(ctx, stream, message.ID)
if acked.Err() != nil {
t.log.Error("tx could not be acked", "error", err)
t.processErrors = append(t.processErrors, err)
}
t.messageIdToStart = message.ID
}
Expand All @@ -125,17 +147,66 @@ func (t *txStore) Process(ctx context.Context) error {
}

func (t *txStore) processTx(tx Tx) error {
var errs []error
for _, job := range tx.Jobs {

action, ok := t.actionFns[job.Action]
if !ok {
return fmt.Errorf("no action func defined for action:%s", job.Action)
errs = append(errs, fmt.Errorf("no action func defined for action:%s", job.Action))
}
err := action(job.ID)
if err != nil {
return fmt.Errorf("error executing action: %s with id: %s error: %w", job.Action, job.ID, err)
errs = append(errs, fmt.Errorf("error executing action: %s with id: %s error: %w", job.Action, job.ID, err))
}
}

if len(errs) > 0 {
return errors.Join(errs...)
}

return nil
}

type Pending struct {
ID string
Consumer string
Idle time.Duration
RetryCount int64
}

func (t *txStore) Pending(ctx context.Context) ([]Pending, error) {
pendingStreams, err := t.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: stream,
Group: group,
Start: "-",
End: "+",
Count: 10,
}).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return nil, err
}

var pendingTxs []Pending
for _, stream := range pendingStreams {
pendingTxs = append(pendingTxs, Pending{
ID: stream.ID,
Consumer: stream.Consumer,
Idle: stream.Idle,
RetryCount: stream.RetryCount,
})
}
return pendingTxs, nil
}

func (t *txStore) Info(ctx context.Context) (*redis.XInfoStream, error) {
streamInfo, err := t.client.XInfoStream(ctx, stream).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return nil, err
}

return streamInfo, nil
}

func (t *txStore) Errors() error {
return errors.Join(t.processErrors...)
}
63 changes: 42 additions & 21 deletions pkg/tx/tx-store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package tx

import (
"context"
"fmt"
"log/slog"
"os"
"testing"

"github.com/alicebob/miniredis/v2"
// "github.com/alicebob/miniredis/v2"
"github.com/davecgh/go-spew/spew"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
Expand All @@ -15,12 +16,21 @@ import (
func Test_txStore_AddTx(t *testing.T) {
log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))

var alotJobs []Job
for i := range 2 {
alotJobs = append(alotJobs, Job{
ID: fmt.Sprintf("j%d", i),
Action: ActionIpDelete,
})
}

ctx := context.Background()
tests := []struct {
name string
tx *Tx
actionFns actionFns
wantErr bool
name string
tx *Tx
actionFns actionFns
wantErr bool
wantPending []Pending
}{
{
name: "simple",
Expand All @@ -31,32 +41,43 @@ func Test_txStore_AddTx(t *testing.T) {
}},
wantErr: false,
},
{
name: "pending",
tx: &Tx{Jobs: alotJobs},
actionFns: actionFns{ActionIpDelete: func(id string) error {
if id == "j1" {
return nil
}
return fmt.Errorf("unable to process:%s", id)
}},
wantErr: false,
wantPending: []Pending{{ID: "j2"}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mr := miniredis.RunT(t)
client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
realRedis := "localhost:6379"
// mr := miniredis.RunT(t)
client := redis.NewClient(&redis.Options{Addr: realRedis})

tr, err := NewTxStore(ctx, log, client, tt.actionFns)
ts, err := NewTxStore(ctx, log, client, tt.actionFns)
require.NoError(t, err)

if err := tr.AddTx(ctx, tt.tx); (err != nil) != tt.wantErr {
if err := ts.AddTx(ctx, tt.tx); (err != nil) != tt.wantErr {
t.Errorf("txStore.AddTx() error = %v, wantErr %v", err, tt.wantErr)
}

data, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{Group: "txStore", Streams: []string{"metal-tx", ">"}}).Result()
require.NoError(t, err)
require.Len(t, data, 1)

spew.Dump(data)
// t.Fail()
if len(tt.wantPending) > 0 {
pending, err := ts.Pending(ctx)
spew.Dump(pending)
require.NoError(t, err)
info, err := ts.Info(ctx)
require.NoError(t, err)
t.Logf("stream info:%#v", info)
t.Log(ts.Errors())
require.Equal(t, tt.wantPending, pending)
}

// go func() {
// err := tr.Process()
// if err != nil {
// t.Fail()
// }
// }()
})
}
}

0 comments on commit 3d0aa8e

Please sign in to comment.