diff --git a/sugar/p2p/MostlyReliablePipe.py b/sugar/p2p/MostlyReliablePipe.py index f85a815a..7bb4aaa3 100644 --- a/sugar/p2p/MostlyReliablePipe.py +++ b/sugar/p2p/MostlyReliablePipe.py @@ -10,11 +10,15 @@ pygtk.require('2.0') import gtk, gobject -_MTU = 482 -_HEADER_LEN = 30 +_MTU = 481 +_HEADER_LEN = 31 _MAGIC = 0xbaea4304 _TTL = 120 # 2 minutes +# Message segment packet types +_SEGMENT_TYPE_DATA = 0 +_SEGMENT_TYPE_RETRANSMIT = 1 + def _stringify_sha(sha): print_sha = "" for char in sha: @@ -28,11 +32,12 @@ def _sha_data(data): class MessageSegment(object): # 4: magic (0xbaea4304) + # 1: type # 2: segment number # 2: total segments # 2: message sequence number #20: total data sha1 - _HEADER_TEMPLATE = "! IHHH20s" + _HEADER_TEMPLATE = "! IbHHH20s" def _new_from_parts(self, msg_seq_num, segno, total_segs, data, master_sha): """Construct a new message segment from individual attributes.""" @@ -56,15 +61,16 @@ class MessageSegment(object): self._total_segs = total_segs self._msg_seq_num = msg_seq_num self._addr = None + self._type = _SEGMENT_TYPE_DATA # Make the header - self._header = struct.pack(self._HEADER_TEMPLATE, _MAGIC, self._segno, - self._total_segs, self._msg_seq_num, self._master_sha) + self._header = struct.pack(self._HEADER_TEMPLATE, _MAGIC, self._type, + self._segno, self._total_segs, self._msg_seq_num, self._master_sha) def _new_from_data(self, addr, data): """Verify and construct a new message segment from network data.""" if len(data) < _HEADER_LEN + 1: - raise ValueError("Message is less then minimum required length") + raise ValueError("Segment is less then minimum required length") stream = StringIO.StringIO(data) self._stime = None self._addr = addr @@ -73,19 +79,24 @@ class MessageSegment(object): stream.seek(0, 2) header_size = struct.calcsize(self._HEADER_TEMPLATE) self._data_len = stream.tell() - header_size - if self._data_len < 1: - raise ValueError("Message must have some data.") - if self._data_len > _MTU: - raise ValueError("Data length must not be larger than the MTU (%s)." % _MTU) stream.seek(0) # Read the header attributes - (magic, segno, total_segs, msg_seq_num, master_sha) = struct.unpack(self._HEADER_TEMPLATE, + (magic, seg_type, segno, total_segs, msg_seq_num, master_sha) = struct.unpack(self._HEADER_TEMPLATE, stream.read(header_size)) # Sanity checks on the message attributes + if seg_type != _SEGMENT_TYPE_DATA and seg_type != _SEGMENT_TYPE_RETRANSMIT: + raise ValueError("Segment has invalid type.") + if seg_type == _SEGMENT_TYPE_RETRANSMIT: + if segno != 1 or total_segs != 1: + raise ValueError("Retransmission request messages must have only one segment.") if magic != _MAGIC: - raise ValueError("Message does not have the correct magic.") + raise ValueError("Segment does not have the correct magic.") + if self._data_len < 1: + raise ValueError("Segment must have some data.") + if self._data_len > _MTU: + raise ValueError("Data length must not be larger than the MTU (%s)." % _MTU) if segno < 1: raise ValueError("Segment number must be greater than 0.") if segno > total_segs: @@ -95,6 +106,7 @@ class MessageSegment(object): if msg_seq_num < 1: raise ValueError("Message sequence number must be greater than 0.") + self._type = seg_type self._segno = segno self._total_segs = total_segs self._msg_seq_num = msg_seq_num @@ -138,6 +150,9 @@ class MessageSegment(object): def master_sha(self): return self._master_sha + def segment_type(self): + return self._type + def segment(self): """Return a correctly formatted message that can be immediately sent.""" return self._header + self._data @@ -209,7 +224,7 @@ class MostlyReliablePipe(object): """Send complete message data to the owner's data callback.""" self._data_cb(addr, message, self._user_data) - def _process_incoming(self, segment): + def _process_incoming_data(self, segment): """Handle a new message segment. First checks if there is only one segment to the message, and if the checksum from the header matches that computed from the data, dispatches it. Otherwise, it adds the @@ -249,6 +264,19 @@ class MostlyReliablePipe(object): self._dispatch_message(addr, all_data) del self._incoming[msg_key] + def _process_retransmit_request(self, segment): + """Validate and process a retransmission request.""" + # Retransmission data format: + # 2: message sequence number + # 20: total data sha1 + # 2: segment number + data = segment.data() + if len(data) != 22: + print "Bad retransmission request message format." + # Native byte-order since the receive bits already unpacked it for us + template = "@ H20sH" + (msg_seq_num, master_sha, segno) = struct.unpack(template, data) + def _handle_incoming_data(self, source, condition): """Handle incoming network data by making a message segment out of it sending it off to the processing function.""" @@ -258,9 +286,12 @@ class MostlyReliablePipe(object): data, addr = source.recvfrom(_MTU + _HEADER_LEN) try: segment = MessageSegment.new_from_data(addr, data) - self._process_incoming(segment) + if segment.segment_type() == _SEGMENT_TYPE_DATA: + self._process_incoming_data(segment) + elif segment.segment_type() == _SEGMENT_TYPE_RETRANSMIT: + self._process_retransmit_request(segment) except ValueError, exc: - pass + print "Bad segment: %s" % exc return True def send(self, data):