Test activity appeared/disappeared

This commit is contained in:
Dan Williams 2007-04-24 12:11:36 -04:00
parent 5d11e7fdd9
commit f5305a3ea0
2 changed files with 163 additions and 18 deletions

View File

@ -30,7 +30,7 @@ _ACTIVITY_INTERFACE = "org.laptop.Sugar.Presence.Activity"
class TestActivity(dbus.service.Object):
def __init__(self, bus_name, object_id, actid, name, color, atype):
self._actid = actid
self._name = name
self._aname = name
self._color = color
self._type = atype
self._buddies = {}
@ -51,35 +51,35 @@ class TestActivity(dbus.service.Object):
def NewChannel(self, channel_path):
pass
@dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="s")
@dbus.service.method(_ACTIVITY_INTERFACE, out_signature="s")
def GetId(self):
return self._actid
@dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="s")
@dbus.service.method(_ACTIVITY_INTERFACE, out_signature="s")
def GetName(self):
return self._aname
@dbus.service.method(_ACTIVITY_INTERFACE, out_signature="s")
def GetColor(self):
return self._color
@dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="s")
@dbus.service.method(_ACTIVITY_INTERFACE, out_signature="s")
def GetType(self):
return self._type
@dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="",
@dbus.service.method(_ACTIVITY_INTERFACE,
async_callbacks=('async_cb', 'async_err_cb'))
def Join(self, async_cb, async_err_cb):
pass
@dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="ao")
@dbus.service.method(_ACTIVITY_INTERFACE, out_signature="ao")
def GetJoinedBuddies(self):
return []
@dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="soao")
@dbus.service.method(_ACTIVITY_INTERFACE, out_signature="soao")
def GetChannels(self):
return None
@dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="s")
def GetName(self):
return self._name
_BUDDY_PATH = "/org/laptop/Sugar/Presence/Buddies/"
_BUDDY_INTERFACE = "org.laptop.Sugar.Presence.Buddy"
@ -282,14 +282,31 @@ class TestPresenceService(dbus.service.Object):
@dbus.service.method(_PRESENCE_TEST_INTERFACE, in_signature="ay")
def RemoveBuddy(self, pubkey):
pubkey = ''.join([chr(item) for item in pubkey])
if self._buddies.has_key(pubkey):
buddy = self._buddies[pubkey]
self.BuddyDisappeared(buddy._object_path)
del self._buddies[pubkey]
return
raise NotFoundError("Buddy not found")
if not self._buddies.has_key(pubkey):
raise NotFoundError("Buddy not found")
buddy = self._buddies[pubkey]
self.BuddyDisappeared(buddy._object_path)
del self._buddies[pubkey]
@dbus.service.method(_PRESENCE_TEST_INTERFACE, in_signature="ssss")
def AddActivity(self, actid, name, color, atype):
objid = self._get_next_object_id()
act = TestActivity(self._bus_name, objid, actid, name, color, atype)
self._activities[actid] = act
self.ActivityAppeared(act._object_path)
@dbus.service.method(_PRESENCE_TEST_INTERFACE, in_signature="s")
def RemoveActivity(self, actid):
if not self._activities.has_key(actid):
raise NotFoundError("Activity not found")
act = self._activities[actid]
self.ActivityDisappeared(act._object_path)
del self._activities[actid]
def main():
import logging
logging.basicConfig(level=logging.DEBUG)
loop = gobject.MainLoop()
ps = TestPresenceService()
loop.run()

View File

