1 /***************************************************************************
2 * Copyright (c) 2024 Microsoft Corporation
3 *
4 * This program and the accompanying materials are made available under the
5 * terms of the MIT License which is available at
6 * https://opensource.org/licenses/MIT.
7 *
8 * SPDX-License-Identifier: MIT
9 **************************************************************************/
10
11
12 /**************************************************************************/
13 /**************************************************************************/
14 /** */
15 /** POSIX wrapper for THREADX */
16 /** */
17 /** */
18 /** */
19 /**************************************************************************/
20 /**************************************************************************/
21
22 /* Include necessary system files. */
23
24 #include "tx_api.h" /* Threadx API */
25 #include "pthread.h" /* Posix API */
26 #include "px_int.h" /* Posix helper functions */
27
28
29 /**************************************************************************/
30 /* */
31 /* FUNCTION RELEASE */
32 /* */
33 /* mq_receive PORTABLE C */
34 /* 6.2.0 */
35 /* AUTHOR */
36 /* */
37 /* William E. Lamie, Microsoft Corporation */
38 /* */
39 /* DESCRIPTION */
40 /* */
41 /* */
42 /* Receive a message from a message queue. */
43 /* */
44 /* */
45 /* INPUT */
46 /* */
47 /* mqdes message queue descriptor */
48 /* *pMsg buffer to receive message */
49 /* msgLen size of buffer, in bytes */
50 /* *pMsgPrio If not NULL, return message priority */
51 /* */
52 /* OUTPUT */
53 /* */
54 /* temp1 no of bytes received */
55 /* ERROR If error occurs */
56 /* */
57 /* CALLS */
58 /* */
59 /* posix_internal_error Generic error handler */
60 /* tx_queue_receive ThreadX queue receive */
61 /* tx_byte_release Release bytes */
62 /* posix_memory_allocate Allocate memory */
63 /* tx_thread_identify Returns currently running thread */
64 /* */
65 /* CALLED BY */
66 /* */
67 /* Application Code */
68 /* */
69 /* RELEASE HISTORY */
70 /* */
71 /* DATE NAME DESCRIPTION */
72 /* */
73 /* 06-02-2021 William E. Lamie Initial Version 6.1.7 */
74 /* 10-31-2022 Scott Larson Add 64-bit support, */
75 /* resulting in version 6.2.0 */
76 /* */
77 /**************************************************************************/
mq_receive(mqd_t mqdes,VOID * pMsg,size_t msgLen,ULONG * pMsgPrio)78 ssize_t mq_receive( mqd_t mqdes, VOID * pMsg, size_t msgLen, ULONG *pMsgPrio)
79 {
80
81 TX_QUEUE * Queue;
82 POSIX_MSG_QUEUE * q_ptr;
83 INT temp1, retval = ERROR;
84 ULONG wait_option,length_of_message, priority_of_message,mycount;
85 ULONG * my_ptr;
86 CHAR * this_ptr;
87 VOID * msgbuf1;
88 UCHAR * msgbuf2;
89 VOID * message_source;
90
91 /* Assign a temporary variable for clarity. */
92 Queue = &(mqdes->f_data->queue);
93 q_ptr = (POSIX_MSG_QUEUE * )mqdes->f_data;
94
95 /* First, check for an invalid queue pointer. */
96 if ((!q_ptr) || ( (q_ptr -> px_queue_id) != PX_QUEUE_ID))
97 {
98 /* Queue pointer is invalid, return appropriate error code. */
99 posix_errno = EBADF;
100 posix_set_pthread_errno(EBADF);
101 /* Return ERROR. */
102 return(ERROR);
103 }
104
105 if(((mqdes ->f_flag & O_RDONLY) != O_RDONLY ) && ((mqdes->f_flag & O_RDWR) != O_RDWR))
106 {
107 /* Queue pointer is invalid, return appropriate error code. */
108 posix_errno = EBADF;
109 posix_set_pthread_errno(EBADF);
110
111 /* Return ERROR. */
112 return(ERROR);
113 }
114
115 /* Check if message length provided is less q size provided while creation. */
116 if( msgLen < q_ptr -> q_attr.mq_msgsize )
117 {
118 /* Return appropriate error. */
119 posix_errno = EMSGSIZE;
120 posix_set_pthread_errno(EMSGSIZE);
121
122 /* Return error. */
123 return(ERROR);
124 }
125
126 /* User has indicated a wait is acceptable. */
127 /* if a message is not immediately available. */
128 /* If we are not calling this routine from a thread context. */
129 if (!(tx_thread_identify()))
130 {
131 /* return appropriate error code. */
132 posix_errno = EBADF;
133 posix_set_pthread_errno(EBADF);
134 /* Return ERROR. */
135 return(ERROR);
136 }
137 if ( ( mqdes ->f_flag & O_NONBLOCK ) == O_NONBLOCK )
138 wait_option = TX_NO_WAIT;
139 else
140 wait_option = TX_WAIT_FOREVER;
141
142
143 /* Try to get a message from the message queue. */
144 /* Create a temporary buffer to get message pointer and message length. */
145 temp1 = posix_memory_allocate((sizeof(ULONG)) * TX_POSIX_MESSAGE_SIZE, (VOID**)&msgbuf1);
146 if(temp1 != TX_SUCCESS )
147 {
148 /* Return generic error. */
149 posix_internal_error(100);
150
151 /* Return error. */
152 return(ERROR);
153 }
154 /* Arrange the messages in the queue as per the required priority. */
155 temp1 = posix_arrange_msg( Queue, pMsgPrio );
156 /* Receive the message */
157 temp1 = tx_queue_receive(Queue, msgbuf1, wait_option);
158 /* Some ThreadX error codes map to posix error codes. */
159 switch(temp1)
160 {
161 case TX_SUCCESS:
162 {
163
164 /* All ok */
165 temp1 = OK;
166 break;
167 }
168
169 case TX_DELETED:
170 {
171 break;
172 }
173
174 case TX_QUEUE_EMPTY:
175 {
176 if (wait_option == TX_NO_WAIT)
177 {
178 /* No message to receive while NO_WAIT option is set */
179 posix_errno = EAGAIN;
180 posix_set_pthread_errno(EAGAIN);
181
182 /* Return error */
183 temp1 = ERROR;
184 return(temp1);
185 }
186 break;
187 }
188 case TX_QUEUE_ERROR:
189 case TX_PTR_ERROR:
190 {
191 /* Queue pointer is invalid, return appropriate error code. */
192 posix_errno = EBADF;
193 posix_set_pthread_errno(EBADF);
194
195 /* Return ERROR. */
196 temp1 = ERROR;
197 return(temp1);
198 }
199
200 default:
201 {
202 /* should not come here but send the default error. */
203 posix_errno = EBADF;
204 posix_set_pthread_errno(EBADF);
205
206 /* Return error. */
207 temp1 = ERROR;
208 return(temp1);
209 }
210 }
211
212 /* Assign a variable for clarity. */
213 my_ptr = ( ULONG *)msgbuf1;
214
215 /* Retrieve Message pointer, message Length and message priority. */
216 #ifdef TX_64_BIT
217 this_ptr = (CHAR *)((((ALIGN_TYPE)my_ptr[0]) << 32) | my_ptr[1]);
218 length_of_message = my_ptr[2];
219 priority_of_message = my_ptr[3];
220 #else
221 this_ptr = (CHAR *)(*my_ptr);
222 length_of_message = *(++my_ptr);
223 priority_of_message = *(++my_ptr);
224
225 #endif
226 message_source = (VOID *)this_ptr;
227
228 /* Copy message into supplied buffer. */
229 msgbuf2 = (UCHAR *)pMsg;
230
231 /* Release memory after storing data into destination. */
232 retval = tx_byte_release(msgbuf1);
233
234 if( retval)
235 {
236 /* return generic error. */
237 posix_internal_error(100);
238
239 /* Return error. */
240 return(ERROR);
241 }
242
243 if ( temp1 == OK)
244 {
245 for (mycount = 0; ( (mycount < length_of_message) && (mycount < msgLen)); mycount++)
246 {
247 *(msgbuf2++) = *((UCHAR *)(this_ptr++));
248 }
249
250 temp1 = mycount;
251 }
252
253 retval = tx_byte_release(message_source);
254
255 if( retval)
256 {
257 /* return generic error. */
258 posix_internal_error(100);
259
260 /* Return error. */
261 return(ERROR);
262 }
263
264 /* Copy message priority */
265 if (pMsgPrio)
266 {
267 *pMsgPrio = priority_of_message;
268 }
269
270 /* All done */
271 return(length_of_message);
272 }
273