|
| 1 | +import os |
| 2 | + |
| 3 | +from expects import equal, expect, start_with, contain |
| 4 | +from mamba import before, context, description, it |
| 5 | + |
| 6 | +from sdcclient import SdSecureClient |
| 7 | +from specs import be_successful_api_call |
| 8 | + |
| 9 | +with description("Custom Rules") as self: |
| 10 | + with before.each: |
| 11 | + self.client = SdSecureClient(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"), |
| 12 | + token=os.getenv("SDC_SECURE_TOKEN")) |
| 13 | + |
| 14 | + with context("when the custom rules file exists"): |
| 15 | + with it("can be retrieved"): |
| 16 | + ok, res = self.client.get_user_falco_rules() |
| 17 | + |
| 18 | + expect((ok, res)).to(be_successful_api_call) |
| 19 | + expect(res).to(start_with("####################\n# Your custom rules!\n####################\n")) |
| 20 | + |
| 21 | + with it("can push custom rules"): |
| 22 | + _, previous_rules = self.client.get_user_falco_rules() |
| 23 | + empty_rules = self.empty_falco_rules() |
| 24 | + custom_rules = self.user_falco_rules() |
| 25 | + |
| 26 | + ok, res = self.client.set_user_falco_rules(custom_rules) |
| 27 | + expect((ok, res)).to(be_successful_api_call) |
| 28 | + expect(res).to(equal(custom_rules)) |
| 29 | + |
| 30 | + ok, res = self.client.set_user_falco_rules(empty_rules) |
| 31 | + expect((ok, res)).to(be_successful_api_call) |
| 32 | + expect(res).to(equal(empty_rules)) |
| 33 | + |
| 34 | + ok, res = self.client.set_user_falco_rules(self.rules_without_header()) |
| 35 | + expect((ok, res)).to(be_successful_api_call) |
| 36 | + # The endpoint automatically fills the header for the user. |
| 37 | + expect(res).to(start_with("####################\n# Your custom rules!\n####################\n\n")) |
| 38 | + expect(res).to(contain(self.rules_without_header())) |
| 39 | + |
| 40 | + ok, res = self.client.set_user_falco_rules(previous_rules) |
| 41 | + expect((ok, res)).to(be_successful_api_call) |
| 42 | + expect(res).to(equal(previous_rules)) |
| 43 | + |
| 44 | + |
| 45 | + def user_falco_rules(self): |
| 46 | + with open("fixtures/custom_rules.yaml", "r") as f: |
| 47 | + return f.read() |
| 48 | + |
| 49 | + |
| 50 | + def empty_falco_rules(self): |
| 51 | + return """#################### |
| 52 | +# Your custom rules! |
| 53 | +#################### |
| 54 | +
|
| 55 | +# Add new rules, like this one |
| 56 | +# - rule: A shell is run in a container |
| 57 | +# desc: An event will trigger every time you run a shell in a container |
| 58 | +# condition: evt.type = execve and evt.dir=< and container.id != host and proc.name = bash |
| 59 | +# output: "Suspect shell run in container (user=%user.name %container.info shell=%proc.name parent=%proc.pname cmdline=%proc.cmdline)" |
| 60 | +# priority: ERROR |
| 61 | +# tags: [shell] |
| 62 | +
|
| 63 | +# Or override any rule, macro, or list from the Default Rules |
| 64 | +""" |
| 65 | + |
| 66 | + |
| 67 | + def rules_without_header(self): |
| 68 | + return """\ |
| 69 | +--- |
| 70 | +- rule: "Testing rule" |
| 71 | + desc: "Description" |
| 72 | + condition: "always_true" |
| 73 | + output: "Sample output" |
| 74 | + priority: "WARNING" |
| 75 | + tags: [] |
| 76 | + source: "syscall" |
| 77 | +""" |
0 commit comments