Skip to content

Commit

Permalink
More flexible tx impl
Browse files Browse the repository at this point in the history
  • Loading branch information
majst01 committed Feb 11, 2025
1 parent dcf6630 commit 996568d
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 27 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/metal-stack/go-ipam v1.14.8
github.com/metal-stack/masterdata-api v0.11.5
github.com/metal-stack/metal-lib v0.19.2
github.com/metal-stack/security v0.9.2
github.com/metal-stack/security v0.9.3
github.com/metal-stack/v v1.0.3
github.com/open-policy-agent/opa v1.1.0
github.com/prometheus/client_golang v1.20.5
Expand All @@ -31,7 +31,7 @@ require (
github.com/urfave/cli/v2 v2.27.5
go.opentelemetry.io/otel/exporters/prometheus v0.56.0
go.opentelemetry.io/otel/sdk/metric v1.34.0
golang.org/x/net v0.34.0
golang.org/x/net v0.35.0
golang.org/x/oauth2 v0.26.0
golang.org/x/sync v0.11.0
google.golang.org/protobuf v1.36.5
Expand Down Expand Up @@ -90,7 +90,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/cel-go v0.23.2 // indirect
github.com/google/flatbuffers v25.1.24+incompatible // indirect
github.com/google/flatbuffers v25.2.10+incompatible // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.2.0 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/cel-go v0.23.2 h1:UdEe3CvQh3Nv+E/j9r1Y//WO0K0cSyD7/y0bzyLIMI4=
github.com/google/cel-go v0.23.2/go.mod h1:52Pb6QsDbC5kvgxvZhiL9QX1oZEkcUF/ZqaPx1J5Wwo=
github.com/google/flatbuffers v25.1.24+incompatible h1:4wPqL3K7GzBd1CwyhSd3usxLKOaJN/AC6puCca6Jm7o=
github.com/google/flatbuffers v25.1.24+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q=
github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand Down Expand Up @@ -255,8 +255,8 @@ github.com/metal-stack/masterdata-api v0.11.5 h1:r7bYdhdVgOjCk6k7K/SCLlHALH23ZuM
github.com/metal-stack/masterdata-api v0.11.5/go.mod h1:Xk8kqxAp3NkAc2BX8yTIWgSlD77T897QSdRSluWvP4w=
github.com/metal-stack/metal-lib v0.19.2 h1:8/qOtRDr7V3/C35ltyxfvYw5d2x/PJHcuAyuHzf+tjo=
github.com/metal-stack/metal-lib v0.19.2/go.mod h1:9jql29GhlfKbzC3YVHgWtxv8bsuHo1IetuV8C66jbJE=
github.com/metal-stack/security v0.9.2 h1:WH5LYwKccoEgS532f5qwonS7zX4/PJJO1RQ6R6Md410=
github.com/metal-stack/security v0.9.2/go.mod h1:ENm5kPjqh4uYvn79sAIxd6GZBwtF2GSsGdkLELrB/D4=
github.com/metal-stack/security v0.9.3 h1:ZF5rGeZ4fIFe0DFFQWkXsUDCzODyjdrpvKmeaLOz9lo=
github.com/metal-stack/security v0.9.3/go.mod h1:ENm5kPjqh4uYvn79sAIxd6GZBwtF2GSsGdkLELrB/D4=
github.com/metal-stack/v v1.0.3 h1:Sh2oBlnxrCUD+mVpzfC8HiqL045YWkxs0gpTvkjppqs=
github.com/metal-stack/v v1.0.3/go.mod h1:YTahEu7/ishwpYKnp/VaW/7nf8+PInogkfGwLcGPdXg=
github.com/miekg/dns v1.1.58 h1:ca2Hdkz+cDg/7eNF6V56jjzuZ4aCAE+DbVkILdQWG/4=
Expand Down Expand Up @@ -464,8 +464,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/oauth2 v0.26.0 h1:afQXWNNaeC4nvZ0Ed9XvCCzXM6UHJG7iCg0W4fPqSBE=
golang.org/x/oauth2 v0.26.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
16 changes: 14 additions & 2 deletions pkg/tx/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log/slog"
"time"

"github.com/metal-stack/api-server/pkg/repository"
"github.com/redis/go-redis/v9"
)

Expand Down Expand Up @@ -52,9 +53,20 @@ type Queue struct {
txStore *txStore
}

func New(log *slog.Logger, client *redis.Client) (*Queue, error) {
func New(log *slog.Logger, client *redis.Client, repo repository.Repository) (*Queue, error) {
ctx := context.Background()
txStore, err := NewTxStore(ctx, log, client)

actionFns := actionFns{
ActionIpDelete: func(id string) error {
_, err := repo.IP(repository.ProjectScope("FIXME unscoped")).Delete(ctx, id)
if err != nil {
return err
}
return nil
},
}

txStore, err := NewTxStore(ctx, log, client, actionFns)
if err != nil {
return nil, err
}
Expand Down
32 changes: 22 additions & 10 deletions pkg/tx/tx-store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,27 @@ const (
group = "txStore"
)

type txStore struct {
log *slog.Logger
client *redis.Client
}
type (
txStore struct {
log *slog.Logger
client *redis.Client
actionFns actionFns
}

actionFns map[Action]actionFn
actionFn func(id string) error
)

func NewTxStore(ctx context.Context, log *slog.Logger, client *redis.Client) (*txStore, error) {
func NewTxStore(ctx context.Context, log *slog.Logger, client *redis.Client, actions actionFns) (*txStore, error) {
result := client.XGroupCreateMkStream(ctx, stream, group, "$")
if result.Err() != nil {
return nil, result.Err()
}

return &txStore{
log: log,
client: client,
log: log,
client: client,
actionFns: actions,
}, nil
}

Expand Down Expand Up @@ -116,9 +123,14 @@ func (t *txStore) Process(ctx context.Context) error {

func (t *txStore) processTx(tx Tx) error {
for _, job := range tx.Jobs {
switch job.Action {
case ActionIpDelete:
t.log.Info("delete ip", "uuid", job.ID)

action, ok := t.actionFns[job.Action]
if !ok {
return fmt.Errorf("no action func defined for action:%s", job.Action)
}
err := action(job.ID)
if err != nil {
return fmt.Errorf("error executing action: %s with id: %s error: %w", job.Action, job.ID, err)
}
}

Expand Down
16 changes: 10 additions & 6 deletions pkg/tx/tx-store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ func Test_txStore_AddTx(t *testing.T) {

ctx := context.Background()
tests := []struct {
name string
tx *Tx
wantErr bool
name string
tx *Tx
actionFns actionFns
wantErr bool
}{
{
name: "simple",
tx: &Tx{Jobs: []Job{{ID: "j1", Action: ActionIpDelete}}},
name: "simple",
tx: &Tx{Jobs: []Job{{ID: "j1", Action: ActionIpDelete}}},
actionFns: actionFns{ActionIpDelete: func(id string) error {
return nil
}},
wantErr: false,
},
}
Expand All @@ -32,7 +36,7 @@ func Test_txStore_AddTx(t *testing.T) {
mr := miniredis.RunT(t)
client := redis.NewClient(&redis.Options{Addr: mr.Addr()})

tr, err := NewTxStore(ctx, log, client)
tr, err := NewTxStore(ctx, log, client, tt.actionFns)
require.NoError(t, err)

if err := tr.AddTx(ctx, tt.tx); (err != nil) != tt.wantErr {
Expand Down

0 comments on commit 996568d

Please sign in to comment.