1#!/usr/bin/env python3
2#
3# Copyright (c) 2021, 2024 Intel Corporation
4#
5# SPDX-License-Identifier: Apache-2.0
6
# Parsers are going to have very similar code.
8# So tell pylint not to care.
9# pylint: disable=duplicate-code
10
11"""
12Dictionary-based Logging Parser Version 3
13
14This contains the implementation of the parser for
15version 3 databases.
16"""
17
18import logging
19import struct
20import colorama
21from colorama import Fore
22
23from .log_parser import (LogParser, get_log_level_str_color, formalize_fmt_string)
24from .data_types import DataTypes
25
26
# Number of bytes printed on each line of a hexdump.
HEX_BYTES_IN_LINE = 16

# Message types.
# Keep in sync with include/logging/log_output_dict.h
MSG_TYPE_NORMAL = 0
MSG_TYPE_DROPPED = 1

# Every message starts with a one-byte message type:
# 0: normal message
# 1: number of dropped messages
FMT_MSG_TYPE = "B"

# Payload of a "dropped" message: the number of dropped messages.
FMT_DROPPED_CNT = "H"

# Header of a normal message. Needs to be kept in sync with the
# message header struct in include/logging/log_output_dict.h:
#
# struct log_dict_output_normal_msg_hdr_t {
#     uint8_t type;
#     uint32_t domain:4;
#     uint32_t level:4;
#     uint32_t package_len:16;
#     uint32_t data_len:16;
#     uintptr_t source;
#     log_timestamp_t timestamp;
# } __packed;
#
# "type" and "timestamp" are decoded separately (see FMT_MSG_TYPE and
# FMT_MSG_TIMESTAMP_*), so the formats below only cover:
#   B - domain and level, packed in one byte
#   H - package_len
#   H - data_len
#   I or Q - source, whose width follows the target's pointer width
_FMT_MSG_HDR_COMMON = "BHH"
FMT_MSG_HDR_32 = _FMT_MSG_HDR_COMMON + "I"
FMT_MSG_HDR_64 = _FMT_MSG_HDR_COMMON + "Q"

# Timestamp width depends on CONFIG_LOG_TIMESTAMP_64BIT
FMT_MSG_TIMESTAMP_32 = "I"
FMT_MSG_TIMESTAMP_64 = "Q"


logger = logging.getLogger("parser")
64
65
66class LogParserV3(LogParser):
    """Log Parser V3"""
