diff --git a/src/constant/config/service.go b/src/constant/config/service.go index 26c1029..e42f4fe 100644 --- a/src/constant/config/service.go +++ b/src/constant/config/service.go @@ -30,5 +30,7 @@ const RelationRpcServerPort = ":37008" const RecommendRpcServiceName = "GuGoTik-Recommend" const RecommendRpcServicePort = ":37009" +const VideoProcessorRpcServiceName = "GuGoTik-VideoProcessorService" + const VideoPicker = "GuGoTik-VideoPicker" const Event = "GuGoTik-Recommend" diff --git a/src/constant/strings/err.go b/src/constant/strings/err.go index d595c6e..0d8d770 100644 --- a/src/constant/strings/err.go +++ b/src/constant/strings/err.go @@ -52,6 +52,8 @@ const ( FavoriteServiceError = "点赞服务内部出错" UserServiceInnerErrorCode = 50021 UserServiceInnerError = "登录服务出现内部错误,请稍后重试!" + UnableToQueryVideoErrorCode = 50022 + UnableToQueryVideoError = "无法查询到该视频" ) // Expected Error diff --git a/src/models/user.go b/src/models/user.go index feac4c1..56adeaf 100644 --- a/src/models/user.go +++ b/src/models/user.go @@ -41,10 +41,10 @@ func init() { // Create magic user (id = 0): show video summary and keywords, and act as ChatGPT magicUser := User{ - ID: 999999, + ID: 1, UserName: "ChatGPT", Password: "chatgpt", - Role: 0, + Role: 2, Avatar: "https://maples31-blog.oss-cn-beijing.aliyuncs.com/img/ChatGPT_logo.svg.png", BackgroundImage: "https://maples31-blog.oss-cn-beijing.aliyuncs.com/img/ChatGPT.jpg", Signature: "GuGoTik 小助手", diff --git a/src/services/auth/handler.go b/src/services/auth/handler.go index b7b1e7b..eb49d0d 100644 --- a/src/services/auth/handler.go +++ b/src/services/auth/handler.go @@ -404,7 +404,7 @@ func addMagicUserFriend(ctx context.Context, span *trace.Span, userId uint32) { logger := logging.LogService("AuthService.Register.AddMagicUserFriend").WithContext(ctx) isMagicUserExist, err := userClient.GetUserExistInformation(ctx, &user2.UserExistRequest{ - UserId: 999999, + UserId: 1, }) if err != nil { logger.WithFields(logrus.Fields{ @@ -426,7 +426,7 @@ func addMagicUserFriend(ctx context.Context, span *trace.Span, userId uint32) { // User follow magic user _, err = relationClient.Follow(ctx, &relation.RelationActionRequest{ ActorId: userId, - UserId: 999999, + UserId: 1, }) if err != nil { logger.WithFields(logrus.Fields{ @@ -439,7 +439,7 @@ func addMagicUserFriend(ctx context.Context, span *trace.Span, userId uint32) { // Magic user follow user _, err = relationClient.Follow(ctx, &relation.RelationActionRequest{ - ActorId: 999999, + ActorId: 1, UserId: userId, }) if err != nil { diff --git a/src/services/comment/handler.go b/src/services/comment/handler.go index 86c56ac..00322e5 100644 --- a/src/services/comment/handler.go +++ b/src/services/comment/handler.go @@ -395,7 +395,10 @@ func addComment(ctx context.Context, logger *logrus.Entry, span trace.Span, pUse // Rate comment go rateComment(logger, span, pCommentText, rComment.ID) + wg := sync.WaitGroup{} + wg.Add(1) go func() { + defer wg.Done() produceComment(ctx, models.RecommendEvent{ ActorId: pUser.Id, VideoId: []uint32{pVideoID}, @@ -403,6 +406,8 @@ func addComment(ctx context.Context, logger *logrus.Entry, span trace.Span, pUse Source: config.CommentRpcServerName, }) }() + wg.Wait() + resp = &comment.ActionCommentResponse{ StatusCode: strings.ServiceOKCode, StatusMsg: strings.ServiceOK, @@ -556,7 +561,7 @@ func reindexCommentList(commentList *[]models.Comment) { var commonComments []models.Comment for _, c := range *commentList { - if c.UserId == 999999 { + if c.UserId == 1 { magicComments = append(magicComments, c) } else { commonComments = append(commonComments, c) diff --git a/src/services/favorite/handler.go b/src/services/favorite/handler.go index 96d483b..e42d2f1 100644 --- a/src/services/favorite/handler.go +++ b/src/services/favorite/handler.go @@ -17,6 +17,7 @@ import ( "fmt" "github.com/streadway/amqp" "strconv" + "sync" "time" "github.com/redis/go-redis/v9" @@ -181,7 +182,10 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo pipe.ZAdd(ctx, user_like_Id, redis.Z{Score: float64(time.Now().Unix()), Member: req.VideoId}) return nil }) + wg := sync.WaitGroup{} + wg.Add(1) go func() { + defer wg.Done() produceFavorite(ctx, models.RecommendEvent{ ActorId: req.ActorId, VideoId: []uint32{req.VideoId}, @@ -189,6 +193,7 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo Source: config.FavoriteRpcServerName, }) }() + wg.Wait() if err == redis.Nil { err = nil } diff --git a/src/services/feed/handler.go b/src/services/feed/handler.go index 0a81ec4..769d6db 100644 --- a/src/services/feed/handler.go +++ b/src/services/feed/handler.go @@ -182,7 +182,10 @@ func (s FeedServiceImpl) ListVideos(ctx context.Context, request *feed.ListFeedR } return resp, err } + wg := sync.WaitGroup{} + wg.Add(1) go func() { + defer wg.Done() var videoLists []uint32 for _, item := range videos { videoLists = append(videoLists, item.Id) @@ -194,6 +197,7 @@ func (s FeedServiceImpl) ListVideos(ctx context.Context, request *feed.ListFeedR Source: config.FeedRpcServerName, }) }() + wg.Wait() resp = &feed.ListFeedResponse{ StatusCode: strings.ServiceOKCode, StatusMsg: strings.ServiceOK, @@ -269,8 +273,58 @@ func (s FeedServiceImpl) QueryVideoExisted(ctx context.Context, req *feed.VideoE return } -// QueryVideoSummaryAndKeywords TODO func (s FeedServiceImpl) QueryVideoSummaryAndKeywords(ctx context.Context, req *feed.QueryVideoSummaryAndKeywordsRequest) (resp *feed.QueryVideoSummaryAndKeywordsResponse, err error) { + ctx, span := tracing.Tracer.Start(ctx, "QueryVideoSummaryAndKeywordsService") + defer span.End() + logger := logging.LogService("FeedService.QueryVideoSummaryAndKeywords").WithContext(ctx) + + videoExistRes, err := s.QueryVideoExisted(ctx, &feed.VideoExistRequest{ + VideoId: req.VideoId, + }) + + if err != nil { + logger.WithFields(logrus.Fields{ + "VideoId": req.VideoId, + }).Errorf("Cannot check if the video exists") + logging.SetSpanError(span, err) + + resp = &feed.QueryVideoSummaryAndKeywordsResponse{ + StatusCode: strings.VideoServiceInnerErrorCode, + StatusMsg: strings.VideoServiceInnerError, + } + return + } + + if !videoExistRes.Existed { + resp = &feed.QueryVideoSummaryAndKeywordsResponse{ + StatusCode: strings.UnableToQueryVideoErrorCode, + StatusMsg: strings.UnableToQueryVideoError, + } + return + } + + video := models.Video{} + result := database.Client.WithContext(ctx).Where("id = ?", req.VideoId).First(&video) + if result.Error != nil { + logger.WithFields(logrus.Fields{ + "VideoId": req.VideoId, + }).Errorf("Cannot get video from database") + logging.SetSpanError(span, err) + + resp = &feed.QueryVideoSummaryAndKeywordsResponse{ + StatusCode: strings.VideoServiceInnerErrorCode, + StatusMsg: strings.VideoServiceInnerError, + } + return + } + + resp = &feed.QueryVideoSummaryAndKeywordsResponse{ + StatusCode: strings.ServiceOKCode, + StatusMsg: strings.ServiceOK, + Summary: video.Summary, + Keywords: video.Keywords, + } + return } diff --git a/src/services/videoprocessor/main.go b/src/services/videoprocessor/main.go index 001263e..5ecfa0a 100644 --- a/src/services/videoprocessor/main.go +++ b/src/services/videoprocessor/main.go @@ -130,6 +130,7 @@ func main() { logger.Infof(strings.VideoSummary + " is running now") ConnectServiceClient() + defer CloseMQConn() wg := sync.WaitGroup{} wg.Add(1) diff --git a/src/services/videoprocessor/summary.go b/src/services/videoprocessor/summary.go index d90be8b..19d65f1 100644 --- a/src/services/videoprocessor/summary.go +++ b/src/services/videoprocessor/summary.go @@ -24,6 +24,7 @@ import ( "gorm.io/gorm/clause" "os/exec" "strings" + "sync" ) var userClient user.UserServiceClient @@ -32,11 +33,75 @@ var openaiClient = openai.NewClient(config.EnvCfg.ChatGPTAPIKEYS) var delayTime = int32(2 * 60 * 1000) //2 minutes var maxRetries = int32(3) +var conn *amqp.Connection +var channel *amqp.Channel + func ConnectServiceClient() { userRpcConn := grpc2.Connect(config.UserRpcServerName) userClient = user.NewUserServiceClient(userRpcConn) commentRpcConn := grpc2.Connect(config.CommentRpcServerName) commentClient = comment.NewCommentServiceClient(commentRpcConn) + + var err error + + conn, err = amqp.Dial(rabbitmq.BuildMQConnAddr()) + exitOnError(err) + + channel, err = conn.Channel() + exitOnError(err) + + err = channel.ExchangeDeclare( + strings2.EventExchange, + "topic", + true, + false, + false, + false, + nil, + ) + exitOnError(err) +} + +func CloseMQConn() { + if err := conn.Close(); err != nil { + panic(err) + } + + if err := channel.Close(); err != nil { + panic(err) + } +} + +func produceKeywords(ctx context.Context, event models.RecommendEvent) { + ctx, span := tracing.Tracer.Start(ctx, "KeywordsEventPublisher") + defer span.End() + logger := logging.LogService("VideoSummaryService.KeywordsEventPublisher").WithContext(ctx) + data, err := json.Marshal(event) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when marshal the event model") + logging.SetSpanError(span, err) + return + } + + err = channel.Publish( + strings2.EventExchange, + strings2.FavoriteActionEvent, + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + Body: data, + }, + ) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when publishing the event model") + logging.SetSpanError(span, err) + return + } } // errorHandler If `requeue` is false, it will just `Nack` it. If `requeue` is true, it will try to re-publish it. @@ -270,6 +335,23 @@ func SummaryConsume(channel *amqp.Channel) { } } + // Publish keywords event + if !keywordsExist && keywords != "" { + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + produceKeywords(ctx, models.RecommendEvent{ + ActorId: raw.ActorId, + VideoId: []uint32{raw.VideoId}, + Type: 3, + Source: config.VideoProcessorRpcServiceName, + }) + }() + wg.Wait() + } + + // Add magic comments isMagicUserExistRes := isMagicUserExist(ctx, logger, &span) if isMagicUserExistRes { logger.Debug("Magic user exist") @@ -534,7 +616,7 @@ func isKeywordsExist(videoId uint32) (res bool, keywords string, err error) { func isMagicUserExist(ctx context.Context, logger *logrus.Entry, span *trace.Span) bool { isMagicUserExistRes, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{ - UserId: 999999, + UserId: 1, }) if err != nil { logger.WithFields(logrus.Fields{ @@ -554,7 +636,7 @@ func isMagicUserExist(ctx context.Context, logger *logrus.Entry, span *trace.Spa func addMagicComment(videoId uint32, content string, ctx context.Context, logger *logrus.Entry, span *trace.Span) { _, err := commentClient.ActionComment(ctx, &comment.ActionCommentRequest{ - ActorId: 999999, + ActorId: 1, VideoId: videoId, ActionType: comment.ActionCommentType_ACTION_COMMENT_TYPE_ADD, Action: &comment.ActionCommentRequest_CommentText{CommentText: content},