sugar-toolkit-gtk3/services/nm/nmclient.py
Dan Williams 88ddaab1c7 Add (slightly) sugarized NetworkManager applet
An implementation of the NetworkManagerInfo service (for storing local
networking config) and GUI bits for discovering network status and
controlling networking, written in Python.
2006-10-26 13:04:39 -04:00

350 lines
9.6 KiB
Python

# vi: ts=4 ai noet
#
# Copyright (C) 2006, Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import dbus
import dbus.glib
import dbus.decorators
import gobject
import gtk
import nminfo
# Human-readable labels for NetworkManager states. The tuple is indexed by
# the integer state delivered with the 'StateChange' signal.
NM_STATE_STRINGS = (
    "Unknown",
    "Asleep",
    "Connecting",
    "Connected",
    "Disconnected",
)

# Human-readable labels for device activation stages. The tuple is indexed
# by the integer stage delivered with the 'DeviceActivationStage' signal.
NM_DEVICE_STAGE_STRINGS = (
    "Unknown",
    "Prepare",
    "Config",
    "Need Users Key",
    "IP Config",
    "IP Config Get",
    "IP Config Commit",
    "Activated",
    "Failed",
    "Cancelled",
)

# D-Bus bus name, interface names and object path used to reach
# NetworkManager.
NM_SERVICE = 'org.freedesktop.NetworkManager'
NM_IFACE = 'org.freedesktop.NetworkManager'
NM_IFACE_DEVICES = 'org.freedesktop.NetworkManager.Devices'
NM_PATH = '/org/freedesktop/NetworkManager'

# Device type codes compared against the type field of a device's
# getProperties() reply.
DEVICE_TYPE_UNKNOWN = 0
DEVICE_TYPE_802_3_ETHERNET = 1
DEVICE_TYPE_802_11_WIRELESS = 2

# Shared system bus connection used by every class in this module.
# NOTE: this connects to the bus as a side effect of importing the module.
sys_bus = dbus.SystemBus()
class Network(gobject.GObject):
    """A network exposed by NetworkManager, identified by its D-Bus object path.

    The constructor fires an asynchronous getProperties() call; the SSID,
    mode and strength are only filled in once the reply arrives. If the
    call fails, the 'init-failed' signal is emitted.
    """

    __gsignals__ = {
        'init-failed': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, ([]))
    }

    def __init__(self, op):
        """Create the network for object path `op` and request its properties."""
        gobject.GObject.__init__(self)
        self._op = op
        # Placeholders until the asynchronous reply comes back.
        self._ssid = None
        self._mode = None
        self._strength = 0

        proxy = sys_bus.get_object(NM_SERVICE, self._op)
        iface = dbus.Interface(proxy, NM_IFACE_DEVICES)
        iface.getProperties(reply_handler=self._update_reply_cb,
                            error_handler=self._update_error_cb)

    def _update_reply_cb(self, *props):
        """Store the fields of interest from the getProperties() reply.

        The reply is consumed positionally: index 1 is stored as the SSID,
        index 3 as the strength and index 6 as the mode.
        """
        self._ssid = props[1]
        self._strength = props[3]
        self._mode = props[6]
        summary = "Net(%s): ssid '%s', mode %d, strength %d" % (
            self._op, self._ssid, self._mode, self._strength)
        print(summary)

    def _update_error_cb(self, err):
        """Report the failed property fetch and emit 'init-failed'."""
        print("Net(%s): failed to update." % self._op)
        self.emit('init-failed')

    def get_ssid(self):
        """Return the SSID, or None if no reply has been received yet."""
        return self._ssid

    def get_op(self):
        """Return the D-Bus object path this network was created with."""
        return self._op

    def get_strength(self):
        """Return the last recorded signal strength."""
        return self._strength

    def set_strength(self, strength):
        """Record a new signal strength value as given (no range check)."""
        self._strength = strength
class Device(gobject.GObject):
    """A network device exposed by NetworkManager, identified by object path.

    The constructor fires an asynchronous getProperties() call; device
    attributes and (for wireless devices) the list of visible networks are
    only filled in once the reply arrives. If the call fails, the
    'init-failed' signal is emitted.
    """

    __gsignals__ = {
        'init-failed': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, ([]))
    }

    def __init__(self, op):
        """Create the device for object path `op` and request its properties."""
        gobject.GObject.__init__(self)
        self._op = op
        self._iface = None
        self._type = 0
        self._udi = None
        self._active = False
        self._strength = 0
        self._link = False
        # Maps network object path -> Network instance.
        self._networks = {}
        # Must exist before any callback runs: _net_init_failed() and
        # network_disappeared() read it even when no network is active.
        self._active_net = None

        obj = sys_bus.get_object(NM_SERVICE, self._op)
        dev = dbus.Interface(obj, NM_IFACE_DEVICES)
        dev.getProperties(reply_handler=self._update_reply_cb,
                          error_handler=self._update_error_cb)

    def _update_reply_cb(self, *props):
        """Store the fields of interest from the getProperties() reply.

        The reply is consumed positionally (indexes 1-4 and 15 for every
        device; 14, 19 and 20 additionally for wireless devices).
        NOTE(review): the index layout is presumably fixed by the
        NetworkManager D-Bus API -- confirm against the NM version in use.
        """
        self._iface = props[1]
        self._type = props[2]
        self._udi = props[3]
        self._active = props[4]
        self._link = props[15]
        if self._type == DEVICE_TYPE_802_11_WIRELESS:
            self._strength = props[14]
            self._update_networks(props[20], props[19])

    def _update_networks(self, net_ops, active_op):
        """Create a Network for each path in `net_ops`.

        The network whose path equals `active_op` is remembered as the
        active one.
        """
        for op in net_ops:
            net = Network(op)
            self._networks[op] = net
            net.connect('init-failed', self._net_init_failed)
            if op == active_op:
                self._active_net = net

    def _update_error_cb(self, err):
        """Report the failed property fetch and emit 'init-failed'."""
        print("Device(%s): failed to update from dbus." % self._op)
        self.emit('init-failed')

    def _net_init_failed(self, net):
        """Forget a network whose property fetch failed."""
        self._forget_network(net.get_op())

    def _forget_network(self, net_op):
        """Drop `net_op` from the table, clearing the active network if it matches."""
        net = self._networks.pop(net_op, None)
        if net is None:
            return
        if net == self._active_net:
            self._active_net = None

    def get_op(self):
        """Return the D-Bus object path this device was created with."""
        return self._op

    def get_network(self, op):
        """Return the Network stored under path `op`, or None if unknown."""
        return self._networks.get(op)

    def get_network_ops(self):
        """Return the object paths of all known networks."""
        return self._networks.keys()

    def get_strength(self):
        """Return the last recorded signal strength."""
        return self._strength

    def set_strength(self, strength):
        """Record a signal strength; values outside 0..100 are stored as 0."""
        if 0 <= strength <= 100:
            self._strength = strength
        else:
            self._strength = 0

    def network_appeared(self, network):
        """Start tracking the network at path `network` if not already known."""
        if network in self._networks:
            return
        net = Network(network)
        self._networks[network] = net
        net.connect('init-failed', self._net_init_failed)

    def network_disappeared(self, network):
        """Stop tracking the network at path `network`, if known.

        Also clears the active-network reference when it pointed at the
        removed network, so no stale Network object is kept around.
        """
        self._forget_network(network)
