merge asset_clust (#26)
Merge the Asset Clustering project on a public dataset into the GitHub morpheus-experimental repo

Authors:
  - Vem Avinash (https://github.com/avinashvem)
  - https://github.com/avem-nv

Approvers:
  - https://github.com/raykallen
  - https://github.com/gbatmaz

URL: #26
avinashvem authored Jan 23, 2023
1 parent af035ac commit 21e3917
Showing 12 changed files with 36,888 additions and 0 deletions.
70 changes: 70 additions & 0 deletions asset-clustering/README.md
@@ -0,0 +1,70 @@
## Asset Clustering using Windows Event Logs

### Use Case
Cluster assets into groups based on Windows Event Log data.

### Version
1.0

### Model Overview
The model is a clustering algorithm that assigns each host in the dataset to a cluster, based on features aggregated and derived from that host's Windows Event Logs.

### Model Architecture
Two clustering algorithms are available:
- DBSCAN (Density-Based Spatial Clustering of Applications with Noise)
- KMeans

Input features are derived from the Windows Event Logs by aggregating, for each host, various facets of login activity such as the types of logon events and the number of distinct usernames associated with that host.
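
As an illustration of the kind of per-host aggregation involved, the sketch below counts selected EventIDs per host. It is a simplified stand-in for the full feature engineering in `training-tuning-inference/data_preprocessing.py`; the column names (`LogHost`, `EventID`) follow the LANL WLS schema used there, and pandas is used for brevity even though the project itself works with cuDF.

```python
import pandas as pd

# Two of the count features described above, keyed by EventID.
EVENTID_CNTFEAT = {4624: 'total_logins_cnt', 4625: 'accnt_fail_logon_cnt'}

def aggregate_eventid_counts(wls: pd.DataFrame) -> pd.DataFrame:
    """Return one row per LogHost with a count column per EventID of interest."""
    subset = wls[wls['EventID'].isin(list(EVENTID_CNTFEAT))]
    counts = (subset.groupby(['LogHost', 'EventID'])
                    .size()
                    .unstack(fill_value=0))
    return counts.rename(columns=EVENTID_CNTFEAT)
```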

### Requirements
An environment based on __[RAPIDS](https://rapids.ai/pip.html)__ is required to run the scripts and the Python notebook provided. The additional requirements can then be installed into that environment via the supplementary requirements file:

```bash
pip install -r requirements.txt
```

### Training

#### Training data
In this project we use the publicly available __[**Unified Host and Network Data Set**](https://csr.lanl.gov/data/2017/)__ [1] from the Advanced Research in Cyber Systems group at Los Alamos National Laboratory (LANL) to demonstrate the various aspects involved in clustering the assets in a network.
The LANL dataset consists of netflow and Windows Event Log (WLS) files covering 90 days. This project focuses solely on the Windows Event Log files, which follow the naming convention wls_day-01.bz2, wls_day-02.bz2, ..., wls_day-90.bz2. For scale and quick reproducibility, training uses only the first ten days of data, i.e. wls_day-01.bz2, ..., wls_day-10.bz2; more data can easily be used by changing the input file suffix (refer to experiment.ipynb for more details). These ten days of data are pre-processed and the features aggregated per host. The resulting dataset contains 14044 hosts and is saved in datasets/host_agg_data_day-01_day-10.csv.
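
For instance, the pre-aggregated training set checked in under `datasets/` can be loaded directly; this is a minimal sketch that assumes the CSV's index column is named `LogHost`, as written by `data_preprocessing.py`.

```python
import cudf

# Per-host aggregated features for days 01 to 10 (14044 hosts).
host_df = cudf.read_csv('datasets/host_agg_data_day-01_day-10.csv').set_index('LogHost')
print(host_df.shape)
```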


#### Training parameters
The following parameters are used when training the DBSCAN model; an illustrative sketch follows the list:
- $\epsilon=0.0005$
- *Manhattan distance* as the metric, i.e. Minkowski distance with $p=1$.
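
A minimal training sketch with these settings is shown below. It uses scikit-learn's `DBSCAN` for illustration; the repository's `train.py` may differ in library choice, feature scaling and model persistence, so the paths and evaluation details here are assumptions rather than the project's exact pipeline.

```python
import pickle

import pandas as pd
from sklearn.cluster import DBSCAN
from sklearn.metrics import silhouette_score

# Load the aggregated per-host features (any feature scaling used by train.py
# is omitted in this sketch).
host_df = pd.read_csv('datasets/host_agg_data_day-01_day-10.csv').set_index('LogHost')
X = host_df.values

# eps and metric as described above: Manhattan distance, i.e. Minkowski with p=1.
dbscan = DBSCAN(eps=0.0005, metric='manhattan')
labels = dbscan.fit_predict(X)

# DBSCAN labels noise points as -1; report the cluster count and the silhouette
# score over the non-noise points (this evaluation choice is an assumption).
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
mask = labels != -1
score = silhouette_score(X[mask], labels[mask], metric='manhattan')
print('clusters found = {}, silhouette score = {:.3f}'.format(n_clusters, score))

# Persist the fitted model under a file name matching the one in this commit.
with open('models/dbscan_eps0.0005.pkl', 'wb') as f:
    pickle.dump(dbscan, f)
```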


#### Model accuracy
Clusters found = 9 (plus one cluster for the noisy samples)
Silhouette score = 0.975

#### Training script

To train the model, run the following commands from the working directory:
```bash
cd ${MORPHEUS_EXPERIMENTAL_ROOT}/asset-clustering/training-tuning-inference
# Run training script and save models
python train.py --model dbscan
```
This saves the trained model files under the `../models` directory. The inference script can then load these models for future inference runs.

### Inference Input

```bash
python inference.py --model dbscan
```
When the above command is executed, DBSCAN clustering is performed on the Windows Event Log data from days 11 to 15. This data is pre-processed and aggregated into a validation dataset, which can be found at datasets/host_agg_data_day-11_day-15.csv and contains a total of 12606 hosts. Inference can similarly be run using the KMeans clustering model:
```bash
python inference.py --model kmeans
```


### Inference Output
The 12606 hosts are clustered and the size of each cluster is printed to stdout.
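
The per-cluster counts could be produced along these lines; this is a minimal sketch assuming the pickled file holds a fitted KMeans estimator and that the validation CSV's columns match the training features, with any scaling applied during training omitted.

```python
import pickle

import pandas as pd

# Validation features for days 11 to 15 (12606 hosts).
val_df = pd.read_csv('datasets/host_agg_data_day-11_day-15.csv').set_index('LogHost')

# Assumes the pickle holds a fitted KMeans estimator (file name from this commit).
with open('models/kmeans_16clusts.pkl', 'rb') as f:
    kmeans = pickle.load(f)

labels = kmeans.predict(val_df.values)
print(pd.Series(labels).value_counts().sort_index())
```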

### Ethical considerations
N/A

### References
[1] M. Turcotte, A. Kent, and C. Hash, “Unified Host and Network Data Set”, in Data Science for Cyber-Security, November 2018, pp. 1-22.
8,110 changes: 8,110 additions & 0 deletions asset-clustering/datasets/host_agg_data_day-01_day-01.csv

14,047 changes: 14,047 additions & 0 deletions asset-clustering/datasets/host_agg_data_day-01_day-10.csv

12,611 changes: 12,611 additions & 0 deletions asset-clustering/datasets/host_agg_data_day-11_day-15.csv

Binary file added asset-clustering/models/dbscan_eps0.0005.pkl
Binary file added asset-clustering/models/kmeans_16clusts.pkl
220 changes: 220 additions & 0 deletions asset-clustering/training-tuning-inference/data_preprocessing.py
@@ -0,0 +1,220 @@
import bz2
import logging
import time
from collections import defaultdict
from itertools import chain

import click
import cudf
import numpy as np

from utils import (compute_diff_source_logon_cnt, compute_eventid_cnt,
                   compute_eventid_cnt_source, compute_logins_with_loghostuname,
                   compute_username_cnt, compute_username_domain_cnt,
                   get_fnames, logon_types, read_wls)

VALID_LOGON_TYPES = {0, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12}

# List of tuples (EventID, feature name), where a feature name denotes the
# frequency of the corresponding EventID, by asset, appearing in the LogHost field.
EVENTID_CNTFEAT = [
    (4624, 'total_logins_cnt'),
    (4625, 'accnt_fail_logon_cnt'),
    (4634, 'total_logoff_cnt'),
    (4647, 'total_user_initi_logoff_cnt'),
    (4648, 'logon_explicit_cred_frm_cnt'),
    (4672, 'spl_pvlgs'),
    (4776, 'domain_ctr_validate_cnt'),
    (4802, 'scrnsaver_invok_cnt'),
    (4803, 'scrnsaver_dismiss_cnt')]
# (4768, 'TGT_req_cnt'), (4769, 'TGS_req_cnt')
# 4768 & 4769 are not used since 100% of LogHost for 4768, 4769 is ActiveDirectory

# EVENTIDFORSOURCE_CNTFEAT & EVENTIDFORDEST_CNTFEAT are similar to EVENTID_CNTFEAT
# except that they correspond to the frequency of an EventID, by asset, appearing
# in the Source & Destination fields respectively.
EVENTIDFORSOURCE_CNTFEAT = [
    (4624, 'total_logins_src_cnt'),
    (4625, 'accnt_fail_logon_src_cnt'),
    (4768, 'TGT_req_src_cnt'),
    (4769, 'TGS_req_src_cnt'),
    (4776, 'domain_ctr_validate_src_cnt')
]
EVENTIDFORDEST_CNTFEAT = [(4648, 'logon_explicit_cred_to_cnt')]


def host_aggr(df, host, uniq_values_dict, count_cols):
    """
    Args:
        df: cudf DataFrame with the data read from a windows event logs file
        host: DataFrame of hosts seen so far, with aggregated features
        uniq_values_dict: Dictionary with (k,v) pairs being (field, dict_)
            where dict_ represents, for the given 'field', a dictionary of (k,v)
            pairs of (host, set of unique values in 'field' for that host)
        count_cols: List of features that represent counts
    Returns:
        host: Updated host with data from df
        uniq_values_dict: Updated uniq_values_dict with data from df
    """

    newhosts = set(df['LogHost'].to_pandas()).union(set(df['Source'].to_pandas()))
    newhosts = newhosts - set(host.index.to_pandas())
    newhosts.discard(None)

    frac_cols = ['uname_other_compacnt_login_frac', 'uname_that_compacnt_login_frac']
    newhost = cudf.DataFrame({'LogHost': newhosts}).set_index('LogHost')
    newhost[count_cols] = 0
    newhost[frac_cols] = 0.0

    if host.shape[0] == 0:
        host = newhost.copy()
    else:
        host = cudf.concat([host, newhost], axis=0)

    numrows = df.shape[0]
    # Drop rows where both Source and Destination are present (not NA), since
    # LogHost is meaningless in a directed authentication event with both fields set.
    df = df.loc[(df['Source'].isna()) | (df['Destination'].isna())]
    if numrows > df.shape[0]:
        logging.debug("Filtered rows where both Source & Destination are present")
        logging.debug("Removed {} rows".format(numrows - df.shape[0]))

    host = compute_logins_with_loghostuname(df, host, login_eventids=[4624])
    host = logon_types(df, host, VALID_LOGON_TYPES)
    host, uniq_values_dict = compute_diff_source_logon_cnt(df, host, uniq_values_dict)
    host, uniq_values_dict = compute_username_cnt(df, host, uniq_values_dict)
    host, uniq_values_dict = compute_username_domain_cnt(df, host, uniq_values_dict)

    for evid, ev_str in EVENTID_CNTFEAT:
        host = compute_eventid_cnt(df, evid, ev_str, host)

    for evid, ev_str in EVENTIDFORSOURCE_CNTFEAT:
        host = compute_eventid_cnt_source(df, evid, ev_str, host)

    host[count_cols] = host[count_cols].fillna(value=0, inplace=False)
    host['uname_other_compacnt_login_frac'] = host['uname_other_compacnt_login_cnt']/host['total_logins_cnt']
    host['uname_other_compacnt_login_frac'] = host['uname_other_compacnt_login_frac'].replace(np.inf, -1.)

    host['uname_that_compacnt_login_frac'] = host['uname_that_compacnt_login_cnt']/host['total_logins_cnt']
    host['uname_that_compacnt_login_frac'] = host['uname_that_compacnt_login_frac'].replace(np.inf, -1.)

    return host, uniq_values_dict


def initialize_hostdf():
    """
    Initializes and returns the following variables.
    Returns:
        host: empty cudf DataFrame representing assets/hosts
        uniq_values_dict: dictionary with fields for which unique values
            encountered need to be tracked
        count_cols: features that are counts; computed by counting the number
            of occurrences
    """

    count_cols = ['UserName_cnt', 'DomainName_cnt', 'Source_cnt']
    count_cols += [x[1] for x in chain(EVENTID_CNTFEAT, EVENTIDFORSOURCE_CNTFEAT, EVENTIDFORDEST_CNTFEAT)]
    count_cols += ['logon_type_{}'.format(int(x)) for x in VALID_LOGON_TYPES]
    count_cols += ['logon_type_frm_{}'.format(int(x)) for x in VALID_LOGON_TYPES]
    count_cols += ['uname_other_compacnt_login_cnt', 'uname_that_compacnt_login_cnt']

    host = cudf.DataFrame(columns=['LogHost']).set_index('LogHost')

    uniq_values_dict = {
        'Sources': defaultdict(set),
        'Unames': defaultdict(set),
        'UserDomains': defaultdict(set)
    }
    return host, uniq_values_dict, count_cols


def read_process_data(wls_files, readsize=1000000, max_lines=1e15):
    """
    Reads each file from the input list, iteratively block by block, does feature
    computation and aggregates derived features by host.
    Args:
        wls_files (list): List of windows event log files, compressed using bzip2
        readsize (int): Number of bytes of data read in each iteration
        max_lines (int): Maximum number of lines to read before breaking
    Returns:
        DataFrame with shape (number of hosts, number of derived features)
    """

    host_df, uniq_vals_dict, count_cols = initialize_hostdf()
    for wls_fname in wls_files:
        residue = b''
        decomp = bz2.BZ2Decompressor()
        total_lines, iter_, t0 = 0, 0, time.time()

        fi = open(wls_fname, 'rb')
        for data in iter(lambda: fi.read(readsize), b''):
            raw = residue + decomp.decompress(data)
            current_block = raw.split(b'\n')
            residue = current_block.pop()  # last line could be incomplete
            df_wls = read_wls(current_block, file_path=False)
            host_df, uniq_vals_dict = host_aggr(df_wls, host_df, uniq_vals_dict, count_cols)

            total_lines += len(current_block)/1000000
            iter_ += 1

            if iter_ % 10000 == 0:
                proc_speed = 1000.0*total_lines / (time.time() - t0)
                logging.info(
                    '{:.3f}M Lines, {:.2f}K/sec'.format(total_lines, proc_speed))
                logging.debug('host_df shape:{}'.format(host_df.shape))
            if total_lines*1e6 > max_lines:
                logging.info("Breaking for loop. total_lines={}>{}".format(total_lines, max_lines))
                break
        fi.close()
    return host_df


@click.command()
@click.option('--debug', is_flag=True)
@click.option('--data_range', default='day-01-day-01',
              help='Range of dates for which wls files need to be read and preprocessed. '
                   'For example, data_range=day-01-day-03 reads wls_day-01.bz2, wls_day-02.bz2 '
                   'and wls_day-03.bz2, preprocesses them and prepares a combined dataset.')
def run(**kwargs):
    global dataset_path
    debug_mode = kwargs['debug']
    logging.basicConfig(level=logging.DEBUG, datefmt='%m%d-%H%M',
                        format='%(asctime)s: %(message)s')
    dataset_path = '../datasets/'
    ipfile_suffix = kwargs['data_range']
    if debug_mode:
        max_lines = 5e6
        readsize = 32768*32
        opfile_suffix = '_{:d}Mlines'.format(int(max_lines / 1e6))
    else:
        max_lines = 1e15
        readsize = 32768*32*30
        opfile_suffix = '_' + ipfile_suffix
    logger_fname = 'logs/dataprocess_{}.log'.format(ipfile_suffix)
    fh = logging.FileHandler(filename=logger_fname, mode='a')
    fmt = logging.Formatter('%(asctime)s: %(message)s', datefmt='%m%d-%H%M')
    fh.setFormatter(fmt)
    logging.getLogger().addHandler(fh)
    print("Logging in {}".format(logger_fname))

    logging.info("DataProcess for WLS files {}. Read Size:{}MB\n\n".format(
        ipfile_suffix, readsize//2**20))
    wls_files = get_fnames(dataset_path, ipfile_suffix)
    host_df = read_process_data(wls_files, readsize, max_lines)
    logging.debug("Number of hosts:{}".format(host_df.shape[0]))
    host_df.to_csv(dataset_path + 'aggr/host_agg_data{}.csv'.format(opfile_suffix))


if __name__ == '__main__':
    run()