Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement unary HTTP calls with retry support #649

Merged
merged 5 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion connect_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,9 @@ func TestServer(t *testing.T) {
pingServer{checkMetadata: true},
)
errorWriter := connect.NewErrorWriter()
// Add some net/http middleware to the ping service so we can also exercise ErrorWriter.
// Add net/http middleware to the ping service to evaluate HTTP state.
mux.Handle(pingRoute, http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) {
// Exercise ErrorWriter for HTTP middleware errors.
if request.Header.Get(clientMiddlewareErrorHeader) != "" {
defer request.Body.Close()
if _, err := io.Copy(io.Discard, request.Body); err != nil {
Expand All @@ -449,6 +450,24 @@ func TestServer(t *testing.T) {
}
return
}
// Check Content-Length is set correctly.
switch request.URL.Path {
case pingv1connect.PingServicePingProcedure,
pingv1connect.PingServiceFailProcedure,
pingv1connect.PingServiceCountUpProcedure:
// Unary requests set Content-Length to the length of the request body.
if request.ContentLength < 0 {
t.Errorf("%s: expected Content-Length >= 0, got %d", request.URL.Path, request.ContentLength)
}
case pingv1connect.PingServiceSumProcedure,
pingv1connect.PingServiceCumSumProcedure:
// Streaming requests set Content-Length to -1 or 0 on empty requests.
if request.ContentLength > 0 {
emcfarlane marked this conversation as resolved.
Show resolved Hide resolved
t.Errorf("%s: expected Content-Length -1 or 0, got %d", request.URL.Path, request.ContentLength)
}
default:
t.Errorf("unexpected path %q", request.URL.Path)
}
pingHandler.ServeHTTP(response, request)
}))

Expand Down
187 changes: 144 additions & 43 deletions duplex_http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"net/http"
"net/url"
"sync"
"sync/atomic"
)

Expand All @@ -36,10 +37,8 @@ type duplexHTTPCall struct {
onRequestSend func(*http.Request)
validateResponse func(*http.Response) *Error

// We'll use a pipe as the request body. We hand the read side of the pipe to
// net/http, and we write to the write side (naturally). The two ends are
// safe to use concurrently.
requestBodyReader *io.PipeReader
// io.Pipe is used to implement the request body for client streaming calls.
// If the request is unary, requestBodyWriter is nil.
requestBodyWriter *io.PipeWriter

// requestSent ensures we only send the request once.
Expand All @@ -65,7 +64,6 @@ func newDuplexHTTPCall(
// Request. This ensures if a transport out of our control wants
// to mutate the req.URL, we don't feel the effects of it.
url = cloneURL(url)
pipeReader, pipeWriter := io.Pipe()

// This is mirroring what http.NewRequestContext did, but
// using an already parsed url.URL object, rather than a string
Expand All @@ -80,24 +78,35 @@ func newDuplexHTTPCall(
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Body: pipeReader,
Body: http.NoBody,
GetBody: getNoBody,
Host: url.Host,
}).WithContext(ctx)
return &duplexHTTPCall{
ctx: ctx,
httpClient: httpClient,
streamType: spec.StreamType,
requestBodyReader: pipeReader,
requestBodyWriter: pipeWriter,
request: request,
responseReady: make(chan struct{}),
ctx: ctx,
httpClient: httpClient,
streamType: spec.StreamType,
request: request,
responseReady: make(chan struct{}),
}
}

