xref: /CoreMQTT-Agent-v1.2.0/source/core_mqtt_agent.c (revision 86c51db1a7d060744a06fa3619bd3ba86a938a94)
1 /*
2  * coreMQTT Agent v1.2.0
3  * Copyright (C) 2021 Amazon.com, Inc. or its affiliates.  All Rights Reserved.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a copy of
6  * this software and associated documentation files (the "Software"), to deal in
7  * the Software without restriction, including without limitation the rights to
8  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
9  * the Software, and to permit persons to whom the Software is furnished to do so,
10  * subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in all
13  * copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
17  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
18  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
19  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21  */
22 
23 /**
24  * @file core_mqtt_agent.c
25  * @brief Implements an MQTT agent (or daemon task) to enable multithreaded access to
26  * coreMQTT.
27  *
28  * @note Implements an MQTT agent (or daemon task) on top of the coreMQTT MQTT client
29  * library.  The agent makes coreMQTT usage thread safe by being the only task (or
30  * thread) in the system that is allowed to access the native coreMQTT API - and in
31  * so doing, serializes all access to coreMQTT even when multiple tasks are using the
32  * same MQTT connection.
33  *
34  * The agent provides an equivalent API for each coreMQTT API.  Whereas coreMQTT
35  * APIs are prefixed "MQTT_", the agent APIs are prefixed "MQTTAgent_".  For example,
36  * that agent's MQTTAgent_Publish() API is the thread safe equivalent to coreMQTT's
37  * MQTT_Publish() API.
38  */
39 
40 /* Standard includes. */
41 #include <string.h>
42 #include <stdio.h>
43 #include <assert.h>
44 
45 /* MQTT agent include. */
46 #include "core_mqtt_agent.h"
47 #include "core_mqtt_agent_command_functions.h"
48 
49 /* MQTT Agent default logging configuration include. */
50 #include "core_mqtt_agent_default_logging.h"
51 
52 /*-----------------------------------------------------------*/
53 
54 #if ( MQTT_AGENT_USE_QOS_1_2_PUBLISH != 0 )
55 
56 /**
57  * @brief Array used to maintain the outgoing publish records and their
58  * state by the coreMQTT library.
59  */
60     static MQTTPubAckInfo_t pOutgoingPublishRecords[ MQTT_AGENT_MAX_OUTSTANDING_ACKS ];
61 
62 /**
63  * @brief Array used to maintain the incoming publish records and their
64  * state by the coreMQTT library.
65  */
66     static MQTTPubAckInfo_t pIncomingPublishRecords[ MQTT_AGENT_MAX_OUTSTANDING_ACKS ];
67 #endif
68 
69 /**
70  * @brief Track an operation by adding it to a list, indicating it is anticipating
71  * an acknowledgment.
72  *
73  * @param[in] pAgentContext Agent context for the MQTT connection.
74  * @param[in] packetId Packet ID of pending ack.
75  * @param[in] pCommand Pointer to command that is expecting an ack.
76  *
77  * @return Returns one of the following:
78  * - #MQTTSuccess if an entry was added to the list.
79  * - #MQTTStateCollision if there already exists an entry for the same packet ID
80  * in the list.
81  * - #MQTTNoMemory if there is no space available in the list for adding a
82  * new entry.
83  */
84 static MQTTStatus_t addAwaitingOperation( MQTTAgentContext_t * pAgentContext,
85                                           uint16_t packetId,
86                                           MQTTAgentCommand_t * pCommand );
87 
88 /**
89  * @brief Retrieve an operation from the list of pending acks, and optionally
90  * remove it from the list.
91  *
92  * @param[in] pAgentContext Agent context for the MQTT connection.
93  * @param[in] incomingPacketId Packet ID of incoming ack.
94  *
95  * @return Pointer to stored information about the operation awaiting the ack.
96  * Returns NULL if the packet ID is zero or original command does not exist.
97  */
98 static MQTTAgentAckInfo_t * getAwaitingOperation( MQTTAgentContext_t * pAgentContext,
99                                                   uint16_t incomingPacketId );
100 
101 /**
102  * @brief Populate the parameters of a #MQTTAgentCommand struct.
103  *
104  * @param[in] commandType Type of command.  For example, publish or subscribe.
105  * @param[in] pMqttAgentContext Pointer to MQTT context to use for command.
106  * @param[in] pMqttInfoParam Pointer to MQTTPublishInfo_t or MQTTSubscribeInfo_t.
107  * @param[in] commandCompleteCallback Callback for when command completes.
108  * @param[in] pCommandCompleteCallbackContext Context and necessary structs for command.
109  * @param[out] pCommand Pointer to initialized command.
110  *
111  * @return #MQTTSuccess if all necessary fields for the command are passed,
112  * else an enumerated error code.
113  */
114 static MQTTStatus_t createCommand( MQTTAgentCommandType_t commandType,
115                                    const MQTTAgentContext_t * pMqttAgentContext,
116                                    void * pMqttInfoParam,
117                                    MQTTAgentCommandCallback_t commandCompleteCallback,
118                                    MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
119                                    MQTTAgentCommand_t * pCommand );
120 
121 /**
122  * @brief Add a command to the global command queue.
123  *
124  * @param[in] pAgentContext Agent context for the MQTT connection.
125  * @param[in] pCommand Pointer to command to copy to queue.
126  * @param[in] blockTimeMs The maximum amount of time in milliseconds to wait in the
127  * Blocked state (so not consuming any CPU time) for the command to be posted to the
128  * queue should the queue already be full.
129  *
130  * @return #MQTTSuccess if the command was added to the queue, else an enumerated
131  * error code.
132  */
133 static MQTTStatus_t addCommandToQueue( const MQTTAgentContext_t * pAgentContext,
134                                        MQTTAgentCommand_t * pCommand,
135                                        uint32_t blockTimeMs );
136 
137 /**
138  * @brief Process a #MQTTAgentCommand struct.
139  *
140  * @note This agent does not check existing subscriptions before sending a
141  * SUBSCRIBE or UNSUBSCRIBE packet. If a subscription already exists, then
142  * a SUBSCRIBE packet will be sent anyway, and if multiple tasks are subscribed
143  * to a topic filter, then they will all be unsubscribed after an UNSUBSCRIBE.
144  *
145  * @param[in] pMqttAgentContext Agent context for MQTT connection.
146  * @param[in] pCommand Pointer to command to process.
147  * @param[out] pEndLoop Whether the command loop should terminate.
148  *
149  * @return status of MQTT library API call.
150  */
151 static MQTTStatus_t processCommand( MQTTAgentContext_t * pMqttAgentContext,
152                                     MQTTAgentCommand_t * pCommand,
153                                     bool * pEndLoop );
154 
155 /**
156  * @brief Dispatch incoming publishes and acks to their various handler functions.
157  *
158  * @param[in] pMqttContext MQTT Context
159  * @param[in] pPacketInfo Pointer to incoming packet.
160  * @param[in] pDeserializedInfo Pointer to deserialized information from
161  * the incoming packet.
162  */
163 static void mqttEventCallback( MQTTContext_t * pMqttContext,
164                                MQTTPacketInfo_t * pPacketInfo,
165                                MQTTDeserializedInfo_t * pDeserializedInfo );
166 
167 /**
168  * @brief Mark a command as complete after receiving an acknowledgment packet.
169  *
170  * @param[in] pAgentContext Agent context for the MQTT connection.
171  * @param[in] pPacketInfo Pointer to incoming packet.
172  * @param[in] pDeserializedInfo Pointer to deserialized information from
173  * the incoming packet.
174  * @param[in] pAckInfo Pointer to stored information for the original operation
175  * resulting in the received packet.
176  * @param[in] packetType The type of the incoming packet, either SUBACK, UNSUBACK,
177  * PUBACK, or PUBCOMP.
178  */
179 static void handleAcks( const MQTTAgentContext_t * pAgentContext,
180                         const MQTTPacketInfo_t * pPacketInfo,
181                         const MQTTDeserializedInfo_t * pDeserializedInfo,
182                         MQTTAgentAckInfo_t * pAckInfo,
183                         uint8_t packetType );
184 
185 /**
186  * @brief Retrieve a pointer to an agent context given an MQTT context.
187  *
188  * @param[in] pMQTTContext MQTT Context to search for.
189  *
190  * @return Pointer to agent context, or NULL.
191  */
192 static MQTTAgentContext_t * getAgentFromMQTTContext( MQTTContext_t * pMQTTContext );
193 
194 /**
195  * @brief Helper function for creating a command and adding it to the command
196  * queue.
197  *
198  * @param[in] commandType Type of command.
199  * @param[in] pMqttAgentContext Handle of the MQTT connection to use.
200  * @param[in] pCommandCompleteCallbackContext Context and necessary structs for command.
201  * @param[in] commandCompleteCallback Callback for when command completes.
202  * @param[in] pMqttInfoParam Pointer to command argument.
203  * @param[in] blockTimeMs Maximum amount of time in milliseconds to wait (in the
204  * Blocked state, so not consuming any CPU time) for the command to be posted to the
205  * MQTT agent should the MQTT agent's event queue be full.
206  *
207  * @return #MQTTSuccess if the command was posted to the MQTT agent's event queue.
208  * Otherwise an enumerated error code.
209  */
210 static MQTTStatus_t createAndAddCommand( MQTTAgentCommandType_t commandType,
211                                          const MQTTAgentContext_t * pMqttAgentContext,
212                                          void * pMqttInfoParam,
213                                          MQTTAgentCommandCallback_t commandCompleteCallback,
214                                          MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
215                                          uint32_t blockTimeMs );
216 
217 /**
218  * @brief Helper function to mark a command as complete and invoke its callback.
219  * This function calls the releaseCommand callback.
220  *
221  * @param[in] pAgentContext Agent context for the MQTT connection.
222  * @param[in] pCommand Command to complete.
223  * @param[in] returnCode Return status of command.
224  * @param[in] pSubackCodes Pointer to suback array, if command is a SUBSCRIBE.
225  */
226 static void concludeCommand( const MQTTAgentContext_t * pAgentContext,
227                              MQTTAgentCommand_t * pCommand,
228                              MQTTStatus_t returnCode,
229                              uint8_t * pSubackCodes );
230 
231 /**
232  * @brief Resend QoS 1 and 2 publishes after resuming a session.
233  *
234  * @param[in] pMqttAgentContext Agent context for the MQTT connection.
235  *
236  * @return #MQTTSuccess if all publishes resent successfully, else error code
237  * from #MQTT_Publish.
238  */
239 static MQTTStatus_t resendPublishes( MQTTAgentContext_t * pMqttAgentContext );
240 
241 /**
242  * @brief Clears the list of pending acknowledgments by invoking each callback
243  * with #MQTTRecvFailed either for ALL operations in the list OR only for
244  * Subscribe/Unsubscribe operations.
245  *
246  * @param[in] pMqttAgentContext Agent context of the MQTT connection.
247  * @param[in] clearOnlySubUnsubEntries Flag indicating whether all entries OR
248  * entries pertaining to only Subscribe/Unsubscribe operations should be cleaned
249  * from the list.
250  */
251 static void clearPendingAcknowledgments( MQTTAgentContext_t * pMqttAgentContext,
252                                          bool clearOnlySubUnsubEntries );
253 
254 /**
255  * @brief Validate an #MQTTAgentContext_t and a #MQTTAgentCommandInfo_t from API
256  * functions.
257  *
258  * @param[in] pMqttAgentContext #MQTTAgentContext_t to validate.
259  * @param[in] pCommandInfo #MQTTAgentCommandInfo_t to validate.
260  *
261  * @return `true` if parameters are valid, else `false`.
262  */
263 static bool validateStruct( const MQTTAgentContext_t * pMqttAgentContext,
264                             const MQTTAgentCommandInfo_t * pCommandInfo );
265 
266 /**
267  * @brief Validate the parameters for a CONNECT, SUBSCRIBE, UNSUBSCRIBE
268  * or PUBLISH.
269  *
270  * @param[in] commandType CONNECT, SUBSCRIBE, UNSUBSCRIBE, or PUBLISH.
271  * @param[in] pParams Parameter structure to validate.
272  *
273  * @return `true` if parameter structure is valid, else `false`.
274  */
275 static bool validateParams( MQTTAgentCommandType_t commandType,
276                             const void * pParams );
277 
278 /**
279  * @brief Called before accepting any PUBLISH or SUBSCRIBE messages to check
280  * there is space in the pending ACK list for the outgoing PUBLISH or SUBSCRIBE.
281  *
282  * @note Because the MQTT agent is inherently multi threaded, and this function
283  * is called from the context of the application task and not the MQTT agent
284  * task, this function can only return a best effort result.  It can definitely
285  * say if there is space for a new pending ACK when the function is called, but
286  * the case of space being exhausted when the agent executes a command that
287  * results in an ACK must still be handled.
288  *
289  * @param[in] pAgentContext Pointer to the context for the MQTT connection to
290  * which the PUBLISH or SUBSCRIBE message is to be sent.
291  *
292  * @return true if there is space in that MQTT connection's ACK list, otherwise
293  * false;
294  */
295 static bool isSpaceInPendingAckList( const MQTTAgentContext_t * pAgentContext );
296 
297 /*-----------------------------------------------------------*/
298 
isSpaceInPendingAckList(const MQTTAgentContext_t * pAgentContext)299 static bool isSpaceInPendingAckList( const MQTTAgentContext_t * pAgentContext )
300 {
301     const MQTTAgentAckInfo_t * pendingAcks;
302     bool spaceFound = false;
303     size_t i;
304 
305     assert( pAgentContext != NULL );
306 
307     pendingAcks = pAgentContext->pPendingAcks;
308 
309     /* Are there any open slots? */
310     for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
311     {
312         /* If the packetId is MQTT_PACKET_ID_INVALID then the array space is
313          * not in use. */
314         if( pendingAcks[ i ].packetId == MQTT_PACKET_ID_INVALID )
315         {
316             spaceFound = true;
317             break;
318         }
319     }
320 
321     return spaceFound;
322 }
323 
324 /*-----------------------------------------------------------*/
325 
addAwaitingOperation(MQTTAgentContext_t * pAgentContext,uint16_t packetId,MQTTAgentCommand_t * pCommand)326 static MQTTStatus_t addAwaitingOperation( MQTTAgentContext_t * pAgentContext,
327                                           uint16_t packetId,
328                                           MQTTAgentCommand_t * pCommand )
329 {
330     size_t i = 0, unusedPos = MQTT_AGENT_MAX_OUTSTANDING_ACKS;
331     MQTTStatus_t status = MQTTNoMemory;
332     MQTTAgentAckInfo_t * pendingAcks = NULL;
333 
334     assert( pAgentContext != NULL );
335     assert( pCommand != NULL );
336     assert( packetId != MQTT_PACKET_ID_INVALID );
337     pendingAcks = pAgentContext->pPendingAcks;
338 
339     /* Before adding the record for the pending acknowledgement of the packet ID,
340      * make sure that there doesn't already exist an entry for the same packet ID.
341      * Also, as part of iterating through the list of pending acknowledgements,
342      * find an unused space for the packet ID to be added, if it can be. */
343     for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
344     {
345         /* If the packetId is MQTT_PACKET_ID_INVALID then the array space is not in
346          * use. */
347         if( ( unusedPos == MQTT_AGENT_MAX_OUTSTANDING_ACKS ) &&
348             ( pendingAcks[ i ].packetId == MQTT_PACKET_ID_INVALID ) )
349         {
350             unusedPos = i;
351             status = MQTTSuccess;
352         }
353 
354         if( pendingAcks[ i ].packetId == packetId )
355         {
356             /* Check whether there exists a duplicate entry for pending
357              * acknowledgment for the same packet ID that we want to add to
358              * the list.
359              * Note: This is an unlikely edge case which represents that a packet ID
360              * didn't receive acknowledgment, but subsequent SUBSCRIBE/PUBLISH operations
361              * representing 65535 packet IDs were successful that caused the bit packet
362              * ID value to wrap around and reached the same packet ID as that was still
363              * pending acknowledgment.
364              */
365             status = MQTTStateCollision;
366             LogError( ( "Failed to add operation to list of pending acknowledgments: "
367                         "Existing entry found for same packet: PacketId=%u\n", packetId ) );
368             break;
369         }
370     }
371 
372     /* Add the packet ID to the list if there is space available, and there is no
373      * duplicate entry for the same packet ID found. */
374     if( status == MQTTSuccess )
375     {
376         pendingAcks[ unusedPos ].packetId = packetId;
377         pendingAcks[ unusedPos ].pOriginalCommand = pCommand;
378     }
379     else if( status == MQTTNoMemory )
380     {
381         LogError( ( "Failed to add operation to list of pending acknowledgments: "
382                     "No memory available: PacketId=%u\n", packetId ) );
383     }
384     else
385     {
386         /* Empty else MISRA 15.7 */
387     }
388 
389     return status;
390 }
391 
392 /*-----------------------------------------------------------*/
393 
getAwaitingOperation(MQTTAgentContext_t * pAgentContext,uint16_t incomingPacketId)394 static MQTTAgentAckInfo_t * getAwaitingOperation( MQTTAgentContext_t * pAgentContext,
395                                                   uint16_t incomingPacketId )
396 {
397     size_t i = 0;
398     MQTTAgentAckInfo_t * pFoundAck = NULL;
399 
400     assert( pAgentContext != NULL );
401 
402     /* Look through the array of packet IDs that are still waiting to be acked to
403      * find one with incomingPacketId. */
404     for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
405     {
406         if( pAgentContext->pPendingAcks[ i ].packetId == incomingPacketId )
407         {
408             pFoundAck = &( pAgentContext->pPendingAcks[ i ] );
409             break;
410         }
411     }
412 
413     if( pFoundAck == NULL )
414     {
415         LogError( ( "No ack found for packet id %u.\n", incomingPacketId ) );
416     }
417     else if( ( pFoundAck->pOriginalCommand == NULL ) || ( pFoundAck->packetId == 0U ) )
418     {
419         LogError( ( "Found ack had empty fields. PacketId=%hu, Original Command=%p",
420                     ( unsigned short ) pFoundAck->packetId,
421                     ( void * ) pFoundAck->pOriginalCommand ) );
422         ( void ) memset( pFoundAck, 0x00, sizeof( MQTTAgentAckInfo_t ) );
423         pFoundAck = NULL;
424     }
425     else
426     {
427         /* Empty else MISRA 15.7 */
428     }
429 
430     return pFoundAck;
431 }
432 
433 /*-----------------------------------------------------------*/
434 
createCommand(MQTTAgentCommandType_t commandType,const MQTTAgentContext_t * pMqttAgentContext,void * pMqttInfoParam,MQTTAgentCommandCallback_t commandCompleteCallback,MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,MQTTAgentCommand_t * pCommand)435 static MQTTStatus_t createCommand( MQTTAgentCommandType_t commandType,
436                                    const MQTTAgentContext_t * pMqttAgentContext,
437                                    void * pMqttInfoParam,
438                                    MQTTAgentCommandCallback_t commandCompleteCallback,
439                                    MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
440                                    MQTTAgentCommand_t * pCommand )
441 {
442     bool isValid, isSpace = true;
443     MQTTStatus_t statusReturn;
444     const MQTTPublishInfo_t * pPublishInfo;
445     size_t uxHeaderBytes;
446     const size_t uxControlAndLengthBytes = ( size_t ) 4; /* Control, remaining length and length bytes. */
447 
448     assert( pMqttAgentContext != NULL );
449     assert( pCommand != NULL );
450 
451     ( void ) memset( pCommand, 0x00, sizeof( MQTTAgentCommand_t ) );
452 
453     /* Determine if required parameters are present in context. */
454     switch( commandType )
455     {
456         case SUBSCRIBE:
457         case UNSUBSCRIBE:
458             assert( pMqttInfoParam != NULL );
459 
460             /* This message type results in the broker returning an ACK.  The
461              * agent maintains an array of outstanding ACK messages.  See if
462              * the array contains space for another outstanding ack. */
463             isSpace = isSpaceInPendingAckList( pMqttAgentContext );
464 
465             isValid = isSpace;
466 
467             break;
468 
469         case PUBLISH:
470             pPublishInfo = ( const MQTTPublishInfo_t * ) pMqttInfoParam;
471 
472             /* Calculate the space consumed by everything other than the
473              * payload. */
474             uxHeaderBytes = uxControlAndLengthBytes;
475             uxHeaderBytes += pPublishInfo->topicNameLength;
476 
477             /* This message type results in the broker returning an ACK. The
478              * agent maintains an array of outstanding ACK messages.  See if
479              * the array contains space for another outstanding ack.  QoS0
480              * publish does not result in an ack so it doesn't matter if
481              * there is no space in the ACK array. */
482             if( pPublishInfo->qos != MQTTQoS0 )
483             {
484                 isSpace = isSpaceInPendingAckList( pMqttAgentContext );
485             }
486 
487             /* Will the message fit in the defined buffer? */
488             isValid = ( uxHeaderBytes < pMqttAgentContext->mqttContext.networkBuffer.size ) &&
489                       ( isSpace == true );
490 
491             break;
492 
493         case PROCESSLOOP:
494         case PING:
495         case CONNECT:
496         case DISCONNECT:
497         default:
498             /* Other operations don't need to store ACKs. */
499             isValid = true;
500             break;
501     }
502 
503     if( isValid )
504     {
505         pCommand->commandType = commandType;
506         pCommand->pArgs = pMqttInfoParam;
507         pCommand->pCmdContext = pCommandCompleteCallbackContext;
508         pCommand->pCommandCompleteCallback = commandCompleteCallback;
509     }
510 
511     statusReturn = ( isValid ) ? MQTTSuccess : MQTTBadParameter;
512 
513     if( ( statusReturn == MQTTBadParameter ) && ( isSpace == false ) )
514     {
515         /* The error was caused not by a bad parameter, but because there was
516          * no room in the pending Ack list for the Ack response to an outgoing
517          * PUBLISH or SUBSCRIBE message. */
518         statusReturn = MQTTNoMemory;
519     }
520 
521     return statusReturn;
522 }
523 
524 /*-----------------------------------------------------------*/
525 
addCommandToQueue(const MQTTAgentContext_t * pAgentContext,MQTTAgentCommand_t * pCommand,uint32_t blockTimeMs)526 static MQTTStatus_t addCommandToQueue( const MQTTAgentContext_t * pAgentContext,
527                                        MQTTAgentCommand_t * pCommand,
528                                        uint32_t blockTimeMs )
529 {
530     bool queueStatus;
531 
532     assert( pAgentContext != NULL );
533     assert( pCommand != NULL );
534 
535     /* The application called an API function.  The API function was validated and
536      * packed into a MQTTAgentCommand_t structure.  Now post a reference to the MQTTAgentCommand_t
537      * structure to the MQTT agent for processing. */
538     queueStatus = pAgentContext->agentInterface.send(
539         pAgentContext->agentInterface.pMsgCtx,
540         &pCommand,
541         blockTimeMs
542         );
543 
544     return ( queueStatus ) ? MQTTSuccess : MQTTSendFailed;
545 }
546 
547 /*-----------------------------------------------------------*/
548 
processCommand(MQTTAgentContext_t * pMqttAgentContext,MQTTAgentCommand_t * pCommand,bool * pEndLoop)549 static MQTTStatus_t processCommand( MQTTAgentContext_t * pMqttAgentContext,
550                                     MQTTAgentCommand_t * pCommand,
551                                     bool * pEndLoop )
552 {
553     const MQTTAgentCommandFunc_t pCommandFunctionTable[ NUM_COMMANDS ] = MQTT_AGENT_FUNCTION_TABLE;
554     MQTTStatus_t operationStatus = MQTTSuccess;
555     bool ackAdded = false;
556     MQTTAgentCommandFunc_t commandFunction = NULL;
557     void * pCommandArgs = NULL;
558     MQTTAgentCommandFuncReturns_t commandOutParams = { 0 };
559 
560     assert( pMqttAgentContext != NULL );
561     assert( pEndLoop != NULL );
562 
563     if( pCommand != NULL )
564     {
565         assert( pCommand->commandType < NUM_COMMANDS );
566 
567         if( ( pCommand->commandType >= NONE ) && ( pCommand->commandType < NUM_COMMANDS ) )
568         {
569             commandFunction = pCommandFunctionTable[ pCommand->commandType ];
570             pCommandArgs = pCommand->pArgs;
571         }
572         else
573         {
574             LogWarn( ( "An incorrect command type was received by the processCommand function."
575                        " Type = %d.", pCommand->commandType ) );
576             commandFunction = pCommandFunctionTable[ NONE ];
577         }
578     }
579     else
580     {
581         commandFunction = pCommandFunctionTable[ NONE ];
582     }
583 
584     operationStatus = commandFunction( pMqttAgentContext, pCommandArgs, &commandOutParams );
585 
586     if( ( operationStatus == MQTTSuccess ) &&
587         commandOutParams.addAcknowledgment &&
588         ( commandOutParams.packetId != MQTT_PACKET_ID_INVALID ) )
589     {
590         operationStatus = addAwaitingOperation( pMqttAgentContext, commandOutParams.packetId, pCommand );
591         ackAdded = ( operationStatus == MQTTSuccess );
592     }
593 
594     if( ( pCommand != NULL ) && ( ackAdded != true ) )
595     {
596         /* The command is complete, call the callback. */
597         concludeCommand( pMqttAgentContext, pCommand, operationStatus, NULL );
598     }
599 
600     /* Run the process loop if there were no errors and the MQTT connection
601      * still exists. */
602     if( ( operationStatus == MQTTSuccess ) && commandOutParams.runProcessLoop )
603     {
604         do
605         {
606             pMqttAgentContext->packetReceivedInLoop = false;
607 
608             if( ( ( operationStatus == MQTTSuccess ) || ( operationStatus == MQTTNeedMoreBytes ) ) &&
609                 ( pMqttAgentContext->mqttContext.connectStatus == MQTTConnected ) )
610             {
611                 operationStatus = MQTT_ProcessLoop( &( pMqttAgentContext->mqttContext ) );
612             }
613         } while( pMqttAgentContext->packetReceivedInLoop );
614     }
615 
616     /* Set the flag to break from the command loop. */
617     *pEndLoop = ( commandOutParams.endLoop || ( operationStatus != MQTTSuccess ) );
618 
619     return operationStatus;
620 }
621 
622 /*-----------------------------------------------------------*/
623 
/* Complete the command that was waiting for the acknowledgment just received
 * and release its pending-ack entry.
 *
 * For a SUBACK, the pointer handed to concludeCommand() addresses the bytes
 * starting 2 bytes into the packet's remaining data; for every other packet
 * type NULL is passed.  The command is concluded with the deserialization
 * result of the incoming packet, after which the entry is zeroed. */
