1 /*
2 * coreMQTT Agent v1.2.0
3 * Copyright (C) 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
6 * this software and associated documentation files (the "Software"), to deal in
7 * the Software without restriction, including without limitation the rights to
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
9 * the Software, and to permit persons to whom the Software is furnished to do so,
10 * subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in all
13 * copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 */
22
23 /**
24 * @file core_mqtt_agent.c
25 * @brief Implements an MQTT agent (or daemon task) to enable multithreaded access to
26 * coreMQTT.
27 *
28 * @note Implements an MQTT agent (or daemon task) on top of the coreMQTT MQTT client
29 * library. The agent makes coreMQTT usage thread safe by being the only task (or
30 * thread) in the system that is allowed to access the native coreMQTT API - and in
31 * so doing, serializes all access to coreMQTT even when multiple tasks are using the
32 * same MQTT connection.
33 *
34 * The agent provides an equivalent API for each coreMQTT API. Whereas coreMQTT
35 * APIs are prefixed "MQTT_", the agent APIs are prefixed "MQTTAgent_". For example,
36 * that agent's MQTTAgent_Publish() API is the thread safe equivalent to coreMQTT's
37 * MQTT_Publish() API.
38 */
39
40 /* Standard includes. */
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <stddef.h>
44
45 /* MQTT agent include. */
46 #include "core_mqtt_agent.h"
47 #include "core_mqtt_agent_command_functions.h"
48
49 /* MQTT Agent default logging configuration include. */
50 #include "core_mqtt_agent_default_logging.h"
51
52 /*-----------------------------------------------------------*/
53
54 #if ( MQTT_AGENT_USE_QOS_1_2_PUBLISH != 0 )
55
56 /**
57 * @brief Array used to maintain the outgoing publish records and their
58 * state by the coreMQTT library.
 *
 * @note File-scope storage with #MQTT_AGENT_MAX_OUTSTANDING_ACKS entries. It is
 * only compiled when #MQTT_AGENT_USE_QOS_1_2_PUBLISH is non-zero.
 * NOTE(review): presumably handed to coreMQTT when the agent is initialised;
 * confirm against the initialisation code.
59 */
60 static MQTTPubAckInfo_t pOutgoingPublishRecords[ MQTT_AGENT_MAX_OUTSTANDING_ACKS ];
61
62 /**
63 * @brief Array used to maintain the incoming publish records and their
64 * state by the coreMQTT library.
 *
 * @note File-scope storage with #MQTT_AGENT_MAX_OUTSTANDING_ACKS entries. It is
 * only compiled when #MQTT_AGENT_USE_QOS_1_2_PUBLISH is non-zero.
 * NOTE(review): presumably handed to coreMQTT when the agent is initialised;
 * confirm against the initialisation code.
65 */
66 static MQTTPubAckInfo_t pIncomingPublishRecords[ MQTT_AGENT_MAX_OUTSTANDING_ACKS ];
67 #endif /* MQTT_AGENT_USE_QOS_1_2_PUBLISH != 0 */
68
69 /**
70 * @brief Track an operation by adding it to a list, indicating it is anticipating
71 * an acknowledgment.
72 *
73 * @param[in] pAgentContext Agent context for the MQTT connection.
74 * @param[in] packetId Packet ID of pending ack.
75 * @param[in] pCommand Pointer to command that is expecting an ack.
76 *
77 * @return Returns one of the following:
78 * - #MQTTSuccess if an entry was added to the list.
79 * - #MQTTStateCollision if there already exists an entry for the same packet ID
80 * in the list.
81 * - #MQTTNoMemory if there is no space available in the list for adding a
82 * new entry.
83 */
84 static MQTTStatus_t addAwaitingOperation( MQTTAgentContext_t * pAgentContext,
85 uint16_t packetId,
86 MQTTAgentCommand_t * pCommand );
87
88 /**
89 * @brief Retrieve an operation from the list of pending acks without removing
90 * it; an entry found with empty fields is cleared and not returned.
91 *
92 * @param[in] pAgentContext Agent context for the MQTT connection.
93 * @param[in] incomingPacketId Packet ID of incoming ack.
94 *
95 * @return Pointer to stored information about the operation awaiting the ack.
96 * Returns NULL if the packet ID is zero or original command does not exist.
97 */
98 static MQTTAgentAckInfo_t * getAwaitingOperation( MQTTAgentContext_t * pAgentContext,
99 uint16_t incomingPacketId );
100
101 /**
102 * @brief Populate the parameters of a #MQTTAgentCommand struct.
103 *
104 * @param[in] commandType Type of command. For example, publish or subscribe.
105 * @param[in] pMqttAgentContext Pointer to MQTT context to use for command.
106 * @param[in] pMqttInfoParam Pointer to MQTTPublishInfo_t or MQTTSubscribeInfo_t.
107 * @param[in] commandCompleteCallback Callback for when command completes.
108 * @param[in] pCommandCompleteCallbackContext Context and necessary structs for command.
109 * @param[out] pCommand Pointer to initialized command.
110 *
111 * @return #MQTTSuccess if all necessary fields for the command are passed,
112 * else an enumerated error code.
113 */
114 static MQTTStatus_t createCommand( MQTTAgentCommandType_t commandType,
115 const MQTTAgentContext_t * pMqttAgentContext,
116 void * pMqttInfoParam,
117 MQTTAgentCommandCallback_t commandCompleteCallback,
118 MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
119 MQTTAgentCommand_t * pCommand );
120
121 /**
122 * @brief Add a command to the global command queue.
123 *
124 * @param[in] pAgentContext Agent context for the MQTT connection.
125 * @param[in] pCommand Pointer to command to copy to queue.
126 * @param[in] blockTimeMs The maximum amount of time in milliseconds to wait in the
127 * Blocked state (so not consuming any CPU time) for the command to be posted to the
128 * queue should the queue already be full.
129 *
130 * @return #MQTTSuccess if the command was added to the queue, else an enumerated
131 * error code.
132 */
133 static MQTTStatus_t addCommandToQueue( const MQTTAgentContext_t * pAgentContext,
134 MQTTAgentCommand_t * pCommand,
135 uint32_t blockTimeMs );
136
137 /**
138 * @brief Process a #MQTTAgentCommand struct.
139 *
140 * @note This agent does not check existing subscriptions before sending a
141 * SUBSCRIBE or UNSUBSCRIBE packet. If a subscription already exists, then
142 * a SUBSCRIBE packet will be sent anyway, and if multiple tasks are subscribed
143 * to a topic filter, then they will all be unsubscribed after an UNSUBSCRIBE.
144 *
145 * @param[in] pMqttAgentContext Agent context for MQTT connection.
146 * @param[in] pCommand Pointer to command to process.
147 * @param[out] pEndLoop Whether the command loop should terminate.
148 *
149 * @return status of MQTT library API call.
150 */
151 static MQTTStatus_t processCommand( MQTTAgentContext_t * pMqttAgentContext,
152 MQTTAgentCommand_t * pCommand,
153 bool * pEndLoop );
154
155 /**
156 * @brief Dispatch incoming publishes and acks to their various handler functions.
157 *
158 * @param[in] pMqttContext MQTT Context
159 * @param[in] pPacketInfo Pointer to incoming packet.
160 * @param[in] pDeserializedInfo Pointer to deserialized information from
161 * the incoming packet.
162 */
163 static void mqttEventCallback( MQTTContext_t * pMqttContext,
164 MQTTPacketInfo_t * pPacketInfo,
165 MQTTDeserializedInfo_t * pDeserializedInfo );
166
167 /**
168 * @brief Mark a command as complete after receiving an acknowledgment packet.
169 *
170 * @param[in] pAgentContext Agent context for the MQTT connection.
171 * @param[in] pPacketInfo Pointer to incoming packet.
172 * @param[in] pDeserializedInfo Pointer to deserialized information from
173 * the incoming packet.
174 * @param[in] pAckInfo Pointer to stored information for the original operation
175 * resulting in the received packet.
176 * @param[in] packetType The type of the incoming packet, either SUBACK, UNSUBACK,
177 * PUBACK, or PUBCOMP.
178 */
179 static void handleAcks( const MQTTAgentContext_t * pAgentContext,
180 const MQTTPacketInfo_t * pPacketInfo,
181 const MQTTDeserializedInfo_t * pDeserializedInfo,
182 MQTTAgentAckInfo_t * pAckInfo,
183 uint8_t packetType );
184
185 /**
186 * @brief Retrieve a pointer to an agent context given an MQTT context.
187 *
188 * @param[in] pMQTTContext Pointer to the MQTT context member of an agent context.
189 *
190 * @return Pointer to the agent context that contains the given MQTT context.
191 */
192 static MQTTAgentContext_t * getAgentFromMQTTContext( MQTTContext_t * pMQTTContext );
193
194 /**
195 * @brief Helper function for creating a command and adding it to the command
196 * queue.
197 *
198 * @param[in] commandType Type of command.
199 * @param[in] pMqttAgentContext Handle of the MQTT connection to use.
200 * @param[in] pCommandCompleteCallbackContext Context and necessary structs for command.
201 * @param[in] commandCompleteCallback Callback for when command completes.
202 * @param[in] pMqttInfoParam Pointer to command argument.
203 * @param[in] blockTimeMs Maximum amount of time in milliseconds to wait (in the
204 * Blocked state, so not consuming any CPU time) for the command to be posted to the
205 * MQTT agent should the MQTT agent's event queue be full.
206 *
207 * @return #MQTTSuccess if the command was posted to the MQTT agent's event queue.
208 * Otherwise an enumerated error code.
209 */
210 static MQTTStatus_t createAndAddCommand( MQTTAgentCommandType_t commandType,
211 const MQTTAgentContext_t * pMqttAgentContext,
212 void * pMqttInfoParam,
213 MQTTAgentCommandCallback_t commandCompleteCallback,
214 MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
215 uint32_t blockTimeMs );
216
217 /**
218 * @brief Helper function to mark a command as complete and invoke its callback.
219 * This function calls the releaseCommand callback.
220 *
221 * @param[in] pAgentContext Agent context for the MQTT connection.
222 * @param[in] pCommand Command to complete.
223 * @param[in] returnCode Return status of command.
224 * @param[in] pSubackCodes Pointer to suback array, if command is a SUBSCRIBE.
225 */
226 static void concludeCommand( const MQTTAgentContext_t * pAgentContext,
227 MQTTAgentCommand_t * pCommand,
228 MQTTStatus_t returnCode,
229 uint8_t * pSubackCodes );
230
231 /**
232 * @brief Resend QoS 1 and 2 publishes after resuming a session.
233 *
234 * @param[in] pMqttAgentContext Agent context for the MQTT connection.
235 *
236 * @return #MQTTSuccess if all publishes resent successfully, else error code
237 * from #MQTT_Publish.
238 */
239 static MQTTStatus_t resendPublishes( MQTTAgentContext_t * pMqttAgentContext );
240
241 /**
242 * @brief Clears the list of pending acknowledgments by invoking each callback
243 * with #MQTTRecvFailed either for ALL operations in the list OR only for
244 * Subscribe/Unsubscribe operations.
245 *
246 * @param[in] pMqttAgentContext Agent context of the MQTT connection.
247 * @param[in] clearOnlySubUnsubEntries Flag indicating whether all entries OR
248 * entries pertaining to only Subscribe/Unsubscribe operations should be cleaned
249 * from the list.
250 */
251 static void clearPendingAcknowledgments( MQTTAgentContext_t * pMqttAgentContext,
252 bool clearOnlySubUnsubEntries );
253
254 /**
255 * @brief Validate an #MQTTAgentContext_t and a #MQTTAgentCommandInfo_t from API
256 * functions.
257 *
258 * @param[in] pMqttAgentContext #MQTTAgentContext_t to validate.
259 * @param[in] pCommandInfo #MQTTAgentCommandInfo_t to validate.
260 *
261 * @return `true` if parameters are valid, else `false`.
262 */
263 static bool validateStruct( const MQTTAgentContext_t * pMqttAgentContext,
264 const MQTTAgentCommandInfo_t * pCommandInfo );
265
266 /**
267 * @brief Validate the parameters for a CONNECT, SUBSCRIBE, UNSUBSCRIBE
268 * or PUBLISH.
269 *
270 * @param[in] commandType CONNECT, SUBSCRIBE, UNSUBSCRIBE, or PUBLISH.
271 * @param[in] pParams Parameter structure to validate.
272 *
273 * @return `true` if parameter structure is valid, else `false`.
274 */
275 static bool validateParams( MQTTAgentCommandType_t commandType,
276 const void * pParams );
277
278 /**
279 * @brief Called before accepting any PUBLISH or SUBSCRIBE messages to check
280 * there is space in the pending ACK list for the outgoing PUBLISH or SUBSCRIBE.
281 *
282 * @note Because the MQTT agent is inherently multi threaded, and this function
283 * is called from the context of the application task and not the MQTT agent
284 * task, this function can only return a best effort result. It can definitely
285 * say if there is space for a new pending ACK when the function is called, but
286 * the case of space being exhausted when the agent executes a command that
287 * results in an ACK must still be handled.
288 *
289 * @param[in] pAgentContext Pointer to the context for the MQTT connection to
290 * which the PUBLISH or SUBSCRIBE message is to be sent.
291 *
292 * @return true if there is space in that MQTT connection's ACK list, otherwise
293 * false.
294 */
295 static bool isSpaceInPendingAckList( const MQTTAgentContext_t * pAgentContext );
296
297 /*-----------------------------------------------------------*/
298
isSpaceInPendingAckList(const MQTTAgentContext_t * pAgentContext)299 static bool isSpaceInPendingAckList( const MQTTAgentContext_t * pAgentContext )
300 {
301 const MQTTAgentAckInfo_t * pendingAcks;
302 bool spaceFound = false;
303 size_t i;
304
305 assert( pAgentContext != NULL );
306
307 pendingAcks = pAgentContext->pPendingAcks;
308
309 /* Are there any open slots? */
310 for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
311 {
312 /* If the packetId is MQTT_PACKET_ID_INVALID then the array space is
313 * not in use. */
314 if( pendingAcks[ i ].packetId == MQTT_PACKET_ID_INVALID )
315 {
316 spaceFound = true;
317 break;
318 }
319 }
320
321 return spaceFound;
322 }
323
324 /*-----------------------------------------------------------*/
325
addAwaitingOperation(MQTTAgentContext_t * pAgentContext,uint16_t packetId,MQTTAgentCommand_t * pCommand)326 static MQTTStatus_t addAwaitingOperation( MQTTAgentContext_t * pAgentContext,
327 uint16_t packetId,
328 MQTTAgentCommand_t * pCommand )
329 {
330 size_t i = 0, unusedPos = MQTT_AGENT_MAX_OUTSTANDING_ACKS;
331 MQTTStatus_t status = MQTTNoMemory;
332 MQTTAgentAckInfo_t * pendingAcks = NULL;
333
334 assert( pAgentContext != NULL );
335 assert( pCommand != NULL );
336 assert( packetId != MQTT_PACKET_ID_INVALID );
337 pendingAcks = pAgentContext->pPendingAcks;
338
339 /* Before adding the record for the pending acknowledgement of the packet ID,
340 * make sure that there doesn't already exist an entry for the same packet ID.
341 * Also, as part of iterating through the list of pending acknowledgements,
342 * find an unused space for the packet ID to be added, if it can be. */
343 for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
344 {
345 /* If the packetId is MQTT_PACKET_ID_INVALID then the array space is not in
346 * use. */
347 if( ( unusedPos == MQTT_AGENT_MAX_OUTSTANDING_ACKS ) &&
348 ( pendingAcks[ i ].packetId == MQTT_PACKET_ID_INVALID ) )
349 {
350 unusedPos = i;
351 status = MQTTSuccess;
352 }
353
354 if( pendingAcks[ i ].packetId == packetId )
355 {
356 /* Check whether there exists a duplicate entry for pending
357 * acknowledgment for the same packet ID that we want to add to
358 * the list.
359 * Note: This is an unlikely edge case which represents that a packet ID
360 * didn't receive acknowledgment, but subsequent SUBSCRIBE/PUBLISH operations
361 * representing 65535 packet IDs were successful that caused the bit packet
362 * ID value to wrap around and reached the same packet ID as that was still
363 * pending acknowledgment.
364 */
365 status = MQTTStateCollision;
366 LogError( ( "Failed to add operation to list of pending acknowledgments: "
367 "Existing entry found for same packet: PacketId=%u\n", packetId ) );
368 break;
369 }
370 }
371
372 /* Add the packet ID to the list if there is space available, and there is no
373 * duplicate entry for the same packet ID found. */
374 if( status == MQTTSuccess )
375 {
376 pendingAcks[ unusedPos ].packetId = packetId;
377 pendingAcks[ unusedPos ].pOriginalCommand = pCommand;
378 }
379 else if( status == MQTTNoMemory )
380 {
381 LogError( ( "Failed to add operation to list of pending acknowledgments: "
382 "No memory available: PacketId=%u\n", packetId ) );
383 }
384 else
385 {
386 /* Empty else MISRA 15.7 */
387 }
388
389 return status;
390 }
391
392 /*-----------------------------------------------------------*/
393
getAwaitingOperation(MQTTAgentContext_t * pAgentContext,uint16_t incomingPacketId)394 static MQTTAgentAckInfo_t * getAwaitingOperation( MQTTAgentContext_t * pAgentContext,
395 uint16_t incomingPacketId )
396 {
397 size_t i = 0;
398 MQTTAgentAckInfo_t * pFoundAck = NULL;
399
400 assert( pAgentContext != NULL );
401
402 /* Look through the array of packet IDs that are still waiting to be acked to
403 * find one with incomingPacketId. */
404 for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
405 {
406 if( pAgentContext->pPendingAcks[ i ].packetId == incomingPacketId )
407 {
408 pFoundAck = &( pAgentContext->pPendingAcks[ i ] );
409 break;
410 }
411 }
412
413 if( pFoundAck == NULL )
414 {
415 LogError( ( "No ack found for packet id %u.\n", incomingPacketId ) );
416 }
417 else if( ( pFoundAck->pOriginalCommand == NULL ) || ( pFoundAck->packetId == 0U ) )
418 {
419 LogError( ( "Found ack had empty fields. PacketId=%hu, Original Command=%p",
420 ( unsigned short ) pFoundAck->packetId,
421 ( void * ) pFoundAck->pOriginalCommand ) );
422 ( void ) memset( pFoundAck, 0x00, sizeof( MQTTAgentAckInfo_t ) );
423 pFoundAck = NULL;
424 }
425 else
426 {
427 /* Empty else MISRA 15.7 */
428 }
429
430 return pFoundAck;
431 }
432
433 /*-----------------------------------------------------------*/
434
createCommand(MQTTAgentCommandType_t commandType,const MQTTAgentContext_t * pMqttAgentContext,void * pMqttInfoParam,MQTTAgentCommandCallback_t commandCompleteCallback,MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,MQTTAgentCommand_t * pCommand)435 static MQTTStatus_t createCommand( MQTTAgentCommandType_t commandType,
436 const MQTTAgentContext_t * pMqttAgentContext,
437 void * pMqttInfoParam,
438 MQTTAgentCommandCallback_t commandCompleteCallback,
439 MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
440 MQTTAgentCommand_t * pCommand )
441 {
442 bool isValid, isSpace = true;
443 MQTTStatus_t statusReturn;
444 const MQTTPublishInfo_t * pPublishInfo;
445 size_t uxHeaderBytes;
446 const size_t uxControlAndLengthBytes = ( size_t ) 4; /* Control, remaining length and length bytes. */
447
448 assert( pMqttAgentContext != NULL );
449 assert( pCommand != NULL );
450
451 ( void ) memset( pCommand, 0x00, sizeof( MQTTAgentCommand_t ) );
452
453 /* Determine if required parameters are present in context. */
454 switch( commandType )
455 {
456 case SUBSCRIBE:
457 case UNSUBSCRIBE:
458 assert( pMqttInfoParam != NULL );
459
460 /* This message type results in the broker returning an ACK. The
461 * agent maintains an array of outstanding ACK messages. See if
462 * the array contains space for another outstanding ack. */
463 isSpace = isSpaceInPendingAckList( pMqttAgentContext );
464
465 isValid = isSpace;
466
467 break;
468
469 case PUBLISH:
470 pPublishInfo = ( const MQTTPublishInfo_t * ) pMqttInfoParam;
471
472 /* Calculate the space consumed by everything other than the
473 * payload. */
474 uxHeaderBytes = uxControlAndLengthBytes;
475 uxHeaderBytes += pPublishInfo->topicNameLength;
476
477 /* This message type results in the broker returning an ACK. The
478 * agent maintains an array of outstanding ACK messages. See if
479 * the array contains space for another outstanding ack. QoS0
480 * publish does not result in an ack so it doesn't matter if
481 * there is no space in the ACK array. */
482 if( pPublishInfo->qos != MQTTQoS0 )
483 {
484 isSpace = isSpaceInPendingAckList( pMqttAgentContext );
485 }
486
487 /* Will the message fit in the defined buffer? */
488 isValid = ( uxHeaderBytes < pMqttAgentContext->mqttContext.networkBuffer.size ) &&
489 ( isSpace == true );
490
491 break;
492
493 case PROCESSLOOP:
494 case PING:
495 case CONNECT:
496 case DISCONNECT:
497 default:
498 /* Other operations don't need to store ACKs. */
499 isValid = true;
500 break;
501 }
502
503 if( isValid )
504 {
505 pCommand->commandType = commandType;
506 pCommand->pArgs = pMqttInfoParam;
507 pCommand->pCmdContext = pCommandCompleteCallbackContext;
508 pCommand->pCommandCompleteCallback = commandCompleteCallback;
509 }
510
511 statusReturn = ( isValid ) ? MQTTSuccess : MQTTBadParameter;
512
513 if( ( statusReturn == MQTTBadParameter ) && ( isSpace == false ) )
514 {
515 /* The error was caused not by a bad parameter, but because there was
516 * no room in the pending Ack list for the Ack response to an outgoing
517 * PUBLISH or SUBSCRIBE message. */
518 statusReturn = MQTTNoMemory;
519 }
520
521 return statusReturn;
522 }
523
524 /*-----------------------------------------------------------*/
525
addCommandToQueue(const MQTTAgentContext_t * pAgentContext,MQTTAgentCommand_t * pCommand,uint32_t blockTimeMs)526 static MQTTStatus_t addCommandToQueue( const MQTTAgentContext_t * pAgentContext,
527 MQTTAgentCommand_t * pCommand,
528 uint32_t blockTimeMs )
529 {
530 bool queueStatus;
531
532 assert( pAgentContext != NULL );
533 assert( pCommand != NULL );
534
535 /* The application called an API function. The API function was validated and
536 * packed into a MQTTAgentCommand_t structure. Now post a reference to the MQTTAgentCommand_t
537 * structure to the MQTT agent for processing. */
538 queueStatus = pAgentContext->agentInterface.send(
539 pAgentContext->agentInterface.pMsgCtx,
540 &pCommand,
541 blockTimeMs
542 );
543
544 return ( queueStatus ) ? MQTTSuccess : MQTTSendFailed;
545 }
546
547 /*-----------------------------------------------------------*/
548
processCommand(MQTTAgentContext_t * pMqttAgentContext,MQTTAgentCommand_t * pCommand,bool * pEndLoop)549 static MQTTStatus_t processCommand( MQTTAgentContext_t * pMqttAgentContext,
550 MQTTAgentCommand_t * pCommand,
551 bool * pEndLoop )
552 {
553 const MQTTAgentCommandFunc_t pCommandFunctionTable[ NUM_COMMANDS ] = MQTT_AGENT_FUNCTION_TABLE;
554 MQTTStatus_t operationStatus = MQTTSuccess;
555 bool ackAdded = false;
556 MQTTAgentCommandFunc_t commandFunction = NULL;
557 void * pCommandArgs = NULL;
558 MQTTAgentCommandFuncReturns_t commandOutParams = { 0 };
559
560 assert( pMqttAgentContext != NULL );
561 assert( pEndLoop != NULL );
562
563 if( pCommand != NULL )
564 {
565 assert( pCommand->commandType < NUM_COMMANDS );
566
567 if( ( pCommand->commandType >= NONE ) && ( pCommand->commandType < NUM_COMMANDS ) )
568 {
569 commandFunction = pCommandFunctionTable[ pCommand->commandType ];
570 pCommandArgs = pCommand->pArgs;
571 }
572 else
573 {
574 LogWarn( ( "An incorrect command type was received by the processCommand function."
575 " Type = %d.", pCommand->commandType ) );
576 commandFunction = pCommandFunctionTable[ NONE ];
577 }
578 }
579 else
580 {
581 commandFunction = pCommandFunctionTable[ NONE ];
582 }
583
584 operationStatus = commandFunction( pMqttAgentContext, pCommandArgs, &commandOutParams );
585
586 if( ( operationStatus == MQTTSuccess ) &&
587 commandOutParams.addAcknowledgment &&
588 ( commandOutParams.packetId != MQTT_PACKET_ID_INVALID ) )
589 {
590 operationStatus = addAwaitingOperation( pMqttAgentContext, commandOutParams.packetId, pCommand );
591 ackAdded = ( operationStatus == MQTTSuccess );
592 }
593
594 if( ( pCommand != NULL ) && ( ackAdded != true ) )
595 {
596 /* The command is complete, call the callback. */
597 concludeCommand( pMqttAgentContext, pCommand, operationStatus, NULL );
598 }
599
600 /* Run the process loop if there were no errors and the MQTT connection
601 * still exists. */
602 if( ( operationStatus == MQTTSuccess ) && commandOutParams.runProcessLoop )
603 {
604 do
605 {
606 pMqttAgentContext->packetReceivedInLoop = false;
607
608 if( ( ( operationStatus == MQTTSuccess ) || ( operationStatus == MQTTNeedMoreBytes ) ) &&
609 ( pMqttAgentContext->mqttContext.connectStatus == MQTTConnected ) )
610 {
611 operationStatus = MQTT_ProcessLoop( &( pMqttAgentContext->mqttContext ) );
612 }
613 } while( pMqttAgentContext->packetReceivedInLoop );
614 }
615
616 /* Set the flag to break from the command loop. */
617 *pEndLoop = ( commandOutParams.endLoop || ( operationStatus != MQTTSuccess ) );
618
619 return operationStatus;
620 }
621
622 /*-----------------------------------------------------------*/
623
/*
 * Complete the command stored in pAckInfo using the deserialization result of
 * the received acknowledgment, then zero the list entry.
 *
 * For a SUBACK the command's callback additionally receives a pointer to the
 * status codes, located 2 bytes into the packet's remaining data. For every
 * other packet type that pointer is NULL.
 */
