This commit is contained in:
Marco Pesenti Gritti 2006-05-22 02:11:39 -04:00
parent d3c5da1a66
commit f5173c33d0
4 changed files with 158 additions and 39 deletions

View File

@ -14,6 +14,7 @@ import gtk, gobject
from sugar.shell import activity
from sugar.p2p.Group import Group
from sugar.p2p import Buddy
from sugar.p2p.Group import LocalGroup
from sugar.p2p.Service import Service
from sugar.p2p.Stream import Stream
@ -34,6 +35,7 @@ GROUP_CHAT_SERVICE_PORT = 6200
class Chat(activity.Activity):
def __init__(self, controller):
Buddy.recognize_buddy_service_type(CHAT_SERVICE_TYPE)
self._controller = controller
activity.Activity.__init__(self)
self._stream_writer = None
@ -389,7 +391,7 @@ class BuddyChat(Chat):
self.activity_set_can_close(True)
self.activity_set_tab_icon_name("im")
self.activity_show_icon(True)
self._stream_writer = self._controller.new_buddy_writer(self._buddy.get_service_name())
self._stream_writer = self._controller.new_buddy_writer(self._buddy)
def recv_message(self, sender, msg):
Chat.recv_message(self, self._buddy, msg)
@ -433,17 +435,17 @@ class GroupChat(Chat):
def get_group(self):
return self._group
def new_buddy_writer(self, buddy_name):
service = self._group.get_service(buddy_name, CHAT_SERVICE_TYPE)
return self._buddy_stream.new_writer(service)
def new_buddy_writer(self, buddy, threaded=False):
service = buddy.get_service(CHAT_SERVICE_TYPE)
return self._buddy_stream.new_writer(service, threaded=threaded)
def _start(self):
self._group = LocalGroup()
self._group.add_presence_listener(self._on_group_presence_event)
self._group.add_service_listener(self._on_group_service_event)
self._group.join()
name = self._group.get_owner().get_service_name()
name = self._group.get_owner().get_nick_name()
# Group controls the Stream for incoming messages for
# specific buddy chats
@ -557,14 +559,18 @@ class GroupChat(Chat):
self._chats[buddy] = chat
chat.activity_connect_to_shell()
def _request_buddy_icon(self, buddy):
writer = self.new_buddy_writer(buddy.get_service_name())
icon = writer.custom_request("get_buddy_icon")
def _request_buddy_icon_cb(self, response, user_data):
icon = response
buddy = user_data
if icon and len(icon):
icon = base64.b64decode(icon)
print "Setting buddy icon for '%s' to %s" % (buddy.get_nick_name(), icon)
print "Buddy icon for '%s' is size %d" % (buddy.get_nick_name(), len(icon))
buddy.set_icon(icon)
def _request_buddy_icon(self, buddy):
writer = self.new_buddy_writer(buddy, threaded=True)
icon = writer.custom_request("get_buddy_icon", self._request_buddy_icon_cb, buddy)
def _on_group_service_event(self, action, service):
if action == Group.SERVICE_ADDED:
# Look for the olpc chat service
@ -574,6 +580,7 @@ class GroupChat(Chat):
if buddy and buddy.get_address() == service.get_address():
# Try to get the buddy's icon
if buddy.get_nick_name() != self._group.get_owner().get_nick_name():
print "Requesting buddy icon from '%s'." % buddy.get_nick_name()
self._request_buddy_icon(buddy)
elif action == Group.SERVICE_REMOVED:
pass
@ -651,6 +658,7 @@ if len(sys.argv) > 1 and sys.argv[1] == "--console":
ChatShell.get_instance().open_group_chat()
try:
gtk.threads_init()
gtk.main()
except KeyboardInterrupt:
pass

View File

