Skip to content

Commit

Permalink
Beta
Browse files Browse the repository at this point in the history
  • Loading branch information
lstyles committed Feb 28, 2020
1 parent 4bc6d08 commit e063017
Show file tree
Hide file tree
Showing 17 changed files with 615 additions and 358 deletions.
2 changes: 1 addition & 1 deletion _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ nsgflowlogsbeat:
ignore_older: 0

# Number of workers
message_processor_workers: 4
#workers: 4
33 changes: 21 additions & 12 deletions beater/Nsgflowlogsbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,31 @@ import (
"github.com/lstyles/nsgflowlogsbeat/nsgflowlogs"
)

// nsgflowlogsbeat configuration.
type nsgflowlogsbeat struct {
done chan struct{}
// Nsgflowlogsbeat is used to conform to the beat interface.
type Nsgflowlogsbeat struct {
config config.Config
done chan struct{}
}

// New creates an instance of nsgflowlogsbeat.
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {

c := config.DefaultConfig
if err := cfg.Unpack(&c); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
return nil, fmt.Errorf("Error reading configuration file: %v", err)
}

if c.ScanFrequency.Seconds() < 30 {
logp.Warn("Chosen interval of %s is not valid. Changing to default 30s", c.ScanFrequency.String())
c.ScanFrequency = 1 * time.Minute
c.ScanFrequency = 30 * time.Second
}

bt := &nsgflowlogsbeat{
logp.Debug("nsgflowlogsbeat", "Validating configuration")
if err := c.Validate(); err != nil {
panic(err)
}

bt := &Nsgflowlogsbeat{
done: make(chan struct{}),
config: c,
}
Expand All @@ -39,7 +45,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
}

// Run starts nsgflowlogsbeat.
func (bt *nsgflowlogsbeat) Run(b *beat.Beat) error {
func (bt *Nsgflowlogsbeat) Run(b *beat.Beat) error {
logp.Info("nsgflowlogsbeat is running! Hit CTRL-C to stop it.")

ticker := time.NewTicker(bt.config.ScanFrequency)
Expand All @@ -50,16 +56,19 @@ func (bt *nsgflowlogsbeat) Run(b *beat.Beat) error {
case <-ticker.C:
}

lh, err := nsgflowlogs.NewLogHarvester(&bt.config, b.Publisher)
lp, err := nsgflowlogs.NewLogProcessor(b, &bt.config, bt.done)
if err != nil {
logp.Error(err)
panic(err)
}

lh.ScanAndProcessUpdates()
lp.Process(bt.done)
}
}

// Stop stops nsgflowlogsbeat.
func (bt *nsgflowlogsbeat) Stop() {
close(bt.done)
func (bt *Nsgflowlogsbeat) Stop() {
logp.Info("Stopping nsgflowlogsbeat...")
if bt.done != nil {
close(bt.done)
}
}
11 changes: 11 additions & 0 deletions checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,15 @@ type Checkpoint struct {
RowKey string
ETag string
Index int64
Length int64
}

// NewCheckpoint returns a pointer to a Checkpoint that carries the given
// partition key and row key. ETag, Index and Length are left at their zero
// values ("", 0 and 0).
func NewCheckpoint(partitionKey, rowKey string) *Checkpoint {
	cp := new(Checkpoint)
	cp.PartitionKey = partitionKey
	cp.RowKey = rowKey
	return cp
}
40 changes: 25 additions & 15 deletions checkpoint/checkpoint_table.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package checkpoint

import (
"errors"
"fmt"
"strconv"
"time"
Expand Down Expand Up @@ -33,18 +32,6 @@ func NewCheckpointTable(accountName, accountKey, checkpointsTableName string, ti
"Creating new instance of checkpoint table",
)

if len(accountName) == 0 {
return nil, errors.New("account name is required")
}

if len(accountKey) == 0 {
return nil, errors.New("account key is required")
}

if len(checkpointsTableName) == 0 {
return nil, errors.New("checkpoints table name is required")
}

c := &Config{
accountName: accountName,
accountKey: accountKey,
Expand Down Expand Up @@ -95,9 +82,12 @@ func (ct *Table) GetCheckpoint(partitionKey, rowKey string) (*Checkpoint, error)
Index: index,
}
return r, nil
}
} else {
// Checkpoint doesn't exist, create and return
c := NewCheckpoint(partitionKey, rowKey)

return nil, nil
return c, nil
}
}

// CreateOrUpdateCheckpoint creates or updates checkpoint in checkpoints table
Expand All @@ -109,6 +99,7 @@ func (ct *Table) CreateOrUpdateCheckpoint(checkpoint *Checkpoint) error {
checkpoint.RowKey,
checkpoint.ETag,
checkpoint.Index,
checkpoint.Length,
)

