1 /*
2 * coreMQTT Agent v1.1.0
3 * Copyright (C) 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
6 * this software and associated documentation files (the "Software"), to deal in
7 * the Software without restriction, including without limitation the rights to
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
9 * the Software, and to permit persons to whom the Software is furnished to do so,
10 * subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in all
13 * copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 */
22
23 /**
24 * @file core_mqtt_agent.c
25 * @brief Implements an MQTT agent (or daemon task) to enable multithreaded access to
26 * coreMQTT.
27 *
28 * @note Implements an MQTT agent (or daemon task) on top of the coreMQTT MQTT client
29 * library. The agent makes coreMQTT usage thread safe by being the only task (or
30 * thread) in the system that is allowed to access the native coreMQTT API - and in
31 * so doing, serializes all access to coreMQTT even when multiple tasks are using the
32 * same MQTT connection.
33 *
34 * The agent provides an equivalent API for each coreMQTT API. Whereas coreMQTT
35 * APIs are prefixed "MQTT_", the agent APIs are prefixed "MQTTAgent_". For example,
36 * the agent's MQTTAgent_Publish() API is the thread safe equivalent to coreMQTT's
37 * MQTT_Publish() API.
38 */
39
40 /* Standard includes. */
41 #include <string.h>
42 #include <stdio.h>
43 #include <assert.h>
44
45 /* MQTT agent include. */
46 #include "core_mqtt_agent.h"
47 #include "core_mqtt_agent_command_functions.h"
48
49 /*-----------------------------------------------------------*/
50
51 /**
52 * @brief Track an operation by adding it to a list, indicating it is anticipating
53 * an acknowledgment.
54 *
55 * @param[in] pAgentContext Agent context for the MQTT connection.
56 * @param[in] packetId Packet ID of pending ack.
57 * @param[in] pCommand Pointer to command that is expecting an ack.
58 *
59 * @return Returns one of the following:
60 * - #MQTTSuccess if an entry was added to the list.
61 * - #MQTTStateCollision if there already exists an entry for the same packet ID
62 * in the list.
63 * - #MQTTNoMemory if there is no space available in the list for adding a
64 * new entry.
65 */
66 static MQTTStatus_t addAwaitingOperation( MQTTAgentContext_t * pAgentContext,
67 uint16_t packetId,
68 MQTTAgentCommand_t * pCommand );
69
70 /**
71 * @brief Retrieve an operation from the list of pending acks, and optionally
72 * remove it from the list.
73 *
74 * @param[in] pAgentContext Agent context for the MQTT connection.
75 * @param[in] incomingPacketId Packet ID of incoming ack.
76 *
77 * @return Pointer to stored information about the operation awaiting the ack.
78 * Returns NULL if the packet ID is zero or original command does not exist.
79 */
80 static MQTTAgentAckInfo_t * getAwaitingOperation( MQTTAgentContext_t * pAgentContext,
81 uint16_t incomingPacketId );
82
83 /**
84 * @brief Populate the parameters of a #MQTTAgentCommand struct.
85 *
86 * @param[in] commandType Type of command. For example, publish or subscribe.
87 * @param[in] pMqttAgentContext Pointer to MQTT context to use for command.
88 * @param[in] pMqttInfoParam Pointer to MQTTPublishInfo_t or MQTTSubscribeInfo_t.
89 * @param[in] commandCompleteCallback Callback for when command completes.
90 * @param[in] pCommandCompleteCallbackContext Context and necessary structs for command.
91 * @param[out] pCommand Pointer to initialized command.
92 *
93 * @return #MQTTSuccess if all necessary fields for the command are passed,
94 * else an enumerated error code.
95 */
96 static MQTTStatus_t createCommand( MQTTAgentCommandType_t commandType,
97 const MQTTAgentContext_t * pMqttAgentContext,
98 void * pMqttInfoParam,
99 MQTTAgentCommandCallback_t commandCompleteCallback,
100 MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
101 MQTTAgentCommand_t * pCommand );
102
103 /**
104 * @brief Add a command to the global command queue.
105 *
106 * @param[in] pAgentContext Agent context for the MQTT connection.
107 * @param[in] pCommand Pointer to command to copy to queue.
108 * @param[in] blockTimeMs The maximum amount of time in milliseconds to wait in the
109 * Blocked state (so not consuming any CPU time) for the command to be posted to the
110 * queue should the queue already be full.
111 *
112 * @return #MQTTSuccess if the command was added to the queue, else an enumerated
113 * error code.
114 */
115 static MQTTStatus_t addCommandToQueue( const MQTTAgentContext_t * pAgentContext,
116 MQTTAgentCommand_t * pCommand,
117 uint32_t blockTimeMs );
118
119 /**
120 * @brief Process a #MQTTAgentCommand struct.
121 *
122 * @note This agent does not check existing subscriptions before sending a
123 * SUBSCRIBE or UNSUBSCRIBE packet. If a subscription already exists, then
124 * a SUBSCRIBE packet will be sent anyway, and if multiple tasks are subscribed
125 * to a topic filter, then they will all be unsubscribed after an UNSUBSCRIBE.
126 *
127 * @param[in] pMqttAgentContext Agent context for MQTT connection.
128 * @param[in] pCommand Pointer to command to process.
129 * @param[out] pEndLoop Whether the command loop should terminate.
130 *
131 * @return status of MQTT library API call.
132 */
133 static MQTTStatus_t processCommand( MQTTAgentContext_t * pMqttAgentContext,
134 MQTTAgentCommand_t * pCommand,
135 bool * pEndLoop );
136
137 /**
138 * @brief Dispatch incoming publishes and acks to their various handler functions.
139 *
140 * @param[in] pMqttContext MQTT Context
141 * @param[in] pPacketInfo Pointer to incoming packet.
142 * @param[in] pDeserializedInfo Pointer to deserialized information from
143 * the incoming packet.
144 */
145 static void mqttEventCallback( MQTTContext_t * pMqttContext,
146 MQTTPacketInfo_t * pPacketInfo,
147 MQTTDeserializedInfo_t * pDeserializedInfo );
148
149 /**
150 * @brief Mark a command as complete after receiving an acknowledgment packet.
151 *
152 * @param[in] pAgentContext Agent context for the MQTT connection.
153 * @param[in] pPacketInfo Pointer to incoming packet.
154 * @param[in] pDeserializedInfo Pointer to deserialized information from
155 * the incoming packet.
156 * @param[in] pAckInfo Pointer to stored information for the original operation
157 * resulting in the received packet.
158 * @param[in] packetType The type of the incoming packet, either SUBACK, UNSUBACK,
159 * PUBACK, or PUBCOMP.
160 */
161 static void handleAcks( const MQTTAgentContext_t * pAgentContext,
162 const MQTTPacketInfo_t * pPacketInfo,
163 const MQTTDeserializedInfo_t * pDeserializedInfo,
164 MQTTAgentAckInfo_t * pAckInfo,
165 uint8_t packetType );
166
167 /**
168 * @brief Retrieve a pointer to an agent context given an MQTT context.
169 *
170 * @param[in] pMQTTContext MQTT Context to search for.
171 *
172 * @return Pointer to agent context, or NULL.
173 */
174 static MQTTAgentContext_t * getAgentFromMQTTContext( MQTTContext_t * pMQTTContext );
175
176 /**
177 * @brief Helper function for creating a command and adding it to the command
178 * queue.
179 *
180 * @param[in] commandType Type of command.
181 * @param[in] pMqttAgentContext Handle of the MQTT connection to use.
182 * @param[in] pCommandCompleteCallbackContext Context and necessary structs for command.
183 * @param[in] commandCompleteCallback Callback for when command completes.
184 * @param[in] pMqttInfoParam Pointer to command argument.
185 * @param[in] blockTimeMs Maximum amount of time in milliseconds to wait (in the
186 * Blocked state, so not consuming any CPU time) for the command to be posted to the
187 * MQTT agent should the MQTT agent's event queue be full.
188 *
189 * @return #MQTTSuccess if the command was posted to the MQTT agent's event queue.
190 * Otherwise an enumerated error code.
191 */
192 static MQTTStatus_t createAndAddCommand( MQTTAgentCommandType_t commandType,
193 const MQTTAgentContext_t * pMqttAgentContext,
194 void * pMqttInfoParam,
195 MQTTAgentCommandCallback_t commandCompleteCallback,
196 MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
197 uint32_t blockTimeMs );
198
199 /**
200 * @brief Helper function to mark a command as complete and invoke its callback.
201 * This function calls the releaseCommand callback.
202 *
203 * @param[in] pAgentContext Agent context for the MQTT connection.
204 * @param[in] pCommand Command to complete.
205 * @param[in] returnCode Return status of command.
206 * @param[in] pSubackCodes Pointer to suback array, if command is a SUBSCRIBE.
207 */
208 static void concludeCommand( const MQTTAgentContext_t * pAgentContext,
209 MQTTAgentCommand_t * pCommand,
210 MQTTStatus_t returnCode,
211 uint8_t * pSubackCodes );
212
213 /**
214 * @brief Resend QoS 1 and 2 publishes after resuming a session.
215 *
216 * @param[in] pMqttAgentContext Agent context for the MQTT connection.
217 *
218 * @return #MQTTSuccess if all publishes resent successfully, else error code
219 * from #MQTT_Publish.
220 */
221 static MQTTStatus_t resendPublishes( MQTTAgentContext_t * pMqttAgentContext );
222
223 /**
224 * @brief Clears the list of pending acknowledgments by invoking each callback
225 * with #MQTTRecvFailed either for ALL operations in the list OR only for
226 * Subscribe/Unsubscribe operations.
227 *
228 * @param[in] pMqttAgentContext Agent context of the MQTT connection.
229 * @param[in] clearOnlySubUnsubEntries Flag indicating whether all entries OR
230 * entries pertaining to only Subscribe/Unsubscribe operations should be cleaned
231 * from the list.
232 */
233 static void clearPendingAcknowledgments( MQTTAgentContext_t * pMqttAgentContext,
234 bool clearOnlySubUnsubEntries );
235
236 /**
237 * @brief Validate an #MQTTAgentContext_t and a #MQTTAgentCommandInfo_t from API
238 * functions.
239 *
240 * @param[in] pMqttAgentContext #MQTTAgentContext_t to validate.
241 * @param[in] pCommandInfo #MQTTAgentCommandInfo_t to validate.
242 *
243 * @return `true` if parameters are valid, else `false`.
244 */
245 static bool validateStruct( const MQTTAgentContext_t * pMqttAgentContext,
246 const MQTTAgentCommandInfo_t * pCommandInfo );
247
248 /**
249 * @brief Validate the parameters for a CONNECT, SUBSCRIBE, UNSUBSCRIBE
250 * or PUBLISH.
251 *
252 * @param[in] commandType CONNECT, SUBSCRIBE, UNSUBSCRIBE, or PUBLISH.
253 * @param[in] pParams Parameter structure to validate.
254 *
255 * @return `true` if parameter structure is valid, else `false`.
256 */
257 static bool validateParams( MQTTAgentCommandType_t commandType,
258 const void * pParams );
259
260 /**
261 * @brief Called before accepting any PUBLISH or SUBSCRIBE messages to check
262 * there is space in the pending ACK list for the outgoing PUBLISH or SUBSCRIBE.
263 *
264 * @note Because the MQTT agent is inherently multi threaded, and this function
265 * is called from the context of the application task and not the MQTT agent
266 * task, this function can only return a best effort result. It can definitely
267 * say if there is space for a new pending ACK when the function is called, but
268 * the case of space being exhausted when the agent executes a command that
269 * results in an ACK must still be handled.
270 *
271 * @param[in] pAgentContext Pointer to the context for the MQTT connection to
272 * which the PUBLISH or SUBSCRIBE message is to be sent.
273 *
274 * @return true if there is space in that MQTT connection's ACK list, otherwise
275 * false;
276 */
277 static bool isSpaceInPendingAckList( const MQTTAgentContext_t * pAgentContext );
278
279 /*-----------------------------------------------------------*/
280
isSpaceInPendingAckList(const MQTTAgentContext_t * pAgentContext)281 static bool isSpaceInPendingAckList( const MQTTAgentContext_t * pAgentContext )
282 {
283 const MQTTAgentAckInfo_t * pendingAcks;
284 bool spaceFound = false;
285 size_t i;
286
287 assert( pAgentContext != NULL );
288
289 pendingAcks = pAgentContext->pPendingAcks;
290
291 /* Are there any open slots? */
292 for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
293 {
294 /* If the packetId is MQTT_PACKET_ID_INVALID then the array space is
295 * not in use. */
296 if( pendingAcks[ i ].packetId == MQTT_PACKET_ID_INVALID )
297 {
298 spaceFound = true;
299 break;
300 }
301 }
302
303 return spaceFound;
304 }
305
306 /*-----------------------------------------------------------*/
307
addAwaitingOperation(MQTTAgentContext_t * pAgentContext,uint16_t packetId,MQTTAgentCommand_t * pCommand)308 static MQTTStatus_t addAwaitingOperation( MQTTAgentContext_t * pAgentContext,
309 uint16_t packetId,
310 MQTTAgentCommand_t * pCommand )
311 {
312 size_t i = 0, unusedPos = MQTT_AGENT_MAX_OUTSTANDING_ACKS;
313 MQTTStatus_t status = MQTTNoMemory;
314 MQTTAgentAckInfo_t * pendingAcks = NULL;
315
316 assert( pAgentContext != NULL );
317 assert( pCommand != NULL );
318 assert( packetId != MQTT_PACKET_ID_INVALID );
319 pendingAcks = pAgentContext->pPendingAcks;
320
321 /* Before adding the record for the pending acknowledgement of the packet ID,
322 * make sure that there doesn't already exist an entry for the same packet ID.
323 * Also, as part of iterating through the list of pending acknowledgements,
324 * find an unused space for the packet ID to be added, if it can be. */
325 for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
326 {
327 /* If the packetId is MQTT_PACKET_ID_INVALID then the array space is not in
328 * use. */
329 if( ( unusedPos == MQTT_AGENT_MAX_OUTSTANDING_ACKS ) &&
330 ( pendingAcks[ i ].packetId == MQTT_PACKET_ID_INVALID ) )
331 {
332 unusedPos = i;
333 status = MQTTSuccess;
334 }
335
336 if( pendingAcks[ i ].packetId == packetId )
337 {
338 /* Check whether there exists a duplicate entry for pending
339 * acknowledgment for the same packet ID that we want to add to
340 * the list.
341 * Note: This is an unlikely edge case which represents that a packet ID
342 * didn't receive acknowledgment, but subsequent SUBSCRIBE/PUBLISH operations
343 * representing 65535 packet IDs were successful that caused the bit packet
344 * ID value to wrap around and reached the same packet ID as that was still
345 * pending acknowledgment.
346 */
347 status = MQTTStateCollision;
348 LogError( ( "Failed to add operation to list of pending acknowledgments: "
349 "Existing entry found for same packet: PacketId=%u\n", packetId ) );
350 break;
351 }
352 }
353
354 /* Add the packet ID to the list if there is space available, and there is no
355 * duplicate entry for the same packet ID found. */
356 if( status == MQTTSuccess )
357 {
358 pendingAcks[ unusedPos ].packetId = packetId;
359 pendingAcks[ unusedPos ].pOriginalCommand = pCommand;
360 }
361 else if( status == MQTTNoMemory )
362 {
363 LogError( ( "Failed to add operation to list of pending acknowledgments: "
364 "No memory available: PacketId=%u\n", packetId ) );
365 }
366 else
367 {
368 /* Empty else MISRA 15.7 */
369 }
370
371 return status;
372 }
373
374 /*-----------------------------------------------------------*/
375
getAwaitingOperation(MQTTAgentContext_t * pAgentContext,uint16_t incomingPacketId)376 static MQTTAgentAckInfo_t * getAwaitingOperation( MQTTAgentContext_t * pAgentContext,
377 uint16_t incomingPacketId )
378 {
379 size_t i = 0;
380 MQTTAgentAckInfo_t * pFoundAck = NULL;
381
382 assert( pAgentContext != NULL );
383
384 /* Look through the array of packet IDs that are still waiting to be acked to
385 * find one with incomingPacketId. */
386 for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
387 {
388 if( pAgentContext->pPendingAcks[ i ].packetId == incomingPacketId )
389 {
390 pFoundAck = &( pAgentContext->pPendingAcks[ i ] );
391 break;
392 }
393 }
394
395 if( pFoundAck == NULL )
396 {
397 LogError( ( "No ack found for packet id %u.\n", incomingPacketId ) );
398 }
399 else if( ( pFoundAck->pOriginalCommand == NULL ) || ( pFoundAck->packetId == 0U ) )
400 {
401 LogError( ( "Found ack had empty fields. PacketId=%hu, Original Command=%p",
402 ( unsigned short ) pFoundAck->packetId,
403 ( void * ) pFoundAck->pOriginalCommand ) );
404 ( void ) memset( pFoundAck, 0x00, sizeof( MQTTAgentAckInfo_t ) );
405 pFoundAck = NULL;
406 }
407 else
408 {
409 /* Empty else MISRA 15.7 */
410 }
411
412 return pFoundAck;
413 }
414
415 /*-----------------------------------------------------------*/
416
createCommand(MQTTAgentCommandType_t commandType,const MQTTAgentContext_t * pMqttAgentContext,void * pMqttInfoParam,MQTTAgentCommandCallback_t commandCompleteCallback,MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,MQTTAgentCommand_t * pCommand)417 static MQTTStatus_t createCommand( MQTTAgentCommandType_t commandType,
418 const MQTTAgentContext_t * pMqttAgentContext,
419 void * pMqttInfoParam,
420 MQTTAgentCommandCallback_t commandCompleteCallback,
421 MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
422 MQTTAgentCommand_t * pCommand )
423 {
424 bool isValid, isSpace = true;
425 MQTTStatus_t statusReturn;
426 const MQTTPublishInfo_t * pPublishInfo;
427 size_t uxHeaderBytes;
428 const size_t uxControlAndLengthBytes = ( size_t ) 4; /* Control, remaining length and length bytes. */
429
430 assert( pMqttAgentContext != NULL );
431 assert( pCommand != NULL );
432
433 ( void ) memset( pCommand, 0x00, sizeof( MQTTAgentCommand_t ) );
434
435 /* Determine if required parameters are present in context. */
436 switch( commandType )
437 {
438 case SUBSCRIBE:
439 case UNSUBSCRIBE:
440 assert( pMqttInfoParam != NULL );
441
442 /* This message type results in the broker returning an ACK. The
443 * agent maintains an array of outstanding ACK messages. See if
444 * the array contains space for another outstanding ack. */
445 isSpace = isSpaceInPendingAckList( pMqttAgentContext );
446
447 isValid = isSpace;
448
449 break;
450
451 case PUBLISH:
452 pPublishInfo = ( const MQTTPublishInfo_t * ) pMqttInfoParam;
453
454 /* Calculate the space consumed by everything other than the
455 * payload. */
456 uxHeaderBytes = uxControlAndLengthBytes;
457 uxHeaderBytes += pPublishInfo->topicNameLength;
458
459 /* This message type results in the broker returning an ACK. The
460 * agent maintains an array of outstanding ACK messages. See if
461 * the array contains space for another outstanding ack. QoS0
462 * publish does not result in an ack so it doesn't matter if
463 * there is no space in the ACK array. */
464 if( pPublishInfo->qos != MQTTQoS0 )
465 {
466 isSpace = isSpaceInPendingAckList( pMqttAgentContext );
467 }
468
469 /* Will the message fit in the defined buffer? */
470 isValid = ( uxHeaderBytes < pMqttAgentContext->mqttContext.networkBuffer.size ) &&
471 ( isSpace == true );
472
473 break;
474
475 case PROCESSLOOP:
476 case PING:
477 case CONNECT:
478 case DISCONNECT:
479 default:
480 /* Other operations don't need to store ACKs. */
481 isValid = true;
482 break;
483 }
484
485 if( isValid )
486 {
487 pCommand->commandType = commandType;
488 pCommand->pArgs = pMqttInfoParam;
489 pCommand->pCmdContext = pCommandCompleteCallbackContext;
490 pCommand->pCommandCompleteCallback = commandCompleteCallback;
491 }
492
493 statusReturn = ( isValid ) ? MQTTSuccess : MQTTBadParameter;
494
495 if( ( statusReturn == MQTTBadParameter ) && ( isSpace == false ) )
496 {
497 /* The error was caused not by a bad parameter, but because there was
498 * no room in the pending Ack list for the Ack response to an outgoing
499 * PUBLISH or SUBSCRIBE message. */
500 statusReturn = MQTTNoMemory;
501 }
502
503 return statusReturn;
504 }
505
506 /*-----------------------------------------------------------*/
507
addCommandToQueue(const MQTTAgentContext_t * pAgentContext,MQTTAgentCommand_t * pCommand,uint32_t blockTimeMs)508 static MQTTStatus_t addCommandToQueue( const MQTTAgentContext_t * pAgentContext,
509 MQTTAgentCommand_t * pCommand,
510 uint32_t blockTimeMs )
511 {
512 bool queueStatus;
513
514 assert( pAgentContext != NULL );
515 assert( pCommand != NULL );
516
517 /* The application called an API function. The API function was validated and
518 * packed into a MQTTAgentCommand_t structure. Now post a reference to the MQTTAgentCommand_t
519 * structure to the MQTT agent for processing. */
520 queueStatus = pAgentContext->agentInterface.send(
521 pAgentContext->agentInterface.pMsgCtx,
522 &pCommand,
523 blockTimeMs
524 );
525
526 return ( queueStatus ) ? MQTTSuccess : MQTTSendFailed;
527 }
528
529 /*-----------------------------------------------------------*/
530
processCommand(MQTTAgentContext_t * pMqttAgentContext,MQTTAgentCommand_t * pCommand,bool * pEndLoop)531 static MQTTStatus_t processCommand( MQTTAgentContext_t * pMqttAgentContext,
532 MQTTAgentCommand_t * pCommand,
533 bool * pEndLoop )
534 {
535 const MQTTAgentCommandFunc_t pCommandFunctionTable[ NUM_COMMANDS ] = MQTT_AGENT_FUNCTION_TABLE;
536 MQTTStatus_t operationStatus = MQTTSuccess;
537 bool ackAdded = false;
538 MQTTAgentCommandFunc_t commandFunction = NULL;
539 void * pCommandArgs = NULL;
540 const uint32_t processLoopTimeoutMs = 0U;
541 MQTTAgentCommandFuncReturns_t commandOutParams = { 0 };
542
543 assert( pMqttAgentContext != NULL );
544 assert( pEndLoop != NULL );
545
546 if( pCommand != NULL )
547 {
548 assert( pCommand->commandType < NUM_COMMANDS );
549 commandFunction = pCommandFunctionTable[ pCommand->commandType ];
550 pCommandArgs = pCommand->pArgs;
551 }
552 else
553 {
554 commandFunction = pCommandFunctionTable[ NONE ];
555 }
556
557 operationStatus = commandFunction( pMqttAgentContext, pCommandArgs, &commandOutParams );
558
559 if( ( operationStatus == MQTTSuccess ) &&
560 commandOutParams.addAcknowledgment &&
561 ( commandOutParams.packetId != MQTT_PACKET_ID_INVALID ) )
562 {
563 operationStatus = addAwaitingOperation( pMqttAgentContext, commandOutParams.packetId, pCommand );
564 ackAdded = ( operationStatus == MQTTSuccess );
565 }
566
567 if( ( pCommand != NULL ) && ( ackAdded != true ) )
568 {
569 /* The command is complete, call the callback. */
570 concludeCommand( pMqttAgentContext, pCommand, operationStatus, NULL );
571 }
572
573 /* Run the process loop if there were no errors and the MQTT connection
574 * still exists. */
575 if( ( operationStatus == MQTTSuccess ) && commandOutParams.runProcessLoop )
576 {
577 do
578 {
579 pMqttAgentContext->packetReceivedInLoop = false;
580
581 if( ( operationStatus == MQTTSuccess ) &&
582 ( pMqttAgentContext->mqttContext.connectStatus == MQTTConnected ) )
583 {
584 operationStatus = MQTT_ProcessLoop( &( pMqttAgentContext->mqttContext ), processLoopTimeoutMs );
585 }
586 } while( pMqttAgentContext->packetReceivedInLoop );
587 }
588
589 /* Set the flag to break from the command loop. */
590 *pEndLoop = ( commandOutParams.endLoop || ( operationStatus != MQTTSuccess ) );
591
592 return operationStatus;
593 }
594
595 /*-----------------------------------------------------------*/
596
/*
 * Complete the command that was waiting for the received acknowledgment and
 * free its slot in the pending-ack list.
 *
 * The command is concluded with the deserialization result of the incoming
 * packet. For a SUBACK the callback additionally receives a pointer into the
 * packet's remaining data, offset by two bytes; for every other packet type
 * that pointer is NULL.
 */
