Commit 586b385

Merge pull request #34 from airbnb/jacknagz-refactor-streampayload-class

Refactor StreamPayload, Parsers, Testing, and more

jacknagz authored Feb 14, 2017
2 parents c60e8cf + f1daa05

Showing 28 changed files with 1,030 additions and 626 deletions.
7 changes: 5 additions & 2 deletions .gitignore
@@ -1,7 +1,7 @@
 # Sphinx build directory
-build
+docs/build
 
-# PYC files
+# Compiled Python files
 *.pyc
 
 # Terraform files
@@ -14,3 +14,6 @@ Thumbs.db
 .DS_Store
 *.swp
 terminal.glue
+
+# nose coverage file
+.coverage
3 changes: 2 additions & 1 deletion .travis.yml
@@ -6,4 +6,5 @@ before_install:
 install:
   - pip install -r requirements.txt
 script:
-  - nosetests -v -s test/unit/
+  - ./test/scripts/unit_tests.sh
+  - ./test/scripts/integration_test_kinesis.sh
1 change: 1 addition & 0 deletions CONTRIBUTING.rst
@@ -42,6 +42,7 @@ When writing commit messages, make sure to prefix with one of the following tags
 [core]    # changes with core stream_alert classes used across both functions
 [testing] # changes with testing infrastructure or processes
 [setup]   # StreamAlert development setup changes
+[config]  # stream_alert config changes
 
 The first line of your commit message should be short. Use newlines to explain further::
22 changes: 11 additions & 11 deletions conf/sample_logs.json → conf/logs.json
@@ -1,19 +1,15 @@
-/*
-This is a sample! Copy and rename this file to `logs.json` in the same folder.
-Below you will find a sample log for each parser type.
-*/
 {
-  "json_log_name": {
+  "json_log": {
     "schema": {
       "name": "string",
-      "host": "integer",
+      "host": "string",
       "data": {
         "time": "string"
       }
     },
     "parser": "json"
   },
-  "syslog_log_name": {
+  "syslog_log": {
     "schema": {
       "timestamp": "string",
       "host": "string",
@@ -22,18 +18,22 @@
     },
     "parser": "syslog"
   },
-  "csv_log_name": {
+  "csv_log": {
     "schema": {
       "date": "string",
       "time": "integer",
       "host": "string",
-      "message": "string"
+      "message": "string",
+      "source": "string"
     },
     "parser": "csv",
     "hints": {
-      "message": ["*keyword*"]
+      "source": [
+        "cluster *"
+      ]
     }
   },
-  "kv_log_name": {
+  "kv_log": {
     "schema": {
       "type": "string",
       "msg": "string",
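
For context on the hint change above, here is a hypothetical raw line that would now classify as csv_log (values are invented, and hint matching is presumably shell-style wildcarding):

    from fnmatch import fnmatch

    # Hypothetical csv_log line: date, time, host, message, source.
    sample_csv_line = 'jan-01-2017,1487000000,test-host-2,host is down,cluster 5'
    date, time, host, message, source = sample_csv_line.split(',')

    # The hint moved from "message" to "source"; assuming shell-style
    # wildcard matching, "cluster 5" satisfies "cluster *" while a source
    # of, say, "web-1" would not.
    assert fnmatch(source, 'cluster *')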
29 changes: 0 additions & 29 deletions conf/sample_sources.json

This file was deleted.

24 changes: 24 additions & 0 deletions conf/sources.json
@@ -0,0 +1,24 @@
+{
+  "kinesis": {
+    "prefix_cluster1_stream_alert_kinesis": {
+      "logs": [
+        "json_log",
+        "syslog_log",
+        "kv_log",
+        "csv_log"
+      ]
+    },
+    "prefix_cluster2_stream_alert_kinesis": {
+      "logs": [
+        "json_log"
+      ]
+    }
+  },
+  "s3": {
+    "my-s3-bucket-id": {
+      "logs": [
+        "syslog_log"
+      ]
+    }
+  }
+}
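
For reference, a minimal sketch (assumed behavior, not the actual StreamClassifier code) of how map_source presumably uses sources.json: the Kinesis stream name or S3 bucket name keys into the config, and the "logs" list restricts which schemas the classifier will attempt for that source. The lookup_logs helper below is hypothetical:

    import json

    SOURCES = json.loads('''
    {
      "kinesis": {
        "prefix_cluster1_stream_alert_kinesis": {
          "logs": ["json_log", "syslog_log", "kv_log", "csv_log"]
        }
      },
      "s3": {
        "my-s3-bucket-id": {"logs": ["syslog_log"]}
      }
    }
    ''')

    def lookup_logs(record):
        """Return the whitelisted log types for one Lambda event record."""
        if 'kinesis' in record:
            # e.g. 'arn:aws:kinesis:us-east-1:123456789012:stream/<stream_name>'
            stream = record['eventSourceARN'].split('/')[-1]
            return SOURCES['kinesis'].get(stream, {}).get('logs')
        if 's3' in record:
            bucket = record['s3']['bucket']['name']
            return SOURCES['s3'].get(bucket, {}).get('logs')
        return None  # unknown service, so payload.valid_source stays False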
22 changes: 15 additions & 7 deletions main.py
@@ -18,7 +18,8 @@
 import os
 
 from stream_alert.config import load_config, load_env
-from stream_alert.classifier import StreamPayload, StreamPayloadHelpers
+from stream_alert.classifier import StreamPayload, StreamClassifier
+from stream_alert.pre_parsers import StreamPreParsers
 from stream_alert.rules_engine import StreamRules
 from stream_alert.sink import StreamSink
 
@@ -51,23 +52,30 @@ def handler(event, context):
 
     config = load_config()
     env = load_env(context)
-    # process_alerts(event['Records'])
+    alerts_to_send = []
 
+    # TODO(jack): Move this into classification
     for record in event.get('Records'):
-        payload = StreamPayload(raw_record=record, config=config, env=env)
-        payload.map_source()
+        payload = StreamPayload(raw_record=record)
+        classifier = StreamClassifier(config=config)
+        classifier.map_source(payload)
         # If the kinesis stream or s3 bucket is not in our config,
         # go onto the next record.
         if not payload.valid_source:
             continue
 
         if payload.service == 's3':
-            s3_file_lines = StreamPayloadHelpers.parse_s3_object(payload.raw_record)
+            s3_file_lines = StreamPreParsers.pre_parse_s3(payload.raw_record)
             for line in s3_file_lines:
                 data = line.rstrip()
                 payload.refresh_record(data)
-                payload.classify_record(data)
+                classifier.classify_record(payload, data)
+                process_alerts(payload, alerts_to_send)
 
         elif payload.service == 'kinesis':
-            data = StreamPayloadHelpers.pre_parse_kinesis(payload.raw_record)
-            payload.classify_record(data)
+            data = StreamPreParsers.pre_parse_kinesis(payload.raw_record)
+            classifier.classify_record(payload, data)
+            process_alerts(payload, alerts_to_send)
 
     if alerts_to_send:
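
The StreamPreParsers implementation itself is not part of this diff; as a rough sketch of what pre_parse_kinesis presumably does (Kinesis record data arrives base64-encoded in the Lambda event):

    import base64

    def pre_parse_kinesis(raw_record):
        # raw_record is one entry of event['Records']; Kinesis payloads are
        # base64-encoded, so decode back to the raw log line before
        # classification.
        return base64.b64decode(raw_record['kinesis']['data'])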
35 changes: 35 additions & 0 deletions rules/sample_rules.py
@@ -43,3 +43,38 @@ def invalid_subnet_rule(rec):
 def rule_func(rec):
     """Description"""
     return True
+
+
+@rule('sample_json_rule',
+      logs=['json_log'],
+      matchers=[],
+      outputs=['s3'])
+def sample_json_rule(rec):
+    return rec['host'] == 'test-host-1'
+
+
+@rule('sample_syslog_rule',
+      logs=['syslog_log'],
+      matchers=[],
+      outputs=['pagerduty'])
+def sample_syslog_rule(rec):
+    return rec['application'] == 'sudo'
+
+
+@rule('sample_csv_rule',
+      logs=['csv_log'],
+      matchers=[],
+      outputs=['s3'])
+def sample_csv_rule(rec):
+    return rec['host'] == 'test-host-2'
+
+
+@rule('sample_kv_rule',
+      logs=['kv_log'],
+      matchers=[],
+      outputs=['s3'])
+def sample_kv_rule(rec):
+    return (
+        rec['msg'] == 'fatal' and
+        rec['uid'] == 100
+    )
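
As an illustrative check (hypothetical record values, not from the repo's fixtures), a raw line such as type=start msg=fatal uid=100 would presumably be parsed into typed fields per the kv_log schema, with uid cast to an integer so sample_kv_rule's == 100 comparison holds:

    # The kv parser presumably yields typed fields; this record satisfies
    # sample_kv_rule's predicate.
    rec = {'type': 'start', 'msg': 'fatal', 'uid': 100}
    assert rec['msg'] == 'fatal' and rec['uid'] == 100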
2 changes: 1 addition & 1 deletion stream_alert/__init__.py
@@ -1 +1 @@
-__version__ = '1.0.0'
+__version__ = '1.1.0'