169 lines
5.0 KiB
Python
169 lines
5.0 KiB
Python
|
import socket
|
||
|
import threading
|
||
|
import traceback
|
||
|
import xmlrpclib
|
||
|
import sys
|
||
|
|
||
|
import gobject
|
||
|
import SimpleXMLRPCServer
|
||
|
import SocketServer
|
||
|
|
||
|
# Registry of per-request auth info, keyed by the thread object that stored it.
# Entries are written, read and removed through the helper functions below.
__authinfos = {}
|
||
|
|
||
|
def _add_authinfo(authinfo):
    """Remember *authinfo* for the calling thread.

    The value is stored in the module-level registry under the current
    thread object, overwriting whatever that thread had stored before.
    """
    current = threading.currentThread()
    __authinfos[current] = authinfo
|
||
|
|
||
|
def get_authinfo():
    """Return the auth info stored for the calling thread.

    Returns None when the current thread has nothing registered.
    """
    current = threading.currentThread()
    return __authinfos.get(current)
|
||
|
|
||
|
def _del_authinfo():
    """Forget the auth info stored for the calling thread.

    Raises KeyError if the current thread has no entry in the registry.
    """
    current = threading.currentThread()
    del __authinfos[current]
|
||
|
|
||
|
|
||
|
class GlibTCPServer(SocketServer.TCPServer):
    """GlibTCPServer

    A SocketServer.TCPServer whose listening socket is nonblocking and is
    watched by the glib main loop, so incoming connections are accepted
    from a main-loop callback.
    """

    request_queue_size = 20
    allow_reuse_address = True

    def __init__(self, server_address, RequestHandlerClass):
        """Create the server and register its listening socket with glib.

        server_address and RequestHandlerClass are handed unchanged to
        SocketServer.TCPServer.
        """
        SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)

        # The listener must never block the main loop.
        listener = self.socket
        listener.setblocking(0)

        # Have glib call us back whenever the listener becomes readable.
        gobject.io_add_watch(listener, gobject.IO_IN, self._handle_accept)

    def _handle_accept(self, source, condition):
        """glib watch callback: service one request when the socket is readable.

        Always returns True; per the io_add_watch contract that keeps the
        watch installed for further connections.
        """
        if condition & gobject.IO_IN:
            self.handle_request()
        return True
|
||
|
|
||
|
class GlibXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
    """ GlibXMLRPCRequestHandler

    The stock SimpleXMLRPCRequestHandler and server don't allow any way to pass
    the client's address and/or SSL certificate into the function that actually
    _processes_ the request.  So we have to store it in a thread-indexed dict;
    code running during the request can read it back with get_authinfo().
    """

    def do_POST(self):
        """Handle an XML-RPC POST with the client address registered.

        The client address is registered for the current thread before the
        stock do_POST runs and is always unregistered afterwards.  Failures
        are reported on stdout instead of being propagated:

        - socket.timeout is ignored silently;
        - any other socket.error is printed with the client address;
        - everything else is printed with a full traceback.
        """
        _add_authinfo(self.client_address)
        try:
            SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self)
        except socket.timeout:
            # Must come before socket.error: timeout is a subclass of it.
            pass
        except socket.error as e:
            print("Error (%s): socket error - '%s'" % (self.client_address, e))
        except:
            # Deliberate catch-all: a failing request is logged, not raised.
            print("Error while processing POST:")
            traceback.print_exc()
        finally:
            # Unregister even if one of the handlers above raises, so the
            # registry never keeps a stale entry for this thread.
            _del_authinfo()
