diff --git a/Tribler/community/anontunnel/ConnectionHandlers/Socks5Connection.py b/Tribler/community/anontunnel/ConnectionHandlers/Socks5Connection.py index 2c0016cf1f1..9c16f5121d7 100644 --- a/Tribler/community/anontunnel/ConnectionHandlers/Socks5Connection.py +++ b/Tribler/community/anontunnel/ConnectionHandlers/Socks5Connection.py @@ -138,7 +138,7 @@ def _try_request(self): # We use same IP as the single socket, but the port number comes from the newly created UDP listening socket ip, port = self.single_socket.get_myip(), socket.getsockname()[1] - logger.error("Accepting UDP ASSOCIATE request, direct client to %s:%d", ip, port) + logger.info("Accepting UDP ASSOCIATE request, direct client to %s:%d", ip, port) response = structs.encode_reply(0x05, 0x00, 0x00, structs.ADDRESS_TYPE_IPV4, ip, port) self.write(response) diff --git a/Tribler/community/anontunnel/Socks5Server.py b/Tribler/community/anontunnel/Socks5Server.py index 5152ff68355..6c253a79375 100644 --- a/Tribler/community/anontunnel/Socks5Server.py +++ b/Tribler/community/anontunnel/Socks5Server.py @@ -20,7 +20,7 @@ class Socks5Server(object, TunnelObserver): - + def __init__(self): self._tunnel = None self._accept_incoming = False @@ -34,7 +34,7 @@ def __init__(self): self.routes = {} self.udp_relays = {} - + @property def accept_incoming(self): return self._accept_incoming @@ -42,10 +42,10 @@ def accept_incoming(self): @accept_incoming.setter def accept_incoming(self, value): if value and not self._accept_incoming: - logger.error("Accepting SOCKS5 connections now!") + logger.info("Accepting SOCKS5 connections now!") if not value: - logger.error("DISCONNECTING SOCKS5 !") + logger.info("DISCONNECTING SOCKS5 !") for key in self.socket2connection.keys(): self.socket2connection[key].close() @@ -89,7 +89,7 @@ def create_udp_relay(self): :rtype : socket.socket """ - udp_relay_socket = self.raw_server.create_udpsocket(0,"0.0.0.0") + udp_relay_socket = self.raw_server.create_udpsocket(0, "0.0.0.0") handler = UdpRelayTunnelHandler(udp_relay_socket, self) self.raw_server.start_listening_udp(udp_relay_socket, handler) @@ -101,9 +101,8 @@ def on_client_udp_packets(self, socket, packets): self.udp_relays[source_address] = socket - if __debug__: - logger.info("Relaying UDP packets from %s:%d to %s:%d", source_address[0], source_address[1], - request.destination_address, request.destination_port) + logger.debug("Relaying UDP packets from %s:%d to %s:%d", source_address[0], source_address[1], + request.destination_address, request.destination_port) self.routes[(request.destination_address, request.destination_port)] = source_address self.tunnel.send_data( @@ -140,9 +139,8 @@ def on_tunnel_data(self, community, source_address, data): if socks5_socket.sendto(encapsulated, destination_address) < len(encapsulated): logger.error("Not sending package!") - if __debug__: - logger.info("Returning UDP packets from %s to %s using proxy port %d", source_address, destination_address, - socks5_socket.getsockname()[1]) + logger.info("Returning UDP packets from %s to %s using proxy port %d", source_address, destination_address, + socks5_socket.getsockname()[1]) def external_connection_made(self, s): if not self.accept_incoming: @@ -192,4 +190,4 @@ def shutdown(self): tcp_connection.shutdown() def on_state_change(self, community, state): - self.accept_incoming = state \ No newline at end of file + self.accept_incoming = state diff --git a/Tribler/community/anontunnel/community.py b/Tribler/community/anontunnel/community.py index 2041c8f5b98..1fb92c2fd64 100755 --- a/Tribler/community/anontunnel/community.py +++ b/Tribler/community/anontunnel/community.py @@ -2,7 +2,7 @@ import socket from threading import RLock from Tribler.community.anontunnel.ConnectionHandlers.CircuitReturnHandler import ShortCircuitReturnHandler, CircuitReturnHandler - +from Tribler.dispersy.requestcache import NumberCache logger = logging.getLogger(__name__) from Tribler.community.anontunnel.globals import * @@ -11,16 +11,16 @@ from Tribler.community.anontunnel.CircuitLengthStrategies import ConstantCircuitLengthStrategy from Tribler.community.anontunnel.SelectionStrategies import RandomSelectionStrategy from traceback import print_exc -from Tribler.dispersy.dispersy import IntroductionRequestCache from time import time -from Tribler.community.anontunnel.conversion import ProxyConversion,\ +from Tribler.community.anontunnel.conversion import ProxyConversion, \ CustomProxyConversion -from Tribler.community.anontunnel.payload import StatsPayload, CreatedMessage,\ + +from Tribler.community.anontunnel.payload import StatsPayload, CreatedMessage, \ PongMessage, CreateMessage, ExtendedWithMessage, PingMessage, DataMessage -from Tribler.dispersy.candidate import BootstrapCandidate, WalkCandidate,\ +from Tribler.dispersy.candidate import BootstrapCandidate, WalkCandidate, \ Candidate from Tribler.dispersy.authentication import MemberAuthentication from Tribler.dispersy.community import Community @@ -30,16 +30,14 @@ from Tribler.dispersy.message import Message from Tribler.dispersy.resolution import PublicResolution - class ProxySettings: def __init__(self): - length = 1#randint(1, 4) + length = 1 # randint(1, 4) self.extend_strategy = ExtendStrategies.TrustThyNeighbour self.select_strategy = RandomSelectionStrategy(1) self.length_strategy = ConstantCircuitLengthStrategy(length) - class TunnelObserver(): def on_state_change(self, community, state): pass @@ -52,7 +50,7 @@ def on_tunnel_stats(self, community, candidate, stats): class Circuit: """ Circuit data structure storing the id, status, first hop and all hops """ - + def __init__(self, community, circuit_id, goal_hops=0, candidate=None): """ Instantiate a new Circuit data structure @@ -80,7 +78,7 @@ def __init__(self, community, circuit_id, goal_hops=0, candidate=None): self.speed_up = 0.0 self.speed_down = 0.0 self.last_incomming = time() - + @property def bytes_downloaded(self): return self.bytes_down[1] @@ -92,26 +90,26 @@ def bytes_uploaded(self): @property def online(self): return self.goal_hops == len(self.hops) - + @property def state(self): if self.hops == None: return CIRCUIT_STATE_BROKEN - + if len(self.hops) < self.goal_hops: return CIRCUIT_STATE_EXTENDING else: return CIRCUIT_STATE_READY - + @property def ping_time_remaining(self): too_old = time() - CANDIDATE_WALK_LIFETIME - 5.0 diff = self.last_incomming - too_old return diff if diff > 0 else 0 - + def __contains__(self, other): if isinstance(other, Candidate): - #TODO: should compare to a list here + # TODO: should compare to a list here return other == self.candidate @@ -130,7 +128,7 @@ def __init__(self, circuit_id, candidate): self.online = False self.last_incomming = time() - + @property def ping_time_remaining(self): too_old = time() - CANDIDATE_WALK_LIFETIME - 5.0 @@ -187,22 +185,22 @@ def __init__(self, dispersy, master_member, raw_server, settings=None, integrate self.lock = RLock() # Custom conversion - self.prefix = 'f' * 22 + 'e' #shouldn't this be "fffffffe".decode("HEX")? + self.prefix = 'f' * 22 + 'e' # shouldn't this be "fffffffe".decode("HEX")? self.proxy_conversion = CustomProxyConversion(self.prefix) - self.on_custom = {MESSAGE_BREAK: self.on_break, MESSAGE_CREATE: self.on_create, + self.on_custom = {MESSAGE_CREATE: self.on_create, MESSAGE_CREATED: self.on_created, MESSAGE_DATA: self.on_data, MESSAGE_EXTEND: self.on_extend, MESSAGE_EXTENDED: self.on_extended, MESSAGE_PING: self.on_ping, MESSAGE_PONG: self.on_pong, MESSAGE_PUNCTURE: self.on_puncture} self.__observers = [] ''' :type : list of TunnelObserver''' - + # Replace endpoint dispersy.endpoint.bypass_prefix = self.prefix dispersy.endpoint.bypass_community = self - + self.circuits = {} self.relay_from_to = {} - + # Stats self.stats = { 'bytes_enter': 0, @@ -211,7 +209,7 @@ def __init__(self, dispersy, master_member, raw_server, settings=None, integrate 'dropped_exit': 0, 'packet_size': 0 } - + self.circuit_length_strategy = settings.length_strategy self.circuit_selection_strategy = settings.select_strategy self.extend_strategy = settings.extend_strategy @@ -221,16 +219,11 @@ def __init__(self, dispersy, master_member, raw_server, settings=None, integrate self._exit_sockets = {} self.raw_server = raw_server - - # Heartbeat hashmap Candidate -> last heart beat timestamp, assume we never heard any - self.member_heartbeat = {} - self.member_ping = {} - self._online = False dispersy._callback.register(self.check_ready) dispersy._callback.register(self.ping_circuits) - + if integrate_with_tribler: from Tribler.Core.CacheDB.Notifier import Notifier self.notifier = Notifier.getInstance() @@ -258,10 +251,10 @@ def initiate_meta_messages(self): , self._dispersy._generic_timeline_check , self.on_stats )] - + def _initialize_meta_messages(self): super(ProxyCommunity, self)._initialize_meta_messages() - + self._original_on_introduction_request = None self._original_on_introduction_response = None @@ -278,10 +271,10 @@ def _initialize_meta_messages(self): self._meta_messages[meta.name] = Message(meta.community, meta.name, meta.authentication, meta.resolution, meta.distribution, meta.destination, meta.payload, meta.check_callback, self.on_introduction_response, meta.undo_callback, meta.batch) - + assert self._original_on_introduction_request assert self._original_on_introduction_response - + def on_introduction_request(self, messages): try: return self._original_on_introduction_request(messages) @@ -295,12 +288,12 @@ def on_introduction_response(self, messages): finally: for message in messages: self.on_member_heartbeat(message.candidate) - + def on_stats(self, messages): for message in messages: for o in self.__observers: o.on_tunnel_stats(self, message.candidate, message.payload.stats) - + def send_stats(self, stats): meta = self.get_meta_message(u"stats") record = meta.impl(authentication=(self._my_member,), @@ -308,77 +301,67 @@ def send_stats(self, stats): payload=(stats,)) self.dispersy.store_update_forward([record], True, False, True) - - ### END OF DISPERSY DEFINED MESSAGES - ### START OF CUSTOM MESSAGES + + # END OF DISPERSY DEFINED MESSAGES + # START OF CUSTOM MESSAGES def on_bypass_message(self, sock_addr, packet): dispersy = self._dispersy - - #TODO: we should attempt to get the candidate from the member_heartbeat dict - #get_candidate has a garbage collector :P + + # TODO: we should attempt to get the candidate from the member_heartbeat dict + # get_candidate has a garbage collector :P candidate = self.get_candidate(sock_addr) or Candidate(sock_addr, False) circuit_id, data = self.proxy_conversion.get_circuit_and_data(packet) relay_key = (candidate, circuit_id) packet_type = self.proxy_conversion.get_type(data) - + logger.debug("GOT %s from %s:%d over circuit %d", MESSAGE_STRING_REPRESENTATION[packet_type], candidate.sock_addr[0], candidate.sock_addr[1], circuit_id) - - # Next, relay packet if we know whom to forward message to for this circuit + + # First, relay packet if we know whom to forward message to for this circuit if circuit_id > 0 and relay_key in self.relay_from_to and self.relay_from_to[relay_key].online: next_relay = self.relay_from_to[relay_key] new_packet = self.prefix + self.proxy_conversion.add_circuit(data, next_relay.circuit_id) next_relay.bytes[1] += len(new_packet) - + + this_relay_key = (next_relay.candidate, next_relay.circuit_id) + if this_relay_key in self.relay_from_to: + this_relay = self.relay_from_to[this_relay_key] + this_relay.last_incomming = time() + this_relay.bytes[0] += len(packet) + self.send_packet(next_relay.candidate, circuit_id, packet_type, new_packet, relayed=True) - - #hmm, are we able to read this message type if we're relaying - #moreover, shouldn't we be calling self.break_circuit - if packet_type == MESSAGE_BREAK: - # Route is dead :( - logger.debug('routing is dead') - del self.relay_from_to[relay_key] - self.dict_inc(dispersy.statistics.success, MESSAGE_STRING_REPRESENTATION[packet_type] + '-relayed') - + # We don't know where to relay this message to, must be for me? else: - type, payload = self.proxy_conversion.decode(data) - # TODO: I think this can be removed if we remove the initial additional constraint + _, payload = self.proxy_conversion.decode(data) if circuit_id in self.circuits: self.circuits[circuit_id].last_incomming = time() - - if not self.on_custom[type](circuit_id, candidate, payload): + + if not self.on_custom.get(packet_type, lambda *args:None)(circuit_id, candidate, payload): self.dict_inc(dispersy.statistics.success, MESSAGE_STRING_REPRESENTATION[packet_type] + '-ignored') logger.debug("Prev message was IGNORED") else: self.dict_inc(dispersy.statistics.success, MESSAGE_STRING_REPRESENTATION[packet_type]) - - def on_break(self, circuit_id, candidate, message): - self._break_circuit(circuit_id) - - def _break_circuit(self, circuit_id, additional_info = ''): - assert isinstance(circuit_id, (long, int)), type(circuit_id) - - if circuit_id in self.circuits: - logger.info("Breaking circuit %d " +additional_info, circuit_id) - # Delete from data structures - if self.circuits[circuit_id].extend_strategy: - self.circuits[circuit_id].extend_strategy.stop() - del self.circuits[circuit_id] + class CircuitRequestCache(NumberCache): + + @staticmethod + def create_number(force_number= -1): + return force_number if force_number >= 0 else NumberCache.create_number() - return True - return False - - class CircuitRequestCache(IntroductionRequestCache): @staticmethod - def create_identifier(number): + def create_identifier(number, force_number= -1): assert isinstance(number, (int, long)), type(number) return u"request-cache:circuit-request:%d" % (number,) - - def __init__(self, community): - IntroductionRequestCache.__init__(self, community, None) + + def __init__(self, community, force_number): + NumberCache.__init__(self, community._request_cache, force_number) + self.community = community + + @property + def timeout_delay(self): + return 5.0 @property def cleanup_delay(self): @@ -389,111 +372,138 @@ def on_created(self): self.circuit.extend_strategy.extend() elif self.circuit.state == CIRCUIT_STATE_READY: self.on_success() - + if self.community.notifier: from Tribler.Core.simpledefs import NTFY_ANONTUNNEL, NTFY_CREATED self.community.notifier.notify(NTFY_ANONTUNNEL, NTFY_CREATED, self.circuit) - + def on_extended(self, extended_message): self.circuit.hops.append(extended_message.extended_with) - + if self.circuit.state == CIRCUIT_STATE_EXTENDING: self.circuit.extend_strategy.extend() - + elif self.circuit.state == CIRCUIT_STATE_READY: self.on_success() - + if self.community.notifier: from Tribler.Core.simpledefs import NTFY_ANONTUNNEL, NTFY_EXTENDED self.community.notifier.notify(NTFY_ANONTUNNEL, NTFY_EXTENDED, self.circuit) - + def on_success(self): if self.circuit.state == CIRCUIT_STATE_READY: - logger.debug("Circuit %d is ready", self.number) - self.community._dispersy._callback.register(self.community._request_cache.pop, args = (self.identifier,)) + logger.info("Circuit %d is ready", self.number) + self.community._dispersy._callback.register(self.community._request_cache.pop, args=(self.identifier,)) def on_timeout(self): if not self.circuit.state == CIRCUIT_STATE_READY: - self.community._break_circuit(self.number, 'timeout on CircuitRequestCache, state = %s'%self.circuit.state) + self.community.remove_circuit(self.number, 'timeout on CircuitRequestCache, state = %s' % self.circuit.state) def create_circuit(self, first_hop_candidate, extend_strategy=None): """ Create a new circuit, with one initial hop """ - - cache = self._request_cache.add(ProxyCommunity.CircuitRequestCache(self)) + + circuit_id = self._generate_circuit_id(first_hop_candidate) + cache = self._request_cache.add(ProxyCommunity.CircuitRequestCache(self, circuit_id)) goal_hops = self.circuit_length_strategy.circuit_length() - circuit = cache.circuit = Circuit(self, cache.number, goal_hops, first_hop_candidate) + circuit = cache.circuit = Circuit(self, circuit_id, goal_hops, first_hop_candidate) circuit.extend_strategy = extend_strategy(self, circuit) if extend_strategy else self.extend_strategy(self, circuit) - self.circuits[circuit.circuit_id] = circuit - - logger.info('Circuit %d is to be created, we want %d hops sending to %s:%d', circuit.circuit_id, circuit.goal_hops, first_hop_candidate.sock_addr[0], first_hop_candidate.sock_addr[1]) - self.send_message(first_hop_candidate, circuit.circuit_id, MESSAGE_CREATE, CreateMessage()) + self.circuits[circuit_id] = circuit + + logger.info('Circuit %d is to be created, we want %d hops sending to %s:%d', circuit_id, circuit.goal_hops, first_hop_candidate.sock_addr[0], first_hop_candidate.sock_addr[1]) + self.send_message(first_hop_candidate, circuit_id, MESSAGE_CREATE, CreateMessage()) return circuit + def remove_circuit(self, circuit_id, additional_info=''): + assert isinstance(circuit_id, (long, int)), type(circuit_id) + + if circuit_id in self.circuits: + logger.info("Breaking circuit %d " + additional_info, circuit_id) + + # Delete from data structures + if self.circuits[circuit_id].extend_strategy: + self.circuits[circuit_id].extend_strategy.stop() + del self.circuits[circuit_id] + + return True + return False + + def remove_relay(self, relay_key): + if relay_key in self.relay_from_to: + logger.info("Breaking relay %s:%d %d" % (relay_key[0][0], relay_key[0][1], relay_key[1])) + + relay = self.relay_from_to[relay_key] + + # one side of the relay broke, removing both + del self.relay_from_to[(relay.candidate, relay.circuit_id)] + del self.relay_from_to[relay_key] + def on_create(self, circuit_id, candidate, message): """ Handle incoming CREATE message, acknowledge the CREATE request with a CREATED reply """ logger.info('We joined circuit %d with neighbour %s', circuit_id, candidate) - + if self.notifier: from Tribler.Core.simpledefs import NTFY_ANONTUNNEL, NTFY_JOINED self.notifier.notify(NTFY_ANONTUNNEL, NTFY_JOINED, candidate.sock_addr, circuit_id) - return self.send_message(candidate, circuit_id, MESSAGE_CREATED, CreatedMessage()) + return self.send_message(candidate, circuit_id, MESSAGE_CREATED, CreatedMessage()) def on_created(self, circuit_id, candidate, message): """ Handle incoming CREATED messages relay them backwards towards the originator if necessary """ - request = self._dispersy._callback.call(self._request_cache.get, args = (ProxyCommunity.CircuitRequestCache.create_identifier(circuit_id),)) + request = self._dispersy._callback.call(self._request_cache.get, args=(ProxyCommunity.CircuitRequestCache.create_identifier(circuit_id),)) if request: request.on_created() return True - + relay_key = (candidate, circuit_id) if relay_key in self.relay_from_to: created_to = candidate created_for = self.relay_from_to[(created_to, circuit_id)] - + # Mark link online such that no new extension attempts will be taken created_for.online = True self.relay_from_to[(created_for.candidate, created_for.circuit_id)].online = True - + self.send_message(created_for.candidate, created_for.circuit_id, MESSAGE_EXTENDED, ExtendedWithMessage(created_to.sock_addr)) - logger.info('We have created a circuit requested by (%s, %d) to (%s,%d)', - created_for.candidate, + logger.info('We have created a circuit requested by (%s:%d, %d) to (%s:%d, %d)', + created_for.candidate.sock_addr[0], + created_for.candidate.sock_addr[1], created_for.circuit_id, - created_to, + created_to.sock_addr[0], + created_to.sock_addr[1], circuit_id ) - + return True return False def on_data(self, circuit_id, candidate, message): """ Handles incoming DATA message, forwards it over the chain or over the internet if needed.""" - #TODO: what's happening here?, some magic averaging I guess + # TODO: what's happening here?, some magic averaging I guess self.stats['packet_size'] = 0.8 * self.stats['packet_size'] + 0.2 * len(message.data) if circuit_id in self.circuits \ and message.destination == ("0.0.0.0", 0) \ and candidate == self.circuits[circuit_id].candidate: - + self.circuits[circuit_id].last_incomming = time() self.circuits[circuit_id].bytes_down[1] += len(message.data) self.stats['bytes_returned'] += len(message.data) for observer in self.__observers: observer.on_tunnel_data(self, message.origin, message.data) - + return True # If it is not ours and we have nowhere to forward to then act as exit node if message.destination != ('0.0.0.0', 0): self.exit_data(circuit_id, candidate, message.destination, message.data) - + return True return False - + def on_extend(self, circuit_id, candidate, message): """ Upon reception of a EXTEND message the message is forwarded over the Circuit if possible. At the end of @@ -513,12 +523,12 @@ def on_extend(self, circuit_id, candidate, message): if not extend_with: return - + relay_key = (candidate, circuit_id) if relay_key in self.relay_from_to: current_relay = self.relay_from_to[relay_key] - assert not current_relay.online, "shouldn't be called whenever relay is online, the extend message should have been forwarded" - + assert not current_relay.online, "shouldn't be called whenever relay is online, the extend message should have been forwarded" + # We will just forget the attempt and try again, possible with another candidate old_to_key = current_relay.candidate, current_relay.circuit_id del self.relay_from_to[old_to_key] @@ -531,33 +541,63 @@ def on_extend(self, circuit_id, candidate, message): self.relay_from_to[relay_key] = RelayRoute(new_circuit_id, extend_with) return self.send_message(extend_with, new_circuit_id, MESSAGE_CREATE, CreateMessage()) - + def on_extended(self, circuit_id, candidate, message): """ A circuit has been extended, forward the acknowledgment back to the origin of the EXTEND. If we are the origin update our records. """ - request = self._dispersy._callback.call(self._request_cache.get, args = (ProxyCommunity.CircuitRequestCache.create_identifier(circuit_id),)) + request = self._dispersy._callback.call(self._request_cache.get, args=(ProxyCommunity.CircuitRequestCache.create_identifier(circuit_id),)) if request: request.on_extended(message) return True - return False - - def create_ping(self, candidates, circuit_ids): - #TODO: this needs to be upgraded to use the pingrequestcache - logger.info("pinging %d circuits/relays", len(candidates)) - for candidate, circuit_id in zip(candidates, circuit_ids): - self.send_message(candidate, circuit_id, MESSAGE_PING, PingMessage()) - + + class PingRequestCache(NumberCache): + + @staticmethod + def create_number(force_number= -1): + return force_number if force_number >= 0 else NumberCache.create_number() + + @staticmethod + def create_identifier(number, force_number= -1): + assert isinstance(number, (int, long)), type(number) + return u"request-cache:ping-request:%d" % (number,) + + def __init__(self, community, force_number): + NumberCache.__init__(self, community._request_cache, force_number) + self.community = community + + @property + def timeout_delay(self): + return 5.0 + + @property + def cleanup_delay(self): + return 0.0 + + def on_pong(self): + self.community._dispersy._callback.register(self.community._request_cache.pop, args=(self.identifier,)) + + def on_timeout(self): + self.community.remove_circuit(self.number, 'timeout on PingRequestCache') + + def create_ping(self, candidate, circuit_id): + self._dispersy._callback.register(self._request_cache.add, args=(ProxyCommunity.PingRequestCache(self, circuit_id),)) + self.send_message(candidate, circuit_id, MESSAGE_PING, PingMessage()) + def on_ping(self, circuit_id, candidate, message): if circuit_id in self.circuits: return self.send_message(candidate, circuit_id, MESSAGE_PONG, PongMessage()) return False - + def on_pong(self, circuit_id, candidate, message): - return True - + request = self._dispersy._callback.call(self._request_cache.get, args=(ProxyCommunity.PingRequestCache.create_identifier(circuit_id),)) + if request: + request.on_pong(message) + return True + return False + def on_puncture(self, circuit_id, candidate, message): return @@ -570,19 +610,19 @@ def on_puncture(self, circuit_id, candidate, message): message.sock_addr, message.sock_addr, randint(0, 2 ** 16))) return self.dispersy.endpoint.send([introduce], [puncture_message.packet]) - - - #got introduction_request or introduction_response from candidate - #not necessarily a new candidate + + + # got introduction_request or introduction_response from candidate + # not necessarily a new candidate def on_member_heartbeat(self, candidate): assert isinstance(candidate, WalkCandidate), type(candidate) if not isinstance(candidate, BootstrapCandidate): - + if len(self.circuits) < MAX_CIRCUITS_TO_CREATE and candidate not in self.circuits.values(): self.create_circuit(candidate) - + def _generate_circuit_id(self, neighbour): - #TODO: why is the circuit_id so small? The conversion is using a unsigned long. + # TODO: why is the circuit_id so small? The conversion is using a unsigned long. circuit_id = randint(1, 255) # prevent collisions @@ -590,11 +630,11 @@ def _generate_circuit_id(self, neighbour): circuit_id = randint(1, 255) return circuit_id - + def send_message(self, destination, circuit_id, message_type, message): return self.send_packet(destination, circuit_id, message_type, self.proxy_conversion.encode(circuit_id, message_type, message)) - - def send_packet(self, destination, circuit_id, message_type, packet, relayed = False): + + def send_packet(self, destination, circuit_id, message_type, packet, relayed=False): assert isinstance(destination, Candidate), type(destination) assert isinstance(packet, str), type(packet) assert packet.startswith(self.prefix) @@ -602,54 +642,52 @@ def send_packet(self, destination, circuit_id, message_type, packet, relayed = F logger.debug("SEND %s to %s:%d over circuit %d", MESSAGE_STRING_REPRESENTATION[message_type], destination.sock_addr[0], destination.sock_addr[1], circuit_id) self.dict_inc(self.dispersy.statistics.outgoing, MESSAGE_STRING_REPRESENTATION[message_type] + ('-relayed' if relayed else ''), 1) - + # we need to make sure that this endpoint is threadsafe return self.dispersy.endpoint.send([destination], [packet]) def dict_inc(self, statistics_dict, key, inc=1): - self.dispersy._callback.register(self._dispersy.statistics.dict_inc, args= (statistics_dict, u"anontunnel-" + key, inc)) - - ### CIRCUIT STUFFS + self.dispersy._callback.register(self._dispersy.statistics.dict_inc, args=(statistics_dict, u"anontunnel-" + key, inc)) + + # CIRCUIT STUFFS def get_circuits(self): return self.circuits.values() - + @property def active_circuits(self): # Circuit is active when it has received a CREATED for it and the final length and the length is 0 return [circuit for circuit in self.circuits.values() if circuit.state == CIRCUIT_STATE_READY] - + def check_ready(self): while True: try: self.circuit_selection_strategy.try_select(self.active_circuits) self.online = True - - except BaseException: + + except ValueError: self.online = False - + finally: yield 1.0 - + def ping_circuits(self): while True: try: - to_be_removed = [self._break_circuit(circuit.circuit_id, 'did not respond to ping') for circuit in self.circuits.values() if circuit.ping_time_remaining == 0] + to_be_removed = [self.remove_relay(relay_key, 'no activity') for relay_key, relay in self.relay_from_to.items() if relay.ping_time_remaining == 0] + logger.info("removed %d relays", len(to_be_removed)) assert all(to_be_removed) - - to_be_pinged = [circuit for circuit in self.circuits.values() if circuit.ping_time_remaining < PING_INTERVAL] - to_be_pinged += [relay for relay in self.relay_from_to.values() if relay.ping_time_remaining < PING_INTERVAL] - - ping_candidates = [obj.candidate for obj in to_be_pinged if obj.candidate] - ping_ids = [obj.circuit_id for obj in to_be_pinged if obj.candidate] - self.create_ping(ping_candidates, ping_ids) + + to_be_pinged = [circuit for circuit in self.circuits.values() if circuit.ping_time_remaining < PING_INTERVAL and circuit.candidate] + logger.info("pinging %d circuits", len(to_be_pinged)) + for circuit in to_be_pinged: + self.create_ping(circuit.candidate, circuit.circuit_id) except: print_exc() yield PING_INTERVAL def exit_data(self, circuit_id, return_candidate, destination, data): - if __debug__: - logger.info("EXIT DATA packet to %s", destination) + logger.debug("EXIT DATA packet to %s", destination) self.stats['bytes_exit'] += len(data) @@ -701,9 +739,7 @@ def send_data(self, payload, circuit_id=None, address=None, ultimate_destination # Make sure the '0-hop circuit' is also a candidate for selection circuit_id = self.circuit_selection_strategy.select(self.active_circuits).circuit_id self.destination_circuit[ultimate_destination] = circuit_id - logger.warning("SELECT %d for %s", circuit_id, ultimate_destination) -# self.fire("circuit_select", destination=ultimate_destination, circuit_id=circuit_id) - + logger.info("SELECT circuit %d for %s:%d", circuit_id, *ultimate_destination) # If chosen the 0-hop circuit OR if there are no other circuits act as EXIT node ourselves if circuit_id == 0: @@ -726,8 +762,7 @@ def send_data(self, payload, circuit_id=None, address=None, ultimate_destination if origin is None: self.circuits[circuit_id].bytes_up[1] += len(payload) - if __debug__: - logger.info("Sending data with origin %s to %s over circuit %d with ultimate destination %s", - origin, address, circuit_id, ultimate_destination) + logger.debug("Sending data with origin %s to %s over circuit %d with ultimate destination %s:%d", + origin, address, circuit_id, *ultimate_destination) except Exception, e: - logger.exception(e) \ No newline at end of file + logger.exception(e) diff --git a/Tribler/community/anontunnel/conversion.py b/Tribler/community/anontunnel/conversion.py index 74614ca261b..35e4fb46e78 100644 --- a/Tribler/community/anontunnel/conversion.py +++ b/Tribler/community/anontunnel/conversion.py @@ -27,19 +27,19 @@ def _decode_stats(placeholder, offset, data): return offset, placeholder.meta.payload.implement(stats) class CustomProxyConversion(): - + def __init__(self, prefix): self.prefix = prefix - + self.encode_functions = {} self.decode_functions = {} - + self.encode_functions[MESSAGE_CREATE] = lambda message: '' self.decode_functions[MESSAGE_CREATE] = lambda buffer, offset: CreateMessage() self.encode_functions[MESSAGE_CREATED] = lambda message: '' self.decode_functions[MESSAGE_CREATED] = lambda buffer, offset: CreatedMessage() - + self.encode_functions[MESSAGE_EXTEND] = self.__encode_extend self.decode_functions[MESSAGE_EXTEND] = self.__decode_extend @@ -48,30 +48,27 @@ def __init__(self, prefix): self.encode_functions[MESSAGE_DATA] = self.__encode_data self.decode_functions[MESSAGE_DATA] = self.__decode_data - - self.encode_functions[MESSAGE_BREAK] = lambda message: '' - self.decode_functions[MESSAGE_BREAK] = lambda buffer, offset: BreakMessage() - + self.encode_functions[MESSAGE_PING] = lambda message: '' self.decode_functions[MESSAGE_PING] = lambda buffer, offset: PingMessage() - + self.encode_functions[MESSAGE_PONG] = lambda message: '' self.decode_functions[MESSAGE_PONG] = lambda buffer, offset: PongMessage() - + self.encode_functions[MESSAGE_PUNCTURE] = self.__encode_puncture self.decode_functions[MESSAGE_PUNCTURE] = self.__decode_puncture - - + + def encode(self, circuit_id, type, message): return self.prefix + struct.pack("!L", circuit_id) + type + self.encode_functions[type](message) - + def decode(self, data, offset=0): message_type = data[offset] - return message_type, self.decode_functions[message_type](data, offset+1) + return message_type, self.decode_functions[message_type](data, offset + 1) def get_circuit_and_data(self, buffer, offset=0): offset += len(self.prefix) - + circuit_id, = struct.unpack_from("!L", buffer, offset) offset += 4 @@ -79,14 +76,14 @@ def get_circuit_and_data(self, buffer, offset=0): def get_type(self, data): return data[0] - + def add_circuit(self, data, new_id): return struct.pack("!L", new_id) + data def __encode_extend(self, extend_message): host = extend_message.host if extend_message.host else '' port = extend_message.port if extend_message.port else 0 - + data = struct.pack("!LL", len(host), port) + host return data @@ -95,12 +92,12 @@ def __decode_extend(self, buffer, offset=0): raise ValueError("Cannot unpack HostLength/Port, insufficient packet size") host_length, port = struct.unpack_from("!LL", buffer, offset) offset += 8 - + if len(buffer) < offset + host_length: raise ValueError("Cannot unpack Host, insufficient packet size") host = buffer[offset:offset + host_length] offset += host_length - + extended_with = (host, port) if host and port else None return ExtendMessage(extended_with) @@ -113,14 +110,14 @@ def __decode_extended(self, buffer, offset=0): raise ValueError("Cannot unpack HostLength/Port, insufficient packet size") host_length, port = struct.unpack_from("!LL", buffer, offset) offset += 8 - + if len(buffer) < offset + host_length: raise ValueError("Cannot unpack Host, insufficient packet size") host = buffer[offset:offset + host_length] offset += host_length - + extended_with = (host, port) - + return ExtendedWithMessage(extended_with) def __encode_data(self, data_message): @@ -128,12 +125,12 @@ def __encode_data(self, data_message): (host, port) = ("0.0.0.0", 0) else: (host, port) = data_message.destination - + if data_message.origin is None: origin = ("0.0.0.0", 0) else: origin = data_message.origin - + return struct.pack("!LLLLL", len(host), port, len(origin[0]), origin[1], len(data_message.data)) \ + host \ @@ -143,24 +140,24 @@ def __encode_data(self, data_message): def __decode_data(self, buffer, offset=0): host_length, port, origin_host_length, origin_port, payload_length = struct.unpack_from("!LLLLL", buffer, offset) offset += 20 - + if len(buffer) < offset + host_length: raise ValueError("Cannot unpack Host, insufficient packet size") host = buffer[offset:offset + host_length] offset += host_length - + destination = (host, port) - + if len(buffer) < offset + origin_host_length: raise ValueError("Cannot unpack Origin Host, insufficient packet size") origin_host = buffer[offset:offset + origin_host_length] offset += origin_host_length - + origin = (origin_host, origin_port) - + if origin == ("0.0.0.0", 0): origin = None - + if payload_length == 0: payload = None else: @@ -171,19 +168,19 @@ def __decode_data(self, buffer, offset=0): return DataMessage(destination, payload, origin) - #why are we using a custom punture-req message? + # why are we using a custom punture-req message? def __encode_puncture(self, puncture_message): return struct.pack("!LL", len(puncture_message.sock_addr[0]), puncture_message.sock_addr[1]) + puncture_message.sock_addr[0] def __decode_puncture(self, buffer, offset=0): host_length, port = struct.unpack_from("!LL", buffer, offset) offset += 8 - + if len(buffer) < offset + host_length: raise ValueError("Cannot unpack Host, insufficient packet size") host = buffer[offset:offset + host_length] offset += host_length - + destination = (host, port) - - return PunctureMessage(destination) \ No newline at end of file + + return PunctureMessage(destination) diff --git a/Tribler/community/anontunnel/globals.py b/Tribler/community/anontunnel/globals.py index e352766a45e..c7832205c62 100644 --- a/Tribler/community/anontunnel/globals.py +++ b/Tribler/community/anontunnel/globals.py @@ -12,7 +12,6 @@ MESSAGE_EXTEND = chr(3) MESSAGE_EXTENDED = chr(4) MESSAGE_DATA = chr(5) -MESSAGE_BREAK = chr(6) MESSAGE_PING = chr(7) MESSAGE_PONG = chr(8) MESSAGE_PUNCTURE = chr(9) @@ -24,11 +23,10 @@ MESSAGE_EXTEND: u'extend', MESSAGE_EXTENDED: u'extended', MESSAGE_DATA: u'data', - MESSAGE_BREAK: u'break', MESSAGE_PING: u'ping', MESSAGE_PONG: u'pong', MESSAGE_PUNCTURE: u'puncture', MESSAGE_STATS: u'stats' } -PING_INTERVAL = (CANDIDATE_WALK_LIFETIME - 5.0) / 4 \ No newline at end of file +PING_INTERVAL = (CANDIDATE_WALK_LIFETIME - 5.0) / 4 diff --git a/Tribler/dispersy b/Tribler/dispersy index 7f02bf34c7d..dc43c46a160 160000 --- a/Tribler/dispersy +++ b/Tribler/dispersy @@ -1 +1 @@ -Subproject commit 7f02bf34c7db9c5be6643f9746e2920346f9d847 +Subproject commit dc43c46a1601d7f9549d0bb0dbfd2bf6c6f8255a diff --git a/logger.conf b/logger.conf index 376b636a5e6..6a306afd54e 100644 --- a/logger.conf +++ b/logger.conf @@ -18,7 +18,7 @@ handlers=default propagate=0 [logger_anontunnel] -level=DEBUG +level=INFO qualname=Tribler.community.anontunnel.community handlers=default propagate=0