diff --git a/sugar/p2p/MostlyReliablePipe.py b/sugar/p2p/MostlyReliablePipe.py index 7bb4aaa3..c57b022e 100644 --- a/sugar/p2p/MostlyReliablePipe.py +++ b/sugar/p2p/MostlyReliablePipe.py @@ -10,15 +10,6 @@ pygtk.require('2.0') import gtk, gobject -_MTU = 481 -_HEADER_LEN = 31 -_MAGIC = 0xbaea4304 -_TTL = 120 # 2 minutes - -# Message segment packet types -_SEGMENT_TYPE_DATA = 0 -_SEGMENT_TYPE_RETRANSMIT = 1 - def _stringify_sha(sha): print_sha = "" for char in sha: @@ -31,6 +22,8 @@ def _sha_data(data): return sha_hash.digest() class MessageSegment(object): + _MAGIC = 0xbaea4304 + # 4: magic (0xbaea4304) # 1: type # 2: segment number @@ -38,6 +31,38 @@ class MessageSegment(object): # 2: message sequence number #20: total data sha1 _HEADER_TEMPLATE = "! IbHHH20s" + _HEADER_LEN = struct.calcsize(_HEADER_TEMPLATE) + _MTU = 512 - _HEADER_LEN + + # Message segment packet types + _SEGMENT_TYPE_DATA = 0 + _SEGMENT_TYPE_RETRANSMIT = 1 + + def is_data_type(stype): + if stype == MessageSegment._SEGMENT_TYPE_DATA: + return True + return False + is_data_type = staticmethod(is_data_type) + + def is_retransmit_type(stype): + if stype == MessageSegment._SEGMENT_TYPE_RETRANSMIT: + return True + return False + is_retransmit_type = staticmethod(is_retransmit_type) + + def header_len(): + return MessageSegment._HEADER_LEN + header_len = staticmethod(header_len) + + def mtu(): + return MessageSegment._MTU + mtu = staticmethod(mtu) + + def is_type_valid(stype): + if MessageSegment.is_data_type(stype) or MessageSegment.is_retransmit_type(stype): + return True + return False + is_type_valid = staticmethod(is_type_valid) def _new_from_parts(self, msg_seq_num, segno, total_segs, data, master_sha): """Construct a new message segment from individual attributes.""" @@ -61,15 +86,15 @@ class MessageSegment(object): self._total_segs = total_segs self._msg_seq_num = msg_seq_num self._addr = None - self._type = _SEGMENT_TYPE_DATA + self._type = MessageSegment._SEGMENT_TYPE_DATA # Make the header - self._header = struct.pack(self._HEADER_TEMPLATE, _MAGIC, self._type, + self._header = struct.pack(self._HEADER_TEMPLATE, self._MAGIC, self._type, self._segno, self._total_segs, self._msg_seq_num, self._master_sha) def _new_from_data(self, addr, data): """Verify and construct a new message segment from network data.""" - if len(data) < _HEADER_LEN + 1: + if len(data) < self._HEADER_LEN + 1: raise ValueError("Segment is less then minimum required length") stream = StringIO.StringIO(data) self._stime = None @@ -86,17 +111,17 @@ class MessageSegment(object): stream.read(header_size)) # Sanity checks on the message attributes - if seg_type != _SEGMENT_TYPE_DATA and seg_type != _SEGMENT_TYPE_RETRANSMIT: + if not MessageSegment.is_type_valid(seg_type): raise ValueError("Segment has invalid type.") - if seg_type == _SEGMENT_TYPE_RETRANSMIT: + if MessageSegment.is_data_type(seg_type): if segno != 1 or total_segs != 1: raise ValueError("Retransmission request messages must have only one segment.") - if magic != _MAGIC: + if magic != self._MAGIC: raise ValueError("Segment does not have the correct magic.") if self._data_len < 1: raise ValueError("Segment must have some data.") - if self._data_len > _MTU: - raise ValueError("Data length must not be larger than the MTU (%s)." % _MTU) + if self._data_len > self._MTU: + raise ValueError("Data length must not be larger than the MTU (%s)." % self._MTU) if segno < 1: raise ValueError("Segment number must be greater than 0.") if segno > total_segs: @@ -161,6 +186,9 @@ class MostlyReliablePipe(object): """Implement Mostly-Reliable UDP. We don't actually care about guaranteeing delivery or receipt, just a better effort than no effort at all.""" + _UDP_MSG_SIZE = MessageSegment.mtu() + MessageSegment.header_len() + _SEGMENT_TTL = 120 # 2 minutes + def __init__(self, local_addr, remote_addr, port, data_cb, user_data=None): self._local_addr = local_addr self._remote_addr = remote_addr @@ -208,7 +236,7 @@ class MostlyReliablePipe(object): # Watch the listener socket for data gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data) - gobject.timeout_add(120000, self._segment_ttl_worker) + gobject.timeout_add(self._SEGMENT_TTL * 1000, self._segment_ttl_worker) self._started = True @@ -216,7 +244,7 @@ class MostlyReliablePipe(object): """Cull already-sent message segments that are past their TTL.""" now = time.time() for segment in self._sent[:]: - if segment.stime() < now - _MSG_TTL: + if segment.stime() < now - self._SEGMENT_TTL: self._sent.remove(segment) return True @@ -271,10 +299,10 @@ class MostlyReliablePipe(object): # 20: total data sha1 # 2: segment number data = segment.data() - if len(data) != 22: + template = "@ H20sH" + if len(data) != struct.calcsize(template): print "Bad retransmission request message format." # Native byte-order since the receive bits already unpacked it for us - template = "@ H20sH" (msg_seq_num, master_sha, segno) = struct.unpack(template, data) def _handle_incoming_data(self, source, condition): @@ -283,12 +311,13 @@ class MostlyReliablePipe(object): if not (condition & gobject.IO_IN): return True msg = {} - data, addr = source.recvfrom(_MTU + _HEADER_LEN) + data, addr = source.recvfrom(self._UDP_MSG_SIZE) try: segment = MessageSegment.new_from_data(addr, data) - if segment.segment_type() == _SEGMENT_TYPE_DATA: + stype = segment.segment_type() + if MessageSegment.is_data_type(stype): self._process_incoming_data(segment) - elif segment.segment_type() == _SEGMENT_TYPE_RETRANSMIT: + elif MessageSegment.is_retransmit_type(stype): self._process_retransmit_request(segment) except ValueError, exc: print "Bad segment: %s" % exc @@ -310,17 +339,18 @@ class MostlyReliablePipe(object): # Split up the data into segments left = length = len(data) - nmessages = length / _MTU - if length % _MTU > 0: + mtu = MessageSegment.mtu() + nmessages = length / mtu + if length % mtu > 0: nmessages = nmessages + 1 msg_num = 1 while left > 0: msg = MessageSegment.new_from_parts(self._seq_counter, msg_num, - nmessages, data[:_MTU], master_sha) + nmessages, data[:mtu], master_sha) self._outgoing.append(msg) msg_num = msg_num + 1 - data = data[_MTU:] - left = left - _MTU + data = data[mtu:] + left = left - mtu if len(self._outgoing) > 0 and self._worker == 0: self._worker = gobject.idle_add(self._send_worker)