Skip to content

Commit

Permalink
Fix forwarder body buffering (#713)
Browse files Browse the repository at this point in the history
Currently, the forwarder is duplicating headers the tests
fail when testing components within an Istio mesh.

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored May 3, 2024
1 parent eee0b8a commit b2181ed
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions pkg/eventshub/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ limitations under the License.
package forwarder

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding"
"go.opencensus.io/trace"
"go.uber.org/zap"
Expand Down Expand Up @@ -127,10 +128,21 @@ func (o *Forwarder) ServeHTTP(writer http.ResponseWriter, request *http.Request)
requestCtx, span := trace.StartSpan(request.Context(), "eventshub-forwarder")
defer span.End()

body, err := io.ReadAll(request.Body)
if err != nil {
writer.WriteHeader(http.StatusInternalServerError)
logging.FromContext(o.ctx).Errorw("Failed to read request body", zap.Error(err))
return
}
_ = request.Body.Close()
request.Body = io.NopCloser(bytes.NewBuffer(body))

m := cloudeventshttp.NewMessageFromHttpRequest(request)
defer m.Finish(nil)

event, eventErr := cloudeventsbindings.ToEvent(context.TODO(), m)
request.Body = io.NopCloser(bytes.NewBuffer(body)) // reset body

receivedHeaders := make(http.Header)
for k, v := range request.Header {
if !strings.HasPrefix(k, "Ce-") {
Expand Down Expand Up @@ -174,11 +186,6 @@ func (o *Forwarder) ServeHTTP(writer http.ResponseWriter, request *http.Request)
}
req.URL = u

err = cehttp.WriteRequest(requestCtx, binding.ToMessage(event), req)
if err != nil {
logging.FromContext(o.ctx).Error("Cannot write the event to request: ", err)
}

eventString := "unknown"
if event != nil {
eventString = event.String()
Expand All @@ -202,7 +209,7 @@ func (o *Forwarder) ServeHTTP(writer http.ResponseWriter, request *http.Request)
}
}

writer.WriteHeader(http.StatusAccepted)
writer.WriteHeader(res.StatusCode)
}

func (o *Forwarder) sentInfo(event *cloudevents.Event, req *http.Request, err error) eventshub.EventInfo {
Expand Down

0 comments on commit b2181ed

Please sign in to comment.