Use asynchronous service resolution to capture service updates too

This commit is contained in:
Dan Williams 2006-09-19 13:54:32 -04:00
parent b6897cf1c5
commit d0f23744f0
4 changed files with 196 additions and 95 deletions

View File

@ -29,6 +29,11 @@ class ServiceAdv(object):
raise ValueError("local must be a boolean.")
self._local = local
self._state = _SA_UNRESOLVED
self._resolver = None
def __del__(self):
if self._resolver:
del self._resolver
def interface(self):
return self._interface
@ -47,7 +52,14 @@ class ServiceAdv(object):
def set_service(self, service):
if not isinstance(service, Service.Service):
raise ValueError("must be a valid service.")
self._service = service
if service != self._service:
self._service = service
def resolver(self):
return self._resolver
def set_resolver(self, resolver):
if not isinstance(resolver, dbus.Interface):
raise ValueError("must be a valid dbus object")
self._resolver = resolver
def state(self):
return self._state
def set_state(self, state):
@ -452,12 +464,16 @@ class PresenceService(object):
self._dbus_helper.ActivityDisappeared(activity.object_path())
del self._activities[actid]
def _resolve_service_reply_cb(self, adv, interface, protocol, full_name,
stype, domain, host, aprotocol, address, port, txt, flags):
def _service_resolved_cb(self, adv, interface, protocol, full_name,
stype, domain, host, aprotocol, address, port, txt, flags,
updated):
"""When the service discovery finally gets here, we've got enough
information about the service to assign it to a buddy."""
logging.debug("Resolved service '%s' type '%s' domain '%s' to " \
" %s:%s" % (full_name, stype, domain, address, port))
tag = "Resolved"
if updated:
tag = "Updated"
logging.debug("%s service '%s' type '%s' domain '%s' to " \
" %s:%s" % (tag, full_name, stype, domain, address, port))
if not adv in self._service_advs:
return False
@ -465,25 +481,33 @@ class PresenceService(object):
return False
# See if we know about this service already
service = None
key = (full_name, stype)
props = _txt_to_dict(txt)
if not self._services.has_key(key):
objid = self._get_next_object_id()
props = _txt_to_dict(txt)
service = Service.Service(self._bus_name, objid, name=full_name,
stype=stype, domain=domain, address=address, port=port,
properties=props, source_address=address, local=adv.is_local())
properties=props, source_address=address)
self._services[key] = service
else:
# Already tracking this service; likely we were the one that shared it
# in the first place, and therefore the source address would not have
# been set yet
# Already tracking this service; either:
# a) we were the one that shared it in the first place,
# and therefore the source address would not have
# been set yet
# b) the service has been updated
service = self._services[key]
if not service.get_source_address():
service.set_source_address(address)
if not service.get_address():
service.set_address(address)
adv.set_service(service)
if service and updated:
service.set_properties(props, from_network=True)
return False
# Merge the service into our buddy and activity lists, if needed
buddy = self._handle_new_service_for_buddy(service, adv.is_local())
if buddy and service.get_activity_id():
@ -491,24 +515,34 @@ class PresenceService(object):
return False
def _resolve_service_reply_cb_glue(self, adv, interface, protocol, name,
def _service_resolved_cb_glue(self, adv, interface, protocol, name,
stype, domain, host, aprotocol, address, port, txt, flags):
adv.set_state(_SA_RESOLVED)
gobject.idle_add(self._resolve_service_reply_cb, adv, interface,
protocol, name, stype, domain, host, aprotocol, address,
port, txt, flags)
# Avahi doesn't flag updates to existing services, so we have
# to determine that here
updated = False
if adv.state() == _SA_RESOLVED:
updated = True
def _resolve_service_error_handler(self, adv, err):
adv.set_state(_SA_RESOLVED)
gobject.idle_add(self._service_resolved_cb, adv, interface,
protocol, name, stype, domain, host, aprotocol, address,
port, txt, flags, updated)
def _service_resolved_failure_cb(self, adv, err):
adv.set_state(_SA_UNRESOLVED)
logging.error("Error resolving service %s.%s: %s" % (adv.name(), adv.stype(), err))
def _resolve_service(self, adv):
"""Resolve and lookup a ZeroConf service to obtain its address and TXT records."""
# Ask avahi to resolve this particular service
self._mdns_service.ResolveService(int(adv.interface()), int(adv.protocol()), adv.name(),
adv.stype(), adv.domain(), avahi.PROTO_UNSPEC, dbus.UInt32(0),
reply_handler=lambda *args: self._resolve_service_reply_cb_glue(adv, *args),
error_handler=lambda *args: self._resolve_service_error_handler(adv, *args))
path = self._mdns_service.ServiceResolverNew(dbus.Int32(adv.interface()),
dbus.Int32(adv.protocol()), adv.name(), adv.stype(), adv.domain(),
avahi.PROTO_UNSPEC, dbus.UInt32(0))
resolver = dbus.Interface(self._system_bus.get_object(avahi.DBUS_NAME, path),
avahi.DBUS_INTERFACE_SERVICE_RESOLVER)
resolver.connect_to_signal('Found', lambda *args: self._service_resolved_cb_glue(adv, *args))
resolver.connect_to_signal('Failure', lambda *args: self._service_resolved_failure_cb(adv, *args))
adv.set_resolver(resolver)
return False
def _service_appeared_cb(self, interface, protocol, full_name, stype, domain, flags):
@ -679,6 +713,11 @@ class PresenceService(object):
def register_service(self, name, stype, properties={}, address=None,
port=-1, domain=u"local", sender=None):
"""Register a new service, advertising it to other Buddies on the network."""
# Refuse to register if we can't get the dbus connection this request
# came from for some reason
if not sender:
raise RuntimeError("Service registration request must have a sender.")
(actid, person_name) = Service.decompose_service_name(name)
if self.get_owner() and person_name != self.get_owner().get_name():
raise RuntimeError("Tried to register a service that didn't have" \
@ -688,49 +727,18 @@ class PresenceService(object):
if not port or port == -1:
port = random.randint(4000, 65000)
try:
obj = self._system_bus.get_object(avahi.DBUS_NAME, self._mdns_service.EntryGroupNew())
group = dbus.Interface(obj, avahi.DBUS_INTERFACE_ENTRY_GROUP)
# Add properties; ensure they are converted to ByteArray types
# because python sometimes can't figure that out
info = dbus.Array([], signature="aay")
for k, v in properties.items():
info.append(dbus.types.ByteArray("%s=%s" % (k, v)))
objid = self._get_next_object_id()
service = Service.Service(self._bus_name, objid, name=name,
stype=stype, domain=domain, address=address, port=port,
properties=properties, source_address=None, local=True,
local_publisher=sender)
self._services[(name, stype)] = service
port = service.get_port()
logging.debug("Will register service with name='%s', stype='%s'," \
" domain='%s', address='%s', port=%d, info='%s'" % (name, stype,
domain, address, port, info))
group.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, 0, dbus.String(name),
dbus.String(stype), dbus.String(domain), dbus.String(""), # let Avahi figure the 'host' out
dbus.UInt16(port), info)
service.set_avahi_entry_group(group)
group.Commit()
except dbus.exceptions.DBusException, exc:
# FIXME: ignore local name collisions, since that means
# the zeroconf service is already registered. Ideally we
# should un-register it an re-register with the correct info
if str(exc) == "Local name collision":
pass
objid = self._get_next_object_id()
service = Service.Service(self._bus_name, objid, name=name,
stype=stype, domain=domain, address=address, port=port,
properties=properties, source_address=None,
local_publisher=sender)
self._services[(name, stype)] = service
self.register_service_type(stype)
service.register(self._system_bus, self._mdns_service)
return service
def unregister_service(self, service, sender=None):
local_publisher = service.get_local_publisher()
if sender is not None and local_publisher != sender:
raise ValueError("Service was not registered by requesting process!")
group = service.get_avahi_entry_group()
if not group:
raise ValueError("Service was not a local service provided by this laptop!")
group.Free()
service.unregister(sender)
def register_service_type(self, stype):
"""Requests that the Presence service look for and recognize

View File

@ -4,6 +4,7 @@ sys.path.insert(0, os.path.abspath("../../"))
from sugar import util
import dbus, dbus.service
import random
import logging
def compose_service_name(name, activity_id):
if type(name) == type(""):
@ -37,6 +38,31 @@ def decompose_service_name(name):
return (None, name)
return (activity_id, name[:start - 2])
def _one_dict_differs(dict1, dict2):
diff_keys = []
for key, value in dict1.items():
if not dict2.has_key(key) or dict2[key] != value:
diff_keys.append(key)
return diff_keys
def _dicts_differ(dict1, dict2):
diff_keys = []
diff1 = _one_dict_differs(dict1, dict2)
diff2 = _one_dict_differs(dict2, dict1)
for key in diff2:
if key not in diff1:
diff_keys.append(key)
diff_keys += diff1
return diff_keys
def _convert_properties_to_dbus_byte_array(props):
# Ensure properties are converted to ByteArray types
# because python sometimes can't figure that out
info = dbus.Array([], signature="aay")
for k, v in props.items():
info.append(dbus.types.ByteArray("%s=%s" % (k, v)))
return info
_ACTIVITY_ID_TAG = "ActivityID"
SERVICE_DBUS_INTERFACE = "org.laptop.Presence.Service"
@ -104,15 +130,18 @@ class ServiceDBusHelper(dbus.service.Object):
@dbus.service.method(SERVICE_DBUS_INTERFACE,
in_signature="a{sv}", sender_keyword="sender")
def setPublishedValues(self, values, sender):
if not self._parent.is_local():
raise ValueError("Service was not not registered by requesting process!")
self._parent.set_properties(values, sender)
class Service(object):
"""Encapsulates information about a specific ZeroConf/mDNS
service as advertised on the network."""
def __init__(self, bus_name, object_id, name, stype, domain=u"local",
address=None, port=-1, properties=None, source_address=None,
local=False, local_publisher=None):
local_publisher=None):
if not bus_name:
raise ValueError("DBus bus name must be valid")
if not object_id or type(object_id) != type(1):
@ -136,6 +165,12 @@ class Service(object):
if domain and domain != "local":
raise ValueError("must use the 'local' domain (for now).")
# ID of the D-Bus connection that published this service, if any.
# We only let the local publisher modify the service.
self._local_publisher = local_publisher
self._avahi_entry_group = None
(actid, real_name) = decompose_service_name(name)
self._name = real_name
self._full_name = name
@ -143,10 +178,9 @@ class Service(object):
self._domain = domain
self._port = -1
self.set_port(port)
self._properties = {}
self._internal_set_properties(properties, first_time=True)
self._avahi_entry_group = None
self._local = local
self._properties = None
self._dbus_helper = None
self._internal_set_properties(properties)
# Source address is the unicast source IP
self._source_address = None
@ -175,10 +209,6 @@ class Service(object):
self._owner = None
# ID of the D-Bus connection that published this service, if any.
# We only let the local publisher modify the service.
self._local_publisher = local_publisher
# register ourselves with dbus
self._object_id = object_id
self._object_path = SERVICE_DBUS_OBJECT_PATH + str(self._object_id)
@ -195,11 +225,10 @@ class Service(object):
raise RuntimeError("Can only set a service's owner once")
self._owner = owner
def get_local_publisher(self):
return self._local_publisher
def is_local(self):
return self._local
if self._local_publisher is not None:
return True
return False
def get_name(self):
"""Return the service's name, usually that of the
@ -225,12 +254,14 @@ class Service(object):
def set_property(self, key, value, sender=None):
"""Set one service property"""
if not self._local_publisher:
raise ValueError("Service was not not registered by requesting process!")
if sender is not None and self._local_publisher != sender:
raise ValueError("Service was not not registered by requesting process!")
if type(key) != type(u""):
raise ValueError("Key must be a unicode string.")
if type(value) != type(u"") or type(value) != type(True):
if type(value) != type(u"") and type(value) != type(True):
raise ValueError("Key must be a unicode string or a boolean.")
# Ignore setting the key to it's current value
@ -254,15 +285,22 @@ class Service(object):
if type(value) == type(True):
value = ""
self._properties[key] = value
self._dbus_helper.PublishedValueChanged(key)
def set_properties(self, properties, sender=None):
# if the service is locally published already, update the TXT records
if self._local_publisher and self._avahi_entry_group:
self.__internal_update_avahi_properties()
if self._dbus_helper:
self._dbus_helper.PublishedValueChanged([key])
def set_properties(self, properties, sender=None, from_network=False):
"""Set all service properties in one call"""
if sender is not None and self._local_publisher != sender:
raise ValueError("Service was not not registered by requesting process!")
self._internal_set_properties(properties)
def _internal_set_properties(self, properties, first_time=False):
self._internal_set_properties(properties, from_network)
def _internal_set_properties(self, properties, from_network=False):
"""Set the service's properties from either an Avahi
TXT record (a list of lists of integers), or a
python dictionary."""
@ -270,6 +308,11 @@ class Service(object):
raise ValueError("Properties must be a dictionary.")
self._properties = {}
# Make sure the properties are actually different
diff_keys = _dicts_differ(self._properties, properties)
if len(diff_keys) == 0:
return
# Set key/value pairs on internal property list
for key, value in properties.items():
if len(key) == 0:
@ -282,8 +325,19 @@ class Service(object):
tmp_val = unicode(tmp_val)
self._properties[tmp_key] = tmp_val
if not first_time:
self._dbus_helper.PublishedValueChanged(self._properties.keys())
# if the service is locally published already, update the TXT records
if self._local_publisher and self._avahi_entry_group and not from_network:
self.__internal_update_avahi_properties()
if self._dbus_helper:
self._dbus_helper.PublishedValueChanged(diff_keys)
def __internal_update_avahi_properties(self):
info = _convert_properties_to_dbus_byte_array(self._properties)
self._avahi_entry_group.UpdateServiceTxt(avahi.IF_UNSPEC,
avahi.PROTO_UNSPEC, 0,
dbus.String(self._full_name), dbus.String(self._stype),
dbus.String(self._domain), info)
def get_type(self):
"""Return the service's service type."""
@ -322,11 +376,42 @@ class Service(object):
"""Return the ZeroConf/mDNS domain the service was found in."""
return self._domain
def set_avahi_entry_group(self, group):
self._avahi_entry_group = group
def register(self, system_bus, avahi_service):
if self._avahi_entry_group is not None:
raise RuntimeError("Service already registered!")
obj = system_bus.get_object(avahi.DBUS_NAME, avahi_service.EntryGroupNew())
self._avahi_entry_group = dbus.Interface(obj, avahi.DBUS_INTERFACE_ENTRY_GROUP)
info = _convert_properties_to_dbus_byte_array(self._properties)
logging.debug("Will register service with name='%s', stype='%s'," \
" domain='%s', address='%s', port=%d, info='%s'" % (self._full_name,
self._stype, self._domain, self._address, self._port, info))
self._avahi_entry_group.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, 0,
dbus.String(self._full_name), dbus.String(self._stype),
dbus.String(self._domain), dbus.String(""), # let Avahi figure the 'host' out
dbus.UInt16(self._port), info)
self._avahi_entry_group.connect_to_signal('StateChanged', self.__entry_group_changed_cb)
self._avahi_entry_group.Commit()
def __entry_group_changed_cb(self, state, error):
logging.debug("** %s.%s Entry group changed: state %s, error %s" % (self._full_name, self._stype, state, error))
def unregister(self, sender):
# Refuse to unregister if we can't get the dbus connection this request
# came from for some reason
if not sender:
raise RuntimeError("Service registration request must have a sender.")
if not self._local_publisher:
raise ValueError("Service was not a local service provided by this laptop!")
if sender is not None and self._local_publisher != sender:
raise ValueError("Service was not registered by requesting process!")
if not self._avahi_entry_group:
raise ValueError("Service was not registered by requesting process!")
self._avahi_entry_group.Free()
del self._avahi_entry_group
self._avahi_entry_group = None
def get_avahi_entry_group(self):
return self._avahi_entry_group
#################################################################
# Tests

View File

@ -9,6 +9,7 @@ import logging
from sugar.p2p import Stream
from sugar.presence import PresenceService
from model.Invites import Invites
import dbus
PRESENCE_SERVICE_TYPE = "_presence_olpc._tcp"
@ -70,7 +71,9 @@ class ShellOwner(object):
def __update_advertised_current_activity_cb(self):
self._last_activity_update = time.time()
self._pending_activity_update_timer = None
logging.debug("*** Updating current activity to %s" % self._pending_activity_update)
if self._pending_activity_update:
logging.debug("*** Updating current activity to %s" % self._pending_activity_update)
self._service.set_published_value('curact', dbus.String(self._pending_activity_update))
return False
def set_current_activity(self, activity_id):

View File

@ -2,18 +2,22 @@ import gobject
import dbus
def __one_dict_differs(dict1, dict2):
def _one_dict_differs(dict1, dict2):
diff_keys = []
for key, value in dict1.items():
if not dict2.has_key(key) or dict2[key] != value:
return True
return False
diff_keys.append(key)
return diff_keys
def __dicts_differ(dict1, dict2):
if __one_dict_differs(dict1, dict2):
return True
if __one_dict_differs(dict2, dict1):
return True
return False
def _dicts_differ(dict1, dict2):
diff_keys = []
diff1 = _one_dict_differs(dict1, dict2)
diff2 = _one_dict_differs(dict2, dict1)
for key in diff2:
if key not in diff1:
diff_keys.append(key)
diff_keys += diff1
return diff_keys
class Service(gobject.GObject):
@ -64,8 +68,9 @@ class Service(gobject.GObject):
def __published_value_changed_cb(self, keys):
oldvals = self._pubvals
self.get_published_values()
if __dicts_differ(oldvals, self._pubvals):
self.emit('published-value-changed', keys)
diff_keys = _dicts_differ(oldvals, self._pubvals)
if len(diff_keys) > 0:
self.emit('published-value-changed', diff_keys)
def get_name(self):
return self._props['name']