#!/usr/bin/env python3 # Copyright 2019 Oticon A/S # SPDX-License-Identifier: Apache-2.0 import time import argparse import struct from csv_common import * # Wireshark constants WS_FLAGS_PHY_1M = 0b00<<14 WS_FLAGS_PHY_2M = 0b01<<14 WS_FLAGS_PHY_CODED = 0b10<<14 WS_FLAGS_SIGNAL_POWER = 0b1<<1 # BabbleSim constants (see ext_2G4_libPhyComv1/src/bs_pc_2G4_modulations.h) P2G4_MOD_BLE = 0x10 P2G4_MOD_BLE2M = 0x20 P2G4_MOD_BLE_CODED = 0x50 # default max packet length (phdr + access address + pdu + crc) SNAPLEN = 512 def get_data_and_len(row): pdu_crc = bytes.fromhex("00") orig_len = int(row['packet_size'], 10) if orig_len != 0: try: pdu_crc = bytes.fromhex(row['packet']) except: #In case the packet is broken mid byte pdu_crc = bytes.fromhex("00") if len(pdu_crc) != orig_len: # Let's handle this somehow gracefully print("Truncated input file (partial packet), writing partial packet in output") orig_len = len(pdu_crc) orig_len += 14; # 10 bytes phdr + 4 bytes access address return (orig_len, pdu_crc) def is_coded_FEC2(row): if not row: return False if int(row['modulation'], 10) != P2G4_MOD_BLE_CODED: return False if int(row['packet_size'], 10) < 5: # It must at least have a header and CRC (2+3 bytes) return False return True def write(outfile, *inputs, snaplen = SNAPLEN, basetime=None): #For information on the pcap format see https://wiki.wireshark.org/Development/LibpcapFileFormat buf = bytearray(30) if basetime == None: basetime = int(time.time() * 1000000) # write pcap header struct.pack_into('= 1.0 and freq < 81.0: rf_channel = int((freq - 1.0) / 2) elif freq >= 2401.0 and freq < 2481.0: rf_channel = int((freq - 2401.0) / 2) else: raise ValueError access_address = int(row['phy_address'], 16) flags = 0 modulation = int(row['modulation'], 10) if modulation == P2G4_MOD_BLE: flags |= WS_FLAGS_PHY_1M elif modulation == P2G4_MOD_BLE2M: flags |= WS_FLAGS_PHY_2M elif modulation == P2G4_MOD_BLE_CODED: flags |= WS_FLAGS_PHY_CODED if (int(row['packet_size'], 10) != 1): # If this is a FEC1, it has only the CI byte continue # The next row likely contains this packet FEC2: row = rows[min_idx] if not is_coded_FEC2(row): # Otherwise the coded phy packet was aborted before the FEC2, so we just ignore it continue rows[min_idx] = next(inputs[min_idx], None) (orig_len, pdu_crc_FEC2) = get_data_and_len(row) pdu_crc = bytes(pdu_crc) + pdu_crc_FEC2 orig_len += 1 # Transmission power (dBm) flags |= WS_FLAGS_SIGNAL_POWER signal_power = int(float(row['power_level'])) incl_len = min(orig_len, snaplen) ts = basetime + min_ts ts_sec = ts // 1000000 ts_usec = ts % 1000000 struct.pack_into('