More presence service rework
This commit is contained in:
+67
-41
@@ -1,44 +1,49 @@
|
||||
import xmlrpclib
|
||||
import socket
|
||||
import traceback
|
||||
import random
|
||||
|
||||
import network
|
||||
from MostlyReliablePipe import MostlyReliablePipe
|
||||
from sugar.presence import Service
|
||||
|
||||
class Stream(object):
|
||||
def __init__(self, service, group):
|
||||
if not service:
|
||||
raise ValueError("service must be valid")
|
||||
def __init__(self, service):
|
||||
if not isinstance(service, Service.Service):
|
||||
raise ValueError("service must be valid.")
|
||||
if not service.get_port():
|
||||
raise ValueError("service must have an address.")
|
||||
self._service = service
|
||||
self._group = group
|
||||
self._owner_nick_name = self._group.get_owner().get_nick_name()
|
||||
self._port = self._service.get_port()
|
||||
self._reader_port = self._service.get_port()
|
||||
self._writer_port = self._reader_port
|
||||
self._address = self._service.get_address()
|
||||
self._callback = None
|
||||
|
||||
def new_from_service(service, group):
|
||||
if service.is_multicast():
|
||||
return MulticastStream(service, group)
|
||||
def new_from_service(service, start_reader=True):
|
||||
if not isinstance(service, Service.Service):
|
||||
raise ValueError("service must be valid.")
|
||||
if service.is_multicast_service():
|
||||
return MulticastStream(service)
|
||||
else:
|
||||
return UnicastStream(service, group)
|
||||
return UnicastStream(service, start_reader)
|
||||
new_from_service = staticmethod(new_from_service)
|
||||
|
||||
def set_data_listener(self, callback):
|
||||
self._callback = callback
|
||||
|
||||
def recv(self, nick_name, data):
|
||||
if nick_name != self._owner_nick_name:
|
||||
if self._callback:
|
||||
self._callback(self._group.get_buddy(nick_name), data)
|
||||
def _recv(self, address, data):
|
||||
if self._callback:
|
||||
self._callback(data)
|
||||
|
||||
|
||||
class UnicastStreamWriter(object):
|
||||
def __init__(self, stream, service, owner_nick_name):
|
||||
def __init__(self, stream, service):
|
||||
# set up the writer
|
||||
if not service:
|
||||
if not isinstance(service, Service.Service):
|
||||
raise ValueError("service must be valid")
|
||||
self._service = service
|
||||
self._owner_nick_name = owner_nick_name
|
||||
if not service.get_address():
|
||||
raise ValueError("service must have a valid address.")
|
||||
self._address = self._service.get_address()
|
||||
self._port = self._service.get_port()
|
||||
self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
|
||||
@@ -47,7 +52,7 @@ class UnicastStreamWriter(object):
|
||||
def write(self, xmlrpc_data):
|
||||
"""Write some data to the default endpoint of this pipe on the remote server."""
|
||||
try:
|
||||
self._writer.message(None, None, self._owner_nick_name, xmlrpc_data)
|
||||
self._writer.message(None, None, xmlrpc_data)
|
||||
return True
|
||||
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
|
||||
traceback.print_exc()
|
||||
@@ -65,58 +70,79 @@ class UnicastStreamWriter(object):
|
||||
|
||||
|
||||
class UnicastStream(Stream):
|
||||
def __init__(self, service, group):
|
||||
Stream.__init__(self, service, group)
|
||||
self._setup()
|
||||
def __init__(self, service, start_reader=True):
|
||||
"""Initializes the stream. If the 'start_reader' argument is True,
|
||||
the stream will initialize and start a new stream reader, if it
|
||||
is False, no reader will be created and the caller must call the
|
||||
start_reader() method to start the stream reader and be able to
|
||||
receive any data from the stream."""
|
||||
Stream.__init__(self, service)
|
||||
if start_reader:
|
||||
self.start_reader()
|
||||
|
||||
def _setup(self):
|
||||
def start_reader(self, update_service_port=True):
|
||||
"""Start the stream's reader, which for UnicastStream objects is
|
||||
and XMLRPC server. If there's a port conflict with some other
|
||||
service, the reader will try to find another port to use instead.
|
||||
Returns the port number used for the reader."""
|
||||
# Set up the reader
|
||||
started = False
|
||||
tries = 10
|
||||
port = self._service.get_port()
|
||||
self._reader = None
|
||||
while not started and tries > 0:
|
||||
try:
|
||||
self._reader = network.GlibXMLRPCServer(("", port))
|
||||
self._reader = network.GlibXMLRPCServer(("", self._reader_port))
|
||||
self._reader.register_function(self._message, "message")
|
||||
if update_service_port:
|
||||
self._service.set_port(self._reader_port) # Update the service's port
|
||||
started = True
|
||||
except(socket.error):
|
||||
port = port + 1
|
||||
self._reader_port = random.randint(self._reader_port + 1, 65500)
|
||||
tries = tries - 1
|
||||
if self._reader is None:
|
||||
print 'Could not start xmlrpc server.'
|
||||
self._service.set_port(port)
|
||||
print 'Could not start stream reader.'
|
||||
return self._reader_port
|
||||
|
||||
def _message(self, nick_name, message):
|
||||
def _message(self, message):
|
||||
"""Called by the XMLRPC server when network data arrives."""
|
||||
self.recv(nick_name, message)
|
||||
address = network.get_authinfo()
|
||||
self._recv(address, message)
|
||||
return True
|
||||
|
||||
def register_handler(self, handler, name):
|
||||
def register_reader_handler(self, handler, name):
|
||||
"""Register a custom message handler with the reader. This call
|
||||
adds a custom XMLRPC method call with the name 'name' to the reader's
|
||||
XMLRPC server, which then calls the 'handler' argument back when
|
||||
a method call for it arrives over the network."""
|
||||
if name == "message":
|
||||
raise ValueError("Handler name 'message' is a reserved handler.")
|
||||
self._reader.register_function(handler, name)
|
||||
|
||||
def new_writer(self, service):
|
||||
return UnicastStreamWriter(self, service, self._owner_nick_name)
|
||||
"""Return a new stream writer object."""
|
||||
return UnicastStreamWriter(self, service)
|
||||
|
||||
|
||||
class MulticastStream(Stream):
|
||||
def __init__(self, service, group):
|
||||
Stream.__init__(self, service, group)
|
||||
self._address = self._service.get_group_address()
|
||||
self._setup()
|
||||
def __init__(self, service):
|
||||
Stream.__init__(self, service)
|
||||
self._internal_start_reader()
|
||||
|
||||
def _setup(self):
|
||||
self._pipe = MostlyReliablePipe('', self._address, self._port, self._recv_data_cb)
|
||||
def start_reader(self):
|
||||
return self._reader_port
|
||||
|
||||
def _internal_start_reader(self):
|
||||
if not service.get_address():
|
||||
raise ValueError("service must have a valid address.")
|
||||
self._pipe = MostlyReliablePipe('', self._address, self._reader_port,
|
||||
self._recv_data_cb)
|
||||
self._pipe.start()
|
||||
|
||||
def write(self, data):
|
||||
self._pipe.send(self._owner_nick_name + " |**| " + data)
|
||||
self._pipe.send(data)
|
||||
|
||||
def _recv_data_cb(self, addr, data, user_data=None):
|
||||
[ nick_name, data ] = data.split(" |**| ", 2)
|
||||
self.recv(nick_name, data)
|
||||
def _recv_data_cb(self, address, data, user_data=None):
|
||||
self._recv(address, data)
|
||||
|
||||
def new_writer(self, service=None):
|
||||
return self
|
||||
|
||||
Reference in New Issue
Block a user