Skip to content

Commit

Permalink
Merge pull request #35 from h0n9/develop
Browse files Browse the repository at this point in the history
Prepare to release v0.0.2
  • Loading branch information
h0n9 authored Aug 3, 2023
2 parents adcb124 + ac46fd1 commit e9d0bdc
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 83 deletions.
26 changes: 24 additions & 2 deletions cli/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"math/big"
"net"
"os"
"os/signal"
"sync"
"syscall"

"github.com/rs/zerolog"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -46,10 +49,27 @@ var Cmd = &cobra.Command{
zerolog.SetGlobalLevel(logLevel)
logger.Info().Msg("initalized logger")

ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
logger.Info().Msg("initalized context")

grpcServer := grpc.NewServer()
var grpcServer *grpc.Server = nil

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
s := <-sigCh
logger.Info().Msgf("got signal %v, attempting graceful shutdown", s)
cancel()
if grpcServer != nil {
grpcServer.GracefulStop()
}
wg.Done()
}()
logger.Info().Msg("listening os signal: SIGINT, SIGTERM")

grpcServer = grpc.NewServer()
logger.Info().Msg("initalized gRPC server")
lakeService, err := lake.NewService(
ctx,
Expand Down Expand Up @@ -79,6 +99,8 @@ var Cmd = &cobra.Command{
return err
}

wg.Wait()

return nil
},
}
Expand Down
39 changes: 22 additions & 17 deletions cli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,20 @@ var Cmd = &cobra.Command{
wg.Add(1)
go func() {
defer wg.Done()
sig := <-sigCh
fmt.Println("\r\ngot", sig.String())
if conn != nil {
fmt.Printf("closing grpc client ... ")
conn.Close()
select {
case <-ctx.Done():
return
case s := <-sigCh:
fmt.Printf("got signal %v, attempting graceful shutdown\n", s)
if conn != nil {
fmt.Printf("closing grpc client ... ")
conn.Close()
fmt.Printf("done\n")
}
fmt.Printf("cancelling ctx ... ")
cancel()
fmt.Printf("done\n")
}
fmt.Printf("cancelling ctx ... ")
cancel()
fmt.Printf("done\n")
}()

// init privKey
Expand Down Expand Up @@ -134,8 +138,8 @@ var Cmd = &cobra.Command{
res, err := stream.Recv()
if err != nil {
fmt.Println(err)
sigCh <- syscall.SIGINT
break
cancel()
return
}
if res.GetType() != pb.SubscribeResType_SUBSCRIBE_RES_TYPE_RELAY {
continue
Expand All @@ -162,8 +166,11 @@ var Cmd = &cobra.Command{
}()

// execute goroutine (sender)
wg.Add(1)
go func() {
defer wg.Done()
reader := bufio.NewReader(os.Stdin)
ok := true
for {
printInput(false)
input, err := reader.ReadString('\n')
Expand All @@ -178,6 +185,10 @@ var Cmd = &cobra.Command{
if input == "" {
continue
}
if !ok {
cancel()
return
}
go func() {
msg := Msg{
Data: []byte(input),
Expand Down Expand Up @@ -206,15 +217,9 @@ var Cmd = &cobra.Command{
},
},
})
if err == io.EOF {
err := stream.CloseSend()
if err != nil {
fmt.Println(err)
cancel()
}
}
if err != nil {
fmt.Println(err)
ok = false
return
}

Expand Down
88 changes: 62 additions & 26 deletions lake/lake.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package lake
import (
"context"
"fmt"
"sync"

"github.com/rs/zerolog"

Expand Down Expand Up @@ -86,16 +87,24 @@ func (service *Service) Publish(ctx context.Context, req *pb.PublishReq) (*pb.Pu
return &publishRes, err
}

service.logger.Debug().Str("addr", string(pubKey.Address())).Msg("published")
service.logger.Debug().
Str("topic-id", req.GetTopicId()).
Str("addr", string(pubKey.Address())).
Msg("published")

// update publish res
publishRes.Ok = true

return &publishRes, nil
}
func (service *Service) Subscribe(req *pb.SubscribeReq, stream pb.Lake_SubscribeServer) error {
service.logger.Debug().Msg("begin of subscribe stream")
defer service.logger.Debug().Msg("end of subscribe stream")
service.logger.Debug().
Str("topic-id", req.GetTopicId()).
Msg("begin of subscribe stream")
defer service.logger.Debug().
Str("topic-id", req.GetTopicId()).
Msg("end of subscribe stream")

// set subscribe res
res := pb.SubscribeRes{
Type: pb.SubscribeResType_SUBSCRIBE_RES_TYPE_ACK,
Expand Down Expand Up @@ -159,7 +168,10 @@ func (service *Service) Subscribe(req *pb.SubscribeReq, stream pb.Lake_Subscribe
return nil
}

service.logger.Debug().Str("subscriber-id", subscriberID).Msg("registered")
service.logger.Info().
Str("topic-id", req.GetTopicId()).
Str("subscriber-id", subscriberID).
Msg("joined subscriber")

// update subscriber res
res.SubscriberId = subscriberID
Expand All @@ -171,29 +183,53 @@ func (service *Service) Subscribe(req *pb.SubscribeReq, stream pb.Lake_Subscribe
return err
}

// update subscriber res
res.Type = pb.SubscribeResType_SUBSCRIBE_RES_TYPE_RELAY

// relay msgs to susbscriber
for {
select {
case <-stream.Context().Done():
subSize, err := msgBox.LeaveSub(subscriberID)
if err != nil {
return err
}
if subSize > 0 {
return nil
wg := sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stream.Context().Done():
return
case msgCapsule := <-subscriberCh:
err := stream.Send(&pb.SubscribeRes{
Type: pb.SubscribeResType_SUBSCRIBE_RES_TYPE_RELAY,
Res: &pb.SubscribeRes_MsgCapsule{
MsgCapsule: msgCapsule,
},
})
if err != nil {
return
}
msgCapsule = nil // explicitly free
}
msgBox.StopSub()
return nil
case msgCapsule := <-subscriberCh:
res.Res = &pb.SubscribeRes_MsgCapsule{MsgCapsule: msgCapsule}
err := stream.Send(&res)
if err != nil {
service.logger.Err(err).Msg("")
}
msgCapsule = nil // explicitly free
}
}()

wg.Wait()

go func() {
for len(subscriberCh) > 0 {
<-subscriberCh
}
service.logger.Debug().
Str("topic-id", req.GetTopicId()).
Str("subscriber-id", subscriberID).
Msg("drained subscriber ch")
}()

err = msgBox.LeaveSub(subscriberID)
if err != nil {
service.logger.Err(err).
Str("topic-id", req.GetTopicId()).
Str("subscriber-id", subscriberID).
Msg("")
}
service.logger.Info().
Str("topic-id", req.GetTopicId()).
Str("subscriber-id", subscriberID).
Msg("left subscriber")

return nil
}
Loading

0 comments on commit e9d0bdc

Please sign in to comment.