/* * Copyright (c) 2018 Intel Corporation * Copyright (c) 2024 BayLibre, SAS * * SPDX-License-Identifier: Apache-2.0 */ #include #include #include #include #include #define N_THR 2 #define MESSAGE_SIZE 16 #define MESG_COUNT_PERMQ 4 static char queue[16] = "server"; static char send_data[MESSAGE_SIZE] = "timed data send"; /* * For platforms that select CONFIG_KERNEL_COHERENCE, the receive buffer can * not be on the stack as the k_msgq that underlies the mq_timedsend() will * copy directly to the receiver's buffer when there is already a waiting * receiver. */ static char rec_data[MESSAGE_SIZE]; static void *sender_thread(void *p1) { mqd_t mqd; struct timespec curtime; mqd = mq_open(queue, O_WRONLY); clock_gettime(CLOCK_MONOTONIC, &curtime); curtime.tv_sec += 1; zassert_false(mq_timedsend(mqd, send_data, MESSAGE_SIZE, 0, &curtime), "Not able to send message in timer"); usleep(USEC_PER_MSEC); zassert_false(mq_close(mqd), "unable to close message queue descriptor."); pthread_exit(p1); return NULL; } static void *receiver_thread(void *p1) { mqd_t mqd; struct timespec curtime; mqd = mq_open(queue, O_RDONLY); clock_gettime(CLOCK_MONOTONIC, &curtime); curtime.tv_sec += 1; mq_timedreceive(mqd, rec_data, MESSAGE_SIZE, 0, &curtime); zassert_false(strcmp(rec_data, send_data), "Error in data reception. exp: %s act: %s", send_data, rec_data); usleep(USEC_PER_MSEC); zassert_false(mq_close(mqd), "unable to close message queue descriptor."); pthread_exit(p1); return NULL; } ZTEST(mqueue, test_mqueue) { mqd_t mqd; struct mq_attr attrs; int32_t mode = 0777; int flags = O_RDWR | O_CREAT; void *retval; pthread_t newthread[N_THR]; attrs.mq_msgsize = MESSAGE_SIZE; attrs.mq_maxmsg = MESG_COUNT_PERMQ; mqd = mq_open(queue, flags, mode, &attrs); for (int i = 0; i < N_THR; i++) { /* Creating threads */ zassert_ok(pthread_create(&newthread[i], NULL, (i % 2 == 0) ? receiver_thread : sender_thread, NULL)); } usleep(USEC_PER_MSEC * 10U); for (int i = 0; i < N_THR; i++) { pthread_join(newthread[i], &retval); } zassert_false(mq_close(mqd), "unable to close message queue descriptor."); zassert_false(mq_unlink(queue), "Not able to unlink Queue"); } static bool notification_executed; void notify_function_basic(union sigval val) { mqd_t mqd; bool *executed = (bool *)val.sival_ptr; mqd = mq_open(queue, O_RDONLY); mq_receive(mqd, rec_data, MESSAGE_SIZE, 0); zassert_ok(strcmp(rec_data, send_data), "Error in data reception. exp: %s act: %s", send_data, rec_data); zassert_ok(mq_close(mqd), "Unable to close message queue descriptor."); *executed = true; } ZTEST(mqueue, test_mqueue_notify_basic) { mqd_t mqd; struct mq_attr attrs = { .mq_msgsize = MESSAGE_SIZE, .mq_maxmsg = MESG_COUNT_PERMQ, }; struct sigevent not = { .sigev_notify = SIGEV_NONE, .sigev_value.sival_ptr = (void *)¬ification_executed, .sigev_notify_function = notify_function_basic, }; int32_t mode = 0777; int flags = O_RDWR | O_CREAT; notification_executed = false; memset(rec_data, 0, MESSAGE_SIZE); mqd = mq_open(queue, flags, mode, &attrs); zassert_ok(mq_notify(mqd, ¬), "Unable to set notification."); zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message"); zassert_true(notification_executed, "Notification not triggered."); zassert_ok(mq_close(mqd), "Unable to close message queue descriptor."); zassert_ok(mq_unlink(queue), "Unable to unlink queue"); } void notify_function_thread(union sigval val) { mqd_t mqd; pthread_t sender = (pthread_t)val.sival_int; zassert_not_equal(sender, pthread_self(), "Notification function should be executed from different thread."); mqd = mq_open(queue, O_RDONLY); mq_receive(mqd, rec_data, MESSAGE_SIZE, 0); zassert_ok(strcmp(rec_data, send_data), "Error in data reception. exp: %s act: %s", send_data, rec_data); zassert_ok(mq_close(mqd), "Unable to close message queue descriptor."); notification_executed = true; } ZTEST(mqueue, test_mqueue_notify_thread) { mqd_t mqd; struct mq_attr attrs = { .mq_msgsize = MESSAGE_SIZE, .mq_maxmsg = MESG_COUNT_PERMQ, }; struct sigevent not = { .sigev_notify = SIGEV_THREAD, .sigev_value.sival_int = (int)pthread_self(), .sigev_notify_function = notify_function_thread, }; int32_t mode = 0777; int flags = O_RDWR | O_CREAT; notification_executed = false; memset(rec_data, 0, MESSAGE_SIZE); mqd = mq_open(queue, flags, mode, &attrs); zassert_ok(mq_notify(mqd, ¬), "Unable to set notification."); zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message"); usleep(USEC_PER_MSEC * 100U); zassert_true(notification_executed, "Notification not triggered."); zassert_ok(mq_close(mqd), "Unable to close message queue descriptor."); zassert_ok(mq_unlink(queue), "Unable to unlink queue"); } ZTEST(mqueue, test_mqueue_notify_non_empty_queue) { mqd_t mqd; struct mq_attr attrs = { .mq_msgsize = MESSAGE_SIZE, .mq_maxmsg = MESG_COUNT_PERMQ, }; struct sigevent not = { .sigev_notify = SIGEV_NONE, .sigev_value.sival_ptr = (void *)¬ification_executed, .sigev_notify_function = notify_function_basic, }; int32_t mode = 0777; int flags = O_RDWR | O_CREAT; notification_executed = false; memset(rec_data, 0, MESSAGE_SIZE); mqd = mq_open(queue, flags, mode, &attrs); zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message"); zassert_ok(mq_notify(mqd, ¬), "Unable to set notification."); zassert_false(notification_executed, "Notification shouldn't be processed."); mq_receive(mqd, rec_data, MESSAGE_SIZE, 0); zassert_false(strcmp(rec_data, send_data), "Error in data reception. exp: %s act: %s", send_data, rec_data); memset(rec_data, 0, MESSAGE_SIZE); zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message"); zassert_true(notification_executed, "Notification not triggered."); zassert_ok(mq_close(mqd), "Unable to close message queue descriptor."); zassert_ok(mq_unlink(queue), "Unable to unlink queue"); } ZTEST(mqueue, test_mqueue_notify_errors) { mqd_t mqd; struct mq_attr attrs = { .mq_msgsize = MESSAGE_SIZE, .mq_maxmsg = MESG_COUNT_PERMQ, }; struct sigevent not = { .sigev_notify = SIGEV_SIGNAL, .sigev_value.sival_ptr = (void *)¬ification_executed, .sigev_notify_function = notify_function_basic, }; int32_t mode = 0777; int flags = O_RDWR | O_CREAT; zassert_not_ok(mq_notify(NULL, NULL), "Should return -1 and set errno to EBADF."); zassert_equal(errno, EBADF); mqd = mq_open(queue, flags, mode, &attrs); zassert_not_ok(mq_notify(mqd, NULL), "Should return -1 and set errno to EINVAL."); zassert_equal(errno, EINVAL); zassert_not_ok(mq_notify(mqd, ¬), "SIGEV_SIGNAL not supported should return -1."); zassert_equal(errno, ENOSYS); not.sigev_notify = SIGEV_NONE; zassert_ok(mq_notify(mqd, ¬), "Unexpected error while asigning notification to the queue."); zassert_not_ok(mq_notify(mqd, ¬), "Can't assign notification when there is another assigned."); zassert_equal(errno, EBUSY); zassert_ok(mq_notify(mqd, NULL), "Unable to remove notification from the message queue."); zassert_ok(mq_close(mqd), "Unable to close message queue descriptor."); zassert_ok(mq_unlink(queue), "Unable to unlink queue"); } static void before(void *arg) { ARG_UNUSED(arg); if (!IS_ENABLED(CONFIG_DYNAMIC_THREAD)) { /* skip redundant testing if there is no thread pool / heap allocation */ ztest_test_skip(); } } ZTEST_SUITE(mqueue, NULL, NULL, before, NULL, NULL);