// Send sends a message to the server.
func (d *duplexHTTPCall) Send(payload messsagePayload) (int64, error) {
isFirst := d.ensureRequestMade()
// Before we send any data, check if the context has been canceled.
func (d *duplexHTTPCall) Send(payload messagePayload) (int64, error) {
if d.streamType&StreamTypeClient == 0 {
return d.sendUnary(payload)
}
isFirst := d.requestSent.CompareAndSwap(false, true)
if isFirst {
// This is the first time we're sending a message to the server.
// We need to send the request headers and start the request.
pipeReader, pipeWriter := io.Pipe()
d.requestBodyWriter = pipeWriter
d.request.Body = pipeReader
d.request.GetBody = nil // GetBody not supported for client streaming
d.request.ContentLength = -1
go d.makeRequest() // concurrent request
}
if err := d.ctx.Err(); err != nil {
return 0, wrapIfContextError(err)
}
Expand All @@ -113,18 +122,55 @@ func (d *duplexHTTPCall) Send(payload messsagePayload) (int64, error) {
// Signal that the stream is closed with the more-typical io.EOF instead of
// io.ErrClosedPipe. This makes it easier for protocol-specific wrappers to
// match grpc-go's behavior.
return bytesWritten, io.EOF
err = io.EOF
}
return bytesWritten, err
}

func (d *duplexHTTPCall) sendUnary(payload messagePayload) (int64, error) {
// Unary messages are sent as a single HTTP request. We don't need to use a
// pipe for the request body and we don't need to send headers separately.
if !d.requestSent.CompareAndSwap(false, true) {
return 0, fmt.Errorf("request already sent")
}
payloadLength := int64(payload.Len())
if payloadLength > 0 {
// Build the request body from the payload.
payloadBody := newPayloadCloser(payload)
d.request.Body = payloadBody
d.request.ContentLength = payloadLength
d.request.GetBody = func() (io.ReadCloser, error) {
if !payloadBody.Rewind() {
return nil, fmt.Errorf("payload cannot be retried")
}
return payloadBody, nil
}
// Release the payload ensuring that after Send returns the
// payload is safe to be reused. See [http.RoundTripper] for
// more details.
defer payloadBody.Release()
}
d.makeRequest() // synchronous request
if d.responseErr != nil {
// Check on response errors for context errors. Other errors are
// handled on read.
if err := d.ctx.Err(); err != nil {
return 0, wrapIfContextError(err)
}
}
return payloadLength, nil
}

// Close the request body. Callers *must* call CloseWrite before Read when
// using HTTP/1.x.
func (d *duplexHTTPCall) CloseWrite() error {
// Even if Write was never called, we need to make an HTTP request. This
// ensures that we've sent any headers to the server and that we have an HTTP
// response to read from.
d.ensureRequestMade()
if d.requestSent.CompareAndSwap(false, true) {
go d.makeRequest()
return nil
}
// The user calls CloseWrite to indicate that they're done sending data. It's
// safe to close the write side of the pipe while net/http is reading from
// it.
Expand All @@ -136,7 +182,10 @@ func (d *duplexHTTPCall) CloseWrite() error {
// forever. To make sure users don't have to worry about this, the generated
// code for unary, client streaming, and server streaming RPCs must call
// CloseWrite automatically rather than requiring the user to do it.
return d.requestBodyWriter.Close()
if d.requestBodyWriter != nil {
return d.requestBodyWriter.Close()
}
return d.request.Body.Close()
}

// Header returns the HTTP request headers.
Expand Down Expand Up @@ -171,9 +220,6 @@ func (d *duplexHTTPCall) Read(data []byte) (int, error) {
if err := d.ctx.Err(); err != nil {
return 0, wrapIfContextError(err)
}
if d.response == nil {
return 0, fmt.Errorf("nil response from %v", d.request.URL)
}
n, err := d.response.Body.Read(data)
return n, wrapIfRSTError(err)
}
Expand Down Expand Up @@ -233,17 +279,6 @@ func (d *duplexHTTPCall) BlockUntilResponseReady() error {
return d.responseErr
}

// ensureRequestMade sends the request headers and starts the response stream.
// It is not safe to call this concurrently. Write and CloseWrite call this but
// ensure that they're not called concurrently.
func (d *duplexHTTPCall) ensureRequestMade() (isFirst bool) {
if d.requestSent.CompareAndSwap(false, true) {
go d.makeRequest()
return true
}
return false
}

func (d *duplexHTTPCall) makeRequest() {
// This runs concurrently with Write and CloseWrite. Read and CloseRead wait
// on d.responseReady, so we can't race with them.
Expand All @@ -253,7 +288,6 @@ func (d *duplexHTTPCall) makeRequest() {
if host := d.request.Header.Get(headerHost); len(host) > 0 {
d.request.Host = host
}

if d.onRequestSend != nil {
d.onRequestSend(d.request)
}
Expand All @@ -272,15 +306,15 @@ func (d *duplexHTTPCall) makeRequest() {
err = NewError(CodeUnavailable, err)
}
d.responseErr = err
d.requestBodyWriter.Close()
_ = d.CloseWrite()
return
}
// We've got a response. We can now read from the response body.
// Closing the response body is delegated to the caller even on error.
d.response = response
if err := d.validateResponse(response); err != nil {
d.responseErr = err
d.requestBodyWriter.Close()
_ = d.CloseWrite()
return
}
if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 {
Expand All @@ -293,13 +327,18 @@ func (d *duplexHTTPCall) makeRequest() {
response.ProtoMajor,
response.ProtoMinor,
)
d.requestBodyWriter.Close()
_ = d.CloseWrite()
}
}

// messsagePayload is a sized and seekable message payload. The interface is
// implemented by [*bytes.Reader] and *envelope.
type messsagePayload interface {
// getNoBody is a GetBody function for http.NoBody.
func getNoBody() (io.ReadCloser, error) {
return http.NoBody, nil
}

// messagePayload is a sized and seekable message payload. The interface is
// implemented by [*bytes.Reader] and *envelope. Reads must be non-blocking.
type messagePayload interface {
io.Reader
io.WriterTo
io.Seeker
Expand All @@ -310,7 +349,7 @@ type messsagePayload interface {
// to the server.
type nopPayload struct{}

var _ messsagePayload = nopPayload{}
var _ messagePayload = nopPayload{}

func (nopPayload) Read([]byte) (int, error) {
return 0, io.EOF
Expand All @@ -328,7 +367,7 @@ func (nopPayload) Len() int {
// messageSender sends a message payload. The interface is implemented by
// [*duplexHTTPCall] and writeSender.
type messageSender interface {
Send(messsagePayload) (int64, error)
Send(messagePayload) (int64, error)
}

// writeSender is a sender that writes to an [io.Writer]. Useful for wrapping
Expand All @@ -339,7 +378,7 @@ type writeSender struct {

var _ messageSender = writeSender{}

func (w writeSender) Send(payload messsagePayload) (int64, error) {
func (w writeSender) Send(payload messagePayload) (int64, error) {
return payload.WriteTo(w.writer)
}

Expand All @@ -356,3 +395,65 @@ func cloneURL(oldURL *url.URL) *url.URL {
}
return newURL
}

// payloadCloser is an [io.ReadCloser] that wraps a messagePayload. It's used to
// implement the request body for unary calls. To safely reuse the buffer
// call Release after the response is received to ensure the payload is safe for
// reuse.
type payloadCloser struct {
mu sync.Mutex
payload messagePayload // nil after Release
}

func newPayloadCloser(payload messagePayload) *payloadCloser {
return &payloadCloser{
payload: payload,
}
}

// Read implements [io.Reader].
func (p *payloadCloser) Read(dst []byte) (readN int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.payload == nil {
return 0, io.EOF
}
return p.payload.Read(dst)
}

// WriteTo implements [io.WriterTo].
func (p *payloadCloser) WriteTo(dst io.Writer) (int64, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.payload == nil {
return 0, nil
}
return p.payload.WriteTo(dst)
}

// Close implements [io.Closer].
func (p *payloadCloser) Close() error {
return nil
}

// Rewind rewinds the payload to the beginning. It returns false if the
// payload has been discarded from a previous call to Release.
func (p *payloadCloser) Rewind() bool {
p.mu.Lock()
defer p.mu.Unlock()
if p.payload == nil {
return false
}
if _, err := p.payload.Seek(0, io.SeekStart); err != nil {
return false
}
return true
}

// Release discards the payload. After Release is called, the payload cannot be
// rewound and the payload is safe to reuse.
func (p *payloadCloser) Release() {
p.mu.Lock()
p.payload = nil
p.mu.Unlock()
}
Loading