Skip to content

Commit

Permalink
New code.
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrit91 committed Feb 7, 2025
1 parent c9dbf3c commit a1c196f
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 33 deletions.
18 changes: 0 additions & 18 deletions pkg/tx/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,3 @@ func (q *Queue) List() []Tx {
func (q *Queue) Delete(ref string) error {
return nil
}

func (q *Queue) run() {
actions := make(chan Action)

for {
select {
case action := <-actions:
switch action {
case ActionIpDelete:

}
}
}
}

func ipDelete() error {
return nil
}
30 changes: 17 additions & 13 deletions pkg/tx/tx-store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,51 +20,53 @@ type txStore struct {
client *redis.Client
}

func NewTxStore(log *slog.Logger, client *redis.Client) (*txStore, error) {

result := client.XGroupCreateMkStream(context.Background(), stream, group, "$")

func NewTxStore(ctx context.Context, log *slog.Logger, client *redis.Client) (*txStore, error) {
result := client.XGroupCreateMkStream(ctx, stream, group, "$")
if result.Err() != nil {
return nil, result.Err()
}

return &txStore{
log: log,
client: client,
}, nil
}

func (t *txStore) AddTx(tx Tx) error {
func (t *txStore) AddTx(ctx context.Context, tx Tx) error {
serializedTx, err := json.Marshal(tx)
if err != nil {
return err
}
stringTx := base64.StdEncoding.EncodeToString(serializedTx)

stringTx := base64.StdEncoding.EncodeToString(serializedTx) // do we need to encode or not?
data := map[string]any{tx.Reference: stringTx}
//we have received an order here send it to
//redis has a function called xadd that we will use to add this to our stream
//you can read more about it on the link shared above.
err = t.client.XAdd(context.Background(), &redis.XAddArgs{
err = t.client.XAdd(ctx, &redis.XAddArgs{
///this is the name we want to give to our stream
///in our case we called it send_order_emails
//note you can have as many stream as possible
//such as one for email...another for notifications
ID: tx.Reference,
Stream: stream,
MaxLen: 0,
MaxLen: 0, // means unlimited
Approx: true,
//values is the data you want to send to the stream
//in our case we send a map with email and message keys
Values: data,
}).Err()
if err != nil {
return fmt.Errorf("unable to enqueu transaction:%w", err)
return fmt.Errorf("unable to enqueue transaction: %w", err)
}

return nil
}

func (t *txStore) Process() error {
func (t *txStore) Process(ctx context.Context) error {
// TODO: read the history of unprocessed jobs with 0, then just tail unprocessed jobs with >

for {
ctx := context.Background()
data, err := t.client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: group,
Streams: []string{stream, ">"},
Expand All @@ -77,6 +79,7 @@ func (t *txStore) Process() error {
if err != nil {
t.log.Error("unable to receive from stream", "error", err)
}

///we have received the data we should loop it and queue the messages
//so that our jobs can start processing
for _, result := range data {
Expand All @@ -87,6 +90,7 @@ func (t *txStore) Process() error {
t.log.Error("unable to decode tx to json bytes", "tx reference", txReference, "error", err)
continue
}

var tx Tx
err = json.Unmarshal(txJson, &tx)
if err != nil {
Expand All @@ -99,6 +103,7 @@ func (t *txStore) Process() error {
t.log.Error("unable to process tx", "tx reference", txReference, "error", err)
continue
}

acked := t.client.XAck(ctx, stream, message.ID)
if acked.Err() != nil {
t.log.Error("tx could not be acked", "error", err)
Expand All @@ -111,12 +116,11 @@ func (t *txStore) Process() error {

func (t *txStore) processTx(tx Tx) error {
for _, job := range tx.jobs {

switch job.Action {
case ActionIpDelete:
t.log.Info("delete ip", "uuid", job.ID)
}

}

return nil
}
4 changes: 2 additions & 2 deletions pkg/tx/tx-store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ func Test_txStore_AddTx(t *testing.T) {
mr := miniredis.RunT(t)
client := redis.NewClient(&redis.Options{Addr: mr.Addr()})

tr, err := NewTxStore(log, client)
tr, err := NewTxStore(ctx, log, client)
require.NoError(t, err)

if err := tr.AddTx(tt.tx); (err != nil) != tt.wantErr {
if err := tr.AddTx(ctx, tt.tx); (err != nil) != tt.wantErr {
t.Errorf("txStore.AddTx() error = %v, wantErr %v", err, tt.wantErr)
}

Expand Down

0 comments on commit a1c196f

Please sign in to comment.