Skip to content

Commit

Permalink
Merge pull request #44 from h0n9/develop
Browse files Browse the repository at this point in the history
Prepare to release `v0.0.6`
  • Loading branch information
h0n9 authored Mar 13, 2024
2 parents 6ad9d5b + 5ff69d1 commit 2b4e5e0
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 315 deletions.
2 changes: 1 addition & 1 deletion cli/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ var Cmd = &cobra.Command{
}
logger.Info().Msg("initalized lake service")

pb.RegisterLakeServer(grpcServer, lakeService)
pb.RegisterMsgLakeServer(grpcServer, lakeService)
logger.Info().Msg("registered lake service to gRPC server")

listener, err := net.Listen("tcp", grpcListenAddr)
Expand Down
39 changes: 12 additions & 27 deletions cli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var Cmd = &cobra.Command{
if err != nil {
return err
}
cli := pb.NewLakeClient(conn)
cli := pb.NewMsgLakeClient(conn)

msg := Msg{
Data: []byte(topicID),
Expand Down Expand Up @@ -149,18 +149,10 @@ var Cmd = &cobra.Command{
if bytes.Equal(signature.GetPubKey(), pubKeyBytes) {
continue
}
data := msgCapsule.GetData()
if len(data) == 0 {
if len(msgCapsule.GetData()) == 0 {
continue
}
timestamp := msgCapsule.GetTimestamp()
msg := Msg{}
err = json.Unmarshal(data, &msg)
if err != nil {
fmt.Println(err)
continue
}
printOutput(true, &msg, timestamp)
printOutput(true, msgCapsule)
printInput(true)
}
}()
Expand Down Expand Up @@ -189,13 +181,7 @@ var Cmd = &cobra.Command{
continue
}
go func() {
msg := Msg{
Data: []byte(input),
Metadata: map[string][]byte{
"nickname": []byte(nickname),
},
}
data, err := json.Marshal(msg)
data, err := json.Marshal(input)
if err != nil {
fmt.Println(err)
return
Expand Down Expand Up @@ -242,21 +228,20 @@ func printInput(newline bool) {
if newline {
s = "\r\n" + s
}
fmt.Printf(s, nickname)
fmt.Printf(s, "me")
}

func printOutput(newline bool, msg *Msg, timestamp int64) {
func printOutput(newline bool, msgCapsule *pb.MsgCapsule) {
s := "📩 <%s> [%d] %s"
if newline {
s = "\r\n" + s
}
nickname := "unknown"
metadata := msg.Metadata
value, exist := metadata["nickname"]
if exist {
nickname = string(value)
}
fmt.Printf(s, nickname, timestamp, msg.Data)
fmt.Printf(
s,
fmt.Sprintf("%x", msgCapsule.GetSignature().GetPubKey()[:4]),
msgCapsule.GetTimestamp(),
msgCapsule.GetData(),
)
}

type Msg struct {
Expand Down
4 changes: 2 additions & 2 deletions lake/lake.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
)

type Service struct {
pb.UnimplementedLakeServer
pb.UnimplementedMsgLakeServer

ctx context.Context
logger *zerolog.Logger
Expand Down Expand Up @@ -96,7 +96,7 @@ func (service *Service) Publish(ctx context.Context, req *pb.PublishReq) (*pb.Pu

return &publishRes, nil
}
func (service *Service) Subscribe(req *pb.SubscribeReq, stream pb.Lake_SubscribeServer) error {
func (service *Service) Subscribe(req *pb.SubscribeReq, stream pb.MsgLake_SubscribeServer) error {
service.logger.Debug().
Str("topic-id", req.GetTopicId()).
Msg("begin of subscribe stream")
Expand Down
174 changes: 0 additions & 174 deletions proto/lake_grpc.pb.go

This file was deleted.

Loading

0 comments on commit 2b4e5e0

Please sign in to comment.