169 lines
		
	
	
		
			5.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			169 lines
		
	
	
		
			5.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import socket
 | |
| import threading
 | |
| import traceback
 | |
| import xmlrpclib
 | |
| import sys
 | |
| 
 | |
| import gobject
 | |
| import SimpleXMLRPCServer
 | |
| import SocketServer
 | |
| 
 | |
# Per-thread registry of auth info for the request currently being handled.
# Keys are thread objects; values are whatever the caller registered (the
# request handler in this file stores the client's address).
__authinfos = {}

def _add_authinfo(authinfo):
	"""Register `authinfo` for the calling thread.

	Any value previously registered for this thread is overwritten.
	"""
	__authinfos[threading.current_thread()] = authinfo

def get_authinfo():
	"""Return the auth info registered for the calling thread.

	Returns None when nothing is registered for this thread.
	"""
	return __authinfos.get(threading.current_thread())

def _del_authinfo():
	"""Remove the calling thread's auth info, if any.

	Calling this when nothing is registered is a no-op, so it is safe to use
	from cleanup paths that may run more than once.
	"""
	__authinfos.pop(threading.current_thread(), None)
 | |
| 
 | |
| 
 | |
class GlibTCPServer(SocketServer.TCPServer):
	"""TCP server driven by the glib main loop.

	The listening socket is switched to non-blocking mode and registered
	with glib, so incoming connections are picked up from the main loop
	instead of from a blocking accept() call.
	"""

	request_queue_size = 20
	allow_reuse_address = True

	def __init__(self, server_address, RequestHandlerClass):
		"""Create the listening socket and hook it into the glib main loop."""
		SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)

		listener = self.socket
		listener.setblocking(0)  # never block the main loop

		# Ask glib to call us whenever the listener becomes readable
		gobject.io_add_watch(listener, gobject.IO_IN, self._handle_accept)

	def _handle_accept(self, source, condition):
		"""glib I/O callback: service one pending request.

		Always returns True so that glib keeps the watch installed.
		"""
		if condition & gobject.IO_IN:
			self.handle_request()
		return True
 | |
| 
 | |
class GlibXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
	""" GlibXMLRPCRequestHandler

	The stock SimpleXMLRPCRequestHandler and server don't allow any way to pass
	the client's address and/or SSL certificate into the function that actually
	_processes_ the request.  So we have to store it in a thread-indexed dict.
	"""

	def do_POST(self):
		"""Handle one XML-RPC POST with the client's address registered.

		The client address is registered for the current thread before the
		stock handler runs and is always removed afterwards.  Socket
		timeouts are ignored; other socket errors and ordinary exceptions
		are reported on stdout and not re-raised.  KeyboardInterrupt and
		SystemExit are allowed to propagate.
		"""
		_add_authinfo(self.client_address)
		try:
			SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self)
		except socket.timeout:
			# The client went quiet; nothing useful to report.
			pass
		except socket.error as e:
			print("Error (%s): socket error - '%s'" % (self.client_address, e))
		except Exception:
			print("Error while processing POST:")
			traceback.print_exc()
		finally:
			# Always drop the entry, even if reporting the error itself failed.
			_del_authinfo()
 | |
| 
 | |
