1#!/usr/bin/env python 2# 3# Copyright (c) 2018, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28""" 29>> Thread Host Controller Interface 30>> Device : OpenThread_WpanCtl THCI 31>> Class : OpenThread_WpanCtl 32""" 33 34import re 35import time 36import socket 37import logging 38from Queue import Queue 39 40import serial 41from IThci import IThci 42from GRLLibs.UtilityModules.Test import Thread_Device_Role, Device_Data_Requirement, MacType 43from GRLLibs.UtilityModules.enums import PlatformDiagnosticPacket_Direction, PlatformDiagnosticPacket_Type 44from GRLLibs.UtilityModules.ModuleHelper import ModuleHelper 45from GRLLibs.ThreadPacket.PlatformPackets import PlatformDiagnosticPacket, PlatformPackets 46from GRLLibs.UtilityModules.Plugins.AES_CMAC import Thread_PBKDF2 47"""regex: used to split lines""" 48LINESEPX = re.compile(r'\r\n|\n') 49 50 51class OpenThread_WpanCtl(IThci): 52 LOWEST_POSSIBLE_PARTATION_ID = 0x1 53 LINK_QUALITY_CHANGE_TIME = 100 54 55 # Used for reference firmware version control for Test Harness. 56 # This variable will be updated to match the OpenThread reference firmware 57 # officially released. 58 firmwarePrefix = 'OPENTHREAD/' 59 60 def __init__(self, **kwargs): 61 """initialize the serial port and default network parameters 62 Args: 63 **kwargs: Arbitrary keyword arguments 64 Includes 'EUI' and 'SerialPort' 65 """ 66 try: 67 self.UIStatusMsg = '' 68 self.mac = kwargs.get('EUI') 69 self.handle = None 70 self.AutoDUTEnable = False 71 self._is_net = False # whether device is through ser2net 72 self.logStatus = {'stop': 'stop', 'running': 'running', 'pauseReq': 'pauseReq', 'paused': 'paused'} 73 self.logThreadStatus = self.logStatus['stop'] 74 # connection type 'ip' stands for SSH 75 self.connectType = (kwargs.get('Param5')).strip().lower() if kwargs.get('Param5') is not None else 'usb' 76 # comma separated CLI prompt, wpanctl cmd prefix, Wpan interface 77 (self.prompt, self.wpan_cmd_prefix, 78 self.wpan_interface) = (kwargs.get('Param8').strip().split(',') 79 if kwargs.get('Param8') else ['#', 'wpanctl', 'wpan0']) 80 self.wpan_cmd_prefix += ' ' 81 # comma separated setting commands 82 self.precmd = (kwargs.get('Param9')).strip().split(',') if kwargs.get('Param9') else [] 83 if self.connectType == 'ip': 84 self.dutIpv4 = kwargs.get('TelnetIP') 85 self.dutPort = kwargs.get('TelnetPort') 86 self.port = self.dutIpv4 + ':' + self.dutPort 87 # username for SSH 88 self.username = kwargs.get('Param6').strip() if kwargs.get('Param6') else None 89 # password for SSH 90 self.password = kwargs.get('Param7').strip() if kwargs.get('Param7') else None 91 else: 92 self.port = kwargs.get('SerialPort') 93 self.intialize() 94 except Exception as e: 95 ModuleHelper.WriteIntoDebugLogger('initialize() Error: ' + str(e)) 96 97 def __del__(self): 98 """close the serial port connection""" 99 try: 100 self.closeConnection() 101 self.deviceConnected = False 102 except Exception as e: 103 ModuleHelper.WriteIntoDebugLogger('delete() Error: ' + str(e)) 104 105 def _expect(self, expected, times=50): 106 """Find the `expected` line within `times` trials. 107 108 Args: 109 expected str: the expected string 110 times int: number of trials 111 """ 112 print('[%s] Expecting [%s]' % (self.port, expected)) 113 114 retry_times = 10 115 while times > 0 and retry_times > 0: 116 line = self._readline() 117 print('[%s] Got line [%s]' % (self.port, line)) 118 119 if line == expected: 120 print('[%s] Expected [%s]' % (self.port, expected)) 121 return 122 123 if not line: 124 retry_times -= 1 125 time.sleep(0.1) 126 127 times -= 1 128 129 raise Exception('failed to find expected string[%s]' % expected) 130 131 def _read(self, size=512): 132 logging.info('%s: reading', self.port) 133 if self._is_net: 134 return self.handle.recv(size) 135 else: 136 return self.handle.read(size) 137 138 def _write(self, data): 139 logging.info('%s: writing', self.port) 140 if self._is_net: 141 self.handle.sendall(data) 142 else: 143 self.handle.write(data) 144 145 def _readline(self): 146 """Read exactly one line from the device 147 148 Returns: 149 None on no data 150 """ 151 logging.info('%s: reading line', self.port) 152 if len(self._lines) > 1: 153 return self._lines.pop(0) 154 155 tail = '' 156 if len(self._lines): 157 tail = self._lines.pop() 158 159 try: 160 tail += self._read() 161 except socket.error: 162 logging.exception('%s: No new data', self.port) 163 time.sleep(0.1) 164 165 self._lines += LINESEPX.split(tail) 166 if len(self._lines) > 1: 167 return self._lines.pop(0) 168 169 def _sendline(self, line): 170 """Send exactly one line to the device 171 172 Args: 173 line str: data send to device 174 """ 175 logging.info('%s: sending line', self.port) 176 # clear buffer 177 self._lines = [] 178 try: 179 self._read() 180 except socket.error: 181 logging.debug('%s: Nothing cleared', self.port) 182 183 print('sending [%s]' % line) 184 self._write(line + '\r\n') 185 self._lines = [] 186 # wait for write to complete 187 time.sleep(0.1) 188 189 def __sendCommand(self, cmd): 190 """send specific command to reference unit over serial port 191 192 Args: 193 cmd: OpenThread_WpanCtl command string 194 195 Returns: 196 Fail: Failed to send the command to reference unit and parse it 197 Value: successfully retrieve the desired value from reference unit 198 Error: some errors occur, indicates by the followed specific error number 199 """ 200 logging.info('%s: sendCommand[%s]', self.port, cmd) 201 if self.logThreadStatus == self.logStatus['running']: 202 self.logThreadStatus = self.logStatus['pauseReq'] 203 while self.logThreadStatus not in (self.logStatus['paused'], self.logStatus['stop']): 204 pass 205 206 ssh_stdin = None 207 ssh_stdout = None 208 ssh_stderr = None 209 try: 210 # command retransmit times 211 retry_times = 3 212 while retry_times > 0: 213 retry_times -= 1 214 try: 215 if self._is_net: 216 ssh_stdin, ssh_stdout, ssh_stderr = self.handle.exec_command(cmd) 217 else: 218 self._sendline(cmd) 219 self._expect(cmd) 220 except Exception as e: 221 logging.exception('%s: failed to send command[%s]: %s', self.port, cmd, str(e)) 222 if retry_times == 0: 223 raise 224 else: 225 break 226 227 line = None 228 response = [] 229 retry_times = 20 230 stdout_lines = [] 231 stderr_lines = [] 232 if self._is_net: 233 stdout_lines = ssh_stdout.readlines() 234 stderr_lines = ssh_stderr.readlines() 235 if stderr_lines: 236 for stderr_line in stderr_lines: 237 if re.search(r'Not\s+Found|failed\s+with\s+error', stderr_line.strip(), re.M | re.I): 238 print('Command failed: %s' % stderr_line) 239 return 'Fail' 240 print('Got line: %s' % stderr_line) 241 logging.info('%s: the read line is[%s]', self.port, stderr_line) 242 response.append(str(stderr_line.strip())) 243 elif stdout_lines: 244 for stdout_line in stdout_lines: 245 logging.info('%s: the read line is[%s]', self.port, stdout_line) 246 if re.search(r'Not\s+Found|failed\s+with\s+error', stdout_line.strip(), re.M | re.I): 247 print('Command failed') 248 return 'Fail' 249 print('Got line: ' + stdout_line) 250 logging.info('%s: send command[%s] done!', self.port, cmd) 251 response.append(str(stdout_line.strip())) 252 response.append(self.prompt) 253 return response 254 else: 255 while retry_times > 0: 256 line = self._readline() 257 print('read line: %s' % line) 258 logging.info('%s: the read line is[%s]', self.port, line) 259 if line: 260 response.append(line) 261 if re.match(self.prompt, line): 262 break 263 elif re.search(r'Not\s+Found|failed\s+with\s+error', line, re.M | re.I): 264 print('Command failed') 265 return 'Fail' 266 267 retry_times -= 1 268 time.sleep(0.1) 269 270 if retry_times == 0: 271 raise Exception('%s: failed to find end of response' % self.port) 272 logging.info('%s: send command[%s] done!', self.port, cmd) 273 return response 274 except Exception as e: 275 ModuleHelper.WriteIntoDebugLogger('sendCommand() Error: ' + str(e)) 276 raise 277 278 def __stripValue(self, value): 279 """strip the special characters in the value 280 281 Args: 282 value: value string 283 284 Returns: 285 value string without special characters 286 """ 287 if isinstance(value, str): 288 if (value[0] == '"' and value[-1] == '"') or (value[0] == '[' and value[-1] == ']'): 289 return value[1:-1] 290 return value 291 292 def __padIp6Addr(self, ip6Addr): 293 segments = ip6Addr.split(':') 294 empty = None 295 for i, element in enumerate(segments): 296 if empty is None and len(element) == 0: 297 empty = i 298 elif len(element) < 4: 299 segments[i] = '0' * (4 - len(element)) + element 300 301 if empty is not None: 302 segments = segments[:empty] + ['0000'] * (8 - len(segments) + 1) + segments[empty + 1:] 303 304 return ':'.join(segments) 305 306 def __setDeviceMode(self, mode): 307 """set thread device mode: 308 309 Args: 310 mode: thread device mode. 11=rdn, 9=rn 311 r: rx-on-when-idle 312 s: secure IEEE 802.15.4 data request 313 d: full thread device 314 n: full network data 315 316 Returns: 317 True: successful to set the device mode 318 False: fail to set the device mode 319 """ 320 print('call __setDeviceMode') 321 322 try: 323 cmd = self.wpan_cmd_prefix + 'setprop Thread:DeviceMode %d' % mode 324 return self.__sendCommand(cmd)[0] != 'Fail' 325 except Exception as e: 326 ModuleHelper.WriteIntoDebugLogger('setDeviceMode() Error: ' + str(e)) 327 328 def __setRouterUpgradeThreshold(self, iThreshold): 329 """set router upgrade threshold 330 331 Args: 332 iThreshold: the number of active routers on the Thread network 333 partition below which a REED may decide to become a Router. 334 335 Returns: 336 True: successful to set the ROUTER_UPGRADE_THRESHOLD 337 False: fail to set ROUTER_UPGRADE_THRESHOLD 338 """ 339 print('call __setRouterUpgradeThreshold') 340 try: 341 cmd = self.wpan_cmd_prefix + 'setprop Thread:RouterUpgradeThreshold %s' % str(iThreshold) 342 return self.__sendCommand(cmd)[0] != 'Fail' 343 except Exception as e: 344 ModuleHelper.WriteIntoDebugLogger('setRouterUpgradeThreshold() Error: ' + str(e)) 345 346 def __setRouterDowngradeThreshold(self, iThreshold): 347 """set router downgrade threshold 348 349 Args: 350 iThreshold: the number of active routers on the Thread network 351 partition above which an active router may decide to 352 become a child. 353 354 Returns: 355 True: successful to set the ROUTER_DOWNGRADE_THRESHOLD 356 False: fail to set ROUTER_DOWNGRADE_THRESHOLD 357 """ 358 print('call __setRouterDowngradeThreshold') 359 try: 360 cmd = self.wpan_cmd_prefix + 'setprop Thread:RouterDowngradeThreshold %s' % str(iThreshold) 361 return self.__sendCommand(cmd)[0] != 'Fail' 362 except Exception as e: 363 ModuleHelper.WriteIntoDebugLogger('setRouterDowngradeThreshold() Error: ' + str(e)) 364 365 def __setRouterSelectionJitter(self, iRouterJitter): 366 """set ROUTER_SELECTION_JITTER parameter for REED to upgrade to Router 367 368 Args: 369 iRouterJitter: a random period prior to request Router ID for REED 370 371 Returns: 372 True: successful to set the ROUTER_SELECTION_JITTER 373 False: fail to set ROUTER_SELECTION_JITTER 374 """ 375 print('call _setRouterSelectionJitter') 376 try: 377 cmd = self.wpan_cmd_prefix + 'setprop Thread:RouterSelectionJitter %s' % str(iRouterJitter) 378 return self.__sendCommand(cmd) != 'Fail' 379 except Exception as e: 380 ModuleHelper.WriteIntoDebugLogger('setRouterSelectionJitter() Error: ' + str(e)) 381 382 def __setAddressfilterMode(self, mode): 383 """set address filter mode 384 385 Returns: 386 True: successful to set address filter mode. 387 False: fail to set address filter mode. 388 """ 389 print('call setAddressFilterMode() %s' % mode) 390 try: 391 if mode in ('allowlist', 'denylist'): 392 cmd = self.wpan_cmd_prefix + 'setprop MAC:' + mode.capitalize() + ':Enabled 1' 393 elif mode == 'disable': 394 if self._addressfilterMode != 'disable': 395 assert self._addressfilterMode in ('allowlist', 'denylist'), self._addressfilterMode 396 cmd = self.wpan_cmd_prefix + 'setprop MAC:' + self._addressfilterMode.capitalize() + ':Enabled 0' 397 else: 398 return True 399 else: 400 assert False, 'unknown address filter mode: %s' % mode 401 402 if self.__sendCommand(cmd)[0] != 'Fail': 403 self._addressfilterMode = mode 404 return True 405 return False 406 except Exception as e: 407 ModuleHelper.WriteIntoDebugLogger('__setAddressFilterMode() Error: ' + str(e)) 408 409 def __startOpenThreadWpan(self): 410 """start OpenThreadWpan 411 412 Returns: 413 True: successful to start OpenThreadWpan up 414 False: fail to start OpenThreadWpan 415 """ 416 print('call startOpenThreadWpan') 417 try: 418 419 # restore allowlist/denylist address filter mode if rejoin after 420 # reset 421 if self.isPowerDown: 422 if self._addressfilterMode == 'allowlist': 423 if self.__setAddressfilterMode('allowlist'): 424 for addr in self._addressfilterSet: 425 self.addAllowMAC(addr) 426 elif self._addressfilterMode == 'denylist': 427 if self.__setAddressfilterMode('denylist'): 428 for addr in self._addressfilterSet: 429 self.addBlockedMAC(addr) 430 time.sleep(1) 431 if ModuleHelper.LeaderDutChannelFound: 432 self.channel = ModuleHelper.Default_Channel 433 nodeType = 'router' 434 startType = 'join' 435 if self.deviceRole == Thread_Device_Role.Leader: 436 nodeType = 'router' 437 startType = 'form' 438 elif self.deviceRole == Thread_Device_Role.Router: 439 nodeType = 'router' 440 elif self.deviceRole == Thread_Device_Role.SED: 441 nodeType = 'sed' 442 elif self.deviceRole == Thread_Device_Role.EndDevice: 443 nodeType = 'end' 444 elif self.deviceRole == Thread_Device_Role.REED: 445 nodeType = 'router' 446 elif self.deviceRole == Thread_Device_Role.EndDevice_FED: 447 nodeType = 'router' 448 elif self.deviceRole == Thread_Device_Role.EndDevice_MED: 449 nodeType = 'end' 450 else: 451 pass 452 453 if self.deviceRole in [Thread_Device_Role.Leader, Thread_Device_Role.Router, Thread_Device_Role.REED]: 454 self.__setRouterSelectionJitter(1) 455 456 if startType == 'form': 457 startCmd = self.wpan_cmd_prefix + '%s "%s" -c %s -T %s ' % ( 458 startType, 459 self.networkName, 460 str(self.channel), 461 nodeType, 462 ) 463 else: 464 startCmd = self.wpan_cmd_prefix + '%s "%s" -p %s -c %s -T %s ' % ( 465 startType, 466 self.networkName, 467 str(hex(self.panId)), 468 str(self.channel), 469 nodeType, 470 ) 471 if self.__sendCommand(startCmd)[0] != 'Fail': 472 if self.__isOpenThreadWpanRunning(): 473 self.isPowerDown = False 474 if self.hasActiveDatasetToCommit: 475 if self.__sendCommand(self.wpan_cmd_prefix + 'setprop Dataset:Command SetActive')[0] == 'Fail': 476 raise Exception('failed to commit active dataset') 477 else: 478 self.hasActiveDatasetToCommit = False 479 480 return True 481 else: 482 return False 483 except Exception as e: 484 ModuleHelper.WriteIntoDebugLogger('startOpenThreadWpan() Error: ' + str(e)) 485 486 def __stopOpenThreadWpan(self): 487 """stop OpenThreadWpan 488 489 Returns: 490 True: successfully stop OpenThreadWpan 491 False: failed to stop OpenThreadWpan 492 """ 493 print('call stopOpenThreadWpan') 494 try: 495 if (self.__sendCommand(self.wpan_cmd_prefix + 'leave')[0] != 'Fail' and 496 self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase')[0] != 'Fail'): 497 return True 498 else: 499 return False 500 except Exception as e: 501 ModuleHelper.WriteIntoDebugLogger('stopOpenThreadWpan() Error: ' + str(e)) 502 503 def __isOpenThreadWpanRunning(self): 504 """check whether or not OpenThreadWpan is running 505 506 Returns: 507 True: OpenThreadWpan is running 508 False: OpenThreadWpan is not running 509 """ 510 print('call __isOpenThreadWpanRunning') 511 if self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:State')[0]) == 'associated': 512 print('*****OpenThreadWpan is running') 513 return True 514 else: 515 print('*****Wrong OpenThreadWpan state') 516 return False 517 518 # rloc16 might be hex string or integer, need to return actual allocated 519 # router id 520 def __convertRlocToRouterId(self, xRloc16): 521 """mapping Rloc16 to router id 522 523 Args: 524 xRloc16: hex rloc16 short address 525 526 Returns: 527 actual router id allocated by leader 528 """ 529 routerList = [] 530 routerList = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Thread:RouterTable') 531 print(routerList) 532 print(xRloc16) 533 534 for line in routerList: 535 if re.match(r'\[|\]', line): 536 continue 537 if re.match(self.prompt, line, re.M | re.I): 538 break 539 router = [] 540 router = self.__stripValue(line).split(',') 541 542 for item in router: 543 if 'RouterId' in item: 544 routerid = item.split(':')[1] 545 elif 'RLOC16' in line: 546 rloc16 = line.split(':')[1] 547 else: 548 pass 549 550 # process input rloc16 551 if isinstance(xRloc16, str): 552 rloc16 = '0x' + rloc16 553 if rloc16 == xRloc16: 554 return routerid 555 elif isinstance(xRloc16, int): 556 if int(rloc16, 16) == xRloc16: 557 return routerid 558 else: 559 pass 560 561 return None 562 563 def __convertIp6PrefixStringToIp6Address(self, strIp6Prefix): 564 """convert IPv6 prefix string to IPv6 dotted-quad format 565 for example: 566 2001000000000000 -> 2001:0000:0000:0000:: 567 568 Args: 569 strIp6Prefix: IPv6 address string 570 571 Returns: 572 IPv6 address dotted-quad format 573 """ 574 prefix1 = strIp6Prefix.rstrip('L') 575 prefix2 = self.__lstrip0x(prefix1) 576 hexPrefix = str(prefix2).ljust(16, '0') 577 hexIter = iter(hexPrefix) 578 finalMac = ':'.join(a + b + c + d for a, b, c, d in zip(hexIter, hexIter, hexIter, hexIter)) 579 prefix = str(finalMac) 580 strIp6Prefix = prefix[:19] 581 return strIp6Prefix + '::' 582 583 def __convertLongToHex(self, iValue, fillZeros=None): 584 """convert a long hex integer to string 585 remove '0x' and 'L' return string 586 587 Args: 588 iValue: long integer in hex format 589 fillZeros: pad string with zeros on the left to specified width 590 591 Returns: 592 string of this long integer without '0x' and 'L' 593 """ 594 fmt = '%x' 595 if fillZeros is not None: 596 fmt = '%%0%dx' % fillZeros 597 598 return fmt % iValue 599 600 def __convertChannelMask(self, channelsArray): 601 """convert channelsArray to bitmask format 602 603 Args: 604 channelsArray: channel array (i.e. [21, 22]) 605 606 Returns: 607 bitmask format corresponding to a given channel array 608 """ 609 maskSet = 0 610 611 for eachChannel in channelsArray: 612 mask = 1 << eachChannel 613 maskSet = maskSet | mask 614 615 return maskSet 616 617 def __ChannelMaskListToStr(self, channelList): 618 """convert a channel list to a string 619 620 Args: 621 channelList: channel list (i.e. [21, 22, 23]) 622 623 Returns: 624 a comma separated channel string (i.e. '21, 22, 23') 625 """ 626 chan_mask_range = '' 627 if isinstance(channelList, list): 628 if len(channelList): 629 chan_mask_range = ','.join(str(chan) for chan in channelList) 630 else: 631 print('empty list') 632 else: 633 print('not a valid channel list: %s' % channelList) 634 635 return chan_mask_range 636 637 def __setChannelMask(self, channelMask): 638 print('call _setChannelMask') 639 try: 640 cmd = self.wpan_cmd_prefix + 'setprop NCP:ChannelMask %s' % channelMask 641 datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:ChannelMaskPage0 %s' % channelMask 642 self.hasActiveDatasetToCommit = True 643 return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' 644 except Exception as e: 645 ModuleHelper.WriteIntoDebugLogger('setChannelMask() Error: ' + str(e)) 646 647 def __setSecurityPolicy(self, securityPolicySecs, securityPolicyFlags): 648 print('call _setSecurityPolicy') 649 try: 650 cmd1 = self.wpan_cmd_prefix + 'setprop Dataset:SecPolicy:KeyRotation %s' % str(securityPolicySecs) 651 if securityPolicyFlags == 'onrc': 652 cmd2 = self.wpan_cmd_prefix + 'setprop Dataset:SecPolicy:Flags 0xf7' 653 else: 654 print('unknown policy flag :' + securityPolicyFlags) 655 return False 656 self.hasActiveDatasetToCommit = True 657 return self.__sendCommand(cmd1) != 'Fail' and self.__sendCommand(cmd2) != 'Fail' 658 except Exception as e: 659 ModuleHelper.WriteIntoDebugLogger('setSecurityPolicy() Error: ' + str(e)) 660 661 def __setKeySwitchGuardTime(self, iKeySwitchGuardTime): 662 """ set the Key switch guard time 663 664 Args: 665 iKeySwitchGuardTime: key switch guard time 666 667 Returns: 668 True: successful to set key switch guard time 669 False: fail to set key switch guard time 670 """ 671 print('%s call setKeySwitchGuardTime' % self.port) 672 print(iKeySwitchGuardTime) 673 try: 674 cmd = self.wpan_cmd_prefix + 'setprop Network:KeySwitchGuardTime %s' % str(iKeySwitchGuardTime) 675 if self.__sendCommand(cmd)[0] != 'Fail': 676 time.sleep(1) 677 return True 678 else: 679 return False 680 except Exception as e: 681 ModuleHelper.WriteIntoDebugLogger('setKeySwitchGuardTime() Error; ' + str(e)) 682 683 def __getCommissionerSessionId(self): 684 """ get the commissioner session id allocated from Leader """ 685 print('%s call getCommissionerSessionId' % self.port) 686 return self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Commissioner:SessionId')[0] 687 688 def __getJoinerState(self): 689 """ get joiner state """ 690 maxDuration = 150 # seconds 691 t_end = time.time() + maxDuration 692 while time.time() < t_end: 693 joinerState = self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:State')[0]) 694 if joinerState == 'offline:commissioned': 695 return True 696 elif joinerState == 'associating:credentials-needed': 697 return False 698 else: 699 time.sleep(5) 700 continue 701 return False 702 703 def _connect(self): 704 if self.connectType == 'usb': 705 print('My port is %s' % self.port) 706 try: 707 self.handle = serial.Serial(self.port, 115200, timeout=0.2) 708 except Exception as e: 709 ModuleHelper.WriteIntoDebugLogger('open serial error ' + str(e)) 710 711 try: 712 attempts = 0 713 user_prompted = False 714 pwd_prompted = False 715 while attempts < 10 or pwd_prompted: 716 time.sleep(0.5) 717 attempts = attempts + 1 718 print('attempts...%d' % attempts) 719 720 input_data = self.handle.read(self.handle.inWaiting()) 721 722 if not input_data: 723 if not user_prompted: 724 self.handle.write('\n') 725 time.sleep(0.5) 726 continue 727 728 if 'login' in input_data: 729 self.handle.write(self.username + '\n') 730 time.sleep(0.3) 731 print('user prompted') 732 user_prompted = True 733 734 elif 'Password' in input_data: 735 print('password prompted') 736 time.sleep(0.3) 737 self.handle.write(self.password + '\n') 738 pwd_prompted = True 739 740 elif self.prompt in input_data: 741 print('login success (serial)') 742 time.sleep(0.3) 743 self.deviceConnected = True 744 for precmd in self.precmd: 745 self.handle.write(precmd + '\n') 746 time.sleep(0.3) 747 time.sleep(1) 748 break 749 if not self.deviceConnected: 750 raise Exception('login fail') 751 else: 752 self._is_net = False 753 except Exception as e: 754 ModuleHelper.WriteIntoDebugLogger('connect to serial Error: ' + str(e)) 755 756 elif self.connectType == 'ip': 757 print('My IP: %s Port: %s' % (self.dutIpv4, self.dutPort)) 758 try: 759 import paramiko 760 761 if not self.password: 762 transport = paramiko.Transport((self.dutIpv4, int(self.dutPort))) 763 transport.start_client() 764 transport.auth_none(self.username) 765 self.handle = paramiko.SSHClient() 766 self.handle.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 767 self.handle._transport = transport 768 else: 769 self.handle = paramiko.SSHClient() 770 self.handle.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 771 self.handle.connect(self.dutIpv4, 772 port=int(self.dutPort), 773 username=self.username, 774 password=self.password) 775 print('login success (ssh)') 776 self.deviceConnected = True 777 for precmd in self.precmd: 778 self.handle.exec_command(precmd + '\n') 779 time.sleep(0.5) 780 self._is_net = True 781 782 except Exception as e: 783 ModuleHelper.WriteIntoDebugLogger('connect to ssh Error: ' + str(e)) 784 else: 785 raise Exception('Unknown port schema') 786 787 def closeConnection(self): 788 """close current serial port connection""" 789 print('%s call closeConnection' % self.port) 790 try: 791 if self.handle: 792 self.handle.close() 793 self.handle = None 794 except Exception as e: 795 ModuleHelper.WriteIntoDebugLogger('closeConnection() Error: ' + str(e)) 796 797 def intialize(self): 798 """initialize the serial port with baudrate, timeout parameters""" 799 print('%s call intialize' % self.port) 800 try: 801 # init serial port 802 self.deviceConnected = False 803 self._connect() 804 805 if self.deviceConnected: 806 self.UIStatusMsg = self.getVersionNumber() 807 if self.firmwarePrefix not in self.UIStatusMsg: 808 self.deviceConnected = False 809 self.UIStatusMsg = ('Firmware Not Matching Expecting ' + self.firmwarePrefix + ' Now is ' + 810 self.UIStatusMsg) 811 raise Exception('Err: OpenThread device Firmware not matching..') 812 self.__sendCommand(self.wpan_cmd_prefix + 'leave') 813 self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase') 814 else: 815 raise Exception('Err: Device not connected ..') 816 817 except Exception as e: 818 ModuleHelper.WriteIntoDebugLogger('intialize() Error: ' + str(e)) 819 820 def setNetworkName(self, networkName='GRL'): 821 """set Thread Network name 822 823 Args: 824 networkName: the networkname string to be set 825 826 Returns: 827 True: successful to set the Thread Networkname 828 False: fail to set the Thread Networkname 829 """ 830 print('%s call setNetworkName' % self.port) 831 assert '"' not in networkName 832 833 try: 834 cmd = self.wpan_cmd_prefix + 'setprop -s Network:Name "%s"' % networkName 835 datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:NetworkName "%s"' % networkName 836 self.hasActiveDatasetToCommit = True 837 return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' 838 except Exception as e: 839 ModuleHelper.WriteIntoDebugLogger('setNetworkName() Error: ' + str(e)) 840 841 def setChannel(self, channel=15): 842 """set channel of Thread device operates on. 843 844 Args: 845 channel: 846 (0 - 10: Reserved) 847 (11 - 26: 2.4GHz channels) 848 (27 - 65535: Reserved) 849 850 Returns: 851 True: successful to set the channel 852 False: fail to set the channel 853 """ 854 print('%s call setChannel' % self.port) 855 856 try: 857 cmd = self.wpan_cmd_prefix + 'setprop NCP:Channel %s' % channel 858 datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:Channel %s' % channel 859 self.hasActiveDatasetToCommit = True 860 return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' 861 except Exception as e: 862 ModuleHelper.WriteIntoDebugLogger('setChannel() Error: ' + str(e)) 863 864 def getChannel(self): 865 """get current channel""" 866 print('%s call getChannel' % self.port) 867 return self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:Channel')[0] 868 869 def setMAC(self, xEUI): 870 """set the extended address of Thread device 871 872 Args: 873 xEUI: extended address in hex format 874 875 Returns: 876 True: successful to set the extended address 877 False: fail to set the extended address 878 """ 879 print('%s call setMAC' % self.port) 880 881 address64 = '' 882 try: 883 if not isinstance(xEUI, str): 884 address64 = self.__convertLongToHex(xEUI, 16) 885 else: 886 address64 = xEUI 887 888 cmd = self.wpan_cmd_prefix + 'setprop NCP:MACAddress %s' % address64 889 890 if self.__sendCommand(cmd)[0] != 'Fail': 891 self.mac = address64 892 return True 893 else: 894 return False 895 except Exception as e: 896 ModuleHelper.WriteIntoDebugLogger('setMAC() Error: ' + str(e)) 897 898 def getMAC(self, bType=MacType.RandomMac): 899 """get one specific type of MAC address 900 currently OpenThreadWpan only supports Random MAC address 901 902 Args: 903 bType: indicate which kind of MAC address is required 904 905 Returns: 906 specific type of MAC address 907 """ 908 print('%s call getMAC' % self.port) 909 910 # if power down happens, return extended address assigned previously 911 if self.isPowerDown: 912 macAddr64 = self.mac 913 else: 914 if bType == MacType.FactoryMac: 915 macAddr64 = self.__stripValue( 916 self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:HardwareAddress')[0]) 917 elif bType == MacType.HashMac: 918 macAddr64 = self.__stripValue( 919 self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:MACAddress')[0]) 920 else: 921 macAddr64 = self.__stripValue( 922 self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:ExtendedAddress')[0]) 923 924 return int(macAddr64, 16) 925 926 def getLL64(self): 927 """get link local unicast IPv6 address""" 928 print('%s call getLL64' % self.port) 929 return self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v IPv6:LinkLocalAddress')[0]) 930 931 def getMLEID(self): 932 """get mesh local endpoint identifier address""" 933 print('%s call getMLEID' % self.port) 934 return self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v IPv6:MeshLocalAddress')[0]) 935 936 def getRloc16(self): 937 """get rloc16 short address""" 938 print('%s call getRloc16' % self.port) 939 rloc16 = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Thread:RLOC16')[0] 940 return int(rloc16, 16) 941 942 def getRloc(self): 943 """get router locator unicast IPv6 address""" 944 print('%s call getRloc' % self.port) 945 prefix = self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v IPv6:MeshLocalPrefix')[0]) 946 mlprefix = prefix.split('/')[0] 947 rloc16 = self.__lstrip0x(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Thread:RLOC16')[0]) 948 949 print('prefix: %s' % prefix) 950 print('mlprefix: %s ' % mlprefix) 951 print('rloc16: %s' % rloc16) 952 953 rloc = self.__padIp6Addr(mlprefix + '00ff:fe00:' + rloc16) 954 print('rloc: %s' % rloc) 955 return rloc 956 957 def getGlobal(self): 958 """get global unicast IPv6 address set 959 if configuring multiple entries 960 """ 961 print('%s call getGlobal' % self.port) 962 globalAddrs = [] 963 964 mleid = self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v IPv6:MeshLocalAddress')[0]) 965 966 mleid = ModuleHelper.GetFullIpv6Address(mleid).lower() 967 968 addrs = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v IPv6:AllAddresses') 969 970 # find rloc address firstly as a reference for current mesh local prefix as for some TCs, 971 # mesh local prefix may be updated through pending dataset management. 972 for ip6AddrItem in addrs: 973 if re.match(r'\[|\]', ip6AddrItem): 974 continue 975 if re.match(self.prompt, ip6AddrItem, re.M | re.I): 976 break 977 ip6AddrItem = ip6AddrItem.strip() 978 ip6Addr = self.__stripValue(ip6AddrItem).split(' ')[0] 979 980 fullIp = ModuleHelper.GetFullIpv6Address(ip6Addr).lower() 981 982 print('fullip %s' % fullIp) 983 984 if fullIp.startswith('fe80'): 985 continue 986 987 if fullIp.startswith(mleid[0:19]): 988 continue 989 990 print('global') 991 globalAddrs.append(fullIp) 992 993 return globalAddrs 994 995 def setNetworkKey(self, key): 996 """set Thread network key 997 998 Args: 999 key: Thread network key used in secure the MLE/802.15.4 packet 1000 1001 Returns: 1002 True: successful to set the Thread network key 1003 False: fail to set the Thread network key 1004 """ 1005 networkKey = '' 1006 print('%s call setNetworkKey' % self.port) 1007 1008 try: 1009 if not isinstance(key, str): 1010 networkKey = self.__convertLongToHex(key, 32) 1011 cmd = self.wpan_cmd_prefix + 'setprop Network:Key %s' % networkKey 1012 datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:NetworkKey %s' % networkKey 1013 else: 1014 networkKey = key 1015 cmd = self.wpan_cmd_prefix + 'setprop Network:Key %s' % networkKey 1016 datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:NetworkKey %s' % networkKey 1017 1018 self.networkKey = networkKey 1019 self.hasActiveDatasetToCommit = True 1020 return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' 1021 except Exception as e: 1022 ModuleHelper.WriteIntoDebugLogger('setNetworkkey() Error: ' + str(e)) 1023 1024 def addBlockedMAC(self, xEUI): 1025 """add a given extended address to the denylist entry 1026 1027 Args: 1028 xEUI: extended address in hex format 1029 1030 Returns: 1031 True: successful to add a given extended address to the denylist entry 1032 False: fail to add a given extended address to the denylist entry 1033 """ 1034 print('%s call addBlockedMAC' % self.port) 1035 print(xEUI) 1036 if isinstance(xEUI, str): 1037 macAddr = xEUI 1038 else: 1039 macAddr = self.__convertLongToHex(xEUI) 1040 1041 try: 1042 # if blocked device is itself 1043 if macAddr == self.mac: 1044 print('block device itself') 1045 return True 1046 1047 if self._addressfilterMode != 'denylist': 1048 self.__setAddressfilterMode('denylist') 1049 1050 cmd = self.wpan_cmd_prefix + 'insert MAC:Denylist:Entries %s' % macAddr 1051 ret = self.__sendCommand(cmd)[0] != 'Fail' 1052 1053 self._addressfilterSet.add(macAddr) 1054 print('current denylist entries:') 1055 for addr in self._addressfilterSet: 1056 print(addr) 1057 1058 return ret 1059 except Exception as e: 1060 ModuleHelper.WriteIntoDebugLogger('addBlockedMAC() Error: ' + str(e)) 1061 1062 def addAllowMAC(self, xEUI): 1063 """add a given extended address to the allowlist addressfilter 1064 1065 Args: 1066 xEUI: a given extended address in hex format 1067 1068 Returns: 1069 True: successful to add a given extended address to the allowlist entry 1070 False: fail to add a given extended address to the allowlist entry 1071 """ 1072 print('%s call addAllowMAC' % self.port) 1073 print(xEUI) 1074 if isinstance(xEUI, str): 1075 macAddr = xEUI 1076 else: 1077 macAddr = self.__convertLongToHex(xEUI) 1078 1079 try: 1080 if self._addressfilterMode != 'allowlist': 1081 self.__setAddressfilterMode('allowlist') 1082 1083 cmd = self.wpan_cmd_prefix + 'insert MAC:Allowlist:Entries %s' % macAddr 1084 ret = self.__sendCommand(cmd)[0] != 'Fail' 1085 1086 self._addressfilterSet.add(macAddr) 1087 print('current allowlist entries:') 1088 for addr in self._addressfilterSet: 1089 print(addr) 1090 return ret 1091 1092 except Exception as e: 1093 ModuleHelper.WriteIntoDebugLogger('addAllowMAC() Error: ' + str(e)) 1094 1095 def clearBlockList(self): 1096 """clear all entries in denylist table 1097 1098 Returns: 1099 True: successful to clear the denylist 1100 False: fail to clear the denylist 1101 """ 1102 print('%s call clearBlockList' % self.port) 1103 1104 # remove all entries in denylist 1105 try: 1106 print('clearing denylist entries:') 1107 for addr in self._addressfilterSet: 1108 print(addr) 1109 1110 # disable denylist 1111 if self.__setAddressfilterMode('disable'): 1112 # clear ops 1113 for addr in self._addressfilterSet: 1114 cmd = self.wpan_cmd_prefix + 'remove MAC:Denylist:Entries ' + addr 1115 self.__sendCommand(cmd) 1116 1117 self._addressfilterSet.clear() 1118 return True 1119 1120 return False 1121 except Exception as e: 1122 ModuleHelper.WriteIntoDebugLogger('clearBlockList() Error: ' + str(e)) 1123 1124 def clearAllowList(self): 1125 """clear all entries in allowlist table 1126 1127 Returns: 1128 True: successful to clear the allowlist 1129 False: fail to clear the allowlist 1130 """ 1131 print('%s call clearAllowList' % self.port) 1132 1133 # remove all entries in allowlist 1134 try: 1135 print('clearing allowlist entries:') 1136 for addr in self._addressfilterSet: 1137 print(addr) 1138 1139 # disable allowlist 1140 if self.__setAddressfilterMode('disable'): 1141 # clear ops 1142 for addr in self._addressfilterSet: 1143 cmd = self.wpan_cmd_prefix + 'remove MAC:Allowlist:Entries ' + addr 1144 self.__sendCommand(cmd) 1145 1146 self._addressfilterSet.clear() 1147 return True 1148 1149 return False 1150 except Exception as e: 1151 ModuleHelper.WriteIntoDebugLogger('clearAllowList() Error: ' + str(e)) 1152 1153 def getDeviceRole(self): 1154 """get current device role in Thread Network""" 1155 print('%s call getDeviceRole' % self.port) 1156 return self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Network:NodeType')[0]) 1157 1158 def joinNetwork(self, eRoleId): 1159 """make device ready to join the Thread Network with a given role 1160 1161 Args: 1162 eRoleId: a given device role id 1163 1164 Returns: 1165 True: ready to set Thread Network parameter for joining desired Network 1166 """ 1167 print('%s call joinNetwork' % self.port) 1168 print(eRoleId) 1169 1170 self.deviceRole = eRoleId 1171 mode = 15 1172 try: 1173 if ModuleHelper.LeaderDutChannelFound: 1174 self.channel = ModuleHelper.Default_Channel 1175 1176 # FIXME: when Harness call setNetworkDataRequirement()? 1177 # only sleep end device requires stable networkdata now 1178 if eRoleId == Thread_Device_Role.Leader: 1179 print('join as leader') 1180 # rdn 1181 mode = 15 1182 if self.AutoDUTEnable is False: 1183 # set ROUTER_DOWNGRADE_THRESHOLD 1184 self.__setRouterDowngradeThreshold(33) 1185 elif eRoleId == Thread_Device_Role.Router: 1186 print('join as router') 1187 # rdn 1188 mode = 15 1189 if self.AutoDUTEnable is False: 1190 # set ROUTER_DOWNGRADE_THRESHOLD 1191 self.__setRouterDowngradeThreshold(33) 1192 elif eRoleId == Thread_Device_Role.SED: 1193 print('join as sleepy end device') 1194 # s 1195 mode = 4 1196 self.__setPollPeriod(self.__sedPollPeriod) 1197 elif eRoleId == Thread_Device_Role.EndDevice: 1198 print('join as end device') 1199 # rn 1200 mode = 13 1201 elif eRoleId == Thread_Device_Role.REED: 1202 print('join as REED') 1203 # rdn 1204 mode = 15 1205 # set ROUTER_UPGRADE_THRESHOLD 1206 self.__setRouterUpgradeThreshold(0) 1207 elif eRoleId == Thread_Device_Role.EndDevice_FED: 1208 # always remain an ED, never request to be a router 1209 print('join as FED') 1210 # rdn 1211 mode = 15 1212 # set ROUTER_UPGRADE_THRESHOLD 1213 self.__setRouterUpgradeThreshold(0) 1214 elif eRoleId == Thread_Device_Role.EndDevice_MED: 1215 print('join as MED') 1216 # rn 1217 mode = 13 1218 else: 1219 pass 1220 1221 # set Thread device mode with a given role 1222 self.__setDeviceMode(mode) 1223 time.sleep(0.1) 1224 # start OpenThreadWpan 1225 self.__startOpenThreadWpan() 1226 time.sleep(3) 1227 return True 1228 except Exception as e: 1229 ModuleHelper.WriteIntoDebugLogger('joinNetwork() Error: ' + str(e)) 1230 1231 def getNetworkFragmentID(self): 1232 """get current partition id of Thread Network Partition from LeaderData 1233 1234 Returns: 1235 The Thread network Partition Id 1236 """ 1237 print('%s call getNetworkFragmentID' % self.port) 1238 if not self.____isOpenThreadWpanRunning(): 1239 print('OpenThreadWpan is not running') 1240 return None 1241 1242 return self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Network:PartitionId')[0] 1243 1244 def getParentAddress(self): 1245 """get Thread device's parent extended address and rloc16 short address 1246 1247 Returns: 1248 The extended address of parent in hex format 1249 """ 1250 print('%s call getParentAddress' % self.port) 1251 parentInfo = [] 1252 parentInfo = self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 1253 'getprop -v Thread:Parent')).split(' ') 1254 1255 return parentInfo[0] 1256 1257 def powerDown(self): 1258 """power down the OpenThreadWpan""" 1259 print('%s call powerDown' % self.port) 1260 if self.__sendCommand(self.wpan_cmd_prefix + 'setprop Daemon:AutoAssociateAfterReset false')[0] != 'Fail': 1261 time.sleep(0.5) 1262 if self.__sendCommand(self.wpan_cmd_prefix + 'reset')[0] != 'Fail': 1263 self.isPowerDown = True 1264 return True 1265 else: 1266 return False 1267 else: 1268 return False 1269 1270 def powerUp(self): 1271 """power up the Thread device""" 1272 print('%s call powerUp' % self.port) 1273 if not self.handle: 1274 self._connect() 1275 1276 self.isPowerDown = False 1277 if self.__sendCommand(self.wpan_cmd_prefix + 'attach')[0] != 'Fail': 1278 time.sleep(3) 1279 else: 1280 return False 1281 1282 if self.__sendCommand(self.wpan_cmd_prefix + 'setprop Daemon:AutoAssociateAfterReset true')[0] == 'Fail': 1283 return False 1284 1285 if self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:State')[0]) != 'associated': 1286 print('powerUp failed') 1287 return False 1288 else: 1289 return True 1290 1291 def reboot(self): 1292 """reset and rejoin to Thread Network without any timeout 1293 1294 Returns: 1295 True: successful to reset and rejoin the Thread Network 1296 False: fail to reset and rejoin the Thread Network 1297 """ 1298 print('%s call reboot' % self.port) 1299 try: 1300 self._sendline(self.wpan_cmd_prefix + 'reset') 1301 self.isPowerDown = True 1302 1303 if self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:State')[0] != 'associated': 1304 print('[FAIL] reboot') 1305 return False 1306 else: 1307 return True 1308 except Exception as e: 1309 ModuleHelper.WriteIntoDebugLogger('reboot() Error: ' + str(e)) 1310 1311 def ping(self, destination, length=20): 1312 """ send ICMPv6 echo request with a given length to a unicast destination 1313 address 1314 1315 Args: 1316 destination: the unicast destination address of ICMPv6 echo request 1317 length: the size of ICMPv6 echo request payload 1318 """ 1319 print('%s call ping' % self.port) 1320 print('destination: %s' % destination) 1321 try: 1322 cmd = 'ping %s -c 1 -s %s -I %s' % (destination, str(length), self.wpan_interface) 1323 if self._is_net: 1324 self.handle.exec_command(cmd) 1325 else: 1326 self._sendline(cmd) 1327 self._expect(cmd) 1328 # wait echo reply 1329 time.sleep(1) 1330 except Exception as e: 1331 ModuleHelper.WriteIntoDebugLogger('ping() Error: ' + str(e)) 1332 1333 def multicast_Ping(self, destination, length=20): 1334 """send ICMPv6 echo request with a given length to a multicast destination 1335 address 1336 1337 Args: 1338 destination: the multicast destination address of ICMPv6 echo request 1339 length: the size of ICMPv6 echo request payload 1340 """ 1341 print('%s call multicast_Ping' % self.port) 1342 print('destination: %s' % destination) 1343 try: 1344 cmd = 'ping %s -c 1 -s %s -I %s' % (destination, str(length), self.wpan_interface) 1345 if self._is_net: 1346 self.handle.exec_command(cmd) 1347 else: 1348 self._sendline(cmd) 1349 self._expect(cmd) 1350 # wait echo reply 1351 time.sleep(1) 1352 except Exception as e: 1353 ModuleHelper.WriteIntoDebugLogger('multicast_ping() Error: ' + str(e)) 1354 1355 def getVersionNumber(self): 1356 """get OpenThreadWpan stack firmware version number""" 1357 print('%s call getVersionNumber' % self.port) 1358 versionStr = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:Version')[0] 1359 1360 return self.__stripValue(versionStr) 1361 1362 def setPANID(self, xPAN): 1363 """set Thread Network PAN ID 1364 1365 Args: 1366 xPAN: a given PAN ID in hex format 1367 1368 Returns: 1369 True: successful to set the Thread Network PAN ID 1370 False: fail to set the Thread Network PAN ID 1371 """ 1372 print('%s call setPANID' % self.port) 1373 print(xPAN) 1374 panid = '' 1375 try: 1376 if not isinstance(xPAN, str): 1377 panid = str(hex(xPAN)) 1378 print(panid) 1379 1380 cmd = self.wpan_cmd_prefix + 'setprop -s Network:PANID %s' % panid 1381 datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:PanId %s' % panid 1382 self.hasActiveDatasetToCommit = True 1383 return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' 1384 except Exception as e: 1385 ModuleHelper.WriteIntoDebugLogger('setPANID() Error: ' + str(e)) 1386 1387 def reset(self): 1388 """factory reset""" 1389 print('%s call reset' % self.port) 1390 try: 1391 if self._is_net: 1392 self.__sendCommand(self.wpan_cmd_prefix + 'leave') 1393 else: 1394 self._sendline(self.wpan_cmd_prefix + 'leave') 1395 1396 self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase') 1397 time.sleep(2) 1398 if not self._is_net: 1399 self._read() 1400 except Exception as e: 1401 ModuleHelper.WriteIntoDebugLogger('reset() Error: ' + str(e)) 1402 1403 def removeRouter(self, xRouterId): 1404 """kick router with a given router id from the Thread Network 1405 1406 Args: 1407 xRouterId: a given router id in hex format 1408 1409 Returns: 1410 True: successful to remove the router from the Thread Network 1411 False: fail to remove the router from the Thread Network 1412 """ 1413 print('%s call removeRouter' % self.port) 1414 print(xRouterId) 1415 routerId = '' 1416 routerId = self.__convertRlocToRouterId(xRouterId) 1417 print(routerId) 1418 1419 if routerId is None: 1420 print('no matched xRouterId') 1421 return False 1422 1423 try: 1424 cmd = 'releaserouterid %s' % routerId 1425 return self.__sendCommand(cmd)[0] != 'Fail' 1426 except Exception as e: 1427 ModuleHelper.WriteIntoDebugLogger('removeRouter() Error: ' + str(e)) 1428 1429 def setDefaultValues(self): 1430 """set default mandatory Thread Network parameter value""" 1431 print('%s call setDefaultValues' % self.port) 1432 1433 # initialize variables 1434 self.networkName = ModuleHelper.Default_NwkName 1435 assert '"' not in self.networkName 1436 self.networkKey = ModuleHelper.Default_NwkKey 1437 self.channel = ModuleHelper.Default_Channel 1438 self.channelMask = '0x7fff800' # (0xffff << 11) 1439 self.panId = ModuleHelper.Default_PanId 1440 self.xpanId = ModuleHelper.Default_XpanId 1441 self.meshLocalPrefix = ModuleHelper.Default_MLPrefix 1442 # OT only accept hex format PSKc for now 1443 self.pskc = '00000000000000000000000000000000' 1444 self.securityPolicySecs = ModuleHelper.Default_SecurityPolicy 1445 self.securityPolicyFlags = 'onrc' 1446 self.activetimestamp = ModuleHelper.Default_ActiveTimestamp 1447 # self.sedPollingRate = ModuleHelper.Default_Harness_SED_Polling_Rate 1448 self.__sedPollPeriod = 3 * 1000 # in milliseconds 1449 self.deviceRole = None 1450 self.provisioningUrl = '' 1451 self.hasActiveDatasetToCommit = False 1452 self.logThread = Queue() 1453 self.logThreadStatus = self.logStatus['stop'] 1454 # indicate Thread device requests full or stable network data 1455 self.networkDataRequirement = '' 1456 # indicate if Thread device experiences a power down event 1457 self.isPowerDown = False 1458 # indicate AddressFilter mode ['disable', 'allowlist', 'denylist'] 1459 self._addressfilterMode = 'disable' 1460 self._addressfilterSet = set() # cache filter entries 1461 # indicate if Thread device is an active commissioner 1462 self.isActiveCommissioner = False 1463 self._lines = None # buffered lines read from device 1464 1465 # initialize device configuration 1466 try: 1467 self.setMAC(self.mac) 1468 self.__setChannelMask(self.channelMask) 1469 self.__setSecurityPolicy(self.securityPolicySecs, self.securityPolicyFlags) 1470 self.setChannel(self.channel) 1471 self.setPANID(self.panId) 1472 self.setXpanId(self.xpanId) 1473 self.setNetworkName(self.networkName) 1474 self.setNetworkKey(self.networkKey) 1475 self.setMLPrefix(self.meshLocalPrefix) 1476 self.setPSKc(self.pskc) 1477 self.setActiveTimestamp(self.activetimestamp) 1478 except Exception as e: 1479 ModuleHelper.WriteIntoDebugLogger('setDefaultValue() Error: ' + str(e)) 1480 1481 def getDeviceConncetionStatus(self): 1482 """check if serial port connection is ready or not""" 1483 print('%s call getDeviceConnectionStatus' % self.port) 1484 return self.deviceConnected 1485 1486 def setPollingRate(self, iPollingRate): 1487 """set data polling rate for sleepy end device 1488 1489 Args: 1490 iPollingRate: data poll period of sleepy end device (in seconds) 1491 1492 Returns: 1493 True: successful to set the data polling rate for sleepy end device 1494 False: fail to set the data polling rate for sleepy end device 1495 """ 1496 print('%s call setPollingRate' % self.port) 1497 1498 iPollingRate = int(iPollingRate * 1000) 1499 print(iPollingRate) 1500 1501 if self.__sedPollPeriod != iPollingRate: 1502 self.__sedPollPeriod = iPollingRate 1503 1504 # apply immediately 1505 if self.__isOpenThreadWpanRunning(): 1506 return self.__setPollPeriod(self.__sedPollPeriod) 1507 1508 return True 1509 1510 def __setPollPeriod(self, iPollPeriod): 1511 """set data poll period for sleepy end device 1512 1513 Args: 1514 iPollPeriod: data poll period of sleepy end device (in milliseconds) 1515 1516 Returns: 1517 True: successful to set the data poll period for sleepy end device 1518 False: fail to set the data poll period for sleepy end device 1519 """ 1520 try: 1521 cmd = self.wpan_cmd_prefix + 'setprop NCP:SleepyPollInterval %s' % str(iPollPeriod) 1522 print(cmd) 1523 return self.__sendCommand(cmd)[0] != 'Fail' 1524 except Exception as e: 1525 ModuleHelper.WriteIntoDebugLogger('__setPollingRate() Error: ' + str(e)) 1526 1527 def setLinkQuality(self, EUIadr, LinkQuality): 1528 """set custom LinkQualityIn for all receiving messages from the specified EUIadr 1529 1530 Args: 1531 EUIadr: a given extended address 1532 LinkQuality: a given custom link quality 1533 link quality/link margin mapping table 1534 3: 21 - 255 (dB) 1535 2: 11 - 20 (dB) 1536 1: 3 - 9 (dB) 1537 0: 0 - 2 (dB) 1538 1539 Returns: 1540 True: successful to set the link quality 1541 False: fail to set the link quality 1542 1543 @todo: required if as reference device 1544 """ 1545 1546 def setOutBoundLinkQuality(self, LinkQuality): 1547 """set custom LinkQualityIn for all receiving messages from the any address 1548 1549 Args: 1550 LinkQuality: a given custom link quality 1551 link quality/link margin mapping table 1552 3: 21 - 255 (dB) 1553 2: 11 - 20 (dB) 1554 1: 3 - 9 (dB) 1555 0: 0 - 2 (dB) 1556 1557 Returns: 1558 True: successful to set the link quality 1559 False: fail to set the link quality 1560 1561 @todo: required if as reference device 1562 """ 1563 1564 def removeRouterPrefix(self, prefixEntry): 1565 """remove the configured prefix on a border router 1566 1567 Args: 1568 prefixEntry: a on-mesh prefix entry 1569 1570 Returns: 1571 True: successful to remove the prefix entry from border router 1572 False: fail to remove the prefix entry from border router 1573 1574 @todo: required if as reference device 1575 """ 1576 1577 def resetAndRejoin(self, timeout): 1578 """reset and join back Thread Network with a given timeout delay 1579 1580 Args: 1581 timeout: a timeout interval before rejoin Thread Network 1582 1583 Returns: 1584 True: successful to reset and rejoin Thread Network 1585 False: fail to reset and rejoin the Thread Network 1586 """ 1587 print('%s call resetAndRejoin' % self.port) 1588 print(timeout) 1589 try: 1590 if self.__sendCommand(self.wpan_cmd_prefix + 'setprop Daemon:AutoAssociateAfterReset false')[0] != 'Fail': 1591 time.sleep(0.5) 1592 if self.__sendCommand(self.wpan_cmd_prefix + 'reset')[0] != 'Fail': 1593 self.isPowerDown = True 1594 else: 1595 return False 1596 else: 1597 return False 1598 time.sleep(timeout) 1599 1600 if self.deviceRole == Thread_Device_Role.SED: 1601 self.__setPollPeriod(self.__sedPollPeriod) 1602 1603 if self.__sendCommand(self.wpan_cmd_prefix + 'attach')[0] != 'Fail': 1604 time.sleep(3) 1605 else: 1606 return False 1607 1608 if self.__sendCommand(self.wpan_cmd_prefix + 'setprop Daemon:AutoAssociateAfterReset true')[0] == 'Fail': 1609 return False 1610 1611 if self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:State')[0]) != 'associated': 1612 print('[FAIL] reset and rejoin') 1613 return False 1614 return True 1615 except Exception as e: 1616 ModuleHelper.WriteIntoDebugLogger('resetAndRejoin() Error: ' + str(e)) 1617 1618 def configBorderRouter(self, 1619 P_Prefix, 1620 P_stable=1, 1621 P_default=1, 1622 P_slaac_preferred=0, 1623 P_Dhcp=0, 1624 P_preference=0, 1625 P_on_mesh=1, 1626 P_nd_dns=0): 1627 """configure the border router with a given prefix entry parameters 1628 1629 Args: 1630 P_Prefix: IPv6 prefix that is available on the Thread Network 1631 P_stable: true if the default router is expected to be stable network data 1632 P_default: true if border router offers the default route for P_Prefix 1633 P_slaac_preferred: true if allowing auto-configure address using P_Prefix 1634 P_Dhcp: is true if border router is a DHCPv6 Agent 1635 P_preference: is two-bit signed integer indicating router preference 1636 P_on_mesh: is true if P_Prefix is considered to be on-mesh 1637 P_nd_dns: is true if border router is able to supply DNS information obtained via ND 1638 1639 Returns: 1640 True: successful to configure the border router with a given prefix entry 1641 False: fail to configure the border router with a given prefix entry 1642 """ 1643 print('%s call configBorderRouter' % self.port) 1644 prefix = self.__convertIp6PrefixStringToIp6Address(str(P_Prefix)) 1645 print(prefix) 1646 try: 1647 parameter = '' 1648 1649 if P_slaac_preferred == 1: 1650 parameter += ' -a -f' 1651 1652 if P_stable == 1: 1653 parameter += ' -s' 1654 1655 if P_default == 1: 1656 parameter += ' -r' 1657 1658 if P_Dhcp == 1: 1659 parameter += ' -d' 1660 1661 if P_on_mesh == 1: 1662 parameter += ' -o' 1663 1664 cmd = self.wpan_cmd_prefix + 'add-prefix %s %s -P %d' % (prefix, parameter, P_preference) 1665 print(parameter) 1666 print(cmd) 1667 if self.__sendCommand(cmd)[0] != 'Fail': 1668 return True 1669 else: 1670 return False 1671 except Exception as e: 1672 ModuleHelper.WriteIntoDebugLogger('configBorderRouter() Error: ' + str(e)) 1673 1674 def setNetworkIDTimeout(self, iNwkIDTimeOut): 1675 """set networkid timeout for OpenThreadWpan 1676 1677 Args: 1678 iNwkIDTimeOut: a given NETWORK_ID_TIMEOUT 1679 1680 Returns: 1681 True: successful to set NETWORK_ID_TIMEOUT 1682 False: fail to set NETWORK_ID_TIMEOUT 1683 1684 @todo: required if as reference device 1685 """ 1686 1687 def setKeepAliveTimeOut(self, iTimeOut): 1688 """set keep alive timeout for device 1689 has been deprecated and also set SED polling rate 1690 1691 Args: 1692 iTimeOut: data poll period for sleepy end device 1693 1694 Returns: 1695 True: successful to set the data poll period for SED 1696 False: fail to set the data poll period for SED 1697 """ 1698 print('%s call setKeepAliveTimeOut' % self.port) 1699 print(iTimeOut) 1700 try: 1701 cmd = self.wpan_cmd_prefix + 'setprop NCP:SleepyPollInterval %s' % str(iTimeOut * 1000) 1702 print(cmd) 1703 return self.__sendCommand(cmd)[0] != 'Fail' 1704 except Exception as e: 1705 ModuleHelper.WriteIntoDebugLogger('setKeepAliveTimeOut() Error: ' + str(e)) 1706 1707 def setKeySequenceCounter(self, iKeySequenceValue): 1708 """ set the Key sequence counter corresponding to Thread network key 1709 1710 Args: 1711 iKeySequenceValue: key sequence value 1712 1713 Returns: 1714 True: successful to set the key sequence 1715 False: fail to set the key sequence 1716 """ 1717 print('%s call setKeySequenceCounter' % self.port) 1718 print(iKeySequenceValue) 1719 try: 1720 # avoid key switch guard timer protection for reference device 1721 self.__setKeySwitchGuardTime(0) 1722 1723 cmd = self.wpan_cmd_prefix + 'setprop Network:KeyIndex %s' % str(iKeySequenceValue) 1724 if self.__sendCommand(cmd)[0] != 'Fail': 1725 time.sleep(1) 1726 return True 1727 else: 1728 return False 1729 except Exception as e: 1730 ModuleHelper.WriteIntoDebugLogger('setKeySequenceCounter() Error: ' + str(e)) 1731 1732 def getKeySequenceCounter(self): 1733 """get current Thread Network key sequence""" 1734 print('%s call getKeySequenceCounter' % self.port) 1735 keySequence = '' 1736 keySequence = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Network:KeyIndex')[0] 1737 return keySequence 1738 1739 def incrementKeySequenceCounter(self, iIncrementValue=1): 1740 """increment the key sequence with a given value 1741 1742 Args: 1743 iIncrementValue: specific increment value to be added 1744 1745 Returns: 1746 True: successful to increment the key sequence with a given value 1747 False: fail to increment the key sequence with a given value 1748 """ 1749 print('%s call incrementKeySequenceCounter' % self.port) 1750 print(iIncrementValue) 1751 currentKeySeq = '' 1752 try: 1753 # avoid key switch guard timer protection for reference device 1754 self.__setKeySwitchGuardTime(0) 1755 1756 currentKeySeq = self.getKeySequenceCounter() 1757 keySequence = int(currentKeySeq, 10) + iIncrementValue 1758 print(keySequence) 1759 return self.setKeySequenceCounter(keySequence) 1760 except Exception as e: 1761 ModuleHelper.WriteIntoDebugLogger('incrementKeySequenceCounter() Error: ' + str(e)) 1762 1763 def setNetworkDataRequirement(self, eDataRequirement): 1764 """set whether the Thread device requires the full network data 1765 or only requires the stable network data 1766 1767 Args: 1768 eDataRequirement: is true if requiring the full network data 1769 1770 Returns: 1771 True: successful to set the network requirement 1772 """ 1773 print('%s call setNetworkDataRequirement' % self.port) 1774 print(eDataRequirement) 1775 1776 if eDataRequirement == Device_Data_Requirement.ALL_DATA: 1777 self.networkDataRequirement = 'n' 1778 return True 1779 1780 def configExternalRouter(self, P_Prefix, P_stable, R_Preference=0): 1781 """configure border router with a given external route prefix entry 1782 1783 Args: 1784 P_Prefix: IPv6 prefix for the route 1785 P_Stable: is true if the external route prefix is stable network data 1786 R_Preference: a two-bit signed integer indicating Router preference 1787 1: high 1788 0: medium 1789 -1: low 1790 1791 Returns: 1792 True: successful to configure the border router with a given external route prefix 1793 False: fail to configure the border router with a given external route prefix 1794 """ 1795 print('%s call configExternalRouter' % self.port) 1796 print(P_Prefix) 1797 prefix = self.__convertIp6PrefixStringToIp6Address(str(P_Prefix)) 1798 try: 1799 if P_stable: 1800 cmd = self.wpan_cmd_prefix + 'add-route %s -l 64 -p %d' % (prefix, R_Preference) 1801 else: 1802 cmd = self.wpan_cmd_prefix + 'add-route %s -l 64 -p %d -n' % (prefix, R_Preference) 1803 print(cmd) 1804 1805 if self.__sendCommand(cmd)[0] != 'Fail': 1806 return True 1807 else: 1808 return False 1809 except Exception as e: 1810 ModuleHelper.WriteIntoDebugLogger('configExternalRouter() Error: ' + str(e)) 1811 1812 def getNeighbouringRouters(self): 1813 """get neighboring routers information 1814 1815 Returns: 1816 neighboring routers' extended address 1817 1818 @todo: required if as reference device 1819 """ 1820 1821 def getChildrenInfo(self): 1822 """get all children information 1823 1824 Returns: 1825 children's extended address 1826 1827 @todo: required if as reference device 1828 """ 1829 1830 def setXpanId(self, xPanId): 1831 """set extended PAN ID of Thread Network 1832 1833 Args: 1834 xPanId: extended PAN ID in hex format 1835 1836 Returns: 1837 True: successful to set the extended PAN ID 1838 False: fail to set the extended PAN ID 1839 """ 1840 xpanid = '' 1841 print('%s call setXpanId' % self.port) 1842 print(xPanId) 1843 try: 1844 if not isinstance(xPanId, str): 1845 xpanid = self.__convertLongToHex(xPanId, 16) 1846 1847 cmd = self.wpan_cmd_prefix + 'setprop Network:XPANID %s' % xpanid 1848 datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:ExtendedPanId %s' % xpanid 1849 else: 1850 xpanid = xPanId 1851 cmd = self.wpan_cmd_prefix + 'setprop Network:XPANID %s' % xpanid 1852 datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:ExtendedPanId %s' % xpanid 1853 1854 self.xpanId = xpanid 1855 self.hasActiveDatasetToCommit = True 1856 return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' 1857 except Exception as e: 1858 ModuleHelper.WriteIntoDebugLogger('setXpanId() Error: ' + str(e)) 1859 1860 def getNeighbouringDevices(self): 1861 """gets the neighboring devices' extended address to compute the DUT 1862 extended address automatically 1863 1864 Returns: 1865 A list including extended address of neighboring routers, parent 1866 as well as children 1867 """ 1868 print('%s call getNeighbouringDevices' % self.port) 1869 neighbourList = [] 1870 1871 # get parent info 1872 parentAddr = self.getParentAddress() 1873 if parentAddr != 0: 1874 neighbourList.append(parentAddr) 1875 1876 # get ED/SED children info 1877 childNeighbours = self.getChildrenInfo() 1878 if childNeighbours is not None and len(childNeighbours) > 0: 1879 for entry in childNeighbours: 1880 neighbourList.append(entry) 1881 1882 # get neighboring routers info 1883 routerNeighbours = self.getNeighbouringRouters() 1884 if routerNeighbours is not None and len(routerNeighbours) > 0: 1885 for entry in routerNeighbours: 1886 neighbourList.append(entry) 1887 1888 print(neighbourList) 1889 return neighbourList 1890 1891 def setPartationId(self, partationId): 1892 """set Thread Network Partition ID 1893 1894 Args: 1895 partitionId: partition id to be set by leader 1896 1897 Returns: 1898 True: successful to set the Partition ID 1899 False: fail to set the Partition ID 1900 """ 1901 print('%s call setPartationId' % self.port) 1902 print(partationId) 1903 1904 cmd = self.wpan_cmd_prefix + 'setprop Network:PartitionId %s' % (str(hex(partationId)).rstrip('L')) 1905 print(cmd) 1906 return self.__sendCommand(cmd)[0] != 'Fail' 1907 1908 def getGUA(self, filterByPrefix=None): 1909 """get expected global unicast IPv6 address of OpenThreadWpan 1910 1911 note: existing filterByPrefix are string of in lowercase. e.g. 1912 '2001' or '2001:0db8:0001:0000". 1913 1914 Args: 1915 filterByPrefix: a given expected global IPv6 prefix to be matched 1916 1917 Returns: 1918 a global IPv6 address 1919 """ 1920 print('%s call getGUA' % self.port) 1921 print(filterByPrefix) 1922 globalAddrs = [] 1923 try: 1924 # get global addrs set if multiple 1925 globalAddrs = self.getGlobal() 1926 1927 if filterByPrefix is None: 1928 return self.globalAddrs[0] 1929 else: 1930 for fullIp in globalAddrs: 1931 if fullIp.startswith(filterByPrefix): 1932 print('target global %s' % fullIp) 1933 return fullIp 1934 print('no global address matched') 1935 return str(globalAddrs[0]) 1936 except Exception as e: 1937 ModuleHelper.WriteIntoDebugLogger('getGUA() Error: ' + str(e)) 1938 return e 1939 1940 def getShortAddress(self): 1941 """get Rloc16 short address of Thread device""" 1942 print('%s call getShortAddress' % self.port) 1943 return self.getRloc16() 1944 1945 def getULA64(self): 1946 """get mesh local EID of Thread device""" 1947 print('%s call getULA64' % self.port) 1948 return self.getMLEID() 1949 1950 def setMLPrefix(self, sMeshLocalPrefix): 1951 """set mesh local prefix""" 1952 print('%s call setMLPrefix' % self.port) 1953 try: 1954 cmd = self.wpan_cmd_prefix + 'setprop IPv6:MeshLocalPrefix %s' % sMeshLocalPrefix 1955 datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:MeshLocalPrefix %s' % sMeshLocalPrefix 1956 self.hasActiveDatasetToCommit = True 1957 return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' 1958 except Exception as e: 1959 ModuleHelper.WriteIntoDebugLogger('setMLPrefix() Error: ' + str(e)) 1960 1961 def getML16(self): 1962 """get mesh local 16 unicast address (Rloc)""" 1963 print('%s call getML16' % self.port) 1964 return self.getRloc() 1965 1966 def downgradeToDevice(self): 1967 pass 1968 1969 def upgradeToRouter(self): 1970 pass 1971 1972 def forceSetSlaac(self, slaacAddress): 1973 """@todo : required if as reference device""" 1974 1975 def setSleepyNodePollTime(self): 1976 pass 1977 1978 def enableAutoDUTObjectFlag(self): 1979 """set AutoDUTenable flag""" 1980 print('%s call enableAutoDUTObjectFlag' % self.port) 1981 self.AutoDUTEnable = True 1982 1983 def getChildTimeoutValue(self): 1984 """get child timeout""" 1985 print('%s call getChildTimeoutValue' % self.port) 1986 childTimeout = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Thread:ChildTimeout')[0] 1987 return int(childTimeout) 1988 1989 def diagnosticGet(self, strDestinationAddr, listTLV_ids=[]): 1990 """@todo : required if as reference device""" 1991 1992 def diagnosticQuery(self, strDestinationAddr, listTLV_ids=[]): 1993 """@todo : required if as reference device""" 1994 self.diagnosticGet(strDestinationAddr, listTLV_ids) 1995 1996 def diagnosticReset(self, strDestinationAddr, listTLV_ids=[]): 1997 """@todo : required if as reference device""" 1998 1999 def startNativeCommissioner(self, strPSKc='GRLPASSPHRASE'): 2000 # TODO: Support the whole Native Commissioner functionality 2001 # Currently it only aims to trigger a Discovery Request message to pass 2002 # Certification test 5.8.4 2003 print('%s call startNativeCommissioner' % self.port) 2004 cmd = self.wpan_cmd_prefix + 'joiner --start %s' % (strPSKc) 2005 print(cmd) 2006 if self.__sendCommand(cmd)[0] != 'Fail': 2007 return True 2008 else: 2009 return False 2010 2011 def startCollapsedCommissioner(self): 2012 """start Collapsed Commissioner 2013 2014 Returns: 2015 True: successful to start Commissioner 2016 False: fail to start Commissioner 2017 """ 2018 print('%s call startCollapsedCommissioner' % self.port) 2019 startCmd = self.wpan_cmd_prefix + 'form "%s" -c %s -T router' % (self.networkName, str(self.channel)) 2020 if self.__sendCommand(startCmd) != 'Fail': 2021 time.sleep(2) 2022 cmd = self.wpan_cmd_prefix + 'commissioner start' 2023 print(cmd) 2024 if self.__sendCommand(cmd)[0] != 'Fail': 2025 self.isActiveCommissioner = True 2026 time.sleep(20) # time for petition process 2027 return True 2028 return False 2029 2030 def setJoinKey(self, strPSKc): 2031 pass 2032 2033 def scanJoiner(self, xEUI='*', strPSKd='THREADJPAKETEST'): 2034 """scan Joiner 2035 2036 Args: 2037 xEUI: Joiner's EUI-64 2038 strPSKd: Joiner's PSKd for commissioning 2039 2040 Returns: 2041 True: successful to add Joiner's steering data 2042 False: fail to add Joiner's steering data 2043 """ 2044 print('%s call scanJoiner' % self.port) 2045 if not isinstance(xEUI, str): 2046 eui64 = self.__convertLongToHex(xEUI, 16) 2047 else: 2048 eui64 = xEUI 2049 2050 # long timeout value to avoid automatic joiner removal (in seconds) 2051 timeout = 500 2052 2053 cmd = self.wpan_cmd_prefix + 'commissioner joiner-add "%s" %s %s' % (eui64, str(timeout), strPSKd) 2054 print(cmd) 2055 if not self.isActiveCommissioner: 2056 self.startCollapsedCommissioner() 2057 if self.__sendCommand(cmd)[0] != 'Fail': 2058 return True 2059 else: 2060 return False 2061 2062 def setProvisioningUrl(self, strURL='grl.com'): 2063 """set provisioning Url 2064 2065 Args: 2066 strURL: Provisioning Url string 2067 2068 Returns: 2069 True: successful to set provisioning Url 2070 False: fail to set provisioning Url 2071 """ 2072 print('%s call setProvisioningUrl' % self.port) 2073 self.provisioningUrl = strURL 2074 if self.deviceRole == Thread_Device_Role.Commissioner: 2075 cmd = self.wpan_cmd_prefix + 'setprop Commissioner:ProvisioningUrl %s' % (strURL) 2076 print(cmd) 2077 return self.__sendCommand(cmd)[0] != 'Fail' 2078 return True 2079 2080 def allowCommission(self): 2081 """start commissioner candidate petition process 2082 2083 Returns: 2084 True: successful to start commissioner candidate petition process 2085 False: fail to start commissioner candidate petition process 2086 """ 2087 print('%s call allowCommission' % self.port) 2088 try: 2089 cmd = self.wpan_cmd_prefix + 'commissioner start' 2090 print(cmd) 2091 if self.isActiveCommissioner: 2092 return True 2093 if self.__sendCommand(cmd)[0] != 'Fail': 2094 self.isActiveCommissioner = True 2095 time.sleep(40) # time for petition process and at least one keep alive 2096 return True 2097 else: 2098 return False 2099 except Exception as e: 2100 ModuleHelper.WriteIntoDebugLogger('allowcommission() error: ' + str(e)) 2101 2102 def joinCommissioned(self, strPSKd='THREADJPAKETEST', waitTime=20): 2103 """start joiner 2104 2105 Args: 2106 strPSKd: Joiner's PSKd 2107 2108 Returns: 2109 True: successful to start joiner 2110 False: fail to start joiner 2111 """ 2112 print('%s call joinCommissioned' % self.port) 2113 cmd = self.wpan_cmd_prefix + 'joiner --start %s %s' % (strPSKd, self.provisioningUrl) 2114 print(cmd) 2115 if self.__sendCommand(cmd)[0] != 'Fail': 2116 if self.__getJoinerState(): 2117 self.__sendCommand(self.wpan_cmd_prefix + 'joiner --attach') 2118 time.sleep(30) 2119 return True 2120 else: 2121 return False 2122 else: 2123 return False 2124 2125 def getCommissioningLogs(self): 2126 """get Commissioning logs 2127 2128 Returns: 2129 Commissioning logs 2130 """ 2131 rawLogs = self.logThread.get() 2132 ProcessedLogs = [] 2133 payload = [] 2134 while not rawLogs.empty(): 2135 rawLogEach = rawLogs.get() 2136 print(rawLogEach) 2137 if '[THCI]' not in rawLogEach: 2138 continue 2139 2140 EncryptedPacket = PlatformDiagnosticPacket() 2141 infoList = rawLogEach.split('[THCI]')[1].split(']')[0].split('|') 2142 for eachInfo in infoList: 2143 print(eachInfo) 2144 info = eachInfo.split('=') 2145 infoType = info[0].strip() 2146 infoValue = info[1].strip() 2147 if 'direction' in infoType: 2148 EncryptedPacket.Direction = (PlatformDiagnosticPacket_Direction.IN 2149 if 'recv' in infoValue else PlatformDiagnosticPacket_Direction.OUT if 2150 'send' in infoValue else PlatformDiagnosticPacket_Direction.UNKNOWN) 2151 elif 'type' in infoType: 2152 EncryptedPacket.Type = (PlatformDiagnosticPacket_Type.JOIN_FIN_req if 'JOIN_FIN.req' in infoValue 2153 else PlatformDiagnosticPacket_Type.JOIN_FIN_rsp if 'JOIN_FIN.rsp' 2154 in infoValue else PlatformDiagnosticPacket_Type.JOIN_ENT_req if 2155 'JOIN_ENT.ntf' in infoValue else PlatformDiagnosticPacket_Type.JOIN_ENT_rsp 2156 if 'JOIN_ENT.rsp' in infoValue else PlatformDiagnosticPacket_Type.UNKNOWN) 2157 elif 'len' in infoType: 2158 bytesInEachLine = 16 2159 EncryptedPacket.TLVsLength = int(infoValue) 2160 payloadLineCount = (int(infoValue) + bytesInEachLine - 1) / bytesInEachLine 2161 while payloadLineCount > 0: 2162 payloadLineCount = payloadLineCount - 1 2163 payloadLine = rawLogs.get() 2164 payloadSplit = payloadLine.split('|') 2165 for block in range(1, 3): 2166 payloadBlock = payloadSplit[block] 2167 payloadValues = payloadBlock.split(' ') 2168 for num in range(1, 9): 2169 if '..' not in payloadValues[num]: 2170 payload.append(int(payloadValues[num], 16)) 2171 2172 EncryptedPacket.TLVs = PlatformPackets.read(EncryptedPacket.Type, payload) if payload != [] else [] 2173 2174 ProcessedLogs.append(EncryptedPacket) 2175 return ProcessedLogs 2176 2177 def MGMT_ED_SCAN(self, sAddr, xCommissionerSessionId, listChannelMask, xCount, xPeriod, xScanDuration): 2178 """send MGMT_ED_SCAN message to a given destinaition. 2179 2180 Args: 2181 sAddr: IPv6 destination address for this message 2182 xCommissionerSessionId: commissioner session id 2183 listChannelMask: a channel array to indicate which channels to be scanned 2184 xCount: number of IEEE 802.15.4 ED Scans (milliseconds) 2185 xPeriod: Period between successive IEEE802.15.4 ED Scans (milliseconds) 2186 xScanDuration: IEEE 802.15.4 ScanDuration to use when performing an IEEE 802.15.4 ED Scan (milliseconds) 2187 2188 Returns: 2189 True: successful to send MGMT_ED_SCAN message. 2190 False: fail to send MGMT_ED_SCAN message 2191 """ 2192 print('%s call MGMT_ED_SCAN' % self.port) 2193 channelMask = '' 2194 channelMask = self.__ChannelMaskListToStr(listChannelMask) 2195 try: 2196 cmd = self.wpan_cmd_prefix + 'commissioner energy-scan %s %s %s %s %s' % ( 2197 channelMask, 2198 xCount, 2199 xPeriod, 2200 xScanDuration, 2201 sAddr, 2202 ) 2203 print(cmd) 2204 return self.__sendCommand(cmd) != 'Fail' 2205 except Exception as e: 2206 ModuleHelper.WriteIntoDebugLogger('MGMT_ED_SCAN() error: ' + str(e)) 2207 2208 def MGMT_PANID_QUERY(self, sAddr, xCommissionerSessionId, listChannelMask, xPanId): 2209 """send MGMT_PANID_QUERY message to a given destination 2210 2211 Args: 2212 xPanId: a given PAN ID to check the conflicts 2213 2214 Returns: 2215 True: successful to send MGMT_PANID_QUERY message. 2216 False: fail to send MGMT_PANID_QUERY message. 2217 """ 2218 print('%s call MGMT_PANID_QUERY' % self.port) 2219 panid = '' 2220 channelMask = '' 2221 channelMask = self.__ChannelMaskListToStr(listChannelMask) 2222 2223 if not isinstance(xPanId, str): 2224 panid = str(hex(xPanId)) 2225 2226 try: 2227 cmd = self.wpan_cmd_prefix + 'commissioner pan-id-query %s %s %s' % (panid, channelMask, sAddr) 2228 print(cmd) 2229 return self.__sendCommand(cmd) != 'Fail' 2230 except Exception as e: 2231 ModuleHelper.WriteIntoDebugLogger('MGMT_PANID_QUERY() error: ' + str(e)) 2232 2233 def MGMT_ANNOUNCE_BEGIN(self, sAddr, xCommissionerSessionId, listChannelMask, xCount, xPeriod): 2234 """send MGMT_ANNOUNCE_BEGIN message to a given destination 2235 2236 Returns: 2237 True: successful to send MGMT_ANNOUNCE_BEGIN message. 2238 False: fail to send MGMT_ANNOUNCE_BEGIN message. 2239 """ 2240 print('%s call MGMT_ANNOUNCE_BEGIN' % self.port) 2241 channelMask = '' 2242 channelMask = self.__ChannelMaskListToStr(listChannelMask) 2243 try: 2244 cmd = self.wpan_cmd_prefix + 'commissioner announce-begin %s %s %s %s' % ( 2245 channelMask, 2246 xCount, 2247 xPeriod, 2248 sAddr, 2249 ) 2250 print(cmd) 2251 return self.__sendCommand(cmd) != 'Fail' 2252 except Exception as e: 2253 ModuleHelper.WriteIntoDebugLogger('MGMT_ANNOUNCE_BEGIN() error: ' + str(e)) 2254 2255 def MGMT_ACTIVE_GET(self, Addr='', TLVs=[]): 2256 """send MGMT_ACTIVE_GET command 2257 2258 Returns: 2259 True: successful to send MGMT_ACTIVE_GET 2260 False: fail to send MGMT_ACTIVE_GET 2261 """ 2262 print('%s call MGMT_ACTIVE_GET' % self.port) 2263 try: 2264 2265 cmd = self.wpan_cmd_prefix + 'dataset mgmt-get-active' 2266 2267 if len(TLVs) != 0: 2268 tlvs = ''.join('%02x' % tlv for tlv in TLVs) 2269 setTLVCmd = self.wpan_cmd_prefix + 'setprop Dataset:RawTlvs ' + tlvs 2270 if self.__sendCommand(setTLVCmd)[0] == 'Fail': 2271 return False 2272 else: 2273 if self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase')[0] == 'Fail': 2274 return False 2275 2276 if Addr != '': 2277 setAddressCmd = self.wpan_cmd_prefix + 'setprop Dataset:DestIpAddress ' + Addr 2278 if self.__sendCommand(setAddressCmd)[0] == 'Fail': 2279 return False 2280 2281 print(cmd) 2282 2283 return self.__sendCommand(cmd)[0] != 'Fail' 2284 2285 except Exception as e: 2286 ModuleHelper.WriteIntoDebugLogger('MGMT_ACTIVE_GET() Error: ' + str(e)) 2287 2288 def MGMT_ACTIVE_SET( 2289 self, 2290 sAddr='', 2291 xCommissioningSessionId=None, 2292 listActiveTimestamp=None, 2293 listChannelMask=None, 2294 xExtendedPanId=None, 2295 sNetworkName=None, 2296 sPSKc=None, 2297 listSecurityPolicy=None, 2298 xChannel=None, 2299 sMeshLocalPrefix=None, 2300 xMasterKey=None, 2301 xPanId=None, 2302 xTmfPort=None, 2303 xSteeringData=None, 2304 xBorderRouterLocator=None, 2305 BogusTLV=None, 2306 xDelayTimer=None, 2307 ): 2308 """send MGMT_ACTIVE_SET command 2309 2310 Returns: 2311 True: successful to send MGMT_ACTIVE_SET 2312 False: fail to send MGMT_ACTIVE_SET 2313 """ 2314 print('%s call MGMT_ACTIVE_SET' % self.port) 2315 try: 2316 cmd = self.wpan_cmd_prefix + 'dataset mgmt-set-active' 2317 2318 if self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase')[0] == 'Fail': 2319 return False 2320 2321 if listActiveTimestamp is not None: 2322 sActiveTimestamp = '%016x' % listActiveTimestamp[0] 2323 setActiveTimeCmd = self.wpan_cmd_prefix + 'setprop Dataset:ActiveTimestamp ' + sActiveTimestamp 2324 if self.__sendCommand(setActiveTimeCmd)[0] == 'Fail': 2325 return False 2326 2327 if xExtendedPanId is not None: 2328 xpanid = self.__convertLongToHex(xExtendedPanId, 16) 2329 setExtendedPanIdCmd = self.wpan_cmd_prefix + 'setprop Dataset:ExtendedPanId ' + xpanid 2330 if self.__sendCommand(setExtendedPanIdCmd)[0] == 'Fail': 2331 return False 2332 2333 if sNetworkName is not None: 2334 setNetworkNameCmd = self.wpan_cmd_prefix + 'setprop Dataset:NetworkName ' + str(sNetworkName) 2335 if self.__sendCommand(setNetworkNameCmd)[0] == 'Fail': 2336 return False 2337 2338 if xChannel is not None: 2339 setChannelCmd = self.wpan_cmd_prefix + 'setprop Dataset:Channel ' + str(xChannel) 2340 if self.__sendCommand(setChannelCmd)[0] == 'Fail': 2341 return False 2342 2343 if sMeshLocalPrefix is not None: 2344 setMLPrefixCmd = self.wpan_cmd_prefix + 'setprop Dataset:MeshLocalPrefix ' + str(sMeshLocalPrefix) 2345 if self.__sendCommand(setMLPrefixCmd)[0] == 'Fail': 2346 return False 2347 2348 if xMasterKey is not None: 2349 key = self.__convertLongToHex(xMasterKey, 32) 2350 setNetworkKeyCmd = self.wpan_cmd_prefix + 'setprop Dataset:NetworkKey ' + key 2351 if self.__sendCommand(setNetworkKeyCmd)[0] == 'Fail': 2352 return False 2353 2354 if xPanId is not None: 2355 setPanIdCmd = self.wpan_cmd_prefix + 'setprop Dataset:PanId ' + str(xPanId) 2356 if self.__sendCommand(setPanIdCmd)[0] == 'Fail': 2357 return False 2358 2359 if listChannelMask is not None: 2360 setChannelMaskCmd = (self.wpan_cmd_prefix + 'setprop Dataset:ChannelMaskPage0 ' + '0x' + 2361 self.__convertLongToHex(self.__convertChannelMask(listChannelMask))) 2362 if self.__sendCommand(setChannelMaskCmd)[0] == 'Fail': 2363 return False 2364 2365 if (sPSKc is not None or listSecurityPolicy is not None or xCommissioningSessionId is not None or 2366 xTmfPort is not None or xSteeringData is not None or xBorderRouterLocator is not None or 2367 BogusTLV is not None): 2368 setRawTLVCmd = self.wpan_cmd_prefix + 'setprop Dataset:RawTlvs ' 2369 2370 if sPSKc is not None: 2371 setRawTLVCmd += '0410' 2372 stretchedPskc = Thread_PBKDF2.get(sPSKc, ModuleHelper.Default_XpanId, ModuleHelper.Default_NwkName) 2373 pskc = '%x' % stretchedPskc 2374 2375 if len(pskc) < 32: 2376 pskc = pskc.zfill(32) 2377 2378 setRawTLVCmd += pskc 2379 2380 if listSecurityPolicy is not None: 2381 setRawTLVCmd += '0c03' 2382 2383 rotationTime = 0 2384 policyBits = 0 2385 2386 # previous passing way listSecurityPolicy=[True, True, 3600, 2387 # False, False, True] 2388 if len(listSecurityPolicy) == 6: 2389 rotationTime = listSecurityPolicy[2] 2390 2391 # the last three reserved bits must be 1 2392 policyBits = 0b00000111 2393 2394 if listSecurityPolicy[0]: 2395 policyBits = policyBits | 0b10000000 2396 if listSecurityPolicy[1]: 2397 policyBits = policyBits | 0b01000000 2398 if listSecurityPolicy[3]: 2399 policyBits = policyBits | 0b00100000 2400 if listSecurityPolicy[4]: 2401 policyBits = policyBits | 0b00010000 2402 if listSecurityPolicy[5]: 2403 policyBits = policyBits | 0b00001000 2404 else: 2405 # new passing way listSecurityPolicy=[3600, 0b11001111] 2406 rotationTime = listSecurityPolicy[0] 2407 policyBits = listSecurityPolicy[1] 2408 2409 policy = str(hex(rotationTime))[2:] 2410 2411 if len(policy) < 4: 2412 policy = policy.zfill(4) 2413 2414 setRawTLVCmd += policy 2415 2416 setRawTLVCmd += str(hex(policyBits))[2:] 2417 2418 if xCommissioningSessionId is not None: 2419 setRawTLVCmd += '0b02' 2420 sessionid = str(hex(xCommissioningSessionId))[2:] 2421 2422 if len(sessionid) < 4: 2423 sessionid = sessionid.zfill(4) 2424 2425 setRawTLVCmd += sessionid 2426 2427 if xBorderRouterLocator is not None: 2428 setRawTLVCmd += '0902' 2429 locator = str(hex(xBorderRouterLocator))[2:] 2430 2431 if len(locator) < 4: 2432 locator = locator.zfill(4) 2433 2434 setRawTLVCmd += locator 2435 2436 if xSteeringData is not None: 2437 steeringData = self.__convertLongToHex(xSteeringData) 2438 setRawTLVCmd += '08' + str(len(steeringData) / 2).zfill(2) 2439 setRawTLVCmd += steeringData 2440 2441 if BogusTLV is not None: 2442 setRawTLVCmd += '8202aa55' 2443 2444 print(setRawTLVCmd) 2445 print(cmd) 2446 2447 if self.__sendCommand(setRawTLVCmd)[0] == 'Fail': 2448 return False 2449 2450 return self.__sendCommand(cmd)[0] != 'Fail' 2451 2452 except Exception as e: 2453 ModuleHelper.WriteIntoDebugLogger('MGMT_ACTIVE_SET() Error: ' + str(e)) 2454 2455 def MGMT_PENDING_GET(self, Addr='', TLVs=[]): 2456 """send MGMT_PENDING_GET command 2457 2458 Returns: 2459 True: successful to send MGMT_PENDING_GET 2460 False: fail to send MGMT_PENDING_GET 2461 """ 2462 print('%s call MGMT_PENDING_GET' % self.port) 2463 try: 2464 cmd = self.wpan_cmd_prefix + 'dataset mgmt-get-pending' 2465 2466 if len(TLVs) != 0: 2467 tlvs = ''.join('%02x' % tlv for tlv in TLVs) 2468 setTLVCmd = self.wpan_cmd_prefix + 'setprop Dataset:RawTlvs ' + tlvs 2469 if self.__sendCommand(setTLVCmd)[0] == 'Fail': 2470 return False 2471 else: 2472 if self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase')[0] == 'Fail': 2473 return False 2474 2475 if Addr != '': 2476 setAddressCmd = self.wpan_cmd_prefix + 'setprop Dataset:DestIpAddress ' + Addr 2477 if self.__sendCommand(setAddressCmd)[0] == 'Fail': 2478 return False 2479 2480 print(cmd) 2481 2482 return self.__sendCommand(cmd)[0] != 'Fail' 2483 2484 except Exception as e: 2485 ModuleHelper.WriteIntoDebugLogger('MGMT_PENDING_GET() Error: ' + str(e)) 2486 2487 def MGMT_PENDING_SET( 2488 self, 2489 sAddr='', 2490 xCommissionerSessionId=None, 2491 listPendingTimestamp=None, 2492 listActiveTimestamp=None, 2493 xDelayTimer=None, 2494 xChannel=None, 2495 xPanId=None, 2496 xMasterKey=None, 2497 sMeshLocalPrefix=None, 2498 sNetworkName=None, 2499 ): 2500 """send MGMT_PENDING_SET command 2501 2502 Returns: 2503 True: successful to send MGMT_PENDING_SET 2504 False: fail to send MGMT_PENDING_SET 2505 """ 2506 print('%s call MGMT_PENDING_SET' % self.port) 2507 try: 2508 cmd = self.wpan_cmd_prefix + 'dataset mgmt-set-pending' 2509 if self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase')[0] == 'Fail': 2510 return False 2511 2512 if listPendingTimestamp is not None: 2513 sActiveTimestamp = '%016x' % listPendingTimestamp[0] 2514 setPendingTimeCmd = self.wpan_cmd_prefix + 'setprop Dataset:PendingTimestamp ' + sActiveTimestamp 2515 if self.__sendCommand(setPendingTimeCmd)[0] == 'Fail': 2516 return False 2517 2518 if listActiveTimestamp is not None: 2519 sActiveTimestamp = '%016x' % listActiveTimestamp[0] 2520 setActiveTimeCmd = self.wpan_cmd_prefix + 'setprop Dataset:ActiveTimestamp ' + sActiveTimestamp 2521 if self.__sendCommand(setActiveTimeCmd)[0] == 'Fail': 2522 return False 2523 2524 if xDelayTimer is not None: 2525 setDelayTimerCmd = self.wpan_cmd_prefix + 'setprop Dataset:Delay ' + str(xDelayTimer) 2526 if self.__sendCommand(setDelayTimerCmd)[0] == 'Fail': 2527 return False 2528 2529 if sNetworkName is not None: 2530 setNetworkNameCmd = self.wpan_cmd_prefix + 'setprop Dataset:NetworkName ' + str(sNetworkName) 2531 if self.__sendCommand(setNetworkNameCmd)[0] == 'Fail': 2532 return False 2533 2534 if xChannel is not None: 2535 setChannelCmd = self.wpan_cmd_prefix + 'setprop Dataset:Channel ' + str(xChannel) 2536 if self.__sendCommand(setChannelCmd)[0] == 'Fail': 2537 return False 2538 2539 if sMeshLocalPrefix is not None: 2540 setMLPrefixCmd = self.wpan_cmd_prefix + 'setprop Dataset:MeshLocalPrefix ' + str(sMeshLocalPrefix) 2541 if self.__sendCommand(setMLPrefixCmd)[0] == 'Fail': 2542 return False 2543 2544 if xMasterKey is not None: 2545 key = self.__convertLongToHex(xMasterKey, 32) 2546 setNetworkKeyCmd = self.wpan_cmd_prefix + 'setprop Dataset:NetworkKey ' + key 2547 if self.__sendCommand(setNetworkKeyCmd)[0] == 'Fail': 2548 return False 2549 2550 if xPanId is not None: 2551 setPanIdCmd = self.wpan_cmd_prefix + 'setprop Dataset:PanId ' + str(xPanId) 2552 if self.__sendCommand(setPanIdCmd)[0] == 'Fail': 2553 return False 2554 2555 if xCommissionerSessionId is not None: 2556 print('not handle xCommissionerSessionId') 2557 2558 print(cmd) 2559 2560 return self.__sendCommand(cmd)[0] != 'Fail' 2561 2562 except Exception as e: 2563 ModuleHelper.WriteIntoDebugLogger('MGMT_PENDING_SET() Error: ' + str(e)) 2564 2565 def MGMT_COMM_GET(self, Addr='ff02::1', TLVs=[]): 2566 """send MGMT_COMM_GET command 2567 2568 Returns: 2569 True: successful to send MGMT_COMM_GET 2570 False: fail to send MGMT_COMM_GET 2571 """ 2572 print('%s call MGMT_COMM_GET' % self.port) 2573 try: 2574 cmd = self.wpan_cmd_prefix + 'commissioner mgmt-get ' 2575 print('TLVs:') 2576 print(TLVs) 2577 2578 if len(TLVs) != 0: 2579 tlvs = ''.join('%02x' % tlv for tlv in TLVs) 2580 cmd += tlvs 2581 2582 print(cmd) 2583 2584 return self.__sendCommand(cmd)[0] != 'Fail' 2585 2586 except Exception as e: 2587 ModuleHelper.WriteIntoDebugLogger('MGMT_COMM_GET() Error: ' + str(e)) 2588 2589 def MGMT_COMM_SET( 2590 self, 2591 Addr='ff02::1', 2592 xCommissionerSessionID=None, 2593 xSteeringData=None, 2594 xBorderRouterLocator=None, 2595 xChannelTlv=None, 2596 ExceedMaxPayload=False, 2597 ): 2598 """send MGMT_COMM_SET command 2599 2600 Returns: 2601 True: successful to send MGMT_COMM_SET 2602 False: fail to send MGMT_COMM_SET 2603 """ 2604 print('%s call MGMT_COMM_SET' % self.port) 2605 try: 2606 cmd = self.wpan_cmd_prefix + 'commissioner mgmt-set ' 2607 print('-------------------------------') 2608 print(xCommissionerSessionID) 2609 print(xSteeringData) 2610 print(str(xSteeringData) + ' ' + str(hex(xSteeringData)[2:])) 2611 print(xBorderRouterLocator) 2612 print(xChannelTlv) 2613 print(ExceedMaxPayload) 2614 print('-------------------------------') 2615 2616 if xCommissionerSessionID is not None: 2617 # use assigned session id 2618 cmd += '0b02' + str(xCommissionerSessionID) 2619 elif xCommissionerSessionID is None: 2620 # use original session id 2621 if self.isActiveCommissioner is True: 2622 cmd += '0b02' + self.__lstrip0x(self.__getCommissionerSessionId()) 2623 else: 2624 pass 2625 2626 if xSteeringData is not None: 2627 cmd += '08' + str(len(hex(xSteeringData)[2:])) + str(hex(xSteeringData)[2:]) 2628 2629 if xBorderRouterLocator is not None: 2630 cmd += '0902' + str(hex(xBorderRouterLocator)) 2631 2632 if xChannelTlv is not None: 2633 cmd += '000300' + '%04x' % xChannelTlv 2634 2635 print(cmd) 2636 2637 return self.__sendCommand(cmd)[0] != 'Fail' 2638 2639 except Exception as e: 2640 ModuleHelper.WriteIntoDebugLogger('MGMT_COMM_SET() Error: ' + str(e)) 2641 2642 def setActiveDataset(self, listActiveDataset=[]): 2643 print('%s call setActiveDataset' % self.port) 2644 2645 def setCommisionerMode(self): 2646 print('%s call setCommissionerMode' % self.port) 2647 2648 def setPSKc(self, strPSKc): 2649 print('%s call setPSKc' % self.port) 2650 try: 2651 cmd = self.wpan_cmd_prefix + 'setprop Network:PSKc %s' % strPSKc 2652 datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:PSKc %s' % strPSKc 2653 self.hasActiveDatasetToCommit = True 2654 return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' 2655 except Exception as e: 2656 ModuleHelper.WriteIntoDebugLogger('setPSKc() Error: ' + str(e)) 2657 2658 def setActiveTimestamp(self, xActiveTimestamp): 2659 print('%s call setActiveTimestamp' % self.port) 2660 try: 2661 sActiveTimestamp = str(xActiveTimestamp) 2662 if len(sActiveTimestamp) < 16: 2663 sActiveTimestamp = sActiveTimestamp.zfill(16) 2664 self.activetimestamp = sActiveTimestamp 2665 cmd = self.wpan_cmd_prefix + 'setprop Dataset:ActiveTimestamp %s' % sActiveTimestamp 2666 self.hasActiveDatasetToCommit = True 2667 return self.__sendCommand(cmd)[0] != 'Fail' 2668 except Exception as e: 2669 ModuleHelper.WriteIntoDebugLogger('setActiveTimestamp() Error: ' + str(e)) 2670 2671 def setUdpJoinerPort(self, portNumber): 2672 """set Joiner UDP Port 2673 2674 Args: 2675 portNumber: Joiner UDP Port number 2676 2677 Returns: 2678 True: successful to set Joiner UDP Port 2679 False: fail to set Joiner UDP Port 2680 2681 @todo : required if as reference device 2682 """ 2683 2684 def commissionerUnregister(self): 2685 """stop commissioner 2686 2687 Returns: 2688 True: successful to stop commissioner 2689 False: fail to stop commissioner 2690 """ 2691 print('%s call commissionerUnregister' % self.port) 2692 cmd = self.wpan_cmd_prefix + 'commissioner stop' 2693 print(cmd) 2694 if self.__sendCommand(cmd)[0] != 'Fail': 2695 self.isActiveCommissioner = False 2696 return True 2697 else: 2698 return False 2699 2700 def sendBeacons(self, sAddr, xCommissionerSessionId, listChannelMask, xPanId): 2701 print('%s call sendBeacons' % self.port) 2702 self._sendline(self.wpan_cmd_prefix + 'scan') 2703 return True 2704 2705 def updateRouterStatus(self): 2706 """force update to router as if there is child id request 2707 @todo : required if as reference device 2708 """ 2709 2710 def setRouterThresholdValues(self, upgradeThreshold, downgradeThreshold): 2711 print('%s call setRouterThresholdValues' % self.port) 2712 self.__setRouterUpgradeThreshold(upgradeThreshold) 2713 self.__setRouterDowngradeThreshold(downgradeThreshold) 2714 2715 def setMinDelayTimer(self, iSeconds): 2716 pass 2717 2718 def ValidateDeviceFirmware(self): 2719 print('%s call ValidateDeviceFirmware' % self.port) 2720 if 'OPENTHREAD' in self.UIStatusMsg: 2721 return True 2722 else: 2723 return False 2724 2725 @staticmethod 2726 def __lstrip0x(s): 2727 """strip 0x at the beginning of a hex string if it exists 2728 2729 Args: 2730 s: hex string 2731 2732 Returns: 2733 hex string with leading 0x stripped 2734 """ 2735 if s.startswith('0x'): 2736 s = s[2:] 2737 2738 return s 2739