From 533dbd638e52520ec9aa779a75e493ea6b40ad7e Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Thu, 18 May 2006 13:09:56 -0400 Subject: [PATCH] Implement retransmit requests on receiver side, fix some stuff up. --- sugar/p2p/MostlyReliablePipe.py | 296 +++++++++++++++++++++++++++----- 1 file changed, 249 insertions(+), 47 deletions(-) diff --git a/sugar/p2p/MostlyReliablePipe.py b/sugar/p2p/MostlyReliablePipe.py index fbc65bd7..4a9dbdcb 100644 --- a/sugar/p2p/MostlyReliablePipe.py +++ b/sugar/p2p/MostlyReliablePipe.py @@ -100,15 +100,18 @@ class SegmentBase(object): self._msg_seq_num = msg_seq_num self._master_sha = master_sha - def new_from_data(addr, data): - """Static constructor for creation from a packed data stream.""" - + def _validate_address(addr): if not addr or type(addr) != type(()): raise ValueError("Address must be a tuple.") if len(addr) != 2 or type(addr[0]) != type("") or type(addr[1]) != type(1): raise ValueError("Address format was invalid.") if addr[1] < 1 or addr[1] > 65535: raise ValueError("Address port was invalid.") + _validate_address = staticmethod(_validate_address) + + def new_from_data(addr, data): + """Static constructor for creation from a packed data stream.""" + SegmentBase._validate_address(addr) # Verify minimum length if not data: @@ -162,7 +165,7 @@ class SegmentBase(object): def stime(self): return self._stime - def addr(self): + def address(self): return self._addr def segment_number(self): @@ -271,7 +274,7 @@ class RetransmitSegment(SegmentBase): return (data, _sha_data(data)) _make_rtms_data = staticmethod(_make_rtms_data) - def new_from_parts(msg_seq_num, rt_msg_seq_num, rt_master_sha, rt_segment_number): + def new_from_parts(addr, msg_seq_num, rt_msg_seq_num, rt_master_sha, rt_segment_number): """Static constructor for creation from individual attributes.""" RetransmitSegment._verify_data(rt_msg_seq_num, rt_master_sha, rt_segment_number) @@ -280,6 +283,8 @@ class RetransmitSegment(SegmentBase): segment = RetransmitSegment(1, 1, msg_seq_num, data_sha) segment._data_len = RetransmitSegment._RT_DATA_LEN segment._data = data + SegmentBase._validate_address(addr) + segment._addr = addr segment._rt_msg_seq_num = rt_msg_seq_num segment._rt_master_sha = rt_master_sha @@ -310,6 +315,109 @@ class RetransmitSegment(SegmentBase): return self._rt_segment_number +class Message(object): + """Tracks an entire message object, which is composed of a number + of individual segments.""" + def __init__(self, src_addr, msg_seq_num, msg_sha, total_segments): + self._rt_target = 0 + self._next_rt_time = 0 + self._last_incoming_time = 0 + self._segments = {} + self._complete = False + self._data = None + self._data_sha = None + self._src_addr = src_addr + self._msg_seq_num = msg_seq_num + self._msg_sha = msg_sha + self._total_segments = total_segments + self._rt_tries = {} + for i in range(1, self._total_segments + 1): + self._rt_tries[i] = 0 + + def __del__(self): + self.clear() + + def sha(self): + return self._msg_sha + + def source_address(self): + return self._src_addr + + def clear(self): + for key in self._segments.keys()[:]: + del self._segments[key] + del self._rt_tries[key] + self._segments = {} + self._rt_tries = {} + + def has_segment(self, segno): + return self._segments.has_key(segno) + + def first_missing(self): + for i in range(1, self._total_segments + 1): + if not self._segments.has_key(i): + return i + return 0 + + _DEF_RT_REQUEST_INTERVAL = 500 + def update_rt_wait(self, now): + """now argument should be in seconds.""" + msg_completeness = float(len(self._segments)) / float(self._total_segments) + next = self._DEF_RT_REQUEST_INTERVAL * (1.1 - msg_completeness) + rt_penalty = 100 + first_missing = self.first_missing() + if first_missing: + rt_penalty = self._rt_tries[first_missing] * 100 + # print "** msg_comp: %s, next: %d, rt_penalty: %s, rt: %s" % (msg_completeness, next, rt_penalty, (float(next + rt_penalty) / 1000)) + next = next + rt_penalty + self._next_rt_time = now + (float(next) * .001) + + def add_segment(self, segment): + segno = segment.segment_number() + if self._segments.has_key(segno): + return + self._segments[segno] = segment + self._rt_tries[segno] = 0 + now = time.time() + self._last_incoming_time = now + + num_segs = len(self._segments) + if num_segs == segment.total_segments(): + self._complete = True + self._data = '' + for seg in self._segments.values(): + self._data = self._data + seg.data() + self._data_sha = _sha_data(self._data) + else: + self.update_rt_wait(now) + + def get_retransmit_message(self, msg_seq_num, segno): + if segno < 1 or segno > self._total_segments: + return None + seg = RetransmitSegment.new_from_parts(self._src_addr, msg_seq_num, + self._msg_seq_num, self._msg_sha, segno) + self._rt_tries[segno] = self._rt_tries[segno] + 1 + self.update_rt_wait(time.time()) + return seg + + def complete(self): + return self._complete + + def data(self): + return (self._data, self._data_sha) + + def last_incoming_time(self): + return self._last_incoming_time + + def next_rt_time(self): + return self._next_rt_time + + def rt_tries(self, segno): + if self._rt_tries.has_key(segno): + return self._rt_tries[segno] + return 0 + + class MostlyReliablePipe(object): """Implement Mostly-Reliable UDP. We don't actually care about guaranteeing delivery or receipt, just a better effort than no effort at all.""" @@ -324,8 +432,10 @@ class MostlyReliablePipe(object): self._data_cb = data_cb self._user_data = user_data self._started = False - self._worker = 0 + self._send_worker = 0 self._seq_counter = 0 + self._drop_prob = 0 + self._rt_check_worker = 0 self._outgoing = [] self._sent = {} @@ -335,6 +445,14 @@ class MostlyReliablePipe(object): self._setup_listener() self._setup_sender() + def __del__(self): + if self._send_worker > 0: + gobject.source_remove(self._send_worker) + self._send_worker = 0 + if self._rt_check_worker > 0: + gobject.source_remove(self._rt_check_worker) + self._rt_check_worker = 0 + def _setup_sender(self): """Setup the send socket for multicast.""" self._send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -354,7 +472,7 @@ class MostlyReliablePipe(object): def start(self): """Let the listener socket start listening for network data.""" # Set some more multicast options - self._listen_sock.bind((self._local_addr, self._port)) # Bind to all interfaces + self._listen_sock.bind((self._local_addr, self._port)) self._listen_sock.settimeout(2) intf = socket.gethostbyname(socket.gethostname()) self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, @@ -365,6 +483,7 @@ class MostlyReliablePipe(object): # Watch the listener socket for data gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data) gobject.timeout_add(self._SEGMENT_TTL * 1000, self._segment_ttl_worker) + gobject.timeout_add(250, self._retransmit_check_worker) self._started = True @@ -373,13 +492,51 @@ class MostlyReliablePipe(object): now = time.time() for segment in self._sent[:]: if segment.stime() < now - self._SEGMENT_TTL: + if segment.userdata: + gobject.source_remove(segment.userdata) self._sent.remove(segment) + + # Cull incomplete incoming segment chains that haven't gotten any data + # for a long time either + for msg_key in self._incoming.keys()[:]: + message = self._incoming[msg_key] + if message.last_incoming_time() < now - self._SEGMENT_TTL: + del self._incoming[msg_key] return True def _dispatch_message(self, addr, message): """Send complete message data to the owner's data callback.""" self._data_cb(addr, message, self._user_data) + _MAX_SEGMENT_RETRIES = 10 + def _retransmit_request(self, message): + """Returns true if the message has exceeded it's retry limit.""" + first_missing = message.first_missing() + if first_missing > 0: + num_retries = message.rt_tries(first_missing) + if num_retries > self._MAX_SEGMENT_RETRIES: + return True + msg_seq = self._next_msg_seq() + seg = message.get_retransmit_message(msg_seq, first_missing) + if seg: + print " Requesting retransmit of %d by %s" % (first_missing, message.source_address()) + self._outgoing.append(seg) + self._schedule_send_worker() + return False + + def _retransmit_check_worker(self): + now = time.time() + for key in self._incoming.keys()[:]: + message = self._incoming[key] + if message.complete(): + continue + if message.next_rt_time() < now: + if self._retransmit_request(message): + # Kill the message, too many retries + print "Dropped message %s, exceeded retries." % _stringify_sha(message.sha()) + del self._incoming[key] + return True + def _process_incoming_data(self, segment): """Handle a new message segment. First checks if there is only one segment to the message, and if the checksum from the header matches @@ -388,55 +545,56 @@ class MostlyReliablePipe(object): checks to see if the message is complete. If all segments are present, the message is reassembled and dispatched.""" - string_sha = _stringify_sha(segment.master_sha()) + msg_sha = segment.master_sha() nsegs = segment.total_segments() - addr = segment.addr() + addr = segment.address() segno = segment.segment_number() # Short-circuit single-segment messages if segno == 1 and nsegs == 1: # Ensure the header's master sha actually equals the data's sha - if string_sha == _stringify_sha(_sha_data(segment.data())): + if msg_sha == _sha_data(segment.data()): self._dispatch_message(addr, segment.data()) return # Otherwise, track the new segment msg_seq_num = segment.message_sequence_number() - msg_key = (addr[0], msg_seq_num, string_sha, nsegs) + msg_key = (addr[0], msg_seq_num, msg_sha, nsegs) if not self._incoming.has_key(msg_key): - self._incoming[msg_key] = {} + self._incoming[msg_key] = Message((addr[0], self._port), msg_seq_num, msg_sha, nsegs) + message = self._incoming[msg_key] # Look for a dupe, and if so, drop the new segment - if self._incoming[msg_key].has_key(segno): + if message.has_segment(segno): return - self._incoming[msg_key][segno] = segment + message.add_segment(segment) # Dispatch the message if all segments are present and the sha is correct - if len(self._incoming[msg_key]) == nsegs: - all_data = '' - for i in range(1, nsegs + 1): - all_data = all_data + self._incoming[msg_key][i].data() - if string_sha == _stringify_sha(_sha_data(all_data)): - self._dispatch_message(addr, all_data) + if message.complete(): + (msg_data, complete_data_sha) = message.data() + if msg_sha == complete_data_sha: + self._dispatch_message(addr, msg_data) del self._incoming[msg_key] + return - _STD_RETRANSMIT_INTERVAL = 500 # 1/2 second (in milliseconds) + _STD_RETRANSMIT_INTERVAL = 150 # in milliseconds def _calc_next_retransmit(self, segment, now): """Calculate the next time (in seconds) that a packet can be retransmitted.""" num_retrans = segment.transmits() - 1 interval = num_retrans * self._STD_RETRANSMIT_INTERVAL - randomness = num_retrans * random.randint(-4, 11) + randomness = num_retrans * ((random.random() * 15) - 5) real_interval = max(self._STD_RETRANSMIT_INTERVAL, interval + randomness) +# print "------ Interval %s" % (real_interval * .001) return max(now, segment.last_transmit() + (real_interval * .001)) - def _segment_retransmit_cb(self, segment): + def _segment_retransmit_cb(self, key, segment): """Add a segment ot the outgoing queue and schedule its transmission.""" del self._sent[key] self._outgoing.append(segment) self._schedule_send_worker() return False - def _schedule_segment_retransmit(self, segment, when): + def _schedule_segment_retransmit(self, key, segment, when): """Schedule retransmission of a segment if one is not already scheduled.""" if segment.userdata: # Already scheduled for retransmit @@ -444,12 +602,13 @@ class MostlyReliablePipe(object): if when == 0: # Immediate retransmission - segment.userdata = gobject.idle_add(self._segment_retransmit_cb, segment) + segment.userdata = gobject.idle_add(self._segment_retransmit_cb, key, segment) else: # convert time to milliseconds timeout = int((when - time.time()) * 1000) + print "RT timeout: %s" % timeout segment.userdata = gobject.timeout_add(timeout, self._segment_retransmit_cb, - segment) + key, segment) def _process_retransmit_request(self, segment): """Validate and process a retransmission request.""" @@ -462,7 +621,13 @@ class MostlyReliablePipe(object): segment = self._sent[key] now = time.time() next_retrans = self._calc_next_retransmit(segment, now) - self._schedule_segment_retransmit(segment, next_retrans - now) + self._schedule_segment_retransmit(key, segment, next_retrans - now) + + def set_drop_probability(self, prob=4): + """Debugging function to randomly drop incoming packets. + The prob argument should be an integer between 1 and 10 to drop, + or 0 to drop none. Higher numbers drop more packets.""" + self._drop_prob = prob def _handle_incoming_data(self, source, condition): """Handle incoming network data by making a message segment out of it @@ -471,25 +636,38 @@ class MostlyReliablePipe(object): return True msg = {} data, addr = source.recvfrom(self._UDP_MSG_SIZE) + + should_drop = False + p = random.random() * 10.0 + if self._drop_prob > 0 and p <= self._drop_prob: + should_drop = True + try: segment = SegmentBase.new_from_data(addr, data) - stype = segment.segment_type() - if stype == SegmentBase.type_data(): - self._process_incoming_data(segment) - elif stype == SegmentBase.type_retransmit(): - self._process_retransmit_request(segment) + if should_drop: + print "Dropped segment %d (p (%s) < %s)." % (segment.segment_number(), p, self._drop_prob) + else: + stype = segment.segment_type() + if stype == SegmentBase.type_data(): + self._process_incoming_data(segment) + elif stype == SegmentBase.type_retransmit(): + self._process_retransmit_request(segment) except ValueError, exc: print "Bad segment: %s" % exc return True + def _next_msg_seq(self): + self._seq_counter = self._seq_counter + 1 + if self._seq_counter > 65535: + self._seq_counter = 1 + return self._seq_counter + def send(self, data): """Break data up into chunks and queue for later transmission.""" if not self._started: raise Exception("Can't send anything until started!") - self._seq_counter = self._seq_counter + 1 - if self._seq_counter > 65535: - self._seq_counter = 1 + msg_seq = self._next_msg_seq() # Pack the data into network byte order template = "! %ds" % len(data) @@ -505,7 +683,7 @@ class MostlyReliablePipe(object): msg_num = 1 while left > 0: seg = DataSegment.new_from_parts(msg_num, nmessages, - self._seq_counter, master_sha, data[:mtu]) + msg_seq, master_sha, data[:mtu]) self._outgoing.append(seg) msg_num = msg_num + 1 data = data[mtu:] @@ -513,16 +691,21 @@ class MostlyReliablePipe(object): self._schedule_send_worker() def _schedule_send_worker(self): - if len(self._outgoing) > 0 and self._worker == 0: - self._worker = gobject.idle_add(self._send_worker) + if len(self._outgoing) > 0 and self._send_worker == 0: + self._send_worker = gobject.idle_add(self._send_worker_cb) - def _send_worker(self): + def _send_worker_cb(self): """Send all queued segments that have yet to be transmitted.""" - self._worker = 0 + self._send_worker = 0 for segment in self._outgoing: packet = segment.packetize() segment.inc_transmits() - self._send_sock.sendto(packet, (self._remote_addr, self._port)) + addr = (self._remote_addr, self._port) + if segment.address(): + addr = segment.address() + self._send_sock.sendto(packet, addr) + if segment.userdata: + gobject.source_remove(segment.userdata) segment.userdata = None # Retransmission GSource key = (segment.message_sequence_number(), segment.master_sha(), segment.segment_number()) self._sent[key] = segment @@ -566,7 +749,7 @@ class SegmentBaseInitTestCase(SegmentBaseTestCase): def testGoodInit(self): seg = SegmentBase(self._DEF_SEGNO, self._DEF_TOT_SEGS, self._DEF_MSG_SEQ_NUM, self._DEF_MASTER_SHA) assert seg.stime() < time.time(), "segment start time is less than now!" - assert not seg.addr(), "Segment address was not None after init." + assert not seg.address(), "Segment address was not None after init." assert seg.segment_number() == self._DEF_SEGNO, "Segment number wasn't correct after init." assert seg.total_segments() == self._DEF_TOT_SEGS, "Total segments wasn't correct after init." assert seg.message_sequence_number() == self._DEF_MSG_SEQ_NUM, "Message sequence number wasn't correct after init." @@ -692,7 +875,7 @@ class DataSegmentTestCase(SegmentBaseTestCase): self._DEF_TOT_SEGS, self._DEF_MSG_SEQ_NUM, payload_sha) seg = SegmentBase.new_from_data(self._DEF_ADDRESS, header + payload) - assert seg.addr() == self._DEF_ADDRESS, "Segment address did not match expected." + assert seg.address() == self._DEF_ADDRESS, "Segment address did not match expected." assert seg.segment_type() == SegmentBase.type_data(), "Segment type did not match expected." assert seg.segment_number() == self._DEF_SEGNO, "Segment number did not match expected." assert seg.total_segments() == self._DEF_TOT_SEGS, "Total segments did not match expected." @@ -730,7 +913,8 @@ class RetransmitSegmentTestCase(SegmentBaseTestCase): def _test_new_from_parts_fail(self, msg_seq_num, rt_msg_seq_num, rt_master_sha, rt_segment_number, fail_msg): try: - seg = RetransmitSegment.new_from_parts(msg_seq_num, rt_msg_seq_num, rt_master_sha, rt_segment_number) + seg = RetransmitSegment.new_from_parts(self._DEF_ADDRESS, msg_seq_num, rt_msg_seq_num, + rt_master_sha, rt_segment_number) except ValueError, exc: pass else: @@ -775,7 +959,7 @@ class RetransmitSegmentTestCase(SegmentBaseTestCase): self._DEF_MASTER_SHA, "", "invalid retransmit message segment number") # Ensure message data is same as we stuff in after object is instantiated - seg = RetransmitSegment.new_from_parts(self._DEF_MSG_SEQ_NUM, self._DEF_MSG_SEQ_NUM, + seg = RetransmitSegment.new_from_parts(self._DEF_ADDRESS, self._DEF_MSG_SEQ_NUM, self._DEF_MSG_SEQ_NUM, self._DEF_MASTER_SHA, self._DEF_SEGNO) assert seg.rt_msg_seq_num() == self._DEF_MSG_SEQ_NUM, "RT message sequence number after segment creation didn't match expected." assert seg.rt_master_sha() == self._DEF_MASTER_SHA, "RT master SHA after segment creation didn't match expected." @@ -814,10 +998,19 @@ class RetransmitSegmentTestCase(SegmentBaseTestCase): assert seg.rt_master_sha() == self._DEF_MASTER_SHA, "Segment RT master SHA didn't match expected." assert seg.rt_segment_number() == self._DEF_SEGNO, "Segment RT segment number didn't match expected." + def testPartsToData(self): + seg = RetransmitSegment.new_from_parts(self._DEF_ADDRESS, self._DEF_MSG_SEQ_NUM, self._DEF_MSG_SEQ_NUM, + self._DEF_MASTER_SHA, self._DEF_SEGNO) + new_seg = SegmentBase.new_from_data(self._DEF_ADDRESS, seg.packetize()) + assert new_seg.rt_msg_seq_num() == self._DEF_MSG_SEQ_NUM, "Segment RT message sequence number didn't match expected." + assert new_seg.rt_master_sha() == self._DEF_MASTER_SHA, "Segment RT master SHA didn't match expected." + assert new_seg.rt_segment_number() == self._DEF_SEGNO, "Segment RT segment number didn't match expected." + def addToSuite(suite): suite.addTest(RetransmitSegmentTestCase("testInit")) suite.addTest(RetransmitSegmentTestCase("testNewFromParts")) suite.addTest(RetransmitSegmentTestCase("testNewFromData")) + suite.addTest(RetransmitSegmentTestCase("testPartsToData")) addToSuite = staticmethod(addToSuite) @@ -858,10 +1051,19 @@ def main(): def got_data(addr, data, user_data=None): print "Data (%s): %s" % (addr, data) -def foobar(): +def retry_test(): pipe = MostlyReliablePipe('', '224.0.0.222', 2293, got_data) + pipe.set_drop_probability(3) pipe.start() - pipe.send('The quick brown fox jumps over the lazy dog') + msg = """The said Eliza, John, and Georgiana were now clustered round their mama in the drawing-room: +she lay reclined on a sofa by the fireside, and with her darlings about her (for the time neither +quarrelling nor crying) looked perfectly happy. Me, she had dispensed from joining the group; saying, +'She regretted to be under the necessity of keeping me at a distance; but that until she heard from +Bessie, and could discover by her own observation, that I was endeavouring in good earnest to acquire +a more sociable and childlike disposition, a more attractive and sprightly manner -- something lighter, +franker, more natural, as it were -- she really must exclude me from privileges intended only for + contented, happy, little children.'""" + pipe.send(msg) try: gtk.main() except KeyboardInterrupt: