xref: /CoreMQTT-Agent-v1.1.0/source/core_mqtt_agent.c (revision c76bf4a7fea24d4200a48aa22cf91767b59c7dae)
1 /*
2  * coreMQTT Agent v1.1.0
3  * Copyright (C) 2021 Amazon.com, Inc. or its affiliates.  All Rights Reserved.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a copy of
6  * this software and associated documentation files (the "Software"), to deal in
7  * the Software without restriction, including without limitation the rights to
8  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
9  * the Software, and to permit persons to whom the Software is furnished to do so,
10  * subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in all
13  * copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
17  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
18  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
19  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21  */
22 
23 /**
24  * @file core_mqtt_agent.c
25  * @brief Implements an MQTT agent (or daemon task) to enable multithreaded access to
26  * coreMQTT.
27  *
28  * @note Implements an MQTT agent (or daemon task) on top of the coreMQTT MQTT client
29  * library.  The agent makes coreMQTT usage thread safe by being the only task (or
30  * thread) in the system that is allowed to access the native coreMQTT API - and in
31  * so doing, serializes all access to coreMQTT even when multiple tasks are using the
32  * same MQTT connection.
33  *
34  * The agent provides an equivalent API for each coreMQTT API.  Whereas coreMQTT
35  * APIs are prefixed "MQTT_", the agent APIs are prefixed "MQTTAgent_".  For example,
36  * that agent's MQTTAgent_Publish() API is the thread safe equivalent to coreMQTT's
37  * MQTT_Publish() API.
38  */
39 
40 /* Standard includes. */
41 #include <string.h>
42 #include <stdio.h>
43 #include <assert.h>
44 
45 /* MQTT agent include. */
46 #include "core_mqtt_agent.h"
47 #include "core_mqtt_agent_command_functions.h"
48 
49 /*-----------------------------------------------------------*/
50 
51 /**
52  * @brief Track an operation by adding it to a list, indicating it is anticipating
53  * an acknowledgment.
54  *
55  * @param[in] pAgentContext Agent context for the MQTT connection.
56  * @param[in] packetId Packet ID of pending ack.
57  * @param[in] pCommand Pointer to command that is expecting an ack.
58  *
59  * @return Returns one of the following:
 * - #MQTTSuccess if an entry was added to the list.
61  * - #MQTTStateCollision if there already exists an entry for the same packet ID
62  * in the list.
63  * - #MQTTNoMemory if there is no space available in the list for adding a
64  * new entry.
65  */
66 static MQTTStatus_t addAwaitingOperation( MQTTAgentContext_t * pAgentContext,
67                                           uint16_t packetId,
68                                           MQTTAgentCommand_t * pCommand );
69 
70 /**
71  * @brief Retrieve an operation from the list of pending acks, and optionally
72  * remove it from the list.
73  *
74  * @param[in] pAgentContext Agent context for the MQTT connection.
75  * @param[in] incomingPacketId Packet ID of incoming ack.
76  *
77  * @return Pointer to stored information about the operation awaiting the ack.
78  * Returns NULL if the packet ID is zero or original command does not exist.
79  */
80 static MQTTAgentAckInfo_t * getAwaitingOperation( MQTTAgentContext_t * pAgentContext,
81                                                   uint16_t incomingPacketId );
82 
83 /**
84  * @brief Populate the parameters of a #MQTTAgentCommand struct.
85  *
86  * @param[in] commandType Type of command.  For example, publish or subscribe.
87  * @param[in] pMqttAgentContext Pointer to MQTT context to use for command.
88  * @param[in] pMqttInfoParam Pointer to MQTTPublishInfo_t or MQTTSubscribeInfo_t.
89  * @param[in] commandCompleteCallback Callback for when command completes.
90  * @param[in] pCommandCompleteCallbackContext Context and necessary structs for command.
91  * @param[out] pCommand Pointer to initialized command.
92  *
93  * @return #MQTTSuccess if all necessary fields for the command are passed,
94  * else an enumerated error code.
95  */
96 static MQTTStatus_t createCommand( MQTTAgentCommandType_t commandType,
97                                    const MQTTAgentContext_t * pMqttAgentContext,
98                                    void * pMqttInfoParam,
99                                    MQTTAgentCommandCallback_t commandCompleteCallback,
100                                    MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
101                                    MQTTAgentCommand_t * pCommand );
102 
103 /**
104  * @brief Add a command to the global command queue.
105  *
106  * @param[in] pAgentContext Agent context for the MQTT connection.
107  * @param[in] pCommand Pointer to command to copy to queue.
 * @param[in] blockTimeMs The maximum amount of time in milliseconds to wait in the
109  * Blocked state (so not consuming any CPU time) for the command to be posted to the
110  * queue should the queue already be full.
111  *
112  * @return #MQTTSuccess if the command was added to the queue, else an enumerated
113  * error code.
114  */
115 static MQTTStatus_t addCommandToQueue( const MQTTAgentContext_t * pAgentContext,
116                                        MQTTAgentCommand_t * pCommand,
117                                        uint32_t blockTimeMs );
118 
119 /**
120  * @brief Process a #MQTTAgentCommand struct.
121  *
122  * @note This agent does not check existing subscriptions before sending a
123  * SUBSCRIBE or UNSUBSCRIBE packet. If a subscription already exists, then
124  * a SUBSCRIBE packet will be sent anyway, and if multiple tasks are subscribed
125  * to a topic filter, then they will all be unsubscribed after an UNSUBSCRIBE.
126  *
127  * @param[in] pMqttAgentContext Agent context for MQTT connection.
128  * @param[in] pCommand Pointer to command to process.
129  * @param[out] pEndLoop Whether the command loop should terminate.
130  *
131  * @return status of MQTT library API call.
132  */
133 static MQTTStatus_t processCommand( MQTTAgentContext_t * pMqttAgentContext,
134                                     MQTTAgentCommand_t * pCommand,
135                                     bool * pEndLoop );
136 
137 /**
138  * @brief Dispatch incoming publishes and acks to their various handler functions.
139  *
140  * @param[in] pMqttContext MQTT Context
141  * @param[in] pPacketInfo Pointer to incoming packet.
142  * @param[in] pDeserializedInfo Pointer to deserialized information from
143  * the incoming packet.
144  */
145 static void mqttEventCallback( MQTTContext_t * pMqttContext,
146                                MQTTPacketInfo_t * pPacketInfo,
147                                MQTTDeserializedInfo_t * pDeserializedInfo );
148 
149 /**
150  * @brief Mark a command as complete after receiving an acknowledgment packet.
151  *
152  * @param[in] pAgentContext Agent context for the MQTT connection.
153  * @param[in] pPacketInfo Pointer to incoming packet.
154  * @param[in] pDeserializedInfo Pointer to deserialized information from
155  * the incoming packet.
156  * @param[in] pAckInfo Pointer to stored information for the original operation
157  * resulting in the received packet.
158  * @param[in] packetType The type of the incoming packet, either SUBACK, UNSUBACK,
159  * PUBACK, or PUBCOMP.
160  */
161 static void handleAcks( const MQTTAgentContext_t * pAgentContext,
162                         const MQTTPacketInfo_t * pPacketInfo,
163                         const MQTTDeserializedInfo_t * pDeserializedInfo,
164                         MQTTAgentAckInfo_t * pAckInfo,
165                         uint8_t packetType );
166 
167 /**
168  * @brief Retrieve a pointer to an agent context given an MQTT context.
169  *
170  * @param[in] pMQTTContext MQTT Context to search for.
171  *
172  * @return Pointer to agent context, or NULL.
173  */
174 static MQTTAgentContext_t * getAgentFromMQTTContext( MQTTContext_t * pMQTTContext );
175 
176 /**
177  * @brief Helper function for creating a command and adding it to the command
178  * queue.
179  *
180  * @param[in] commandType Type of command.
181  * @param[in] pMqttAgentContext Handle of the MQTT connection to use.
182  * @param[in] pCommandCompleteCallbackContext Context and necessary structs for command.
183  * @param[in] commandCompleteCallback Callback for when command completes.
184  * @param[in] pMqttInfoParam Pointer to command argument.
185  * @param[in] blockTimeMs Maximum amount of time in milliseconds to wait (in the
186  * Blocked state, so not consuming any CPU time) for the command to be posted to the
187  * MQTT agent should the MQTT agent's event queue be full.
188  *
189  * @return #MQTTSuccess if the command was posted to the MQTT agent's event queue.
190  * Otherwise an enumerated error code.
191  */
192 static MQTTStatus_t createAndAddCommand( MQTTAgentCommandType_t commandType,
193                                          const MQTTAgentContext_t * pMqttAgentContext,
194                                          void * pMqttInfoParam,
195                                          MQTTAgentCommandCallback_t commandCompleteCallback,
196                                          MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
197                                          uint32_t blockTimeMs );
198 
199 /**
200  * @brief Helper function to mark a command as complete and invoke its callback.
201  * This function calls the releaseCommand callback.
202  *
203  * @param[in] pAgentContext Agent context for the MQTT connection.
204  * @param[in] pCommand Command to complete.
205  * @param[in] returnCode Return status of command.
206  * @param[in] pSubackCodes Pointer to suback array, if command is a SUBSCRIBE.
207  */
208 static void concludeCommand( const MQTTAgentContext_t * pAgentContext,
209                              MQTTAgentCommand_t * pCommand,
210                              MQTTStatus_t returnCode,
211                              uint8_t * pSubackCodes );
212 
213 /**
214  * @brief Resend QoS 1 and 2 publishes after resuming a session.
215  *
216  * @param[in] pMqttAgentContext Agent context for the MQTT connection.
217  *
218  * @return #MQTTSuccess if all publishes resent successfully, else error code
219  * from #MQTT_Publish.
220  */
221 static MQTTStatus_t resendPublishes( MQTTAgentContext_t * pMqttAgentContext );
222 
223 /**
224  * @brief Clears the list of pending acknowledgments by invoking each callback
225  * with #MQTTRecvFailed either for ALL operations in the list OR only for
226  * Subscribe/Unsubscribe operations.
227  *
228  * @param[in] pMqttAgentContext Agent context of the MQTT connection.
229  * @param[in] clearOnlySubUnsubEntries Flag indicating whether all entries OR
230  * entries pertaining to only Subscribe/Unsubscribe operations should be cleaned
231  * from the list.
232  */
233 static void clearPendingAcknowledgments( MQTTAgentContext_t * pMqttAgentContext,
234                                          bool clearOnlySubUnsubEntries );
235 
236 /**
237  * @brief Validate an #MQTTAgentContext_t and a #MQTTAgentCommandInfo_t from API
238  * functions.
239  *
240  * @param[in] pMqttAgentContext #MQTTAgentContext_t to validate.
241  * @param[in] pCommandInfo #MQTTAgentCommandInfo_t to validate.
242  *
243  * @return `true` if parameters are valid, else `false`.
244  */
245 static bool validateStruct( const MQTTAgentContext_t * pMqttAgentContext,
246                             const MQTTAgentCommandInfo_t * pCommandInfo );
247 
248 /**
249  * @brief Validate the parameters for a CONNECT, SUBSCRIBE, UNSUBSCRIBE
250  * or PUBLISH.
251  *
252  * @param[in] commandType CONNECT, SUBSCRIBE, UNSUBSCRIBE, or PUBLISH.
253  * @param[in] pParams Parameter structure to validate.
254  *
255  * @return `true` if parameter structure is valid, else `false`.
256  */
257 static bool validateParams( MQTTAgentCommandType_t commandType,
258                             const void * pParams );
259 
260 /**
261  * @brief Called before accepting any PUBLISH or SUBSCRIBE messages to check
262  * there is space in the pending ACK list for the outgoing PUBLISH or SUBSCRIBE.
263  *
264  * @note Because the MQTT agent is inherently multi threaded, and this function
265  * is called from the context of the application task and not the MQTT agent
266  * task, this function can only return a best effort result.  It can definitely
267  * say if there is space for a new pending ACK when the function is called, but
268  * the case of space being exhausted when the agent executes a command that
269  * results in an ACK must still be handled.
270  *
271  * @param[in] pAgentContext Pointer to the context for the MQTT connection to
272  * which the PUBLISH or SUBSCRIBE message is to be sent.
273  *
274  * @return true if there is space in that MQTT connection's ACK list, otherwise
275  * false;
276  */
277 static bool isSpaceInPendingAckList( const MQTTAgentContext_t * pAgentContext );
278 
279 /*-----------------------------------------------------------*/
280 
isSpaceInPendingAckList(const MQTTAgentContext_t * pAgentContext)281 static bool isSpaceInPendingAckList( const MQTTAgentContext_t * pAgentContext )
282 {
283     const MQTTAgentAckInfo_t * pendingAcks;
284     bool spaceFound = false;
285     size_t i;
286 
287     assert( pAgentContext != NULL );
288 
289     pendingAcks = pAgentContext->pPendingAcks;
290 
291     /* Are there any open slots? */
292     for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
293     {
294         /* If the packetId is MQTT_PACKET_ID_INVALID then the array space is
295          * not in use. */
296         if( pendingAcks[ i ].packetId == MQTT_PACKET_ID_INVALID )
297         {
298             spaceFound = true;
299             break;
300         }
301     }
302 
303     return spaceFound;
304 }
305 
306 /*-----------------------------------------------------------*/
307 
addAwaitingOperation(MQTTAgentContext_t * pAgentContext,uint16_t packetId,MQTTAgentCommand_t * pCommand)308 static MQTTStatus_t addAwaitingOperation( MQTTAgentContext_t * pAgentContext,
309                                           uint16_t packetId,
310                                           MQTTAgentCommand_t * pCommand )
311 {
312     size_t i = 0, unusedPos = MQTT_AGENT_MAX_OUTSTANDING_ACKS;
313     MQTTStatus_t status = MQTTNoMemory;
314     MQTTAgentAckInfo_t * pendingAcks = NULL;
315 
316     assert( pAgentContext != NULL );
317     assert( pCommand != NULL );
318     assert( packetId != MQTT_PACKET_ID_INVALID );
319     pendingAcks = pAgentContext->pPendingAcks;
320 
321     /* Before adding the record for the pending acknowledgement of the packet ID,
322      * make sure that there doesn't already exist an entry for the same packet ID.
323      * Also, as part of iterating through the list of pending acknowledgements,
324      * find an unused space for the packet ID to be added, if it can be. */
325     for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
326     {
327         /* If the packetId is MQTT_PACKET_ID_INVALID then the array space is not in
328          * use. */
329         if( ( unusedPos == MQTT_AGENT_MAX_OUTSTANDING_ACKS ) &&
330             ( pendingAcks[ i ].packetId == MQTT_PACKET_ID_INVALID ) )
331         {
332             unusedPos = i;
333             status = MQTTSuccess;
334         }
335 
336         if( pendingAcks[ i ].packetId == packetId )
337         {
338             /* Check whether there exists a duplicate entry for pending
339              * acknowledgment for the same packet ID that we want to add to
340              * the list.
341              * Note: This is an unlikely edge case which represents that a packet ID
342              * didn't receive acknowledgment, but subsequent SUBSCRIBE/PUBLISH operations
343              * representing 65535 packet IDs were successful that caused the bit packet
344              * ID value to wrap around and reached the same packet ID as that was still
345              * pending acknowledgment.
346              */
347             status = MQTTStateCollision;
348             LogError( ( "Failed to add operation to list of pending acknowledgments: "
349                         "Existing entry found for same packet: PacketId=%u\n", packetId ) );
350             break;
351         }
352     }
353 
354     /* Add the packet ID to the list if there is space available, and there is no
355      * duplicate entry for the same packet ID found. */
356     if( status == MQTTSuccess )
357     {
358         pendingAcks[ unusedPos ].packetId = packetId;
359         pendingAcks[ unusedPos ].pOriginalCommand = pCommand;
360     }
361     else if( status == MQTTNoMemory )
362     {
363         LogError( ( "Failed to add operation to list of pending acknowledgments: "
364                     "No memory available: PacketId=%u\n", packetId ) );
365     }
366     else
367     {
368         /* Empty else MISRA 15.7 */
369     }
370 
371     return status;
372 }
373 
374 /*-----------------------------------------------------------*/
375 
getAwaitingOperation(MQTTAgentContext_t * pAgentContext,uint16_t incomingPacketId)376 static MQTTAgentAckInfo_t * getAwaitingOperation( MQTTAgentContext_t * pAgentContext,
377                                                   uint16_t incomingPacketId )
378 {
379     size_t i = 0;
380     MQTTAgentAckInfo_t * pFoundAck = NULL;
381 
382     assert( pAgentContext != NULL );
383 
384     /* Look through the array of packet IDs that are still waiting to be acked to
385      * find one with incomingPacketId. */
386     for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
387     {
388         if( pAgentContext->pPendingAcks[ i ].packetId == incomingPacketId )
389         {
390             pFoundAck = &( pAgentContext->pPendingAcks[ i ] );
391             break;
392         }
393     }
394 
395     if( pFoundAck == NULL )
396     {
397         LogError( ( "No ack found for packet id %u.\n", incomingPacketId ) );
398     }
399     else if( ( pFoundAck->pOriginalCommand == NULL ) || ( pFoundAck->packetId == 0U ) )
400     {
401         LogError( ( "Found ack had empty fields. PacketId=%hu, Original Command=%p",
402                     ( unsigned short ) pFoundAck->packetId,
403                     ( void * ) pFoundAck->pOriginalCommand ) );
404         ( void ) memset( pFoundAck, 0x00, sizeof( MQTTAgentAckInfo_t ) );
405         pFoundAck = NULL;
406     }
407     else
408     {
409         /* Empty else MISRA 15.7 */
410     }
411 
412     return pFoundAck;
413 }
414 
415 /*-----------------------------------------------------------*/
416 
createCommand(MQTTAgentCommandType_t commandType,const MQTTAgentContext_t * pMqttAgentContext,void * pMqttInfoParam,MQTTAgentCommandCallback_t commandCompleteCallback,MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,MQTTAgentCommand_t * pCommand)417 static MQTTStatus_t createCommand( MQTTAgentCommandType_t commandType,
418                                    const MQTTAgentContext_t * pMqttAgentContext,
419                                    void * pMqttInfoParam,
420                                    MQTTAgentCommandCallback_t commandCompleteCallback,
421                                    MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
422                                    MQTTAgentCommand_t * pCommand )
423 {
424     bool isValid, isSpace = true;
425     MQTTStatus_t statusReturn;
426     const MQTTPublishInfo_t * pPublishInfo;
427     size_t uxHeaderBytes;
428     const size_t uxControlAndLengthBytes = ( size_t ) 4; /* Control, remaining length and length bytes. */
429 
430     assert( pMqttAgentContext != NULL );
431     assert( pCommand != NULL );
432 
433     ( void ) memset( pCommand, 0x00, sizeof( MQTTAgentCommand_t ) );
434 
435     /* Determine if required parameters are present in context. */
436     switch( commandType )
437     {
438         case SUBSCRIBE:
439         case UNSUBSCRIBE:
440             assert( pMqttInfoParam != NULL );
441 
442             /* This message type results in the broker returning an ACK.  The
443              * agent maintains an array of outstanding ACK messages.  See if
444              * the array contains space for another outstanding ack. */
445             isSpace = isSpaceInPendingAckList( pMqttAgentContext );
446 
447             isValid = isSpace;
448 
449             break;
450 
451         case PUBLISH:
452             pPublishInfo = ( const MQTTPublishInfo_t * ) pMqttInfoParam;
453 
454             /* Calculate the space consumed by everything other than the
455              * payload. */
456             uxHeaderBytes = uxControlAndLengthBytes;
457             uxHeaderBytes += pPublishInfo->topicNameLength;
458 
459             /* This message type results in the broker returning an ACK. The
460              * agent maintains an array of outstanding ACK messages.  See if
461              * the array contains space for another outstanding ack.  QoS0
462              * publish does not result in an ack so it doesn't matter if
463              * there is no space in the ACK array. */
464             if( pPublishInfo->qos != MQTTQoS0 )
465             {
466                 isSpace = isSpaceInPendingAckList( pMqttAgentContext );
467             }
468 
469             /* Will the message fit in the defined buffer? */
470             isValid = ( uxHeaderBytes < pMqttAgentContext->mqttContext.networkBuffer.size ) &&
471                       ( isSpace == true );
472 
473             break;
474 
475         case PROCESSLOOP:
476         case PING:
477         case CONNECT:
478         case DISCONNECT:
479         default:
480             /* Other operations don't need to store ACKs. */
481             isValid = true;
482             break;
483     }
484 
485     if( isValid )
486     {
487         pCommand->commandType = commandType;
488         pCommand->pArgs = pMqttInfoParam;
489         pCommand->pCmdContext = pCommandCompleteCallbackContext;
490         pCommand->pCommandCompleteCallback = commandCompleteCallback;
491     }
492 
493     statusReturn = ( isValid ) ? MQTTSuccess : MQTTBadParameter;
494 
495     if( ( statusReturn == MQTTBadParameter ) && ( isSpace == false ) )
496     {
497         /* The error was caused not by a bad parameter, but because there was
498          * no room in the pending Ack list for the Ack response to an outgoing
499          * PUBLISH or SUBSCRIBE message. */
500         statusReturn = MQTTNoMemory;
501     }
502 
503     return statusReturn;
504 }
505 
506 /*-----------------------------------------------------------*/
507 
addCommandToQueue(const MQTTAgentContext_t * pAgentContext,MQTTAgentCommand_t * pCommand,uint32_t blockTimeMs)508 static MQTTStatus_t addCommandToQueue( const MQTTAgentContext_t * pAgentContext,
509                                        MQTTAgentCommand_t * pCommand,
510                                        uint32_t blockTimeMs )
511 {
512     bool queueStatus;
513 
514     assert( pAgentContext != NULL );
515     assert( pCommand != NULL );
516 
517     /* The application called an API function.  The API function was validated and
518      * packed into a MQTTAgentCommand_t structure.  Now post a reference to the MQTTAgentCommand_t
519      * structure to the MQTT agent for processing. */
520     queueStatus = pAgentContext->agentInterface.send(
521         pAgentContext->agentInterface.pMsgCtx,
522         &pCommand,
523         blockTimeMs
524         );
525 
526     return ( queueStatus ) ? MQTTSuccess : MQTTSendFailed;
527 }
528 
529 /*-----------------------------------------------------------*/
530 
processCommand(MQTTAgentContext_t * pMqttAgentContext,MQTTAgentCommand_t * pCommand,bool * pEndLoop)531 static MQTTStatus_t processCommand( MQTTAgentContext_t * pMqttAgentContext,
532                                     MQTTAgentCommand_t * pCommand,
533                                     bool * pEndLoop )
534 {
535     const MQTTAgentCommandFunc_t pCommandFunctionTable[ NUM_COMMANDS ] = MQTT_AGENT_FUNCTION_TABLE;
536     MQTTStatus_t operationStatus = MQTTSuccess;
537     bool ackAdded = false;
538     MQTTAgentCommandFunc_t commandFunction = NULL;
539     void * pCommandArgs = NULL;
540     const uint32_t processLoopTimeoutMs = 0U;
541     MQTTAgentCommandFuncReturns_t commandOutParams = { 0 };
542 
543     assert( pMqttAgentContext != NULL );
544     assert( pEndLoop != NULL );
545 
546     if( pCommand != NULL )
547     {
548         assert( pCommand->commandType < NUM_COMMANDS );
549         commandFunction = pCommandFunctionTable[ pCommand->commandType ];
550         pCommandArgs = pCommand->pArgs;
551     }
552     else
553     {
554         commandFunction = pCommandFunctionTable[ NONE ];
555     }
556 
557     operationStatus = commandFunction( pMqttAgentContext, pCommandArgs, &commandOutParams );
558 
559     if( ( operationStatus == MQTTSuccess ) &&
560         commandOutParams.addAcknowledgment &&
561         ( commandOutParams.packetId != MQTT_PACKET_ID_INVALID ) )
562     {
563         operationStatus = addAwaitingOperation( pMqttAgentContext, commandOutParams.packetId, pCommand );
564         ackAdded = ( operationStatus == MQTTSuccess );
565     }
566 
567     if( ( pCommand != NULL ) && ( ackAdded != true ) )
568     {
569         /* The command is complete, call the callback. */
570         concludeCommand( pMqttAgentContext, pCommand, operationStatus, NULL );
571     }
572 
573     /* Run the process loop if there were no errors and the MQTT connection
574      * still exists. */
575     if( ( operationStatus == MQTTSuccess ) && commandOutParams.runProcessLoop )
576     {
577         do
578         {
579             pMqttAgentContext->packetReceivedInLoop = false;
580 
581             if( ( operationStatus == MQTTSuccess ) &&
582                 ( pMqttAgentContext->mqttContext.connectStatus == MQTTConnected ) )
583             {
584                 operationStatus = MQTT_ProcessLoop( &( pMqttAgentContext->mqttContext ), processLoopTimeoutMs );
585             }
586         } while( pMqttAgentContext->packetReceivedInLoop );
587     }
588 
589     /* Set the flag to break from the command loop. */
590     *pEndLoop = ( commandOutParams.endLoop || ( operationStatus != MQTTSuccess ) );
591 
592     return operationStatus;
593 }
594 
595 /*-----------------------------------------------------------*/
596 
/**
 * @brief Conclude the command an acknowledgment belongs to and free its
 * pending-ack entry.
 *
 * @param[in] pAgentContext Agent context for the MQTT connection.
 * @param[in] pPacketInfo Incoming packet; its pRemainingData is used to locate
 * the status codes when the packet is a SUBACK.
 * @param[in] pDeserializedInfo Deserialized packet information; its
 * deserializationResult becomes the command's return code.
 * @param[in] pAckInfo Pending-ack entry of the original operation. It is
 * zeroed before returning.
 * @param[in] packetType Type of the incoming packet.
 */
