Add (slightly) sugarized NetworkManager applet

An implementation of the NetworkManagerInfo service (for storing local
networking config) and GUI bits for discovering network status and
controlling networking, written in Python.
This commit is contained in:
Dan Williams 2006-10-26 13:04:39 -04:00
parent dbd6fcaf3e
commit 88ddaab1c7
8 changed files with 769 additions and 1 deletions

View File

@ -63,6 +63,7 @@ lib/python/Makefile
lib/threadframe/Makefile
services/Makefile
services/presence/Makefile
services/nm/Makefile
shell/Makefile
shell/conf/Makefile
shell/data/Makefile

View File

@ -1 +1 @@
SUBDIRS = presence
SUBDIRS = presence nm

12
services/nm/Makefile.am Normal file
View File

@ -0,0 +1,12 @@
sugardir = $(pkgdatadir)/services/nm
sugar_PYTHON = \
__init__.py \
nmclient.py \
nminfo.py
bin_SCRIPTS = sugar-nm-applet
dbusservicedir = $(sysconfdir)
dbusservice_DATA = NetworkManagerInfo.conf
EXTRA_DIST = $(dbusservice_DATA) $(bin_SCRIPTS)

View File

@ -0,0 +1,26 @@
<!DOCTYPE busconfig PUBLIC
"-//freedesktop//DTD D-BUS Bus Configuration 1.0//EN"
"http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
<busconfig>
<policy user="root">
<allow own="org.freedesktop.NetworkManagerInfo"/>
<allow send_destination="org.freedesktop.NetworkManagerInfo"/>
<allow send_interface="org.freedesktop.NetworkManagerInfo"/>
</policy>
<policy at_console="true">
<allow own="org.freedesktop.NetworkManagerInfo"/>
<allow send_destination="org.freedesktop.NetworkManagerInfo"/>
<allow send_interface="org.freedesktop.NetworkManagerInfo"/>
</policy>
<policy context="default">
<deny own="org.freedesktop.NetworkManagerInfo"/>
<deny send_destination="org.freedesktop.NetworkManagerInfo"/>
<deny send_interface="org.freedesktop.NetworkManagerInfo"/>
</policy>
<limit name="max_replies_per_connection">512</limit>
</busconfig>

0
services/nm/__init__.py Normal file
View File

349
services/nm/nmclient.py Normal file
View File

@ -0,0 +1,349 @@
# vi: ts=4 ai noet
#
# Copyright (C) 2006, Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import dbus
import dbus.glib
import dbus.decorators
import gobject
import gtk
import nminfo
NM_STATE_STRINGS=("Unknown",
"Asleep",
"Connecting",
"Connected",
"Disconnected"
)
NM_DEVICE_STAGE_STRINGS=("Unknown",
"Prepare",
"Config",
"Need Users Key",
"IP Config",
"IP Config Get",
"IP Config Commit",
"Activated",
"Failed",
"Cancled"
)
NM_SERVICE = 'org.freedesktop.NetworkManager'
NM_IFACE = 'org.freedesktop.NetworkManager'
NM_IFACE_DEVICES = 'org.freedesktop.NetworkManager.Devices'
NM_PATH = '/org/freedesktop/NetworkManager'
DEVICE_TYPE_UNKNOWN = 0
DEVICE_TYPE_802_3_ETHERNET = 1
DEVICE_TYPE_802_11_WIRELESS = 2
sys_bus = dbus.SystemBus()
class Network(gobject.GObject):
__gsignals__ = {
'init-failed': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, ([]))
}
def __init__(self, op):
gobject.GObject.__init__(self)
self._op = op
self._ssid = None
self._mode = None
self._strength = 0
obj = sys_bus.get_object(NM_SERVICE, self._op)
net = dbus.Interface(obj, NM_IFACE_DEVICES)
net.getProperties(reply_handler=self._update_reply_cb,
error_handler=self._update_error_cb)
def _update_reply_cb(self, *props):
self._ssid = props[1]
self._strength = props[3]
self._mode = props[6]
print "Net(%s): ssid '%s', mode %d, strength %d" % (self._op, self._ssid, self._mode, self._strength)
def _update_error_cb(self, err):
print "Net(%s): failed to update." % self._op
self.emit('init-failed')
def get_ssid(self):
return self._ssid
def get_op(self):
return self._op
def get_strength(self):
return self._strength
def set_strength(self, strength):
self._strength = strength
class Device(gobject.GObject):
__gsignals__ = {
'init-failed': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, ([]))
}
def __init__(self, op):
gobject.GObject.__init__(self)
self._op = op
self._iface = None
self._type = 0
self._udi = None
self._active = False
self._strength = 0
self._link = False
self._networks = {}
obj = sys_bus.get_object(NM_SERVICE, self._op)
dev = dbus.Interface(obj, NM_IFACE_DEVICES)
dev.getProperties(reply_handler=self._update_reply_cb,
error_handler=self._update_error_cb)
def _update_reply_cb(self, *props):
self._iface = props[1]
self._type = props[2]
self._udi = props[3]
self._active = props[4]
self._link = props[15]
if self._type == DEVICE_TYPE_802_11_WIRELESS:
self._strength = props[14]
self._update_networks(props[20], props[19])
def _update_networks(self, net_ops, active_op):
for op in net_ops:
net = Network(op)
self._networks[op] = net
net.connect('init-failed', self._net_init_failed)
if op == active_op:
self._active_net = net
def _update_error_cb(self, err):
print "Device(%s): failed to update from dbus." % self._op
self.emit('init-failed')
def _net_init_failed(self, net):
net_op = net.get_op()
if not self._networks.has_key(net_op):
return
net = self._networks[net_op]
if net == self._active_net:
self._active_net = None
del self._networks[net_op]
def get_op(self):
return self._op
def get_network(self, op):
if self._networks.has_key(op):
return self._networks[op]
return None
def get_network_ops(self):
return self._networks.keys()
def get_strength(self):
return self._strength
def set_strength(self, strength):
if strength >= 0 and strength <= 100:
self._strength = strength
else:
self._strength = 0
def network_appeared(self, network):
if self._networks.has_key(network):
return
net = Network(network)
self._networks[network] = net
net.connect('init-failed', self._net_init_failed)
def network_disappeared(self, network):
if not self._networks.has_key(network):
return
del self._networks[network]
class NMClientApp:
def __init__(self):
self.menu = None
self.nminfo = None
try:
self.nminfo = nminfo.NMInfo()
except RuntimeError:
pass
self._setup_dbus()
self._devices = {}
self._update_devices()
self._setup_trayicon()
def _setup_trayicon(self):
self.trayicon = gtk.status_icon_new_from_file("/home/dcbw/Development/olpc/nm-python-client/icons/nm-no-connection.png")
self.trayicon.connect("popup_menu", self._popup)
self.trayicon.connect("activate", self._popup)
def _popup(self, status, button=0, time=None):
def menu_pos(menu):
return gtk.status_icon_position_menu(menu, self.trayicon)
if time is None:
time = gtk.get_current_event_time()
if self.menu:
del self.menu
self.menu = self._construct_new_menu()
self.menu.popup(None, None, menu_pos, button, time)
self.menu.show_all()
def _construct_new_menu(self):
menu = gtk.Menu()
item = gtk.CheckMenuItem()
label = gtk.Label("foobar")
label.set_alignment(0.0, 0.5)
item.add(label)
label.show()
menu.add(item)
return menu
def _update_devices_reply_cb(self, ops):
for op in ops:
dev = Device(op)
self._devices[op] = dev
dev.connect('init-failed', self._dev_init_failed_cb)
def _dev_init_failed_cb(self, dev):
# Device failed to initialize, likely due to dbus errors or something
op = dev.get_op()
if self._devices.has_key(op):
del self._devices[op]
def _update_devices_error_cb(self, err):
print "Error updating devices; %s" % err
def _update_devices(self):
for dev_name in self._devices.keys():
del self._devices[dev_name]
self._devices = {}
nm_obj = sys_bus.get_object(NM_SERVICE, NM_PATH)
nm = dbus.Interface(nm_obj, NM_IFACE)
nm.getDevices(reply_handler=self._update_devices_reply_cb, \
error_handler=self._update_devices_error_cb)
def _setup_dbus(self):
sig_handlers = {
'DeviceActivationStage': self.device_activation_stage_sig_handler,
'StateChange': self.state_change_sig_handler,
'DeviceActivating': self.device_activating_sig_handler,
'DeviceNowActive': self.device_now_active_sig_handler,
'WirelessNetworkAppeared': self.wireless_network_appeared_sig_handler,
'WirelessNetworkDisappeared': self.wireless_network_disappeared_sig_handler,
'DeviceStrengthChanged': self.wireless_device_strength_changed_sig_handler,
'WirelessNetworkStrengthChanged': self.wireless_network_strength_changed_sig_handler
}
self.nm_proxy = sys_bus.get_object(NM_SERVICE, NM_PATH)
sys_bus.add_signal_receiver(self.name_owner_changed_sig_handler,
signal_name="NameOwnerChanged",
dbus_interface="org.freedesktop.DBus")
sys_bus.add_signal_receiver(self.catchall_signal_handler,
dbus_interface=NM_IFACE)
sys_bus.add_signal_receiver(self.catchall_signal_handler,
dbus_interface=NM_IFACE + 'Devices')
for (signal, handler) in sig_handlers.items():
sys_bus.add_signal_receiver(handler, signal_name=signal, dbus_interface=NM_IFACE)
@dbus.decorators.explicitly_pass_message
def catchall_signal_handler(*args, **keywords):
dbus_message = keywords['dbus_message']
mem = dbus_message.get_member()
iface = dbus_message.get_interface()
if iface == NM_IFACE and \
(mem == 'DeviceActivationStage' or \
mem == 'StateChange' or \
mem == 'DeviceActivating' or \
mem == 'DeviceNowActive' or \
mem == 'DeviceStrengthChanged' or \
mem == 'WirelessNetworkAppeared' or \
mem == 'WirelessNetworkDisappeared' or \
mem == 'WirelessNetworkStrengthChanged'):
return
print 'Caught signal %s.%s' % (dbus_message.get_interface(), mem)
for arg in args:
print ' ' + str(arg)
def device_activation_stage_sig_handler(self, device, stage):
print 'Network Manager Device Stage "%s" for device %s'%(NM_DEVICE_STAGE_STRINGS[stage], device)
def state_change_sig_handler(self, state):
print 'Network Manager State "%s"'%NM_STATE_STRINGS[state]
def device_activating_sig_handler(self, device):
print 'Device %s activating'%device
def device_now_active_sig_handler(self, device, essid=None):
print 'Device %s now activated (%s)'%(device, essid)
def name_owner_changed_sig_handler(self, name, old, new):
if name != NM_SERVICE:
return
if (old and len(old)) and (not new and not len(new)):
# NM went away
pass
elif (not old and not len(old)) and (new and len(new)):
# NM started up
self._update_devices()
def wireless_network_appeared_sig_handler(self, device, network):
if not self._devices.has_key(device):
return
self._devices[device].network_appeared(network)
def wireless_network_disappeared_sig_handler(self, device, network):
if not self._devices.has_key(device):
return
self._devices[device].network_disappeared(network)
def wireless_device_strength_changed_sig_handler(self, device, strength):
if not self._devices.has_key(device):
return
self._devices[device].set_strength(strength)
def wireless_network_strength_changed_sig_handler(self, device, network, strength):
if not self._devices.has_key(device):
return
net = self._devices[device].get_network(network)
if net:
net.set_strength(strength)
def run(self):
loop = gobject.MainLoop()
try:
loop.run()
except KeyboardInterrupt:
pass

346
services/nm/nminfo.py Normal file
View File

@ -0,0 +1,346 @@
# vi: ts=4 ai noet
#
# Copyright (C) 2006, Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import dbus
import dbus.service
import time
import os
import binascii
from ConfigParser import ConfigParser
try:
from sugar import env
except ImportError:
pass
NM_INFO_IFACE='org.freedesktop.NetworkManagerInfo'
NM_INFO_PATH='/org/freedesktop/NetworkManagerInfo'
class NoNetworks(dbus.DBusException):
def __init__(self):
dbus.DBusException.__init__(self)
self._dbus_error_name = NM_INFO_IFACE + '.NoNetworks'
class NetworkInvalidError(Exception):
pass
class NMConfig(ConfigParser):
def get_bool(self, section, name):
opt = self.get(section, name)
if type(opt) == type(""):
if opt.lower() == 'yes' or opt.lower() == 'true':
return True
elif opt.lower() == 'no' or opt.lower() == 'false':
return False
raise ValueError("Invalid format for %s/%s. Should be one of [yes, no, true, false]." % (section, name))
def get_list(self, section, name):
opt = self.get(section, name)
if type(opt) == type(""):
if not len(opt):
return []
try:
return opt.split()
except Exception:
pass
raise ValueError("Invalid format for %s/%s. Should be a space-separate list." % (section, name))
def get_int(self, section, name):
opt = self.get(section, name)
try:
return int(opt)
except Exception:
pass
raise ValueError("Invalid format for %s/%s. Should be a valid integer." % (section, name))
IW_AUTH_CIPHER_NONE = 0x00000001
IW_AUTH_CIPHER_WEP40 = 0x00000002
IW_AUTH_CIPHER_TKIP = 0x00000004
IW_AUTH_CIPHER_CCMP = 0x00000008
IW_AUTH_CIPHER_WEP104 = 0x00000010
IW_AUTH_ALG_OPEN_SYSTEM = 0x00000001
IW_AUTH_ALG_SHARED_KEY = 0x00000002
NETWORK_TYPE_UNKNOWN = 0
NETWORK_TYPE_ALLOWED = 1
NETWORK_TYPE_INVALID = 2
class Security(object):
def __init__(self, we_cipher):
self._we_cipher = we_cipher
def read_from_config(self, cfg, name):
pass
def read_from_args(self, args):
pass
def new_from_config(cfg, name):
security = None
try:
we_cipher = cfg.get_int(name, "we_cipher")
if we_cipher == IW_AUTH_CIPHER_NONE:
security = Security(we_cipher)
elif we_cipher == IW_AUTH_CIPHER_WEP40 or we_cipher == IW_AUTH_CIPHER_WEP104:
security = WEPSecurity(we_cipher)
else:
# FIXME: find a way to make WPA config option matrix not
# make you want to throw up
raise ValueError("Unsupported security combo")
security.read_from_config(cfg, name)
except (NoOptionError, ValueError), e:
return None
return security
new_from_config = staticmethod(new_from_config)
def new_from_args(cfg, we_cipher, args):
security = None
try:
if we_cipher == IW_AUTH_CIPHER_NONE:
security = Security(we_cipher)
elif we_cipher == IW_AUTH_CIPHER_WEP40 or we_cipher == IW_AUTH_CIPHER_WEP104:
security = WEPSecurity(we_cipher)
else:
# FIXME: find a way to make WPA config option matrix not
# make you want to throw up
raise ValueError("Unsupported security combo")
security.read_from_args(args)
except (NoOptionError, ValueError), e:
del security
return None
return security
new_from_args = staticmethod(new_from_args)
def get_properties(self):
return [self._we_cipher]
def write_to_config(self, section, config):
config.set(section, "we_cipher", self._we_cipher)
class WEPSecurity(Security):
def read_from_args(self, *args):
if len(args) != 2:
raise ValueError("not enough arguments")
if not isinstance(args[0], str):
raise ValueError("wrong argument type for key")
if not isinstance(args[1], int):
raise ValueError("wrong argument type for auth_alg")
self._key = args[0]
self._auth_alg = args[1]
def read_from_config(self, cfg, name):
# Key should be a hex encoded string
self._key = cfg.get(name, "key")
if self._we_cipher == IW_AUTH_CIPHER_WEP40 and len(self._key) != 10:
raise ValueError("Key length not right for 40-bit WEP")
if self._we_cipher == IW_AUTH_CIPHER_WEP104 and len(self._key) != 26:
raise ValueError("Key length not right for 104-bit WEP")
try:
a = binascii.a2b_hex(self._key)
except TypeError:
raise ValueError("Key was not a hexadecimal string.")
self._auth_alg = cfg.get_int(name, "auth_alg")
if self._auth_alg != IW_AUTH_ALG_OPEN_SYSTEM and self._auth_alg != IW_AUTH_ALG_SHARED_KEY:
raise ValueError("Invalid authentication algorithm %d" % self._auth_alg)
def get_properties(self):
args = Security.get_properties(self)
args.append(self._key)
args.append(self._auth_alg)
return args
def write_to_config(self, section, config):
Security.write_to_config(self, section, config)
config.set(section, "key", self._key)
config.set(section, "auth_alg", self._auth_alg)
class Network:
def __init__(ssid):
self.ssid = ssid
self.timestamp = time.now()
self.fallback = False
self.bssids = []
self.we_cipher = 0
self._security = None
def get_properties(self):
args = [network.ssid, network.timestamp, network.fallback, network.bssids]
args += self._security.get_properties()
return tuple(args)
def read_from_args(self, auto, fallback, bssid, we_cipher, *args):
if auto == False:
self.timestamp = time.now()
self.fallback = True
if not self.bssids.contains(bssid):
self.bssids.append(bssid)
self._security = Security.new_from_args(we_cipher, args)
if not self._security:
raise NetworkInvalidError(e)
def read_from_config(self, config):
try:
self.timestamp = config.get_int(self.ssid, "timestamp")
self.fallback = config.get_bool(self.ssid, "fallback")
except (NoOptionError, ValueError), e:
raise NetworkInvalidError(e)
self._security = Security.new_from_config(config, self.ssid)
if not self._security:
raise NetworkInvalidError(e)
# The following don't need to be present
try:
self.bssids = config.get_list(self.ssid, "bssids")
except (NoOptionError, ValueError), e:
pass
def write_to_config(self, config):
config.add_section(self.ssid)
config.set(self.ssid, "timestamp", self.timestamp)
config.set(self.ssid, "fallback", self.fallback)
if len(self.bssids) > 0:
opt = ""
opt.join(self.bssids, " ")
config.set(self.ssid, "bssids", opt)
self._security.write_to_config(self.ssid, config)
class NMInfoDBusServiceHelper(dbus.service.Object):
def __init__(self, parent):
self._parent = parent
bus = dbus.SystemBus()
# If NMI is already around, don't grab the NMI service
bus_object = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
name = None
try:
name = bus_object.GetNameOwner("org.freedesktop.NetworkManagerInfo", \
dbus_interface='org.freedesktop.DBus')
except dbus.DBusException:
pass
if name:
print "NMI service already owned by %s, won't claim it." % name
raise RuntimeError
bus_name = dbus.service.BusName(NM_INFO_IFACE, bus=bus)
dbus.service.Object.__init__(self, bus_name, NM_INFO_PATH)
@dbus.service.method(NM_INFO_IFACE, in_signature='i', out_signature='as')
def getNetworks(self, net_type):
ssids = self._parent.get_networks(net_type)
if len(ssids) > 0:
return dbus.Array(ssids)
raise NoNetworks
@dbus.service.method(NM_INFO_IFACE, in_signature='si', out_signature='sibbas')
def getNetworkProperties(self, ssid, net_type):
props = self._parent.get_network_properties(ssid, net_type)
if not props:
raise NoNetworks
return props
@dbus.service.method(NM_INFO_IFACE)
def updateNetworkInfo(self, ssid, bauto, bfallback, bssid, cipher, *args):
self._parent.update_network_info(ssid, bauto, bfallback, bssid, cipher, args)
class NMInfo(object):
def __init__(self):
try:
profile_path = env.get_profile_path()
except NameError:
home = os.path.expanduser("~")
profile_path = os.path.join(home, ".sugar", "default")
self._cfg_file = os.path.join(profile_path, "nm", "networks.cfg")
self._allowed_networks = self._read_config()
self._dbus_helper = NMInfoDBusServiceHelper(self)
def save_config(self):
self._write_config(self._allowed_networks)
def _read_config(self):
if not os.path.exists(os.path.dirname(self._cfg_file)):
os.makedirs(os.path.dirname(self._cfg_file), 0755)
if not os.path.exists(self._cfg_file):
self._write_config({})
return {}
config = NMConfig()
config.read(self._cfg_file)
networks = {}
for name in config.sections():
net = Network(name)
try:
net.read_from_config(config)
networks[name] = net
except NetworkInvalidError, e:
print "Bad network!! %s" % e
del net
del config
return networks
def _write_config(self, networks):
fp = open(self._cfg_file, 'w')
config = NMConfig()
for net in networks:
net.write_to_config(config)
config.write(fp)
fp.close()
del config
def get_networks(self, net_type):
if net_type != NETWORK_TYPE_ALLOWED:
raise ValueError("Bad network type")
nets = []
for net in self._allowed_networks:
nets.append(net.ssid)
print "Returning networks: %s" % nets
return nets
def get_network_properties(self, ssid, net_type):
if net_type != NETWORK_TYPE_ALLOWED:
raise ValueError("Bad network type")
if not self._allowed_networks.has_key(ssid):
return None
network = self._allowed_networks[ssid]
props = network.get_properties()
print "Returning props for %s: %s" % (ssid, props)
return props
def update_network_info(self, ssid, bauto, bfallback, bssid, we_cipher, *args):
if self._allowed_networks.has_key(ssid):
del self._allowed_networks[ssid]
net = Network(ssid)
try:
net.read_from_args(auto, fallback, bssid, we_cipher, args)
self._allowed_networks[ssid] = net
except InvalidNetworkError, e:
print "Bad network!! %s" % e
del net

View File

@ -0,0 +1,34 @@
#! /bin/env python
# vi: ts=4 ai noet
#
# Copyright (C) 2006, Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import sys
import logging
from sugar import logger
from sugar import env
sys.path.insert(0, env.get_services_dir())
from nm import nmclient
logger.start('nm-applet')
logging.info('Starting network applet')
app = nmclient.NMClientApp()
app.run()