Make XMLRPC proxy object callback semantics saner

This commit is contained in:
Dan Williams 2007-05-02 23:58:14 -04:00
parent 7774073276
commit f0205fde5c

View File

@ -33,9 +33,6 @@ import SimpleHTTPServer
import SocketServer
RESULT_FAILED = 0
RESULT_SUCCESS = 1
__authinfos = {}
def _add_authinfo(authinfo):
@ -187,17 +184,14 @@ class GlibURLDownloader(gobject.GObject):
def start(self):
self._info = urllib.urlopen(self._url)
self._fname = _get_filename_from_headers(info.headers)
if not self._fname:
import tempfile
garbage, path = urllib.splittype(url)
garbage, path = urllib.splithost(path or "")
path, garbage = urllib.splitquery(path or "")
path, garbage = urllib.splitattr(path or "")
suffix = os.path.splitext(path)[1]
(outf, self._fname) = tempfile.mkstemp(suffix=suffix, dir=self._destdir)
else:
outf = open(os.path.join(self._destdir, self._fname), "w")
self._suggested_fname = _get_filename_from_headers(info.headers)
import tempfile
garbage, path = urllib.splittype(url)
garbage, path = urllib.splithost(path or "")
path, garbage = urllib.splitquery(path or "")
path, garbage = urllib.splitattr(path or "")
suffix = os.path.splitext(path)[1]
(outf, self._fname) = tempfile.mkstemp(suffix=suffix, dir=self._destdir)
fcntl.fcntl(self._info.fp.fileno(), fcntl.F_SETFD, os.O_NDELAY)
self._srcid = gobject.io_add_watch(self._info.fp.fileno(), gobject.IO_IN, self._read_next_chunk)
@ -221,14 +215,15 @@ class GlibURLDownloader(gobject.GObject):
def _read_next_chunk(self, source, condition):
if not (condition & gobject.IO_IN):
self.cleanup()
self.emit("finished", None)
os.remove(self._fname)
self.emit("finished", None, None)
return False
data = info.fp.read(self.CHUNK_SIZE)
count = outf.write(data)
if len(data) < self.CHUNK_SIZE:
self.cleanup()
self.emit("finished", self._fname)
self.emit("finished", self._fname, self._suggested_fname)
return False
return True
@ -319,8 +314,6 @@ class GlibHTTP(httplib.HTTP):
def connect(self, host=None, port=None):
httplib.HTTP.connect(self, host, port)
self._conn.sock.setblocking(0)
def get_sock(self):
return self._conn.sock
class GlibXMLRPCTransport(xmlrpclib.Transport):
"""Integrate the request with the glib mainloop rather than blocking."""
@ -349,7 +342,7 @@ class GlibXMLRPCTransport(xmlrpclib.Transport):
# @param verbose Debugging flag.
# @return Parsed response.
def start_request(self, host, handler, request_body, verbose=0, request_cb=None, user_data=None):
def start_request(self, host, handler, request_body, verbose=0, reply_handler=None, error_handler=None, user_data=None):
"""Do the first half of the request by sending data to the remote
server. The bottom half bits get run when the remote server's response
actually comes back."""
@ -365,10 +358,10 @@ class GlibXMLRPCTransport(xmlrpclib.Transport):
self.send_content(h, request_body)
# Schedule a GIOWatch so we don't block waiting for the response
gobject.io_add_watch(h.get_sock(), gobject.IO_IN, self._finish_request,
h, host, handler, verbose, request_cb, user_data)
gobject.io_add_watch(h._conn.sock, gobject.IO_IN, self._finish_request,
h, host, handler, verbose, reply_handler, error_handler, user_data)
def _finish_request(self, source, condition, h, host, handler, verbose, request_cb, user_data):
def _finish_request(self, source, condition, h, host, handler, verbose, reply_handler=None, error_handler=None, user_data=None):
"""Parse and return response when the remote server actually returns it."""
if not (condition & gobject.IO_IN):
return True
@ -379,17 +372,18 @@ class GlibXMLRPCTransport(xmlrpclib.Transport):
if err[0] != 104:
raise socket.error(err)
else:
gobject.idle_add(request_cb, RESULT_FAILED, None, user_data)
if error_handler:
gobject.idle_add(error_handler, err, user_data)
return False
if errcode != 200:
raise xmlrpclib.ProtocolError(host + handler, errcode, errmsg, headers)
self.verbose = verbose
response = self._parse_response(h.getfile(), h.get_sock())
if request_cb:
if len(response) == 1:
response = response[0]
gobject.idle_add(request_cb, RESULT_SUCCESS, response, user_data)
response = self._parse_response(h.getfile(), h._conn.sock)
if reply_handler:
response = response[0]
response.append(user_data)
gobject.idle_add(reply_handler, *response)
return False
class _Method:
@ -402,8 +396,8 @@ class _Method:
self.__name = name
def __getattr__(self, name):
return _Method(self.__send, "%s.%s" % (self.__name, name))
def __call__(self, request_cb, user_data, *args):
return self.__send(self.__name, request_cb, user_data, args)
def __call__(self, *args, **kwargs):
return self.__send(self.__name, *args, **kwargs)
class GlibServerProxy(xmlrpclib.ServerProxy):
@ -442,25 +436,30 @@ class GlibServerProxy(xmlrpclib.ServerProxy):
if not self._handler:
self._handler = "/RPC2"
def __request(self, methodname, request_cb, user_data, params):
def __request(self, methodname, *args, **kwargs):
"""Call the method on the remote server. We just start the request here
and the transport itself takes care of scheduling the response callback
when the remote server returns the response. We don't want to block anywhere."""
request = xmlrpclib.dumps(params, methodname, encoding=self._encoding,
request = xmlrpclib.dumps(args, methodname, encoding=self._encoding,
allow_none=self._allow_none)
reply_hdl = kwargs.get("reply_handler")
err_hdl = kwargs.get("error_handler")
udata = kwargs.get("user_data")
try:
response = self._transport.start_request(
self._host,
self._handler,
request,
verbose=self._verbose,
request_cb=request_cb,
user_data=user_data
reply_handler=reply_hdl,
error_handler=err_hdl,
user_data=udata
)
except socket.error, exc:
gobject.idle_add(request_cb, RESULT_FAILED, None, user_data)
if err_hdl:
gobject.idle_add(err_hdl, exc, udata)
def __getattr__(self, name):
# magic method dispatcher
@ -523,33 +522,34 @@ class GroupClient(object):
self._send_sock.sendto(data, (self._address, self._port))
class Test(object):
def test(self, arg1):
print "Request got %s" % arg1
return "success"
def test(self, arg1, arg2):
print "Request got %s, %s" % (arg1, arg2)
return "success", "bork"
def xmlrpc_test_cb(response, user_data=None):
print "Response was %s, user_data was %s" % (response, user_data)
import gtk
gtk.main_quit()
def xmlrpc_success_cb(response, resp2, loop):
print "Response was %s %s" % (response, resp2)
loop.quit()
def xmlrpc_error_cb(err, loop):
print "Error: %s" % err
loop.quit()
def xmlrpc_test():
def xmlrpc_test(loop):
client = GlibServerProxy("http://127.0.0.1:8888")
client.test(xmlrpc_test_cb, "bar", "test data")
client.test("bar", "baz",
reply_handler=xmlrpc_success_cb,
error_handler=xmlrpc_error_cb,
user_data=loop)
def main():
import gtk
loop = gobject.MainLoop()
server = GlibXMLRPCServer(("", 8888))
inst = Test()
server.register_instance(inst)
gobject.idle_add(xmlrpc_test)
gobject.idle_add(xmlrpc_test, loop)
try:
gtk.main()
loop.run()
except KeyboardInterrupt:
print 'Ctrl+C pressed, exiting...'
print "Done."