Skip to content

Commit

Permalink
Tidied up config
Browse files Browse the repository at this point in the history
  • Loading branch information
lstyles committed Feb 28, 2020
1 parent e063017 commit 5e42f05
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 34 deletions.
40 changes: 39 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,45 @@

Welcome to nsgflowlogsbeat, an Azure NSG Flow Logs shipper for Logstash and Elasticsearch.

## Getting ready
## Usage

Configuration:

```
############################# {Beat} ######################################
nsgflowlogsbeat:
# Defines how often storage account is scanned for changes
#scan_frequency: 30s
# Storage account name where NSG logs are stored
storage_account_name: '<storage account name>'
# Storage account key
storage_account_key: '<storage account key>'
# Name of the storage account container
#container_name: 'insights-logs-networksecuritygroupflowevent'
# Checkpoints table name
#checkpoints_table_name: 'nsgflowlogsbeat_checkpoints'
# Storage table operations timeout in seconds
#checkpoints_table_timeout: 15
# Ignores NSG logs older than specified time offset
#ignore_older: 10m
# Number of workers
#workers: 4
```

Running the beat:

```
./nsgflowlogsbeat -c nsgflowlogsbeat.yml -e
```

## Development

To set up a working development environment follow the official [beat developer guide](https://www.elastic.co/guide/en/beats/devguide/7.6/newbeat-getting-ready.html) and specifically [Setting Up Your Dev Environment](https://www.elastic.co/guide/en/beats/devguide/7.6/beats-contributing.html#setting-up-dev-environment).

Expand Down
16 changes: 9 additions & 7 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,21 @@ nsgflowlogsbeat:
#scan_frequency: 30s

# Storage account name where NSG logs are stored
storage_account_name: ''
storage_account_name: '<storage account name>'
# Storage account key
storage_account_key: ''
storage_account_key: '<storage account key>'

# Name of the storage account container
container_name: ''
#container_name: 'insights-logs-networksecuritygroupflowevent'

# Checkpoints table name
checkpoints_table_name: 'insights-logs-networksecuritygroupflowevent'
# Storage table operations timeout in MS
checkpoints_table_timeout: 15
#checkpoints_table_name: 'nsgflowlogsbeat_checkpoints'

# Storage table operations timeout in seconds
#checkpoints_table_timeout: 15

# Ignores NSG logs older than specified time offset
ignore_older: 0
#ignore_older: 10m

# Number of workers
#workers: 4
1 change: 1 addition & 0 deletions checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Checkpoint struct {
Length int64
}

// NewCheckpoint creates a new instance of Checkpoint
func NewCheckpoint(partitionKey, rowKey string) *Checkpoint {
return &Checkpoint{
PartitionKey: partitionKey,
Expand Down
26 changes: 16 additions & 10 deletions checkpoint/checkpoint_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,20 @@ func (ct *Table) GetCheckpoint(partitionKey, rowKey string) (*Checkpoint, error)
Index: index,
}
return r, nil
} else {
// Checkpoint doesn't exist, create and return
c := NewCheckpoint(partitionKey, rowKey)

return c, nil
}

// Checkpoint doesn't exist, create and return
c := NewCheckpoint(partitionKey, rowKey)

return c, nil
}

