diff --git a/src/constant/config/service.go b/src/constant/config/service.go index 1a57999..62859d2 100644 --- a/src/constant/config/service.go +++ b/src/constant/config/service.go @@ -36,3 +36,5 @@ const VideoProcessorRpcServiceName = "GuGoTik-VideoProcessorService" const VideoPicker = "GuGoTik-VideoPicker" const Event = "GuGoTik-Recommend" const MsgConsumer = "GuGoTik-MgsConsumer" + +const BloomRedisChannel = "GuGoTik-Bloom" diff --git a/src/services/auth/handler.go b/src/services/auth/handler.go index b8fa091..14841e4 100644 --- a/src/services/auth/handler.go +++ b/src/services/auth/handler.go @@ -11,6 +11,7 @@ import ( user2 "GuGoTik/src/rpc/user" "GuGoTik/src/storage/cached" "GuGoTik/src/storage/database" + "GuGoTik/src/storage/redis" grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" "context" @@ -36,7 +37,7 @@ var relationClient relation.RelationServiceClient var userClient user2.UserServiceClient var recommendClient recommend.RecommendServiceClient -var bloomFilter *bloom.BloomFilter +var BloomFilter *bloom.BloomFilter type AuthServiceImpl struct { auth.AuthServiceServer @@ -49,9 +50,6 @@ func (a AuthServiceImpl) New() { userClient = user2.NewUserServiceClient(userRpcConn) recommendRpcConn := grpc2.Connect(config.RecommendRpcServiceName) recommendClient = recommend.NewRecommendServiceClient(recommendRpcConn) - - // Create a new Bloom filter with a target false positive rate of 0.1% - bloomFilter = bloom.NewWithEstimates(10000000, 0.001) // assuming we have 1 million users } func (a AuthServiceImpl) Authenticate(ctx context.Context, request *auth.AuthenticateRequest) (resp *auth.AuthenticateResponse, err error) { @@ -239,7 +237,20 @@ func (a AuthServiceImpl) Register(ctx context.Context, request *auth.RegisterReq resp.StatusCode = strings.ServiceOKCode resp.StatusMsg = strings.ServiceOK - bloomFilter.AddString(user.UserName) + // Publish the username to redis + BloomFilter.AddString(user.UserName) + logger.WithFields(logrus.Fields{ + "username": user.UserName, + }).Infof("Publishing user name to redis channel") + err = redis.Client.Publish(ctx, config.BloomRedisChannel, user.UserName).Err() + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + "username": user.UserName, + }).Errorf("Publishing user name to redis channel happens error") + logging.SetSpanError(span, err) + } + addMagicUserFriend(ctx, &span, user.ID) return @@ -255,7 +266,7 @@ func (a AuthServiceImpl) Login(ctx context.Context, request *auth.LoginRequest) }).Infof("User try to log in.") // Check if a username might be in the filter - if !bloomFilter.TestString(request.Username) { + if !BloomFilter.TestString(request.Username) { resp = &auth.LoginResponse{ StatusCode: strings.UnableToQueryUserErrorCode, StatusMsg: strings.UnableToQueryUserError, diff --git a/src/services/auth/main.go b/src/services/auth/main.go index d416e54..57110d7 100644 --- a/src/services/auth/main.go +++ b/src/services/auth/main.go @@ -4,9 +4,12 @@ import ( "GuGoTik/src/constant/config" "GuGoTik/src/extra/profiling" "GuGoTik/src/extra/tracing" + "GuGoTik/src/models" "GuGoTik/src/rpc/auth" "GuGoTik/src/rpc/health" healthImpl "GuGoTik/src/services/health" + "GuGoTik/src/storage/database" + "GuGoTik/src/storage/redis" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/prom" @@ -17,7 +20,9 @@ import ( _ "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" _ "github.com/prometheus/client_golang/prometheus/promhttp" + redis2 "github.com/redis/go-redis/v9" "github.com/sirupsen/logrus" + "github.com/willf/bloom" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "net" @@ -61,6 +66,43 @@ func main() { reg := prom.Client reg.MustRegister(srvMetrics) + // Create a new Bloom filter with a target false positive rate of 0.1% + BloomFilter = bloom.NewWithEstimates(10000000, 0.001) // assuming we have 1 million users + + // Initialize BloomFilter from database + var users []models.User + userNamesResult := database.Client.WithContext(context.Background()).Select("user_name").Find(&users) + if userNamesResult.Error != nil { + log.Panicf("Getting user names from databse happens error: %s", userNamesResult.Error) + panic(userNamesResult.Error) + } + for _, u := range users { + BloomFilter.AddString(u.UserName) + } + + // Create a go routine to receive redis message and add it to BloomFilter + go func() { + pubSub := redis.Client.Subscribe(context.Background(), config.BloomRedisChannel) + defer func(pubSub *redis2.PubSub) { + err := pubSub.Close() + if err != nil { + log.Panicf("Closing redis pubsub happend error: %s", err) + } + }(pubSub) + + _, err := pubSub.ReceiveMessage(context.Background()) + if err != nil { + log.Panicf("Reveiving message from redis happens error: %s", err) + panic(err) + } + + ch := pubSub.Channel() + for msg := range ch { + log.Infof("Add user name to BloomFilter: %s", msg.Payload) + BloomFilter.AddString(msg.Payload) + } + }() + s := grpc.NewServer( grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index 0f1d3f9..6db7778 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -49,11 +49,11 @@ func init() { func main() { conn, err := amqp.Dial(rabbitmq.BuildMQConnAddr()) if err != nil { - failOnError(err, "Fialed to conenct to RabbitMQ") + failOnError(err, "Failed to connect to RabbitMQ") } defer func(conn *amqp.Connection) { err := conn.Close() - failOnError(err, "Fialed to close conn") + failOnError(err, "Failed to close connection") }(conn) tp, err := tracing.SetTraceProvider(config.MsgConsumer) @@ -77,7 +77,7 @@ func main() { defer func(channel *amqp.Channel) { err := channel.Close() - failOnError(err, "Fialed to close channel") + failOnError(err, "Failed to close channel") }(channel) err = channel.ExchangeDeclare( @@ -143,7 +143,7 @@ func main() { failOnError(err, "Failed to Consume") } - var foreever chan struct{} + var forever chan struct{} logger := logging.LogService("msgConsumer") logger.Infof(strings.MessageActionEvent + " is running now") @@ -179,15 +179,15 @@ func main() { continue } - pmessage := models.Message{ + pMessage := models.Message{ ToUserId: message.ToUserId, FromUserId: message.FromUserId, ConversationId: message.ConversationId, Content: message.Content, } - logger.Info(pmessage) + logger.Info(pMessage) //可能会重新插入数据 开启事务 晚点改 - result := database.Client.WithContext(context.Background()).Create(&pmessage) + result := database.Client.WithContext(context.Background()).Create(&pMessage) if result.Error != nil { logger.WithFields(logrus.Fields{ "from_id": message.FromUserId, @@ -223,12 +223,12 @@ func main() { go ss(channel) - <-foreever + <-forever } func ss(channel *amqp.Channel) { - gptmsg, err := channel.Consume( + gptMsg, err := channel.Consume( strings.MessageGptActionEvent, "", false, false, false, false, @@ -239,7 +239,7 @@ func ss(channel *amqp.Channel) { } var message models.Message - for body := range gptmsg { + for body := range gptMsg { ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers) ctx, span := tracing.Tracer.Start(ctx, "message_send Service") logger := logging.LogService("message_send").WithContext(ctx) @@ -271,14 +271,14 @@ func ss(channel *amqp.Channel) { continue } - pmessage := models.Message{ + pMessage := models.Message{ ToUserId: message.ToUserId, FromUserId: message.FromUserId, ConversationId: message.ConversationId, Content: message.Content, } //可能会重新插入数据 开启事务 晚点改 - result := database.Client.WithContext(context.Background()).Create(&pmessage) + result := database.Client.WithContext(context.Background()).Create(&pMessage) //发一份消息到openai api if result.Error != nil { logger.WithFields(logrus.Fields{ @@ -318,7 +318,7 @@ func ss(channel *amqp.Channel) { } text := resp.Choices[0].Message.Content - pmessage = models.Message{ + pMessage = models.Message{ ToUserId: message.FromUserId, FromUserId: message.ToUserId, ConversationId: message.ConversationId, @@ -326,7 +326,7 @@ func ss(channel *amqp.Channel) { // Content: "111", } - result = database.Client.WithContext(context.Background()).Create(&pmessage) + result = database.Client.WithContext(context.Background()).Create(&pMessage) if result.Error != nil { logger.WithFields(logrus.Fields{