static void handleAcks( const MQTTAgentContext_t * pAgentContext,
                        const MQTTPacketInfo_t * pPacketInfo,
                        const MQTTDeserializedInfo_t * pDeserializedInfo,
                        MQTTAgentAckInfo_t * pAckInfo,
                        uint8_t packetType )
{
    uint8_t * pStatusCodes = NULL;

    assert( pAckInfo != NULL );
    assert( pAckInfo->pOriginalCommand != NULL );

    if( packetType == MQTT_PACKET_TYPE_SUBACK )
    {
        /* Skip the first 2 bytes of the remaining data. */
        pStatusCodes = pPacketInfo->pRemainingData + 2U;
    }

    concludeCommand( pAgentContext,
                     pAckInfo->pOriginalCommand,
                     pDeserializedInfo->deserializationResult,
                     pStatusCodes );

    /* The entry is free again once its command has been concluded. */
    ( void ) memset( pAckInfo, 0x00, sizeof( MQTTAgentAckInfo_t ) );
}
646
647 /*-----------------------------------------------------------*/
648
getAgentFromMQTTContext(MQTTContext_t * pMQTTContext)649 static MQTTAgentContext_t * getAgentFromMQTTContext( MQTTContext_t * pMQTTContext )
650 {
651 MQTTAgentContext_t ctx = { 0 };
652 ptrdiff_t offset = ( ( uint8_t * ) &( ctx.mqttContext ) ) - ( ( uint8_t * ) &ctx );
653
654 return ( MQTTAgentContext_t * ) &( ( ( uint8_t * ) pMQTTContext )[ 0 - offset ] );
655 }
656
657 /*-----------------------------------------------------------*/
658
/*
 * Event callback registered with coreMQTT; it runs from within
 * MQTT_ProcessLoop().
 *
 * Incoming publishes are forwarded to the agent's incoming-publish callback.
 * PUBACK, PUBCOMP, SUBACK and UNSUBACK packets complete the command that is
 * waiting for their packet ID. PUBREC and PUBREL are ignored because they do
 * not complete a command. Every other packet type is logged as an error.
 */
