Implement retransmit requests on receiver side, fix some stuff up.
This commit is contained in:
parent
3e1f404879
commit
533dbd638e
@ -100,15 +100,18 @@ class SegmentBase(object):
|
|||||||
self._msg_seq_num = msg_seq_num
|
self._msg_seq_num = msg_seq_num
|
||||||
self._master_sha = master_sha
|
self._master_sha = master_sha
|
||||||
|
|
||||||
def new_from_data(addr, data):
|
def _validate_address(addr):
|
||||||
"""Static constructor for creation from a packed data stream."""
|
|
||||||
|
|
||||||
if not addr or type(addr) != type(()):
|
if not addr or type(addr) != type(()):
|
||||||
raise ValueError("Address must be a tuple.")
|
raise ValueError("Address must be a tuple.")
|
||||||
if len(addr) != 2 or type(addr[0]) != type("") or type(addr[1]) != type(1):
|
if len(addr) != 2 or type(addr[0]) != type("") or type(addr[1]) != type(1):
|
||||||
raise ValueError("Address format was invalid.")
|
raise ValueError("Address format was invalid.")
|
||||||
if addr[1] < 1 or addr[1] > 65535:
|
if addr[1] < 1 or addr[1] > 65535:
|
||||||
raise ValueError("Address port was invalid.")
|
raise ValueError("Address port was invalid.")
|
||||||
|
_validate_address = staticmethod(_validate_address)
|
||||||
|
|
||||||
|
def new_from_data(addr, data):
|
||||||
|
"""Static constructor for creation from a packed data stream."""
|
||||||
|
SegmentBase._validate_address(addr)
|
||||||
|
|
||||||
# Verify minimum length
|
# Verify minimum length
|
||||||
if not data:
|
if not data:
|
||||||
@ -162,7 +165,7 @@ class SegmentBase(object):
|
|||||||
def stime(self):
|
def stime(self):
|
||||||
return self._stime
|
return self._stime
|
||||||
|
|
||||||
def addr(self):
|
def address(self):
|
||||||
return self._addr
|
return self._addr
|
||||||
|
|
||||||
def segment_number(self):
|
def segment_number(self):
|
||||||
@ -271,7 +274,7 @@ class RetransmitSegment(SegmentBase):
|
|||||||
return (data, _sha_data(data))
|
return (data, _sha_data(data))
|
||||||
_make_rtms_data = staticmethod(_make_rtms_data)
|
_make_rtms_data = staticmethod(_make_rtms_data)
|
||||||
|
|
||||||
def new_from_parts(msg_seq_num, rt_msg_seq_num, rt_master_sha, rt_segment_number):
|
def new_from_parts(addr, msg_seq_num, rt_msg_seq_num, rt_master_sha, rt_segment_number):
|
||||||
"""Static constructor for creation from individual attributes."""
|
"""Static constructor for creation from individual attributes."""
|
||||||
|
|
||||||
RetransmitSegment._verify_data(rt_msg_seq_num, rt_master_sha, rt_segment_number)
|
RetransmitSegment._verify_data(rt_msg_seq_num, rt_master_sha, rt_segment_number)
|
||||||
@ -280,6 +283,8 @@ class RetransmitSegment(SegmentBase):
|
|||||||
segment = RetransmitSegment(1, 1, msg_seq_num, data_sha)
|
segment = RetransmitSegment(1, 1, msg_seq_num, data_sha)
|
||||||
segment._data_len = RetransmitSegment._RT_DATA_LEN
|
segment._data_len = RetransmitSegment._RT_DATA_LEN
|
||||||
segment._data = data
|
segment._data = data
|
||||||
|
SegmentBase._validate_address(addr)
|
||||||
|
segment._addr = addr
|
||||||
|
|
||||||
segment._rt_msg_seq_num = rt_msg_seq_num
|
segment._rt_msg_seq_num = rt_msg_seq_num
|
||||||
segment._rt_master_sha = rt_master_sha
|
segment._rt_master_sha = rt_master_sha
|
||||||
@ -310,6 +315,109 @@ class RetransmitSegment(SegmentBase):
|
|||||||
return self._rt_segment_number
|
return self._rt_segment_number
|
||||||
|
|
||||||
|
|
||||||
|
class Message(object):
|
||||||
|
"""Tracks an entire message object, which is composed of a number
|
||||||
|
of individual segments."""
|
||||||
|
def __init__(self, src_addr, msg_seq_num, msg_sha, total_segments):
|
||||||
|
self._rt_target = 0
|
||||||
|
self._next_rt_time = 0
|
||||||
|
self._last_incoming_time = 0
|
||||||
|
self._segments = {}
|
||||||
|
self._complete = False
|
||||||
|
self._data = None
|
||||||
|
self._data_sha = None
|
||||||
|
self._src_addr = src_addr
|
||||||
|
self._msg_seq_num = msg_seq_num
|
||||||
|
self._msg_sha = msg_sha
|
||||||
|
self._total_segments = total_segments
|
||||||
|
self._rt_tries = {}
|
||||||
|
for i in range(1, self._total_segments + 1):
|
||||||
|
self._rt_tries[i] = 0
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
self.clear()
|
||||||
|
|
||||||
|
def sha(self):
|
||||||
|
return self._msg_sha
|
||||||
|
|
||||||
|
def source_address(self):
|
||||||
|
return self._src_addr
|
||||||
|
|
||||||
|
def clear(self):
|
||||||
|
for key in self._segments.keys()[:]:
|
||||||
|
del self._segments[key]
|
||||||
|
del self._rt_tries[key]
|
||||||
|
self._segments = {}
|
||||||
|
self._rt_tries = {}
|
||||||
|
|
||||||
|
def has_segment(self, segno):
|
||||||
|
return self._segments.has_key(segno)
|
||||||
|
|
||||||
|
def first_missing(self):
|
||||||
|
for i in range(1, self._total_segments + 1):
|
||||||
|
if not self._segments.has_key(i):
|
||||||
|
return i
|
||||||
|
return 0
|
||||||
|
|
||||||
|
_DEF_RT_REQUEST_INTERVAL = 500
|
||||||
|
def update_rt_wait(self, now):
|
||||||
|
"""now argument should be in seconds."""
|
||||||
|
msg_completeness = float(len(self._segments)) / float(self._total_segments)
|
||||||
|
next = self._DEF_RT_REQUEST_INTERVAL * (1.1 - msg_completeness)
|
||||||
|
rt_penalty = 100
|
||||||
|
first_missing = self.first_missing()
|
||||||
|
if first_missing:
|
||||||
|
rt_penalty = self._rt_tries[first_missing] * 100
|
||||||
|
# print "** msg_comp: %s, next: %d, rt_penalty: %s, rt: %s" % (msg_completeness, next, rt_penalty, (float(next + rt_penalty) / 1000))
|
||||||
|
next = next + rt_penalty
|
||||||
|
self._next_rt_time = now + (float(next) * .001)
|
||||||
|
|
||||||
|
def add_segment(self, segment):
|
||||||
|
segno = segment.segment_number()
|
||||||
|
if self._segments.has_key(segno):
|
||||||
|
return
|
||||||
|
self._segments[segno] = segment
|
||||||
|
self._rt_tries[segno] = 0
|
||||||
|
now = time.time()
|
||||||
|
self._last_incoming_time = now
|
||||||
|
|
||||||
|
num_segs = len(self._segments)
|
||||||
|
if num_segs == segment.total_segments():
|
||||||
|
self._complete = True
|
||||||
|
self._data = ''
|
||||||
|
for seg in self._segments.values():
|
||||||
|
self._data = self._data + seg.data()
|
||||||
|
self._data_sha = _sha_data(self._data)
|
||||||
|
else:
|
||||||
|
self.update_rt_wait(now)
|
||||||
|
|
||||||
|
def get_retransmit_message(self, msg_seq_num, segno):
|
||||||
|
if segno < 1 or segno > self._total_segments:
|
||||||
|
return None
|
||||||
|
seg = RetransmitSegment.new_from_parts(self._src_addr, msg_seq_num,
|
||||||
|
self._msg_seq_num, self._msg_sha, segno)
|
||||||
|
self._rt_tries[segno] = self._rt_tries[segno] + 1
|
||||||
|
self.update_rt_wait(time.time())
|
||||||
|
return seg
|
||||||
|
|
||||||
|
def complete(self):
|
||||||
|
return self._complete
|
||||||
|
|
||||||
|
def data(self):
|
||||||
|
return (self._data, self._data_sha)
|
||||||
|
|
||||||
|
def last_incoming_time(self):
|
||||||
|
return self._last_incoming_time
|
||||||
|
|
||||||
|
def next_rt_time(self):
|
||||||
|
return self._next_rt_time
|
||||||
|
|
||||||
|
def rt_tries(self, segno):
|
||||||
|
if self._rt_tries.has_key(segno):
|
||||||
|
return self._rt_tries[segno]
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
class MostlyReliablePipe(object):
|
class MostlyReliablePipe(object):
|
||||||
"""Implement Mostly-Reliable UDP. We don't actually care about guaranteeing
|
"""Implement Mostly-Reliable UDP. We don't actually care about guaranteeing
|
||||||
delivery or receipt, just a better effort than no effort at all."""
|
delivery or receipt, just a better effort than no effort at all."""
|
||||||
@ -324,8 +432,10 @@ class MostlyReliablePipe(object):
|
|||||||
self._data_cb = data_cb
|
self._data_cb = data_cb
|
||||||
self._user_data = user_data
|
self._user_data = user_data
|
||||||
self._started = False
|
self._started = False
|
||||||
self._worker = 0
|
self._send_worker = 0
|
||||||
self._seq_counter = 0
|
self._seq_counter = 0
|
||||||
|
self._drop_prob = 0
|
||||||
|
self._rt_check_worker = 0
|
||||||
|
|
||||||
self._outgoing = []
|
self._outgoing = []
|
||||||
self._sent = {}
|
self._sent = {}
|
||||||
@ -335,6 +445,14 @@ class MostlyReliablePipe(object):
|
|||||||
self._setup_listener()
|
self._setup_listener()
|
||||||
self._setup_sender()
|
self._setup_sender()
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
if self._send_worker > 0:
|
||||||
|
gobject.source_remove(self._send_worker)
|
||||||
|
self._send_worker = 0
|
||||||
|
if self._rt_check_worker > 0:
|
||||||
|
gobject.source_remove(self._rt_check_worker)
|
||||||
|
self._rt_check_worker = 0
|
||||||
|
|
||||||
def _setup_sender(self):
|
def _setup_sender(self):
|
||||||
"""Setup the send socket for multicast."""
|
"""Setup the send socket for multicast."""
|
||||||
self._send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
self._send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
@ -354,7 +472,7 @@ class MostlyReliablePipe(object):
|
|||||||
def start(self):
|
def start(self):
|
||||||
"""Let the listener socket start listening for network data."""
|
"""Let the listener socket start listening for network data."""
|
||||||
# Set some more multicast options
|
# Set some more multicast options
|
||||||
self._listen_sock.bind((self._local_addr, self._port)) # Bind to all interfaces
|
self._listen_sock.bind((self._local_addr, self._port))
|
||||||
self._listen_sock.settimeout(2)
|
self._listen_sock.settimeout(2)
|
||||||
intf = socket.gethostbyname(socket.gethostname())
|
intf = socket.gethostbyname(socket.gethostname())
|
||||||
self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF,
|
self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF,
|
||||||
@ -365,6 +483,7 @@ class MostlyReliablePipe(object):
|
|||||||
# Watch the listener socket for data
|
# Watch the listener socket for data
|
||||||
gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data)
|
gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data)
|
||||||
gobject.timeout_add(self._SEGMENT_TTL * 1000, self._segment_ttl_worker)
|
gobject.timeout_add(self._SEGMENT_TTL * 1000, self._segment_ttl_worker)
|
||||||
|
gobject.timeout_add(250, self._retransmit_check_worker)
|
||||||
|
|
||||||
self._started = True
|
self._started = True
|
||||||
|
|
||||||
@ -373,13 +492,51 @@ class MostlyReliablePipe(object):
|
|||||||
now = time.time()
|
now = time.time()
|
||||||
for segment in self._sent[:]:
|
for segment in self._sent[:]:
|
||||||
if segment.stime() < now - self._SEGMENT_TTL:
|
if segment.stime() < now - self._SEGMENT_TTL:
|
||||||
|
if segment.userdata:
|
||||||
|
gobject.source_remove(segment.userdata)
|
||||||
self._sent.remove(segment)
|
self._sent.remove(segment)
|
||||||
|
|
||||||
|
# Cull incomplete incoming segment chains that haven't gotten any data
|
||||||
|
# for a long time either
|
||||||
|
for msg_key in self._incoming.keys()[:]:
|
||||||
|
message = self._incoming[msg_key]
|
||||||
|
if message.last_incoming_time() < now - self._SEGMENT_TTL:
|
||||||
|
del self._incoming[msg_key]
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _dispatch_message(self, addr, message):
|
def _dispatch_message(self, addr, message):
|
||||||
"""Send complete message data to the owner's data callback."""
|
"""Send complete message data to the owner's data callback."""
|
||||||
self._data_cb(addr, message, self._user_data)
|
self._data_cb(addr, message, self._user_data)
|
||||||
|
|
||||||
|
_MAX_SEGMENT_RETRIES = 10
|
||||||
|
def _retransmit_request(self, message):
|
||||||
|
"""Returns true if the message has exceeded it's retry limit."""
|
||||||
|
first_missing = message.first_missing()
|
||||||
|
if first_missing > 0:
|
||||||
|
num_retries = message.rt_tries(first_missing)
|
||||||
|
if num_retries > self._MAX_SEGMENT_RETRIES:
|
||||||
|
return True
|
||||||
|
msg_seq = self._next_msg_seq()
|
||||||
|
seg = message.get_retransmit_message(msg_seq, first_missing)
|
||||||
|
if seg:
|
||||||
|
print " Requesting retransmit of %d by %s" % (first_missing, message.source_address())
|
||||||
|
self._outgoing.append(seg)
|
||||||
|
self._schedule_send_worker()
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _retransmit_check_worker(self):
|
||||||
|
now = time.time()
|
||||||
|
for key in self._incoming.keys()[:]:
|
||||||
|
message = self._incoming[key]
|
||||||
|
if message.complete():
|
||||||
|
continue
|
||||||
|
if message.next_rt_time() < now:
|
||||||
|
if self._retransmit_request(message):
|
||||||
|
# Kill the message, too many retries
|
||||||
|
print "Dropped message %s, exceeded retries." % _stringify_sha(message.sha())
|
||||||
|
del self._incoming[key]
|
||||||
|
return True
|
||||||
|
|
||||||
def _process_incoming_data(self, segment):
|
def _process_incoming_data(self, segment):
|
||||||
"""Handle a new message segment. First checks if there is only one
|
"""Handle a new message segment. First checks if there is only one
|
||||||
segment to the message, and if the checksum from the header matches
|
segment to the message, and if the checksum from the header matches
|
||||||
@ -388,55 +545,56 @@ class MostlyReliablePipe(object):
|
|||||||
checks to see if the message is complete. If all segments are present,
|
checks to see if the message is complete. If all segments are present,
|
||||||
the message is reassembled and dispatched."""
|
the message is reassembled and dispatched."""
|
||||||
|
|
||||||
string_sha = _stringify_sha(segment.master_sha())
|
msg_sha = segment.master_sha()
|
||||||
nsegs = segment.total_segments()
|
nsegs = segment.total_segments()
|
||||||
addr = segment.addr()
|
addr = segment.address()
|
||||||
segno = segment.segment_number()
|
segno = segment.segment_number()
|
||||||
|
|
||||||
# Short-circuit single-segment messages
|
# Short-circuit single-segment messages
|
||||||
if segno == 1 and nsegs == 1:
|
if segno == 1 and nsegs == 1:
|
||||||
# Ensure the header's master sha actually equals the data's sha
|
# Ensure the header's master sha actually equals the data's sha
|
||||||
if string_sha == _stringify_sha(_sha_data(segment.data())):
|
if msg_sha == _sha_data(segment.data()):
|
||||||
self._dispatch_message(addr, segment.data())
|
self._dispatch_message(addr, segment.data())
|
||||||
return
|
return
|
||||||
|
|
||||||
# Otherwise, track the new segment
|
# Otherwise, track the new segment
|
||||||
msg_seq_num = segment.message_sequence_number()
|
msg_seq_num = segment.message_sequence_number()
|
||||||
msg_key = (addr[0], msg_seq_num, string_sha, nsegs)
|
msg_key = (addr[0], msg_seq_num, msg_sha, nsegs)
|
||||||
if not self._incoming.has_key(msg_key):
|
if not self._incoming.has_key(msg_key):
|
||||||
self._incoming[msg_key] = {}
|
self._incoming[msg_key] = Message((addr[0], self._port), msg_seq_num, msg_sha, nsegs)
|
||||||
|
|
||||||
|
message = self._incoming[msg_key]
|
||||||
# Look for a dupe, and if so, drop the new segment
|
# Look for a dupe, and if so, drop the new segment
|
||||||
if self._incoming[msg_key].has_key(segno):
|
if message.has_segment(segno):
|
||||||
return
|
return
|
||||||
self._incoming[msg_key][segno] = segment
|
message.add_segment(segment)
|
||||||
|
|
||||||
# Dispatch the message if all segments are present and the sha is correct
|
# Dispatch the message if all segments are present and the sha is correct
|
||||||
if len(self._incoming[msg_key]) == nsegs:
|
if message.complete():
|
||||||
all_data = ''
|
(msg_data, complete_data_sha) = message.data()
|
||||||
for i in range(1, nsegs + 1):
|
if msg_sha == complete_data_sha:
|
||||||
all_data = all_data + self._incoming[msg_key][i].data()
|
self._dispatch_message(addr, msg_data)
|
||||||
if string_sha == _stringify_sha(_sha_data(all_data)):
|
|
||||||
self._dispatch_message(addr, all_data)
|
|
||||||
del self._incoming[msg_key]
|
del self._incoming[msg_key]
|
||||||
|
return
|
||||||
|
|
||||||
_STD_RETRANSMIT_INTERVAL = 500 # 1/2 second (in milliseconds)
|
_STD_RETRANSMIT_INTERVAL = 150 # in milliseconds
|
||||||
def _calc_next_retransmit(self, segment, now):
|
def _calc_next_retransmit(self, segment, now):
|
||||||
"""Calculate the next time (in seconds) that a packet can be retransmitted."""
|
"""Calculate the next time (in seconds) that a packet can be retransmitted."""
|
||||||
num_retrans = segment.transmits() - 1
|
num_retrans = segment.transmits() - 1
|
||||||
interval = num_retrans * self._STD_RETRANSMIT_INTERVAL
|
interval = num_retrans * self._STD_RETRANSMIT_INTERVAL
|
||||||
randomness = num_retrans * random.randint(-4, 11)
|
randomness = num_retrans * ((random.random() * 15) - 5)
|
||||||
real_interval = max(self._STD_RETRANSMIT_INTERVAL, interval + randomness)
|
real_interval = max(self._STD_RETRANSMIT_INTERVAL, interval + randomness)
|
||||||
|
# print "------ Interval %s" % (real_interval * .001)
|
||||||
return max(now, segment.last_transmit() + (real_interval * .001))
|
return max(now, segment.last_transmit() + (real_interval * .001))
|
||||||
|
|
||||||
def _segment_retransmit_cb(self, segment):
|
def _segment_retransmit_cb(self, key, segment):
|
||||||
"""Add a segment ot the outgoing queue and schedule its transmission."""
|
"""Add a segment ot the outgoing queue and schedule its transmission."""
|
||||||
del self._sent[key]
|
del self._sent[key]
|
||||||
self._outgoing.append(segment)
|
self._outgoing.append(segment)
|
||||||
self._schedule_send_worker()
|
self._schedule_send_worker()
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _schedule_segment_retransmit(self, segment, when):
|
def _schedule_segment_retransmit(self, key, segment, when):
|
||||||
"""Schedule retransmission of a segment if one is not already scheduled."""
|
"""Schedule retransmission of a segment if one is not already scheduled."""
|
||||||
if segment.userdata:
|
if segment.userdata:
|
||||||
# Already scheduled for retransmit
|
# Already scheduled for retransmit
|
||||||
@ -444,12 +602,13 @@ class MostlyReliablePipe(object):
|
|||||||
|
|
||||||
if when == 0:
|
if when == 0:
|
||||||
# Immediate retransmission
|
# Immediate retransmission
|
||||||
segment.userdata = gobject.idle_add(self._segment_retransmit_cb, segment)
|
segment.userdata = gobject.idle_add(self._segment_retransmit_cb, key, segment)
|
||||||
else:
|
else:
|
||||||
# convert time to milliseconds
|
# convert time to milliseconds
|
||||||
timeout = int((when - time.time()) * 1000)
|
timeout = int((when - time.time()) * 1000)
|
||||||
|
print "RT timeout: %s" % timeout
|
||||||
segment.userdata = gobject.timeout_add(timeout, self._segment_retransmit_cb,
|
segment.userdata = gobject.timeout_add(timeout, self._segment_retransmit_cb,
|
||||||
segment)
|
key, segment)
|
||||||
|
|
||||||
def _process_retransmit_request(self, segment):
|
def _process_retransmit_request(self, segment):
|
||||||
"""Validate and process a retransmission request."""
|
"""Validate and process a retransmission request."""
|
||||||
@ -462,7 +621,13 @@ class MostlyReliablePipe(object):
|
|||||||
segment = self._sent[key]
|
segment = self._sent[key]
|
||||||
now = time.time()
|
now = time.time()
|
||||||
next_retrans = self._calc_next_retransmit(segment, now)
|
next_retrans = self._calc_next_retransmit(segment, now)
|
||||||
self._schedule_segment_retransmit(segment, next_retrans - now)
|
self._schedule_segment_retransmit(key, segment, next_retrans - now)
|
||||||
|
|
||||||
|
def set_drop_probability(self, prob=4):
|
||||||
|
"""Debugging function to randomly drop incoming packets.
|
||||||
|
The prob argument should be an integer between 1 and 10 to drop,
|
||||||
|
or 0 to drop none. Higher numbers drop more packets."""
|
||||||
|
self._drop_prob = prob
|
||||||
|
|
||||||
def _handle_incoming_data(self, source, condition):
|
def _handle_incoming_data(self, source, condition):
|
||||||
"""Handle incoming network data by making a message segment out of it
|
"""Handle incoming network data by making a message segment out of it
|
||||||
@ -471,8 +636,17 @@ class MostlyReliablePipe(object):
|
|||||||
return True
|
return True
|
||||||
msg = {}
|
msg = {}
|
||||||
data, addr = source.recvfrom(self._UDP_MSG_SIZE)
|
data, addr = source.recvfrom(self._UDP_MSG_SIZE)
|
||||||
|
|
||||||
|
should_drop = False
|
||||||
|
p = random.random() * 10.0
|
||||||
|
if self._drop_prob > 0 and p <= self._drop_prob:
|
||||||
|
should_drop = True
|
||||||
|
|
||||||
try:
|
try:
|
||||||
segment = SegmentBase.new_from_data(addr, data)
|
segment = SegmentBase.new_from_data(addr, data)
|
||||||
|
if should_drop:
|
||||||
|
print "Dropped segment %d (p (%s) < %s)." % (segment.segment_number(), p, self._drop_prob)
|
||||||
|
else:
|
||||||
stype = segment.segment_type()
|
stype = segment.segment_type()
|
||||||
if stype == SegmentBase.type_data():
|
if stype == SegmentBase.type_data():
|
||||||
self._process_incoming_data(segment)
|
self._process_incoming_data(segment)
|
||||||
@ -482,14 +656,18 @@ class MostlyReliablePipe(object):
|
|||||||
print "Bad segment: %s" % exc
|
print "Bad segment: %s" % exc
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def _next_msg_seq(self):
|
||||||
|
self._seq_counter = self._seq_counter + 1
|
||||||
|
if self._seq_counter > 65535:
|
||||||
|
self._seq_counter = 1
|
||||||
|
return self._seq_counter
|
||||||
|
|
||||||
def send(self, data):
|
def send(self, data):
|
||||||
"""Break data up into chunks and queue for later transmission."""
|
"""Break data up into chunks and queue for later transmission."""
|
||||||
if not self._started:
|
if not self._started:
|
||||||
raise Exception("Can't send anything until started!")
|
raise Exception("Can't send anything until started!")
|
||||||
|
|
||||||
self._seq_counter = self._seq_counter + 1
|
msg_seq = self._next_msg_seq()
|
||||||
if self._seq_counter > 65535:
|
|
||||||
self._seq_counter = 1
|
|
||||||
|
|
||||||
# Pack the data into network byte order
|
# Pack the data into network byte order
|
||||||
template = "! %ds" % len(data)
|
template = "! %ds" % len(data)
|
||||||
@ -505,7 +683,7 @@ class MostlyReliablePipe(object):
|
|||||||
msg_num = 1
|
msg_num = 1
|
||||||
while left > 0:
|
while left > 0:
|
||||||
seg = DataSegment.new_from_parts(msg_num, nmessages,
|
seg = DataSegment.new_from_parts(msg_num, nmessages,
|
||||||
self._seq_counter, master_sha, data[:mtu])
|
msg_seq, master_sha, data[:mtu])
|
||||||
self._outgoing.append(seg)
|
self._outgoing.append(seg)
|
||||||
msg_num = msg_num + 1
|
msg_num = msg_num + 1
|
||||||
data = data[mtu:]
|
data = data[mtu:]
|
||||||
@ -513,16 +691,21 @@ class MostlyReliablePipe(object):
|
|||||||
self._schedule_send_worker()
|
self._schedule_send_worker()
|
||||||
|
|
||||||
def _schedule_send_worker(self):
|
def _schedule_send_worker(self):
|
||||||
if len(self._outgoing) > 0 and self._worker == 0:
|
if len(self._outgoing) > 0 and self._send_worker == 0:
|
||||||
self._worker = gobject.idle_add(self._send_worker)
|
self._send_worker = gobject.idle_add(self._send_worker_cb)
|
||||||
|
|
||||||
def _send_worker(self):
|
def _send_worker_cb(self):
|
||||||
"""Send all queued segments that have yet to be transmitted."""
|
"""Send all queued segments that have yet to be transmitted."""
|
||||||
self._worker = 0
|
self._send_worker = 0
|
||||||
for segment in self._outgoing:
|
for segment in self._outgoing:
|
||||||
packet = segment.packetize()
|
packet = segment.packetize()
|
||||||
segment.inc_transmits()
|
segment.inc_transmits()
|
||||||
self._send_sock.sendto(packet, (self._remote_addr, self._port))
|
addr = (self._remote_addr, self._port)
|
||||||
|
if segment.address():
|
||||||
|
addr = segment.address()
|
||||||
|
self._send_sock.sendto(packet, addr)
|
||||||
|
if segment.userdata:
|
||||||
|
gobject.source_remove(segment.userdata)
|
||||||
segment.userdata = None # Retransmission GSource
|
segment.userdata = None # Retransmission GSource
|
||||||
key = (segment.message_sequence_number(), segment.master_sha(), segment.segment_number())
|
key = (segment.message_sequence_number(), segment.master_sha(), segment.segment_number())
|
||||||
self._sent[key] = segment
|
self._sent[key] = segment
|
||||||
@ -566,7 +749,7 @@ class SegmentBaseInitTestCase(SegmentBaseTestCase):
|
|||||||
def testGoodInit(self):
|
def testGoodInit(self):
|
||||||
seg = SegmentBase(self._DEF_SEGNO, self._DEF_TOT_SEGS, self._DEF_MSG_SEQ_NUM, self._DEF_MASTER_SHA)
|
seg = SegmentBase(self._DEF_SEGNO, self._DEF_TOT_SEGS, self._DEF_MSG_SEQ_NUM, self._DEF_MASTER_SHA)
|
||||||
assert seg.stime() < time.time(), "segment start time is less than now!"
|
assert seg.stime() < time.time(), "segment start time is less than now!"
|
||||||
assert not seg.addr(), "Segment address was not None after init."
|
assert not seg.address(), "Segment address was not None after init."
|
||||||
assert seg.segment_number() == self._DEF_SEGNO, "Segment number wasn't correct after init."
|
assert seg.segment_number() == self._DEF_SEGNO, "Segment number wasn't correct after init."
|
||||||
assert seg.total_segments() == self._DEF_TOT_SEGS, "Total segments wasn't correct after init."
|
assert seg.total_segments() == self._DEF_TOT_SEGS, "Total segments wasn't correct after init."
|
||||||
assert seg.message_sequence_number() == self._DEF_MSG_SEQ_NUM, "Message sequence number wasn't correct after init."
|
assert seg.message_sequence_number() == self._DEF_MSG_SEQ_NUM, "Message sequence number wasn't correct after init."
|
||||||
@ -692,7 +875,7 @@ class DataSegmentTestCase(SegmentBaseTestCase):
|
|||||||
self._DEF_TOT_SEGS, self._DEF_MSG_SEQ_NUM, payload_sha)
|
self._DEF_TOT_SEGS, self._DEF_MSG_SEQ_NUM, payload_sha)
|
||||||
seg = SegmentBase.new_from_data(self._DEF_ADDRESS, header + payload)
|
seg = SegmentBase.new_from_data(self._DEF_ADDRESS, header + payload)
|
||||||
|
|
||||||
assert seg.addr() == self._DEF_ADDRESS, "Segment address did not match expected."
|
assert seg.address() == self._DEF_ADDRESS, "Segment address did not match expected."
|
||||||
assert seg.segment_type() == SegmentBase.type_data(), "Segment type did not match expected."
|
assert seg.segment_type() == SegmentBase.type_data(), "Segment type did not match expected."
|
||||||
assert seg.segment_number() == self._DEF_SEGNO, "Segment number did not match expected."
|
assert seg.segment_number() == self._DEF_SEGNO, "Segment number did not match expected."
|
||||||
assert seg.total_segments() == self._DEF_TOT_SEGS, "Total segments did not match expected."
|
assert seg.total_segments() == self._DEF_TOT_SEGS, "Total segments did not match expected."
|
||||||
@ -730,7 +913,8 @@ class RetransmitSegmentTestCase(SegmentBaseTestCase):
|
|||||||
|
|
||||||
def _test_new_from_parts_fail(self, msg_seq_num, rt_msg_seq_num, rt_master_sha, rt_segment_number, fail_msg):
|
def _test_new_from_parts_fail(self, msg_seq_num, rt_msg_seq_num, rt_master_sha, rt_segment_number, fail_msg):
|
||||||
try:
|
try:
|
||||||
seg = RetransmitSegment.new_from_parts(msg_seq_num, rt_msg_seq_num, rt_master_sha, rt_segment_number)
|
seg = RetransmitSegment.new_from_parts(self._DEF_ADDRESS, msg_seq_num, rt_msg_seq_num,
|
||||||
|
rt_master_sha, rt_segment_number)
|
||||||
except ValueError, exc:
|
except ValueError, exc:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
@ -775,7 +959,7 @@ class RetransmitSegmentTestCase(SegmentBaseTestCase):
|
|||||||
self._DEF_MASTER_SHA, "", "invalid retransmit message segment number")
|
self._DEF_MASTER_SHA, "", "invalid retransmit message segment number")
|
||||||
|
|
||||||
# Ensure message data is same as we stuff in after object is instantiated
|
# Ensure message data is same as we stuff in after object is instantiated
|
||||||
seg = RetransmitSegment.new_from_parts(self._DEF_MSG_SEQ_NUM, self._DEF_MSG_SEQ_NUM,
|
seg = RetransmitSegment.new_from_parts(self._DEF_ADDRESS, self._DEF_MSG_SEQ_NUM, self._DEF_MSG_SEQ_NUM,
|
||||||
self._DEF_MASTER_SHA, self._DEF_SEGNO)
|
self._DEF_MASTER_SHA, self._DEF_SEGNO)
|
||||||
assert seg.rt_msg_seq_num() == self._DEF_MSG_SEQ_NUM, "RT message sequence number after segment creation didn't match expected."
|
assert seg.rt_msg_seq_num() == self._DEF_MSG_SEQ_NUM, "RT message sequence number after segment creation didn't match expected."
|
||||||
assert seg.rt_master_sha() == self._DEF_MASTER_SHA, "RT master SHA after segment creation didn't match expected."
|
assert seg.rt_master_sha() == self._DEF_MASTER_SHA, "RT master SHA after segment creation didn't match expected."
|
||||||
@ -814,10 +998,19 @@ class RetransmitSegmentTestCase(SegmentBaseTestCase):
|
|||||||
assert seg.rt_master_sha() == self._DEF_MASTER_SHA, "Segment RT master SHA didn't match expected."
|
assert seg.rt_master_sha() == self._DEF_MASTER_SHA, "Segment RT master SHA didn't match expected."
|
||||||
assert seg.rt_segment_number() == self._DEF_SEGNO, "Segment RT segment number didn't match expected."
|
assert seg.rt_segment_number() == self._DEF_SEGNO, "Segment RT segment number didn't match expected."
|
||||||
|
|
||||||
|
def testPartsToData(self):
|
||||||
|
seg = RetransmitSegment.new_from_parts(self._DEF_ADDRESS, self._DEF_MSG_SEQ_NUM, self._DEF_MSG_SEQ_NUM,
|
||||||
|
self._DEF_MASTER_SHA, self._DEF_SEGNO)
|
||||||
|
new_seg = SegmentBase.new_from_data(self._DEF_ADDRESS, seg.packetize())
|
||||||
|
assert new_seg.rt_msg_seq_num() == self._DEF_MSG_SEQ_NUM, "Segment RT message sequence number didn't match expected."
|
||||||
|
assert new_seg.rt_master_sha() == self._DEF_MASTER_SHA, "Segment RT master SHA didn't match expected."
|
||||||
|
assert new_seg.rt_segment_number() == self._DEF_SEGNO, "Segment RT segment number didn't match expected."
|
||||||
|
|
||||||
def addToSuite(suite):
|
def addToSuite(suite):
|
||||||
suite.addTest(RetransmitSegmentTestCase("testInit"))
|
suite.addTest(RetransmitSegmentTestCase("testInit"))
|
||||||
suite.addTest(RetransmitSegmentTestCase("testNewFromParts"))
|
suite.addTest(RetransmitSegmentTestCase("testNewFromParts"))
|
||||||
suite.addTest(RetransmitSegmentTestCase("testNewFromData"))
|
suite.addTest(RetransmitSegmentTestCase("testNewFromData"))
|
||||||
|
suite.addTest(RetransmitSegmentTestCase("testPartsToData"))
|
||||||
addToSuite = staticmethod(addToSuite)
|
addToSuite = staticmethod(addToSuite)
|
||||||
|
|
||||||
|
|
||||||
@ -858,10 +1051,19 @@ def main():
|
|||||||
def got_data(addr, data, user_data=None):
|
def got_data(addr, data, user_data=None):
|
||||||
print "Data (%s): %s" % (addr, data)
|
print "Data (%s): %s" % (addr, data)
|
||||||
|
|
||||||
def foobar():
|
def retry_test():
|
||||||
pipe = MostlyReliablePipe('', '224.0.0.222', 2293, got_data)
|
pipe = MostlyReliablePipe('', '224.0.0.222', 2293, got_data)
|
||||||
|
pipe.set_drop_probability(3)
|
||||||
pipe.start()
|
pipe.start()
|
||||||
pipe.send('The quick brown fox jumps over the lazy dog')
|
msg = """The said Eliza, John, and Georgiana were now clustered round their mama in the drawing-room:
|
||||||
|
she lay reclined on a sofa by the fireside, and with her darlings about her (for the time neither
|
||||||
|
quarrelling nor crying) looked perfectly happy. Me, she had dispensed from joining the group; saying,
|
||||||
|
'She regretted to be under the necessity of keeping me at a distance; but that until she heard from
|
||||||
|
Bessie, and could discover by her own observation, that I was endeavouring in good earnest to acquire
|
||||||
|
a more sociable and childlike disposition, a more attractive and sprightly manner -- something lighter,
|
||||||
|
franker, more natural, as it were -- she really must exclude me from privileges intended only for
|
||||||
|
contented, happy, little children.'"""
|
||||||
|
pipe.send(msg)
|
||||||
try:
|
try:
|
||||||
gtk.main()
|
gtk.main()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
|
Loading…
Reference in New Issue
Block a user