Skip to content

Commit

Permalink
Fix stuck on startup (#81)
Browse files Browse the repository at this point in the history
* fix getting stuck during processing large pending queues

* updated dependencies
  • Loading branch information
nrvnrvn committed Jul 10, 2017
1 parent 3565f1b commit 95db868
Show file tree
Hide file tree
Showing 18 changed files with 380 additions and 253 deletions.
26 changes: 26 additions & 0 deletions beater/journalbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package beater

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -122,6 +123,26 @@ func (jb *Journalbeat) initJournal() error {
return nil
}

func (jb *Journalbeat) publishPending() error {
pending := map[string]common.MapStr{}
file, err := os.Open(jb.config.PendingQueue.File)
if err != nil {
return err
}
defer file.Close()

if err = json.NewDecoder(file).Decode(&pending); err != nil {
return err
}

logp.Info("Loaded %d events, trying to publish", len(pending))
for cursor, event := range pending {
jb.client.PublishEvent(event, publisher.Signal(&eventSignal{&eventReference{cursor, event}, jb.completed}), publisher.Guaranteed)
}

return nil
}

// New creates beater
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config := config.DefaultConfig
Expand Down Expand Up @@ -165,6 +186,11 @@ func (jb *Journalbeat) Run(b *beat.Beat) error {
go jb.writeCursorLoop()
}

// load the previously saved queue of unsent events and try to publish them if any
if err := jb.publishPending(); err != nil {
logp.Warn("could not read the pending queue: %s", err)
}

for rawEvent := range journal.Follow(jb.journal, jb.done) {
//convert sdjournal.JournalEntry to common.MapStr
event := MapStrFromJournalEntry(
Expand Down
21 changes: 0 additions & 21 deletions beater/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
)

// eventSignal implements the op.Signaler interface
Expand Down Expand Up @@ -86,17 +85,6 @@ func (jb *Journalbeat) managePendingQueueLoop() {
return os.Rename(tempFile.Name(), dest)
}

// load loads the map[string]common.MapStr from the JSON file on disk
load := func(source string, dest *map[string]common.MapStr) error {
file, err := os.Open(source)
if err != nil {
return err
}
defer file.Close()

return json.NewDecoder(file).Decode(dest)
}

// on exit fully consume both queues and flush to disk the pending queue
defer func() {
var wg sync.WaitGroup
Expand All @@ -123,15 +111,6 @@ func (jb *Journalbeat) managePendingQueueLoop() {
}
}()

// load the previously saved queue of unsent events and try to publish them if any
if err := load(jb.config.PendingQueue.File, &pending); err != nil {
logp.Warn("could not read the pending queue: %s", err)
}
logp.Info("Loaded %d events, trying to publish", len(pending))
for cursor, event := range pending {
jb.client.PublishEvent(event, publisher.Signal(&eventSignal{&eventReference{cursor, event}, jb.completed}), publisher.Guaranteed)
}

// flush the pending queue to disk periodically
tick := time.Tick(jb.config.PendingQueue.FlushPeriod)
for {
Expand Down
5 changes: 4 additions & 1 deletion vendor/github.com/Shopify/sarama/config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions vendor/github.com/Shopify/sarama/consumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions vendor/github.com/Shopify/sarama/fetch_request.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 18 additions & 2 deletions vendor/github.com/Shopify/sarama/message.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/davecgh/go-spew/spew/common.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/elastic/beats/libbeat/beat/version.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions vendor/github.com/garyburd/redigo/redis/reply.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 11 additions & 31 deletions vendor/github.com/nranchev/go-libGeoIP/libgeo.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 95db868

Please sign in to comment.