static void mqttEventCallback( MQTTContext_t * pMqttContext,
                               MQTTPacketInfo_t * pPacketInfo,
                               MQTTDeserializedInfo_t * pDeserializedInfo )
{
    MQTTAgentAckInfo_t * pAckInfo;
    uint16_t packetIdentifier;
    MQTTAgentContext_t * pAgentContext;
    const uint8_t upperNibble = ( uint8_t ) 0xF0;

    assert( pMqttContext != NULL );
    assert( pPacketInfo != NULL );
    assert( pDeserializedInfo != NULL );

    /* Read the packet ID only after every argument has been checked. */
    packetIdentifier = pDeserializedInfo->packetIdentifier;

    pAgentContext = getAgentFromMQTTContext( pMqttContext );

    /* This callback executes from within MQTT_ProcessLoop(). Setting this flag
     * indicates that the callback executed so the caller of MQTT_ProcessLoop() knows
     * it should call it again as there may be more data to process. */
    pAgentContext->packetReceivedInLoop = true;

    /* Handle incoming publish. The lower 4 bits of the publish packet type is used
     * for the dup, QoS, and retain flags. Hence masking out the lower bits to check
     * if the packet is publish. */
    if( ( pPacketInfo->type & upperNibble ) == MQTT_PACKET_TYPE_PUBLISH )
    {
        pAgentContext->pIncomingCallback( pAgentContext, packetIdentifier, pDeserializedInfo->pPublishInfo );
    }
    else
    {
        /* Handle other packets. */
        switch( pPacketInfo->type )
        {
            case MQTT_PACKET_TYPE_PUBACK:
            case MQTT_PACKET_TYPE_PUBCOMP:
            case MQTT_PACKET_TYPE_SUBACK:
            case MQTT_PACKET_TYPE_UNSUBACK:
                pAckInfo = getAwaitingOperation( pAgentContext, packetIdentifier );

                if( pAckInfo != NULL )
                {
                    /* This function will also clear the memory associated with
                     * the ack list entry. */
                    handleAcks( pAgentContext,
                                pPacketInfo,
                                pDeserializedInfo,
                                pAckInfo,
                                pPacketInfo->type );
                }
                else
                {
                    LogError( ( "No operation found matching packet id %u.\n", packetIdentifier ) );
                }

                break;

            /* Nothing to do for these packets since they don't indicate command completion. */
            case MQTT_PACKET_TYPE_PUBREC:
            case MQTT_PACKET_TYPE_PUBREL:
                break;

            /* Any other packet type is invalid. */
            case MQTT_PACKET_TYPE_PINGRESP:
            default:
                LogError( ( "Unknown packet type received:(%02x).\n",
                            pPacketInfo->type ) );
                break;
        }
    }
}
727
728 /*-----------------------------------------------------------*/
729
createAndAddCommand(MQTTAgentCommandType_t commandType,const MQTTAgentContext_t * pMqttAgentContext,void * pMqttInfoParam,MQTTAgentCommandCallback_t commandCompleteCallback,MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,uint32_t blockTimeMs)730 static MQTTStatus_t createAndAddCommand( MQTTAgentCommandType_t commandType,
731 const MQTTAgentContext_t * pMqttAgentContext,
732 void * pMqttInfoParam,
733 MQTTAgentCommandCallback_t commandCompleteCallback,
734 MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
735 uint32_t blockTimeMs )
736 {
737 MQTTStatus_t statusReturn = MQTTBadParameter;
738 MQTTAgentCommand_t * pCommand;
739 bool commandReleased = false;
740
741 /* If the packet ID is zero then the MQTT context has not been initialized as 0
742 * is the initial value but not a valid packet ID. */
743 if( pMqttAgentContext->mqttContext.nextPacketId != MQTT_PACKET_ID_INVALID )
744 {
745 pCommand = pMqttAgentContext->agentInterface.getCommand( blockTimeMs );
746
747 if( pCommand != NULL )
748 {
749 statusReturn = createCommand( commandType,
750 pMqttAgentContext,
751 pMqttInfoParam,
752 commandCompleteCallback,
753 pCommandCompleteCallbackContext,
754 pCommand );
755
756 if( statusReturn == MQTTSuccess )
757 {
758 statusReturn = addCommandToQueue( pMqttAgentContext, pCommand, blockTimeMs );
759 }
760
761 if( statusReturn != MQTTSuccess )
762 {
763 /* Could not send the command to the queue so release the command
764 * structure again. */
765 commandReleased = pMqttAgentContext->agentInterface.releaseCommand( pCommand );
766
767 if( !commandReleased )
768 {
769 LogError( ( "Command %p could not be released.",
770 ( void * ) pCommand ) );
771 }
772 }
773 }
774 else
775 {
776 /* Ran out of MQTTAgentCommand_t structures - pool is empty. */
777 statusReturn = MQTTNoMemory;
778 }
779 }
780 else
781 {
782 LogError( ( "MQTT context must be initialized." ) );
783 }
784
785 return statusReturn;
786 }
787
788 /*-----------------------------------------------------------*/
789
/*
 * Finish a command: invoke its completion callback (when one is set) with the
 * return code and the SUBACK codes, then release the command structure through
 * the agent interface. A failed release is only logged.
 */