@ -54,7 +54,7 @@ def stop_ps(pid):
os.kill(pid, 15)
class BuddyTests(unittest.TestCase):
class GenericTestCase(unittest.TestCase):
def setUp(self):
self._pspid = start_ps()
@ -68,6 +68,8 @@ class BuddyTests(unittest.TestCase):
user_data["err"] = str(err)
gtk.main_quit()
class BuddyTests(GenericTestCase):
def _testOwner_helper(self, user_data):
try:
ps = presenceservice.get_instance(False)
@ -218,9 +220,135 @@ class BuddyTests(unittest.TestCase):
suite.addTest(BuddyTests("testBuddyDisappeared"))
addToSuite = staticmethod(addToSuite)
class ActivityTests(GenericTestCase):
_AA_ID = "d622b99b9f365d712296094b9f6110521e6c9cba"
_AA_NAME = "Test Activity"
_AA_TYPE = "org.laptop.Sugar.Foobar"
_AA_COLOR = "#adfe20,#bf781a"
def _testActivityAppeared_helper_timeout(self, user_data):
self._handle_error("Timeout waiting for activity-appeared signal", user_data)
return False
def _testActivityAppeared_helper_cb(self, ps, activity, user_data):
user_data["activity"] = activity
user_data["success"] = True
gtk.main_quit()
def _testActivityAppeared_helper(self, user_data):
ps = presenceservice.get_instance(False)
ps.connect('activity-appeared', self._testActivityAppeared_helper_cb, user_data)
# Wait 5 seconds max for signal to be emitted
gobject.timeout_add(5000, self._testActivityAppeared_helper_timeout, user_data)
busobj = dbus.SessionBus().get_object(mockps._PRESENCE_SERVICE,
mockps._PRESENCE_PATH)
try:
testps = dbus.Interface(busobj, mockps._PRESENCE_TEST_INTERFACE)
except dbus.exceptions.DBusException, err:
self._handle_error(err, user_data)
return False
try:
testps.AddActivity(self._AA_ID, self._AA_NAME, self._AA_COLOR, self._AA_TYPE)
except dbus.exceptions.DBusException, err:
self._handle_error(err, user_data)
return False
return False
def testActivityAppeared(self):
ps = presenceservice.get_instance(False)
assert ps, "Couldn't get presence service"
user_data = {"success": False, "err": "", "activity": None}
gobject.idle_add(self._testActivityAppeared_helper, user_data)
gtk.main()
assert user_data["success"] == True, user_data["err"]
assert user_data["activity"], "Activity was not received"
act = user_data["activity"]
assert act.props.id == self._AA_ID, "ID doesn't match expected"
assert act.props.name == self._AA_NAME, "Name doesn't match expected"
assert act.props.color == self._AA_COLOR, "Color doesn't match expected"
assert act.props.type == self._AA_TYPE, "Type doesn't match expected"
assert act.props.joined == False, "Joined doesn't match expected"
# Try to get activity by activity ID
act2 = ps.get_activity(self._AA_ID)
assert act2.props.id == self._AA_ID, "ID doesn't match expected"
assert act2.props.name == self._AA_NAME, "Name doesn't match expected"
assert act2.props.color == self._AA_COLOR, "Color doesn't match expected"
assert act2.props.type == self._AA_TYPE, "Type doesn't match expected"
assert act2.props.joined == False, "Joined doesn't match expected"
def _testActivityDisappeared_helper_timeout(self, user_data):
self._handle_error("Timeout waiting for activity-disappeared signal", user_data)
return False
def _testActivityDisappeared_helper_cb(self, ps, activity, user_data):
user_data["activity"] = activity
user_data["success"] = True
gtk.main_quit()
def _testActivityDisappeared_helper(self, user_data):
busobj = dbus.SessionBus().get_object(mockps._PRESENCE_SERVICE,
mockps._PRESENCE_PATH)
try:
testps = dbus.Interface(busobj, mockps._PRESENCE_TEST_INTERFACE)
except dbus.exceptions.DBusException, err:
self._handle_error(err, user_data)
return False
# Add a fake activity
try:
testps.AddActivity(self._AA_ID, self._AA_NAME, self._AA_COLOR, self._AA_TYPE)
except dbus.exceptions.DBusException, err:
self._handle_error(err, user_data)
return False
ps = presenceservice.get_instance(False)
ps.connect('activity-disappeared', self._testActivityDisappeared_helper_cb, user_data)
# Wait 5 seconds max for signal to be emitted
gobject.timeout_add(5000, self._testActivityDisappeared_helper_timeout, user_data)
# Delete the fake activity
try:
testps.RemoveActivity(self._AA_ID)
except dbus.exceptions.DBusException, err:
self._handle_error(err, user_data)
return False
return False
def testActivityDisappeared(self):
ps = presenceservice.get_instance(False)
assert ps, "Couldn't get presence service"
user_data = {"success": False, "err": "", "activity": None}
gobject.idle_add(self._testActivityDisappeared_helper, user_data)
gtk.main()
assert user_data["success"] == True, user_data["err"]
assert user_data["activity"], "Activity was not received"
act = user_data["activity"]
assert act.props.id == self._AA_ID, "ID doesn't match expected"
assert act.props.name == self._AA_NAME, "Name doesn't match expected"
assert act.props.color == self._AA_COLOR, "Color doesn't match expected"
assert act.props.type == self._AA_TYPE, "Type doesn't match expected"
assert act.props.joined == False, "Joined doesn't match expected"
def addToSuite(suite):
suite.addTest(ActivityTests("testActivityAppeared"))
suite.addTest(ActivityTests("testActivityDisappeared"))
addToSuite = staticmethod(addToSuite)
def main():
suite = unittest.TestSuite()
BuddyTests.addToSuite(suite)
ActivityTests.addToSuite(suite)
runner = unittest.TextTestRunner()
runner.run(suite)