Skip to content

Commit af689cb

Browse files
committed
init code
1 parent 0b6bacf commit af689cb

9 files changed

+568
-1
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
# outbox
1+
# outbox(开发中)
22
发件箱模式

constant.go

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package outbox
2+
3+
const (
4+
// MessageHeaderMsgIDKey 消息ID
5+
MessageHeaderMsgIDKey = "nilorg.outbox.msg.id"
6+
// MessageHeaderMsgTopicKey 消息主题
7+
MessageHeaderMsgTopicKey = "nilorg.outbox.msg.topic"
8+
// MessageHeaderMsgTypeKey 消息内容类型
9+
MessageHeaderMsgTypeKey = "nilorg.outbox.msg.type"
10+
// MessageHeaderMsgSendTimeKey 消息发送时间
11+
MessageHeaderMsgSendTimeKey = "nilorg.outbox.msg.sendtime"
12+
// MessageHeaderMsgCallbackKey 消息回调
13+
MessageHeaderMsgCallbackKey = "nilorg.outbox.msg.callback"
14+
// MessageHeaderMsgUserKey 消息用户
15+
MessageHeaderMsgUserKey = "nilorg.outbox.msg.user"
16+
)
17+
18+
const (
19+
// StatusNameFailed 失败
20+
StatusNameFailed = "failed"
21+
// StatusNameScheduled 列入计划
22+
StatusNameScheduled = "scheduled"
23+
// StatusNameSucceeded 成功
24+
StatusNameSucceeded = "succeeded"
25+
)

go.mod

+6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
11
module github.com/nilorg/outbox
22

33
go 1.14
4+
5+
require (
6+
github.com/bwmarrin/snowflake v0.3.0
7+
github.com/nilorg/eventbus v0.0.0-20200905025346-3add52e4e0bb
8+
gorm.io/gorm v1.20.0
9+
)

go.sum

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
2+
github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE=
3+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
4+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5+
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
6+
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
7+
github.com/jinzhu/now v1.1.1 h1:g39TucaRWyV3dwDO++eEc6qf8TVIQ/Da48WmqjZ3i7E=
8+
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
9+
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
10+
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
11+
github.com/nilorg/eventbus v0.0.0-20200905025346-3add52e4e0bb h1:pk0EGadmcc9cUIkXEnnpKO2aElnPtrR2ESqcmev9BTA=
12+
github.com/nilorg/eventbus v0.0.0-20200905025346-3add52e4e0bb/go.mod h1:Od2iRC4F5lHdJc+ZUC92v+64Jg6vyM3lg5g5xOjtlV8=
13+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
14+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
15+
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
16+
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
17+
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
18+
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
19+
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
20+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
21+
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
22+
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
23+
gorm.io/gorm v1.20.0 h1:qfIlyaZvrF7kMWY3jBdEBXkXJ2M5MFYMTppjILxS3fQ=
24+
gorm.io/gorm v1.20.0/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=

gorm_transaction.go