static void concludeCommand( const MQTTAgentContext_t * pAgentContext,
                             MQTTAgentCommand_t * pCommand,
                             MQTTStatus_t returnCode,
                             uint8_t * pSubackCodes )
{
    MQTTAgentReturnInfo_t completionInfo;
    bool released = false;

    assert( pAgentContext != NULL );
    assert( pAgentContext->agentInterface.releaseCommand != NULL );
    assert( pCommand != NULL );

    /* Start from an all-zero structure, then fill in what the callback gets. */
    ( void ) memset( &completionInfo, 0x00, sizeof( MQTTAgentReturnInfo_t ) );
    completionInfo.returnCode = returnCode;
    completionInfo.pSubackCodes = pSubackCodes;

    if( pCommand->pCommandCompleteCallback != NULL )
    {
        pCommand->pCommandCompleteCallback( pCommand->pCmdContext, &completionInfo );
    }

    released = pAgentContext->agentInterface.releaseCommand( pCommand );

    if( released == false )
    {
        LogError( ( "Failed to release command %p of type %d.",
                    ( void * ) pCommand,
                    pCommand->commandType ) );
    }
}
820
821 /*-----------------------------------------------------------*/
822
resendPublishes(MQTTAgentContext_t * pMqttAgentContext)823 static MQTTStatus_t resendPublishes( MQTTAgentContext_t * pMqttAgentContext )
824 {
825 MQTTStatus_t statusResult = MQTTSuccess;
826 MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
827 uint16_t packetId = MQTT_PACKET_ID_INVALID;
828 MQTTAgentAckInfo_t * pFoundAck = NULL;
829 MQTTPublishInfo_t * pOriginalPublish = NULL;
830 MQTTContext_t * pMqttContext;
831
832 assert( pMqttAgentContext != NULL );
833 pMqttContext = &( pMqttAgentContext->mqttContext );
834
835 packetId = MQTT_PublishToResend( pMqttContext, &cursor );
836
837 while( packetId != MQTT_PACKET_ID_INVALID )
838 {
839 /* Retrieve the operation but do not remove it from the list. */
840 pFoundAck = getAwaitingOperation( pMqttAgentContext, packetId );
841
842 if( pFoundAck != NULL )
843 {
844 /* Set the DUP flag. */
845 pOriginalPublish = ( MQTTPublishInfo_t * ) ( pFoundAck->pOriginalCommand->pArgs );
846 pOriginalPublish->dup = true;
847 statusResult = MQTT_Publish( pMqttContext, pOriginalPublish, packetId );
848
849 if( statusResult != MQTTSuccess )
850 {
851 concludeCommand( pMqttAgentContext, pFoundAck->pOriginalCommand, statusResult, NULL );
852 ( void ) memset( pFoundAck, 0x00, sizeof( MQTTAgentAckInfo_t ) );
853 LogError( ( "Failed to resend publishes. Error code=%s\n", MQTT_Status_strerror( statusResult ) ) );
854 break;
855 }
856 }
857
858 packetId = MQTT_PublishToResend( pMqttContext, &cursor );
859 }
860
861 return statusResult;
862 }
863
864 /*-----------------------------------------------------------*/
865
/**
 * @brief Conclude, with MQTTRecvFailed, operations that are still waiting for
 * an acknowledgment, and zero their entries in the pending list.
 *
 * @param[in] pMqttAgentContext Agent context holding the pending list.
 * @param[in] clearOnlySubUnsubEntries When true, only SUBSCRIBE and UNSUBSCRIBE
 * entries are cleared; when false, every occupied entry is cleared.
 */
static void clearPendingAcknowledgments( MQTTAgentContext_t * pMqttAgentContext,
                                         bool clearOnlySubUnsubEntries )
{
    size_t slot = 0;

    assert( pMqttAgentContext != NULL );

    for( slot = 0; slot < MQTT_AGENT_MAX_OUTSTANDING_ACKS; slot++ )
    {
        MQTTAgentAckInfo_t * pEntry = &( pMqttAgentContext->pPendingAcks[ slot ] );

        /* Entries with an invalid packet ID are unused. */
        if( pEntry->packetId != MQTT_PACKET_ID_INVALID )
        {
            assert( pEntry->pOriginalCommand != NULL );

            /* The command type only matters when the caller asked for the
             * SUBSCRIBE/UNSUBSCRIBE subset. */
            if( ( !clearOnlySubUnsubEntries ) ||
                ( pEntry->pOriginalCommand->commandType == SUBSCRIBE ) ||
                ( pEntry->pOriginalCommand->commandType == UNSUBSCRIBE ) )
            {
                /* Receive failed is used to indicate a network error. */
                concludeCommand( pMqttAgentContext, pEntry->pOriginalCommand, MQTTRecvFailed, NULL );

                /* Free the slot. */
                ( void ) memset( pEntry, 0x00, sizeof( MQTTAgentAckInfo_t ) );
            }
        }
    }
}
903
904 /*-----------------------------------------------------------*/
905
validateStruct(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)906 static bool validateStruct( const MQTTAgentContext_t * pMqttAgentContext,
907 const MQTTAgentCommandInfo_t * pCommandInfo )
908 {
909 bool ret = false;
910
911 if( ( pMqttAgentContext == NULL ) ||
912 ( pCommandInfo == NULL ) )
913 {
914 LogError( ( "Pointer cannot be NULL. pMqttAgentContext=%p, pCommandInfo=%p.",
915 ( void * ) pMqttAgentContext,
916 ( void * ) pCommandInfo ) );
917 }
918 else if( ( pMqttAgentContext->agentInterface.send == NULL ) ||
919 ( pMqttAgentContext->agentInterface.recv == NULL ) ||
920 ( pMqttAgentContext->agentInterface.getCommand == NULL ) ||
921 ( pMqttAgentContext->agentInterface.releaseCommand == NULL ) ||
922 ( pMqttAgentContext->agentInterface.pMsgCtx == NULL ) )
923 {
924 LogError( ( "pMqttAgentContext must have initialized its messaging interface." ) );
925 }
926 else
927 {
928 ret = true;
929 }
930
931 return ret;
932 }
933
934 /*-----------------------------------------------------------*/
935
/**
 * @brief Check the command-specific argument structure for a CONNECT, PUBLISH,
 * SUBSCRIBE or UNSUBSCRIBE command.
 *
 * @param[in] commandType Type of command the arguments belong to.
 * @param[in] pParams Argument structure for that command type.
 *
 * @return true when the arguments pass the checks for the command type:
 * - CONNECT: arguments and their pConnectInfo are non-NULL.
 * - SUBSCRIBE/UNSUBSCRIBE: arguments and their pSubscribeInfo are non-NULL and
 *   numSubscriptions is non-zero.
 * - anything else (PUBLISH): arguments are non-NULL.
 */
