Asynchronize activity join/share in the PS

This commit is contained in:
Dan Williams 2007-04-13 13:11:59 -04:00
parent 7b40f9bf60
commit 31000f6c3e
3 changed files with 116 additions and 43 deletions

View File

@ -65,7 +65,7 @@ class Activity(DBusGObject):
# the telepathy client # the telepathy client
self._tp = tp self._tp = tp
self._activity_text_channel = None self._text_channel = None
self._valid = False self._valid = False
self._id = None self._id = None
@ -161,10 +161,10 @@ class Activity(DBusGObject):
def GetType(self): def GetType(self):
return self.props.type return self.props.type
@dbus.service.method(_ACTIVITY_INTERFACE, @dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="",
in_signature="", out_signature="") async_callbacks=('async_cb', 'async_err_cb'))
def Join(self): def Join(self, async_cb, async_err_cb):
self.join() self.join(async_cb, async_err_cb)
@dbus.service.method(_ACTIVITY_INTERFACE, @dbus.service.method(_ACTIVITY_INTERFACE,
in_signature="", out_signature="ao") in_signature="", out_signature="ao")
@ -208,24 +208,72 @@ class Activity(DBusGObject):
if self.props.valid: if self.props.valid:
self.BuddyLeft(buddy.object_path()) self.BuddyLeft(buddy.object_path())
def join(self): def _handle_share_join(self, tp, text_channel):
if not self._joined: if not text_channel:
self._activity_text_channel = self._tp.join_activity(self.props.id) logging.debug("Error sharing: text channel was None, shouldn't happen")
self._activity_text_channel[CHANNEL_INTERFACE].connect_to_signal('Closed', self._activity_text_channel_closed_cb) raise RuntimeError("Plugin returned invalid text channel")
self._joined = True
self._text_channel = text_channel
self._text_channel[CHANNEL_INTERFACE].connect_to_signal('Closed',
self._text_channel_closed_cb)
self._joined = True
return True
def _shared_cb(self, tp, activity_id, text_channel, exc, userdata):
if activity_id != self.props.id:
# Not for us
return
(sigid, async_cb, async_err_cb) = userdata
self._tp.disconnect(sigid)
if exc:
async_err_cb(exc)
else:
self._handle_share_join(tp, text_channel)
self.send_properties()
async_cb(dbus.ObjectPath(self._object_path))
def _share(self, (async_cb, async_err_cb)):
if self._joined:
async_err_cb(RuntimeError("Already shared activity %s" % self.props.id))
return
sigid = self._tp.connect('activity-shared', self._shared_cb)
self._tp.share_activity(self.props.id, (sigid, async_cb, async_err_cb))
def _joined_cb(self, tp, activity_id, text_channel, exc, userdata):
if activity_id != self.props.id:
# Not for us
return
(sigid, async_cb, async_err_cb) = userdata
self._tp.disconnect(sigid)
if exc:
async_err_cb(exc)
else:
self._handle_share_join(tp, text_channel)
async_cb()
def join(self, async_cb, async_err_cb):
if self._joined:
async_err_cb(RuntimeError("Already joined activity %s" % self.props.id))
return
sigid = self._tp.connect('activity-joined', self._joined_cb)
self._tp.join_activity(self.props.id, (sigid, async_cb, async_err_cb))
def get_channels(self): def get_channels(self):
conn = self._tp.get_connection() conn = self._tp.get_connection()
# FIXME add tubes and others channels # FIXME add tubes and others channels
return str(conn.service_name), conn.object_path, [self._activity_text_channel.object_path] return str(conn.service_name), conn.object_path, [self._text_channel.object_path]
def leave(self): def leave(self):
if self._joined: if self._joined:
self._activity_text_channel[CHANNEL_INTERFACE].Close() self._text_channel[CHANNEL_INTERFACE].Close()
def _activity_text_channel_closed_cb(self): def _text_channel_closed_cb(self):
self._joined = False self._joined = False
self._activity_text_channel = None self._text_channel = None
def send_properties(self): def send_properties(self):
props = {} props = {}

View File