+117
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package outbox
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
"reflect"
8+
"sync"
9+
"time"
10+
11+
"github.com/bwmarrin/snowflake"
12+
"github.com/nilorg/eventbus"
13+
"gorm.io/gorm"
14+
)
15+
16+
// NewGormTransaction ...
17+
func NewGormTransaction(db *gorm.DB, eventBus eventbus.EventBus) (outbox Transactioner, err error) {
18+
db.AutoMigrate(
19+
Published{},
20+
Received{},
21+
)
22+
var node *snowflake.Node
23+
if node, err = snowflake.NewNode(1); err != nil {
24+
return
25+
}
26+
outbox = &GormTransaction{
27+
db: db,
28+
eventBus: eventBus,
29+
node: node,
30+
}
31+
return
32+
}
33+
34+
// GormTransaction ...
35+
type GormTransaction struct {
36+
db *gorm.DB
37+
tx *gorm.DB
38+
txMutex sync.Mutex
39+
eventBus eventbus.EventBus
40+
node *snowflake.Node
41+
}
42+
43+
// Begin ...
44+
func (o *GormTransaction) Begin(ctx context.Context, opts ...*sql.TxOptions) (tx interface{}, err error) {
45+
o.txMutex.Lock()
46+
defer o.txMutex.Unlock()
47+
48+
gormTx := o.db.WithContext(ctx).Begin(opts...)
49+
if err = gormTx.Error; err != nil {
50+
return
51+
}
52+
o.tx = gormTx
53+
tx = gormTx
54+
return
55+
}
56+
57+
// Rollback ...
58+
func (o *GormTransaction) Rollback(ctx context.Context) (err error) {
59+
err = o.tx.WithContext(ctx).Rollback().Error
60+
return
61+
}
62+
63+
// Commit ...
64+
func (o *GormTransaction) Commit(ctx context.Context, args ...*CommitMessage) (err error) {
65+
if len(args) > 0 {
66+
for _, arg := range args {
67+
id := o.node.Generate().Int64()
68+
timeNow := time.Now()
69+
msg := &eventbus.Message{
70+
Header: eventbus.MessageHeader{
71+
MessageHeaderMsgIDKey: fmt.Sprint(id),
72+
MessageHeaderMsgTopicKey: arg.Topic,
73+
MessageHeaderMsgTypeKey: reflect.TypeOf(arg.Value).Name(),
74+
MessageHeaderMsgSendTimeKey: timeNow.Format("2006-01-02 15:04:05"),
75+
MessageHeaderMsgCallbackKey: arg.CallbackTopic,
76+
},
77+
Value: arg.Value,
78+
}
79+
var value string
80+
if value, err = encodeValue(msg); err != nil {
81+
return
82+
}
83+
p := &Published{
84+
ID: id,
85+
Version: "v1",
86+
Topic: arg.Topic,
87+
Value: value,
88+
Retries: 0,
89+
CreatedAt: timeNow,
90+
StatusName: StatusNameScheduled,
91+
}
92+
if pubErr := o.eventBus.Publish(ctx, arg.Topic, msg); pubErr != nil {
93+
// 使用日志组件,打印日志
94+
} else {
95+
p.StatusName = StatusNameSucceeded
96+
}
97+
// 记录发件箱 成功日志
98+
if err = o.insert(p); err != nil {
99+
return
100+
}
101+
}
102+
}
103+
err = o.tx.WithContext(ctx).Commit().Error
104+
return
105+
}
106+
107+
func (o *GormTransaction) insert(p *Published) error {
108+
return o.tx.Create(p).Error
109+
}
110+
111+
func (o *GormTransaction) changePublishState(msgID int64, state string) error {
112+
return o.db.Model(&Published{}).Where("id = ?", msgID).Updates(&Published{StatusName: state}).Error
113+
}
114+
115+
func (o *GormTransaction) changeReceiveState(msgID int64, state string) error {
116+
return o.db.Model(&Received{}).Where("id = ?", msgID).Updates(&Received{StatusName: state}).Error
117+
}

logger.go

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package outbox
2+
3+
import (
4+
"context"
5+
"log"
6+
)
7+
8+
// Logger logger
9+
type Logger interface {
10+
// Debugf 测试
11+
Debugf(ctx context.Context, format string, args ...interface{})
12+
// Debugln 测试
13+
Debugln(ctx context.Context, args ...interface{})
14+
// Infof 信息
15+
Infof(ctx context.Context, format string, args ...interface{})
16+
// Infoln 消息
17+
Infoln(ctx context.Context, args ...interface{})
18+
// Warnf 警告
19+
Warnf(ctx context.Context, format string, args ...interface{})
20+
// Warnln 警告
21+
Warnln(ctx context.Context, args ...interface{})
22+
// Warningf 警告
23+
Warningf(ctx context.Context, format string, args ...interface{})
24+
// Warningln 警告
25+
Warningln(ctx context.Context, args ...interface{})
26+
// Errorf 错误
27+
Errorf(ctx context.Context, format string, args ...interface{})
28+
// Errorln 错误
29+
Errorln(ctx context.Context, args ...interface{})
30+
}
31+
32+
// StdLogger ...
33+
type StdLogger struct {
34+
}
35+
36+
// Debugf 测试
37+
func (StdLogger) Debugf(ctx context.Context, format string, args ...interface{}) {
38+
log.Printf("[Debug] "+format, args...)
39+
}
40+
41+
// Debugln 测试
42+
func (StdLogger) Debugln(ctx context.Context, args ...interface{}) {
43+
nArgs := []interface{}{
44+
"[Debug]",
45+
}
46+
nArgs = append(nArgs, args...)
47+
log.Println(nArgs...)
48+
}
49+
50+
// Infof 信息
51+
func (StdLogger) Infof(ctx context.Context, format string, args ...interface{}) {
52+
log.Printf("[INFO] "+format, args...)
53+
}
54+
55+
// Infoln 消息
56+
func (StdLogger) Infoln(ctx context.Context, args ...interface{}) {
57+
nArgs := []interface{}{
58+
"[INFO]",
59+
}
60+
nArgs = append(nArgs, args...)
61+
log.Println(nArgs...)
62+
}
63+
64+
// Warnf 警告
65+
func (StdLogger) Warnf(ctx context.Context, format string, args ...interface{}) {
66+
log.Printf("[Warn] "+format, args...)
67+
}
68+
69+
// Warnln 警告
70+
func (StdLogger) Warnln(ctx context.Context, args ...interface{}) {
71+
nArgs := []interface{}{
72+
"[Warn]",
73+
}
74+
nArgs = append(nArgs, args...)
75+
log.Println(nArgs...)
76+
}
77+
78+
// Warningf 警告
79+
func (StdLogger) Warningf(ctx context.Context, format string, args ...interface{}) {
80+
log.Printf("[Warning] "+format, args...)
81+
}
82+
83+
// Warningln 警告
84+
func (StdLogger) Warningln(ctx context.Context, args ...interface{}) {
85+
nArgs := []interface{}{
86+
"[Warning]",
87+
}
88+
nArgs = append(nArgs, args...)
89+
log.Println(nArgs...)
90+
}
91+
92+
// Errorf 错误
93+
func (StdLogger) Errorf(ctx context.Context, format string, args ...interface{}) {
94+
log.Printf("[Error] "+format, args...)
95+
}
96+
97+
// Errorln 错误
98+
func (StdLogger) Errorln(ctx context.Context, args ...interface{}) {
99+
nArgs := []interface{}{
100+
"[Error]",
101+
}
102+
nArgs = append(nArgs, args...)
103+
log.Println(nArgs...)
104+
}