@ -2,38 +2,63 @@ import pwd
import os
from Service import Service
from sugar import env
PRESENCE_SERVICE_TYPE = "_olpc_presence._tcp"
PRESENCE_SERVICE_PORT = 6000
class BuddyBase:
def __init__(self, service, nick_name):
self._service = service
self._nick_name = nick_name
__buddy_service_types = [PRESENCE_SERVICE_TYPE]
def recognize_buddy_service_type(stype):
if stype not in __buddy_service_types:
__buddy_service_types.append(stype)
def get_recognized_buddy_service_types():
return __buddy_service_types[:]
class Buddy(object):
def __init__(self, service):
self._services = {}
self._services[service.get_type()] = service
self._nick_name = service.get_name()
self._address = service.get_address()
def get_icon(self):
"""Return the buddies icon, if any."""
return self._icon
def get_address(self):
return self._service.get_address()
return self._address
def get_service_name(self):
return self._service.get_name()
def add_service(self, service):
if service.get_name() != self._nick_name:
return False
if service.get_address() != self._address:
return False
if self._services.has_key(service.get_type()):
return False
self._services[service.get_type()] = service
def remove_service(self, stype):
if self._services.has_key(stype):
del self._services[stype]
def get_service(self, stype):
if self._services.has_key(stype):
return self._services[stype]
return None
def get_nick_name(self):
return self._nick_name
class Buddy(BuddyBase):
"""Normal buddy class."""
def set_icon(self, icon):
"""Can only set icon for other buddies. The Owner
takes care of setting it's own icon."""
self._icon = icon
class Owner(BuddyBase):
class Owner(Buddy):
"""Class representing the owner of this machine/instance."""
def __init__(self, group):
self._group = group
@ -42,19 +67,20 @@ class Owner(BuddyBase):
if not nick or not len(nick):
nick = "n00b"
service = Service(nick, PRESENCE_SERVICE_TYPE, PRESENCE_SERVICE_PORT)
BuddyBase.__init__(self, service, nick)
self._presence_service = Service(nick, PRESENCE_SERVICE_TYPE, PRESENCE_SERVICE_PORT)
Buddy.__init__(self, self._presence_service)
sugar_dir = os.path.abspath(os.path.expanduser("~/.sugar"))
icon = None
for fname in os.listdir(sugar_dir):
for fname in os.listdir(env.get_user_dir()):
if not fname.startswith("buddy-icon."):
continue
fd = open(os.path.join(sugar_dir, fname), "r")
fd = open(os.path.join(env.get_user_dir(), fname), "r")
self._icon = fd.read()
fd.close()
break
def set_icon(self, icon):
"""Can only set icon in constructor for now."""
pass
def register(self):
self._service.register(self._group)
self._presence_service.register(self._group)

View File

@ -1,6 +1,7 @@
import avahi
from Buddy import Buddy
from Buddy import get_recognized_buddy_service_types
from Buddy import Owner
from Buddy import PRESENCE_SERVICE_TYPE
from Service import Service
@ -106,8 +107,14 @@ class LocalGroup(Group):
self._pdiscovery.resolve_service(interface, protocol, name, stype, domain,
self._on_service_resolved)
elif action == presence.ACTION_SERVICE_REMOVED:
if stype == PRESENCE_SERVICE_TYPE:
self._remove_buddy(name)
if stype in get_recognized_buddy_service_types():
buddy = self.get_buddy(name)
if buddy:
buddy.remove_service(stype)
# Removal of the presence service removes the buddy too
if stype == PRESENCE_SERVICE_TYPE:
self._remove_buddy(name)
self.remove_service((name, stype))
elif stype.startswith(_OLPC_SERVICE_TYPE_PREFIX):
self.remove_service((name, stype))
@ -119,9 +126,20 @@ class LocalGroup(Group):
for prop in avahi.txt_array_to_string_array(txt):
(key, value) = prop.split('=')
if key == 'group_address':
service.set_group_address(value)
service.set_group_address(value)
if stype == PRESENCE_SERVICE_TYPE:
self._add_buddy(Buddy(service, name))
elif stype.startswith(_OLPC_SERVICE_TYPE_PREFIX):
# print "ServiceResolved: name=%s, stype=%s, port=%s, address=%s" % (name, stype, port, address)
if stype in get_recognized_buddy_service_types():
# Service recognized as Buddy services either create a new
# buddy if one doesn't exist yet, or get added to the existing
# buddy
buddy = self.get_buddy(name)
if buddy:
buddy.add_service(service)
else:
self._add_buddy(Buddy(service))
self.add_service(service)
elif stype.startswith(_OLPC_SERVICE_TYPE_PREFIX):
# These services aren't associated with buddies
self.add_service(service)