class NMClientApp:
    """Tray-icon client that mirrors NetworkManager's device/network state.

    On construction it registers D-Bus signal receivers on the system bus,
    asynchronously fetches the device list from NetworkManager and creates
    a status icon with a popup menu. Call run() to enter the main loop.
    """

    # NetworkManager signals that have a dedicated handler; the catch-all
    # handler stays silent for these so they are not reported twice.
    _HANDLED_SIGNALS = (
        'DeviceActivationStage',
        'StateChange',
        'DeviceActivating',
        'DeviceNowActive',
        'DeviceStrengthChanged',
        'WirelessNetworkAppeared',
        'WirelessNetworkDisappeared',
        'WirelessNetworkStrengthChanged',
    )

    def __init__(self):
        self.menu = None
        self.nminfo = None
        try:
            self.nminfo = nminfo.NMInfo()
        except RuntimeError:
            # Best effort: the client still runs without the info service.
            pass
        # The device table must exist before signal receivers are
        # registered, because several handlers look devices up in it.
        self._devices = {}
        self._setup_dbus()
        self._update_devices()
        self._setup_trayicon()

    def _setup_trayicon(self):
        """Create the status icon and hook both click types to the popup."""
        # NOTE(review): hard-coded path into a developer's home directory;
        # this should point at an installed icon location.
        self.trayicon = gtk.status_icon_new_from_file("/home/dcbw/Development/olpc/nm-python-client/icons/nm-no-connection.png")
        self.trayicon.connect("popup_menu", self._popup)
        self.trayicon.connect("activate", self._popup)

    def _popup(self, status, button=0, time=None):
        """Build a fresh menu and pop it up next to the status icon.

        `button` and `time` are only supplied by the "popup_menu" signal;
        for "activate" the defaults are used and the current event time
        is looked up.
        """
        def menu_pos(menu):
            return gtk.status_icon_position_menu(menu, self.trayicon)

        if time is None:
            time = gtk.get_current_event_time()
        # Rebinding drops the reference to any previously shown menu.
        self.menu = self._construct_new_menu()
        self.menu.popup(None, None, menu_pos, button, time)
        self.menu.show_all()

    def _construct_new_menu(self):
        """Return a menu holding a single placeholder check item."""
        menu = gtk.Menu()
        item = gtk.CheckMenuItem()
        label = gtk.Label("foobar")
        label.set_alignment(0.0, 0.5)
        item.add(label)
        label.show()
        menu.add(item)
        return menu

    def _update_devices_reply_cb(self, ops):
        """Create a Device for every object path in the getDevices() reply."""
        for op in ops:
            dev = Device(op)
            self._devices[op] = dev
            dev.connect('init-failed', self._dev_init_failed_cb)

    def _dev_init_failed_cb(self, dev):
        """Forget a device that failed to initialize (e.g. a D-Bus error)."""
        self._devices.pop(dev.get_op(), None)

    def _update_devices_error_cb(self, err):
        """Report a failed getDevices() call."""
        print("Error updating devices; %s" % err)

    def _update_devices(self):
        """Discard all known devices and asynchronously re-query the list."""
        self._devices = {}
        nm_obj = sys_bus.get_object(NM_SERVICE, NM_PATH)
        nm = dbus.Interface(nm_obj, NM_IFACE)
        nm.getDevices(reply_handler=self._update_devices_reply_cb,
                      error_handler=self._update_devices_error_cb)

    def _setup_dbus(self):
        """Register every signal receiver this client uses on the system bus."""
        sig_handlers = {
            'DeviceActivationStage': self.device_activation_stage_sig_handler,
            'StateChange': self.state_change_sig_handler,
            'DeviceActivating': self.device_activating_sig_handler,
            'DeviceNowActive': self.device_now_active_sig_handler,
            'WirelessNetworkAppeared': self.wireless_network_appeared_sig_handler,
            'WirelessNetworkDisappeared': self.wireless_network_disappeared_sig_handler,
            'DeviceStrengthChanged': self.wireless_device_strength_changed_sig_handler,
            'WirelessNetworkStrengthChanged': self.wireless_network_strength_changed_sig_handler
        }

        self.nm_proxy = sys_bus.get_object(NM_SERVICE, NM_PATH)

        sys_bus.add_signal_receiver(self.name_owner_changed_sig_handler,
                                    signal_name="NameOwnerChanged",
                                    dbus_interface="org.freedesktop.DBus")

        # Catch-all receivers report signals that have no dedicated handler.
        sys_bus.add_signal_receiver(self.catchall_signal_handler,
                                    dbus_interface=NM_IFACE)
        # Use the real devices interface name; concatenating
        # NM_IFACE + 'Devices' drops the '.' separator and never matches.
        sys_bus.add_signal_receiver(self.catchall_signal_handler,
                                    dbus_interface=NM_IFACE_DEVICES)

        for (signal, handler) in sig_handlers.items():
            sys_bus.add_signal_receiver(handler, signal_name=signal,
                                        dbus_interface=NM_IFACE)

    @dbus.decorators.explicitly_pass_message
    def catchall_signal_handler(self, *args, **keywords):
        """Print any signal that is not covered by a dedicated handler.

        The D-Bus message itself arrives as the 'dbus_message' keyword;
        `args` holds only the signal's own arguments.
        """
        dbus_message = keywords['dbus_message']
        mem = dbus_message.get_member()
        iface = dbus_message.get_interface()

        if iface == NM_IFACE and mem in self._HANDLED_SIGNALS:
            return

        print('Caught signal %s.%s' % (iface, mem))
        for arg in args:
            print(' ' + str(arg))

    @staticmethod
    def _label_for(labels, index):
        """Return labels[index], or "Unknown" when index is out of range."""
        if 0 <= index < len(labels):
            return labels[index]
        return "Unknown"

    def device_activation_stage_sig_handler(self, device, stage):
        """Print the activation stage reported for `device`."""
        stage_name = self._label_for(NM_DEVICE_STAGE_STRINGS, stage)
        print('Network Manager Device Stage "%s" for device %s' % (stage_name, device))

    def state_change_sig_handler(self, state):
        """Print the new NetworkManager state."""
        print('Network Manager State "%s"' % self._label_for(NM_STATE_STRINGS, state))

    def device_activating_sig_handler(self, device):
        """Print that `device` has started activating."""
        print('Device %s activating' % device)

    def device_now_active_sig_handler(self, device, essid=None):
        """Print that `device` is active, with the ESSID if one was sent."""
        print('Device %s now activated (%s)' % (device, essid))

    def name_owner_changed_sig_handler(self, name, old, new):
        """Re-query devices when NetworkManager appears on the bus.

        `old` and `new` are the previous and current owners of `name`; an
        empty (or missing) value means "no owner".
        """
        if name != NM_SERVICE:
            return
        if new and not old:
            # NM started up.
            self._update_devices()
        # When NM goes away (old set, new empty) nothing is done here.

    def wireless_network_appeared_sig_handler(self, device, network):
        """Forward a network-appeared event to the owning device, if known."""
        dev = self._devices.get(device)
        if dev is None:
            return
        dev.network_appeared(network)

    def wireless_network_disappeared_sig_handler(self, device, network):
        """Forward a network-disappeared event to the owning device, if known."""
        dev = self._devices.get(device)
        if dev is None:
            return
        dev.network_disappeared(network)

    def wireless_device_strength_changed_sig_handler(self, device, strength):
        """Update the signal strength of a known device."""
        dev = self._devices.get(device)
        if dev is None:
            return
        dev.set_strength(strength)

    def wireless_network_strength_changed_sig_handler(self, device, network, strength):
        """Update the signal strength of a known network on a known device."""
        dev = self._devices.get(device)
        if dev is None:
            return
        net = dev.get_network(network)
        if net:
            net.set_strength(strength)

    def run(self):
        """Run the GObject main loop until it exits or Ctrl-C is pressed."""
        loop = gobject.MainLoop()
        try:
            loop.run()
        except KeyboardInterrupt:
            pass