Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to TCP and UDP protocols #224

Open
wants to merge 45 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
a4ea276
separated unix socket logic from generic logic in the server class in…
Jun 9, 2021
534ba48
Separated logic between tasks and Sessions, the separation is only ap…
ThibaudCartegnie Jun 14, 2021
9da70fd
Fixed potential corruption :
ThibaudCartegnie Jun 14, 2021
0149aee
Separated Sessions into UnixSession and TcpSession
ThibaudCartegnie Jun 15, 2021
60f0efc
WIP : First draft of DarwinPacket struct
ThibaudCartegnie Jun 18, 2021
551209b
Finished integration of DarwinPacket in session
ThibaudCartegnie Jun 18, 2021
3c0be86
Modified DarwinPacket to be more OOP-like
ThibaudCartegnie Jun 21, 2021
e8426bf
Re-wired some things in the tasks
ThibaudCartegnie Jun 21, 2021
6072b7f
Put back structs in c header
ThibaudCartegnie Jun 22, 2021
f9b8b0f
Adapted all compiled filters
ThibaudCartegnie Jun 23, 2021
de6f710
Added parsing of tcp/udp
ThibaudCartegnie Jun 25, 2021
d957e01
Adapted TcpServer and Config for ip addresses
ThibaudCartegnie Jun 25, 2021
711dc9d
DarwinPacket: fixed issues with the serialization
ThibaudCartegnie Jun 29, 2021
3a46fe0
Separated NextFilterConnector in :
ThibaudCartegnie Jun 30, 2021
070660b
Moved all network related objects to network folder
ThibaudCartegnie Jun 30, 2021
3be3f48
Fixed Set of asio buffer in ANextFilterConnector
ThibaudCartegnie Jun 30, 2021
2436f5d
Fixed memory issues:
ThibaudCartegnie Jul 1, 2021
80955db
Fixed problem with the manager
ThibaudCartegnie Jul 1, 2021
55dfa07
Fixed manager (failed previous commit)
ThibaudCartegnie Jul 1, 2021
98a3039
RedisManager: modified Rate Limit connection logic
ThibaudCartegnie Jul 2, 2021
36e2ae0
Adapted ATask, Asession and DarwinPacket
ThibaudCartegnie Jul 6, 2021
b8cbf63
Adapted tests to TCP :
ThibaudCartegnie Jul 6, 2021
00b207d
Added tests for TCP connections
ThibaudCartegnie Jul 8, 2021
5139e2a
Added UDP protocol to darwin
ThibaudCartegnie Jul 16, 2021
79e7036
Merge branch 'dev' into support_tcp_udp
ThibaudCartegnie Jul 16, 2021
b8d24db
fixed few issues with the manager
ThibaudCartegnie Jul 20, 2021
9fabcd9
Added comments documentation
ThibaudCartegnie Jul 20, 2021
47e58b7
Fixed method names
ThibaudCartegnie Jul 21, 2021
3daf60c
Few modification of NextFilter handling
ThibaudCartegnie Jul 22, 2021
06f9cbf
Few fixes when passing information to next filters
ThibaudCartegnie Jul 25, 2021
8694a44
Added a tests for multi filters using the manager
ThibaudCartegnie Jul 26, 2021
7b61fdd
Fixed issue with UUIDs
ThibaudCartegnie Jul 27, 2021
aa1b53c
Fixed problems with next filters, cleaned tests
ThibaudCartegnie Jul 27, 2021
aa05f12
Modified config of next filter for easier configuration
ThibaudCartegnie Jul 27, 2021
b088029
Modified protocol.h
ThibaudCartegnie Jul 28, 2021
13f4ab3
Fixed clang/gcc warnings
ThibaudCartegnie Jul 29, 2021
b3478ed
Fixed compilation issues on hardenedBSD
ThibaudCartegnie Jul 29, 2021
15ef861
Fixed last issue with UDP NextFilterConnector
ThibaudCartegnie Jul 30, 2021
016d80f
Added Thread Pool Cpp dependency
ThibaudCartegnie Jul 30, 2021
2206178
Fixed github TBE comments
ThibaudCartegnie Jan 10, 2022
fd00830
Fixed last few issues with comments
ThibaudCartegnie Jan 28, 2022
2aab973
Merge branch 'dev' into support_tcp_udp
ThibaudCartegnie Jan 31, 2022
95cea44
Adapted dga for new usage of darwin api
ThibaudCartegnie Jan 31, 2022
1755c38
Adapted dga for darwin api (forgot file)
ThibaudCartegnie Jan 31, 2022
c6352dd
MANAGER:: Removed todo
ThibaudCartegnie Apr 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ if (NOT DEFINED FILTER)
CONTENT_INSPECTION
BUFFER
YARA
TEST
ThibaudCartegnie marked this conversation as resolved.
Show resolved Hide resolved
)
else (NOT DEFINED FILTER)
set(
Expand Down Expand Up @@ -72,12 +73,15 @@ set(

include_directories(
toolkit
toolkit/thread-pool-cpp/include/
samples/base
samples/base/network
samples/
${HIREDIS_INCLUDE_DIRS}
${Boost_INCLUDE_DIRS}
)

add_compile_options(-Wno-unknown-pragmas)
frikilax marked this conversation as resolved.
Show resolved Hide resolved

####################
# CORE SOURCES #
Expand All @@ -96,9 +100,22 @@ set(
samples/base/ThreadGroup.cpp samples/base/ThreadGroup.hpp
samples/base/AlertManager.cpp samples/base/AlertManager.hpp

samples/base/Server.cpp samples/base/Server.hpp
samples/base/DarwinPacket.cpp samples/base/DarwinPacket.hpp
samples/base/network/ANextFilterConnector.cpp samples/base/network/ANextFilterConnector.hpp
samples/base/network/UnixNextFilterConnector.cpp samples/base/network/UnixNextFilterConnector.hpp
samples/base/network/TcpNextFilterConnector.cpp samples/base/network/TcpNextFilterConnector.hpp
samples/base/network/UdpNextFilterConnector.cpp samples/base/network/UdpNextFilterConnector.hpp

samples/base/network/AServer.cpp samples/base/network/AServer.hpp
samples/base/network/UnixServer.cpp samples/base/network/UnixServer.hpp
samples/base/network/TcpServer.cpp samples/base/network/TcpServer.hpp
samples/base/network/UdpServer.cpp samples/base/network/UdpServer.hpp
samples/base/Manager.cpp samples/base/Manager.hpp
samples/base/Session.cpp samples/base/Session.hpp
samples/base/network/ASession.cpp samples/base/network/ASession.hpp
samples/base/network/UnixSession.cpp samples/base/network/UnixSession.hpp
samples/base/network/TcpSession.cpp samples/base/network/TcpSession.hpp
samples/base/network/UdpSession.cpp samples/base/network/UdpSession.hpp
samples/base/ATask.cpp samples/base/ATask.hpp

toolkit/Network.cpp toolkit/Network.hpp
toolkit/Validators.cpp toolkit/Validators.hpp
Expand Down
28 changes: 26 additions & 2 deletions manager/HeartBeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,46 @@
__doc__ = 'The heartbeat functions and main class'

from os import kill, access, F_OK

import socket

class HeartBeat:
"""
This class is in charge of the health check for the filters.
"""

@staticmethod
def check_socket(file):
def check_unix_socket(file):
try:
if access(file, F_OK):
return True
return False
except Exception:
return False

def check_network_socket(filter_network):
if filter_network['socket_type'] == 'UNIX':
return HeartBeat.check_unix_socket(filter_network['address_path'])

elif filter_network['socket_type'] == 'TCP':
# parse ip address field
splitted_address = filter_network['address_path'].rsplit(':', 1)
if '[' in splitted_address[0]: # ipv6 address
host_address = splitted_address[0][1:-1]
else:
host_address = splitted_address[0]
host_port = int(splitted_address[1])
#test connection to socket
if ':' in host_address: # ipv6
s = socket.socket(socket.AF_INET6)
else:
s = socket.socket(socket.AF_INET)
s.settimeout(2)
res = s.connect_ex((host_address,host_port))
s.close()
return res == 0
else: #No check for UDP
return True

@staticmethod
def check_pid_file(file):
"""
Expand Down
32 changes: 22 additions & 10 deletions manager/Services.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ def start_all(self):
self.stop_one(filter, no_lock=True)
self.clean_one(filter, no_lock=True)
else:
logger.debug("Linking UNIX sockets...")
filter['status'] = psutil.STATUS_RUNNING
call(['ln', '-s', filter['socket'], filter['socket_link']])
if filter['network']['socket_type'] == 'UNIX':
logger.debug("Linking UNIX sockets...")
filter['status'] = psutil.STATUS_RUNNING
call(['ln', '-s', filter['socket'], filter['socket_link']])

def rotate_logs_all(self):
"""
Expand Down Expand Up @@ -89,6 +90,14 @@ def _build_cmd(filt):

cmd = [filt['exec_path']]

# Flags MUST be before positional arguments as the parsing on HardenedBSD is not done on all the arguments
# On BSD getopt stops at the first argument which is not in the specified flags
if filt['network']['socket_type'] == 'UDP':
cmd.append('-u')

if filt['next_filter_network']['socket_type'] == 'UDP':
cmd.append('-v')

try:
if filt['log_level'] not in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "DEVELOPER"]:
logger.warning(
Expand All @@ -99,15 +108,15 @@ def _build_cmd(filt):
cmd.append(filt['log_level'])
except KeyError:
pass

cmd += [
filt['name'],
filt['socket'],
filt['network']['address_path'],
filt['config_file'],
filt['monitoring'],
filt['pid_file'],
filt['output'],
filt['next_filter_unix_socket'],
filt['next_filter_network']['address_path'],
str(filt['nb_thread']),
str(filt['cache_size']),
str(filt['threshold']),
Expand Down Expand Up @@ -369,6 +378,9 @@ def update(self, names, prefix, suffix):
prefix=prefix, suffix=suffix,
name=n, extension=new[n]['extension']
)
# TODO Handle TCP case?
if new[n]['network']['socket_type'] == 'UNIX':
new[n]['network']['address_path'] = new[n]['socket']

new[n]['monitoring'] = '{prefix}/sockets{suffix}/{name}_mon{extension}.sock'.format(
prefix=prefix, suffix=suffix,
Expand Down Expand Up @@ -531,7 +543,7 @@ def _wait_process_ready(content):
status = "Process not running"
continue

if not HeartBeat.check_socket(content['monitoring']):
if not HeartBeat.check_unix_socket(content['monitoring']):
status = "Monitoring socket not created"
continue

Expand All @@ -550,7 +562,7 @@ def _wait_process_ready(content):
continue

if "running" in resp:
if not HeartBeat.check_socket(content['socket']):
if not HeartBeat.check_network_socket(content['network']):
status = "Main socket not created"
continue
else:
Expand All @@ -574,10 +586,10 @@ def hb_one(self, filter):
if not HeartBeat.check_process(pid):
raise Exception('Process not running')

if not HeartBeat.check_socket(filter['socket']):
if not HeartBeat.check_network_socket(filter['network']):
raise Exception('Socket not accessible')

if not HeartBeat.check_socket(filter['monitoring']):
if not HeartBeat.check_unix_socket(filter['monitoring']):
raise Exception('Monitoring socket not accessible')

except Exception as e:
Expand Down
60 changes: 50 additions & 10 deletions manager/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,35 @@ def set_defaults(validator, properties, instance, schema):
"enum": ["NONE", "RAW", "LOG", "PARSED"],
"default": "NONE"
},
"next_filter": {"type": "string"},
"next_filter": {
"OneOf":[
{"type": "string"},
{"type": "object",
"properties": {
"socket_type":{
"type":"string",
"enum": ["NONE", "UNIX", "TCP", "UDP"],
"default":"NONE"
},
"address_path": {"type": "string"}
}
}
]
},
"threshold": {
"type": "integer",
"default": 100
},
"network": {
"type": "object",
"properties": {
"socket_type":{
"type":"string",
"enum": ["UNIX", "TCP", "UDP"],
"default":"UNIX"
},
"address_path": {"type": "string"}
}
}
},
"required": ["name", "exec_path", "config_file"],
Expand Down Expand Up @@ -268,19 +293,34 @@ def complete_filters_conf(prefix, suffix):
filter['failures'] = 0
filter['extension'] = '.1'
filter['pid_file'] = '{prefix}/run{suffix}/{filter}{extension}.pid'.format(prefix=prefix, suffix=suffix, filter=filter['name'], extension=filter['extension'])

if not filter['next_filter']:
filter['next_filter_unix_socket'] = 'no'
else:
filter['next_filter_unix_socket'] = '{prefix}/sockets{suffix}/{next_filter}.sock'.format(
prefix=prefix, suffix=suffix,
next_filter=filter['next_filter']
)


filter['socket'] = '{prefix}/sockets{suffix}/{filter}{extension}.sock'.format(prefix=prefix, suffix=suffix, filter=filter['name'], extension=filter['extension'])
filter['socket_link'] = '{prefix}/sockets{suffix}/{filter}.sock'.format(prefix=prefix, suffix=suffix, filter=filter['name'])

if 'network' not in filter:
filter['network'] = { "socket_type":"UNIX", "address_path":filter['socket'] }
elif filter['network']['socket_type'] == "UNIX":
filter['network']['address_path'] = filter['socket']

filter['monitoring'] = '{prefix}/sockets{suffix}/{filter}_mon{extension}.sock'.format(
prefix=prefix, suffix=suffix,
filter=filter['name'], extension=filter['extension']
)

# Next filter is setup with a second loop (we need network information already setup)
for _, filter in filters.items():
if 'next_filter' not in filter or filter['next_filter'] == '':
filter['next_filter_network'] = { "socket_type":"NONE", "address_path":"no" }
else:
if isinstance(filter['next_filter'], str):
#check other filters
modified = False
for _, other_filter in filters.items():
if filter['next_filter'] == other_filter['name']:
filter['next_filter_network'] = other_filter['network']
modified = True
if not modified:
raise ConfParseError("Filter '{}' had next_filter configured to '{}' but it was not found in the configuration"
.format(filter['name'], filter['next_filter']))
else:
filter['next_filter_network'] = filter['next_filter']
2 changes: 1 addition & 1 deletion manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def create_dirs(dirs, prefix, suffix):
for d in dirs:
path = '{}/{}{}'.format(prefix, d, suffix)
if not os.path.exists(path):
os.mkdir(path)
os.makedirs(path, exist_ok=True)

# Argparse
parser = argparse.ArgumentParser()
Expand Down
15 changes: 15 additions & 0 deletions samples/base/AGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@
#include "AGenerator.hpp"
#include "AlertManager.hpp"

AGenerator::AGenerator(size_t nb_task_threads): _threadPool{GetThreadPoolOptions(nb_task_threads)}
{
;
}

tp::ThreadPoolOptions AGenerator::GetThreadPoolOptions(size_t nb_task_threads){
auto opts = tp::ThreadPoolOptions();
opts.setThreadCount(nb_task_threads);
return opts;
}

bool AGenerator::Configure(std::string const& configFile, const std::size_t cache_size) {
DARWIN_LOGGER;
DARWIN_LOG_DEBUG("AGenerator:: Configuring...");
Expand Down Expand Up @@ -116,3 +127,7 @@ bool AGenerator::ReadConfig(const std::string &configuration_file_path) {
conf_file_stream.close();
return true;
}

tp::ThreadPool& AGenerator::GetTaskThreadPool() {
return this->_threadPool;
}
33 changes: 24 additions & 9 deletions samples/base/AGenerator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,27 @@
#include <iostream>
#include <fstream>
#include <string>
#include <thread_pool.hpp>

#include "Session.hpp"
#include "ATask.hpp"
#include "../toolkit/rapidjson/document.h"

class AGenerator {
public:
AGenerator() = default;
AGenerator(size_t nb_task_threads);
virtual ~AGenerator() = default;


// Methods to be implemented by the child
public:
/// Create a new task.
///
/// \param socket The Session's socket.
/// \param manager The Session manager.
/// \return A pointer to a new session.
virtual darwin::session_ptr_t
CreateTask(boost::asio::local::stream_protocol::socket& socket,
darwin::Manager& manager) noexcept = 0;
/// \brief Create a Task object
///
/// \param s a shred pointer to the sessions creating the task
frikilax marked this conversation as resolved.
Show resolved Hide resolved
/// \return std::shared_ptr<darwin::ATask> a shared pointer to the created task
///
virtual std::shared_ptr<darwin::ATask> CreateTask(darwin::session_ptr_t s) noexcept = 0;

virtual bool ConfigureNetworkObject(boost::asio::io_context &context);

protected:
Expand Down Expand Up @@ -59,6 +61,8 @@ class AGenerator {
Configure(std::string const& configFile,
const std::size_t cache_size) final;

virtual tp::ThreadPool& GetTaskThreadPool() final;

private:
/// Open and read the configuration file.
/// Try to load the json format of the configuration.
Expand All @@ -76,7 +80,18 @@ class AGenerator {
virtual bool ExtractCustomAlertingTags(const rapidjson::Document &configuration,
std::string& tags);

///
/// \brief Get the configuration object of the task thread pool
///
/// \param nb_task_threads number of workers (threads) to spawn for the tasks
/// \return tp::ThreadPoolOptions
///
static tp::ThreadPoolOptions GetThreadPoolOptions(size_t nb_task_threads);

protected:
std::shared_ptr<boost::compute::detail::lru_cache<xxh::hash64_t, unsigned int>> _cache; //!< The cache for already processed request
std::mutex _cache_mutex;

tp::ThreadPool _threadPool;

};
Loading