diff --git a/src/constant/strings/service.go b/src/constant/strings/service.go index 79b8b36..1fc8054 100644 --- a/src/constant/strings/service.go +++ b/src/constant/strings/service.go @@ -11,4 +11,6 @@ const ( VideoGetEvent = "video.get.action" VideoCommentEvent = "video.comment.action" VideoPublishEvent = "video.publish.action" + + MessageActionEvent = "message_send" ) diff --git a/src/services/message/handler.go b/src/services/message/handler.go index 053fcec..ff6ba64 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -15,12 +15,17 @@ import ( grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/ptr" + "GuGoTik/src/utils/rabbitmq" "context" + "encoding/json" "fmt" + + "time" + "github.com/go-redis/redis_rate/v10" "github.com/robfig/cron/v3" + "github.com/streadway/amqp" "gorm.io/gorm" - "time" "github.com/sirupsen/logrus" ) @@ -35,8 +40,40 @@ type MessageServiceImpl struct { chat.ChatServiceServer } +// 连接 +var conn *amqp.Connection +var channel *amqp.Channel + +//输出 + +func failOnError(err error, msg string) { + //打日志 + logging.Logger.Errorf("err %s", msg) + +} + func (c MessageServiceImpl) New() { + + var err error + conn, err = amqp.Dial(rabbitmq.BuildMQConnAddr()) + if err != nil { + failOnError(err, "Fialed to conenct to RabbitMQ") + } + channel, err = conn.Channel() + if err != nil { + failOnError(err, "Failed to open a channel") + } + _, err = channel.QueueDeclare( + strings.MessageActionEvent, + false, false, false, false, + nil, + ) + if err != nil { + failOnError(err, "Failed to define queue") + } + userRpcConn := grpc2.Connect(config.UserRpcServerName) + userClient = user.NewUserServiceClient(userRpcConn) recommendRpcConn := grpc2.Connect(config.RecommendRpcServiceName) @@ -52,8 +89,9 @@ func (c MessageServiceImpl) New() { chatClient = chat.NewChatServiceClient(chatRpcConn) cronRunner := cron.New(cron.WithSeconds()) + //_, err := cronRunner.AddFunc("0 0 18 * * *", sendMagicMessage) // execute every 18:00 - _, err := cronRunner.AddFunc("@every 2m", sendMagicMessage) // execute every minute [for test] + _, err = cronRunner.AddFunc("@every 2m", sendMagicMessage) // execute every minute [for test] if err != nil { logging.Logger.WithFields(logrus.Fields{ @@ -62,8 +100,20 @@ func (c MessageServiceImpl) New() { } cronRunner.Start() + +} + +func CloseMQConn() { + if err := channel.Close(); err != nil { + failOnError(err, "close channel error") + } + if err := conn.Close(); err != nil { + failOnError(err, "close conn error") + } } +//发送消息 + var chatActionLimitKeyPrefix = config.EnvCfg.RedisPrefix + "chat_freq_limit" const chatActionMaxQPS = 3 @@ -266,10 +316,28 @@ func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context } //TO_DO 后面写mq? - result := database.Client.WithContext(ctx).Create(&message) - if result.Error != nil { + body, err := json.Marshal(message) + + if err != nil { + resp = &chat.ActionResponse{ + StatusCode: strings.UnableToAddMessageErrorCode, + StatusMsg: strings.UnableToAddMessageError, + } + return + } + headers := rabbitmq.InjectAMQPHeaders(ctx) + err = channel.Publish("", strings.MessageActionEvent, false, false, + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: body, + Headers: headers, + }) + // result := database.Client.WithContext(ctx).Create(&message) + + if err != nil { resp = &chat.ActionResponse{ StatusCode: strings.UnableToAddMessageErrorCode, StatusMsg: strings.UnableToAddMessageError, diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index 3afe0e8..5f7af8c 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -1,10 +1,107 @@ package main import ( - "fmt" - "time" + "GuGoTik/src/constant/strings" + "GuGoTik/src/models" + "GuGoTik/src/storage/database" + "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/rabbitmq" + "context" + "encoding/json" + + "github.com/sirupsen/logrus" + "github.com/streadway/amqp" ) +func failOnError(err error, msg string) { + //打日志 + logging.Logger.Errorf("err %s", msg) + +} + func main() { - fmt.Println(time.Now().UTC().Format(time.RFC3339)) + conn, err := amqp.Dial(rabbitmq.BuildMQConnAddr()) + if err != nil { + failOnError(err, "Fialed to conenct to RabbitMQ") + } + defer func(conn *amqp.Connection) { + err := conn.Close() + failOnError(err, "Fialed to close conn") + }(conn) + channel, err := conn.Channel() + if err != nil { + failOnError(err, "Failed to open a channel") + } + + _, err = channel.QueueDeclare( + strings.MessageActionEvent, + false, false, false, false, + nil, + ) + if err != nil { + failOnError(err, "Failed to define queue") + } + + msg, err := channel.Consume( + strings.MessageActionEvent, + "", + true, false, false, false, + nil, + ) + if err != nil { + failOnError(err, "Failed to define queue") + } + + var foreever chan struct{} + + logger := logging.LogService("msgConsumer") + logger.Infof(strings.MessageActionEvent + " is running now") + go func() { + var message models.Message + for body := range msg { + if err := json.Unmarshal(body.Body, &message); err != nil { + logger.WithFields(logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "err": err, + }).Errorf("Error when unmarshaling the prepare json body.") + return + } + + /* ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers) + ctx, span := tracing.Tracer.Start(ctx, "message_send Service") + logger := logging.LogService("message_send").WithContext(ctx) + + if err := json.Unmarshal(body.Body, &message); err != nil { + logger.WithFields(logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "err": err, + }).Errorf("Error when unmarshaling the prepare json body.") + logging.SetSpanError(span, err) + return + } */ + + pmessage := models.Message{ + ToUserId: message.ToUserId, + FromUserId: message.FromUserId, + ConversationId: message.ConversationId, + Content: message.Content, + } + logger.Info(pmessage) + result := database.Client.WithContext(context.Background()).Create(&pmessage) + if result.Error != nil { + logger.WithFields(logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "err": result.Error, + }).Errorf("Error when insert message to database.") + // logging.SetSpanError(span, err) + return + } + } + }() + + <-foreever + }