-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathprotocol.go
411 lines (375 loc) · 16.1 KB
/
protocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
// Copyright 2023-2024 Buf Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package vanguard
import (
"bytes"
"io"
"net/http"
"net/textproto"
"strconv"
"strings"
"time"
"connectrpc.com/connect"
"google.golang.org/protobuf/proto"
)
// envelopeLen is the length, in bytes, of an encoded envelope. It is
// the size of the envelopeBytes array type.
const envelopeLen = 5
// Protocol represents an on-the-wire protocol for RPCs. The named
// values are the ProtocolConnect, ProtocolGRPC, ProtocolGRPCWeb, and
// ProtocolREST constants.
type Protocol int
// The named protocols. Numbering starts at one (iota + 1), so the zero
// value of Protocol does not correspond to any named protocol.
const (
	// ProtocolConnect indicates the Connect protocol. This protocol supports
	// unary and streaming endpoints. However, bidirectional streams are only
	// supported when combined with HTTP/2.
	ProtocolConnect = Protocol(iota + 1)
	// ProtocolGRPC indicates the gRPC protocol. This protocol can only be
	// used in combination with HTTP/2. It supports unary and all kinds of
	// streaming endpoints.
	ProtocolGRPC
	// ProtocolGRPCWeb indicates the gRPC-Web protocol. This is a tweak of the
	// gRPC protocol to support HTTP 1.1. This protocol supports unary and
	// streaming endpoints. However, bidirectional streams are only supported
	// when combined with HTTP/2.
	ProtocolGRPCWeb
	// ProtocolREST indicates the REST+JSON protocol. This protocol often
	// requires non-trivial transformations between HTTP requests and responses
	// and Protobuf request and response messages.
	//
	// Only methods that have the google.api.http annotation can be invoked
	// with this protocol. The annotation defines the "shape" of the HTTP
	// request and response, such as the URI path, HTTP method, and how URI
	// path components, query string parameters, and an optional request
	// body are mapped to the Protobuf request message.
	//
	// This protocol only supports unary and server-stream endpoints.
	ProtocolREST
)
var (
	// allProtocols are all supported protocols in descending order of
	// preference. The first protocol is the default protocol.
	allProtocols = [...]Protocol{
		ProtocolConnect,
		ProtocolGRPC,
		ProtocolGRPCWeb,
		ProtocolREST,
	}

	// protocolToString maps each named protocol to its human-readable
	// name. It is consulted by Protocol.String; protocols missing from
	// this map are rendered by that method as their numeric value.
	protocolToString = map[Protocol]string{
		ProtocolConnect: "Connect",
		ProtocolGRPC: "gRPC",
		ProtocolGRPCWeb: "gRPC-Web",
		ProtocolREST: "REST",
	}
)
// String returns the human-readable name of the protocol as recorded
// in protocolToString. If p has no entry there, its numeric value is
// returned in decimal form instead.
func (p Protocol) String() string {
	if name, known := protocolToString[p]; known {
		return name
	}
	return strconv.Itoa(int(p))
}
// serverHandler returns the serverProtocolHandler that implements
// protocol p for the given operation. For the Connect protocol the
// handler depends on the operation's stream type: unary methods get
// the unary handler and all other stream types get the streaming
// handler. For every other protocol the operation is not consulted.
// It returns nil if p is not one of the named protocols.
func (p Protocol) serverHandler(op *operation) serverProtocolHandler {
	// Connect is the only protocol whose handler varies by stream type,
	// so deal with it first; op is only dereferenced on this path.
	if p == ProtocolConnect {
		if op.methodConf.streamType != connect.StreamTypeUnary {
			return connectStreamServerProtocol{}
		}
		return connectUnaryServerProtocol{}
	}
	switch p {
	case ProtocolGRPC:
		return grpcServerProtocol{}
	case ProtocolGRPCWeb:
		return grpcWebServerProtocol{}
	case ProtocolREST:
		return restServerProtocol{}
	}
	return nil
}
// clientProtocolHandler handles the protocol used by the client.
// This allows the middleware to understand the incoming request
// and to send valid responses to the client.
type clientProtocolHandler interface {
	// Returns the Protocol value associated with this handler.
	protocol() Protocol
	// NOTE(review): presumably reports whether this protocol can be used
	// by a client to invoke the given operation with the given stream
	// type -- confirm against implementations.
	acceptsStreamType(*operation, connect.StreamType) bool

	// Extracts relevant request metadata from the given headers to
	// determine the codec (aka sub-format), compression (aka encoding),
	// timeout, etc. The relevant headers are interpreted into the
	// returned requestMeta and also *removed* from the given headers.
	extractProtocolRequestHeaders(*operation, http.Header) (requestMeta, error)

	// TODO: The following two methods were meant to be agnostic as to whether
	// the protocol is a streaming protocol or a unary one. The operations
	// are split because a streaming protocol cannot change headers or the
	// status code from encodeEnd because headers and status have already
	// been written. This requires unary implementations to do extra
	// handling of errors in addProtocolResponseHeaders, awkwardly separated
	// from the handling in encodeEnd. Worse, if an unexpected error happens
	// in encodeEnd, it is too late to change status code or headers. We
	// could possibly combine these for unary-only protocols to make the
	// implementation simpler. If we do, we'd need a way to swap protocol
	// handlers -- so that a REST handler can swap itself out for a
	// unary- or streaming-specific implementation once the method is known
	// (for streaming upload/download endpoints or in future general support
	// for server streaming endpoints).

	// Encodes the given responseMeta as headers into the given target
	// headers. If provided, allowedCompression should be used instead
	// of meta.allowedCompression when adding "accept-encoding" headers.
	//
	// NOTE(review): the sentence above refers to an allowedCompression
	// argument and a meta.allowedCompression field, but this method has
	// no such parameter and responseMeta's field is named
	// acceptCompression. The sentence looks stale -- confirm and update.
	//
	// The return value is the status code that should be sent to the
	// client. If the status code written was anything other than
	// 200 OK, the given meta will include a responseEnd that has that
	// original code.
	//
	// Note that this method's responsibility is to decide the status
	// code and set headers. When meta.end is non-nil, encodeEnd will
	// also be called, which is where a response body and trailers
	// can be written.
	addProtocolResponseHeaders(meta responseMeta, target http.Header) int
	// Encodes the given final disposition of the RPC to the given
	// writer. It can also return any trailers to add to the response.
	// Some protocols may ignore the writer; some will return no
	// trailers.
	//
	// The given codec represents the sub-format that the client used
	// (which could be used, for example, to encode the error).
	//
	// NOTE(review): no codec is passed explicitly; presumably it is
	// reachable through op -- confirm.
	//
	// The wasInHeaders flag indicates that end was signalled in the
	// response headers. For some protocols, like gRPC and gRPC-Web,
	// this is the difference between a trailers-only response and a
	// normal response (where the end is signalled in the response
	// body or trailers, not headers). When this is true, the end was
	// also already provided to addProtocolResponseHeaders.
	encodeEnd(op *operation, end *responseEnd, writer io.Writer, wasInHeaders bool) http.Header

	// String returns a human-readable name/description of protocol.
	String() string
}
// clientProtocolAllowsGet is an optional interface implemented by
// clientProtocolHandler instances that can support the GET HTTP method.
type clientProtocolAllowsGet interface {
	// Reports whether GET requests are allowed for a method with the
	// given configuration.
	allowsGetRequests(*methodConfig) bool
}
// serverProtocolHandler handles the protocol used by the server.
// This allows the middleware to send a valid request to the server
// and understand the responses it sends.
type serverProtocolHandler interface {
	// Returns the Protocol value associated with this handler.
	protocol() Protocol

	// Encodes the given requestMeta as headers into the given target
	// headers. If non-nil, allowedCompression should be used instead
	// of meta.allowedCompression when adding "accept-encoding" headers.
	//
	// NOTE(review): the sentence above refers to an allowedCompression
	// argument and a meta.allowedCompression field, but this method has
	// no such parameter and requestMeta's field is named
	// acceptCompression. The sentence looks stale -- confirm and update.
	addProtocolRequestHeaders(meta requestMeta, target http.Header)

	// Returns the response metadata from the headers.
	//
	// If the response meta's end field is set (i.e. headers indicate RPC
	// is over), but the protocol needs to read the response body to
	// populate it, it should return a non-nil function as the second
	// returned value. This generally only occurs when the RPC fails and
	// the body includes error information. If the body includes response
	// message data, handlers should NOT set a non-nil end.
	//
	// If the headers include trailers (such as in the Connect unary
	// protocol), but the RPC isn't quite over because the message data
	// must still be read from the response body, the handler should
	// instead populate the pendingTrailers field of meta. Note that
	// this field is ignored if the end field is non-nil. So if the
	// end is set to non-nil, the handler should store trailers there.
	//
	// The returned function will receive the server's codec (optionally
	// used to encode other messages and could be used to decode the error
	// body), the body, and a pointer to the responseEnd which should
	// be populated with the details. If the response body was compressed,
	// it will be decompressed before it is provided to the given function.
	extractProtocolResponseHeaders(statusCode int, headers http.Header) (responseMeta, responseEndUnmarshaller, error)

	// Called at end of RPC if responseEnd has not been returned by
	// extractProtocolResponseHeaders or from an enveloped message
	// in the response body whose trailer bit is set.
	extractEndFromTrailers(*operation, http.Header) (responseEnd, error)

	// String returns a human-readable name/description of protocol.
	String() string
}
// responseEndUnmarshaller populates the given responseEnd by unmarshalling
// information from the given buffer. In case unmarshalling needs to know
// the server's codec, it is also provided as the first argument.
type responseEndUnmarshaller func(Codec, *bytes.Buffer, *responseEnd)
// clientProtocolEndMustBeInHeaders is an optional interface implemented
// by clientProtocolHandler instances to indicate if the end of an RPC
// must be indicated in response headers (not trailers or in the body).
// If a protocol handler does not implement this, it is assumed to be
// false.
type clientProtocolEndMustBeInHeaders interface {
	// Reports whether the end of an RPC must be indicated in the
	// response headers.
	endMustBeInHeaders() bool
}
// envelopedProtocolHandler is an optional interface implemented
// by clientProtocolHandler and serverProtocolHandler instances
// whose protocol uses an envelope around messages.
type envelopedProtocolHandler interface {
	// Decodes the given on-the-wire envelope bytes into the
	// protocol-agnostic envelope form, or returns an error.
	decodeEnvelope(envelopeBytes) (envelope, error)
	// Encodes the given protocol-agnostic envelope into its
	// on-the-wire bytes for this protocol.
	encodeEnvelope(envelope) envelopeBytes
}
// serverEnvelopedProtocolHandler is an optional interface implemented
// by serverProtocolHandler instances whose protocol uses an envelope
// around messages.
type serverEnvelopedProtocolHandler interface {
	envelopedProtocolHandler
	// If a stream includes an envelope with the trailer bit
	// set, this is called to parse the message contents. The
	// given reader will be decompressed (even if the envelope
	// had its compressed bit set).
	//
	// The given codec represents the sub-format used to send
	// the request to the server (which may be used to decode
	// the error).
	//
	// NOTE(review): the text above speaks of a "reader" and a "codec",
	// but the method receives a *bytes.Buffer and no codec argument.
	// Presumably the buffer holds the already-decompressed contents and
	// the codec is reachable through the operation -- confirm.
	decodeEndFromMessage(*operation, *bytes.Buffer) (responseEnd, error)
}
// requestLineBuilder is an optional interface implemented by
// serverProtocolHandler instances whose HTTP request line
// needs to be computed in a custom manner. By default (for
// protocols that do not implement this), the request line is
// "POST /<service>/<method>".
//
// This is necessary for REST and Connect GET requests, which
// can encode parts of the request data into the URI path
// or query string parameters.
type requestLineBuilder interface {
	// Returns true if the request message must be known in order
	// to compute the request line.
	requiresMessageToProvideRequestLine(*operation) bool
	// Computes the components of the request line (URL path, query
	// string parameters, and HTTP method) and also indicates if the
	// request will include a body or not. The body can be omitted
	// for requests where *all* request information is supplied in
	// the request line.
	requestLine(op *operation, req proto.Message) (urlPath, queryParams, method string, includeBody bool, err error)
}
// clientBodyPreparer is an optional interface implemented by
// clientProtocolHandler instances whose request messages may
// need to be assembled from sources other than just decoding
// the request or response body.
type clientBodyPreparer interface {
	// Returns true if the request message needs to be prepared.
	// If it can simply be read and decoded from the request body
	// then it does not need to be prepared. But if the message
	// data must be merged with parts of the request path or
	// query param (etc), it must return true.
	requestNeedsPrep(*operation) bool
	// Combines the given request body data with other info to
	// produce a request message. The given bytes represent the
	// uncompressed request body. The given message should be
	// populated if/when the method returns nil.
	prepareUnmarshalledRequest(op *operation, src []byte, target proto.Message) error
	// Returns true if the response message needs to be prepared.
	// If it can simply be encoded into the response body then it
	// does not need to be prepared. But if the message data must
	// be wrapped or some parts discarded, the method must return
	// true.
	responseNeedsPrep(*operation) bool
	// Produces the response body for the given message. The data
	// should be appended to the given slice (which will be empty
	// but have capacity to accept data) to reduce allocations.
	// The given headers may be updated, like if a message has
	// content that must go into headers (such as recording a
	// custom content-type for uses of google.api.HttpBody).
	prepareMarshalledResponse(op *operation, base []byte, src proto.Message, headers http.Header) ([]byte, error)
}
// serverBodyPreparer is an optional interface implemented by
// serverProtocolHandler instances whose request messages may
// need to be assembled from sources other than just decoding
// the request or response body.
type serverBodyPreparer interface {
	// These methods are reversed from clientBodyPreparer: for the
	// server side, we have a request message and must produce a
	// body; and we have a response body and must extract from that
	// a message.

	// Returns true if the request message needs to be prepared, in
	// which case prepareMarshalledRequest produces the request body.
	requestNeedsPrep(*operation) bool
	// Produces the request body for the given message, returning the
	// resulting bytes. The signature mirrors
	// clientBodyPreparer.prepareMarshalledResponse.
	prepareMarshalledRequest(op *operation, base []byte, src proto.Message, headers http.Header) ([]byte, error)
	// Returns true if the response message needs to be prepared, in
	// which case prepareUnmarshalledResponse extracts the message.
	responseNeedsPrep(*operation) bool
	// Extracts a response message from the given response body data,
	// populating the given target message. The signature mirrors
	// clientBodyPreparer.prepareUnmarshalledRequest.
	prepareUnmarshalledResponse(op *operation, src []byte, target proto.Message) error
}
// envelopeBytes is an array of bytes representing an encoded envelope.
// Its length is fixed at envelopeLen bytes.
type envelopeBytes [envelopeLen]byte
// envelope is an exploded representation of the 5-byte preamble that appears
// on the wire for enveloped protocols. This form is protocol-agnostic.
type envelope struct {
	// trailer is true if the envelope's trailer bit is set.
	trailer bool
	// compressed is true if the envelope's compressed bit is set.
	compressed bool
	// length is the length recorded in the envelope; presumably the size
	// in bytes of the message that follows it -- TODO confirm.
	length uint32
}
// requestMeta represents the metadata found in request headers that are
// protocol-specific.
type requestMeta struct {
	// timeout is the timeout for the RPC.
	timeout time.Duration
	// hasTimeout presumably indicates whether a timeout was supplied, in
	// which case the timeout field is meaningful -- TODO confirm.
	hasTimeout bool
	// codec is the name of the codec (aka sub-format).
	codec string
	// compression is the name of the compression (aka encoding).
	compression string
	// acceptCompression presumably lists the compression names that the
	// sender of the request will accept in the response -- TODO confirm.
	acceptCompression []string
}
// responseMeta represents the metadata found in response headers that are
// protocol-specific.
type responseMeta struct {
	// end is non-nil when the response headers indicate that the RPC
	// is over.
	end *responseEnd
	// codec is the name of the codec (aka sub-format).
	codec string
	// compression is the name of the compression (aka encoding).
	compression string
	// acceptCompression presumably lists the compression names that the
	// sender of the response will accept -- TODO confirm.
	acceptCompression []string
	// pendingTrailers holds trailers that arrived in the headers while
	// message data must still be read from the response body. It is
	// ignored when end is non-nil; in that case trailers belong in end.
	pendingTrailers http.Header
	// pendingTrailerKeys is presumably the set of header names that
	// make up pendingTrailers -- TODO confirm.
	pendingTrailerKeys headerKeys
}
// responseEnd is a protocol-agnostic representation of the disposition
// of an RPC.
type responseEnd struct {
	// err is the error with which the RPC ended; presumably nil when
	// the RPC succeeded -- TODO confirm.
	err *connect.Error
	// trailers are the trailers that accompany the end of the RPC.
	trailers http.Header
	// httpCode is only populated when the responseEnd source contained
	// such a code. This happens when the responseEnd comes from the
	// response headers, which include the status line. It can also
	// occur for REST streaming responses, where the final message may
	// include both gRPC and HTTP codes.
	httpCode int
	// For enveloping protocols where the end is in a special stream
	// payload, this will be true if that special payload was compressed.
	// This can be used by a protocol handler that also encodes the end
	// in a stream payload to decide whether to compress the final frame.
	wasCompressed bool
}
// headerKeys is a set of header names. Every name is converted to its
// canonical MIME header form before being stored or looked up, so keys
// that differ only in letter case are treated as the same entry.
type headerKeys map[string]struct{}

