1.. _zbus:
2
3Zephyr bus (zbus)
4#################
5
6..
7   Note to documentation authors: the diagrams included in this documentation page were designed
8   using the following Figma library:
9   https://www.figma.com/community/file/1292866458780627559/zbus-diagram-assets
10
11
12The :dfn:`Zephyr bus - zbus` is a lightweight and flexible software bus enabling a simple way for
13threads to talk to one another in a many-to-many way.
14
15.. contents::
16    :local:
17    :depth: 2
18
Concepts
********
Threads can use zbus to send messages to one or more observers, which makes many-to-many
communication possible. The bus implements the message-passing and publish/subscribe communication
paradigms, enabling threads to communicate synchronously or asynchronously through shared memory.

Communication through zbus is channel-based: threads (or callbacks) use channels to exchange
messages. Among other actions, threads can publish to and observe channels. When a thread publishes
a message on a channel, the bus makes the message available to all of that channel's observers.
Depending on its type, an observer can access the message directly, receive a copy of it, or
receive only a reference to the published channel.

The figure below shows an example of a typical application using zbus in which the application logic
(hardware independent) talks to other threads via the software bus. Note that the threads are
decoupled from each other because they only use zbus channels and do not need to know each other to
talk.
34
35
36.. figure:: images/zbus_overview.svg
37    :alt: zbus usage overview
38    :width: 75%
39
40    A typical zbus application architecture.
41
42The bus comprises:
43
* A set of channels, each consisting of control metadata and the message itself;
* The :dfn:`Virtual Distributed Event Dispatcher` (VDED), the bus logic responsible for sending
  notifications/messages to the observers. The VDED logic runs inside the publishing action, in the
  same thread context, which gives the bus the character of a distributed execution. When a thread
  publishes to a channel, it also propagates the notifications to the observers;
* Threads (subscribers and message subscribers) and callbacks (listeners) publishing, reading, and
  receiving notifications from the bus.
51
52.. figure:: images/zbus_anatomy.svg
53    :alt: ZBus anatomy
54    :width: 70%
55
56    ZBus anatomy.
57
58The bus makes the publish, read, claim, finish, notify, and subscribe actions available over
59channels. Publishing, reading, claiming, and finishing are available in all RTOS thread contexts,
60including ISRs. The publish and read operations are simple and fast; the procedure is channel
61locking followed by a memory copy to and from a shared memory region and then a channel unlocking.
62Another essential aspect of zbus is the observers. There are three types of observers:
63
64.. figure:: images/zbus_type_of_observers.svg
65    :alt: ZBus observers type
66    :width: 70%
67
68    ZBus observers.
69
* Listeners, callbacks that the event dispatcher executes every time an observed channel is
  published or notified;
* Subscribers, thread-based observers that rely internally on a message queue where the event
  dispatcher puts a changed channel's reference every time an observed channel is published or
  notified. Note that this kind of observer does not receive the message itself. It should read the
  message from the channel after receiving the notification;
* Message subscribers, thread-based observers that rely internally on a FIFO where the event
  dispatcher puts a copy of the message every time an observed channel is published or notified.

Channel observation structures define the relationship between a channel and its observers. Each
observation is a channel/observer pair. Developers can statically allocate observations using the
:c:macro:`ZBUS_CHAN_DEFINE` or :c:macro:`ZBUS_CHAN_ADD_OBS` macros. There are also runtime
observers, which enable developers to create observations at runtime. It is possible to disable an
observer entirely or to disable observations individually. The event dispatcher ignores disabled
observers and observations.
84
85.. figure:: images/zbus_observation_mask.svg
86    :alt: ZBus observation mask.
87    :width: 75%
88
89    ZBus observation mask.
90
The figure above illustrates several states, (a) to (d), for channels ``C1`` to ``C5``,
``Subscriber 1``, and the observations. The last two observations are in orange to indicate that
they are dynamically allocated (runtime observations). In (a), the observer and all observations
are enabled. In (b), the observer is disabled, so the event dispatcher ignores it. In (c), the
observer is enabled, but one static observation is disabled, so the event dispatcher only stops
sending notifications from channel ``C3``. In (d), the event dispatcher stops sending notifications
from channels ``C3`` and ``C5`` to ``Subscriber 1``.
98
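
The two switches described above (the whole observer and the individual observation) can be
pictured with a small plain C model. This is only a sketch: the ``model_*`` names are hypothetical
and are not the zbus data structures or API. It merely shows that a notification goes out only when
both the observer is enabled and the observation is not masked.

.. code-block:: c

    #include <stdbool.h>

    /* Hypothetical model types, NOT the zbus structures. */
    struct model_observer {
            bool enabled; /* switch for the whole observer */
    };

    struct model_observation {
            const struct model_observer *obs;
            bool masked; /* switch for one channel/observer pair */
    };

    /* The dispatcher notifies only when both switches allow it. */
    static bool model_should_notify(const struct model_observation *o)
    {
            return o->obs->enabled && !o->masked;
    }

In state (c) of the figure, only the ``C3`` observation would have ``masked`` set; in state (b),
``enabled`` would be false and every observation of ``Subscriber 1`` would be skipped.
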
99
For illustration purposes, consider the typical sensor-based solution in the figure below. When
triggered, the timer publishes to the ``Trigger`` channel. Because the sensor thread subscribes to
the ``Trigger`` channel, it is notified and fetches the sensor data. Notice that the VDED executes
``Blink`` because it also listens to the ``Trigger`` channel. When the sensor data is ready, the
sensor thread publishes it to the ``Sensor data`` channel. The core thread receives the message as
a ``Sensor data`` channel message subscriber, processes the sensor data, and stores it in an
internal sample buffer. This repeats until the sample buffer is full; when that happens, the core
thread aggregates the sample buffer information, prepares a package, and publishes it to the
``Payload`` channel. The LoRa thread receives that package because it is a ``Payload`` channel
message subscriber, and sends the payload to the cloud. When it completes the transmission, the
LoRa thread publishes to the ``Transmission done`` channel. The VDED executes ``Blink`` again since
it listens to the ``Transmission done`` channel.
112
113.. figure:: images/zbus_operations.svg
114    :alt: ZBus sensor-based application
115    :width: 85%
116
117    ZBus sensor-based application.
118
This way of implementing the solution makes the application more flexible, enabling us to change
things independently. For example, suppose we want to change the trigger from a timer to a button
press. We can do that without affecting other parts of the system. Likewise, if we want to change
the communication interface from LoRa to Bluetooth, we only need to change the LoRa thread; no
other change is required. The same holds for every block in the figure, which indicates that zbus
promotes decoupling in the system architecture.

Another important aspect of using zbus is the reuse of system modules. If a portion of code with
well-defined behavior (which we call a module) only uses zbus channels and not hardware interfaces,
it can easily be reused in other solutions. The new solution only needs to implement the interfaces
(set of channels) the module requires to work. In that way, zbus can improve module reuse.

The last important note concerns the reach of zbus. It can be used in many ways, leaving
developers as free as possible to create what they need. For example, messages can be dynamically
or statically allocated; notifications can be synchronous or asynchronous; developers can control
a channel in many different ways by claiming it; developers can attach their own metadata to a
channel through the user-data field; and the optional validator lets a system enforce message
validity. These characteristics broaden the range of solutions that can be built with zbus and
make it a good fit as an open-source community tool.
139
140
141.. _Virtual Distributed Event Dispatcher:
142
143Virtual Distributed Event Dispatcher
144====================================
145
The VDED execution always happens in the publisher's context, which can be a thread or an ISR. Be
careful with publications inside an ISR because the scheduler won't preempt the VDED. Use that
wisely. The basic description of the execution is as follows:
149
150
151* The channel lock is acquired;
152* The channel receives the new message via direct copy (by a raw :c:func:`memcpy`);
153* The event dispatcher logic executes the listeners, sends a copy of the message to the message
154  subscribers, and pushes the channel's reference to the subscribers' notification message queue in
155  the same sequence they appear on the channel observers' list. The listeners can perform non-copy
156  quick access to the constant message reference directly (via the :c:func:`zbus_chan_const_msg`
157  function) since the channel is still locked;
158* At last, the publishing function unlocks the channel.
159
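
The four steps above can be sketched as a plain C model. This is an illustration under stated
assumptions, not the zbus implementation: all ``model_*`` names are hypothetical, the message is a
single ``int``, the lock is a flag, and a publish on a locked channel fails immediately instead of
waiting for a timeout.

.. code-block:: c

    #include <stdbool.h>
    #include <stddef.h>
    #include <string.h>

    enum model_obs_type { MODEL_LISTENER, MODEL_MSG_SUBSCRIBER, MODEL_SUBSCRIBER };

    struct model_obs {
            enum model_obs_type type;
            int seen;     /* listener: value read in place while locked */
            int copy;     /* message subscriber: private copy of the message */
            int notified; /* subscriber: number of queued channel references */
    };

    struct model_chan {
            bool locked;
            int msg;                      /* shared message storage */
            struct model_obs **observers; /* delivery follows this order */
            size_t count;
    };

    /* Returns 0 on success, -1 if the channel is already locked. */
    static int model_publish(struct model_chan *chan, const int *msg)
    {
            if (chan->locked) {
                    return -1;
            }
            chan->locked = true;                        /* 1. lock the channel */
            memcpy(&chan->msg, msg, sizeof(chan->msg)); /* 2. copy the message in */

            for (size_t i = 0; i < chan->count; ++i) {  /* 3. dispatch in list order */
                    struct model_obs *obs = chan->observers[i];

                    switch (obs->type) {
                    case MODEL_LISTENER:
                            obs->seen = chan->msg; /* direct access, no copy */
                            break;
                    case MODEL_MSG_SUBSCRIBER:
                            memcpy(&obs->copy, &chan->msg, sizeof(obs->copy));
                            break;
                    case MODEL_SUBSCRIBER:
                            obs->notified++; /* channel reference only */
                            break;
                    }
            }
            chan->locked = false;                       /* 4. unlock the channel */
            return 0;
    }

Note how the subscriber in the model never receives the value itself; it only learns that the
channel changed and has to read the channel afterwards.
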
160
To illustrate the VDED execution, consider the example below. We have four threads, in ascending
priority order ``S1``, ``MS2``, ``MS1``, and ``T1`` (the highest priority); two listeners, ``L1``
and ``L2``; and channel A. Suppose ``L1``, ``L2``, ``MS1``, ``MS2``, and ``S1`` observe channel A.
165
166.. figure:: images/zbus_publishing_process_example_scenario.svg
167    :alt: ZBus example scenario
168    :width: 45%
169
170    ZBus VDED execution example scenario.
171
172
173The following code implements channel A. Note the ``struct a_msg`` is illustrative only.
174
.. code-block:: c

    ZBUS_CHAN_DEFINE(a_chan,                       /* Name */
             struct a_msg,                         /* Message type */

             NULL,                                 /* Validator */
             NULL,                                 /* User Data */
             ZBUS_OBSERVERS(L1, L2, MS1, MS2, S1), /* observers */
             ZBUS_MSG_INIT(0)                      /* Initial value {0} */
    );
185
186
In the figure below, the letters indicate actions related to the VDED execution. The X-axis
represents time, and the Y-axis represents the priority of the threads. Channel A's message,
represented by a speech balloon, is a single memory region (shared memory). It appears several
times only to illustrate the state of the message at that point in time.
191
192
193.. figure:: images/zbus_publishing_process_example.svg
194    :alt: ZBus publish processing detail
195    :width: 85%
196
197    ZBus VDED execution detail for priority T1 > MS1 > MS2 > S1.
198
199
200
201The figure above illustrates the actions performed during the VDED execution when T1 publishes to
202channel A. Thus, the table below describes the activities (represented by a letter) of the VDED
203execution. The scenario considers the following priorities: T1 > MS1 > MS2 > S1. T1 has the highest
204priority.
205
206
207.. list-table:: VDED execution steps in detail for priority T1 > MS1 > MS2 > S1.
208   :widths: 5 65
209   :header-rows: 1
210
211   * - Actions
212     - Description
213   * - a
214     - T1 starts and, at some point, publishes to channel A.
215   * - b
216     - The publishing (VDED) process starts. The VDED locks the channel A.
217   * - c
218     - The VDED copies the T1 message to the channel A message.
219
220   * - d, e
221     - The VDED executes L1 and L2 in the respective sequence. Inside the listeners, usually, there
222       is a call to the :c:func:`zbus_chan_const_msg` function, which provides a direct constant
223       reference to channel A's message. It is quick, and no copy is needed here.
224
225   * - f, g
226     - The VDED copies the message and sends that to MS1 and MS2 sequentially. Notice the threads
227       get ready to execute right after receiving the notification. However, they go to a pending
228       state because they have less priority than T1.
229   * - h
230     - The VDED pushes the notification message to the queue of S1. Notice the thread gets ready to
231       execute right after receiving the notification. However, it goes to a pending state because
232       it cannot access the channel since it is still locked.
233
234   * - i
235     - VDED finishes the publishing by unlocking channel A. The MS1 leaves the pending state and
236       starts executing.
237
238   * - j
239     - MS1 finishes execution. The MS2 leaves the pending state and starts executing.
240
241   * - k
242     - MS2 finishes execution. The S1 leaves the pending state and starts executing.
243
244   * - l, m, n
     - The S1 leaves the pending state since channel A is not locked. It gets in the CPU again and
       starts executing. As it did receive a notification from channel A, it performs a channel read
       (as simple as lock, memory copy, unlock), continues its execution, and goes out of the CPU.
248
249   * - o
250     - S1 finishes its workload.
251
252
253The figure below illustrates the actions performed during the VDED execution when T1 publishes to
254channel A. The scenario considers the following priorities: T1 < MS1 < MS2 < S1.
255
256.. figure:: images/zbus_publishing_process_example2.svg
257    :alt: ZBus publish processing detail
258    :width: 85%
259
260    ZBus VDED execution detail for priority T1 < MS1 < MS2 < S1.
261
262Thus, the table below describes the activities (represented by a letter) of the VDED execution.
263
264.. list-table:: VDED execution steps in detail for priority T1 < MS1 < MS2 < S1.
265   :widths: 5 65
266   :header-rows: 1
267
268   * - Actions
269     - Description
270   * - a
271     - T1 starts and, at some point, publishes to channel A.
272   * - b
273     - The publishing (VDED) process starts. The VDED locks the channel A.
274   * - c
275     - The VDED copies the T1 message to the channel A message.
276
277   * - d, e
278     - The VDED executes L1 and L2 in the respective sequence. Inside the listeners, usually, there
279       is a call to the :c:func:`zbus_chan_const_msg` function, which provides a direct constant
280       reference to channel A's message. It is quick, and no copy is needed here.
281
282   * - f
     - The VDED copies the message and sends that to MS1. MS1 preempts T1 and starts working.
       After that, T1 regains the CPU.

   * - g
     - The VDED copies the message and sends that to MS2. MS2 preempts T1 and starts working.
       After that, T1 regains the CPU.
289
290   * - h
291     - The VDED pushes the notification message to the queue of S1.
292
293   * - i
294     - VDED finishes the publishing by unlocking channel A.
295
296   * - j, k, l
297     - The S1 leaves the pending state since channel A is not locked. It gets in the CPU again and
298       starts executing. As it did receive a notification from channel A, it performs a channel read
       (as simple as lock, memory copy, unlock), continues its execution, and goes out of the CPU.
300
301
302HLP priority boost
303------------------
304ZBus implements the Highest Locker Protocol that relies on the observers' thread priority to
305determine a temporary publisher priority. The protocol considers the channel's Highest Observer
306Priority (HOP); even if the observer is not waiting for a message on the channel, it is considered
307in the calculation. The VDED will elevate the publisher's priority based on the HOP to ensure small
308latency and as few preemptions as possible.
309
310.. note::
    The priority boost is enabled by default. To deactivate it, set the
    :kconfig:option:`CONFIG_ZBUS_PRIORITY_BOOST` configuration option to ``n``.
313
314.. warning::
315    ZBus priority boost does not consider runtime observers on the HOP calculations.
316
317The figure below illustrates the actions performed during the VDED execution when T1 publishes to
318channel A. The scenario considers the priority boost feature and the following priorities: T1 < MS1
319< MS2 < S1.
320
321.. figure:: images/zbus_publishing_process_example_HLP.svg
322    :alt: ZBus publishing process details using priority boost.
323    :width: 85%
324
325    ZBus VDED execution detail with priority boost enabled and for priority T1 < MS1 < MS2 < S1.
326
327To properly use the priority boost, attaching the observer to a thread is necessary. When the
328subscriber is attached to a thread, it assumes its priority, and the priority boost algorithm will
329consider the observer's priority. The following code illustrates the thread-attaching function.
330
331
.. code-block:: c
   :emphasize-lines: 10

   ZBUS_SUBSCRIBER_DEFINE(s1, 4);
   void s1_thread(void *ptr1, void *ptr2, void *ptr3)
   {
           ARG_UNUSED(ptr1);
           ARG_UNUSED(ptr2);
           ARG_UNUSED(ptr3);

           const struct zbus_channel *chan;

           zbus_obs_attach_to_thread(&s1);

           while (1) {
                   zbus_sub_wait(&s1, &chan, K_FOREVER);

                   /* Subscriber implementation */

           }
   }
   K_THREAD_DEFINE(s1_id, CONFIG_MAIN_STACK_SIZE, s1_thread, NULL, NULL, NULL, 2, 0, 0);
354
In the code above, :c:func:`zbus_obs_attach_to_thread` assigns priority two to the ``s1``
observer, since that is the priority of the thread. It is possible to reverse that by detaching the
observer using :c:func:`zbus_obs_detach_from_thread`. Only enabled observers and observations are
considered in the channel HOP calculation. Masking a specific observation of a channel therefore
affects the channel HOP.
360
361In summary, the benefits of the feature are:
362
* The HLP is more effective for zbus than mutex priority inheritance;
* No bounded priority inversion will happen between the publisher and the observers;
365* No other threads (that are not involved in the communication) with priority between T1 and S1 can
366  preempt T1, avoiding unbounded priority inversion;
367* Message subscribers will wait for the VDED to finish the message delivery process. So the VDED
368  execution will be faster and more consistent;
369* The HLP priority is dynamic and can change in execution;
370* ZBus operations can be used inside ISRs;
371* The priority boosting feature can be turned off, and plain semaphores can be used as the channel
372  lock mechanism;
373* The Highest Locker Protocol's major disadvantage, the Inheritance-related Priority Inversion, is
374  acceptable in the zbus scenario since it will ensure a small bus latency.
375
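
The HOP calculation described in this section can be pictured with a small plain C model. It is
only a sketch: the ``hop_*`` and ``model_*`` names are hypothetical, and boosting the publisher
exactly to the HOP is an assumption made for illustration, since the text above only states that
the boost is based on the HOP. The model relies on the Zephyr convention that a numerically smaller
value means a higher priority.

.. code-block:: c

    #include <stdbool.h>
    #include <stddef.h>

    /* Hypothetical model type, NOT a zbus structure. */
    struct hop_observation {
            int thread_prio; /* priority of the thread the observer is attached to */
            bool enabled;    /* observer enabled and observation not masked */
    };

    /* Highest Observer Priority: the smallest value among enabled observations. */
    static int model_hop(const struct hop_observation *obs, size_t count, int lowest_prio)
    {
            int hop = lowest_prio;

            for (size_t i = 0; i < count; ++i) {
                    if (obs[i].enabled && obs[i].thread_prio < hop) {
                            hop = obs[i].thread_prio;
                    }
            }
            return hop;
    }

    /* Assumption of this sketch: a publisher below the HOP is raised to it. */
    static int model_publisher_prio(int publisher_prio, int hop)
    {
            return (publisher_prio > hop) ? hop : publisher_prio;
    }

Disabling an observer or masking an observation removes it from the loop, which is why those
operations change the channel HOP.
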
376
377Limitations
378===========
379
380Based on the fact that developers can use zbus to solve many different problems, some challenges
381arise. ZBus will not solve every problem, so it is necessary to analyze the situation to be sure
382zbus is applicable. For instance, based on the zbus benchmark, it would not be well suited to a
383high-speed stream of bytes between threads. The :ref:`Pipe <pipes_v2>` kernel object solves this
384kind of need.
385
386Delivery guarantees
387-------------------
388
ZBus always delivers the messages to the listeners and message subscribers. However, there are no
message delivery guarantees for subscribers because zbus only sends the notification, and the
message reading depends on the subscriber's implementation. It is possible to increase the delivery
rate by following these design tips:

* Keep the listeners as quick as possible (treat them like ISRs). If some processing is needed,
  consider submitting a work item to a work queue;
* Try to give producers a high priority to avoid losses;
* Leave spare CPU time for observers to consume the data produced;
* Consider using message queues or pipes for intensive byte transfers.
399
400.. warning::
401   ZBus uses :zephyr_file:`include/zephyr/net_buf.h` (network buffers) to exchange data with message
402   subscribers. Thus, choose carefully the configurations
403   :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE` and
404   :kconfig:option:`CONFIG_HEAP_MEM_POOL_SIZE`. They are crucial to a proper VDED execution
405   (delivery guarantee) considering message subscribers. If you want to keep an isolated pool for a
406   specific set of channels, you can use
407   :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION` with a dedicated pool. Look
408   at the :zephyr:code-sample:`zbus-msg-subscriber` to see the isolation in action.
409
410.. warning::
411   Subscribers will receive only the reference of the changing channel. A data loss may be perceived
412   if the channel is published twice before the subscriber reads it. The second publication
413   overwrites the value from the first. Thus, the subscriber will receive two notifications, but
414   only the last data is there.
415
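
The data loss described in the warning above can be reproduced with a small plain C model. It is a
sketch only: the ``model_*`` names are hypothetical and the channel is reduced to a single ``int``
slot plus a notification counter, which is enough to show two notifications pointing at the same,
latest value.

.. code-block:: c

    /* Hypothetical model type, NOT a zbus structure. */
    struct model_chan {
            int msg;     /* single shared message slot */
            int pending; /* notifications queued for the subscriber */
    };

    static void model_pub(struct model_chan *chan, int value)
    {
            chan->msg = value; /* overwrites the previous message */
            chan->pending++;   /* every publication still notifies */
    }

    /* Returns 1 and stores the current message if a notification was pending. */
    static int model_wait_and_read(struct model_chan *chan, int *out)
    {
            if (chan->pending == 0) {
                    return 0;
            }
            chan->pending--;
            *out = chan->msg;
            return 1;
    }

Publishing ``1`` and then ``2`` before the subscriber runs leaves two pending notifications, but
both reads return ``2``. When every value matters, a message subscriber, which receives its own
copy, is the better fit.
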
416
417.. _zbus delivery sequence:
418
419Message delivery sequence
420-------------------------
421
The message delivery follows this order of precedence:

#. Observers defined in a channel using the :c:macro:`ZBUS_CHAN_DEFINE` (following the definition
   sequence);
#. Observers defined using the :c:macro:`ZBUS_CHAN_ADD_OBS` based on the sequence priority
   (parameter of the macro);
#. Runtime observers, in the order they were added using :c:func:`zbus_chan_add_obs`.
430
431.. note::
432    The VDED will ignore all disabled observers or observations.
433
434Usage
435*****
436
ZBus operation depends on channels and observers. Therefore, a channel's message type and observer
list must be determined when the channel is defined. A message is a regular C struct; an observer
can be a subscriber (asynchronous), a message subscriber (asynchronous), or a listener
(synchronous).
441
442The following code defines and initializes a regular channel and its dependencies. This channel
443exchanges accelerometer data, for example.
444
.. code-block:: c

    struct acc_msg {
            int x;
            int y;
            int z;
    };

    ZBUS_CHAN_DEFINE(acc_chan,                           /* Name */
             struct acc_msg,                             /* Message type */

             NULL,                                       /* Validator */
             NULL,                                       /* User Data */
             ZBUS_OBSERVERS(my_listener, my_subscriber,
                            my_msg_subscriber),          /* observers */
             ZBUS_MSG_INIT(.x = 0, .y = 0, .z = 0)       /* Initial value */
    );

    void listener_callback_example(const struct zbus_channel *chan)
    {
            const struct acc_msg *acc;

            if (&acc_chan == chan) {
                    acc = zbus_chan_const_msg(chan); /* Direct message access */
                    LOG_DBG("From listener -> Acc x=%d, y=%d, z=%d", acc->x, acc->y, acc->z);
            }
    }

    ZBUS_LISTENER_DEFINE(my_listener, listener_callback_example);

    ZBUS_LISTENER_DEFINE(my_listener2, listener_callback_example);

    ZBUS_CHAN_ADD_OBS(acc_chan, my_listener2, 3);

    ZBUS_SUBSCRIBER_DEFINE(my_subscriber, 4);
    /* Thread entry points take three void pointers, even when they are unused. */
    void subscriber_task(void *ptr1, void *ptr2, void *ptr3)
    {
            ARG_UNUSED(ptr1);
            ARG_UNUSED(ptr2);
            ARG_UNUSED(ptr3);

            const struct zbus_channel *chan;

            while (!zbus_sub_wait(&my_subscriber, &chan, K_FOREVER)) {
                    struct acc_msg acc = {0};

                    if (&acc_chan == chan) {
                            /* Indirect message access */
                            zbus_chan_read(&acc_chan, &acc, K_NO_WAIT);
                            LOG_DBG("From subscriber -> Acc x=%d, y=%d, z=%d", acc.x, acc.y, acc.z);
                    }
            }
    }
    K_THREAD_DEFINE(subscriber_task_id, 512, subscriber_task, NULL, NULL, NULL, 3, 0, 0);

    ZBUS_MSG_SUBSCRIBER_DEFINE(my_msg_subscriber);
    static void msg_subscriber_task(void *ptr1, void *ptr2, void *ptr3)
    {
            ARG_UNUSED(ptr1);
            ARG_UNUSED(ptr2);
            ARG_UNUSED(ptr3);
            const struct zbus_channel *chan;

            struct acc_msg acc = {0};

            while (!zbus_sub_wait_msg(&my_msg_subscriber, &chan, &acc, K_FOREVER)) {
                    if (&acc_chan == chan) {
                            LOG_INF("From msg subscriber -> Acc x=%d, y=%d, z=%d", acc.x, acc.y, acc.z);
                    }
            }
    }
    K_THREAD_DEFINE(msg_subscriber_task_id, 1024, msg_subscriber_task, NULL, NULL, NULL, 3, 0, 0);
512
513
514
It is possible to add static observers to a channel using the :c:macro:`ZBUS_CHAN_ADD_OBS`. We call
that a post-definition static observer. The macro lets us indicate an initialization priority that
affects the observers' initialization order. The sequence priority parameter only affects the
post-definition static observers. It is not possible to override the message delivery sequence of
the static observers.

.. note::
   It is unnecessary to claim/lock a channel before accessing the message inside a listener since
   the event dispatcher calls listeners with the notifying channel already locked. Subscribers,
   however, must claim/lock the channel or use regular read operations to access the message after
   being notified.
526
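
The resulting delivery order (definition-time observers first, then post-definition static
observers, then runtime observers) can be modelled in plain C as a two-level sort. This is a
sketch, not how zbus stores observations: the ``model_*`` names are hypothetical, and treating a
smaller sequence priority value as "delivered earlier" is an assumption of this example.

.. code-block:: c

    #include <stdlib.h>

    /* Where an observation came from; enum order encodes the precedence. */
    enum model_origin { MODEL_CHAN_DEFINE, MODEL_ADD_OBS, MODEL_RUNTIME };

    struct model_entry {
            const char *name;
            enum model_origin origin;
            /* List position, sequence priority, or addition order, depending on origin. */
            int seq;
    };

    static int model_entry_cmp(const void *a, const void *b)
    {
            const struct model_entry *x = a;
            const struct model_entry *y = b;

            if (x->origin != y->origin) {
                    return (x->origin > y->origin) - (x->origin < y->origin);
            }
            return (x->seq > y->seq) - (x->seq < y->seq);
    }

    /* Sorts the entries into the order in which they would be notified. */
    static void model_sort_delivery(struct model_entry *entries, size_t count)
    {
            qsort(entries, count, sizeof(entries[0]), model_entry_cmp);
    }

With the example above, ``my_listener`` and ``my_subscriber`` would come before ``my_listener2``,
regardless of the value passed to :c:macro:`ZBUS_CHAN_ADD_OBS`.
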
527
Channels can have a *validator function* that enables a channel to accept only valid messages.
Publish attempts rejected by the validator return immediately with an error code. This allows the
original creators of a channel to exert some authority over other developers/publishers who may
want to piggy-back on their channels. The following code defines and initializes a :dfn:`hard
channel` and its dependencies. Only valid messages can be published to a :dfn:`hard channel`. That
is possible because a *validator function* was passed to the channel's definition. In this example,
only messages with ``move`` equal to 0, -1, or 1 are valid. The publish function rejects messages
with any other value of ``move``.
536
.. code-block:: c

    struct control_msg {
            int move;
    };

    bool control_validator(const void *msg, size_t msg_size)
    {
            const struct control_msg *cm = msg;
            bool is_valid = (cm->move == -1) || (cm->move == 0) || (cm->move == 1);

            return is_valid;
    }

    static int message_count = 0;

    ZBUS_CHAN_DEFINE(control_chan,    /* Name */
             struct control_msg,      /* Message type */

             control_validator,       /* Validator */
             &message_count,          /* User data */
             ZBUS_OBSERVERS_EMPTY,    /* observers */
             ZBUS_MSG_INIT(.move = 0) /* Initial value */
    );
559
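
Because a validator is an ordinary C function, it can be exercised on its own, without the bus. The
sketch below is a variation of the validator above under a hypothetical name,
``control_validator_checked``; the extra pointer and size checks are an addition made for
illustration and are not required by zbus.

.. code-block:: c

    #include <stdbool.h>
    #include <stddef.h>

    struct control_msg {
            int move;
    };

    /* Same range check as above, plus defensive pointer and size checks. */
    static bool control_validator_checked(const void *msg, size_t msg_size)
    {
            const struct control_msg *cm = msg;

            if (msg == NULL || msg_size != sizeof(*cm)) {
                    return false;
            }
            return cm->move >= -1 && cm->move <= 1;
    }

Calling it with ``.move = -1`` returns ``true`` and with ``.move = 2`` returns ``false``, which
mirrors the messages a hard channel using it would accept or reject.
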
560The following sections describe in detail how to use zbus features.
561
562
563.. _publishing to a channel:
564
565Publishing to a channel
566=======================
567
Messages are published to a channel in zbus by calling :c:func:`zbus_chan_pub`. For example, the
following code builds on the examples above and publishes to channel ``acc_chan``. The code tries
to publish the message ``acc1`` to channel ``acc_chan`` and waits up to one second for the message
to be published; otherwise, the operation fails. As can be inferred from the code sample, it is OK
to use stack-allocated messages since the VDED copies the data internally.

.. code-block:: c

    struct acc_msg acc1 = {.x = 1, .y = 1, .z = 1};
    zbus_chan_pub(&acc_chan, &acc1, K_SECONDS(1));

.. warning::
    When calling this function inside an ISR, always use a :c:macro:`K_NO_WAIT` timeout.
581
582.. _reading from a channel:
583
584Reading from a channel
585======================
586
Messages are read from a channel in zbus by calling :c:func:`zbus_chan_read`. For example, the
following code tries to read the channel ``acc_chan``, waiting up to 500 milliseconds to read the
message; otherwise, the operation fails.

.. code-block:: c

    struct acc_msg acc = {0};
    zbus_chan_read(&acc_chan, &acc, K_MSEC(500));

.. warning::
    When calling this function inside an ISR, always use a :c:macro:`K_NO_WAIT` timeout.

.. warning::
   Choose the timeout of :c:func:`zbus_chan_read` after receiving a notification from
   :c:func:`zbus_sub_wait` carefully because the channel will always be unavailable during the VDED
   execution. Using ``K_NO_WAIT`` for reading is highly likely to return a timeout error if there
   is more than one subscriber. For example, consider the VDED illustration again and notice how
   ``S1`` read attempts would definitely fail with ``K_NO_WAIT``. For more details, check
   the `Virtual Distributed Event Dispatcher`_ section.
606
607Notifying a channel
608===================
609
It is possible to force zbus to notify a channel's observers by calling :c:func:`zbus_chan_notify`.
For example, the following code builds on the examples above and forces a notification for the
channel ``acc_chan``. Note that this makes it possible to send events that carry no message and
therefore require no data exchange. See the code example under `Claim and finish a channel`_ where
this may become useful.

.. code-block:: c

    zbus_chan_notify(&acc_chan, K_NO_WAIT);

.. warning::
    When calling this function inside an ISR, always use a :c:macro:`K_NO_WAIT` timeout.
621
622Declaring channels and observers
623================================
624
To access channels or observers from files other than their defining files, it is necessary to
declare them by calling :c:macro:`ZBUS_CHAN_DECLARE` and :c:macro:`ZBUS_OBS_DECLARE`. In other
words, zbus channel definitions and declarations with the same channel name in different files
point to the same (global) channel. Thus, developers should check the existing channels when naming
new ones; otherwise, linking will fail. It is possible to declare more than one channel or observer
in the same call. The following code builds on the examples above and declares the defined channels
and observers.

.. code-block:: c

    ZBUS_OBS_DECLARE(my_listener, my_subscriber);
    ZBUS_CHAN_DECLARE(acc_chan, version_chan);
637
638
639Iterating over channels and observers
640=====================================
641
642ZBus subsystem also implements :ref:`Iterable Sections <iterable_sections_api>` for channels and
643observers, for which there are supporting APIs like :c:func:`zbus_iterate_over_channels`,
644:c:func:`zbus_iterate_over_channels_with_user_data`, :c:func:`zbus_iterate_over_observers` and
645:c:func:`zbus_iterate_over_observers_with_user_data`. This feature enables developers to call a
646procedure over all declared channels, where the procedure parameter is a :c:struct:`zbus_channel`.
647The execution sequence is in the alphabetical name order of the channels (see :ref:`Iterable
648Sections <iterable_sections_api>` documentation for details). ZBus also implements this feature for
649:c:struct:`zbus_observer`.
650
.. code-block:: c

   static bool print_channel_data_iterator(const struct zbus_channel *chan, void *user_data)
   {
         int *count = user_data;

         LOG_INF("%d - Channel %s:", *count, zbus_chan_name(chan));
         LOG_INF("      Message size: %d", zbus_chan_msg_size(chan));
         LOG_INF("      Observers:");

         ++(*count);

         struct zbus_channel_observation *observation;

         for (int16_t i = *chan->observers_start_idx, limit = *chan->observers_end_idx; i < limit;
               ++i) {
               STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);

               LOG_INF("      - %s", observation->obs->name);
         }

         struct zbus_observer_node *obs_nd, *tmp;

         SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->observers, obs_nd, tmp, node) {
               LOG_INF("      - %s", obs_nd->obs->name);
         }

         return true;
   }

   static bool print_observer_data_iterator(const struct zbus_observer *obs, void *user_data)
   {
         int *count = user_data;

         LOG_INF("%d - %s %s", *count, obs->queue ? "Subscriber" : "Listener", zbus_obs_name(obs));

         ++(*count);

         return true;
   }

   int main(void)
   {
         int count = 0;

         LOG_INF("Channel list:");

         zbus_iterate_over_channels_with_user_data(print_channel_data_iterator, &count);

         count = 0;

         LOG_INF("Observers list:");

         zbus_iterate_over_observers_with_user_data(print_observer_data_iterator, &count);

         return 0;
   }
708
709
710The code will log the following output:
711
712.. code-block:: console
713
    I: Channel list:
    I: 0 - Channel acc_chan:
    I:       Message size: 12
    I:       Observers:
    I:       - my_listener
    I:       - my_subscriber
    I: 1 - Channel version_chan:
    I:       Message size: 4
    I:       Observers:
    I: Observers list:
    I: 0 - Listener my_listener
    I: 1 - Subscriber my_subscriber
726
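When no context has to be carried between calls, the variant without user data may be enough. The
following is a minimal sketch rather than code taken from the samples. It assumes that
:c:func:`zbus_iterate_over_channels` stops as soon as the iterator returns ``false`` and reports
that through its own return value, and that :kconfig:option:`CONFIG_ZBUS_CHANNEL_NAME` is enabled so
that :c:func:`zbus_chan_name` is available. The function names and the 8-byte limit are
hypothetical.

.. code-block:: c

    #include <zephyr/kernel.h>
    #include <zephyr/sys/printk.h>
    #include <zephyr/zbus/zbus.h>

    /* Hypothetical limit, chosen only for illustration. */
    #define LARGE_MSG_SIZE 8

    /* Iterator: returning false ends the iteration early (assumed behavior). */
    static bool find_large_channel(const struct zbus_channel *chan)
    {
            if (zbus_chan_msg_size(chan) > LARGE_MSG_SIZE) {
                    printk("Large channel found: %s (%u bytes)\n", zbus_chan_name(chan),
                           (unsigned int)zbus_chan_msg_size(chan));
                    return false;
            }

            return true; /* Keep iterating. */
    }

    /* Hypothetical helper that could be called from main(). */
    void check_channels(void)
    {
            /* True only if the iterator returned true for every channel. */
            if (zbus_iterate_over_channels(find_large_channel)) {
                    printk("No large channel found\n");
            }
    }
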
727
728.. _Claim and finish a channel:
729
730Advanced channel control
731========================
732
ZBus was designed to be flexible and extensible. The features described below give developers finer
control over channels and extend what the bus can do.
735
736Listeners message access
737------------------------
738
For performance reasons, listeners can access the message of the notifying channel directly, since
the channel is already locked while the listener runs. To access the message, the listener should
use :c:func:`zbus_chan_const_msg`, because the channel passed as an argument to the listener
function is a constant pointer to the channel. The function returns a const pointer, which signals
that the listener must not modify the message.
744
745.. code-block:: c
746
747    void listener_callback_example(const struct zbus_channel *chan)
748    {
749            const struct acc_msg *acc;
750            if (&acc_chan == chan) {
751                    acc = zbus_chan_const_msg(chan); // Use this
752                    // instead of zbus_chan_read(chan, &acc, K_MSEC(200))
753                    // or zbus_chan_msg(chan)
754
755                    LOG_DBG("From listener -> Acc x=%d, y=%d, z=%d", acc->x, acc->y, acc->z);
756            }
757    }
758
User Data
---------

It is possible to attach custom data to a channel through its ``user_data`` field for various
purposes, such as storing channel metadata. To do so, pass a pointer as the ``user_data`` argument
of the channel definition macro; the pointer is then accessible to any code that uses the channel.
Note that each channel has its own ``user_data`` and that access to it is not thread-safe. For
thread-safe access to ``user_data``, see the next section.
766
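As an illustration, the sketch below shows how a channel might be defined so that its ``user_data``
points at a counter. It is not the definition used earlier in this page: the ``acc_msg_count``
variable is hypothetical, the message layout is assumed to be three integers, and the observer list
is left empty to keep the example short.

.. code-block:: c

    #include <zephyr/kernel.h>
    #include <zephyr/zbus/zbus.h>

    /* Assumed message layout for this sketch. */
    struct acc_msg {
            int x;
            int y;
            int z;
    };

    /* Hypothetical counter that the channel carries as its user_data. */
    static int acc_msg_count;

    ZBUS_CHAN_DEFINE(acc_chan,             /* Channel name */
                     struct acc_msg,       /* Message type */
                     NULL,                 /* No validator */
                     &acc_msg_count,       /* user_data pointer */
                     ZBUS_OBSERVERS_EMPTY, /* No static observers in this sketch */
                     ZBUS_MSG_INIT(.x = 0, .y = 0, .z = 0) /* Initial message value */
    );

With such a definition, ``zbus_chan_user_data(&acc_chan)`` returns the address of
``acc_msg_count``.
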
767
768Claim and finish a channel
769--------------------------
770
To take more control over channels, zbus provides two functions: :c:func:`zbus_chan_claim` and
:c:func:`zbus_chan_finish`. With these functions, it is possible to access the channel's metadata
safely. While a channel is claimed, no other action can be performed on it. After the channel is
finished, all actions are available again.
775
776.. warning::
777   Never change the fields of the channel struct directly. It may cause zbus behavior
778   inconsistencies and scheduling issues.
779
.. warning::
    Inside an ISR, only call :c:func:`zbus_chan_claim` with a :c:macro:`K_NO_WAIT` timeout.
782
The following code builds on the examples above and claims ``acc_chan`` to update its
``user_data``. Suppose we would like to count how many times the channel exchanges messages, and
that the channel's ``user_data`` points to a 32-bit integer. This code could be added to the
listener code described above.
787
788.. code-block:: c
789
790    if (!zbus_chan_claim(&acc_chan, K_MSEC(200))) {
791            int *message_counting = (int *) zbus_chan_user_data(&acc_chan);
792            *message_counting += 1;
793            zbus_chan_finish(&acc_chan);
794    }
795
The following code behaves exactly like the code in :ref:`publishing to a channel`.
797
798.. code-block:: c
799
800    if (!zbus_chan_claim(&acc_chan, K_MSEC(200))) {
801            struct acc_msg *acc1 = (struct acc_msg *) zbus_chan_msg(&acc_chan);
            acc1->x = 1;
            acc1->y = 1;
            acc1->z = 1;
805            zbus_chan_finish(&acc_chan);
806            zbus_chan_notify(&acc_chan, K_SECONDS(1));
807    }
808
The following code behaves exactly like the code in :ref:`reading from a channel`.
810
811.. code-block:: c
812
813    if (!zbus_chan_claim(&acc_chan, K_MSEC(200))) {
814            const struct acc_msg *acc1 = (const struct acc_msg *) zbus_chan_const_msg(&acc_chan);
815            // access the acc_msg fields directly.
816            zbus_chan_finish(&acc_chan);
817    }
818
819
820Runtime observer registration
821-----------------------------
822
It is possible to add observers to channels at runtime. This feature uses the heap to allocate the
observer nodes dynamically, so the heap size limits the number of runtime observers zbus can
create. Set :kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS` to enable the feature, and adjust the
heap size through :kconfig:option:`CONFIG_HEAP_MEM_POOL_SIZE`. The following example illustrates
the runtime registration usage.
828
829
830
831.. code-block:: c
832
833    ZBUS_LISTENER_DEFINE(my_listener, callback);
834    // ...
    void thread_entry(void)
    {
            // ...
            /* Adding the observer to channel chan1 */
            zbus_chan_add_obs(&chan1, &my_listener, K_NO_WAIT);
            /* Removing the observer from channel chan1 */
            zbus_chan_rm_obs(&chan1, &my_listener, K_NO_WAIT);
    }
841
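As a sketch, a ``prj.conf`` fragment enabling this feature might look like the one below. The
options are the ones named in this section; the heap size value is an arbitrary assumption and
should be tuned to the number of runtime observers the application needs.

.. code-block:: cfg

    # Enable zbus and runtime observer registration
    CONFIG_ZBUS=y
    CONFIG_ZBUS_RUNTIME_OBSERVERS=y

    # Heap used to allocate the runtime observer nodes (assumed value)
    CONFIG_HEAP_MEM_POOL_SIZE=1024
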
842
843Samples
844*******
845
For a complete overview of zbus usage, take a look at the samples. The following samples are
available:

* :zephyr:code-sample:`zbus-hello-world` illustrates the code used above in action;
* :zephyr:code-sample:`zbus-work-queue` shows how to define and use different kinds of observers.
  It also includes an example of a listener that defers its processing to a work queue instead of
  running it in the listener callback;
* :zephyr:code-sample:`zbus-msg-subscriber` illustrates how to use message subscribers;
* :zephyr:code-sample:`zbus-dyn-channel` demonstrates how to exchange dynamically allocated data
  through zbus;
* :zephyr:code-sample:`zbus-uart-bridge` shows an example of forwarding the channels' activity to
  a host via serial;
* :zephyr:code-sample:`zbus-remote-mock` illustrates how to implement an external mock (on the host)
  to send and receive messages to and from the bus;
* :zephyr:code-sample:`zbus-priority-boost` illustrates the zbus priority boost feature with a
  priority inversion scenario;
* :zephyr:code-sample:`zbus-runtime-obs-registration` illustrates a way of using the runtime
  observer registration feature;
* :zephyr:code-sample:`zbus-confirmed-channel` implements a confirmed channel using only
  subscribers;
* :zephyr:code-sample:`zbus-benchmark` implements a benchmark with different combinations of inputs.
867
868Suggested Uses
869**************
870
Use zbus to transfer data (messages) between threads in one-to-one, one-to-many, and many-to-many
topologies, synchronously or asynchronously. Choosing the proper observer type is crucial. Use
subscribers for scenarios that can tolerate message losses and duplications; when they cannot, use
message subscribers (if you need a thread) or listeners (if you need to be lean and fast). In
addition to the listener, another asynchronous message processing mechanism (like :ref:`message
queues <message_queues_v2>`) may be necessary to retain the pending message until it gets processed.
877
.. note::
   ZBus can be used to transfer streams from the producer to the consumer. However, this can
   increase zbus' communication latency, so consider a Pipe as an alternative for this
   communication topology.
882
883Configuration Options
884*********************
885
To enable zbus, set the :kconfig:option:`CONFIG_ZBUS` option.
887
888Related configuration options:
889
* :kconfig:option:`CONFIG_ZBUS_PRIORITY_BOOST` enables the zbus Highest Locker Protocol
  implementation;
* :kconfig:option:`CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY` determines the :c:macro:`SYS_INIT`
  priority used by zbus to organize the channels observations by channel;
* :kconfig:option:`CONFIG_ZBUS_CHANNEL_NAME` enables the name of channels to be available inside the
  channels metadata. The log uses this information to show the channels' names;
* :kconfig:option:`CONFIG_ZBUS_OBSERVER_NAME` enables the name of observers to be available inside
  the channels metadata;
* :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER` enables the message subscriber observer type;
* :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC` uses the heap to allocate message
  buffers;
* :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_STATIC` uses the stack to allocate message
  buffers;
* :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE` sets the number of message
  buffers available for simultaneous use;
* :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION` enables the developer to isolate
  a pool for the message subscriber for a set of channels;
* :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE` sets the size of the largest
  zbus channel message that can be transported in a message buffer;
* :kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS` enables the runtime observer registration.
909
910API Reference
911*************
912
913.. doxygengroup:: zbus_apis
914