1 /***************************************************************************
2  * Copyright (c) 2024 Microsoft Corporation
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the MIT License which is available at
6  * https://opensource.org/licenses/MIT.
7  *
8  * SPDX-License-Identifier: MIT
9  **************************************************************************/
10 
11 
12 /**************************************************************************/
13 /**************************************************************************/
14 /**                                                                       */
15 /** POSIX wrapper for THREADX                                             */
16 /**                                                                       */
17 /**                                                                       */
18 /**                                                                       */
19 /**************************************************************************/
20 /**************************************************************************/
21 
22 /* Include necessary system files.  */
23 
24 #include "tx_api.h"     /* Threadx API */
25 #include "pthread.h"    /* Posix API */
26 #include "px_int.h"     /* Posix helper functions */
27 
28 
29 /**************************************************************************/
30 /*                                                                        */
31 /*  FUNCTION                                               RELEASE        */
32 /*                                                                        */
33 /*    mq_receive                                          PORTABLE C      */
34 /*                                                           6.2.0        */
35 /*  AUTHOR                                                                */
36 /*                                                                        */
37 /*    William E. Lamie, Microsoft Corporation                             */
38 /*                                                                        */
39 /*  DESCRIPTION                                                           */
40 /*                                                                        */
41 /*                                                                        */
42 /*    Receive a message from a message queue.                             */
43 /*                                                                        */
44 /*                                                                        */
45 /*  INPUT                                                                 */
46 /*                                                                        */
47 /*    mqdes                         message queue descriptor              */
48 /*   *pMsg                          buffer to receive message             */
49 /*    msgLen                        size of buffer, in bytes              */
50 /*   *pMsgPrio                      If not NULL, return message priority  */
51 /*                                                                        */
52 /*  OUTPUT                                                                */
53 /*                                                                        */
/*    byte count                    Number of bytes received              */
/*    ERROR                         If an error occurs                    */
56 /*                                                                        */
57 /*  CALLS                                                                 */
58 /*                                                                        */
/*    posix_internal_error          Generic error handler                 */
/*    posix_arrange_msg             Order queued messages by priority     */
/*    tx_queue_receive              ThreadX queue receive                 */
/*    tx_byte_release               Release bytes                         */
/*    posix_memory_allocate         Allocate memory                       */
/*    tx_thread_identify            Returns currently running thread      */
64 /*                                                                        */
65 /*  CALLED BY                                                             */
66 /*                                                                        */
67 /*    Application Code                                                    */
68 /*                                                                        */
69 /*  RELEASE HISTORY                                                       */
70 /*                                                                        */
71 /*    DATE              NAME                      DESCRIPTION             */
72 /*                                                                        */
73 /*  06-02-2021      William E. Lamie        Initial Version 6.1.7         */
74 /*  10-31-2022      Scott Larson            Add 64-bit support,           */
75 /*                                            resulting in version 6.2.0  */
76 /*                                                                        */
77 /**************************************************************************/
/* Receive one message from the queue referenced by mqdes into pMsg.

   On success the number of bytes copied into pMsg is returned and, when
   pMsgPrio is not null, the priority of the message is stored through it.
   On failure ERROR is returned.  The error codes set by this routine are:

     EBADF     - null/invalid descriptor, queue not opened for reading,
                 caller is not a thread, or the ThreadX receive failed
                 for any reason other than an empty queue.
     EMSGSIZE  - msgLen is smaller than the queue's mq_msgsize attribute.
     EAGAIN    - the queue is empty and no message could be obtained.

   A failure to allocate or release internal memory is reported through
   posix_internal_error(100) and also yields ERROR.  */
