1#!/usr/bin/env python 2# 3# Copyright 2019 Espressif Systems (Shanghai) PTE LTD 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17 18# DBus-Bluez BLE library 19 20from __future__ import print_function 21 22import sys 23import time 24 25try: 26 import dbus 27 import dbus.mainloop.glib 28 from gi.repository import GLib 29except ImportError as e: 30 if 'linux' not in sys.platform: 31 raise e 32 print(e) 33 print('Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue') 34 print('Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue') 35 raise 36 37from . import lib_gap, lib_gatt 38 39BLUEZ_SERVICE_NAME = 'org.bluez' 40DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' 41DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' 42 43ADAPTER_IFACE = 'org.bluez.Adapter1' 44DEVICE_IFACE = 'org.bluez.Device1' 45 46GATT_MANAGER_IFACE = 'org.bluez.GattManager1' 47LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1' 48 49GATT_SERVICE_IFACE = 'org.bluez.GattService1' 50GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' 51 52 53class DBusException(dbus.exceptions.DBusException): 54 pass 55 56 57class Characteristic: 58 def __init__(self): 59 self.iface = None 60 self.path = None 61 self.props = None 62 63 64class Service: 65 def __init__(self): 66 self.iface = None 67 self.path = None 68 self.props = None 69 self.chars = [] 70 71 72class Device: 73 def __init__(self): 74 self.iface = None 75 self.path = None 76 self.props = None 77 self.name = None 78 self.addr = None 79 self.services = [] 80 81 82class Adapter: 83 def __init__(self): 84 self.iface = None 85 self.path = None 86 self.props = None 87 88 89class BLE_Bluez_Client: 90 def __init__(self, iface=None): 91 self.bus = None 92 self.hci_iface = iface 93 self.adapter = Adapter() 94 self.device = None 95 self.gatt_app = None 96 self.gatt_mgr = None 97 self.mainloop = None 98 self.loop_cnt = 0 99 100 try: 101 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) 102 self.bus = dbus.SystemBus() 103 except dbus.exceptions.DBusException as dbus_err: 104 raise DBusException('Failed to initialise client: {}'.format(dbus_err)) 105 except Exception as err: 106 raise Exception('Failed to initialise client: {}'.format(err)) 107 108 def __del__(self): 109 try: 110 # Cleanup 111 self.disconnect() 112 print('Test Exit') 113 except Exception as e: 114 print(e) 115 116 def set_adapter(self): 117 ''' 118 Discover Bluetooth Adapter 119 Power On Bluetooth Adapter 120 ''' 121 try: 122 print('discovering adapter') 123 dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE) 124 dbus_objs = dbus_obj_mgr.GetManagedObjects() 125 for path, interfaces in dbus_objs.items(): 126 adapter = interfaces.get(ADAPTER_IFACE) 127 if adapter is not None and path.endswith(self.hci_iface): 128 self.adapter.iface = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), ADAPTER_IFACE) 129 self.adapter.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE) 130 self.adapter.path = path 131 break 132 133 if self.adapter.iface is None: 134 print('bluetooth adapter not found') 135 return False 136 137 print('bluetooth adapter discovered') 138 print('checking if bluetooth adapter is already powered on') 139 # Check if adapter is already powered on 140 powered = self.adapter.props.Get(ADAPTER_IFACE, 'Powered') 141 if powered == 1: 142 print('adapter already powered on') 143 return True 144 # Power On Adapter 145 print('powering on adapter') 146 self.adapter.props.Set(ADAPTER_IFACE, 'Powered', dbus.Boolean(1)) 147 # Check if adapter is powered on 148 print('checking if adapter is powered on') 149 for cnt in range(10, 0, -1): 150 time.sleep(5) 151 powered_on = self.adapter.props.Get(ADAPTER_IFACE, 'Powered') 152 if powered_on == 1: 153 # Set adapter props again with powered on value 154 self.adapter.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.adapter.path), DBUS_PROP_IFACE) 155 print('bluetooth adapter powered on') 156 return True 157 print('number of retries left({})'.format(cnt - 1)) 158 159 # Adapter not powered on 160 print('bluetooth adapter not powered on') 161 return False 162 163 except Exception as err: 164 raise Exception('Failed to set adapter: {}'.format(err)) 165 166 def connect(self, devname=None, devaddr=None): 167 ''' 168 Start Discovery and Connect to the device 169 ''' 170 try: 171 device_found = None 172 start_discovery = False 173 self.device = Device() 174 175 discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering') 176 # Start Discovery 177 if discovery_val == 0: 178 print('starting discovery') 179 self.adapter.iface.StartDiscovery() 180 start_discovery = True 181 182 for cnt in range(10, 0, -1): 183 time.sleep(5) 184 discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering') 185 if discovery_val == 1: 186 print('start discovery successful') 187 break 188 print('number of retries left ({})'.format(cnt - 1)) 189 190 if discovery_val == 0: 191 print('start discovery failed') 192 return False 193 194 # Get device 195 for cnt in range(10, 0, -1): 196 # Wait for device to be discovered 197 time.sleep(5) 198 device_found = self.get_device( 199 devname=devname, 200 devaddr=devaddr) 201 if device_found: 202 break 203 # Retry 204 print('number of retries left ({})'.format(cnt - 1)) 205 206 if not device_found: 207 print('expected device {} [ {} ] not found'.format(devname, devaddr)) 208 return False 209 210 # Connect to expected device found 211 print('connecting to device {} [ {} ] '.format(self.device.name, self.device.addr)) 212 self.device.iface.Connect(dbus_interface=DEVICE_IFACE) 213 for cnt in range(10, 0, -1): 214 time.sleep(5) 215 connected = self.device.props.Get(DEVICE_IFACE, 'Connected') 216 if connected == 1: 217 # Set device props again with connected on value 218 self.device.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.device.path), DBUS_PROP_IFACE) 219 print('connected to device with iface {}'.format(self.device.path)) 220 return True 221 print('number of retries left({})'.format(cnt - 1)) 222 223 # Device not connected 224 print('connection to device failed') 225 return False 226 227 except Exception as err: 228 raise Exception('Connect to device failed : {}'.format(err)) 229 finally: 230 try: 231 if start_discovery: 232 print('stopping discovery') 233 self.adapter.iface.StopDiscovery() 234 for cnt in range(10, 0, -1): 235 time.sleep(5) 236 discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering') 237 if discovery_val == 0: 238 print('stop discovery successful') 239 break 240 print('number of retries left ({})'.format(cnt - 1)) 241 if discovery_val == 1: 242 print('stop discovery failed') 243 except dbus.exceptions.DBusException as dbus_err: 244 print('Warning: Failure during cleanup for device connection : {}'.format(dbus_err)) 245 246 def get_device(self, devname=None, devaddr=None): 247 ''' 248 Get device based on device name 249 and device address and connect to device 250 ''' 251 dev_path = None 252 expected_device_addr = devaddr.lower() 253 expected_device_name = devname.lower() 254 255 print('checking if expected device {} [ {} ] is present'.format(devname, devaddr)) 256 257 dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE) 258 dbus_objs = dbus_obj_mgr.GetManagedObjects() 259 260 # Check if expected device is present 261 for path, interfaces in dbus_objs.items(): 262 if DEVICE_IFACE not in interfaces.keys(): 263 continue 264 265 # Check expected device address is received device address 266 received_device_addr_path = (path.replace('_', ':')).lower() 267 if expected_device_addr not in received_device_addr_path: 268 continue 269 270 device_props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE) 271 received_device_name = device_props.Get(DEVICE_IFACE, 'Name').lower() 272 273 # Check expected device name is received device name 274 if expected_device_name == received_device_name: 275 # Set device iface path 276 dev_path = path 277 break 278 279 if not dev_path: 280 print('\nBLE device not found') 281 return False 282 283 print('device {} [ {} ] found'.format(devname, devaddr)) 284 285 # Set device details 286 self.device.iface = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DEVICE_IFACE) 287 self.device.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DBUS_PROP_IFACE) 288 self.device.path = dev_path 289 self.device.name = devname 290 self.device.addr = devaddr 291 return True 292 293 def get_services(self): 294 ''' 295 Retrieve Services found in the device connected 296 ''' 297 try: 298 # Get current dbus objects 299 dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE) 300 dbus_objs = dbus_obj_mgr.GetManagedObjects() 301 302 # Get services 303 for path, interfaces in dbus_objs.items(): 304 if GATT_SERVICE_IFACE in interfaces.keys(): 305 if not path.startswith(self.device.path): 306 continue 307 received_service = self.bus.get_object(BLUEZ_SERVICE_NAME, path) 308 # Retrieve all services on device iface path 309 # and set each service received 310 service = Service() 311 service.path = path 312 service.iface = dbus.Interface(received_service, GATT_SERVICE_IFACE) 313 service.props = dbus.Interface(received_service, DBUS_PROP_IFACE) 314 self.device.services.append(service) 315 316 if not self.device.services: 317 print('no services found for device: {}'.format(self.device.path)) 318 return False 319 320 return True 321 322 except Exception as err: 323 raise Exception('Failed to get services: {}'.format(err)) 324 325 def get_chars(self): 326 ''' 327 Get characteristics of the services set for the device connected 328 ''' 329 try: 330 if not self.device.services: 331 print('No services set for device: {}'.format(self.device.path)) 332 return 333 334 # Read chars for all the services received for device 335 for service in self.device.services: 336 char_found = False 337 dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE) 338 dbus_objs = dbus_obj_mgr.GetManagedObjects() 339 for path, interfaces in dbus_objs.items(): 340 if GATT_CHRC_IFACE in interfaces.keys(): 341 if not path.startswith(self.device.path): 342 continue 343 if not path.startswith(service.path): 344 continue 345 # Set characteristics 346 received_char = self.bus.get_object(BLUEZ_SERVICE_NAME, path) 347 char = Characteristic() 348 char.path = path 349 char.iface = dbus.Interface(received_char, GATT_CHRC_IFACE) 350 char.props = dbus.Interface(received_char, DBUS_PROP_IFACE) 351 service.chars.append(char) 352 char_found = True 353 354 if not char_found: 355 print('Characteristic not found for service: {}'.format(service.iface)) 356 357 except Exception as err: 358 raise Exception('Failed to get characteristics : {}'.format(err)) 359 360 def read_chars(self): 361 ''' 362 Read value of characteristics 363 ''' 364 try: 365 if not self.device.services: 366 print('No services set for device: {}'.format(self.device.path)) 367 return 368 369 # Read chars for all services of device 370 for service in self.device.services: 371 # Read properties of characteristic 372 for char in service.chars: 373 # Print path 374 print('Characteristic: {}'.format(char.path)) 375 # Print uuid 376 uuid = char.props.Get(GATT_CHRC_IFACE, 'UUID') 377 print('UUID: {}'.format(uuid)) 378 # Print flags 379 flags = [flag for flag in char.props.Get(GATT_CHRC_IFACE, 'Flags')] 380 print('Flags: {}'.format(flags)) 381 # Read value if `read` flag is present 382 if 'read' in flags: 383 value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE) 384 print('Value: {}'.format(value)) 385 386 except Exception as err: 387 raise Exception('Failed to read characteristics : {}'.format(err)) 388 389 def write_chars(self, new_value): 390 ''' 391 Write to characteristics 392 ''' 393 try: 394 if not self.device.services: 395 print('No services set for device: {}'.format(self.device.path)) 396 return False 397 398 print('writing data to characteristics with read and write permission') 399 # Read chars of all services of device 400 for service in self.device.services: 401 if not service.chars: 402 print('No chars found for service: {}'.format(service.path)) 403 continue 404 for char in service.chars: 405 flags = [flag.lower() for flag in char.props.Get(GATT_CHRC_IFACE, 'Flags')] 406 if not ('read' in flags and 'write' in flags): 407 continue 408 409 # Write new value to characteristic 410 curr_value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE) 411 print('current value: {}'.format(curr_value)) 412 413 print('writing {} to characteristic {}'.format(new_value, char.path)) 414 char.iface.WriteValue(new_value, {}, dbus_interface=GATT_CHRC_IFACE) 415 416 time.sleep(5) 417 updated_value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE) 418 print('updated value: {}'.format(updated_value)) 419 420 if not (ord(new_value) == int(updated_value[0])): 421 print('write operation to {} failed'.format(char.path)) 422 return False 423 print('write operation to {} successful'.format(char.path)) 424 return True 425 426 except Exception as err: 427 raise Exception('Failed to write to characteristics: {}'.format(err)) 428 429 def get_char_if_exists(self, char_uuid): 430 ''' 431 Get char if exists for given uuid 432 ''' 433 try: 434 for service in self.device.services: 435 for char in service.chars: 436 curr_uuid = char.props.Get(GATT_CHRC_IFACE, 'UUID') 437 if char_uuid.lower() in curr_uuid.lower(): 438 return char 439 print('char {} not found'.format(char_uuid)) 440 return False 441 except Exception as err: 442 raise Exception('Failed to get char based on uuid {} - {}'.format(char_uuid, err)) 443 444 def get_service_if_exists(self, service_uuid): 445 try: 446 for service in self.device.services: 447 uuid = service.props.Get(GATT_SERVICE_IFACE, 'UUID') 448 if service_uuid.lower() in uuid.lower(): 449 return service 450 print('service {} not found'.format(service_uuid)) 451 return False 452 except Exception as err: 453 raise Exception('Failed to get service based on uuid {} - {}'.format(service_uuid, err)) 454 455 def start_notify(self, char): 456 try: 457 notify_started = 0 458 notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying') 459 if notifying == 0: 460 # Start Notify 461 char.iface.StartNotify() 462 notify_started = 1 463 # Check notify started 464 for _ in range(10, 0, -1): 465 notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying') 466 if notifying == 1: 467 print('subscribe to notifications: on') 468 break 469 if notifying == 0: 470 print('Failed to start notifications') 471 return False 472 473 # Get updated value 474 for _ in range(10, 0, -1): 475 time.sleep(1) 476 char_value = char.props.Get(GATT_CHRC_IFACE, 'Value') 477 print(char_value) 478 479 return None 480 481 except Exception as err: 482 raise Exception('Failed to perform notification operation: {}'.format(err)) 483 finally: 484 try: 485 if notify_started == 1: 486 # Stop notify 487 char.iface.StopNotify() 488 for _ in range(10, 0, -1): 489 notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying') 490 if notifying == 0: 491 print('subscribe to notifications: off') 492 break 493 if notifying == 1: 494 print('Failed to stop notifications') 495 except dbus.exceptions.DBusException as dbus_err: 496 print('Warning: Failure during cleanup for start notify : {}'.format(dbus_err)) 497 498 def _create_mainloop(self): 499 ''' 500 Create GLibMainLoop 501 ''' 502 if not self.mainloop: 503 self.mainloop = GLib.MainLoop() 504 505 def register_gatt_app(self): 506 ''' 507 Create Gatt Application 508 Register Gatt Application 509 ''' 510 try: 511 # Create mainloop, if does not exist 512 self._create_mainloop() 513 514 # Create Gatt Application 515 self.gatt_app = lib_gatt.AlertNotificationApp(self.bus, self.adapter.path) 516 print('GATT Application created') 517 self.gatt_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.adapter.path), GATT_MANAGER_IFACE) 518 519 # Register Gatt Application 520 self.gatt_mgr.RegisterApplication( 521 self.gatt_app, {}, 522 reply_handler=self.gatt_app_success_handler, 523 error_handler=self.gatt_app_error_handler) 524 self.mainloop.run() 525 526 except dbus.exceptions.DBusException as dbus_err: 527 raise DBusException('Failed to create GATT Application : {}'.format(dbus_err)) 528 except Exception as err: 529 raise Exception('Failed to register Gatt Application: {}'.format(err)) 530 531 def gatt_app_success_handler(self): 532 print('GATT Application successfully registered') 533 self.mainloop.quit() 534 535 def gatt_app_error_handler(self): 536 raise DBusException('Failed to register GATT Application') 537 538 def check_le_iface(self): 539 ''' 540 Check if LEAdvertisingManager1 interface exists 541 ''' 542 try: 543 dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE) 544 dbus_objs = dbus_obj_mgr.GetManagedObjects() 545 for path, iface in dbus_objs.items(): 546 if LE_ADVERTISING_MANAGER_IFACE in iface: 547 le_adv_iface_path = path 548 break 549 # Check LEAdvertisingManager1 interface is found 550 assert le_adv_iface_path, '\n Cannot start advertising. LEAdvertisingManager1 Interface not found' 551 552 return le_adv_iface_path 553 554 except AssertionError: 555 raise 556 except Exception as err: 557 raise Exception('Failed to find LEAdvertisingManager1 interface: {}'.format(err)) 558 559 def register_adv(self, adv_host_name, adv_type, adv_uuid): 560 try: 561 # Gatt Application is expected to be registered 562 if not self.gatt_app: 563 print('No Gatt Application is registered') 564 return 565 566 adv_iface_index = 0 567 568 # Create mainloop, if does not exist 569 self._create_mainloop() 570 571 # Check LEAdvertisingManager1 interface exists 572 le_iface_path = self.check_le_iface() 573 574 # Create Advertisement data 575 leadv_obj = lib_gap.Advertisement( 576 self.bus, 577 adv_iface_index, 578 adv_type, 579 adv_uuid, 580 adv_host_name) 581 print('Advertisement registered') 582 583 # Register Advertisement 584 leadv_mgr_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, le_iface_path), LE_ADVERTISING_MANAGER_IFACE) 585 leadv_mgr_iface_obj.RegisterAdvertisement( 586 leadv_obj.get_path(), {}, 587 reply_handler=self.adv_success_handler, 588 error_handler=self.adv_error_handler) 589 590 # Handler to read events received and exit from mainloop 591 GLib.timeout_add_seconds(3, self.check_adv) 592 593 self.mainloop.run() 594 595 except AssertionError: 596 raise 597 except dbus.exceptions.DBusException as dbus_err: 598 raise DBusException('Failure during registering advertisement : {}'.format(dbus_err)) 599 except Exception as err: 600 raise Exception('Failure during registering advertisement : {}'.format(err)) 601 else: 602 try: 603 try: 604 # Stop Notify if not already stopped 605 chars = self.gatt_app.service.get_characteristics() 606 for char in chars: 607 if char.uuid == lib_gatt.CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID']: 608 if char.notifying: 609 char.StopNotify() 610 except dbus.exceptions.DBusException as dbus_err: 611 print('Warning: {}'.format(dbus_err)) 612 613 try: 614 # Unregister Advertisement 615 leadv_mgr_iface_obj.UnregisterAdvertisement(leadv_obj.get_path()) 616 except dbus.exceptions.DBusException as dbus_err: 617 print('Warning: {}'.format(dbus_err)) 618 619 try: 620 # Remove advertising data 621 dbus.service.Object.remove_from_connection(leadv_obj) 622 except LookupError as err: 623 print('Warning: Failed to remove connection from dbus for advertisement object: {} - {}'.format(leadv_obj, err)) 624 625 try: 626 # Unregister Gatt Application 627 self.gatt_mgr.UnregisterApplication(self.gatt_app.get_path()) 628 except dbus.exceptions.DBusException as dbus_err: 629 print('Warning: {}'.format(dbus_err)) 630 631 try: 632 # Remove Gatt Application 633 dbus.service.Object.remove_from_connection(self.gatt_app) 634 except LookupError as err: 635 print('Warning: Failed to remove connection from dbus for Gatt application object: {} - {}'.format(self.gatt_app, err)) 636 637 except RuntimeError as err: 638 print('Warning: Failure during cleanup of Advertisement: {}'.format(err)) 639 640 def adv_success_handler(self): 641 print('Registered Advertisement successfully') 642 643 def adv_error_handler(self, err): 644 raise DBusException('{}'.format(err)) 645 646 def check_adv(self): 647 ''' 648 Handler to check for events triggered (read/write/subscribe) 649 for advertisement registered for AlertNotificationApp 650 ''' 651 try: 652 retry = 10 653 # Exit loop if read and write and subscribe is successful 654 if self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['SUPPORT_NEW_ALERT_UUID'], 'read') and \ 655 self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['ALERT_NOTIF_UUID'], 'write') and \ 656 self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID'], 'notify'): 657 if self.mainloop.is_running(): 658 self.mainloop.quit() 659 # return False to stop polling 660 return False 661 662 self.loop_cnt += 1 663 print('Check read/write/subscribe events are received...Retry {}'.format(self.loop_cnt)) 664 665 # Exit loop if max retry value is reached and 666 # all three events (read and write and subscribe) have not yet passed 667 # Retry total 10 times 668 if self.loop_cnt == (retry - 1): 669 if self.mainloop.is_running(): 670 self.mainloop.quit() 671 # return False to stop polling 672 return False 673 674 # return True to continue polling 675 return True 676 677 except RuntimeError as err: 678 print('Failure in advertisment handler: {}'.format(err)) 679 if self.mainloop.is_running(): 680 self.mainloop.quit() 681 # return False to stop polling 682 return False 683 684 def disconnect(self): 685 ''' 686 Disconnect device 687 ''' 688 try: 689 if not self.device or not self.device.iface: 690 return 691 print('disconnecting device') 692 # Disconnect device 693 device_conn = self.device.props.Get(DEVICE_IFACE, 'Connected') 694 if device_conn == 1: 695 self.device.iface.Disconnect(dbus_interface=DEVICE_IFACE) 696 for cnt in range(10, 0, -1): 697 time.sleep(5) 698 device_conn = self.device.props.Get(DEVICE_IFACE, 'Connected') 699 if device_conn == 0: 700 print('device disconnected') 701 break 702 print('number of retries left ({})'.format(cnt - 1)) 703 if device_conn == 1: 704 print('failed to disconnect device') 705 706 self.adapter.iface.RemoveDevice(self.device.iface) 707 self.device = None 708 709 except dbus.exceptions.DBusException as dbus_err: 710 print('Warning: {}'.format(dbus_err)) 711 except Exception as err: 712 raise Exception('Failed to disconnect device: {}'.format(err)) 713