static bool validateParams( MQTTAgentCommandType_t commandType,
                            const void * pParams )
{
    bool isValid = false;

    assert( ( commandType == CONNECT ) || ( commandType == PUBLISH ) ||
            ( commandType == SUBSCRIBE ) || ( commandType == UNSUBSCRIBE ) );

    if( commandType == CONNECT )
    {
        const MQTTAgentConnectArgs_t * pConnect = ( const MQTTAgentConnectArgs_t * ) pParams;

        isValid = ( pConnect != NULL ) && ( pConnect->pConnectInfo != NULL );
    }
    else if( ( commandType == SUBSCRIBE ) || ( commandType == UNSUBSCRIBE ) )
    {
        const MQTTAgentSubscribeArgs_t * pSubscribe = ( const MQTTAgentSubscribeArgs_t * ) pParams;

        isValid = ( pSubscribe != NULL ) &&
                  ( pSubscribe->pSubscribeInfo != NULL ) &&
                  ( pSubscribe->numSubscriptions != 0U );
    }
    else
    {
        /* Publish arguments are not inspected, so no cast is needed. */
        isValid = ( pParams != NULL );
    }

    return isValid;
}
971
972 /*-----------------------------------------------------------*/
973
MQTTAgent_Init(MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentMessageInterface_t * pMsgInterface,const MQTTFixedBuffer_t * pNetworkBuffer,const TransportInterface_t * pTransportInterface,MQTTGetCurrentTimeFunc_t getCurrentTimeMs,MQTTAgentIncomingPublishCallback_t incomingCallback,void * pIncomingPacketContext)974 MQTTStatus_t MQTTAgent_Init( MQTTAgentContext_t * pMqttAgentContext,
975 const MQTTAgentMessageInterface_t * pMsgInterface,
976 const MQTTFixedBuffer_t * pNetworkBuffer,
977 const TransportInterface_t * pTransportInterface,
978 MQTTGetCurrentTimeFunc_t getCurrentTimeMs,
979 MQTTAgentIncomingPublishCallback_t incomingCallback,
980 void * pIncomingPacketContext )
981 {
982 MQTTStatus_t returnStatus;
983
984 if( ( pMqttAgentContext == NULL ) ||
985 ( pMsgInterface == NULL ) ||
986 ( pTransportInterface == NULL ) ||
987 ( getCurrentTimeMs == NULL ) ||
988 ( incomingCallback == NULL ) )
989 {
990 returnStatus = MQTTBadParameter;
991 }
992 else if( ( pMsgInterface->pMsgCtx == NULL ) ||
993 ( pMsgInterface->send == NULL ) ||
994 ( pMsgInterface->recv == NULL ) ||
995 ( pMsgInterface->getCommand == NULL ) ||
996 ( pMsgInterface->releaseCommand == NULL ) )
997 {
998 LogError( ( "Invalid parameter: pMsgInterface must set all members." ) );
999 returnStatus = MQTTBadParameter;
1000 }
1001 else
1002 {
1003 ( void ) memset( pMqttAgentContext, 0x00, sizeof( MQTTAgentContext_t ) );
1004
1005 returnStatus = MQTT_Init( &( pMqttAgentContext->mqttContext ),
1006 pTransportInterface,
1007 getCurrentTimeMs,
1008 mqttEventCallback,
1009 pNetworkBuffer );
1010
1011 #if ( MQTT_AGENT_USE_QOS_1_2_PUBLISH != 0 )
1012 {
1013 if( returnStatus == MQTTSuccess )
1014 {
1015 returnStatus = MQTT_InitStatefulQoS( &( pMqttAgentContext->mqttContext ),
1016 pOutgoingPublishRecords,
1017 MQTT_AGENT_MAX_OUTSTANDING_ACKS,
1018 pIncomingPublishRecords,
1019 MQTT_AGENT_MAX_OUTSTANDING_ACKS );
1020 }
1021 }
1022 #endif /* if ( MQTT_AGENT_USE_QOS_1_2_PUBLISH != 0 ) */
1023
1024 if( returnStatus == MQTTSuccess )
1025 {
1026 pMqttAgentContext->pIncomingCallback = incomingCallback;
1027 pMqttAgentContext->pIncomingCallbackContext = pIncomingPacketContext;
1028 pMqttAgentContext->agentInterface = *pMsgInterface;
1029 }
1030 }
1031
1032 return returnStatus;
1033 }
1034
1035 /*-----------------------------------------------------------*/
1036
MQTTAgent_CommandLoop(MQTTAgentContext_t * pMqttAgentContext)1037 MQTTStatus_t MQTTAgent_CommandLoop( MQTTAgentContext_t * pMqttAgentContext )
1038 {
1039 MQTTAgentCommand_t * pCommand;
1040 MQTTStatus_t operationStatus = MQTTSuccess;
1041 bool endLoop = false;
1042
1043 /* The command queue should have been created before this task gets created. */
1044 if( ( pMqttAgentContext == NULL ) || ( pMqttAgentContext->agentInterface.pMsgCtx == NULL ) )
1045 {
1046 operationStatus = MQTTBadParameter;
1047 }
1048
1049 /* Loop until an error or we receive a terminate command. */
1050 while( operationStatus == MQTTSuccess )
1051 {
1052 /* Wait for the next command, if any. */
1053 pCommand = NULL;
1054 ( void ) pMqttAgentContext->agentInterface.recv(
1055 pMqttAgentContext->agentInterface.pMsgCtx,
1056 &( pCommand ),
1057 MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME
1058 );
1059 operationStatus = processCommand( pMqttAgentContext, pCommand, &endLoop );
1060
1061 if( operationStatus != MQTTSuccess )
1062 {
1063 LogError( ( "MQTT operation failed with status %s\n",
1064 MQTT_Status_strerror( operationStatus ) ) );
1065 }
1066
1067 /* Terminate the loop on disconnects, errors, or the termination command. */
1068 if( endLoop )
1069 {
1070 break;
1071 }
1072 }
1073
1074 return operationStatus;
1075 }
1076
1077 /*-----------------------------------------------------------*/
1078
MQTTAgent_ResumeSession(MQTTAgentContext_t * pMqttAgentContext,bool sessionPresent)1079 MQTTStatus_t MQTTAgent_ResumeSession( MQTTAgentContext_t * pMqttAgentContext,
1080 bool sessionPresent )
1081 {
1082 MQTTStatus_t statusResult = MQTTSuccess;
1083
1084 /* If the packet ID is zero then the MQTT context has not been initialized as 0
1085 * is the initial value but not a valid packet ID. */
1086 if( ( pMqttAgentContext != NULL ) &&
1087 ( pMqttAgentContext->mqttContext.nextPacketId != MQTT_PACKET_ID_INVALID ) )
1088 {
1089 /* Resend publishes if session is present. NOTE: It's possible that some
1090 * of the operations that were in progress during the network interruption
1091 * were subscribes. In that case, we would want to mark those operations
1092 * as completing with error and remove them from the list of operations, so
1093 * that the calling task can try subscribing again. */
1094 if( sessionPresent )
1095 {
1096 /* The session has resumed, so clear any SUBSCRIBE/UNSUBSCRIBE operations
1097 * that were pending acknowledgments in the previous connection. */
1098 clearPendingAcknowledgments( pMqttAgentContext, true );
1099
1100 statusResult = resendPublishes( pMqttAgentContext );
1101 }
1102
1103 /* If we wanted to resume a session but none existed with the broker, we
1104 * should mark all in progress operations as errors so that the tasks that
1105 * created them can try again. */
1106 else
1107 {
1108 /* We have a clean session, so clear all operations pending acknowledgments. */
1109 clearPendingAcknowledgments( pMqttAgentContext, false );
1110 }
1111 }
1112 else
1113 {
1114 statusResult = MQTTBadParameter;
1115 }
1116
1117 return statusResult;
1118 }
1119
1120 /*-----------------------------------------------------------*/
1121
MQTTAgent_CancelAll(MQTTAgentContext_t * pMqttAgentContext)1122 MQTTStatus_t MQTTAgent_CancelAll( MQTTAgentContext_t * pMqttAgentContext )
1123 {
1124 MQTTStatus_t statusReturn = MQTTSuccess;
1125 MQTTAgentCommand_t * pReceivedCommand = NULL;
1126 bool commandWasReceived = false;
1127 MQTTAgentAckInfo_t * pendingAcks;
1128 size_t i;
1129
1130 if( ( pMqttAgentContext == NULL ) || ( pMqttAgentContext->agentInterface.pMsgCtx == NULL ) )
1131 {
1132 statusReturn = MQTTBadParameter;
1133 }
1134 else
1135 {
1136 /* Cancel all operations waiting in the queue. */
1137 do
1138 {
1139 pReceivedCommand = NULL;
1140 commandWasReceived = pMqttAgentContext->agentInterface.recv(
1141 pMqttAgentContext->agentInterface.pMsgCtx,
1142 &( pReceivedCommand ),
1143 0U );
1144
1145 if( pReceivedCommand != NULL )
1146 {
1147 concludeCommand( pMqttAgentContext, pReceivedCommand, MQTTRecvFailed, NULL );
1148 }
1149 } while( commandWasReceived );
1150
1151 pendingAcks = pMqttAgentContext->pPendingAcks;
1152
1153 /* Cancel any operations awaiting an acknowledgment. */
1154 for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
1155 {
1156 if( pendingAcks[ i ].packetId != MQTT_PACKET_ID_INVALID )
1157 {
1158 concludeCommand( pMqttAgentContext, pendingAcks[ i ].pOriginalCommand, MQTTRecvFailed, NULL );
1159
1160 /* Now remove it from the list. */
1161 ( void ) memset( &( pendingAcks[ i ] ), 0x00, sizeof( MQTTAgentAckInfo_t ) );
1162 }
1163 }
1164 }
1165
1166 return statusReturn;
1167 }
1168
1169 /*-----------------------------------------------------------*/
1170
MQTTAgent_Subscribe(const MQTTAgentContext_t * pMqttAgentContext,MQTTAgentSubscribeArgs_t * pSubscriptionArgs,const MQTTAgentCommandInfo_t * pCommandInfo)1171 MQTTStatus_t MQTTAgent_Subscribe( const MQTTAgentContext_t * pMqttAgentContext,
1172 MQTTAgentSubscribeArgs_t * pSubscriptionArgs,
1173 const MQTTAgentCommandInfo_t * pCommandInfo )
1174 {
1175 MQTTStatus_t statusReturn = MQTTBadParameter;
1176 bool paramsValid = false;
1177
1178 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1179 validateParams( SUBSCRIBE, pSubscriptionArgs );
1180
1181 if( paramsValid )
1182 {
1183 statusReturn = createAndAddCommand( SUBSCRIBE, /* commandType */
1184 pMqttAgentContext, /* mqttContextHandle */
1185 pSubscriptionArgs, /* pMqttInfoParam */
1186 pCommandInfo->cmdCompleteCallback, /* commandCompleteCallback */
1187 pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1188 pCommandInfo->blockTimeMs );
1189 }
1190
1191 return statusReturn;
1192 }
1193
1194 /*-----------------------------------------------------------*/
1195
MQTTAgent_Unsubscribe(const MQTTAgentContext_t * pMqttAgentContext,MQTTAgentSubscribeArgs_t * pSubscriptionArgs,const MQTTAgentCommandInfo_t * pCommandInfo)1196 MQTTStatus_t MQTTAgent_Unsubscribe( const MQTTAgentContext_t * pMqttAgentContext,
1197 MQTTAgentSubscribeArgs_t * pSubscriptionArgs,
1198 const MQTTAgentCommandInfo_t * pCommandInfo )
1199 {
1200 MQTTStatus_t statusReturn = MQTTBadParameter;
1201 bool paramsValid = false;
1202
1203 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1204 validateParams( UNSUBSCRIBE, pSubscriptionArgs );
1205
1206 if( paramsValid )
1207 {
1208 statusReturn = createAndAddCommand( UNSUBSCRIBE, /* commandType */
1209 pMqttAgentContext, /* mqttContextHandle */
1210 pSubscriptionArgs, /* pMqttInfoParam */
1211 pCommandInfo->cmdCompleteCallback, /* commandCompleteCallback */
1212 pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1213 pCommandInfo->blockTimeMs );
1214 }
1215
1216 return statusReturn;
1217 }
1218
1219 /*-----------------------------------------------------------*/
1220
MQTTAgent_Publish(const MQTTAgentContext_t * pMqttAgentContext,MQTTPublishInfo_t * pPublishInfo,const MQTTAgentCommandInfo_t * pCommandInfo)1221 MQTTStatus_t MQTTAgent_Publish( const MQTTAgentContext_t * pMqttAgentContext,
1222 MQTTPublishInfo_t * pPublishInfo,
1223 const MQTTAgentCommandInfo_t * pCommandInfo )
1224 {
1225 MQTTStatus_t statusReturn = MQTTBadParameter;
1226 bool paramsValid = false;
1227
1228 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1229 validateParams( PUBLISH, pPublishInfo );
1230
1231 if( paramsValid )
1232 {
1233 statusReturn = createAndAddCommand( PUBLISH, /* commandType */
1234 pMqttAgentContext, /* mqttContextHandle */
1235 pPublishInfo, /* pMqttInfoParam */
1236 pCommandInfo->cmdCompleteCallback, /* commandCompleteCallback */
1237 pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1238 pCommandInfo->blockTimeMs );
1239 }
1240
1241 return statusReturn;
1242 }
1243
1244 /*-----------------------------------------------------------*/
1245
MQTTAgent_ProcessLoop(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1246 MQTTStatus_t MQTTAgent_ProcessLoop( const MQTTAgentContext_t * pMqttAgentContext,
1247 const MQTTAgentCommandInfo_t * pCommandInfo )
1248 {
1249 MQTTStatus_t statusReturn = MQTTBadParameter;
1250 bool paramsValid = false;
1251
1252 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1253
1254 if( paramsValid )
1255 {
1256 statusReturn = createAndAddCommand( PROCESSLOOP, /* commandType */
1257 pMqttAgentContext, /* mqttContextHandle */
1258 NULL, /* pMqttInfoParam */
1259 pCommandInfo->cmdCompleteCallback, /* commandCompleteCallback */
1260 pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1261 pCommandInfo->blockTimeMs );
1262 }
1263
1264 return statusReturn;
1265 }
1266
1267 /*-----------------------------------------------------------*/
1268
MQTTAgent_Connect(const MQTTAgentContext_t * pMqttAgentContext,MQTTAgentConnectArgs_t * pConnectArgs,const MQTTAgentCommandInfo_t * pCommandInfo)1269 MQTTStatus_t MQTTAgent_Connect( const MQTTAgentContext_t * pMqttAgentContext,
1270 MQTTAgentConnectArgs_t * pConnectArgs,
1271 const MQTTAgentCommandInfo_t * pCommandInfo )
1272 {
1273 MQTTStatus_t statusReturn = MQTTBadParameter;
1274 bool paramsValid = false;
1275
1276 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
1277 validateParams( CONNECT, pConnectArgs );
1278
1279 if( paramsValid )
1280 {
1281 statusReturn = createAndAddCommand( CONNECT,
1282 pMqttAgentContext,
1283 pConnectArgs,
1284 pCommandInfo->cmdCompleteCallback,
1285 pCommandInfo->pCmdCompleteCallbackContext,
1286 pCommandInfo->blockTimeMs );
1287 }
1288
1289 return statusReturn;
1290 }
1291
1292 /*-----------------------------------------------------------*/
1293
MQTTAgent_Disconnect(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1294 MQTTStatus_t MQTTAgent_Disconnect( const MQTTAgentContext_t * pMqttAgentContext,
1295 const MQTTAgentCommandInfo_t * pCommandInfo )
1296 {
1297 MQTTStatus_t statusReturn = MQTTBadParameter;
1298 bool paramsValid = false;
1299
1300 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1301
1302 if( paramsValid )
1303 {
1304 statusReturn = createAndAddCommand( DISCONNECT, /* commandType */
1305 pMqttAgentContext, /* mqttContextHandle */
1306 NULL, /* pMqttInfoParam */
1307 pCommandInfo->cmdCompleteCallback, /* commandCompleteCallback */
1308 pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1309 pCommandInfo->blockTimeMs );
1310 }
1311
1312 return statusReturn;
1313 }
1314
1315 /*-----------------------------------------------------------*/
1316
MQTTAgent_Ping(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1317 MQTTStatus_t MQTTAgent_Ping( const MQTTAgentContext_t * pMqttAgentContext,
1318 const MQTTAgentCommandInfo_t * pCommandInfo )
1319 {
1320 MQTTStatus_t statusReturn = MQTTBadParameter;
1321 bool paramsValid = false;
1322
1323 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1324
1325 if( paramsValid )
1326 {
1327 statusReturn = createAndAddCommand( PING, /* commandType */
1328 pMqttAgentContext, /* mqttContextHandle */
1329 NULL, /* pMqttInfoParam */
1330 pCommandInfo->cmdCompleteCallback, /* commandCompleteCallback */
1331 pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
1332 pCommandInfo->blockTimeMs );
1333 }
1334
1335 return statusReturn;
1336 }
1337
1338 /*-----------------------------------------------------------*/
1339
MQTTAgent_Terminate(const MQTTAgentContext_t * pMqttAgentContext,const MQTTAgentCommandInfo_t * pCommandInfo)1340 MQTTStatus_t MQTTAgent_Terminate( const MQTTAgentContext_t * pMqttAgentContext,
1341 const MQTTAgentCommandInfo_t * pCommandInfo )
1342 {
1343 MQTTStatus_t statusReturn = MQTTBadParameter;
1344 bool paramsValid = false;
1345
1346 paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
1347
1348 if( paramsValid )
1349 {
1350 statusReturn = createAndAddCommand( TERMINATE,
1351 pMqttAgentContext,
1352 NULL,
1353 pCommandInfo->cmdCompleteCallback,
1354 pCommandInfo->pCmdCompleteCallbackContext,
1355 pCommandInfo->blockTimeMs );
1356 }
1357
1358 return statusReturn;
1359 }
1360
1361 /*-----------------------------------------------------------*/
1362