Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/bind source ip #23

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 26 additions & 22 deletions bromelia/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
"""
bromelia.setup
~~~~~~~~~~~~~~

This module implements the central Diameter application object. It works
as per Peer State Machine defined in RFC 6733 in order to establish a
as per Peer State Machine defined in RFC 6733 in order to establish a
Diameter association with another Peer Node.

:copyright: (c) 2020-present Henrique Marques Ribeiro.
Expand All @@ -16,7 +16,7 @@
import logging
import platform
import queue
import socket
import socket
import sys
import threading
import time
Expand Down Expand Up @@ -101,7 +101,7 @@ def __init__(self, connection: Connection, base: BaseMessages) -> None:
self._recv_messages = queue.Queue()
self._send_messages = queue.Queue()

self.postprocess_recv_messages = queue.Queue()
self.postprocess_recv_messages = queue.Queue()
self.postprocess_recv_messages_ready = threading.Event()
self.postprocess_recv_messages_lock = threading.Lock()
self.lock = threading.Lock()
Expand All @@ -110,7 +110,7 @@ def __init__(self, connection: Connection, base: BaseMessages) -> None:
def is_connected(self) -> bool:
if self.transport:
return self.transport.is_connected

return False


Expand All @@ -125,21 +125,25 @@ def start(self) -> None:

if self.connection.mode == DIAMETER_AGENT_CLIENT_MODE:
if self.connection.transport_type == DIAMETER_AGENT_TRANSPORT_TYPE_TCP:
self.transport = TcpClient(self.connection.peer_node.ip_address,
self.transport = TcpClient(self.connection.local_node.ip_address,
self.connection.local_node.port or 0, # will mean that any free port will be used for the client
self.connection.peer_node.ip_address,
self.connection.peer_node.port)
elif self.connection.transport_type == DIAMETER_AGENT_TRANSPORT_TYPE_SCTP:
self.transport = SctpClient(self.connection.peer_node.ip_address,
self.connection.peer_node.port)
self.transport = SctpClient(self.connection.local_node.ip_address,
self.connection.local_node.port or 0, # will mean that any free port will be used for the client
self.connection.peer_node.ip_address,
self.connection.peer_node.port)
else:
raise DiameterAssociationError("Invalid Diameter Agent transport type.")

elif self.connection.mode == DIAMETER_AGENT_SERVER_MODE:
if self.connection.transport_type == DIAMETER_AGENT_TRANSPORT_TYPE_TCP:
self.transport = TcpServer(self.connection.local_node.ip_address,
self.connection.local_node.port)
self.connection.local_node.port)
elif self.connection.transport_type == DIAMETER_AGENT_TRANSPORT_TYPE_SCTP:
self.transport = SctpServer(self.connection.local_node.ip_address,
self.connection.local_node.port)
self.connection.local_node.port)
else:
raise DiameterAssociationError("Invalid Diameter Agent transport type.")

Expand Down Expand Up @@ -183,7 +187,7 @@ def recv_message_from_queue(self) -> None:
for msg in msgs:
make_logging(msg, disable_else=True)
self._recv_messages.put(msg)

diameter_conn_logger.debug(f"Found {len(msgs)} Diameter "\
f"Message(s).")
except AVPParsingError:
Expand Down Expand Up @@ -228,7 +232,7 @@ def put_message_into_send_queue(self, msg: Type[DiameterMessage]) -> None:
diameter_conn_logger.debug(f"[{hop_by_hop.hex()}] Diameter "\
f"Message (Answer) have been put "\
f"into _send_messages Queue.")

self.lock.release()


Expand Down Expand Up @@ -259,7 +263,7 @@ def send_message_from_queue(self) -> None:
diameter_conn_logger.debug(f"[{msg.header.hop_by_hop.hex()}] "\
f"Diameter Request have been "\
f"put into Pending Request Queue.")

stream += msg.dump()

if self.transport:
Expand All @@ -279,9 +283,9 @@ def send_message_from_queue(self) -> None:
diameter_conn_logger.debug("Transport Layer is not in "\
"WRITE mode again, so we "\
"can send data stream.")

self.transport._set_selector_events_mask("rw", stream)

# maybe include a verification here before the "break" if a given message has been sent from transport layer.
break

Expand Down Expand Up @@ -317,7 +321,7 @@ def get_message(self) -> Type[DiameterMessage]:
else:
diameter_conn_logger.debug("No need to wait for go ahead for "\
"postprocess_recv_messages_ready")

return self.get_postprocess_recv_message()


Expand All @@ -328,7 +332,7 @@ def tracking_events(self) -> None:
diameter_conn_logger.debug("Generating a DWR message.")

self.transport.tracking_events_count = 0

except AttributeError as e:
if e.args[0] == "'TcpServer' object has no attribute 'events'":
pass
Expand Down Expand Up @@ -358,9 +362,9 @@ def __init__(self,
debug: bool = False,
is_logging: bool = False,
app_name: str = None) -> None:

self.logging = DiameterLogging(debug, is_logging, app_name)

self.config = self.make_config(config)
self._connection = _convert_config_to_connection_obj(self.config)
self._base = self.get_base_messages()
Expand Down Expand Up @@ -418,7 +422,7 @@ def reset(self) -> None:

def close(self) -> None:
current_state = self.get_current_state()

if current_state == CLOSED:
raise DiameterApplicationError("Cannot stop the application. "\
"Peer State Machine is already "\
Expand Down Expand Up @@ -477,7 +481,7 @@ def send_message(self, msg: Type[DiameterMessage], avoid: bool = True) -> Any:
if is_base_answer(msg):
raise DiameterApplicationError("Cannot send a Base protocol "\
"answer")

self._association.put_message_into_send_queue(msg)


Expand Down Expand Up @@ -510,7 +514,7 @@ def context(self) -> None:

if self.is_open():
break

if self.is_closed() and (stop - start).seconds >= 5:
start = datetime.datetime.utcnow()
self.start()
Expand Down
Loading