- Refactor service handling. Buddies can now have more than one service associated
This commit is contained in:
parent
cbae21b487
commit
3b2f185e5c
@ -14,6 +14,7 @@ import gtk, gobject
|
||||
|
||||
from sugar.shell import activity
|
||||
from sugar.p2p.Group import Group
|
||||
from sugar.p2p import Buddy
|
||||
from sugar.p2p.Group import LocalGroup
|
||||
from sugar.p2p.Service import Service
|
||||
from sugar.p2p.Stream import Stream
|
||||
@ -34,6 +35,7 @@ GROUP_CHAT_SERVICE_PORT = 6200
|
||||
|
||||
class Chat(activity.Activity):
|
||||
def __init__(self, controller):
|
||||
Buddy.recognize_buddy_service_type(CHAT_SERVICE_TYPE)
|
||||
self._controller = controller
|
||||
activity.Activity.__init__(self)
|
||||
self._stream_writer = None
|
||||
@ -372,7 +374,7 @@ class BuddyChat(Chat):
|
||||
self.activity_set_can_close(True)
|
||||
self.activity_set_tab_icon_name("im")
|
||||
self.activity_show_icon(True)
|
||||
self._stream_writer = self._controller.new_buddy_writer(self._buddy.get_service_name())
|
||||
self._stream_writer = self._controller.new_buddy_writer(self._buddy)
|
||||
|
||||
def recv_message(self, sender, msg):
|
||||
Chat.recv_message(self, self._buddy, msg)
|
||||
@ -416,17 +418,17 @@ class GroupChat(Chat):
|
||||
def get_group(self):
|
||||
return self._group
|
||||
|
||||
def new_buddy_writer(self, buddy_name):
|
||||
service = self._group.get_service(buddy_name, CHAT_SERVICE_TYPE)
|
||||
return self._buddy_stream.new_writer(service)
|
||||
def new_buddy_writer(self, buddy, threaded=False):
|
||||
service = buddy.get_service(CHAT_SERVICE_TYPE)
|
||||
return self._buddy_stream.new_writer(service, threaded=threaded)
|
||||
|
||||
def _start(self):
|
||||
self._group = LocalGroup()
|
||||
self._group.add_presence_listener(self._on_group_presence_event)
|
||||
self._group.add_service_listener(self._on_group_service_event)
|
||||
self._group.join()
|
||||
|
||||
name = self._group.get_owner().get_service_name()
|
||||
|
||||
name = self._group.get_owner().get_nick_name()
|
||||
|
||||
# Group controls the Stream for incoming messages for
|
||||
# specific buddy chats
|
||||
@ -540,14 +542,18 @@ class GroupChat(Chat):
|
||||
self._chats[buddy] = chat
|
||||
chat.activity_connect_to_shell()
|
||||
|
||||
def _request_buddy_icon(self, buddy):
|
||||
writer = self.new_buddy_writer(buddy.get_service_name())
|
||||
icon = writer.custom_request("get_buddy_icon")
|
||||
def _request_buddy_icon_cb(self, response, user_data):
|
||||
icon = response
|
||||
buddy = user_data
|
||||
if icon and len(icon):
|
||||
icon = base64.b64decode(icon)
|
||||
print "Setting buddy icon for '%s' to %s" % (buddy.get_nick_name(), icon)
|
||||
print "Buddy icon for '%s' is size %d" % (buddy.get_nick_name(), len(icon))
|
||||
buddy.set_icon(icon)
|
||||
|
||||
def _request_buddy_icon(self, buddy):
|
||||
writer = self.new_buddy_writer(buddy, threaded=True)
|
||||
icon = writer.custom_request("get_buddy_icon", self._request_buddy_icon_cb, buddy)
|
||||
|
||||
def _on_group_service_event(self, action, service):
|
||||
if action == Group.SERVICE_ADDED:
|
||||
# Look for the olpc chat service
|
||||
@ -557,6 +563,7 @@ class GroupChat(Chat):
|
||||
if buddy and buddy.get_address() == service.get_address():
|
||||
# Try to get the buddy's icon
|
||||
if buddy.get_nick_name() != self._group.get_owner().get_nick_name():
|
||||
print "Requesting buddy icon from '%s'." % buddy.get_nick_name()
|
||||
self._request_buddy_icon(buddy)
|
||||
elif action == Group.SERVICE_REMOVED:
|
||||
pass
|
||||
@ -634,6 +641,7 @@ if len(sys.argv) > 1 and sys.argv[1] == "--console":
|
||||
|
||||
ChatShell.get_instance().open_group_chat()
|
||||
try:
|
||||
gtk.threads_init()
|
||||
gtk.main()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
@ -2,38 +2,63 @@ import pwd
|
||||
import os
|
||||
|
||||
from Service import Service
|
||||
from sugar import env
|
||||
|
||||
PRESENCE_SERVICE_TYPE = "_olpc_presence._tcp"
|
||||
PRESENCE_SERVICE_PORT = 6000
|
||||
|
||||
class BuddyBase:
|
||||
def __init__(self, service, nick_name):
|
||||
self._service = service
|
||||
self._nick_name = nick_name
|
||||
__buddy_service_types = [PRESENCE_SERVICE_TYPE]
|
||||
|
||||
def recognize_buddy_service_type(stype):
|
||||
if stype not in __buddy_service_types:
|
||||
__buddy_service_types.append(stype)
|
||||
|
||||
def get_recognized_buddy_service_types():
|
||||
return __buddy_service_types[:]
|
||||
|
||||
|
||||
class Buddy(object):
|
||||
def __init__(self, service):
|
||||
self._services = {}
|
||||
self._services[service.get_type()] = service
|
||||
self._nick_name = service.get_name()
|
||||
self._address = service.get_address()
|
||||
|
||||
def get_icon(self):
|
||||
"""Return the buddies icon, if any."""
|
||||
return self._icon
|
||||
|
||||
def get_address(self):
|
||||
return self._service.get_address()
|
||||
return self._address
|
||||
|
||||
def get_service_name(self):
|
||||
return self._service.get_name()
|
||||
def add_service(self, service):
|
||||
if service.get_name() != self._nick_name:
|
||||
return False
|
||||
if service.get_address() != self._address:
|
||||
return False
|
||||
if self._services.has_key(service.get_type()):
|
||||
return False
|
||||
self._services[service.get_type()] = service
|
||||
|
||||
def remove_service(self, stype):
|
||||
if self._services.has_key(stype):
|
||||
del self._services[stype]
|
||||
|
||||
def get_service(self, stype):
|
||||
if self._services.has_key(stype):
|
||||
return self._services[stype]
|
||||
return None
|
||||
|
||||
def get_nick_name(self):
|
||||
return self._nick_name
|
||||
|
||||
class Buddy(BuddyBase):
|
||||
"""Normal buddy class."""
|
||||
|
||||
def set_icon(self, icon):
|
||||
"""Can only set icon for other buddies. The Owner
|
||||
takes care of setting it's own icon."""
|
||||
self._icon = icon
|
||||
|
||||
|
||||
|
||||
class Owner(BuddyBase):
|
||||
class Owner(Buddy):
|
||||
"""Class representing the owner of this machine/instance."""
|
||||
def __init__(self, group):
|
||||
self._group = group
|
||||
@ -42,19 +67,20 @@ class Owner(BuddyBase):
|
||||
if not nick or not len(nick):
|
||||
nick = "n00b"
|
||||
|
||||
service = Service(nick, PRESENCE_SERVICE_TYPE, PRESENCE_SERVICE_PORT)
|
||||
BuddyBase.__init__(self, service, nick)
|
||||
self._presence_service = Service(nick, PRESENCE_SERVICE_TYPE, PRESENCE_SERVICE_PORT)
|
||||
Buddy.__init__(self, self._presence_service)
|
||||
|
||||
sugar_dir = os.path.abspath(os.path.expanduser("~/.sugar"))
|
||||
icon = None
|
||||
for fname in os.listdir(sugar_dir):
|
||||
for fname in os.listdir(env.get_user_dir()):
|
||||
if not fname.startswith("buddy-icon."):
|
||||
continue
|
||||
fd = open(os.path.join(sugar_dir, fname), "r")
|
||||
fd = open(os.path.join(env.get_user_dir(), fname), "r")
|
||||
self._icon = fd.read()
|
||||
fd.close()
|
||||
break
|
||||
|
||||
def set_icon(self, icon):
|
||||
"""Can only set icon in constructor for now."""
|
||||
pass
|
||||
|
||||
def register(self):
|
||||
self._service.register(self._group)
|
||||
self._presence_service.register(self._group)
|
||||
|
@ -1,6 +1,7 @@
|
||||
import avahi
|
||||
|
||||
from Buddy import Buddy
|
||||
from Buddy import get_recognized_buddy_service_types
|
||||
from Buddy import Owner
|
||||
from Buddy import PRESENCE_SERVICE_TYPE
|
||||
from Service import Service
|
||||
@ -105,8 +106,14 @@ class LocalGroup(Group):
|
||||
self._pdiscovery.resolve_service(interface, protocol, name, stype, domain,
|
||||
self._on_service_resolved)
|
||||
elif action == presence.ACTION_SERVICE_REMOVED:
|
||||
if stype == PRESENCE_SERVICE_TYPE:
|
||||
self._remove_buddy(name)
|
||||
if stype in get_recognized_buddy_service_types():
|
||||
buddy = self.get_buddy(name)
|
||||
if buddy:
|
||||
buddy.remove_service(stype)
|
||||
# Removal of the presence service removes the buddy too
|
||||
if stype == PRESENCE_SERVICE_TYPE:
|
||||
self._remove_buddy(name)
|
||||
self.remove_service((name, stype))
|
||||
elif stype.startswith(_OLPC_SERVICE_TYPE_PREFIX):
|
||||
self.remove_service((name, stype))
|
||||
|
||||
@ -118,9 +125,20 @@ class LocalGroup(Group):
|
||||
for prop in avahi.txt_array_to_string_array(txt):
|
||||
(key, value) = prop.split('=')
|
||||
if key == 'group_address':
|
||||
service.set_group_address(value)
|
||||
service.set_group_address(value)
|
||||
|
||||
if stype == PRESENCE_SERVICE_TYPE:
|
||||
self._add_buddy(Buddy(service, name))
|
||||
elif stype.startswith(_OLPC_SERVICE_TYPE_PREFIX):
|
||||
# print "ServiceResolved: name=%s, stype=%s, port=%s, address=%s" % (name, stype, port, address)
|
||||
if stype in get_recognized_buddy_service_types():
|
||||
# Service recognized as Buddy services either create a new
|
||||
# buddy if one doesn't exist yet, or get added to the existing
|
||||
# buddy
|
||||
buddy = self.get_buddy(name)
|
||||
if buddy:
|
||||
buddy.add_service(service)
|
||||
else:
|
||||
self._add_buddy(Buddy(service))
|
||||
self.add_service(service)
|
||||
elif stype.startswith(_OLPC_SERVICE_TYPE_PREFIX):
|
||||
# These services aren't associated with buddies
|
||||
self.add_service(service)
|
||||
|
||||
|
@ -1,6 +1,12 @@
|
||||
import xmlrpclib
|
||||
import socket
|
||||
import traceback
|
||||
import threading
|
||||
|
||||
import pygtk
|
||||
pygtk.require('2.0')
|
||||
import gobject
|
||||
|
||||
|
||||
import network
|
||||
from MostlyReliablePipe import MostlyReliablePipe
|
||||
@ -32,7 +38,7 @@ class Stream(object):
|
||||
self._callback(self._group.get_buddy(nick_name), data)
|
||||
|
||||
|
||||
class UnicastStreamWriter(object):
|
||||
class UnicastStreamWriterBase(object):
|
||||
def __init__(self, stream, service, owner_nick_name):
|
||||
# set up the writer
|
||||
if not service:
|
||||
@ -42,6 +48,10 @@ class UnicastStreamWriter(object):
|
||||
self._address = self._service.get_address()
|
||||
self._port = self._service.get_port()
|
||||
self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
|
||||
|
||||
class UnicastStreamWriter(UnicastStreamWriterBase):
|
||||
def __init__(self, stream, service, owner_nick_name):
|
||||
UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name)
|
||||
self._writer = xmlrpclib.ServerProxy(self._xmlrpc_addr)
|
||||
|
||||
def write(self, data):
|
||||
@ -63,6 +73,60 @@ class UnicastStreamWriter(object):
|
||||
return None
|
||||
|
||||
|
||||
class ThreadedRequest(threading.Thread):
|
||||
def __init__(self, controller, addr, method, response_cb, user_data, *args):
|
||||
threading.Thread.__init__(self)
|
||||
self._controller = controller
|
||||
self._method = method
|
||||
self._args = args
|
||||
self._response_cb = response_cb
|
||||
self._user_data = user_data
|
||||
self._writer = xmlrpclib.ServerProxy(addr)
|
||||
|
||||
def run(self):
|
||||
response = None
|
||||
try:
|
||||
method = getattr(self._writer, self._method)
|
||||
response = method(*self._args)
|
||||
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
|
||||
traceback.print_exc()
|
||||
if self._response_cb:
|
||||
gobject.idle_add(self._response_cb, response, self._user_data)
|
||||
self._controller.notify_request_done(self)
|
||||
|
||||
class ThreadedUnicastStreamWriter(UnicastStreamWriterBase):
|
||||
def __init__(self, stream, service, owner_nick_name):
|
||||
self._requests_lock = threading.Lock()
|
||||
self._requests = []
|
||||
UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name)
|
||||
|
||||
def _add_request(self, request):
|
||||
self._requests_lock.acquire()
|
||||
if not request in self._requests:
|
||||
self._requests.append(request)
|
||||
self._requests_lock.release()
|
||||
|
||||
def write(self, response_cb, user_data, data):
|
||||
"""Write some data to the default endpoint of this pipe on the remote server."""
|
||||
request = ThreadedRequest(self, self._xmlrpc_addr, "message", response_cb,
|
||||
user_data, self._owner_nick_name, data)
|
||||
self._add_request(request)
|
||||
request.start()
|
||||
|
||||
def custom_request(self, method_name, response_cb, user_data, *args):
|
||||
"""Call a custom XML-RPC method on the remote server."""
|
||||
request = ThreadedRequest(self, self._xmlrpc_addr, method_name, response_cb,
|
||||
user_data, *args)
|
||||
self._add_request(request)
|
||||
request.start()
|
||||
|
||||
def notify_request_done(self, request):
|
||||
self._requests_lock.acquire()
|
||||
if request in self._requests:
|
||||
self._requests.remove(request)
|
||||
self._requests_lock.release()
|
||||
|
||||
|
||||
class UnicastStream(Stream):
|
||||
def __init__(self, service, group):
|
||||
Stream.__init__(self, service, group)
|
||||
@ -94,8 +158,11 @@ class UnicastStream(Stream):
|
||||
raise ValueError("Handler name 'message' is a reserved handler.")
|
||||
self._reader.register_function(handler, name)
|
||||
|
||||
def new_writer(self, service):
|
||||
return UnicastStreamWriter(self, service, self._owner_nick_name)
|
||||
def new_writer(self, service, threaded=False):
|
||||
if threaded:
|
||||
return ThreadedUnicastStreamWriter(self, service, self._owner_nick_name)
|
||||
else:
|
||||
return UnicastStreamWriter(self, service, self._owner_nick_name)
|
||||
|
||||
|
||||
class MulticastStream(Stream):
|
||||
@ -115,5 +182,5 @@ class MulticastStream(Stream):
|
||||
[ nick_name, data ] = data.split(" |**| ", 2)
|
||||
self.recv(nick_name, data)
|
||||
|
||||
def new_writer(self, service=None):
|
||||
def new_writer(self, service=None, threaded=False):
|
||||
return self
|
||||
|
Loading…
Reference in New Issue
Block a user