Skip to content

Commit

Permalink
feat: Add support for data export rules endpoint (#132)
Browse files Browse the repository at this point in the history
* feat: add missing API endpoint

Signed-off-by: Timothy MacDonald <[email protected]>

* feat: Add support for data export rules endpoint

Signed-off-by: Timothy MacDonald <[email protected]>

---------

Signed-off-by: Timothy MacDonald <[email protected]>
  • Loading branch information
tmac1973 authored Dec 19, 2023
1 parent 3aff0db commit 19a8ba8
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 3 deletions.
2 changes: 2 additions & 0 deletions laceworksdk/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from .v2.container_registries import ContainerRegistriesAPI
from .v2.contract_info import ContractInfoAPI
from .v2.datasources import DatasourcesAPI
from .v2.data_export_rules import DataExportRulesAPI
from .v2.entities import EntitiesAPI
from .v2.events import EventsAPI
from .v2.inventory import InventoryAPI
Expand Down Expand Up @@ -149,6 +150,7 @@ def __init__(self,
self.container_registries = ContainerRegistriesAPI(self._session)
self.contract_info = ContractInfoAPI(self._session)
self.datasources = DatasourcesAPI(self._session)
self.data_export_rules = DataExportRulesAPI(self._session)
self.entities = EntitiesAPI(self._session)
self.events = EventsAPI(self._session)
self.inventory = InventoryAPI(self._session)
Expand Down
3 changes: 2 additions & 1 deletion laceworksdk/api/v2/agent_access_tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ def __init__(self, session):
:return AgentAccessTokensAPI object.
"""

super().__init__(session, "AgentAccessTokens")


def create(self,
alias=None,
enabled=True,
Expand Down
123 changes: 123 additions & 0 deletions laceworksdk/api/v2/data_export_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# -*- coding: utf-8 -*-
"""
Lacework DataExportRules API wrapper.
"""

from laceworksdk.api.crud_endpoint import CrudEndpoint


class DataExportRulesAPI(CrudEndpoint):

def __init__(self, session):
"""
Initializes the DataExportRulesAPI object.
:param session: An instance of the HttpSession class
:return DataExportRulesAPI object.
"""

super().__init__(session, "DataExportRules")

def create(self,
type,
filters,
intg_guid_list,
**request_params):
"""
A method to create a new DataExportRules object.
Args:
type(str): A string representing the type of rule to be added.
filters(dict): A dictionary containing the name(string), description(string), enabled(bool), and
profile_version(list[string]) fields.
intg_guid_list(str): A list of strings representing the guids of the alert channels to use (s3 only).
request_params(any): Additional request parameters.
(provides support for parameters that may be added in the future)
Return:
response(json)
"""

return super().create(
filters=self._format_filters(filters),
type=type,
intg_guid_list=intg_guid_list,
**request_params
)

def get(self,
guid=None):
"""
A method to get DataExportRules objects.
Args:
guid(str): A string representing the object GUID.
Return:
response(json)
"""

return super().get(id=guid)

def get_by_guid(self,
guid):
"""
A method to get an DataExportRules object by GUID.
Args:
guid(str): A string representing the object GUID.
Return:
response(json)
"""

return self.get(guid=guid)

def update(self,
guid,
filters=None,
intg_guid_list=None,
type=None,
**request_params):
"""
A method to update an existing DataExportRules object.
Args:
guid(str): A string representing the object GUID.
type(str): A string representing the type of rule.
filters(dict): A dictionary containing the name(string), description(string), enabled(bool), and
profile_version(list[string]) fields.
intg_guid_list(str): A list of strings representing the guids of the alert channels to use (s3 only).
request_params(any): Additional request parameters.
(provides support for parameters that may be added in the future)
Return:
response(json)
"""

return super().update(
id=guid,
filters=self._format_filters(filters),
type=type,
intg_guid_list=intg_guid_list,
**request_params
)

def delete(self,
guid):
"""
A method to delete a DataExportRules object.
Args:
guid(str): A string representing the object GUID.
Return:
response(json)
"""

return super().delete(id=guid)
1 change: 1 addition & 0 deletions laceworksdk/api/v2/user_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from laceworksdk.api.base_endpoint import BaseEndpoint


class UserGroupsAPI(BaseEndpoint):
def __init__(self, session):
super().__init__(session, "UserGroups")
Expand Down
3 changes: 1 addition & 2 deletions laceworksdk/api/v2/vulnerability_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ def create(self,
**request_params
)

def get(self,
guid=None):
def get(self, guid=None):
"""
A method to get VulnerabilityExceptions objects.
Expand Down
19 changes: 19 additions & 0 deletions tests/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,25 @@ def email_alert_channel_guid(api):
alert_channel_guid = response["data"][0]["intgGuid"]
return alert_channel_guid

@pytest.fixture(scope="session")
def s3_alert_channel_guid(api):
response = api.alert_channels.search(
json={
"filters": [
{
"expression": "eq",
"field": "type",
"value": "AwsS3"
}
],
"returns": [
"intgGuid"
]
}
)
alert_channel_guid = response["data"][0]["intgGuid"]
return alert_channel_guid


@pytest.fixture(scope="session")
def aws_resource_group_guid(api):
Expand Down
48 changes: 48 additions & 0 deletions tests/api/v2/test_data_export_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
"""
Test suite for the community-developed Python SDK for interacting with Lacework APIs.
"""

import pytest

from laceworksdk.api.v2.data_export_rules import DataExportRulesAPI
from tests.api.test_crud_endpoint import CrudEndpoint


# Tests

@pytest.fixture(scope="module")
def api_object(api):
return api.data_export_rules


@pytest.fixture(scope="module")
def api_object_create_body(random_text, s3_alert_channel_guid):
return {
"type": "Dataexport",
"filters": {
"name": f"Test Data Export Rule {random_text}",
"description": f"Test Data Export Rule Description {random_text}",
"enabled": 1
},
"intg_guid_list": [s3_alert_channel_guid]
}


@pytest.fixture(scope="module")
def api_object_update_body(random_text):
return {
"filters": {
"name": f"Test Data Export Rule {random_text} (Updated)",
"enabled": False
}
}


class TestDataExportRules(CrudEndpoint):

OBJECT_ID_NAME = "mcGuid"
OBJECT_TYPE = DataExportRulesAPI

def test_api_get_by_guid(self, api_object):
self._get_object_classifier_test(api_object, "guid", self.OBJECT_ID_NAME)

0 comments on commit 19a8ba8

Please sign in to comment.