1#!/usr/bin/env python3
2#
3# Copyright (c) 2021, 2024 Intel Corporation
4#
5# SPDX-License-Identifier: Apache-2.0
6
# Parsers are going to have very similar code.
8# So tell pylint not to care.
9# pylint: disable=duplicate-code
10
11"""
12Dictionary-based Logging Parser Version 3
13
14This contains the implementation of the parser for
15version 3 databases.
16"""
17
18import logging
19import struct
20
21import colorama
22from colorama import Fore
23
24from .data_types import DataTypes
25from .log_parser import LogParser, formalize_fmt_string, get_log_level_str_color
26
# Number of bytes shown on each line printed by print_hexdump().
HEX_BYTES_IN_LINE = 16

# Need to keep sync with struct log_dict_output_msg_hdr in
# include/logging/log_output_dict.h.
#
# struct log_dict_output_normal_msg_hdr_t {
#     uint8_t type;
#     uint32_t domain:4;
#     uint32_t level:4;
#     uint32_t package_len:16;
#     uint32_t data_len:16;
#     uintptr_t source;
#     log_timestamp_t timestamp;
# } __packed;
#
# Note "type" and "timestamp" are encoded separately below.
#
# The remaining fields map onto the struct format characters as:
#   B    domain and level (two 4-bit fields sharing one byte)
#   H    package_len
#   H    data_len
#   I/Q  source (32-bit or 64-bit target respectively)
# The byte-order prefix is prepended in LogParserV3.__init__().
FMT_MSG_HDR_32 = "BHHI"
FMT_MSG_HDR_64 = "BHHQ"

# Message type
# 0: normal message
# 1: number of dropped messages
FMT_MSG_TYPE = "B"

# Depends on CONFIG_LOG_TIMESTAMP_64BIT
FMT_MSG_TIMESTAMP_32 = "I"
FMT_MSG_TIMESTAMP_64 = "Q"

# Keep message types in sync with include/logging/log_output_dict.h
MSG_TYPE_NORMAL = 0
MSG_TYPE_DROPPED = 1

# Number of dropped messages
FMT_DROPPED_CNT = "H"


# Logger used for parser error reports.
logger = logging.getLogger("parser")
64
65
class LogParserV3(LogParser):
    """Log Parser V3"""
