diff --git a/capirca/lib/cisco.py b/capirca/lib/cisco.py index 0c74edf5..0bb5c229 100644 --- a/capirca/lib/cisco.py +++ b/capirca/lib/cisco.py @@ -441,6 +441,7 @@ def GetProtocol(port_num, proto, platform='cisco'): except KeyError: return port_num + class Term(aclgenerator.Term): """A single ACL Term.""" ALLOWED_PROTO_STRINGS = ['eigrp', 'gre', 'icmp', 'igmp', 'igrp', 'ip', @@ -573,29 +574,41 @@ def __str__(self): # options opts = [str(x) for x in self.term.option] - if ((self.PROTO_MAP['tcp'] in protocol or 'tcp' in protocol) - and ('tcp-established' in opts or 'established' in opts)): + if (self.PROTO_MAP['tcp'] in protocol or 'tcp' in protocol) and ( + 'tcp-established' in opts or 'established' in opts + ): if 'established' not in self.options: self.options.append('established') # Using both 'fragments' and 'is-fragment', ref Github Issue #187 - if ('ip' in protocol) and (('fragments' in opts) or - ('is-fragment' in opts)): + if ('ip' in protocol) and ( + ('fragments' in opts) or ('is-fragment' in opts) + ): if 'fragments' not in self.options: self.options.append('fragments') # ACL-based Forwarding - if (self.platform == 'ciscoxr' - ) and not self.term.action and self.term.next_ip and ( - 'nexthop1' not in opts): + if ( + (self.platform == 'ciscoxr') + and not self.term.action + and self.term.next_ip + and ('nexthop1' not in opts) + ): if len(self.term.next_ip) > 1: - raise CiscoNextIpError('The following term has more than one next IP ' - 'value: %s' % self.term.name) - if (not isinstance(self.term.next_ip[0], nacaddr.IPv4) and - not isinstance(self.term.next_ip[0], nacaddr.IPv6)): - raise CiscoNextIpError('Next IP value must be an IP address. ' - 'Invalid term: %s' % self.term.name) + raise CiscoNextIpError( + 'The following term has more than one next IP value: %s' + % self.term.name + ) + if not isinstance(self.term.next_ip[0], nacaddr.IPv4) and not isinstance( + self.term.next_ip[0], nacaddr.IPv6 + ): + raise CiscoNextIpError( + 'Next IP value must be an IP address. Invalid term: %s' + % self.term.name + ) if self.term.next_ip[0].num_addresses > 1: - raise CiscoNextIpError('The following term has a subnet instead of a ' - 'host: %s' % self.term.name) + raise CiscoNextIpError( + 'The following term has a subnet instead of a host: %s' + % self.term.name + ) nexthop = self.term.next_ip[0].network_address nexthop_protocol = 'ipv4' if nexthop.version == 4 else 'ipv6' self.options.append('nexthop1 %s %s' % (nexthop_protocol, nexthop)) @@ -930,7 +943,7 @@ def _BuildTokens(self): def _TranslatePolicy(self, pol, exp_info): self.cisco_policies = [] - current_date = datetime.datetime.utcnow().date() + current_date = datetime.datetime.now(datetime.timezone.utc).date() exp_info_date = current_date + datetime.timedelta(weeks=exp_info) # a mixed filter outputs both ipv4 and ipv6 acls in the same output file @@ -998,45 +1011,72 @@ def _TranslatePolicy(self, pol, exp_info): continue # Ignore if the term is for a different AF - if term.restrict_address_family and term.restrict_address_family != af: + if ( + term.restrict_address_family + and term.restrict_address_family != af + ): continue if term.expiration: if term.expiration <= exp_info_date: - logging.info('INFO: Term %s in policy %s expires ' - 'in less than two weeks.', term.name, filter_name) + logging.info( + 'INFO: Term %s in policy %s expires in less than two weeks.', + term.name, + filter_name, + ) if term.expiration <= current_date: - logging.warning('WARNING: Term %s in policy %s is expired and ' - 'will not be rendered.', term.name, filter_name) + logging.warning( + 'WARNING: Term %s in policy %s is expired and ' + 'will not be rendered.', + term.name, + filter_name, + ) continue # render terms based on filter type if next_filter == 'standard': # keep track of sequence numbers across terms - new_terms.append(TermStandard(term, filter_name, self._PLATFORM, - self.verbose)) + new_terms.append( + TermStandard(term, filter_name, self._PLATFORM, self.verbose) + ) elif next_filter == 'extended': - enable_dsmo = (len(filter_options) > 2 and - filter_options[2] == 'enable_dsmo') + enable_dsmo = ( + len(filter_options) > 2 and filter_options[2] == 'enable_dsmo' + ) new_terms.append( - Term(term, proto_int=self._PROTO_INT, enable_dsmo=enable_dsmo, - term_remark=self._TERM_REMARK, platform=self._PLATFORM, - verbose=self.verbose)) + Term( + term, + proto_int=self._PROTO_INT, + enable_dsmo=enable_dsmo, + term_remark=self._TERM_REMARK, + platform=self._PLATFORM, + verbose=self.verbose, + ) + ) elif next_filter == 'object-group': obj_target.AddTerm(term) - new_terms.append(self._GetObjectGroupTerm(term, filter_name, - verbose=self.verbose)) + new_terms.append( + self._GetObjectGroupTerm( + term, filter_name, verbose=self.verbose + ) + ) elif next_filter == 'inet6': new_terms.append( Term( - term, 6, proto_int=self._PROTO_INT, - platform=self._PLATFORM, verbose=self.verbose)) + term, + 6, + proto_int=self._PROTO_INT, + platform=self._PLATFORM, + verbose=self.verbose, + ) + ) # cisco requires different name for the v4 and v6 acls if filter_type == 'mixed' and next_filter == 'inet6': filter_name = 'ipv6-%s' % filter_name - self.cisco_policies.append((header, filter_name, [next_filter], - new_terms, obj_target)) + self.cisco_policies.append( + (header, filter_name, [next_filter], new_terms, obj_target) + ) def _GetObjectGroupTerm(self, term, filter_name, verbose=True): """Returns an ObjectGroupTerm object."""