|
||
|
|
||
|
class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
    """GlibXMLRPCServer

    Use nonblocking sockets and handle the accept via glib rather than
    blocking on accept().
    """

    def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=1):
        """Set up the XML-RPC dispatcher and the glib-driven TCP server.

        addr            -- server address passed to GlibTCPServer.
        requestHandler  -- request handler class passed to GlibTCPServer.
        logRequests     -- stored on the instance as self.logRequests.
        """
        self.logRequests = logRequests

        SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
        GlibTCPServer.__init__(self, addr, requestHandler)

    def _marshaled_dispatch(self, data, dispatch_method=None):
        """Dispatches an XML-RPC method from marshalled (XML) data.

        XML-RPC methods are dispatched from the marshalled (XML) data
        using the _dispatch method and the result is returned as
        marshalled data. For backwards compatibility, a dispatch
        function can be provided as an argument (see comment in
        SimpleXMLRPCRequestHandler.do_POST) but overriding the
        existing method through subclassing is the preferred means
        of changing method dispatch behavior.

        An xmlrpclib.Fault raised by the method is marshalled as-is.  Any
        other exception is printed with its traceback and reported to the
        client as Fault code 1 with the text "<type>:<value>".
        """
        # Unmarshalling happens outside the try block, so an error raised
        # while parsing the request propagates to the caller.
        params, method = xmlrpclib.loads(data)

        # generate response
        try:
            if dispatch_method is not None:
                result = dispatch_method(method, params)
            else:
                result = self._dispatch(method, params)
            # wrap response in a singleton tuple
            response = xmlrpclib.dumps((result,), methodresponse=1)
        except xmlrpclib.Fault as fault:
            response = xmlrpclib.dumps(fault)
        except:
            # sys.exc_info() instead of the deprecated sys.exc_type /
            # sys.exc_value globals, which are not safe to read when more
            # than one thread may be handling exceptions.
            exc_type, exc_value = sys.exc_info()[:2]

            print("Exception while processing request:")
            traceback.print_exc()

            # report exception back to the client
            response = xmlrpclib.dumps(
                xmlrpclib.Fault(1, "%s:%s" % (exc_type, exc_value))
            )

        return response
|
||
|
|
||
|
class GroupServer(object):
    """Receive UDP datagrams for a multicast group from the glib main loop.

    The listening socket is created at construction time; start() binds it,
    joins the group and registers a glib watch.  Every datagram received is
    handed to the data callback as a dict with the keys 'data', 'addr' and
    'port'.
    """

    # Upper bound, in bytes, passed to recvfrom() for each datagram.
    _MAX_MSG_SIZE = 500

    def __init__(self, address, port, data_cb):
        """Store the configuration and create the listener socket.

        address -- group address joined in start().
        port    -- UDP port the listener is bound to in start().
        data_cb -- callable invoked with one message dict per datagram;
                   a false value disables delivery.
        """
        self._address = address
        self._port = port
        self._data_cb = data_cb

        self._setup_listener()

    def _setup_listener(self):
        """Create the UDP listener socket and set its multicast options."""
        # Listener socket
        self._listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        # Set some options to make it multicast-friendly
        self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 20)
        self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1)

    def start(self):
        """Bind the listener, join the group and start watching for data."""
        # Set some more multicast options
        self._listen_sock.bind(('', self._port))
        self._listen_sock.settimeout(2)

        # The interface is chosen from whatever the local host name resolves to.
        intf = socket.gethostbyname(socket.gethostname())
        any_addr = socket.inet_aton('0.0.0.0')
        # NOTE(review): both option values are two packed IPv4 addresses
        # (8 bytes); confirm that layout is what the target platform expects
        # for IP_MULTICAST_IF.
        self._listen_sock.setsockopt(
            socket.SOL_IP, socket.IP_MULTICAST_IF,
            socket.inet_aton(intf) + any_addr)
        self._listen_sock.setsockopt(
            socket.SOL_IP, socket.IP_ADD_MEMBERSHIP,
            socket.inet_aton(self._address) + any_addr)

        # Watch the listener socket for data
        gobject.io_add_watch(self._listen_sock, gobject.IO_IN,
                             self._handle_incoming_data)

    def _handle_incoming_data(self, source, condition):
        """glib watch callback: read one datagram and pass it to the callback.

        Always returns True so the watch stays installed.  A receive that
        times out is ignored and any other socket error is printed, instead
        of letting the exception escape from the main-loop callback.
        """
        if not (condition & gobject.IO_IN):
            return True

        try:
            data, (addr, port) = source.recvfrom(self._MAX_MSG_SIZE)
        except socket.timeout:
            # The socket has a 2 second timeout (see start()); nothing was
            # read, so just wait for the next readiness notification.
            return True
        except socket.error as e:
            print("Error: socket error while receiving group data - '%s'" % (e,))
            return True

        if self._data_cb:
            self._data_cb({'data': data, 'addr': addr, 'port': port})
        return True
|
||
|
|
||
|
class GroupClient(object):
    """Send UDP datagrams to a fixed (address, port) destination.

    The sending socket has its multicast TTL set to 20; the destination is
    presumably a multicast group address -- confirm against the callers.
    """

    _MAX_MSG_SIZE = 500

    def __init__(self, address, port):
        """Remember the destination and create the sending socket."""
        self._address = address
        self._port = port

        sender = self._send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Make the socket multicast-aware, and set TTL (20; change to suit).
        sender.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20)

    def send_msg(self, data):
        """Send *data* as a single datagram to the configured destination."""
        destination = (self._address, self._port)
        self._send_sock.sendto(data, destination)
|