From e38dd6e8991024c2531f8f57eba8f7ffaa041247 Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Wed, 30 Aug 2023 20:15:46 +0800 Subject: [PATCH 1/3] fix get message --- src/constant/strings/service.go | 5 ++-- src/services/message/handler.go | 43 ++++++++++++++++++++++++++------ src/services/msgconsumer/main.go | 8 +++++- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/constant/strings/service.go b/src/constant/strings/service.go index 1fc8054..9f3c1a5 100644 --- a/src/constant/strings/service.go +++ b/src/constant/strings/service.go @@ -1,8 +1,9 @@ package strings const ( - VideoExchange = "video_exchange" - EventExchange = "event" + VideoExchange = "video_exchange" + EventExchange = "event" + MessageExchange = "message_exchange" VideoPicker = "video_picker" VideoSummary = "video_summary" diff --git a/src/services/message/handler.go b/src/services/message/handler.go index ff6ba64..81d1945 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -59,15 +59,27 @@ func (c MessageServiceImpl) New() { if err != nil { failOnError(err, "Fialed to conenct to RabbitMQ") } + channel, err = conn.Channel() if err != nil { failOnError(err, "Failed to open a channel") } + + channel.ExchangeDeclare( + strings.MessageExchange, + "x-delayed-message", + true, false, false, false, + amqp.Table{ + "x-delayed-type": "direct", + }, + ) + _, err = channel.QueueDeclare( strings.MessageActionEvent, false, false, false, false, nil, ) + if err != nil { failOnError(err, "Failed to define queue") } @@ -279,14 +291,29 @@ func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) } rMessageList := make([]*chat.Message, 0, len(pMessageList)) - for _, pMessage := range pMessageList { - rMessageList = append(rMessageList, &chat.Message{ - Id: pMessage.ID, - Content: pMessage.Content, - CreateTime: uint64(pMessage.CreatedAt.UnixMilli()), - FromUserId: ptr.Ptr(pMessage.FromUserId), - ToUserId: ptr.Ptr(pMessage.ToUserId), - }) + if request.PreMsgTime == 0 { + for _, pMessage := range pMessageList { + + rMessageList = append(rMessageList, &chat.Message{ + Id: pMessage.ID, + Content: pMessage.Content, + CreateTime: uint64(pMessage.CreatedAt.UnixMilli()), + FromUserId: ptr.Ptr(pMessage.FromUserId), + ToUserId: ptr.Ptr(pMessage.ToUserId), + }) + } + } else { + for _, pMessage := range pMessageList { + if pMessage.ToUserId == request.ActorId { + rMessageList = append(rMessageList, &chat.Message{ + Id: pMessage.ID, + Content: pMessage.Content, + CreateTime: uint64(pMessage.CreatedAt.UnixMilli()), + FromUserId: ptr.Ptr(pMessage.FromUserId), + ToUserId: ptr.Ptr(pMessage.ToUserId), + }) + } + } } resp = &chat.ChatResponse{ diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index 5f7af8c..1244717 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -45,7 +45,7 @@ func main() { msg, err := channel.Consume( strings.MessageActionEvent, "", - true, false, false, false, + false, false, false, false, nil, ) if err != nil { @@ -99,6 +99,12 @@ func main() { // logging.SetSpanError(span, err) return } + err = body.Ack(true) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when dealing with the ,essage...") + } } }() From 5a4c2001c28531395f393012162e77c96a25817a Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Wed, 30 Aug 2023 20:19:20 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/services/msgconsumer/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index 1244717..184ed14 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -45,7 +45,7 @@ func main() { msg, err := channel.Consume( strings.MessageActionEvent, "", - false, false, false, false, + true, false, false, false, nil, ) if err != nil { @@ -99,12 +99,12 @@ func main() { // logging.SetSpanError(span, err) return } - err = body.Ack(true) + /* err = body.Ack(true) if err != nil { logger.WithFields(logrus.Fields{ "err": err, }).Errorf("Error when dealing with the ,essage...") - } + } */ } }() From 9b4e0d63825df8f3ee454b565ca7d12aeaf35966 Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Wed, 30 Aug 2023 20:35:59 +0800 Subject: [PATCH 3/3] - --- src/constant/strings/service.go | 2 +- src/services/message/handler.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/constant/strings/service.go b/src/constant/strings/service.go index 9f3c1a5..f2a055f 100644 --- a/src/constant/strings/service.go +++ b/src/constant/strings/service.go @@ -13,5 +13,5 @@ const ( VideoCommentEvent = "video.comment.action" VideoPublishEvent = "video.publish.action" - MessageActionEvent = "message_send" + MessageActionEvent = "message.send" ) diff --git a/src/services/message/handler.go b/src/services/message/handler.go index bff6730..70090ad 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -65,7 +65,7 @@ func (c MessageServiceImpl) New() { failOnError(err, "Failed to open a channel") } - channel.ExchangeDeclare( + err = channel.ExchangeDeclare( strings.MessageExchange, "x-delayed-message", true, false, false, false, @@ -73,6 +73,9 @@ func (c MessageServiceImpl) New() { "x-delayed-type": "direct", }, ) + if err != nil { + failOnError(err, "Failed to get exchange") + } _, err = channel.QueueDeclare( strings.MessageActionEvent,