1 /***************************************************************************
2  * Copyright (c) 2024 Microsoft Corporation
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the MIT License which is available at
6  * https://opensource.org/licenses/MIT.
7  *
8  * SPDX-License-Identifier: MIT
9  **************************************************************************/
10 
11 
12 /**************************************************************************/
13 /**************************************************************************/
14 /**                                                                       */
15 /** ThreadX Component                                                     */
16 /**                                                                       */
17 /**   Queue                                                               */
18 /**                                                                       */
19 /**************************************************************************/
20 /**************************************************************************/
21 
22 #define TX_SOURCE_CODE
23 
24 
25 /* Include necessary system files.  */
26 
27 #include "tx_api.h"
28 #include "tx_trace.h"
29 #include "tx_thread.h"
30 #include "tx_queue.h"
31 
32 
33 /**************************************************************************/
34 /*                                                                        */
35 /*  FUNCTION                                               RELEASE        */
36 /*                                                                        */
37 /*    _tx_queue_receive                                   PORTABLE C      */
38 /*                                                           6.1          */
39 /*  AUTHOR                                                                */
40 /*                                                                        */
41 /*    William E. Lamie, Microsoft Corporation                             */
42 /*                                                                        */
43 /*  DESCRIPTION                                                           */
44 /*                                                                        */
45 /*    This function receives a message from the specified queue. If there */
46 /*    are no messages in the queue, this function waits according to the  */
47 /*    option specified.                                                   */
48 /*                                                                        */
49 /*  INPUT                                                                 */
50 /*                                                                        */
51 /*    queue_ptr                         Pointer to queue control block    */
52 /*    destination_ptr                   Pointer to message destination    */
53 /*                                        **** MUST BE LARGE ENOUGH TO    */
54 /*                                             HOLD MESSAGE ****          */
55 /*    wait_option                       Suspension option                 */
56 /*                                                                        */
57 /*  OUTPUT                                                                */
58 /*                                                                        */
59 /*    status                            Completion status                 */
60 /*                                                                        */
61 /*  CALLS                                                                 */
62 /*                                                                        */
63 /*    _tx_thread_system_resume          Resume thread routine             */
64 /*    _tx_thread_system_ni_resume       Non-interruptable resume thread   */
65 /*    _tx_thread_system_suspend         Suspend thread routine            */
66 /*    _tx_thread_system_ni_suspend      Non-interruptable suspend thread  */
67 /*                                                                        */
68 /*  CALLED BY                                                             */
69 /*                                                                        */
70 /*    Application Code                                                    */
71 /*                                                                        */
72 /*  RELEASE HISTORY                                                       */
73 /*                                                                        */
74 /*    DATE              NAME                      DESCRIPTION             */
75 /*                                                                        */
76 /*  05-19-2020     William E. Lamie         Initial Version 6.0           */
77 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
78 /*                                            resulting in version 6.1    */
79 /*                                                                        */
80 /**************************************************************************/
_tx_queue_receive(TX_QUEUE * queue_ptr,VOID * destination_ptr,ULONG wait_option)81 UINT  _tx_queue_receive(TX_QUEUE *queue_ptr, VOID *destination_ptr, ULONG wait_option)
82 {
83 
84 TX_INTERRUPT_SAVE_AREA
85 
86 TX_THREAD       *thread_ptr;
87 ULONG           *source;
88 ULONG           *destination;
89 UINT            size;
90 UINT            suspended_count;
91 TX_THREAD       *next_thread;
92 TX_THREAD       *previous_thread;
93 UINT            status;
94 
95 
96     /* Default the status to TX_SUCCESS.  */
97     status =  TX_SUCCESS;
98 
99     /* Disable interrupts to receive message from queue.  */
100     TX_DISABLE
101 
102 #ifdef TX_QUEUE_ENABLE_PERFORMANCE_INFO
103 
104     /* Increment the total messages received counter.  */
105     _tx_queue_performance__messages_received_count++;
106 
107     /* Increment the number of messages received from this queue.  */
108     queue_ptr -> tx_queue_performance_messages_received_count++;
109 
110 #endif
111 
112     /* If trace is enabled, insert this event into the trace buffer.  */
113     TX_TRACE_IN_LINE_INSERT(TX_TRACE_QUEUE_RECEIVE, queue_ptr, TX_POINTER_TO_ULONG_CONVERT(destination_ptr), wait_option, queue_ptr -> tx_queue_enqueued, TX_TRACE_QUEUE_EVENTS)
114 
115     /* Log this kernel call.  */
116     TX_EL_QUEUE_RECEIVE_INSERT
117 
118     /* Pickup the thread suspension count.  */
119     suspended_count =  queue_ptr -> tx_queue_suspended_count;
120 
121     /* Determine if there is anything in the queue.  */
122     if (queue_ptr -> tx_queue_enqueued != TX_NO_MESSAGES)
123     {
124 
125         /* Determine if there are any suspensions.  */
126         if (suspended_count == TX_NO_SUSPENSIONS)
127         {
128 
129             /* There is a message waiting in the queue and there are no suspensi.  */
130 
131             /* Setup source and destination pointers.  */
132             source =       queue_ptr -> tx_queue_read;
133             destination =  TX_VOID_TO_ULONG_POINTER_CONVERT(destination_ptr);
134             size =         queue_ptr -> tx_queue_message_size;
135 
136             /* Copy message. Note that the source and destination pointers are
137                incremented by the macro.  */
138             TX_QUEUE_MESSAGE_COPY(source, destination, size)
139 
140             /* Determine if we are at the end.  */
141             if (source == queue_ptr -> tx_queue_end)
142             {
143 
144                 /* Yes, wrap around to the beginning.  */
145                 source =  queue_ptr -> tx_queue_start;
146             }
147 
148             /* Setup the queue read pointer.   */
149             queue_ptr -> tx_queue_read =  source;
150 
151             /* Increase the amount of available storage.  */
152             queue_ptr -> tx_queue_available_storage++;
153 
154             /* Decrease the enqueued count.  */
155             queue_ptr -> tx_queue_enqueued--;
156 
157             /* Restore interrupts.  */
158             TX_RESTORE
159         }
160         else
161         {
162 
163             /* At this point we know the queue is full.  */
164 
165             /* Pickup thread suspension list head pointer.  */
166             thread_ptr =  queue_ptr -> tx_queue_suspension_list;
167 
168             /* Now determine if there is a queue front suspension active.   */
169 
170             /* Is the front suspension flag set?  */
171             if (thread_ptr -> tx_thread_suspend_option == TX_TRUE)
172             {
173 
174                 /* Yes, a queue front suspension is present.  */
175 
176                 /* Return the message associated with this suspension.  */
177 
178                 /* Setup source and destination pointers.  */
179                 source =       TX_VOID_TO_ULONG_POINTER_CONVERT(thread_ptr -> tx_thread_additional_suspend_info);
180                 destination =  TX_VOID_TO_ULONG_POINTER_CONVERT(destination_ptr);
181                 size =         queue_ptr -> tx_queue_message_size;
182 
183                 /* Copy message. Note that the source and destination pointers are
184                    incremented by the macro.  */
185                 TX_QUEUE_MESSAGE_COPY(source, destination, size)
186 
187                 /* Message is now in the caller's destination. See if this is the only suspended thread
188                    on the list.  */
189                 suspended_count--;
190                 if (suspended_count == TX_NO_SUSPENSIONS)
191                 {
192 
193                     /* Yes, the only suspended thread.  */
194 
195                     /* Update the head pointer.  */
196                     queue_ptr -> tx_queue_suspension_list =  TX_NULL;
197                 }
198                 else
199                 {
200 
201                     /* At least one more thread is on the same expiration list.  */
202 
203                     /* Update the list head pointer.  */
204                     next_thread =                            thread_ptr -> tx_thread_suspended_next;
205                     queue_ptr -> tx_queue_suspension_list =  next_thread;
206 
207                     /* Update the links of the adjacent threads.  */
208                     previous_thread =                              thread_ptr -> tx_thread_suspended_previous;
209                     next_thread -> tx_thread_suspended_previous =  previous_thread;
210                     previous_thread -> tx_thread_suspended_next =  next_thread;
211                 }
212 
213                 /* Decrement the suspension count.  */
214                 queue_ptr -> tx_queue_suspended_count =  suspended_count;
215 
216                 /* Prepare for resumption of the first thread.  */
217 
218                 /* Clear cleanup routine to avoid timeout.  */
219                 thread_ptr -> tx_thread_suspend_cleanup =  TX_NULL;
220 
221                 /* Put return status into the thread control block.  */
222                 thread_ptr -> tx_thread_suspend_status =  TX_SUCCESS;
223 
224 #ifdef TX_NOT_INTERRUPTABLE
225 
226                 /* Resume the thread!  */
227                 _tx_thread_system_ni_resume(thread_ptr);
228 
229                 /* Restore interrupts.  */
230                 TX_RESTORE
231 #else
232 
233                 /* Temporarily disable preemption.  */
234                 _tx_thread_preempt_disable++;
235 
236                 /* Restore interrupts.  */
237                 TX_RESTORE
238 
239                 /* Resume thread.  */
240                 _tx_thread_system_resume(thread_ptr);
241 #endif
242             }
243             else
244             {
245 
246                 /* At this point, we know that the queue is full and there
247                    are one or more threads suspended trying to send another
248                    message to this queue.  */
249 
250                 /* Setup source and destination pointers.  */
251                 source =       queue_ptr -> tx_queue_read;
252                 destination =  TX_VOID_TO_ULONG_POINTER_CONVERT(destination_ptr);
253                 size =         queue_ptr -> tx_queue_message_size;
254 
255                 /* Copy message. Note that the source and destination pointers are
256                    incremented by the macro.  */
257                 TX_QUEUE_MESSAGE_COPY(source, destination, size)
258 
259                 /* Determine if we are at the end.  */
260                 if (source == queue_ptr -> tx_queue_end)
261                 {
262 
263                     /* Yes, wrap around to the beginning.  */
264                     source =  queue_ptr -> tx_queue_start;
265                 }
266 
267                 /* Setup the queue read pointer.   */
268                 queue_ptr -> tx_queue_read =  source;
269 
270                 /* Disable preemption.  */
271                 _tx_thread_preempt_disable++;
272 
273 #ifdef TX_NOT_INTERRUPTABLE
274 
275                 /* Restore interrupts.  */
276                 TX_RESTORE
277 
278                 /* Interrupts are enabled briefly here to keep the interrupt
279                    lockout time deterministic.  */
280 
281                 /* Disable interrupts again.  */
282                 TX_DISABLE
283 #endif
284 
285                 /* Decrement the preemption disable variable.  */
286                 _tx_thread_preempt_disable--;
287 
288                 /* Setup source and destination pointers.  */
289                 source =       TX_VOID_TO_ULONG_POINTER_CONVERT(thread_ptr -> tx_thread_additional_suspend_info);
290                 destination =  queue_ptr -> tx_queue_write;
291                 size =         queue_ptr -> tx_queue_message_size;
292 
293                 /* Copy message. Note that the source and destination pointers are
294                    incremented by the macro.  */
295                 TX_QUEUE_MESSAGE_COPY(source, destination, size)
296 
297                 /* Determine if we are at the end.  */
298                 if (destination == queue_ptr -> tx_queue_end)
299                 {
300 
301                     /* Yes, wrap around to the beginning.  */
302                     destination =  queue_ptr -> tx_queue_start;
303                 }
304 
305                 /* Adjust the write pointer.  */
306                 queue_ptr -> tx_queue_write =  destination;
307 
308                 /* Pickup thread pointer.  */
309                 thread_ptr =  queue_ptr -> tx_queue_suspension_list;
310 
311                 /* Message is now in the queue.  See if this is the only suspended thread
312                    on the list.  */
313                 suspended_count--;
314                 if (suspended_count == TX_NO_SUSPENSIONS)
315                 {
316 
317                   /* Yes, the only suspended thread.  */
318 
319                     /* Update the head pointer.  */
320                     queue_ptr -> tx_queue_suspension_list =  TX_NULL;
321                 }
322                 else
323                 {
324 
325                     /* At least one more thread is on the same expiration list.  */
326 
327                     /* Update the list head pointer.  */
328                     next_thread =                            thread_ptr -> tx_thread_suspended_next;
329                     queue_ptr -> tx_queue_suspension_list =  next_thread;
330 
331                     /* Update the links of the adjacent threads.  */
332                     previous_thread =                               thread_ptr -> tx_thread_suspended_previous;
333                     next_thread -> tx_thread_suspended_previous =   previous_thread;
334                     previous_thread -> tx_thread_suspended_next =   next_thread;
335                 }
336 
337                 /* Decrement the suspension count.  */
338                 queue_ptr -> tx_queue_suspended_count =  suspended_count;
339 
340                 /* Prepare for resumption of the first thread.  */
341 
342                 /* Clear cleanup routine to avoid timeout.  */
343                 thread_ptr -> tx_thread_suspend_cleanup =  TX_NULL;
344 
345                 /* Put return status into the thread control block.  */
346                 thread_ptr -> tx_thread_suspend_status =  TX_SUCCESS;
347 
348 #ifdef TX_NOT_INTERRUPTABLE
349 
350                 /* Resume the thread!  */
351                 _tx_thread_system_ni_resume(thread_ptr);
352 
353                 /* Restore interrupts.  */
354                 TX_RESTORE
355 #else
356 
357                 /* Temporarily disable preemption.  */
358                 _tx_thread_preempt_disable++;
359 
360                 /* Restore interrupts.  */
361                 TX_RESTORE
362 
363                 /* Resume thread.  */
364                 _tx_thread_system_resume(thread_ptr);
365 #endif
366             }
367         }
368     }
369 
370     /* Determine if the request specifies suspension.  */
371     else if (wait_option != TX_NO_WAIT)
372     {
373 
374         /* Determine if the preempt disable flag is non-zero.  */
375         if (_tx_thread_preempt_disable != ((UINT) 0))
376         {
377 
378             /* Restore interrupts.  */
379             TX_RESTORE
380 
381             /* Suspension is not allowed if the preempt disable flag is non-zero at this point - return error completion.  */
382             status =  TX_QUEUE_EMPTY;
383         }
384         else
385         {
386 
387             /* Prepare for suspension of this thread.  */
388 
389 #ifdef TX_QUEUE_ENABLE_PERFORMANCE_INFO
390 
391             /* Increment the total queue empty suspensions counter.  */
392             _tx_queue_performance_empty_suspension_count++;
393 
394             /* Increment the number of empty suspensions on this queue.  */
395             queue_ptr -> tx_queue_performance_empty_suspension_count++;
396 #endif
397 
398             /* Pickup thread pointer.  */
399             TX_THREAD_GET_CURRENT(thread_ptr)
400 
401             /* Setup cleanup routine pointer.  */
402             thread_ptr -> tx_thread_suspend_cleanup =  &(_tx_queue_cleanup);
403 
404             /* Setup cleanup information, i.e. this queue control
405                block and the source pointer.  */
406             thread_ptr -> tx_thread_suspend_control_block =    (VOID *) queue_ptr;
407             thread_ptr -> tx_thread_additional_suspend_info =  (VOID *) destination_ptr;
408             thread_ptr -> tx_thread_suspend_option =           TX_FALSE;
409 
410 #ifndef TX_NOT_INTERRUPTABLE
411 
412             /* Increment the suspension sequence number, which is used to identify
413                this suspension event.  */
414             thread_ptr -> tx_thread_suspension_sequence++;
415 #endif
416 
417             /* Setup suspension list.  */
418             if (suspended_count == TX_NO_SUSPENSIONS)
419             {
420 
421                 /* No other threads are suspended.  Setup the head pointer and
422                    just setup this threads pointers to itself.  */
423                 queue_ptr -> tx_queue_suspension_list =         thread_ptr;
424                 thread_ptr -> tx_thread_suspended_next =        thread_ptr;
425                 thread_ptr -> tx_thread_suspended_previous =    thread_ptr;
426             }
427             else
428             {
429 
430                 /* This list is not NULL, add current thread to the end. */
431                 next_thread =                                   queue_ptr -> tx_queue_suspension_list;
432                 thread_ptr -> tx_thread_suspended_next =        next_thread;
433                 previous_thread =                               next_thread -> tx_thread_suspended_previous;
434                 thread_ptr -> tx_thread_suspended_previous =    previous_thread;
435                 previous_thread -> tx_thread_suspended_next =   thread_ptr;
436                 next_thread -> tx_thread_suspended_previous =   thread_ptr;
437             }
438 
439             /* Increment the suspended thread count.  */
440             queue_ptr -> tx_queue_suspended_count =  suspended_count + ((UINT) 1);
441 
442             /* Set the state to suspended.  */
443             thread_ptr -> tx_thread_state =    TX_QUEUE_SUSP;
444 
445 #ifdef TX_NOT_INTERRUPTABLE
446 
447             /* Call actual non-interruptable thread suspension routine.  */
448             _tx_thread_system_ni_suspend(thread_ptr, wait_option);
449 
450             /* Restore interrupts.  */
451             TX_RESTORE
452 #else
453 
454             /* Set the suspending flag.  */
455             thread_ptr -> tx_thread_suspending =  TX_TRUE;
456 
457             /* Setup the timeout period.  */
458             thread_ptr -> tx_thread_timer.tx_timer_internal_remaining_ticks =  wait_option;
459 
460             /* Temporarily disable preemption.  */
461             _tx_thread_preempt_disable++;
462 
463             /* Restore interrupts.  */
464             TX_RESTORE
465 
466             /* Call actual thread suspension routine.  */
467             _tx_thread_system_suspend(thread_ptr);
468 #endif
469 
470             /* Return the completion status.  */
471             status =  thread_ptr -> tx_thread_suspend_status;
472         }
473     }
474     else
475     {
476 
477         /* Restore interrupts.  */
478         TX_RESTORE
479 
480         /* Immediate return, return error completion.  */
481         status =  TX_QUEUE_EMPTY;
482     }
483 
484     /* Return completion status.  */
485     return(status);
486 }
487 
488