From 6cc5d749f61f8e68de946678a9c5981700a3af49 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Tue, 23 May 2006 00:15:14 -0400 Subject: [PATCH] - Implement async XML-RPC client --- sugar/chat/chat.py | 13 ++-- sugar/p2p/Stream.py | 84 +++------------------ sugar/p2p/network.py | 175 ++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 190 insertions(+), 82 deletions(-) diff --git a/sugar/chat/chat.py b/sugar/chat/chat.py index c9e642e8..43cd21cc 100755 --- a/sugar/chat/chat.py +++ b/sugar/chat/chat.py @@ -283,25 +283,22 @@ class Chat(activity.Activity): def _insert_buddy(self, buf, nick): buddy = self._controller.get_group().get_buddy(nick) - icon = buddy.get_icon_pixbuf() if icon: aniter = buf.get_end_iter() buf.insert_pixbuf(aniter, icon) - + aniter = buf.get_end_iter() buf.insert(aniter, nick + ": ") def _insert_rich_message(self, nick, msg): msg = Emoticons.get_instance().replace(msg) - - buf = self._chat_view.get_buffer() + buf = self._chat_view.get_buffer() self._insert_buddy(buf, nick) serializer = richtext.RichTextSerializer() serializer.deserialize(msg, buf) - aniter = buf.get_end_iter() buf.insert(aniter, "\n") @@ -442,9 +439,9 @@ class GroupChat(Chat): def get_group(self): return self._group - def new_buddy_writer(self, buddy, threaded=False): + def new_buddy_writer(self, buddy): service = buddy.get_service(CHAT_SERVICE_TYPE) - return self._buddy_stream.new_writer(service, threaded=threaded) + return self._buddy_stream.new_writer(service) def _start(self): self._group = LocalGroup() @@ -575,7 +572,7 @@ class GroupChat(Chat): gobject.timeout_add(1000, self._request_buddy_icon, buddy) def _request_buddy_icon(self, buddy): - writer = self.new_buddy_writer(buddy, threaded=True) + writer = self.new_buddy_writer(buddy) icon = writer.custom_request("get_buddy_icon", self._request_buddy_icon_cb, buddy) def _on_group_service_event(self, action, service): diff --git a/sugar/p2p/Stream.py b/sugar/p2p/Stream.py index ef95d602..7f00c575 100644 --- a/sugar/p2p/Stream.py +++ b/sugar/p2p/Stream.py @@ -1,7 +1,6 @@ import xmlrpclib import socket import traceback -import threading import pygtk pygtk.require('2.0') @@ -38,7 +37,7 @@ class Stream(object): self._callback(self._group.get_buddy(nick_name), data) -class UnicastStreamWriterBase(object): +class UnicastStreamWriter(object): def __init__(self, stream, service, owner_nick_name): # set up the writer if not service: @@ -48,84 +47,26 @@ class UnicastStreamWriterBase(object): self._address = self._service.get_address() self._port = self._service.get_port() self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port) + self._writer = network.GlibServerProxy(self._xmlrpc_addr) -class UnicastStreamWriter(UnicastStreamWriterBase): - def __init__(self, stream, service, owner_nick_name): - UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name) - self._writer = xmlrpclib.ServerProxy(self._xmlrpc_addr) - - def write(self, data): + def write(self, xmlrpc_data): """Write some data to the default endpoint of this pipe on the remote server.""" try: - self._writer.message(self._owner_nick_name, data) + self._writer.message(None, None, self._owner_nick_name, xmlrpc_data) return True except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError): traceback.print_exc() return False - def custom_request(self, method_name, *args): + def custom_request(self, method_name, request_cb, user_data, *args): """Call a custom XML-RPC method on the remote server.""" try: method = getattr(self._writer, method_name) - return method(*args) - except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError): - pass - #traceback.print_exc() - return None - - -class ThreadedRequest(threading.Thread): - def __init__(self, controller, addr, method, response_cb, user_data, *args): - threading.Thread.__init__(self) - self._controller = controller - self._method = method - self._args = args - self._response_cb = response_cb - self._user_data = user_data - self._writer = xmlrpclib.ServerProxy(addr) - - def run(self): - response = None - try: - method = getattr(self._writer, self._method) - response = method(*self._args) + method(request_cb, user_data, *args) + return True except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError): traceback.print_exc() - if self._response_cb: - gobject.idle_add(self._response_cb, response, self._user_data) - self._controller.notify_request_done(self) - -class ThreadedUnicastStreamWriter(UnicastStreamWriterBase): - def __init__(self, stream, service, owner_nick_name): - self._requests_lock = threading.Lock() - self._requests = [] - UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name) - - def _add_request(self, request): - self._requests_lock.acquire() - if not request in self._requests: - self._requests.append(request) - self._requests_lock.release() - - def write(self, response_cb, user_data, data): - """Write some data to the default endpoint of this pipe on the remote server.""" - request = ThreadedRequest(self, self._xmlrpc_addr, "message", response_cb, - user_data, self._owner_nick_name, data) - self._add_request(request) - request.start() - - def custom_request(self, method_name, response_cb, user_data, *args): - """Call a custom XML-RPC method on the remote server.""" - request = ThreadedRequest(self, self._xmlrpc_addr, method_name, response_cb, - user_data, *args) - self._add_request(request) - request.start() - - def notify_request_done(self, request): - self._requests_lock.acquire() - if request in self._requests: - self._requests.remove(request) - self._requests_lock.release() + return False class UnicastStream(Stream): @@ -159,11 +100,8 @@ class UnicastStream(Stream): raise ValueError("Handler name 'message' is a reserved handler.") self._reader.register_function(handler, name) - def new_writer(self, service, threaded=False): - if threaded: - return ThreadedUnicastStreamWriter(self, service, self._owner_nick_name) - else: - return UnicastStreamWriter(self, service, self._owner_nick_name) + def new_writer(self, service): + return UnicastStreamWriter(self, service, self._owner_nick_name) class MulticastStream(Stream): @@ -183,5 +121,5 @@ class MulticastStream(Stream): [ nick_name, data ] = data.split(" |**| ", 2) self.recv(nick_name, data) - def new_writer(self, service=None, threaded=False): + def new_writer(self, service=None): return self diff --git a/sugar/p2p/network.py b/sugar/p2p/network.py index 4c054fea..684d114e 100644 --- a/sugar/p2p/network.py +++ b/sugar/p2p/network.py @@ -3,11 +3,13 @@ import threading import traceback import xmlrpclib import sys +import httplib import gobject import SimpleXMLRPCServer import SocketServer + __authinfos = {} def _add_authinfo(authinfo): @@ -37,6 +39,8 @@ class GlibTCPServer(SocketServer.TCPServer): gobject.io_add_watch(self.socket, gobject.IO_IN, self._handle_accept) def _handle_accept(self, source, condition): + """Process incoming data on the server's socket by doing an accept() + via handle_request().""" if not (condition & gobject.IO_IN): return True self.handle_request() @@ -72,7 +76,6 @@ class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher) def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=1): self.logRequests = logRequests - SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self) GlibTCPServer.__init__(self, addr, requestHandler) @@ -112,6 +115,141 @@ class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher) return response + +class GlibHTTP(httplib.HTTP): + """Subclass HTTP so we can return it's connection class' socket.""" + def get_sock(self): + return self._conn.sock + +class GlibXMLRPCTransport(xmlrpclib.Transport): + """Integrate the request with the glib mainloop rather than blocking.""" + ## + # Connect to server. + # + # @param host Target host. + # @return A connection handle. + + def make_connection(self, host): + """Use our own connection object so we can get its socket.""" + # create a HTTP connection object from a host descriptor + host, extra_headers, x509 = self.get_host_info(host) + return GlibHTTP(host) + + ## + # Send a complete request, and parse the response. + # + # @param host Target host. + # @param handler Target PRC handler. + # @param request_body XML-RPC request body. + # @param verbose Debugging flag. + # @return Parsed response. + + def start_request(self, host, handler, request_body, verbose=0, request_cb=None, user_data=None): + """Do the first half of the request by sending data to the remote + server. The bottom half bits get run when the remote server's response + actually comes back.""" + # issue XML-RPC request + + h = self.make_connection(host) + if verbose: + h.set_debuglevel(1) + + self.send_request(h, handler, request_body) + self.send_host(h, host) + self.send_user_agent(h) + self.send_content(h, request_body) + + # Schedule a GIOWatch so we don't block waiting for the response + gobject.io_add_watch(h.get_sock(), gobject.IO_IN, self._finish_request, + h, host, handler, verbose, request_cb, user_data) + + def _finish_request(self, source, condition, h, host, handler, verbose, request_cb, user_data): + """Parse and return response when the remote server actually returns it.""" + if not (condition & gobject.IO_IN): + return True + + errcode, errmsg, headers = h.getreply() + if errcode != 200: + raise xmlrpclib.ProtocolError(host + handler, errcode, errmsg, headers) + self.verbose = verbose + response = self._parse_response(h.getfile(), h.get_sock()) + if request_cb: + if len(response) == 1: + response = response[0] + gobject.idle_add(request_cb, response, user_data) + +class _Method: + """Right, so python people thought it would be funny to make this + class private to xmlrpclib.py...""" + # some magic to bind an XML-RPC method to an RPC server. + # supports "nested" methods (e.g. examples.getStateName) + def __init__(self, send, name): + self.__send = send + self.__name = name + def __getattr__(self, name): + return _Method(self.__send, "%s.%s" % (self.__name, name)) + def __call__(self, request_cb, user_data, *args): + return self.__send(self.__name, request_cb, user_data, args) + + +class GlibServerProxy(xmlrpclib.ServerProxy): + """Subclass xmlrpclib.ServerProxy so we can run the XML-RPC request + in two parts, integrated with the glib mainloop, such that we don't + block anywhere. + + Using this object is somewhat special; it requires more arguments to each + XML-RPC request call than the normal xmlrpclib.ServerProxy object: + + client = GlibServerProxy("http://127.0.0.1:8888") + user_data = "bar" + xmlrpc_arg1 = "test" + xmlrpc_arg2 = "foo" + client.test(xmlrpc_test_cb, user_data, xmlrpc_arg1, xmlrpc_arg2) + + Here, 'xmlrpc_test_cb' is the callback function, which has the following + signature: + + def xmlrpc_test_cb(response, user_data=None): + ... + """ + def __init__(self, uri, encoding=None, verbose=0, allow_none=0): + self._transport = GlibXMLRPCTransport() + self._encoding = encoding + self._verbose = verbose + self._allow_none = allow_none + xmlrpclib.ServerProxy.__init__(self, uri, self._transport, encoding, verbose, allow_none) + + # get the url + import urllib + type, uri = urllib.splittype(uri) + if type not in ("http", "https"): + raise IOError, "unsupported XML-RPC protocol" + self._host, self._handler = urllib.splithost(uri) + if not self._handler: + self._handler = "/RPC2" + + def __request(self, methodname, request_cb, user_data, params): + """Call the method on the remote server. We just start the request here + and the transport itself takes care of scheduling the response callback + when the remote server returns the response. We don't want to block anywhere.""" + + request = xmlrpclib.dumps(params, methodname, encoding=self._encoding, + allow_none=self._allow_none) + + response = self._transport.start_request( + self._host, + self._handler, + request, + verbose=self._verbose, + request_cb=request_cb, + user_data=user_data + ) + + def __getattr__(self, name): + # magic method dispatcher + return _Method(self.__request, name) + + class GroupServer(object): _MAX_MSG_SIZE = 500 @@ -166,3 +304,38 @@ class GroupClient(object): def send_msg(self, data): self._send_sock.sendto(data, (self._address, self._port)) + + + +class Test(object): + def test(self, arg1): + print "Request got %s" % arg1 + return "success" + +def xmlrpc_test_cb(response, user_data=None): + print "Response was %s, user_data was %s" % (response, user_data) + import gtk + gtk.main_quit() + + +def xmlrpc_test(): + client = GlibServerProxy("http://127.0.0.1:8888") + client.test(xmlrpc_test_cb, "bar", "test data") + + +def main(): + import gtk, gobject + server = GlibXMLRPCServer(("", 8888)) + inst = Test() + server.register_instance(inst) + + gobject.idle_add(xmlrpc_test) + + try: + gtk.main() + except KeyboardInterrupt: + pass + print "Done." + +if __name__ == "__main__": + main()