static void handleAcks( const MQTTAgentContext_t * pAgentContext,
                        const MQTTPacketInfo_t * pPacketInfo,
                        const MQTTDeserializedInfo_t * pDeserializedInfo,
                        MQTTAgentAckInfo_t * pAckInfo,
                        uint8_t packetType )
{
    uint8_t * pSubackCodes = NULL;

    assert( pAckInfo != NULL );
    assert( pAckInfo->pOriginalCommand != NULL );

    /* A SUBACK's status codes start 2 bytes after the variable header. */
    if( packetType == MQTT_PACKET_TYPE_SUBACK )
    {
        pSubackCodes = pPacketInfo->pRemainingData + 2U;
    }

    concludeCommand( pAgentContext,
                     pAckInfo->pOriginalCommand,
                     pDeserializedInfo->deserializationResult,
                     pSubackCodes );

    /* Clear the entry from the list. */
    ( void ) memset( pAckInfo, 0x00, sizeof( MQTTAgentAckInfo_t ) );
}
646 
647 /*-----------------------------------------------------------*/
648 
getAgentFromMQTTContext(MQTTContext_t * pMQTTContext)649 static MQTTAgentContext_t * getAgentFromMQTTContext( MQTTContext_t * pMQTTContext )
650 {
651     MQTTAgentContext_t ctx = { 0 };
652     ptrdiff_t offset = ( ( uint8_t * ) &( ctx.mqttContext ) ) - ( ( uint8_t * ) &ctx );
653 
654     return ( MQTTAgentContext_t * ) &( ( ( uint8_t * ) pMQTTContext )[ 0 - offset ] );
655 }
656 
657 /*-----------------------------------------------------------*/
658 
/* Event callback that routes an incoming packet to the right handler.
 *
 * - Marks the owning agent context with packetReceivedInLoop = true so the
 *   caller of MQTT_ProcessLoop() knows a packet was handled.
 * - PUBLISH packets are forwarded to the agent's pIncomingCallback.
 * - PUBACK, PUBCOMP, SUBACK and UNSUBACK are matched against the pending-ack
 *   list by packet ID; a match completes the original command, a miss is
 *   logged.
 * - PUBREC and PUBREL are ignored because they do not complete a command.
 * - Every other packet type, including PINGRESP, is logged as an error.
 *
 * All three pointer parameters must be non-NULL. */
static void mqttEventCallback( MQTTContext_t * pMqttContext,
                               MQTTPacketInfo_t * pPacketInfo,
                               MQTTDeserializedInfo_t * pDeserializedInfo )
{
    MQTTAgentAckInfo_t * pAckInfo;
    uint16_t packetIdentifier;
    MQTTAgentContext_t * pAgentContext;
    const uint8_t upperNibble = ( uint8_t ) 0xF0;

    assert( pMqttContext != NULL );
    assert( pPacketInfo != NULL );
    assert( pDeserializedInfo != NULL );

    /* Read the packet ID only after the pointer has been checked. */
    packetIdentifier = pDeserializedInfo->packetIdentifier;

    pAgentContext = getAgentFromMQTTContext( pMqttContext );

    /* This callback executes from within MQTT_ProcessLoop().  Setting this flag
     * indicates that the callback executed so the caller of MQTT_ProcessLoop() knows
     * it should call it again as there may be more data to process. */
    pAgentContext->packetReceivedInLoop = true;

    /* Handle incoming publish. The lower 4 bits of the publish packet type is used
     * for the dup, QoS, and retain flags. Hence masking out the lower bits to check
     * if the packet is publish. */
    if( ( pPacketInfo->type & upperNibble ) == MQTT_PACKET_TYPE_PUBLISH )
    {
        pAgentContext->pIncomingCallback( pAgentContext, packetIdentifier, pDeserializedInfo->pPublishInfo );
    }
    else
    {
        /* Handle other packets. */
        switch( pPacketInfo->type )
        {
            case MQTT_PACKET_TYPE_PUBACK:
            case MQTT_PACKET_TYPE_PUBCOMP:
            case MQTT_PACKET_TYPE_SUBACK:
            case MQTT_PACKET_TYPE_UNSUBACK:
                pAckInfo = getAwaitingOperation( pAgentContext, packetIdentifier );

                if( pAckInfo != NULL )
                {
                    /* This function will also clear the memory associated with
                     * the ack list entry. */
                    handleAcks( pAgentContext,
                                pPacketInfo,
                                pDeserializedInfo,
                                pAckInfo,
                                pPacketInfo->type );
                }
                else
                {
                    LogError( ( "No operation found matching packet id %u.\n", packetIdentifier ) );
                }

                break;

            /* Nothing to do for these packets since they don't indicate command completion. */
            case MQTT_PACKET_TYPE_PUBREC:
            case MQTT_PACKET_TYPE_PUBREL:
                break;

            /* Any other packet type is invalid. */
            case MQTT_PACKET_TYPE_PINGRESP:
            default:
                LogError( ( "Unknown packet type received:(%02x).\n",
                            pPacketInfo->type ) );
                break;
        }
    }
}
727 
728 /*-----------------------------------------------------------*/
729 
createAndAddCommand(MQTTAgentCommandType_t commandType,const MQTTAgentContext_t * pMqttAgentContext,void * pMqttInfoParam,MQTTAgentCommandCallback_t commandCompleteCallback,MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,uint32_t blockTimeMs)730 static MQTTStatus_t createAndAddCommand( MQTTAgentCommandType_t commandType,
731                                          const MQTTAgentContext_t * pMqttAgentContext,
732                                          void * pMqttInfoParam,
733                                          MQTTAgentCommandCallback_t commandCompleteCallback,
734                                          MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
735                                          uint32_t blockTimeMs )
736 {
737     MQTTStatus_t statusReturn = MQTTBadParameter;
738     MQTTAgentCommand_t * pCommand;
739     bool commandReleased = false;
740 
741     /* If the packet ID is zero then the MQTT context has not been initialized as 0
742      * is the initial value but not a valid packet ID. */
743     if( pMqttAgentContext->mqttContext.nextPacketId != MQTT_PACKET_ID_INVALID )
744     {
745         pCommand = pMqttAgentContext->agentInterface.getCommand( blockTimeMs );
746 
747         if( pCommand != NULL )
748         {
749             statusReturn = createCommand( commandType,
750                                           pMqttAgentContext,
751                                           pMqttInfoParam,
752                                           commandCompleteCallback,
753                                           pCommandCompleteCallbackContext,
754                                           pCommand );
755 
756             if( statusReturn == MQTTSuccess )
757             {
758                 statusReturn = addCommandToQueue( pMqttAgentContext, pCommand, blockTimeMs );
759             }
760 
761             if( statusReturn != MQTTSuccess )
762             {
763                 /* Could not send the command to the queue so release the command
764                  * structure again. */
765                 commandReleased = pMqttAgentContext->agentInterface.releaseCommand( pCommand );
766 
767                 if( !commandReleased )
768                 {
769                     LogError( ( "Command %p could not be released.",
770                                 ( void * ) pCommand ) );
771                 }
772             }
773         }
774         else
775         {
776             /* Ran out of MQTTAgentCommand_t structures - pool is empty. */
777             statusReturn = MQTTNoMemory;
778         }
779     }
780     else
781     {
782         LogError( ( "MQTT context must be initialized." ) );
783     }
784 
785     return statusReturn;
786 }
787 
788 /*-----------------------------------------------------------*/
789 
/*
 * Finish a command: report its outcome to the completion callback (when one
 * was supplied) and then hand the command structure back through the
 * messaging interface's releaseCommand().
 *
 * returnCode and pSubackCodes are forwarded to the callback inside a
 * zero-initialized MQTTAgentReturnInfo_t.
 */
static void concludeCommand( const MQTTAgentContext_t * pAgentContext,
                             MQTTAgentCommand_t * pCommand,
                             MQTTStatus_t returnCode,
                             uint8_t * pSubackCodes )
{
    MQTTAgentReturnInfo_t completionInfo;
    bool released = false;

    assert( pAgentContext != NULL );
    assert( pAgentContext->agentInterface.releaseCommand != NULL );
    assert( pCommand != NULL );

    /* Start from an all-zero structure so fields not set below are cleared. */
    ( void ) memset( &completionInfo, 0x00, sizeof( completionInfo ) );
    completionInfo.pSubackCodes = pSubackCodes;
    completionInfo.returnCode = returnCode;

    /* The completion callback is optional. */
    if( pCommand->pCommandCompleteCallback != NULL )
    {
        pCommand->pCommandCompleteCallback( pCommand->pCmdContext, &completionInfo );
    }

    released = pAgentContext->agentInterface.releaseCommand( pCommand );

    if( released == false )
    {
        LogError( ( "Failed to release command %p of type %d.",
                    ( void * ) pCommand,
                    pCommand->commandType ) );
    }
}
820 
821 /*-----------------------------------------------------------*/
822 
resendPublishes(MQTTAgentContext_t * pMqttAgentContext)823 static MQTTStatus_t resendPublishes( MQTTAgentContext_t * pMqttAgentContext )
824 {
825     MQTTStatus_t statusResult = MQTTSuccess;
826     MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
827     uint16_t packetId = MQTT_PACKET_ID_INVALID;
828     MQTTAgentAckInfo_t * pFoundAck = NULL;
829     MQTTPublishInfo_t * pOriginalPublish = NULL;
830     MQTTContext_t * pMqttContext;
831 
832     assert( pMqttAgentContext != NULL );
833     pMqttContext = &( pMqttAgentContext->mqttContext );
834 
835     packetId = MQTT_PublishToResend( pMqttContext, &cursor );
836 
837     while( packetId != MQTT_PACKET_ID_INVALID )
838     {
839         /* Retrieve the operation but do not remove it from the list. */
840         pFoundAck = getAwaitingOperation( pMqttAgentContext, packetId );
841 
842         if( pFoundAck != NULL )
843         {
844             /* Set the DUP flag. */
845             pOriginalPublish = ( MQTTPublishInfo_t * ) ( pFoundAck->pOriginalCommand->pArgs );
846             pOriginalPublish->dup = true;
847             statusResult = MQTT_Publish( pMqttContext, pOriginalPublish, packetId );
848 
849             if( statusResult != MQTTSuccess )
850             {
851                 concludeCommand( pMqttAgentContext, pFoundAck->pOriginalCommand, statusResult, NULL );
852                 ( void ) memset( pFoundAck, 0x00, sizeof( MQTTAgentAckInfo_t ) );
853                 LogError( ( "Failed to resend publishes. Error code=%s\n", MQTT_Status_strerror( statusResult ) ) );
854                 break;
855             }
856         }
857 
858         packetId = MQTT_PublishToResend( pMqttContext, &cursor );
859     }
860 
861     return statusResult;
862 }
863 
864 /*-----------------------------------------------------------*/
865 
/*
 * Conclude operations that are still waiting for an acknowledgment and remove
 * them from the pending acknowledgment list.
 *
 * When clearOnlySubUnsubEntries is true only SUBSCRIBE and UNSUBSCRIBE
 * operations are affected; otherwise every occupied entry is cleared. Each
 * cleared operation is concluded with MQTTRecvFailed and its entry is zeroed.
 */