View File

@ -1,6 +1,12 @@
import xmlrpclib
import socket
import traceback
import threading
import pygtk
pygtk.require('2.0')
import gobject
import network
from MostlyReliablePipe import MostlyReliablePipe
@ -32,7 +38,7 @@ class Stream(object):
self._callback(self._group.get_buddy(nick_name), data)
class UnicastStreamWriter(object):
class UnicastStreamWriterBase(object):
def __init__(self, stream, service, owner_nick_name):
# set up the writer
if not service:
@ -42,6 +48,10 @@ class UnicastStreamWriter(object):
self._address = self._service.get_address()
self._port = self._service.get_port()
self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
class UnicastStreamWriter(UnicastStreamWriterBase):
def __init__(self, stream, service, owner_nick_name):
UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name)
self._writer = xmlrpclib.ServerProxy(self._xmlrpc_addr)
def write(self, data):
@ -63,6 +73,60 @@ class UnicastStreamWriter(object):
return None
class ThreadedRequest(threading.Thread):
def __init__(self, controller, addr, method, response_cb, user_data, *args):
threading.Thread.__init__(self)
self._controller = controller
self._method = method
self._args = args
self._response_cb = response_cb
self._user_data = user_data
self._writer = xmlrpclib.ServerProxy(addr)
def run(self):
response = None
try:
method = getattr(self._writer, self._method)
response = method(*self._args)
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
traceback.print_exc()
if self._response_cb:
gobject.idle_add(self._response_cb, response, self._user_data)
self._controller.notify_request_done(self)
class ThreadedUnicastStreamWriter(UnicastStreamWriterBase):
def __init__(self, stream, service, owner_nick_name):
self._requests_lock = threading.Lock()
self._requests = []
UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name)
def _add_request(self, request):
self._requests_lock.acquire()
if not request in self._requests:
self._requests.append(request)
self._requests_lock.release()
def write(self, response_cb, user_data, data):
"""Write some data to the default endpoint of this pipe on the remote server."""
request = ThreadedRequest(self, self._xmlrpc_addr, "message", response_cb,
user_data, self._owner_nick_name, data)
self._add_request(request)
request.start()
def custom_request(self, method_name, response_cb, user_data, *args):
"""Call a custom XML-RPC method on the remote server."""
request = ThreadedRequest(self, self._xmlrpc_addr, method_name, response_cb,
user_data, *args)
self._add_request(request)
request.start()
def notify_request_done(self, request):
self._requests_lock.acquire()
if request in self._requests:
self._requests.remove(request)
self._requests_lock.release()
class UnicastStream(Stream):
def __init__(self, service, group):
Stream.__init__(self, service, group)
@ -94,8 +158,11 @@ class UnicastStream(Stream):
raise ValueError("Handler name 'message' is a reserved handler.")
self._reader.register_function(handler, name)
def new_writer(self, service):
return UnicastStreamWriter(self, service, self._owner_nick_name)
def new_writer(self, service, threaded=False):
if threaded:
return ThreadedUnicastStreamWriter(self, service, self._owner_nick_name)
else:
return UnicastStreamWriter(self, service, self._owner_nick_name)
class MulticastStream(Stream):
@ -115,5 +182,5 @@ class MulticastStream(Stream):
[ nick_name, data ] = data.split(" |**| ", 2)
self.recv(nick_name, data)
def new_writer(self, service=None):
def new_writer(self, service=None, threaded=False):
return self