Skip to content

Commit

Permalink
Merge pull request #351 from maxekman/feature-109/persistent-schedule…
Browse files Browse the repository at this point in the history
…d-commands

109 / Add persistance to command scheduler
  • Loading branch information
maxekman authored Nov 26, 2021
2 parents c4089ff + c1fd089 commit bc7e608
Show file tree
Hide file tree
Showing 16 changed files with 921 additions and 99 deletions.
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,27 @@ lint:

.PHONY: test
test:
go test -v -race -short ./...
go test -race -short ./...

.PHONY: test_cover
test_cover:
go list -f '{{if len .TestGoFiles}}"cd {{.Dir}} && go test -v -race -short -coverprofile={{.Dir}}/.coverprofile {{.ImportPath}}"{{end}}' ./... | xargs -L 1 sh -c
go list -f '{{if len .TestGoFiles}}"cd {{.Dir}} && go test -race -short -coverprofile={{.Dir}}/.coverprofile {{.ImportPath}}"{{end}}' ./... | xargs -L 1 sh -c
go run ./hack/coverage/coverage.go . unit.coverprofile
@find . -name \.coverprofile -type f -delete

.PHONY: test_integration
test_integration:
go test -v -race -run Integration ./...
go test -race -run Integration ./...

.PHONY: test_integration_cover
test_integration_cover:
go list -f '{{if len .TestGoFiles}}"cd {{.Dir}} && go test -v -race -run Integration -coverprofile={{.Dir}}/.coverprofile {{.ImportPath}}"{{end}}' ./... | xargs -L 1 sh -c
go list -f '{{if len .TestGoFiles}}"cd {{.Dir}} && go test -race -run Integration -coverprofile={{.Dir}}/.coverprofile {{.ImportPath}}"{{end}}' ./... | xargs -L 1 sh -c
go run ./hack/coverage/coverage.go . integration.coverprofile
@find . -name \.coverprofile -type f -delete

.PHONY: test_loadtest
test_loadtest:
go test -race -v -run Loadtest ./...
go test -race -run Loadtest ./...

.PHONY: test_all_docker
test_all_docker:
Expand Down
8 changes: 8 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,11 @@ type EventCodec interface {
// UnmarshalEvent unmarshals an event and supported parts of context from bytes.
UnmarshalEvent(context.Context, []byte) (Event, context.Context, error)
}

// CommandCodec is a codec for marshaling and unmarshaling commands to and from bytes.
type CommandCodec interface {
// MarshalCommand marshals a command and the supported parts of context into bytes.
MarshalCommand(context.Context, Command) ([]byte, error)
// UnmarshalCommand unmarshals a command and supported parts of context from bytes.
UnmarshalCommand(context.Context, []byte) (Command, context.Context, error)
}
93 changes: 91 additions & 2 deletions codec/acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package codec