static void clearPendingAcknowledgments( MQTTAgentContext_t * pMqttAgentContext,
                                         bool clearOnlySubUnsubEntries )
{
    size_t slot = 0;
    MQTTAgentAckInfo_t * pEntry = NULL;

    assert( pMqttAgentContext != NULL );

    for( slot = 0; slot < MQTT_AGENT_MAX_OUTSTANDING_ACKS; slot++ )
    {
        pEntry = &( pMqttAgentContext->pPendingAcks[ slot ] );

        /* An invalid packet ID marks an unused entry. */
        if( pEntry->packetId != MQTT_PACKET_ID_INVALID )
        {
            assert( pEntry->pOriginalCommand != NULL );

            /* Without the filter everything goes; with it, only subscribe and
             * unsubscribe operations do. */
            if( ( !clearOnlySubUnsubEntries ) ||
                ( pEntry->pOriginalCommand->commandType == SUBSCRIBE ) ||
                ( pEntry->pOriginalCommand->commandType == UNSUBSCRIBE ) )
            {
                /* MQTTRecvFailed is used to indicate a network error. */
                concludeCommand( pMqttAgentContext, pEntry->pOriginalCommand, MQTTRecvFailed, NULL );

                /* Free the entry for reuse. */
                ( void ) memset( pEntry, 0x00, sizeof( MQTTAgentAckInfo_t ) );
            }
        }
    }
}
903 
904 /*-----------------------------------------------------------*/
905 
validateStruct(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)906 static bool validateStruct( const MQTTAgentContext_t * pMqttAgentContext,
907                             const MQTTAgentCommandInfo_t * pCommandInfo )
908 {
909     bool ret = false;
910 
911     if( ( pMqttAgentContext == NULL ) ||
912         ( pCommandInfo == NULL ) )
913     {
914         LogError( ( "Pointer cannot be NULL. pMqttAgentContext=%p, pCommandInfo=%p.",
915                     ( void * ) pMqttAgentContext,
916                     ( void * ) pCommandInfo ) );
917     }
918     else if( ( pMqttAgentContext->agentInterface.send == NULL ) ||
919              ( pMqttAgentContext->agentInterface.recv == NULL ) ||
920              ( pMqttAgentContext->agentInterface.getCommand == NULL ) ||
921              ( pMqttAgentContext->agentInterface.releaseCommand == NULL ) ||
922              ( pMqttAgentContext->agentInterface.pMsgCtx == NULL ) )
923     {
924         LogError( ( "pMqttAgentContext must have initialized its messaging interface." ) );
925     }
926     else
927     {
928         ret = true;
929     }
930 
931     return ret;
932 }
933 
934 /*-----------------------------------------------------------*/
935 
/*
 * Check the operation-specific argument block of a command.
 *
 * - CONNECT: pParams and its pConnectInfo must be non-NULL.
 * - SUBSCRIBE / UNSUBSCRIBE: pParams and its pSubscribeInfo must be non-NULL
 *   and numSubscriptions must be non-zero.
 * - PUBLISH (and anything else): pParams must be non-NULL; its contents are
 *   not inspected here.
 *
 * Returns true when the arguments pass the check for commandType.
 */