class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
	"""GlibXMLRPCServer

	Use nonblocking sockets and handle the accept via glib rather than
	blocking on accept().
	"""

	def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=1):
		"""Set up the XML-RPC dispatcher and the glib-driven TCP server.

		addr           -- address passed through to GlibTCPServer
		requestHandler -- request handler class used for each connection
		logRequests    -- stored as self.logRequests
		"""
		self.logRequests = logRequests

		SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
		GlibTCPServer.__init__(self, addr, requestHandler)

	def _marshaled_dispatch(self, data, dispatch_method = None, path = None):
		"""Dispatches an XML-RPC method from marshalled (XML) data.

		XML-RPC methods are dispatched from the marshalled (XML) data
		using the _dispatch method and the result is returned as
		marshalled data. For backwards compatibility, a dispatch
		function can be provided as an argument (see comment in
		SimpleXMLRPCRequestHandler.do_POST) but overriding the
		existing method through subclassing is the preferred means
		of changing method dispatch behavior.

		`path` is accepted and ignored; newer versions of the standard
		library request handler pass it as a third argument.

		Returns the marshalled response.  A Fault raised by the method is
		marshalled as-is; any other exception (including a request that
		cannot be parsed) is printed and returned as Fault code 1.
		"""

		# generate response
		try:
			params, method = xmlrpclib.loads(data)

			if dispatch_method is not None:
				result = dispatch_method(method, params)
			else:
				result = self._dispatch(method, params)
			# wrap response in a singleton tuple
			response = xmlrpclib.dumps((result,), methodresponse=1)
		except xmlrpclib.Fault as fault:
			response = xmlrpclib.dumps(fault)
		except Exception:
			print("Exception while processing request:")
			traceback.print_exc()

			# report exception back to the caller; sys.exc_info() is the
			# thread-safe way to get at the exception being handled
			exc_type, exc_value = sys.exc_info()[:2]
			response = xmlrpclib.dumps(
				xmlrpclib.Fault(1, "%s:%s" % (exc_type, exc_value))
				)

		return response
 | |
| 
 | |
class GroupServer(object):
	"""Receive UDP multicast datagrams for one group via the glib main loop.

	Each received datagram is handed to `data_cb` as a dict with the keys
	'data', 'addr' and 'port'.
	"""

	# Largest number of bytes read from the socket per datagram
	_MAX_MSG_SIZE = 500

	def __init__(self, address, port, data_cb):
		"""Create the listener socket; call start() to begin receiving.

		address -- multicast group address to join, as a dotted-quad string
		port    -- UDP port to bind to
		data_cb -- callable invoked with one message dict per datagram;
		           may be None, in which case datagrams are read and dropped
		"""
		self._address = address
		self._port = port
		self._data_cb = data_cb

		self._setup_listener()

	def _setup_listener(self):
		"""Create the UDP socket and apply the multicast-friendly options."""
		# Listener socket
		self._listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

		# Set some options to make it multicast-friendly
		self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
		self._listen_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20)
		self._listen_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1)

	def start(self):
		"""Bind, join the multicast group and start watching for datagrams."""
		# Set some more multicast options
		self._listen_sock.bind(('', self._port))
		self._listen_sock.settimeout(2)
		intf = socket.gethostbyname(socket.gethostname())
		# NOTE(review): IP_MULTICAST_IF is given 8 bytes here (interface
		# address followed by 0.0.0.0) -- confirm this form selects `intf`
		# on the platforms this runs on.
		self._listen_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(intf) + socket.inet_aton('0.0.0.0'))
		# Group address followed by 0.0.0.0 as the local interface
		self._listen_sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(self._address) + socket.inet_aton('0.0.0.0'))

		# Watch the listener socket for data
		gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data)

	def _handle_incoming_data(self, source, condition):
		"""glib I/O callback: read one datagram and pass it to the data callback.

		Always returns True so that glib keeps the watch installed.  A
		socket error while receiving is reported and otherwise ignored;
		letting it escape the callback would stop the listener.
		"""
		if not (condition & gobject.IO_IN):
			return True
		try:
			data, (addr, port) = source.recvfrom(self._MAX_MSG_SIZE)
		except socket.error as e:
			print("Error: multicast receive failed - '%s'" % e)
			return True
		if self._data_cb:
			self._data_cb({'data': data, 'addr': addr, 'port': port})
		return True
 | |
| 
 | |
class GroupClient(object):
	"""Sending side of a UDP multicast group.

	Holds one UDP socket configured with a multicast TTL and sends each
	message to the group address and port given at construction time.
	"""

	_MAX_MSG_SIZE = 500

	def __init__(self, address, port):
		"""Remember the destination and create the sending socket.

		address -- destination address messages are sent to
		port    -- destination UDP port
		"""
		self._address = address
		self._port = port

		sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
		# Multicast TTL of 20; adjust to suit the network
		sender.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20)
		self._send_sock = sender

	def send_msg(self, data):
		"""Send `data` as a single datagram to the configured address and port."""
		destination = (self._address, self._port)
		self._send_sock.sendto(data, destination)
 | 
