1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import collections 31import io 32import struct 33 34from binascii import hexlify 35from enum import IntEnum 36 37 38class CoapMessageType(IntEnum): 39 CON = 0 # Confirmable 40 NON = 1 # Non-confirmable 41 ACK = 2 # Acknowledgement 42 RST = 3 # Reset 43 44 45class CoapOptionsTypes(IntEnum): 46 IF_MATCH = 1 47 URI_HOST = 3 48 ETAG = 4 49 IF_NOT_MATCH = 5 50 URI_PORT = 7 51 LOCATION_PATH = 8 52 URI_PATH = 11 53 CONTENT_FORMAT = 12 54 MAX_AGE = 14 55 URI_QUERY = 15 56 ACCEPT = 17 57 LOCATION_QUERY = 20 58 PROXY_URI = 35 59 PROXY_SCHEME = 39 60 SIZE1 = 60 61 62 63class CoapOptionHeader(object): 64 """ Class representing CoAP optional header. """ 65 66 def __init__(self, delta, length): 67 self._delta = delta 68 self._length = length 69 70 @property 71 def delta(self): 72 return self._delta 73 74 @property 75 def length(self): 76 return self._length 77 78 @property 79 def is_payload_marker(self): 80 return self.delta == 0xF and self.length == 0xF 81 82 @classmethod 83 def _read_extended_value(cls, data, value): 84 if value == 13: 85 return ord(data.read(1)) + 13 86 elif value == 14: 87 data.read(1) 88 return ord(data.read(1)) + 269 89 else: 90 return value 91 92 @classmethod 93 def from_bytes(cls, data): 94 initial_byte = ord(data.read(1)) 95 96 delta = (initial_byte >> 4) & 0xF 97 length = initial_byte & 0xF 98 99 delta = cls._read_extended_value(data, delta) 100 length = cls._read_extended_value(data, length) 101 102 return cls(delta, length) 103 104 105class CoapOption(object): 106 """ Class representing CoAP option. """ 107 108 def __init__(self, _type, value): 109 self._type = _type 110 self._value = value 111 112 @property 113 def type(self): 114 return self._type 115 116 @property 117 def value(self): 118 return self._value 119 120 def __repr__(self): 121 return "CoapOption(type={}, value={})".format(self.type, hexlify(self.value)) 122 123 124class CoapOptionsFactory(object): 125 """ Factory that produces CoAP options. """ 126 127 def parse(self, data, message_info): 128 options = [] 129 130 _type = 0 131 while data.tell() < len(data.getvalue()): 132 option_header = CoapOptionHeader.from_bytes(data) 133 if option_header.is_payload_marker: 134 break 135 136 _type += option_header.delta 137 value = data.read(option_header.length) 138 139 option = CoapOption(_type, value) 140 options.append(option) 141 142 return options 143 144 145class CoapCode(object): 146 """ Class representing CoAP code. """ 147 148 def __init__(self, code): 149 self._code = code 150 151 @property 152 def code(self): 153 return self._code 154 155 @property 156 def _class(self): 157 return (self.code >> 5) & 0x7 158 159 @property 160 def detail(self): 161 return self.code & 0x1F 162 163 @classmethod 164 def from_class_and_detail(cls, _class, detail): 165 return cls(((_class & 0x7) << 5) | (detail & 0x1F)) 166 167 @classmethod 168 def from_dotted(cls, dotted_str): 169 _class, detail = dotted_str.split(".") 170 return cls.from_class_and_detail(int(_class), int(detail)) 171 172 def is_equal_dotted(self, dotted_code): 173 other = self.from_dotted(dotted_code) 174 return self.code == other.code 175 176 @property 177 def dotted(self): 178 return ".".join(["{:01d}".format(self._class), "{:02d}".format(self.detail)]) 179 180 def __eq__(self, other): 181 if isinstance(other, int): 182 return self.code == other 183 184 elif isinstance(other, str): 185 return self.is_equal_dotted(other) 186 187 elif isinstance(other, self.__class__): 188 return self.code == other.code 189 190 else: 191 raise TypeError("Could not compare {} and {}".format(type(self), type(other))) 192 193 def __repr__(self): 194 return self.dotted 195 196 197class CoapMessage(object): 198 """ Class representing CoAP message. """ 199 200 def __init__( 201 self, 202 version, 203 _type, 204 code, 205 message_id, 206 token, 207 options, 208 payload, 209 uri_path=None, 210 ): 211 self._version = version 212 self._type = _type 213 self._code = code 214 self._message_id = message_id 215 self._token = token 216 self._options = options 217 self._payload = payload 218 self._uri_path = uri_path 219 220 @property 221 def version(self): 222 return self._version 223 224 @property 225 def type(self): 226 return self._type 227 228 @property 229 def code(self): 230 return self._code 231 232 @property 233 def message_id(self): 234 return self._message_id 235 236 @property 237 def token(self): 238 return self._token 239 240 @property 241 def tkl(self): 242 return len(self._token) 243 244 @property 245 def options(self): 246 return self._options 247 248 @property 249 def payload(self): 250 return self._payload 251 252 @property 253 def uri_path(self): 254 return self._uri_path 255 256 def __repr__(self): 257 options_str = ", ".join([repr(opt) for opt in self.options]) 258 return ("CoapMessage(version={}, type={}, code={}, message_id={}, token={}, options=[{}], payload={},", 259 "uri-path='{}')").format( 260 self.version, 261 CoapMessageType.name[self.type], 262 self.code, 263 self.message_id, 264 hexlify(self.token), 265 options_str, 266 self.payload, 267 self.uri_path, 268 ) 269 270 271class CoapMessageProxy(object): 272 """ Proxy class of CoAP message. 273 274 The main idea behind this class is to delay parsing payload. Due to architecture of the existing solution 275 it is possible to process confirmation message before a request message. In such case it is not possible 276 to get URI path to get proper payload parser. 277 """ 278 279 def __init__( 280 self, 281 coap_message, 282 message_info, 283 mid_to_uri_path_binder, 284 uri_path_based_payload_factories, 285 ): 286 self._coap_message = coap_message 287 self._message_info = message_info 288 self._mid_to_uri_path_binder = mid_to_uri_path_binder 289 self._uri_path_based_payload_factories = (uri_path_based_payload_factories) 290 291 @property 292 def version(self): 293 return self._coap_message.version 294 295 @property 296 def type(self): 297 return self._coap_message.type 298 299 @property 300 def code(self): 301 return self._coap_message.code 302 303 @property 304 def message_id(self): 305 return self._coap_message.message_id 306 307 @property 308 def token(self): 309 return self._coap_message.token 310 311 @property 312 def tkl(self): 313 return self._coap_message.tkl 314 315 @property 316 def options(self): 317 return self._coap_message.options 318 319 @property 320 def payload(self): 321 try: 322 binded_uri_path = self._mid_to_uri_path_binder.get_uri_path_for(self.message_id, self.token) 323 324 factory = self._uri_path_based_payload_factories[binded_uri_path] 325 326 return factory.parse(io.BytesIO(self._coap_message.payload), self._message_info) 327 328 except RuntimeError: 329 return self._coap_message.payload 330 331 @property 332 def uri_path(self): 333 return self._coap_message.uri_path 334 335 def __repr__(self): 336 options_str = ", ".join([repr(opt) for opt in self.options]) 337 return ("CoapMessageProxy(version={}, type={}, code={}, message_id={}, token={}, options=[{}], payload={},", 338 "uri-path='{}')").format( 339 self.version, 340 self.type, 341 self.code, 342 self.message_id, 343 hexlify(self.token), 344 options_str, 345 self.payload, 346 self.uri_path, 347 ) 348 349 350class CoapMessageIdToUriPathBinder: 351 """ Class binds message id and token with URI path. """ 352 353 def __init__(self): 354 self._uri_path_binds = collections.defaultdict(collections.defaultdict) 355 356 def add_uri_path_for(self, message_id, token, uri_path): 357 self._uri_path_binds[message_id][hexlify(token)] = uri_path 358 359 def get_uri_path_for(self, message_id, token): 360 try: 361 return self._uri_path_binds[message_id][hexlify(token)] 362 except KeyError: 363 raise RuntimeError("Could not find URI PATH for message_id: {} and token: {}".format( 364 message_id, hexlify(token))) 365 366 367class CoapMessageFactory(object): 368 """ Factory that produces CoAP messages. """ 369 370 def __init__( 371 self, 372 options_factory, 373 uri_path_based_payload_factories, 374 message_id_to_uri_path_binder, 375 ): 376 self._options_factory = options_factory 377 self._uri_path_based_payload_factories = (uri_path_based_payload_factories) 378 self._mid_to_uri_path_binder = message_id_to_uri_path_binder 379 380 def _uri_path_from(self, options): 381 uri_path_options = [] 382 383 for option in options: 384 if option.type == CoapOptionsTypes.URI_PATH: 385 uri_path_options.append(option.value.decode("utf-8")) 386 387 if not uri_path_options: 388 return None 389 390 return "/" + "/".join(uri_path_options) 391 392 def _parse_initial_byte(self, data, message_info): 393 initial_byte = ord(data.read(1)) 394 395 version = (initial_byte >> 6) & 0x3 396 _type = CoapMessageType((initial_byte >> 4) & 0x3) 397 token_length = initial_byte & 0xF 398 399 return version, _type, token_length 400 401 def parse(self, data, message_info): 402 version, _type, token_length = self._parse_initial_byte(data, message_info) 403 404 code = CoapCode(ord(data.read(1))) 405 message_id = struct.unpack(">H", data.read(2))[0] 406 token = data.read(token_length) 407 408 options = self._options_factory.parse(data, message_info) 409 410 uri_path = self._uri_path_from(options) 411 if uri_path is not None: 412 self._mid_to_uri_path_binder.add_uri_path_for(message_id, token, uri_path) 413 414 coap_message = CoapMessage( 415 version, 416 _type, 417 code, 418 message_id, 419 token, 420 options, 421 data.read(), 422 uri_path, 423 ) 424 425 return CoapMessageProxy( 426 coap_message, 427 message_info, 428 self._mid_to_uri_path_binder, 429 self._uri_path_based_payload_factories, 430 ) 431