Add a segment type and start bits for retransmission requests.

This commit is contained in:
Dan Williams 2006-05-16 17:08:39 -04:00
parent 17b77fc7cc
commit b6d50a215b

View File

@ -10,11 +10,15 @@ pygtk.require('2.0')
import gtk, gobject import gtk, gobject
_MTU = 482 _MTU = 481
_HEADER_LEN = 30 _HEADER_LEN = 31
_MAGIC = 0xbaea4304 _MAGIC = 0xbaea4304
_TTL = 120 # 2 minutes _TTL = 120 # 2 minutes
# Message segment packet types
_SEGMENT_TYPE_DATA = 0
_SEGMENT_TYPE_RETRANSMIT = 1
def _stringify_sha(sha): def _stringify_sha(sha):
print_sha = "" print_sha = ""
for char in sha: for char in sha:
@ -28,11 +32,12 @@ def _sha_data(data):
class MessageSegment(object): class MessageSegment(object):
# 4: magic (0xbaea4304) # 4: magic (0xbaea4304)
# 1: type
# 2: segment number # 2: segment number
# 2: total segments # 2: total segments
# 2: message sequence number # 2: message sequence number
#20: total data sha1 #20: total data sha1
_HEADER_TEMPLATE = "! IHHH20s" _HEADER_TEMPLATE = "! IbHHH20s"
def _new_from_parts(self, msg_seq_num, segno, total_segs, data, master_sha): def _new_from_parts(self, msg_seq_num, segno, total_segs, data, master_sha):
"""Construct a new message segment from individual attributes.""" """Construct a new message segment from individual attributes."""
@ -56,15 +61,16 @@ class MessageSegment(object):
self._total_segs = total_segs self._total_segs = total_segs
self._msg_seq_num = msg_seq_num self._msg_seq_num = msg_seq_num
self._addr = None self._addr = None
self._type = _SEGMENT_TYPE_DATA
# Make the header # Make the header
self._header = struct.pack(self._HEADER_TEMPLATE, _MAGIC, self._segno, self._header = struct.pack(self._HEADER_TEMPLATE, _MAGIC, self._type,
self._total_segs, self._msg_seq_num, self._master_sha) self._segno, self._total_segs, self._msg_seq_num, self._master_sha)
def _new_from_data(self, addr, data): def _new_from_data(self, addr, data):
"""Verify and construct a new message segment from network data.""" """Verify and construct a new message segment from network data."""
if len(data) < _HEADER_LEN + 1: if len(data) < _HEADER_LEN + 1:
raise ValueError("Message is less then minimum required length") raise ValueError("Segment is less then minimum required length")
stream = StringIO.StringIO(data) stream = StringIO.StringIO(data)
self._stime = None self._stime = None
self._addr = addr self._addr = addr
@ -73,19 +79,24 @@ class MessageSegment(object):
stream.seek(0, 2) stream.seek(0, 2)
header_size = struct.calcsize(self._HEADER_TEMPLATE) header_size = struct.calcsize(self._HEADER_TEMPLATE)
self._data_len = stream.tell() - header_size self._data_len = stream.tell() - header_size
if self._data_len < 1:
raise ValueError("Message must have some data.")
if self._data_len > _MTU:
raise ValueError("Data length must not be larger than the MTU (%s)." % _MTU)
stream.seek(0) stream.seek(0)
# Read the header attributes # Read the header attributes
(magic, segno, total_segs, msg_seq_num, master_sha) = struct.unpack(self._HEADER_TEMPLATE, (magic, seg_type, segno, total_segs, msg_seq_num, master_sha) = struct.unpack(self._HEADER_TEMPLATE,
stream.read(header_size)) stream.read(header_size))
# Sanity checks on the message attributes # Sanity checks on the message attributes
if seg_type != _SEGMENT_TYPE_DATA and seg_type != _SEGMENT_TYPE_RETRANSMIT:
raise ValueError("Segment has invalid type.")
if seg_type == _SEGMENT_TYPE_RETRANSMIT:
if segno != 1 or total_segs != 1:
raise ValueError("Retransmission request messages must have only one segment.")
if magic != _MAGIC: if magic != _MAGIC:
raise ValueError("Message does not have the correct magic.") raise ValueError("Segment does not have the correct magic.")
if self._data_len < 1:
raise ValueError("Segment must have some data.")
if self._data_len > _MTU:
raise ValueError("Data length must not be larger than the MTU (%s)." % _MTU)
if segno < 1: if segno < 1:
raise ValueError("Segment number must be greater than 0.") raise ValueError("Segment number must be greater than 0.")
if segno > total_segs: if segno > total_segs:
@ -95,6 +106,7 @@ class MessageSegment(object):
if msg_seq_num < 1: if msg_seq_num < 1:
raise ValueError("Message sequence number must be greater than 0.") raise ValueError("Message sequence number must be greater than 0.")
self._type = seg_type
self._segno = segno self._segno = segno
self._total_segs = total_segs self._total_segs = total_segs
self._msg_seq_num = msg_seq_num self._msg_seq_num = msg_seq_num
@ -138,6 +150,9 @@ class MessageSegment(object):
def master_sha(self): def master_sha(self):
return self._master_sha return self._master_sha
def segment_type(self):
return self._type
def segment(self): def segment(self):
"""Return a correctly formatted message that can be immediately sent.""" """Return a correctly formatted message that can be immediately sent."""
return self._header + self._data return self._header + self._data
@ -209,7 +224,7 @@ class MostlyReliablePipe(object):
"""Send complete message data to the owner's data callback.""" """Send complete message data to the owner's data callback."""
self._data_cb(addr, message, self._user_data) self._data_cb(addr, message, self._user_data)
def _process_incoming(self, segment): def _process_incoming_data(self, segment):
"""Handle a new message segment. First checks if there is only one """Handle a new message segment. First checks if there is only one
segment to the message, and if the checksum from the header matches segment to the message, and if the checksum from the header matches
that computed from the data, dispatches it. Otherwise, it adds the that computed from the data, dispatches it. Otherwise, it adds the
@ -249,6 +264,19 @@ class MostlyReliablePipe(object):
self._dispatch_message(addr, all_data) self._dispatch_message(addr, all_data)
del self._incoming[msg_key] del self._incoming[msg_key]
def _process_retransmit_request(self, segment):
"""Validate and process a retransmission request."""
# Retransmission data format:
# 2: message sequence number
# 20: total data sha1
# 2: segment number
data = segment.data()
if len(data) != 22:
print "Bad retransmission request message format."
# Native byte-order since the receive bits already unpacked it for us
template = "@ H20sH"
(msg_seq_num, master_sha, segno) = struct.unpack(template, data)
def _handle_incoming_data(self, source, condition): def _handle_incoming_data(self, source, condition):
"""Handle incoming network data by making a message segment out of it """Handle incoming network data by making a message segment out of it
sending it off to the processing function.""" sending it off to the processing function."""
@ -258,9 +286,12 @@ class MostlyReliablePipe(object):
data, addr = source.recvfrom(_MTU + _HEADER_LEN) data, addr = source.recvfrom(_MTU + _HEADER_LEN)
try: try:
segment = MessageSegment.new_from_data(addr, data) segment = MessageSegment.new_from_data(addr, data)
self._process_incoming(segment) if segment.segment_type() == _SEGMENT_TYPE_DATA:
self._process_incoming_data(segment)
elif segment.segment_type() == _SEGMENT_TYPE_RETRANSMIT:
self._process_retransmit_request(segment)
except ValueError, exc: except ValueError, exc:
pass print "Bad segment: %s" % exc
return True return True
def send(self, data): def send(self, data):