From be8ac6bd311c2352688ccc20eb05ccb4f14384d7 Mon Sep 17 00:00:00 2001 From: Hisham Anver Date: Thu, 11 Jan 2024 12:04:16 +1100 Subject: [PATCH] fix unittests - missing transport type --- tests/test_base.py | 187 ++++++++++++++++++++++---------------------- tests/test_proxy.py | 45 ++++++----- 2 files changed, 118 insertions(+), 114 deletions(-) diff --git a/tests/test_base.py b/tests/test_base.py index f76a13d..0817f34 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -4,7 +4,7 @@ ~~~~~~~~~~~~~~ This module contains the Diameter protocol base unittests. - + :copyright: (c) 2020 Henrique Marques Ribeiro. :license: MIT, see LICENSE for more details. """ @@ -46,9 +46,9 @@ def test_diameter_avp__vendor_id_bit__default(self): def test_diameter_avp__vendor_id_bit__unset_when_is_unset(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.set_vendor_id_bit(False) - + self.assertEqual(cm.exception.args[0], "V-bit was already unset") def test_diameter_avp__vendor_id_bit__set_when_is_set(self): @@ -58,9 +58,9 @@ def test_diameter_avp__vendor_id_bit__set_when_is_set(self): avp.set_vendor_id_bit(True) self.assertTrue(avp.is_vendor_id()) - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.set_vendor_id_bit(True) - + self.assertEqual(cm.exception.args[0], "V-bit was already set") def test_diameter_avp__mandatory_bit__default(self): @@ -82,9 +82,9 @@ def test_diameter_avp__mandatory_bit__default(self): def test_diameter_avp__mandatory_bit__unset_when_is_unset(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.set_mandatory_bit(False) - + self.assertEqual(cm.exception.args[0], "M-bit was already unset") def test_diameter_avp__mandatory_bit__set_when_is_set(self): @@ -94,9 +94,9 @@ def test_diameter_avp__mandatory_bit__set_when_is_set(self): avp.set_mandatory_bit(True) self.assertTrue(avp.is_mandatory()) - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.set_mandatory_bit(True) - + self.assertEqual(cm.exception.args[0], "M-bit was already set") def test_diameter_avp__protected_bit__default(self): @@ -118,9 +118,9 @@ def test_diameter_avp__protected_bit__default(self): def test_diameter_avp__protected_bit__unset_when_is_unset(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.set_protected_bit(False) - + self.assertEqual(cm.exception.args[0], "P-bit was already unset") def test_diameter_avp__protected_bit__set_when_is_set(self): @@ -130,14 +130,14 @@ def test_diameter_avp__protected_bit__set_when_is_set(self): avp.set_protected_bit(True) self.assertTrue(avp.is_protected()) - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.set_protected_bit(True) - + self.assertEqual(cm.exception.args[0], "P-bit was already set") def test_diameter_avp__default_object(self): avp = DiameterAVP() - + self.assertEqual(avp.code.hex(), "00000000") self.assertEqual(avp.flags.hex(), "00") self.assertIsNone(avp.vendor_id) @@ -195,113 +195,113 @@ def test_diameter_avp__custom_object_by_instance_attributes(self): def test_diameter_avp__custom_object__invalid_code_value__string(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.code = "4294967296" - + self.assertEqual(cm.exception.args[0], "invalid code attribute value") def test_diameter_avp__custom_object__invalid_code_value__list(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.code = ["4294967296"] - + self.assertEqual(cm.exception.args[0], "invalid code attribute value") def test_diameter_avp__custom_object__invalid_code_value__integer_1(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.code = 4294967296 - + self.assertEqual(cm.exception.args[0], "code attribute has 4-bytes length long") def test_diameter_avp__custom_object__invalid_code_value__integer_2(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.code = -1 - + self.assertEqual(cm.exception.args[0], "code attribute has 4-bytes length long") def test_diameter_avp__custom_object__invalid_code_value__bytes_1(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.code = bytes.fromhex("00") - + self.assertEqual(cm.exception.args[0], "code attribute has 4-bytes length long") def test_diameter_avp__custom_object__invalid_code_value__bytes_2(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.code = bytes.fromhex("0000") - + self.assertEqual(cm.exception.args[0], "code attribute has 4-bytes length long") def test_diameter_avp__custom_object__invalid_code_value__bytes_3(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.code = bytes.fromhex("000000") - + self.assertEqual(cm.exception.args[0], "code attribute has 4-bytes length long") def test_diameter_avp__custom_object__invalid_code_value__bytes_4(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.code = bytes.fromhex("0000000001") - + self.assertEqual(cm.exception.args[0], "code attribute has 4-bytes length long") def test_diameter_avp__custom_object__invalid_flags_value__string(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.flags = "256" - + self.assertEqual(cm.exception.args[0], "invalid flags attribute value") def test_diameter_avp__custom_object__invalid_flags_value__list(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.flags = ["256"] - + self.assertEqual(cm.exception.args[0], "invalid flags attribute value") def test_diameter_avp__custom_object__invalid_flags_value__integer_1(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.flags = 256 - + self.assertEqual(cm.exception.args[0], "flags attribute has 1-byte length long") def test_diameter_avp__custom_object__invalid_flags_value__integer_2(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.flags = -1 - + self.assertEqual(cm.exception.args[0], "flags attribute has 1-byte length long") def test_diameter_avp__custom_object__invalid_flags_value__bytes_1(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.flags = bytes.fromhex("0000") - + self.assertEqual(cm.exception.args[0], "flags attribute has 1-byte length long") def test_diameter_avp__custom_object__invalid_flags_value__bytes_2(self): avp = DiameterAVP() - with self.assertRaises(AVPAttributeValueError) as cm: + with self.assertRaises(AVPAttributeValueError) as cm: avp.flags = bytes.fromhex("000000") - + self.assertEqual(cm.exception.args[0], "flags attribute has 1-byte length long") def test_diameter_avp__custom_object__flags_by_setting_class_attributes(self): @@ -373,7 +373,7 @@ def test_diameter_avp__custom_object__length_by_using_len_builtin_function(self) def test_diameter_avp__custom_object__length_changing_based_on_data(self): avp = DiameterAVP() - + avp.data = "srcrary" self.assertEqual(avp.get_length(), 15) self.assertEqual(avp.get_padding_length(), 1) @@ -393,14 +393,14 @@ def test_diameter_avp__custom_object__length_changing_based_on_data(self): avp.data += b"B" self.assertEqual(avp.get_length(), 23) self.assertEqual(avp.get_padding_length(), 1) - + avp.data += b"=" self.assertEqual(avp.get_length(), 24) self.assertIsNone(avp.get_padding_length()) def test_diameter_avp__custom_object__length_by_adding_vendor_id(self): avp = DiameterAVP() - + avp.data = "srcrary" self.assertEqual(avp.get_length(), 15) self.assertEqual(avp.get_padding_length(), 1) @@ -527,9 +527,9 @@ def test_diameter_avp__eq_dunder(self): def test_diameter_avp__custom_object__padding_not_allowed_to_set(self): avp = DiameterAVP() - with self.assertRaises(AttributeError) as cm: + with self.assertRaises(AttributeError) as cm: avp.padding = bytes.fromhex("0000") - self.assertEqual(cm.exception.args[0], "can't set attribute") + self.assertEqual(cm.exception.args[0], "can't set attribute 'padding'") class TestDiameterHeader(unittest.TestCase): @@ -724,9 +724,9 @@ def test_diameter_header_request_bit__default(self): def test_diameter_header_request_bit__unset_when_is_unset(self): header = DiameterHeader() - with self.assertRaises(DiameterHeaderError) as cm: + with self.assertRaises(DiameterHeaderError) as cm: header.set_request_bit(False) - + self.assertEqual(cm.exception.args[0], "R-bit was already unset") def test_diameter_header_request_bit__set_when_is_set(self): @@ -736,9 +736,9 @@ def test_diameter_header_request_bit__set_when_is_set(self): header.set_request_bit(True) self.assertTrue(header.is_request()) - with self.assertRaises(DiameterHeaderError) as cm: + with self.assertRaises(DiameterHeaderError) as cm: header.set_request_bit(True) - + self.assertEqual(cm.exception.args[0], "R-bit was already set") def test_diameter_header_request_bit__set_when_error_bit_is_set(self): @@ -747,10 +747,10 @@ def test_diameter_header_request_bit__set_when_error_bit_is_set(self): header.set_error_bit(True) self.assertTrue(header.is_error()) - - with self.assertRaises(DiameterHeaderError) as cm: + + with self.assertRaises(DiameterHeaderError) as cm: header.set_request_bit(True) - + self.assertEqual(cm.exception.args[0], "R-bit MUST NOT be set when E-bit is set") header.set_error_bit(False) @@ -778,9 +778,9 @@ def test_diameter_header_proxiable_bit__default(self): def test_diameter_header_proxiable_bit__unset_when_is_unset(self): header = DiameterHeader() - with self.assertRaises(DiameterHeaderError) as cm: + with self.assertRaises(DiameterHeaderError) as cm: header.set_proxiable_bit(False) - + self.assertEqual(cm.exception.args[0], "P-bit was already unset") def test_diameter_header_proxiable_bit__set_when_is_set(self): @@ -790,9 +790,9 @@ def test_diameter_header_proxiable_bit__set_when_is_set(self): header.set_proxiable_bit(True) self.assertTrue(header.is_proxiable()) - with self.assertRaises(DiameterHeaderError) as cm: + with self.assertRaises(DiameterHeaderError) as cm: header.set_proxiable_bit(True) - + self.assertEqual(cm.exception.args[0], "P-bit was already set") def test_diameter_header_error_bit__default(self): @@ -814,9 +814,9 @@ def test_diameter_header_error_bit__default(self): def test_diameter_header_error_bit__unset_when_is_unset(self): header = DiameterHeader() - with self.assertRaises(DiameterHeaderError) as cm: + with self.assertRaises(DiameterHeaderError) as cm: header.set_error_bit(False) - + self.assertEqual(cm.exception.args[0], "E-bit was already unset") def test_diameter_header_error_bit__set_when_is_set(self): @@ -826,9 +826,9 @@ def test_diameter_header_error_bit__set_when_is_set(self): header.set_error_bit(True) self.assertTrue(header.is_error()) - with self.assertRaises(DiameterHeaderError) as cm: + with self.assertRaises(DiameterHeaderError) as cm: header.set_error_bit(True) - + self.assertEqual(cm.exception.args[0], "E-bit was already set") def test_diameter_header_error_bit__set_when_request_bit_is_set(self): @@ -837,10 +837,10 @@ def test_diameter_header_error_bit__set_when_request_bit_is_set(self): header.set_request_bit(True) self.assertTrue(header.is_request()) - - with self.assertRaises(DiameterHeaderError) as cm: + + with self.assertRaises(DiameterHeaderError) as cm: header.set_error_bit(True) - + self.assertEqual(cm.exception.args[0], "E-bit MUST NOT be set when R-bit is set") header.set_request_bit(False) @@ -868,9 +868,9 @@ def test_diameter_header_retransmitted_bit__default(self): def test_diameter_header_retransmitted_bit__unset_when_is_unset(self): header = DiameterHeader() - with self.assertRaises(DiameterHeaderError) as cm: + with self.assertRaises(DiameterHeaderError) as cm: header.set_retransmitted_bit(False) - + self.assertEqual(cm.exception.args[0], "T-bit was already unset") def test_diameter_header_retransmitted_bit__set_when_is_set(self): @@ -880,9 +880,9 @@ def test_diameter_header_retransmitted_bit__set_when_is_set(self): header.set_retransmitted_bit(True) self.assertTrue(header.is_retransmitted()) - with self.assertRaises(DiameterHeaderError) as cm: + with self.assertRaises(DiameterHeaderError) as cm: header.set_retransmitted_bit(True) - + self.assertEqual(cm.exception.args[0], "T-bit was already set") @@ -901,18 +901,18 @@ def test_diameter_message__invalid_way_of_passing_avp_input_argument__1(self): header = DiameterHeader() avp = DiameterAVP() - with self.assertRaises(DiameterMessageError) as cm: + with self.assertRaises(DiameterMessageError) as cm: message = DiameterMessage(header, avp) - + self.assertEqual(cm.exception.args[0], "invalid input argument: 'avps'. It MUST be a list of DiameterAVP objects") def test_diameter_message__invalid_way_of_passing_avp_input_argument__2(self): header = DiameterHeader() avps = [DiameterAVP(), 1] - with self.assertRaises(DiameterMessageError) as cm: + with self.assertRaises(DiameterMessageError) as cm: message = DiameterMessage(header, avps) - + self.assertEqual(cm.exception.args[0], "invalid element found in list argument: 'avps'. The element 1 in position 1 does not represent a DiameterAVP object") def test_diameter_message__invalid_way_of_passing_avp_input_argument__3(self): @@ -921,16 +921,16 @@ def test_diameter_message__invalid_way_of_passing_avp_input_argument__3(self): with self.assertRaises(DiameterMessageError) as cm: message = DiameterMessage(header, avps) - + self.assertEqual(cm.exception.args[0], "invalid element found in list argument: 'avps'. The element DiameterAVP() in position 2 does not represent a DiameterAVP object") def test_diameter_message__invalid_way_of_passing_avp_input_argument__4(self): header = DiameterHeader() avps = [DiameterAVP(), OriginHostAVP("host.example.com"), "DiameterAVP()"] - with self.assertRaises(DiameterMessageError) as cm: + with self.assertRaises(DiameterMessageError) as cm: message = DiameterMessage(header, avps) - + self.assertEqual(cm.exception.args[0], "invalid element found in list argument: 'avps'. The element DiameterAVP() in position 2 does not represent a DiameterAVP object") def test_diameter_message__valid_way_of_passing_avp_input_argument(self): @@ -1080,7 +1080,7 @@ def test_diameter_message__append_method__valid_way_to_call__creating_cer_like_m message.header.command_code = CAPABILITIES_EXCHANGE_MESSAGE self.assertEqual(message.__str__(), "") - + message.header.application_id = DIAMETER_APPLICATION_DEFAULT self.assertEqual(message.__str__(), "") @@ -1118,12 +1118,12 @@ def test_diameter_message__pop_method__valid_way_to_call(self): with self.assertRaises(DiameterMessageError) as cm: message.pop("origin_realm_avp") - + self.assertEqual(cm.exception.args[0], "`avps` attribute is empty. There is no DiameterAVP object to be removed") def test_diameter_message__cleanup_method_after_appending_2_avps(self): origin_host_avp = OriginHostAVP("host.example.com") - origin_realm_avp = OriginRealmAVP("example.com") + origin_realm_avp = OriginRealmAVP("example.com") avps = [origin_host_avp, origin_realm_avp] message = DiameterMessage(avps=avps) @@ -1198,7 +1198,7 @@ def test_diameter_message__extend_method__valid_way_to_call__5_avps(self): origin_state_id_avp = OriginStateIdAVP(1524733202) auth_session_state_avp = AuthSessionStateAVP(STATE_MAINTAINED) auth_request_type_avp = AuthRequestTypeAVP(AUTH_REQUEST_TYPE_AUTHENTICATE_ONLY) - + avps = [origin_host_avp, origin_realm_avp, origin_state_id_avp, auth_session_state_avp, auth_request_type_avp] message = DiameterMessage() @@ -1251,7 +1251,7 @@ def test_diameter_message__has_avp_method__valid_way_to_call__2(self): self.assertEqual(message.auth_request_type_avp.__str__(), "") self.assertTrue(message.has_avp("auth_session_state_avp")) self.assertTrue(message.has_avp("auth_request_type_avp")) - + self.assertFalse(message.has_avp("origin_host_avp")) self.assertFalse(message.has_avp("origin_realm_avp")) self.assertFalse(message.has_avp("origin_state_id_avp")) @@ -1420,7 +1420,7 @@ def test_diameter_message__update_key_method__invalid_way_to_call(self): with self.assertRaises(DiameterMessageError) as cm: message.update_key("firmware1", "firmware0") - + self.assertEqual(cm.exception.args[0], "`firmware0` key already defined") def test_diameter_message__refresh_method__1(self): @@ -1716,7 +1716,7 @@ def test_diameter_message__refresh_method__3(self): self.assertEqual(message.avps[2].data, b"mme.epc.3gppnetwork.org") self.assertEqual(message.avps[2].get_padding_length(), 1) - #: See the needed to refresh it. Before call it the length is wrong if + #: See the needed to refresh it. Before call it the length is wrong if #: you try to access through the Header object self.assertEqual(message.header.length.hex(), "0001a0") self.assertEqual(message.header.get_length(), 416) @@ -1744,7 +1744,7 @@ def test_diameter_message__refresh_method__3(self): self.assertEqual(message.avps[3].data, b"epc.3gppnetwork.org") self.assertEqual(message.avps[3].get_padding_length(), 1) - #: See the needed to refresh it. Before call it the length is wrong if + #: See the needed to refresh it. Before call it the length is wrong if #: you try to access through the Header object self.assertEqual(message.header.length.hex(), "000190") self.assertEqual(message.header.get_length(), 400) @@ -1772,7 +1772,7 @@ def test_diameter_message__refresh_method__3(self): self.assertEqual(message.avps[0].data, b"mme.epc.3gppnetwork.org;1559529822;356549175;2.17;940463984") self.assertEqual(message.avps[0].get_padding_length(), 1) - #: See the needed to refresh it. Before call it the length is wrong if + #: See the needed to refresh it. Before call it the length is wrong if #: you try to access through the Header object self.assertEqual(message.header.length.hex(), "000180") self.assertEqual(message.header.get_length(), 384) @@ -1800,7 +1800,7 @@ def test_diameter_message__refresh_method__3(self): self.assertEqual(message.avps[4].data, b"epc.3gppnetwork.org") self.assertEqual(message.avps[4].get_padding_length(), 1) - #: See the needed to refresh it. Before call it the length is wrong if + #: See the needed to refresh it. Before call it the length is wrong if #: you try to access through the Header object self.assertEqual(message.header.length.hex(), "000170") self.assertEqual(message.header.get_length(), 368) @@ -1828,7 +1828,7 @@ def test_diameter_message__refresh_method__3(self): self.assertEqual(message.avps[5].data, b"frodo") self.assertEqual(message.avps[5].get_padding_length(), 3) - #: See the needed to refresh it. Before call it the length is wrong if + #: See the needed to refresh it. Before call it the length is wrong if #: you try to access through the Header object self.assertEqual(message.header.length.hex(), "000160") self.assertEqual(message.header.get_length(), 352) @@ -1846,9 +1846,10 @@ def test_diameter_message__update_avps_method__1(self): #: Initial Setup config = { "MODE": "CLIENT", + "TRANSPORT_TYPE": "TCP", "APPLICATIONS": [ { - "vendor_id": VENDOR_ID_3GPP, + "vendor_id": VENDOR_ID_3GPP, "app_id": DIAMETER_APPLICATION_Cx }, { @@ -2022,7 +2023,7 @@ def test_diameter_message__update_avps_method__1(self): self.assertEqual(cea.avps[8].avps[1].length.hex(), "00000c") self.assertEqual(cea.avps[8].avps[1].get_length(), 12) self.assertEqual(cea.avps[8].avps[1].data, DIAMETER_APPLICATION_Rx) - self.assertIsNone(cea.avps[8].avps[1].get_padding_length()) + self.assertIsNone(cea.avps[8].avps[1].get_padding_length()) #: Update all AVPs @@ -2479,7 +2480,7 @@ def test_diameter_message__update_avps_method__2(self): self.assertEqual(cea.avps[8].avps[1].length.hex(), "00000c") self.assertEqual(cea.avps[8].avps[1].get_length(), 12) self.assertEqual(cea.avps[8].avps[1].data, DIAMETER_APPLICATION_Rx) - self.assertIsNone(cea.avps[8].avps[1].get_padding_length()) + self.assertIsNone(cea.avps[8].avps[1].get_padding_length()) #: Update all AVPs avps = { @@ -2491,11 +2492,11 @@ def test_diameter_message__update_avps_method__2(self): "product_name": f"Bromelia-AAA", "auth_application_id": DIAMETER_APPLICATION_S6b, "vendor_specific_application_id": [ - VendorIdAVP(VENDOR_ID_ETSI), + VendorIdAVP(VENDOR_ID_ETSI), AuthApplicationIdAVP(DIAMETER_APPLICATION_SWm) ], "vendor_specific_application_id__1": [ - VendorIdAVP(VENDOR_ID_ETSI), + VendorIdAVP(VENDOR_ID_ETSI), AuthApplicationIdAVP(DIAMETER_APPLICATION_SWx) ], "origin_state_id": 42, @@ -2796,4 +2797,4 @@ def test_diameter_message__update_avps_method__2(self): if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main() diff --git a/tests/test_proxy.py b/tests/test_proxy.py index b87146e..5d7fdf3 100644 --- a/tests/test_proxy.py +++ b/tests/test_proxy.py @@ -4,7 +4,7 @@ ~~~~~~~~~~~~~~~ This module contains the Bromelia proxy unittests. - + :copyright: (c) 2021 Henrique Marques Ribeiro. :license: MIT, see LICENSE for more details. """ @@ -35,6 +35,7 @@ class TestDiameterBaseProxy(unittest.TestCase): def setUp(self): config = { "MODE": "CLIENT", + "TRANSPORT_TYPE": "TCP", "APPLICATIONS": [], "LOCAL_NODE_HOSTNAME": "client.network", "LOCAL_NODE_REALM": "network", @@ -47,7 +48,7 @@ def setUp(self): "WATCHDOG_TIMEOUT": 30 } self.connection = _convert_config_to_connection_obj(config) - + def test__load_cea(self): cea = DiameterBaseProxy.load_cea(self.connection) @@ -389,7 +390,7 @@ def test__diameter_base_proxy__get_default_messages(self): self.assertTrue(isinstance(base.dpr, DisconnectPeerRequest)) ##: Check Capabilities-Exchange-Answer - + #: Check Diameter Header self.assertEqual(base.cea.header.version, DIAMETER_VERSION) self.assertEqual(base.cea.header.flags, FLAG_RESPONSE) @@ -478,7 +479,7 @@ def test__diameter_base_proxy__get_default_messages(self): ##: Check Capabilities-Exchange-Request - + #: Check Diameter Header self.assertEqual(base.cer.header.version, DIAMETER_VERSION) self.assertEqual(base.cer.header.flags, FLAG_REQUEST) @@ -728,7 +729,7 @@ def test__diameter_base_proxy__get_custom_messages__no_list(self): self.assertTrue(isinstance(base.dpr, DisconnectPeerRequest)) ##: Check Capabilities-Exchange-Answer - + #: Check Diameter Header self.assertEqual(base.cea.header.version, DIAMETER_VERSION) self.assertEqual(base.cea.header.flags, FLAG_RESPONSE) @@ -817,7 +818,7 @@ def test__diameter_base_proxy__get_custom_messages__no_list(self): ##: Check Capabilities-Exchange-Request - + #: Check Diameter Header self.assertEqual(base.cer.header.version, DIAMETER_VERSION) self.assertEqual(base.cer.header.flags, FLAG_REQUEST) @@ -1073,7 +1074,7 @@ def test__diameter_base_proxy__get_custom_messages__custom_cea(self): self.assertTrue(isinstance(base.dpr, DisconnectPeerRequest)) ##: Check Capabilities-Exchange-Answer - + #: Check Diameter Header self.assertEqual(base.cea.header.version, DIAMETER_VERSION) self.assertEqual(base.cea.header.flags, FLAG_RESPONSE) @@ -1195,7 +1196,7 @@ def test__diameter_base_proxy__get_custom_messages__custom_cea(self): ##: Check Capabilities-Exchange-Request - + #: Check Diameter Header self.assertEqual(base.cer.header.version, DIAMETER_VERSION) self.assertEqual(base.cer.header.flags, FLAG_REQUEST) @@ -1451,7 +1452,7 @@ def test__diameter_base_proxy__get_custom_messages__custom_cer(self): self.assertTrue(isinstance(base.dpr, DisconnectPeerRequest)) ##: Check Capabilities-Exchange-Answer - + #: Check Diameter Header self.assertEqual(base.cea.header.version, DIAMETER_VERSION) self.assertEqual(base.cea.header.flags, FLAG_RESPONSE) @@ -1540,7 +1541,7 @@ def test__diameter_base_proxy__get_custom_messages__custom_cer(self): ##: Check Capabilities-Exchange-Request - + #: Check Diameter Header self.assertEqual(base.cer.header.version, DIAMETER_VERSION) self.assertEqual(base.cer.header.flags, FLAG_REQUEST) @@ -1830,7 +1831,7 @@ def test__diameter_base_proxy__get_custom_messages__custom_dwa(self): self.assertTrue(isinstance(base.dpr, DisconnectPeerRequest)) ##: Check Capabilities-Exchange-Answer - + #: Check Diameter Header self.assertEqual(base.cea.header.version, DIAMETER_VERSION) self.assertEqual(base.cea.header.flags, FLAG_RESPONSE) @@ -1919,7 +1920,7 @@ def test__diameter_base_proxy__get_custom_messages__custom_dwa(self): ##: Check Capabilities-Exchange-Request - + #: Check Diameter Header self.assertEqual(base.cer.header.version, DIAMETER_VERSION) self.assertEqual(base.cer.header.flags, FLAG_REQUEST) @@ -2209,7 +2210,7 @@ def test__diameter_base_proxy__get_custom_messages__custom_dwr(self): self.assertTrue(isinstance(base.dpr, DisconnectPeerRequest)) ##: Check Capabilities-Exchange-Answer - + #: Check Diameter Header self.assertEqual(base.cea.header.version, DIAMETER_VERSION) self.assertEqual(base.cea.header.flags, FLAG_RESPONSE) @@ -2298,7 +2299,7 @@ def test__diameter_base_proxy__get_custom_messages__custom_dwr(self): ##: Check Capabilities-Exchange-Request - + #: Check Diameter Header self.assertEqual(base.cer.header.version, DIAMETER_VERSION) self.assertEqual(base.cer.header.flags, FLAG_REQUEST) @@ -2588,7 +2589,7 @@ def test__diameter_base_proxy__get_custom_messages__custom_dpa(self): self.assertTrue(isinstance(base.dpr, DisconnectPeerRequest)) ##: Check Capabilities-Exchange-Answer - + #: Check Diameter Header self.assertEqual(base.cea.header.version, DIAMETER_VERSION) self.assertEqual(base.cea.header.flags, FLAG_RESPONSE) @@ -2677,7 +2678,7 @@ def test__diameter_base_proxy__get_custom_messages__custom_dpa(self): ##: Check Capabilities-Exchange-Request - + #: Check Diameter Header self.assertEqual(base.cer.header.version, DIAMETER_VERSION) self.assertEqual(base.cer.header.flags, FLAG_REQUEST) @@ -2967,7 +2968,7 @@ def test__diameter_base_proxy__get_custom_messages__custom_dpr(self): self.assertTrue(isinstance(base.dpr, DisconnectPeerRequest)) ##: Check Capabilities-Exchange-Answer - + #: Check Diameter Header self.assertEqual(base.cea.header.version, DIAMETER_VERSION) self.assertEqual(base.cea.header.flags, FLAG_RESPONSE) @@ -3056,7 +3057,7 @@ def test__diameter_base_proxy__get_custom_messages__custom_dpr(self): ##: Check Capabilities-Exchange-Request - + #: Check Diameter Header self.assertEqual(base.cer.header.version, DIAMETER_VERSION) self.assertEqual(base.cer.header.flags, FLAG_REQUEST) @@ -3331,8 +3332,9 @@ class TestDiameterBaseProxyWithOneApplicationId(unittest.TestCase): def setUp(self): config = { "MODE": "CLIENT", + "TRANSPORT_TYPE": "TCP", "APPLICATIONS": [{ - "vendor_id": VENDOR_ID_3GPP, + "vendor_id": VENDOR_ID_3GPP, "app_id": DIAMETER_APPLICATION_Cx }], "LOCAL_NODE_HOSTNAME": "client.network", @@ -3588,9 +3590,10 @@ class TestDiameterBaseProxyWithTwoApplicationsId(unittest.TestCase): def setUp(self): config = { "MODE": "CLIENT", + "TRANSPORT_TYPE": "TCP", "APPLICATIONS": [ { - "vendor_id": VENDOR_ID_3GPP, + "vendor_id": VENDOR_ID_3GPP, "app_id": DIAMETER_APPLICATION_Cx }, { @@ -3913,4 +3916,4 @@ def test__load_cer(self): if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main()