message.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package outbox
2+
3+
import (
4+
"encoding/json"
5+
6+
"github.com/nilorg/eventbus"
7+
)
8+
9+
const (
10+
// Version 版本
11+
Version = "v1"
12+
)
13+
14+
// Message 消息
15+
type Message eventbus.Message
16+
17+
// IsCallback 存在回调
18+
func (m *Message) IsCallback() bool {
19+
_, ok := m.Header[MessageHeaderMsgCallbackKey]
20+
return ok
21+
}
22+
23+
// Callback 回调地址
24+
func (m *Message) Callback() string {
25+
callback, _ := m.Header[MessageHeaderMsgCallbackKey]
26+
return callback
27+
}
28+
29+
// IsUser 是否存在用户
30+
func (m *Message) IsUser() bool {
31+
_, ok := m.Header[MessageHeaderMsgUserKey]
32+
return ok
33+
}
34+
35+
// User 用户
36+
func (m *Message) User() string {
37+
user, _ := m.Header[MessageHeaderMsgUserKey]
38+
return user
39+
}
40+
41+
func encodeValue(v *eventbus.Message) (s string, err error) {
42+
var data []byte
43+
data, err = json.Marshal(v)
44+
if err != nil {
45+
return
46+
}
47+
s = string(data)
48+
return
49+
}

model.go

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package outbox
2+
3+
import "time"
4+
5+
// Published ...
6+
type Published struct {
7+
ID int64 `json:"id" gorm:"column:id;primaryKey;type:BIGINT(20)"`
8+
Version string `json:"version" gorm:"column:version;type:VARCHAR(20)"`
9+
Topic string `json:"topic" gorm:"column:topic;type:VARCHAR(200);not null"`
10+
Value string `json:"value" gorm:"column:value;type:LONGTEXT"`
11+
Retries int `json:"retries" gorm:"column:retries;type:INT(11)"`
12+
CreatedAt time.Time `json:"created_at" gorm:"column:created_at;type:DATETIME;not null"`
13+
ExpiresAt *time.Time `json:"expires_at" gorm:"column:expires_at;type:DATETIME"`
14+
StatusName string `json:"status_name" gorm:"index;column:status_name;type:VARCHAR(40);not null"`
15+
}
16+
17+
// Received ...
18+
type Received struct {
19+
ID int64 `json:"id" gorm:"column:id;primaryKey;type:BIGINT(20)"`
20+
Version string `json:"version" gorm:"column:version;type:VARCHAR(20)"`
21+
Topic string `json:"name" gorm:"column:name;type:VARCHAR(200);not null"`
22+
Group string `json:"group" gorm:"column:group;type:VARCHAR(200);not null"`
23+
Value string `json:"value" gorm:"column:value;type:LONGTEXT"`
24+
Retries int `json:"retries" gorm:"column:retries;type:INT(11)"`
25+
CreatedAt time.Time `json:"created_at" gorm:"column:created_at;type:DATETIME;not null"`
26+
ExpiresAt *time.Time `json:"expires_at" gorm:"column:expires_at;type:DATETIME"`
27+
StatusName string `json:"status_name" gorm:"index;column:status_name;type:VARCHAR(40);not null"`
28+
}
29+
30+
// CommitMessage 提交message
31+
type CommitMessage struct {
32+
Topic string
33+
Value interface{}
34+
CallbackTopic string
35+
}

0 commit comments

Comments
 (0)