static void handleAcks( const MQTTAgentContext_t * pAgentContext,
                        const MQTTPacketInfo_t * pPacketInfo,
                        const MQTTDeserializedInfo_t * pDeserializedInfo,
                        MQTTAgentAckInfo_t * pAckInfo,
                        uint8_t packetType )
{
    uint8_t * pSubackCodes = NULL;

    assert( pAckInfo != NULL );
    assert( pAckInfo->pOriginalCommand != NULL );

    if( packetType == MQTT_PACKET_TYPE_SUBACK )
    {
        /* A SUBACK's status codes start 2 bytes after the variable header. */
        pSubackCodes = &( pPacketInfo->pRemainingData[ 2U ] );
    }

    concludeCommand( pAgentContext,
                     pAckInfo->pOriginalCommand,
                     pDeserializedInfo->deserializationResult,
                     pSubackCodes );

    /* The operation is finished, so release its list entry. */
    ( void ) memset( pAckInfo, 0x00, sizeof( *pAckInfo ) );
}
619
620 /*-----------------------------------------------------------*/
621
getAgentFromMQTTContext(MQTTContext_t * pMQTTContext)622 static MQTTAgentContext_t * getAgentFromMQTTContext( MQTTContext_t * pMQTTContext )
623 {
624 void * ret = pMQTTContext;
625
626 return ( MQTTAgentContext_t * ) ret;
627 }
628
629 /*-----------------------------------------------------------*/
630
/*
 * Event callback handed to coreMQTT; dispatches incoming packets.
 *
 * - Marks the agent context's packetReceivedInLoop flag so the caller of
 *   MQTT_ProcessLoop() knows it should call it again.
 * - PUBLISH packets are forwarded to the agent's incoming publish callback.
 * - PUBACK, PUBCOMP, SUBACK and UNSUBACK complete the matching pending
 *   operation, if one is found.
 * - PUBREC and PUBREL are ignored; any other type is logged as unknown.
 *
 * All three pointer arguments must be non-NULL. pDeserializedInfo is read
 * only after the asserts have run.
 */
