Replace StreamReader and StreamWriter with a unified Stream class. Since the

This commit is contained in:
Dan Williams 2006-05-19 14:50:20 -04:00
parent 8cfc17ff55
commit 9a1324d0b0
5 changed files with 124 additions and 115 deletions

View File

@ -15,8 +15,7 @@ from sugar.shell import activity
from sugar.p2p.Group import Group
from sugar.p2p.Group import LocalGroup
from sugar.p2p.Service import Service
from sugar.p2p.StreamReader import StreamReader
from sugar.p2p.StreamWriter import StreamWriter
from sugar.p2p.Stream import Stream
from sugar.session.LogWriter import LogWriter
import sugar.env
@ -177,10 +176,7 @@ class BuddyChat(Chat):
Chat.__init__(self, controller)
def _start(self):
group = self._controller.get_group()
buddy_name = self._buddy.get_service_name()
service = group.get_service(buddy_name, CHAT_SERVICE_TYPE)
self._stream_writer = StreamWriter(group, service)
self._stream_writer = self._controller.new_buddy_writer(self._buddy.get_service_name())
def activity_on_connected_to_shell(self):
Chat.activity_on_connected_to_shell(self)
@ -220,26 +216,33 @@ class GroupChat(Chat):
def get_group(self):
return self._group
def new_buddy_writer(self, buddy_name):
service = self._group.get_service(buddy_name, CHAT_SERVICE_TYPE)
return self._buddy_stream.new_writer(service)
def _start(self):
self._group = LocalGroup()
self._group.add_presence_listener(self._on_group_event)
self._group.join()
name = self._group.get_owner().get_service_name()
service = Service(name, CHAT_SERVICE_TYPE, CHAT_SERVICE_PORT)
self._buddy_reader = StreamReader(self._group, service)
self._buddy_reader.set_listener(self._buddy_recv_message)
service.register(self._group)
service = Service(name, GROUP_CHAT_SERVICE_TYPE,
# Group controls the Stream for incoming messages for
# specific buddy chats
buddy_service = Service(name, CHAT_SERVICE_TYPE, CHAT_SERVICE_PORT)
self._buddy_stream = Stream.new_from_service(buddy_service, self._group)
self._buddy_stream.set_data_listener(self._buddy_recv_message)
buddy_service.register(self._group)
# Group chat Stream
group_service = Service(name, GROUP_CHAT_SERVICE_TYPE,
GROUP_CHAT_SERVICE_PORT,
GROUP_CHAT_SERVICE_ADDRESS)
self._group.add_service(service)
self._group.add_service(group_service)
self._buddy_reader = StreamReader(self._group, service)
self._buddy_reader.set_listener(self.recv_message)
self._stream_writer = StreamWriter(self._group, service)
self._group_stream = Stream.new_from_service(group_service, self._group)
self._group_stream.set_data_listener(self.recv_message)
self._stream_writer = self._group_stream.new_writer()
def _create_sidebar(self):
vbox = gtk.VBox(False, 6)
@ -404,4 +407,7 @@ if len(sys.argv) > 1 and sys.argv[1] == "--console":
sys.stderr = LogWriter("Chat")
ChatShell.get_instance().open_group_chat()
gtk.main()
try:
gtk.main()
except KeyboardInterrupt:
pass

View File

@ -8,7 +8,7 @@ sugar_PYTHON = \
NotificationListener.py \
Notifier.py \
Service.py \
StreamReader.py \
StreamWriter.py \
Stream.py \
MostlyReliablePipe.py \
network.py \
presence.py

99
sugar/p2p/Stream.py Normal file
View File

@ -0,0 +1,99 @@
import xmlrpclib
import socket
import network
from MostlyReliablePipe import MostlyReliablePipe
class Stream(object):
def __init__(self, service, group):
self._service = service
self._group = group
self._owner_nick_name = self._group.get_owner().get_nick_name()
self._port = self._service.get_port()
self._address = self._service.get_address()
self._callback = None
def new_from_service(service, group):
if service.is_multicast():
return MulticastStream(service, group)
else:
return UnicastStream(service, group)
new_from_service = staticmethod(new_from_service)
def set_data_listener(self, callback):
self._callback = callback
def recv(self, nick_name, data):
if nick_name != self._owner_nick_name:
if self._callback:
self._callback(self._group.get_buddy(nick_name), data)
class UnicastStreamWriter(object):
def __init__(self, stream, service, owner_nick_name):
# set up the writer
self._service = service
self._owner_nick_name = owner_nick_name
self._address = self._service.get_address()
self._port = self._service.get_port()
self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
self._writer = xmlrpclib.ServerProxy(self._xmlrpc_addr)
def write(self, data):
try:
self._writer.message(self._owner_nick_name, data)
return True
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
traceback.print_exc()
return False
class UnicastStream(Stream):
def __init__(self, service, group):
Stream.__init__(self, service, group)
self._setup()
def _setup(self):
# Set up the reader
started = False
tries = 10
port = self._service.get_port()
self._reader = None
while not started and tries > 0:
try:
self._reader = network.GlibXMLRPCServer(("", port))
self._reader.register_instance(self)
started = True
except(socket.error):
port = port + 1
tries = tries - 1
self._service.set_port(port)
def message(self, nick_name, message):
"""Called by the XMLRPC server when network data arrives."""
self.recv(nick_name, message)
return True
def new_writer(self, service):
return UnicastStreamWriter(self, service, self._owner_nick_name)
class MulticastStream(Stream):
def __init__(self, service, group):
Stream.__init__(self, service, group)
self._address = self._service.get_group_address()
self._setup()
def _setup(self):
self._pipe = MostlyReliablePipe('', self._address, self._port, self._recv_data_cb)
self._pipe.start()
def write(self, data):
self._pipe.send(self._owner_nick_name + " |**| " + data)
def _recv_data_cb(self, addr, data, user_data=None):
[ nick_name, data ] = data.split(" |**| ", 2)
self.recv(nick_name, data)
def new_writer(self, service=None):
return self

View File

@ -1,52 +0,0 @@
import socket
import network
class StreamReaderRequestHandler(object):
def __init__(self, reader):
self._reader = reader
def message(self, nick_name, message):
self._reader.recv(nick_name, message)
return True
class StreamReader:
def __init__(self, group, service):
self._group = group
self._service = service
if self._service.is_multicast():
self._setup_multicast()
else:
self._setup_unicast()
def set_listener(self, callback):
self._callback = callback
def _setup_multicast(self):
address = self._service.get_group_address()
port = self._service.get_port()
server = network.GroupServer(address, port, self._recv_multicast)
server.start()
def _setup_unicast(self):
started = False
tries = 10
port = self._service.get_port()
while not started and tries > 0:
try:
p2p_server = network.GlibXMLRPCServer(("", port))
p2p_server.register_instance(StreamReaderRequestHandler(self))
started = True
except(socket.error):
port = port + 1
tries = tries - 1
self._service.set_port(port)
def _recv_multicast(self, msg):
[ nick_name, data ] = msg['data'].split(" |**| ", 2)
self.recv(nick_name, data)
def recv(self, nick_name, data):
if nick_name != self._group.get_owner().get_nick_name():
self._callback(self._group.get_buddy(nick_name), data)

View File

@ -1,44 +0,0 @@
import xmlrpclib
import traceback
import socket
import network
class StreamWriter:
def __init__(self, group, service):
self._group = group
self._service = service
self._address = self._service.get_address()
self._port = self._service.get_port()
self._group_address = self._service.get_group_address()
if self._service.is_multicast():
self._setup_multicast()
else:
self._setup_unicast()
def write(self, data):
if self._service.is_multicast():
self._multicast_write(data)
else:
self._unicast_write(data)
def _setup_unicast(self):
xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
self._uclient = xmlrpclib.ServerProxy(xmlrpc_addr)
def _unicast_write(self, data):
try:
nick_name = self._group.get_owner().get_nick_name()
self._uclient.message(nick_name, data)
return True
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
traceback.print_exc()
return False
def _setup_multicast(self):
self._mclient = network.GroupClient(self._group_address, self._port)
def _multicast_write(self, data):
nick_name = self._group.get_owner().get_nick_name()
self._mclient.send_msg(nick_name + " |**| " + data)