- Implement async XML-RPC client

master
Dan Williams 18 years ago
parent 8e7a72c9da
commit 6cc5d749f6

@ -283,25 +283,22 @@ class Chat(activity.Activity):
def _insert_buddy(self, buf, nick):
buddy = self._controller.get_group().get_buddy(nick)
icon = buddy.get_icon_pixbuf()
if icon:
aniter = buf.get_end_iter()
buf.insert_pixbuf(aniter, icon)
aniter = buf.get_end_iter()
buf.insert(aniter, nick + ": ")
def _insert_rich_message(self, nick, msg):
msg = Emoticons.get_instance().replace(msg)
buf = self._chat_view.get_buffer()
buf = self._chat_view.get_buffer()
self._insert_buddy(buf, nick)
serializer = richtext.RichTextSerializer()
serializer.deserialize(msg, buf)
aniter = buf.get_end_iter()
buf.insert(aniter, "\n")
@ -442,9 +439,9 @@ class GroupChat(Chat):
def get_group(self):
return self._group
def new_buddy_writer(self, buddy, threaded=False):
def new_buddy_writer(self, buddy):
service = buddy.get_service(CHAT_SERVICE_TYPE)
return self._buddy_stream.new_writer(service, threaded=threaded)
return self._buddy_stream.new_writer(service)
def _start(self):
self._group = LocalGroup()
@ -575,7 +572,7 @@ class GroupChat(Chat):
gobject.timeout_add(1000, self._request_buddy_icon, buddy)
def _request_buddy_icon(self, buddy):
writer = self.new_buddy_writer(buddy, threaded=True)
writer = self.new_buddy_writer(buddy)
icon = writer.custom_request("get_buddy_icon", self._request_buddy_icon_cb, buddy)
def _on_group_service_event(self, action, service):

@ -1,7 +1,6 @@
import xmlrpclib
import socket
import traceback
import threading
import pygtk
pygtk.require('2.0')
@ -38,7 +37,7 @@ class Stream(object):
self._callback(self._group.get_buddy(nick_name), data)
class UnicastStreamWriterBase(object):
class UnicastStreamWriter(object):
def __init__(self, stream, service, owner_nick_name):
# set up the writer
if not service:
@ -48,84 +47,26 @@ class UnicastStreamWriterBase(object):
self._address = self._service.get_address()
self._port = self._service.get_port()
self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
self._writer = network.GlibServerProxy(self._xmlrpc_addr)
class UnicastStreamWriter(UnicastStreamWriterBase):
def __init__(self, stream, service, owner_nick_name):
UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name)
self._writer = xmlrpclib.ServerProxy(self._xmlrpc_addr)
def write(self, data):
def write(self, xmlrpc_data):
"""Write some data to the default endpoint of this pipe on the remote server."""
try:
self._writer.message(self._owner_nick_name, data)
self._writer.message(None, None, self._owner_nick_name, xmlrpc_data)
return True
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
traceback.print_exc()
return False
def custom_request(self, method_name, *args):
def custom_request(self, method_name, request_cb, user_data, *args):
"""Call a custom XML-RPC method on the remote server."""
try:
method = getattr(self._writer, method_name)
return method(*args)
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
pass
#traceback.print_exc()
return None
class ThreadedRequest(threading.Thread):
def __init__(self, controller, addr, method, response_cb, user_data, *args):
threading.Thread.__init__(self)
self._controller = controller
self._method = method
self._args = args
self._response_cb = response_cb
self._user_data = user_data
self._writer = xmlrpclib.ServerProxy(addr)
def run(self):
response = None
try:
method = getattr(self._writer, self._method)
response = method(*self._args)
method(request_cb, user_data, *args)
return True
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
traceback.print_exc()
if self._response_cb:
gobject.idle_add(self._response_cb, response, self._user_data)
self._controller.notify_request_done(self)
class ThreadedUnicastStreamWriter(UnicastStreamWriterBase):
def __init__(self, stream, service, owner_nick_name):
self._requests_lock = threading.Lock()
self._requests = []
UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name)
def _add_request(self, request):
self._requests_lock.acquire()
if not request in self._requests:
self._requests.append(request)
self._requests_lock.release()
def write(self, response_cb, user_data, data):
"""Write some data to the default endpoint of this pipe on the remote server."""
request = ThreadedRequest(self, self._xmlrpc_addr, "message", response_cb,
user_data, self._owner_nick_name, data)
self._add_request(request)
request.start()
def custom_request(self, method_name, response_cb, user_data, *args):
"""Call a custom XML-RPC method on the remote server."""
request = ThreadedRequest(self, self._xmlrpc_addr, method_name, response_cb,
user_data, *args)
self._add_request(request)
request.start()
def notify_request_done(self, request):
self._requests_lock.acquire()
if request in self._requests:
self._requests.remove(request)
self._requests_lock.release()
return False
class UnicastStream(Stream):
@ -159,11 +100,8 @@ class UnicastStream(Stream):
raise ValueError("Handler name 'message' is a reserved handler.")
self._reader.register_function(handler, name)
def new_writer(self, service, threaded=False):
if threaded:
return ThreadedUnicastStreamWriter(self, service, self._owner_nick_name)
else:
return UnicastStreamWriter(self, service, self._owner_nick_name)
def new_writer(self, service):
return UnicastStreamWriter(self, service, self._owner_nick_name)
class MulticastStream(Stream):
@ -183,5 +121,5 @@ class MulticastStream(Stream):
[ nick_name, data ] = data.split(" |**| ", 2)
self.recv(nick_name, data)
def new_writer(self, service=None, threaded=False):
def new_writer(self, service=None):
return self

@ -3,11 +3,13 @@ import threading
import traceback
import xmlrpclib
import sys
import httplib
import gobject
import SimpleXMLRPCServer
import SocketServer
__authinfos = {}
def _add_authinfo(authinfo):
@ -37,6 +39,8 @@ class GlibTCPServer(SocketServer.TCPServer):
gobject.io_add_watch(self.socket, gobject.IO_IN, self._handle_accept)
def _handle_accept(self, source, condition):
"""Process incoming data on the server's socket by doing an accept()
via handle_request()."""
if not (condition & gobject.IO_IN):
return True
self.handle_request()
@ -72,7 +76,6 @@ class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher)
def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=1):
self.logRequests = logRequests
SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
GlibTCPServer.__init__(self, addr, requestHandler)
@ -112,6 +115,141 @@ class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher)
return response
class GlibHTTP(httplib.HTTP):
"""Subclass HTTP so we can return it's connection class' socket."""
def get_sock(self):
return self._conn.sock
class GlibXMLRPCTransport(xmlrpclib.Transport):
"""Integrate the request with the glib mainloop rather than blocking."""
##
# Connect to server.
#
# @param host Target host.
# @return A connection handle.
def make_connection(self, host):
"""Use our own connection object so we can get its socket."""
# create a HTTP connection object from a host descriptor
host, extra_headers, x509 = self.get_host_info(host)
return GlibHTTP(host)
##
# Send a complete request, and parse the response.
#
# @param host Target host.
# @param handler Target PRC handler.
# @param request_body XML-RPC request body.
# @param verbose Debugging flag.
# @return Parsed response.
def start_request(self, host, handler, request_body, verbose=0, request_cb=None, user_data=None):
"""Do the first half of the request by sending data to the remote
server. The bottom half bits get run when the remote server's response
actually comes back."""
# issue XML-RPC request
h = self.make_connection(host)
if verbose:
h.set_debuglevel(1)
self.send_request(h, handler, request_body)
self.send_host(h, host)
self.send_user_agent(h)
self.send_content(h, request_body)
# Schedule a GIOWatch so we don't block waiting for the response
gobject.io_add_watch(h.get_sock(), gobject.IO_IN, self._finish_request,
h, host, handler, verbose, request_cb, user_data)
def _finish_request(self, source, condition, h, host, handler, verbose, request_cb, user_data):
"""Parse and return response when the remote server actually returns it."""
if not (condition & gobject.IO_IN):
return True
errcode, errmsg, headers = h.getreply()
if errcode != 200:
raise xmlrpclib.ProtocolError(host + handler, errcode, errmsg, headers)
self.verbose = verbose
response = self._parse_response(h.getfile(), h.get_sock())
if request_cb:
if len(response) == 1:
response = response[0]
gobject.idle_add(request_cb, response, user_data)
class _Method:
"""Right, so python people thought it would be funny to make this
class private to xmlrpclib.py..."""
# some magic to bind an XML-RPC method to an RPC server.
# supports "nested" methods (e.g. examples.getStateName)
def __init__(self, send, name):
self.__send = send
self.__name = name
def __getattr__(self, name):
return _Method(self.__send, "%s.%s" % (self.__name, name))
def __call__(self, request_cb, user_data, *args):
return self.__send(self.__name, request_cb, user_data, args)
class GlibServerProxy(xmlrpclib.ServerProxy):
"""Subclass xmlrpclib.ServerProxy so we can run the XML-RPC request
in two parts, integrated with the glib mainloop, such that we don't
block anywhere.
Using this object is somewhat special; it requires more arguments to each
XML-RPC request call than the normal xmlrpclib.ServerProxy object:
client = GlibServerProxy("http://127.0.0.1:8888")
user_data = "bar"
xmlrpc_arg1 = "test"
xmlrpc_arg2 = "foo"
client.test(xmlrpc_test_cb, user_data, xmlrpc_arg1, xmlrpc_arg2)
Here, 'xmlrpc_test_cb' is the callback function, which has the following
signature:
def xmlrpc_test_cb(response, user_data=None):
...
"""
def __init__(self, uri, encoding=None, verbose=0, allow_none=0):
self._transport = GlibXMLRPCTransport()
self._encoding = encoding
self._verbose = verbose
self._allow_none = allow_none
xmlrpclib.ServerProxy.__init__(self, uri, self._transport, encoding, verbose, allow_none)
# get the url
import urllib
type, uri = urllib.splittype(uri)
if type not in ("http", "https"):
raise IOError, "unsupported XML-RPC protocol"
self._host, self._handler = urllib.splithost(uri)
if not self._handler:
self._handler = "/RPC2"
def __request(self, methodname, request_cb, user_data, params):
"""Call the method on the remote server. We just start the request here
and the transport itself takes care of scheduling the response callback
when the remote server returns the response. We don't want to block anywhere."""
request = xmlrpclib.dumps(params, methodname, encoding=self._encoding,
allow_none=self._allow_none)
response = self._transport.start_request(
self._host,
self._handler,
request,
verbose=self._verbose,
request_cb=request_cb,
user_data=user_data
)
def __getattr__(self, name):
# magic method dispatcher
return _Method(self.__request, name)
class GroupServer(object):
_MAX_MSG_SIZE = 500
@ -166,3 +304,38 @@ class GroupClient(object):
def send_msg(self, data):
self._send_sock.sendto(data, (self._address, self._port))
class Test(object):
def test(self, arg1):
print "Request got %s" % arg1
return "success"
def xmlrpc_test_cb(response, user_data=None):
print "Response was %s, user_data was %s" % (response, user_data)
import gtk
gtk.main_quit()
def xmlrpc_test():
client = GlibServerProxy("http://127.0.0.1:8888")
client.test(xmlrpc_test_cb, "bar", "test data")
def main():
import gtk, gobject
server = GlibXMLRPCServer(("", 8888))
inst = Test()
server.register_instance(inst)
gobject.idle_add(xmlrpc_test)
try:
gtk.main()
except KeyboardInterrupt:
pass
print "Done."
if __name__ == "__main__":
main()

Loading…
Cancel
Save