From 26e82ba2502030f230c3ad8c6bf71b2f6956d51b Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Mon, 24 Apr 2006 13:08:18 -0400 Subject: [PATCH] Initial implementation of p2p IPv4 TCP chats --- chat/BuddyList.py | 4 +-- chat/chat.py | 72 +++++++++++++++++++++++++++++++++++++++-------- chat/network.py | 71 +++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 133 insertions(+), 14 deletions(-) diff --git a/chat/BuddyList.py b/chat/BuddyList.py index edd5f79b..fa3afb0b 100644 --- a/chat/BuddyList.py +++ b/chat/BuddyList.py @@ -14,8 +14,8 @@ class Buddy(object): self._servicename = servicename self._key = key self._host = host - self._address = address - self._port = port + self._address = str(address) + self._port = int(port) self._chat = None def set_chat(self, chat): diff --git a/chat/chat.py b/chat/chat.py index 038d35a4..840ba9cc 100755 --- a/chat/chat.py +++ b/chat/chat.py @@ -13,6 +13,7 @@ import sys import os import pwd import gc +import socket sys.path.append(os.getcwd()) sys.path.append('../shell/example-activity/') @@ -22,13 +23,19 @@ import presence import BuddyList import network import richtext +import xmlrpclib class Chat(object): - def __init__(self, view, label): + def __init__(self, parent, view, label): + self._parent = parent self._buffer = richtext.RichTextBuffer() self._view = view self._label = label + def error_message(self, msg): + aniter = self._buffer.get_end_iter() + self._buffer.insert(aniter, "Error: %s\n" % msg) + def activate(self, label): self._view.set_buffer(self._buffer) self._label.set_text(label) @@ -43,13 +50,36 @@ class Chat(object): aniter = self._buffer.get_end_iter() self._buffer.insert(aniter, "\n") +class BuddyChat(Chat): + def __init__(self, parent, buddy, view, label): + self._buddy = buddy + Chat.__init__(self, parent, view, label) + + def activate(self): + Chat.activate(self, self._buddy.nick()) + + def recv_message(self, msg): + Chat.recv_message(self, self._buddy, msg) + + def send_message(self, text): + if len(text) <= 0: + return + addr = "http://%s:%d" % (self._buddy.address(), self._buddy.port()) + peer = xmlrpclib.ServerProxy(addr) + msg = None + success = True + try: + peer.message(text) + except socket.error, e: + msg = str(e) + success = False + return (success, msg) + class GroupChat(Chat): def __init__(self, parent, view, label): - Chat.__init__(self, view, label) - self._parent = parent + Chat.__init__(self, parent, view, label) self._gc_controller = network.GroupChatController('224.0.0.221', 6666, self._recv_group_message) self._gc_controller.start() - self._label_prefix = "Cha" def activate(self): Chat.activate(self, "Group Chat") @@ -57,12 +87,24 @@ class GroupChat(Chat): def send_message(self, text): if len(text) > 0: self._gc_controller.send_msg(text) + return (True, None) def _recv_group_message(self, msg): buddy = self._parent.find_buddy_by_address(msg['addr']) if buddy: self.recv_message(buddy, msg['data']) + +class ChatRequestHandler(object): + def __init__(self, parent): + self._parent = parent + + def message(self, message): + client_address = network.get_authinfo() + buddy = self._parent.find_buddy_by_address(client_address[0]) + if buddy: + self.recv_message(buddy, message) + class ChatActivity(activity.Activity): def __init__(self): activity.Activity.__init__(self) @@ -142,8 +184,10 @@ class ChatActivity(activity.Activity): serializer = richtext.RichTextSerializer() text = serializer.serialize(buf) - chat.send_message(text) - + (success, msg) = chat.send_message(text) + if not success: + chat.error_message(msg) + buf.set_text("") buf.place_cursor(buf.get_start_iter()) @@ -151,8 +195,15 @@ class ChatActivity(activity.Activity): def _start(self): self._buddy_list.start() + print "Starting announce." self._pannounce.register_service(self._realname, 6666, presence.OLPC_CHAT_SERVICE, name = self._nick, realname = self._realname) + print "Done announce." + + # Create the P2P chat XMLRPC server + self._p2p_req_handler = ChatRequestHandler(self) + self._p2p_server = network.GlibXMLRPCServer(("", 6666)) + self._p2p_server.register_instance(self._p2p_req_handler) def activity_on_connected_to_shell(self): print "act %d: in activity_on_connected_to_shell" % self.activity_get_id() @@ -202,12 +253,11 @@ class ChatActivity(activity.Activity): chat = self._group_chat else: chat = buddy.chat() + if not chat: + chat = BuddyChat(self, buddy, self._chat_view, self._chat_label) + buddy.set_chat(chat) - if chat: - chat.activate() - else: - # start a new chat with them - pass + chat.activate() def _on_buddy_presence_event(self, action, buddy): if action == BuddyList.ACTION_BUDDY_ADDED: diff --git a/chat/network.py b/chat/network.py index 1f2c904a..dc80a943 100644 --- a/chat/network.py +++ b/chat/network.py @@ -6,6 +6,75 @@ import traceback import select import time import gobject +import SimpleXMLRPCServer +import SocketServer + + +__authinfos = {} + +def _add_authinfo(authinfo): + __authinfos[threading.currentThread()] = authinfo + +def get_authinfo(): + return __authinfos.get(threading.currentThread()) + +def _del_authinfo(): + del __authinfos[threading.currentThread()] + + +class GlibTCPServer(SocketServer.TCPServer): + """GlibTCPServer + + Integrate socket accept into glib mainloop. + """ + + allow_reuse_address = True + request_queue_size = 20 + + def __init__(self, server_address, RequestHandlerClass): + SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass) + self.socket.setblocking(0) # Set nonblocking + + # Watch the listener socket for data + gobject.io_add_watch(self.socket, gobject.IO_IN, self._handle_accept) + + def _handle_accept(self, source, condition): + if not (condition & gobject.IO_IN): + return True + self.handle_request() + return True + +class GlibXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): + """ GlibXMLRPCRequestHandler + + The stock SimpleXMLRPCRequestHandler and server don't allow any way to pass + the client's address and/or SSL certificate into the function that actually + _processes_ the request. So we have to store it in a thread-indexed dict. + """ + + def do_POST(self): + _add_authinfo(self.client_address) + try: + SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self) + except socket.timeout: + pass + except socket.error, e: + print "Error (%s): socket error - '%s'" % (self.client_address, e) + _del_authinfo() + +class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher): + """GlibXMLRPCServer + + Use nonblocking sockets and handle the accept via glib rather than + blocking on accept(). + """ + + def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=1): + self.logRequests = logRequests + + SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self) + GlibTCPServer.__init__(self, addr, requestHandler) + class GroupChatController(object): @@ -50,7 +119,7 @@ class GroupChatController(object): def _handle_incoming_data(self, source, condition): if not (condition & gobject.IO_IN): - return + return True msg = {} msg['data'], (msg['addr'], msg['port']) = source.recvfrom(self._MAX_MSG_SIZE) if self._data_cb: