/*
 * Copyright (c) 2023 Rodrigo Peixoto <rodrigopex@gmail.com>
 * SPDX-License-Identifier: Apache-2.0
 */
5 #include "messages.h"
6 
7 #include <zephyr/kernel.h>
8 #include <zephyr/sys/util_macro.h>
9 #include <zephyr/sys/atomic.h>
10 #include <zephyr/zbus/zbus.h>
11 
12 #define CONSUMER_STACK_SIZE (CONFIG_IDLE_STACK_SIZE + CONFIG_BM_MESSAGE_SIZE)
13 
14 extern atomic_t count;
15 
16 ZBUS_CHAN_DECLARE(bm_channel);
17 
18 #define SFY(i, _) s##i
19 
20 #define CONSUMERS_NAME_LIST LISTIFY(CONFIG_BM_ONE_TO, SFY, (, /* separator */))
21 
22 #define CREATE_OBSERVER(s) ZBUS_SUBSCRIBER_DEFINE(s, 4)
23 
24 #define CREATE_OBSERVATIONS(s) ZBUS_CHAN_ADD_OBS(bm_channel, s, 3)
25 
26 /* clang-format off */
27 FOR_EACH(CREATE_OBSERVER, (;), CONSUMERS_NAME_LIST);
28 
29 FOR_EACH(CREATE_OBSERVATIONS, (;), CONSUMERS_NAME_LIST);
30 /* clang-format on */
31 
sub_thread(void * sub_ref,void * ptr2,void * ptr3)32 static int sub_thread(void *sub_ref, void *ptr2, void *ptr3)
33 {
34 	ARG_UNUSED(ptr2);
35 	ARG_UNUSED(ptr3);
36 
37 	const struct zbus_channel *chan;
38 	struct zbus_observer *sub = sub_ref;
39 
40 	while (1) {
41 		if (zbus_sub_wait(sub, &chan, K_FOREVER) == 0) {
42 			if (zbus_chan_claim(chan, K_FOREVER) != 0) {
43 				k_oops();
44 			}
45 
46 			if (IS_ENABLED(CONFIG_BM_FAIRPLAY)) {
47 				struct bm_msg message;
48 
49 				memcpy(zbus_chan_msg(chan), &message, chan->message_size);
50 
51 				atomic_add(&count, *((uint16_t *)message.bytes));
52 			} else {
53 				const struct bm_msg *msg_received = zbus_chan_const_msg(chan);
54 
55 				atomic_add(&count, *((uint16_t *)msg_received->bytes));
56 			}
57 
58 			zbus_chan_finish(chan);
59 		} else {
60 			k_oops();
61 		}
62 	}
63 	return -EFAULT;
64 }
65 
66 #define CREATE_THREADS(name)                                                                       \
67 	K_THREAD_DEFINE(name##_sub_id, CONSUMER_STACK_SIZE, sub_thread, &name, NULL, NULL, 3, 0, 0);
68 
69 /* clang-format off */
70 FOR_EACH(CREATE_THREADS, (;), CONSUMERS_NAME_LIST);
71 /* clang-format on */
72