diff --git a/sugar/chat/chat.py b/sugar/chat/chat.py index e23b3475..3898f759 100755 --- a/sugar/chat/chat.py +++ b/sugar/chat/chat.py @@ -15,8 +15,7 @@ from sugar.shell import activity from sugar.p2p.Group import Group from sugar.p2p.Group import LocalGroup from sugar.p2p.Service import Service -from sugar.p2p.StreamReader import StreamReader -from sugar.p2p.StreamWriter import StreamWriter +from sugar.p2p.Stream import Stream from sugar.session.LogWriter import LogWriter import sugar.env @@ -177,10 +176,7 @@ class BuddyChat(Chat): Chat.__init__(self, controller) def _start(self): - group = self._controller.get_group() - buddy_name = self._buddy.get_service_name() - service = group.get_service(buddy_name, CHAT_SERVICE_TYPE) - self._stream_writer = StreamWriter(group, service) + self._stream_writer = self._controller.new_buddy_writer(self._buddy.get_service_name()) def activity_on_connected_to_shell(self): Chat.activity_on_connected_to_shell(self) @@ -220,26 +216,33 @@ class GroupChat(Chat): def get_group(self): return self._group + def new_buddy_writer(self, buddy_name): + service = self._group.get_service(buddy_name, CHAT_SERVICE_TYPE) + return self._buddy_stream.new_writer(service) + def _start(self): self._group = LocalGroup() self._group.add_presence_listener(self._on_group_event) self._group.join() name = self._group.get_owner().get_service_name() - service = Service(name, CHAT_SERVICE_TYPE, CHAT_SERVICE_PORT) - self._buddy_reader = StreamReader(self._group, service) - self._buddy_reader.set_listener(self._buddy_recv_message) - service.register(self._group) - service = Service(name, GROUP_CHAT_SERVICE_TYPE, + # Group controls the Stream for incoming messages for + # specific buddy chats + buddy_service = Service(name, CHAT_SERVICE_TYPE, CHAT_SERVICE_PORT) + self._buddy_stream = Stream.new_from_service(buddy_service, self._group) + self._buddy_stream.set_data_listener(self._buddy_recv_message) + buddy_service.register(self._group) + + # Group chat Stream + group_service = Service(name, GROUP_CHAT_SERVICE_TYPE, GROUP_CHAT_SERVICE_PORT, GROUP_CHAT_SERVICE_ADDRESS) - self._group.add_service(service) + self._group.add_service(group_service) - self._buddy_reader = StreamReader(self._group, service) - self._buddy_reader.set_listener(self.recv_message) - - self._stream_writer = StreamWriter(self._group, service) + self._group_stream = Stream.new_from_service(group_service, self._group) + self._group_stream.set_data_listener(self.recv_message) + self._stream_writer = self._group_stream.new_writer() def _create_sidebar(self): vbox = gtk.VBox(False, 6) @@ -404,4 +407,7 @@ if len(sys.argv) > 1 and sys.argv[1] == "--console": sys.stderr = LogWriter("Chat") ChatShell.get_instance().open_group_chat() -gtk.main() +try: + gtk.main() +except KeyboardInterrupt: + pass diff --git a/sugar/p2p/Makefile.am b/sugar/p2p/Makefile.am index 6c86ed25..224bc3cd 100644 --- a/sugar/p2p/Makefile.am +++ b/sugar/p2p/Makefile.am @@ -8,7 +8,7 @@ sugar_PYTHON = \ NotificationListener.py \ Notifier.py \ Service.py \ - StreamReader.py \ - StreamWriter.py \ + Stream.py \ + MostlyReliablePipe.py \ network.py \ presence.py diff --git a/sugar/p2p/Stream.py b/sugar/p2p/Stream.py new file mode 100644 index 00000000..77d8945b --- /dev/null +++ b/sugar/p2p/Stream.py @@ -0,0 +1,99 @@ +import xmlrpclib +import socket + +import network +from MostlyReliablePipe import MostlyReliablePipe + +class Stream(object): + def __init__(self, service, group): + self._service = service + self._group = group + self._owner_nick_name = self._group.get_owner().get_nick_name() + self._port = self._service.get_port() + self._address = self._service.get_address() + self._callback = None + + def new_from_service(service, group): + if service.is_multicast(): + return MulticastStream(service, group) + else: + return UnicastStream(service, group) + new_from_service = staticmethod(new_from_service) + + def set_data_listener(self, callback): + self._callback = callback + + def recv(self, nick_name, data): + if nick_name != self._owner_nick_name: + if self._callback: + self._callback(self._group.get_buddy(nick_name), data) + + +class UnicastStreamWriter(object): + def __init__(self, stream, service, owner_nick_name): + # set up the writer + self._service = service + self._owner_nick_name = owner_nick_name + self._address = self._service.get_address() + self._port = self._service.get_port() + self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port) + self._writer = xmlrpclib.ServerProxy(self._xmlrpc_addr) + + def write(self, data): + try: + self._writer.message(self._owner_nick_name, data) + return True + except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError): + traceback.print_exc() + return False + + +class UnicastStream(Stream): + def __init__(self, service, group): + Stream.__init__(self, service, group) + self._setup() + + def _setup(self): + # Set up the reader + started = False + tries = 10 + port = self._service.get_port() + self._reader = None + while not started and tries > 0: + try: + self._reader = network.GlibXMLRPCServer(("", port)) + self._reader.register_instance(self) + started = True + except(socket.error): + port = port + 1 + tries = tries - 1 + self._service.set_port(port) + + def message(self, nick_name, message): + """Called by the XMLRPC server when network data arrives.""" + self.recv(nick_name, message) + return True + + def new_writer(self, service): + return UnicastStreamWriter(self, service, self._owner_nick_name) + + +class MulticastStream(Stream): + def __init__(self, service, group): + Stream.__init__(self, service, group) + self._address = self._service.get_group_address() + self._setup() + + def _setup(self): + self._pipe = MostlyReliablePipe('', self._address, self._port, self._recv_data_cb) + self._pipe.start() + + def write(self, data): + self._pipe.send(self._owner_nick_name + " |**| " + data) + + def _recv_data_cb(self, addr, data, user_data=None): + [ nick_name, data ] = data.split(" |**| ", 2) + self.recv(nick_name, data) + + def new_writer(self, service=None): + return self diff --git a/sugar/p2p/StreamReader.py b/sugar/p2p/StreamReader.py deleted file mode 100644 index 3aca0a8f..00000000 --- a/sugar/p2p/StreamReader.py +++ /dev/null @@ -1,52 +0,0 @@ -import socket - -import network - -class StreamReaderRequestHandler(object): - def __init__(self, reader): - self._reader = reader - - def message(self, nick_name, message): - self._reader.recv(nick_name, message) - return True - -class StreamReader: - def __init__(self, group, service): - self._group = group - self._service = service - - if self._service.is_multicast(): - self._setup_multicast() - else: - self._setup_unicast() - - def set_listener(self, callback): - self._callback = callback - - def _setup_multicast(self): - address = self._service.get_group_address() - port = self._service.get_port() - server = network.GroupServer(address, port, self._recv_multicast) - server.start() - - def _setup_unicast(self): - started = False - tries = 10 - port = self._service.get_port() - while not started and tries > 0: - try: - p2p_server = network.GlibXMLRPCServer(("", port)) - p2p_server.register_instance(StreamReaderRequestHandler(self)) - started = True - except(socket.error): - port = port + 1 - tries = tries - 1 - self._service.set_port(port) - - def _recv_multicast(self, msg): - [ nick_name, data ] = msg['data'].split(" |**| ", 2) - self.recv(nick_name, data) - - def recv(self, nick_name, data): - if nick_name != self._group.get_owner().get_nick_name(): - self._callback(self._group.get_buddy(nick_name), data) diff --git a/sugar/p2p/StreamWriter.py b/sugar/p2p/StreamWriter.py deleted file mode 100644 index 4fc912a7..00000000 --- a/sugar/p2p/StreamWriter.py +++ /dev/null @@ -1,44 +0,0 @@ -import xmlrpclib -import traceback -import socket - -import network - -class StreamWriter: - def __init__(self, group, service): - self._group = group - self._service = service - self._address = self._service.get_address() - self._port = self._service.get_port() - self._group_address = self._service.get_group_address() - - if self._service.is_multicast(): - self._setup_multicast() - else: - self._setup_unicast() - - def write(self, data): - if self._service.is_multicast(): - self._multicast_write(data) - else: - self._unicast_write(data) - - def _setup_unicast(self): - xmlrpc_addr = "http://%s:%d" % (self._address, self._port) - self._uclient = xmlrpclib.ServerProxy(xmlrpc_addr) - - def _unicast_write(self, data): - try: - nick_name = self._group.get_owner().get_nick_name() - self._uclient.message(nick_name, data) - return True - except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError): - traceback.print_exc() - return False - - def _setup_multicast(self): - self._mclient = network.GroupClient(self._group_address, self._port) - - def _multicast_write(self, data): - nick_name = self._group.get_owner().get_nick_name() - self._mclient.send_msg(nick_name + " |**| " + data)