diff --git a/community.py b/community.py index 9ebcff6d..2ab13d41 100644 --- a/community.py +++ b/community.py @@ -24,7 +24,7 @@ from .bloomfilter import BloomFilter from .candidate import Candidate, WalkCandidate from .conversion import BinaryConversion, DefaultConversion, Conversion -from .destination import CommunityDestination, CandidateDestination +from .destination import CommunityDestination, CandidateDestination, NHopCommunityDestination from .distribution import (SyncDistribution, GlobalTimePruning, LastSyncDistribution, DirectDistribution, FullSyncDistribution) from .exception import ConversionNotFoundException, MetaNotFoundException @@ -2438,10 +2438,10 @@ def on_signature_response(self, messages): changed = old_body != new_body - # A CommunityDestination is allowed to have one unsigned changed field: the hop count. + # A NHopCommunityDestination is allowed to have one unsigned changed field: the hop count. # This hop count has the restriction that it must be 1 less in the new message than # in the old message. - if changed and isinstance(message.payload.message.meta.destination, CommunityDestination): + if changed and isinstance(message.payload.message.meta.destination, NHopCommunityDestination): new_body_len = len(new_body) # Create a list of differing indices diffs = [i for i in xrange(len(old_body)) if (i < new_body_len) and (old_body[i] != new_body[i])] diff --git a/conversion.py b/conversion.py index 621ad39c..b749abc6 100644 --- a/conversion.py +++ b/conversion.py @@ -9,7 +9,7 @@ from .authentication import Authentication, NoAuthentication, MemberAuthentication, DoubleMemberAuthentication from .bloomfilter import BloomFilter from .candidate import Candidate -from .destination import Destination, CommunityDestination, CandidateDestination +from .destination import Destination, CommunityDestination, CandidateDestination, NHopCommunityDestination from .distribution import Distribution, FullSyncDistribution, LastSyncDistribution, DirectDistribution from .exception import MetaNotFoundException from .message import DelayPacketByMissingMember, DropPacket, Message @@ -239,7 +239,8 @@ def define_meta_message(self, byte, meta, encode_payload_func, decode_payload_fu DirectDistribution: self._encode_direct_distribution, CandidateDestination: self._encode_candidate_destination, - CommunityDestination: self._encode_community_destination} + CommunityDestination: self._encode_community_destination, + NHopCommunityDestination: self._encode_community_destination} self._encode_message_map[meta.name] = self.EncodeFunctions(byte, mapping[type(meta.authentication)], mapping[type(meta.resolution)], mapping[type(meta.distribution)], mapping[type(meta.destination)], encode_payload_func) @@ -256,7 +257,8 @@ def define_meta_message(self, byte, meta, encode_payload_func, decode_payload_fu LastSyncDistribution: self._decode_last_sync_distribution, CandidateDestination: self._decode_candidate_destination, - CommunityDestination: self._decode_community_destination} + CommunityDestination: self._decode_community_destination, + NHopCommunityDestination: self._decode_community_destination} self._decode_message_map[byte] = self.DecodeFunctions(meta, mapping[type(meta.authentication)], mapping[type(meta.resolution)], mapping[type(meta.distribution)], mapping[type(meta.destination)], decode_payload_func) @@ -975,7 +977,8 @@ def _encode_candidate_destination(self, container, message): pass def _encode_community_destination(self, container, message): - container.append(pack("!b", message.destination.depth)) + if isinstance(message.meta.destination, NHopCommunityDestination): + container.append(pack("!b", message.destination.depth)) def can_encode_message(self, message): """ @@ -1174,11 +1177,14 @@ def _decode_candidate_destination(self, placeholder): placeholder.destination = placeholder.meta.destination.Implementation(placeholder.meta.destination) def _decode_community_destination(self, placeholder): - depth, = unpack_from("!b", placeholder.data, placeholder.offset) - placeholder.offset += 1 - new_depth = depth - 1 if depth > 0 else depth - placeholder.destination = placeholder.meta.destination.Implementation(placeholder.meta.destination, - depth=new_depth) + if isinstance(placeholder.meta.destination, NHopCommunityDestination): + depth, = unpack_from("!b", placeholder.data, placeholder.offset) + placeholder.offset += 1 + new_depth = depth - 1 if depth > 0 else depth + placeholder.destination = placeholder.meta.destination.Implementation(placeholder.meta.destination, + depth=new_depth) + else: + placeholder.destination = placeholder.meta.destination.Implementation(placeholder.meta.destination) def can_decode_message(self, data): """ diff --git a/destination.py b/destination.py index 4ae3db25..9b000126 100644 --- a/destination.py +++ b/destination.py @@ -71,24 +71,15 @@ def __init__(self, meta, *candidates, **kwargs): super(CommunityDestination.Implementation, self).__init__(meta) self._candidates = candidates - if 'depth' in kwargs: - self._depth = kwargs['depth'] - else: - self._depth = meta.depth - @property def node_count(self): return self._meta._node_count - @property - def depth(self): - return self._depth - @property def candidates(self): return self._candidates - def __init__(self, node_count, depth=-1): + def __init__(self, node_count): """ Construct a CommunityDestination object. @@ -102,15 +93,59 @@ def __init__(self, node_count, depth=-1): assert isinstance(node_count, int) assert node_count >= 0 self._node_count = node_count - self._depth = depth @property def node_count(self): return self._node_count + def __str__(self): + return "<%s node_count:%d>" % (self.__class__.__name__, self._node_count) + +class NHopCommunityDestination(CommunityDestination): + """ + An extension of the default CommunityDestination which also takes a maximum number of hops for the message + to traverse. + """ + + class Implementation(CommunityDestination.Implementation): + + def __init__(self, meta, *candidates, **kwargs): + """ + Construct a NHopCommunityDestination.Implementation object. + + META the associated CandidateDestination object. + + CANDIDATES is a tuple containing zero or more Candidate objects. These will contain the + destination addresses when the associated message is sent. + """ + super(NHopCommunityDestination.Implementation, self).__init__(meta, *candidates, **kwargs) + + if 'depth' in kwargs: + self._depth = kwargs['depth'] + else: + self._depth = meta.depth + + @property + def depth(self): + return self._depth + + def __init__(self, node_count, depth=-1): + """ + Construct a NHopCommunityDestination object. + + NODE_COUNT is an integer giving the number of nodes where, when the message is created, the + message must be sent to. These nodes are selected using the + community.yield_verified_candidates(...) method. NODE_COUNT must be zero or higher. + + DEPTH is an integer in [0, 127] v -1, this determines the _remaining_ hop count for this message. If + DEPTH is equal to -1, no hop depth will be used. + """ + super(NHopCommunityDestination, self).__init__(node_count) + self._depth = depth + @property def depth(self): return self._depth def __str__(self): - return "<%s node_count:%d>" % (self.__class__.__name__, self._node_count) + return "<%s node_count:%d, depth:%d>" % (self.__class__.__name__, self._node_count, self.depth) diff --git a/dispersy.py b/dispersy.py index 3379a0d3..10f4999b 100644 --- a/dispersy.py +++ b/dispersy.py @@ -57,7 +57,7 @@ from .candidate import LoopbackCandidate, WalkCandidate, Candidate from .community import Community from .crypto import DispersyCrypto, ECCrypto -from .destination import CommunityDestination, CandidateDestination +from .destination import CommunityDestination, CandidateDestination, NHopCommunityDestination from .discovery.community import DiscoveryCommunity from .dispersydatabase import DispersyDatabase from .distribution import SyncDistribution, FullSyncDistribution, LastSyncDistribution @@ -1275,7 +1275,7 @@ def _forward(self, messages): if isinstance(meta.destination, (CommunityDestination, CandidateDestination)): for message in messages: # Don't forward messages with a 0 TTL - if isinstance(meta.destination, CommunityDestination) and message.destination.depth == 0: + if isinstance(meta.destination, NHopCommunityDestination) and message.destination.depth == 0: continue # CandidateDestination.candidates may be empty candidates = set(message.destination.candidates) diff --git a/tests/debugcommunity/community.py b/tests/debugcommunity/community.py index ee0ae073..bc22f3b6 100644 --- a/tests/debugcommunity/community.py +++ b/tests/debugcommunity/community.py @@ -2,7 +2,7 @@ from ...candidate import Candidate from ...community import Community, HardKilledCommunity from ...conversion import DefaultConversion -from ...destination import CommunityDestination +from ...destination import CommunityDestination, NHopCommunityDestination from ...distribution import DirectDistribution, FullSyncDistribution, LastSyncDistribution, GlobalTimePruning from ...message import Message, DelayMessageByProof, BatchConfiguration from ...resolution import PublicResolution, LinearResolution, DynamicResolution @@ -56,7 +56,7 @@ def initiate_meta_messages(self): DoubleMemberAuthentication(allow_signature_func=self.allow_double_signed_text), PublicResolution(), LastSyncDistribution(synchronization_direction=u"ASC", priority=128, history_size=1), - CommunityDestination(node_count=10, depth=42), + NHopCommunityDestination(node_count=10, depth=42), TextPayload(), self._generic_timeline_check, self.on_text), @@ -64,7 +64,7 @@ def initiate_meta_messages(self): DoubleMemberAuthentication(allow_signature_func=self.allow_double_signed_text), PublicResolution(), DirectDistribution(), - CommunityDestination(node_count=10, depth=42), + NHopCommunityDestination(node_count=10, depth=42), TextPayload(), self._generic_timeline_check, self.on_text), @@ -72,7 +72,7 @@ def initiate_meta_messages(self): DoubleMemberAuthentication(allow_signature_func=self.allow_double_signed_text, split_payload_func=self.split_double_payload), PublicResolution(), DirectDistribution(), - CommunityDestination(node_count=10, depth=42), + NHopCommunityDestination(node_count=10, depth=42), TextPayload(), self._generic_timeline_check, self.on_text), @@ -89,7 +89,7 @@ def initiate_meta_messages(self): MemberAuthentication(), PublicResolution(), FullSyncDistribution(enable_sequence_number=False, synchronization_direction=u"ASC", priority=128), - CommunityDestination(node_count=1, depth=1), + NHopCommunityDestination(node_count=1, depth=1), TextPayload(), self._generic_timeline_check, self.on_text,