Add test code for Sugar PresenceService bindings

master
Dan Williams 17 years ago
parent 6513fef6b2
commit 4ca4e5564e

@ -66,16 +66,17 @@ class PresenceService(gobject.GObject):
_PS_ACTIVITY_OP = DBUS_PATH + "/Activities/"
def __init__(self):
def __init__(self, allow_offline_iface=True):
"""Initialise the service and attempt to connect to events
"""
gobject.GObject.__init__(self)
self._objcache = {}
# attempt to load the interface to the service...
self._allow_offline_iface = allow_offline_iface
self._get_ps()
_bus_ = None
def _get_bus( self ):
def _get_bus(self):
"""Retrieve dbus session-bus or create new"""
if not self._bus_:
self._bus_ = dbus.SessionBus()
@ -85,7 +86,7 @@ class PresenceService(gobject.GObject):
"""DBUS SessionBus object for user-local communications"""
)
_ps_ = None
def _get_ps( self ):
def _get_ps(self):
"""Retrieve dbus interface to PresenceService
Also registers for updates from various dbus events on the
@ -110,7 +111,9 @@ class PresenceService(gobject.GObject):
"""Failure retrieving %r interface from the D-BUS service %r %r: %s""",
DBUS_INTERFACE, DBUS_SERVICE, DBUS_PATH, err
)
return _OfflineInterface()
if self._allow_offline_iface:
return _OfflineInterface()
raise RuntimeError("Failed to connect to the presence service.")
else:
self._ps_ = ps
ps.connect_to_signal('BuddyAppeared', self._buddy_appeared_cb)

@ -0,0 +1,278 @@
#!/usr/bin/env python
# Copyright (C) 2007, Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import gobject
import dbus, dbus.service, dbus.glib
class NotFoundError(dbus.DBusException):
def __init__(self):
dbus.DBusException.__init__(self)
self._dbus_error_name = _PRESENCE_INTERFACE + '.NotFound'
_ACTIVITY_PATH = "/org/laptop/Sugar/Presence/Activities/"
_ACTIVITY_INTERFACE = "org.laptop.Sugar.Presence.Activity"
class TestActivity(dbus.service.Object):
def __init__(self, bus_name, object_id, actid, name, color, atype):
self._actid = actid
self._name = name
self._color = color
self._type = atype
self._buddies = {}
self._object_id = object_id
self._object_path = _ACTIVITY_PATH + str(self._object_id)
dbus.service.Object.__init__(self, bus_name, self._object_path)
@dbus.service.signal(_ACTIVITY_INTERFACE, signature="o")
def BuddyJoined(self, buddy_path):
pass
@dbus.service.signal(_ACTIVITY_INTERFACE, signature="o")
def BuddyLeft(self, buddy_path):
pass
@dbus.service.signal(_ACTIVITY_INTERFACE, signature="o")
def NewChannel(self, channel_path):
pass
@dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="s")
def GetId(self):
return self._actid
@dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="s")
def GetColor(self):
return self._color
@dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="s")
def GetType(self):
return self._type
@dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="",
async_callbacks=('async_cb', 'async_err_cb'))
def Join(self, async_cb, async_err_cb):
pass
@dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="ao")
def GetJoinedBuddies(self):
return []
@dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="soao")
def GetChannels(self):
return None
@dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="s")
def GetName(self):
return self._name
_BUDDY_PATH = "/org/laptop/Sugar/Presence/Buddies/"
_BUDDY_INTERFACE = "org.laptop.Sugar.Presence.Buddy"
_OWNER_INTERFACE = "org.laptop.Sugar.Presence.Buddy.Owner"
_PROP_NICK = "nick"
_PROP_KEY = "key"
_PROP_ICON = "icon"
_PROP_CURACT = "current-activity"
_PROP_COLOR = "color"
_PROP_OWNER = "owner"
class TestBuddy(dbus.service.Object):
def __init__(self, bus_name, object_id, pubkey, nick, color):
self._key = pubkey
self._nick = nick
self._color = color
self._owner = False
self._curact = None
self._icon = ""
self._activities = {}
self._object_id = object_id
self._object_path = _BUDDY_PATH + str(self._object_id)
dbus.service.Object.__init__(self, bus_name, self._object_path)
@dbus.service.signal(_BUDDY_INTERFACE, signature="ay")
def IconChanged(self, icon_data):
pass
@dbus.service.signal(_BUDDY_INTERFACE, signature="o")
def JoinedActivity(self, activity_path):
pass
@dbus.service.signal(_BUDDY_INTERFACE, signature="o")
def LeftActivity(self, activity_path):
pass
@dbus.service.signal(_BUDDY_INTERFACE, signature="a{sv}")
def PropertyChanged(self, updated):
pass
# dbus methods
@dbus.service.method(_BUDDY_INTERFACE, in_signature="", out_signature="ay")
def GetIcon(self):
return dbus.ByteArray(self._icon)
@dbus.service.method(_BUDDY_INTERFACE, in_signature="", out_signature="ao")
def GetJoinedActivities(self):
acts = []
for key in self._activities.keys():
acts.append(dbus.ObjectPath(key))
return acts
@dbus.service.method(_BUDDY_INTERFACE, in_signature="", out_signature="a{sv}")
def GetProperties(self):
props = {}
props[_PROP_NICK] = self._nick
props[_PROP_OWNER] = self._owner
props[_PROP_KEY] = self._key
props[_PROP_COLOR] = self._color
if self._curact:
props[_PROP_CURACT] = self._curact
else:
props[_PROP_CURACT] = ""
return props
_OWNER_PUBKEY = "AAAAB3NzaC1kc3MAAACBAKEVDFJW9D9GK20QFYRKbhV7kpjnhKkkzudn34ij" \
"Ixje+x1ZXTIU6J1GFmJYrHq9uBRi72lOVAosGUop+HHZFRyTeYLxItmKfIoD" \
"S2rwyL9cGRoDsD4yjECMqa2I+pGxriw4OmHeu5vmBkk+5bXBdkLf0EfseuPC" \
"lT7FE+Fj4C6FAAAAFQCygOIpXXybKlVTcEfprOQp3Uud0QAAAIBjyjQhOWHq" \
"FdJlALmnriQR+Zi1i4N/UMjWihF245RXJuUU6DyYbx4QxznxRnYKx/ZvsD0O" \
"9+ihzmQd6eFwU/jQ6sxiL7DSlCJ3axgG9Yvbf7ELeXGo4/Z9keOVdei0sXz4" \
"VBvJC0c0laELsnU0spFC62qQKxNemTbXDGksauj19gAAAIEAmcvY8VX47pRP" \
"k7MjrDzZlPvvNQgHMNZSwHGIsF7EMGVDCYpbQTyR+cmtJBBFVyxtNbK7TWTZ" \
"K8uH1tm9GyMcViUdIT4xCirA0JanE597KdlBz39l/623wF4jvbnnHOZ/pIT9" \
"tPd1pCYJf+L7OEKCBUAyQhcq159X8A1toM48Soc="
_OWNER_PRIVKEY = "MIIBuwIBAAKBgQChFQxSVvQ/RittEBWESm4Ve5KY54SpJM7nZ9+IoyMY3vs" \
"dWV0yFOidRhZiWKx6vbgUYu9pTlQKLBlKKfhx2RUck3mC8SLZinyKA0tq8M" \
"i/XBkaA7A+MoxAjKmtiPqRsa4sODph3rub5gZJPuW1wXZC39BH7HrjwpU+x" \
"RPhY+AuhQIVALKA4ildfJsqVVNwR+ms5CndS53RAoGAY8o0ITlh6hXSZQC5" \
"p64kEfmYtYuDf1DI1ooRduOUVyblFOg8mG8eEMc58UZ2Csf2b7A9Dvfooc5" \
"kHenhcFP40OrMYi+w0pQid2sYBvWL23+xC3lxqOP2fZHjlXXotLF8+FQbyQ" \
"tHNJWhC7J1NLKRQutqkCsTXpk21wxpLGro9fYCgYEAmcvY8VX47pRPk7Mjr" \
"DzZlPvvNQgHMNZSwHGIsF7EMGVDCYpbQTyR+cmtJBBFVyxtNbK7TWTZK8uH" \
"1tm9GyMcViUdIT4xCirA0JanE597KdlBz39l/623wF4jvbnnHOZ/pIT9tPd" \
"1pCYJf+L7OEKCBUAyQhcq159X8A1toM48SocCFAvkZYCYtLhSDEPrlf0jLD" \
"jrMz+i"
_OWNER_NICK = "TestOwner"
_OWNER_COLOR = "#75C228,#308C30"
class TestOwner(TestBuddy):
def __init__(self, bus_name, object_id):
TestBuddy.__init__(self, bus_name, object_id, _OWNER_PUBKEY,
_OWNER_NICK, _OWNER_COLOR)
self._owner = True
_PRESENCE_SERVICE = "org.laptop.Sugar.Presence"
_PRESENCE_INTERFACE = "org.laptop.Sugar.Presence"
_PRESENCE_PATH = "/org/laptop/Sugar/Presence"
class TestPresenceService(dbus.service.Object):
"""A test D-Bus PresenceService used to exercise the Sugar PS bindings."""
def __init__(self):
self._next_object_id = 0
self._activities = {}
self._buddies = {}
self._bus_name = dbus.service.BusName(_PRESENCE_SERVICE,
bus=dbus.SessionBus())
objid = self._get_next_object_id()
self._owner = TestOwner(self._bus_name, objid)
dbus.service.Object.__init__(self, self._bus_name, _PRESENCE_PATH)
def _get_next_object_id(self):
"""Increment and return the object ID counter."""
self._next_object_id = self._next_object_id + 1
return self._next_object_id
@dbus.service.signal(_PRESENCE_INTERFACE, signature="o")
def ActivityAppeared(self, activity):
pass
@dbus.service.signal(_PRESENCE_INTERFACE, signature="o")
def ActivityDisappeared(self, activity):
pass
@dbus.service.signal(_PRESENCE_INTERFACE, signature="o")
def BuddyAppeared(self, buddy):
pass
@dbus.service.signal(_PRESENCE_INTERFACE, signature="o")
def BuddyDisappeared(self, buddy):
pass
@dbus.service.signal(_PRESENCE_INTERFACE, signature="o")
def ActivityInvitation(self, activity):
pass
@dbus.service.signal(_PRESENCE_INTERFACE, signature="soo")
def PrivateInvitation(self, bus_name, connection, channel):
pass
@dbus.service.method(_PRESENCE_INTERFACE, out_signature="ao")
def GetActivities(self):
ret = []
for act in self._activities.values():
ret.append(dbus.ObjectPath(act._object_path))
return ret
@dbus.service.method(_PRESENCE_INTERFACE, in_signature="s", out_signature="o")
def GetActivityById(self, actid):
if self._activities.has_key(actid):
return dbus.ObjectPath(self._activities[actid]._object_path)
raise NotFoundError("The activity was not found.")
@dbus.service.method(_PRESENCE_INTERFACE, out_signature="ao")
def GetBuddies(self):
ret = []
for buddy in self._buddies.values():
ret.append(buddy._object_path)
return ret
@dbus.service.method(_PRESENCE_INTERFACE, in_signature="ay", out_signature="o")
def GetBuddyByPublicKey(self, key):
key = ''.join([chr(item) for item in key])
if self._buddies.has_key(key):
return self._buddies[key]._object_path
raise NotFoundError("The buddy was not found.")
@dbus.service.method(_PRESENCE_INTERFACE, out_signature="o")
def GetOwner(self):
if not self._owner:
raise NotFoundError("The owner was not found.")
return dbus.ObjectPath(self._owner._object_path)
@dbus.service.method(_PRESENCE_INTERFACE, in_signature="sssa{sv}",
out_signature="o", async_callbacks=('async_cb', 'async_err_cb'))
def ShareActivity(self, actid, atype, name, properties, async_cb, async_err_cb):
pass
@dbus.service.method(_PRESENCE_INTERFACE, out_signature="so")
def GetPreferredConnection(self):
return "bar.baz.foo", "/bar/baz/foo"
def main():
loop = gobject.MainLoop()
ps = TestPresenceService()
loop.run()
if __name__ == "__main__":
main()

@ -0,0 +1,102 @@
#!/usr/bin/env python
# Copyright (C) 2007, Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import os, time
import dbus
import gobject, gtk
import unittest
from sugar.presence import presenceservice
import mockps
class PSBindingsTestCase(unittest.TestCase):
def setUp(self):
argv = ["mockps.py", "mockps.py"]
(self._pspid, stdin, stdout, stderr) = gobject.spawn_async(argv, flags=gobject.SPAWN_LEAVE_DESCRIPTORS_OPEN)
print "Presence service started, pid %d" % self._pspid
# Wait until it shows up on the bus
tries = 0
bus = dbus.SessionBus()
while tries < 5:
bus_object = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
try:
if bus_object.GetNameOwner(presenceservice.DBUS_SERVICE, dbus_interface='org.freedesktop.DBus'):
break
except dbus.exceptions.DBusException, err:
pass
time.sleep(1)
tries += 1
if tries >= 5:
self.tearDown()
raise RuntimeError("Couldn't start the mock presence service")
def tearDown(self):
if self._pspid >= 0:
os.kill(self._pspid, 15)
self._pspid = -1
print "Presence service stopped."
def _handle_error(self, err, user_data):
user_data["success"] = False
user_data["err"] = str(err)
gtk.main_quit()
def _testOwner_helper(self, user_data):
try:
ps = presenceservice.PresenceService(False)
except RuntimeError, err:
self._handle_error(err, user_data)
return False
try:
owner = ps.get_owner()
except RuntimeError, err:
self._handle_error(err, user_data)
return False
user_data["success"] = True
user_data["owner"] = owner
gtk.main_quit()
return False
def testOwner(self):
user_data = {"success": False, "err": "", "owner": None}
gobject.idle_add(self._testOwner_helper, user_data)
gtk.main()
assert user_data["success"] == True, user_data["err"]
assert user_data["owner"], "Owner could not be found."
owner = user_data["owner"]
assert owner.props.key == mockps._OWNER_PUBKEY, "Owner public key doesn't match expected"
assert owner.props.nick == mockps._OWNER_NICK, "Owner nickname doesn't match expected"
assert owner.props.color == mockps._OWNER_COLOR, "Owner color doesn't match expected"
def addToSuite(suite):
suite.addTest(PSBindingsTestCase("testOwner"))
addToSuite = staticmethod(addToSuite)
def main():
suite = unittest.TestSuite()
PSBindingsTestCase.addToSuite(suite)
runner = unittest.TextTestRunner()
runner.run(suite)
if __name__ == "__main__":
main()
Loading…
Cancel
Save