diff --git a/sugar/chat/chat.py b/sugar/chat/chat.py index e3ddeebd..bea041cc 100755 --- a/sugar/chat/chat.py +++ b/sugar/chat/chat.py @@ -14,6 +14,7 @@ import gtk, gobject from sugar.shell import activity from sugar.p2p.Group import Group +from sugar.p2p import Buddy from sugar.p2p.Group import LocalGroup from sugar.p2p.Service import Service from sugar.p2p.Stream import Stream @@ -34,6 +35,7 @@ GROUP_CHAT_SERVICE_PORT = 6200 class Chat(activity.Activity): def __init__(self, controller): + Buddy.recognize_buddy_service_type(CHAT_SERVICE_TYPE) self._controller = controller activity.Activity.__init__(self) self._stream_writer = None @@ -372,7 +374,7 @@ class BuddyChat(Chat): self.activity_set_can_close(True) self.activity_set_tab_icon_name("im") self.activity_show_icon(True) - self._stream_writer = self._controller.new_buddy_writer(self._buddy.get_service_name()) + self._stream_writer = self._controller.new_buddy_writer(self._buddy) def recv_message(self, sender, msg): Chat.recv_message(self, self._buddy, msg) @@ -416,17 +418,17 @@ class GroupChat(Chat): def get_group(self): return self._group - def new_buddy_writer(self, buddy_name): - service = self._group.get_service(buddy_name, CHAT_SERVICE_TYPE) - return self._buddy_stream.new_writer(service) + def new_buddy_writer(self, buddy, threaded=False): + service = buddy.get_service(CHAT_SERVICE_TYPE) + return self._buddy_stream.new_writer(service, threaded=threaded) def _start(self): self._group = LocalGroup() self._group.add_presence_listener(self._on_group_presence_event) self._group.add_service_listener(self._on_group_service_event) self._group.join() - - name = self._group.get_owner().get_service_name() + + name = self._group.get_owner().get_nick_name() # Group controls the Stream for incoming messages for # specific buddy chats @@ -540,14 +542,18 @@ class GroupChat(Chat): self._chats[buddy] = chat chat.activity_connect_to_shell() - def _request_buddy_icon(self, buddy): - writer = self.new_buddy_writer(buddy.get_service_name()) - icon = writer.custom_request("get_buddy_icon") + def _request_buddy_icon_cb(self, response, user_data): + icon = response + buddy = user_data if icon and len(icon): icon = base64.b64decode(icon) - print "Setting buddy icon for '%s' to %s" % (buddy.get_nick_name(), icon) + print "Buddy icon for '%s' is size %d" % (buddy.get_nick_name(), len(icon)) buddy.set_icon(icon) + def _request_buddy_icon(self, buddy): + writer = self.new_buddy_writer(buddy, threaded=True) + icon = writer.custom_request("get_buddy_icon", self._request_buddy_icon_cb, buddy) + def _on_group_service_event(self, action, service): if action == Group.SERVICE_ADDED: # Look for the olpc chat service @@ -557,6 +563,7 @@ class GroupChat(Chat): if buddy and buddy.get_address() == service.get_address(): # Try to get the buddy's icon if buddy.get_nick_name() != self._group.get_owner().get_nick_name(): + print "Requesting buddy icon from '%s'." % buddy.get_nick_name() self._request_buddy_icon(buddy) elif action == Group.SERVICE_REMOVED: pass @@ -634,6 +641,7 @@ if len(sys.argv) > 1 and sys.argv[1] == "--console": ChatShell.get_instance().open_group_chat() try: + gtk.threads_init() gtk.main() except KeyboardInterrupt: pass diff --git a/sugar/p2p/Buddy.py b/sugar/p2p/Buddy.py index f3895a32..04b34e5e 100644 --- a/sugar/p2p/Buddy.py +++ b/sugar/p2p/Buddy.py @@ -2,38 +2,63 @@ import pwd import os from Service import Service +from sugar import env PRESENCE_SERVICE_TYPE = "_olpc_presence._tcp" PRESENCE_SERVICE_PORT = 6000 -class BuddyBase: - def __init__(self, service, nick_name): - self._service = service - self._nick_name = nick_name +__buddy_service_types = [PRESENCE_SERVICE_TYPE] + +def recognize_buddy_service_type(stype): + if stype not in __buddy_service_types: + __buddy_service_types.append(stype) + +def get_recognized_buddy_service_types(): + return __buddy_service_types[:] + + +class Buddy(object): + def __init__(self, service): + self._services = {} + self._services[service.get_type()] = service + self._nick_name = service.get_name() + self._address = service.get_address() def get_icon(self): """Return the buddies icon, if any.""" return self._icon def get_address(self): - return self._service.get_address() + return self._address - def get_service_name(self): - return self._service.get_name() + def add_service(self, service): + if service.get_name() != self._nick_name: + return False + if service.get_address() != self._address: + return False + if self._services.has_key(service.get_type()): + return False + self._services[service.get_type()] = service + + def remove_service(self, stype): + if self._services.has_key(stype): + del self._services[stype] + + def get_service(self, stype): + if self._services.has_key(stype): + return self._services[stype] + return None def get_nick_name(self): return self._nick_name - -class Buddy(BuddyBase): - """Normal buddy class.""" def set_icon(self, icon): """Can only set icon for other buddies. The Owner takes care of setting it's own icon.""" self._icon = icon - + -class Owner(BuddyBase): +class Owner(Buddy): """Class representing the owner of this machine/instance.""" def __init__(self, group): self._group = group @@ -42,19 +67,20 @@ class Owner(BuddyBase): if not nick or not len(nick): nick = "n00b" - service = Service(nick, PRESENCE_SERVICE_TYPE, PRESENCE_SERVICE_PORT) - BuddyBase.__init__(self, service, nick) + self._presence_service = Service(nick, PRESENCE_SERVICE_TYPE, PRESENCE_SERVICE_PORT) + Buddy.__init__(self, self._presence_service) - sugar_dir = os.path.abspath(os.path.expanduser("~/.sugar")) - icon = None - for fname in os.listdir(sugar_dir): + for fname in os.listdir(env.get_user_dir()): if not fname.startswith("buddy-icon."): continue - fd = open(os.path.join(sugar_dir, fname), "r") + fd = open(os.path.join(env.get_user_dir(), fname), "r") self._icon = fd.read() fd.close() break + def set_icon(self, icon): + """Can only set icon in constructor for now.""" + pass def register(self): - self._service.register(self._group) + self._presence_service.register(self._group) diff --git a/sugar/p2p/Group.py b/sugar/p2p/Group.py index c21bbf8d..daefedd5 100644 --- a/sugar/p2p/Group.py +++ b/sugar/p2p/Group.py @@ -1,6 +1,7 @@ import avahi from Buddy import Buddy +from Buddy import get_recognized_buddy_service_types from Buddy import Owner from Buddy import PRESENCE_SERVICE_TYPE from Service import Service @@ -105,8 +106,14 @@ class LocalGroup(Group): self._pdiscovery.resolve_service(interface, protocol, name, stype, domain, self._on_service_resolved) elif action == presence.ACTION_SERVICE_REMOVED: - if stype == PRESENCE_SERVICE_TYPE: - self._remove_buddy(name) + if stype in get_recognized_buddy_service_types(): + buddy = self.get_buddy(name) + if buddy: + buddy.remove_service(stype) + # Removal of the presence service removes the buddy too + if stype == PRESENCE_SERVICE_TYPE: + self._remove_buddy(name) + self.remove_service((name, stype)) elif stype.startswith(_OLPC_SERVICE_TYPE_PREFIX): self.remove_service((name, stype)) @@ -118,9 +125,20 @@ class LocalGroup(Group): for prop in avahi.txt_array_to_string_array(txt): (key, value) = prop.split('=') if key == 'group_address': - service.set_group_address(value) + service.set_group_address(value) - if stype == PRESENCE_SERVICE_TYPE: - self._add_buddy(Buddy(service, name)) - elif stype.startswith(_OLPC_SERVICE_TYPE_PREFIX): + # print "ServiceResolved: name=%s, stype=%s, port=%s, address=%s" % (name, stype, port, address) + if stype in get_recognized_buddy_service_types(): + # Service recognized as Buddy services either create a new + # buddy if one doesn't exist yet, or get added to the existing + # buddy + buddy = self.get_buddy(name) + if buddy: + buddy.add_service(service) + else: + self._add_buddy(Buddy(service)) self.add_service(service) + elif stype.startswith(_OLPC_SERVICE_TYPE_PREFIX): + # These services aren't associated with buddies + self.add_service(service) + diff --git a/sugar/p2p/Stream.py b/sugar/p2p/Stream.py index e6c1a912..2467f5a2 100644 --- a/sugar/p2p/Stream.py +++ b/sugar/p2p/Stream.py @@ -1,6 +1,12 @@ import xmlrpclib import socket import traceback +import threading + +import pygtk +pygtk.require('2.0') +import gobject + import network from MostlyReliablePipe import MostlyReliablePipe @@ -32,7 +38,7 @@ class Stream(object): self._callback(self._group.get_buddy(nick_name), data) -class UnicastStreamWriter(object): +class UnicastStreamWriterBase(object): def __init__(self, stream, service, owner_nick_name): # set up the writer if not service: @@ -42,6 +48,10 @@ class UnicastStreamWriter(object): self._address = self._service.get_address() self._port = self._service.get_port() self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port) + +class UnicastStreamWriter(UnicastStreamWriterBase): + def __init__(self, stream, service, owner_nick_name): + UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name) self._writer = xmlrpclib.ServerProxy(self._xmlrpc_addr) def write(self, data): @@ -63,6 +73,60 @@ class UnicastStreamWriter(object): return None +class ThreadedRequest(threading.Thread): + def __init__(self, controller, addr, method, response_cb, user_data, *args): + threading.Thread.__init__(self) + self._controller = controller + self._method = method + self._args = args + self._response_cb = response_cb + self._user_data = user_data + self._writer = xmlrpclib.ServerProxy(addr) + + def run(self): + response = None + try: + method = getattr(self._writer, self._method) + response = method(*self._args) + except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError): + traceback.print_exc() + if self._response_cb: + gobject.idle_add(self._response_cb, response, self._user_data) + self._controller.notify_request_done(self) + +class ThreadedUnicastStreamWriter(UnicastStreamWriterBase): + def __init__(self, stream, service, owner_nick_name): + self._requests_lock = threading.Lock() + self._requests = [] + UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name) + + def _add_request(self, request): + self._requests_lock.acquire() + if not request in self._requests: + self._requests.append(request) + self._requests_lock.release() + + def write(self, response_cb, user_data, data): + """Write some data to the default endpoint of this pipe on the remote server.""" + request = ThreadedRequest(self, self._xmlrpc_addr, "message", response_cb, + user_data, self._owner_nick_name, data) + self._add_request(request) + request.start() + + def custom_request(self, method_name, response_cb, user_data, *args): + """Call a custom XML-RPC method on the remote server.""" + request = ThreadedRequest(self, self._xmlrpc_addr, method_name, response_cb, + user_data, *args) + self._add_request(request) + request.start() + + def notify_request_done(self, request): + self._requests_lock.acquire() + if request in self._requests: + self._requests.remove(request) + self._requests_lock.release() + + class UnicastStream(Stream): def __init__(self, service, group): Stream.__init__(self, service, group) @@ -94,8 +158,11 @@ class UnicastStream(Stream): raise ValueError("Handler name 'message' is a reserved handler.") self._reader.register_function(handler, name) - def new_writer(self, service): - return UnicastStreamWriter(self, service, self._owner_nick_name) + def new_writer(self, service, threaded=False): + if threaded: + return ThreadedUnicastStreamWriter(self, service, self._owner_nick_name) + else: + return UnicastStreamWriter(self, service, self._owner_nick_name) class MulticastStream(Stream): @@ -115,5 +182,5 @@ class MulticastStream(Stream): [ nick_name, data ] = data.split(" |**| ", 2) self.recv(nick_name, data) - def new_writer(self, service=None): + def new_writer(self, service=None, threaded=False): return self