Skip to content

Commit

Permalink
POST-147: cleaned up logging after locating error, added more logging…
Browse files Browse the repository at this point in the history
… for debugging.
  • Loading branch information
henrikoppen committed Nov 21, 2022
1 parent 9a64e9a commit 76687af
Showing 1 changed file with 5 additions and 38 deletions.
43 changes: 5 additions & 38 deletions go/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,28 +292,17 @@ type checkpointOrEvent struct {

// FetchEvents is a client-side implementation that queries the server and properly deserializes received data.
func (c Client) FetchEvents(ctx context.Context, cursors []Cursor, pageSizeHint int, r EventReceiver, headers ...string) error {
c.logger.WithField("event", "zeroeventhub.fetch_events_entered").Debug()
if len(cursors) == 0 {
c.logger.WithFields(logrus.Fields{
"event": "zeroeventhub.missing_cursor_error",
"stackTrace": errors.WithStack(ErrCursorsMissing).Error(),
}).WithError(ErrCursorsMissing).Debug()
return ErrCursorsMissing
}

c.logger.WithField("event", "zeroeventhub.new_request").Debug()
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/feed/v1", c.url), nil)
if err != nil {
c.logger.WithFields(logrus.Fields{
"event": "zeroeventhub.request_setup_error",
"stackTrace": errors.WithStack(err).Error(),
}).WithError(err).Debug()
return err
}

req = req.WithContext(ctx)

c.logger.WithField("event", "zeroeventhub.url_setup").Debug()
q := req.URL.Query()
q.Add("n", fmt.Sprintf("%d", c.partitionCount))
if pageSizeHint != DefaultPageSize {
Expand All @@ -327,36 +316,25 @@ func (c Client) FetchEvents(ctx context.Context, cursors []Cursor, pageSizeHint
}
req.URL.RawQuery = q.Encode()

c.logger.WithField("event", "zeroeventhub.request_processor").Debug()
c.logger.WithFields(logrus.Fields{
"event": "zeroeventhub.url_check",
"url": req.URL.String(),
}).WithContext(ctx).Debug()

if err := c.requestProcessor(req); err != nil {
c.logger.WithFields(logrus.Fields{
"event": "zeroeventhub.request_processor_error",
"stackTrace": errors.WithStack(err).Error(),
}).WithError(err).Debug()
return err
}

c.logger.WithField("event", "zeroeventhub.do_request").Debug()
res, err := c.httpClient.Do(req)
if err != nil {
c.logger.WithFields(logrus.Fields{
"event": "zeroeventhub.do_request_error",
"stackTrace": errors.WithStack(err).Error(),
}).WithError(err).Debug()
return err
}
defer func(body io.ReadCloser) {
_ = body.Close()
}(res.Body)

c.logger.WithField("event", "zeroeventhub.status_code_check").Debug()
if res.StatusCode/100 != 2 {
if all, err := io.ReadAll(res.Body); err != nil {
c.logger.WithFields(logrus.Fields{
"event": "zeroeventhub.read_body_err",
"statusCode": strconv.Itoa(res.StatusCode),
"stackTrace": errors.WithStack(err).Error(),
}).WithError(err).Debug()
return err
} else {
err = errors.New(string(all))
Expand All @@ -369,7 +347,6 @@ func (c Client) FetchEvents(ctx context.Context, cursors []Cursor, pageSizeHint
}
}

c.logger.WithField("event", "zeroeventhub.scanner_start").Debug()
scanner := bufio.NewScanner(res.Body)
for scanner.Scan() {
line := bytes.TrimSpace(scanner.Bytes())
Expand All @@ -384,23 +361,13 @@ func (c Client) FetchEvents(ctx context.Context, cursors []Cursor, pageSizeHint
}
if parsedLine.Cursor != "" {
// checkpoint
c.logger.WithField("event", "zeroeventhub.checkpoing_start").Debug()
if err := r.Checkpoint(parsedLine.PartitionId, parsedLine.Cursor); err != nil {
c.logger.WithFields(logrus.Fields{
"event": "zeroeventhub.checkpoint_error",
"stackTrace": errors.WithStack(err).Error(),
}).WithError(err).Debug()
return err
}

} else {
// event
c.logger.WithField("event", "zeroeventhub.event_start").Debug()
if err := r.Event(parsedLine.PartitionId, parsedLine.Headers, parsedLine.Data); err != nil {
c.logger.WithFields(logrus.Fields{
"event": "zeroeventhub.event_error",
"stackTrace": errors.WithStack(err).Error(),
}).WithError(err).Debug()
return err
}
}
Expand Down

0 comments on commit 76687af

Please sign in to comment.