tableBatch := ct.table.NewBatch()
Expand All @@ -122,6 +113,7 @@ func (ct *Table) CreateOrUpdateCheckpoint(checkpoint *Checkpoint) error {
m := make(map[string]interface{})
m["ETag"] = checkpoint.ETag
m["Index"] = checkpoint.Index
m["Length"] = checkpoint.Length

entity.Properties = m

Expand All @@ -134,6 +126,24 @@ func (ct *Table) CreateOrUpdateCheckpoint(checkpoint *Checkpoint) error {
return nil
}

// UpdateCheckpoint loads the checkpoint identified by partitionKey and rowKey,
// sets its ETag to etag and both its Index and Length to index, and writes it
// back with CreateOrUpdateCheckpoint.
//
// The method has no return value, so failures are logged rather than
// propagated: if the checkpoint cannot be loaded the update is abandoned, and
// an error from the write is logged.
func (ct *Table) UpdateCheckpoint(partitionKey, rowKey, etag string, index int64) {
	logp.Info("Updating checkpoint Partition Key: %s, Row Key: %s, ETag: %s, Index: %d", partitionKey, rowKey, etag, index)

	c, err := ct.GetCheckpoint(partitionKey, rowKey)
	if err != nil {
		// The returned checkpoint is not usable when GetCheckpoint fails;
		// continuing would risk dereferencing a nil pointer below.
		logp.Error(err)
		return
	}
	if c == nil {
		logp.Warn("No checkpoint returned for Partition Key: %s, Row Key: %s; skipping update", partitionKey, rowKey)
		return
	}
	if c.Index == 0 {
		logp.Warn("This should never happen")
	}

	c.ETag = etag
	c.Index = index
	// NOTE(review): Length is set to the same value as Index; confirm this is intended.
	c.Length = index

	if err := ct.CreateOrUpdateCheckpoint(c); err != nil {
		logp.Error(err)
	}
}

func (ct *Table) initialize() error {

config := *ct.config
Expand Down
36 changes: 33 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@

package config

import "time"
import (
"fmt"
"time"

"github.com/joeshaw/multierror"
)

// Config - all configuration settings
type Config struct {
Expand All @@ -14,7 +19,8 @@ type Config struct {
CheckpointsTableName string `config:"checkpoints_table_name"`
CheckpointsTableTimeout uint `config:"checkpoints_table_timeout"`
IgnoreOlder time.Duration `config:"ignore_older"`
MessageProcessorWorkers int `config:"message_processor_workers"`
Workers int `config:"workers"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
}

// DefaultConfig - default configuration settings
Expand All @@ -24,5 +30,29 @@ var DefaultConfig = Config{
CheckpointsTableName: "nsgflowlogsbeat_checkpoints",
CheckpointsTableTimeout: 15,
IgnoreOlder: 10 * time.Second,
MessageProcessorWorkers: 4,
Workers: 4,
ShutdownTimeout: 15 * time.Second,
}

// Validate checks that every mandatory setting is present. Each missing
// setting contributes one error, in the order: account name, account key,
// checkpoints table name, container name. The combined error is returned, or
// nil when nothing is missing.
func (cfg Config) Validate() error {
	checks := []struct {
		missing bool
		err     error
	}{
		{len(cfg.StorageAccountName) == 0, fmt.Errorf("account name is required")},
		{len(cfg.StorageAccountKey) == 0, fmt.Errorf("account key is required")},
		{len(cfg.CheckpointsTableName) == 0, fmt.Errorf("checkpoints table name is required")},
		{len(cfg.ContainerName) == 0, fmt.Errorf("container name is required")},
	}

	var errs multierror.Errors
	for _, check := range checks {
		if check.missing {
			errs = append(errs, check.err)
		}
	}

	return errs.Err()
}
56 changes: 56 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,59 @@
// +build !integration

package config

import (
"testing"

"github.com/elastic/beats/libbeat/common"
"github.com/stretchr/testify/assert"
)

// validationTestCase pairs a Config with the outcome expected from Validate.
type validationTestCase struct {
// config is the configuration whose Validate method is exercised.
config Config
// errMsg is the text the validation error must contain; an empty string
// means Validate is expected to return no error.
errMsg string
}

// run validates v.config and checks the outcome: no error when v.errMsg is
// empty, otherwise an error whose text contains v.errMsg.
func (v validationTestCase) run(t *testing.T) {
	err := v.config.Validate()

	if v.errMsg == "" {
		assert.NoError(t, err)
		return
	}

	if err == nil {
		t.Errorf("expected error with '%s'", v.errMsg)
		return
	}

	assert.Contains(t, err.Error(), v.errMsg)
}

// TestConfigValidate runs Validate against a Config with all four required
// settings populated and against an empty Config, checking the reported
// error text in each case.
func TestConfigValidate(t *testing.T) {
	// Top-level config
	complete := validationTestCase{
		config: Config{
			StorageAccountName:   "<storage_account_name>",
			StorageAccountKey:    "<storage_account_key>",
			ContainerName:        "<container_name>",
			CheckpointsTableName: "<checkpoints_table_name>",
		},
		errMsg: "", // No Error
	}

	empty := validationTestCase{
		config: Config{},
		errMsg: "4 errors: account name is required; account key is required; checkpoints table name is required; container name is required",
	}

	for _, tc := range []validationTestCase{complete, empty} {
		tc.run(t)
	}
}

// newConfig builds a *common.Config from the given map and panics if
// common.NewConfigFrom reports an error.
func newConfig(from map[string]interface{}) *common.Config {
	built, err := common.NewConfigFrom(from)
	if err == nil {
		return built
	}
	panic(err)
}
Empty file removed data/nsgflowlogsbeat.lock
Empty file.
89 changes: 89 additions & 0 deletions nsgflowlogs/acker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package nsgflowlogs

import (
"context"
"sync"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/atomic"
"github.com/elastic/beats/libbeat/logp"
"github.com/lstyles/nsgflowlogsbeat/checkpoint"
"github.com/lstyles/nsgflowlogsbeat/config"
)

// EventACKer tracks the number of events that were added but not yet
// acknowledged, and updates checkpoints when an acknowledged event marks the
// end of a batch.
type EventACKer struct {
// active counts events registered through Add and not yet removed by ACKCount.
active *atomic.Int
// wg is adjusted in step with active so that Wait can block until it reaches zero.
wg *sync.WaitGroup
// config is the configuration passed to NewEventACKer.
config *config.Config
// checkpointTable is the table ACKLastEvent sends checkpoint updates to.
checkpointTable *checkpoint.Table
}

// NewEventACKer creates a checkpoint table via checkpoint.NewCheckpointTable,
// using the storage account and checkpoint table settings from config, and
// returns an EventACKer with zero active events. An error from
// checkpoint.NewCheckpointTable is returned unchanged.
func NewEventACKer(config *config.Config) (*EventACKer, error) {
	table, err := checkpoint.NewCheckpointTable(
		config.StorageAccountName,
		config.StorageAccountKey,
		config.CheckpointsTableName,
		config.CheckpointsTableTimeout,
	)
	if err != nil {
		return nil, err
	}

	acker := &EventACKer{
		active:          atomic.NewInt(0),
		wg:              new(sync.WaitGroup),
		config:          config,
		checkpointTable: table,
	}
	return acker, nil
}

// ACKLastEvent receives callbacks from the publisher with the data attached
// to published events. For every event whose "batch_complete" field is true
// it updates the checkpoint identified by the event's "partitionKey" and
// "rowKey" with the event's "etag" and "index".
//
// Events that are not a common.MapStr, that lack any of those fields, or that
// carry them with an unexpected type are logged and skipped, so a malformed
// event cannot panic inside the callback.
func (a *EventACKer) ACKLastEvent(data []interface{}) {
	for _, d := range data {
		logp.Info("Acking event %v", d)

		msg, ok := d.(common.MapStr)
		if !ok {
			logp.Warn("Skipping ACK: event data has unexpected type %T", d)
			continue
		}

		rawComplete, err := msg.GetValue("batch_complete")
		if err != nil {
			logp.Error(err)
			continue
		}

		batchComplete, ok := rawComplete.(bool)
		if !ok {
			logp.Warn("Skipping ACK: batch_complete has unexpected type %T", rawComplete)
			continue
		}
		if !batchComplete {
			continue
		}

		logp.Info("Batch complete: %v", batchComplete)

		// field returns the value stored under key, or nil (after logging)
		// when the lookup fails; a nil value makes the checked type
		// assertions below report !ok.
		field := func(key string) interface{} {
			v, err := msg.GetValue(key)
			if err != nil {
				logp.Error(err)
				return nil
			}
			return v
		}

		index, okIndex := field("index").(int64)
		partitionKey, okPartition := field("partitionKey").(string)
		rowKey, okRow := field("rowKey").(string)
		etag, okETag := field("etag").(string)
		if !(okIndex && okPartition && okRow && okETag) {
			logp.Warn("Skipping checkpoint update: index, partitionKey, rowKey or etag is missing or has an unexpected type")
			continue
		}

		a.checkpointTable.UpdateCheckpoint(partitionKey, rowKey, etag, index)
	}
}

// ACKCount records that i events have been acknowledged: it logs i against
// the current number of active events, then lowers both the active counter
// and the wait group by i.
func (a *EventACKer) ACKCount(i int) {
	logp.Info("Acking %d out of %d active events.", i, a.Active())

	delta := -i
	a.active.Add(delta)
	a.wg.Add(delta)
}

// Wait blocks until either every active event has been ACKed (the wait group
// reaches zero) or ctx is done, whichever happens first.
func (a *EventACKer) Wait(ctx context.Context) {
	allACKed := make(chan struct{})
	go func() {
		// Closing the channel signals that the wait group has drained.
		defer close(allACKed)
		a.wg.Wait()
	}()

	select {
	case <-allACKed:
	case <-ctx.Done():
	}
}

// Add adds delta to the number of active events. The same delta is applied to
// the wait group so that Wait blocks until those events are ACKed.
func (a *EventACKer) Add(delta int) {
a.active.Add(delta)
a.wg.Add(delta)
}

// Active returns the current value of the active-event counter, i.e. the
// number of events added through Add and not yet removed by ACKCount.
func (a *EventACKer) Active() int {
return a.active.Load()
}
Loading

0 comments on commit e063017

Please sign in to comment.