@@ -3,6 +3,7 @@ package outbox
3
3
import (
4
4
"context"
5
5
"database/sql"
6
+ "encoding/json"
6
7
"fmt"
7
8
"reflect"
8
9
"sync"
@@ -62,15 +63,20 @@ func (e *gormEngine) newSubscribeHandler(topic, group string, h SubscribeHandler
62
63
return func (ctx context.Context , baseMsg * eventbus.Message ) (err error ) {
63
64
id := e .node .Generate ().Int64 ()
64
65
var value string
65
- if value , err = encodeValue (baseMsg ); err != nil {
66
+ if value , err = EncodeValue (baseMsg .Value ); err != nil {
67
+ return
68
+ }
69
+ baseMsg .Value = value
70
+ var bytes []byte
71
+ if bytes , err = json .Marshal (baseMsg ); err != nil {
66
72
return
67
73
}
68
74
r := & Received {
69
75
ID : id ,
70
76
Version : MessageVersion ,
71
77
Topic : topic ,
72
78
Group : group ,
73
- Value : value ,
79
+ Value : string ( bytes ) ,
74
80
Retries : 0 ,
75
81
CreatedAt : time .Now (),
76
82
StatusName : StatusNameScheduled ,
@@ -115,6 +121,10 @@ func (e *gormEngine) publish(ctx context.Context, db *gorm.DB, topic string, v i
115
121
if len (callback ) > 0 {
116
122
callbackName = callback [0 ]
117
123
}
124
+ var value string
125
+ if value , err = EncodeValue (v ); err != nil {
126
+ return
127
+ }
118
128
msg := & eventbus.Message {
119
129
Header : eventbus.MessageHeader {
120
130
MessageHeaderMsgIDKey : fmt .Sprint (id ),
@@ -123,17 +133,17 @@ func (e *gormEngine) publish(ctx context.Context, db *gorm.DB, topic string, v i
123
133
MessageHeaderMsgSendTimeKey : fmt .Sprint (timeNow .Unix ()),
124
134
MessageHeaderMsgCallbackKey : callbackName ,
125
135
},
126
- Value : v ,
136
+ Value : value ,
127
137
}
128
- var value string
129
- if value , err = encodeValue (msg ); err != nil {
138
+ var bytes [] byte
139
+ if bytes , err = json . Marshal (msg ); err != nil {
130
140
return
131
141
}
132
142
p := & Published {
133
143
ID : id ,
134
144
Version : MessageVersion ,
135
145
Topic : topic ,
136
- Value : value ,
146
+ Value : string ( bytes ) ,
137
147
Retries : 0 ,
138
148
CreatedAt : timeNow ,
139
149
StatusName : StatusNameScheduled ,
@@ -310,8 +320,8 @@ func (e *gormEngine) failedPublishedRetryInterval() (err error) {
310
320
if err = e .retriePublished (e .db , published .ID ); err != nil {
311
321
return
312
322
}
313
-
314
- if msg , msgErr := decodeValue ([] byte ( published .Value ) ); msgErr != nil {
323
+ var msg Message
324
+ if msgErr := DecodeValue ( published .Value , & msg ); msgErr != nil {
315
325
e .logger .Errorf (ctx , "failedRetryInterval-decodeValue error: %v" , msgErr )
316
326
} else {
317
327
if pubErr := e .eventBus .Publish (ctx , published .Topic , msg ); pubErr != nil {
@@ -356,16 +366,15 @@ func (e *gormEngine) failedReceivedRetryInterval() (err error) {
356
366
if err = e .retrieReceived (e .db , received .ID ); err != nil {
357
367
return
358
368
}
359
-
360
- if msg , msgErr := decodeValue ([] byte ( received .Value ) ); msgErr != nil {
369
+ var msg Message
370
+ if msgErr := DecodeValue ( received .Value , & msg ); msgErr != nil {
361
371
e .logger .Errorf (ctx , "failedRetryInterval-decodeValue error: %v" , msgErr )
362
372
} else {
363
373
hex := fmt .Sprintf ("%s-%s" , received .Topic , received .Group )
364
374
if subscribeItem , subscribeItemOk := e .subscribeItems [hex ]; ! subscribeItemOk || subscribeItem .h == nil {
365
375
e .logger .Warnf (ctx , "failedRetryInterval-subscribeItem %s error: %v" , hex , msgErr )
366
376
} else {
367
- subMsg := Message (* msg )
368
- if subErr := subscribeItem .h (ctx , & subMsg ); subErr != nil {
377
+ if subErr := subscribeItem .h (ctx , & msg ); subErr != nil {
369
378
e .logger .Errorf (ctx , "failedRetryInterval-subscribeItem %s execute error: %v" , hex , msgErr )
370
379
} else {
371
380
if err = e .changePublishStateSucceeded (e .db , received .ID ); err != nil {
0 commit comments