// CreateOrUpdateCheckpoint creates or updates checkpoint in checkpoints table
func (ct *Table) CreateOrUpdateCheckpoint(checkpoint *Checkpoint) error {

logp.Debug(
"checkpoint_table", "Creating or updating checkpoint. PartitionKey: %s, RowKey: %s, ETag: %s, Index: %v.",
"checkpoint_table",
"Creating or updating checkpoint. PartitionKey: %s, RowKey: %s, ETag: %s, Index: %v, Length: %v.",
checkpoint.PartitionKey,
checkpoint.RowKey,
checkpoint.ETag,
Expand Down Expand Up @@ -126,16 +127,21 @@ func (ct *Table) CreateOrUpdateCheckpoint(checkpoint *Checkpoint) error {
return nil
}

// UpdateCheckpoint updates etag and index on checkpoint identified by partition key and row key.
func (ct *Table) UpdateCheckpoint(partitionKey, rowKey, etag string, index int64) {

logp.Info("Updating checkpoint Partition Key: %s, Row Key: %s, ETag: %s, Index: %d", partitionKey, rowKey, etag, index)
logp.Debug(
"checkpoint_table",
"Updating checkpoint. Partition Key: %s, Row Key: %s, ETag: %s, Index: %d",
partitionKey,
rowKey,
etag,
index,
)
c, err := ct.GetCheckpoint(partitionKey, rowKey)
if err != nil {
logp.Error(err)
}
if c.Index == 0 {
logp.Warn("This should never happen")
}

c.ETag = etag
c.Index = index
Expand Down
4 changes: 1 addition & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type Config struct {
CheckpointsTableTimeout uint `config:"checkpoints_table_timeout"`
IgnoreOlder time.Duration `config:"ignore_older"`
Workers int `config:"workers"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
}

// DefaultConfig - default configuration settings
Expand All @@ -29,9 +28,8 @@ var DefaultConfig = Config{
ContainerName: "insights-logs-networksecuritygroupflowevent",
CheckpointsTableName: "nsgflowlogsbeat_checkpoints",
CheckpointsTableTimeout: 15,
IgnoreOlder: 10 * time.Second,
IgnoreOlder: 10 * time.Minute,
Workers: 4,
ShutdownTimeout: 15 * time.Second,
}

// Validate validates the configuration and returns an error describing all problems or nil if there are none
Expand Down
11 changes: 5 additions & 6 deletions nsgflowlogs/log_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewLogProcessor(b *beat.Beat, c *config.Config, done chan struct{}) (*LogPr

func (lp *LogProcessor) Process(done chan struct{}) {

logp.Info("Processing")
logp.Info("Processing NSG flow logs")

// Get list of blobs to process
var wg sync.WaitGroup
Expand Down Expand Up @@ -120,7 +120,6 @@ func (lp *LogProcessor) ScanForUpdatedBlobs(wg *sync.WaitGroup) {
for i, blob := range *blobsToProcess {
i++

logp.Info("Getting checkpoint for blob %s %s %d", blob.PartitionKey, blob.RowKey, i)
c, err := lp.checkpointTable.GetCheckpoint(blob.PartitionKey, blob.RowKey)
if err != nil {
logp.Error(err)
Expand All @@ -131,7 +130,7 @@ func (lp *LogProcessor) ScanForUpdatedBlobs(wg *sync.WaitGroup) {

c.Length = blob.Length
if finishFile {
c.Index = blob.Length
logp.Info("This happened...")
}

logp.Info("Blob length is %d. Updating checkpoint", c.Length)
Expand All @@ -141,9 +140,9 @@ func (lp *LogProcessor) ScanForUpdatedBlobs(wg *sync.WaitGroup) {
continue
}

if finishFile {
continue
}
//if finishFile {
// continue
//}

if blob.ETag == c.ETag {
logp.Info("Blob ETag hasn't changed. Skipping.")
Expand Down
16 changes: 9 additions & 7 deletions nsgflowlogsbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,21 @@ nsgflowlogsbeat:
#scan_frequency: 30s

# Storage account name where NSG logs are stored
storage_account_name: ''
storage_account_name: '<storage account name>'
# Storage account key
storage_account_key: ''
storage_account_key: '<storage account key>'

# Name of the storage account container
container_name: ''
#container_name: 'insights-logs-networksecuritygroupflowevent'

# Checkpoints table name
checkpoints_table_name: 'insights-logs-networksecuritygroupflowevent'
# Storage table operations timeout in MS
checkpoints_table_timeout: 15
#checkpoints_table_name: 'nsgflowlogsbeat_checkpoints'

# Storage table operations timeout in seconds
#checkpoints_table_timeout: 15

# Ignores NSG logs older than specified time offset
ignore_older: 0
#ignore_older: 10m

# Number of workers
#workers: 4
Expand Down

0 comments on commit 5e42f05

Please sign in to comment.