68    def __init__(self, database):
69        super().__init__(database=database)
70
71        if self.database.is_tgt_little_endian():
72            endian = "<"
73            self.is_big_endian = False
74        else:
75            endian = ">"
76            self.is_big_endian = True
77
78        self.fmt_msg_type = endian + FMT_MSG_TYPE
79        self.fmt_dropped_cnt = endian + FMT_DROPPED_CNT
80
81        if self.database.is_tgt_64bit():
82            self.fmt_msg_hdr = endian + FMT_MSG_HDR_64
83        else:
84            self.fmt_msg_hdr = endian + FMT_MSG_HDR_32
85
86        if "CONFIG_LOG_TIMESTAMP_64BIT" in self.database.get_kconfigs():
87            self.fmt_msg_timestamp = endian + FMT_MSG_TIMESTAMP_64
88        else:
89            self.fmt_msg_timestamp = endian + FMT_MSG_TIMESTAMP_32
90
91
92    def __get_string(self, arg, arg_offset, string_tbl):
93        one_str = self.database.find_string(arg)
94        if one_str is not None:
95            ret = one_str
96        else:
97            # The index from the string table is basically
98            # the order in va_list. Need to add to the index
99            # to skip the packaged string header and
100            # the format string.
101            str_idx = arg_offset + self.data_types.get_sizeof(DataTypes.PTR) * 2
102            str_idx /= self.data_types.get_sizeof(DataTypes.INT)
103
104            if int(str_idx) not in string_tbl:
105                ret = f'<string@0x{arg:x}>'
106            else:
107                ret = string_tbl[int(str_idx)]
108
109        return ret
110
111
112    def process_one_fmt_str(self, fmt_str, arg_list, string_tbl):
113        """Parse the format string to extract arguments from
114        the binary arglist and return a tuple usable with
115        Python's string formatting"""
116        idx = 0
117        arg_offset = 0
118        arg_data_type = None
119        is_parsing = False
120        do_extract = False
121
122        args = []
123
124        # Translated from cbvprintf_package()
125        for idx, fmt in enumerate(fmt_str):
126            if not is_parsing:
127                if fmt == '%':
128                    is_parsing = True
129                    arg_data_type = DataTypes.INT
130                    continue
131
132            elif fmt == '%':
133                # '%%' -> literal percentage sign
134                is_parsing = False
135                continue
136
137            elif fmt == '*':
138                pass
139
140            elif fmt.isdecimal() or str.lower(fmt) == 'l' \
141                or fmt in (' ', '#', '-', '+', '.', 'h'):
142                # formatting modifiers, just ignore
143                continue
144
145            elif fmt in ('j', 'z', 't'):
146                # intmax_t, size_t or ptrdiff_t
147                arg_data_type = DataTypes.LONG
148
149            elif fmt in ('c', 'd', 'i', 'o', 'u', 'x', 'X'):
150                unsigned = fmt in ('c', 'o', 'u', 'x', 'X')
151
152                if fmt_str[idx - 1] == 'l':
153                    if fmt_str[idx - 2] == 'l':
154                        arg_data_type = DataTypes.ULONG_LONG if unsigned else DataTypes.LONG_LONG
155                    else:
156                        arg_data_type = DataTypes.ULONG if unsigned else DataTypes.LONG
157                else:
158                    arg_data_type = DataTypes.UINT if unsigned else DataTypes.INT
159
160                is_parsing = False
161                do_extract = True
162
163            elif fmt in ('s', 'p', 'n'):
164                arg_data_type = DataTypes.PTR
165
166                is_parsing = False
167                do_extract = True
168
169            elif str.lower(fmt) in ('a', 'e', 'f', 'g'):
170                # Python doesn't do"long double".
171                #
172                # Parse it as double (probably incorrect), but
173                # still have to skip enough bytes.
174                if fmt_str[idx - 1] == 'L':
175                    arg_data_type = DataTypes.LONG_DOUBLE
176                else:
177                    arg_data_type = DataTypes.DOUBLE
178
179                is_parsing = False
180                do_extract = True
181
182            else:
183                is_parsing = False
184                continue
185
186            if do_extract:
187                do_extract = False
188
189                align = self.data_types.get_alignment(arg_data_type)
190                size = self.data_types.get_sizeof(arg_data_type)
191                unpack_fmt = self.data_types.get_formatter(arg_data_type)
192
193                # Align the argument list by rounding up
194                stack_align = self.data_types.get_stack_alignment(arg_data_type)
195                if stack_align > 1:
196                    arg_offset = int((arg_offset + (align - 1)) / align) * align
197
198                one_arg = struct.unpack_from(unpack_fmt, arg_list, arg_offset)[0]
199
200                if fmt == 's':
201                    one_arg = self.__get_string(one_arg, arg_offset, string_tbl)
202
203                args.append(one_arg)
204                arg_offset += size
205
206                # Align the offset
207                if stack_align > 1:
208                    arg_offset = int((arg_offset + align - 1) / align) * align
209
210        return tuple(args)
211
212
213    @staticmethod
214    def extract_string_table(str_tbl):
215        """Extract string table in a packaged log message"""
216        tbl = {}
217
218        one_str = ""
219        next_new_string = True
220        # Translated from cbvprintf_package()
221        for one_ch in str_tbl:
222            if next_new_string:
223                str_idx = one_ch
224                next_new_string = False
225                continue
226
227            if one_ch == 0:
228                tbl[str_idx] = one_str
229                one_str = ""
230                next_new_string = True
231                continue
232
233            one_str += chr(one_ch)
234
235        return tbl
236
237
238    @staticmethod
239    def print_hexdump(hex_data, prefix_len, color):
240        """Print hex dump"""
241        hex_vals = ""
242        chr_vals = ""
243        chr_done = 0
244
245        for one_hex in hex_data:
246            hex_vals += f'{one_hex:02x} '
247            chr_vals += chr(one_hex)
248            chr_done += 1
249
250            if chr_done == HEX_BYTES_IN_LINE / 2:
251                hex_vals += " "
252                chr_vals += " "
253
254            elif chr_done == HEX_BYTES_IN_LINE:
255                print(f"{color}%s%s|%s{Fore.RESET}" % ((" " * prefix_len),
256                      hex_vals, chr_vals))
257                hex_vals = ""
258                chr_vals = ""
259                chr_done = 0
260
261        if len(chr_vals) > 0:
262            hex_padding = "   " * (HEX_BYTES_IN_LINE - chr_done)
263            print(f"{color}%s%s%s|%s{Fore.RESET}" % ((" " * prefix_len),
264                  hex_vals, hex_padding, chr_vals))
265
266
267    def parse_one_normal_msg(self, logdata, offset):
268        """Parse one normal log message and print the encoded message"""
269        # Parse log message header
270        domain_lvl, pkg_len, data_len, source_id = struct.unpack_from(self.fmt_msg_hdr,
271                                                                      logdata, offset)
272        offset += struct.calcsize(self.fmt_msg_hdr)
273
274        timestamp = struct.unpack_from(self.fmt_msg_timestamp, logdata, offset)[0]
275        offset += struct.calcsize(self.fmt_msg_timestamp)
276
277        # domain_id, level
278        if self.is_big_endian:
279            level = domain_lvl & 0x0F
280            domain_id = (domain_lvl >> 4) & 0x0F
281        else:
282            domain_id = domain_lvl & 0x0F
283            level = (domain_lvl >> 4) & 0x0F
284
285        level_str, color = get_log_level_str_color(level)
286        source_id_str = self.database.get_log_source_string(domain_id, source_id)
287
288        # Skip over data to point to next message (save as return value)
289        next_msg_offset = offset + pkg_len + data_len
290
291        # Offset from beginning of cbprintf_packaged data to end of va_list arguments
292        offset_end_of_args = struct.unpack_from("B", logdata, offset)[0]
293        offset_end_of_args *= self.data_types.get_sizeof(DataTypes.INT)
294        offset_end_of_args += offset
295
296        # Extra data after packaged log
297        extra_data = logdata[(offset + pkg_len):next_msg_offset]
298
299        # Number of appended strings in package
300        num_packed_strings = struct.unpack_from("B", logdata, offset+1)[0]
301
302        # Number of read-only string indexes
303        num_ro_str_indexes = struct.unpack_from("B", logdata, offset+2)[0]
304        offset_end_of_args += num_ro_str_indexes
305
306        # Number of read-write string indexes
307        num_rw_str_indexes = struct.unpack_from("B", logdata, offset+3)[0]
308        offset_end_of_args += num_rw_str_indexes
309
310        # Extract the string table in the packaged log message
311        string_tbl = self.extract_string_table(logdata[offset_end_of_args:(offset + pkg_len)])
312
313        if len(string_tbl) != num_packed_strings:
314            logger.error("------ Error extracting string table")
315            return None
316
317        # Skip packaged string header
318        offset += self.data_types.get_sizeof(DataTypes.PTR)
319
320        # Grab the format string
321        #
322        # Note the negative offset to __get_string(). It is because
323        # the offset begins at 0 for va_list. However, the format string
324        # itself is before the va_list, so need to go back the width of
325        # a pointer.
326        fmt_str_ptr = struct.unpack_from(self.data_types.get_formatter(DataTypes.PTR),
327                                         logdata, offset)[0]
328        fmt_str = self.__get_string(fmt_str_ptr,
329                                    -self.data_types.get_sizeof(DataTypes.PTR),
330                                    string_tbl)
331        offset += self.data_types.get_sizeof(DataTypes.PTR)
332
333        if not fmt_str:
334            logger.error("------ Error getting format string at 0x%x", fmt_str_ptr)
335            return None
336
337        args = self.process_one_fmt_str(fmt_str, logdata[offset:offset_end_of_args], string_tbl)
338
339        fmt_str = formalize_fmt_string(fmt_str)
340        log_msg = fmt_str % args
341
342        if level == 0:
343            print(f"{log_msg}", end='')
344            log_prefix = ""
345        else:
346            log_prefix = f"[{timestamp:>10}] <{level_str}> {source_id_str}: "
347            print(f"{color}%s%s{Fore.RESET}" % (log_prefix, log_msg))
348
349        if data_len > 0:
350            # Has hexdump data
351            self.print_hexdump(extra_data, len(log_prefix), color)
352
353        # Point to next message
354        return next_msg_offset
355
356
357    def parse_log_data(self, logdata, debug=False):
358        """Parse binary log data and print the encoded log messages"""
359        offset = 0
360
361        while offset < len(logdata):
362            # Get message type
363            msg_type = struct.unpack_from(self.fmt_msg_type, logdata, offset)[0]
364            offset += struct.calcsize(self.fmt_msg_type)
365
366            if msg_type == MSG_TYPE_DROPPED:
367                num_dropped = struct.unpack_from(self.fmt_dropped_cnt, logdata, offset)
368                offset += struct.calcsize(self.fmt_dropped_cnt)
369
370                print(f"--- {num_dropped} messages dropped ---")
371
372            elif msg_type == MSG_TYPE_NORMAL:
373                ret = self.parse_one_normal_msg(logdata, offset)
374                if ret is None:
375                    return False
376
377                offset = ret
378
379            else:
380                logger.error("------ Unknown message type: %s", msg_type)
381                return False
382
383        return True
384
# Initialize colorama when this module is imported. The parser prints
# colorama's Fore.* sequences; per the colorama documentation, init()
# is what makes those sequences work on Windows terminals.
colorama.init()
386