1 /*
2 * Copyright (c) 2023 Rodrigo Peixoto <rodrigopex@gmail.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5 #include "messages.h"
6
7 #include <zephyr/kernel.h>
8 #include <zephyr/sys/util_macro.h>
9 #include <zephyr/sys/atomic.h>
10 #include <zephyr/zbus/zbus.h>
11
12 #define CONSUMER_STACK_SIZE (CONFIG_IDLE_STACK_SIZE + CONFIG_BM_MESSAGE_SIZE)
13
14 extern atomic_t count;
15
16 ZBUS_CHAN_DECLARE(bm_channel);
17
18 #define SFY(i, _) s##i
19
20 #define CONSUMERS_NAME_LIST LISTIFY(CONFIG_BM_ONE_TO, SFY, (, /* separator */))
21
22 #define CREATE_OBSERVER(s) ZBUS_MSG_SUBSCRIBER_DEFINE(s)
23
24 #define CREATE_OBSERVATIONS(s) ZBUS_CHAN_ADD_OBS(bm_channel, s, 3)
25
26 /* clang-format off */
27 FOR_EACH(CREATE_OBSERVER, (;), CONSUMERS_NAME_LIST);
28
29 FOR_EACH(CREATE_OBSERVATIONS, (;), CONSUMERS_NAME_LIST);
30 /* clang-format on */
31
msg_sub_thread(void * msub_ref,void * ptr2,void * ptr3)32 static int msg_sub_thread(void *msub_ref, void *ptr2, void *ptr3)
33 {
34 ARG_UNUSED(ptr2);
35 ARG_UNUSED(ptr3);
36
37 const struct zbus_channel *chan;
38 struct bm_msg msg_received;
39 struct zbus_observer *msub = msub_ref;
40
41 while (1) {
42 if (zbus_sub_wait_msg(msub, &chan, &msg_received, K_FOREVER) == 0) {
43 atomic_add(&count, *((uint16_t *)msg_received.bytes));
44 } else {
45 k_oops();
46 }
47 }
48
49 return -EFAULT;
50 }
51
52 #define CREATE_THREADS(name) \
53 K_THREAD_DEFINE(name##_msub_id, CONSUMER_STACK_SIZE, msg_sub_thread, &name, NULL, NULL, 3, \
54 0, 0);
55 /* clang-format off */
56 FOR_EACH(CREATE_THREADS, (;), CONSUMERS_NAME_LIST);
57 /* clang-format on */
58