From 96a90ca32cf45da9b50f9ebc020471734d937bc5 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Fri, 19 May 2006 11:37:05 -0400 Subject: [PATCH] Pathetically simplify retransmit timings. --- sugar/p2p/MostlyReliablePipe.py | 141 ++++++++++++++++++-------------- 1 file changed, 80 insertions(+), 61 deletions(-) diff --git a/sugar/p2p/MostlyReliablePipe.py b/sugar/p2p/MostlyReliablePipe.py index 4a9dbdcb..9e794739 100644 --- a/sugar/p2p/MostlyReliablePipe.py +++ b/sugar/p2p/MostlyReliablePipe.py @@ -324,6 +324,7 @@ class Message(object): self._last_incoming_time = 0 self._segments = {} self._complete = False + self._dispatched_time = 0 self._data = None self._data_sha = None self._src_addr = src_addr @@ -359,20 +360,18 @@ class Message(object): return i return 0 - _DEF_RT_REQUEST_INTERVAL = 500 + _DEF_RT_REQUEST_INTERVAL = 0.09 # 70ms (in seconds) def update_rt_wait(self, now): """now argument should be in seconds.""" - msg_completeness = float(len(self._segments)) / float(self._total_segments) - next = self._DEF_RT_REQUEST_INTERVAL * (1.1 - msg_completeness) - rt_penalty = 100 - first_missing = self.first_missing() - if first_missing: - rt_penalty = self._rt_tries[first_missing] * 100 - # print "** msg_comp: %s, next: %d, rt_penalty: %s, rt: %s" % (msg_completeness, next, rt_penalty, (float(next + rt_penalty) / 1000)) - next = next + rt_penalty - self._next_rt_time = now + (float(next) * .001) + wait = self._DEF_RT_REQUEST_INTERVAL + if self._last_incoming_time > now - 0.02: + msg_completeness = float(len(self._segments)) / float(self._total_segments) + wait = wait + (self._DEF_RT_REQUEST_INTERVAL * (1.0 - msg_completeness)) + self._next_rt_time = now + wait def add_segment(self, segment): + if self.complete(): + return segno = segment.segment_number() if self._segments.has_key(segno): return @@ -382,13 +381,15 @@ class Message(object): self._last_incoming_time = now num_segs = len(self._segments) - if num_segs == segment.total_segments(): + if num_segs == self._total_segments: self._complete = True + self._next_rt_time = 0 self._data = '' for seg in self._segments.values(): self._data = self._data + seg.data() self._data_sha = _sha_data(self._data) - else: + elif segno == num_segs or num_segs == 1: + # If we're not missing segments, push back retransmit request self.update_rt_wait(now) def get_retransmit_message(self, msg_seq_num, segno): @@ -403,6 +404,12 @@ class Message(object): def complete(self): return self._complete + def dispatch_time(self): + return self._dispatch_time + + def set_dispatch_time(self): + self._dispatch_time = time.time() + def data(self): return (self._data, self._data_sha) @@ -441,6 +448,7 @@ class MostlyReliablePipe(object): self._sent = {} self._incoming = {} # (message sha, # of segments) -> [segment1, segment2, ...] + self._dispatched = {} self._setup_listener() self._setup_sender() @@ -483,18 +491,19 @@ class MostlyReliablePipe(object): # Watch the listener socket for data gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data) gobject.timeout_add(self._SEGMENT_TTL * 1000, self._segment_ttl_worker) - gobject.timeout_add(250, self._retransmit_check_worker) + gobject.timeout_add(50, self._retransmit_check_worker) self._started = True def _segment_ttl_worker(self): """Cull already-sent message segments that are past their TTL.""" now = time.time() - for segment in self._sent[:]: + for key in self._sent.keys()[:]: + segment = self._sent[key] if segment.stime() < now - self._SEGMENT_TTL: if segment.userdata: gobject.source_remove(segment.userdata) - self._sent.remove(segment) + del self._sent[key] # Cull incomplete incoming segment chains that haven't gotten any data # for a long time either @@ -502,11 +511,14 @@ class MostlyReliablePipe(object): message = self._incoming[msg_key] if message.last_incoming_time() < now - self._SEGMENT_TTL: del self._incoming[msg_key] - return True - def _dispatch_message(self, addr, message): - """Send complete message data to the owner's data callback.""" - self._data_cb(addr, message, self._user_data) + # Remove already dispatched messages after a while + for msg_key in self._dispatched.keys()[:]: + message = self._dispatched[msg_key] + if message.dispatch_time() < now - (self._SEGMENT_TTL*2): + del self._dispatched[msg_key] + + return True _MAX_SEGMENT_RETRIES = 10 def _retransmit_request(self, message): @@ -519,7 +531,7 @@ class MostlyReliablePipe(object): msg_seq = self._next_msg_seq() seg = message.get_retransmit_message(msg_seq, first_missing) if seg: - print " Requesting retransmit of %d by %s" % (first_missing, message.source_address()) + print "(MRP): Requesting retransmit of %d by %s" % (first_missing, message.source_address()) self._outgoing.append(seg) self._schedule_send_worker() return False @@ -530,11 +542,15 @@ class MostlyReliablePipe(object): message = self._incoming[key] if message.complete(): continue - if message.next_rt_time() < now: - if self._retransmit_request(message): - # Kill the message, too many retries - print "Dropped message %s, exceeded retries." % _stringify_sha(message.sha()) - del self._incoming[key] + next_rt = message.next_rt_time() + if next_rt == 0 or next_rt > now: + continue + if self._retransmit_request(message): + # Kill the message, too many retries + print "(MRP): Dropped message %s, exceeded retries." % _stringify_sha(message.sha()) + self._dispatched[key] = message + message.set_dispatch_time() + del self._incoming[key] return True def _process_incoming_data(self, segment): @@ -550,16 +566,13 @@ class MostlyReliablePipe(object): addr = segment.address() segno = segment.segment_number() - # Short-circuit single-segment messages - if segno == 1 and nsegs == 1: - # Ensure the header's master sha actually equals the data's sha - if msg_sha == _sha_data(segment.data()): - self._dispatch_message(addr, segment.data()) - return - - # Otherwise, track the new segment msg_seq_num = segment.message_sequence_number() msg_key = (addr[0], msg_seq_num, msg_sha, nsegs) + + if self._dispatched.has_key(msg_key): + # We already dispatched this message, this segment is useless + return + # First segment in the message if not self._incoming.has_key(msg_key): self._incoming[msg_key] = Message((addr[0], self._port), msg_seq_num, msg_sha, nsegs) @@ -573,20 +586,12 @@ class MostlyReliablePipe(object): if message.complete(): (msg_data, complete_data_sha) = message.data() if msg_sha == complete_data_sha: - self._dispatch_message(addr, msg_data) + self._data_cb(addr, msg_data, self._user_data) + self._dispatched[msg_key] = message + message.set_dispatch_time() del self._incoming[msg_key] return - _STD_RETRANSMIT_INTERVAL = 150 # in milliseconds - def _calc_next_retransmit(self, segment, now): - """Calculate the next time (in seconds) that a packet can be retransmitted.""" - num_retrans = segment.transmits() - 1 - interval = num_retrans * self._STD_RETRANSMIT_INTERVAL - randomness = num_retrans * ((random.random() * 15) - 5) - real_interval = max(self._STD_RETRANSMIT_INTERVAL, interval + randomness) -# print "------ Interval %s" % (real_interval * .001) - return max(now, segment.last_transmit() + (real_interval * .001)) - def _segment_retransmit_cb(self, key, segment): """Add a segment ot the outgoing queue and schedule its transmission.""" del self._sent[key] @@ -594,22 +599,22 @@ class MostlyReliablePipe(object): self._schedule_send_worker() return False - def _schedule_segment_retransmit(self, key, segment, when): + def _schedule_segment_retransmit(self, key, segment, when, now): """Schedule retransmission of a segment if one is not already scheduled.""" if segment.userdata: # Already scheduled for retransmit return - if when == 0: + if when <= now: # Immediate retransmission - segment.userdata = gobject.idle_add(self._segment_retransmit_cb, key, segment) + self._segment_retransmit_cb(key, segment) else: # convert time to milliseconds - timeout = int((when - time.time()) * 1000) - print "RT timeout: %s" % timeout + timeout = int((when - now) * 1000) segment.userdata = gobject.timeout_add(timeout, self._segment_retransmit_cb, key, segment) + _STD_RETRANSMIT_INTERVAL = 0.05 # 50ms (in seconds) def _process_retransmit_request(self, segment): """Validate and process a retransmission request.""" key = (segment.rt_msg_seq_num(), segment.rt_master_sha(), segment.rt_segment_number()) @@ -619,9 +624,10 @@ class MostlyReliablePipe(object): # Calculate next retransmission time and schedule packet for retransmit segment = self._sent[key] + # only retransmit segments every 150ms or more now = time.time() - next_retrans = self._calc_next_retransmit(segment, now) - self._schedule_segment_retransmit(key, segment, next_retrans - now) + next_transmit = max(now, segment.last_transmit() + self._STD_RETRANSMIT_INTERVAL) + self._schedule_segment_retransmit(key, segment, next_transmit, now) def set_drop_probability(self, prob=4): """Debugging function to randomly drop incoming packets. @@ -645,7 +651,7 @@ class MostlyReliablePipe(object): try: segment = SegmentBase.new_from_data(addr, data) if should_drop: - print "Dropped segment %d (p (%s) < %s)." % (segment.segment_number(), p, self._drop_prob) + print "(MRP): Dropped segment %d." % segment.segment_number() else: stype = segment.segment_type() if stype == SegmentBase.type_data(): @@ -653,7 +659,7 @@ class MostlyReliablePipe(object): elif stype == SegmentBase.type_retransmit(): self._process_retransmit_request(segment) except ValueError, exc: - print "Bad segment: %s" % exc + print "(MRP): Bad segment: %s" % exc return True def _next_msg_seq(self): @@ -692,11 +698,12 @@ class MostlyReliablePipe(object): def _schedule_send_worker(self): if len(self._outgoing) > 0 and self._send_worker == 0: - self._send_worker = gobject.idle_add(self._send_worker_cb) + self._send_worker = gobject.timeout_add(50, self._send_worker_cb) def _send_worker_cb(self): """Send all queued segments that have yet to be transmitted.""" self._send_worker = 0 + nsent = 0 for segment in self._outgoing: packet = segment.packetize() segment.inc_transmits() @@ -709,7 +716,12 @@ class MostlyReliablePipe(object): segment.userdata = None # Retransmission GSource key = (segment.message_sequence_number(), segment.master_sha(), segment.segment_number()) self._sent[key] = segment - self._outgoing = [] + nsent = nsent + 1 + if nsent > 10: + break + self._outgoing = self._outgoing[nsent:] + if len(self._outgoing): + self._schedule_send_worker() return False @@ -1036,7 +1048,7 @@ class SHAUtilsTestCase(unittest.TestCase): -def main(): +def foobar(): suite = unittest.TestSuite() SegmentBaseInitTestCase.addToSuite(suite) DataSegmentTestCase.addToSuite(suite) @@ -1049,12 +1061,19 @@ def main(): def got_data(addr, data, user_data=None): - print "Data (%s): %s" % (addr, data) + print "Got data from %s, writing to %s." % (addr, user_data) + fl = open(user_data, "w+") + fl.write(data) + fl.close() -def retry_test(): - pipe = MostlyReliablePipe('', '224.0.0.222', 2293, got_data) - pipe.set_drop_probability(3) +def main(): + import sys + pipe = MostlyReliablePipe('', '224.0.0.222', 2293, got_data, sys.argv[2]) +# pipe.set_drop_probability(4) pipe.start() + fl = open(sys.argv[1], "r") + data = fl.read() + fl.close() msg = """The said Eliza, John, and Georgiana were now clustered round their mama in the drawing-room: she lay reclined on a sofa by the fireside, and with her darlings about her (for the time neither quarrelling nor crying) looked perfectly happy. Me, she had dispensed from joining the group; saying, @@ -1063,7 +1082,7 @@ Bessie, and could discover by her own observation, that I was endeavouring in go a more sociable and childlike disposition, a more attractive and sprightly manner -- something lighter, franker, more natural, as it were -- she really must exclude me from privileges intended only for contented, happy, little children.'""" - pipe.send(msg) + pipe.send(data) try: gtk.main() except KeyboardInterrupt: