1 /***************************************************************************
2  * Copyright (c) 2024 Microsoft Corporation
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the MIT License which is available at
6  * https://opensource.org/licenses/MIT.
7  *
8  * SPDX-License-Identifier: MIT
9  **************************************************************************/
10 
11 #include "nx_azure_iot_hub_client.h"
12 
13 #include "azure/core/az_version.h"
14 
15 
16 #define NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE     10
17 #define NX_AZURE_IOT_HUB_CLIENT_EMPTY_JSON              "{}"
18 #define NX_AZURE_IOT_HUB_CLIENT_THROTTLE_STATUS_CODE    429
19 
20 #ifndef NX_AZURE_IOT_HUB_CLIENT_USER_AGENT
21 #ifndef NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_INTERFACE_TYPE
22 #define NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_INTERFACE_TYPE NX_INTERFACE_TYPE_UNKNOWN
23 #define NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_UPDATE
24 #endif /* NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_INTERFACE_TYPE */
25 
26 #ifndef NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_DEVICE_VENDOR
27 #define NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_DEVICE_VENDOR 0
28 #endif /* NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_DEVICE_VENDOR */
29 
30 #ifndef NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_DEVICE_TYPE
31 #define NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_DEVICE_TYPE "U"
32 #endif /* NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_DEVICE_TYPE */
33 
34 /* Useragent e.g: azsdk-c%2F1.3.0%20%28azrtos%206.1.2%3B0%3B0%3BU%29 */
35 #define NX_AZURE_IOT_HUB_CLIENT_STR(C)          #C
36 #define NX_AZURE_IOT_HUB_CLIENT_TO_STR(x)       NX_AZURE_IOT_HUB_CLIENT_STR(x)
37 #define NX_AZURE_IOT_HUB_CLIENT_USER_AGENT      "azsdk-c%2F" AZ_SDK_VERSION_STRING "%20%28azrtos%20" \
38                                                 NX_AZURE_IOT_HUB_CLIENT_TO_STR(THREADX_MAJOR_VERSION) "." \
39                                                 NX_AZURE_IOT_HUB_CLIENT_TO_STR(THREADX_MINOR_VERSION) "." \
40                                                 NX_AZURE_IOT_HUB_CLIENT_TO_STR(THREADX_PATCH_VERSION) "%3B"\
41                                                 NX_AZURE_IOT_HUB_CLIENT_TO_STR(NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_INTERFACE_TYPE) "%3B"\
42                                                 NX_AZURE_IOT_HUB_CLIENT_TO_STR(NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_DEVICE_VENDOR) "%3B"\
43                                                 NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_DEVICE_TYPE "%29"
44 static UCHAR _nx_azure_iot_hub_client_user_agent[] = NX_AZURE_IOT_HUB_CLIENT_USER_AGENT;
45 
46 #endif /* NX_AZURE_IOT_HUB_CLIENT_USER_AGENT */
47 
48 #define NX_AZURE_IOT_HUB_CLIENT_COMPONENT_STRING        "$.sub"
49 
50 #define NX_AZURE_IOT_HUB_CLIENT_WEB_SOCKET_PATH         "/$iothub/websocket"
51 
52 static VOID nx_azure_iot_hub_client_received_message_cleanup(NX_AZURE_IOT_HUB_CLIENT_RECEIVE_MESSAGE *message);
53 static UINT nx_azure_iot_hub_client_cloud_message_sub_unsub(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
54                                                             UINT is_subscribe);
55 static UINT nx_azure_iot_hub_client_process_publish_packet(UCHAR *start_ptr,
56                                                            ULONG *topic_offset_ptr,
57                                                            USHORT *topic_length_ptr);
58 static VOID nx_azure_iot_hub_client_mqtt_receive_callback(NXD_MQTT_CLIENT* client_ptr,
59                                                           UINT number_of_messages);
60 static UINT nx_azure_iot_hub_client_c2d_process(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
61                                                 NX_PACKET *packet_ptr,
62                                                 ULONG topic_offset,
63                                                 USHORT topic_length);
64 static UINT nx_azure_iot_hub_client_properties_process(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
65                                                        NX_PACKET *packet_ptr,
66                                                        ULONG topic_offset,
67                                                        USHORT topic_length);
68 static UINT nx_azure_iot_hub_client_device_twin_parse(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
69                                                       NX_PACKET *packet_ptr, ULONG topic_offset,
70                                                       USHORT topic_length, UINT *request_id_ptr,
71                                                       ULONG *version_ptr, UINT *message_type_ptr,
72                                                       UINT *status_ptr);
73 extern UINT _nxd_mqtt_process_publish_packet(NX_PACKET *packet_ptr, ULONG *topic_offset_ptr,
74                                              USHORT *topic_length_ptr, ULONG *message_offset_ptr,
75                                              ULONG *message_length_ptr);
76 static VOID nx_azure_iot_hub_client_mqtt_connect_notify(struct NXD_MQTT_CLIENT_STRUCT *client_ptr,
77                                                         UINT status, VOID *context);
78 static VOID nx_azure_iot_hub_client_mqtt_disconnect_notify(NXD_MQTT_CLIENT *client_ptr);
79 static VOID nx_azure_iot_hub_client_mqtt_ack_receive_notify(NXD_MQTT_CLIENT *client_ptr, UINT type,
80                                                             USHORT packet_id, NX_PACKET *transmit_packet_ptr,
81                                                             VOID *context);
82 static VOID nx_azure_iot_hub_client_thread_dequeue(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
83                                                    NX_AZURE_IOT_THREAD *thread_list_ptr);
84 static UINT nx_azure_iot_hub_client_sas_token_get(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
85                                                   ULONG expiry_time_secs, const UCHAR *key, UINT key_len,
86                                                   UCHAR *sas_buffer, UINT sas_buffer_len, UINT *sas_length);
87 static UINT nx_azure_iot_hub_client_messages_enable(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr);
88 UINT nx_azure_iot_hub_client_adjust_payload(NX_PACKET *packet_ptr);
89 UINT nx_azure_iot_hub_client_component_add_internal(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
90                                                     const UCHAR *component_name_ptr,
91                                                     USHORT component_name_length,
92                                                     UINT (*callback_ptr)(VOID *json_reader_ptr,
93                                                                         ULONG version,
94                                                                         VOID *args),
95                                                     VOID *callback_args);
96 
nx_azure_iot_hub_client_throttle_with_jitter(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)97 static UINT nx_azure_iot_hub_client_throttle_with_jitter(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
98 {
99 UINT jitter;
100 UINT base_delay = NX_AZURE_IOT_HUB_CLIENT_MAX_BACKOFF_IN_SEC;
101 UINT retry_count = hub_client_ptr -> nx_azure_iot_hub_client_throttle_count;
102 uint64_t delay;
103 
104     if (retry_count < (sizeof(UINT) * 8 - 1))
105     {
106         retry_count++;
107         delay = (uint64_t)((1 << retry_count) * NX_AZURE_IOT_HUB_CLIENT_INITIAL_BACKOFF_IN_SEC);
108 
109         if (delay <= (UINT)(-1))
110         {
111             base_delay = (UINT)delay;
112         }
113     }
114 
115     if (base_delay > NX_AZURE_IOT_HUB_CLIENT_MAX_BACKOFF_IN_SEC)
116     {
117         base_delay = NX_AZURE_IOT_HUB_CLIENT_MAX_BACKOFF_IN_SEC;
118     }
119     else
120     {
121        hub_client_ptr -> nx_azure_iot_hub_client_throttle_count = retry_count;
122     }
123 
124     jitter = base_delay * NX_AZURE_IOT_HUB_CLIENT_MAX_BACKOFF_JITTER_PERCENT * (NX_RAND() & 0xFF) / 25600;
125     return((UINT)(base_delay + jitter));
126 }
127 
nx_azure_iot_hub_client_throttled_check(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)128 static UINT nx_azure_iot_hub_client_throttled_check(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
129 {
130 ULONG current_time;
131 UINT status = NX_AZURE_IOT_SUCCESS;
132 
133     if (hub_client_ptr -> nx_azure_iot_hub_client_throttle_count != 0)
134     {
135         if ((status = nx_azure_iot_unix_time_get(hub_client_ptr -> nx_azure_iot_ptr, &current_time)))
136         {
137             LogError(LogLiteralArgs("IoTHub client fail to get unix time: %d"), status);
138             return(status);
139         }
140 
141         if (current_time < hub_client_ptr -> nx_azure_iot_hub_client_throttle_end_time)
142         {
143             return(NX_AZURE_IOT_THROTTLED);
144         }
145     }
146 
147     return(status);
148 }
149 
nx_azure_iot_hub_client_initialize(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,NX_AZURE_IOT * nx_azure_iot_ptr,const UCHAR * host_name,UINT host_name_length,const UCHAR * device_id,UINT device_id_length,const UCHAR * module_id,UINT module_id_length,const NX_CRYPTO_METHOD ** crypto_array,UINT crypto_array_size,const NX_CRYPTO_CIPHERSUITE ** cipher_map,UINT cipher_map_size,UCHAR * metadata_memory,UINT memory_size,NX_SECURE_X509_CERT * trusted_certificate)150 UINT nx_azure_iot_hub_client_initialize(NX_AZURE_IOT_HUB_CLIENT* hub_client_ptr,
151                                         NX_AZURE_IOT *nx_azure_iot_ptr,
152                                         const UCHAR *host_name, UINT host_name_length,
153                                         const UCHAR *device_id, UINT device_id_length,
154                                         const UCHAR *module_id, UINT module_id_length,
155                                         const NX_CRYPTO_METHOD **crypto_array, UINT crypto_array_size,
156                                         const NX_CRYPTO_CIPHERSUITE **cipher_map, UINT cipher_map_size,
157                                         UCHAR * metadata_memory, UINT memory_size,
158                                         NX_SECURE_X509_CERT *trusted_certificate)
159 {
160 
161 UINT status;
162 NX_AZURE_IOT_RESOURCE *resource_ptr;
163 az_span hostname_span = az_span_create((UCHAR *)host_name, (INT)host_name_length);
164 az_span device_id_span = az_span_create((UCHAR *)device_id, (INT)device_id_length);
165 az_iot_hub_client_options options = az_iot_hub_client_options_default();
166 az_result core_result;
167 
168     if ((nx_azure_iot_ptr == NX_NULL) || (hub_client_ptr == NX_NULL) || (host_name == NX_NULL) ||
169         (device_id == NX_NULL) || (host_name_length == 0) || (device_id_length == 0))
170     {
171         LogError(LogLiteralArgs("IoTHub client initialization fail: INVALID POINTER"));
172         return(NX_AZURE_IOT_INVALID_PARAMETER);
173     }
174 
175     memset(hub_client_ptr, 0, sizeof(NX_AZURE_IOT_HUB_CLIENT));
176 
177     hub_client_ptr -> nx_azure_iot_ptr = nx_azure_iot_ptr;
178     hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_crypto_array = crypto_array;
179     hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_crypto_array_size = crypto_array_size;
180     hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_cipher_map = cipher_map;
181     hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_cipher_map_size = cipher_map_size;
182     hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_metadata_ptr = metadata_memory;
183     hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_metadata_size = memory_size;
184     hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_trusted_certificates[0] = trusted_certificate;
185     hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_hostname = host_name;
186     hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_hostname_length = host_name_length;
187     options.module_id = az_span_create((UCHAR *)module_id, (INT)module_id_length);
188     options.user_agent = AZ_SPAN_FROM_STR(NX_AZURE_IOT_HUB_CLIENT_USER_AGENT);
189     options.component_names = hub_client_ptr -> nx_azure_iot_hub_client_component_list;
190     options.component_names_length = 0;
191 
192     core_result = az_iot_hub_client_init(&hub_client_ptr -> iot_hub_client_core,
193                                          hostname_span, device_id_span, &options);
194     if (az_result_failed(core_result))
195     {
196         LogError(LogLiteralArgs("IoTHub client failed initialization with error status: %d"), core_result);
197         return(NX_AZURE_IOT_SDK_CORE_ERROR);
198     }
199 
200     /* Set resource pointer.  */
201     resource_ptr = &(hub_client_ptr -> nx_azure_iot_hub_client_resource);
202 
203     /* Create MQTT client.  */
204     status = _nxd_mqtt_client_cloud_create(&(resource_ptr -> resource_mqtt),
205                                            (CHAR *)nx_azure_iot_ptr -> nx_azure_iot_name,
206                                            "", 0,
207                                            nx_azure_iot_ptr -> nx_azure_iot_ip_ptr,
208                                            nx_azure_iot_ptr -> nx_azure_iot_pool_ptr,
209                                            &nx_azure_iot_ptr -> nx_azure_iot_cloud);
210     if (status)
211     {
212         LogError(LogLiteralArgs("IoTHub client initialization fail: MQTT CLIENT CREATE FAIL status: %d"), status);
213         return(status);
214     }
215 
216     /* Set mqtt receive notify.  */
217     status = nxd_mqtt_client_receive_notify_set(&(resource_ptr -> resource_mqtt),
218                                                 nx_azure_iot_hub_client_mqtt_receive_callback);
219     if (status)
220     {
221         LogError(LogLiteralArgs("IoTHub client set message callback status: %d"), status);
222         nxd_mqtt_client_delete(&(resource_ptr -> resource_mqtt));
223         return(status);
224     }
225 
226     /* Obtain the mutex.   */
227     tx_mutex_get(nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);
228 
229     /* Link the resource.  */
230     resource_ptr -> resource_data_ptr = (VOID *)hub_client_ptr;
231     resource_ptr -> resource_type = NX_AZURE_IOT_RESOURCE_IOT_HUB;
232     nx_azure_iot_resource_add(nx_azure_iot_ptr, resource_ptr);
233 
234     /* Release the mutex.  */
235     tx_mutex_put(nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
236 
237     return(NX_AZURE_IOT_SUCCESS);
238 }
239 
nx_azure_iot_hub_client_connection_status_callback_set(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,VOID (* connection_status_cb)(struct NX_AZURE_IOT_HUB_CLIENT_STRUCT * client_ptr,UINT status))240 UINT nx_azure_iot_hub_client_connection_status_callback_set(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
241                                                             VOID (*connection_status_cb)(
242                                                                   struct NX_AZURE_IOT_HUB_CLIENT_STRUCT *client_ptr,
243                                                                   UINT status))
244 {
245 
246     /* Check for invalid input pointers.  */
247     if ((hub_client_ptr == NX_NULL) || (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL))
248     {
249         LogError(LogLiteralArgs("IoTHub client connect callback fail: INVALID POINTER"));
250         return(NX_AZURE_IOT_INVALID_PARAMETER);
251     }
252 
253     /* Set callback function for disconnection.  */
254     nxd_mqtt_client_disconnect_notify_set(&(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
255                                           nx_azure_iot_hub_client_mqtt_disconnect_notify);
256 
257     /* Obtain the mutex.   */
258     tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);
259 
260     /* Set connection status callback.  */
261     hub_client_ptr -> nx_azure_iot_hub_client_connection_status_callback = connection_status_cb;
262 
263     /* Release the mutex.  */
264     tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
265 
266     /* Return success.  */
267     return(NX_AZURE_IOT_SUCCESS);
268 
269 }
270 
#ifdef NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_UPDATE
/**
 * Patch the interface-type digit of the user-agent string with the type of the
 * interface that will be used to reach the server.
 *
 * The outgoing interface is looked up from the server address (IPv4 route lookup or
 * IPv6 interface lookup), its type is queried from the driver, and the character that
 * follows the first "%3B" in the file-level user-agent buffer is overwritten.  The core
 * client's user_agent option is then pointed at that buffer.  Any lookup failure leaves
 * the user agent untouched.
 *
 * hub_client_ptr  Hub client whose core options are updated.
 * server_address  Resolved server address.
 */
static VOID nx_azure_iot_hub_client_user_agent_update(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr, NXD_ADDRESS *server_address)
{
NX_IP        *ip_ptr = hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_ip_ptr;
NX_INTERFACE *outgoing_interface = NX_NULL;
UINT          status;
#ifndef NX_DISABLE_IPV4
ULONG         next_hop_address;
#endif /* NX_DISABLE_IPV4 */
#ifdef FEATURE_NX_IPV6
NXD_IPV6_ADDRESS *ipv6_addr;
#endif /* FEATURE_NX_IPV6 */
ULONG         interface_type;
ULONG         index;

    /* Make sure the server IP address is accessible. */
#ifndef NX_DISABLE_IPV4
    if (server_address -> nxd_ip_version == NX_IP_VERSION_V4)
    {
        if (_nx_ip_route_find(ip_ptr, server_address -> nxd_ip_address.v4, &outgoing_interface, &next_hop_address) != NX_SUCCESS)
        {
            return;
        }
    }
#endif /* !NX_DISABLE_IPV4  */

#ifdef FEATURE_NX_IPV6
    /* For IPv6 connections, find a suitable outgoing interface, based on the TCP peer address. */
    if (server_address -> nxd_ip_version == NX_IP_VERSION_V6)
    {

        status = _nxd_ipv6_interface_find(ip_ptr, server_address -> nxd_ip_address.v6,
                                          &ipv6_addr,
                                          NX_NULL);

        if (status != NX_SUCCESS)
        {
            return;
        }

        outgoing_interface = ipv6_addr -> nxd_ipv6_address_attached;
    }
#endif /* FEATURE_NX_IPV6 */

    /* Check if found the outgoing interface.  */
    if (outgoing_interface == NX_NULL)
    {
        return;
    }

    /* Get the interface type.  */
    status = nx_ip_driver_interface_direct_command(ip_ptr, NX_LINK_GET_INTERFACE_TYPE, outgoing_interface -> nx_interface_index, &interface_type);

    /* Check status.  */
    if (status != NX_SUCCESS)
    {
        return;
    }

    /* Check if the interface type is listed in nx_api.h.  */
    if (interface_type > NX_INTERFACE_TYPE_LORAWAN)
    {
        interface_type = NX_INTERFACE_TYPE_OTHER;
    }

    /* Update the interface type in user agent.  The bound keeps index + 3 inside the
       array (sizeof includes the terminating NUL).  */
    for (index = 0; index < sizeof(_nx_azure_iot_hub_client_user_agent) - 3; index++)
    {

        /* The interface type is after the first (URL-encoded) semicolon.  */
        if ((_nx_azure_iot_hub_client_user_agent[index] == '%') &&
            (_nx_azure_iot_hub_client_user_agent[index + 1] == '3') &&
            (_nx_azure_iot_hub_client_user_agent[index + 2] == 'B'))
        {
            _nx_azure_iot_hub_client_user_agent[index + 3] = (UCHAR)(interface_type + '0');

            /* Exclude the terminating NUL from the span.  AZ_SPAN_FROM_BUFFER would use
               the full array size and so carry the '\0' into the user agent, unlike the
               AZ_SPAN_FROM_STR span installed at initialization.  */
            hub_client_ptr -> iot_hub_client_core._internal.options.user_agent =
                az_span_create(_nx_azure_iot_hub_client_user_agent,
                               (INT)(sizeof(_nx_azure_iot_hub_client_user_agent) - 1));
            return;
        }
    }
}
#endif /* NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_UPDATE */
352 
#ifdef NXD_MQTT_OVER_WEBSOCKET
/**
 * Configure the hub client's MQTT client to run over WebSocket.
 *
 * The host name recorded in the client's resource and the fixed path
 * NX_AZURE_IOT_HUB_CLIENT_WEB_SOCKET_PATH are handed to the MQTT client.
 *
 * hub_client_ptr  Initialized hub client.
 *
 * Returns NX_AZURE_IOT_INVALID_PARAMETER when hub_client_ptr is NULL, otherwise the
 * status of nxd_mqtt_client_websocket_set().
 */
UINT nx_azure_iot_hub_client_websocket_enable(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
{
NX_AZURE_IOT_RESOURCE *hub_resource;
UINT                   result;

    if (hub_client_ptr != NX_NULL)
    {

        /* Host name comes from the resource; the path length excludes the NUL.  */
        hub_resource = &(hub_client_ptr -> nx_azure_iot_hub_client_resource);
        result = nxd_mqtt_client_websocket_set(&(hub_resource -> resource_mqtt),
                                               (UCHAR *)hub_resource -> resource_hostname,
                                               hub_resource -> resource_hostname_length,
                                               (UCHAR *)NX_AZURE_IOT_HUB_CLIENT_WEB_SOCKET_PATH,
                                               sizeof(NX_AZURE_IOT_HUB_CLIENT_WEB_SOCKET_PATH) - 1);
    }
    else
    {
        LogError(LogLiteralArgs("IoTHub WebSocket enable fail: INVALID POINTER"));
        result = NX_AZURE_IOT_INVALID_PARAMETER;
    }

    return(result);
}
#endif /* NXD_MQTT_OVER_WEBSOCKET */
371 
/**
 * Connect the hub client to IoT Hub over MQTT/TLS.
 *
 * Steps: resolve the host name (IPv4), allocate a scratch buffer, build the MQTT
 * client id, user name and (when a token-refresh function is set) the SAS token in
 * that buffer, configure the MQTT login, then start the secure connection.
 *
 * With wait_option == 0 and the MQTT connect still in progress, the client state is
 * set to CONNECTING, the scratch buffer stays attached to the resource, and
 * NX_AZURE_IOT_CONNECTING is returned; completion is reported through the MQTT connect
 * notify.  Otherwise the scratch buffer is released, the state is updated, enabled
 * message subscriptions are set up on success, and the connection status callback
 * (if set) is invoked with the final status while the Azure IoT mutex is held.
 *
 * hub_client_ptr  Initialized hub client.
 * clean_session   Passed through to the MQTT connect.
 * wait_option     Passed through to the MQTT connect; 0 selects non-blocking mode.
 *
 * Returns NX_AZURE_IOT_SUCCESS, NX_AZURE_IOT_CONNECTING, NX_AZURE_IOT_ALREADY_CONNECTED,
 * NX_AZURE_IOT_INVALID_PARAMETER, NX_AZURE_IOT_SDK_CORE_ERROR, or the status of the
 * failing DNS / buffer / time / token / MQTT / subscription call.
 */
UINT nx_azure_iot_hub_client_connect(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                     UINT clean_session, UINT wait_option)
{
UINT            status;
NXD_ADDRESS     server_address;
NX_AZURE_IOT_RESOURCE *resource_ptr;
NXD_MQTT_CLIENT *mqtt_client_ptr;
UCHAR           *buffer_ptr;
UINT            buffer_size;
VOID            *buffer_context;
UINT            buffer_length;
size_t          core_length;
ULONG           expiry_time_secs;
az_result       core_result;
UINT            server_port;

    /* Check for invalid input pointers.  */
    if ((hub_client_ptr == NX_NULL) || (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL))
    {
        LogError(LogLiteralArgs("IoTHub client connect fail: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    /* Check for status.  */
    if (hub_client_ptr -> nx_azure_iot_hub_client_state == NX_AZURE_IOT_HUB_CLIENT_STATUS_CONNECTED)
    {
        LogError(LogLiteralArgs("IoTHub client already connected"));
        return(NX_AZURE_IOT_ALREADY_CONNECTED);
    }
    else if (hub_client_ptr -> nx_azure_iot_hub_client_state == NX_AZURE_IOT_HUB_CLIENT_STATUS_CONNECTING)
    {
        LogError(LogLiteralArgs("IoTHub client is connecting"));
        return(NX_AZURE_IOT_CONNECTING);
    }

    /* Resolve the host name.  */
    /* Note: always using default dns timeout.  */
    status = nxd_dns_host_by_name_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_dns_ptr,
                                      (UCHAR *)hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_hostname,
                                      &server_address, NX_AZURE_IOT_HUB_CLIENT_DNS_TIMEOUT, NX_IP_VERSION_V4);
    if (status)
    {
        LogError(LogLiteralArgs("IoTHub client connect fail: DNS RESOLVE FAIL status: %d"), status);
        return(status);
    }

    /* Allocate buffer for client id, username and sas token.  */
    status = nx_azure_iot_buffer_allocate(hub_client_ptr -> nx_azure_iot_ptr,
                                          &buffer_ptr, &buffer_size, &buffer_context);
    if (status)
    {
        LogError(LogLiteralArgs("IoTHub client failed initialization: BUFFER ALLOCATE FAIL"));
        return(status);
    }

    /* Obtain the mutex.   */
    tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

#ifdef NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_UPDATE
    /* Add the real interface type into the user agent string.  */
    nx_azure_iot_hub_client_user_agent_update(hub_client_ptr, &server_address);
#endif /* NX_AZURE_IOT_HUB_CLIENT_USER_AGENT_UPDATE */

    /* Set resource pointer and buffer context.  */
    resource_ptr = &(hub_client_ptr -> nx_azure_iot_hub_client_resource);

    /* Build client id.  The written length is received in a real size_t object:
       casting a UINT's address to size_t* overruns it where size_t is wider.  */
    core_length = 0;
    core_result = az_iot_hub_client_get_client_id(&(hub_client_ptr -> iot_hub_client_core),
                                                  (CHAR *)buffer_ptr, buffer_size, &core_length);
    if (az_result_failed(core_result))
    {

        /* Release the mutex.  */
        tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
        nx_azure_iot_buffer_free(buffer_context);
        LogError(LogLiteralArgs("IoTHub client failed to get clientId with error status: %d"), core_result);
        return(NX_AZURE_IOT_SDK_CORE_ERROR);
    }
    buffer_length = (UINT)core_length;
    resource_ptr -> resource_mqtt_client_id = buffer_ptr;
    resource_ptr -> resource_mqtt_client_id_length = buffer_length;

    /* Update buffer for user name: it is placed right after the client id.  */
    buffer_ptr += resource_ptr -> resource_mqtt_client_id_length;
    buffer_size -= resource_ptr -> resource_mqtt_client_id_length;

    /* Build user name.  */
    core_length = 0;
    core_result = az_iot_hub_client_get_user_name(&hub_client_ptr -> iot_hub_client_core,
                                                  (CHAR *)buffer_ptr, buffer_size, &core_length);
    if (az_result_failed(core_result))
    {

        /* Release the mutex.  */
        tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
        nx_azure_iot_buffer_free(buffer_context);
        LogError(LogLiteralArgs("IoTHub client connect fail, with error status: %d"), core_result);
        return(NX_AZURE_IOT_SDK_CORE_ERROR);
    }
    buffer_length = (UINT)core_length;
    resource_ptr -> resource_mqtt_user_name = buffer_ptr;
    resource_ptr -> resource_mqtt_user_name_length = buffer_length;

    /* Build sas token: it uses whatever remains of the buffer after the user name.  */
    resource_ptr -> resource_mqtt_sas_token = buffer_ptr + buffer_length;
    resource_ptr -> resource_mqtt_sas_token_length = buffer_size - buffer_length;

    /* Check if token refresh is setup.  */
    if (hub_client_ptr -> nx_azure_iot_hub_client_token_refresh)
    {
        status = nx_azure_iot_unix_time_get(hub_client_ptr -> nx_azure_iot_ptr, &expiry_time_secs);
        if (status)
        {

            /* Release the mutex.  */
            tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
            nx_azure_iot_buffer_free(buffer_context);
            LogError(LogLiteralArgs("IoTHub client connect fail: unixtime get failed status: %d"), status);
            return(status);
        }

        /* Token expires a fixed interval after the current time.  */
        expiry_time_secs += NX_AZURE_IOT_HUB_CLIENT_TOKEN_CONNECTION_TIMEOUT;
        status = hub_client_ptr -> nx_azure_iot_hub_client_token_refresh(hub_client_ptr,
                                                                         expiry_time_secs,
                                                                         hub_client_ptr -> nx_azure_iot_hub_client_symmetric_key,
                                                                         hub_client_ptr -> nx_azure_iot_hub_client_symmetric_key_length,
                                                                         resource_ptr -> resource_mqtt_sas_token,
                                                                         resource_ptr -> resource_mqtt_sas_token_length,
                                                                         &(resource_ptr -> resource_mqtt_sas_token_length));
        if (status)
        {

            /* Release the mutex.  */
            tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
            nx_azure_iot_buffer_free(buffer_context);
            LogError(LogLiteralArgs("IoTHub client connect fail: Token generation failed status: %d"), status);
            return(status);
        }
        hub_client_ptr -> nx_azure_iot_hub_client_sas_token_expiry_time = expiry_time_secs;
    }
    else
    {

        /* No token refresh function: connect without a password.  */
        resource_ptr -> resource_mqtt_sas_token = NX_NULL;
        resource_ptr ->  resource_mqtt_sas_token_length = 0;
    }

    /* Set azure IoT and MQTT client.  */
    mqtt_client_ptr = &(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt);

    /* Update client id.  */
    mqtt_client_ptr -> nxd_mqtt_client_id = (CHAR *)resource_ptr -> resource_mqtt_client_id;
    mqtt_client_ptr -> nxd_mqtt_client_id_length = resource_ptr -> resource_mqtt_client_id_length;

    /* Set login info.  */
    status = nxd_mqtt_client_login_set(&(resource_ptr -> resource_mqtt),
                                       (CHAR *)resource_ptr -> resource_mqtt_user_name,
                                       resource_ptr -> resource_mqtt_user_name_length,
                                       (CHAR *)resource_ptr -> resource_mqtt_sas_token,
                                       resource_ptr -> resource_mqtt_sas_token_length);
    if (status)
    {

        /* Release the mutex.  */
        tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
        nx_azure_iot_buffer_free(buffer_context);
        LogError(LogLiteralArgs("IoTHub client connect fail: MQTT CLIENT LOGIN SET FAIL status: %d"), status);
        return(status);
    }

    /* Set connect notify for non-blocking mode.  */
    if (wait_option == 0)
    {
        mqtt_client_ptr -> nxd_mqtt_connect_notify = nx_azure_iot_hub_client_mqtt_connect_notify;
        mqtt_client_ptr -> nxd_mqtt_connect_context = hub_client_ptr;
    }

    /* Save the resource buffer so it can be released once the connect completes.  */
    resource_ptr -> resource_mqtt_buffer_context = buffer_context;
    resource_ptr -> resource_mqtt_buffer_size = buffer_size;

    /* Release the mutex; it is not held across the MQTT connect call.  */
    tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

#ifdef NXD_MQTT_OVER_WEBSOCKET
    if (mqtt_client_ptr -> nxd_mqtt_client_use_websocket == NX_TRUE)
    {
        server_port = NXD_MQTT_OVER_WEBSOCKET_TLS_PORT;
    }
    else
    {
        server_port = NXD_MQTT_TLS_PORT;
    }
#else
    server_port = NXD_MQTT_TLS_PORT;
#endif /* NXD_MQTT_OVER_WEBSOCKET */

    /* Start MQTT connection.  */
    status = nxd_mqtt_client_secure_connect(mqtt_client_ptr, &server_address, server_port,
                                            nx_azure_iot_mqtt_tls_setup, NX_AZURE_IOT_MQTT_KEEP_ALIVE,
                                            clean_session, wait_option);

    /* Obtain the mutex.  */
    tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

    /* Check status for non-blocking mode.  */
    if ((wait_option == 0) && (status == NX_IN_PROGRESS))
    {
        hub_client_ptr -> nx_azure_iot_hub_client_state = NX_AZURE_IOT_HUB_CLIENT_STATUS_CONNECTING;

        /* Release the mutex.  */
        tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

        /* Return in-progress completion status.  */
        return(NX_AZURE_IOT_CONNECTING);
    }

    /* Release the mqtt connection resource.  */
    if (resource_ptr -> resource_mqtt_buffer_context)
    {
        nx_azure_iot_buffer_free(resource_ptr -> resource_mqtt_buffer_context);
        resource_ptr -> resource_mqtt_buffer_context = NX_NULL;
    }

    /* Check status.  */
    if (status != NX_AZURE_IOT_SUCCESS)
    {
        hub_client_ptr -> nx_azure_iot_hub_client_state = NX_AZURE_IOT_HUB_CLIENT_STATUS_NOT_CONNECTED;
        LogError(LogLiteralArgs("IoTHub client connect fail: MQTT CONNECT FAIL status: %d"), status);
    }
    else
    {

        /* Connected to IoT Hub.  */
        hub_client_ptr -> nx_azure_iot_hub_client_state = NX_AZURE_IOT_HUB_CLIENT_STATUS_CONNECTED;
    }

    if (status == NX_AZURE_IOT_SUCCESS)
    {

        /* The mutex is dropped around the subscription step and re-taken afterwards.  */
        tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

        status = nx_azure_iot_hub_client_messages_enable(hub_client_ptr);

        tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

        if (status)
        {
            hub_client_ptr -> nx_azure_iot_hub_client_state = NX_AZURE_IOT_HUB_CLIENT_STATUS_NOT_CONNECTED;
            LogError(LogLiteralArgs("IoTHub client connect fail: MQTT SUBSCRIBE FAIL status: %d"), status);
        }
    }

    /* Call connection notify if it is set.  */
    if (hub_client_ptr -> nx_azure_iot_hub_client_connection_status_callback)
    {
        hub_client_ptr -> nx_azure_iot_hub_client_connection_status_callback(hub_client_ptr, status);
    }

    /* Release the mutex.  */
    tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

    return(status);
}
632 
/*
 * MQTT connect-notify handler of the IoT Hub client.
 *
 *   client_ptr  MQTT client raising the notification (not used here).
 *   status      Result reported for the MQTT connect attempt.
 *   context     The NX_AZURE_IOT_HUB_CLIENT the notification belongs to.
 *
 * Frees the pending MQTT connection buffer, records the new client state,
 * enables the registered message features when the connect succeeded, and
 * finally passes the resulting status to the user's connection status
 * callback, if one is set.
 */
static VOID nx_azure_iot_hub_client_mqtt_connect_notify(struct NXD_MQTT_CLIENT_STRUCT *client_ptr,
                                                        UINT status, VOID *context)
{
NX_AZURE_IOT_HUB_CLIENT *hub_ptr = (NX_AZURE_IOT_HUB_CLIENT *)context;
NX_AZURE_IOT_RESOURCE *resource_ptr = &(hub_ptr -> nx_azure_iot_hub_client_resource);

    NX_PARAMETER_NOT_USED(client_ptr);

    /* Everything below runs under the shared mutex.  */
    tx_mutex_get(hub_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

    /* The connection buffer is no longer needed once the attempt has completed.  */
    if (resource_ptr -> resource_mqtt_buffer_context != NX_NULL)
    {
        nx_azure_iot_buffer_free(resource_ptr -> resource_mqtt_buffer_context);
        resource_ptr -> resource_mqtt_buffer_context = NX_NULL;
    }

    if (status != NXD_MQTT_SUCCESS)
    {

        /* MQTT connect failed.  */
        hub_ptr -> nx_azure_iot_hub_client_state = NX_AZURE_IOT_HUB_CLIENT_STATUS_NOT_CONNECTED;
    }
    else
    {

        /* MQTT connect succeeded.  */
        hub_ptr -> nx_azure_iot_hub_client_state = NX_AZURE_IOT_HUB_CLIENT_STATUS_CONNECTED;

        /* The mutex is dropped while the message features are being enabled.  */
        tx_mutex_put(hub_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
        status = nx_azure_iot_hub_client_messages_enable(hub_ptr);
        tx_mutex_get(hub_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

        if (status)
        {

            /* A failed enable demotes the client back to not connected.  */
            hub_ptr -> nx_azure_iot_hub_client_state = NX_AZURE_IOT_HUB_CLIENT_STATUS_NOT_CONNECTED;
            LogError(LogLiteralArgs("IoTHub client connect fail: MQTT SUBSCRIBE FAIL status: %d"), status);
        }
    }

    /* Report the outcome to the application when a callback is registered.  */
    if (hub_ptr -> nx_azure_iot_hub_client_connection_status_callback)
    {
        hub_ptr -> nx_azure_iot_hub_client_connection_status_callback(hub_ptr, status);
    }

    tx_mutex_put(hub_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
}
686 
nx_azure_iot_hub_client_mqtt_disconnect_notify(NXD_MQTT_CLIENT * client_ptr)687 static VOID nx_azure_iot_hub_client_mqtt_disconnect_notify(NXD_MQTT_CLIENT *client_ptr)
688 {
689 NX_AZURE_IOT_RESOURCE *resource = nx_azure_iot_resource_search(client_ptr);
690 NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr = NX_NULL;
691 NX_AZURE_IOT_THREAD *thread_list_ptr;
692 ULONG current_time = 0;
693 
694     /* This function is protected by MQTT mutex.  */
695 
696     if (resource && (resource -> resource_type == NX_AZURE_IOT_RESOURCE_IOT_HUB))
697     {
698         hub_client_ptr = (NX_AZURE_IOT_HUB_CLIENT *)resource -> resource_data_ptr;
699     }
700 
701     if (hub_client_ptr == NX_NULL)
702     {
703         return;
704     }
705 
706     /* Wake up all threads.  */
707     for (thread_list_ptr = hub_client_ptr -> nx_azure_iot_hub_client_thread_suspended;
708          thread_list_ptr;
709          thread_list_ptr = thread_list_ptr -> thread_next)
710     {
711         tx_thread_wait_abort(thread_list_ptr -> thread_ptr);
712     }
713 
714     /* Do not call callback if not connected, as at our layer connected means : mqtt connect + subscribe messages topic.  */
715     if (hub_client_ptr -> nx_azure_iot_hub_client_state == NX_AZURE_IOT_HUB_CLIENT_STATUS_CONNECTED)
716     {
717         hub_client_ptr -> nx_azure_iot_hub_client_state = NX_AZURE_IOT_HUB_CLIENT_STATUS_NOT_CONNECTED;
718 
719         /* Call connection notify if it is set.  */
720         if (hub_client_ptr && hub_client_ptr -> nx_azure_iot_hub_client_connection_status_callback)
721         {
722             if (hub_client_ptr -> nx_azure_iot_hub_client_token_refresh &&
723                 (nx_azure_iot_unix_time_get(hub_client_ptr -> nx_azure_iot_ptr,
724                                             &current_time) == NX_AZURE_IOT_SUCCESS) &&
725                 (current_time > hub_client_ptr -> nx_azure_iot_hub_client_sas_token_expiry_time))
726             {
727                 hub_client_ptr -> nx_azure_iot_hub_client_connection_status_callback(hub_client_ptr,
728                                                                                      NX_AZURE_IOT_SAS_TOKEN_EXPIRED);
729             }
730             else
731             {
732                 hub_client_ptr -> nx_azure_iot_hub_client_connection_status_callback(hub_client_ptr,
733                                                                                      NX_AZURE_IOT_DISCONNECTED);
734             }
735 
736         }
737     }
738 }
739 
nx_azure_iot_hub_client_disconnect(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)740 UINT nx_azure_iot_hub_client_disconnect(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
741 {
742 UINT status;
743 NX_AZURE_IOT_THREAD *thread_list_ptr;
744 
745 
746     /* Check for invalid input pointers.  */
747     if ((hub_client_ptr == NX_NULL) || (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL))
748     {
749         LogError(LogLiteralArgs("IoTHub client disconnect fail: INVALID POINTER"));
750         return(NX_AZURE_IOT_INVALID_PARAMETER);
751     }
752 
753     /* Disconnect.  */
754     status = nxd_mqtt_client_disconnect(&(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt));
755     if (status)
756     {
757         LogError(LogLiteralArgs("IoTHub client disconnect fail status: %d"), status);
758         return(status);
759     }
760 
761     /* Obtain the mutex.  */
762     tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);
763 
764     /* Release the mqtt connection resource.  */
765     if (hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt_buffer_context)
766     {
767         nx_azure_iot_buffer_free(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt_buffer_context);
768         hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt_buffer_context = NX_NULL;
769     }
770 
771     /* Wakeup all suspend threads.  */
772     for (thread_list_ptr = hub_client_ptr -> nx_azure_iot_hub_client_thread_suspended;
773          thread_list_ptr;
774          thread_list_ptr = thread_list_ptr -> thread_next)
775     {
776         tx_thread_wait_abort(thread_list_ptr -> thread_ptr);
777     }
778 
779     /* Cleanup received messages.  */
780     nx_azure_iot_hub_client_received_message_cleanup(&(hub_client_ptr -> nx_azure_iot_hub_client_c2d_message));
781     nx_azure_iot_hub_client_received_message_cleanup(&(hub_client_ptr -> nx_azure_iot_hub_client_properties_message));
782     nx_azure_iot_hub_client_received_message_cleanup(&(hub_client_ptr -> nx_azure_iot_hub_client_writable_properties_message));
783     nx_azure_iot_hub_client_received_message_cleanup(&(hub_client_ptr -> nx_azure_iot_hub_client_command_message));
784 
785     hub_client_ptr -> nx_azure_iot_hub_client_state = NX_AZURE_IOT_HUB_CLIENT_STATUS_NOT_CONNECTED;
786 
787     /* Release the mutex.  */
788     tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
789 
790     return(NX_AZURE_IOT_SUCCESS);
791 }
792 
nx_azure_iot_hub_client_received_message_cleanup(NX_AZURE_IOT_HUB_CLIENT_RECEIVE_MESSAGE * message)793 static VOID nx_azure_iot_hub_client_received_message_cleanup(NX_AZURE_IOT_HUB_CLIENT_RECEIVE_MESSAGE *message)
794 {
795 NX_PACKET *current_ptr;
796 NX_PACKET *next_ptr;
797 
798     for (current_ptr = message -> message_head; current_ptr; current_ptr = next_ptr)
799     {
800 
801         /* Get next packet in queue.  */
802         next_ptr = current_ptr -> nx_packet_queue_next;
803 
804         /* Release current packet.  */
805         current_ptr -> nx_packet_queue_next = NX_NULL;
806         nx_packet_release(current_ptr);
807     }
808 
809     /* Reset received messages.  */
810     message -> message_head = NX_NULL;
811     message -> message_tail = NX_NULL;
812 }
813 
nx_azure_iot_hub_client_messages_enable(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)814 static UINT nx_azure_iot_hub_client_messages_enable(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
815 {
816 UINT status = NX_AZURE_IOT_SUCCESS;
817 
818     if (status == NX_AZURE_IOT_SUCCESS &&
819         (hub_client_ptr -> nx_azure_iot_hub_client_c2d_message.message_process != NX_NULL))
820     {
821         status = hub_client_ptr -> nx_azure_iot_hub_client_c2d_message.message_enable(hub_client_ptr);
822     }
823 
824     if (status == NX_AZURE_IOT_SUCCESS &&
825         (hub_client_ptr -> nx_azure_iot_hub_client_command_message.message_process != NX_NULL))
826     {
827         status = hub_client_ptr -> nx_azure_iot_hub_client_command_message.message_enable(hub_client_ptr);
828     }
829 
830     if (status == NX_AZURE_IOT_SUCCESS &&
831         (hub_client_ptr -> nx_azure_iot_hub_client_properties_message.message_process != NX_NULL))
832     {
833         status = hub_client_ptr -> nx_azure_iot_hub_client_properties_message.message_enable(hub_client_ptr);
834     }
835 
836     return(status);
837 }
838 
nx_azure_iot_hub_client_deinitialize(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)839 UINT nx_azure_iot_hub_client_deinitialize(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
840 {
841 UINT status;
842 
843     /* Check for invalid input pointers.  */
844     if ((hub_client_ptr == NX_NULL) || (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL))
845     {
846         LogError(LogLiteralArgs("IoTHub client deinitialize fail: INVALID POINTER"));
847         return(NX_AZURE_IOT_INVALID_PARAMETER);
848     }
849 
850     nx_azure_iot_hub_client_disconnect(hub_client_ptr);
851 
852     status = nxd_mqtt_client_delete(&(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt));
853     if (status)
854     {
855         LogError(LogLiteralArgs("IoTHub client delete fail status: %d"), status);
856         return(status);
857     }
858 
859     /* Obtain the mutex.  */
860     tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);
861 
862     /* Remove resource from list.  */
863     status = nx_azure_iot_resource_remove(hub_client_ptr -> nx_azure_iot_ptr,
864                                           &(hub_client_ptr -> nx_azure_iot_hub_client_resource));
865     if (status)
866     {
867 
868         /* Release the mutex.  */
869         tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
870         LogError(LogLiteralArgs("IoTHub client handle not found"));
871         return(status);
872     }
873 
874     /* Release the mutex.  */
875     tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
876 
877     return(NX_AZURE_IOT_SUCCESS);
878 }
879 
nx_azure_iot_hub_client_trusted_cert_add(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,NX_SECURE_X509_CERT * trusted_certificate)880 UINT nx_azure_iot_hub_client_trusted_cert_add(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
881                                               NX_SECURE_X509_CERT *trusted_certificate)
882 {
883 UINT i;
884 NX_AZURE_IOT_RESOURCE *resource_ptr;
885 
886     if ((hub_client_ptr == NX_NULL) ||
887         (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL) ||
888         (trusted_certificate == NX_NULL))
889     {
890         LogError(LogLiteralArgs("IoTHub device certificate set fail: INVALID POINTER"));
891         return(NX_AZURE_IOT_INVALID_PARAMETER);
892     }
893 
894     /* Obtain the mutex.  */
895     tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);
896 
897     resource_ptr = &(hub_client_ptr -> nx_azure_iot_hub_client_resource);
898     for (i = 0; i < NX_AZURE_IOT_ARRAY_SIZE(resource_ptr -> resource_trusted_certificates); i++)
899     {
900         if (resource_ptr -> resource_trusted_certificates[i] == NX_NULL)
901         {
902             resource_ptr -> resource_trusted_certificates[i] = trusted_certificate;
903             break;
904         }
905     }
906 
907     /* Release the mutex.  */
908     tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
909 
910     if (i < NX_AZURE_IOT_ARRAY_SIZE(resource_ptr -> resource_trusted_certificates))
911     {
912         return(NX_AZURE_IOT_SUCCESS);
913     }
914     else
915     {
916 
917         /* No more space to store trusted certificate.  */
918         return(NX_AZURE_IOT_INSUFFICIENT_BUFFER_SPACE);
919     }
920 }
921 
nx_azure_iot_hub_client_device_cert_set(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,NX_SECURE_X509_CERT * device_certificate)922 UINT nx_azure_iot_hub_client_device_cert_set(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
923                                              NX_SECURE_X509_CERT *device_certificate)
924 {
925 UINT i;
926 NX_AZURE_IOT_RESOURCE *resource_ptr;
927 
928     if ((hub_client_ptr == NX_NULL) ||
929         (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL) ||
930         (device_certificate == NX_NULL))
931     {
932         LogError(LogLiteralArgs("IoTHub device certificate set fail: INVALID POINTER"));
933         return(NX_AZURE_IOT_INVALID_PARAMETER);
934     }
935 
936     /* Obtain the mutex.  */
937     tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);
938 
939     resource_ptr = &(hub_client_ptr -> nx_azure_iot_hub_client_resource);
940     for (i = 0; i < NX_AZURE_IOT_ARRAY_SIZE(resource_ptr -> resource_device_certificates); i++)
941     {
942         if (resource_ptr -> resource_device_certificates[i] == NX_NULL)
943         {
944             resource_ptr -> resource_device_certificates[i] = device_certificate;
945             break;
946         }
947     }
948 
949     /* Release the mutex.  */
950     tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
951 
952     if (i < NX_AZURE_IOT_ARRAY_SIZE(resource_ptr -> resource_device_certificates))
953     {
954         return(NX_AZURE_IOT_SUCCESS);
955     }
956     else
957     {
958 
959         /* No more space to store device certificate.  */
960         return(NX_AZURE_IOT_INSUFFICIENT_BUFFER_SPACE);
961     }
962 }
963 
/*
 * Configure symmetric-key authentication for the IoT Hub client.
 *
 *   hub_client_ptr        Client to configure; it and its nx_azure_iot_ptr
 *                         must be non-NULL.
 *   symmetric_key         Key bytes; must be non-NULL.  The pointer is stored
 *                         as-is, the key is not copied.
 *   symmetric_key_length  Number of key bytes; must be non-zero.
 *
 * Besides recording the key, nx_azure_iot_hub_client_sas_token_get is
 * installed as the client's token refresh routine.
 *
 * Returns NX_AZURE_IOT_INVALID_PARAMETER for a bad argument and
 * NX_AZURE_IOT_SUCCESS otherwise.
 */
UINT nx_azure_iot_hub_client_symmetric_key_set(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                               const UCHAR *symmetric_key, UINT symmetric_key_length)
{

    /* All arguments are mandatory.  */
    if ((hub_client_ptr == NX_NULL) ||
        (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL) ||
        (symmetric_key == NX_NULL) ||
        (symmetric_key_length == 0))
    {
        LogError(LogLiteralArgs("IoTHub client symmetric key fail: Invalid argument"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

    /* Install the SAS token generator together with the key it works from.  */
    hub_client_ptr -> nx_azure_iot_hub_client_token_refresh = nx_azure_iot_hub_client_sas_token_get;
    hub_client_ptr -> nx_azure_iot_hub_client_symmetric_key_length = symmetric_key_length;
    hub_client_ptr -> nx_azure_iot_hub_client_symmetric_key = symmetric_key;

    tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

    return(NX_AZURE_IOT_SUCCESS);
}
987 
/*
 * Set the model id in the options of the client's embedded SDK core.
 *
 *   hub_client_ptr   Client to configure; it and its nx_azure_iot_ptr must be
 *                    non-NULL.
 *   model_id_ptr     Model id bytes; must be non-NULL.  The stored span is
 *                    built from this pointer, so the buffer is referenced
 *                    rather than copied.
 *   model_id_length  Number of model id bytes; must be non-zero.
 *
 * Returns NX_AZURE_IOT_INVALID_PARAMETER for a bad argument and
 * NX_AZURE_IOT_SUCCESS otherwise.
 */
UINT nx_azure_iot_hub_client_model_id_set(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                          const UCHAR *model_id_ptr, UINT model_id_length)
{
az_span model_id_span;

    /* All arguments are mandatory.  */
    if ((hub_client_ptr == NX_NULL) ||
        (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL) ||
        (model_id_ptr == NX_NULL) ||
        (model_id_length == 0))
    {
        LogError(LogLiteralArgs("IoTHub client model Id fail: Invalid argument"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    /* Describe the caller's buffer as a span.  */
    model_id_span = az_span_create((UCHAR *)model_id_ptr, (INT)model_id_length);

    tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

    /* There is no setter for this option, so the internal field of
       iot_hub_client_core is written directly.  */
    hub_client_ptr -> iot_hub_client_core._internal.options.model_id = model_id_span;

    tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

    return(NX_AZURE_IOT_SUCCESS);
}
1010 
nx_azure_iot_hub_client_component_add(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,const UCHAR * component_name_ptr,USHORT component_name_length)1011 UINT nx_azure_iot_hub_client_component_add(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
1012                                            const UCHAR *component_name_ptr,
1013                                            USHORT component_name_length)
1014 {
1015     return(nx_azure_iot_hub_client_component_add_internal(hub_client_ptr, component_name_ptr, component_name_length, NX_NULL, NX_NULL));
1016 }
1017 
/*
 * Add a component name, with an optional callback, to the IoT Hub client.
 *
 *   hub_client_ptr         Client to extend; it and its nx_azure_iot_ptr must
 *                          be non-NULL.
 *   component_name_ptr     Component name bytes; must be non-NULL.  The stored
 *                          span is built from this pointer, so the buffer is
 *                          referenced rather than copied.
 *   component_name_length  Number of name bytes; must be non-zero.
 *   callback_ptr           Callback recorded for the component (may be NX_NULL).
 *   callback_args          Argument recorded alongside the callback.
 *
 * Returns NX_AZURE_IOT_INVALID_PARAMETER for a bad argument,
 * NX_AZURE_IOT_INSUFFICIENT_BUFFER_SPACE when
 * NX_AZURE_IOT_HUB_CLIENT_MAX_COMPONENT_LIST components are already present,
 * and NX_AZURE_IOT_SUCCESS otherwise.
 */
UINT nx_azure_iot_hub_client_component_add_internal(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                    const UCHAR *component_name_ptr,
                                                    USHORT component_name_length,
                                                    UINT (*callback_ptr)(VOID *json_reader_ptr,
                                                                         ULONG version,
                                                                         VOID *args),
                                                    VOID *callback_args)
{
UINT component_count;

    /* nx_azure_iot_ptr is dereferenced for the mutex below, so it is validated
       here as well; the length is an integer and is compared against 0.  */
    if ((hub_client_ptr == NX_NULL) ||
        (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL) ||
        (component_name_ptr == NX_NULL) ||
        (component_name_length == 0))
    {
        LogError(LogLiteralArgs("IoT Hub add component fail: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    /* Obtain the mutex.  */
    tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

    /* The current count doubles as the index of the next free entry.  */
    component_count =
        (UINT)hub_client_ptr -> iot_hub_client_core._internal.options.component_names_length;

    if (component_count >= NX_AZURE_IOT_HUB_CLIENT_MAX_COMPONENT_LIST)
    {
        LogError(LogLiteralArgs("IoT Hub fail due to buffer insufficient"));
        tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
        return(NX_AZURE_IOT_INSUFFICIENT_BUFFER_SPACE);
    }

    /* Using internal fields for faster update */
    hub_client_ptr -> nx_azure_iot_hub_client_component_list[component_count] =
        az_span_create((UCHAR *)component_name_ptr, (INT)component_name_length);
    hub_client_ptr -> iot_hub_client_core._internal.options.component_names_length++;
    hub_client_ptr -> nx_azure_iot_hub_client_component_callback[component_count] = callback_ptr;
    hub_client_ptr -> nx_azure_iot_hub_client_component_callback_args[component_count] = callback_args;

    /* Release the mutex.  */
    tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

    return(NX_AZURE_IOT_SUCCESS);
}
1059 
/*
 * Create a telemetry message packet pre-filled with the publish topic.
 *
 *   hub_client_ptr  Client the message is created for; it and its
 *                   nx_azure_iot_ptr must be non-NULL.
 *   packet_pptr     Receives the new packet on success; must be non-NULL.
 *                   It is left untouched on failure.
 *   wait_option     Wait option passed to nx_azure_iot_publish_packet_get().
 *
 * Returns NX_AZURE_IOT_INVALID_PARAMETER for a NULL argument, the status of
 * nx_azure_iot_publish_packet_get() when no packet could be obtained,
 * NX_AZURE_IOT_SDK_CORE_ERROR when the topic could not be generated (the
 * packet is released in that case), and NX_AZURE_IOT_SUCCESS otherwise.
 */
UINT nx_azure_iot_hub_client_telemetry_message_create(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                      NX_PACKET **packet_pptr, UINT wait_option)
{
NX_PACKET *packet_ptr;
size_t topic_buffer_size;
size_t topic_length = 0;
UINT status;
az_result core_result;

    if ((hub_client_ptr == NX_NULL) ||
        (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL) ||
        (packet_pptr == NX_NULL))
    {
        LogError(LogLiteralArgs("IoTHub telemetry message create fail: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    status = nx_azure_iot_publish_packet_get(hub_client_ptr -> nx_azure_iot_ptr,
                                             &(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
                                             &packet_ptr, wait_option);
    if (status)
    {
        LogError(LogLiteralArgs("Create telemetry data fail"));
        return(status);
    }

    /* The topic is written into the space between the prepend pointer and the end of the packet data area.  */
    topic_buffer_size = (size_t)(packet_ptr -> nx_packet_data_end - packet_ptr -> nx_packet_prepend_ptr);

    /* The length is returned through a genuine size_t object: aliasing a UINT
       as size_t would let the callee write past it wherever size_t is wider.  */
    core_result = az_iot_hub_client_telemetry_get_publish_topic(&(hub_client_ptr -> iot_hub_client_core),
                                                                NULL, (CHAR *)packet_ptr -> nx_packet_prepend_ptr,
                                                                topic_buffer_size, &topic_length);
    if (az_result_failed(core_result))
    {
        LogError(LogLiteralArgs("IoTHub client telemetry message create fail with error status: %d"), core_result);
        nx_packet_release(packet_ptr);
        return(NX_AZURE_IOT_SDK_CORE_ERROR);
    }

    /* Account for the topic bytes that were written into the packet.  */
    packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + topic_length;
    packet_ptr -> nx_packet_length = (UINT)topic_length;
    *packet_pptr = packet_ptr;
    return(NX_AZURE_IOT_SUCCESS);
}
1101 
nx_azure_iot_hub_client_telemetry_message_delete(NX_PACKET * packet_ptr)1102 UINT nx_azure_iot_hub_client_telemetry_message_delete(NX_PACKET *packet_ptr)
1103 {
1104     return(nx_packet_release(packet_ptr));
1105 }
1106 
/*
 * Attach a component name to a telemetry message.
 *
 *   packet_ptr             Telemetry packet to extend; must be non-NULL.
 *   component_name_ptr     Component name bytes; must be non-NULL.
 *   component_name_length  Number of name bytes; must be non-zero.
 *   wait_option            Wait option forwarded to the property append.
 *
 * The name is added as a telemetry property whose key is
 * NX_AZURE_IOT_HUB_CLIENT_COMPONENT_STRING.
 *
 * Returns NX_AZURE_IOT_INVALID_PARAMETER for a bad argument, the status of
 * nx_azure_iot_hub_client_telemetry_property_add() when it fails, and
 * NX_AZURE_IOT_SUCCESS otherwise.
 */
UINT nx_azure_iot_hub_client_telemetry_component_set(NX_PACKET *packet_ptr,
                                                     const UCHAR *component_name_ptr,
                                                     USHORT component_name_length,
                                                     UINT wait_option)
{
UINT append_status;

    /* All arguments are mandatory.  */
    if ((packet_ptr == NX_NULL) ||
        (component_name_ptr == NX_NULL) ||
        (component_name_length == 0))
    {
        LogError(LogLiteralArgs("IoTHub telemetry component set fail: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    /* The component travels as an ordinary property with a fixed key.  */
    append_status = nx_azure_iot_hub_client_telemetry_property_add(packet_ptr,
                                                                   (const UCHAR*)NX_AZURE_IOT_HUB_CLIENT_COMPONENT_STRING,
                                                                   sizeof(NX_AZURE_IOT_HUB_CLIENT_COMPONENT_STRING) - 1,
                                                                   component_name_ptr, component_name_length,
                                                                   wait_option);
    if (!append_status)
    {
        return(NX_AZURE_IOT_SUCCESS);
    }

    LogError(LogLiteralArgs("Telemetry component append fail: error status: %d"), append_status);
    return(append_status);
}
1135 
/*
 * Append a "name=value" property to a telemetry message packet.
 *
 *   packet_ptr             Telemetry packet to extend; must be non-NULL.
 *   property_name          Property name bytes; must be non-NULL.
 *   property_name_length   Number of name bytes.
 *   property_value         Property value bytes; must be non-NULL.
 *   property_value_length  Number of value bytes.
 *   wait_option            Wait option passed to each nx_packet_data_append().
 *
 * An "&" is written first unless the byte just before the packet's append
 * pointer is '/'.
 * NOTE(review): presumably the '/' case means no property has been added to
 * the topic yet - confirm against the topic format.
 *
 * Returns NX_AZURE_IOT_INVALID_PARAMETER for a NULL pointer, the status of the
 * first failing nx_packet_data_append() (pieces appended before the failure
 * stay in the packet), and NX_AZURE_IOT_SUCCESS otherwise.
 */
UINT nx_azure_iot_hub_client_telemetry_property_add(NX_PACKET *packet_ptr,
                                                    const UCHAR *property_name, USHORT property_name_length,
                                                    const UCHAR *property_value, USHORT property_value_length,
                                                    UINT wait_option)
{
UINT status;
UINT index;
UINT piece_count = 0;
const UCHAR *piece_data[4];
UINT piece_size[4];

    if ((packet_ptr == NX_NULL) ||
        (property_name == NX_NULL) ||
        (property_value == NX_NULL))
    {
        LogError(LogLiteralArgs("IoTHub telemetry property add fail: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    /* Optional separator, decided before anything is appended.  */
    if (*(packet_ptr -> nx_packet_append_ptr - 1) != '/')
    {
        piece_data[piece_count] = (const UCHAR *)"&";
        piece_size[piece_count] = 1;
        piece_count++;
    }

    /* name  */
    piece_data[piece_count] = property_name;
    piece_size[piece_count] = (UINT)property_name_length;
    piece_count++;

    /* "="  */
    piece_data[piece_count] = (const UCHAR *)"=";
    piece_size[piece_count] = 1;
    piece_count++;

    /* value  */
    piece_data[piece_count] = property_value;
    piece_size[piece_count] = (UINT)property_value_length;
    piece_count++;

    /* Append the pieces in order, stopping at the first failure.  */
    for (index = 0; index < piece_count; index++)
    {
        status = nx_packet_data_append(packet_ptr, (VOID *)piece_data[index], piece_size[index],
                                       packet_ptr -> nx_packet_pool_owner, wait_option);
        if (status)
        {
            LogError(LogLiteralArgs("Telemetry data append fail"));
            return(status);
        }
    }

    return(NX_AZURE_IOT_SUCCESS);
}
1190 
/*
 * Send a telemetry message without retrieving its packet id.
 *
 * Forwards to nx_azure_iot_hub_client_telemetry_send_extended() with NX_NULL
 * as the packet id pointer and returns that function's status.
 */
UINT nx_azure_iot_hub_client_telemetry_send(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                            NX_PACKET *packet_ptr, const UCHAR *telemetry_data,
                                            UINT data_size, UINT wait_option)
{
    return(nx_azure_iot_hub_client_telemetry_send_extended(hub_client_ptr, packet_ptr,
                                                           telemetry_data, data_size,
                                                           NX_NULL, wait_option));
}
1202 
/*
 * Send a telemetry message and optionally report its MQTT packet id.
 *
 *   hub_client_ptr  Client used for sending; must be non-NULL.
 *   packet_ptr      Telemetry packet holding the topic; must be non-NULL.
 *   telemetry_data  Payload bytes; may be NX_NULL for an empty payload.
 *   data_size       Number of payload bytes; 0 means no payload is appended.
 *   packet_id_ptr   When non-NULL, receives the packet id after a successful
 *                   publish.
 *   wait_option     Wait option used for the appends and the publish.
 *
 * The packet length on entry is taken as the topic length; the packet id and
 * then the payload are appended behind it before the packet is published.
 *
 * Returns NX_AZURE_IOT_INVALID_PARAMETER for a NULL client or packet, the
 * status of the first failing step otherwise, and NX_AZURE_IOT_SUCCESS when
 * the publish went through.  The packet is not released here on failure.
 */
UINT nx_azure_iot_hub_client_telemetry_send_extended(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                    NX_PACKET *packet_ptr, const UCHAR *telemetry_data,
                                                    UINT data_size, USHORT *packet_id_ptr, UINT wait_option)
{
UINT result;
UINT topic_length;
UCHAR id_bytes[2];
NXD_MQTT_CLIENT *mqtt_ptr;

    if ((hub_client_ptr == NX_NULL) || (packet_ptr == NX_NULL))
    {
        LogError(LogLiteralArgs("IoTHub telemetry send fail: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    mqtt_ptr = &(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt);

    /* So far the packet contains only the topic.  */
    topic_length = packet_ptr -> nx_packet_length;

    /* Obtain the packet id for this publish.  */
    result = nx_azure_iot_mqtt_packet_id_get(mqtt_ptr, id_bytes);
    if (result)
    {
        LogError(LogLiteralArgs("Failed to get packet id"));
        return(result);
    }

    /* The packet id follows the topic.  */
    result = nx_packet_data_append(packet_ptr, id_bytes, sizeof(id_bytes),
                                   packet_ptr -> nx_packet_pool_owner,
                                   wait_option);
    if (result)
    {
        LogError(LogLiteralArgs("Telemetry append fail"));
        return(result);
    }

    /* The payload, if there is one, follows the packet id.  */
    if ((telemetry_data != NX_NULL) && (data_size > 0))
    {
        result = nx_packet_data_append(packet_ptr, (VOID *)telemetry_data, data_size,
                                       packet_ptr -> nx_packet_pool_owner,
                                       wait_option);
        if (result)
        {
            LogError(LogLiteralArgs("Telemetry data append fail"));
            return(result);
        }
    }

    /* Publish the assembled packet.  */
    result = nx_azure_iot_publish_mqtt_packet(mqtt_ptr, packet_ptr, topic_length, id_bytes,
                                              NX_AZURE_IOT_HUB_CLIENT_TELEMETRY_QOS,
                                              wait_option);
    if (result)
    {
        LogError(LogLiteralArgs("IoTHub client send fail: PUBLISH FAIL status: %d"), result);
        return(result);
    }

    /* Hand the id back with the first byte as the high-order byte.  */
    if (packet_id_ptr != NX_NULL)
    {
        *packet_id_ptr = (USHORT)((id_bytes[0] << 8) | id_bytes[1]);
    }

    return(NX_AZURE_IOT_SUCCESS);
}
1267 
nx_azure_iot_hub_client_telemetry_ack_callback_set(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,VOID (* callback_ptr)(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,USHORT packet_id))1268 UINT nx_azure_iot_hub_client_telemetry_ack_callback_set(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
1269                                                         VOID (*callback_ptr)(
1270                                                               NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
1271                                                               USHORT packet_id))
1272 {
1273 NXD_MQTT_CLIENT *client_ptr;
1274 
1275     if ((hub_client_ptr == NX_NULL) || (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL))
1276     {
1277         LogError(LogLiteralArgs("IoTHub telemetry callback set fail: INVALID POINTER"));
1278         return(NX_AZURE_IOT_INVALID_PARAMETER);
1279     }
1280 
1281     /* Obtain the mutex.  */
1282     tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);
1283 
1284     /* Set the callback function.  */
1285     hub_client_ptr -> nx_azure_iot_hub_client_telemetry_ack_callback = callback_ptr;
1286 
1287     /* Check the callback pointer.  */
1288     if (callback_ptr)
1289     {
1290 
1291         /* Set mqtt ack receive notify for telemetry.  */
1292         client_ptr = &(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt);
1293         client_ptr -> nxd_mqtt_ack_receive_notify = nx_azure_iot_hub_client_mqtt_ack_receive_notify;
1294         client_ptr -> nxd_mqtt_ack_receive_context = hub_client_ptr;
1295     }
1296 
1297     /* Release the mutex.  */
1298     tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
1299 
1300     return(NX_AZURE_IOT_SUCCESS);
1301 }
1302 
/* Stores a receive callback and its argument for one kind of incoming message.

   hub_client_ptr: hub client to update; it and its nx_azure_iot_ptr must be non-NULL.
   message_type:   selects the slot: NX_AZURE_IOT_HUB_CLOUD_TO_DEVICE_MESSAGE,
                   NX_AZURE_IOT_HUB_PROPERTIES, NX_AZURE_IOT_HUB_WRITABLE_PROPERTIES or
                   NX_AZURE_IOT_HUB_COMMAND.
   callback_ptr:   callback stored in the selected slot (stored as given, NULL included).
   callback_args:  opaque argument stored next to the callback.

   Returns NX_AZURE_IOT_INVALID_PARAMETER on a NULL pointer, NX_AZURE_IOT_NOT_SUPPORTED for
   any other message_type, otherwise NX_AZURE_IOT_SUCCESS.  */
UINT nx_azure_iot_hub_client_receive_callback_set(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                  UINT message_type,
                                                  VOID (*callback_ptr)(
                                                        NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                        VOID *args),
                                                  VOID *callback_args)
{
UINT result = NX_AZURE_IOT_SUCCESS;

    if ((hub_client_ptr == NX_NULL) || (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL))
    {
        LogError(LogLiteralArgs("IoTHub receive callback set fail: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    /* The slot is updated while holding the shared mutex.  */
    tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

    if (message_type == NX_AZURE_IOT_HUB_CLOUD_TO_DEVICE_MESSAGE)
    {
        hub_client_ptr -> nx_azure_iot_hub_client_c2d_message.message_callback = callback_ptr;
        hub_client_ptr -> nx_azure_iot_hub_client_c2d_message.message_callback_args = callback_args;
    }
    else if (message_type == NX_AZURE_IOT_HUB_PROPERTIES)
    {
        hub_client_ptr -> nx_azure_iot_hub_client_properties_message.message_callback = callback_ptr;
        hub_client_ptr -> nx_azure_iot_hub_client_properties_message.message_callback_args = callback_args;
    }
    else if (message_type == NX_AZURE_IOT_HUB_WRITABLE_PROPERTIES)
    {
        hub_client_ptr -> nx_azure_iot_hub_client_writable_properties_message.message_callback = callback_ptr;
        hub_client_ptr -> nx_azure_iot_hub_client_writable_properties_message.message_callback_args = callback_args;
    }
    else if (message_type == NX_AZURE_IOT_HUB_COMMAND)
    {
        hub_client_ptr -> nx_azure_iot_hub_client_command_message.message_callback = callback_ptr;
        hub_client_ptr -> nx_azure_iot_hub_client_command_message.message_callback_args = callback_args;
    }
    else
    {

        /* Unknown message type: nothing is stored.  */
        result = NX_AZURE_IOT_NOT_SUPPORTED;
    }

    /* Single release point for every path that took the mutex.  */
    tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

    return(result);
}
1352 
nx_azure_iot_hub_client_cloud_message_enable(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)1353 UINT nx_azure_iot_hub_client_cloud_message_enable(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
1354 {
1355 
1356     if (hub_client_ptr == NX_NULL)
1357     {
1358         LogError(LogLiteralArgs("IoTHub cloud message subscribe fail: INVALID POINTER"));
1359         return(NX_AZURE_IOT_INVALID_PARAMETER);
1360     }
1361 
1362     return(nx_azure_iot_hub_client_cloud_message_sub_unsub(hub_client_ptr, NX_TRUE));
1363 }
1364 
nx_azure_iot_hub_client_cloud_message_disable(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)1365 UINT nx_azure_iot_hub_client_cloud_message_disable(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
1366 {
1367     if (hub_client_ptr == NX_NULL)
1368     {
1369         LogError(LogLiteralArgs("IoTHub cloud message unsubscribe fail: INVALID POINTER"));
1370         return(NX_AZURE_IOT_INVALID_PARAMETER);
1371     }
1372 
1373     return(nx_azure_iot_hub_client_cloud_message_sub_unsub(hub_client_ptr, NX_FALSE));
1374 }
1375 
nx_azure_iot_hub_client_process_publish_packet(UCHAR * start_ptr,ULONG * topic_offset_ptr,USHORT * topic_length_ptr)1376 static UINT nx_azure_iot_hub_client_process_publish_packet(UCHAR *start_ptr,
1377                                                            ULONG *topic_offset_ptr,
1378                                                            USHORT *topic_length_ptr)
1379 {
1380 UCHAR *byte = start_ptr;
1381 UINT byte_count = 0;
1382 UINT multiplier = 1;
1383 UINT remaining_length = 0;
1384 UINT topic_length;
1385 
1386     /* Validate packet start contains fixed header.  */
1387     do
1388     {
1389         if (byte_count >= 4)
1390         {
1391             LogError(LogLiteralArgs("Invalid mqtt packet start position"));
1392             return(NX_AZURE_IOT_INVALID_PACKET);
1393         }
1394 
1395         byte++;
1396         remaining_length += (((*byte) & 0x7F) * multiplier);
1397         multiplier = multiplier << 7;
1398         byte_count++;
1399     } while ((*byte) & 0x80);
1400 
1401     if (remaining_length < 2)
1402     {
1403         return(NX_AZURE_IOT_INVALID_PACKET);
1404     }
1405 
1406     /* Retrieve topic length.  */
1407     byte++;
1408     topic_length = (UINT)(*(byte) << 8) | (*(byte + 1));
1409 
1410     if (topic_length > remaining_length - 2u)
1411     {
1412         return(NX_AZURE_IOT_INVALID_PACKET);
1413     }
1414 
1415     *topic_offset_ptr = (ULONG)((byte + 2) - start_ptr);
1416     *topic_length_ptr = (USHORT)topic_length;
1417 
1418     /* Return.  */
1419     return(NX_AZURE_IOT_SUCCESS);
1420 }
1421 
/* Fetches the next queued packet for one message slot, optionally waiting for one.

   hub_client_ptr:  hub client; it and its nx_azure_iot_ptr must be non-NULL.
   message_type:    recorded in the waiter entry when this thread has to wait.
   receive_message: slot whose message_head/message_tail queue is consumed; it is
                    dereferenced without a NULL check, so callers must pass a valid slot.
   packet_pptr:     receives the packet on success; not written otherwise.
   wait_option:     0 means do not wait; any other value is passed to tx_thread_sleep().

   If the queue is empty and wait_option is non-zero, an on-stack waiter entry is pushed on
   the hub client's suspended-thread list, the thread sleeps, and afterwards the entry is
   removed and its thread_received_message is taken as the result.
   NOTE(review): whoever delivers a message is expected to fill thread_received_message and
   wake this thread; that code is outside this function - confirm.

   Returns NX_AZURE_IOT_INVALID_PARAMETER on a NULL pointer, NX_AZURE_IOT_NOT_ENABLED when
   the slot has no message_process handler, NX_AZURE_IOT_DISCONNECTED when no packet was
   obtained and the client is not connected, NX_AZURE_IOT_NO_PACKET when no packet was
   obtained otherwise, and NX_AZURE_IOT_SUCCESS when a packet is returned.  */
static UINT nx_azure_iot_hub_client_message_receive(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr, UINT message_type,
                                                    NX_AZURE_IOT_HUB_CLIENT_RECEIVE_MESSAGE *receive_message,
                                                    NX_PACKET **packet_pptr, UINT wait_option)
{
NX_PACKET *message_ptr = NX_NULL;
UINT saved_threshold;
NX_AZURE_IOT_THREAD waiter;

    if ((hub_client_ptr == NX_NULL) ||
        (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL) ||
        (packet_pptr == NX_NULL))
    {
        LogError(LogLiteralArgs("IoTHub message receive fail: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    if (receive_message -> message_process == NX_NULL)
    {
        LogError(LogLiteralArgs("IoTHub message receive fail: NOT ENABLED"));
        return(NX_AZURE_IOT_NOT_ENABLED);
    }

    tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

    if (receive_message -> message_head != NX_NULL)
    {

        /* Pop the head of the queue; clear the tail when it was the only entry.  */
        message_ptr = receive_message -> message_head;
        if (receive_message -> message_tail == message_ptr)
        {
            receive_message -> message_tail = NX_NULL;
        }
        receive_message -> message_head = message_ptr -> nx_packet_queue_next;
    }
    else if (wait_option != 0)
    {

        /* Nothing queued: register this thread as a waiter for message_type.  */
        waiter.thread_message_type = message_type;
        waiter.thread_ptr = tx_thread_identify();
        waiter.thread_received_message = NX_NULL;
        waiter.thread_expected_id = 0;
        waiter.thread_next = hub_client_ptr -> nx_azure_iot_hub_client_thread_suspended;
        hub_client_ptr -> nx_azure_iot_hub_client_thread_suspended = &waiter;

        /* Disable preemption before dropping the mutex (presumably so nothing can run
           between the release and the sleep - confirm).  */
        tx_thread_preemption_change(tx_thread_identify(), 0, &saved_threshold);

        tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

        tx_thread_sleep(wait_option);

        tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

        /* The waiter entry lives on this stack frame, so unlink it before leaving.  */
        nx_azure_iot_hub_client_thread_dequeue(hub_client_ptr, &waiter);

        /* Restore preemption.  */
        tx_thread_preemption_change(tx_thread_identify(), saved_threshold, &saved_threshold);
        message_ptr = waiter.thread_received_message;
    }

    tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

    if (message_ptr != NX_NULL)
    {
        *packet_pptr = message_ptr;
        return(NX_AZURE_IOT_SUCCESS);
    }

    /* No packet: tell a lost connection apart from a plain empty queue.  */
    if (hub_client_ptr -> nx_azure_iot_hub_client_state != NX_AZURE_IOT_HUB_CLIENT_STATUS_CONNECTED)
    {
        LogError(LogLiteralArgs("IoTHub message receive fail:  IoTHub client not connected"));
        return(NX_AZURE_IOT_DISCONNECTED);
    }

    return(NX_AZURE_IOT_NO_PACKET);
}
1501 
nx_azure_iot_hub_client_adjust_payload(NX_PACKET * packet_ptr)1502 UINT nx_azure_iot_hub_client_adjust_payload(NX_PACKET *packet_ptr)
1503 {
1504 UINT status;
1505 ULONG topic_offset;
1506 USHORT topic_length;
1507 ULONG message_offset;
1508 ULONG message_length;
1509 
1510     status = _nxd_mqtt_process_publish_packet(packet_ptr, &topic_offset,
1511                                               &topic_length, &message_offset,
1512                                               &message_length);
1513     if (status)
1514     {
1515         nx_packet_release(packet_ptr);
1516         return(status);
1517     }
1518 
1519     packet_ptr -> nx_packet_length = message_length;
1520 
1521     /* Adjust packet to pointer to message payload.  */
1522     while (packet_ptr)
1523     {
1524         if ((ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr) > message_offset)
1525         {
1526 
1527             /* This packet contains message payload.  */
1528             packet_ptr -> nx_packet_prepend_ptr = packet_ptr -> nx_packet_prepend_ptr + message_offset;
1529             break;
1530         }
1531 
1532         message_offset -= (ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr);
1533 
1534         /* Set current packet to empty.  */
1535         packet_ptr -> nx_packet_prepend_ptr = packet_ptr -> nx_packet_append_ptr;
1536 
1537         /* Move to next packet.  */
1538         packet_ptr = packet_ptr -> nx_packet_next;
1539     }
1540 
1541     return(NX_AZURE_IOT_SUCCESS);
1542 }
1543 
/* Receives the next cloud-to-device message and trims it down to its payload.

   hub_client_ptr: hub client; it and its nx_azure_iot_ptr must be non-NULL.
   packet_pptr:    receives the packet on success.
   wait_option:    forwarded to nx_azure_iot_hub_client_message_receive().

   Returns NX_AZURE_IOT_INVALID_PARAMETER on a NULL pointer, otherwise the status of
   nx_azure_iot_hub_client_message_receive() or, when that succeeds, of
   nx_azure_iot_hub_client_adjust_payload().

   If the payload adjustment fails, the packet has already been released by
   nx_azure_iot_hub_client_adjust_payload(); *packet_pptr is then reset to NX_NULL so the
   caller is never left holding a pointer to a released packet.  */
UINT nx_azure_iot_hub_client_cloud_message_receive(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                   NX_PACKET **packet_pptr, UINT wait_option)
{
UINT status;

    if ((hub_client_ptr == NX_NULL) ||
        (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL) ||
        (packet_pptr == NX_NULL))
    {
        LogError(LogLiteralArgs("IoTHub cloud message receive fail: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    status = nx_azure_iot_hub_client_message_receive(hub_client_ptr, NX_AZURE_IOT_HUB_CLOUD_TO_DEVICE_MESSAGE,
                                                     &(hub_client_ptr -> nx_azure_iot_hub_client_c2d_message),
                                                     packet_pptr, wait_option);
    if (status)
    {
        return(status);
    }

    status = nx_azure_iot_hub_client_adjust_payload(*packet_pptr);
    if (status)
    {

        /* The packet was released on failure; do not hand back a dangling pointer.  */
        *packet_pptr = NX_NULL;
    }

    return(status);
}
1567 
nx_azure_iot_hub_client_cloud_message_property_get(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,NX_PACKET * packet_ptr,const UCHAR * property_name,USHORT property_name_length,const UCHAR ** property_value,USHORT * property_value_length)1568 UINT nx_azure_iot_hub_client_cloud_message_property_get(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
1569                                                         NX_PACKET *packet_ptr, const UCHAR *property_name,
1570                                                         USHORT property_name_length, const UCHAR **property_value,
1571                                                         USHORT *property_value_length)
1572 {
1573 USHORT topic_size;
1574 UINT status;
1575 ULONG topic_offset;
1576 UCHAR *topic_name;
1577 az_iot_hub_client_c2d_request request;
1578 az_span receive_topic;
1579 az_result core_result;
1580 az_span span;
1581 
1582     if (packet_ptr == NX_NULL ||
1583         property_name == NX_NULL ||
1584         property_value == NX_NULL ||
1585         property_value_length == NX_NULL)
1586     {
1587         LogError(LogLiteralArgs("IoTHub cloud message get property fail: INVALID POINTER"));
1588         return(NX_AZURE_IOT_INVALID_PARAMETER);
1589     }
1590 
1591     status = nx_azure_iot_hub_client_process_publish_packet(packet_ptr -> nx_packet_data_start,
1592                                                             &topic_offset, &topic_size);
1593     if (status)
1594     {
1595         return(status);
1596     }
1597 
1598     topic_name = packet_ptr -> nx_packet_data_start + topic_offset;
1599 
1600     /* NOTE: Current implementation does not support topic to span multiple packets.  */
1601     if ((ULONG)(packet_ptr -> nx_packet_append_ptr - topic_name) < (ULONG)topic_size)
1602     {
1603         LogError(LogLiteralArgs("IoTHub cloud message get property fail: topic out of boundaries of single packet"));
1604         return(NX_AZURE_IOT_TOPIC_TOO_LONG);
1605     }
1606 
1607     receive_topic = az_span_create(topic_name, (INT)topic_size);
1608     core_result = az_iot_hub_client_c2d_parse_received_topic(&hub_client_ptr -> iot_hub_client_core,
1609                                                              receive_topic, &request);
1610     if (az_result_failed(core_result))
1611     {
1612         LogError(LogLiteralArgs("IoTHub cloud message get property fail: parsing error"));
1613         return(NX_AZURE_IOT_SDK_CORE_ERROR);
1614     }
1615 
1616     span = az_span_create((UCHAR *)property_name, property_name_length);
1617     core_result = az_iot_message_properties_find(&request.properties, span, &span);
1618     if (az_result_failed(core_result))
1619     {
1620         if (core_result == AZ_ERROR_ITEM_NOT_FOUND)
1621         {
1622             status = NX_AZURE_IOT_NOT_FOUND;
1623         }
1624         else
1625         {
1626             LogError(LogLiteralArgs("IoTHub cloud message get property fail: property find"));
1627             status = NX_AZURE_IOT_SDK_CORE_ERROR;
1628         }
1629 
1630         return(status);
1631     }
1632 
1633     *property_value = (UCHAR *)az_span_ptr(span);
1634     *property_value_length = (USHORT)az_span_size(span);
1635 
1636     return(NX_AZURE_IOT_SUCCESS);
1637 }
1638 
/* MQTT ack notification handler installed in nxd_mqtt_ack_receive_notify.

   client_ptr:          unused.
   type:                MQTT control packet type of the ack.
   packet_id:           packet id forwarded to the telemetry ack callback.
   transmit_packet_ptr: the transmitted packet the ack refers to; its topic is inspected.
   context:             the NX_AZURE_IOT_HUB_CLIENT registered as ack receive context.

   SUBACK: if the subscribed topic equals AZ_IOT_HUB_CLIENT_TWIN_RESPONSE_SUBSCRIBE_TOPIC,
   nx_azure_iot_hub_client_properties_subscribe_ack is set to NX_TRUE.
   PUBACK: if the published topic starts with "devices/" and a telemetry ack callback is
   registered, that callback is invoked with packet_id.
   Any other type, or a failed topic extraction, is ignored.  */
static VOID nx_azure_iot_hub_client_mqtt_ack_receive_notify(NXD_MQTT_CLIENT *client_ptr, UINT type,
                                                            USHORT packet_id, NX_PACKET *transmit_packet_ptr,
                                                            VOID *context)
{
NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr = (NX_AZURE_IOT_HUB_CLIENT *)context;
UCHAR topic_start[sizeof(AZ_IOT_HUB_CLIENT_TWIN_RESPONSE_SUBSCRIBE_TOPIC) - 1];
ULONG extracted_length;
UINT status;

    NX_PARAMETER_NOT_USED(client_ptr);
    NX_PARAMETER_NOT_USED(context);

    if (type == MQTT_CONTROL_PACKET_TYPE_SUBACK)
    {

        /* Copy the start of the subscribed topic out of the transmitted packet.  */
        status = nx_packet_data_extract_offset(transmit_packet_ptr,
                                               NX_AZURE_IOT_MQTT_SUBSCRIBE_TOPIC_OFFSET,
                                               topic_start, sizeof(topic_start), &extracted_length);
        if (status)
        {
            return;
        }

        /* Only the twin response subscription is tracked.  */
        if ((extracted_length == sizeof(topic_start)) &&
            (memcmp(AZ_IOT_HUB_CLIENT_TWIN_RESPONSE_SUBSCRIBE_TOPIC,
                    topic_start, sizeof(topic_start)) == 0))
        {
            hub_client_ptr -> nx_azure_iot_hub_client_properties_subscribe_ack = NX_TRUE;
        }
    }
    else if (type == MQTT_CONTROL_PACKET_TYPE_PUBACK)
    {

        /* Copy the start of the published topic out of the transmitted packet.  */
        status = nx_packet_data_extract_offset(transmit_packet_ptr,
                                               NX_AZURE_IOT_PUBLISH_PACKET_START_OFFSET,
                                               topic_start, sizeof(topic_start), &extracted_length);
        if (status)
        {
            return;
        }

        /* Telemetry topics look like devices/{device-id}/messages/events/.  */
        if ((extracted_length >= sizeof("devices/") - 1) &&
            (memcmp(topic_start, "devices/", sizeof("devices/") - 1) == 0) &&
            (hub_client_ptr -> nx_azure_iot_hub_client_telemetry_ack_callback))
        {
            hub_client_ptr -> nx_azure_iot_hub_client_telemetry_ack_callback(hub_client_ptr, packet_id);
        }
    }
}
1694 
nx_azure_iot_hub_client_properties_enable(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)1695 UINT nx_azure_iot_hub_client_properties_enable(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
1696 {
1697 UINT status;
1698 NXD_MQTT_CLIENT *client_ptr;
1699 
1700     if (hub_client_ptr == NX_NULL)
1701     {
1702         LogError(LogLiteralArgs("IoTHub client properties subscribe fail: INVALID POINTER"));
1703         return(NX_AZURE_IOT_INVALID_PARAMETER);
1704     }
1705 
1706     tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);
1707 
1708     /* Atomically update the handler as we need to serialize the handler with incoming messages.  */
1709     hub_client_ptr -> nx_azure_iot_hub_client_properties_message.message_process =
1710                       nx_azure_iot_hub_client_properties_process;
1711     hub_client_ptr -> nx_azure_iot_hub_client_writable_properties_message.message_process =
1712                       nx_azure_iot_hub_client_properties_process;
1713     hub_client_ptr -> nx_azure_iot_hub_client_properties_message.message_enable =
1714                       nx_azure_iot_hub_client_properties_enable;
1715     hub_client_ptr -> nx_azure_iot_hub_client_writable_properties_message.message_enable =
1716                       nx_azure_iot_hub_client_properties_enable;
1717 
1718     /* Register callbacks even if not connect and when connect complete subscribe for topics.  */
1719     if (hub_client_ptr -> nx_azure_iot_hub_client_state != NX_AZURE_IOT_HUB_CLIENT_STATUS_CONNECTED)
1720     {
1721         tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
1722         return(NX_AZURE_IOT_SUCCESS);
1723     }
1724 
1725     /* Initialize variables.  */
1726     hub_client_ptr -> nx_azure_iot_hub_client_properties_subscribe_ack = NX_FALSE;
1727 
1728     /* Set ack receive notify for subscribe ack.  */
1729     client_ptr = &(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt);
1730     client_ptr -> nxd_mqtt_ack_receive_notify = nx_azure_iot_hub_client_mqtt_ack_receive_notify;
1731     client_ptr -> nxd_mqtt_ack_receive_context = hub_client_ptr;
1732 
1733     tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
1734 
1735     status = nxd_mqtt_client_subscribe(&(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
1736                                        AZ_IOT_HUB_CLIENT_TWIN_RESPONSE_SUBSCRIBE_TOPIC,
1737                                        sizeof(AZ_IOT_HUB_CLIENT_TWIN_RESPONSE_SUBSCRIBE_TOPIC) - 1,
1738                                        NX_AZURE_IOT_MQTT_QOS_0);
1739     if (status)
1740     {
1741         tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);
1742 
1743         /* Clean ack receive notify if telemetry callback is not set.  */
1744         if (hub_client_ptr -> nx_azure_iot_hub_client_telemetry_ack_callback == NX_NULL)
1745         {
1746             client_ptr -> nxd_mqtt_ack_receive_notify = NX_NULL;
1747         }
1748 
1749         tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
1750         LogError(LogLiteralArgs("IoTHub client device twin subscribe fail status: %d"), status);
1751         return(status);
1752     }
1753 
1754     status = nxd_mqtt_client_subscribe(&(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
1755                                        AZ_IOT_HUB_CLIENT_TWIN_PATCH_SUBSCRIBE_TOPIC,
1756                                        sizeof(AZ_IOT_HUB_CLIENT_TWIN_PATCH_SUBSCRIBE_TOPIC) - 1,
1757                                        NX_AZURE_IOT_MQTT_QOS_0);
1758     if (status)
1759     {
1760         tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);
1761 
1762         /* Clean ack receive notify if telemetry callback is not set.  */
1763         if (hub_client_ptr -> nx_azure_iot_hub_client_telemetry_ack_callback == NX_NULL)
1764         {
1765             client_ptr -> nxd_mqtt_ack_receive_notify = NX_NULL;
1766         }
1767 
1768         tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
1769         LogError(LogLiteralArgs("IoTHub client device twin subscribe fail status: %d"), status);
1770         return(status);
1771     }
1772 
1773     return(NX_AZURE_IOT_SUCCESS);
1774 }
1775 
nx_azure_iot_hub_client_properties_disable(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)1776 UINT nx_azure_iot_hub_client_properties_disable(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
1777 {
1778 UINT status;
1779 
1780     if (hub_client_ptr == NX_NULL)
1781     {
1782         LogError(LogLiteralArgs("IoTHub client properties unsubscribe fail: INVALID POINTER"));
1783         return(NX_AZURE_IOT_INVALID_PARAMETER);
1784     }
1785 
1786     status = nxd_mqtt_client_unsubscribe(&(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
1787                                          AZ_IOT_HUB_CLIENT_TWIN_RESPONSE_SUBSCRIBE_TOPIC,
1788                                          sizeof(AZ_IOT_HUB_CLIENT_TWIN_RESPONSE_SUBSCRIBE_TOPIC) - 1);
1789     if (status)
1790     {
1791         LogError(LogLiteralArgs("IoTHub client device twin unsubscribe fail status: %d"), status);
1792         return(status);
1793     }
1794 
1795     status = nxd_mqtt_client_unsubscribe(&(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
1796                                          AZ_IOT_HUB_CLIENT_TWIN_PATCH_SUBSCRIBE_TOPIC,
1797                                          sizeof(AZ_IOT_HUB_CLIENT_TWIN_PATCH_SUBSCRIBE_TOPIC) - 1);
1798     if (status)
1799     {
1800         LogError(LogLiteralArgs("IoTHub client device twin unsubscribe fail status: %d"), status);
1801         return(status);
1802     }
1803 
1804     hub_client_ptr -> nx_azure_iot_hub_client_properties_message.message_process = NX_NULL;
1805     hub_client_ptr -> nx_azure_iot_hub_client_writable_properties_message.message_process = NX_NULL;
1806 
1807     return(NX_AZURE_IOT_SUCCESS);
1808 }
1809 
nx_azure_iot_hub_client_reported_properties_response_callback_set(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,VOID (* callback_ptr)(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,UINT request_id,UINT response_status,ULONG version,VOID * args),VOID * callback_args)1810 UINT nx_azure_iot_hub_client_reported_properties_response_callback_set(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
1811                                                                        VOID (*callback_ptr)(
1812                                                                              NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
1813                                                                              UINT request_id,
1814                                                                              UINT response_status,
1815                                                                              ULONG version,
1816                                                                              VOID *args),
1817                                                                        VOID *callback_args)
1818 {
1819     if (hub_client_ptr == NX_NULL)
1820     {
1821         LogError(LogLiteralArgs("IoTHub client device twin set callback fail: INVALID POINTER"));
1822         return(NX_AZURE_IOT_INVALID_PARAMETER);
1823     }
1824 
1825     tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);
1826 
1827     hub_client_ptr -> nx_azure_iot_hub_client_report_properties_response_callback = callback_ptr;
1828     hub_client_ptr -> nx_azure_iot_hub_client_report_properties_response_callback_args = callback_args;
1829 
1830     tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
1831 
1832     return(NX_AZURE_IOT_SUCCESS);
1833 }
1834 
/* Generates the next properties request id and renders it as decimal text.

   hub_client_ptr:      hub client holding nx_azure_iot_hub_client_request_id.
   buffer_ptr:          buffer that receives the text.
   buffer_len:          size of buffer_ptr in bytes.
   request_id_span_ptr: receives a span over the written text.
   request_id_ptr:      receives the numeric id.
   odd_seq:             compared against the low bit of the current id, so it is expected
                        to be 0 (even id wanted) or 1 (odd id wanted).

   The stored id is advanced by 2 when its low bit already equals odd_seq and by 1
   otherwise, which makes the new id's low bit equal odd_seq.  An id that wraps to 0 is
   replaced by 2.  The whole update runs under the shared mutex.

   Returns NX_AZURE_IOT_SDK_CORE_ERROR when az_span_u32toa() fails (the stored id has
   already been advanced and *request_id_ptr written by then), otherwise
   NX_AZURE_IOT_SUCCESS.  */
static UINT nx_azure_iot_hub_client_properties_request_id_get(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                              UCHAR *buffer_ptr, UINT buffer_len,
                                                              az_span *request_id_span_ptr,
                                                              UINT *request_id_ptr, UINT odd_seq)
{
az_span remainder;
UINT result = NX_AZURE_IOT_SUCCESS;

    tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

    /* Step by 2 to keep the current parity, by 1 to flip it.  */
    hub_client_ptr -> nx_azure_iot_hub_client_request_id +=
        (((hub_client_ptr -> nx_azure_iot_hub_client_request_id & 0x1) == odd_seq) ? 2 : 1);

    /* Zero is skipped after wrap-around.  */
    if (hub_client_ptr -> nx_azure_iot_hub_client_request_id == 0)
    {
        hub_client_ptr -> nx_azure_iot_hub_client_request_id = 2;
    }

    *request_id_ptr = hub_client_ptr -> nx_azure_iot_hub_client_request_id;

    /* After the conversion, remainder covers the unused tail of the buffer.  */
    remainder = az_span_create(buffer_ptr, (INT)buffer_len);
    if (az_result_failed(az_span_u32toa(remainder, *request_id_ptr, &remainder)))
    {
        result = NX_AZURE_IOT_SDK_CORE_ERROR;
    }
    else
    {

        /* Written length = buffer size minus what is left over.  */
        *request_id_span_ptr = az_span_create(buffer_ptr, (INT)(buffer_len - (UINT)az_span_size(remainder)));
    }

    tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

    if (result != NX_AZURE_IOT_SUCCESS)
    {
        LogError(LogLiteralArgs("IoTHub client device failed to u32toa"));
    }

    return(result);
}
1879 
/* Polls until the subscribe ack for topic "$iothub/twin/res/#" has been seen.

   hub_client_ptr: hub client to watch.
   wait_option:    NX_WAIT_FOREVER to poll without limit; otherwise the number of
                   tx_thread_sleep(1) pauses allowed before giving up.

   Each round checks, under the shared mutex, the connection state first and the
   subscribe-ack flag second.  When either check ends the wait, the MQTT ack notify hook
   is removed unless a telemetry ack callback is registered.  On timeout the hook is left
   in place.

   Returns NX_AZURE_IOT_DISCONNECTED if the client is not connected,
   NX_AZURE_IOT_NO_SUBSCRIBE_ACK if wait_option runs out, NX_AZURE_IOT_SUCCESS once the ack
   flag is set.  */
static UINT nx_azure_iot_hub_client_properties_subscribe_status_check(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr, UINT wait_option)
{
UINT result = NX_AZURE_IOT_SUCCESS;
UINT finished;

    for (;;)
    {
        tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

        if (hub_client_ptr -> nx_azure_iot_hub_client_state != NX_AZURE_IOT_HUB_CLIENT_STATUS_CONNECTED)
        {

            /* Connection lost while waiting.  */
            result = NX_AZURE_IOT_DISCONNECTED;
            finished = NX_TRUE;
        }
        else if (hub_client_ptr -> nx_azure_iot_hub_client_properties_subscribe_ack == NX_TRUE)
        {

            /* Subscribe ack arrived.  */
            result = NX_AZURE_IOT_SUCCESS;
            finished = NX_TRUE;
        }
        else
        {
            finished = NX_FALSE;
        }

        /* Clean ack receive notify if telemetry callback is not set.  */
        if ((finished == NX_TRUE) &&
            (hub_client_ptr -> nx_azure_iot_hub_client_telemetry_ack_callback == NX_NULL))
        {
            hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt.nxd_mqtt_ack_receive_notify = NX_NULL;
        }

        tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

        if (finished == NX_TRUE)
        {
            break;
        }

        /* Spend one unit of the wait budget, unless waiting forever.  */
        if (wait_option != NX_WAIT_FOREVER)
        {
            if (wait_option == 0)
            {
                result = NX_AZURE_IOT_NO_SUBSCRIBE_ACK;
                break;
            }

            wait_option--;
        }

        tx_thread_sleep(1);
    }

    return(result);
}
1934 
/* Publishes a properties request and collects the response delivered to this thread.

   hub_client_ptr:       hub client used for publishing and for the waiter list.
   request_id:           id stored as thread_expected_id in the waiter entry.
   packet_ptr:           packet to publish; the payload, if any, is appended to it.
   topic_length:         forwarded to nx_azure_iot_publish_mqtt_packet().
   message_buffer:       optional payload; skipped when NULL or message_length is 0.
   message_length:       payload length in bytes.
   message_type:         stored as thread_message_type in the waiter entry.
   response_packet_pptr: on success receives thread_received_message, which is NX_NULL
                         when no response arrived.
   wait_option:          used for the append, the publish and the sleep.

   An on-stack waiter entry is pushed on the suspended-thread list before publishing with
   QoS 0 and is always removed again before returning once it has been pushed.
   NOTE(review): the response is presumably attached to the entry, and this thread woken,
   by the receive path outside this function - confirm.

   Returns the append or publish status on failure (*response_packet_pptr untouched),
   otherwise NX_AZURE_IOT_SUCCESS.  */
static UINT nx_azure_iot_hub_client_properties_request_response(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                                UINT request_id, NX_PACKET *packet_ptr, ULONG topic_length,
                                                                const UCHAR *message_buffer, UINT message_length, UINT message_type,
                                                                NX_PACKET **response_packet_pptr, UINT wait_option)
{
NX_AZURE_IOT_THREAD waiter;
UINT status;

    if ((message_buffer != NX_NULL) && (message_length != 0))
    {

        /* Append payload.  */
        status = nx_packet_data_append(packet_ptr, (VOID *)message_buffer, message_length,
                                       packet_ptr -> nx_packet_pool_owner,
                                       wait_option);
        if (status)
        {
            LogError(LogLiteralArgs("IoTHub client reported state send fail: append failed"));
            return(status);
        }
    }

    /* Register this thread as waiting for request_id before the request goes out.  */
    tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

    waiter.thread_ptr = tx_thread_identify();
    waiter.thread_message_type = message_type;
    waiter.thread_expected_id = request_id;
    waiter.thread_received_message = NX_NULL;
    waiter.thread_next = hub_client_ptr -> nx_azure_iot_hub_client_thread_suspended;
    hub_client_ptr -> nx_azure_iot_hub_client_thread_suspended = &waiter;

    tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

    status = nx_azure_iot_publish_mqtt_packet(&(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
                                              packet_ptr, topic_length, NX_NULL, NX_AZURE_IOT_MQTT_QOS_0,
                                              wait_option);

    /* Wait only if the publish worked and nothing has been delivered yet.  */
    if ((!status) && (waiter.thread_received_message == NX_NULL) && wait_option)
    {
        tx_thread_sleep(wait_option);
    }

    /* The waiter entry lives on this stack frame: unlink it on every path.  */
    tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

    nx_azure_iot_hub_client_thread_dequeue(hub_client_ptr, &waiter);
    if (!status)
    {
        *response_packet_pptr = waiter.thread_received_message;
    }

    tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

    if (status)
    {
        LogError(LogLiteralArgs("IoTHub transport send: PUBLISH FAIL status: %d"), status);
        return(status);
    }

    return(NX_AZURE_IOT_SUCCESS);
}
2002 
/*
 * Allocate a publish packet and pre-fill it with the reported-properties
 * publish topic.
 *
 * The topic length and the request id are additionally stored in the bytes
 * located NX_AZURE_IOT_PUBLISH_PACKET_START_OFFSET before the packet's
 * prepend pointer; nx_azure_iot_hub_client_reported_properties_send() reads
 * them back from the same offsets.
 *
 * hub_client_ptr   Hub client instance, must not be NX_NULL.
 * packet_pptr      Receives the prepared packet on success, must not be NX_NULL.
 * wait_option      Forwarded to nx_azure_iot_publish_packet_get().
 *
 * Returns NX_AZURE_IOT_SUCCESS on success, NX_AZURE_IOT_INVALID_PARAMETER for
 * a NX_NULL argument, NX_AZURE_IOT_INSUFFICIENT_BUFFER_SPACE when the packet
 * is too small or the topic cannot be generated, or the status returned by
 * the packet allocation / request id helper.  Once allocated, the packet is
 * released on every failure path.
 */
UINT nx_azure_iot_hub_client_reported_properties_create(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                        NX_PACKET **packet_pptr,
                                                        UINT wait_option)
{
UINT status;
NX_PACKET *packet_ptr;
UINT request_id;
UCHAR *buffer_ptr;
ULONG buffer_size;
az_span request_id_span;
az_result core_result;

/* The SDK writes the topic length through a size_t pointer, so the variable
   must really be a size_t (a UINT may be narrower than size_t).  */
size_t topic_length = 0;

    if ((hub_client_ptr == NX_NULL) ||
        (packet_pptr == NX_NULL))
    {
        LogError(LogLiteralArgs("IoTHub reported properties create fail: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    status = nx_azure_iot_publish_packet_get(hub_client_ptr -> nx_azure_iot_ptr,
                                             &(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
                                             &packet_ptr, wait_option);
    if (status)
    {
        LogError(LogLiteralArgs("IoTHub client reported properties create fail: BUFFER ALLOCATE FAIL"));
        return(status);
    }

    buffer_size = (UINT)(packet_ptr -> nx_packet_data_end - packet_ptr -> nx_packet_prepend_ptr);
    if (buffer_size <= NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE)
    {
        LogError(LogLiteralArgs("IoTHub client reported properties create fail: BUFFER INSUFFICENT"));
        nx_packet_release(packet_ptr);
        return(NX_AZURE_IOT_INSUFFICIENT_BUFFER_SPACE);
    }

    /* The last NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE bytes of the packet are handed to the
       request id helper as its buffer, so they are not offered to the topic writer.  */
    buffer_size -= NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE;

    /* Generate odd request id for reported properties send.  */
    status = nx_azure_iot_hub_client_properties_request_id_get(hub_client_ptr,
                                                               (UCHAR *)(packet_ptr -> nx_packet_data_end -
                                                                 NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE),
                                                               NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE,
                                                               &request_id_span, &request_id, NX_TRUE);

    if (status)
    {
        LogError(LogLiteralArgs("IoTHub client reported properties create fail: get request id failed"));
        nx_packet_release(packet_ptr);
        return(status);
    }

    core_result = az_iot_hub_client_properties_get_reported_publish_topic(&(hub_client_ptr -> iot_hub_client_core),
                                                                          request_id_span,
                                                                          (CHAR *)packet_ptr -> nx_packet_prepend_ptr,
                                                                          buffer_size, &topic_length);
    if (az_result_failed(core_result))
    {
        LogError(LogLiteralArgs("IoTHub client reported properties create fail: NX_AZURE_IOT_PNP_CLIENT_TOPIC_SIZE is too small."));
        nx_packet_release(packet_ptr);
        return(NX_AZURE_IOT_INSUFFICIENT_BUFFER_SPACE);
    }

    /* The topic is the only content of the packet so far.  */
    packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + topic_length;
    packet_ptr -> nx_packet_length = (ULONG)topic_length;

    /* Set the buffer pointer.  */
    buffer_ptr = packet_ptr -> nx_packet_prepend_ptr - NX_AZURE_IOT_PUBLISH_PACKET_START_OFFSET;

    /* Encode topic length: two bytes, high byte first.  */
    buffer_ptr[5] = (UCHAR)(packet_ptr -> nx_packet_length >> 8);
    buffer_ptr[6] = (UCHAR)(packet_ptr -> nx_packet_length & 0xFF);

    /* Encode request id: four bytes, high byte first.  */
    buffer_ptr[4] = (UCHAR)((request_id & 0xFF));
    request_id >>= 8;
    buffer_ptr[3] = (UCHAR)((request_id & 0xFF));
    request_id >>= 8;
    buffer_ptr[2] = (UCHAR)((request_id & 0xFF));
    request_id >>= 8;
    buffer_ptr[1] = (UCHAR)(request_id & 0xFF);

    *packet_pptr = packet_ptr;

    return(NX_AZURE_IOT_SUCCESS);
}
2090 
/*
 * Publish a reported-properties packet and, if a response arrives, parse it.
 *
 * The topic length and request id are read back from the bytes located
 * NX_AZURE_IOT_PUBLISH_PACKET_START_OFFSET before the packet's prepend
 * pointer (offsets 5..6 and 1..4 respectively, high byte first).
 *
 * hub_client_ptr       Hub client instance, must not be NX_NULL.
 * packet_ptr           Packet to publish, must not be NX_NULL.
 * request_id_ptr       Optional; receives the request id once the publish succeeded.
 * response_status_ptr  Optional; zeroed once the publish succeeded, then handed to
 *                      the response parser when a response packet is available.
 * version_ptr          Handed to the response parser.
 * wait_option          Forwarded to the subscribe status check and to the
 *                      request/response helper.
 *
 * Returns NX_AZURE_IOT_INVALID_PARAMETER for a NX_NULL hub client or packet,
 * the failing status of the subscribe check, throttle check or
 * request/response helper, and NX_AZURE_IOT_SUCCESS otherwise (whether or
 * not a response was received or could be parsed).
 */
UINT nx_azure_iot_hub_client_reported_properties_send(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                      NX_PACKET *packet_ptr,
                                                      UINT *request_id_ptr, UINT *response_status_ptr,
                                                      ULONG *version_ptr, UINT wait_option)
{
UINT result;
UINT rid;
UINT topic_size;
UCHAR *header_ptr;
NX_PACKET *reply_ptr;
ULONG reply_topic_offset;
USHORT reply_topic_length;

    if ((packet_ptr == NX_NULL) || (hub_client_ptr == NX_NULL))
    {
        LogError(LogLiteralArgs("IoTHub client reported properties send fail: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    /* Properties must be subscribed before anything is published.  */
    result = nx_azure_iot_hub_client_properties_subscribe_status_check(hub_client_ptr, wait_option);
    if (result)
    {
        LogError(LogLiteralArgs("IoTHub client reported properties send fail with error %d"), result);
        return(result);
    }

    /* Honour throttling left over from the previous request.  */
    result = nx_azure_iot_hub_client_throttled_check(hub_client_ptr);
    if (result)
    {
        LogError(LogLiteralArgs("IoTHub client reported properties send fail with error %d"), result);
        return(result);
    }

    /* Sequence:
     *   1. Publish to "$iothub/twin/PATCH/properties/reported/?$rid={request id}".
     *   2. Wait for the response if required.
     *   3. Report the result when one is present.
     */

    /* Recover the values stored in front of the topic.  */
    header_ptr = packet_ptr -> nx_packet_prepend_ptr - NX_AZURE_IOT_PUBLISH_PACKET_START_OFFSET;

    topic_size = (UINT)((header_ptr[5] << 8) | header_ptr[6]);

    rid = ((UINT)header_ptr[1] << 24) |
          ((UINT)header_ptr[2] << 16) |
          ((UINT)header_ptr[3] << 8) |
          (UINT)header_ptr[4];

    result = nx_azure_iot_hub_client_properties_request_response(hub_client_ptr,
                                                                 rid, packet_ptr, topic_size, NX_NULL, 0,
                                                                 NX_AZURE_IOT_HUB_REPORTED_PROPERTIES_RESPONSE,
                                                                 &reply_ptr, wait_option);
    if (result)
    {
        LogError(LogLiteralArgs("IoTHub client reported properties send fail: append failed"));
        return(result);
    }

    /* The publish went out, so from here on the return value is NX_AZURE_IOT_SUCCESS.
       Whether the hub accepted the properties is conveyed through the response status,
       which is only meaningful together with a successful return.  */
    if (request_id_ptr != NX_NULL)
    {
        *request_id_ptr = rid;
    }

    if (response_status_ptr != NX_NULL)
    {
        *response_status_ptr = 0;
    }

    /* No response packet: nothing to parse.  */
    if (reply_ptr == NX_NULL)
    {
        return(NX_AZURE_IOT_SUCCESS);
    }

    if (nx_azure_iot_hub_client_process_publish_packet(reply_ptr -> nx_packet_prepend_ptr,
                                                       &reply_topic_offset,
                                                       &reply_topic_length) == NX_AZURE_IOT_SUCCESS)
    {

        /* The parse outcome is intentionally not propagated.  */
        nx_azure_iot_hub_client_device_twin_parse(hub_client_ptr,
                                                  reply_ptr, reply_topic_offset, reply_topic_length,
                                                  NX_NULL, version_ptr, NX_NULL,
                                                  response_status_ptr);
    }

    nx_packet_release(reply_ptr);

    return(NX_AZURE_IOT_SUCCESS);
}
2184 
/*
 * Publish a request for the properties document.
 *
 * The function returns once the request has been published; it does not
 * wait for the document itself.
 *
 * hub_client_ptr   Hub client instance, must not be NX_NULL.
 * wait_option      Forwarded to the subscribe status check, the packet
 *                  allocation and the MQTT publish.
 *
 * Returns NX_AZURE_IOT_SUCCESS on success, NX_AZURE_IOT_INVALID_PARAMETER for
 * a NX_NULL client, NX_AZURE_IOT_INSUFFICIENT_BUFFER_SPACE when the packet is
 * too small or the topic cannot be generated, or the failing status of the
 * subscribe check, throttle check, packet allocation, request id helper or
 * publish.  Once allocated, the packet is released on every failure path.
 */
UINT nx_azure_iot_hub_client_properties_request(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                UINT wait_option)
{
UINT status;
UINT buffer_size;
NX_PACKET *packet_ptr;
az_span request_id_span;
UINT request_id;
az_result core_result;

/* The SDK writes the topic length through a size_t pointer, so the variable
   must really be a size_t rather than a UINT hidden behind a cast.  */
size_t topic_length = 0;

    if (hub_client_ptr == NX_NULL)
    {
        LogError(LogLiteralArgs("IoTHub client properties request failed: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    if ((status = nx_azure_iot_hub_client_properties_subscribe_status_check(hub_client_ptr, wait_option)))
    {
        LogError(LogLiteralArgs("IoTHub client properties request failed with error %d"), status);
        return(status);
    }

    /* Check if the last request was throttled and if the next need to be throttled.  */
    if ((status = nx_azure_iot_hub_client_throttled_check(hub_client_ptr)))
    {
        LogError(LogLiteralArgs("IoTHub client properties request failed with error %d"), status);
        return(status);
    }

    /* Steps.
     * 1. Publish message to topic "$iothub/twin/GET/?$rid={request id}"
     * */
    status = nx_azure_iot_publish_packet_get(hub_client_ptr -> nx_azure_iot_ptr,
                                             &(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
                                             &packet_ptr, wait_option);
    if (status)
    {
        LogError(LogLiteralArgs("IoTHub client properties request failed: BUFFER ALLOCATE FAIL"));
        return(status);
    }

    buffer_size = (UINT)(packet_ptr -> nx_packet_data_end - packet_ptr -> nx_packet_prepend_ptr);
    if (buffer_size <= NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE)
    {
        LogError(LogLiteralArgs("IoTHub client device twin publish fail: BUFFER ALLOCATE FAIL"));
        nx_packet_release(packet_ptr);
        return(NX_AZURE_IOT_INSUFFICIENT_BUFFER_SPACE);
    }

    /* The last NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE bytes of the packet are handed to the
       request id helper as its buffer, so they are not offered to the topic writer.  */
    buffer_size -= NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE;

    /* Generate even request id for properties request.  */
    status = nx_azure_iot_hub_client_properties_request_id_get(hub_client_ptr,
                                                               (UCHAR *)(packet_ptr -> nx_packet_data_end -
                                                                 NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE),
                                                               NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE,
                                                               &request_id_span, &request_id, NX_FALSE);
    if (status)
    {
        LogError(LogLiteralArgs("IoTHub client device twin failed to get request id"));
        nx_packet_release(packet_ptr);
        return(status);
    }

    core_result = az_iot_hub_client_properties_document_get_publish_topic(&(hub_client_ptr -> iot_hub_client_core),
                                                                          request_id_span, (CHAR *)packet_ptr -> nx_packet_prepend_ptr,
                                                                          buffer_size, &topic_length);
    if (az_result_failed(core_result))
    {
        LogError(LogLiteralArgs("IoTHub client device twin get topic fail."));
        nx_packet_release(packet_ptr);
        return(NX_AZURE_IOT_INSUFFICIENT_BUFFER_SPACE);
    }

    /* The topic is the only content of the packet.  */
    packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + topic_length;
    packet_ptr -> nx_packet_length = (ULONG)topic_length;

    status = nx_azure_iot_publish_mqtt_packet(&(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
                                              packet_ptr, (ULONG)topic_length, NX_NULL, NX_AZURE_IOT_MQTT_QOS_0,
                                              wait_option);
    if (status)
    {
        LogError(LogLiteralArgs("IoTHub client device twin: PUBLISH FAIL status: %d"), status);
        nx_packet_release(packet_ptr);
        return(status);
    }

    return(NX_AZURE_IOT_SUCCESS);
}
2275 
/*
 * Receive a properties document response and hand back its packet.
 *
 * hub_client_ptr   Hub client instance, must not be NX_NULL.
 * packet_pptr      Receives the packet on success, must not be NX_NULL.
 * wait_option      Forwarded to nx_azure_iot_hub_client_message_receive().
 *
 * Returns NX_AZURE_IOT_SUCCESS on success, NX_AZURE_IOT_INVALID_PARAMETER for
 * a NX_NULL argument, the status of the message receive when it fails,
 * NX_AZURE_IOT_INVALID_PACKET when the publish packet cannot be processed,
 * NX_AZURE_IOT_SDK_CORE_ERROR when the topic cannot be parsed,
 * NX_AZURE_IOT_SERVER_RESPONSE_ERROR when the parsed status is outside
 * 200..299, or the status of the payload adjustment.  A packet that was
 * received is released on every failure path.
 */
UINT nx_azure_iot_hub_client_properties_receive(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                NX_PACKET **packet_pptr,
                                                UINT wait_option)
{
NX_PACKET *received_ptr;
az_iot_hub_client_properties_message parsed_topic;
az_span topic;
az_result parse_result;
ULONG offset;
USHORT topic_size;
UINT result;

    if ((packet_pptr == NX_NULL) || (hub_client_ptr == NX_NULL))
    {
        LogError(LogLiteralArgs("IoTHub client properties receive failed: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    /* Sequence:
     *   1. Fetch a pending properties document from the message list.
     *   2. Validate the response it carries.
     *   3. Hand the payload back to the caller.
     */
    result = nx_azure_iot_hub_client_message_receive(hub_client_ptr, NX_AZURE_IOT_HUB_PROPERTIES,
                                                     &(hub_client_ptr -> nx_azure_iot_hub_client_properties_message),
                                                     &received_ptr, wait_option);
    if (result)
    {
        LogError(LogLiteralArgs("IoTHub client device twin receive failed status: %d"), result);
        return(result);
    }

    /* Locate the topic inside the publish packet; an unsupported message is dropped.  */
    if (nx_azure_iot_hub_client_process_publish_packet(received_ptr -> nx_packet_prepend_ptr, &offset, &topic_size))
    {
        nx_packet_release(received_ptr);
        return(NX_AZURE_IOT_INVALID_PACKET);
    }

    /* The topic must match the properties format.  */
    topic = az_span_create(received_ptr -> nx_packet_prepend_ptr + offset, (INT)topic_size);
    parse_result = az_iot_hub_client_properties_parse_received_topic(&(hub_client_ptr -> iot_hub_client_core),
                                                                     topic, &parsed_topic);
    if (az_result_failed(parse_result))
    {
        nx_packet_release(received_ptr);
        return(NX_AZURE_IOT_SDK_CORE_ERROR);
    }

    /* Only a status in the 200..299 range is accepted.  */
    if (!((parsed_topic.status >= 200) && (parsed_topic.status < 300)))
    {
        nx_packet_release(received_ptr);
        return(NX_AZURE_IOT_SERVER_RESPONSE_ERROR);
    }

    result = nx_azure_iot_hub_client_adjust_payload(received_ptr);
    if (result)
    {
        nx_packet_release(received_ptr);
        return(result);
    }

    *packet_pptr = received_ptr;

    return(NX_AZURE_IOT_SUCCESS);
}
2343 
/*
 * Receive a writable properties message and hand back its packet.
 *
 * hub_client_ptr   Hub client instance, must not be NX_NULL.
 * packet_pptr      Receives the packet on success, must not be NX_NULL.
 * wait_option      Forwarded to nx_azure_iot_hub_client_message_receive().
 *
 * Returns NX_AZURE_IOT_SUCCESS on success, NX_AZURE_IOT_INVALID_PARAMETER for
 * a NX_NULL argument, or the failing status of the message receive or the
 * payload adjustment (the packet is released in the latter case).
 */
UINT nx_azure_iot_hub_client_writable_properties_receive(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                        NX_PACKET **packet_pptr,
                                                        UINT wait_option)
{
NX_PACKET *received_ptr;
UINT result;

    if ((packet_pptr == NX_NULL) || (hub_client_ptr == NX_NULL))
    {
        LogError(LogLiteralArgs("IoTHub client receive properties failed: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    /* Sequence:
     *   1. Fetch a pending writable properties document from the message list.
     *   2. Adjust the packet so it exposes the payload.
     *   3. Hand the packet back to the caller.
     */
    result = nx_azure_iot_hub_client_message_receive(hub_client_ptr, NX_AZURE_IOT_HUB_WRITABLE_PROPERTIES,
                                                     &(hub_client_ptr -> nx_azure_iot_hub_client_writable_properties_message),
                                                     &received_ptr, wait_option);
    if (result)
    {
        LogError(LogLiteralArgs("IoTHub client device twin receive failed status: %d"), result);
        return(result);
    }

    result = nx_azure_iot_hub_client_adjust_payload(received_ptr);
    if (result)
    {
        nx_packet_release(received_ptr);
        return(result);
    }

    *packet_pptr = received_ptr;

    return(NX_AZURE_IOT_SUCCESS);
}
2382 
2383 
nx_azure_iot_hub_client_device_twin_enable(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)2384 UINT nx_azure_iot_hub_client_device_twin_enable(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
2385 {
2386 
2387     if (hub_client_ptr == NX_NULL)
2388     {
2389         LogError(LogLiteralArgs("IoTHub client device twin subscribe fail: INVALID POINTER"));
2390         return(NX_AZURE_IOT_INVALID_PARAMETER);
2391     }
2392 
2393     return(nx_azure_iot_hub_client_properties_enable(hub_client_ptr));
2394 }
2395 
nx_azure_iot_hub_client_device_twin_disable(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)2396 UINT nx_azure_iot_hub_client_device_twin_disable(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
2397 {
2398 
2399     if (hub_client_ptr == NX_NULL)
2400     {
2401         LogError(LogLiteralArgs("IoTHub client device twin unsubscribe fail: INVALID POINTER"));
2402         return(NX_AZURE_IOT_INVALID_PARAMETER);
2403     }
2404 
2405     return(nx_azure_iot_hub_client_properties_disable(hub_client_ptr));
2406 }
2407 
2408 
/*
 * Publish a device twin reported-properties patch and parse the response.
 *
 * hub_client_ptr       Hub client instance, must not be NX_NULL.
 * message_buffer       Payload handed to the request/response helper together
 *                      with message_length.
 * message_length       Length of message_buffer in bytes.
 * request_id_ptr       Optional; receives the request id once the publish succeeded.
 * response_status_ptr  Handed to the response parser.
 * version_ptr          Handed to the response parser.
 * wait_option          Forwarded to the subscribe status check, the packet
 *                      allocation and the request/response helper.
 *
 * Returns NX_AZURE_IOT_SUCCESS when a response was received and parsed.
 * Otherwise: NX_AZURE_IOT_INVALID_PARAMETER for a NX_NULL client,
 * NX_AZURE_IOT_NOT_ENABLED when no properties message handler is registered,
 * NX_AZURE_IOT_INSUFFICIENT_BUFFER_SPACE when the packet is too small or the
 * topic cannot be generated, NX_AZURE_IOT_DISCONNECTED / NX_AZURE_IOT_NO_PACKET
 * when no response packet is available (depending on the connection state),
 * or the failing status of any helper called along the way.
 */
UINT nx_azure_iot_hub_client_device_twin_reported_properties_send(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                                  const UCHAR *message_buffer, UINT message_length,
                                                                  UINT *request_id_ptr, UINT *response_status_ptr,
                                                                  ULONG *version_ptr, UINT wait_option)
{
UINT status;
UINT buffer_size;
NX_PACKET *packet_ptr;
UINT request_id;
az_span request_id_span;
az_result core_result;
ULONG topic_offset;
USHORT length;
NX_PACKET *response_packet_ptr;

/* The SDK writes the topic length through a size_t pointer, so the variable
   must really be a size_t rather than a UINT hidden behind a cast.  */
size_t topic_length = 0;

    if (hub_client_ptr == NX_NULL)
    {
        LogError(LogLiteralArgs("IoTHub client reported state send fail: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    if ((status = nx_azure_iot_hub_client_properties_subscribe_status_check(hub_client_ptr, wait_option)))
    {
        LogError(LogLiteralArgs("IoTHub client reported state send fail with error %d"), status);
        return(status);
    }

    /* Check if the last request was throttled and if the next need to be throttled.  */
    if ((status = nx_azure_iot_hub_client_throttled_check(hub_client_ptr)))
    {
        LogError(LogLiteralArgs("IoTHub client reported state send fail with error %d"), status);
        return(status);
    }

    /* Steps.
     * 1. Publish message to topic "$iothub/twin/PATCH/properties/reported/?$rid={request id}"
     * 2. Wait for the response if required.
     * 3. Return result if present.
     * */
    if (hub_client_ptr -> nx_azure_iot_hub_client_properties_message.message_process == NX_NULL)
    {
        LogError(LogLiteralArgs("IoTHub client reported state send fail: NOT ENABLED"));
        return(NX_AZURE_IOT_NOT_ENABLED);
    }

    status = nx_azure_iot_publish_packet_get(hub_client_ptr -> nx_azure_iot_ptr,
                                             &(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
                                             &packet_ptr, wait_option);
    if (status)
    {
        LogError(LogLiteralArgs("IoTHub client reported state send fail: BUFFER ALLOCATE FAIL"));
        return(status);
    }

    buffer_size = (UINT)(packet_ptr -> nx_packet_data_end - packet_ptr -> nx_packet_prepend_ptr);
    if (buffer_size <= NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE)
    {
        LogError(LogLiteralArgs("IoTHub client reported state send fail: BUFFER INSUFFICENT"));
        nx_packet_release(packet_ptr);
        return(NX_AZURE_IOT_INSUFFICIENT_BUFFER_SPACE);
    }

    /* The last NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE bytes of the packet are handed to the
       request id helper as its buffer, so they are not offered to the topic writer.  */
    buffer_size -= NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE;

    /* Generate odd request id for reported properties send.  */
    status = nx_azure_iot_hub_client_properties_request_id_get(hub_client_ptr,
                                                               (UCHAR *)(packet_ptr -> nx_packet_data_end -
                                                                 NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE),
                                                               NX_AZURE_IOT_HUB_CLIENT_U32_MAX_BUFFER_SIZE,
                                                               &request_id_span, &request_id, NX_TRUE);

    if (status)
    {
        LogError(LogLiteralArgs("IoTHub client reported state send failed to get request id"));
        nx_packet_release(packet_ptr);
        return(status);
    }

    core_result = az_iot_hub_client_twin_patch_get_publish_topic(&(hub_client_ptr -> iot_hub_client_core),
                                                                 request_id_span, (CHAR *)packet_ptr -> nx_packet_prepend_ptr,
                                                                 buffer_size, &topic_length);
    if (az_result_failed(core_result))
    {
        LogError(LogLiteralArgs("IoTHub client reported state send fail: NX_AZURE_IOT_HUB_CLIENT_TOPIC_SIZE is too small."));
        nx_packet_release(packet_ptr);
        return(NX_AZURE_IOT_INSUFFICIENT_BUFFER_SPACE);
    }

    /* The topic is the only content of the packet so far; the helper below appends the payload.  */
    packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + topic_length;
    packet_ptr -> nx_packet_length = (ULONG)topic_length;

    status = nx_azure_iot_hub_client_properties_request_response(hub_client_ptr,
                                                                 request_id, packet_ptr, (ULONG)topic_length,
                                                                 message_buffer, message_length,
                                                                 NX_AZURE_IOT_HUB_REPORTED_PROPERTIES_RESPONSE,
                                                                 &response_packet_ptr, wait_option);
    if (status)
    {

        /* The packet was allocated here, so it is released here when the send fails.  */
        LogError(LogLiteralArgs("IoTHub client reported state send fail: append failed"));
        nx_packet_release(packet_ptr);
        return(status);
    }

    if (request_id_ptr)
    {
        *request_id_ptr = request_id;
    }

    /* No response packet: tell a lost connection apart from a plain missing response.  */
    if (response_packet_ptr == NX_NULL)
    {
        if (hub_client_ptr -> nx_azure_iot_hub_client_state != NX_AZURE_IOT_HUB_CLIENT_STATUS_CONNECTED)
        {
            return(NX_AZURE_IOT_DISCONNECTED);
        }
        return(NX_AZURE_IOT_NO_PACKET);
    }

    if ((status = nx_azure_iot_hub_client_process_publish_packet(response_packet_ptr -> nx_packet_prepend_ptr,
                                                                 &topic_offset, &length)))
    {
        nx_packet_release(response_packet_ptr);
        return(status);
    }

    if ((status = nx_azure_iot_hub_client_device_twin_parse(hub_client_ptr,
                                                            response_packet_ptr, topic_offset, length,
                                                            NX_NULL, version_ptr, NX_NULL,
                                                            response_status_ptr)))
    {
        nx_packet_release(response_packet_ptr);
        return(status);
    }

    /* Release message block.  */
    nx_packet_release(response_packet_ptr);

    return(NX_AZURE_IOT_SUCCESS);
}
2547 
/*
 * Device-twin-named entry point that validates the client pointer and then
 * forwards to nx_azure_iot_hub_client_properties_request().
 *
 * Returns NX_AZURE_IOT_INVALID_PARAMETER for a NX_NULL client, otherwise the
 * status of nx_azure_iot_hub_client_properties_request().
 */
UINT nx_azure_iot_hub_client_device_twin_properties_request(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                            UINT wait_option)
{
UINT result;

    if (hub_client_ptr != NX_NULL)
    {
        result = nx_azure_iot_hub_client_properties_request(hub_client_ptr, wait_option);
    }
    else
    {
        LogError(LogLiteralArgs("IoTHub client device twin properties request failed: INVALID POINTER"));
        result = NX_AZURE_IOT_INVALID_PARAMETER;
    }

    return(result);
}
2560 
/*
 * Device-twin-named entry point that validates its pointers and then
 * forwards to nx_azure_iot_hub_client_properties_receive().
 *
 * Returns NX_AZURE_IOT_INVALID_PARAMETER when either pointer is NX_NULL,
 * otherwise the status of nx_azure_iot_hub_client_properties_receive().
 */
UINT nx_azure_iot_hub_client_device_twin_properties_receive(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                            NX_PACKET **packet_pptr, UINT wait_option)
{
UINT result;

    if ((hub_client_ptr != NX_NULL) && (packet_pptr != NX_NULL))
    {
        result = nx_azure_iot_hub_client_properties_receive(hub_client_ptr, packet_pptr, wait_option);
    }
    else
    {
        LogError(LogLiteralArgs("IoTHub client device twin properties receive failed: INVALID POINTER"));
        result = NX_AZURE_IOT_INVALID_PARAMETER;
    }

    return(result);
}
2573 
/*
 * Device-twin-named entry point that validates its pointers and then
 * forwards to nx_azure_iot_hub_client_writable_properties_receive().
 *
 * Returns NX_AZURE_IOT_INVALID_PARAMETER when either pointer is NX_NULL,
 * otherwise the status of nx_azure_iot_hub_client_writable_properties_receive().
 */
UINT nx_azure_iot_hub_client_device_twin_desired_properties_receive(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                                    NX_PACKET **packet_pptr, UINT wait_option)
{
UINT result;

    if ((hub_client_ptr != NX_NULL) && (packet_pptr != NX_NULL))
    {
        result = nx_azure_iot_hub_client_writable_properties_receive(hub_client_ptr, packet_pptr, wait_option);
    }
    else
    {
        LogError(LogLiteralArgs("IoTHub client device twin desired properties receive failed: INVALID POINTER"));
        result = NX_AZURE_IOT_INVALID_PARAMETER;
    }

    return(result);
}
2586 
/*
 * Subscribe to or unsubscribe from the cloud-to-device message topic.
 *
 * hub_client_ptr   Hub client instance; not validated here.
 * is_subscribe     Non-zero: register the C2D handlers and, when the client is
 *                  connected, subscribe to AZ_IOT_HUB_CLIENT_C2D_SUBSCRIBE_TOPIC
 *                  with NX_AZURE_IOT_MQTT_QOS_1.  When the client is not connected
 *                  only the handlers are registered and success is returned.
 *                  Zero: unsubscribe from the topic and clear the message handler.
 *
 * Returns NX_AZURE_IOT_SUCCESS on success, otherwise the status returned by the
 * MQTT subscribe / unsubscribe call.
 */
static UINT nx_azure_iot_hub_client_cloud_message_sub_unsub(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr, UINT is_subscribe)
{
UINT status;
UINT connected;

    if (is_subscribe)
    {
        tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);

        /* Atomically update the handler as we need to serialize the handler with incoming messages.  */
        hub_client_ptr -> nx_azure_iot_hub_client_c2d_message.message_process = nx_azure_iot_hub_client_c2d_process;
        hub_client_ptr -> nx_azure_iot_hub_client_c2d_message.message_enable = nx_azure_iot_hub_client_cloud_message_enable;

        /* Sample the connection state while the mutex is still held.  */
        connected = (hub_client_ptr -> nx_azure_iot_hub_client_state == NX_AZURE_IOT_HUB_CLIENT_STATUS_CONNECTED);

        tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);

        /* Register callbacks even if not connect and when connect complete subscribe for topics.  */
        if (!connected)
        {
            return(NX_AZURE_IOT_SUCCESS);
        }

        status = nxd_mqtt_client_subscribe(&(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
                                           AZ_IOT_HUB_CLIENT_C2D_SUBSCRIBE_TOPIC, sizeof(AZ_IOT_HUB_CLIENT_C2D_SUBSCRIBE_TOPIC) - 1,
                                           NX_AZURE_IOT_MQTT_QOS_1);
        if (status)
        {
            LogError(LogLiteralArgs("IoTHub cloud message subscribe fail: status: %d"), status);
            return(status);
        }
    }
    else
    {
        status = nxd_mqtt_client_unsubscribe(&(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
                                             AZ_IOT_HUB_CLIENT_C2D_SUBSCRIBE_TOPIC, sizeof(AZ_IOT_HUB_CLIENT_C2D_SUBSCRIBE_TOPIC) - 1);
        if (status)
        {

            /* Name the operation that actually failed so the log is not mistaken for a subscribe error.  */
            LogError(LogLiteralArgs("IoTHub cloud message unsubscribe fail: status: %d"), status);
            return(status);
        }

        /* The handler is only cleared once the unsubscribe succeeded.  */
        hub_client_ptr -> nx_azure_iot_hub_client_c2d_message.message_process = NX_NULL;
    }

    return(NX_AZURE_IOT_SUCCESS);
}
2632 
nx_azure_iot_hub_client_mqtt_receive_callback(NXD_MQTT_CLIENT * client_ptr,UINT number_of_messages)2633 static VOID nx_azure_iot_hub_client_mqtt_receive_callback(NXD_MQTT_CLIENT* client_ptr,
2634                                                           UINT number_of_messages)
2635 {
2636 NX_AZURE_IOT_RESOURCE *resource = nx_azure_iot_resource_search(client_ptr);
2637 NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr = NX_NULL;
2638 NX_PACKET *packet_ptr;
2639 NX_PACKET *packet_next_ptr;
2640 ULONG topic_offset;
2641 USHORT topic_length;
2642 
2643     /* This function is protected by MQTT mutex.  */
2644 
2645     NX_PARAMETER_NOT_USED(number_of_messages);
2646 
2647     if (resource && (resource -> resource_type == NX_AZURE_IOT_RESOURCE_IOT_HUB))
2648     {
2649         hub_client_ptr = (NX_AZURE_IOT_HUB_CLIENT *)resource -> resource_data_ptr;
2650     }
2651 
2652     if (hub_client_ptr)
2653     {
2654         for (packet_ptr = client_ptr -> message_receive_queue_head;
2655              packet_ptr;
2656              packet_ptr = packet_next_ptr)
2657         {
2658 
2659             /* Store next packet in case current packet is consumed.  */
2660             packet_next_ptr = packet_ptr -> nx_packet_queue_next;
2661 
2662             /* Adjust packet to simply process logic.  */
2663             nx_azure_iot_mqtt_packet_adjust(packet_ptr);
2664 
2665             if (nx_azure_iot_hub_client_process_publish_packet(packet_ptr -> nx_packet_prepend_ptr, &topic_offset,
2666                                                                &topic_length))
2667             {
2668 
2669                 /* Message not supported. It will be released.  */
2670                 nx_packet_release(packet_ptr);
2671                 continue;
2672             }
2673 
2674             if ((topic_offset + topic_length) >
2675                 (ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr))
2676             {
2677 
2678                 /* Only process topic in the first packet since the fixed topic is short enough to fit into one packet.  */
2679                 topic_length = (USHORT)(((ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr) -
2680                                          topic_offset) & 0xFFFF);
2681             }
2682 
2683             if (hub_client_ptr -> nx_azure_iot_hub_client_command_message.message_process &&
2684                 (hub_client_ptr -> nx_azure_iot_hub_client_command_message.message_process(hub_client_ptr, packet_ptr,
2685                                                                                                  topic_offset,
2686                                                                                                  topic_length) == NX_AZURE_IOT_SUCCESS))
2687             {
2688 
2689                 /* Direct method message is processed.  */
2690                 continue;
2691             }
2692 
2693             if (hub_client_ptr -> nx_azure_iot_hub_client_c2d_message.message_process &&
2694                 (hub_client_ptr -> nx_azure_iot_hub_client_c2d_message.message_process(hub_client_ptr, packet_ptr,
2695                                                                                        topic_offset,
2696                                                                                        topic_length) == NX_AZURE_IOT_SUCCESS))
2697             {
2698 
2699                 /* Could to Device message is processed.  */
2700                 continue;
2701             }
2702 
2703             if ((hub_client_ptr -> nx_azure_iot_hub_client_properties_message.message_process) &&
2704                 (hub_client_ptr -> nx_azure_iot_hub_client_properties_message.message_process(hub_client_ptr,
2705                                                                                                packet_ptr, topic_offset,
2706                                                                                                topic_length) == NX_AZURE_IOT_SUCCESS))
2707             {
2708 
2709                 /* Device Twin message is processed.  */
2710                 continue;
2711             }
2712 
2713             /* Message not supported. It will be released.  */
2714             nx_packet_release(packet_ptr);
2715         }
2716 
2717         /* Clear all message from MQTT receive queue.  */
2718         client_ptr -> message_receive_queue_head = NX_NULL;
2719         client_ptr -> message_receive_queue_tail = NX_NULL;
2720         client_ptr -> message_receive_queue_depth = 0;
2721     }
2722 }
2723 
nx_azure_iot_hub_client_message_notify(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,NX_AZURE_IOT_HUB_CLIENT_RECEIVE_MESSAGE * receive_message,NX_PACKET * packet_ptr)2724 static VOID nx_azure_iot_hub_client_message_notify(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
2725                                                    NX_AZURE_IOT_HUB_CLIENT_RECEIVE_MESSAGE *receive_message,
2726                                                    NX_PACKET *packet_ptr)
2727 {
2728     if (receive_message -> message_tail)
2729     {
2730         receive_message -> message_tail -> nx_packet_queue_next = packet_ptr;
2731     }
2732     else
2733     {
2734         receive_message -> message_head = packet_ptr;
2735     }
2736     receive_message -> message_tail = packet_ptr;
2737 
2738     /* Check for user callback function.  */
2739     if (receive_message -> message_callback)
2740     {
2741         receive_message -> message_callback(hub_client_ptr, receive_message -> message_callback_args);
2742     }
2743 }
2744 
/* Look through the client's list of suspended threads for the first one waiting on the given
   message type with the given expected id.  On a match the entry is unlinked from the list, the
   packet is stored in it as the received message, and the entry is returned through
   thread_list_pptr.

   Returns NX_AZURE_IOT_SUCCESS when a waiter was found, NX_AZURE_IOT_NOT_FOUND otherwise (in
   which case *thread_list_pptr is left untouched).  */
static UINT nx_azure_iot_hub_client_receive_thread_find(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                        NX_PACKET *packet_ptr, UINT message_type,
                                                        UINT request_id, NX_AZURE_IOT_THREAD **thread_list_pptr)
{
NX_AZURE_IOT_THREAD **link_pptr;
NX_AZURE_IOT_THREAD *waiter_ptr;

    /* Walk the links rather than the nodes so that unlinking needs no special case for the head.  */
    for (link_pptr = &(hub_client_ptr -> nx_azure_iot_hub_client_thread_suspended);
         *link_pptr != NX_NULL;
         link_pptr = &((*link_pptr) -> thread_next))
    {
        waiter_ptr = *link_pptr;

        if ((waiter_ptr -> thread_message_type != message_type) ||
            (waiter_ptr -> thread_expected_id != request_id))
        {

            /* Not waiting for this message.  */
            continue;
        }

        /* Unlink the waiter and hand the packet over to it.  */
        *link_pptr = waiter_ptr -> thread_next;
        waiter_ptr -> thread_received_message = packet_ptr;
        *thread_list_pptr = waiter_ptr;

        return(NX_AZURE_IOT_SUCCESS);
    }

    return(NX_AZURE_IOT_NOT_FOUND);
}
2780 
nx_azure_iot_hub_client_c2d_process(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,NX_PACKET * packet_ptr,ULONG topic_offset,USHORT topic_length)2781 static UINT nx_azure_iot_hub_client_c2d_process(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
2782                                                 NX_PACKET *packet_ptr,
2783                                                 ULONG topic_offset,
2784                                                 USHORT topic_length)
2785 {
2786 UCHAR *topic_name;
2787 az_iot_hub_client_c2d_request request;
2788 az_span receive_topic;
2789 az_result core_result;
2790 UINT status;
2791 NX_AZURE_IOT_THREAD *thread_list_ptr;
2792 
2793     /* This function is protected by MQTT mutex.  */
2794 
2795     /* Check message type first.  */
2796     topic_name = &(packet_ptr -> nx_packet_prepend_ptr[topic_offset]);
2797 
2798     /* NOTE: Current implementation does not support topic to span multiple packets.  */
2799     if ((ULONG)(packet_ptr -> nx_packet_append_ptr - topic_name) < topic_length)
2800     {
2801         LogError(LogLiteralArgs("topic out of boundaries of single packet"));
2802         return(NX_AZURE_IOT_TOPIC_TOO_LONG);
2803     }
2804 
2805     receive_topic = az_span_create(topic_name, topic_length);
2806     core_result = az_iot_hub_client_c2d_parse_received_topic(&hub_client_ptr -> iot_hub_client_core,
2807                                                              receive_topic, &request);
2808     if (az_result_failed(core_result))
2809     {
2810 
2811         /* Topic name does not match C2D format.  */
2812         return(NX_AZURE_IOT_NOT_FOUND);
2813     }
2814 
2815     status = nx_azure_iot_hub_client_receive_thread_find(hub_client_ptr,
2816                                                          packet_ptr,
2817                                                          NX_AZURE_IOT_HUB_CLOUD_TO_DEVICE_MESSAGE,
2818                                                          0, &thread_list_ptr);
2819     if (status == NX_AZURE_IOT_SUCCESS)
2820     {
2821         tx_thread_wait_abort(thread_list_ptr -> thread_ptr);
2822         return(NX_AZURE_IOT_SUCCESS);
2823     }
2824 
2825     /* No thread is waiting for C2D message yet.  */
2826     nx_azure_iot_hub_client_message_notify(hub_client_ptr,
2827                                            &(hub_client_ptr -> nx_azure_iot_hub_client_c2d_message),
2828                                            packet_ptr);
2829 
2830     return(NX_AZURE_IOT_SUCCESS);
2831 }
2832 
nx_azure_iot_hub_client_command_process(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,NX_PACKET * packet_ptr,ULONG topic_offset,USHORT topic_length)2833 static UINT nx_azure_iot_hub_client_command_process(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
2834                                                     NX_PACKET *packet_ptr,
2835                                                     ULONG topic_offset,
2836                                                     USHORT topic_length)
2837 {
2838 UCHAR *topic_name;
2839 az_iot_hub_client_command_request request;
2840 az_span receive_topic;
2841 az_result core_result;
2842 UINT status;
2843 NX_AZURE_IOT_THREAD *thread_list_ptr;
2844 
2845     /* This function is protected by MQTT mutex.  */
2846 
2847     /* Check message type first.  */
2848     topic_name = &(packet_ptr -> nx_packet_prepend_ptr[topic_offset]);
2849 
2850     /* NOTE: Current implementation does not support topic to span multiple packets.  */
2851     if ((ULONG)(packet_ptr -> nx_packet_append_ptr - topic_name) < topic_length)
2852     {
2853         LogError(LogLiteralArgs("topic out of boundaries of single packet"));
2854         return(NX_AZURE_IOT_TOPIC_TOO_LONG);
2855     }
2856 
2857     receive_topic = az_span_create(topic_name, topic_length);
2858     core_result = az_iot_hub_client_commands_parse_received_topic(&(hub_client_ptr -> iot_hub_client_core),
2859                                                                   receive_topic, &request);
2860     if (az_result_failed(core_result))
2861     {
2862 
2863         /* Topic name does not match direct method format.  */
2864         return(NX_AZURE_IOT_NOT_FOUND);
2865     }
2866 
2867     status = nx_azure_iot_hub_client_receive_thread_find(hub_client_ptr,
2868                                                          packet_ptr,
2869                                                          NX_AZURE_IOT_HUB_COMMAND,
2870                                                          0, &thread_list_ptr);
2871     if (status == NX_AZURE_IOT_SUCCESS)
2872     {
2873         tx_thread_wait_abort(thread_list_ptr -> thread_ptr);
2874         return(NX_AZURE_IOT_SUCCESS);
2875     }
2876 
2877     /* No thread is waiting for direct method message yet.  */
2878     nx_azure_iot_hub_client_message_notify(hub_client_ptr,
2879                                            &(hub_client_ptr -> nx_azure_iot_hub_client_command_message),
2880                                            packet_ptr);
2881     return(NX_AZURE_IOT_SUCCESS);
2882 }
2883 
/* Translate a core SDK properties message type into the NX_AZURE_IOT_HUB_* message type used
   by this client.

   Get-responses and acknowledgements are told apart by the parity of the request id: an even
   id maps to NX_AZURE_IOT_HUB_PROPERTIES, an odd id to
   NX_AZURE_IOT_HUB_REPORTED_PROPERTIES_RESPONSE.  Writable-updated maps to
   NX_AZURE_IOT_HUB_WRITABLE_PROPERTIES and anything else to NX_AZURE_IOT_HUB_NONE.  */
static UINT nx_azure_iot_hub_client_properties_message_type_get(UINT core_message_type, UINT request_id)
{
    if ((core_message_type == AZ_IOT_HUB_CLIENT_PROPERTIES_MESSAGE_TYPE_GET_RESPONSE) ||
        (core_message_type == AZ_IOT_HUB_CLIENT_PROPERTIES_MESSAGE_TYPE_ACKNOWLEDGEMENT))
    {

        /* Odd requests are of reported properties and even of twin properties.  */
        if ((request_id % 2) == 0)
        {
            return(NX_AZURE_IOT_HUB_PROPERTIES);
        }

        return(NX_AZURE_IOT_HUB_REPORTED_PROPERTIES_RESPONSE);
    }

    if (core_message_type == AZ_IOT_HUB_CLIENT_PROPERTIES_MESSAGE_TYPE_WRITABLE_UPDATED)
    {
        return(NX_AZURE_IOT_HUB_WRITABLE_PROPERTIES);
    }

    /* Unknown core message type.  */
    return(NX_AZURE_IOT_HUB_NONE);
}
2916 
/* Parse the topic of a device twin publish and extract its fields.

   request_id_ptr, version_ptr, message_type_ptr and status_ptr are optional outputs; a NX_NULL
   pointer skips that output.  *version_ptr is only written when the topic carries a version.
   When the topic carries no request id, 0 is reported.

   Returns NX_AZURE_IOT_SDK_CORE_ERROR when the topic is not a twin topic or one of its numeric
   fields cannot be converted, NX_AZURE_IOT_SUCCESS otherwise.  */
static UINT nx_azure_iot_hub_client_device_twin_parse(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                      NX_PACKET *packet_ptr, ULONG topic_offset,
                                                      USHORT topic_length, UINT *request_id_ptr,
                                                      ULONG *version_ptr, UINT *message_type_ptr,
                                                      UINT *status_ptr)
{
az_iot_hub_client_twin_response twin_response;
az_span twin_topic;
uint32_t parsed_request_id = 0;
uint32_t parsed_version = 0;

    twin_topic = az_span_create(packet_ptr -> nx_packet_prepend_ptr + topic_offset, (INT)topic_length);
    if (az_result_failed(az_iot_hub_client_twin_parse_received_topic(&(hub_client_ptr -> iot_hub_client_core),
                                                                     twin_topic, &twin_response)))
    {
        return(NX_AZURE_IOT_SDK_CORE_ERROR);
    }

    /* The version is converted only when the caller asked for it and the topic has one.  */
    if (version_ptr != NX_NULL)
    {
        if (az_span_ptr(twin_response.version))
        {
            if (az_result_failed(az_span_atou32(twin_response.version, &parsed_version)))
            {
                return(NX_AZURE_IOT_SDK_CORE_ERROR);
            }

            *version_ptr = (ULONG)parsed_version;
        }
    }

    /* The request id is always converted when present: the message type depends on it.  */
    if (az_span_ptr(twin_response.request_id))
    {
        if (az_result_failed(az_span_atou32(twin_response.request_id, &parsed_request_id)))
        {
            return(NX_AZURE_IOT_SDK_CORE_ERROR);
        }
    }

    if (request_id_ptr != NX_NULL)
    {
        *request_id_ptr = (UINT)parsed_request_id;
    }

    if (message_type_ptr != NX_NULL)
    {
        *message_type_ptr =
            nx_azure_iot_hub_client_properties_message_type_get((UINT)twin_response.response_type,
                                                                (UINT)parsed_request_id);
    }

    if (status_ptr != NX_NULL)
    {
        *status_ptr = (UINT)twin_response.status;
    }

    return(NX_AZURE_IOT_SUCCESS);
}
2975 
nx_azure_iot_hub_client_properties_process(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,NX_PACKET * packet_ptr,ULONG topic_offset,USHORT topic_length)2976 static UINT nx_azure_iot_hub_client_properties_process(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
2977                                                        NX_PACKET *packet_ptr,
2978                                                        ULONG topic_offset,
2979                                                        USHORT topic_length)
2980 {
2981 NX_AZURE_IOT_THREAD *thread_list_ptr;
2982 UINT message_type;
2983 UINT response_status;
2984 UINT request_id = 0;
2985 ULONG version = 0;
2986 UINT correlation_id;
2987 UINT status;
2988 ULONG current_time;
2989 
2990     /* This function is protected by MQTT mutex. */
2991     if ((status = nx_azure_iot_hub_client_device_twin_parse(hub_client_ptr, packet_ptr,
2992                                                             topic_offset, topic_length,
2993                                                             &request_id, &version,
2994                                                             &message_type, &response_status)))
2995     {
2996         return(status);
2997     }
2998 
2999     if (response_status == NX_AZURE_IOT_HUB_CLIENT_THROTTLE_STATUS_CODE)
3000     {
3001         if ((status = nx_azure_iot_unix_time_get(hub_client_ptr -> nx_azure_iot_ptr, &current_time)))
3002         {
3003             LogError(LogLiteralArgs("IoTHub client fail to get unix time: %d"), status);
3004             return(status);
3005         }
3006 
3007         hub_client_ptr -> nx_azure_iot_hub_client_throttle_end_time =
3008             current_time + nx_azure_iot_hub_client_throttle_with_jitter(hub_client_ptr);
3009     }
3010     else
3011     {
3012         hub_client_ptr -> nx_azure_iot_hub_client_throttle_count = 0;
3013         hub_client_ptr -> nx_azure_iot_hub_client_throttle_end_time = 0;
3014     }
3015 
3016     if (message_type == NX_AZURE_IOT_HUB_NONE)
3017     {
3018         LogError(LogLiteralArgs("IoTHub client fail to parse device twin: %d"), NX_AZURE_IOT_SERVER_RESPONSE_ERROR);
3019         return(NX_AZURE_IOT_SERVER_RESPONSE_ERROR);
3020     }
3021     else if (message_type == NX_AZURE_IOT_HUB_REPORTED_PROPERTIES_RESPONSE)
3022     {
3023 
3024         /* Only requested thread should be woken.  */
3025         correlation_id = request_id;
3026     }
3027     else
3028     {
3029 
3030         /* Any thread can be woken.  */
3031         correlation_id = 0;
3032 
3033         /* Process system component.  */
3034         if ((hub_client_ptr -> nx_azure_iot_hub_client_component_properties_process) &&
3035             ((response_status >= 200) && (response_status < 300)))
3036         {
3037             hub_client_ptr -> nx_azure_iot_hub_client_component_properties_process(hub_client_ptr, packet_ptr, message_type);
3038         }
3039     }
3040 
3041     status = nx_azure_iot_hub_client_receive_thread_find(hub_client_ptr,
3042                                                          packet_ptr,
3043                                                          message_type,
3044                                                          correlation_id, &thread_list_ptr);
3045     if (status == NX_AZURE_IOT_SUCCESS)
3046     {
3047         tx_thread_wait_abort(thread_list_ptr -> thread_ptr);
3048         return(NX_AZURE_IOT_SUCCESS);
3049     }
3050 
3051     switch(message_type)
3052     {
3053         case NX_AZURE_IOT_HUB_REPORTED_PROPERTIES_RESPONSE :
3054         {
3055             if (hub_client_ptr -> nx_azure_iot_hub_client_report_properties_response_callback)
3056             {
3057                 hub_client_ptr -> nx_azure_iot_hub_client_report_properties_response_callback(hub_client_ptr,
3058                                                                                               request_id,
3059                                                                                               response_status,
3060                                                                                               version,
3061                                                                                               hub_client_ptr -> nx_azure_iot_hub_client_report_properties_response_callback_args);
3062             }
3063 
3064             nx_packet_release(packet_ptr);
3065         }
3066         break;
3067 
3068         case NX_AZURE_IOT_HUB_PROPERTIES :
3069         {
3070 
3071             /* No thread is waiting for device twin message yet.  */
3072             nx_azure_iot_hub_client_message_notify(hub_client_ptr,
3073                                                    &(hub_client_ptr -> nx_azure_iot_hub_client_properties_message),
3074                                                    packet_ptr);
3075         }
3076         break;
3077 
3078         case NX_AZURE_IOT_HUB_WRITABLE_PROPERTIES :
3079         {
3080 
3081             /* No thread is waiting for device twin message yet.  */
3082             nx_azure_iot_hub_client_message_notify(hub_client_ptr,
3083                                                    &(hub_client_ptr -> nx_azure_iot_hub_client_writable_properties_message),
3084                                                    packet_ptr);
3085         }
3086         break;
3087 
3088         default :
3089             nx_packet_release(packet_ptr);
3090     }
3091 
3092     return(NX_AZURE_IOT_SUCCESS);
3093 }
3094 
nx_azure_iot_hub_client_thread_dequeue(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr,NX_AZURE_IOT_THREAD * thread_list_ptr)3095 static VOID nx_azure_iot_hub_client_thread_dequeue(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
3096                                                    NX_AZURE_IOT_THREAD *thread_list_ptr)
3097 {
3098 NX_AZURE_IOT_THREAD *thread_list_prev = NX_NULL;
3099 NX_AZURE_IOT_THREAD *thread_list_current;
3100 
3101     for (thread_list_current = hub_client_ptr -> nx_azure_iot_hub_client_thread_suspended;
3102          thread_list_current;
3103          thread_list_current = thread_list_current -> thread_next)
3104     {
3105         if (thread_list_current == thread_list_ptr)
3106         {
3107 
3108             /* Found the thread to dequeue.  */
3109             if (thread_list_prev == NX_NULL)
3110             {
3111                 hub_client_ptr -> nx_azure_iot_hub_client_thread_suspended = thread_list_current -> thread_next;
3112             }
3113             else
3114             {
3115                 thread_list_prev -> thread_next = thread_list_current -> thread_next;
3116             }
3117             break;
3118         }
3119 
3120         thread_list_prev = thread_list_current;
3121     }
3122 }
3123 
/* Generate the SAS token (MQTT password) for the hub client into sas_buffer.

   The signature to sign is first written into sas_buffer, then signed with the key through
   nx_azure_iot_base64_hmac_sha256_calculate() using a scratch buffer obtained from
   nx_azure_iot_buffer_allocate(), and finally the password is written into sas_buffer.

   hub_client_ptr     IoT Hub client.
   expiry_time_secs   Expiry time passed to the core SDK for the signature and the password.
   key / key_len      Key used for the HMAC-SHA256 calculation.
   sas_buffer         Output buffer; also used as working space for the signature.
   sas_buffer_len     Size of sas_buffer in bytes.
   sas_length         Receives the length of the generated password.

   Returns NX_AZURE_IOT_SUCCESS, the buffer allocation / hash error, or
   NX_AZURE_IOT_SDK_CORE_ERROR when the core SDK fails.  The scratch buffer is freed on every
   path.  */
static UINT nx_azure_iot_hub_client_sas_token_get(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                  ULONG expiry_time_secs, const UCHAR *key, UINT key_len,
                                                  UCHAR *sas_buffer, UINT sas_buffer_len, UINT *sas_length)
{
UCHAR *buffer_ptr;
UINT buffer_size;
VOID *buffer_context;
az_span span = az_span_create(sas_buffer, (INT)sas_buffer_len);
az_span buffer_span;
UINT status;
UCHAR *output_ptr;
UINT output_len;
az_result core_result;
size_t password_length = 0;

    status = nx_azure_iot_buffer_allocate(hub_client_ptr -> nx_azure_iot_ptr, &buffer_ptr, &buffer_size, &buffer_context);
    if (status)
    {
        LogError(LogLiteralArgs("IoTHub client sas token fail: BUFFER ALLOCATE FAIL"));
        return(status);
    }

    /* Build the signature in the caller's buffer; span is narrowed to the written part.  */
    core_result = az_iot_hub_client_sas_get_signature(&(hub_client_ptr -> iot_hub_client_core),
                                                      expiry_time_secs, span, &span);
    if (az_result_failed(core_result))
    {
        LogError(LogLiteralArgs("IoTHub failed failed to get signature with error status: %d"), core_result);
        nx_azure_iot_buffer_free(buffer_context);
        return(NX_AZURE_IOT_SDK_CORE_ERROR);
    }

    /* Sign the signature; the encoded result lives in the scratch buffer.  */
    status = nx_azure_iot_base64_hmac_sha256_calculate(&(hub_client_ptr -> nx_azure_iot_hub_client_resource),
                                                       key, key_len, az_span_ptr(span), (UINT)az_span_size(span),
                                                       buffer_ptr, buffer_size, &output_ptr, &output_len);
    if (status)
    {
        LogError(LogLiteralArgs("IoTHub failed to encoded hash"));
        nx_azure_iot_buffer_free(buffer_context);
        return(status);
    }

    /* The password length is returned through a genuine size_t.  Passing the address of the
       UINT parameter cast to size_t * would let the callee write sizeof(size_t) bytes into a
       UINT, which overruns it wherever size_t is wider than UINT.  */
    buffer_span = az_span_create(output_ptr, (INT)output_len);
    core_result = az_iot_hub_client_sas_get_password(&(hub_client_ptr -> iot_hub_client_core),
                                                     expiry_time_secs, buffer_span, AZ_SPAN_EMPTY,
                                                     (CHAR *)sas_buffer, sas_buffer_len, &password_length);
    if (az_result_failed(core_result))
    {
        LogError(LogLiteralArgs("IoTHub failed to generate token with error status: %d"), core_result);
        nx_azure_iot_buffer_free(buffer_context);
        return(NX_AZURE_IOT_SDK_CORE_ERROR);
    }

    /* The password was written into a buffer of sas_buffer_len bytes, so it fits in a UINT.  */
    *sas_length = (UINT)password_length;
    nx_azure_iot_buffer_free(buffer_context);

    return(NX_AZURE_IOT_SUCCESS);
}
3180 
nx_azure_iot_hub_client_command_enable(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)3181 UINT nx_azure_iot_hub_client_command_enable(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
3182 {
3183 UINT status;
3184 
3185     if (hub_client_ptr == NX_NULL)
3186     {
3187         LogError(LogLiteralArgs("IoTHub client command subscribe fail: INVALID POINTER"));
3188         return(NX_AZURE_IOT_INVALID_PARAMETER);
3189     }
3190 
3191     tx_mutex_get(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr, TX_WAIT_FOREVER);
3192 
3193     /* Atomically update the handler as we need to serialize the handler with incoming messages.  */
3194     hub_client_ptr -> nx_azure_iot_hub_client_command_message.message_process = nx_azure_iot_hub_client_command_process;
3195     hub_client_ptr -> nx_azure_iot_hub_client_command_message.message_enable = nx_azure_iot_hub_client_command_enable;
3196 
3197     /* Register callbacks even if not connect and when connect complete subscribe for topics.  */
3198     if (hub_client_ptr -> nx_azure_iot_hub_client_state != NX_AZURE_IOT_HUB_CLIENT_STATUS_CONNECTED)
3199     {
3200         tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
3201         return(NX_AZURE_IOT_SUCCESS);
3202     }
3203 
3204     tx_mutex_put(hub_client_ptr -> nx_azure_iot_ptr -> nx_azure_iot_mutex_ptr);
3205 
3206     status = nxd_mqtt_client_subscribe(&(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
3207                                        AZ_IOT_HUB_CLIENT_METHODS_SUBSCRIBE_TOPIC,
3208                                        sizeof(AZ_IOT_HUB_CLIENT_METHODS_SUBSCRIBE_TOPIC) - 1,
3209                                        NX_AZURE_IOT_MQTT_QOS_0);
3210     if (status)
3211     {
3212         LogError(LogLiteralArgs("IoTHub client command subscribe fail %d"), status);
3213         return(status);
3214     }
3215 
3216     return(NX_AZURE_IOT_SUCCESS);
3217 }
3218 
nx_azure_iot_hub_client_command_disable(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)3219 UINT nx_azure_iot_hub_client_command_disable(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
3220 {
3221 UINT status;
3222 
3223     if (hub_client_ptr == NX_NULL)
3224     {
3225         LogError(LogLiteralArgs("IoTHub client command unsubscribe fail: INVALID POINTER"));
3226         return(NX_AZURE_IOT_INVALID_PARAMETER);
3227     }
3228 
3229     status = nxd_mqtt_client_unsubscribe(&(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
3230                                          AZ_IOT_HUB_CLIENT_METHODS_SUBSCRIBE_TOPIC,
3231                                          sizeof(AZ_IOT_HUB_CLIENT_METHODS_SUBSCRIBE_TOPIC) - 1);
3232     if (status)
3233     {
3234         LogError(LogLiteralArgs("IoTHub client command unsubscribe fail status: %d"), status);
3235         return(status);
3236     }
3237 
3238     hub_client_ptr -> nx_azure_iot_hub_client_command_message.message_process = NX_NULL;
3239 
3240     return(NX_AZURE_IOT_SUCCESS);
3241 }
3242 
/* Receive the next command message and expose its parsed fields.

   The packet is obtained through nx_azure_iot_hub_client_message_receive() with the given
   wait option, its topic is parsed as a command topic and its payload is adjusted.  On success
   the packet is returned through packet_pptr and the name / context outputs point at the
   request fields produced by the topic parser.

   component_name_pptr and component_name_length_ptr are optional and are only written when
   both are supplied; all other output pointers are mandatory.

   Returns NX_AZURE_IOT_INVALID_PARAMETER for a missing mandatory pointer, the receive status
   when no packet was obtained, or a parse / adjust error; in the latter cases the packet is
   released here.  */
UINT nx_azure_iot_hub_client_command_message_receive(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                     const UCHAR **component_name_pptr, USHORT *component_name_length_ptr,
                                                     const UCHAR **command_name_pptr, USHORT *command_name_length_ptr,
                                                     VOID **context_pptr, USHORT *context_length_ptr,
                                                     NX_PACKET **packet_pptr, UINT wait_option)
{
NX_PACKET *received_packet = NX_NULL;
az_iot_hub_client_command_request command_request;
az_span command_topic;
ULONG topic_offset = 0;
USHORT topic_length = 0;
UINT status;

    if (!hub_client_ptr ||
        !command_name_pptr || !command_name_length_ptr ||
        !context_pptr || !context_length_ptr ||
        !packet_pptr)
    {
        LogError(LogLiteralArgs("IoT PnP client command receive fail: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    status = nx_azure_iot_hub_client_message_receive(hub_client_ptr, NX_AZURE_IOT_HUB_COMMAND,
                                                     &(hub_client_ptr -> nx_azure_iot_hub_client_command_message),
                                                     &received_packet, wait_option);
    if (status)
    {
        return(status);
    }

    /* From here on a failure must give the packet back before returning.  */
    status = nx_azure_iot_hub_client_process_publish_packet(received_packet -> nx_packet_prepend_ptr,
                                                            &topic_offset, &topic_length);
    if (!status)
    {
        command_topic = az_span_create(received_packet -> nx_packet_prepend_ptr + topic_offset, topic_length);
        if (az_result_failed(az_iot_hub_client_commands_parse_received_topic(&(hub_client_ptr -> iot_hub_client_core),
                                                                             command_topic, &command_request)))
        {
            status = NX_AZURE_IOT_SDK_CORE_ERROR;
        }
        else
        {
            status = nx_azure_iot_hub_client_adjust_payload(received_packet);
        }
    }

    if (status)
    {
        nx_packet_release(received_packet);
        return(status);
    }

    *packet_pptr = received_packet;

    /* The component name is reported only when the caller supplied both outputs for it.  */
    if (component_name_pptr && component_name_length_ptr)
    {
        *component_name_pptr = (const UCHAR *)az_span_ptr(command_request.component_name);
        *component_name_length_ptr = (USHORT)az_span_size(command_request.component_name);
    }

    *command_name_pptr = (const UCHAR *)az_span_ptr(command_request.command_name);
    *command_name_length_ptr = (USHORT)az_span_size(command_request.command_name);

    /* The request id serves as the context the caller hands back when responding.  */
    *context_pptr = (VOID*)az_span_ptr(command_request.request_id);
    *context_length_ptr = (USHORT)az_span_size(command_request.request_id);

    return(NX_AZURE_IOT_SUCCESS);
}
3312 
/*
 * Publish the response to a previously received command.
 *
 * A publish packet is obtained from the hub client's MQTT resource, the
 * response topic is written at the packet's prepend pointer, the payload is
 * appended after the topic, and the packet is published with
 * NX_AZURE_IOT_MQTT_QOS_0.
 *
 * hub_client_ptr   Hub client; it and its nx_azure_iot_ptr must be non-NULL.
 * status_code      Response status; narrowed to USHORT when the topic is built.
 * context_ptr      Request id bytes (the context handed out by the matching
 *                  receive call); must be non-NULL.
 * context_length   Length of the request id in bytes; must be non-zero.
 * payload          Optional response body.  When NULL, or when payload_length
 *                  is 0, NX_AZURE_IOT_HUB_CLIENT_EMPTY_JSON is sent instead.
 * payload_length   Length of payload in bytes.
 * wait_option      Passed unchanged to the packet get, append and publish calls.
 *
 * Returns NX_AZURE_IOT_SUCCESS on success, NX_AZURE_IOT_INVALID_PARAMETER when
 * the argument check fails, NX_AZURE_IOT_SDK_CORE_ERROR when the topic cannot
 * be generated, otherwise the non-zero status of the failing packet-get,
 * append or publish call.  The packet is released on every failure path after
 * it has been obtained.
 */
UINT nx_azure_iot_hub_client_command_message_response(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                      UINT status_code, VOID *context_ptr,
                                                      USHORT context_length, const UCHAR *payload,
                                                      UINT payload_length, UINT wait_option)
{
NX_PACKET *packet_ptr;
UINT topic_length;
size_t topic_size;
az_span request_id_span;
UINT status;
az_result core_result;

    if ((hub_client_ptr == NX_NULL) ||
        (hub_client_ptr -> nx_azure_iot_ptr == NX_NULL) ||
        (context_ptr == NX_NULL) ||
        (context_length == 0))
    {
        LogError(LogLiteralArgs("IoTHub client command response fail: INVALID POINTER"));
        return(NX_AZURE_IOT_INVALID_PARAMETER);
    }

    /* Prepare response packet.  */
    status = nx_azure_iot_publish_packet_get(hub_client_ptr -> nx_azure_iot_ptr,
                                             &(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
                                             &packet_ptr, wait_option);
    if (status)
    {
        LogError(LogLiteralArgs("Create response data fail"));
        return(status);
    }

    /* The topic may use all the room between the prepend pointer and the end of the packet data area.  */
    topic_length = (UINT)(packet_ptr -> nx_packet_data_end - packet_ptr -> nx_packet_prepend_ptr);
    topic_size = 0;
    request_id_span = az_span_create((UCHAR*)context_ptr, (INT)context_length);

    /* The SDK stores the generated topic length through a size_t pointer, so it must be
       given a real size_t object: pointing it at a UINT would overrun that variable on
       ports where size_t is wider than UINT.  */
    core_result = az_iot_hub_client_commands_response_get_publish_topic(&(hub_client_ptr -> iot_hub_client_core),
                                                                        request_id_span, (USHORT)status_code,
                                                                        (CHAR *)packet_ptr -> nx_packet_prepend_ptr,
                                                                        topic_length, &topic_size);
    if (az_result_failed(core_result))
    {
        LogError(LogLiteralArgs("Failed to create the command response topic"));
        nx_packet_release(packet_ptr);
        return(NX_AZURE_IOT_SDK_CORE_ERROR);
    }

    /* The generated topic was written into a buffer of topic_length bytes, so its length fits in a UINT.  */
    topic_length = (UINT)topic_size;

    /* Account for the topic that was written directly into the packet buffer.  */
    packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + topic_length;
    packet_ptr -> nx_packet_length = topic_length;

    if (payload && (payload_length != 0))
    {

        /* Append payload.  */
        status = nx_packet_data_append(packet_ptr, (VOID *)payload, payload_length,
                                       packet_ptr -> nx_packet_pool_owner,
                                       wait_option);
        if (status)
        {
            LogError(LogLiteralArgs("Command response data append fail"));
            nx_packet_release(packet_ptr);
            return(status);
        }
    }
    else
    {

        /* No payload supplied: append an empty JSON object (without its terminating NUL).  */
        status = nx_packet_data_append(packet_ptr, NX_AZURE_IOT_HUB_CLIENT_EMPTY_JSON,
                                       sizeof(NX_AZURE_IOT_HUB_CLIENT_EMPTY_JSON) - 1,
                                       packet_ptr -> nx_packet_pool_owner,
                                       wait_option);
        if (status)
        {
            LogError(LogLiteralArgs("Adding empty json failed."));
            nx_packet_release(packet_ptr);
            return(status);
        }
    }

    status = nx_azure_iot_publish_mqtt_packet(&(hub_client_ptr -> nx_azure_iot_hub_client_resource.resource_mqtt),
                                              packet_ptr, topic_length, NX_NULL, NX_AZURE_IOT_MQTT_QOS_0,
                                              wait_option);
    if (status)
    {
        LogError(LogLiteralArgs("IoTHub client command response fail: PUBLISH FAIL status: %d"), status);
        nx_packet_release(packet_ptr);
        return(status);
    }

    return(NX_AZURE_IOT_SUCCESS);
}
3401 
nx_azure_iot_hub_client_direct_method_enable(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)3402 UINT nx_azure_iot_hub_client_direct_method_enable(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
3403 {
3404 
3405     if (hub_client_ptr == NX_NULL)
3406     {
3407         LogError(LogLiteralArgs("IoTHub client direct method subscribe fail: INVALID POINTER"));
3408         return(NX_AZURE_IOT_INVALID_PARAMETER);
3409     }
3410 
3411     return(nx_azure_iot_hub_client_command_enable(hub_client_ptr));
3412 }
3413 
nx_azure_iot_hub_client_direct_method_disable(NX_AZURE_IOT_HUB_CLIENT * hub_client_ptr)3414 UINT nx_azure_iot_hub_client_direct_method_disable(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr)
3415 {
3416 
3417     if (hub_client_ptr == NX_NULL)
3418     {
3419         LogError(LogLiteralArgs("IoTHub client direct method unsubscribe fail: INVALID POINTER"));
3420         return(NX_AZURE_IOT_INVALID_PARAMETER);
3421     }
3422 
3423     return(nx_azure_iot_hub_client_command_disable(hub_client_ptr));
3424 }
3425 
/*
 * Receive a direct method request.
 *
 * Thin wrapper over nx_azure_iot_hub_client_command_message_receive(): the
 * component name outputs are passed as NX_NULL, every other argument is
 * forwarded as-is and the callee's status is returned unchanged.
 *
 * hub_client_ptr           Hub client to receive on.
 * method_name_pptr         Out: pointer to the method name.
 * method_name_length_ptr   Out: length of the method name.
 * context_pptr             Out: request context.
 * context_length_ptr       Out: length of the request context.
 * packet_pptr              Out: packet carrying the request.
 * wait_option              Forwarded to the command receive call.
 *
 * Returns NX_AZURE_IOT_INVALID_PARAMETER if any pointer argument is NULL.
 */
UINT nx_azure_iot_hub_client_direct_method_message_receive(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                           const UCHAR **method_name_pptr, USHORT *method_name_length_ptr,
                                                           VOID **context_pptr, USHORT *context_length_ptr,
                                                           NX_PACKET **packet_pptr, UINT wait_option)
{
UINT result;

    /* Every pointer argument is mandatory.  */
    if (hub_client_ptr &&
        method_name_pptr &&
        method_name_length_ptr &&
        context_pptr &&
        context_length_ptr &&
        packet_pptr)
    {

        /* No component name is requested for a direct method.  */
        result = nx_azure_iot_hub_client_command_message_receive(hub_client_ptr, NX_NULL, NX_NULL,
                                                                 method_name_pptr, method_name_length_ptr,
                                                                 context_pptr, context_length_ptr,
                                                                 packet_pptr, wait_option);
    }
    else
    {
        LogError(LogLiteralArgs("IoTHub client direct method receive fail: INVALID POINTER"));
        result = NX_AZURE_IOT_INVALID_PARAMETER;
    }

    return(result);
}
3448 
/*
 * Send the response to a direct method request.
 *
 * Thin wrapper over nx_azure_iot_hub_client_command_message_response(): after
 * the argument check every argument is forwarded as-is and the callee's
 * status is returned unchanged.
 *
 * Returns NX_AZURE_IOT_INVALID_PARAMETER when hub_client_ptr, its
 * nx_azure_iot_ptr or context_ptr is NULL, or when context_length is 0.
 */
UINT nx_azure_iot_hub_client_direct_method_message_response(NX_AZURE_IOT_HUB_CLIENT *hub_client_ptr,
                                                            UINT status_code, VOID *context_ptr,
                                                            USHORT context_length, const UCHAR *payload,
                                                            UINT payload_length, UINT wait_option)
{
UINT result;

    /* hub_client_ptr is tested first so that it is never dereferenced when NULL.  */
    if (!hub_client_ptr ||
        !(hub_client_ptr -> nx_azure_iot_ptr) ||
        !context_ptr ||
        !context_length)
    {
        LogError(LogLiteralArgs("IoTHub direct method response fail: INVALID POINTER"));
        result = NX_AZURE_IOT_INVALID_PARAMETER;
    }
    else
    {

        /* Direct method responses are published through the command response path.  */
        result = nx_azure_iot_hub_client_command_message_response(hub_client_ptr, status_code,
                                                                  context_ptr, context_length,
                                                                  payload, payload_length,
                                                                  wait_option);
    }

    return(result);
}
3469