static void mqttEventCallback( MQTTContext_t * pMqttContext,
                               MQTTPacketInfo_t * pPacketInfo,
                               MQTTDeserializedInfo_t * pDeserializedInfo )
{
    MQTTAgentAckInfo_t * pAckInfo;
    uint16_t packetIdentifier;
    MQTTAgentContext_t * pAgentContext;
    const uint8_t upperNibble = ( uint8_t ) 0xF0;

    assert( pMqttContext != NULL );
    assert( pPacketInfo != NULL );
    assert( pDeserializedInfo != NULL );

    packetIdentifier = pDeserializedInfo->packetIdentifier;

    pAgentContext = getAgentFromMQTTContext( pMqttContext );

    /* This callback executes from within MQTT_ProcessLoop(). Setting this flag
     * indicates that the callback executed so the caller of MQTT_ProcessLoop() knows
     * it should call it again as there may be more data to process. */
    pAgentContext->packetReceivedInLoop = true;

    /* Handle incoming publish. The lower 4 bits of the publish packet type is used
     * for the dup, QoS, and retain flags. Hence masking out the lower bits to check
     * if the packet is publish. */
    if( ( pPacketInfo->type & upperNibble ) == MQTT_PACKET_TYPE_PUBLISH )
    {
        pAgentContext->pIncomingCallback( pAgentContext, packetIdentifier, pDeserializedInfo->pPublishInfo );
    }
    else
    {
        /* Handle other packets. */
        switch( pPacketInfo->type )
        {
            case MQTT_PACKET_TYPE_PUBACK:
            case MQTT_PACKET_TYPE_PUBCOMP:
            case MQTT_PACKET_TYPE_SUBACK:
            case MQTT_PACKET_TYPE_UNSUBACK:
                pAckInfo = getAwaitingOperation( pAgentContext, packetIdentifier );

                if( pAckInfo != NULL )
                {
                    /* This function will also clear the memory associated with
                     * the ack list entry. */
                    handleAcks( pAgentContext,
                                pPacketInfo,
                                pDeserializedInfo,
                                pAckInfo,
                                pPacketInfo->type );
                }
                else
                {
                    LogError( ( "No operation found matching packet id %u.\n", packetIdentifier ) );
                }

                break;

            /* Nothing to do for these packets since they don't indicate command completion. */
            case MQTT_PACKET_TYPE_PUBREC:
            case MQTT_PACKET_TYPE_PUBREL:
                break;

            /* Any other packet type is invalid. */
            case MQTT_PACKET_TYPE_PINGRESP:
            default:
                LogError( ( "Unknown packet type received:(%02x).\n",
                            pPacketInfo->type ) );
                break;
        }
    }
}
699
700 /*-----------------------------------------------------------*/
701
createAndAddCommand(MQTTAgentCommandType_t commandType,const MQTTAgentContext_t * pMqttAgentContext,void * pMqttInfoParam,MQTTAgentCommandCallback_t commandCompleteCallback,MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,uint32_t blockTimeMs)702 static MQTTStatus_t createAndAddCommand( MQTTAgentCommandType_t commandType,
703 const MQTTAgentContext_t * pMqttAgentContext,
704 void * pMqttInfoParam,
705 MQTTAgentCommandCallback_t commandCompleteCallback,
706 MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
707 uint32_t blockTimeMs )
708 {
709 MQTTStatus_t statusReturn = MQTTBadParameter;
710 MQTTAgentCommand_t * pCommand;
711 bool commandReleased = false;
712
713 /* If the packet ID is zero then the MQTT context has not been initialized as 0
714 * is the initial value but not a valid packet ID. */
715 if( pMqttAgentContext->mqttContext.nextPacketId != MQTT_PACKET_ID_INVALID )
716 {
717 pCommand = pMqttAgentContext->agentInterface.getCommand( blockTimeMs );
718
719 if( pCommand != NULL )
720 {
721 statusReturn = createCommand( commandType,
722 pMqttAgentContext,
723 pMqttInfoParam,
724 commandCompleteCallback,
725 pCommandCompleteCallbackContext,
726 pCommand );
727
728 if( statusReturn == MQTTSuccess )
729 {
730 statusReturn = addCommandToQueue( pMqttAgentContext, pCommand, blockTimeMs );
731 }
732
733 if( statusReturn != MQTTSuccess )
734 {
735 /* Could not send the command to the queue so release the command
736 * structure again. */
737 commandReleased = pMqttAgentContext->agentInterface.releaseCommand( pCommand );
738
739 if( !commandReleased )
740 {
741 LogError( ( "Command %p could not be released.",
742 ( void * ) pCommand ) );
743 }
744 }
745 }
746 else
747 {
748 /* Ran out of MQTTAgentCommand_t structures - pool is empty. */
749 statusReturn = MQTTNoMemory;
750 }
751 }
752 else
753 {
754 LogError( ( "MQTT context must be initialized." ) );
755 }
756
757 return statusReturn;
758 }
759
760 /*-----------------------------------------------------------*/
761
/*
 * Finish a command: report its outcome through the completion callback (when
 * one is set) and then release the command structure through the messaging
 * interface.
 *
 * The callback receives the command's context together with a return-info
 * structure holding returnCode and pSubackCodes. A failed release is logged.
 */
