Skip to content

Commit

Permalink
Allow forwarder to forward events from files (#732)
Browse files Browse the repository at this point in the history
* Allow forwarder to forward events from files

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Update pkg/eventshub/forwarder/forwarder.go

Co-authored-by: Christoph Stäbler <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Christoph Stäbler <[email protected]>
  • Loading branch information
pierDipi and creydr authored May 31, 2024
1 parent fe1acf2 commit 2723f53
Showing 1 changed file with 78 additions and 0 deletions.
78 changes: 78 additions & 0 deletions pkg/eventshub/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package forwarder
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"time"

Expand Down Expand Up @@ -51,6 +53,9 @@ type Forwarder struct {
// Sink
Sink string

// FromFiles allows forwarding JSON-formatted events that are present on specified files (comma separated list of file paths)
FromFiles string

// EventLogs is the list of EventLogger implementors to vent observed events.
EventLogs *eventshub.EventLogs

Expand All @@ -69,6 +74,9 @@ type envConfig struct {

// Sink url for the message destination
Sink string `envconfig:"SINK" required:"true"`

// FromFiles allows forwarding JSON-formatted events that are present on specified files (comma separated list of file paths)
FromFiles string `envconfig:"FROM_FILES" required:"false"`
}

func NewFromEnv(ctx context.Context, eventLogs *eventshub.EventLogs, handlerFuncs []eventshub.HandlerFunc, clientOpts []eventshub.ClientOption) *Forwarder {
Expand All @@ -83,6 +91,7 @@ func NewFromEnv(ctx context.Context, eventLogs *eventshub.EventLogs, handlerFunc
Name: env.Name,
Namespace: env.Namespace,
Sink: env.Sink,
FromFiles: env.FromFiles,
EventLogs: eventLogs,
ctx: ctx,
handlerFuncs: handlerFuncs,
Expand All @@ -106,6 +115,11 @@ func (o *Forwarder) Start(ctx context.Context) error {
handler = dec(handler)
}

if o.FromFiles != "" {
o.forwardFromFiles()
return nil
}

server := &http.Server{Addr: ":8080", Handler: handler}

var err error
Expand Down Expand Up @@ -277,3 +291,67 @@ func (o *Forwarder) responseInfo(res *http.Response, event *cloudevents.Event) e
}
return responseInfo
}

func (o *Forwarder) forwardFromFile(f string) {
b, err := os.ReadFile(f)
if err != nil {
logging.FromContext(o.ctx).Fatalw("Failed to read the file",
zap.String("file", f),
zap.Error(err))
}

event := &cloudevents.Event{}
if err := json.Unmarshal(b, event); err != nil {
logging.FromContext(o.ctx).Fatalw("Failed to unmarshal the event",
zap.String("file", f),
zap.Error(err))
}

eventInfo := eventshub.EventInfo{
Error: err.Error(),
Event: event,
Observer: o.Name,
Origin: f,
Time: time.Now(),
Kind: eventshub.EventSent,
}

// Log the event that is being forwarded
if err := o.EventLogs.Vent(eventInfo); err != nil {
logging.FromContext(o.ctx).Fatalw("Failed to vent the event",
zap.String("file", f),
zap.Error(err))
}

req, err := http.NewRequest("POST", o.Sink, nil)
if err != nil {
logging.FromContext(o.ctx).Fatalw("Failed to create the client request",
zap.String("file", f),
zap.Error(err))
}
if err := cehttp.WriteRequest(context.Background(), cloudevents.ToMessage(event), req); err != nil {
logging.FromContext(o.ctx).Fatalw("Failed to write the client request",
zap.String("file", f),
zap.Error(err))
}

res, err := o.httpClient.Do(req)
if err != nil {
logging.FromContext(o.ctx).Fatalw("Failed to forward the event",
zap.String("file", f),
zap.Error(err))
}

// Vent the response info
if err := o.EventLogs.Vent(o.responseInfo(res, event)); err != nil {
logging.FromContext(o.ctx).Errorw("Failed to log response for forwarded event",
zap.String("file", f),
zap.Error(err))
}
}

func (o *Forwarder) forwardFromFiles() {
for _, f := range strings.Split(o.FromFiles, ",") {
o.forwardFromFile(f)
}
}

0 comments on commit 2723f53

Please sign in to comment.