diff --git a/src/constant/config/env.go b/src/constant/config/env.go index 0fafbab..d0a20f2 100644 --- a/src/constant/config/env.go +++ b/src/constant/config/env.go @@ -13,11 +13,11 @@ type envConfig struct { LoggerLevel string `env:"LOGGER_LEVEL" envDefault:"INFO"` LoggerWithTraceState string `env:"LOGGER_OUT_TRACING" envDefault:"disable"` TiedLogging string `env:"TIED" envDefault:"NONE"` - PostgreSQLHost string `env:"POSTGRESQL_HOST" envDefault:"127.0.0.1"` - PostgreSQLPort string `env:"POSTGRESQL_PORT" envDefault:"5432"` - PostgreSQLUser string `env:"POSTGRESQL_USER" envDefault:"postgres"` - PostgreSQLPassword string `env:"POSTGRESQL_PASSWORD" envDefault:"1092"` - PostgreSQLDataBase string `env:"POSTGRESQL_DATABASE" envDefault:"Gugo"` + PostgreSQLHost string `env:"POSTGRESQL_HOST"` + PostgreSQLPort string `env:"POSTGRESQL_PORT"` + PostgreSQLUser string `env:"POSTGRESQL_USER"` + PostgreSQLPassword string `env:"POSTGRESQL_PASSWORD"` + PostgreSQLDataBase string `env:"POSTGRESQL_DATABASE"` StorageType string `env:"STORAGE_TYPE" envDefault:"fs"` FileSystemStartPath string `env:"FS_PATH" envDefault:"/tmp"` FileSystemBaseUrl string `env:"FS_BASEURL" envDefault:"http://localhost/"` @@ -27,7 +27,7 @@ type envConfig struct { TracingEndPoint string `env:"TRACING_ENDPOINT"` PyroscopeState string `env:"PYROSCOPE_STATE" envDefault:"false"` PyroscopeAddr string `env:"PYROSCOPE_ADDR"` - RedisPrefix string `env:"REDIS_PREFIX" envDefault:"GUGUo"` + RedisPrefix string `env:"REDIS_PREFIX" envDefault:"GUGUTIK"` PostgreSQLSchema string `env:"POSTGRESQL_SCHEMA"` RedisMaster string `env:"REDIS_MASTER"` ConsulAnonymityPrefix string `env:"CONSUL_ANONYMITY_NAME" envDefault:""` diff --git a/src/constant/strings/err.go b/src/constant/strings/err.go index 43f6a14..0d8d770 100644 --- a/src/constant/strings/err.go +++ b/src/constant/strings/err.go @@ -54,36 +54,30 @@ const ( UserServiceInnerError = "登录服务出现内部错误,请稍后重试!" UnableToQueryVideoErrorCode = 50022 UnableToQueryVideoError = "无法查询到该视频" - AlreadyFollowingErrorCode = 50023 - AlreadyFollowingError = "无法关注已关注的人" - UnableToGetFriendListErrorCode = 50024 - UnableToGetFriendListError = "无法查询到好友列表" ) // Expected Error const ( - AuthUserExistedCode = 10001 - AuthUserExisted = "用户已存在,请更换用户名或尝试登录!" - UserNotExistedCode = 10002 - UserNotExisted = "用户不存在,请先注册或检查你的用户名是否正确!" - AuthUserLoginFailedCode = 10003 - AuthUserLoginFailed = "用户信息错误,请检查账号密码是否正确" - AuthUserNeededCode = 10004 - AuthUserNeeded = "用户无权限操作,请登陆后重试!" - ActionCommentTypeInvalidCode = 10005 - ActionCommentTypeInvalid = "不合法的评论类型" - ActionCommentLimitedCode = 10006 - ActionCommentLimited = "评论频繁,请稍后再试!" - InvalidContentTypeCode = 10007 - InvalidContentType = "不合法的内容类型" - FavorivateServiceDuplicateCode = 10008 - FavorivateServiceDuplicateError = "不能重复点赞" - FavorivateServiceCancelCode = 10009 - FavorivateServiceCancelError = "没有点赞,不能取消点赞" - PublishVideoLimitedCode = 10010 - PublishVideoLimited = "视频发布频繁,请稍后再试!" - ChatActionLimitedCode = 10011 - ChatActionLimitedError = "发送消息频繁,请稍后再试!" - FollowLimitedCode = 10012 - FollowLimited = "关注频繁,请稍后再试! " + AuthUserExistedCode = 10001 + AuthUserExisted = "用户已存在,请更换用户名或尝试登录!" + UserNotExistedCode = 10002 + UserNotExisted = "用户不存在,请先注册或检查你的用户名是否正确!" + AuthUserLoginFailedCode = 10003 + AuthUserLoginFailed = "用户信息错误,请检查账号密码是否正确" + AuthUserNeededCode = 10004 + AuthUserNeeded = "用户无权限操作,请登陆后重试!" + ActionCommentTypeInvalidCode = 10005 + ActionCommentTypeInvalid = "不合法的评论类型" + ActionCommentLimitedCode = 10006 + ActionCommentLimited = "评论频繁,请稍后再试!" + InvalidContentTypeCode = 10007 + InvalidContentType = "不合法的内容类型" + FavoriteServiceDuplicateCode = 10008 + FavoriteServiceDuplicateError = "不能重复点赞" + FavoriteServiceCancelCode = 10009 + FavoriteServiceCancelError = "没有点赞,不能取消点赞" + PublishVideoLimitedCode = 10010 + PublishVideoLimited = "视频发布频繁,请稍后再试!" + ChatActionLimitedCode = 10011 + ChatActionLimitedError = "发送消息频繁,请稍后再试!" ) diff --git a/src/services/relation/handler.go b/src/services/relation/handler.go index 3adaa99..43f287e 100644 --- a/src/services/relation/handler.go +++ b/src/services/relation/handler.go @@ -9,12 +9,10 @@ import ( "GuGoTik/src/rpc/user" "GuGoTik/src/storage/cached" "GuGoTik/src/storage/database" - redis2 "GuGoTik/src/storage/redis" grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" "context" "fmt" - "github.com/go-redis/redis_rate/v10" "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/trace" "strconv" @@ -24,16 +22,21 @@ import ( var userClient user.UserServiceClient -var actionRelationLimitKeyPrefix = config.EnvCfg.RedisPrefix + "relation_freq_limit" - -const actionRelationMaxQPS = 3 - type RelationServiceImpl struct { relation.RelationServiceServer } -func actionRelationLimitKey(userId uint32) string { - return fmt.Sprintf("%s-%d", actionRelationLimitKeyPrefix, userId) +type CacheRelationList struct { + rList []models.Relation +} + +func (c *CacheRelationList) IsDirty() bool { + return c.rList != nil +} + +// GetID : use userid as key for cache +func (c *CacheRelationList) GetID() uint32 { + return 0 } func (r RelationServiceImpl) New() { @@ -46,53 +49,6 @@ func (r RelationServiceImpl) Follow(ctx context.Context, request *relation.Relat defer span.End() logger := logging.LogService("RelationService.Follow").WithContext(ctx) - //限流 - limiter := redis_rate.NewLimiter(redis2.Client) - limiterKey := actionRelationLimitKey(request.ActorId) - limiterRes, err := limiter.Allow(ctx, limiterKey, redis_rate.PerSecond(actionRelationMaxQPS)) - if err != nil { - logger.WithFields(logrus.Fields{ - "err": err, - "ActorId": request.ActorId, - }).Errorf("ActionRelation limiter error") - logging.SetSpanError(span, err) - - resp = &relation.RelationActionResponse{ - StatusCode: strings.UnableToFollowErrorCode, - StatusMsg: strings.UnableToFollowError, - } - return - } - if limiterRes.Allowed == 0 { - logger.WithFields(logrus.Fields{ - "err": err, - "ActorId": request.ActorId, - }).Infof("Follow query too frequently by user %d", request.ActorId) - - resp = &relation.RelationActionResponse{ - StatusCode: strings.FollowLimitedCode, - StatusMsg: strings.FollowLimited, - } - return - } - - //actor exists - userExist, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{UserId: request.ActorId}) - - if err != nil || userExist.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "ActorId": request.ActorId, - }).Errorf("User service error") - logging.SetSpanError(span, err) - - resp = &relation.RelationActionResponse{ - StatusCode: strings.UnableToQueryUserErrorCode, - StatusMsg: strings.UnableToQueryUserError, - } - return - } - if request.UserId == request.ActorId { resp = &relation.RelationActionResponse{ StatusCode: strings.UnableToRelateYourselfErrorCode, @@ -101,13 +57,16 @@ func (r RelationServiceImpl) Follow(ctx context.Context, request *relation.Relat return } - userExist, err = userClient.GetUserExistInformation(ctx, &user.UserExistRequest{UserId: request.UserId}) + userResponse, err := userClient.GetUserInfo(ctx, &user.UserRequest{ + UserId: request.UserId, + ActorId: request.ActorId, + }) - if err != nil || userExist.StatusCode != strings.ServiceOKCode { + if err != nil || userResponse.StatusCode != strings.ServiceOKCode { logger.WithFields(logrus.Fields{ "err": err, "ActorId": request.ActorId, - }).Errorf("User service error") + }).Errorf("failed to get user info") logging.SetSpanError(span, err) resp = &relation.RelationActionResponse{ @@ -131,24 +90,6 @@ func (r RelationServiceImpl) Follow(ctx context.Context, request *relation.Relat tx.Commit() }() - // 检查是否已经存在相同的记录 - var count int64 - if err = tx.Model(&models.Relation{}).Where("actor_id = ? AND user_id = ?", rRelation.ActorId, rRelation.UserId).Count(&count).Error; err != nil { - resp = &relation.RelationActionResponse{ - StatusCode: strings.UnableToFollowErrorCode, - StatusMsg: strings.UnableToFollowError, - } - logging.SetSpanError(span, err) - return - } - if count > 0 { - resp = &relation.RelationActionResponse{ - StatusCode: strings.AlreadyFollowingErrorCode, - StatusMsg: strings.AlreadyFollowingError, - } - return - } - if err = tx.Create(&rRelation).Error; err != nil { resp = &relation.RelationActionResponse{ StatusCode: strings.UnableToFollowErrorCode, @@ -201,23 +142,6 @@ func (r RelationServiceImpl) Unfollow(ctx context.Context, request *relation.Rel defer span.End() logger := logging.LogService("RelationService.Unfollow").WithContext(ctx) - //actor exists - userExist, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{UserId: request.ActorId}) - - if err != nil || userExist.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "ActorId": request.ActorId, - }).Errorf("User service error") - logging.SetSpanError(span, err) - - resp = &relation.RelationActionResponse{ - StatusCode: strings.UnableToQueryUserErrorCode, - StatusMsg: strings.UnableToQueryUserError, - } - return - } - if request.UserId == request.ActorId { resp = &relation.RelationActionResponse{ StatusCode: strings.UnableToRelateYourselfErrorCode, @@ -226,13 +150,16 @@ func (r RelationServiceImpl) Unfollow(ctx context.Context, request *relation.Rel return } - userExist, err = userClient.GetUserExistInformation(ctx, &user.UserExistRequest{UserId: request.UserId}) + userResponse, err := userClient.GetUserInfo(ctx, &user.UserRequest{ + UserId: request.UserId, + ActorId: request.ActorId, + }) - if err != nil || userExist.StatusCode != strings.ServiceOKCode { + if err != nil || userResponse.StatusCode != strings.ServiceOKCode { logger.WithFields(logrus.Fields{ "err": err, "ActorId": request.ActorId, - }).Errorf("User service error") + }).Errorf("failed to get user info") logging.SetSpanError(span, err) resp = &relation.RelationActionResponse{ @@ -270,7 +197,7 @@ func (r RelationServiceImpl) Unfollow(ctx context.Context, request *relation.Rel tx.Commit() }() - if err = tx.Unscoped().Where(&rRelation).Delete(&rRelation).Error; err != nil { + if err = tx.Where(&rRelation).Delete(&rRelation).Error; err != nil { resp = &relation.RelationActionResponse{ StatusCode: strings.UnableToUnFollowErrorCode, StatusMsg: strings.UnableToUnFollowError, @@ -322,24 +249,8 @@ func (r RelationServiceImpl) CountFollowList(ctx context.Context, request *relat ctx, span := tracing.Tracer.Start(ctx, "CountFollowListService") defer span.End() logger := logging.LogService("RelationService.CountFollowList").WithContext(ctx) - //actor exists - userExist, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{UserId: request.UserId}) - - if err != nil || userExist.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "UserId": request.UserId, - }).Errorf("not find the user:%v", request.UserId) - logging.SetSpanError(span, err) - - resp = &relation.CountFollowListResponse{ - StatusCode: strings.UnableToQueryUserErrorCode, - StatusMsg: strings.UnableToQueryUserError, - } - return - } - cacheKey := fmt.Sprintf("follow_count_%d", request.UserId) + cacheKey := fmt.Sprintf("follow_list_count_%d", request.UserId) cachedCountString, ok, err := cached.Get(ctx, cacheKey) if err != nil { @@ -411,23 +322,8 @@ func (r RelationServiceImpl) CountFollowerList(ctx context.Context, request *rel defer span.End() logger := logging.LogService("RelationService.CountFollowerList").WithContext(ctx) - userExist, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{UserId: request.UserId}) - - if err != nil || userExist.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "UserId": request.UserId, - }).Errorf("not find the user:%v", request.UserId) - logging.SetSpanError(span, err) - - resp = &relation.CountFollowerListResponse{ - StatusCode: strings.UnableToQueryUserErrorCode, - StatusMsg: strings.UnableToQueryUserError, - } - return - } - cacheKey := fmt.Sprintf("follower_count_%d", request.UserId) + cachedCountString, ok, err := cached.Get(ctx, cacheKey) if err != nil { @@ -496,63 +392,28 @@ func (r RelationServiceImpl) GetFriendList(ctx context.Context, request *relatio defer span.End() logger := logging.LogService("RelationService.GetFriendList").WithContext(ctx) - ok, err := isUserExist(ctx, request.ActorId, request.UserId, span, logger) - if err != nil || !ok { - resp = &relation.FriendListResponse{ - StatusCode: strings.UnableToQueryUserErrorCode, - StatusMsg: strings.UnableToQueryUserError, - } - return - } - //followList - cacheKey := config.EnvCfg.RedisPrefix + fmt.Sprintf("follow_list_%d", request.UserId) - followIdList, err := redis2.Client.SMembers(ctx, cacheKey).Result() - var followRelationList []models.Relation - // 构建关注列表的用户 ID 映射 - followingMap := make(map[uint32]bool) - //判断是否需要读db - db := false - + cacheKey := fmt.Sprintf("follow_list_%d", request.UserId) + followList := CacheRelationList{} + ok, err := cached.CacheAndRedisGet(ctx, cacheKey, &followList) if err != nil { - db = true - } else { - for _, id := range followIdList { - idInt, err := strconv.Atoi(id) - //redis存在不合法的id,删除redis中的整个set并重新读数据库,写redis缓存 - if err != nil { - logger.WithFields(logrus.Fields{ - "id": id, - "err": err, - }).Errorf("Redis exists illegal id %s", id) - logging.SetSpanError(span, err) - _, err := redis2.Client.Del(ctx, cacheKey).Result() - if err != nil { - logger.WithFields(logrus.Fields{ - "id": id, - "err": err, - }).Errorf("Redis exists illegal id %s and delete redis failed", id) - logging.SetSpanError(span, err) - } - break - } - followingMap[uint32(idInt)] = true - } - } - - if db { logger.WithFields(logrus.Fields{ "error": err, - }).Errorf("Err when read Redis or no data in Redis") + }).Errorf("Err when read Redis") logging.SetSpanError(span, err) + } + if ok { + logger.Infof("Cache hit for follow list for user %d", request.UserId) + } else { followResult := database.Client.WithContext(ctx). Where("actor_id = ?", request.UserId). - Find(&followRelationList) + Find(&followList.rList) + if followResult.Error != nil { logger.WithFields(logrus.Fields{ "err": followResult.Error, - }).Errorf("GetFriendListService failed with dataBaseError") + }).Errorf("GetFriendListService failed with error") logging.SetSpanError(span, followResult.Error) resp = &relation.FriendListResponse{ @@ -561,60 +422,46 @@ func (r RelationServiceImpl) GetFriendList(ctx context.Context, request *relatio } return } - for _, rel := range followRelationList { - followingMap[rel.UserId] = true - } - for _, rel := range followRelationList { - redis2.Client.SAdd(ctx, cacheKey, rel.UserId) - } } - - //followerList - cacheKey = config.EnvCfg.RedisPrefix + fmt.Sprintf("follower_list_%d", request.UserId) - followerIdList, err := redis2.Client.SMembers(ctx, cacheKey).Result() - var followerRelationList []models.Relation - followerIdListInt := make([]uint32, len(followerIdList)) - db = false - + err = cached.ScanWriteCache(ctx, cacheKey, &followList, true) if err != nil { - db = true - } else { - for index, id := range followerIdList { - idInt, err := strconv.Atoi(id) - //redis存在不合法的id,删除redis中的整个set并重新读数据库,写redis缓存 - if err != nil { - logger.WithFields(logrus.Fields{ - "id": id, - "err": err, - }).Errorf("Redis exists illegal id %s", id) - logging.SetSpanError(span, err) - _, err := redis2.Client.Del(ctx, cacheKey).Result() - if err != nil { - logger.WithFields(logrus.Fields{ - "id": id, - "err": err, - }).Errorf("Redis exists illegal id %s and delete redis failed", id) - logging.SetSpanError(span, err) - } - break - } - followerIdListInt[index] = uint32(idInt) - } + logger.WithFields(logrus.Fields{ + "err": err, + "key": cacheKey, + }).Errorf("failed to write cache for follow list") + logging.SetSpanError(span, err) + } + + // 构建关注列表的用户 ID 映射 + followingMap := make(map[uint32]bool) + for _, follow := range followList.rList { + followingMap[follow.UserId] = true } - if db { + //followerList + cacheKey = fmt.Sprintf("follower_list_%d", request.UserId) + followerList := CacheRelationList{} + ok, err = cached.CacheAndRedisGet(ctx, cacheKey, &followerList) + if err != nil { logger.WithFields(logrus.Fields{ "error": err, - }).Errorf("Err when read Redis or no data in Redis") + }).Errorf("Err when read Redis") logging.SetSpanError(span, err) + } + if ok { + logger.WithFields(logrus.Fields{ + "userId": request.UserId, + }).Infof("Cache hit for follower list for user %d", request.UserId) + } else { followerResult := database.Client.WithContext(ctx). Where("user_id = ?", request.UserId). - Find(&followerRelationList) + Find(&followerList.rList) + if followerResult.Error != nil { logger.WithFields(logrus.Fields{ "err": followerResult.Error, - }).Errorf("GetFriendListService failed with dataBaseError") + }).Errorf("GetFriendListService failed with error") logging.SetSpanError(span, followerResult.Error) resp = &relation.FriendListResponse{ @@ -623,12 +470,15 @@ func (r RelationServiceImpl) GetFriendList(ctx context.Context, request *relatio } return } - for index, rel := range followerRelationList { - followerIdListInt[index] = rel.ActorId - } - for _, rel := range followerRelationList { - redis2.Client.SAdd(ctx, cacheKey, rel.ActorId) - } + } + err = cached.ScanWriteCache(ctx, cacheKey, &followerList, true) + + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + "key": cacheKey, + }).Errorf("failed to write cache for follower list") + logging.SetSpanError(span, err) } // 构建互相关注的用户列表(既关注了关注者又被关注者所关注的用户) @@ -637,38 +487,29 @@ func (r RelationServiceImpl) GetFriendList(ctx context.Context, request *relatio var mu sync.Mutex var wg sync.WaitGroup - for _, id := range followerIdListInt { + for _, follower := range followerList.rList { wg.Add(1) - go func(id uint32) { + go func(follower models.Relation) { defer wg.Done() - if followingMap[id] { - + if followingMap[follower.ActorId] { userResponse, err := userClient.GetUserInfo(ctx, &user.UserRequest{ - UserId: id, + UserId: follower.ActorId, ActorId: request.ActorId, }) - if err != nil || userResponse.StatusCode != strings.ServiceOKCode { logger.WithFields(logrus.Fields{ - "err": err, - "followerId": id, + "err": err, + "follower": follower, }).Errorf("Unable to get information about users who follow each other") logging.SetSpanError(span, err) - resp = &relation.FriendListResponse{ - StatusCode: strings.UnableToGetFriendListErrorCode, - StatusMsg: strings.UnableToGetFriendListError, - UserList: nil, - } - } else { mu.Lock() mutualFriends = append(mutualFriends, userResponse.User) mu.Unlock() } - } - }(id) + }(follower) } wg.Wait() @@ -722,59 +563,75 @@ func (r RelationServiceImpl) GetFollowList(ctx context.Context, request *relatio defer span.End() logger := logging.LogService("RelationService.GetFollowList").WithContext(ctx) - ok, err := isUserExist(ctx, request.ActorId, request.UserId, span, logger) - if err != nil || !ok { - resp = &relation.FollowListResponse{ - StatusCode: strings.UnableToQueryUserErrorCode, - StatusMsg: strings.UnableToQueryUserError, - } - return - } - - cacheKey := config.EnvCfg.RedisPrefix + fmt.Sprintf("follow_list_%d", request.UserId) - followIdList, err := redis2.Client.SMembers(ctx, cacheKey).Result() - followIdListInt := make([]uint32, 0, len(followIdList)) - var followList []models.Relation + cacheKey := fmt.Sprintf("follow_list_%d", request.UserId) + cachedFollowList := CacheRelationList{} + // cache and redis + ok, err := cached.CacheAndRedisGet(ctx, cacheKey, &cachedFollowList) if err != nil { - result := database.Client.WithContext(ctx). - Where("actor_id = ?", request.UserId). - Order("created_at desc"). - Find(&followList) + logger.WithFields(logrus.Fields{ + "error": err, + }).Errorf("Err when read Redis") + logging.SetSpanError(span, err) + } - if result.Error != nil { - logger.WithFields(logrus.Fields{ - "err": result.Error, - }).Errorf("Failed to retrieve follow list") - logging.SetSpanError(span, err) + var rFollowList []*user.User + if ok { + logger.Infof("Cache hit, retrieving follow list for user %d", request.UserId) + rFollowList, err = r.fetchUserListInfo(ctx, cachedFollowList.rList, request.ActorId, logger, span) + if err != nil { resp = &relation.FollowListResponse{ StatusCode: strings.UnableToGetFollowListErrorCode, StatusMsg: strings.UnableToGetFollowListError, + UserList: nil, } - return - } - - for index, rel := range followList { - redis2.Client.SAdd(ctx, cacheKey, rel.UserId) - followIdListInt[index] = rel.UserId - } - } else { - followIdListInt, err = string2Int(followIdList, logger, span) - if err != nil { logger.WithFields(logrus.Fields{ "err": err, - }).Errorf("failed to convert string to int") + }).Errorf("failed to convert relation to user") logging.SetSpanError(span, err) - resp = &relation.FollowListResponse{ - StatusCode: strings.UnableToGetFollowListErrorCode, - StatusMsg: strings.UnableToGetFollowListError, - } return } + + resp = &relation.FollowListResponse{ + StatusCode: strings.ServiceOKCode, + StatusMsg: strings.ServiceOK, + UserList: rFollowList, + } + return } - rFollowList, err := r.idList2UserList(ctx, followIdListInt, request.ActorId, logger, span) + //database + var followList []models.Relation + result := database.Client.WithContext(ctx). + Where("actor_id = ?", request.UserId). + Order("created_at desc"). + Find(&followList) + + if result.Error != nil { + logger.WithFields(logrus.Fields{ + "err": result.Error, + }).Errorf("Failed to retrieve follow list") + logging.SetSpanError(span, err) + + resp = &relation.FollowListResponse{ + StatusCode: strings.UnableToGetFollowListErrorCode, + StatusMsg: strings.UnableToGetFollowListError, + } + return + } + cachedFollowList.rList = followList + + err = cached.ScanWriteCache(ctx, cacheKey, &cachedFollowList, true) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + "key": cacheKey, + }).Errorf("failed to write cache for follow list") + logging.SetSpanError(span, err) + } + + rFollowList, err = r.fetchUserListInfo(ctx, followList, request.ActorId, logger, span) if err != nil { resp = &relation.FollowListResponse{ StatusCode: strings.UnableToGetFollowListErrorCode, @@ -801,59 +658,73 @@ func (r RelationServiceImpl) GetFollowerList(ctx context.Context, request *relat defer span.End() logger := logging.LogService("RelationService.GetFollowerList").WithContext(ctx) - ok, err := isUserExist(ctx, request.ActorId, request.UserId, span, logger) - if err != nil || !ok { - resp = &relation.FollowerListResponse{ - StatusCode: strings.UnableToQueryUserErrorCode, - StatusMsg: strings.UnableToQueryUserError, - } - return - } - - cacheKey := config.EnvCfg.RedisPrefix + fmt.Sprintf("follower_list_%d", request.UserId) - followerIdList, err := redis2.Client.SMembers(ctx, cacheKey).Result() - followerIdListInt := make([]uint32, 0, len(followerIdList)) - var followerList []models.Relation + cacheKey := fmt.Sprintf("follower_list_%d", request.UserId) + cachedFollowerList := CacheRelationList{} + ok, err := cached.CacheAndRedisGet(ctx, cacheKey, &cachedFollowerList) if err != nil { - result := database.Client.WithContext(ctx). - Where("user_id = ?", request.UserId). - Order("created_at desc"). - Find(&followerList) + logger.WithFields(logrus.Fields{ + "error": err, + }).Errorf("Err when read Redis") + logging.SetSpanError(span, err) + } - if result.Error != nil { - logger.WithFields(logrus.Fields{ - "err": result.Error, - }).Errorf("Failed to retrieve follower list") - logging.SetSpanError(span, err) + var rFollowerList []*user.User + if ok { + logger.Infof("Cache hit, retrieving follower list for user %d", request.UserId) + rFollowerList, err = r.fetchUserListInfo(ctx, cachedFollowerList.rList, request.ActorId, logger, span) + if err != nil { resp = &relation.FollowerListResponse{ StatusCode: strings.UnableToGetFollowerListErrorCode, StatusMsg: strings.UnableToGetFollowerListError, + UserList: nil, } - return - } - - for index, rel := range followerList { - redis2.Client.SAdd(ctx, cacheKey, rel.UserId) - followerIdListInt[index] = rel.UserId - } - } else { - followerIdListInt, err = string2Int(followerIdList, logger, span) - if err != nil { logger.WithFields(logrus.Fields{ "err": err, - }).Errorf("failed to convert string to int") + }).Errorf("failed to convert relation to user") logging.SetSpanError(span, err) - resp = &relation.FollowerListResponse{ - StatusCode: strings.UnableToGetFollowerListErrorCode, - StatusMsg: strings.UnableToGetFollowerListError, - } return } + + resp = &relation.FollowerListResponse{ + StatusCode: strings.ServiceOKCode, + StatusMsg: strings.ServiceOK, + UserList: rFollowerList, + } + return + } + + var followerList []models.Relation + result := database.Client.WithContext(ctx). + Where("user_id = ?", request.UserId). + Order("created_at desc"). + Find(&followerList) + + if result.Error != nil { + logger.WithFields(logrus.Fields{ + "err": result.Error, + }).Errorf("Failed to retrieve follower list") + logging.SetSpanError(span, err) + + resp = &relation.FollowerListResponse{ + StatusCode: strings.UnableToGetFollowerListErrorCode, + StatusMsg: strings.UnableToGetFollowerListError, + } + return + } + + cachedFollowerList.rList = followerList + err = cached.ScanWriteCache(ctx, cacheKey, &cachedFollowerList, true) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + "key": cacheKey, + }).Errorf("failed to write cache for follower list") + logging.SetSpanError(span, err) } - rFollowerList, err := r.idList2UserList(ctx, followerIdListInt, request.ActorId, logger, span) + rFollowerList, err = r.fetchUserListInfo(ctx, followerList, request.ActorId, logger, span) if err != nil { resp = &relation.FollowerListResponse{ StatusCode: strings.UnableToGetFollowerListErrorCode, @@ -875,8 +746,7 @@ func (r RelationServiceImpl) GetFollowerList(ctx context.Context, request *relat return } -func (r RelationServiceImpl) idList2UserList(ctx context.Context, idList []uint32, actorID uint32, logger *logrus.Entry, span trace.Span) ([]*user.User, error) { - +func (r RelationServiceImpl) fetchUserListInfo(ctx context.Context, userList []models.Relation, actorID uint32, logger *logrus.Entry, span trace.Span) ([]*user.User, error) { var wg sync.WaitGroup var mu sync.Mutex var wgErrors []error @@ -885,24 +755,24 @@ func (r RelationServiceImpl) idList2UserList(ctx context.Context, idList []uint3 maxRetries := 3 retryInterval := 1 - rUserList := make([]*user.User, 0, len(idList)) + rUserList := make([]*user.User, 0, len(userList)) - for _, id := range idList { + for _, r := range userList { wg.Add(1) - go func(id uint32) { + go func(relation models.Relation) { defer wg.Done() retryCount := 0 for retryCount < maxRetries { userResponse, err := userClient.GetUserInfo(ctx, &user.UserRequest{ - UserId: id, + UserId: relation.UserId, ActorId: actorID, }) if err != nil || userResponse.StatusCode != strings.ServiceOKCode { logger.WithFields(logrus.Fields{ - "err": err, - "userId": id, + "err": err, + "relation": relation, }).Errorf("Unable to get user information") retryCount++ time.Sleep(time.Duration(retryInterval) * time.Second) @@ -918,7 +788,7 @@ func (r RelationServiceImpl) idList2UserList(ctx context.Context, idList []uint3 if retryCount >= maxRetries { logging.SetSpanError(span, err) } - }(id) + }(r) } wg.Wait() @@ -933,61 +803,104 @@ func (r RelationServiceImpl) idList2UserList(ctx context.Context, idList []uint3 return rUserList, nil } -func string2Int(s []string, logger *logrus.Entry, span trace.Span) (i []uint32, err error) { +// followOp = true -> follow +// followOp = false -> unfollow +func updateFollowListCache(ctx context.Context, actorID uint32, relation models.Relation, followOp bool, span trace.Span, logger *logrus.Entry) error { - i = make([]uint32, len(s)) + cacheKey := fmt.Sprintf("follow_list_%d", actorID) + cachedRelationList := CacheRelationList{} - for index, v := range s { - var idInt int - idInt, err = strconv.Atoi(v) - if err != nil { + ok, err := cached.CacheAndRedisGet(ctx, cacheKey, &cachedRelationList) + if err != nil { + logger.WithFields(logrus.Fields{ + "error": err, + }).Errorf("Redis error when find struct") + logging.SetSpanError(span, err) + return err + } + + if !ok { + result := database.Client.WithContext(ctx). + Where("actor_id = ?", actorID). + Find(&cachedRelationList.rList) + if result.Error != nil { logger.WithFields(logrus.Fields{ - "err": err, - }).Errorf("failed to convert string to int") - logging.SetSpanError(span, err) - return + "err": result.Error, + }).Errorf("GetFollowList from database failed: %v", result.Error) + logging.SetSpanError(span, result.Error) + return result.Error } - i[index] = uint32(idInt) } - return -} - -// followOp = true -> follow -// followOp = false -> unfollow -func updateFollowListCache(ctx context.Context, actorID uint32, relation models.Relation, followOp bool, span trace.Span, logger *logrus.Entry) (err error) { - - cacheKey := config.EnvCfg.RedisPrefix + fmt.Sprintf("follow_list_%d", actorID) if followOp { - _, err = redis2.Client.SAdd(ctx, cacheKey, relation.UserId).Result() + cachedRelationList.rList = append(cachedRelationList.rList, relation) } else { - _, err = redis2.Client.SRem(ctx, cacheKey, relation.UserId).Result() + for i, r := range cachedRelationList.rList { + if r.UserId == relation.UserId { + cachedRelationList.rList = append(cachedRelationList.rList[:i], cachedRelationList.rList[i+1:]...) + break + } + } } + + err = cached.ScanWriteCache(ctx, cacheKey, &cachedRelationList, true) if err != nil { logger.WithFields(logrus.Fields{ "err": err, - }).Errorf("update FollowList redis failed") + }).Errorf("ScanWriteCache failed") logging.SetSpanError(span, err) + return err } - return + + return nil } -func updateFollowerListCache(ctx context.Context, userID uint32, relation models.Relation, followOp bool, span trace.Span, logger *logrus.Entry) (err error) { - cacheKey := config.EnvCfg.RedisPrefix + fmt.Sprintf("follower_list_%d", userID) +func updateFollowerListCache(ctx context.Context, userID uint32, relation models.Relation, followOp bool, span trace.Span, logger *logrus.Entry) error { + cacheKey := fmt.Sprintf("follower_list_%d", userID) + cachedRelationList := CacheRelationList{} - if followOp { - _, err = redis2.Client.SAdd(ctx, cacheKey, relation.ActorId).Result() + ok, err := cached.CacheAndRedisGet(ctx, cacheKey, &cachedRelationList) + if err != nil { + logger.WithFields(logrus.Fields{ + "error": err, + }).Errorf("Redis error when find struct") + logging.SetSpanError(span, err) + return err + } + + if !ok { + result := database.Client.WithContext(ctx). + Where("user_id = ?", userID). + Find(&cachedRelationList.rList) + if result.Error != nil { + logger.WithFields(logrus.Fields{ + "err": result.Error, + }).Errorf("GetFollowerList from database failed: %v", result.Error) + logging.SetSpanError(span, result.Error) + return result.Error + } + } + if followOp { + cachedRelationList.rList = append(cachedRelationList.rList, relation) } else { - _, err = redis2.Client.SRem(ctx, cacheKey, relation.ActorId).Result() + for i, r := range cachedRelationList.rList { + if r.ActorId == relation.ActorId { + cachedRelationList.rList = append(cachedRelationList.rList[:i], cachedRelationList.rList[i+1:]...) + break + } + } } + + err = cached.ScanWriteCache(ctx, cacheKey, &cachedRelationList, true) if err != nil { logger.WithFields(logrus.Fields{ "err": err, - }).Errorf("update FollowerList redis failed") + }).Errorf("ScanWriteCache failed") logging.SetSpanError(span, err) + return err } - return + return nil } func updateFollowCountCache(ctx context.Context, actorID uint32, followOp bool, span trace.Span, logger *logrus.Entry) error { @@ -1031,18 +944,6 @@ func updateFollowCountCache(ctx context.Context, actorID uint32, followOp bool, Where("actor_id = ?", actorID). Count(&dbCount) - if !followOp { - // unfollow - if dbCount > 0 { - dbCount = dbCount - 1 - } else { - dbCount = 0 - } - } else { - // follow - dbCount = dbCount + 1 - } - if result.Error != nil { logger.WithFields(logrus.Fields{ "error": result.Error, @@ -1100,17 +1001,6 @@ func updateFollowerCountCache(ctx context.Context, userID uint32, followOp bool, Model(&models.Relation{}). Where("user_id = ?", userID). Count(&dbCount) - if !followOp { - // unfollow - if dbCount > 0 { - dbCount = dbCount - 1 - } else { - dbCount = 0 - } - } else { - // follow - dbCount = dbCount + 1 - } if result.Error != nil { logger.WithFields(logrus.Fields{ @@ -1126,32 +1016,3 @@ func updateFollowerCountCache(ctx context.Context, userID uint32, followOp bool, cached.Write(ctx, cacheKey, countString, true) return nil } - -func isUserExist(ctx context.Context, actorID uint32, userID uint32, span trace.Span, logger *logrus.Entry) (ok bool, err error) { - - userExist, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{UserId: actorID}) - - if err != nil || userExist.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "UserId": actorID, - }).Errorf("not find the user:%v", actorID) - logging.SetSpanError(span, err) - ok = false - return - } - - userExist, err = userClient.GetUserExistInformation(ctx, &user.UserExistRequest{UserId: userID}) - - if err != nil || userExist.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "UserId": userID, - }).Errorf("not find the user:%v", userID) - logging.SetSpanError(span, err) - ok = false - return - } - ok = true - return -} diff --git a/src/services/user/handler.go b/src/services/user/handler.go index 114a666..7abd99f 100644 --- a/src/services/user/handler.go +++ b/src/services/user/handler.go @@ -6,6 +6,7 @@ import ( "GuGoTik/src/extra/tracing" "GuGoTik/src/models" "GuGoTik/src/rpc/favorite" + "GuGoTik/src/rpc/publish" "GuGoTik/src/rpc/relation" "GuGoTik/src/rpc/user" "GuGoTik/src/storage/cached" @@ -23,7 +24,7 @@ type UserServiceImpl struct { var relationClient relation.RelationServiceClient -//var publishClient publish.PublishServiceClient +var publishClient publish.PublishServiceClient var favoriteClient favorite.FavoriteServiceClient @@ -31,8 +32,8 @@ func (a UserServiceImpl) New() { relationConn := grpc2.Connect(config.RelationRpcServerName) relationClient = relation.NewRelationServiceClient(relationConn) - //publishConn := grpc2.Connect(config.PublishRpcServerName) - //publishClient = publish.NewPublishServiceClient(publishConn) + publishConn := grpc2.Connect(config.PublishRpcServerName) + publishClient = publish.NewPublishServiceClient(publishConn) favoriteConn := grpc2.Connect(config.FavoriteRpcServerName) favoriteClient = favorite.NewFavoriteServiceClient(favoriteConn) @@ -87,7 +88,7 @@ func (a UserServiceImpl) GetUserInfo(ctx context.Context, request *user.UserRequ } var wg sync.WaitGroup - wg.Add(5) + wg.Add(6) isErr := false go func() { @@ -171,31 +172,31 @@ func (a UserServiceImpl) GetUserInfo(ctx context.Context, request *user.UserRequ resp.User.IsFollow = rResp.Result }() - //go func() { - // defer wg.Done() - // rResp, err := publishClient.CountVideo(ctx, &publish.CountVideoRequest{UserId: request.UserId}) - // if err != nil { - // logger.WithFields(logrus.Fields{ - // "err": err, - // "userId": request.UserId, - // }).Errorf("Error when user service get published count") - // isErr = true - // return - // } - // - // if rResp != nil && rResp.StatusCode == strings.ServiceOKCode { - // if err != nil { - // logger.WithFields(logrus.Fields{ - // "errMsg": rResp.StatusMsg, - // "userId": request.UserId, - // }).Errorf("Error when user service get published count") - // isErr = true - // return - // } - // } - // - // resp.User.WorkCount = &rResp.Count - //}() + go func() { + defer wg.Done() + rResp, err := publishClient.CountVideo(ctx, &publish.CountVideoRequest{UserId: request.UserId}) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + "userId": request.UserId, + }).Errorf("Error when user service get published count") + isErr = true + return + } + + if rResp != nil && rResp.StatusCode == strings.ServiceOKCode { + if err != nil { + logger.WithFields(logrus.Fields{ + "errMsg": rResp.StatusMsg, + "userId": request.UserId, + }).Errorf("Error when user service get published count") + isErr = true + return + } + } + + resp.User.WorkCount = &rResp.Count + }() go func() { defer wg.Done() diff --git a/test/rpc/relationrpc_test.go b/test/rpc/relationrpc_test.go index e650f47..bb7b9c8 100644 --- a/test/rpc/relationrpc_test.go +++ b/test/rpc/relationrpc_test.go @@ -15,8 +15,8 @@ func TestFollow(t *testing.T) { var Client relation.RelationServiceClient req := relation.RelationActionRequest{ - UserId: 1, - ActorId: 4, + UserId: 4, + ActorId: 3, } conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1%s", config.RelationRpcServerPort), @@ -33,8 +33,8 @@ func TestFollow(t *testing.T) { func TestUnfollow(t *testing.T) { var Client relation.RelationServiceClient req := relation.RelationActionRequest{ - UserId: 5, - ActorId: 5, + UserId: 4, + ActorId: 3, } conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1%s", config.RelationRpcServerPort), @@ -90,7 +90,7 @@ func TestGetFollowerList(t *testing.T) { func TestCountFollowList(t *testing.T) { var Client relation.RelationServiceClient req := relation.CountFollowListRequest{ - UserId: 2, + UserId: 1, } conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1%s", config.RelationRpcServerPort), grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -99,7 +99,6 @@ func TestCountFollowList(t *testing.T) { Client = relation.NewRelationServiceClient(conn) res, err := Client.CountFollowList(context.Background(), &req) - fmt.Println(res.Count) assert.NoError(t, err) assert.Equal(t, int32(0), res.StatusCode) } @@ -108,7 +107,7 @@ func TestCountFollowerList(t *testing.T) { var Client relation.RelationServiceClient req := relation.CountFollowerListRequest{ - UserId: 4, + UserId: 1, } conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1%s", config.RelationRpcServerPort), grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -117,7 +116,6 @@ func TestCountFollowerList(t *testing.T) { Client = relation.NewRelationServiceClient(conn) res, err := Client.CountFollowerList(context.Background(), &req) - fmt.Println(res.Count) assert.NoError(t, err) assert.Equal(t, int32(0), res.StatusCode) @@ -145,8 +143,8 @@ func TestIsFollow(t *testing.T) { func TestGetFriendList(t *testing.T) { var Client relation.RelationServiceClient req := relation.FriendListRequest{ - ActorId: 1, - UserId: 1, + ActorId: 3, + UserId: 3, } conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1%s", config.RelationRpcServerPort), grpc.WithTransportCredentials(insecure.NewCredentials()), diff --git a/test/web/auth_test.go b/test/web/auth_test.go index 991aa3a..528c719 100644 --- a/test/web/auth_test.go +++ b/test/web/auth_test.go @@ -36,7 +36,7 @@ func TestRegister(t *testing.T) { // This Test can only run once. func TestDisplayRegister(t *testing.T) { - url := "http://127.0.0.1:37000/douyin/user/register?username=epicmo&password=epicmo" + url := "http://127.0.0.1:37000/douyin/user/register?username=epicmo4&password=epicmo" method := "POST" client := &http.Client{} req, err := http.NewRequest(method, url, nil) diff --git a/test/web/relation_test.go b/test/web/relation_test.go index a7c690d..4155388 100644 --- a/test/web/relation_test.go +++ b/test/web/relation_test.go @@ -192,8 +192,8 @@ func TestGetFriendList(t *testing.T) { req, err := http.NewRequest(method, url, nil) q := req.URL.Query() q.Add("token", "93b9e0bf-ebd3-4d35-801d-ac9076a1d6e5") - q.Add("actor_id", "1") - q.Add("user_id", "1") + q.Add("actor_id", "3") + q.Add("user_id", "3") req.URL.RawQuery = q.Encode() assert.Empty(t, err)