import (
"context"
"reflect"
"testing"
"time"

Expand All @@ -26,21 +27,27 @@ import (

func init() {
eh.RegisterEventData(EventType, func() eh.EventData { return &EventData{} })

eh.RegisterCommand(func() eh.Command { return &Command{} })
}

const (
// EventType is a the type for Event.
EventType eh.EventType = "CodecEvent"
// AggregateType is the type for Aggregate.
AggregateType eh.AggregateType = "CodecAggregate"
// CommandType is the type for Command.
CommandType eh.CommandType = "CodecCommand"
)

// EventCodecAcceptanceTest is the acceptance test that all implementations of
// Codec should pass. It should manually be called from a test case in each
// EventCodec should pass. It should manually be called from a test case in each
// implementation:
//
// func TestEventCodec(t *testing.T) {
// c := EventCodec{}
// expectedBytes = []byte("")
// eventbus.AcceptanceTest(t, c, expectedBytes)
// codec.EventCodecAcceptanceTest(t, c, expectedBytes)
// }
//
func EventCodecAcceptanceTest(t *testing.T, c eh.EventCodec, expectedBytes []byte) {
Expand Down Expand Up @@ -117,3 +124,85 @@ type Nested struct {
String string
Number float64
}

// CommandCodecAcceptanceTest is the acceptance test that all implementations of
// CommandCodec should pass. It should manually be called from a test case in each
// implementation:
//
// func TestCommandCodec(t *testing.T) {
// c := CommandCodec{}
// expectedBytes = []byte("")
// codec.CommandCodecAcceptanceTest(t, c, expectedBytes)
// }
//
func CommandCodecAcceptanceTest(t *testing.T, c eh.CommandCodec, expectedBytes []byte) {
// Marshaling.
ctx := mocks.WithContextOne(context.Background(), "testval")
id := uuid.MustParse("10a7ec0f-7f2b-46f5-bca1-877b6e33c9fd")
timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
cmd := &Command{
ID: id,
Bool: true,
String: "string",
Number: 42.0,
Slice: []string{"a", "b"},
Map: map[string]interface{}{"key": "value"}, // NOTE: Just one key to avoid compare issues.
Time: timestamp,
TimeRef: &timestamp,
Struct: Nested{
Bool: true,
String: "string",
Number: 42.0,
},
StructRef: &Nested{
Bool: true,
String: "string",
Number: 42.0,
},
}

b, err := c.MarshalCommand(ctx, cmd)
if err != nil {
t.Error("there should be no error:", err)
}

if string(b) != string(expectedBytes) {
t.Error("the encoded bytes should be correct:", string(b))
}

// Unmarshaling.
decodedCmd, decodedContext, err := c.UnmarshalCommand(context.Background(), b)
if err != nil {
t.Error("there should be no error:", err)
}

if !reflect.DeepEqual(decodedCmd, cmd) {
t.Error("the decoded command was incorrect:", err)
}

if val, ok := mocks.ContextOne(decodedContext); !ok || val != "testval" {
t.Error("the decoded context was incorrect:", decodedContext)
}
}

// Command is a mocked eventhorizon.Command, useful in testing.
type Command struct {
ID uuid.UUID
Bool bool
String string
Number float64
Slice []string
Map map[string]interface{}
Time time.Time
TimeRef *time.Time
NullTime *time.Time
Struct Nested
StructRef *Nested
NullStruct *Nested
}

var _ = eh.Command(&Command{})

func (t *Command) AggregateID() uuid.UUID { return t.ID }
func (t *Command) AggregateType() eh.AggregateType { return AggregateType }
func (t *Command) CommandType() eh.CommandType { return CommandType }
76 changes: 76 additions & 0 deletions codec/bson/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2021 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bson

import (
"context"
"fmt"

"go.mongodb.org/mongo-driver/bson"

eh "github.com/looplab/eventhorizon"
)

// CommandCodec is a codec for marshaling and unmarshaling commands
// to and from bytes in BSON format.
type CommandCodec struct{}

// MarshalCommand marshals a command into bytes in BSON format.
func (_ CommandCodec) MarshalCommand(ctx context.Context, cmd eh.Command) ([]byte, error) {
c := command{
CommandType: cmd.CommandType(),
Context: eh.MarshalContext(ctx),
}

var err error
if c.Command, err = bson.Marshal(cmd); err != nil {
return nil, fmt.Errorf("could not marshal command data: %w", err)
}

b, err := bson.Marshal(c)
if err != nil {
return nil, fmt.Errorf("could not marshal command: %w", err)
}

return b, nil
}

// UnmarshalCommand unmarshals a command from bytes in BSON format.
func (_ CommandCodec) UnmarshalCommand(ctx context.Context, b []byte) (eh.Command, context.Context, error) {
var c command
if err := bson.Unmarshal(b, &c); err != nil {
return nil, nil, fmt.Errorf("could not unmarshal command: %w", err)
}

cmd, err := eh.CreateCommand(c.CommandType)
if err != nil {
return nil, nil, fmt.Errorf("could not create command: %w", err)
}

if err := bson.Unmarshal(c.Command, cmd); err != nil {
return nil, nil, fmt.Errorf("could not unmarshal command data: %w", err)
}

ctx = eh.UnmarshalContext(ctx, c.Context)

return cmd, ctx, nil
}

// command is the internal structure used on the wire only.
type command struct {
CommandType eh.CommandType `bson:"command_type"`
Command bson.Raw `bson:"command"`
Context map[string]interface{} `bson:"context"`
}
33 changes: 33 additions & 0 deletions codec/bson/command_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2021 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bson

import (
"encoding/base64"
"testing"

"github.com/looplab/eventhorizon/codec"
)

func TestCommandCodec(t *testing.T) {
c := &CommandCodec{}

expectedBytes, err := base64.StdEncoding.DecodeString("jQEAAAJjb21tYW5kX3R5cGUADQAAAENvZGVjQ29tbWFuZAADY29tbWFuZAA5AQAAAmlkACUAAAAxMGE3ZWMwZi03ZjJiLTQ2ZjUtYmNhMS04NzdiNmUzM2M5ZmQACGJvb2wAAQJzdHJpbmcABwAAAHN0cmluZwABbnVtYmVyAAAAAAAAAEVABHNsaWNlABcAAAACMAACAAAAYQACMQACAAAAYgAAA21hcAAUAAAAAmtleQAGAAAAdmFsdWUAAAl0aW1lAIA1U+AkAQAACXRpbWVyZWYAgDVT4CQBAAAKbnVsbHRpbWUAA3N0cnVjdAAvAAAACGJvb2wAAQJzdHJpbmcABwAAAHN0cmluZwABbnVtYmVyAAAAAAAAAEVAAANzdHJ1Y3RyZWYALwAAAAhib29sAAECc3RyaW5nAAcAAABzdHJpbmcAAW51bWJlcgAAAAAAAABFQAAKbnVsbHN0cnVjdAAAA2NvbnRleHQAHgAAAAJjb250ZXh0X29uZQAIAAAAdGVzdHZhbAAAAA==")
if err != nil {
t.Error("could not decode expected bytes:", err)
}

codec.CommandCodecAcceptanceTest(t, c, expectedBytes)
}
File renamed without changes.
2 changes: 1 addition & 1 deletion codec/bson/codec_test.go → codec/bson/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (

func TestEventCodec(t *testing.T) {
c := &EventCodec{}
expectedBytes, err := base64.StdEncoding.DecodeString("4QEAAAJldmVudF90eXBlAAsAAABDb2RlY0V2ZW50AANkYXRhAAwBAAAIYm9vbAABAnN0cmluZwAHAAAAc3RyaW5nAAFudW1iZXIAAAAAAAAARUAEc2xpY2UAFwAAAAIwAAIAAABhAAIxAAIAAABiAAADbWFwABQAAAACa2V5AAYAAAB2YWx1ZQAACXRpbWUAgDVT4CQBAAAJdGltZXJlZgCANVPgJAEAAApudWxsdGltZQADc3RydWN0AC8AAAAIYm9vbAABAnN0cmluZwAHAAAAc3RyaW5nAAFudW1iZXIAAAAAAAAARUAAA3N0cnVjdHJlZgAvAAAACGJvb2wAAQJzdHJpbmcABwAAAHN0cmluZwABbnVtYmVyAAAAAAAAAEVAAApudWxsc3RydWN0AAAJdGltZXN0YW1wAIA1U+AkAQAAAmFnZ3JlZ2F0ZV90eXBlAAoAAABBZ2dyZWdhdGUAAl9pZAAlAAAAMTBhN2VjMGYtN2YyYi00NmY1LWJjYTEtODc3YjZlMzNjOWZkABB2ZXJzaW9uAAEAAAADbWV0YWRhdGEAEgAAAAFudW0AAAAAAAAARUAAA2NvbnRleHQAHgAAAAJjb250ZXh0X29uZQAIAAAAdGVzdHZhbAAAAA==")

expectedBytes, err := base64.StdEncoding.DecodeString("4QEAAAJldmVudF90eXBlAAsAAABDb2RlY0V2ZW50AANkYXRhAAwBAAAIYm9vbAABAnN0cmluZwAHAAAAc3RyaW5nAAFudW1iZXIAAAAAAAAARUAEc2xpY2UAFwAAAAIwAAIAAABhAAIxAAIAAABiAAADbWFwABQAAAACa2V5AAYAAAB2YWx1ZQAACXRpbWUAgDVT4CQBAAAJdGltZXJlZgCANVPgJAEAAApudWxsdGltZQADc3RydWN0AC8AAAAIYm9vbAABAnN0cmluZwAHAAAAc3RyaW5nAAFudW1iZXIAAAAAAAAARUAAA3N0cnVjdHJlZgAvAAAACGJvb2wAAQJzdHJpbmcABwAAAHN0cmluZwABbnVtYmVyAAAAAAAAAEVAAApudWxsc3RydWN0AAAJdGltZXN0YW1wAIA1U+AkAQAAAmFnZ3JlZ2F0ZV90eXBlAAoAAABBZ2dyZWdhdGUAAl9pZAAlAAAAMTBhN2VjMGYtN2YyYi00NmY1LWJjYTEtODc3YjZlMzNjOWZkABB2ZXJzaW9uAAEAAAADbWV0YWRhdGEAEgAAAAFudW0AAAAAAAAARUAAA2NvbnRleHQAHgAAAAJjb250ZXh0X29uZQAIAAAAdGVzdHZhbAAAAA==")
if err != nil {
t.Error("could not decode expected bytes:", err)
}
Expand Down
75 changes: 75 additions & 0 deletions codec/json/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) 2021 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package json

import (
"context"
"encoding/json"
"fmt"

eh "github.com/looplab/eventhorizon"
)

// CommandCodec is a codec for marshaling and unmarshaling commands
// to and from bytes in JSON format.
type CommandCodec struct{}

// MarshalCommand marshals a command into bytes in JSON format.
func (_ CommandCodec) MarshalCommand(ctx context.Context, cmd eh.Command) ([]byte, error) {
c := command{
CommandType: cmd.CommandType(),
Context: eh.MarshalContext(ctx),
}

var err error
if c.Command, err = json.Marshal(cmd); err != nil {
return nil, fmt.Errorf("could not marshal command data: %w", err)
}

b, err := json.Marshal(c)
if err != nil {
return nil, fmt.Errorf("could not marshal command: %w", err)
}

return b, nil
}

// UnmarshalCommand unmarshals a command from bytes in JSON format.
func (_ CommandCodec) UnmarshalCommand(ctx context.Context, b []byte) (eh.Command, context.Context, error) {
var c command
if err := json.Unmarshal(b, &c); err != nil {
return nil, nil, fmt.Errorf("could not unmarshal command: %w", err)
}

cmd, err := eh.CreateCommand(c.CommandType)
if err != nil {
return nil, nil, fmt.Errorf("could not create command: %w", err)
}

if err := json.Unmarshal(c.Command, &cmd); err != nil {
return nil, nil, fmt.Errorf("could not unmarshal command data: %w", err)
}

ctx = eh.UnmarshalContext(ctx, c.Context)

return cmd, ctx, nil
}

// command is the internal structure used on the wire only.
type command struct {
CommandType eh.CommandType `json:"command_type"`
Command json.RawMessage `json:"command"`
Context map[string]interface{} `json:"context"`
}
Loading

0 comments on commit bc7e608

Please sign in to comment.