static void handleAcks( const MQTTAgentContext_t * pAgentContext,
                        const MQTTPacketInfo_t * pPacketInfo,
                        const MQTTDeserializedInfo_t * pDeserializedInfo,
                        MQTTAgentAckInfo_t * pAckInfo,
                        uint8_t packetType )
{
    uint8_t * pSubackCodes = NULL;

    assert( pAckInfo != NULL );
    assert( pAckInfo->pOriginalCommand != NULL );

    /* Only a SUBACK carries status codes; they begin 2 bytes into the
     * remaining data. */
    if( packetType == MQTT_PACKET_TYPE_SUBACK )
    {
        pSubackCodes = pPacketInfo->pRemainingData + 2U;
    }

    concludeCommand( pAgentContext,
                     pAckInfo->pOriginalCommand,
                     pDeserializedInfo->deserializationResult,
                     pSubackCodes );

    /* Release the list entry by wiping it. */
    ( void ) memset( pAckInfo, 0x00, sizeof( MQTTAgentAckInfo_t ) );
}
619 
620 /*-----------------------------------------------------------*/
621 
getAgentFromMQTTContext(MQTTContext_t * pMQTTContext)622 static MQTTAgentContext_t * getAgentFromMQTTContext( MQTTContext_t * pMQTTContext )
623 {
624     void * ret = pMQTTContext;
625 
626     return ( MQTTAgentContext_t * ) ret;
627 }
628 
629 /*-----------------------------------------------------------*/
630 
/**
 * @brief Route an incoming packet to the publish callback or to the
 * acknowledgment handling.
 *
 * Sets packetReceivedInLoop on the agent context, then:
 * - for a PUBLISH (upper nibble of the packet type), invokes the agent's
 *   pIncomingCallback with the packet ID and publish info;
 * - for PUBACK, PUBCOMP, SUBACK and UNSUBACK, looks up the pending-ack entry
 *   for the packet ID and, if found, completes it through handleAcks();
 * - for PUBREC and PUBREL, does nothing;
 * - for any other type, logs an error.
 *
 * @param[in] pMqttContext MQTT context; converted to the agent context via
 * getAgentFromMQTTContext().
 * @param[in] pPacketInfo Incoming packet; must not be NULL.
 * @param[in] pDeserializedInfo Deserialized information from the incoming
 * packet; must not be NULL.
 */
static void mqttEventCallback( MQTTContext_t * pMqttContext,
                               MQTTPacketInfo_t * pPacketInfo,
                               MQTTDeserializedInfo_t * pDeserializedInfo )
{
    MQTTAgentAckInfo_t * pAckInfo;
    uint16_t packetIdentifier;
    MQTTAgentContext_t * pAgentContext;
    const uint8_t upperNibble = ( uint8_t ) 0xF0;

    assert( pMqttContext != NULL );
    assert( pPacketInfo != NULL );
    assert( pDeserializedInfo != NULL );

    /* Read the packet ID only after the pointer has been checked. */
    packetIdentifier = pDeserializedInfo->packetIdentifier;

    pAgentContext = getAgentFromMQTTContext( pMqttContext );

    /* This callback executes from within MQTT_ProcessLoop().  Setting this flag
     * indicates that the callback executed so the caller of MQTT_ProcessLoop() knows
     * it should call it again as there may be more data to process. */
    pAgentContext->packetReceivedInLoop = true;

    /* Handle incoming publish. The lower 4 bits of the publish packet type is used
     * for the dup, QoS, and retain flags. Hence masking out the lower bits to check
     * if the packet is publish. */
    if( ( pPacketInfo->type & upperNibble ) == MQTT_PACKET_TYPE_PUBLISH )
    {
        pAgentContext->pIncomingCallback( pAgentContext, packetIdentifier, pDeserializedInfo->pPublishInfo );
    }
    else
    {
        /* Handle other packets. */
        switch( pPacketInfo->type )
        {
            case MQTT_PACKET_TYPE_PUBACK:
            case MQTT_PACKET_TYPE_PUBCOMP:
            case MQTT_PACKET_TYPE_SUBACK:
            case MQTT_PACKET_TYPE_UNSUBACK:
                pAckInfo = getAwaitingOperation( pAgentContext, packetIdentifier );

                if( pAckInfo != NULL )
                {
                    /* This function will also clear the memory associated with
                     * the ack list entry. */
                    handleAcks( pAgentContext,
                                pPacketInfo,
                                pDeserializedInfo,
                                pAckInfo,
                                pPacketInfo->type );
                }
                else
                {
                    LogError( ( "No operation found matching packet id %u.\n", packetIdentifier ) );
                }

                break;

            /* Nothing to do for these packets since they don't indicate command completion. */
            case MQTT_PACKET_TYPE_PUBREC:
            case MQTT_PACKET_TYPE_PUBREL:
                break;

            /* Any other packet type is invalid. */
            case MQTT_PACKET_TYPE_PINGRESP:
            default:
                LogError( ( "Unknown packet type received:(%02x).\n",
                            pPacketInfo->type ) );
                break;
        }
    }
}
699 
700 /*-----------------------------------------------------------*/
701 
createAndAddCommand(MQTTAgentCommandType_t commandType,const MQTTAgentContext_t * pMqttAgentContext,void * pMqttInfoParam,MQTTAgentCommandCallback_t commandCompleteCallback,MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,uint32_t blockTimeMs)702 static MQTTStatus_t createAndAddCommand( MQTTAgentCommandType_t commandType,
703                                          const MQTTAgentContext_t * pMqttAgentContext,
704                                          void * pMqttInfoParam,
705                                          MQTTAgentCommandCallback_t commandCompleteCallback,
706                                          MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
707                                          uint32_t blockTimeMs )
708 {
709     MQTTStatus_t statusReturn = MQTTBadParameter;
710     MQTTAgentCommand_t * pCommand;
711     bool commandReleased = false;
712 
713     /* If the packet ID is zero then the MQTT context has not been initialized as 0
714      * is the initial value but not a valid packet ID. */
715     if( pMqttAgentContext->mqttContext.nextPacketId != MQTT_PACKET_ID_INVALID )
716     {
717         pCommand = pMqttAgentContext->agentInterface.getCommand( blockTimeMs );
718 
719         if( pCommand != NULL )
720         {
721             statusReturn = createCommand( commandType,
722                                           pMqttAgentContext,
723                                           pMqttInfoParam,
724                                           commandCompleteCallback,
725                                           pCommandCompleteCallbackContext,
726                                           pCommand );
727 
728             if( statusReturn == MQTTSuccess )
729             {
730                 statusReturn = addCommandToQueue( pMqttAgentContext, pCommand, blockTimeMs );
731             }
732 
733             if( statusReturn != MQTTSuccess )
734             {
735                 /* Could not send the command to the queue so release the command
736                  * structure again. */
737                 commandReleased = pMqttAgentContext->agentInterface.releaseCommand( pCommand );
738 
739                 if( !commandReleased )
740                 {
741                     LogError( ( "Command %p could not be released.",
742                                 ( void * ) pCommand ) );
743                 }
744             }
745         }
746         else
747         {
748             /* Ran out of MQTTAgentCommand_t structures - pool is empty. */
749             statusReturn = MQTTNoMemory;
750         }
751     }
752     else
753     {
754         LogError( ( "MQTT context must be initialized." ) );
755     }
756 
757     return statusReturn;
758 }
759 
760 /*-----------------------------------------------------------*/
761 
/**
 * @brief Finish a command: report its result to the completion callback (if
 * one was supplied) and pass the command to the releaseCommand interface
 * function.
 *
 * @param[in] pAgentContext Agent context providing the releaseCommand function.
 * @param[in] pCommand Command being concluded.
 * @param[in] returnCode Status placed in the return info given to the callback.
 * @param[in] pSubackCodes Suback codes pointer placed in the return info; may be NULL.
 */
static void concludeCommand( const MQTTAgentContext_t * pAgentContext,
                             MQTTAgentCommand_t * pCommand,
                             MQTTStatus_t returnCode,
                             uint8_t * pSubackCodes )
{
    MQTTAgentReturnInfo_t completionInfo;
    bool released = false;

    assert( pAgentContext != NULL );
    assert( pAgentContext->agentInterface.releaseCommand != NULL );
    assert( pCommand != NULL );

    /* Start from an all-zero structure so every field not set below is zero. */
    ( void ) memset( &completionInfo, 0x00, sizeof( MQTTAgentReturnInfo_t ) );
    completionInfo.returnCode = returnCode;
    completionInfo.pSubackCodes = pSubackCodes;

    /* The completion callback is optional. */
    if( pCommand->pCommandCompleteCallback != NULL )
    {
        pCommand->pCommandCompleteCallback( pCommand->pCmdContext, &completionInfo );
    }

    released = pAgentContext->agentInterface.releaseCommand( pCommand );

    if( !released )
    {
        LogError( ( "Failed to release command %p of type %d.",
                    ( void * ) pCommand,
                    pCommand->commandType ) );
    }
}
792 
793 /*-----------------------------------------------------------*/
794 
resendPublishes(MQTTAgentContext_t * pMqttAgentContext)795 static MQTTStatus_t resendPublishes( MQTTAgentContext_t * pMqttAgentContext )
796 {
797     MQTTStatus_t statusResult = MQTTSuccess;
798     MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
799     uint16_t packetId = MQTT_PACKET_ID_INVALID;
800     MQTTAgentAckInfo_t * pFoundAck = NULL;
801     MQTTPublishInfo_t * pOriginalPublish = NULL;
802     MQTTContext_t * pMqttContext;
803 
804     assert( pMqttAgentContext != NULL );
805     pMqttContext = &( pMqttAgentContext->mqttContext );
806 
807     packetId = MQTT_PublishToResend( pMqttContext, &cursor );
808 
809     while( packetId != MQTT_PACKET_ID_INVALID )
810     {
811         /* Retrieve the operation but do not remove it from the list. */
812         pFoundAck = getAwaitingOperation( pMqttAgentContext, packetId );
813 
814         if( pFoundAck != NULL )
815         {
816             /* Set the DUP flag. */
817             pOriginalPublish = ( MQTTPublishInfo_t * ) ( pFoundAck->pOriginalCommand->pArgs );
818             pOriginalPublish->dup = true;
819             statusResult = MQTT_Publish( pMqttContext, pOriginalPublish, packetId );
820 
821             if( statusResult != MQTTSuccess )
822             {
823                 concludeCommand( pMqttAgentContext, pFoundAck->pOriginalCommand, statusResult, NULL );
824                 ( void ) memset( pFoundAck, 0x00, sizeof( MQTTAgentAckInfo_t ) );
825                 LogError( ( "Failed to resend publishes. Error code=%s\n", MQTT_Status_strerror( statusResult ) ) );
826                 break;
827             }
828         }
829 
830         packetId = MQTT_PublishToResend( pMqttContext, &cursor );
831     }
832 
833     return statusResult;
834 }
835 
836 /*-----------------------------------------------------------*/
837 
/**
 * @brief Conclude and remove operations that are still awaiting an
 * acknowledgment.
 *
 * Each removed operation is concluded with MQTTRecvFailed and its list entry
 * is zeroed.
 *
 * @param[in] pMqttAgentContext Agent context; must not be NULL.
 * @param[in] clearOnlySubUnsubEntries When true, only entries whose original
 * command is a SUBSCRIBE or UNSUBSCRIBE are removed; when false, every
 * occupied entry is removed.
 */
static void clearPendingAcknowledgments( MQTTAgentContext_t * pMqttAgentContext,
                                         bool clearOnlySubUnsubEntries )
{
    size_t slot;
    MQTTAgentAckInfo_t * pEntry;
    bool removeEntry;

    assert( pMqttAgentContext != NULL );

    for( slot = 0; slot < MQTT_AGENT_MAX_OUTSTANDING_ACKS; slot++ )
    {
        pEntry = &( pMqttAgentContext->pPendingAcks[ slot ] );

        /* An invalid packet ID marks an unused slot. */
        if( pEntry->packetId != MQTT_PACKET_ID_INVALID )
        {
            assert( pEntry->pOriginalCommand != NULL );

            /* Everything is removed unless the caller restricted the sweep to
             * SUBSCRIBE/UNSUBSCRIBE operations. */
            removeEntry = ( !clearOnlySubUnsubEntries ) ||
                          ( pEntry->pOriginalCommand->commandType == SUBSCRIBE ) ||
                          ( pEntry->pOriginalCommand->commandType == UNSUBSCRIBE );

            if( removeEntry )
            {
                /* MQTTRecvFailed is used to indicate a network error. */
                concludeCommand( pMqttAgentContext, pEntry->pOriginalCommand, MQTTRecvFailed, NULL );

                /* Free the slot. */
                ( void ) memset( pEntry, 0x00, sizeof( MQTTAgentAckInfo_t ) );
            }
        }
    }
}
875 
876 /*-----------------------------------------------------------*/
877 
validateStruct(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)878 static bool validateStruct( const MQTTAgentContext_t * pMqttAgentContext,
879                             const MQTTAgentCommandInfo_t * pCommandInfo )
880 {
881     bool ret = false;
882 
883     if( ( pMqttAgentContext == NULL ) ||
884         ( pCommandInfo == NULL ) )
885     {
886         LogError( ( "Pointer cannot be NULL. pMqttAgentContext=%p, pCommandInfo=%p.",
887                     ( void * ) pMqttAgentContext,
888                     ( void * ) pCommandInfo ) );
889     }
890     else if( ( pMqttAgentContext->agentInterface.send == NULL ) ||
891              ( pMqttAgentContext->agentInterface.recv == NULL ) ||
892              ( pMqttAgentContext->agentInterface.getCommand == NULL ) ||
893              ( pMqttAgentContext->agentInterface.releaseCommand == NULL ) ||
894              ( pMqttAgentContext->agentInterface.pMsgCtx == NULL ) )
895     {
896         LogError( ( "pMqttAgentContext must have initialized its messaging interface." ) );
897     }
898     else
899     {
900         ret = true;
901     }
902 
903     return ret;
904 }
905 
906 /*-----------------------------------------------------------*/
907 
/**
 * @brief Check the command-specific argument of a CONNECT, PUBLISH, SUBSCRIBE
 * or UNSUBSCRIBE command.
 *
 * @param[in] commandType One of CONNECT, PUBLISH, SUBSCRIBE or UNSUBSCRIBE.
 * @param[in] pParams Argument structure matching @p commandType.
 *
 * @return For CONNECT: true if the arguments and their pConnectInfo are
 * non-NULL. For SUBSCRIBE/UNSUBSCRIBE: true if the arguments and their
 * pSubscribeInfo are non-NULL and numSubscriptions is non-zero. For anything
 * else: true if @p pParams is non-NULL.
 */
static bool validateParams( MQTTAgentCommandType_t commandType,
                            const void * pParams )
{
    bool isValid = false;

    assert( ( commandType == CONNECT ) || ( commandType == PUBLISH ) ||
            ( commandType == SUBSCRIBE ) || ( commandType == UNSUBSCRIBE ) );

    if( commandType == CONNECT )
    {
        const MQTTAgentConnectArgs_t * pConnect = ( const MQTTAgentConnectArgs_t * ) pParams;

        isValid = ( ( pConnect != NULL ) &&
                    ( pConnect->pConnectInfo != NULL ) );
    }
    else if( ( commandType == SUBSCRIBE ) || ( commandType == UNSUBSCRIBE ) )
    {
        const MQTTAgentSubscribeArgs_t * pSubscribe = ( const MQTTAgentSubscribeArgs_t * ) pParams;

        isValid = ( ( pSubscribe != NULL ) &&
                    ( pSubscribe->pSubscribeInfo != NULL ) &&
                    ( pSubscribe->numSubscriptions != 0U ) );
    }
    else
    {
        /* PUBLISH (and any other type): the contents are not inspected, so no
         * cast is needed - only the pointer itself is checked. */
        isValid = ( pParams != NULL );
    }

    return isValid;
}
943 
944 /*-----------------------------------------------------------*/
945 
MQTTAgent_Init(MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentMessageInterface_t * pMsgInterface,const MQTTFixedBuffer_t * pNetworkBuffer,const TransportInterface_t * pTransportInterface,MQTTGetCurrentTimeFunc_t getCurrentTimeMs,MQTTAgentIncomingPublishCallback_t incomingCallback,void * pIncomingPacketContext)946 MQTTStatus_t MQTTAgent_Init( MQTTAgentContext_t * pMqttAgentContext,
947                              const MQTTAgentMessageInterface_t * pMsgInterface,
948                              const MQTTFixedBuffer_t * pNetworkBuffer,
949                              const TransportInterface_t * pTransportInterface,
950                              MQTTGetCurrentTimeFunc_t getCurrentTimeMs,
951                              MQTTAgentIncomingPublishCallback_t incomingCallback,
952                              void * pIncomingPacketContext )
953 {
954     MQTTStatus_t returnStatus;
955 
956     if( ( pMqttAgentContext == NULL ) ||
957         ( pMsgInterface == NULL ) ||
958         ( pTransportInterface == NULL ) ||
959         ( getCurrentTimeMs == NULL ) ||
960         ( incomingCallback == NULL ) )
961     {
962         returnStatus = MQTTBadParameter;
963     }
964     else if( ( pMsgInterface->pMsgCtx == NULL ) ||
965              ( pMsgInterface->send == NULL ) ||
966              ( pMsgInterface->recv == NULL ) ||
967              ( pMsgInterface->getCommand == NULL ) ||
968              ( pMsgInterface->releaseCommand == NULL ) )
969     {
970         LogError( ( "Invalid parameter: pMsgInterface must set all members." ) );
971         returnStatus = MQTTBadParameter;
972     }
973     else
974     {
975         ( void ) memset( pMqttAgentContext, 0x00, sizeof( MQTTAgentContext_t ) );
976 
977         returnStatus = MQTT_Init( &( pMqttAgentContext->mqttContext ),
978                                   pTransportInterface,
979                                   getCurrentTimeMs,
980                                   mqttEventCallback,
981                                   pNetworkBuffer );
982 
983         if( returnStatus == MQTTSuccess )
984         {
985             pMqttAgentContext->pIncomingCallback = incomingCallback;
986             pMqttAgentContext->pIncomingCallbackContext = pIncomingPacketContext;
987             pMqttAgentContext->agentInterface = *pMsgInterface;
988         }
989     }
990 
991     return returnStatus;
992 }
993 
994 /*-----------------------------------------------------------*/
995 
MQTTAgent_CommandLoop(MQTTAgentContext_t * pMqttAgentContext)996 MQTTStatus_t MQTTAgent_CommandLoop( MQTTAgentContext_t * pMqttAgentContext )
997 {
998     MQTTAgentCommand_t * pCommand;
999     MQTTStatus_t operationStatus = MQTTSuccess;
1000     bool endLoop = false;
1001 
1002     /* The command queue should have been created before this task gets created. */
1003     if( ( pMqttAgentContext == NULL ) || ( pMqttAgentContext->agentInterface.pMsgCtx == NULL ) )
1004     {
1005         operationStatus = MQTTBadParameter;
1006     }
1007 
1008     /* Loop until an error or we receive a terminate command. */
1009     while( operationStatus == MQTTSuccess )
1010     {
1011         /* Wait for the next command, if any. */
1012         pCommand = NULL;
1013         ( void ) pMqttAgentContext->agentInterface.recv(
1014             pMqttAgentContext->agentInterface.pMsgCtx,
1015             &( pCommand ),
1016             MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME
1017             );
1018         operationStatus = processCommand( pMqttAgentContext, pCommand, &endLoop );
1019 
1020         if( operationStatus != MQTTSuccess )
1021         {
1022             LogError( ( "MQTT operation failed with status %s\n",
1023                         MQTT_Status_strerror( operationStatus ) ) );
1024         }
1025 
1026         /* Terminate the loop on disconnects, errors, or the termination command. */
1027         if( endLoop )
1028         {
1029             break;
1030         }
1031     }
1032 
1033     return operationStatus;
1034 }
1035 
1036 /*-----------------------------------------------------------*/
1037 
MQTTAgent_ResumeSession(MQTTAgentContext_t * pMqttAgentContext,bool sessionPresent)1038 MQTTStatus_t MQTTAgent_ResumeSession( MQTTAgentContext_t * pMqttAgentContext,
1039                                       bool sessionPresent )
1040 {
1041     MQTTStatus_t statusResult = MQTTSuccess;
1042 
1043     /* If the packet ID is zero then the MQTT context has not been initialized as 0
1044      * is the initial value but not a valid packet ID. */
1045     if( ( pMqttAgentContext != NULL ) &&
1046         ( pMqttAgentContext->mqttContext.nextPacketId != MQTT_PACKET_ID_INVALID ) )
1047     {
1048         /* Resend publishes if session is present. NOTE: It's possible that some
1049          * of the operations that were in progress during the network interruption
1050          * were subscribes. In that case, we would want to mark those operations
1051          * as completing with error and remove them from the list of operations, so
1052          * that the calling task can try subscribing again. */
1053         if( sessionPresent )
1054         {
1055             /* The session has resumed, so clear any SUBSCRIBE/UNSUBSCRIBE operations
1056              * that were pending acknowledgments in the previous connection. */
1057             clearPendingAcknowledgments( pMqttAgentContext, true );
1058 
1059             statusResult = resendPublishes( pMqttAgentContext );
1060         }
1061 
1062         /* If we wanted to resume a session but none existed with the broker, we
1063          * should mark all in progress operations as errors so that the tasks that
1064          * created them can try again. */
1065         else
1066         {
1067             /* We have a clean session, so clear all operations pending acknowledgments. */
1068             clearPendingAcknowledgments( pMqttAgentContext, false );
1069         }
1070     }
1071     else
1072     {
1073         statusResult = MQTTBadParameter;
1074     }
1075 
1076     return statusResult;
1077 }
1078 
1079 /*-----------------------------------------------------------*/
1080 
MQTTAgent_CancelAll(MQTTAgentContext_t * pMqttAgentContext)1081 MQTTStatus_t MQTTAgent_CancelAll( MQTTAgentContext_t * pMqttAgentContext )
1082 {
1083     MQTTStatus_t statusReturn = MQTTSuccess;
1084     MQTTAgentCommand_t * pReceivedCommand = NULL;
1085     bool commandWasReceived = false;
1086     MQTTAgentAckInfo_t * pendingAcks;
1087     size_t i;
1088 
1089     if( ( pMqttAgentContext == NULL ) || ( pMqttAgentContext->agentInterface.pMsgCtx == NULL ) )
1090     {
1091         statusReturn = MQTTBadParameter;
1092     }
1093     else
1094     {
1095         /* Cancel all operations waiting in the queue. */
1096         do
1097         {
1098             pReceivedCommand = NULL;
1099             commandWasReceived = pMqttAgentContext->agentInterface.recv(
1100                 pMqttAgentContext->agentInterface.pMsgCtx,
1101                 &( pReceivedCommand ),
1102                 0U );
1103 
1104             if( pReceivedCommand != NULL )
1105             {
1106                 concludeCommand( pMqttAgentContext, pReceivedCommand, MQTTRecvFailed, NULL );
1107             }
1108         } while( commandWasReceived );
1109 
1110         pendingAcks = pMqttAgentContext->pPendingAcks;
1111 
1112         /* Cancel any operations awaiting an acknowledgment. */
1113         for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
1114         {
1115             if( pendingAcks[ i ].packetId != MQTT_PACKET_ID_INVALID )
1116             {
1117                 concludeCommand( pMqttAgentContext, pendingAcks[ i ].pOriginalCommand, MQTTRecvFailed, NULL );
1118 
1119                 /* Now remove it from the list. */
1120                 ( void ) memset( &( pendingAcks[ i ] ), 0x00, sizeof( MQTTAgentAckInfo_t ) );
1121             }
1122         }
1123     }
1124 
1125     return statusReturn;
1126 }
1127 
1128 /*-----------------------------------------------------------*/
1129 
MQTTAgent_Subscribe(const MQTTAgentContext_t * pMqttAgentContext,MQTTAgentSubscribeArgs_t * pSubscriptionArgs,const MQTTAgentCommandInfo_t * pCommandInfo)1130 MQTTStatus_t MQTTAgent_Subscribe( const MQTTAgentContext_t * pMqttAgentContext,
1131                                   MQTTAgentSubscribeArgs_t * pSubscriptionArgs,
1132                                   const MQTTAgentCommandInfo_t * pCommandInfo )
1133 {
1134     MQTTStatus_t statusReturn = MQTTBadParameter;
1135     bool paramsValid = false;
1136 
1137     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1138                   validateParams( SUBSCRIBE, pSubscriptionArgs );
1139 
1140     if( paramsValid )
1141     {
1142         statusReturn = createAndAddCommand( SUBSCRIBE,                                 /* commandType */
1143                                             pMqttAgentContext,                         /* mqttContextHandle */
1144                                             pSubscriptionArgs,                         /* pMqttInfoParam */
1145                                             pCommandInfo->cmdCompleteCallback,         /* commandCompleteCallback */
1146                                             pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1147                                             pCommandInfo->blockTimeMs );
1148     }
1149 
1150     return statusReturn;
1151 }
1152 
1153 /*-----------------------------------------------------------*/
1154 
MQTTAgent_Unsubscribe(const MQTTAgentContext_t * pMqttAgentContext,MQTTAgentSubscribeArgs_t * pSubscriptionArgs,const MQTTAgentCommandInfo_t * pCommandInfo)1155 MQTTStatus_t MQTTAgent_Unsubscribe( const MQTTAgentContext_t * pMqttAgentContext,
1156                                     MQTTAgentSubscribeArgs_t * pSubscriptionArgs,
1157                                     const MQTTAgentCommandInfo_t * pCommandInfo )
1158 {
1159     MQTTStatus_t statusReturn = MQTTBadParameter;
1160     bool paramsValid = false;
1161 
1162     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1163                   validateParams( UNSUBSCRIBE, pSubscriptionArgs );
1164 
1165     if( paramsValid )
1166     {
1167         statusReturn = createAndAddCommand( UNSUBSCRIBE,                               /* commandType */
1168                                             pMqttAgentContext,                         /* mqttContextHandle */
1169                                             pSubscriptionArgs,                         /* pMqttInfoParam */
1170                                             pCommandInfo->cmdCompleteCallback,         /* commandCompleteCallback */
1171                                             pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1172                                             pCommandInfo->blockTimeMs );
1173     }
1174 
1175     return statusReturn;
1176 }
1177 
1178 /*-----------------------------------------------------------*/
1179 
MQTTAgent_Publish(const MQTTAgentContext_t * pMqttAgentContext,MQTTPublishInfo_t * pPublishInfo,const MQTTAgentCommandInfo_t * pCommandInfo)1180 MQTTStatus_t MQTTAgent_Publish( const MQTTAgentContext_t * pMqttAgentContext,
1181                                 MQTTPublishInfo_t * pPublishInfo,
1182                                 const MQTTAgentCommandInfo_t * pCommandInfo )
1183 {
1184     MQTTStatus_t statusReturn = MQTTBadParameter;
1185     bool paramsValid = false;
1186 
1187     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1188                   validateParams( PUBLISH, pPublishInfo );
1189 
1190     if( paramsValid )
1191     {
1192         statusReturn = createAndAddCommand( PUBLISH,                                   /* commandType */
1193                                             pMqttAgentContext,                         /* mqttContextHandle */
1194                                             pPublishInfo,                              /* pMqttInfoParam */
1195                                             pCommandInfo->cmdCompleteCallback,         /* commandCompleteCallback */
1196                                             pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1197                                             pCommandInfo->blockTimeMs );
1198     }
1199 
1200     return statusReturn;
1201 }
1202 
1203 /*-----------------------------------------------------------*/
1204 
MQTTAgent_ProcessLoop(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1205 MQTTStatus_t MQTTAgent_ProcessLoop( const MQTTAgentContext_t * pMqttAgentContext,
1206                                     const MQTTAgentCommandInfo_t * pCommandInfo )
1207 {
1208     MQTTStatus_t statusReturn = MQTTBadParameter;
1209     bool paramsValid = false;
1210 
1211     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1212 
1213     if( paramsValid )
1214     {
1215         statusReturn = createAndAddCommand( PROCESSLOOP,                               /* commandType */
1216                                             pMqttAgentContext,                         /* mqttContextHandle */
1217                                             NULL,                                      /* pMqttInfoParam */
1218                                             pCommandInfo->cmdCompleteCallback,         /* commandCompleteCallback */
1219                                             pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1220                                             pCommandInfo->blockTimeMs );
1221     }
1222 
1223     return statusReturn;
1224 }
1225 
1226 /*-----------------------------------------------------------*/
1227 
MQTTAgent_Connect(const MQTTAgentContext_t * pMqttAgentContext,MQTTAgentConnectArgs_t * pConnectArgs,const MQTTAgentCommandInfo_t * pCommandInfo)1228 MQTTStatus_t MQTTAgent_Connect( const MQTTAgentContext_t * pMqttAgentContext,
1229                                 MQTTAgentConnectArgs_t * pConnectArgs,
1230                                 const MQTTAgentCommandInfo_t * pCommandInfo )
1231 {
1232     MQTTStatus_t statusReturn = MQTTBadParameter;
1233     bool paramsValid = false;
1234 
1235     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1236                   validateParams( CONNECT, pConnectArgs );
1237 
1238     if( paramsValid )
1239     {
1240         statusReturn = createAndAddCommand( CONNECT,
1241                                             pMqttAgentContext,
1242                                             pConnectArgs,
1243                                             pCommandInfo->cmdCompleteCallback,
1244                                             pCommandInfo->pCmdCompleteCallbackContext,
1245                                             pCommandInfo->blockTimeMs );
1246     }
1247 
1248     return statusReturn;
1249 }
1250 
1251 /*-----------------------------------------------------------*/
1252 
MQTTAgent_Disconnect(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1253 MQTTStatus_t MQTTAgent_Disconnect( const MQTTAgentContext_t * pMqttAgentContext,
1254                                    const MQTTAgentCommandInfo_t * pCommandInfo )
1255 {
1256     MQTTStatus_t statusReturn = MQTTBadParameter;
1257     bool paramsValid = false;
1258 
1259     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1260 
1261     if( paramsValid )
1262     {
1263         statusReturn = createAndAddCommand( DISCONNECT,                                /* commandType */
1264                                             pMqttAgentContext,                         /* mqttContextHandle */
1265                                             NULL,                                      /* pMqttInfoParam */
1266                                             pCommandInfo->cmdCompleteCallback,         /* commandCompleteCallback */
1267                                             pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1268                                             pCommandInfo->blockTimeMs );
1269     }
1270 
1271     return statusReturn;
1272 }
1273 
1274 /*-----------------------------------------------------------*/
1275 
MQTTAgent_Ping(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1276 MQTTStatus_t MQTTAgent_Ping( const MQTTAgentContext_t * pMqttAgentContext,
1277                              const MQTTAgentCommandInfo_t * pCommandInfo )
1278 {
1279     MQTTStatus_t statusReturn = MQTTBadParameter;
1280     bool paramsValid = false;
1281 
1282     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1283 
1284     if( paramsValid )
1285     {
1286         statusReturn = createAndAddCommand( PING,                                      /* commandType */
1287                                             pMqttAgentContext,                         /* mqttContextHandle */
1288                                             NULL,                                      /* pMqttInfoParam */
1289                                             pCommandInfo->cmdCompleteCallback,         /* commandCompleteCallback */
1290                                             pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1291                                             pCommandInfo->blockTimeMs );
1292     }
1293 
1294     return statusReturn;
1295 }
1296 
1297 /*-----------------------------------------------------------*/
1298 
MQTTAgent_Terminate(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1299 MQTTStatus_t MQTTAgent_Terminate( const MQTTAgentContext_t * pMqttAgentContext,
1300                                   const MQTTAgentCommandInfo_t * pCommandInfo )
1301 {
1302     MQTTStatus_t statusReturn = MQTTBadParameter;
1303     bool paramsValid = false;
1304 
1305     paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1306 
1307     if( paramsValid )
1308     {
1309         statusReturn = createAndAddCommand( TERMINATE,
1310                                             pMqttAgentContext,
1311                                             NULL,
1312                                             pCommandInfo->cmdCompleteCallback,
1313                                             pCommandInfo->pCmdCompleteCallbackContext,
1314                                             pCommandInfo->blockTimeMs );
1315     }
1316 
1317     return statusReturn;
1318 }
1319 
1320 /*-----------------------------------------------------------*/
1321