Skip to content

Commit

Permalink
[FEATURE] Add ability to filter on host online state (#95)
Browse files Browse the repository at this point in the history
* Remove broken 'ConnectionStatus' from hosts filters

* Add hook for hosts/get_online_state from falconpy

* Add support for filtering on device online state for describe_devices and get_device_ids

* typo fix

* Make pylint happy

* Implement online state as an enum and add unit tests for failure cases

* Re-formats Deprecation Message

* Adds a blank line

* Use an Optional[Union] type for online_state

* Resolves a quoting issues that caused the file to not parse

---------

Co-authored-by: Chris Hammond <[email protected]>
  • Loading branch information
kenoel and ChristopherHammond13 authored May 4, 2023
1 parent 3cf2e49 commit 274d03e
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 26 deletions.
31 changes: 31 additions & 0 deletions caracara/common/constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Common constants to be shared throughout Caracara."""
from enum import Enum, EnumMeta

FILTER_OPERATORS = {
"EQUAL": '',
Expand Down Expand Up @@ -30,3 +31,33 @@

#
DEFAULT_COMMENT = "This action was performed by the Caracara Python library."


# Device online states classes
class MetaEnum(EnumMeta):
"""Overrided class for the use of the in operator and to query for all valid enum values."""

def __init__(cls, *kwargs):
"""Store all possible values of the Enum subclass."""
cls.VALUES = [state.value for state in cls.__members__.values()]
super().__init__(kwargs)

def __contains__(cls: Enum, item: str) -> bool:
"""Override the __contains__ method to use the in operator with Enum subclasses."""
return item in cls.VALUES


class OnlineState(Enum, metaclass=MetaEnum):
"""
Falcon OnlineState class.
Enum class of valid device online states. Valid states are 'online', 'offline', and 'unknown'.
"""

ONLINE = "online"
OFFLINE = "offline"
UNKNOWN = "unknown"

def __eq__(self, item: str) -> bool:
"""Override the __eq__ method for strings to compare the OnlineState's string value."""
return str(self.value) == item
15 changes: 15 additions & 0 deletions caracara/common/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Caracara exceptions."""
from typing import Dict, List

from caracara.common.constants import OnlineState


class BaseCaracaraError(Exception):
"""Base exception class from which all other exceptions inherit."""
Expand Down Expand Up @@ -114,3 +116,16 @@ def __init__(self, arg_names: List[str] = None):
"message": arg_str
}]
super().__init__(self.errors)


class InvalidOnlineState(GenericAPIError):
"""The provided online state is invalid."""

def __init__(self, online_state_string):
"""Construct an instance of the InvalidOnlineState class."""
self.errors = [{
"code": 500,
"message": f"Invalid online state '{online_state_string}'. \
Expected one of {OnlineState.VALUES}."
}]
super().__init__(self.errors)
2 changes: 2 additions & 0 deletions caracara/modules/hosts/_data_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ def describe_state(
) -> Dict[str, Dict]:
"""Return a dictionary containing online state for devices matching the provided filter.
DEPRECATED. The describe_devices function now captures this functionality.
Arguments
---------
filters: FalconFilter or str, optional
Expand Down
88 changes: 88 additions & 0 deletions caracara/modules/hosts/_online_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""Caracara Hosts Module: host online state functions.
In order to avoid the main hosts.py file getting too unwieldly, this file contains
the implementations of the host online state query functions.
"""
# Disable the protected access rule because this file is an extension of the class in hosts.py.
# pylint: disable=protected-access
from __future__ import annotations
from typing import (
List,
TYPE_CHECKING,
)

from caracara.common.batching import batch_get_data
from caracara.common.constants import OnlineState
from caracara.common.exceptions import InvalidOnlineState

if TYPE_CHECKING:
from caracara.modules.hosts import HostsApiModule


def validate_online_state(
self: HostsApiModule,
online_state: OnlineState or str,
):
"""Raise an exception if the online_state_string is not a valid online state.
Arguments
---------
online_state: OnlineState or str, required
OnlineState or string to validate.
"""
if not isinstance(online_state, OnlineState) and online_state not in OnlineState:
raise InvalidOnlineState(online_state)
self.logger.debug(f"Validated online state {online_state}")


def get_online_state(
self: HostsApiModule,
device_ids: List[str],
) -> List[str]:
"""Return a dictionary containing online state details for every device specified by ID.
You should only use this endpoint if you already have a list of Device IDs / AIDs,
and want to retreive data about them directly. If you need to get device data via a
filter, and you do not yet have a list of Device IDs, you should consider using the
describe_devices() function.
Arguments
---------
device_ids: [str], required
A list of Falcon Device IDs.
Returns
-------
dict: A dictionary containing online state details for every device listed.
"""
self.logger.info("Obtaining online state data for %s devices", len(device_ids))
device_online_state_data = batch_get_data(device_ids, self.hosts_api.get_online_state)
return device_online_state_data


def filter_device_ids_by_online_state(
self: HostsApiModule,
device_ids: List[str],
online_state: OnlineState or str,
) -> List[str]:
"""Filter a list of device IDs by an online state.
Arguments
---------
device_ids: List[str]
Device IDs to filter against
online_state: OnlineState or str
Online state to filter device IDs on. Options are "online", "offline", "unknown"
Returns
-------
List[str]: A list of device IDs with the specified online state
"""
self.validate_online_state(online_state)

device_state_data = self.get_online_state(device_ids)

return list(filter(
lambda device_id: device_state_data[device_id]["state"] == online_state,
device_state_data,
))
58 changes: 54 additions & 4 deletions caracara/modules/hosts/hosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from typing import (
Dict,
List,
Union,
Optional,
)

from falconpy import (
Expand All @@ -23,6 +25,7 @@
)

from caracara.common.batching import batch_get_data
from caracara.common.constants import OnlineState
from caracara.common.exceptions import (
GenericAPIError,
)
Expand Down Expand Up @@ -93,6 +96,13 @@ def __init__(self, api_authentication: OAuth2):
unhide,
)

# Import functions to filter by device online state
from caracara.modules.hosts._online_state import (
get_online_state,
filter_device_ids_by_online_state,
validate_online_state,
)

# Import functions to create and modify device tags
from caracara.modules.hosts._tagging import (
_create_tag_list,
Expand All @@ -104,7 +114,11 @@ def __init__(self, api_authentication: OAuth2):
# Static methods have to be set within the class
_create_tag_list = staticmethod(_create_tag_list)

def _perform_action(self, action_name: str, device_ids: List[str]) -> Dict:
def _perform_action(
self,
action_name: str,
device_ids: List[str],
) -> Dict:
"""Perform the specified action against the list of targets."""
returned = self.hosts_api.perform_action(ids=device_ids, action_name=action_name)["body"]

Expand All @@ -114,25 +128,50 @@ def _perform_action(self, action_name: str, device_ids: List[str]) -> Dict:
return returned

@filter_string
def describe_devices(self, filters: FalconFilter or str = None) -> Dict[str, Dict]:
def describe_devices(
self,
filters: FalconFilter or str = None,
online_state: Optional[Union[OnlineState, str]] = None,
) -> Dict[str, Dict]:
"""Return a dictionary containing details for every device matching the provided filter.
Arguments
---------
filters: FalconFilter or str, optional
Filters to apply to the device search.
online_state: OnlineState or str, optional
Device online state to filter devices on. Options are "online", "offline", "unknown"
Returns
-------
dict: A dictionary containing details for every device discovered.
"""
self.logger.info("Describing devices according to the filter string %s", filters)
device_ids = self.get_device_ids(filters)

# Collect state data
device_state_data = self.get_online_state(device_ids)

# Filter by online state, if applicable.
if online_state is not None:
self.validate_online_state(online_state)
device_ids = list(filter(
lambda key: device_state_data[key]["state"] == online_state,
device_state_data,
))

device_data = self.get_device_data(device_ids)

# Enrich the results with the online state field
for device_id, data in device_data.items():
data["state"] = device_state_data[device_id]["state"]

return device_data

def get_device_data(self, device_ids: List[str]) -> Dict[str, Dict]:
def get_device_data(
self,
device_ids: List[str],
) -> Dict[str, Dict]:
"""Return a dictionary containing details for every device specified by ID.
You should only use this endpoint if you already have a list of Device IDs / AIDs,
Expand All @@ -154,13 +193,19 @@ def get_device_data(self, device_ids: List[str]) -> Dict[str, Dict]:
return device_data

@filter_string
def get_device_ids(self, filters: FalconFilter or str = None) -> List[str]:
def get_device_ids(
self,
filters: FalconFilter or str = None,
online_state: Optional[Union[OnlineState, str]] = None,
) -> List[str]:
"""Return a list of IDs (string) for every device in your Falcon tenant.
Arguments
---------
filters: FalconFilter or str, optional
Filters to apply to the device search.
online_state: OnlineState or str, optional
Device online state to filter devices on. Options are "online", "offline", "unknown"
Returns
-------
Expand All @@ -169,4 +214,9 @@ def get_device_ids(self, filters: FalconFilter or str = None) -> List[str]:
self.logger.info("Searching for device IDs using the filter string %s", filters)
func = partial(self.hosts_api.query_devices_by_filter_scroll, filter=filters)
id_list: List[str] = all_pages_token_offset(func=func, logger=self.logger)

# Filter by online state, if applicable.
if online_state is not None:
id_list = self.filter_device_ids_by_online_state(id_list, online_state=online_state)

return id_list
22 changes: 0 additions & 22 deletions caracara/modules/hosts/hosts_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,6 @@ class HostContainedFilterAttribute(FalconFilterAttribute):
restrict = True


class HostConnectionStatusFilterAttribute(FalconFilterAttribute):
"""
Filter by whether a host is connected to the Falcon cloud.
Current valid options are Online and Offline.
"""

name = "ConnectionStatus"
fql = "connection_status"
options = ["Online", "Offline"]
restrict = True

def example(self) -> str:
"""Show filter example."""
return (
"This filter allows you to search for systems based on whether Falcon assesses them "
"as being Online or Offline. Theoretically, all 'Online' systems should be available "
"to connect to via Real Time Response (RTR)."
)


class HostDomainFqlFilterAttribute(FalconFilterAttribute):
"""Filter by host AD domain."""

Expand Down Expand Up @@ -292,7 +271,6 @@ def example(self) -> str:

FILTER_ATTRIBUTES = [
HostContainedFilterAttribute,
HostConnectionStatusFilterAttribute,
HostDomainFqlFilterAttribute,
HostGroupIdFilterAttribute,
HostHostnameFilterAttribute,
Expand Down
Loading

0 comments on commit 274d03e

Please sign in to comment.