diff --git a/dpkt/bgp.py b/dpkt/bgp.py index a09c0808..a3c31f54 100755 --- a/dpkt/bgp.py +++ b/dpkt/bgp.py @@ -77,6 +77,9 @@ # Capability Types CAP_MULTIPROTOCOL = 1 CAP_ROUTE_REFRESH = 2 +CAP_GRACEFUL_RESTART = 64 +CAP_SUPPORT_4OCTETAS = 65 +CAP_ADD_PATH = 69 # NOTIFICATION Error Codes MESSAGE_HEADER_ERROR = 1 @@ -200,7 +203,22 @@ def unpack(self, buf): if self.type == AUTHENTICATION: self.data = self.authentication = self.Authentication(self.data) elif self.type == CAPABILITY: - self.data = self.capability = self.Capability(self.data) + l = [] + while self.data: + capability = self.Capability(self.data) + l.append(capability) + self.data = self.data[self.__hdr_len__+capability.len:] + self.data = self.capabilities = l + # define capability to keep backward compatibility + self.capability = self.capabilities[0] + + def __len__(self): + return self.__hdr_len__ + sum(map(len, self.capabilities)) + + def __bytes__(self): + caps = b''.join(map(bytes, self.capabilities)) + self.cap_len = len(caps) + return self.pack_hdr() + caps class Authentication(dpkt.Packet): __hdr__ = ( @@ -217,6 +235,7 @@ def unpack(self, buf): dpkt.Packet.unpack(self, buf) self.data = self.data[:self.len] + class Update(dpkt.Packet): __hdr_defaults__ = { 'withdrawn': [], @@ -738,6 +757,8 @@ def __bytes__(self): __bgp2 = b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x63\x02\x00\x00\x00\x48\x40\x01\x01\x00\x40\x02\x0a\x01\x02\x01\xf4\x01\xf4\x02\x01\xfe\xbb\x40\x03\x04\xc0\xa8\x00\x0f\x40\x05\x04\x00\x00\x00\x64\x40\x06\x00\xc0\x07\x06\xfe\xba\xc0\xa8\x00\x0a\xc0\x08\x0c\xfe\xbf\x00\x01\x03\x16\x00\x04\x01\x54\x00\xfa\x80\x09\x04\xc0\xa8\x00\x0f\x80\x0a\x04\xc0\xa8\x00\xfa\x16\xc0\xa8\x04' __bgp3 = b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x79\x02\x00\x00\x00\x62\x40\x01\x01\x00\x40\x02\x00\x40\x05\x04\x00\x00\x00\x64\xc0\x10\x08\x00\x02\x01\x2c\x00\x00\x01\x2c\xc0\x80\x24\x00\x00\xfd\xe9\x40\x01\x01\x00\x40\x02\x04\x02\x01\x15\xb3\x40\x05\x04\x00\x00\x00\x2c\x80\x09\x04\x16\x05\x05\x05\x80\x0a\x04\x16\x05\x05\x05\x90\x0e\x00\x1e\x00\x01\x80\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x0c\x04\x04\x04\x00\x60\x18\x77\x01\x00\x00\x01\xf4\x00\x00\x01\xf4\x85' __bgp4 = b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x2d\x01\x04\x00\xed\x00\x5a\xc6\x6e\x83\x7d\x10\x02\x06\x01\x04\x00\x01\x00\x01\x02\x02\x80\x00\x02\x02\x02\x00' +__bgp10 = b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x41\x01\x04\x00\x01\x00\xb4\x0a\x01\x01\x01\x24\x02\x22\x01\x04\x00\x01\x00\x01\x01\x04\x00\x01\x00\x04\x02\x00\x40\x02\x01\x2c\x41\x04\x00\x00\x00\x01\x45\x08\x00\x01\x01\x01\x00\x01\x04\x01' + # BGP-EVPN type 1-4 packets for testing. __bgp5 = b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x60\x02\x00\x00\x00\x49\x40\x01\x01\x00\x40\x02\x00\x40\x05\x04\x00\x00\x00\x64\xc0\x10\x10\x03\x0c\x00\x00\x00\x00\x00\x08\x00\x02\x03\xe8\x00\x00\x00\x02\x90\x0e\x00\x24\x00\x19\x46\x04\x01\x01\x01\x02\x00\x01\x19\x00\x01\x01\x01\x01\x02\x00\x02\x05\x00\x00\x03\xe8\x00\x00\x04\x00\x00\x00\x00\x00\x02\x00\x00\x02' @@ -757,6 +778,7 @@ def test_pack(): assert (__bgp7 == bytes(BGP(__bgp7))) assert (__bgp8 == bytes(BGP(__bgp8))) assert (__bgp9 == bytes(BGP(__bgp9))) + assert (__bgp10 == bytes(BGP(__bgp10))) def test_unpack(): @@ -828,11 +850,11 @@ def test_unpack(): p = b4.open.parameters[0] assert (p.type == CAPABILITY) assert (p.len == 6) - c = p.capability + c = p.capabilities[0] assert (c.code == CAP_MULTIPROTOCOL) assert (c.len == 4) assert (c.data == b'\x00\x01\x00\x01') - c = b4.open.parameters[2].capability + c = b4.open.parameters[2].capabilities[0] assert (c.code == CAP_ROUTE_REFRESH) assert (c.len == 0) @@ -934,6 +956,23 @@ def test_unpack(): assert (r.ip_address == b'\xc0\xb4\x01\x02\xc0\xb4\x01\x02\xc0\xb4\x01\x02\xc0\xb4\x01\x02') assert (r.mpls_label_stack == b'\x00\x00\x02\x00\x00\x00') + b10 = BGP(__bgp10) + assert (b10.len == 65) + assert (b10.type == OPEN) + assert (b10.open.asn == 1) + assert (b10.open.param_len == 36) + assert (len(b10.open.parameters) == 1) + p = b10.open.parameters[0] + assert (p.type == CAPABILITY) + assert (p.len == 34) + c = p.capabilities[0] + assert (c.code == CAP_MULTIPROTOCOL) + assert (c.len == 4) + assert (c.data == b'\x00\x01\x00\x01') + c = b10.open.parameters[0].capabilities[3] + assert (c.code == CAP_GRACEFUL_RESTART) + assert (c.len == 2) + if __name__ == '__main__': test_pack()