More work on the new network stuff. Mercurial
This commit is contained in:
+9
-17
@@ -13,14 +13,14 @@ import sys
|
||||
|
||||
try:
|
||||
import activity
|
||||
from Group import *
|
||||
from sugar_globals import *
|
||||
except ImportError:
|
||||
from sugar import activity
|
||||
from sugar.Group import *
|
||||
from sugar.sugar_globals import *
|
||||
|
||||
import BuddyList
|
||||
import richtext
|
||||
import p2p
|
||||
|
||||
class Chat(activity.Activity):
|
||||
def __init__(self, controller):
|
||||
@@ -208,17 +208,9 @@ class GroupChat(Chat):
|
||||
Chat.__init__(self, self)
|
||||
|
||||
def _start(self):
|
||||
group = p2p.Group.get_instance()
|
||||
|
||||
self._buddy_list = group.get_buddy_list()
|
||||
self._buddy_list.add_buddy_listener(self._on_buddy_presence_event)
|
||||
|
||||
input_pipe = p2p.InputPipe(group, "group-chat")
|
||||
input_pipe.listen(self.recv_message)
|
||||
self._output_pipe = p2p.BroadcastOutputPipe(group, "group-chat")
|
||||
|
||||
input_pipe = p2p.InputPipe(group, "buddy-chat")
|
||||
input_pipe.listen(self._buddy_recv_message)
|
||||
self._group = LocalGroup()
|
||||
self._group.add_listener(self._on_group_event)
|
||||
self._group.join()
|
||||
|
||||
def _create_sidebar(self):
|
||||
vbox = gtk.VBox(False, 6)
|
||||
@@ -302,12 +294,12 @@ class GroupChat(Chat):
|
||||
buddy.set_chat(chat)
|
||||
chat.activity_connect_to_shell()
|
||||
|
||||
def _on_buddy_presence_event(self, action, buddy):
|
||||
if action == BuddyList.ACTION_BUDDY_ADDED:
|
||||
def _on_group_event(self, action, buddy):
|
||||
if action == BUDDY_JOIN:
|
||||
aniter = self._buddy_list_model.append(None)
|
||||
self._buddy_list_model.set(aniter, self._MODEL_COL_NICK, buddy.nick(),
|
||||
self._buddy_list_model.set(aniter, self._MODEL_COL_NICK, buddy.get_nick_name(),
|
||||
self._MODEL_COL_ICON, None, self._MODEL_COL_BUDDY, buddy)
|
||||
elif action == BuddyList.ACTION_BUDDY_REMOVED:
|
||||
elif action == BUDDY_LEAVE:
|
||||
aniter = self._get_iter_for_buddy(buddy)
|
||||
if aniter:
|
||||
self._buddy_list_model.remove(aniter)
|
||||
|
||||
-173
@@ -1,173 +0,0 @@
|
||||
# -*- tab-width: 4; indent-tabs-mode: t -*-
|
||||
|
||||
import socket
|
||||
import threading
|
||||
import traceback
|
||||
import select
|
||||
import time
|
||||
import xmlrpclib
|
||||
import sys
|
||||
|
||||
import gobject
|
||||
import SimpleXMLRPCServer
|
||||
import SocketServer
|
||||
|
||||
|
||||
__authinfos = {}
|
||||
|
||||
def _add_authinfo(authinfo):
|
||||
__authinfos[threading.currentThread()] = authinfo
|
||||
|
||||
def get_authinfo():
|
||||
return __authinfos.get(threading.currentThread())
|
||||
|
||||
def _del_authinfo():
|
||||
del __authinfos[threading.currentThread()]
|
||||
|
||||
|
||||
class GlibTCPServer(SocketServer.TCPServer):
|
||||
"""GlibTCPServer
|
||||
|
||||
Integrate socket accept into glib mainloop.
|
||||
"""
|
||||
|
||||
allow_reuse_address = True
|
||||
request_queue_size = 20
|
||||
|
||||
def __init__(self, server_address, RequestHandlerClass):
|
||||
SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)
|
||||
self.socket.setblocking(0) # Set nonblocking
|
||||
|
||||
# Watch the listener socket for data
|
||||
gobject.io_add_watch(self.socket, gobject.IO_IN, self._handle_accept)
|
||||
|
||||
def _handle_accept(self, source, condition):
|
||||
if not (condition & gobject.IO_IN):
|
||||
return True
|
||||
self.handle_request()
|
||||
return True
|
||||
|
||||
class GlibXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
|
||||
""" GlibXMLRPCRequestHandler
|
||||
|
||||
The stock SimpleXMLRPCRequestHandler and server don't allow any way to pass
|
||||
the client's address and/or SSL certificate into the function that actually
|
||||
_processes_ the request. So we have to store it in a thread-indexed dict.
|
||||
"""
|
||||
|
||||
def do_POST(self):
|
||||
_add_authinfo(self.client_address)
|
||||
try:
|
||||
SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self)
|
||||
except socket.timeout:
|
||||
pass
|
||||
except socket.error, e:
|
||||
print "Error (%s): socket error - '%s'" % (self.client_address, e)
|
||||
except:
|
||||
print "Error while processing POST:"
|
||||
traceback.print_exc()
|
||||
_del_authinfo()
|
||||
|
||||
class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
|
||||
"""GlibXMLRPCServer
|
||||
|
||||
Use nonblocking sockets and handle the accept via glib rather than
|
||||
blocking on accept().
|
||||
"""
|
||||
|
||||
def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=1):
|
||||
self.logRequests = logRequests
|
||||
|
||||
SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
|
||||
GlibTCPServer.__init__(self, addr, requestHandler)
|
||||
|
||||
def _marshaled_dispatch(self, data, dispatch_method = None):
|
||||
"""Dispatches an XML-RPC method from marshalled (XML) data.
|
||||
|
||||
XML-RPC methods are dispatched from the marshalled (XML) data
|
||||
using the _dispatch method and the result is returned as
|
||||
marshalled data. For backwards compatibility, a dispatch
|
||||
function can be provided as an argument (see comment in
|
||||
SimpleXMLRPCRequestHandler.do_POST) but overriding the
|
||||
existing method through subclassing is the prefered means
|
||||
of changing method dispatch behavior.
|
||||
"""
|
||||
|
||||
params, method = xmlrpclib.loads(data)
|
||||
|
||||
# generate response
|
||||
try:
|
||||
if dispatch_method is not None:
|
||||
response = dispatch_method(method, params)
|
||||
else:
|
||||
response = self._dispatch(method, params)
|
||||
# wrap response in a singleton tuple
|
||||
response = (response,)
|
||||
response = xmlrpclib.dumps(response, methodresponse=1)
|
||||
except xmlrpclib.Fault, fault:
|
||||
response = xmlrpclib.dumps(fault)
|
||||
except:
|
||||
print "Exception while processing request:"
|
||||
traceback.print_exc()
|
||||
|
||||
# report exception back to server
|
||||
response = xmlrpclib.dumps(
|
||||
xmlrpclib.Fault(1, "%s:%s" % (sys.exc_type, sys.exc_value))
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
class GroupChatController(object):
|
||||
|
||||
_MAX_MSG_SIZE = 500
|
||||
|
||||
def __init__(self, address, port, data_cb):
|
||||
self._address = address
|
||||
self._port = port
|
||||
self._data_cb = data_cb
|
||||
|
||||
self._setup_sender()
|
||||
self._setup_listener()
|
||||
|
||||
def _setup_sender(self):
|
||||
self._send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
# Make the socket multicast-aware, and set TTL.
|
||||
self._send_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20) # Change TTL (=20) to suit
|
||||
|
||||
def _setup_listener(self):
|
||||
# Listener socket
|
||||
self._listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
|
||||
# Set some options to make it multicast-friendly
|
||||
self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
try:
|
||||
self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
|
||||
except:
|
||||
pass
|
||||
self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 20)
|
||||
self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1)
|
||||
|
||||
def start(self):
|
||||
# Set some more multicast options
|
||||
self._listen_sock.bind(('', self._port))
|
||||
self._listen_sock.settimeout(2)
|
||||
intf = socket.gethostbyname(socket.gethostname())
|
||||
self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(intf) + socket.inet_aton('0.0.0.0'))
|
||||
self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(self._address) + socket.inet_aton('0.0.0.0'))
|
||||
|
||||
# Watch the listener socket for data
|
||||
gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data)
|
||||
|
||||
def _handle_incoming_data(self, source, condition):
|
||||
if not (condition & gobject.IO_IN):
|
||||
return True
|
||||
msg = {}
|
||||
msg['data'], (msg['addr'], msg['port']) = source.recvfrom(self._MAX_MSG_SIZE)
|
||||
if self._data_cb:
|
||||
self._data_cb(msg)
|
||||
return True
|
||||
|
||||
def send_msg(self, data):
|
||||
self._send_sock.sendto(data, (self._address, self._port))
|
||||
|
||||
@@ -1,95 +0,0 @@
|
||||
# -*- tab-width: 4; indent-tabs-mode: t -*-
|
||||
|
||||
import avahi, dbus, dbus.glib
|
||||
|
||||
OLPC_CHAT_SERVICE = "_olpc_chat._udp"
|
||||
|
||||
ACTION_SERVICE_NEW = 'new'
|
||||
ACTION_SERVICE_REMOVED = 'removed'
|
||||
|
||||
class PresenceDiscovery(object):
|
||||
def __init__(self):
|
||||
self.bus = dbus.SystemBus()
|
||||
self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER)
|
||||
self._service_browsers = {}
|
||||
self._service_type_browsers = {}
|
||||
self._service_listeners = []
|
||||
|
||||
def add_service_listener(self, listener):
|
||||
self._service_listeners.append(listener)
|
||||
|
||||
def start(self):
|
||||
# Always browse .local
|
||||
self.browse_domain(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, "local")
|
||||
db = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.DomainBrowserNew(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, "", avahi.DOMAIN_BROWSER_BROWSE, dbus.UInt32(0))), avahi.DBUS_INTERFACE_DOMAIN_BROWSER)
|
||||
db.connect_to_signal('ItemNew', self.new_domain)
|
||||
|
||||
def _error_handler(self, err):
|
||||
print "Error resolving: %s" % err
|
||||
|
||||
def resolve_service(self, interface, protocol, name, stype, domain, reply_handler, error_handler=None):
|
||||
if not error_handler:
|
||||
error_handler = self._error_handler
|
||||
self.server.ResolveService(int(interface), int(protocol), name, stype, domain, avahi.PROTO_UNSPEC, dbus.UInt32(0), reply_handler=reply_handler, error_handler=error_handler)
|
||||
|
||||
def new_service(self, interface, protocol, name, stype, domain, flags):
|
||||
# print "Found service '%s' (%d) of type '%s' in domain '%s' on %i.%i." % (name, flags, stype, domain, interface, protocol)
|
||||
for listener in self._service_listeners:
|
||||
listener(ACTION_SERVICE_NEW, interface, protocol, name, stype, domain, flags)
|
||||
|
||||
def remove_service(self, interface, protocol, name, stype, domain, flags):
|
||||
# print "Service '%s' of type '%s' in domain '%s' on %i.%i disappeared." % (name, stype, domain, interface, protocol)
|
||||
for listener in self._service_listeners:
|
||||
listener(ACTION_SERVICE_REMOVED, interface, protocol, name, stype, domain, flags)
|
||||
|
||||
def new_service_type(self, interface, protocol, stype, domain, flags):
|
||||
# Are we already browsing this domain for this type?
|
||||
if self._service_browsers.has_key((interface, protocol, stype, domain)):
|
||||
return
|
||||
|
||||
# print "Browsing for services of type '%s' in domain '%s' on %i.%i ..." % (stype, domain, interface, protocol)
|
||||
|
||||
b = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.ServiceBrowserNew(interface, protocol, stype, domain, dbus.UInt32(0))), avahi.DBUS_INTERFACE_SERVICE_BROWSER)
|
||||
b.connect_to_signal('ItemNew', self.new_service)
|
||||
b.connect_to_signal('ItemRemove', self.remove_service)
|
||||
|
||||
self._service_browsers[(interface, protocol, stype, domain)] = b
|
||||
|
||||
def browse_domain(self, interface, protocol, domain):
|
||||
# Are we already browsing this domain?
|
||||
if self._service_type_browsers.has_key((interface, protocol, domain)):
|
||||
return
|
||||
|
||||
# print "Browsing domain '%s' on %i.%i ..." % (domain, interface, protocol)
|
||||
|
||||
b = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.ServiceTypeBrowserNew(interface, protocol, domain, dbus.UInt32(0))), avahi.DBUS_INTERFACE_SERVICE_TYPE_BROWSER)
|
||||
b.connect_to_signal('ItemNew', self.new_service_type)
|
||||
|
||||
self._service_type_browsers[(interface, protocol, domain)] = b
|
||||
|
||||
def new_domain(self,interface, protocol, domain, flags):
|
||||
if domain != "local":
|
||||
return
|
||||
self.browse_domain(interface, protocol, domain)
|
||||
|
||||
|
||||
class PresenceAnnounce(object):
|
||||
def __init__(self):
|
||||
self.bus = dbus.SystemBus()
|
||||
self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER)
|
||||
self._hostname = None
|
||||
|
||||
def register_service(self, rs_name, rs_port, rs_service, **kwargs):
|
||||
g = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.EntryGroupNew()), avahi.DBUS_INTERFACE_ENTRY_GROUP)
|
||||
|
||||
if rs_name is None:
|
||||
if self._hostname is None:
|
||||
self._hostname = "%s:%s" % (self.server.GetHostName(), rs_port)
|
||||
rs_name = self._hostname
|
||||
|
||||
info = ["%s=%s" % (k,v) for k,v in kwargs.items()]
|
||||
g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, 0, rs_name, rs_service,
|
||||
"", "", # domain, host (let the system figure it out)
|
||||
dbus.UInt16(rs_port), info,)
|
||||
g.Commit()
|
||||
return g
|
||||
Reference in New Issue
Block a user