68
69    def __init__(self, database):
70        super().__init__(database=database)
71
72        if self.database.is_tgt_little_endian():
73            endian = "<"
74            self.is_big_endian = False
75        else:
76            endian = ">"
77            self.is_big_endian = True
78
79        self.fmt_msg_type = endian + FMT_MSG_TYPE
80        self.fmt_dropped_cnt = endian + FMT_DROPPED_CNT
81
82        if self.database.is_tgt_64bit():
83            self.fmt_msg_hdr = endian + FMT_MSG_HDR_64
84        else:
85            self.fmt_msg_hdr = endian + FMT_MSG_HDR_32
86
87        if "CONFIG_LOG_TIMESTAMP_64BIT" in self.database.get_kconfigs():
88            self.fmt_msg_timestamp = endian + FMT_MSG_TIMESTAMP_64
89        else:
90            self.fmt_msg_timestamp = endian + FMT_MSG_TIMESTAMP_32
91
92    def __get_string(self, arg, arg_offset, string_tbl):
93        one_str = self.database.find_string(arg)
94        if one_str is not None:
95            ret = one_str
96        else:
97            # The index from the string table is basically
98            # the order in va_list. Need to add to the index
99            # to skip the packaged string header and
100            # the format string.
101            str_idx = arg_offset + self.data_types.get_sizeof(DataTypes.PTR) * 2
102            str_idx /= self.data_types.get_sizeof(DataTypes.INT)
103
104            ret = string_tbl.get(int(str_idx), f"<string@0x{arg:x}>")
105
106        return ret
107
108    def process_one_fmt_str(self, fmt_str, arg_list, string_tbl):
109        """Parse the format string to extract arguments from
110        the binary arglist and return a tuple usable with
111        Python's string formatting"""
112        idx = 0
113        arg_offset = 0
114        arg_data_type = None
115        is_parsing = False
116        do_extract = False
117
118        args = []
119
120        # Translated from cbvprintf_package()
121        for idx, fmt in enumerate(fmt_str):
122            if not is_parsing:
123                if fmt == '%':
124                    is_parsing = True
125                    arg_data_type = DataTypes.INT
126                    continue
127
128            elif fmt == '%':
129                # '%%' -> literal percentage sign
130                is_parsing = False
131                continue
132
133            elif fmt == '*':
134                pass
135
136            elif fmt.isdecimal() or str.lower(fmt) == 'l' or fmt in (' ', '#', '-', '+', '.', 'h'):
137                # formatting modifiers, just ignore
138                continue
139
140            elif fmt in ('j', 'z', 't'):
141                # intmax_t, size_t or ptrdiff_t
142                arg_data_type = DataTypes.LONG
143
144            elif fmt in ('c', 'd', 'i', 'o', 'u', 'x', 'X'):
145                unsigned = fmt in ('c', 'o', 'u', 'x', 'X')
146
147                if fmt_str[idx - 1] == 'l':
148                    if fmt_str[idx - 2] == 'l':
149                        arg_data_type = DataTypes.ULONG_LONG if unsigned else DataTypes.LONG_LONG
150                    else:
151                        arg_data_type = DataTypes.ULONG if unsigned else DataTypes.LONG
152                else:
153                    arg_data_type = DataTypes.UINT if unsigned else DataTypes.INT
154
155                is_parsing = False
156                do_extract = True
157
158            elif fmt in ('s', 'p', 'n'):
159                arg_data_type = DataTypes.PTR
160
161                is_parsing = False
162                do_extract = True
163
164            elif str.lower(fmt) in ('a', 'e', 'f', 'g'):
165                # Python doesn't do"long double".
166                #
167                # Parse it as double (probably incorrect), but
168                # still have to skip enough bytes.
169                if fmt_str[idx - 1] == 'L':
170                    arg_data_type = DataTypes.LONG_DOUBLE
171                else:
172                    arg_data_type = DataTypes.DOUBLE
173
174                is_parsing = False
175                do_extract = True
176
177            else:
178                is_parsing = False
179                continue
180
181            if do_extract:
182                do_extract = False
183
184                align = self.data_types.get_alignment(arg_data_type)
185                size = self.data_types.get_sizeof(arg_data_type)
186                unpack_fmt = self.data_types.get_formatter(arg_data_type)
187
188                # Align the argument list by rounding up
189                stack_align = self.data_types.get_stack_alignment(arg_data_type)
190                if stack_align > 1:
191                    arg_offset = int((arg_offset + (align - 1)) / align) * align
192
193                one_arg = struct.unpack_from(unpack_fmt, arg_list, arg_offset)[0]
194
195                if fmt == 's':
196                    one_arg = self.__get_string(one_arg, arg_offset, string_tbl)
197
198                args.append(one_arg)
199                arg_offset += size
200
201                # Align the offset
202                if stack_align > 1:
203                    arg_offset = int((arg_offset + align - 1) / align) * align
204
205        return tuple(args)
206
207    @staticmethod
208    def extract_string_table(str_tbl):
209        """Extract string table in a packaged log message"""
210        tbl = {}
211
212        one_str = ""
213        next_new_string = True
214        # Translated from cbvprintf_package()
215        for one_ch in str_tbl:
216            if next_new_string:
217                str_idx = one_ch
218                next_new_string = False
219                continue
220
221            if one_ch == 0:
222                tbl[str_idx] = one_str
223                one_str = ""
224                next_new_string = True
225                continue
226
227            one_str += chr(one_ch)
228
229        return tbl
230
231    @staticmethod
232    def print_hexdump(hex_data, prefix_len, color):
233        """Print hex dump"""
234        hex_vals = ""
235        chr_vals = ""
236        chr_done = 0
237
238        for one_hex in hex_data:
239            hex_vals += f'{one_hex:02x} '
240            chr_vals += chr(one_hex)
241            chr_done += 1
242
243            if chr_done == HEX_BYTES_IN_LINE / 2:
244                hex_vals += " "
245                chr_vals += " "
246
247            elif chr_done == HEX_BYTES_IN_LINE:
248                print(f"{color}%s%s|%s{Fore.RESET}" % ((" " * prefix_len), hex_vals, chr_vals))
249                hex_vals = ""
250                chr_vals = ""
251                chr_done = 0
252
253        if len(chr_vals) > 0:
254            hex_padding = "   " * (HEX_BYTES_IN_LINE - chr_done)
255            print(
256                f"{color}%s%s%s|%s{Fore.RESET}"
257                % ((" " * prefix_len), hex_vals, hex_padding, chr_vals)
258            )
259
260    def get_full_msg_hdr_size(self):
261        """Get the size of the full message header"""
262        return (
263            struct.calcsize(self.fmt_msg_type)
264            + struct.calcsize(self.fmt_msg_hdr)
265            + struct.calcsize(self.fmt_msg_timestamp)
266        )
267
268    def get_normal_msg_size(self, logdata, offset):
269        """Get the needed size of the normal log message at offset"""
270        _, pkg_len, data_len, _ = struct.unpack_from(
271            self.fmt_msg_hdr,
272            logdata,
273            offset + struct.calcsize(self.fmt_msg_type),
274        )
275        return self.get_full_msg_hdr_size() + pkg_len + data_len
276
    def parse_one_normal_msg(self, logdata, offset):
        """Parse one normal log message and print the encoded message.

        Args:
            logdata: bytes-like object holding the binary log data.
            offset: position in logdata of the message header, i.e. just
                past the message type field (parse_one_msg() skips the
                type before calling this).

        Returns:
            The offset of the next message in logdata, or None when the
            string table or the format string cannot be extracted.
        """
        # Parse log message header
        domain_lvl, pkg_len, data_len, source_id = struct.unpack_from(
            self.fmt_msg_hdr, logdata, offset
        )
        offset += struct.calcsize(self.fmt_msg_hdr)

        timestamp = struct.unpack_from(self.fmt_msg_timestamp, logdata, offset)[0]
        offset += struct.calcsize(self.fmt_msg_timestamp)

        # domain_id, level
        #
        # Both are 4-bit fields sharing one byte; which nibble holds which
        # field depends on the target's endianness.
        if self.is_big_endian:
            level = domain_lvl & 0x0F
            domain_id = (domain_lvl >> 4) & 0x0F
        else:
            domain_id = domain_lvl & 0x0F
            level = (domain_lvl >> 4) & 0x0F

        level_str, color = get_log_level_str_color(level)
        source_id_str = self.database.get_log_source_string(domain_id, source_id)

        # From here on, offset points at the start of the packaged data.

        # Skip over data to point to next message (save as return value)
        next_msg_offset = offset + pkg_len + data_len

        # Offset from beginning of cbprintf_packaged data to end of va_list arguments
        #
        # The first byte of the package holds that length in units of
        # sizeof(int); adding offset turns it into a position in logdata.
        offset_end_of_args = struct.unpack_from("B", logdata, offset)[0]
        offset_end_of_args *= self.data_types.get_sizeof(DataTypes.INT)
        offset_end_of_args += offset

        # Extra data after packaged log
        extra_data = logdata[(offset + pkg_len) : next_msg_offset]

        # Number of appended strings in package
        num_packed_strings = struct.unpack_from("B", logdata, offset + 1)[0]

        # Number of read-only string indexes
        # (one byte is skipped for each of them)
        num_ro_str_indexes = struct.unpack_from("B", logdata, offset + 2)[0]
        offset_end_of_args += num_ro_str_indexes

        # Number of read-write string indexes
        # (one byte is skipped for each of them)
        num_rw_str_indexes = struct.unpack_from("B", logdata, offset + 3)[0]
        offset_end_of_args += num_rw_str_indexes

        # Extract the string table in the packaged log message
        string_tbl = self.extract_string_table(logdata[offset_end_of_args : (offset + pkg_len)])

        # The table must hold exactly as many entries as the package
        # header announced.
        if len(string_tbl) != num_packed_strings:
            logger.error("------ Error extracting string table")
            return None

        # Skip packaged string header
        offset += self.data_types.get_sizeof(DataTypes.PTR)

        # Grab the format string
        #
        # Note the negative offset to __get_string(). It is because
        # the offset begins at 0 for va_list. However, the format string
        # itself is before the va_list, so need to go back the width of
        # a pointer.
        fmt_str_ptr = struct.unpack_from(
            self.data_types.get_formatter(DataTypes.PTR), logdata, offset
        )[0]
        fmt_str = self.__get_string(
            fmt_str_ptr, -self.data_types.get_sizeof(DataTypes.PTR), string_tbl
        )
        offset += self.data_types.get_sizeof(DataTypes.PTR)

        if not fmt_str:
            logger.error("------ Error getting format string at 0x%x", fmt_str_ptr)
            return None

        # offset now points at the first argument after the format string
        args = self.process_one_fmt_str(fmt_str, logdata[offset:offset_end_of_args], string_tbl)

        fmt_str = formalize_fmt_string(fmt_str)
        log_msg = fmt_str % args

        if level == 0:
            # Level 0 messages are printed as-is: no prefix, no colour
            # and no newline appended.
            print(f"{log_msg}", end='')
            log_prefix = ""
        else:
            log_prefix = f"[{timestamp:>10}] <{level_str}> {source_id_str}: "
            print(f"{color}%s%s{Fore.RESET}" % (log_prefix, log_msg))

        if data_len > 0:
            # Has hexdump data
            #
            # Indent it by the prefix length so it lines up under the
            # message text.
            self.print_hexdump(extra_data, len(log_prefix), color)

        # Point to next message
        return next_msg_offset
367
368    def parse_one_msg(self, logdata, offset):
369        if offset + struct.calcsize(self.fmt_msg_type) > len(logdata):
370            return False, offset
371
372        # Get message type
373        msg_type = struct.unpack_from(self.fmt_msg_type, logdata, offset)[0]
374
375        if msg_type == MSG_TYPE_DROPPED:
376            if offset + struct.calcsize(self.fmt_dropped_cnt) > len(logdata):
377                return False, offset
378
379            offset += struct.calcsize(self.fmt_msg_type)
380
381            num_dropped = struct.unpack_from(self.fmt_dropped_cnt, logdata, offset)[0]
382            offset += struct.calcsize(self.fmt_dropped_cnt)
383
384            print(f"--- {num_dropped} messages dropped ---")
385
386        elif msg_type == MSG_TYPE_NORMAL:
387            if (offset + self.get_full_msg_hdr_size() > len(logdata)) or (
388                offset + self.get_normal_msg_size(logdata, offset) > len(logdata)
389            ):
390                return False, offset
391
392            offset += struct.calcsize(self.fmt_msg_type)
393
394            ret = self.parse_one_normal_msg(logdata, offset)
395            if ret is None:
396                raise ValueError("Error parsing normal log message")
397
398            offset = ret
399
400        else:
401            logger.error("------ Unknown message type: %s", msg_type)
402            raise ValueError(f"Unknown message type: {msg_type}")
403
404        return True, offset
405
406    def parse_log_data(self, logdata, debug=False):
407        """Parse binary log data and print the encoded log messages"""
408        offset = 0
409        still_parsing = True
410
411        while offset < len(logdata) and still_parsing:
412            still_parsing, offset = self.parse_one_msg(logdata, offset)
413
414        return offset
415
416
# Initialise colorama once, when this module is imported, so the Fore.*
# colour codes printed by the parser are handled by colorama (see the
# colorama documentation for what init() does on each platform).
colorama.init()
418