1 /*
2  * Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 #include <zephyr/kernel.h>
6 #include <zephyr/logging/log.h>
7 #include <zephyr/zbus/zbus.h>
8 
9 LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);
10 
zbus_chan_add_obs(const struct zbus_channel * chan,const struct zbus_observer * obs,k_timeout_t timeout)11 int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
12 		      k_timeout_t timeout)
13 {
14 	int err;
15 	struct zbus_observer_node *obs_nd, *tmp;
16 	struct zbus_channel_observation *observation;
17 
18 	_ZBUS_ASSERT(!k_is_in_isr(), "ISR blocked");
19 	_ZBUS_ASSERT(chan != NULL, "chan is required");
20 	_ZBUS_ASSERT(obs != NULL, "obs is required");
21 
22 	err = k_mutex_lock(&chan->data->mutex, timeout);
23 	if (err) {
24 		return err;
25 	}
26 
27 	for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx;
28 	     i < limit; ++i) {
29 		STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
30 
31 		__ASSERT(observation != NULL, "observation must be not NULL");
32 
33 		if (observation->obs == obs) {
34 			k_mutex_unlock(&chan->data->mutex);
35 
36 			return -EEXIST;
37 		}
38 	}
39 
40 	/* Check if the observer is already a runtime observer */
41 	SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {
42 		if (obs_nd->obs == obs) {
43 			k_mutex_unlock(&chan->data->mutex);
44 
45 			return -EALREADY;
46 		}
47 	}
48 
49 	struct zbus_observer_node *new_obs_nd = k_malloc(sizeof(struct zbus_observer_node));
50 
51 	if (new_obs_nd == NULL) {
52 		LOG_ERR("Could not allocate observer node the heap is full!");
53 
54 		k_mutex_unlock(&chan->data->mutex);
55 
56 		return -ENOMEM;
57 	}
58 
59 	new_obs_nd->obs = obs;
60 
61 	sys_slist_append(&chan->data->observers, &new_obs_nd->node);
62 
63 	k_mutex_unlock(&chan->data->mutex);
64 
65 	return 0;
66 }
67 
zbus_chan_rm_obs(const struct zbus_channel * chan,const struct zbus_observer * obs,k_timeout_t timeout)68 int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
69 		     k_timeout_t timeout)
70 {
71 	int err;
72 	struct zbus_observer_node *obs_nd, *tmp;
73 	struct zbus_observer_node *prev_obs_nd = NULL;
74 
75 	_ZBUS_ASSERT(!k_is_in_isr(), "ISR blocked");
76 	_ZBUS_ASSERT(chan != NULL, "chan is required");
77 	_ZBUS_ASSERT(obs != NULL, "obs is required");
78 
79 	err = k_mutex_lock(&chan->data->mutex, timeout);
80 	if (err) {
81 		return err;
82 	}
83 
84 	SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {
85 		if (obs_nd->obs == obs) {
86 			sys_slist_remove(&chan->data->observers, &prev_obs_nd->node, &obs_nd->node);
87 
88 			k_free(obs_nd);
89 
90 			k_mutex_unlock(&chan->data->mutex);
91 
92 			return 0;
93 		}
94 
95 		prev_obs_nd = obs_nd;
96 	}
97 
98 	k_mutex_unlock(&chan->data->mutex);
99 
100 	return -ENODATA;
101 }
102