Pathetically simplify retransmit timings.
This commit is contained in:
parent
4dca6521aa
commit
96a90ca32c
@ -324,6 +324,7 @@ class Message(object):
|
|||||||
self._last_incoming_time = 0
|
self._last_incoming_time = 0
|
||||||
self._segments = {}
|
self._segments = {}
|
||||||
self._complete = False
|
self._complete = False
|
||||||
|
self._dispatched_time = 0
|
||||||
self._data = None
|
self._data = None
|
||||||
self._data_sha = None
|
self._data_sha = None
|
||||||
self._src_addr = src_addr
|
self._src_addr = src_addr
|
||||||
@ -359,20 +360,18 @@ class Message(object):
|
|||||||
return i
|
return i
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
_DEF_RT_REQUEST_INTERVAL = 500
|
_DEF_RT_REQUEST_INTERVAL = 0.09 # 70ms (in seconds)
|
||||||
def update_rt_wait(self, now):
|
def update_rt_wait(self, now):
|
||||||
"""now argument should be in seconds."""
|
"""now argument should be in seconds."""
|
||||||
|
wait = self._DEF_RT_REQUEST_INTERVAL
|
||||||
|
if self._last_incoming_time > now - 0.02:
|
||||||
msg_completeness = float(len(self._segments)) / float(self._total_segments)
|
msg_completeness = float(len(self._segments)) / float(self._total_segments)
|
||||||
next = self._DEF_RT_REQUEST_INTERVAL * (1.1 - msg_completeness)
|
wait = wait + (self._DEF_RT_REQUEST_INTERVAL * (1.0 - msg_completeness))
|
||||||
rt_penalty = 100
|
self._next_rt_time = now + wait
|
||||||
first_missing = self.first_missing()
|
|
||||||
if first_missing:
|
|
||||||
rt_penalty = self._rt_tries[first_missing] * 100
|
|
||||||
# print "** msg_comp: %s, next: %d, rt_penalty: %s, rt: %s" % (msg_completeness, next, rt_penalty, (float(next + rt_penalty) / 1000))
|
|
||||||
next = next + rt_penalty
|
|
||||||
self._next_rt_time = now + (float(next) * .001)
|
|
||||||
|
|
||||||
def add_segment(self, segment):
|
def add_segment(self, segment):
|
||||||
|
if self.complete():
|
||||||
|
return
|
||||||
segno = segment.segment_number()
|
segno = segment.segment_number()
|
||||||
if self._segments.has_key(segno):
|
if self._segments.has_key(segno):
|
||||||
return
|
return
|
||||||
@ -382,13 +381,15 @@ class Message(object):
|
|||||||
self._last_incoming_time = now
|
self._last_incoming_time = now
|
||||||
|
|
||||||
num_segs = len(self._segments)
|
num_segs = len(self._segments)
|
||||||
if num_segs == segment.total_segments():
|
if num_segs == self._total_segments:
|
||||||
self._complete = True
|
self._complete = True
|
||||||
|
self._next_rt_time = 0
|
||||||
self._data = ''
|
self._data = ''
|
||||||
for seg in self._segments.values():
|
for seg in self._segments.values():
|
||||||
self._data = self._data + seg.data()
|
self._data = self._data + seg.data()
|
||||||
self._data_sha = _sha_data(self._data)
|
self._data_sha = _sha_data(self._data)
|
||||||
else:
|
elif segno == num_segs or num_segs == 1:
|
||||||
|
# If we're not missing segments, push back retransmit request
|
||||||
self.update_rt_wait(now)
|
self.update_rt_wait(now)
|
||||||
|
|
||||||
def get_retransmit_message(self, msg_seq_num, segno):
|
def get_retransmit_message(self, msg_seq_num, segno):
|
||||||
@ -403,6 +404,12 @@ class Message(object):
|
|||||||
def complete(self):
|
def complete(self):
|
||||||
return self._complete
|
return self._complete
|
||||||
|
|
||||||
|
def dispatch_time(self):
|
||||||
|
return self._dispatch_time
|
||||||
|
|
||||||
|
def set_dispatch_time(self):
|
||||||
|
self._dispatch_time = time.time()
|
||||||
|
|
||||||
def data(self):
|
def data(self):
|
||||||
return (self._data, self._data_sha)
|
return (self._data, self._data_sha)
|
||||||
|
|
||||||
@ -441,6 +448,7 @@ class MostlyReliablePipe(object):
|
|||||||
self._sent = {}
|
self._sent = {}
|
||||||
|
|
||||||
self._incoming = {} # (message sha, # of segments) -> [segment1, segment2, ...]
|
self._incoming = {} # (message sha, # of segments) -> [segment1, segment2, ...]
|
||||||
|
self._dispatched = {}
|
||||||
|
|
||||||
self._setup_listener()
|
self._setup_listener()
|
||||||
self._setup_sender()
|
self._setup_sender()
|
||||||
@ -483,18 +491,19 @@ class MostlyReliablePipe(object):
|
|||||||
# Watch the listener socket for data
|
# Watch the listener socket for data
|
||||||
gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data)
|
gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data)
|
||||||
gobject.timeout_add(self._SEGMENT_TTL * 1000, self._segment_ttl_worker)
|
gobject.timeout_add(self._SEGMENT_TTL * 1000, self._segment_ttl_worker)
|
||||||
gobject.timeout_add(250, self._retransmit_check_worker)
|
gobject.timeout_add(50, self._retransmit_check_worker)
|
||||||
|
|
||||||
self._started = True
|
self._started = True
|
||||||
|
|
||||||
def _segment_ttl_worker(self):
|
def _segment_ttl_worker(self):
|
||||||
"""Cull already-sent message segments that are past their TTL."""
|
"""Cull already-sent message segments that are past their TTL."""
|
||||||
now = time.time()
|
now = time.time()
|
||||||
for segment in self._sent[:]:
|
for key in self._sent.keys()[:]:
|
||||||
|
segment = self._sent[key]
|
||||||
if segment.stime() < now - self._SEGMENT_TTL:
|
if segment.stime() < now - self._SEGMENT_TTL:
|
||||||
if segment.userdata:
|
if segment.userdata:
|
||||||
gobject.source_remove(segment.userdata)
|
gobject.source_remove(segment.userdata)
|
||||||
self._sent.remove(segment)
|
del self._sent[key]
|
||||||
|
|
||||||
# Cull incomplete incoming segment chains that haven't gotten any data
|
# Cull incomplete incoming segment chains that haven't gotten any data
|
||||||
# for a long time either
|
# for a long time either
|
||||||
@ -502,11 +511,14 @@ class MostlyReliablePipe(object):
|
|||||||
message = self._incoming[msg_key]
|
message = self._incoming[msg_key]
|
||||||
if message.last_incoming_time() < now - self._SEGMENT_TTL:
|
if message.last_incoming_time() < now - self._SEGMENT_TTL:
|
||||||
del self._incoming[msg_key]
|
del self._incoming[msg_key]
|
||||||
return True
|
|
||||||
|
|
||||||
def _dispatch_message(self, addr, message):
|
# Remove already dispatched messages after a while
|
||||||
"""Send complete message data to the owner's data callback."""
|
for msg_key in self._dispatched.keys()[:]:
|
||||||
self._data_cb(addr, message, self._user_data)
|
message = self._dispatched[msg_key]
|
||||||
|
if message.dispatch_time() < now - (self._SEGMENT_TTL*2):
|
||||||
|
del self._dispatched[msg_key]
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
_MAX_SEGMENT_RETRIES = 10
|
_MAX_SEGMENT_RETRIES = 10
|
||||||
def _retransmit_request(self, message):
|
def _retransmit_request(self, message):
|
||||||
@ -519,7 +531,7 @@ class MostlyReliablePipe(object):
|
|||||||
msg_seq = self._next_msg_seq()
|
msg_seq = self._next_msg_seq()
|
||||||
seg = message.get_retransmit_message(msg_seq, first_missing)
|
seg = message.get_retransmit_message(msg_seq, first_missing)
|
||||||
if seg:
|
if seg:
|
||||||
print " Requesting retransmit of %d by %s" % (first_missing, message.source_address())
|
print "(MRP): Requesting retransmit of %d by %s" % (first_missing, message.source_address())
|
||||||
self._outgoing.append(seg)
|
self._outgoing.append(seg)
|
||||||
self._schedule_send_worker()
|
self._schedule_send_worker()
|
||||||
return False
|
return False
|
||||||
@ -530,10 +542,14 @@ class MostlyReliablePipe(object):
|
|||||||
message = self._incoming[key]
|
message = self._incoming[key]
|
||||||
if message.complete():
|
if message.complete():
|
||||||
continue
|
continue
|
||||||
if message.next_rt_time() < now:
|
next_rt = message.next_rt_time()
|
||||||
|
if next_rt == 0 or next_rt > now:
|
||||||
|
continue
|
||||||
if self._retransmit_request(message):
|
if self._retransmit_request(message):
|
||||||
# Kill the message, too many retries
|
# Kill the message, too many retries
|
||||||
print "Dropped message %s, exceeded retries." % _stringify_sha(message.sha())
|
print "(MRP): Dropped message %s, exceeded retries." % _stringify_sha(message.sha())
|
||||||
|
self._dispatched[key] = message
|
||||||
|
message.set_dispatch_time()
|
||||||
del self._incoming[key]
|
del self._incoming[key]
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@ -550,16 +566,13 @@ class MostlyReliablePipe(object):
|
|||||||
addr = segment.address()
|
addr = segment.address()
|
||||||
segno = segment.segment_number()
|
segno = segment.segment_number()
|
||||||
|
|
||||||
# Short-circuit single-segment messages
|
|
||||||
if segno == 1 and nsegs == 1:
|
|
||||||
# Ensure the header's master sha actually equals the data's sha
|
|
||||||
if msg_sha == _sha_data(segment.data()):
|
|
||||||
self._dispatch_message(addr, segment.data())
|
|
||||||
return
|
|
||||||
|
|
||||||
# Otherwise, track the new segment
|
|
||||||
msg_seq_num = segment.message_sequence_number()
|
msg_seq_num = segment.message_sequence_number()
|
||||||
msg_key = (addr[0], msg_seq_num, msg_sha, nsegs)
|
msg_key = (addr[0], msg_seq_num, msg_sha, nsegs)
|
||||||
|
|
||||||
|
if self._dispatched.has_key(msg_key):
|
||||||
|
# We already dispatched this message, this segment is useless
|
||||||
|
return
|
||||||
|
# First segment in the message
|
||||||
if not self._incoming.has_key(msg_key):
|
if not self._incoming.has_key(msg_key):
|
||||||
self._incoming[msg_key] = Message((addr[0], self._port), msg_seq_num, msg_sha, nsegs)
|
self._incoming[msg_key] = Message((addr[0], self._port), msg_seq_num, msg_sha, nsegs)
|
||||||
|
|
||||||
@ -573,20 +586,12 @@ class MostlyReliablePipe(object):
|
|||||||
if message.complete():
|
if message.complete():
|
||||||
(msg_data, complete_data_sha) = message.data()
|
(msg_data, complete_data_sha) = message.data()
|
||||||
if msg_sha == complete_data_sha:
|
if msg_sha == complete_data_sha:
|
||||||
self._dispatch_message(addr, msg_data)
|
self._data_cb(addr, msg_data, self._user_data)
|
||||||
|
self._dispatched[msg_key] = message
|
||||||
|
message.set_dispatch_time()
|
||||||
del self._incoming[msg_key]
|
del self._incoming[msg_key]
|
||||||
return
|
return
|
||||||
|
|
||||||
_STD_RETRANSMIT_INTERVAL = 150 # in milliseconds
|
|
||||||
def _calc_next_retransmit(self, segment, now):
|
|
||||||
"""Calculate the next time (in seconds) that a packet can be retransmitted."""
|
|
||||||
num_retrans = segment.transmits() - 1
|
|
||||||
interval = num_retrans * self._STD_RETRANSMIT_INTERVAL
|
|
||||||
randomness = num_retrans * ((random.random() * 15) - 5)
|
|
||||||
real_interval = max(self._STD_RETRANSMIT_INTERVAL, interval + randomness)
|
|
||||||
# print "------ Interval %s" % (real_interval * .001)
|
|
||||||
return max(now, segment.last_transmit() + (real_interval * .001))
|
|
||||||
|
|
||||||
def _segment_retransmit_cb(self, key, segment):
|
def _segment_retransmit_cb(self, key, segment):
|
||||||
"""Add a segment ot the outgoing queue and schedule its transmission."""
|
"""Add a segment ot the outgoing queue and schedule its transmission."""
|
||||||
del self._sent[key]
|
del self._sent[key]
|
||||||
@ -594,22 +599,22 @@ class MostlyReliablePipe(object):
|
|||||||
self._schedule_send_worker()
|
self._schedule_send_worker()
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _schedule_segment_retransmit(self, key, segment, when):
|
def _schedule_segment_retransmit(self, key, segment, when, now):
|
||||||
"""Schedule retransmission of a segment if one is not already scheduled."""
|
"""Schedule retransmission of a segment if one is not already scheduled."""
|
||||||
if segment.userdata:
|
if segment.userdata:
|
||||||
# Already scheduled for retransmit
|
# Already scheduled for retransmit
|
||||||
return
|
return
|
||||||
|
|
||||||
if when == 0:
|
if when <= now:
|
||||||
# Immediate retransmission
|
# Immediate retransmission
|
||||||
segment.userdata = gobject.idle_add(self._segment_retransmit_cb, key, segment)
|
self._segment_retransmit_cb(key, segment)
|
||||||
else:
|
else:
|
||||||
# convert time to milliseconds
|
# convert time to milliseconds
|
||||||
timeout = int((when - time.time()) * 1000)
|
timeout = int((when - now) * 1000)
|
||||||
print "RT timeout: %s" % timeout
|
|
||||||
segment.userdata = gobject.timeout_add(timeout, self._segment_retransmit_cb,
|
segment.userdata = gobject.timeout_add(timeout, self._segment_retransmit_cb,
|
||||||
key, segment)
|
key, segment)
|
||||||
|
|
||||||
|
_STD_RETRANSMIT_INTERVAL = 0.05 # 50ms (in seconds)
|
||||||
def _process_retransmit_request(self, segment):
|
def _process_retransmit_request(self, segment):
|
||||||
"""Validate and process a retransmission request."""
|
"""Validate and process a retransmission request."""
|
||||||
key = (segment.rt_msg_seq_num(), segment.rt_master_sha(), segment.rt_segment_number())
|
key = (segment.rt_msg_seq_num(), segment.rt_master_sha(), segment.rt_segment_number())
|
||||||
@ -619,9 +624,10 @@ class MostlyReliablePipe(object):
|
|||||||
|
|
||||||
# Calculate next retransmission time and schedule packet for retransmit
|
# Calculate next retransmission time and schedule packet for retransmit
|
||||||
segment = self._sent[key]
|
segment = self._sent[key]
|
||||||
|
# only retransmit segments every 150ms or more
|
||||||
now = time.time()
|
now = time.time()
|
||||||
next_retrans = self._calc_next_retransmit(segment, now)
|
next_transmit = max(now, segment.last_transmit() + self._STD_RETRANSMIT_INTERVAL)
|
||||||
self._schedule_segment_retransmit(key, segment, next_retrans - now)
|
self._schedule_segment_retransmit(key, segment, next_transmit, now)
|
||||||
|
|
||||||
def set_drop_probability(self, prob=4):
|
def set_drop_probability(self, prob=4):
|
||||||
"""Debugging function to randomly drop incoming packets.
|
"""Debugging function to randomly drop incoming packets.
|
||||||
@ -645,7 +651,7 @@ class MostlyReliablePipe(object):
|
|||||||
try:
|
try:
|
||||||
segment = SegmentBase.new_from_data(addr, data)
|
segment = SegmentBase.new_from_data(addr, data)
|
||||||
if should_drop:
|
if should_drop:
|
||||||
print "Dropped segment %d (p (%s) < %s)." % (segment.segment_number(), p, self._drop_prob)
|
print "(MRP): Dropped segment %d." % segment.segment_number()
|
||||||
else:
|
else:
|
||||||
stype = segment.segment_type()
|
stype = segment.segment_type()
|
||||||
if stype == SegmentBase.type_data():
|
if stype == SegmentBase.type_data():
|
||||||
@ -653,7 +659,7 @@ class MostlyReliablePipe(object):
|
|||||||
elif stype == SegmentBase.type_retransmit():
|
elif stype == SegmentBase.type_retransmit():
|
||||||
self._process_retransmit_request(segment)
|
self._process_retransmit_request(segment)
|
||||||
except ValueError, exc:
|
except ValueError, exc:
|
||||||
print "Bad segment: %s" % exc
|
print "(MRP): Bad segment: %s" % exc
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _next_msg_seq(self):
|
def _next_msg_seq(self):
|
||||||
@ -692,11 +698,12 @@ class MostlyReliablePipe(object):
|
|||||||
|
|
||||||
def _schedule_send_worker(self):
|
def _schedule_send_worker(self):
|
||||||
if len(self._outgoing) > 0 and self._send_worker == 0:
|
if len(self._outgoing) > 0 and self._send_worker == 0:
|
||||||
self._send_worker = gobject.idle_add(self._send_worker_cb)
|
self._send_worker = gobject.timeout_add(50, self._send_worker_cb)
|
||||||
|
|
||||||
def _send_worker_cb(self):
|
def _send_worker_cb(self):
|
||||||
"""Send all queued segments that have yet to be transmitted."""
|
"""Send all queued segments that have yet to be transmitted."""
|
||||||
self._send_worker = 0
|
self._send_worker = 0
|
||||||
|
nsent = 0
|
||||||
for segment in self._outgoing:
|
for segment in self._outgoing:
|
||||||
packet = segment.packetize()
|
packet = segment.packetize()
|
||||||
segment.inc_transmits()
|
segment.inc_transmits()
|
||||||
@ -709,7 +716,12 @@ class MostlyReliablePipe(object):
|
|||||||
segment.userdata = None # Retransmission GSource
|
segment.userdata = None # Retransmission GSource
|
||||||
key = (segment.message_sequence_number(), segment.master_sha(), segment.segment_number())
|
key = (segment.message_sequence_number(), segment.master_sha(), segment.segment_number())
|
||||||
self._sent[key] = segment
|
self._sent[key] = segment
|
||||||
self._outgoing = []
|
nsent = nsent + 1
|
||||||
|
if nsent > 10:
|
||||||
|
break
|
||||||
|
self._outgoing = self._outgoing[nsent:]
|
||||||
|
if len(self._outgoing):
|
||||||
|
self._schedule_send_worker()
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
@ -1036,7 +1048,7 @@ class SHAUtilsTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def foobar():
|
||||||
suite = unittest.TestSuite()
|
suite = unittest.TestSuite()
|
||||||
SegmentBaseInitTestCase.addToSuite(suite)
|
SegmentBaseInitTestCase.addToSuite(suite)
|
||||||
DataSegmentTestCase.addToSuite(suite)
|
DataSegmentTestCase.addToSuite(suite)
|
||||||
@ -1049,12 +1061,19 @@ def main():
|
|||||||
|
|
||||||
|
|
||||||
def got_data(addr, data, user_data=None):
|
def got_data(addr, data, user_data=None):
|
||||||
print "Data (%s): %s" % (addr, data)
|
print "Got data from %s, writing to %s." % (addr, user_data)
|
||||||
|
fl = open(user_data, "w+")
|
||||||
|
fl.write(data)
|
||||||
|
fl.close()
|
||||||
|
|
||||||
def retry_test():
|
def main():
|
||||||
pipe = MostlyReliablePipe('', '224.0.0.222', 2293, got_data)
|
import sys
|
||||||
pipe.set_drop_probability(3)
|
pipe = MostlyReliablePipe('', '224.0.0.222', 2293, got_data, sys.argv[2])
|
||||||
|
# pipe.set_drop_probability(4)
|
||||||
pipe.start()
|
pipe.start()
|
||||||
|
fl = open(sys.argv[1], "r")
|
||||||
|
data = fl.read()
|
||||||
|
fl.close()
|
||||||
msg = """The said Eliza, John, and Georgiana were now clustered round their mama in the drawing-room:
|
msg = """The said Eliza, John, and Georgiana were now clustered round their mama in the drawing-room:
|
||||||
she lay reclined on a sofa by the fireside, and with her darlings about her (for the time neither
|
she lay reclined on a sofa by the fireside, and with her darlings about her (for the time neither
|
||||||
quarrelling nor crying) looked perfectly happy. Me, she had dispensed from joining the group; saying,
|
quarrelling nor crying) looked perfectly happy. Me, she had dispensed from joining the group; saying,
|
||||||
@ -1063,7 +1082,7 @@ Bessie, and could discover by her own observation, that I was endeavouring in go
|
|||||||
a more sociable and childlike disposition, a more attractive and sprightly manner -- something lighter,
|
a more sociable and childlike disposition, a more attractive and sprightly manner -- something lighter,
|
||||||
franker, more natural, as it were -- she really must exclude me from privileges intended only for
|
franker, more natural, as it were -- she really must exclude me from privileges intended only for
|
||||||
contented, happy, little children.'"""
|
contented, happy, little children.'"""
|
||||||
pipe.send(msg)
|
pipe.send(data)
|
||||||
try:
|
try:
|
||||||
gtk.main()
|
gtk.main()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
|
Loading…
Reference in New Issue
Block a user