# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import time

import dbus
import dbus.mainloop.glib
import netifaces


def get_wiface_name():
    """Return the first interface whose name starts with 'w', or None.

    Interfaces are examined in the order reported by netifaces.interfaces().
    """
    return next((iface for iface in netifaces.interfaces() if iface.startswith('w')), None)


def get_wiface_IPv4(iface):
    """Return an IPv4 address of interface 'iface', or None if it has none.

    When the interface reports several IPv4 addresses, the first entry is
    returned (previously this case raised ValueError from the unpacking).
    """
    try:
        addresses = netifaces.ifaddresses(iface)[netifaces.AF_INET]
        return addresses[0]['addr']
    except (KeyError, IndexError):
        # KeyError: no AF_INET entry (or no 'addr' key); IndexError: empty AF_INET list.
        return None


class wpa_cli:
    """Thin wrapper around the wpa_supplicant D-Bus interface of one Wi-Fi interface.

    The network that was current at construction time is remembered so that
    reset() can select it again after a temporary network has been used.
    """

    def __init__(self, iface, reset_on_exit=False):
        """Bind to the wpa_supplicant object for interface 'iface'.

        iface         -- name of the network interface (passed to GetInterface)
        reset_on_exit -- when True, reset() is called from __del__

        Raises RuntimeError if the interface proxy could not be obtained.
        """
        self.iface_name = iface
        self.iface_obj = None
        self.iface_ifc = None
        self.old_network = None
        self.new_network = None
        self.connected = False
        self.reset_on_exit = reset_on_exit
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        bus = dbus.SystemBus()

        service = dbus.Interface(bus.get_object('fi.w1.wpa_supplicant1', '/fi/w1/wpa_supplicant1'),
                                 'fi.w1.wpa_supplicant1')
        iface_path = service.GetInterface(self.iface_name)
        self.iface_obj = bus.get_object('fi.w1.wpa_supplicant1', iface_path)
        self.iface_ifc = dbus.Interface(self.iface_obj, 'fi.w1.wpa_supplicant1.Interface')
        self.iface_props = dbus.Interface(self.iface_obj, 'org.freedesktop.DBus.Properties')
        if self.iface_ifc is None:
            raise RuntimeError('supplicant : Failed to fetch interface')

        self.old_network = self._get_iface_property('CurrentNetwork')
        print('Old network is %s' % self.old_network)

        # A 'CurrentNetwork' of '/' is treated as "no network selected".
        if self.old_network == '/':
            self.old_network = None
        else:
            self.connected = True

    def _get_iface_property(self, name):
        """ Read the property with 'name' from the wi-fi interface object

        Note: The result is a dbus wrapped type, so should usually convert it to the corresponding native
        Python type
        """
        return self.iface_props.Get('fi.w1.wpa_supplicant1.Interface', name)

    def connect(self, ssid, password):
        """Connect to network 'ssid' using 'password' and return the obtained IPv4 address.

        Any current connection is dropped and a network added by a previous
        connect() call is removed first. After selecting the new network the
        method waits 10 s, then polls up to 10 times (5 s before and 3 s after
        each check) for the interface to get an IPv4 address.

        Raises RuntimeError if no address was obtained; reset() is called
        before raising.
        """
        if self.connected is True:
            self.iface_ifc.Disconnect()
            self.connected = False

        if self.new_network is not None:
            self.iface_ifc.RemoveNetwork(self.new_network)
            # Forget the removed path right away: if AddNetwork below fails,
            # reset() must not try to remove the same network a second time.
            self.new_network = None

        print('Pre-connect state is %s, IP is %s' % (self._get_iface_property('State'), get_wiface_IPv4(self.iface_name)))

        self.new_network = self.iface_ifc.AddNetwork({'ssid': ssid, 'psk': password})
        self.iface_ifc.SelectNetwork(self.new_network)
        time.sleep(10)

        for _ in range(10):
            time.sleep(5)
            state = str(self._get_iface_property('State'))
            print('wpa iface state %s (scanning %s)' % (state, bool(self._get_iface_property('Scanning'))))
            # Nudge the supplicant if it has given up on the selected network.
            if state in ['disconnected', 'inactive']:
                self.iface_ifc.Reconnect()
            ip = get_wiface_IPv4(self.iface_name)
            print('wpa iface %s IP %s' % (self.iface_name, ip))
            if ip is not None:
                self.connected = True
                return ip
            time.sleep(3)

        self.reset()
        raise RuntimeError('wpa_cli : Connection failed')

    def reset(self):
        """Undo the changes made through this object.

        Disconnects if connected, removes the network added by connect() and
        selects the network that was current at construction time. Each step
        clears its own state, so the same action is not repeated on a second
        call. Does nothing if the interface proxy was never obtained.
        """
        if self.iface_ifc is None:
            return
        if self.connected is True:
            self.iface_ifc.Disconnect()
            self.connected = False
        if self.new_network is not None:
            self.iface_ifc.RemoveNetwork(self.new_network)
            self.new_network = None
        if self.old_network is not None:
            self.iface_ifc.SelectNetwork(self.old_network)
            self.old_network = None

    def __del__(self):
        """Call reset() on destruction when 'reset_on_exit' was requested."""
        if self.reset_on_exit is True:
            self.reset()