#! /usr/bin/env python3
# Copyright 2019 Oticon A/S
# SPDX-License-Identifier: Apache-2.0
"""Mirror EDTT traffic to a btmon socket and/or a btsnoop log file."""

import socket
import struct
import time
from datetime import datetime
from enum import IntEnum

from components.basic_commands import Commands


class BleMonitorOpcode(IntEnum):
    """Opcode values written into the monitor packet headers."""
    NEW_INDEX = 0
    DEL_INDEX = 1
    COMMAND = 2
    EVENT = 3
    ACL_TX = 4
    ACL_RX = 5
    OPEN_INDEX = 8
    SYSTEM_NOTE = 12
    USER_LOGGING = 13
    ISO_TX = 18
    ISO_RX = 19


class BtsnoopPriority(IntEnum):
    """Priority levels (presumably for ``Btsnoop.send_user_data`` - confirm)."""
    EMERGENCY = 0
    ALERT = 1
    CRITICAL = 2
    ERROR = 3
    WARNING = 4
    NOTICE = 5
    INFO = 6
    DEBUG = 7


class Btsnoop:
    """Forward EDTT messages as monitor packets to a socket and/or a file.

    Each packet is a header followed by its payload. The file sink gets a
    btsnoop packet header (see ``send_monitor_hdr_file``), the socket sink
    gets a monitor header (see ``send_monitor_hdr_btmon_socket``); the
    payload bytes are identical for both. Either sink may be absent, in
    which case it is silently skipped.
    """

    # Added to every caller-supplied index before it is written out.
    # NOTE(review): the reason for the offset is not visible in this file.
    _INDEX_OFFSET = 10

    def __init__(self, store_to_file, socket_path) -> None:
        """Connect to the btmon socket and optionally open a btsnoop file.

        Args:
            store_to_file: when falsy, no log file is created.
            socket_path: path of the UNIX stream socket to connect to. A
                failed connection is reported on stdout and disables the
                socket sink; it does not raise.

        Raises:
            OSError: if the log file cannot be created.
        """
        self.non_hci_edtt_cmds = (
            Commands.CMD_HAS_EVENT_REQ, Commands.CMD_HAS_EVENT_RSP,
            Commands.CMD_FLUSH_EVENTS_REQ, Commands.CMD_FLUSH_EVENTS_RSP,
            Commands.CMD_GET_EVENT_REQ,
            Commands.CMD_LE_DATA_FLUSH_REQ, Commands.CMD_LE_DATA_FLUSH_RSP,
            Commands.CMD_LE_DATA_READY_REQ, Commands.CMD_LE_DATA_READY_RSP,
            Commands.CMD_LE_DATA_READ_REQ, Commands.CMD_LE_DATA_WRITE_RSP,
            Commands.CMD_LE_ISO_DATA_FLUSH_REQ, Commands.CMD_LE_ISO_DATA_FLUSH_RSP,
            Commands.CMD_LE_ISO_DATA_READY_REQ, Commands.CMD_LE_ISO_DATA_READY_RSP,
            Commands.CMD_LE_ISO_DATA_READ_REQ, Commands.CMD_LE_ISO_DATA_WRITE_RSP,
        )

        self.tx_data_edtt_cmds = (Commands.CMD_LE_DATA_WRITE_REQ,
                                  Commands.CMD_LE_ISO_DATA_WRITE_REQ)

        # Set up front so close() is safe on every path out of __init__.
        self.file = None

        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.sock.settimeout(5)

        try:
            print("Opening socket ", socket_path)
            self.sock.connect(socket_path)
        except Exception as exc:
            # Best effort: release the unused descriptor and run without
            # the socket sink, reporting the actual error.
            self.sock.close()
            self.sock = None
            print("Could not connect to the btmon socket: ", socket_path, exc)

        # Reference point for the relative packet timestamps.
        self.start_time = time.time()

        if not store_to_file:
            return

        now = datetime.now()
        btsnoop_file_name = "btsnoop_" + str(now.date()) + "_" + str(now.time()) + ".log"

        print("Opening file ", btsnoop_file_name)
        self.file = open(btsnoop_file_name, "wb")

        # btsnoop header
        # 0----------------------------------------64
        # |                   Id                    |
        # +------------------32---------------------+
        # |  btsnoop_ver= 1   |        type         |
        # +-------------------+---------------------+
        btsnoop_id = b"btsnoop\x00"
        btsnoop_version = 1
        btsnoop_monitor_format = 2001
        header = struct.pack(">8sLL", btsnoop_id, btsnoop_version,
                             btsnoop_monitor_format)
        self.file.write(header)

    def close(self):
        """Close the file and the socket; safe to call more than once."""
        if self.file:
            self.file.close()
            self.file = None
        if self.sock:
            self.sock.close()
            self.sock = None

    def _write(self, *chunks):
        """Write the chunks in order to the file, then to the socket.

        A sink that is not open is skipped. ``sendall`` is used because
        ``socket.send`` may transmit only part of the buffer.
        """
        if self.file:
            for chunk in chunks:
                self.file.write(chunk)
        if self.sock:
            for chunk in chunks:
                self.sock.sendall(chunk)

    def send_monitor_hdr_btmon_socket(self, idx, opcode, data_len):
        """Send the monitor header to the socket.

        mgmt_hdr
        0----------16---------32---------------48
        |  opcode  |  index   |      len        |
        +----------+----------+-----------------
        """
        ble_monitor_hdr = struct.pack("<HHH", opcode, idx, data_len)
        self.sock.sendall(ble_monitor_hdr)

    def send_monitor_hdr_file(self, idx, opcode, data_len, timestamp):
        """Write the btsnoop packet header to the file.

        btsnoop_pkt
        0-----------------32---------------64
        |   original len   |  included len  |
        +------------------+----------------+
        |      flags       |cumulative drops|
        +-----------------------------------+
        |             timestamp             |
        +------------------+----------------+

        ``flags`` carries the index in its upper 16 bits and the opcode in
        the lower 16 bits; cumulative drops is always written as 0.
        """
        flags = (idx << 16) | opcode
        pkt_hdr = struct.pack(">LLLLQ", data_len, data_len, flags, 0, timestamp)
        self.file.write(pkt_hdr)

    def send_monitor_hdr(self, idx, opcode, data_len):
        """Emit the packet header for ``data_len`` payload bytes to each open sink.

        The index is offset by ``_INDEX_OFFSET``; the file timestamp is the
        number of milliseconds elapsed since this object was constructed.
        """
        idx += self._INDEX_OFFSET
        timestamp = int((time.time() - self.start_time) * 1000)

        if self.file:
            self.send_monitor_hdr_file(idx, opcode, data_len, timestamp)
        if self.sock:
            self.send_monitor_hdr_btmon_socket(idx, opcode, data_len)

    def send_event_sock(self, packet, data, eventLen):
        """Send bytes 8..9 of ``packet`` and then ``data`` to the socket."""
        self.sock.sendall(packet[8:10])
        if eventLen > 0:
            self.sock.sendall(data)

    def send_event_file(self, packet, data, eventLen):
        """Write bytes 8..9 of ``packet`` and then ``data`` to the file."""
        self.file.write(packet[8:10])
        if eventLen > 0:
            self.file.write(data)

    def send_event(self, idx, packet, data):
        """Log an event packet.

        ``packet`` must hold at least the 10-byte '<HHIBB' header; only its
        last field (the event length) and bytes 8..9 are used here.

        Raises:
            struct.error: if ``packet`` is shorter than 10 bytes.
        """
        *_, event_len = struct.unpack_from('<HHIBB', packet)
        # + 2 accounts for the two header bytes (packet[8:10]) that precede
        # the event data in the output.
        self.send_monitor_hdr(idx, BleMonitorOpcode.EVENT, event_len + 2)

        if self.file:
            self.send_event_file(packet, data, event_len)
        if self.sock:
            self.send_event_sock(packet, data, event_len)

    # NOTE: the parameter name ``len`` shadows the builtin in the three
    # methods below; it is kept so keyword callers keep working.
    def send_command_sock(self, opcode, len, data):
        """Send opcode bytes, a one-octet length and the data to the socket."""
        self.sock.sendall(opcode)
        # Extra length octet; the caller's header length accounts for it.
        self.sock.sendall(struct.pack("<B", len))
        if len > 0:
            self.sock.sendall(data)

    def send_command_file(self, opcode, len, data):
        """Write opcode bytes, a one-octet length and the data to the file."""
        self.file.write(opcode)
        # Extra length octet; the caller's header length accounts for it.
        self.file.write(struct.pack("<B", len))
        if len > 0:
            self.file.write(data)

    def send_monitor_command(self, opcode, len, data):
        """Emit a command body (opcode, length octet, data) to each open sink."""
        if self.file:
            self.send_command_file(opcode, len, data)
        if self.sock:
            self.send_command_sock(opcode, len, data)

    def send_index_added(self, idx, addr, name):
        """Emit a NEW_INDEX packet for ``addr`` (6 octets) and ASCII ``name``.

        NewIndexHdr
        0------8-----16-----...-----64 -------+
        | type | bus |       address | name   |
        +------+---------------------+--------+
        """
        new_index = struct.pack("<BB6B", 0, 10, addr[0], addr[1], addr[2],
                                addr[3], addr[4], addr[5])
        # The name is written NUL-terminated.
        packed_name = name.encode('ascii') + b"\x00"
        self.send_monitor_hdr(idx, BleMonitorOpcode.NEW_INDEX,
                              len(new_index) + len(packed_name))
        self._write(new_index, packed_name)

    def send_monitor_iso_rx(self, idx, handle, dataLen, packet):
        """Emit an ISO_RX packet: '<HH' (handle, dataLen) followed by ``packet``."""
        self.send_monitor_hdr(idx, BleMonitorOpcode.ISO_RX, dataLen + 4)
        self._write(struct.pack('<HH', handle, dataLen), packet)

    def send_monitor_acl_rx(self, idx, handle, dataLen, packet):
        """Emit an ACL_RX packet: '<HH' (handle, dataLen) followed by ``packet``."""
        self.send_monitor_hdr(idx, BleMonitorOpcode.ACL_RX, dataLen + 4)
        self._write(struct.pack('<HH', handle, dataLen), packet)

    def send(self, idx, message):
        """Log an outgoing EDTT message.

        ``message`` starts with a '<HH' header (EDTT opcode, payload
        length). Messages listed in ``non_hci_edtt_cmds`` and messages with
        an empty payload are ignored. Data-write requests are logged as
        ACL_TX / ISO_TX; every other odd opcode is logged as a COMMAND.
        """
        op, payload_len = struct.unpack_from('<HH', message)

        if op in self.non_hci_edtt_cmds or payload_len == 0:
            return

        # Only the declared payload is forwarded so the length in the
        # monitor header always matches the bytes written after it.
        payload = message[4:4 + payload_len]

        if op in self.tx_data_edtt_cmds:
            if op == Commands.CMD_LE_ISO_DATA_WRITE_REQ:
                opcode = BleMonitorOpcode.ISO_TX
            else:
                opcode = BleMonitorOpcode.ACL_TX

            self.send_monitor_hdr(idx, opcode, payload_len)
            self._write(payload)
            return

        # Odd opcodes are treated as requests; anything else is not logged.
        if op % 2 == 1:
            # + 1 is for the length octet inserted after the 2-byte opcode.
            self.send_monitor_hdr(idx, BleMonitorOpcode.COMMAND, payload_len + 1)
            param_len = payload_len - 2
            params = payload[2:] if param_len > 0 else None
            self.send_monitor_command(payload[:2], param_len, params)

    def send_user_data(self, idx, priority, string):
        """Emit a USER_LOGGING packet carrying ASCII ``string``.

        Layout: priority octet, length octet (``len(string)``), then the
        string NUL-terminated.

        Raises:
            struct.error: if ``priority`` or ``len(string)`` is not in 0..255.
            UnicodeEncodeError: if ``string`` is not pure ASCII.
        """
        hdr = struct.pack("<BB", priority, len(string))
        log = string.encode('ascii') + b"\x00"
        self.send_monitor_hdr(idx, BleMonitorOpcode.USER_LOGGING,
                              len(hdr) + len(log))
        self._write(hdr, log)