ssize_t mq_receive(mqd_t mqdes, VOID *pMsg, size_t msgLen, ULONG *pMsgPrio)
{

TX_QUEUE            *Queue;
POSIX_MSG_QUEUE     *q_ptr;
INT                  status;
INT                  error_code;
INT                  scratch_release;
INT                  payload_release;
ULONG                wait_option;
ULONG                length_of_message;
ULONG                priority_of_message;
ULONG                mycount;
ULONG               *my_ptr;
UCHAR               *source_ptr;
UCHAR               *destination_ptr;
VOID                *msgbuf1;
VOID                *message_source;

    /* Reject a null descriptor before it is dereferenced.  */
    if (!mqdes)
    {
        posix_errno = EBADF;
        posix_set_pthread_errno(EBADF);
        return(ERROR);
    }

    /* Check for an invalid queue pointer.  */
    q_ptr = (POSIX_MSG_QUEUE *)mqdes->f_data;
    if ((!q_ptr) || ((q_ptr->px_queue_id) != PX_QUEUE_ID))
    {
        /* Queue pointer is invalid, return appropriate error code.  */
        posix_errno = EBADF;
        posix_set_pthread_errno(EBADF);
        return(ERROR);
    }

    /* The queue control block is only addressed once f_data is known
       to be valid.  */
    Queue = &(mqdes->f_data->queue);

    /* The descriptor must have been opened with read access.  */
    if (((mqdes->f_flag & O_RDONLY) != O_RDONLY) && ((mqdes->f_flag & O_RDWR) != O_RDWR))
    {
        posix_errno = EBADF;
        posix_set_pthread_errno(EBADF);
        return(ERROR);
    }

    /* The supplied buffer must be able to hold the largest message the
       queue was created for.  */
    if (msgLen < q_ptr->q_attr.mq_msgsize)
    {
        posix_errno = EMSGSIZE;
        posix_set_pthread_errno(EMSGSIZE);
        return(ERROR);
    }

    /* This routine may only be called from a thread context.  */
    if (!(tx_thread_identify()))
    {
        posix_errno = EBADF;
        posix_set_pthread_errno(EBADF);
        return(ERROR);
    }

    /* Block for a message unless the descriptor is non-blocking.  */
    if ((mqdes->f_flag & O_NONBLOCK) == O_NONBLOCK)
    {
        wait_option = TX_NO_WAIT;
    }
    else
    {
        wait_option = TX_WAIT_FOREVER;
    }

    /* Create a temporary buffer to receive the queue entry, which holds the
       message pointer, message length and message priority.  */
    status = posix_memory_allocate((sizeof(ULONG)) * TX_POSIX_MESSAGE_SIZE, (VOID **)&msgbuf1);
    if (status != TX_SUCCESS)
    {
        /* Return generic error.  */
        posix_internal_error(100);
        return(ERROR);
    }

    /* Arrange the messages in the queue as per the required priority.
       The result is intentionally not examined, as before.  */
    (VOID)posix_arrange_msg(Queue, pMsgPrio);

    /* Receive the queue entry.  */
    status = tx_queue_receive(Queue, msgbuf1, wait_option);
    if (status != TX_SUCCESS)
    {
        /* Nothing was written to the temporary buffer, so it must not be
           decoded.  Release it here so that no error path leaks it; the
           release result is ignored because an error is already being
           reported.  */
        (VOID)tx_byte_release(msgbuf1);

        /* Some ThreadX error codes map to posix error codes: an empty queue
           is EAGAIN, everything else (deleted queue, bad queue or
           destination pointer, aborted wait, ...) is reported as EBADF.  */
        if (status == TX_QUEUE_EMPTY)
        {
            error_code = EAGAIN;
        }
        else
        {
            error_code = EBADF;
        }

        posix_errno = error_code;
        posix_set_pthread_errno(error_code);
        return(ERROR);
    }

    /* Retrieve message pointer, message length and message priority.  */
    my_ptr = (ULONG *)msgbuf1;
#ifdef TX_64_BIT
    /* The pointer is stored as two words, upper half first.  */
    message_source      = (VOID *)((((ALIGN_TYPE)my_ptr[0]) << 32) | my_ptr[1]);
    length_of_message   = my_ptr[2];
    priority_of_message = my_ptr[3];
#else
    message_source      = (VOID *)(my_ptr[0]);
    length_of_message   = my_ptr[1];
    priority_of_message = my_ptr[2];
#endif

    /* Copy the message into the supplied buffer, never writing more than
       msgLen bytes.  */
    source_ptr      = (UCHAR *)message_source;
    destination_ptr = (UCHAR *)pMsg;
    for (mycount = 0; ((mycount < length_of_message) && (mycount < msgLen)); mycount++)
    {
        destination_ptr[mycount] = source_ptr[mycount];
    }

    /* Release both the temporary buffer and the message storage.  Both
       releases are always attempted so that a failure of the first does
       not leak the second.  */
    scratch_release = tx_byte_release(msgbuf1);
    payload_release = tx_byte_release(message_source);
    if ((scratch_release) || (payload_release))
    {
        /* Return generic error.  */
        posix_internal_error(100);
        return(ERROR);
    }

    /* Copy message priority.  */
    if (pMsgPrio)
    {
        *pMsgPrio = priority_of_message;
    }

    /* All done: report the number of bytes placed in the caller's buffer.  */
    return((ssize_t)mycount);
}
273