Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(event): add event publisher in VideoSummaryService9 #187

Merged
merged 3 commits into from
Aug 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/constant/config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ const RelationRpcServerPort = ":37008"
const RecommendRpcServiceName = "GuGoTik-Recommend"
const RecommendRpcServicePort = ":37009"

const VideoProcessorRpcServiceName = "GuGoTik-VideoProcessorService"

const VideoPicker = "GuGoTik-VideoPicker"
const Event = "GuGoTik-Recommend"
2 changes: 2 additions & 0 deletions src/constant/strings/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ const (
FavoriteServiceError = "点赞服务内部出错"
UserServiceInnerErrorCode = 50021
UserServiceInnerError = "登录服务出现内部错误,请稍后重试!"
UnableToQueryVideoErrorCode = 50022
UnableToQueryVideoError = "无法查询到该视频"
)

// Expected Error
Expand Down
4 changes: 2 additions & 2 deletions src/models/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ func init() {

// Create magic user (id = 0): show video summary and keywords, and act as ChatGPT
magicUser := User{
ID: 999999,
ID: 1,
UserName: "ChatGPT",
Password: "chatgpt",
Role: 0,
Role: 2,
Avatar: "https://maples31-blog.oss-cn-beijing.aliyuncs.com/img/ChatGPT_logo.svg.png",
BackgroundImage: "https://maples31-blog.oss-cn-beijing.aliyuncs.com/img/ChatGPT.jpg",
Signature: "GuGoTik 小助手",
Expand Down
6 changes: 3 additions & 3 deletions src/services/auth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func addMagicUserFriend(ctx context.Context, span *trace.Span, userId uint32) {
logger := logging.LogService("AuthService.Register.AddMagicUserFriend").WithContext(ctx)

isMagicUserExist, err := userClient.GetUserExistInformation(ctx, &user2.UserExistRequest{
UserId: 999999,
UserId: 1,
})
if err != nil {
logger.WithFields(logrus.Fields{
Expand All @@ -426,7 +426,7 @@ func addMagicUserFriend(ctx context.Context, span *trace.Span, userId uint32) {
// User follow magic user
_, err = relationClient.Follow(ctx, &relation.RelationActionRequest{
ActorId: userId,
UserId: 999999,
UserId: 1,
})
if err != nil {
logger.WithFields(logrus.Fields{
Expand All @@ -439,7 +439,7 @@ func addMagicUserFriend(ctx context.Context, span *trace.Span, userId uint32) {

// Magic user follow user
_, err = relationClient.Follow(ctx, &relation.RelationActionRequest{
ActorId: 999999,
ActorId: 1,
UserId: userId,
})
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion src/services/comment/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,14 +395,19 @@ func addComment(ctx context.Context, logger *logrus.Entry, span trace.Span, pUse
// Rate comment
go rateComment(logger, span, pCommentText, rComment.ID)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
produceComment(ctx, models.RecommendEvent{
ActorId: pUser.Id,
VideoId: []uint32{pVideoID},
Type: 2,
Source: config.CommentRpcServerName,
})
}()
wg.Wait()

resp = &comment.ActionCommentResponse{
StatusCode: strings.ServiceOKCode,
StatusMsg: strings.ServiceOK,
Expand Down Expand Up @@ -556,7 +561,7 @@ func reindexCommentList(commentList *[]models.Comment) {
var commonComments []models.Comment

for _, c := range *commentList {
if c.UserId == 999999 {
if c.UserId == 1 {
magicComments = append(magicComments, c)
} else {
commonComments = append(commonComments, c)
Expand Down
5 changes: 5 additions & 0 deletions src/services/favorite/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"github.com/streadway/amqp"
"strconv"
"sync"
"time"

"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -181,14 +182,18 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo
pipe.ZAdd(ctx, user_like_Id, redis.Z{Score: float64(time.Now().Unix()), Member: req.VideoId})
return nil
})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
produceFavorite(ctx, models.RecommendEvent{
ActorId: req.ActorId,
VideoId: []uint32{req.VideoId},
Type: 1,
Source: config.FavoriteRpcServerName,
})
}()
wg.Wait()
if err == redis.Nil {
err = nil
}
Expand Down
56 changes: 55 additions & 1 deletion src/services/feed/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ func (s FeedServiceImpl) ListVideos(ctx context.Context, request *feed.ListFeedR
}
return resp, err
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
var videoLists []uint32
for _, item := range videos {
videoLists = append(videoLists, item.Id)
Expand All @@ -194,6 +197,7 @@ func (s FeedServiceImpl) ListVideos(ctx context.Context, request *feed.ListFeedR
Source: config.FeedRpcServerName,
})
}()
wg.Wait()
resp = &feed.ListFeedResponse{
StatusCode: strings.ServiceOKCode,
StatusMsg: strings.ServiceOK,
Expand Down Expand Up @@ -269,8 +273,58 @@ func (s FeedServiceImpl) QueryVideoExisted(ctx context.Context, req *feed.VideoE
return
}

// QueryVideoSummaryAndKeywords TODO
func (s FeedServiceImpl) QueryVideoSummaryAndKeywords(ctx context.Context, req *feed.QueryVideoSummaryAndKeywordsRequest) (resp *feed.QueryVideoSummaryAndKeywordsResponse, err error) {
ctx, span := tracing.Tracer.Start(ctx, "QueryVideoSummaryAndKeywordsService")
defer span.End()
logger := logging.LogService("FeedService.QueryVideoSummaryAndKeywords").WithContext(ctx)

videoExistRes, err := s.QueryVideoExisted(ctx, &feed.VideoExistRequest{
VideoId: req.VideoId,
})

if err != nil {
logger.WithFields(logrus.Fields{
"VideoId": req.VideoId,
}).Errorf("Cannot check if the video exists")
logging.SetSpanError(span, err)

resp = &feed.QueryVideoSummaryAndKeywordsResponse{
StatusCode: strings.VideoServiceInnerErrorCode,
StatusMsg: strings.VideoServiceInnerError,
}
return
}

if !videoExistRes.Existed {
resp = &feed.QueryVideoSummaryAndKeywordsResponse{
StatusCode: strings.UnableToQueryVideoErrorCode,
StatusMsg: strings.UnableToQueryVideoError,
}
return
}

video := models.Video{}
result := database.Client.WithContext(ctx).Where("id = ?", req.VideoId).First(&video)
if result.Error != nil {
logger.WithFields(logrus.Fields{
"VideoId": req.VideoId,
}).Errorf("Cannot get video from database")
logging.SetSpanError(span, err)

resp = &feed.QueryVideoSummaryAndKeywordsResponse{
StatusCode: strings.VideoServiceInnerErrorCode,
StatusMsg: strings.VideoServiceInnerError,
}
return
}

resp = &feed.QueryVideoSummaryAndKeywordsResponse{
StatusCode: strings.ServiceOKCode,
StatusMsg: strings.ServiceOK,
Summary: video.Summary,
Keywords: video.Keywords,
}

return
}

Expand Down
1 change: 1 addition & 0 deletions src/services/videoprocessor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func main() {
logger.Infof(strings.VideoSummary + " is running now")

ConnectServiceClient()
defer CloseMQConn()

wg := sync.WaitGroup{}
wg.Add(1)
Expand Down
86 changes: 84 additions & 2 deletions src/services/videoprocessor/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"gorm.io/gorm/clause"
"os/exec"
"strings"
"sync"
)

var userClient user.UserServiceClient
Expand All @@ -32,11 +33,75 @@ var openaiClient = openai.NewClient(config.EnvCfg.ChatGPTAPIKEYS)
var delayTime = int32(2 * 60 * 1000) //2 minutes
var maxRetries = int32(3)

var conn *amqp.Connection
var channel *amqp.Channel

func ConnectServiceClient() {
userRpcConn := grpc2.Connect(config.UserRpcServerName)
userClient = user.NewUserServiceClient(userRpcConn)
commentRpcConn := grpc2.Connect(config.CommentRpcServerName)
commentClient = comment.NewCommentServiceClient(commentRpcConn)

var err error

conn, err = amqp.Dial(rabbitmq.BuildMQConnAddr())
exitOnError(err)

channel, err = conn.Channel()
exitOnError(err)

err = channel.ExchangeDeclare(
strings2.EventExchange,
"topic",
true,
false,
false,
false,
nil,
)
exitOnError(err)
}

func CloseMQConn() {
if err := conn.Close(); err != nil {
panic(err)
}

if err := channel.Close(); err != nil {
panic(err)
}
}

func produceKeywords(ctx context.Context, event models.RecommendEvent) {
ctx, span := tracing.Tracer.Start(ctx, "KeywordsEventPublisher")
defer span.End()
logger := logging.LogService("VideoSummaryService.KeywordsEventPublisher").WithContext(ctx)
data, err := json.Marshal(event)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error when marshal the event model")
logging.SetSpanError(span, err)
return
}

err = channel.Publish(
strings2.EventExchange,
strings2.FavoriteActionEvent,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: data,
},
)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error when publishing the event model")
logging.SetSpanError(span, err)
return
}
}

// errorHandler If `requeue` is false, it will just `Nack` it. If `requeue` is true, it will try to re-publish it.
Expand Down Expand Up @@ -270,6 +335,23 @@ func SummaryConsume(channel *amqp.Channel) {
}
}

// Publish keywords event
if !keywordsExist && keywords != "" {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
produceKeywords(ctx, models.RecommendEvent{
ActorId: raw.ActorId,
VideoId: []uint32{raw.VideoId},
Type: 3,
Source: config.VideoProcessorRpcServiceName,
})
}()
wg.Wait()
}

// Add magic comments
isMagicUserExistRes := isMagicUserExist(ctx, logger, &span)
if isMagicUserExistRes {
logger.Debug("Magic user exist")
Expand Down Expand Up @@ -534,7 +616,7 @@ func isKeywordsExist(videoId uint32) (res bool, keywords string, err error) {

func isMagicUserExist(ctx context.Context, logger *logrus.Entry, span *trace.Span) bool {
isMagicUserExistRes, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{
UserId: 999999,
UserId: 1,
})
if err != nil {
logger.WithFields(logrus.Fields{
Expand All @@ -554,7 +636,7 @@ func isMagicUserExist(ctx context.Context, logger *logrus.Entry, span *trace.Spa

func addMagicComment(videoId uint32, content string, ctx context.Context, logger *logrus.Entry, span *trace.Span) {
_, err := commentClient.ActionComment(ctx, &comment.ActionCommentRequest{
ActorId: 999999,
ActorId: 1,
VideoId: videoId,
ActionType: comment.ActionCommentType_ACTION_COMMENT_TYPE_ADD,
Action: &comment.ActionCommentRequest_CommentText{CommentText: content},
Expand Down
Loading