88ddaab1c7
An implementation of the NetworkManagerInfo service (for storing local networking config) and GUI bits for discovering network status and controlling networking, written in Python.
347 lines
10 KiB
Python
347 lines
10 KiB
Python
# vi: ts=4 ai noet
|
|
#
|
|
# Copyright (C) 2006, Red Hat, Inc.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
|
|
|
import dbus
|
|
import dbus.service
|
|
import time
|
|
import os
|
|
import binascii
|
|
from ConfigParser import ConfigParser
|
|
try:
|
|
from sugar import env
|
|
except ImportError:
|
|
pass
|
|
|
|
NM_INFO_IFACE='org.freedesktop.NetworkManagerInfo'
|
|
NM_INFO_PATH='/org/freedesktop/NetworkManagerInfo'
|
|
|
|
|
|
class NoNetworks(dbus.DBusException):
|
|
def __init__(self):
|
|
dbus.DBusException.__init__(self)
|
|
self._dbus_error_name = NM_INFO_IFACE + '.NoNetworks'
|
|
|
|
|
|
class NetworkInvalidError(Exception):
|
|
pass
|
|
|
|
|
|
class NMConfig(ConfigParser):
|
|
def get_bool(self, section, name):
|
|
opt = self.get(section, name)
|
|
if type(opt) == type(""):
|
|
if opt.lower() == 'yes' or opt.lower() == 'true':
|
|
return True
|
|
elif opt.lower() == 'no' or opt.lower() == 'false':
|
|
return False
|
|
raise ValueError("Invalid format for %s/%s. Should be one of [yes, no, true, false]." % (section, name))
|
|
|
|
def get_list(self, section, name):
|
|
opt = self.get(section, name)
|
|
if type(opt) == type(""):
|
|
if not len(opt):
|
|
return []
|
|
try:
|
|
return opt.split()
|
|
except Exception:
|
|
pass
|
|
raise ValueError("Invalid format for %s/%s. Should be a space-separate list." % (section, name))
|
|
|
|
def get_int(self, section, name):
|
|
opt = self.get(section, name)
|
|
try:
|
|
return int(opt)
|
|
except Exception:
|
|
pass
|
|
raise ValueError("Invalid format for %s/%s. Should be a valid integer." % (section, name))
|
|
|
|
|
|
IW_AUTH_CIPHER_NONE = 0x00000001
|
|
IW_AUTH_CIPHER_WEP40 = 0x00000002
|
|
IW_AUTH_CIPHER_TKIP = 0x00000004
|
|
IW_AUTH_CIPHER_CCMP = 0x00000008
|
|
IW_AUTH_CIPHER_WEP104 = 0x00000010
|
|
|
|
IW_AUTH_ALG_OPEN_SYSTEM = 0x00000001
|
|
IW_AUTH_ALG_SHARED_KEY = 0x00000002
|
|
|
|
NETWORK_TYPE_UNKNOWN = 0
|
|
NETWORK_TYPE_ALLOWED = 1
|
|
NETWORK_TYPE_INVALID = 2
|
|
|
|
|
|
class Security(object):
|
|
def __init__(self, we_cipher):
|
|
self._we_cipher = we_cipher
|
|
|
|
def read_from_config(self, cfg, name):
|
|
pass
|
|
|
|
def read_from_args(self, args):
|
|
pass
|
|
|
|
def new_from_config(cfg, name):
|
|
security = None
|
|
try:
|
|
we_cipher = cfg.get_int(name, "we_cipher")
|
|
if we_cipher == IW_AUTH_CIPHER_NONE:
|
|
security = Security(we_cipher)
|
|
elif we_cipher == IW_AUTH_CIPHER_WEP40 or we_cipher == IW_AUTH_CIPHER_WEP104:
|
|
security = WEPSecurity(we_cipher)
|
|
else:
|
|
# FIXME: find a way to make WPA config option matrix not
|
|
# make you want to throw up
|
|
raise ValueError("Unsupported security combo")
|
|
security.read_from_config(cfg, name)
|
|
except (NoOptionError, ValueError), e:
|
|
return None
|
|
return security
|
|
new_from_config = staticmethod(new_from_config)
|
|
|
|
def new_from_args(cfg, we_cipher, args):
|
|
security = None
|
|
try:
|
|
if we_cipher == IW_AUTH_CIPHER_NONE:
|
|
security = Security(we_cipher)
|
|
elif we_cipher == IW_AUTH_CIPHER_WEP40 or we_cipher == IW_AUTH_CIPHER_WEP104:
|
|
security = WEPSecurity(we_cipher)
|
|
else:
|
|
# FIXME: find a way to make WPA config option matrix not
|
|
# make you want to throw up
|
|
raise ValueError("Unsupported security combo")
|
|
security.read_from_args(args)
|
|
except (NoOptionError, ValueError), e:
|
|
del security
|
|
return None
|
|
return security
|
|
new_from_args = staticmethod(new_from_args)
|
|
|
|
def get_properties(self):
|
|
return [self._we_cipher]
|
|
|
|
def write_to_config(self, section, config):
|
|
config.set(section, "we_cipher", self._we_cipher)
|
|
|
|
|
|
class WEPSecurity(Security):
|
|
def read_from_args(self, *args):
|
|
if len(args) != 2:
|
|
raise ValueError("not enough arguments")
|
|
if not isinstance(args[0], str):
|
|
raise ValueError("wrong argument type for key")
|
|
if not isinstance(args[1], int):
|
|
raise ValueError("wrong argument type for auth_alg")
|
|
self._key = args[0]
|
|
self._auth_alg = args[1]
|
|
|
|
def read_from_config(self, cfg, name):
|
|
# Key should be a hex encoded string
|
|
self._key = cfg.get(name, "key")
|
|
if self._we_cipher == IW_AUTH_CIPHER_WEP40 and len(self._key) != 10:
|
|
raise ValueError("Key length not right for 40-bit WEP")
|
|
if self._we_cipher == IW_AUTH_CIPHER_WEP104 and len(self._key) != 26:
|
|
raise ValueError("Key length not right for 104-bit WEP")
|
|
|
|
try:
|
|
a = binascii.a2b_hex(self._key)
|
|
except TypeError:
|
|
raise ValueError("Key was not a hexadecimal string.")
|
|
|
|
self._auth_alg = cfg.get_int(name, "auth_alg")
|
|
if self._auth_alg != IW_AUTH_ALG_OPEN_SYSTEM and self._auth_alg != IW_AUTH_ALG_SHARED_KEY:
|
|
raise ValueError("Invalid authentication algorithm %d" % self._auth_alg)
|
|
|
|
def get_properties(self):
|
|
args = Security.get_properties(self)
|
|
args.append(self._key)
|
|
args.append(self._auth_alg)
|
|
return args
|
|
|
|
def write_to_config(self, section, config):
|
|
Security.write_to_config(self, section, config)
|
|
config.set(section, "key", self._key)
|
|
config.set(section, "auth_alg", self._auth_alg)
|
|
|
|
|
|
class Network:
|
|
def __init__(ssid):
|
|
self.ssid = ssid
|
|
self.timestamp = time.now()
|
|
self.fallback = False
|
|
self.bssids = []
|
|
self.we_cipher = 0
|
|
self._security = None
|
|
|
|
def get_properties(self):
|
|
args = [network.ssid, network.timestamp, network.fallback, network.bssids]
|
|
args += self._security.get_properties()
|
|
return tuple(args)
|
|
|
|
def read_from_args(self, auto, fallback, bssid, we_cipher, *args):
|
|
if auto == False:
|
|
self.timestamp = time.now()
|
|
self.fallback = True
|
|
if not self.bssids.contains(bssid):
|
|
self.bssids.append(bssid)
|
|
|
|
self._security = Security.new_from_args(we_cipher, args)
|
|
if not self._security:
|
|
raise NetworkInvalidError(e)
|
|
|
|
def read_from_config(self, config):
|
|
try:
|
|
self.timestamp = config.get_int(self.ssid, "timestamp")
|
|
self.fallback = config.get_bool(self.ssid, "fallback")
|
|
except (NoOptionError, ValueError), e:
|
|
raise NetworkInvalidError(e)
|
|
|
|
self._security = Security.new_from_config(config, self.ssid)
|
|
if not self._security:
|
|
raise NetworkInvalidError(e)
|
|
|
|
# The following don't need to be present
|
|
try:
|
|
self.bssids = config.get_list(self.ssid, "bssids")
|
|
except (NoOptionError, ValueError), e:
|
|
pass
|
|
|
|
def write_to_config(self, config):
|
|
config.add_section(self.ssid)
|
|
config.set(self.ssid, "timestamp", self.timestamp)
|
|
config.set(self.ssid, "fallback", self.fallback)
|
|
if len(self.bssids) > 0:
|
|
opt = ""
|
|
opt.join(self.bssids, " ")
|
|
config.set(self.ssid, "bssids", opt)
|
|
self._security.write_to_config(self.ssid, config)
|
|
|
|
|
|
class NMInfoDBusServiceHelper(dbus.service.Object):
|
|
def __init__(self, parent):
|
|
self._parent = parent
|
|
bus = dbus.SystemBus()
|
|
|
|
# If NMI is already around, don't grab the NMI service
|
|
bus_object = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
|
|
name = None
|
|
try:
|
|
name = bus_object.GetNameOwner("org.freedesktop.NetworkManagerInfo", \
|
|
dbus_interface='org.freedesktop.DBus')
|
|
except dbus.DBusException:
|
|
pass
|
|
if name:
|
|
print "NMI service already owned by %s, won't claim it." % name
|
|
raise RuntimeError
|
|
|
|
bus_name = dbus.service.BusName(NM_INFO_IFACE, bus=bus)
|
|
dbus.service.Object.__init__(self, bus_name, NM_INFO_PATH)
|
|
|
|
@dbus.service.method(NM_INFO_IFACE, in_signature='i', out_signature='as')
|
|
def getNetworks(self, net_type):
|
|
ssids = self._parent.get_networks(net_type)
|
|
if len(ssids) > 0:
|
|
return dbus.Array(ssids)
|
|
|
|
raise NoNetworks
|
|
|
|
@dbus.service.method(NM_INFO_IFACE, in_signature='si', out_signature='sibbas')
|
|
def getNetworkProperties(self, ssid, net_type):
|
|
props = self._parent.get_network_properties(ssid, net_type)
|
|
if not props:
|
|
raise NoNetworks
|
|
return props
|
|
|
|
@dbus.service.method(NM_INFO_IFACE)
|
|
def updateNetworkInfo(self, ssid, bauto, bfallback, bssid, cipher, *args):
|
|
self._parent.update_network_info(ssid, bauto, bfallback, bssid, cipher, args)
|
|
|
|
|
|
class NMInfo(object):
|
|
def __init__(self):
|
|
try:
|
|
profile_path = env.get_profile_path()
|
|
except NameError:
|
|
home = os.path.expanduser("~")
|
|
profile_path = os.path.join(home, ".sugar", "default")
|
|
self._cfg_file = os.path.join(profile_path, "nm", "networks.cfg")
|
|
self._allowed_networks = self._read_config()
|
|
self._dbus_helper = NMInfoDBusServiceHelper(self)
|
|
|
|
def save_config(self):
|
|
self._write_config(self._allowed_networks)
|
|
|
|
def _read_config(self):
|
|
if not os.path.exists(os.path.dirname(self._cfg_file)):
|
|
os.makedirs(os.path.dirname(self._cfg_file), 0755)
|
|
if not os.path.exists(self._cfg_file):
|
|
self._write_config({})
|
|
return {}
|
|
|
|
config = NMConfig()
|
|
config.read(self._cfg_file)
|
|
networks = {}
|
|
for name in config.sections():
|
|
net = Network(name)
|
|
try:
|
|
net.read_from_config(config)
|
|
networks[name] = net
|
|
except NetworkInvalidError, e:
|
|
print "Bad network!! %s" % e
|
|
del net
|
|
del config
|
|
return networks
|
|
|
|
def _write_config(self, networks):
|
|
fp = open(self._cfg_file, 'w')
|
|
config = NMConfig()
|
|
for net in networks:
|
|
net.write_to_config(config)
|
|
config.write(fp)
|
|
fp.close()
|
|
del config
|
|
|
|
def get_networks(self, net_type):
|
|
if net_type != NETWORK_TYPE_ALLOWED:
|
|
raise ValueError("Bad network type")
|
|
nets = []
|
|
for net in self._allowed_networks:
|
|
nets.append(net.ssid)
|
|
print "Returning networks: %s" % nets
|
|
return nets
|
|
|
|
def get_network_properties(self, ssid, net_type):
|
|
if net_type != NETWORK_TYPE_ALLOWED:
|
|
raise ValueError("Bad network type")
|
|
if not self._allowed_networks.has_key(ssid):
|
|
return None
|
|
network = self._allowed_networks[ssid]
|
|
props = network.get_properties()
|
|
print "Returning props for %s: %s" % (ssid, props)
|
|
return props
|
|
|
|
def update_network_info(self, ssid, bauto, bfallback, bssid, we_cipher, *args):
|
|
if self._allowed_networks.has_key(ssid):
|
|
del self._allowed_networks[ssid]
|
|
net = Network(ssid)
|
|
try:
|
|
net.read_from_args(auto, fallback, bssid, we_cipher, args)
|
|
self._allowed_networks[ssid] = net
|
|
except InvalidNetworkError, e:
|
|
print "Bad network!! %s" % e
|
|
del net
|