Skip to content

Commit

Permalink
Fix #47 - Streamed replies with subject params
Browse files Browse the repository at this point in the history
  • Loading branch information
cdevienne committed Jul 18, 2018
1 parent b71deed commit 053d2d1
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 34 deletions.
63 changes: 63 additions & 0 deletions examples/alloptions/alloptions.nrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ func (c *SvcCustomSubjectClient) MtVoidReqStreamedReply(
// SvcSubjectParams should implement.
type SvcSubjectParamsServer interface {
MtWithSubjectParams(ctx context.Context, mp1 string, mp2 string) (resp SimpleStringReply, err error)
MtStreamedReplyWithSubjectParams(ctx context.Context, mp1 string, mp2 string, pushRep func(SimpleStringReply)) (err error)
MtNoReply(ctx context.Context)
}

Expand All @@ -410,6 +411,40 @@ func (h *SvcSubjectParamsHandler) Subject() string {
return "root.*.svcsubjectparams.*.>"
}

func (h *SvcSubjectParamsHandler) MtStreamedReplyWithSubjectParamsHandler(ctx context.Context, tail []string, msg *nats.Msg) {
mtParams, encoding, err := nrpc.ParseSubjectTail(2, tail)
if err != nil {
log.Printf("SvcSubjectParams: MtStreamedReplyWithSubjectParams subject parsing failed:")
}

ctx, cancel := context.WithCancel(ctx)

keepStreamAlive := nrpc.NewKeepStreamAlive(h.nc, msg.Reply, encoding, cancel)

var msgCount uint32

_, nrpcErr := nrpc.CaptureErrors(func() (proto.Message, error) {
err := h.server.MtStreamedReplyWithSubjectParams(ctx, mtParams[0], mtParams[1], func(rep SimpleStringReply){
if err = nrpc.Publish(&rep, nil, h.nc, msg.Reply, encoding); err != nil {
log.Printf("nrpc: error publishing response")
cancel()
return
}
msgCount++
})
return nil, err
})
keepStreamAlive.Stop()

if nrpcErr != nil {
nrpc.Publish(nil, nrpcErr, h.nc, msg.Reply, encoding)
} else {
nrpc.Publish(
nil, &nrpc.Error{Type: nrpc.Error_EOS, MsgCount: msgCount},
h.nc, msg.Reply, encoding)
}
}

func (h *SvcSubjectParamsHandler) MtNoRequestWParamsPublish(pkginstance string, svcclientid string, mtmp1 string, msg SimpleStringReply) error {
rawMsg, err := nrpc.Marshal("protobuf", &msg)
if err != nil {
Expand Down Expand Up @@ -465,6 +500,9 @@ func (h *SvcSubjectParamsHandler) Handler(msg *nats.Msg) {
log.Printf("MtWithSubjectParamsHandler: MtWithSubjectParams handler failed: %s", replyError.Error())
}
}
case "mtstreamedreplywithsubjectparams":
h.MtStreamedReplyWithSubjectParamsHandler(ctx, tail, msg)
return
case "mtnoreply":
noreply = true
_, encoding, err = nrpc.ParseSubjectTail(0, tail)
Expand Down Expand Up @@ -551,6 +589,31 @@ func (c *SvcSubjectParamsClient) MtWithSubjectParams(mp1 string, mp2 string, ) (
return
}

func (c *SvcSubjectParamsClient) MtStreamedReplyWithSubjectParams(
ctx context.Context,mp1 string,mp2 string,
cb func (context.Context, SimpleStringReply),
) error {
subject := c.PkgSubject + "." + c.PkgParaminstance + "." + c.Subject + "." + c.SvcParamclientid + "." + "mtstreamedreplywithsubjectparams" + "." + mp1 + "." + mp2

sub, err := nrpc.StreamCall(ctx, c.nc, subject, &nrpc.Void{}, c.Encoding, c.Timeout)
if err != nil {
return err
}

var res SimpleStringReply
for {
err = sub.Next(&res)
if err != nil {
break
}
cb(ctx, res)
}
if err == nrpc.ErrEOS {
err = nil
}
return err
}

func (c *SvcSubjectParamsClient) MtNoReply() (err error) {

subject := c.PkgSubject + "." + c.PkgParaminstance + "." + c.Subject + "." + c.SvcParamclientid + "." + "mtnoreply"
Expand Down
63 changes: 32 additions & 31 deletions examples/alloptions/alloptions.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions examples/alloptions/alloptions.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ service SvcSubjectParams {
option (nrpc.methodSubjectParams) = "mp1";
option (nrpc.methodSubjectParams) = "mp2";
}
rpc MtStreamedReplyWithSubjectParams(nrpc.Void) returns (SimpleStringReply) {
option (nrpc.streamedReply) = true;
option (nrpc.methodSubjectParams) = "mp1";
option (nrpc.methodSubjectParams) = "mp2";
}
rpc MtNoReply(nrpc.Void) returns (nrpc.NoReply) {}
rpc MtNoRequestWParams(nrpc.NoRequest) returns (SimpleStringReply) {
option (nrpc.methodSubjectParams) = "mp1";
Expand Down
27 changes: 27 additions & 0 deletions examples/alloptions/alloptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ func (s BasicServerImpl) MtWithSubjectParams(
return
}

func (s BasicServerImpl) MtStreamedReplyWithSubjectParams(
ctx context.Context, mp1 string, mp2 string, send func(rep SimpleStringReply),
) error {
send(SimpleStringReply{Reply: mp1})
send(SimpleStringReply{Reply: mp2})
return nil
}

func TestAll(t *testing.T) {
c, err := nats.Connect(natsURL)
if err != nil {
Expand Down Expand Up @@ -221,6 +229,25 @@ func TestAll(t *testing.T) {
t.Errorf("Expected a nrpc.Error, got %#v", err)
}

t.Run("StreamedReply with SubjectParams", func(t *testing.T) {
var resList []string
err := c2.MtStreamedReplyWithSubjectParams(
context.Background(),
"arg1", "arg2",
func(ctx context.Context, rep SimpleStringReply) {
resList = append(resList, rep.GetReply())
})
if err != nil {
t.Fatal(err)
}
if resList[0] != "arg1" {
t.Errorf("Expected 'arg1', got '%s'", resList[0])
}
if resList[1] != "arg2" {
t.Errorf("Expected 'arg2', got '%s'", resList[1])
}
})

t.Run("NoRequest method with params", func(t *testing.T) {
sub, err := c2.MtNoRequestWParamsSubscribeSync(
"mtvalue",
Expand Down
9 changes: 6 additions & 3 deletions protoc-gen-nrpc/tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ func (h *{{$serviceName}}Handler) {{.GetName}}Publish(
{{- if HasStreamedReply .}}
func (h *{{$serviceName}}Handler) {{.GetName}}Handler(ctx context.Context, tail []string, msg *nats.Msg) {
_, encoding, err := nrpc.ParseSubjectTail({{len (GetMethodSubjectParams .)}}, tail)
{{if ne 0 (len (GetMethodSubjectParams .)) -}}
mtParams
{{- else -}}_{{- end -}}
, encoding, err := nrpc.ParseSubjectTail({{len (GetMethodSubjectParams .)}}, tail)
if err != nil {
log.Printf("{{$serviceName}}: {{.GetName}} subject parsing failed:")
}
Expand All @@ -179,8 +182,8 @@ func (h *{{$serviceName}}Handler) {{.GetName}}Handler(ctx context.Context, tail
_, nrpcErr := nrpc.CaptureErrors(func() (proto.Message, error) {
err := h.server.{{.GetName}}(ctx
{{- range GetMethodSubjectParams . -}}
, {{.}}
{{- range $i, $p := GetMethodSubjectParams . -}}
, mtParams[{{ $i }}]
{{- end -}}
{{- if ne .GetInputType ".nrpc.Void" -}}
, req
Expand Down

0 comments on commit 053d2d1

Please sign in to comment.