static void concludeCommand( const MQTTAgentContext_t * pAgentContext,
                             MQTTAgentCommand_t * pCommand,
                             MQTTStatus_t returnCode,
                             uint8_t * pSubackCodes )
{
    MQTTAgentReturnInfo_t result;

    assert( pAgentContext != NULL );
    assert( pAgentContext->agentInterface.releaseCommand != NULL );
    assert( pCommand != NULL );

    /* Start from an all-zero structure, then set the reported fields. */
    ( void ) memset( &result, 0x00, sizeof( result ) );
    result.returnCode = returnCode;
    result.pSubackCodes = pSubackCodes;

    if( pCommand->pCommandCompleteCallback != NULL )
    {
        pCommand->pCommandCompleteCallback( pCommand->pCmdContext, &result );
    }

    if( pAgentContext->agentInterface.releaseCommand( pCommand ) == false )
    {
        LogError( ( "Failed to release command %p of type %d.",
                    ( void * ) pCommand,
                    pCommand->commandType ) );
    }
}
792
793 /*-----------------------------------------------------------*/
794
resendPublishes(MQTTAgentContext_t * pMqttAgentContext)795 static MQTTStatus_t resendPublishes( MQTTAgentContext_t * pMqttAgentContext )
796 {
797 MQTTStatus_t statusResult = MQTTSuccess;
798 MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
799 uint16_t packetId = MQTT_PACKET_ID_INVALID;
800 MQTTAgentAckInfo_t * pFoundAck = NULL;
801 MQTTPublishInfo_t * pOriginalPublish = NULL;
802 MQTTContext_t * pMqttContext;
803
804 assert( pMqttAgentContext != NULL );
805 pMqttContext = &( pMqttAgentContext->mqttContext );
806
807 packetId = MQTT_PublishToResend( pMqttContext, &cursor );
808
809 while( packetId != MQTT_PACKET_ID_INVALID )
810 {
811 /* Retrieve the operation but do not remove it from the list. */
812 pFoundAck = getAwaitingOperation( pMqttAgentContext, packetId );
813
814 if( pFoundAck != NULL )
815 {
816 /* Set the DUP flag. */
817 pOriginalPublish = ( MQTTPublishInfo_t * ) ( pFoundAck->pOriginalCommand->pArgs );
818 pOriginalPublish->dup = true;
819 statusResult = MQTT_Publish( pMqttContext, pOriginalPublish, packetId );
820
821 if( statusResult != MQTTSuccess )
822 {
823 concludeCommand( pMqttAgentContext, pFoundAck->pOriginalCommand, statusResult, NULL );
824 ( void ) memset( pFoundAck, 0x00, sizeof( MQTTAgentAckInfo_t ) );
825 LogError( ( "Failed to resend publishes. Error code=%s\n", MQTT_Status_strerror( statusResult ) ) );
826 break;
827 }
828 }
829
830 packetId = MQTT_PublishToResend( pMqttContext, &cursor );
831 }
832
833 return statusResult;
834 }
835
836 /*-----------------------------------------------------------*/
837
/*
 * Conclude operations that are still waiting for an acknowledgment and free
 * their list entries.
 *
 * When clearOnlySubUnsubEntries is false every in-use entry is handled; when
 * it is true only entries whose original command is a SUBSCRIBE or an
 * UNSUBSCRIBE are handled. Each handled command is concluded with
 * MQTTRecvFailed and its entry is zeroed.
 */
static void clearPendingAcknowledgments( MQTTAgentContext_t * pMqttAgentContext,
                                         bool clearOnlySubUnsubEntries )
{
    MQTTAgentAckInfo_t * pEntry = NULL;
    bool shouldClear = false;
    size_t slot;

    assert( pMqttAgentContext != NULL );

    for( slot = 0; slot < MQTT_AGENT_MAX_OUTSTANDING_ACKS; slot++ )
    {
        pEntry = &( pMqttAgentContext->pPendingAcks[ slot ] );
        shouldClear = false;

        /* Unused slots carry MQTT_PACKET_ID_INVALID and are left alone. */
        if( pEntry->packetId != MQTT_PACKET_ID_INVALID )
        {
            assert( pEntry->pOriginalCommand != NULL );

            /* Either everything is cleared, or only SUBSCRIBE and UNSUBSCRIBE
             * operations. */
            shouldClear = ( clearOnlySubUnsubEntries == false ) ||
                          ( pEntry->pOriginalCommand->commandType == SUBSCRIBE ) ||
                          ( pEntry->pOriginalCommand->commandType == UNSUBSCRIBE );
        }

        if( shouldClear )
        {
            /* Receive failed to indicate network error. */
            concludeCommand( pMqttAgentContext, pEntry->pOriginalCommand, MQTTRecvFailed, NULL );

            /* The entry is no longer needed. */
            ( void ) memset( pEntry, 0x00, sizeof( *pEntry ) );
        }
    }
}
875
876 /*-----------------------------------------------------------*/
877
validateStruct(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)878 static bool validateStruct( const MQTTAgentContext_t * pMqttAgentContext,
879 const MQTTAgentCommandInfo_t * pCommandInfo )
880 {
881 bool ret = false;
882
883 if( ( pMqttAgentContext == NULL ) ||
884 ( pCommandInfo == NULL ) )
885 {
886 LogError( ( "Pointer cannot be NULL. pMqttAgentContext=%p, pCommandInfo=%p.",
887 ( void * ) pMqttAgentContext,
888 ( void * ) pCommandInfo ) );
889 }
890 else if( ( pMqttAgentContext->agentInterface.send == NULL ) ||
891 ( pMqttAgentContext->agentInterface.recv == NULL ) ||
892 ( pMqttAgentContext->agentInterface.getCommand == NULL ) ||
893 ( pMqttAgentContext->agentInterface.releaseCommand == NULL ) ||
894 ( pMqttAgentContext->agentInterface.pMsgCtx == NULL ) )
895 {
896 LogError( ( "pMqttAgentContext must have initialized its messaging interface." ) );
897 }
898 else
899 {
900 ret = true;
901 }
902
903 return ret;
904 }
905
906 /*-----------------------------------------------------------*/
907
/*
 * Check the argument structure supplied with a command.
 *
 * commandType - One of CONNECT, PUBLISH, SUBSCRIBE or UNSUBSCRIBE (asserted).
 * pParams     - Command arguments; interpreted according to commandType.
 *
 * Returns true when pParams is non-NULL and, for CONNECT, its pConnectInfo is
 * non-NULL, or, for SUBSCRIBE/UNSUBSCRIBE, its pSubscribeInfo is non-NULL and
 * numSubscriptions is non-zero. Any other type only needs a non-NULL pParams.
 */
static bool validateParams( MQTTAgentCommandType_t commandType,
                            const void * pParams )
{
    bool paramsOk = false;

    assert( ( commandType == CONNECT ) || ( commandType == PUBLISH ) ||
            ( commandType == SUBSCRIBE ) || ( commandType == UNSUBSCRIBE ) );

    if( commandType == CONNECT )
    {
        const MQTTAgentConnectArgs_t * pConnect = ( const MQTTAgentConnectArgs_t * ) pParams;

        paramsOk = ( ( pConnect != NULL ) &&
                     ( pConnect->pConnectInfo != NULL ) );
    }
    else if( ( commandType == SUBSCRIBE ) || ( commandType == UNSUBSCRIBE ) )
    {
        const MQTTAgentSubscribeArgs_t * pSubscribe = ( const MQTTAgentSubscribeArgs_t * ) pParams;

        paramsOk = ( ( pSubscribe != NULL ) &&
                     ( pSubscribe->pSubscribeInfo != NULL ) &&
                     ( pSubscribe->numSubscriptions != 0U ) );
    }
    else
    {
        /* PUBLISH (and anything else): the contents are not inspected, so no
         * cast is needed. */
        paramsOk = ( pParams != NULL );
    }

    return paramsOk;
}
943
944 /*-----------------------------------------------------------*/
945
MQTTAgent_Init(MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentMessageInterface_t * pMsgInterface,const MQTTFixedBuffer_t * pNetworkBuffer,const TransportInterface_t * pTransportInterface,MQTTGetCurrentTimeFunc_t getCurrentTimeMs,MQTTAgentIncomingPublishCallback_t incomingCallback,void * pIncomingPacketContext)946 MQTTStatus_t MQTTAgent_Init( MQTTAgentContext_t * pMqttAgentContext,
947 const MQTTAgentMessageInterface_t * pMsgInterface,
948 const MQTTFixedBuffer_t * pNetworkBuffer,
949 const TransportInterface_t * pTransportInterface,
950 MQTTGetCurrentTimeFunc_t getCurrentTimeMs,
951 MQTTAgentIncomingPublishCallback_t incomingCallback,
952 void * pIncomingPacketContext )
953 {
954 MQTTStatus_t returnStatus;
955
956 if( ( pMqttAgentContext == NULL ) ||
957 ( pMsgInterface == NULL ) ||
958 ( pTransportInterface == NULL ) ||
959 ( getCurrentTimeMs == NULL ) ||
960 ( incomingCallback == NULL ) )
961 {
962 returnStatus = MQTTBadParameter;
963 }
964 else if( ( pMsgInterface->pMsgCtx == NULL ) ||
965 ( pMsgInterface->send == NULL ) ||
966 ( pMsgInterface->recv == NULL ) ||
967 ( pMsgInterface->getCommand == NULL ) ||
968 ( pMsgInterface->releaseCommand == NULL ) )
969 {
970 LogError( ( "Invalid parameter: pMsgInterface must set all members." ) );
971 returnStatus = MQTTBadParameter;
972 }
973 else
974 {
975 ( void ) memset( pMqttAgentContext, 0x00, sizeof( MQTTAgentContext_t ) );
976
977 returnStatus = MQTT_Init( &( pMqttAgentContext->mqttContext ),
978 pTransportInterface,
979 getCurrentTimeMs,
980 mqttEventCallback,
981 pNetworkBuffer );
982
983 if( returnStatus == MQTTSuccess )
984 {
985 pMqttAgentContext->pIncomingCallback = incomingCallback;
986 pMqttAgentContext->pIncomingCallbackContext = pIncomingPacketContext;
987 pMqttAgentContext->agentInterface = *pMsgInterface;
988 }
989 }
990
991 return returnStatus;
992 }
993
994 /*-----------------------------------------------------------*/
995
MQTTAgent_CommandLoop(MQTTAgentContext_t * pMqttAgentContext)996 MQTTStatus_t MQTTAgent_CommandLoop( MQTTAgentContext_t * pMqttAgentContext )
997 {
998 MQTTAgentCommand_t * pCommand;
999 MQTTStatus_t operationStatus = MQTTSuccess;
1000 bool endLoop = false;
1001
1002 /* The command queue should have been created before this task gets created. */
1003 if( ( pMqttAgentContext == NULL ) || ( pMqttAgentContext->agentInterface.pMsgCtx == NULL ) )
1004 {
1005 operationStatus = MQTTBadParameter;
1006 }
1007
1008 /* Loop until an error or we receive a terminate command. */
1009 while( operationStatus == MQTTSuccess )
1010 {
1011 /* Wait for the next command, if any. */
1012 pCommand = NULL;
1013 ( void ) pMqttAgentContext->agentInterface.recv(
1014 pMqttAgentContext->agentInterface.pMsgCtx,
1015 &( pCommand ),
1016 MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME
1017 );
1018 operationStatus = processCommand( pMqttAgentContext, pCommand, &endLoop );
1019
1020 if( operationStatus != MQTTSuccess )
1021 {
1022 LogError( ( "MQTT operation failed with status %s\n",
1023 MQTT_Status_strerror( operationStatus ) ) );
1024 }
1025
1026 /* Terminate the loop on disconnects, errors, or the termination command. */
1027 if( endLoop )
1028 {
1029 break;
1030 }
1031 }
1032
1033 return operationStatus;
1034 }
1035
1036 /*-----------------------------------------------------------*/
1037
MQTTAgent_ResumeSession(MQTTAgentContext_t * pMqttAgentContext,bool sessionPresent)1038 MQTTStatus_t MQTTAgent_ResumeSession( MQTTAgentContext_t * pMqttAgentContext,
1039 bool sessionPresent )
1040 {
1041 MQTTStatus_t statusResult = MQTTSuccess;
1042
1043 /* If the packet ID is zero then the MQTT context has not been initialized as 0
1044 * is the initial value but not a valid packet ID. */
1045 if( ( pMqttAgentContext != NULL ) &&
1046 ( pMqttAgentContext->mqttContext.nextPacketId != MQTT_PACKET_ID_INVALID ) )
1047 {
1048 /* Resend publishes if session is present. NOTE: It's possible that some
1049 * of the operations that were in progress during the network interruption
1050 * were subscribes. In that case, we would want to mark those operations
1051 * as completing with error and remove them from the list of operations, so
1052 * that the calling task can try subscribing again. */
1053 if( sessionPresent )
1054 {
1055 /* The session has resumed, so clear any SUBSCRIBE/UNSUBSCRIBE operations
1056 * that were pending acknowledgments in the previous connection. */
1057 clearPendingAcknowledgments( pMqttAgentContext, true );
1058
1059 statusResult = resendPublishes( pMqttAgentContext );
1060 }
1061
1062 /* If we wanted to resume a session but none existed with the broker, we
1063 * should mark all in progress operations as errors so that the tasks that
1064 * created them can try again. */
1065 else
1066 {
1067 /* We have a clean session, so clear all operations pending acknowledgments. */
1068 clearPendingAcknowledgments( pMqttAgentContext, false );
1069 }
1070 }
1071 else
1072 {
1073 statusResult = MQTTBadParameter;
1074 }
1075
1076 return statusResult;
1077 }
1078
1079 /*-----------------------------------------------------------*/
1080
MQTTAgent_CancelAll(MQTTAgentContext_t * pMqttAgentContext)1081 MQTTStatus_t MQTTAgent_CancelAll( MQTTAgentContext_t * pMqttAgentContext )
1082 {
1083 MQTTStatus_t statusReturn = MQTTSuccess;
1084 MQTTAgentCommand_t * pReceivedCommand = NULL;
1085 bool commandWasReceived = false;
1086 MQTTAgentAckInfo_t * pendingAcks;
1087 size_t i;
1088
1089 if( ( pMqttAgentContext == NULL ) || ( pMqttAgentContext->agentInterface.pMsgCtx == NULL ) )
1090 {
1091 statusReturn = MQTTBadParameter;
1092 }
1093 else
1094 {
1095 /* Cancel all operations waiting in the queue. */
1096 do
1097 {
1098 pReceivedCommand = NULL;
1099 commandWasReceived = pMqttAgentContext->agentInterface.recv(
1100 pMqttAgentContext->agentInterface.pMsgCtx,
1101 &( pReceivedCommand ),
1102 0U );
1103
1104 if( pReceivedCommand != NULL )
1105 {
1106 concludeCommand( pMqttAgentContext, pReceivedCommand, MQTTRecvFailed, NULL );
1107 }
1108 } while( commandWasReceived );
1109
1110 pendingAcks = pMqttAgentContext->pPendingAcks;
1111
1112 /* Cancel any operations awaiting an acknowledgment. */
1113 for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
1114 {
1115 if( pendingAcks[ i ].packetId != MQTT_PACKET_ID_INVALID )
1116 {
1117 concludeCommand( pMqttAgentContext, pendingAcks[ i ].pOriginalCommand, MQTTRecvFailed, NULL );
1118
1119 /* Now remove it from the list. */
1120 ( void ) memset( &( pendingAcks[ i ] ), 0x00, sizeof( MQTTAgentAckInfo_t ) );
1121 }
1122 }
1123 }
1124
1125 return statusReturn;
1126 }
1127
1128 /*-----------------------------------------------------------*/
1129
MQTTAgent_Subscribe(const MQTTAgentContext_t * pMqttAgentContext,MQTTAgentSubscribeArgs_t * pSubscriptionArgs,const MQTTAgentCommandInfo_t * pCommandInfo)1130 MQTTStatus_t MQTTAgent_Subscribe( const MQTTAgentContext_t * pMqttAgentContext,
1131 MQTTAgentSubscribeArgs_t * pSubscriptionArgs,
1132 const MQTTAgentCommandInfo_t * pCommandInfo )
1133 {
1134 MQTTStatus_t statusReturn = MQTTBadParameter;
1135 bool paramsValid = false;
1136
1137 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1138 validateParams( SUBSCRIBE, pSubscriptionArgs );
1139
1140 if( paramsValid )
1141 {
1142 statusReturn = createAndAddCommand( SUBSCRIBE, /* commandType */
1143 pMqttAgentContext, /* mqttContextHandle */
1144 pSubscriptionArgs, /* pMqttInfoParam */
1145 pCommandInfo->cmdCompleteCallback, /* commandCompleteCallback */
1146 pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1147 pCommandInfo->blockTimeMs );
1148 }
1149
1150 return statusReturn;
1151 }
1152
1153 /*-----------------------------------------------------------*/
1154
MQTTAgent_Unsubscribe(const MQTTAgentContext_t * pMqttAgentContext,MQTTAgentSubscribeArgs_t * pSubscriptionArgs,const MQTTAgentCommandInfo_t * pCommandInfo)1155 MQTTStatus_t MQTTAgent_Unsubscribe( const MQTTAgentContext_t * pMqttAgentContext,
1156 MQTTAgentSubscribeArgs_t * pSubscriptionArgs,
1157 const MQTTAgentCommandInfo_t * pCommandInfo )
1158 {
1159 MQTTStatus_t statusReturn = MQTTBadParameter;
1160 bool paramsValid = false;
1161
1162 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1163 validateParams( UNSUBSCRIBE, pSubscriptionArgs );
1164
1165 if( paramsValid )
1166 {
1167 statusReturn = createAndAddCommand( UNSUBSCRIBE, /* commandType */
1168 pMqttAgentContext, /* mqttContextHandle */
1169 pSubscriptionArgs, /* pMqttInfoParam */
1170 pCommandInfo->cmdCompleteCallback, /* commandCompleteCallback */
1171 pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1172 pCommandInfo->blockTimeMs );
1173 }
1174
1175 return statusReturn;
1176 }
1177
1178 /*-----------------------------------------------------------*/
1179
MQTTAgent_Publish(const MQTTAgentContext_t * pMqttAgentContext,MQTTPublishInfo_t * pPublishInfo,const MQTTAgentCommandInfo_t * pCommandInfo)1180 MQTTStatus_t MQTTAgent_Publish( const MQTTAgentContext_t * pMqttAgentContext,
1181 MQTTPublishInfo_t * pPublishInfo,
1182 const MQTTAgentCommandInfo_t * pCommandInfo )
1183 {
1184 MQTTStatus_t statusReturn = MQTTBadParameter;
1185 bool paramsValid = false;
1186
1187 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1188 validateParams( PUBLISH, pPublishInfo );
1189
1190 if( paramsValid )
1191 {
1192 statusReturn = createAndAddCommand( PUBLISH, /* commandType */
1193 pMqttAgentContext, /* mqttContextHandle */
1194 pPublishInfo, /* pMqttInfoParam */
1195 pCommandInfo->cmdCompleteCallback, /* commandCompleteCallback */
1196 pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1197 pCommandInfo->blockTimeMs );
1198 }
1199
1200 return statusReturn;
1201 }
1202
1203 /*-----------------------------------------------------------*/
1204
MQTTAgent_ProcessLoop(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1205 MQTTStatus_t MQTTAgent_ProcessLoop( const MQTTAgentContext_t * pMqttAgentContext,
1206 const MQTTAgentCommandInfo_t * pCommandInfo )
1207 {
1208 MQTTStatus_t statusReturn = MQTTBadParameter;
1209 bool paramsValid = false;
1210
1211 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1212
1213 if( paramsValid )
1214 {
1215 statusReturn = createAndAddCommand( PROCESSLOOP, /* commandType */
1216 pMqttAgentContext, /* mqttContextHandle */
1217 NULL, /* pMqttInfoParam */
1218 pCommandInfo->cmdCompleteCallback, /* commandCompleteCallback */
1219 pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1220 pCommandInfo->blockTimeMs );
1221 }
1222
1223 return statusReturn;
1224 }
1225
1226 /*-----------------------------------------------------------*/
1227
MQTTAgent_Connect(const MQTTAgentContext_t * pMqttAgentContext,MQTTAgentConnectArgs_t * pConnectArgs,const MQTTAgentCommandInfo_t * pCommandInfo)1228 MQTTStatus_t MQTTAgent_Connect( const MQTTAgentContext_t * pMqttAgentContext,
1229 MQTTAgentConnectArgs_t * pConnectArgs,
1230 const MQTTAgentCommandInfo_t * pCommandInfo )
1231 {
1232 MQTTStatus_t statusReturn = MQTTBadParameter;
1233 bool paramsValid = false;
1234
1235 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1236 validateParams( CONNECT, pConnectArgs );
1237
1238 if( paramsValid )
1239 {
1240 statusReturn = createAndAddCommand( CONNECT,
1241 pMqttAgentContext,
1242 pConnectArgs,
1243 pCommandInfo->cmdCompleteCallback,
1244 pCommandInfo->pCmdCompleteCallbackContext,
1245 pCommandInfo->blockTimeMs );
1246 }
1247
1248 return statusReturn;
1249 }
1250
1251 /*-----------------------------------------------------------*/
1252
MQTTAgent_Disconnect(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1253 MQTTStatus_t MQTTAgent_Disconnect( const MQTTAgentContext_t * pMqttAgentContext,
1254 const MQTTAgentCommandInfo_t * pCommandInfo )
1255 {
1256 MQTTStatus_t statusReturn = MQTTBadParameter;
1257 bool paramsValid = false;
1258
1259 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1260
1261 if( paramsValid )
1262 {
1263 statusReturn = createAndAddCommand( DISCONNECT, /* commandType */
1264 pMqttAgentContext, /* mqttContextHandle */
1265 NULL, /* pMqttInfoParam */
1266 pCommandInfo->cmdCompleteCallback, /* commandCompleteCallback */
1267 pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1268 pCommandInfo->blockTimeMs );
1269 }
1270
1271 return statusReturn;
1272 }
1273
1274 /*-----------------------------------------------------------*/
1275
MQTTAgent_Ping(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1276 MQTTStatus_t MQTTAgent_Ping( const MQTTAgentContext_t * pMqttAgentContext,
1277 const MQTTAgentCommandInfo_t * pCommandInfo )
1278 {
1279 MQTTStatus_t statusReturn = MQTTBadParameter;
1280 bool paramsValid = false;
1281
1282 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1283
1284 if( paramsValid )
1285 {
1286 statusReturn = createAndAddCommand( PING, /* commandType */
1287 pMqttAgentContext, /* mqttContextHandle */
1288 NULL, /* pMqttInfoParam */
1289 pCommandInfo->cmdCompleteCallback, /* commandCompleteCallback */
1290 pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1291 pCommandInfo->blockTimeMs );
1292 }
1293
1294 return statusReturn;
1295 }
1296
1297 /*-----------------------------------------------------------*/
1298
MQTTAgent_Terminate(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1299 MQTTStatus_t MQTTAgent_Terminate( const MQTTAgentContext_t * pMqttAgentContext,
1300 const MQTTAgentCommandInfo_t * pCommandInfo )
1301 {
1302 MQTTStatus_t statusReturn = MQTTBadParameter;
1303 bool paramsValid = false;
1304
1305 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1306
1307 if( paramsValid )
1308 {
1309 statusReturn = createAndAddCommand( TERMINATE,
1310 pMqttAgentContext,
1311 NULL,
1312 pCommandInfo->cmdCompleteCallback,
1313 pCommandInfo->pCmdCompleteCallbackContext,
1314 pCommandInfo->blockTimeMs );
1315 }
1316
1317 return statusReturn;
1318 }
1319
1320 /*-----------------------------------------------------------*/
1321