Skip to content

Commit

Permalink
Merge pull request #49 from nats-rpc/48-server-side-concurrency
Browse files Browse the repository at this point in the history
server side concurrency
  • Loading branch information
cdevienne authored Sep 21, 2018
2 parents 7b212b7 + 914de1b commit 8cd5ee3
Show file tree
Hide file tree
Showing 9 changed files with 1,234 additions and 654 deletions.
400 changes: 194 additions & 206 deletions examples/alloptions/alloptions.nrpc.go

Large diffs are not rendered by default.

513 changes: 347 additions & 166 deletions examples/alloptions/alloptions_test.go

Large diffs are not rendered by default.

78 changes: 48 additions & 30 deletions examples/helloworld/helloworld/helloworld.nrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ type GreeterServer interface {
// GreeterHandler provides a NATS subscription handler that can serve a
// subscription using a given GreeterServer implementation.
type GreeterHandler struct {
ctx context.Context
nc nrpc.NatsConn
server GreeterServer
ctx context.Context
workers *nrpc.WorkerPool
nc nrpc.NatsConn
server GreeterServer
}

func NewGreeterHandler(ctx context.Context, nc nrpc.NatsConn, s GreeterServer) *GreeterHandler {
Expand All @@ -33,13 +34,26 @@ func NewGreeterHandler(ctx context.Context, nc nrpc.NatsConn, s GreeterServer) *
}
}

func NewGreeterConcurrentHandler(workers *nrpc.WorkerPool, nc nrpc.NatsConn, s GreeterServer) *GreeterHandler {
return &GreeterHandler{
workers: workers,
nc: nc,
server: s,
}
}

func (h *GreeterHandler) Subject() string {
return "helloworld.Greeter.>"
}

func (h *GreeterHandler) Handler(msg *nats.Msg) {
var encoding string
var noreply bool
var ctx context.Context
if h.workers != nil {
ctx = h.workers.Context
} else {
ctx = h.ctx
}
request := nrpc.NewRequest(ctx, h.nc, msg.Subject, msg.Reply)
// extract method name & encoding from subject
_, _, name, tail, err := nrpc.ParseSubject(
"helloworld", 0, "Greeter", 0, msg.Subject)
Expand All @@ -48,54 +62,58 @@ func (h *GreeterHandler) Handler(msg *nats.Msg) {
return
}

ctx := h.ctx
request.MethodName = name
request.SubjectTail = tail

// call handler and form response
var resp proto.Message
var replyError *nrpc.Error
var immediateError *nrpc.Error
switch name {
case "SayHello":
_, encoding, err = nrpc.ParseSubjectTail(0, tail)
_, request.Encoding, err = nrpc.ParseSubjectTail(0, request.SubjectTail)
if err != nil {
log.Printf("SayHelloHanlder: SayHello subject parsing failed: %v", err)
break
}
var req HelloRequest
if err := nrpc.Unmarshal(encoding, msg.Data, &req); err != nil {
if err := nrpc.Unmarshal(request.Encoding, msg.Data, &req); err != nil {
log.Printf("SayHelloHandler: SayHello request unmarshal failed: %v", err)
replyError = &nrpc.Error{
immediateError = &nrpc.Error{
Type: nrpc.Error_CLIENT,
Message: "bad request received: " + err.Error(),
}
} else {
resp, replyError = nrpc.CaptureErrors(
func()(proto.Message, error){
innerResp, err := h.server.SayHello(ctx, req)
if err != nil {
return nil, err
}
return &innerResp, err
})
if replyError != nil {
log.Printf("SayHelloHandler: SayHello handler failed: %s", replyError.Error())
request.Handler = func(ctx context.Context)(proto.Message, error){
innerResp, err := h.server.SayHello(ctx, req)
if err != nil {
return nil, err
}
return &innerResp, err
}
}
default:
log.Printf("GreeterHandler: unknown name %q", name)
replyError = &nrpc.Error{
immediateError = &nrpc.Error{
Type: nrpc.Error_CLIENT,
Message: "unknown name: " + name,
}
}
if immediateError == nil {
if h.workers != nil {
// Try queuing the request
if err := h.workers.QueueRequest(request); err != nil {
log.Printf("nrpc: Error queuing the request: %s", err)
}
} else {
// Run the handler synchronously
request.RunAndReply()
}
}


if !noreply {
// encode and send response
err = nrpc.Publish(resp, replyError, h.nc, msg.Reply, encoding) // error is logged
if immediateError != nil {
if err := request.SendReply(nil, immediateError); err != nil {
log.Printf("GreeterHandler: Greeter handler failed to publish the response: %s", err)
}
} else {
err = nil
}
if err != nil {
log.Println("GreeterHandler: Greeter handler failed to publish the response: %s", err)
}
}

Expand Down
114 changes: 70 additions & 44 deletions examples/metrics_helloworld/helloworld/helloworld.nrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ var (
// GreeterHandler provides a NATS subscription handler that can serve a
// subscription using a given GreeterServer implementation.
type GreeterHandler struct {
ctx context.Context
nc nrpc.NatsConn
server GreeterServer
ctx context.Context
workers *nrpc.WorkerPool
nc nrpc.NatsConn
server GreeterServer
}

func NewGreeterHandler(ctx context.Context, nc nrpc.NatsConn, s GreeterServer) *GreeterHandler {
Expand All @@ -82,13 +83,26 @@ func NewGreeterHandler(ctx context.Context, nc nrpc.NatsConn, s GreeterServer) *
}
}

func NewGreeterConcurrentHandler(workers *nrpc.WorkerPool, nc nrpc.NatsConn, s GreeterServer) *GreeterHandler {
return &GreeterHandler{
workers: workers,
nc: nc,
server: s,
}
}

func (h *GreeterHandler) Subject() string {
return "helloworld.Greeter.>"
}

func (h *GreeterHandler) Handler(msg *nats.Msg) {
var encoding string
var noreply bool
var ctx context.Context
if h.workers != nil {
ctx = h.workers.Context
} else {
ctx = h.ctx
}
request := nrpc.NewRequest(ctx, h.nc, msg.Subject, msg.Reply)
// extract method name & encoding from subject
_, _, name, tail, err := nrpc.ParseSubject(
"helloworld", 0, "Greeter", 0, msg.Subject)
Expand All @@ -97,71 +111,83 @@ func (h *GreeterHandler) Handler(msg *nats.Msg) {
return
}

ctx := h.ctx
request.MethodName = name
request.SubjectTail = tail

// call handler and form response
var resp proto.Message
var replyError *nrpc.Error
var elapsed float64
var immediateError *nrpc.Error
switch name {
case "SayHello":
_, encoding, err = nrpc.ParseSubjectTail(0, tail)
_, request.Encoding, err = nrpc.ParseSubjectTail(0, request.SubjectTail)
if err != nil {
log.Printf("SayHelloHanlder: SayHello subject parsing failed: %v", err)
break
}
var req HelloRequest
if err := nrpc.Unmarshal(encoding, msg.Data, &req); err != nil {
if err := nrpc.Unmarshal(request.Encoding, msg.Data, &req); err != nil {
log.Printf("SayHelloHandler: SayHello request unmarshal failed: %v", err)
replyError = &nrpc.Error{
immediateError = &nrpc.Error{
Type: nrpc.Error_CLIENT,
Message: "bad request received: " + err.Error(),
}
serverRequestsForGreeter.WithLabelValues(
"SayHello", encoding, "unmarshal_fail").Inc()
"SayHello", request.Encoding, "unmarshal_fail").Inc()
} else {
start := time.Now()
resp, replyError = nrpc.CaptureErrors(
func()(proto.Message, error){
innerResp, err := h.server.SayHello(ctx, req)
if err != nil {
return nil, err
}
return &innerResp, err
})
elapsed = time.Since(start).Seconds()
if replyError != nil {
log.Printf("SayHelloHandler: SayHello handler failed: %s", replyError.Error())
serverRequestsForGreeter.WithLabelValues(
"SayHello", encoding, "handler_fail").Inc()
request.Handler = func(ctx context.Context)(proto.Message, error){
innerResp, err := h.server.SayHello(ctx, req)
if err != nil {
return nil, err
}
return &innerResp, err
}
}
default:
log.Printf("GreeterHandler: unknown name %q", name)
replyError = &nrpc.Error{
immediateError = &nrpc.Error{
Type: nrpc.Error_CLIENT,
Message: "unknown name: " + name,
}
serverRequestsForGreeter.WithLabelValues(
"Greeter", encoding, "name_fail").Inc()
"Greeter", request.Encoding, "name_fail").Inc()
}


if !noreply {
// encode and send response
err = nrpc.Publish(resp, replyError, h.nc, msg.Reply, encoding) // error is logged
} else {
err = nil
request.AfterReply = func(request *nrpc.Request, success, replySuccess bool) {
if !replySuccess {
serverRequestsForGreeter.WithLabelValues(
request.MethodName, request.Encoding, "sendreply_fail").Inc()
}
if success {
serverRequestsForGreeter.WithLabelValues(
request.MethodName, request.Encoding, "success").Inc()
} else {
serverRequestsForGreeter.WithLabelValues(
request.MethodName, request.Encoding, "handler_fail").Inc()
}
// report metric to Prometheus
serverHETForGreeter.WithLabelValues(request.MethodName).Observe(
request.Elapsed().Seconds())
}
if err != nil {
serverRequestsForGreeter.WithLabelValues(
name, encoding, "sendreply_fail").Inc()
} else if replyError == nil {
serverRequestsForGreeter.WithLabelValues(
name, encoding, "success").Inc()
if immediateError == nil {
if h.workers != nil {
// Try queuing the request
if err := h.workers.QueueRequest(request); err != nil {
log.Printf("nrpc: Error queuing the request: %s", err)
}
} else {
// Run the handler synchronously
request.RunAndReply()
}
}

// report metric to Prometheus
serverHETForGreeter.WithLabelValues(name).Observe(elapsed)
if immediateError != nil {
if err := request.SendReply(nil, immediateError); err != nil {
log.Printf("GreeterHandler: Greeter handler failed to publish the response: %s", err)
serverRequestsForGreeter.WithLabelValues(
request.MethodName, request.Encoding, "handler_fail").Inc()
}
serverHETForGreeter.WithLabelValues(request.MethodName).Observe(
request.Elapsed().Seconds())
} else {
}
}

type GreeterClient struct {
Expand Down
Loading

0 comments on commit 8cd5ee3

Please sign in to comment.