Skip to content

Commit

Permalink
Nftables truncate comments that exceed nftables 128-character limits.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 502678862
  • Loading branch information
TheLinuxGuy authored and Capirca Team committed Jan 17, 2023
1 parent 8fedd8c commit 5d10361
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 14 deletions.
24 changes: 24 additions & 0 deletions capirca/lib/aclgenerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,3 +597,27 @@ def WrapWords(textlist, size, joiner='\n'):
# avoid empty comment lines
rval.extend(x.strip() for x in textlist[index].strip().split(joiner) if x)
return rval

def TruncateWords(input_text, char_limit):
"""Shorten text strings to not exceed a specified limit.
This function also removes any line breaks.
Args:
input_text: list or individual string values.
char_limit: size limit of resulting string element.
Returns:
truncated single string element within double quotes.
"""
CHAR_LIMIT = 126

if isinstance(input_text, list):
# handle multiple comments. Remove newline.
sanitized_list = []
for i in input_text:
sanitized_list.append(i.replace('\n', ''))
comment = ' '.join(sanitized_list)
else:
comment = input_text
return '"' + comment[:char_limit] + '"'
22 changes: 15 additions & 7 deletions capirca/lib/nftables.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ def PortsAndProtocols(self, address_family, protocol, src_ports, dst_ports,
Returns:
list of statements related to ports and protocols.
"""

def PortStatement(protocol, source, destination):
Expand Down Expand Up @@ -454,14 +453,14 @@ def RulesetGenerator(self, term):
Returns:
list of strings. Representing a ruleset for later formatting.
"""
term_ruleset = []

# COMMENT handling.
if self.verbose:
for line in self.term.comment:
term_ruleset.append('comment "%s"' % line)
term_ruleset.append(
'comment ' + aclgenerator.TruncateWords(
self.term.comment, Nftables.COMMENT_CHAR_LIMIT))
# OPTIONS / LOGGING / COUNTERS
opt = self._OptionsHandler(term)
# STATEMENT VERDICT / ACTION.
Expand Down Expand Up @@ -565,6 +564,8 @@ class Nftables(aclgenerator.ACLGenerator):
'expiration',
])

COMMENT_CHAR_LIMIT = 126

_AF_MAP = {'inet': (4,), 'inet6': (6,), 'mixed': (4, 6)}
# Below mapping converts capirca HEADER native to nftables table.
# In Nftables 'inet' contains both IPv4 and IPv6 addresses and rules.
Expand Down Expand Up @@ -790,9 +791,16 @@ def __str__(self):
# base chain header and contents.
nft_config.append(TabSpacer(4, 'chain %s {' % item))
if base_chain_dict[item]['comment']:
# Handle multi-line comments
for comment in base_chain_dict[item]['comment']:
nft_config.append(TabSpacer(8, 'comment "%s"' % comment))
# Due to Nftables limits on comments, we handle this twice.
# First time we comment it out so .nft file is human-readable.
nft_config.append(
TabSpacer(8, '#' + ' '.join(base_chain_dict[item]['comment'])))
# Second time so 'nft list ruleset' keeps the comment in memory.
nft_config.append(
TabSpacer(
8, 'comment ' +
aclgenerator.TruncateWords(
base_chain_dict[item]['comment'], self.COMMENT_CHAR_LIMIT)))
nft_config.append(
TabSpacer(
8, 'type filter hook %s priority %s; policy %s;' %
Expand Down
15 changes: 14 additions & 1 deletion tests/lib/aclgenerator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Unittest for ACL rendering module."""

from absl.testing import absltest
from absl.testing import parameterized
from unittest import mock

from capirca.lib import aclgenerator
Expand Down Expand Up @@ -87,12 +88,24 @@ def _TranslatePolicy(self, pol, exp_info):
pass


class ACLGeneratorTest(absltest.TestCase):
class ACLGeneratorTest(parameterized.TestCase):

def setUp(self):
super().setUp()
self.naming = mock.create_autospec(naming.Naming)

# pylint: disable=line-too-long,g-inconsistent-quotes
@parameterized.parameters(([
'There is something very long about this comment that',
'will require it to be truncated in order for nftables',
'binary to be able to load the rulesets.'], '"There is something very long about this comment that will require it to be truncated in order for nftables binary to be able t"'),
(['some comment description', 'second comment item \nNewline'], '"some comment description second comment item Newline"'), ('a string comment', '"a string comment"'))
def testTruncateWords(self, input_data, expected_output):
result = aclgenerator.TruncateWords(
input_data, 126)
self.assertEqual(result, expected_output)
# pylint: disable=line-too-long

def testEstablishedNostate(self):
# When using "nostate" filter and a term with "option:: established"
# have any protocol other than TCP and/or UDP should raise error.
Expand Down
10 changes: 4 additions & 6 deletions tests/lib/nftables_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,7 @@ def testDuplicateTerm(self):
self.naming)
with self.assertRaises(nftables.TermError):
nftables.Nftables.__init__(
nftables.Nftables.__new__(nftables.Nftables), pol,
EXP_INFO)
nftables.Nftables.__new__(nftables.Nftables), pol, EXP_INFO)

@parameterized.parameters(([(80, 80)], '80'), ([(1024, 65535)], '1024-65535'),
([], ''))
Expand Down Expand Up @@ -346,8 +345,7 @@ def testBadHeader(self, case):
pol = policy.ParsePolicy(header + GOOD_TERM_1, self.naming)
with self.assertRaises(nftables.HeaderError):
nftables.Nftables.__init__(
nftables.Nftables.__new__(nftables.Nftables), pol,
EXP_INFO)
nftables.Nftables.__new__(nftables.Nftables), pol, EXP_INFO)

@parameterized.parameters((HEADER_NOVERBOSE, False), (HEADER_COMMENT, True))
def testVerboseHeader(self, header_to_use, expected_output):
Expand Down Expand Up @@ -490,8 +488,7 @@ def testSkippedTerm(self, termdata, messagetxt):
with self.assertLogs() as ctx:
# run a policy object expected to be skipped and logged.
nft = nftables.Nftables(
policy.ParsePolicy(GOOD_HEADER_1 + termdata, self.naming),
EXP_INFO)
policy.ParsePolicy(GOOD_HEADER_1 + termdata, self.naming), EXP_INFO)
# self.assertEqual(len(ctx.records), 2)
record = ctx.records[1]
self.assertEqual(record.message, messagetxt)
Expand Down Expand Up @@ -532,5 +529,6 @@ def testRulesetGenerator(self, policy_data: str, expected_inet: str):
self.assertNotIn('ip protocol', rule)
self.assertNotIn('icmp', rule)


if __name__ == '__main__':
absltest.main()

0 comments on commit 5d10361

Please sign in to comment.