diff --git a/plugins/anomalydetection/README.md b/plugins/anomalydetection/README.md index c2673d4a..39c0996f 100644 --- a/plugins/anomalydetection/README.md +++ b/plugins/anomalydetection/README.md @@ -60,7 +60,9 @@ plugins: { "fields": "%container.id %proc.name %proc.aname[1] %proc.aname[2] %proc.aname[3] %proc.exepath %proc.tty %proc.vpgid.name %proc.sname", # execve, execveat - "event_codes": [293, 331] + "event_codes": [293, 331], + # optional config `reset_timer_ms`, resets the data structure every x milliseconds, here one hour as example + "reset_timer_ms": 3600000 }, { "fields": "%container.id %proc.name %proc.aname[1] %proc.aname[2] %proc.aname[3] %proc.exepath %proc.tty %proc.vpgid.name %proc.sname %fd.name", diff --git a/plugins/anomalydetection/src/num/cms.h b/plugins/anomalydetection/src/num/cms.h index 942175ae..5970124e 100644 --- a/plugins/anomalydetection/src/num/cms.h +++ b/plugins/anomalydetection/src/num/cms.h @@ -15,6 +15,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +#pragma once + #include "xxhash_ext.h" #include @@ -96,6 +98,15 @@ class cms } } + void reset() + { + // Reset data structure + for (uint64_t i = 0; i < d_; ++i) + { + std::fill(sketch[i].get(), sketch[i].get() + w_, static_cast(0)); + } + } + uint64_t hash_XXH3_seed(std::string value, uint64_t seed) const { // using https://raw.githubusercontent.com/Cyan4973/xxHash/v0.8.2/xxhash.h diff --git a/plugins/anomalydetection/src/plugin.cpp b/plugins/anomalydetection/src/plugin.cpp index 2ff02fbb..6b3d3541 100644 --- a/plugins/anomalydetection/src/plugin.cpp +++ b/plugins/anomalydetection/src/plugin.cpp @@ -99,6 +99,10 @@ falcosecurity::init_schema anomalydetection::get_init_schema() "type": "number", "description": "PPME event codes supported by Falco." } + }, + "reset_timer_ms": { + "type": "number", + "description": "The anomaly detection behavior profile timer, in milliseconds (ms), is used to reset the sketch counts." } }, "required": [ @@ -134,16 +138,17 @@ void anomalydetection::parse_init_config(nlohmann::json& config_json) // If used, config JSON schema enforces a minimum of 1 items and 2-d sub-arrays auto gamma_eps_pointer = nlohmann::json::json_pointer("/count_min_sketch/gamma_eps"); + m_gamma_eps.clear(); if (config_json.contains(gamma_eps_pointer) && config_json[gamma_eps_pointer].is_array()) { - int i = 0; + int n = 1; for (const auto& array : config_json[gamma_eps_pointer]) { if (array.is_array() && array.size() == 2) { std::vector sub_array = {array[0].get(), array[1].get()}; log_error("Count min sketch data structure number (" - + std::to_string(i+1) + ") loaded with gamma and eps values (" + + std::to_string(n) + ") loaded with gamma and eps values (" + std::to_string(sub_array[0]) + "," + std::to_string(sub_array[1]) + ") equivalent to sketch dimensions (" @@ -154,15 +159,16 @@ void anomalydetection::parse_init_config(nlohmann::json& config_json) + ") bytes of constant memory allocation on the heap"); m_gamma_eps.emplace_back(sub_array); } - i++; + n++; } } // If used, config JSON schema enforces a minimum of 1 items and 2-d sub-arrays auto rows_cols_pointer = nlohmann::json::json_pointer("/count_min_sketch/rows_cols"); + m_rows_cols.clear(); if (config_json.contains(rows_cols_pointer) && config_json[rows_cols_pointer].is_array()) { - int i = 0; + int n = 1; if (config_json.contains(gamma_eps_pointer) && config_json[gamma_eps_pointer].is_array()) { log_error("[Override Notice] Count min sketch data structures will be overriden with below settings as 'rows_cols' config overrides any previous setting"); @@ -173,7 +179,7 @@ void anomalydetection::parse_init_config(nlohmann::json& config_json) { std::vector sub_array = {array[0].get(), array[1].get()}; log_error("Count min sketch data structure number (" - + std::to_string(i+1) + ") loaded with d and w/buckets values (" + + std::to_string(n) + ") loaded with d and w/buckets values (" + std::to_string(sub_array[0]) + "," + std::to_string(sub_array[1]) + ") equivalent to sketch error probability and relative error tolerances (" @@ -184,7 +190,7 @@ void anomalydetection::parse_init_config(nlohmann::json& config_json) + ") bytes of constant memory allocation on the heap"); m_rows_cols.emplace_back(sub_array); } - i++; + n++; } } @@ -215,6 +221,9 @@ void anomalydetection::parse_init_config(nlohmann::json& config_json) supported_codes_fd_profile.end() ); + m_reset_timers.clear(); + m_behavior_profiles_fields.clear(); + m_behavior_profiles_event_codes.clear(); int n = 1; for (const auto& profile : behavior_profiles) { @@ -260,6 +269,21 @@ void anomalydetection::parse_init_config(nlohmann::json& config_json) } } } + if (profile.contains("reset_timer_ms")) + { + uint64_t interval = profile["reset_timer_ms"].get(); + if (interval > 100) + { + m_reset_timers.emplace_back(interval); + } else + { + m_reset_timers.emplace_back(uint64_t(0)); + } + log_error("Behavior profile number (" + std::to_string(n) + ") resets the counts to zero every (" + std::to_string(interval) + ") ms"); + } else + { + m_reset_timers.emplace_back(uint64_t(0)); + } m_behavior_profiles_fields.emplace_back(filter_check_fields); m_behavior_profiles_event_codes.emplace_back(std::move(codes)); n++; @@ -375,18 +399,20 @@ bool anomalydetection::init(falcosecurity::init_input& in) return false; } - ////////////////////////// + //////////////////// // Init sketches - ////////////////////////// + //////////////////// // Init the plugin managed state table holding the count min sketch estimates for each behavior profile + m_thread_manager.stop_threads(); // Important for reloading configs conditions + m_count_min_sketches.lock()->clear(); if (m_rows_cols.size() == m_n_sketches) { for (uint32_t i = 0; i < m_n_sketches; ++i) { uint64_t rows = m_rows_cols[i][0]; uint64_t cols = m_rows_cols[i][1]; - m_count_min_sketches.lock()->push_back(std::make_unique>(rows, cols)); + m_count_min_sketches.lock()->push_back(std::make_shared>(rows, cols)); } } else if (m_gamma_eps.size() == m_n_sketches && m_rows_cols.empty()) { @@ -394,13 +420,20 @@ bool anomalydetection::init(falcosecurity::init_input& in) { double gamma = m_gamma_eps[i][0]; double eps = m_gamma_eps[i][1]; - m_count_min_sketches.lock()->push_back(std::make_unique>(gamma, eps)); + m_count_min_sketches.lock()->push_back(std::make_shared>(gamma, eps)); } } else { return false; } + // Launch threads to periodically reset the data structures (if applicable) + m_thread_manager.m_stop_requested = false; + for (uint32_t i = 0; i < m_n_sketches; ++i) + { + m_thread_manager.start_periodic_count_min_sketch_reset_worker(i, (uint64_t)m_reset_timers[i], m_count_min_sketches); + } + return true; } diff --git a/plugins/anomalydetection/src/plugin.h b/plugins/anomalydetection/src/plugin.h index d7cdc272..abcd6ef7 100644 --- a/plugins/anomalydetection/src/plugin.h +++ b/plugins/anomalydetection/src/plugin.h @@ -21,6 +21,7 @@ limitations under the License. #include "plugin_consts.h" #include "plugin_utils.h" #include "plugin_mutex.h" +#include "plugin_thread_manager.h" #include "plugin_sinsp_filterchecks.h" #include @@ -45,6 +46,8 @@ struct sinsp_param class anomalydetection { public: + anomalydetection() : m_thread_manager() {} + // Keep this aligned with `get_fields` enum anomalydetection_fields { @@ -132,15 +135,19 @@ class anomalydetection private: + // Manages plugin side threads, such as resetting the count min sketch data structures + ThreadManager m_thread_manager; + bool m_count_min_sketch_enabled = false; uint32_t m_n_sketches = 0; std::vector> m_gamma_eps; std::vector> m_rows_cols; // If set supersedes m_gamma_eps std::vector> m_behavior_profiles_fields; std::vector> m_behavior_profiles_event_codes; + std::vector m_reset_timers; // Plugin managed state table - plugin_anomalydetection::Mutex>>> m_count_min_sketches; + plugin_anomalydetection::Mutex>>> m_count_min_sketches; // required; standard plugin API std::string m_lasterr; diff --git a/plugins/anomalydetection/src/plugin_thread_manager.h b/plugins/anomalydetection/src/plugin_thread_manager.h new file mode 100644 index 00000000..cf3d3f2e --- /dev/null +++ b/plugins/anomalydetection/src/plugin_thread_manager.h @@ -0,0 +1,117 @@ +// SPDX-License-Identifier: Apache-2.0 +/* +Copyright (C) 2024 The Falco Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + +#pragma once + +#include "num/cms.h" +#include "plugin_mutex.h" + +#include +#include +#include +#include +#include +#include + +class ThreadManager { +public: + ThreadManager() : m_stop_requested(false) {} + + ~ThreadManager() + { + stop_threads(); + } + + void stop_threads() + { + { + std::lock_guard lock(m_thread_mutex); + m_stop_requested = true; + } + + { + std::lock_guard lock(m_thread_mutex); + for (auto& t : m_threads) + { + if (t.joinable()) + { + t.join(); + } + } + m_threads.clear(); + } + } + + template + void start_periodic_count_min_sketch_reset_worker(int id, uint64_t interval_ms, plugin_anomalydetection::Mutex>>>& count_min_sketches) + { + if (interval_ms > 100) + { + auto worker = [id, interval_ms, &count_min_sketches, this]() { + periodic_count_min_sketch_reset_worker(id, interval_ms, count_min_sketches); + }; + + std::thread worker_thread(worker); + { + std::lock_guard lock(m_thread_mutex); + m_threads.push_back(std::move(worker_thread)); + } + } + } + std::atomic m_stop_requested; + +private: + std::vector m_threads; + std::mutex m_thread_mutex; + + template + void reset_sketches_worker(int id, plugin_anomalydetection::Mutex>>>& count_min_sketches) + { + auto sketches = count_min_sketches.lock(); + if (id >= 0 && id < sketches->size()) + { + auto& sketch_ptr = sketches->at(id); + if (sketch_ptr) + { + sketch_ptr->reset(); + } + } + } + + template + void periodic_count_min_sketch_reset_worker(int id, uint64_t interval_ms, plugin_anomalydetection::Mutex>>>& count_min_sketches) + { + std::chrono::milliseconds interval(interval_ms); + while (true) + { + std::this_thread::sleep_for(interval); + { + std::lock_guard lock(m_thread_mutex); + if (m_stop_requested) + break; + } + + try + { + reset_sketches_worker(id, count_min_sketches); + } catch (const std::exception& e) + { + } + } + } +}; diff --git a/plugins/anomalydetection/src/plugin_utils.cpp b/plugins/anomalydetection/src/plugin_utils.cpp index 87934245..9f9ef52e 100644 --- a/plugins/anomalydetection/src/plugin_utils.cpp +++ b/plugins/anomalydetection/src/plugin_utils.cpp @@ -17,6 +17,8 @@ limitations under the License. #include "plugin_utils.h" +#define SCAP_MAX_PATH_SIZE 1024 + static const filtercheck_field_info sinsp_filter_check_fields[] = { {PT_CHARBUF, EPF_ANOMALY_PLUGIN | EPF_NONE, PF_NA, "proc.exe", "First Argument", "The first command-line argument (i.e., argv[0]), typically the executable name or a custom string as specified by the user. It is primarily obtained from syscall arguments, truncated after 4096 bytes, or, as a fallback, by reading /proc/PID/cmdline, in which case it may be truncated after 1024 bytes. This field may differ from the last component of proc.exepath, reflecting how command invocation and execution paths can vary."}, diff --git a/plugins/anomalydetection/src/plugin_utils.h b/plugins/anomalydetection/src/plugin_utils.h index b3dcbad4..3b8abb30 100644 --- a/plugins/anomalydetection/src/plugin_utils.h +++ b/plugins/anomalydetection/src/plugin_utils.h @@ -25,8 +25,6 @@ limitations under the License. #include #include -#define SCAP_MAX_PATH_SIZE 1024 - typedef struct plugin_sinsp_filterchecks_field { plugin_sinsp_filterchecks::check_type id;