@ -84,6 +84,13 @@ class PresenceService(dbus.service.Object):
dbus.service.Object.__init__(self, self._bus_name, _PRESENCE_PATH) dbus.service.Object.__init__(self, self._bus_name, _PRESENCE_PATH)
def _activity_shared_cb(self, tp, activity, success, exc, async_cb, async_err_cb):
if success:
async_cb(activity.object_path())
else:
del self._activities[activity.props.id]
async_err_cb(exc)
def _server_status_cb(self, plugin, status, reason): def _server_status_cb(self, plugin, status, reason):
if status == CONNECTION_STATUS_CONNECTED: if status == CONNECTION_STATUS_CONNECTED:
pass pass
@ -290,32 +297,24 @@ class PresenceService(dbus.service.Object):
else: else:
return self._owner.get_object_path() return self._owner.get_object_path()
@dbus.service.method(_PRESENCE_INTERFACE, in_signature="sssa{sv}", out_signature="o") @dbus.service.method(_PRESENCE_INTERFACE, in_signature="sssa{sv}",
def ShareActivity(self, actid, atype, name, properties): out_signature="o", async_callbacks=('async_cb', 'async_err_cb'))
activity = self._share_activity(actid, atype, name, properties) def ShareActivity(self, actid, atype, name, properties, async_cb, async_err_cb):
return activity.object_path() self._share_activity(actid, atype, name, properties, (async_cb, async_err_cb))
def cleanup(self): def cleanup(self):
for tp in self._handles_buddies: for tp in self._handles_buddies:
tp.cleanup() tp.cleanup()
def _share_activity(self, actid, atype, name, properties): def _share_activity(self, actid, atype, name, properties, callbacks):
objid = self._get_next_object_id() objid = self._get_next_object_id()
# FIXME check which tp client we should use to share the activity # FIXME check which tp client we should use to share the activity
import time
start = time.time()
logging.debug("Start share of %s (%s)" % (actid, atype))
color = self._owner.props.color color = self._owner.props.color
activity = Activity(self._bus_name, objid, self._server_plugin, activity = Activity(self._bus_name, objid, self._server_plugin,
id=actid, type=atype, name=name, color=color, local=True) id=actid, type=atype, name=name, color=color, local=True)
activity.connect("validity-changed", self._activity_validity_changed_cb) activity.connect("validity-changed", self._activity_validity_changed_cb)
self._activities[actid] = activity self._activities[actid] = activity
activity._share(callbacks)
activity.join()
activity.send_properties()
logging.debug("End share of %s (%s). Time: %f" % (actid, atype, (float)(time.time() - start)))
return activity
def _activity_validity_changed_cb(self, activity, valid): def _activity_validity_changed_cb(self, activity, valid):
if valid: if valid:

View File

@ -95,6 +95,12 @@ class ServerPlugin(gobject.GObject):
([gobject.TYPE_PYOBJECT])), ([gobject.TYPE_PYOBJECT])),
'activity-properties-changed': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, 'activity-properties-changed': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE,
([gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT])), ([gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT])),
'activity-shared': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE,
([gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT,
gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT])),
'activity-joined': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE,
([gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT,
gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT]))
} }
def __init__(self, registry, owner): def __init__(self, registry, owner):
@ -289,26 +295,46 @@ class ServerPlugin(gobject.GObject):
reply_handler=self._set_self_avatar_cb, reply_handler=self._set_self_avatar_cb,
error_handler=lambda *args: self._log_error_cb("avatar", *args)) error_handler=lambda *args: self._log_error_cb("avatar", *args))
def join_activity(self, act): def _join_activity_create_channel_cb(self, activity_id, signal, handle, userdata, chan_path):
handle = self._activities.get(act) channel = Channel(self._conn._dbus_object._named_service, chan_path)
self._joined_activities.append((activity_id, handle))
self._set_self_activities()
self.emit(signal, activity_id, channel, None, userdata)
if not handle: def _join_activity_get_channel_cb(self, activity_id, signal, userdata, handles):
handle = self._conn[CONN_INTERFACE].RequestHandles(CONNECTION_HANDLE_TYPE_ROOM, [act])[0] if not self._activities.has_key(activity_id):
self._activities[act] = handle self._activities[activity_id] = handles[0]
if (act, handle) in self._joined_activities: if (activity_id, handles[0]) in self._joined_activities:
logging.debug("Already joined %s" % act) e = RuntimeError("Already joined activity %s" % activity_id)
logging.debug(str(e))
self.emit(signal, activity_id, None, e, userdata)
return return
chan_path = self._conn[CONN_INTERFACE].RequestChannel( self._conn[CONN_INTERFACE].RequestChannel(CHANNEL_TYPE_TEXT,
CHANNEL_TYPE_TEXT, CONNECTION_HANDLE_TYPE_ROOM, CONNECTION_HANDLE_TYPE_ROOM, handles[0], True,
handle, True) reply_handler=lambda *args: self._join_activity_create_channel_cb(activity_id, signal, handle, userdata, *args),
channel = Channel(self._conn._dbus_object._named_service, chan_path) error_handler=lambda *args: self._join_error_cb(activity_id, signal, userdata, *args))
self._joined_activities.append((act, handle)) def _join_error_cb(self, activity_id, signal, userdata, err):
self._conn[CONN_INTERFACE_BUDDY_INFO].SetActivities(self._joined_activities) e = Exception("Error joining/sharing activity %s: %s" % (activity_id, err))
logging.debug(str(e))
self.emit(signal, activity_id, None, e, userdata)
return channel def _internal_join_activity(self, activity_id, signal, userdata):
handle = self._activities.get(activity_id)
if not handle:
self._conn[CONN_INTERFACE].RequestHandles(CONNECTION_HANDLE_TYPE_ROOM, [activity_id],
reply_handler=lambda *args: self._join_activity_get_channel_cb(activity_id, signal, userdata, *args),
error_handler=lambda *args: self._join_error_cb(activity_id, signal, userdata, *args))
else:
self._join_activity_get_channel_cb(activity_id, userdata, [handle])
def share_activity(self, activity_id, userdata):
self._internal_join_activity(activity_id, "activity-shared", userdata)
def join_activity(self, activity_id, userdata):
self._internal_join_activity(activity_id, "activity-joined", userdata)
def _ignore_success_cb(self): def _ignore_success_cb(self):
pass pass