Skip to content

Commit

Permalink
Merge pull request #209 from Maple-pro/dev
Browse files Browse the repository at this point in the history
feat(auth): add bloom filter in login request
  • Loading branch information
Maple-pro committed Aug 31, 2023
2 parents 4231989 + 2f70c10 commit 684d05b
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 20 deletions.
2 changes: 2 additions & 0 deletions src/constant/config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ const VideoProcessorRpcServiceName = "GuGoTik-VideoProcessorService"
const VideoPicker = "GuGoTik-VideoPicker"
const Event = "GuGoTik-Recommend"
const MsgConsumer = "GuGoTik-MgsConsumer"

const BloomRedisChannel = "GuGoTik-Bloom"
23 changes: 17 additions & 6 deletions src/services/auth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
user2 "GuGoTik/src/rpc/user"
"GuGoTik/src/storage/cached"
"GuGoTik/src/storage/database"
"GuGoTik/src/storage/redis"
grpc2 "GuGoTik/src/utils/grpc"
"GuGoTik/src/utils/logging"
"context"
Expand All @@ -36,7 +37,7 @@ var relationClient relation.RelationServiceClient
var userClient user2.UserServiceClient
var recommendClient recommend.RecommendServiceClient

var bloomFilter *bloom.BloomFilter
var BloomFilter *bloom.BloomFilter

type AuthServiceImpl struct {
auth.AuthServiceServer
Expand All @@ -49,9 +50,6 @@ func (a AuthServiceImpl) New() {
userClient = user2.NewUserServiceClient(userRpcConn)
recommendRpcConn := grpc2.Connect(config.RecommendRpcServiceName)
recommendClient = recommend.NewRecommendServiceClient(recommendRpcConn)

// Create a new Bloom filter with a target false positive rate of 0.1%
bloomFilter = bloom.NewWithEstimates(10000000, 0.001) // assuming we have 1 million users
}

func (a AuthServiceImpl) Authenticate(ctx context.Context, request *auth.AuthenticateRequest) (resp *auth.AuthenticateResponse, err error) {
Expand Down Expand Up @@ -239,7 +237,20 @@ func (a AuthServiceImpl) Register(ctx context.Context, request *auth.RegisterReq
resp.StatusCode = strings.ServiceOKCode
resp.StatusMsg = strings.ServiceOK

bloomFilter.AddString(user.UserName)
// Publish the username to redis
BloomFilter.AddString(user.UserName)
logger.WithFields(logrus.Fields{
"username": user.UserName,
}).Infof("Publishing user name to redis channel")
err = redis.Client.Publish(ctx, config.BloomRedisChannel, user.UserName).Err()
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
"username": user.UserName,
}).Errorf("Publishing user name to redis channel happens error")
logging.SetSpanError(span, err)
}

addMagicUserFriend(ctx, &span, user.ID)

return
Expand All @@ -255,7 +266,7 @@ func (a AuthServiceImpl) Login(ctx context.Context, request *auth.LoginRequest)
}).Infof("User try to log in.")

