Add minimal current activity test

master
Dan Williams 17 years ago
parent f18df46c62
commit 6cc3e7acdb

@ -561,11 +561,124 @@ class ActivityTests(GenericTestCase):
assert self._buddy_joined_activity == self._activity, "Activity mismatch"
assert self._buddy_joined_buddy == owner, "Owner mismatch"
def _testCurrentActivity_helper_timeout(self):
self._handle_error("Timeout waiting for current activity")
return False
def _testCurrentActivity_set_current_activity(self, actid):
busobj = dbus.SessionBus().get_object(mockps._PRESENCE_SERVICE,
mockps._PRESENCE_PATH)
try:
testps = dbus.Interface(busobj, mockps._PRESENCE_TEST_INTERFACE)
testps.SetBuddyCurrentActivity(self._buddy.props.key, actid)
except dbus.exceptions.DBusException, err:
self._handle_error(err)
return
def _testCurrentActivity_buddy_property_changed_cb(self, buddy, proplist):
if not self._start_monitor:
return
if not 'current-activity' in proplist:
return
buddy_curact = buddy.props.current_activity
if buddy_curact.props.id == self._AA_ID:
self._got_first_curact = True
# set next current activity
self._testCurrentActivity_set_current_activity(self._other_actid)
elif buddy_curact.props.id == self._other_actid:
self._got_other_curact = True
if self._got_first_curact and self._got_other_curact:
self._handle_success()
def _testCurrentActivity_start_monitor_helper(self):
if len(self._activities) != 2 or not self._buddy:
return
self._start_monitor = True
# Set first current activity
self._testCurrentActivity_set_current_activity(self._AA_ID)
def _testCurrentActivity_activity_helper_cb(self, ps, activity):
if activity in self._activities:
self._handle_error("Activity %s already known." % activity.props.id)
self._activities.append(activity)
self._testCurrentActivity_start_monitor_helper()
def _testCurrentActivity_buddy_helper_cb(self, ps, buddy):
self._buddy = buddy
sid = buddy.connect("property-changed", self._testCurrentActivity_buddy_property_changed_cb)
self._signals.append((buddy, sid))
self._testCurrentActivity_start_monitor_helper()
def _testCurrentActivity_helper(self):
busobj = dbus.SessionBus().get_object(mockps._PRESENCE_SERVICE,
mockps._PRESENCE_PATH)
try:
testps = dbus.Interface(busobj, mockps._PRESENCE_TEST_INTERFACE)
except dbus.exceptions.DBusException, err:
self._handle_error(err)
return False
ps = get_ps()
sid = ps.connect('activity-appeared', self._testCurrentActivity_activity_helper_cb)
self._signals.append((ps, sid))
sid = ps.connect('buddy-appeared', self._testCurrentActivity_buddy_helper_cb)
self._signals.append((ps, sid))
# Add a fake buddy
try:
testps.AddBuddy(BuddyTests._BA_PUBKEY, BuddyTests._BA_NICK, BuddyTests._BA_COLOR)
except dbus.exceptions.DBusException, err:
self._handle_error(err)
return False
# Add first fake activity
try:
testps.AddActivity(self._AA_ID, self._AA_NAME, self._AA_COLOR, self._AA_TYPE, {})
testps.AddBuddyToActivity(BuddyTests._BA_PUBKEY, self._AA_ID)
except dbus.exceptions.DBusException, err:
self._handle_error(err)
return False
# Add second fake activity
try:
testps.AddActivity(self._other_actid, self._other_actname,
self._other_actcolor, self._AA_TYPE, {})
testps.AddBuddyToActivity(BuddyTests._BA_PUBKEY, self._other_actid)
except dbus.exceptions.DBusException, err:
self._handle_error(err)
return False
# Wait 10 seconds max for everything to complete
sid = gobject.timeout_add(10000, self._testCurrentActivity_helper_timeout)
self._sources.append(sid)
return False
def testCurrentActivity(self):
ps = get_ps()
assert ps, "Couldn't get presence service"
self._other_actid = "ea8a94522c53a6741e141adece1711e4d9884678"
self._other_actname = "Some random activity"
self._other_actcolor = "#073838,#3A6E3A"
self._activities = []
self._got_first_curact = False
self._got_other_curact = False
self._start_monitor = False
gobject.idle_add(self._testCurrentActivity_helper)
gtk.main()
assert self._success == True, "Test unsuccessful"
assert len(self._activities) == 2, "Shared activities were not received"
# FIXME: check everything
def addToSuite(suite):
suite.addTest(ActivityTests("testActivityAppeared"))
suite.addTest(ActivityTests("testActivityDisappeared"))
suite.addTest(ActivityTests("testActivityShare"))
suite.addTest(ActivityTests("testActivityJoin"))
suite.addTest(ActivityTests("testCurrentActivity"))
addToSuite = staticmethod(addToSuite)
def main():

Loading…
Cancel
Save