Skip to content

Commit

Permalink
Merge pull request #41 from h0n9/develop
Browse files Browse the repository at this point in the history
Prepare to release v0.0.4
  • Loading branch information
h0n9 authored Aug 10, 2023
2 parents 9c1c2fd + 12395e0 commit f4b1b02
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 60 deletions.
4 changes: 3 additions & 1 deletion cli/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ var Cmd = &cobra.Command{
}()
logger.Info().Msg("listening os signal: SIGINT, SIGTERM")

grpcServer = grpc.NewServer()
grpcServer = grpc.NewServer(
grpc.UnaryInterceptor(util.UnaryServerInterceptor()),
)
logger.Info().Msg("initalized gRPC server")
lakeService, err = lake.NewService(
ctx,
Expand Down
97 changes: 48 additions & 49 deletions cli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,65 +170,64 @@ var Cmd = &cobra.Command{
go func() {
defer wg.Done()
reader := bufio.NewReader(os.Stdin)
ok := true
for {
printInput(false)
input, err := reader.ReadString('\n')
if err == io.EOF {
select {
case <-ctx.Done():
return
}
if err != nil {
fmt.Println(err)
continue
}
input = strings.TrimSuffix(input, "\n")
if input == "" {
continue
}
if !ok {
cancel()
return
}
go func() {
msg := Msg{
Data: []byte(input),
Metadata: map[string][]byte{
"nickname": []byte(nickname),
},
}
data, err := json.Marshal(msg)
if err != nil {
fmt.Println(err)
default:
printInput(false)
input, err := reader.ReadString('\n')
if err == io.EOF {
return
}
sigDataBytes, err := privKey.Sign(data)
if err != nil {
fmt.Println(err)
return
continue
}
input = strings.TrimSuffix(input, "\n")
if input == "" {
continue
}
go func() {
msg := Msg{
Data: []byte(input),
Metadata: map[string][]byte{
"nickname": []byte(nickname),
},
}
data, err := json.Marshal(msg)
if err != nil {
fmt.Println(err)
return
}
sigDataBytes, err := privKey.Sign(data)
if err != nil {
fmt.Println(err)
return
}

pubRes, err := cli.Publish(ctx, &pb.PublishReq{
TopicId: topicID,
MsgCapsule: &pb.MsgCapsule{
Data: data,
Signature: &pb.Signature{
PubKey: pubKeyBytes,
Data: sigDataBytes,
pubRes, err := cli.Publish(ctx, &pb.PublishReq{
TopicId: topicID,
MsgCapsule: &pb.MsgCapsule{
Data: data,
Signature: &pb.Signature{
PubKey: pubKeyBytes,
Data: sigDataBytes,
},
},
},
})
if err != nil {
fmt.Println(err)
ok = false
return
}
})
if err != nil {
fmt.Println(err)
return
}

// check publish res
if !pubRes.GetOk() {
fmt.Println("failed to send message")
return
}
}()
// check publish res
if !pubRes.GetOk() {
fmt.Println("failed to send message")
return
}
}()
}
}
}()

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/rs/zerolog v1.29.1
github.com/spf13/cobra v1.7.0
github.com/stretchr/testify v1.8.4
go.uber.org/ratelimit v0.3.0
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKY
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/ratelimit v0.3.0 h1:IdZd9wqvFXnvLvSEBo0KPcGfkoBGNkpTHlrE3Rcjkjw=
go.uber.org/ratelimit v0.3.0/go.mod h1:So5LG7CV1zWpY1sHe+DXTJqQvOx+FFPFaAs2SnoyBaI=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
Expand Down
11 changes: 2 additions & 9 deletions msg/box.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -256,21 +255,15 @@ func (box *Box) LeaveSub(subscriberID string) error {
}

func init() {
tmp, err := getEnvInt("INTERNAL_CHAN_BUFFER_SIZE", DefaultInternalChanBufferSize)
tmp, err := util.GetEnvInt("INTERNAL_CHAN_BUFFER_SIZE", DefaultInternalChanBufferSize)
if err != nil {
panic(err)
}
internalChanBufferSize = tmp

tmp, err = getEnvInt("EXTERNAL_CHAN_BUFFER_SIZE", DefaultExternalChanBufferSize)
tmp, err = util.GetEnvInt("EXTERNAL_CHAN_BUFFER_SIZE", DefaultExternalChanBufferSize)
if err != nil {
panic(err)
}
externalChanBufferSize = tmp
}

func getEnvInt(key string, fallback int) (int, error) {
tmpStr := strconv.Itoa(fallback)
tmpStr = util.GetEnv(key, tmpStr)
return strconv.Atoi(tmpStr)
}
32 changes: 32 additions & 0 deletions util/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package util

import (
"context"

"go.uber.org/ratelimit"
"google.golang.org/grpc"
)

const (
DefaultUnaryServerInterceptorRateLimit = 10000
)

var (
unaryServerInterceptorRateLimit int
)

func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
rl := ratelimit.New(unaryServerInterceptorRateLimit)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
rl.Take()
return handler(ctx, req)
}
}

func init() {
tmp, err := getEnvInt("UNARY_SERVER_INTERCEPTOR_RATE_LIMIT", DefaultUnaryServerInterceptorRateLimit)
if err != nil {
panic(err)
}
unaryServerInterceptorRateLimit = tmp
}
17 changes: 16 additions & 1 deletion util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/rand"
"encoding/base64"
"os"
"strconv"
)

func CheckStrLen(target string, min, max int) bool {
Expand All @@ -20,14 +21,28 @@ func GenerateRandomBase64String(size int) string {
return base64.RawStdEncoding.EncodeToString(bytes)
}

func GetEnv(key, fallback string) string {
func getEnv(key, fallback string) string {
value, ok := os.LookupEnv(key)
if !ok {
return fallback
}
return value
}

func GetEnv(key, fallback string) string {
return getEnv(key, fallback)
}

func getEnvInt(key string, fallback int) (int, error) {
tmpStr := strconv.Itoa(fallback)
tmpStr = getEnv(key, tmpStr)
return strconv.Atoi(tmpStr)
}

func GetEnvInt(key string, fallback int) (int, error) {
return getEnvInt(key, fallback)
}

func GetLogLevel() string {
return GetEnv("LOG_LEVEL", "info")
}

0 comments on commit f4b1b02

Please sign in to comment.