From b4e4708c8545188191a3a6145bc25c411cd06fd6 Mon Sep 17 00:00:00 2001 From: Dina Samatova Date: Tue, 25 Feb 2025 10:47:11 +0100 Subject: [PATCH 1/2] Refactor & Test: Enhance JWT expiration handling, resolve algorithm naming conflict, and add Twilio Video Room test - Improved JWT expiration logic: - Replaced time.time() with datetime.datetime.utcnow() to ensure more accurate expiration handling. - Applied datetime.timedelta(seconds=...) for better timestamp calculations. - Ensured compatibility with jwt.encode() by converting expiration times using .timestamp(). - Resolved naming issue between ALGORITHM and algorithm: - Renamed the algorithm parameter to jwt_algorithm to avoid conflicts with the ALGORITHM constant. - Updated all references to maintain consistency throughout the class. - Added unit test for Twilio Video Room creation: - Integrated unittest framework for structured testing. - Used responses library to mock Twilio API calls. - Implemented a test case that validates room creation by checking sid, unique_name, and status. These updates improve JWT handling, fix potential naming conflicts, and introduce automated testing for Twilio API interactions. --- tests/unit/rest/test__video_rooms.py | 37 ++++++++++++++++++++++++++++ twilio/jwt/__init__.py | 33 +++++++++++-------------- 2 files changed, 52 insertions(+), 18 deletions(-) create mode 100644 tests/unit/rest/test__video_rooms.py diff --git a/tests/unit/rest/test__video_rooms.py b/tests/unit/rest/test__video_rooms.py new file mode 100644 index 000000000..360927542 --- /dev/null +++ b/tests/unit/rest/test__video_rooms.py @@ -0,0 +1,37 @@ +import unittest +import responses +from twilio.rest import Client + + +class TestVideoRooms(unittest.TestCase): + @responses.activate + def test_create_video_room(self): + # Configuración de credenciales de prueba + account_sid = "ACXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + auth_token = "your_auth_token" + client = Client(account_sid, auth_token) + + # Simulación de la respuesta del endpoint de creación de salas de video + responses.add( + responses.POST, + "https://video.twilio.com/v1/Rooms", + json={ + "sid": "RMXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", + "unique_name": "TestRoom", + "status": "in-progress", + }, + status=201, + content_type="application/json", + ) + + # Llamada al método para crear la sala + room = client.video.rooms.create(unique_name="TestRoom", type="group") + + # Verificaciones de la respuesta + self.assertEqual(room.sid, "RMXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX") + self.assertEqual(room.unique_name, "TestRoom") + self.assertEqual(room.status, "in-progress") + + +if __name__ == "__main__": + unittest.main() diff --git a/twilio/jwt/__init__.py b/twilio/jwt/__init__.py index 7a51ea70d..815d9af9e 100644 --- a/twilio/jwt/__init__.py +++ b/twilio/jwt/__init__.py @@ -1,5 +1,5 @@ import jwt as jwt_lib -import time +import datetime __all__ = ["Jwt", "JwtDecodeError"] @@ -20,25 +20,18 @@ def __init__( secret_key, issuer, subject=None, - algorithm=None, + jwt_algorithm=None, nbf=GENERATE, ttl=3600, valid_until=None, ): self.secret_key = secret_key - """:type str: The secret used to encode the JWT""" self.issuer = issuer - """:type str: The issuer of this JWT""" self.subject = subject - """:type str: The subject of this JWT, omitted from payload by default""" - self.algorithm = algorithm or self.ALGORITHM - """:type str: The algorithm used to encode the JWT, defaults to 'HS256'""" + self.jwt_algorithm = jwt_algorithm or self.ALGORITHM self.nbf = nbf - """:type int: Time in secs since epoch before which this JWT is invalid. Defaults to now.""" self.ttl = ttl - """:type int: Time to live of the JWT in seconds, defaults to 1 hour""" self.valid_until = valid_until - """:type int: Time in secs since epoch this JWT is valid for. Overrides ttl if provided.""" self.__decoded_payload = None self.__decoded_headers = None @@ -55,14 +48,14 @@ def _generate_headers(self): def _from_jwt(cls, headers, payload, key=None): """ Class specific implementation of from_jwt which should take jwt components and return - and instance of this Class with jwt information loaded. - :return: Jwt object containing the headers, payload and key + an instance of this Class with jwt information loaded. + :return: Jwt object containing the headers, payload, and key """ jwt = Jwt( secret_key=key, issuer=payload.get("iss", None), subject=payload.get("sub", None), - algorithm=headers.get("alg", None), + jwt_algorithm=headers.get("alg", None), valid_until=payload.get("exp", None), nbf=payload.get("nbf", None), ) @@ -77,10 +70,12 @@ def payload(self): payload = self._generate_payload().copy() payload["iss"] = self.issuer - payload["exp"] = int(time.time()) + self.ttl + payload["exp"] = ( + datetime.datetime.utcnow() + datetime.timedelta(seconds=self.ttl) + ).timestamp() if self.nbf is not None: if self.nbf == self.GENERATE: - payload["nbf"] = int(time.time()) + payload["nbf"] = datetime.datetime.utcnow().timestamp() else: payload["nbf"] = self.nbf if self.valid_until: @@ -97,7 +92,7 @@ def headers(self): headers = self._generate_headers().copy() headers["typ"] = "JWT" - headers["alg"] = self.algorithm + headers["alg"] = self.jwt_algorithm return headers def to_jwt(self, ttl=None): @@ -114,10 +109,12 @@ def to_jwt(self, ttl=None): payload = self.payload.copy() if ttl: - payload["exp"] = int(time.time()) + ttl + payload["exp"] = ( + datetime.datetime.utcnow() + datetime.timedelta(seconds=ttl) + ).timestamp() return jwt_lib.encode( - payload, self.secret_key, algorithm=self.algorithm, headers=headers + payload, self.secret_key, algorithm=self.jwt_algorithm, headers=headers ) @classmethod From cc852fa7fa7f1d315e5d53ecc8fb7251c488742f Mon Sep 17 00:00:00 2001 From: Dina Samatova Date: Tue, 25 Feb 2025 17:11:43 +0100 Subject: [PATCH 2/2] Enhanced 'time' part to not use outdated code. --- twilio/jwt/__init__.py | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/twilio/jwt/__init__.py b/twilio/jwt/__init__.py index 815d9af9e..77e6d8bd5 100644 --- a/twilio/jwt/__init__.py +++ b/twilio/jwt/__init__.py @@ -20,7 +20,7 @@ def __init__( secret_key, issuer, subject=None, - jwt_algorithm=None, + jwt_algorithm=None, # Renamed from `algorithm` to `jwt_algorithm` for clarity nbf=GENERATE, ttl=3600, valid_until=None, @@ -28,7 +28,7 @@ def __init__( self.secret_key = secret_key self.issuer = issuer self.subject = subject - self.jwt_algorithm = jwt_algorithm or self.ALGORITHM + self.jwt_algorithm = jwt_algorithm or self.ALGORITHM # Updated variable name self.nbf = nbf self.ttl = ttl self.valid_until = valid_until @@ -55,7 +55,7 @@ def _from_jwt(cls, headers, payload, key=None): secret_key=key, issuer=payload.get("iss", None), subject=payload.get("sub", None), - jwt_algorithm=headers.get("alg", None), + jwt_algorithm=headers.get("alg", None), # Updated variable name valid_until=payload.get("exp", None), nbf=payload.get("nbf", None), ) @@ -70,14 +70,24 @@ def payload(self): payload = self._generate_payload().copy() payload["iss"] = self.issuer + + # Changed from `int(time.time()) + self.ttl` to `datetime.now(timezone.utc) + timedelta(seconds=self.ttl)` + # This ensures that the timestamp is timezone-aware and prevents potential issues with time handling. payload["exp"] = ( - datetime.datetime.utcnow() + datetime.timedelta(seconds=self.ttl) + datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(seconds=self.ttl) ).timestamp() + if self.nbf is not None: if self.nbf == self.GENERATE: - payload["nbf"] = datetime.datetime.utcnow().timestamp() + # Replaced `int(time.time())` with `datetime.now(timezone.utc).timestamp()` + # This ensures the `nbf` value is also timezone-aware. + payload["nbf"] = datetime.datetime.now( + datetime.timezone.utc + ).timestamp() else: payload["nbf"] = self.nbf + if self.valid_until: payload["exp"] = self.valid_until if self.subject: @@ -92,7 +102,7 @@ def headers(self): headers = self._generate_headers().copy() headers["typ"] = "JWT" - headers["alg"] = self.jwt_algorithm + headers["alg"] = self.jwt_algorithm # Updated variable name return headers def to_jwt(self, ttl=None): @@ -106,11 +116,14 @@ def to_jwt(self, ttl=None): raise ValueError("JWT does not have a signing key configured.") headers = self.headers.copy() - payload = self.payload.copy() + if ttl: + # Replaced `int(time.time()) + ttl` with `datetime.now(timezone.utc) + timedelta(seconds=ttl)` + # Ensures consistency across all timestamp calculations. payload["exp"] = ( - datetime.datetime.utcnow() + datetime.timedelta(seconds=ttl) + datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(seconds=ttl) ).timestamp() return jwt_lib.encode( @@ -144,7 +157,7 @@ def from_jwt(cls, jwt, key=""): key, algorithms=[cls.ALGORITHM], options={ - "verify_signature": verify, + "verify_signature": verify, # Ensured signature verification if a key is provided "verify_exp": True, "verify_nbf": True, },