Some new files I forgot in the previous commit

This commit is contained in:
Marco Pesenti Gritti
2006-05-12 02:34:20 -04:00
parent f6491e6afc
commit 34987e1ef8
32 changed files with 3471 additions and 0 deletions
+34
View File
@@ -0,0 +1,34 @@
import pwd
import os
from Service import *
PRESENCE_SERVICE_TYPE = "_olpc_presence._tcp"
PRESENCE_SERVICE_PORT = 6000
class Buddy:
def __init__(self, service, nick_name):
self._service = service
self._nick_name = nick_name
def get_service_name(self):
return self._service.get_name()
def get_nick_name(self):
return self._nick_name
class Owner(Buddy):
def __init__(self, group):
self._group = group
nick = pwd.getpwuid(os.getuid())[0]
if not nick or not len(nick):
nick = "n00b"
service = Service(nick, PRESENCE_SERVICE_TYPE,
'', PRESENCE_SERVICE_PORT)
Buddy.__init__(self, service, nick)
def register(self):
self._service.register(self._group)
+102
View File
@@ -0,0 +1,102 @@
import avahi
import presence
from Buddy import *
from Service import *
SERVICE_ADDED = "service_added"
SERVICE_REMOVED = "service_removed"
BUDDY_JOIN = "buddy_join"
BUDDY_LEAVE = "buddy_leave"
class Group:
def __init__(self):
self._service_listeners = []
self._presence_listeners = []
def join(self, buddy):
pass
def add_service_listener(self, listener):
self._service_listeners.append(listener)
def add_presence_listener(self, listener):
self._presence_listeners.append(listener)
def _notify_service_added(self, service):
for listener in self._service_listeners:
listener(SERVICE_ADDED, buddy)
def _notify_service_removed(self, service):
for listener in self._service_listeners:
listener(SERVICE_REMOVED,buddy)
def _notify_buddy_join(self, buddy):
for listener in self._presence_listeners:
listener(BUDDY_JOIN, buddy)
def _notify_buddy_leave(self, buddy):
for listener in self._presence_listeners:
listener(BUDDY_LEAVE, buddy)
class LocalGroup(Group):
def __init__(self):
Group.__init__(self)
self._services = {}
self._buddies = {}
self._pdiscovery = presence.PresenceDiscovery()
self._pdiscovery.add_service_listener(self._on_service_change)
self._pdiscovery.start()
def get_owner(self):
return self._owner
def add_service(self, service):
sid = (service.get_name(), service.get_type())
self._services[sid] = service
self._notify_service_added(service)
def remove_service(self, sid):
self._notify_service_removed(service)
del self._services[sid]
def join(self):
self._owner = Owner(self)
self._owner.register()
def get_service(self, name, stype):
return self._services[(name, stype)]
def get_buddy(self, name):
return self._buddies[name]
def _add_buddy(self, buddy):
bid = buddy.get_nick_name()
if not self._buddies.has_key(bid):
self._buddies[bid] = buddy
self._notify_buddy_join(buddy)
def _remove_buddy(self, buddy):
self._notify_buddy_leave(buddy)
del self._buddies[buddy.get_nick_name()]
def _on_service_change(self, action, interface, protocol, name, stype, domain, flags):
if action == presence.ACTION_SERVICE_NEW:
self._pdiscovery.resolve_service(interface, protocol, name, stype, domain,
self._on_service_resolved)
elif action == presence.ACTION_SERVICE_REMOVED:
if stype == PRESENCE_SERVICE_TYPE:
self._remove_buddy(name)
elif stype.startswith("_olpc"):
self.remove_service((name, stype))
def _on_service_resolved(self, interface, protocol, name, stype, domain,
host, aprotocol, address, port, txt, flags):
service = Service(name, stype, address, port)
if stype == PRESENCE_SERVICE_TYPE:
self._add_buddy(Buddy(service, name))
elif stype.startswith("_olpc"):
self.add_service(service)
+10
View File
@@ -0,0 +1,10 @@
sugardir = $(pythondir)/sugar/p2p
sugar_PYTHON = \
__init__.py \
Buddy.py \
Group.py \
Service.py \
StreamReader.py \
StreamWriter.py \
network.py \
presence.py
+31
View File
@@ -0,0 +1,31 @@
import presence
class Service(object):
def __init__(self, name, stype, address, port, multicast=False):
self._name = name
self._stype = stype
self._address = str(address)
self._port = int(port)
self._multicast = multicast
def get_name(self):
return self._name
def get_type(self):
return self._stype
def get_address(self):
return self._address
def get_port(self):
return self._port
def set_port(self, port):
self._port = port
def is_multicast(self):
return self._multicast
def register(self, group):
pannounce = presence.PresenceAnnounce()
pannounce.register_service(self._name, self._port, self._stype)
+51
View File
@@ -0,0 +1,51 @@
import network
class StreamReaderRequestHandler(object):
def __init__(self, reader):
self._reader = reader
def message(self, nick_name, message):
address = network.get_authinfo()
self._reader.recv(nick_name, message)
return True
class StreamReader:
def __init__(self, group, service):
self._group = group
self._service = service
if self._service.is_multicast():
self._setup_multicast()
else:
self._setup_unicast()
def set_listener(self, callback):
self._callback = callback
def _setup_multicast(self):
address = self._service.get_address()
port = self._service.get_port()
server = network.GroupServer(address, port, self._recv_multicast)
server.start()
def _setup_unicast(self):
started = False
tries = 10
port = self._service.get_port()
while not started and tries > 0:
try:
p2p_server = network.GlibXMLRPCServer(("", port))
p2p_server.register_instance(StreamReaderRequestHandler(self))
started = True
except:
port = port + 1
tries = tries - 1
self._service.set_port(port)
def _recv_multicast(self, msg):
[ nick_name, data ] = msg['data'].split(" |**| ", 2)
self.recv(nick_name, data)
def recv(self, nick_name, data):
if nick_name != self._group.get_owner().get_nick_name():
self._callback(self._group.get_buddy(nick_name), data)
+43
View File
@@ -0,0 +1,43 @@
import xmlrpclib
import traceback
import socket
import network
class StreamWriter:
def __init__(self, group, service):
self._group = group
self._service = service
self._address = self._service.get_address()
self._port = self._service.get_port()
if self._service.is_multicast():
self._setup_multicast()
else:
self._setup_unicast()
def write(self, data):
if self._service.is_multicast():
self._multicast_write(data)
else:
self._unicast_write(data)
def _setup_unicast(self):
xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
self._uclient = xmlrpclib.ServerProxy(xmlrpc_addr)
def _unicast_write(self, data):
try:
nick_name = self._group.get_owner().get_nick_name()
self._uclient.message(nick_name, data)
return True
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError), e:
traceback.print_exc()
return False
def _setup_multicast(self):
self._mclient = network.GroupClient(self._address, self._port)
def _multicast_write(self, data):
nick_name = self._group.get_owner().get_nick_name()
self._mclient.send_msg(nick_name + " |**| " + data)
View File
+176
View File
@@ -0,0 +1,176 @@
# -*- tab-width: 4; indent-tabs-mode: t -*-
import socket
import threading
import traceback
import select
import time
import xmlrpclib
import sys
import gobject
import SimpleXMLRPCServer
import SocketServer
__authinfos = {}
def _add_authinfo(authinfo):
__authinfos[threading.currentThread()] = authinfo
def get_authinfo():
return __authinfos.get(threading.currentThread())
def _del_authinfo():
del __authinfos[threading.currentThread()]
class GlibTCPServer(SocketServer.TCPServer):
"""GlibTCPServer
Integrate socket accept into glib mainloop.
"""
allow_reuse_address = True
request_queue_size = 20
def __init__(self, server_address, RequestHandlerClass):
SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)
self.socket.setblocking(0) # Set nonblocking
# Watch the listener socket for data
gobject.io_add_watch(self.socket, gobject.IO_IN, self._handle_accept)
def _handle_accept(self, source, condition):
if not (condition & gobject.IO_IN):
return True
self.handle_request()
return True
class GlibXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
""" GlibXMLRPCRequestHandler
The stock SimpleXMLRPCRequestHandler and server don't allow any way to pass
the client's address and/or SSL certificate into the function that actually
_processes_ the request. So we have to store it in a thread-indexed dict.
"""
def do_POST(self):
_add_authinfo(self.client_address)
try:
SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self)
except socket.timeout:
pass
except socket.error, e:
print "Error (%s): socket error - '%s'" % (self.client_address, e)
except:
print "Error while processing POST:"
traceback.print_exc()
_del_authinfo()
class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
"""GlibXMLRPCServer
Use nonblocking sockets and handle the accept via glib rather than
blocking on accept().
"""
def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=1):
self.logRequests = logRequests
SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
GlibTCPServer.__init__(self, addr, requestHandler)
def _marshaled_dispatch(self, data, dispatch_method = None):
"""Dispatches an XML-RPC method from marshalled (XML) data.
XML-RPC methods are dispatched from the marshalled (XML) data
using the _dispatch method and the result is returned as
marshalled data. For backwards compatibility, a dispatch
function can be provided as an argument (see comment in
SimpleXMLRPCRequestHandler.do_POST) but overriding the
existing method through subclassing is the prefered means
of changing method dispatch behavior.
"""
params, method = xmlrpclib.loads(data)
# generate response
try:
if dispatch_method is not None:
response = dispatch_method(method, params)
else:
response = self._dispatch(method, params)
# wrap response in a singleton tuple
response = (response,)
response = xmlrpclib.dumps(response, methodresponse=1)
except xmlrpclib.Fault, fault:
response = xmlrpclib.dumps(fault)
except:
print "Exception while processing request:"
traceback.print_exc()
# report exception back to server
response = xmlrpclib.dumps(
xmlrpclib.Fault(1, "%s:%s" % (sys.exc_type, sys.exc_value))
)
return response
class GroupServer(object):
_MAX_MSG_SIZE = 500
def __init__(self, address, port, data_cb):
self._address = address
self._port = port
self._data_cb = data_cb
self._setup_listener()
def _setup_listener(self):
# Listener socket
self._listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Set some options to make it multicast-friendly
self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except:
pass
self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 20)
self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1)
def start(self):
# Set some more multicast options
self._listen_sock.bind(('', self._port))
self._listen_sock.settimeout(2)
intf = socket.gethostbyname(socket.gethostname())
self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(intf) + socket.inet_aton('0.0.0.0'))
self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(self._address) + socket.inet_aton('0.0.0.0'))
# Watch the listener socket for data
gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data)
def _handle_incoming_data(self, source, condition):
if not (condition & gobject.IO_IN):
return True
msg = {}
msg['data'], (msg['addr'], msg['port']) = source.recvfrom(self._MAX_MSG_SIZE)
if self._data_cb:
self._data_cb(msg)
return True
class GroupClient(object):
_MAX_MSG_SIZE = 500
def __init__(self, address, port):
self._address = address
self._port = port
self._send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Make the socket multicast-aware, and set TTL.
self._send_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20) # Change TTL (=20) to suit
def send_msg(self, data):
self._send_sock.sendto(data, (self._address, self._port))
+92
View File
@@ -0,0 +1,92 @@
# -*- tab-width: 4; indent-tabs-mode: t -*-
import avahi, dbus, dbus.glib
ACTION_SERVICE_NEW = 'new'
ACTION_SERVICE_REMOVED = 'removed'
class PresenceDiscovery(object):
def __init__(self):
self.bus = dbus.SystemBus()
self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER)
self._service_browsers = {}
self._service_type_browsers = {}
self._service_listeners = []
def add_service_listener(self, listener):
self._service_listeners.append(listener)
def start(self):
# Always browse .local
self.browse_domain(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, "local")
db = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.DomainBrowserNew(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, "", avahi.DOMAIN_BROWSER_BROWSE, dbus.UInt32(0))), avahi.DBUS_INTERFACE_DOMAIN_BROWSER)
db.connect_to_signal('ItemNew', self.new_domain)
def _error_handler(self, err):
print "Error resolving: %s" % err
def resolve_service(self, interface, protocol, name, stype, domain, reply_handler, error_handler=None):
if not error_handler:
error_handler = self._error_handler
self.server.ResolveService(int(interface), int(protocol), name, stype, domain, avahi.PROTO_UNSPEC, dbus.UInt32(0), reply_handler=reply_handler, error_handler=error_handler)
def new_service(self, interface, protocol, name, stype, domain, flags):
# print "Found service '%s' (%d) of type '%s' in domain '%s' on %i.%i." % (name, flags, stype, domain, interface, protocol)
for listener in self._service_listeners:
listener(ACTION_SERVICE_NEW, interface, protocol, name, stype, domain, flags)
def remove_service(self, interface, protocol, name, stype, domain, flags):
# print "Service '%s' of type '%s' in domain '%s' on %i.%i disappeared." % (name, stype, domain, interface, protocol)
for listener in self._service_listeners:
listener(ACTION_SERVICE_REMOVED, interface, protocol, name, stype, domain, flags)
def new_service_type(self, interface, protocol, stype, domain, flags):
# Are we already browsing this domain for this type?
if self._service_browsers.has_key((interface, protocol, stype, domain)):
return
# print "Browsing for services of type '%s' in domain '%s' on %i.%i ..." % (stype, domain, interface, protocol)
b = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.ServiceBrowserNew(interface, protocol, stype, domain, dbus.UInt32(0))), avahi.DBUS_INTERFACE_SERVICE_BROWSER)
b.connect_to_signal('ItemNew', self.new_service)
b.connect_to_signal('ItemRemove', self.remove_service)
self._service_browsers[(interface, protocol, stype, domain)] = b
def browse_domain(self, interface, protocol, domain):
# Are we already browsing this domain?
if self._service_type_browsers.has_key((interface, protocol, domain)):
return
# print "Browsing domain '%s' on %i.%i ..." % (domain, interface, protocol)
b = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.ServiceTypeBrowserNew(interface, protocol, domain, dbus.UInt32(0))), avahi.DBUS_INTERFACE_SERVICE_TYPE_BROWSER)
b.connect_to_signal('ItemNew', self.new_service_type)
self._service_type_browsers[(interface, protocol, domain)] = b
def new_domain(self,interface, protocol, domain, flags):
if domain != "local":
return
self.browse_domain(interface, protocol, domain)
class PresenceAnnounce(object):
def __init__(self):
self.bus = dbus.SystemBus()
self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER)
self._hostname = None
def register_service(self, rs_name, rs_port, rs_service, **kwargs):
g = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.EntryGroupNew()), avahi.DBUS_INTERFACE_ENTRY_GROUP)
if rs_name is None:
if self._hostname is None:
self._hostname = "%s:%s" % (self.server.GetHostName(), rs_port)
rs_name = self._hostname
info = ["%s=%s" % (k,v) for k,v in kwargs.items()]
g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, 0, rs_name, rs_service,
"", "", # domain, host (let the system figure it out)
dbus.UInt16(rs_port), info,)
g.Commit()
return g