// add inserts the canonical form of key into the set.
func (k headerKeys) add(key string) {
	canonical := textproto.CanonicalMIMEHeaderKey(key)
	k[canonical] = struct{}{}
}

// contains reports whether the canonical form of key is in the set.
func (k headerKeys) contains(key string) bool {
	canonical := textproto.CanonicalMIMEHeaderKey(key)
	if _, found := k[canonical]; found {
		return true
	}
	return false
}
// parseMultiHeader parses headers that allow multiple values. It
// supports the values being supplied in a single header separated
// by commas, multiple headers, or a combination thereof.
//
// Surrounding whitespace is trimmed from every item. Items that are
// empty after trimming (for example the middle element of "gzip, ,br",
// or the remainder after a trailing comma) are dropped, so the result
// never contains empty strings. It returns nil if vals is empty.
func parseMultiHeader(vals []string) []string {
	if len(vals) == 0 {
		return nil
	}
	// Count an upper bound on the number of items so that the result
	// slice is allocated exactly once.
	var count int
	for _, val := range vals {
		count += strings.Count(val, ",") + 1
	}
	result := make([]string, 0, count)
	for _, val := range vals {
		// Peel off one comma-separated item per iteration; the final
		// iteration handles whatever follows the last comma.
		for more := true; more; {
			var item string
			item, val, more = strings.Cut(val, ",")
			// Trim before testing for emptiness; otherwise an item made
			// up only of whitespace would be appended as "".
			if item = strings.TrimSpace(item); item != "" {
				result = append(result, item)
			}
		}
	}
	return result
}