// Check if a username might be in the filter
if !bloomFilter.TestString(request.Username) {
if !BloomFilter.TestString(request.Username) {
resp = &auth.LoginResponse{
StatusCode: strings.UnableToQueryUserErrorCode,
StatusMsg: strings.UnableToQueryUserError,
Expand Down
42 changes: 42 additions & 0 deletions src/services/auth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"GuGoTik/src/constant/config"
"GuGoTik/src/extra/profiling"
"GuGoTik/src/extra/tracing"
"GuGoTik/src/models"
"GuGoTik/src/rpc/auth"
"GuGoTik/src/rpc/health"
healthImpl "GuGoTik/src/services/health"
"GuGoTik/src/storage/database"
"GuGoTik/src/storage/redis"
"GuGoTik/src/utils/consul"
"GuGoTik/src/utils/logging"
"GuGoTik/src/utils/prom"
Expand All @@ -17,7 +20,9 @@ import (
_ "github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
_ "github.com/prometheus/client_golang/prometheus/promhttp"
redis2 "github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"
"github.com/willf/bloom"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"net"
Expand Down Expand Up @@ -61,6 +66,43 @@ func main() {
reg := prom.Client
reg.MustRegister(srvMetrics)

// Create a new Bloom filter with a target false positive rate of 0.1%
BloomFilter = bloom.NewWithEstimates(10000000, 0.001) // assuming we have 1 million users

// Initialize BloomFilter from database
var users []models.User
userNamesResult := database.Client.WithContext(context.Background()).Select("user_name").Find(&users)
if userNamesResult.Error != nil {
log.Panicf("Getting user names from databse happens error: %s", userNamesResult.Error)
panic(userNamesResult.Error)
}
for _, u := range users {
BloomFilter.AddString(u.UserName)
}

// Create a go routine to receive redis message and add it to BloomFilter
go func() {
pubSub := redis.Client.Subscribe(context.Background(), config.BloomRedisChannel)
defer func(pubSub *redis2.PubSub) {
err := pubSub.Close()
if err != nil {
log.Panicf("Closing redis pubsub happend error: %s", err)
}
}(pubSub)

_, err := pubSub.ReceiveMessage(context.Background())
if err != nil {
log.Panicf("Reveiving message from redis happens error: %s", err)
panic(err)
}

ch := pubSub.Channel()
for msg := range ch {
log.Infof("Add user name to BloomFilter: %s", msg.Payload)
BloomFilter.AddString(msg.Payload)
}
}()

s := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))),
Expand Down
28 changes: 14 additions & 14 deletions src/services/msgconsumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ func init() {
func main() {
conn, err := amqp.Dial(rabbitmq.BuildMQConnAddr())
if err != nil {
failOnError(err, "Fialed to conenct to RabbitMQ")
failOnError(err, "Failed to connect to RabbitMQ")
}
defer func(conn *amqp.Connection) {
err := conn.Close()
failOnError(err, "Fialed to close conn")
failOnError(err, "Failed to close connection")
}(conn)

tp, err := tracing.SetTraceProvider(config.MsgConsumer)
Expand All @@ -77,7 +77,7 @@ func main() {

defer func(channel *amqp.Channel) {
err := channel.Close()
failOnError(err, "Fialed to close channel")
failOnError(err, "Failed to close channel")
}(channel)

err = channel.ExchangeDeclare(
Expand Down Expand Up @@ -143,7 +143,7 @@ func main() {
failOnError(err, "Failed to Consume")
}

var foreever chan struct{}
var forever chan struct{}

logger := logging.LogService("msgConsumer")
logger.Infof(strings.MessageActionEvent + " is running now")
Expand Down Expand Up @@ -179,15 +179,15 @@ func main() {
continue
}

pmessage := models.Message{
pMessage := models.Message{
ToUserId: message.ToUserId,
FromUserId: message.FromUserId,
ConversationId: message.ConversationId,
Content: message.Content,
}
logger.Info(pmessage)
logger.Info(pMessage)
//可能会重新插入数据 开启事务 晚点改
result := database.Client.WithContext(context.Background()).Create(&pmessage)
result := database.Client.WithContext(context.Background()).Create(&pMessage)
if result.Error != nil {
logger.WithFields(logrus.Fields{
"from_id": message.FromUserId,
Expand Down Expand Up @@ -223,12 +223,12 @@ func main() {

go ss(channel)

<-foreever
<-forever

}

func ss(channel *amqp.Channel) {
gptmsg, err := channel.Consume(
gptMsg, err := channel.Consume(
strings.MessageGptActionEvent,
"",
false, false, false, false,
Expand All @@ -239,7 +239,7 @@ func ss(channel *amqp.Channel) {
}
var message models.Message

for body := range gptmsg {
for body := range gptMsg {
ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers)
ctx, span := tracing.Tracer.Start(ctx, "message_send Service")
logger := logging.LogService("message_send").WithContext(ctx)
Expand Down Expand Up @@ -271,14 +271,14 @@ func ss(channel *amqp.Channel) {
continue
}

pmessage := models.Message{
pMessage := models.Message{
ToUserId: message.ToUserId,
FromUserId: message.FromUserId,
ConversationId: message.ConversationId,
Content: message.Content,
}
//可能会重新插入数据 开启事务 晚点改
result := database.Client.WithContext(context.Background()).Create(&pmessage)
result := database.Client.WithContext(context.Background()).Create(&pMessage)
//发一份消息到openai api
if result.Error != nil {
logger.WithFields(logrus.Fields{
Expand Down Expand Up @@ -318,15 +318,15 @@ func ss(channel *amqp.Channel) {
}

text := resp.Choices[0].Message.Content
pmessage = models.Message{
pMessage = models.Message{
ToUserId: message.FromUserId,
FromUserId: message.ToUserId,
ConversationId: message.ConversationId,
Content: text,
// Content: "111",
}

result = database.Client.WithContext(context.Background()).Create(&pmessage)
result = database.Client.WithContext(context.Background()).Create(&pMessage)

if result.Error != nil {
logger.WithFields(logrus.Fields{
Expand Down

0 comments on commit 684d05b

Please sign in to comment.