1# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15
16import time
17
18import dbus
19import dbus.mainloop.glib
20import netifaces
21
22
def get_wiface_name():
    """ Return the name of the first network interface whose name starts with 'w'

    Interfaces are examined in the order netifaces.interfaces() reports them.
    Returns None when no interface name starts with 'w'.
    """
    candidates = (name for name in netifaces.interfaces() if name.startswith('w'))
    return next(candidates, None)
28
29
def get_wiface_IPv4(iface):
    """ Return the first IPv4 address reported for interface 'iface', or None

    netifaces reports a list of address entries per address family, so an interface
    may carry more than one IPv4 address. The entries are scanned in order and the
    first one providing an 'addr' value wins, instead of assuming the list holds
    exactly one entry.

    Returns None when the interface has no AF_INET entry at all, or when none of
    its entries has an 'addr' field.
    """
    try:
        entries = netifaces.ifaddresses(iface)[netifaces.AF_INET]
    except KeyError:
        # No IPv4 address family is listed for this interface
        return None

    for entry in entries:
        if 'addr' in entry:
            return entry['addr']
    return None
36
37
class wpa_cli:
    """ Drive a wi-fi interface through the wpa_supplicant D-Bus service

    On construction the object looks the interface up in wpa_supplicant and remembers
    the network that is currently selected (if any), so that reset() can select it again.
    connect() adds and selects a new network; reset() removes that network again.
    """

    def __init__(self, iface, reset_on_exit=False):
        """ Attach to the wpa_supplicant object of interface 'iface'

        iface         -- interface name handed to wpa_supplicant's GetInterface
        reset_on_exit -- when true, reset() is invoked from __del__

        Errors reported by the D-Bus calls made here are not caught and propagate to the caller.
        """
        self.iface_name = iface
        self.iface_obj = None
        self.iface_ifc = None
        self.old_network = None
        self.new_network = None
        self.connected = False
        self.reset_on_exit = reset_on_exit
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        bus = dbus.SystemBus()

        service = dbus.Interface(bus.get_object('fi.w1.wpa_supplicant1', '/fi/w1/wpa_supplicant1'),
                                 'fi.w1.wpa_supplicant1')
        iface_path = service.GetInterface(self.iface_name)
        self.iface_obj = bus.get_object('fi.w1.wpa_supplicant1', iface_path)
        # Constructing dbus.Interface always yields an object, so no None check is needed here
        self.iface_ifc = dbus.Interface(self.iface_obj, 'fi.w1.wpa_supplicant1.Interface')
        self.iface_props = dbus.Interface(self.iface_obj, 'org.freedesktop.DBus.Properties')

        self.old_network = self._get_iface_property('CurrentNetwork')
        print('Old network is %s' % self.old_network)

        # The path '/' is treated as "no network currently selected"
        if self.old_network == '/':
            self.old_network = None
        else:
            self.connected = True

    def _get_iface_property(self, name):
        """ Read the property with 'name' from the wi-fi interface object

        Note: The result is a dbus wrapped type, so should usually convert it to the corresponding native
        Python type
        """
        return self.iface_props.Get('fi.w1.wpa_supplicant1.Interface', name)

    def connect(self, ssid, password):
        """ Connect the interface to the network 'ssid' using 'password' as its psk

        Any current connection is dropped and a network added by an earlier connect() call
        is removed first. The interface is then polled up to 10 times; the call succeeds once
        wpa_supplicant reports the 'completed' state and the interface has an IPv4 address.

        Returns the IPv4 address of the interface.
        Raises RuntimeError('wpa_cli : Connection failed') after calling reset() when all
        attempts are used up.
        """
        if self.connected:
            self.iface_ifc.Disconnect()
            self.connected = False

        if self.new_network is not None:
            self.iface_ifc.RemoveNetwork(self.new_network)
            # Forget the removed path right away: should AddNetwork below fail, a later
            # reset()/connect() must not try to remove the same network a second time
            self.new_network = None

        print('Pre-connect state is %s, IP is %s' % (self._get_iface_property('State'), get_wiface_IPv4(self.iface_name)))

        self.new_network = self.iface_ifc.AddNetwork({'ssid': ssid, 'psk': password})
        self.iface_ifc.SelectNetwork(self.new_network)
        time.sleep(10)

        for _ in range(10):
            time.sleep(5)
            state = str(self._get_iface_property('State'))
            print('wpa iface state %s (scanning %s)' % (state, bool(self._get_iface_property('Scanning'))))
            if state in ('disconnected', 'inactive'):
                self.iface_ifc.Reconnect()
            ip = get_wiface_IPv4(self.iface_name)
            print('wpa iface %s IP %s' % (self.iface_name, ip))
            # An address alone is not proof of success: it may be left over from the previous
            # connection, so also require that wpa_supplicant has completed the connection
            if ip is not None and state == 'completed':
                self.connected = True
                return ip
            time.sleep(3)

        self.reset()
        raise RuntimeError('wpa_cli : Connection failed')

    def reset(self):
        """ Undo connect(): disconnect, remove the added network and re-select the original one

        Does nothing when the wpa_supplicant interface was never obtained. Each step is
        performed at most once, as the corresponding attribute is cleared afterwards.
        """
        if self.iface_ifc is None:
            return

        if self.connected:
            self.iface_ifc.Disconnect()
            self.connected = False
        if self.new_network is not None:
            self.iface_ifc.RemoveNetwork(self.new_network)
            self.new_network = None
        if self.old_network is not None:
            self.iface_ifc.SelectNetwork(self.old_network)
            self.old_network = None

    def __del__(self):
        """ Call reset() on destruction when 'reset_on_exit' was requested """
        if self.reset_on_exit:
            self.reset()
123