static bool validateParams( MQTTAgentCommandType_t commandType,
                            const void * pParams )
{
    bool paramsOk = false;

    assert( ( commandType == CONNECT ) || ( commandType == PUBLISH ) ||
            ( commandType == SUBSCRIBE ) || ( commandType == UNSUBSCRIBE ) );

    if( commandType == CONNECT )
    {
        const MQTTAgentConnectArgs_t * pConnect = ( const MQTTAgentConnectArgs_t * ) pParams;

        paramsOk = ( ( pConnect != NULL ) &&
                     ( pConnect->pConnectInfo != NULL ) );
    }
    else if( ( commandType == SUBSCRIBE ) || ( commandType == UNSUBSCRIBE ) )
    {
        const MQTTAgentSubscribeArgs_t * pSubscribe = ( const MQTTAgentSubscribeArgs_t * ) pParams;

        paramsOk = ( ( pSubscribe != NULL ) &&
                     ( pSubscribe->pSubscribeInfo != NULL ) &&
                     ( pSubscribe->numSubscriptions != 0U ) );
    }
    else
    {
        /* Publish arguments are only checked for presence, so no cast is needed. */
        paramsOk = ( pParams != NULL );
    }

    return paramsOk;
}
971 
972 /*-----------------------------------------------------------*/
973 
MQTTAgent_Init(MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentMessageInterface_t * pMsgInterface,const MQTTFixedBuffer_t * pNetworkBuffer,const TransportInterface_t * pTransportInterface,MQTTGetCurrentTimeFunc_t getCurrentTimeMs,MQTTAgentIncomingPublishCallback_t incomingCallback,void * pIncomingPacketContext)974 MQTTStatus_t MQTTAgent_Init( MQTTAgentContext_t * pMqttAgentContext,
975                              const MQTTAgentMessageInterface_t * pMsgInterface,
976                              const MQTTFixedBuffer_t * pNetworkBuffer,
977                              const TransportInterface_t * pTransportInterface,
978                              MQTTGetCurrentTimeFunc_t getCurrentTimeMs,
979                              MQTTAgentIncomingPublishCallback_t incomingCallback,
980                              void * pIncomingPacketContext )
981 {
982     MQTTStatus_t returnStatus;
983 
984     if( ( pMqttAgentContext == NULL ) ||
985         ( pMsgInterface == NULL ) ||
986         ( pTransportInterface == NULL ) ||
987         ( getCurrentTimeMs == NULL ) ||
988         ( incomingCallback == NULL ) )
989     {
990         returnStatus = MQTTBadParameter;
991     }
992     else if( ( pMsgInterface->pMsgCtx == NULL ) ||
993              ( pMsgInterface->send == NULL ) ||
994              ( pMsgInterface->recv == NULL ) ||
995              ( pMsgInterface->getCommand == NULL ) ||
996              ( pMsgInterface->releaseCommand == NULL ) )
997     {
998         LogError( ( "Invalid parameter: pMsgInterface must set all members." ) );
999         returnStatus = MQTTBadParameter;
1000     }
1001     else
1002     {
1003         ( void ) memset( pMqttAgentContext, 0x00, sizeof( MQTTAgentContext_t ) );
1004 
1005         returnStatus = MQTT_Init( &( pMqttAgentContext->mqttContext ),
1006                                   pTransportInterface,
1007                                   getCurrentTimeMs,
1008                                   mqttEventCallback,
1009                                   pNetworkBuffer );
1010 
1011         #if ( MQTT_AGENT_USE_QOS_1_2_PUBLISH != 0 )
1012             {
1013                 if( returnStatus == MQTTSuccess )
1014                 {
1015                     returnStatus = MQTT_InitStatefulQoS( &( pMqttAgentContext->mqttContext ),
1016                                                          pOutgoingPublishRecords,
1017                                                          MQTT_AGENT_MAX_OUTSTANDING_ACKS,
1018                                                          pIncomingPublishRecords,
1019                                                          MQTT_AGENT_MAX_OUTSTANDING_ACKS );
1020                 }
1021             }
1022         #endif /* if ( MQTT_AGENT_USE_QOS_1_2_PUBLISH != 0 ) */
1023 
1024         if( returnStatus == MQTTSuccess )
1025         {
1026             pMqttAgentContext->pIncomingCallback = incomingCallback;
1027             pMqttAgentContext->pIncomingCallbackContext = pIncomingPacketContext;
1028             pMqttAgentContext->agentInterface = *pMsgInterface;
1029         }
1030     }
1031 
1032     return returnStatus;
1033 }
1034 
1035 /*-----------------------------------------------------------*/
1036 
MQTTAgent_CommandLoop(MQTTAgentContext_t * pMqttAgentContext)1037 MQTTStatus_t MQTTAgent_CommandLoop( MQTTAgentContext_t * pMqttAgentContext )
1038 {
1039     MQTTAgentCommand_t * pCommand;
1040     MQTTStatus_t operationStatus = MQTTSuccess;
1041     bool endLoop = false;
1042 
1043     /* The command queue should have been created before this task gets created. */
1044     if( ( pMqttAgentContext == NULL ) || ( pMqttAgentContext->agentInterface.pMsgCtx == NULL ) )
1045     {
1046         operationStatus = MQTTBadParameter;
1047     }
1048 
1049     /* Loop until an error or we receive a terminate command. */
1050     while( operationStatus == MQTTSuccess )
1051     {
1052         /* Wait for the next command, if any. */
1053         pCommand = NULL;
1054         ( void ) pMqttAgentContext->agentInterface.recv(
1055             pMqttAgentContext->agentInterface.pMsgCtx,
1056             &( pCommand ),
1057             MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME
1058             );
1059         operationStatus = processCommand( pMqttAgentContext, pCommand, &endLoop );
1060 
1061         if( operationStatus != MQTTSuccess )
1062         {
1063             LogError( ( "MQTT operation failed with status %s\n",
1064                         MQTT_Status_strerror( operationStatus ) ) );
1065         }
1066 
1067         /* Terminate the loop on disconnects, errors, or the termination command. */
1068         if( endLoop )
1069         {
1070             break;
1071         }
1072     }
1073 
1074     return operationStatus;
1075 }
1076 
1077 /*-----------------------------------------------------------*/
1078 
MQTTAgent_ResumeSession(MQTTAgentContext_t * pMqttAgentContext,bool sessionPresent)1079 MQTTStatus_t MQTTAgent_ResumeSession( MQTTAgentContext_t * pMqttAgentContext,
1080                                       bool sessionPresent )
1081 {
1082     MQTTStatus_t statusResult = MQTTSuccess;
1083 
1084     /* If the packet ID is zero then the MQTT context has not been initialized as 0
1085      * is the initial value but not a valid packet ID. */
1086     if( ( pMqttAgentContext != NULL ) &&
1087         ( pMqttAgentContext->mqttContext.nextPacketId != MQTT_PACKET_ID_INVALID ) )
1088     {
1089         /* Resend publishes if session is present. NOTE: It's possible that some
1090          * of the operations that were in progress during the network interruption
1091          * were subscribes. In that case, we would want to mark those operations
1092          * as completing with error and remove them from the list of operations, so
1093          * that the calling task can try subscribing again. */
1094         if( sessionPresent )
1095         {
1096             /* The session has resumed, so clear any SUBSCRIBE/UNSUBSCRIBE operations
1097              * that were pending acknowledgments in the previous connection. */
1098             clearPendingAcknowledgments( pMqttAgentContext, true );
1099 
1100             statusResult = resendPublishes( pMqttAgentContext );
1101         }
1102 
1103         /* If we wanted to resume a session but none existed with the broker, we
1104          * should mark all in progress operations as errors so that the tasks that
1105          * created them can try again. */
1106         else
1107         {
1108             /* We have a clean session, so clear all operations pending acknowledgments. */
1109             clearPendingAcknowledgments( pMqttAgentContext, false );
1110         }
1111     }
1112     else
1113     {
1114         statusResult = MQTTBadParameter;
1115     }
1116 
1117     return statusResult;
1118 }
1119 
1120 /*-----------------------------------------------------------*/
1121 
MQTTAgent_CancelAll(MQTTAgentContext_t * pMqttAgentContext)1122 MQTTStatus_t MQTTAgent_CancelAll( MQTTAgentContext_t * pMqttAgentContext )
1123 {
1124     MQTTStatus_t statusReturn = MQTTSuccess;
1125     MQTTAgentCommand_t * pReceivedCommand = NULL;
1126     bool commandWasReceived = false;
1127     MQTTAgentAckInfo_t * pendingAcks;
1128     size_t i;
1129 
1130     if( ( pMqttAgentContext == NULL ) || ( pMqttAgentContext->agentInterface.pMsgCtx == NULL ) )
1131     {
1132         statusReturn = MQTTBadParameter;
1133     }
1134     else
1135     {
1136         /* Cancel all operations waiting in the queue. */
1137         do
1138         {
1139             pReceivedCommand = NULL;
1140             commandWasReceived = pMqttAgentContext->agentInterface.recv(
1141                 pMqttAgentContext->agentInterface.pMsgCtx,
1142                 &( pReceivedCommand ),
1143                 0U );
1144 
1145             if( pReceivedCommand != NULL )
1146             {
1147                 concludeCommand( pMqttAgentContext, pReceivedCommand, MQTTRecvFailed, NULL );
1148             }
1149         } while( commandWasReceived );
1150 
1151         pendingAcks = pMqttAgentContext->pPendingAcks;
1152 
1153         /* Cancel any operations awaiting an acknowledgment. */
1154         for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
1155         {
1156             if( pendingAcks[ i ].packetId != MQTT_PACKET_ID_INVALID )
1157             {
1158                 concludeCommand( pMqttAgentContext, pendingAcks[ i ].pOriginalCommand, MQTTRecvFailed, NULL );
1159 
1160                 /* Now remove it from the list. */
1161                 ( void ) memset( &( pendingAcks[ i ] ), 0x00, sizeof( MQTTAgentAckInfo_t ) );
1162             }
1163         }
1164     }
1165 
1166     return statusReturn;
1167 }
1168 
1169 /*-----------------------------------------------------------*/
1170 
MQTTAgent_Subscribe(const MQTTAgentContext_t * pMqttAgentContext,MQTTAgentSubscribeArgs_t * pSubscriptionArgs,const MQTTAgentCommandInfo_t * pCommandInfo)1171 MQTTStatus_t MQTTAgent_Subscribe( const MQTTAgentContext_t * pMqttAgentContext,
1172                                   MQTTAgentSubscribeArgs_t * pSubscriptionArgs,
1173                                   const MQTTAgentCommandInfo_t * pCommandInfo )
1174 {
1175     MQTTStatus_t statusReturn = MQTTBadParameter;
1176     bool paramsValid = false;
1177 
1178     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1179                   validateParams( SUBSCRIBE, pSubscriptionArgs );
1180 
1181     if( paramsValid )
1182     {
1183         statusReturn = createAndAddCommand( SUBSCRIBE,                                 /* commandType */
1184                                             pMqttAgentContext,                         /* mqttContextHandle */
1185                                             pSubscriptionArgs,                         /* pMqttInfoParam */
1186                                             pCommandInfo->cmdCompleteCallback,         /* commandCompleteCallback */
1187                                             pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1188                                             pCommandInfo->blockTimeMs );
1189     }
1190 
1191     return statusReturn;
1192 }
1193 
1194 /*-----------------------------------------------------------*/
1195 
MQTTAgent_Unsubscribe(const MQTTAgentContext_t * pMqttAgentContext,MQTTAgentSubscribeArgs_t * pSubscriptionArgs,const MQTTAgentCommandInfo_t * pCommandInfo)1196 MQTTStatus_t MQTTAgent_Unsubscribe( const MQTTAgentContext_t * pMqttAgentContext,
1197                                     MQTTAgentSubscribeArgs_t * pSubscriptionArgs,
1198                                     const MQTTAgentCommandInfo_t * pCommandInfo )
1199 {
1200     MQTTStatus_t statusReturn = MQTTBadParameter;
1201     bool paramsValid = false;
1202 
1203     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1204                   validateParams( UNSUBSCRIBE, pSubscriptionArgs );
1205 
1206     if( paramsValid )
1207     {
1208         statusReturn = createAndAddCommand( UNSUBSCRIBE,                               /* commandType */
1209                                             pMqttAgentContext,                         /* mqttContextHandle */
1210                                             pSubscriptionArgs,                         /* pMqttInfoParam */
1211                                             pCommandInfo->cmdCompleteCallback,         /* commandCompleteCallback */
1212                                             pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1213                                             pCommandInfo->blockTimeMs );
1214     }
1215 
1216     return statusReturn;
1217 }
1218 
1219 /*-----------------------------------------------------------*/
1220 
MQTTAgent_Publish(const MQTTAgentContext_t * pMqttAgentContext,MQTTPublishInfo_t * pPublishInfo,const MQTTAgentCommandInfo_t * pCommandInfo)1221 MQTTStatus_t MQTTAgent_Publish( const MQTTAgentContext_t * pMqttAgentContext,
1222                                 MQTTPublishInfo_t * pPublishInfo,
1223                                 const MQTTAgentCommandInfo_t * pCommandInfo )
1224 {
1225     MQTTStatus_t statusReturn = MQTTBadParameter;
1226     bool paramsValid = false;
1227 
1228     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1229                   validateParams( PUBLISH, pPublishInfo );
1230 
1231     if( paramsValid )
1232     {
1233         statusReturn = createAndAddCommand( PUBLISH,                                   /* commandType */
1234                                             pMqttAgentContext,                         /* mqttContextHandle */
1235                                             pPublishInfo,                              /* pMqttInfoParam */
1236                                             pCommandInfo->cmdCompleteCallback,         /* commandCompleteCallback */
1237                                             pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1238                                             pCommandInfo->blockTimeMs );
1239     }
1240 
1241     return statusReturn;
1242 }
1243 
1244 /*-----------------------------------------------------------*/
1245 
MQTTAgent_ProcessLoop(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1246 MQTTStatus_t MQTTAgent_ProcessLoop( const MQTTAgentContext_t * pMqttAgentContext,
1247                                     const MQTTAgentCommandInfo_t * pCommandInfo )
1248 {
1249     MQTTStatus_t statusReturn = MQTTBadParameter;
1250     bool paramsValid = false;
1251 
1252     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1253 
1254     if( paramsValid )
1255     {
1256         statusReturn = createAndAddCommand( PROCESSLOOP,                               /* commandType */
1257                                             pMqttAgentContext,                         /* mqttContextHandle */
1258                                             NULL,                                      /* pMqttInfoParam */
1259                                             pCommandInfo->cmdCompleteCallback,         /* commandCompleteCallback */
1260                                             pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1261                                             pCommandInfo->blockTimeMs );
1262     }
1263 
1264     return statusReturn;
1265 }
1266 
1267 /*-----------------------------------------------------------*/
1268 
MQTTAgent_Connect(const MQTTAgentContext_t * pMqttAgentContext,MQTTAgentConnectArgs_t * pConnectArgs,const MQTTAgentCommandInfo_t * pCommandInfo)1269 MQTTStatus_t MQTTAgent_Connect( const MQTTAgentContext_t * pMqttAgentContext,
1270                                 MQTTAgentConnectArgs_t * pConnectArgs,
1271                                 const MQTTAgentCommandInfo_t * pCommandInfo )
1272 {
1273     MQTTStatus_t statusReturn = MQTTBadParameter;
1274     bool paramsValid = false;
1275 
1276     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1277                   validateParams( CONNECT, pConnectArgs );
1278 
1279     if( paramsValid )
1280     {
1281         statusReturn = createAndAddCommand( CONNECT,
1282                                             pMqttAgentContext,
1283                                             pConnectArgs,
1284                                             pCommandInfo->cmdCompleteCallback,
1285                                             pCommandInfo->pCmdCompleteCallbackContext,
1286                                             pCommandInfo->blockTimeMs );
1287     }
1288 
1289     return statusReturn;
1290 }
1291 
1292 /*-----------------------------------------------------------*/
1293 
MQTTAgent_Disconnect(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1294 MQTTStatus_t MQTTAgent_Disconnect( const MQTTAgentContext_t * pMqttAgentContext,
1295                                    const MQTTAgentCommandInfo_t * pCommandInfo )
1296 {
1297     MQTTStatus_t statusReturn = MQTTBadParameter;
1298     bool paramsValid = false;
1299 
1300     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1301 
1302     if( paramsValid )
1303     {
1304         statusReturn = createAndAddCommand( DISCONNECT,                                /* commandType */
1305                                             pMqttAgentContext,                         /* mqttContextHandle */
1306                                             NULL,                                      /* pMqttInfoParam */
1307                                             pCommandInfo->cmdCompleteCallback,         /* commandCompleteCallback */
1308                                             pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1309                                             pCommandInfo->blockTimeMs );
1310     }
1311 
1312     return statusReturn;
1313 }
1314 
1315 /*-----------------------------------------------------------*/
1316 
MQTTAgent_Ping(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1317 MQTTStatus_t MQTTAgent_Ping( const MQTTAgentContext_t * pMqttAgentContext,
1318                              const MQTTAgentCommandInfo_t * pCommandInfo )
1319 {
1320     MQTTStatus_t statusReturn = MQTTBadParameter;
1321     bool paramsValid = false;
1322 
1323     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1324 
1325     if( paramsValid )
1326     {
1327         statusReturn = createAndAddCommand( PING,                                      /* commandType */
1328                                             pMqttAgentContext,                         /* mqttContextHandle */
1329                                             NULL,                                      /* pMqttInfoParam */
1330                                             pCommandInfo->cmdCompleteCallback,         /* commandCompleteCallback */
1331                                             pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1332                                             pCommandInfo->blockTimeMs );
1333     }
1334 
1335     return statusReturn;
1336 }
1337 
1338 /*-----------------------------------------------------------*/
1339 
MQTTAgent_Terminate(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1340 MQTTStatus_t MQTTAgent_Terminate( const MQTTAgentContext_t * pMqttAgentContext,
1341                                   const MQTTAgentCommandInfo_t * pCommandInfo )
1342 {
1343     MQTTStatus_t statusReturn = MQTTBadParameter;
1344     bool paramsValid = false;
1345 
1346     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1347 
1348     if( paramsValid )
1349     {
1350         statusReturn = createAndAddCommand( TERMINATE,
1351                                             pMqttAgentContext,
1352                                             NULL,
1353                                             pCommandInfo->cmdCompleteCallback,
1354                                             pCommandInfo->pCmdCompleteCallbackContext,
1355                                             pCommandInfo->blockTimeMs );
1356     }
1357 
1358     return statusReturn;
1359 }
1360 
1361 /*-----------------------------------------------------------*/
1362