161 lines
5.9 KiB
Python
161 lines
5.9 KiB
Python
# Copyright (C) 2006, Red Hat, Inc.
|
|
#
|
|
# This library is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 2 of the License, or (at your option) any later version.
|
|
#
|
|
# This library is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this library; if not, write to the
|
|
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
|
|
# Boston, MA 02111-1307, USA.
|
|
|
|
import xmlrpclib
|
|
import socket
|
|
import traceback
|
|
import random
|
|
import logging
|
|
|
|
import network
|
|
from MostlyReliablePipe import MostlyReliablePipe
|
|
from sugar.presence import Service
|
|
|
|
def is_multicast_address(address):
|
|
"""Simple numerical check for whether an IP4 address
|
|
is in the range for multicast addresses or not."""
|
|
if not address:
|
|
return False
|
|
if address[3] != '.':
|
|
return False
|
|
first = int(float(address[:3]))
|
|
if first >= 224 and first <= 239:
|
|
return True
|
|
return False
|
|
|
|
class Stream(object):
|
|
def __init__(self, service):
|
|
if not service.get_port():
|
|
raise ValueError("service must have an address.")
|
|
self._service = service
|
|
self._reader_port = self._service.get_port()
|
|
self._writer_port = self._reader_port
|
|
self._address = self._service.get_address()
|
|
self._callback = None
|
|
|
|
def new_from_service(service, start_reader=True):
|
|
if is_multicast_address(service.get_address()):
|
|
return MulticastStream(service)
|
|
else:
|
|
return UnicastStream(service, start_reader)
|
|
new_from_service = staticmethod(new_from_service)
|
|
|
|
def set_data_listener(self, callback):
|
|
self._callback = callback
|
|
|
|
def _recv(self, address, data):
|
|
if self._callback:
|
|
self._callback(address, data)
|
|
|
|
|
|
class UnicastStreamWriter(object):
|
|
def __init__(self, stream, service):
|
|
# set up the writer
|
|
self._service = service
|
|
if not service.get_address():
|
|
raise ValueError("service must have a valid address.")
|
|
self._address = self._service.get_address()
|
|
self._port = self._service.get_port()
|
|
self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
|
|
self._writer = network.GlibServerProxy(self._xmlrpc_addr)
|
|
|
|
def write(self, xmlrpc_data):
|
|
"""Write some data to the default endpoint of this pipe on the remote server."""
|
|
try:
|
|
self._writer.message(None, None, xmlrpc_data)
|
|
return True
|
|
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
|
|
traceback.print_exc()
|
|
return False
|
|
|
|
def custom_request(self, method_name, request_cb, user_data, *args):
|
|
"""Call a custom XML-RPC method on the remote server."""
|
|
try:
|
|
method = getattr(self._writer, method_name)
|
|
method(request_cb, user_data, *args)
|
|
return True
|
|
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
|
|
traceback.print_exc()
|
|
return False
|
|
|
|
|
|
class UnicastStream(Stream):
|
|
def __init__(self, service, start_reader=True):
|
|
"""Initializes the stream. If the 'start_reader' argument is True,
|
|
the stream will initialize and start a new stream reader, if it
|
|
is False, no reader will be created and the caller must call the
|
|
start_reader() method to start the stream reader and be able to
|
|
receive any data from the stream."""
|
|
Stream.__init__(self, service)
|
|
if start_reader:
|
|
self.start_reader()
|
|
|
|
def start_reader(self):
|
|
"""Start the stream's reader, which for UnicastStream objects is
|
|
and XMLRPC server. If there's a port conflict with some other
|
|
service, the reader will try to find another port to use instead.
|
|
Returns the port number used for the reader."""
|
|
# Set up the reader
|
|
self._reader = network.GlibXMLRPCServer(("", self._reader_port))
|
|
self._reader.register_function(self._message, "message")
|
|
|
|
def _message(self, message):
|
|
"""Called by the XMLRPC server when network data arrives."""
|
|
address = network.get_authinfo()
|
|
self._recv(address, message)
|
|
return True
|
|
|
|
def register_reader_handler(self, handler, name):
|
|
"""Register a custom message handler with the reader. This call
|
|
adds a custom XMLRPC method call with the name 'name' to the reader's
|
|
XMLRPC server, which then calls the 'handler' argument back when
|
|
a method call for it arrives over the network."""
|
|
if name == "message":
|
|
raise ValueError("Handler name 'message' is a reserved handler.")
|
|
self._reader.register_function(handler, name)
|
|
|
|
def new_writer(self, service):
|
|
"""Return a new stream writer object."""
|
|
return UnicastStreamWriter(self, service)
|
|
|
|
|
|
class MulticastStream(Stream):
|
|
def __init__(self, service):
|
|
Stream.__init__(self, service)
|
|
self._service = service
|
|
self._internal_start_reader()
|
|
|
|
def start_reader(self):
|
|
return self._reader_port
|
|
|
|
def _internal_start_reader(self):
|
|
logging.debug('Start multicast stream, address %s, port %d' % (self._address, self._reader_port))
|
|
if not self._service.get_address():
|
|
raise ValueError("service must have a valid address.")
|
|
self._pipe = MostlyReliablePipe('', self._address, self._reader_port,
|
|
self._recv_data_cb)
|
|
self._pipe.start()
|
|
|
|
def write(self, data):
|
|
self._pipe.send(data)
|
|
|
|
def _recv_data_cb(self, address, data, user_data=None):
|
|
self._recv(address[0], data)
|
|
|
|
def new_writer(self, service=None):
|
|
return self
|