sugar-toolkit-gtk3/sugar/p2p/Stream.py

144 lines
4.5 KiB
Python
Raw Normal View History

import xmlrpclib
import socket
2006-05-22 04:21:42 +02:00
import traceback
2006-06-13 00:31:26 +02:00
import random
import logging
2006-05-22 08:11:39 +02:00
import network
from MostlyReliablePipe import MostlyReliablePipe
2006-06-13 00:31:26 +02:00
from sugar.presence import Service
def is_multicast_address(address):
    """Simple numerical check for whether an IPv4 address string is in
    the multicast range (first octet 224 through 239) or not.

    Only the first octet is examined; the remaining octets are not
    validated.  Every multicast first octet has exactly three digits, so
    the first dot is required to sit at index 3.

    Returns True for a multicast address and False for anything else,
    including None, the empty string, strings that are too short and
    strings whose first three characters are not a number.
    """
    if not address:
        return False
    # Slice rather than index so that inputs shorter than four characters
    # compare unequal instead of raising IndexError.
    if address[3:4] != '.':
        return False
    try:
        first_octet = int(address[:3])
    except ValueError:
        # Non-numeric prefix such as "abc." or "inf.": not an address.
        return False
    return 224 <= first_octet <= 239
class Stream(object):
    """Base class for a data stream attached to a service.

    The constructor caches the service's port and address.  Incoming data
    is delivered to the single listener registered with
    set_data_listener().  Use new_from_service() to obtain the concrete
    stream type for a given service.
    """

    def __init__(self, service):
        """Remember 'service' and cache its port and address.

        The service port is used for both the reader and the writer port.
        Raises ValueError if the service reports no port.
        """
        port = service.get_port()
        if not port:
            # The check is on the port, so the message names the port.
            raise ValueError("service must have a port.")
        self._service = service
        self._reader_port = port
        self._writer_port = port
        self._address = service.get_address()
        self._callback = None

    @staticmethod
    def new_from_service(service, start_reader=True):
        """Create the stream type that matches the service's address.

        A multicast address yields a MulticastStream; any other address
        yields a UnicastStream, to which 'start_reader' is passed on.
        """
        if is_multicast_address(service.get_address()):
            return MulticastStream(service)
        return UnicastStream(service, start_reader)

    def set_data_listener(self, callback):
        """Set the callable invoked as callback(address, data) whenever
        data is received.  A later call replaces the earlier listener."""
        self._callback = callback

    def _recv(self, address, data):
        """Hand received data to the listener; do nothing if none is set."""
        if self._callback:
            self._callback(address, data)
2006-05-23 06:15:14 +02:00
class UnicastStreamWriter(object):
    """Writer that sends data to one remote service over XML-RPC.

    The remote endpoint is addressed as http://<address>:<port>, taken
    from the service passed to the constructor.
    """

    def __init__(self, stream, service):
        """Set up the XML-RPC proxy used for writing to 'service'.

        'stream' is accepted but not used by this class.
        Raises ValueError if the service reports no address.
        """
        address = service.get_address()
        if not address:
            raise ValueError("service must have a valid address.")
        self._service = service
        self._address = address
        self._port = service.get_port()
        self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
        self._writer = network.GlibServerProxy(self._xmlrpc_addr)

    def _call(self, method_name, request_cb, user_data, *args):
        """Invoke 'method_name' on the proxy with the given arguments.

        Returns True if the call was issued, or False (after printing the
        traceback) on a socket error, XML-RPC fault or protocol error.
        Any other exception propagates to the caller.
        """
        try:
            method = getattr(self._writer, method_name)
            method(request_cb, user_data, *args)
        except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
            traceback.print_exc()
            return False
        return True

    def write(self, xmlrpc_data):
        """Write some data to the default endpoint of this pipe on the
        remote server.  Returns True on success, False on failure."""
        # "message" is the default endpoint; no callback or user data.
        return self._call("message", None, None, xmlrpc_data)

    def custom_request(self, method_name, request_cb, user_data, *args):
        """Call a custom XML-RPC method on the remote server.

        'request_cb' and 'user_data' are passed to the proxy method ahead
        of 'args'.  Returns True on success, False on failure.
        """
        return self._call(method_name, request_cb, user_data, *args)
2006-05-22 08:11:39 +02:00
class UnicastStream(Stream):
    """Stream whose reader is an XML-RPC server and whose writers are
    UnicastStreamWriter objects."""

    def __init__(self, service, start_reader=True):
        """Initializes the stream.  If the 'start_reader' argument is True,
        the stream will initialize and start a new stream reader; if it
        is False, no reader will be created and the caller must call the
        start_reader() method to start the stream reader and be able to
        receive any data from the stream."""
        Stream.__init__(self, service)
        if start_reader:
            self.start_reader()

    def start_reader(self):
        """Start the stream's reader, which for UnicastStream objects is
        an XMLRPC server created on ("", reader port) with the built-in
        "message" handler registered.

        NOTE: no alternative port is tried here; whatever error the server
        constructor raises (for example on a port conflict) propagates to
        the caller.

        Returns the port number used for the reader."""
        # Set up the reader
        self._reader = network.GlibXMLRPCServer(("", self._reader_port))
        self._reader.register_function(self._message, "message")
        return self._reader_port

    def _message(self, message):
        """Called by the XMLRPC server when network data arrives.

        Forwards the message to the data listener together with the value
        of network.get_authinfo() (presumably the sender's identity --
        confirm against the network module).  Always returns True.
        """
        address = network.get_authinfo()
        self._recv(address, message)
        return True

    def register_reader_handler(self, handler, name):
        """Register a custom message handler with the reader.  This call
        adds a custom XMLRPC method call with the name 'name' to the
        reader's XMLRPC server, which then calls the 'handler' argument
        back when a method call for it arrives over the network.

        The reader must already have been started.  Raises ValueError if
        'name' is the reserved handler name "message"."""
        if name == "message":
            raise ValueError("Handler name 'message' is a reserved handler.")
        self._reader.register_function(handler, name)

    def new_writer(self, service):
        """Return a new stream writer object for 'service'."""
        return UnicastStreamWriter(self, service)
class MulticastStream(Stream):
    """Stream carried over a MostlyReliablePipe on the service's address
    and port.  The stream object acts as its own writer."""

    def __init__(self, service):
        """Initialize the stream and immediately start its pipe.

        Raises ValueError if the service has no port or no address.
        """
        # Stream.__init__ already stores the service on self._service.
        Stream.__init__(self, service)
        self._internal_start_reader()

    def start_reader(self):
        """Return the reader port.  The reader is already started by the
        constructor, so nothing else happens here."""
        return self._reader_port

    def _internal_start_reader(self):
        """Create and start the pipe used for both reading and writing.

        Raises ValueError if the service reports no address.
        """
        # Validate before logging so a bad service produces no misleading
        # "Start multicast stream" message.
        if not self._service.get_address():
            raise ValueError("service must have a valid address.")
        logging.debug('Start multicast stream, address %s, port %d',
                      self._address, self._reader_port)
        self._pipe = MostlyReliablePipe('', self._address, self._reader_port,
                                        self._recv_data_cb)
        self._pipe.start()

    def write(self, data):
        """Send 'data' through the pipe."""
        self._pipe.send(data)

    def _recv_data_cb(self, address, data, user_data=None):
        """Pipe callback: forward received data to the data listener.

        Only the first element of 'address' is passed on; 'user_data' is
        ignored.
        """
        self._recv(address[0], data)

    def new_writer(self, service=None):
        """Return this stream itself as the writer; 'service' is ignored."""
        return self