Skip to content

Commit

Permalink
Optional specification of stream content type (#4926)
Browse files Browse the repository at this point in the history
* feat: `runtime.Marshaler` can support distinct content type for streamed responses

* test: Distinct stream content type on `runtime.Marshaler`.

* doc: document the Delimited and StreamContentType interfaces
  • Loading branch information
huin authored Nov 13, 2024
1 parent e2dba67 commit 371dddb
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 17 deletions.
72 changes: 72 additions & 0 deletions docs/docs/mapping/custom_marshalers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
---
layout: default
title: Custom marshalers
nav_order: 6
parent: Mapping
---

# Custom marshalers

[`Marshaler`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime?tab=doc#Marshaler)
implementations can implement optional additional methods to customize their
behaviour beyond the methods required by the core interface.

## Stream delimiters

By default, a streamed response delimits each response body with a single
newline (`"\n"`). You can change this delimiter by having your marshaler
implement
[`Delimited`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime#Delimited).

For example, to separate each entry with a pipe (`"|"`) instead:

```go
type YourMarshaler struct {
// ...
}

// ...

func (*YourMarshaler) Delimited() []byte {
return []byte("|")
}
```

## Stream content type

By default, a streamed response emits a `Content-Type` header that is the same
for a unary response, from the `ContentType()` method of the
[`Marshaler`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime?tab=doc#Marshaler)
interface.

If you require the server to declare a distinct content type for stream
responses versus unary responses, the marshaler must implement
[`StreamContentType`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime#StreamContentType).
This provides the MIME type when specifically responding to a streaming
response.

For example, by default the
[`JSONPb`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime#JSONPb)
marshaler results in `application/json` for its `Content-Type` response header,
irrespective of unary versus streaming. This can be changed for streaming
endpoints by wrapping the marshaler with a custom marshaler that implements
[`StreamContentType`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime#StreamContentType)
to return the [NDJSON](https://github.com/ndjson/ndjson-spec) MIME type for
streaming response endpoints:

```go
type CustomJSONPb struct {
runtime.JSONPb
}

func (*CustomJSONPb) Delimited() []byte {
// Strictly speaking this is already the default delimiter for JSONPb, but
// providing it here for completeness with an NDJSON marshaler all in one
// place.
return []byte("\n")
}

func (*CustomJSONPb) StreamContentType(interface{}) string {
return "application/x-ndjson"
}
```
5 changes: 4 additions & 1 deletion docs/docs/mapping/customizing_your_gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ parent: Mapping

You might want to serialize request/response messages in MessagePack instead of JSON, for example:

1. Write a custom implementation of [`Marshaler`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime?tab=doc#Marshaler).
1. Write a custom implementation of
[`Marshaler`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime?tab=doc#Marshaler).
See [Custom marshalers](custom_marshalers.md) for some additional
customization options.

2. Register your marshaler with [`WithMarshalerOption`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime?tab=doc#WithMarshalerOption).

Expand Down
8 changes: 7 additions & 1 deletion runtime/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ func ForwardResponseStream(ctx context.Context, mux *ServeMux, marshaler Marshal
}

if !wroteHeader {
w.Header().Set("Content-Type", marshaler.ContentType(respRw))
var contentType string
if sct, ok := marshaler.(StreamContentType); ok {
contentType = sct.StreamContentType(respRw)
} else {
contentType = marshaler.ContentType(respRw)
}
w.Header().Set("Content-Type", contentType)
}

var buf []byte
Expand Down
61 changes: 46 additions & 15 deletions runtime/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,36 +178,68 @@ func (c *CustomMarshaler) NewDecoder(r io.Reader) runtime.Decoder { return c
func (c *CustomMarshaler) NewEncoder(w io.Writer) runtime.Encoder { return c.m.NewEncoder(w) }
func (c *CustomMarshaler) ContentType(v interface{}) string { return "Custom-Content-Type" }

// marshalerStreamContentType implements Marshaler, but with the addition of a custom StreamContentType.
type marshalerStreamContentType struct {
runtime.Marshaler
CustomStreamContentType string
}

func (m marshalerStreamContentType) StreamContentType(interface{}) string {
return m.CustomStreamContentType
}

func TestForwardResponseStreamCustomMarshaler(t *testing.T) {
type msg struct {
pb proto.Message
err error
}
marshaler := &CustomMarshaler{&runtime.JSONPb{}}

tests := []struct {
name string
msgs []msg
statusCode int
name string
marshaler runtime.Marshaler
msgs []msg
statusCode int
wantContentType string
}{{
name: "encoding",
name: "encoding",
marshaler: marshaler,
msgs: []msg{
{&pb.SimpleMessage{Id: "One"}, nil},
{&pb.SimpleMessage{Id: "Two"}, nil},
},
statusCode: http.StatusOK,
statusCode: http.StatusOK,
wantContentType: "Custom-Content-Type",
}, {
name: "empty",
marshaler: marshaler,
statusCode: http.StatusOK,
}, {
name: "error",
msgs: []msg{{nil, status.Errorf(codes.OutOfRange, "400")}},
statusCode: http.StatusBadRequest,
name: "error",
marshaler: marshaler,
msgs: []msg{{nil, status.Errorf(codes.OutOfRange, "400")}},
statusCode: http.StatusBadRequest,
wantContentType: "Custom-Content-Type",
}, {
name: "stream_error",
name: "stream_error",
marshaler: marshaler,
msgs: []msg{
{&pb.SimpleMessage{Id: "One"}, nil},
{nil, status.Errorf(codes.OutOfRange, "400")},
},
statusCode: http.StatusOK,
statusCode: http.StatusOK,
wantContentType: "Custom-Content-Type",
}, {
name: "stream_content_type",
marshaler: marshalerStreamContentType{
Marshaler: marshaler,
CustomStreamContentType: "Stream-Content-Type",
},
msgs: []msg{
{&pb.SimpleMessage{Id: "One"}, nil},
},
statusCode: http.StatusOK,
wantContentType: "Stream-Content-Type",
}}

newTestRecv := func(t *testing.T, msgs []msg) func() (proto.Message, error) {
Expand All @@ -224,14 +256,13 @@ func TestForwardResponseStreamCustomMarshaler(t *testing.T) {
}
}
ctx := runtime.NewServerMetadataContext(context.Background(), runtime.ServerMetadata{})
marshaler := &CustomMarshaler{&runtime.JSONPb{}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
recv := newTestRecv(t, tt.msgs)
req := httptest.NewRequest("GET", "http://example.com/foo", nil)
resp := httptest.NewRecorder()

runtime.ForwardResponseStream(ctx, runtime.NewServeMux(), marshaler, resp, req, recv)
runtime.ForwardResponseStream(ctx, runtime.NewServeMux(), tt.marshaler, resp, req, recv)

w := resp.Result()
if w.StatusCode != tt.statusCode {
Expand All @@ -245,16 +276,16 @@ func TestForwardResponseStreamCustomMarshaler(t *testing.T) {
t.Errorf("Failed to read response body with %v", err)
}
w.Body.Close()
if len(body) > 0 && w.Header.Get("Content-Type") != "Custom-Content-Type" {
t.Errorf("Content-Type %s want Custom-Content-Type", w.Header.Get("Content-Type"))
if w.Header.Get("Content-Type") != tt.wantContentType {
t.Errorf("Content-Type %q want %q", w.Header.Get("Content-Type"), tt.wantContentType)
}

var want []byte
for _, msg := range tt.msgs {
if msg.err != nil {
t.Skip("checking error encodings")
}
b, err := marshaler.Marshal(map[string]proto.Message{"result": msg.pb})
b, err := tt.marshaler.Marshal(map[string]proto.Message{"result": msg.pb})
if err != nil {
t.Errorf("marshaler.Marshal() failed %v", err)
}
Expand Down
8 changes: 8 additions & 0 deletions runtime/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,11 @@ type Delimited interface {
// Delimiter returns the record separator for the stream.
Delimiter() []byte
}

// StreamContentType defines the streaming content type.
type StreamContentType interface {
// StreamContentType returns the content type for a stream. This shares the
// same behaviour as for `Marshaler.ContentType`, but is called, if present,
// in the case of a streamed response.
StreamContentType(v interface{}) string
}

0 comments on commit 371dddb

Please sign in to comment.