1 /**************************************************************************/
2 /* */
3 /* Copyright (c) Microsoft Corporation. All rights reserved. */
4 /* */
5 /* This software is licensed under the Microsoft Software License */
6 /* Terms for Microsoft Azure RTOS. Full text of the license can be */
7 /* found in the LICENSE file at https://aka.ms/AzureRTOS_EULA */
8 /* and in the root directory of this software. */
9 /* */
10 /**************************************************************************/
11
12
13 /**************************************************************************/
14 /**************************************************************************/
15 /** */
16 /** POSIX wrapper for THREADX */
17 /** */
18 /** */
19 /** */
20 /**************************************************************************/
21 /**************************************************************************/
22
23 /* Include necessary system files. */
24
25 #include "tx_api.h" /* Threadx API */
26 #include "pthread.h" /* Posix API */
27 #include "px_int.h" /* Posix helper functions */
28
29
30 /**************************************************************************/
31 /* */
32 /* FUNCTION RELEASE */
33 /* */
34 /* mq_receive PORTABLE C */
35 /* 6.2.0 */
36 /* AUTHOR */
37 /* */
38 /* William E. Lamie, Microsoft Corporation */
39 /* */
40 /* DESCRIPTION */
41 /* */
42 /* */
43 /* Receive a message from a message queue. */
44 /* */
45 /* */
46 /* INPUT */
47 /* */
48 /* mqdes message queue descriptor */
49 /* *pMsg buffer to receive message */
50 /* msgLen size of buffer, in bytes */
51 /* *pMsgPrio If not NULL, return message priority */
52 /* */
53 /* OUTPUT */
54 /* */
55 /* temp1 no of bytes received */
56 /* ERROR If error occurs */
57 /* */
58 /* CALLS */
59 /* */
60 /* posix_internal_error Generic error handler */
61 /* tx_queue_receive ThreadX queue receive */
62 /* tx_byte_release Release bytes */
63 /* posix_memory_allocate Allocate memory */
64 /* tx_thread_identify Returns currently running thread */
65 /* */
66 /* CALLED BY */
67 /* */
68 /* Application Code */
69 /* */
70 /* RELEASE HISTORY */
71 /* */
72 /* DATE NAME DESCRIPTION */
73 /* */
74 /* 06-02-2021 William E. Lamie Initial Version 6.1.7 */
75 /* 10-31-2022 Scott Larson Add 64-bit support, */
76 /* resulting in version 6.2.0 */
77 /* */
78 /**************************************************************************/
mq_receive(mqd_t mqdes,VOID * pMsg,size_t msgLen,ULONG * pMsgPrio)79 ssize_t mq_receive( mqd_t mqdes, VOID * pMsg, size_t msgLen, ULONG *pMsgPrio)
80 {
81
82 TX_QUEUE * Queue;
83 POSIX_MSG_QUEUE * q_ptr;
84 INT temp1, retval = ERROR;
85 ULONG wait_option,length_of_message, priority_of_message,mycount;
86 ULONG * my_ptr;
87 CHAR * this_ptr;
88 VOID * msgbuf1;
89 UCHAR * msgbuf2;
90 VOID * message_source;
91
92 /* Assign a temporary variable for clarity. */
93 Queue = &(mqdes->f_data->queue);
94 q_ptr = (POSIX_MSG_QUEUE * )mqdes->f_data;
95
96 /* First, check for an invalid queue pointer. */
97 if ((!q_ptr) || ( (q_ptr -> px_queue_id) != PX_QUEUE_ID))
98 {
99 /* Queue pointer is invalid, return appropriate error code. */
100 posix_errno = EBADF;
101 posix_set_pthread_errno(EBADF);
102 /* Return ERROR. */
103 return(ERROR);
104 }
105
106 if(((mqdes ->f_flag & O_RDONLY) != O_RDONLY ) && ((mqdes->f_flag & O_RDWR) != O_RDWR))
107 {
108 /* Queue pointer is invalid, return appropriate error code. */
109 posix_errno = EBADF;
110 posix_set_pthread_errno(EBADF);
111
112 /* Return ERROR. */
113 return(ERROR);
114 }
115
116 /* Check if message length provided is less q size provided while creation. */
117 if( msgLen < q_ptr -> q_attr.mq_msgsize )
118 {
119 /* Return appropriate error. */
120 posix_errno = EMSGSIZE;
121 posix_set_pthread_errno(EMSGSIZE);
122
123 /* Return error. */
124 return(ERROR);
125 }
126
127 /* User has indicated a wait is acceptable. */
128 /* if a message is not immediately available. */
129 /* If we are not calling this routine from a thread context. */
130 if (!(tx_thread_identify()))
131 {
132 /* return appropriate error code. */
133 posix_errno = EBADF;
134 posix_set_pthread_errno(EBADF);
135 /* Return ERROR. */
136 return(ERROR);
137 }
138 if ( ( mqdes ->f_flag & O_NONBLOCK ) == O_NONBLOCK )
139 wait_option = TX_NO_WAIT;
140 else
141 wait_option = TX_WAIT_FOREVER;
142
143
144 /* Try to get a message from the message queue. */
145 /* Create a temporary buffer to get message pointer and message length. */
146 temp1 = posix_memory_allocate((sizeof(ULONG)) * TX_POSIX_MESSAGE_SIZE, (VOID**)&msgbuf1);
147 if(temp1 != TX_SUCCESS )
148 {
149 /* Return generic error. */
150 posix_internal_error(100);
151
152 /* Return error. */
153 return(ERROR);
154 }
155 /* Arrange the messages in the queue as per the required priority. */
156 temp1 = posix_arrange_msg( Queue, pMsgPrio );
157 /* Receive the message */
158 temp1 = tx_queue_receive(Queue, msgbuf1, wait_option);
159 /* Some ThreadX error codes map to posix error codes. */
160 switch(temp1)
161 {
162 case TX_SUCCESS:
163 {
164
165 /* All ok */
166 temp1 = OK;
167 break;
168 }
169
170 case TX_DELETED:
171 {
172 break;
173 }
174
175 case TX_QUEUE_EMPTY:
176 {
177 if (wait_option == TX_NO_WAIT)
178 {
179 /* No message to receive while NO_WAIT option is set */
180 posix_errno = EAGAIN;
181 posix_set_pthread_errno(EAGAIN);
182
183 /* Return error */
184 temp1 = ERROR;
185 return(temp1);
186 }
187 break;
188 }
189 case TX_QUEUE_ERROR:
190 case TX_PTR_ERROR:
191 {
192 /* Queue pointer is invalid, return appropriate error code. */
193 posix_errno = EBADF;
194 posix_set_pthread_errno(EBADF);
195
196 /* Return ERROR. */
197 temp1 = ERROR;
198 return(temp1);
199 }
200
201 default:
202 {
203 /* should not come here but send the default error. */
204 posix_errno = EBADF;
205 posix_set_pthread_errno(EBADF);
206
207 /* Return error. */
208 temp1 = ERROR;
209 return(temp1);
210 }
211 }
212
213 /* Assign a variable for clarity. */
214 my_ptr = ( ULONG *)msgbuf1;
215
216 /* Retrieve Message pointer, message Length and message priority. */
217 #ifdef TX_64_BIT
218 this_ptr = (CHAR *)((((ALIGN_TYPE)my_ptr[0]) << 32) | my_ptr[1]);
219 length_of_message = my_ptr[2];
220 priority_of_message = my_ptr[3];
221 #else
222 this_ptr = (CHAR *)(*my_ptr);
223 length_of_message = *(++my_ptr);
224 priority_of_message = *(++my_ptr);
225
226 #endif
227 message_source = (VOID *)this_ptr;
228
229 /* Copy message into supplied buffer. */
230 msgbuf2 = (UCHAR *)pMsg;
231
232 /* Release memory after storing data into destination. */
233 retval = tx_byte_release(msgbuf1);
234
235 if( retval)
236 {
237 /* return generic error. */
238 posix_internal_error(100);
239
240 /* Return error. */
241 return(ERROR);
242 }
243
244 if ( temp1 == OK)
245 {
246 for (mycount = 0; ( (mycount < length_of_message) && (mycount < msgLen)); mycount++)
247 {
248 *(msgbuf2++) = *((UCHAR *)(this_ptr++));
249 }
250
251 temp1 = mycount;
252 }
253
254 retval = tx_byte_release(message_source);
255
256 if( retval)
257 {
258 /* return generic error. */
259 posix_internal_error(100);
260
261 /* Return error. */
262 return(ERROR);
263 }
264
265 /* Copy message priority */
266 if (pMsgPrio)
267 {
268 *pMsgPrio = priority_of_message;
269 }
270
271 /* All done */
272 return(length_of_message);
273 }
274