.. _zbus:

Zephyr bus (zbus)
#################

..
  Note to documentation authors: the diagrams included in this documentation page were designed
  using the following Figma library:
  https://www.figma.com/community/file/1292866458780627559/zbus-diagram-assets


The :dfn:`Zephyr bus - zbus` is a lightweight and flexible software bus enabling a simple way for
threads to talk to one another in a many-to-many way.

.. contents::
   :local:
   :depth: 2

Concepts
********
Threads can send messages to one or more observers using zbus, which makes many-to-many
communication possible. The bus implements message-passing and publish/subscribe communication
paradigms that enable threads to communicate synchronously or asynchronously through shared memory.

Communication through zbus is channel-based. Threads (or callbacks) use channels to exchange
messages. Among other actions, threads can publish to and observe channels. When a
thread publishes a message on a channel, the bus makes the message available to all of that
channel's observers. Depending on its type, an observer can access the message directly,
receive a copy of it, or receive only a reference to the published channel.

The figure below shows an example of a typical application using zbus in which the application logic
(hardware independent) talks to other threads via the software bus. Note that the threads are
decoupled from each other because they only use zbus channels and do not need to know each other to
talk.


.. figure:: images/zbus_overview.svg
   :alt: zbus usage overview
   :width: 75%

   A typical zbus application architecture.

The bus comprises:

* A set of channels, each consisting of control metadata and the message itself;
* The :dfn:`Virtual Distributed Event Dispatcher` (VDED), the bus logic responsible for sending
  notifications/messages to the observers. The VDED logic runs inside the publishing action in the
  same thread context, giving the bus an idea of a distributed execution. When a thread publishes to
  a channel, it also propagates the notifications to the observers;
* Threads (subscribers and message subscribers) and callbacks (listeners) publishing, reading, and
  receiving notifications from the bus.

.. figure:: images/zbus_anatomy.svg
   :alt: ZBus anatomy
   :width: 70%

   ZBus anatomy.

The bus makes the publish, read, claim, finish, notify, and subscribe actions available over
channels. Publishing, reading, claiming, and finishing are available in all RTOS thread contexts,
including ISRs. The publish and read operations are simple and fast; the procedure is channel
locking followed by a memory copy to and from a shared memory region and then a channel unlocking.
Another essential aspect of zbus is the observers. There are three types of observers:

.. figure:: images/zbus_type_of_observers.svg
   :alt: ZBus observers type
   :width: 70%

   ZBus observers.

* Listener, a callback that the event dispatcher executes every time an observed channel is
  published or notified;
* Subscriber, a thread-based observer that relies internally on a message queue where the event
  dispatcher puts a changed channel's reference every time an observed channel is published or
  notified. Note this kind of observer does not receive the message itself. It should read the
  message from the channel after receiving the notification;
* Message subscriber, a thread-based observer that relies internally on a FIFO where the event
  dispatcher puts a copy of the message every time an observed channel is published or notified.

Channel observation structures define the relationship between a channel and its observers. Each
observation is a channel/observer pair.
Developers can statically allocate observations using
:c:macro:`ZBUS_CHAN_DEFINE` or :c:macro:`ZBUS_CHAN_ADD_OBS`. There are also runtime observers,
enabling developers to create runtime observations. It is possible to disable an observer entirely
or observations individually. The event dispatcher will ignore disabled observers and observations.

.. figure:: images/zbus_observation_mask.svg
   :alt: ZBus observation mask.
   :width: 75%

   ZBus observation mask.

The above figure illustrates some states, from (a) to (d), for channels from ``C1`` to ``C5``,
``Subscriber 1``, and the observations. The last two observations are in orange to indicate they
are dynamically allocated (runtime observations). (a) shows that the observer and all observations
are enabled. (b) shows the observer is disabled, so the event dispatcher will ignore it. (c) shows
the observer enabled, but with one static observation disabled; the event dispatcher will only stop
sending notifications from channel ``C3``. In (d), the event dispatcher will stop sending
notifications from channels ``C3`` and ``C5`` to ``Subscriber 1``.


For illustration purposes, consider the typical sensor-based solution in the figure below. When
triggered, the timer publishes to the ``Trigger`` channel. As the sensor thread subscribed to the
``Trigger`` channel, it is notified and fetches the sensor data. Notice the VDED executes the
``Blink`` because it also listens to the ``Trigger`` channel. When the sensor data is ready, the
sensor thread publishes it to the ``Sensor data`` channel. The core thread receives the message as a
``Sensor data`` channel message subscriber, processes the sensor data, and stores it in an internal
sample buffer. It repeats until the sample buffer is full; when that happens, the core thread
aggregates the sample buffer information, prepares a package, and publishes that to the ``Payload``
channel.
The LoRa
thread receives that because it is a ``Payload`` channel message subscriber and sends the payload to
the cloud. When it completes the transmission, the LoRa thread publishes to the ``Transmission
done`` channel. The VDED executes the ``Blink`` again since it listens to the ``Transmission done``
channel.

.. figure:: images/zbus_operations.svg
   :alt: ZBus sensor-based application
   :width: 85%

   ZBus sensor-based application.

This way of implementing the solution makes the application more flexible, enabling us to change
things independently. For example, suppose we want to change the trigger from a timer to a button
press. We can do that, and the change does not affect other parts of the system. Likewise, if we
would like to change the communication interface from LoRa to Bluetooth, we only need to change the
LoRa thread. No other change is required in order to make that work. The same holds for every
block of the image, which indicates that zbus promotes decoupling in the system architecture.

Another important aspect of using zbus is the reuse of system modules. If a code portion with
well-defined behaviors (we call that a module) only uses zbus channels and not hardware interfaces,
it can easily be reused in other solutions. The new solution must implement the interfaces (set of
channels) the module needs to work. That indicates zbus could improve module reuse.

The last important note is the reach of zbus as a solution. It offers many ways of use so that
developers are as free as possible to create what they need.
For example, messages can
be dynamically or statically allocated; notifications can be synchronous or asynchronous;
developers can control a channel in different ways by claiming it; developers can add their own
metadata to a channel by using the user-data field; the discretionary use of a validator
lets the system enforce the message format; and so on. Those characteristics broaden the range of
solutions that can be built with zbus and make it a good fit as an open-source community tool.


.. _Virtual Distributed Event Dispatcher:

Virtual Distributed Event Dispatcher
====================================

The VDED execution always happens in the publisher's context, which can be a thread or an ISR. Be
careful with publications inside an ISR because the scheduler won't preempt the VDED. Use that
wisely. The basic description of the execution is as follows:


* The channel lock is acquired;
* The channel receives the new message via direct copy (by a raw :c:func:`memcpy`);
* The event dispatcher logic executes the listeners, sends a copy of the message to the message
  subscribers, and pushes the channel's reference to the subscribers' notification message queue in
  the same sequence they appear on the channel observers' list. The listeners can perform non-copy
  quick access to the constant message reference directly (via the :c:func:`zbus_chan_const_msg`
  function) since the channel is still locked;
* At last, the publishing function unlocks the channel.


To illustrate the VDED execution, consider the example illustrated below. We have four threads in
ascending priority ``S1``, ``MS2``, ``MS1``, and ``T1`` (the highest priority); two listeners,
``L1`` and ``L2``; and channel A. Suppose ``L1``, ``L2``, ``MS1``, ``MS2``, and ``S1`` observe
channel A.

.. figure:: images/zbus_publishing_process_example_scenario.svg
   :alt: ZBus example scenario
   :width: 45%

   ZBus VDED execution example scenario.


The following code implements channel A. Note the ``struct a_msg`` is illustrative only.

.. code-block:: c

   ZBUS_CHAN_DEFINE(a_chan,                       /* Name */
            struct a_msg,                         /* Message type */

            NULL,                                 /* Validator */
            NULL,                                 /* User Data */
            ZBUS_OBSERVERS(L1, L2, MS1, MS2, S1), /* observers */
            ZBUS_MSG_INIT(0)                      /* Initial value {0} */
   );


In the figure below, the letters indicate some action related to the VDED execution. The X-axis
represents the time, and the Y-axis represents the priority of threads. Channel A's message,
represented by a voice balloon, is only one memory portion (shared memory). It appears several times
only as an illustration of the message at that point in time.


.. figure:: images/zbus_publishing_process_example.svg
   :alt: ZBus publish processing detail
   :width: 85%

   ZBus VDED execution detail for priority T1 > MS1 > MS2 > S1.



The figure above illustrates the actions performed during the VDED execution when T1 publishes to
channel A. The table below describes the activities (each represented by a letter) of the VDED
execution. The scenario considers the following priorities: T1 > MS1 > MS2 > S1. T1 has the highest
priority.


.. list-table:: VDED execution steps in detail for priority T1 > MS1 > MS2 > S1.
   :widths: 5 65
   :header-rows: 1

   * - Actions
     - Description
   * - a
     - T1 starts and, at some point, publishes to channel A.
   * - b
     - The publishing (VDED) process starts. The VDED locks channel A.
   * - c
     - The VDED copies the T1 message to the channel A message.

   * - d, e
     - The VDED executes L1 and L2 in the respective sequence. Inside the listeners, usually, there
       is a call to the :c:func:`zbus_chan_const_msg` function, which provides a direct constant
       reference to channel A's message. It is quick, and no copy is needed here.

   * - f, g
     - The VDED copies the message and sends that to MS1 and MS2 sequentially. Notice the threads
       get ready to execute right after receiving the notification. However, they go to a pending
       state because they have lower priority than T1.
   * - h
     - The VDED pushes the notification message to the queue of S1. Notice the thread gets ready to
       execute right after receiving the notification. However, it goes to a pending state because
       it cannot access the channel since it is still locked.

   * - i
     - The VDED finishes the publishing by unlocking channel A. MS1 leaves the pending state and
       starts executing.

   * - j
     - MS1 finishes execution. MS2 leaves the pending state and starts executing.

   * - k
     - MS2 finishes execution. S1 leaves the pending state and starts executing.

   * - l, m, n
     - S1 leaves the pending state since channel A is not locked. It gets the CPU again and
       starts executing. As it received a notification from channel A, it performs a channel read
       (as simple as lock, memory copy, unlock), continues its execution, and releases the CPU.

   * - o
     - S1 finishes its workload.


The figure below illustrates the actions performed during the VDED execution when T1 publishes to
channel A. The scenario considers the following priorities: T1 < MS1 < MS2 < S1.

.. figure:: images/zbus_publishing_process_example2.svg
   :alt: ZBus publish processing detail
   :width: 85%

   ZBus VDED execution detail for priority T1 < MS1 < MS2 < S1.

The table below describes the activities (each represented by a letter) of the VDED execution.

.. list-table:: VDED execution steps in detail for priority T1 < MS1 < MS2 < S1.
   :widths: 5 65
   :header-rows: 1

   * - Actions
     - Description
   * - a
     - T1 starts and, at some point, publishes to channel A.
   * - b
     - The publishing (VDED) process starts. The VDED locks channel A.
   * - c
     - The VDED copies the T1 message to the channel A message.

   * - d, e
     - The VDED executes L1 and L2 in the respective sequence. Inside the listeners, usually, there
       is a call to the :c:func:`zbus_chan_const_msg` function, which provides a direct constant
       reference to channel A's message. It is quick, and no copy is needed here.

   * - f
     - The VDED copies the message and sends that to MS1. MS1 preempts T1 and starts working.
       After that, T1 regains the CPU.

   * - g
     - The VDED copies the message and sends that to MS2. MS2 preempts T1 and starts working.
       After that, T1 regains the CPU.

   * - h
     - The VDED pushes the notification message to the queue of S1.

   * - i
     - The VDED finishes the publishing by unlocking channel A.

   * - j, k, l
     - S1 leaves the pending state since channel A is not locked. It gets the CPU again and
       starts executing. As it received a notification from channel A, it performs a channel read
       (as simple as lock, memory copy, unlock), continues its execution, and releases the CPU.


HLP priority boost
------------------
ZBus implements the Highest Locker Protocol, which relies on the observers' thread priority to
determine a temporary publisher priority. The protocol considers the channel's Highest Observer
Priority (HOP); even if the observer is not waiting for a message on the channel, it is considered
in the calculation. The VDED will elevate the publisher's priority based on the HOP to ensure low
latency and as few preemptions as possible.

.. note::
   The priority boost is enabled by default. To deactivate it, disable the
   :kconfig:option:`CONFIG_ZBUS_PRIORITY_BOOST` configuration.
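
As a minimal sketch, assuming the application is configured through a ``prj.conf`` file (the file
name is an assumption; any Kconfig fragment applied to the build would do), turning the priority
boost off could look like this:

.. code-block:: cfg

   # Enable zbus, but opt out of the HLP priority boost
   CONFIG_ZBUS=y
   CONFIG_ZBUS_PRIORITY_BOOST=n
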

.. warning::
   ZBus priority boost does not consider runtime observers on the HOP calculations.

The figure below illustrates the actions performed during the VDED execution when T1 publishes to
channel A. The scenario considers the priority boost feature and the following priorities: T1 < MS1
< MS2 < S1.

.. figure:: images/zbus_publishing_process_example_HLP.svg
   :alt: ZBus publishing process details using priority boost.
   :width: 85%

   ZBus VDED execution detail with priority boost enabled and for priority T1 < MS1 < MS2 < S1.

To properly use the priority boost, attaching the observer to a thread is necessary. When the
subscriber is attached to a thread, it assumes that thread's priority, and the priority boost
algorithm will consider the observer's priority. The following code illustrates the
thread-attaching function.


.. code-block:: c
   :emphasize-lines: 10

   ZBUS_SUBSCRIBER_DEFINE(s1, 4);
   void s1_thread(void *ptr1, void *ptr2, void *ptr3)
   {
           ARG_UNUSED(ptr1);
           ARG_UNUSED(ptr2);
           ARG_UNUSED(ptr3);

           const struct zbus_channel *chan;

           zbus_obs_attach_to_thread(&s1);

           while (1) {
                   zbus_sub_wait(&s1, &chan, K_FOREVER);

                   /* Subscriber implementation */

           }
   }
   K_THREAD_DEFINE(s1_id, CONFIG_MAIN_STACK_SIZE, s1_thread, NULL, NULL, NULL, 2, 0, 0);

In the code above, :c:func:`zbus_obs_attach_to_thread` gives the ``s1`` observer priority two,
because the thread that calls it has that priority. It is possible to reverse that by detaching the
observer using :c:func:`zbus_obs_detach_from_thread`. Only enabled observers and observations
will be considered on the channel HOP calculation. Masking a specific observation of a channel will
affect the channel HOP.
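
The following sketch shows how an application might take ``s1`` out of the HOP calculation again.
It assumes the ``s1`` subscriber and the ``a_chan`` channel from the earlier examples and an enabled
:kconfig:option:`CONFIG_ZBUS_PRIORITY_BOOST`. The helper names ``s1_mask_a_chan`` and
``s1_leave_hop`` are hypothetical, and the masking call
:c:func:`zbus_obs_set_chan_notification_mask` is assumed to take the observer, the channel, and a
``masked`` flag; check the API reference before relying on it.

.. code-block:: c

   #include <stdbool.h>
   #include <zephyr/kernel.h>
   #include <zephyr/zbus/zbus.h>

   ZBUS_OBS_DECLARE(s1);
   ZBUS_CHAN_DECLARE(a_chan);

   /* Hypothetical helper: mask (true) or unmask (false) only the a_chan/s1
    * observation. A masked observation no longer counts for a_chan's HOP.
    */
   int s1_mask_a_chan(bool masked)
   {
           return zbus_obs_set_chan_notification_mask(&s1, &a_chan, masked);
   }

   /* Hypothetical helper: detach s1 from its thread so that its priority is
    * no longer considered on any channel it observes.
    */
   int s1_leave_hop(void)
   {
           return zbus_obs_detach_from_thread(&s1);
   }

Masking affects a single observation, while detaching affects every channel the observer is
registered on, so the narrower call is usually the safer first choice.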

In summary, the benefits of the feature are:

* The HLP is more effective for zbus than the mutexes priority inheritance;
* No bounded priority inversion will happen among the publisher and the observers;
* No other threads (that are not involved in the communication) with priority between T1 and S1 can
  preempt T1, avoiding unbounded priority inversion;
* Message subscribers will wait for the VDED to finish the message delivery process. So the VDED
  execution will be faster and more consistent;
* The HLP priority is dynamic and can change in execution;
* ZBus operations can be used inside ISRs;
* The priority boosting feature can be turned off, and plain semaphores can be used as the channel
  lock mechanism;
* The Highest Locker Protocol's major disadvantage, the Inheritance-related Priority Inversion, is
  acceptable in the zbus scenario since it will ensure a small bus latency.


Limitations
===========

Because developers can use zbus to solve many different problems, some challenges
arise. ZBus will not solve every problem, so it is necessary to analyze the situation to be sure
zbus is applicable. For instance, based on the zbus benchmark, it would not be well suited to a
high-speed stream of bytes between threads. The :ref:`Pipe <pipes_v2>` kernel object solves this
kind of need.

Delivery guarantees
-------------------

ZBus always delivers the messages to the listeners and message subscribers. However, there are no
message delivery guarantees for subscribers because zbus only sends the notification, but the
message reading depends on the subscriber's implementation. It is possible to increase the delivery
rate by following these design tips:

* Keep the listeners as quick as possible (deal with them as ISRs). If some processing is needed,
  consider submitting a work item to a work-queue;
* Try to give producers a high priority to avoid losses;
* Leave spare CPU for observers to consume data produced;
* Consider using message queues or pipes for intensive byte transfers.

.. warning::
   ZBus uses :zephyr_file:`include/zephyr/net_buf.h` (network buffers) to exchange data with message
   subscribers. Thus, choose carefully the configurations
   :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE` and
   :kconfig:option:`CONFIG_HEAP_MEM_POOL_SIZE`. They are crucial to a proper VDED execution
   (delivery guarantee) considering message subscribers. If you want to keep an isolated pool for a
   specific set of channels, you can use
   :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION` with a dedicated pool. Look
   at the :zephyr:code-sample:`zbus-msg-subscriber` to see the isolation in action.

.. warning::
   Subscribers will receive only the reference of the changing channel. A data loss may be perceived
   if the channel is published twice before the subscriber reads it. The second publication
   overwrites the value from the first. Thus, the subscriber will receive two notifications, but
   only the last data is there.


.. _zbus delivery sequence:

Message delivery sequence
-------------------------

The message delivery follows this precedence:

#. Observers defined in a channel using the :c:macro:`ZBUS_CHAN_DEFINE` (following the definition
   sequence);
#. Observers defined using the :c:macro:`ZBUS_CHAN_ADD_OBS` based on the sequence priority
   (parameter of the macro);
#. Last, the runtime observers, in the sequence they were added using
   :c:func:`zbus_chan_add_obs`.

.. note::
   The VDED will ignore all disabled observers or observations.

Usage
*****

ZBus operation depends on channels and observers.
Therefore, it is necessary to determine a channel's
message and observers list during the channel definition. A message is a regular C struct; the
observer can be a subscriber (asynchronous), a message subscriber (asynchronous), or a listener
(synchronous).

The following code defines and initializes a regular channel and its dependencies. This channel
exchanges accelerometer data, for example.

.. code-block:: c

   struct acc_msg {
           int x;
           int y;
           int z;
   };

   ZBUS_CHAN_DEFINE(acc_chan,                           /* Name */
            struct acc_msg,                             /* Message type */

            NULL,                                       /* Validator */
            NULL,                                       /* User Data */
            ZBUS_OBSERVERS(my_listener, my_subscriber,
                           my_msg_subscriber),          /* observers */
            ZBUS_MSG_INIT(.x = 0, .y = 0, .z = 0)       /* Initial value */
   );

   void listener_callback_example(const struct zbus_channel *chan)
   {
           const struct acc_msg *acc;
           if (&acc_chan == chan) {
                   acc = zbus_chan_const_msg(chan); // Direct message access
                   LOG_DBG("From listener -> Acc x=%d, y=%d, z=%d", acc->x, acc->y, acc->z);
           }
   }

   ZBUS_LISTENER_DEFINE(my_listener, listener_callback_example);

   ZBUS_LISTENER_DEFINE(my_listener2, listener_callback_example);

   ZBUS_CHAN_ADD_OBS(acc_chan, my_listener2, 3);

   ZBUS_SUBSCRIBER_DEFINE(my_subscriber, 4);
   void subscriber_task(void *ptr1, void *ptr2, void *ptr3)
   {
           ARG_UNUSED(ptr1);
           ARG_UNUSED(ptr2);
           ARG_UNUSED(ptr3);
           const struct zbus_channel *chan;

           while (!zbus_sub_wait(&my_subscriber, &chan, K_FOREVER)) {
                   struct acc_msg acc = {0};

                   if (&acc_chan == chan) {
                           // Indirect message access
                           zbus_chan_read(&acc_chan, &acc, K_NO_WAIT);
                           LOG_DBG("From subscriber -> Acc x=%d, y=%d, z=%d", acc.x, acc.y, acc.z);
                   }
           }
   }
   K_THREAD_DEFINE(subscriber_task_id, 512, subscriber_task, NULL, NULL, NULL, 3, 0, 0);

   ZBUS_MSG_SUBSCRIBER_DEFINE(my_msg_subscriber);
   static void msg_subscriber_task(void *ptr1, void *ptr2, void *ptr3)
   {
           ARG_UNUSED(ptr1);
           ARG_UNUSED(ptr2);
           ARG_UNUSED(ptr3);
           const struct zbus_channel *chan;

           struct acc_msg acc = {0};

           while (!zbus_sub_wait_msg(&my_msg_subscriber, &chan, &acc, K_FOREVER)) {
                   if (&acc_chan == chan) {
                           LOG_INF("From msg subscriber -> Acc x=%d, y=%d, z=%d", acc.x, acc.y, acc.z);
                   }
           }
   }
   K_THREAD_DEFINE(msg_subscriber_task_id, 1024, msg_subscriber_task, NULL, NULL, NULL, 3, 0, 0);



It is possible to add static observers to a channel using the :c:macro:`ZBUS_CHAN_ADD_OBS`. We call
that a post-definition static observer. The macro enables us to indicate an initialization
priority that affects the observers' initialization order. The sequence priority parameter only
affects the post-definition static observers. There is no possibility to overwrite the message
delivery sequence of the static observers.

.. note::
   It is unnecessary to claim/lock a channel before accessing the message inside the listener since
   the event dispatcher calls listeners with the notifying channel already locked. Subscribers,
   however, must claim/lock the channel or use regular read operations to access the message after
   being notified.


Channels can have a *validator function* that enables a channel to accept only valid messages.
Publish attempts invalidated by hard channels will return immediately with an error code. This
allows original creators of a channel to exert some authority over other developers/publishers who
may want to piggy-back on their channels. The following code defines and initializes a :dfn:`hard
channel` and its dependencies. Only valid messages can be published to a :dfn:`hard channel`. This
is possible because a *validator function* was passed to the channel's definition. In this example,
only messages with ``move`` equal to 0, -1, or 1 are valid. The publish function rejects any other
value of ``move``.

.. code-block:: c

   struct control_msg {
           int move;
   };

   bool control_validator(const void* msg, size_t msg_size) {
           const struct control_msg* cm = msg;
           bool is_valid = (cm->move == -1) || (cm->move == 0) || (cm->move == 1);
           return is_valid;
   }

   static int message_count = 0;

   ZBUS_CHAN_DEFINE(control_chan,    /* Name */
            struct control_msg,      /* Message type */

            control_validator,       /* Validator */
            &message_count,          /* User data */
            ZBUS_OBSERVERS_EMPTY,    /* observers */
            ZBUS_MSG_INIT(.move = 0) /* Initial value */
   );

The following sections describe in detail how to use zbus features.


.. _publishing to a channel:

Publishing to a channel
=======================

Messages are published to a channel in zbus by calling :c:func:`zbus_chan_pub`. For example, the
following code builds on the examples above and publishes to channel ``acc_chan``. The code
tries to publish the message ``acc1`` to channel ``acc_chan``, waiting up to one second
for the message to be published. Otherwise, the operation fails. As can be inferred from the code
sample, it's OK to use stack allocated messages since the VDED copies the data internally.

.. code-block:: c

   struct acc_msg acc1 = {.x = 1, .y = 1, .z = 1};
   zbus_chan_pub(&acc_chan, &acc1, K_SECONDS(1));

.. warning::
   Inside an ISR, only use this function with a :c:macro:`K_NO_WAIT` timeout.

.. _reading from a channel:

Reading from a channel
======================

Messages are read from a channel in zbus by calling :c:func:`zbus_chan_read`. For example, the
following code tries to read the channel ``acc_chan``, waiting up to 500 milliseconds to
read the message. Otherwise, the operation fails.

.. code-block:: c

   struct acc_msg acc = {0};
   zbus_chan_read(&acc_chan, &acc, K_MSEC(500));

.. warning::
   Inside an ISR, only use this function with a :c:macro:`K_NO_WAIT` timeout.

.. warning::
   Choose the timeout of :c:func:`zbus_chan_read` after receiving a notification from
   :c:func:`zbus_sub_wait` carefully because the channel will always be unavailable during the VDED
   execution. Using ``K_NO_WAIT`` for reading is highly likely to return a timeout error if there
   is more than one subscriber. For example, consider the VDED illustration again and notice how
   ``S1`` read attempts would definitely fail with ``K_NO_WAIT``. For more details, check
   the `Virtual Distributed Event Dispatcher`_ section.

Notifying a channel
===================

It is possible to force zbus to notify a channel's observers by calling :c:func:`zbus_chan_notify`.
For example, the following code builds on the examples above and forces a notification for the
channel ``acc_chan``. Note this can send events with no message, which does not require any data
exchange. See the code example under `Claim and finish a channel`_ where this may become useful.

.. code-block:: c

   zbus_chan_notify(&acc_chan, K_NO_WAIT);

.. warning::
   Inside an ISR, only use this function with a :c:macro:`K_NO_WAIT` timeout.

Declaring channels and observers
================================

To access channels or observers from files other than their defining files, it is necessary to
declare them by calling :c:macro:`ZBUS_CHAN_DECLARE` and :c:macro:`ZBUS_OBS_DECLARE`. In other
words, zbus channel definitions and declarations with the same channel names in different files
point to the same (global) channel. Thus, developers should take existing channels into account
when naming new ones, or linking will fail. It is possible to declare more than one
channel or observer on the same call. The following code builds on the examples above and declares
the defined channels and observers.

.. code-block:: c

   ZBUS_OBS_DECLARE(my_listener, my_subscriber);
   ZBUS_CHAN_DECLARE(acc_chan, version_chan);


Iterating over channels and observers
=====================================

The zbus subsystem also implements :ref:`Iterable Sections <iterable_sections_api>` for channels and
observers, for which there are supporting APIs like :c:func:`zbus_iterate_over_channels`,
:c:func:`zbus_iterate_over_channels_with_user_data`, :c:func:`zbus_iterate_over_observers` and
:c:func:`zbus_iterate_over_observers_with_user_data`. This feature enables developers to call a
procedure over all declared channels, where the procedure parameter is a :c:struct:`zbus_channel`.
The execution sequence is in the alphabetical name order of the channels (see :ref:`Iterable
Sections <iterable_sections_api>` documentation for details). ZBus also implements this feature for
:c:struct:`zbus_observer`.

.. code-block:: c

   static bool print_channel_data_iterator(const struct zbus_channel *chan, void *user_data)
   {
           int *count = user_data;

           LOG_INF("%d - Channel %s:", *count, zbus_chan_name(chan));
           LOG_INF("      Message size: %d", zbus_chan_msg_size(chan));
           LOG_INF("      Observers:");

           ++(*count);

           struct zbus_channel_observation *observation;

           for (int16_t i = *chan->observers_start_idx, limit = *chan->observers_end_idx; i < limit;
                ++i) {
                   STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);

                   LOG_INF("      - %s", observation->obs->name);
           }

           struct zbus_observer_node *obs_nd, *tmp;

           SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->observers, obs_nd, tmp, node) {
                   LOG_INF("      - %s", obs_nd->obs->name);
           }

           return true;
   }

   static bool print_observer_data_iterator(const struct zbus_observer *obs, void *user_data)
   {
           int *count = user_data;

           LOG_INF("%d - %s %s", *count, obs->queue ? "Subscriber" : "Listener", zbus_obs_name(obs));

           ++(*count);

           return true;
   }

   int main(void)
   {
           int count = 0;

           LOG_INF("Channel list:");

           zbus_iterate_over_channels_with_user_data(print_channel_data_iterator, &count);

           count = 0;

           LOG_INF("Observers list:");

           zbus_iterate_over_observers_with_user_data(print_observer_data_iterator, &count);

           return 0;
   }


The code will log the following output:

.. code-block:: console

   D: Channel list:
   D: 0 - Channel acc_chan:
   D:       Message size: 12
   D:       Observers:
   D:       - my_listener
   D:       - my_subscriber
   D: 1 - Channel version_chan:
   D:       Message size: 4
   D:       Observers:
   D: Observers list:
   D: 0 - Listener my_listener
   D: 1 - Subscriber my_subscriber


.. _Claim and finish a channel:

Advanced channel control
========================

ZBus was designed to be as flexible and extensible as possible. Thus, there are some features
designed to provide some control and extensibility to the bus.

Listeners message access
------------------------

For performance purposes, listeners can access the receiving channel message directly since they
already have the channel locked for it. To access the channel's message, the listener should use
:c:func:`zbus_chan_const_msg` because the channel passed as an argument to the listener function is
a constant pointer to the channel. The const pointer return type tells developers not to modify the
message.

.. code-block:: c

   void listener_callback_example(const struct zbus_channel *chan)
   {
           const struct acc_msg *acc;
           if (&acc_chan == chan) {
                   acc = zbus_chan_const_msg(chan); // Use this
                   // instead of zbus_chan_read(chan, &acc, K_MSEC(200))
                   // or zbus_chan_msg(chan)

                   LOG_DBG("From listener -> Acc x=%d, y=%d, z=%d", acc->x, acc->y, acc->z);
           }
   }

User Data
---------
It is possible to pass custom data into the channel's ``user_data`` for various purposes, such as
writing channel metadata. That can be achieved by passing a pointer to the channel definition
macro's ``user_data`` field, which will then be accessible by others. Note that ``user_data`` is
individual for each channel. Also, note that ``user_data`` access is not thread-safe. For
thread-safe access to ``user_data``, see the next section.


Claim and finish a channel
--------------------------

To take more control over channels, two functions were added: :c:func:`zbus_chan_claim` and
:c:func:`zbus_chan_finish`. With these functions, it is possible to access the channel's metadata
safely. When a channel is claimed, no actions are available to that channel. After finishing the
channel, all the actions are available again.

.. warning::
   Never change the fields of the channel struct directly. It may cause zbus behavior
   inconsistencies and scheduling issues.

.. warning::
   Inside an ISR, only use :c:func:`zbus_chan_claim` with a :c:macro:`K_NO_WAIT` timeout.

The following code builds on the examples above and claims the ``control_chan`` to update its
``user_data``. Suppose we would like to count how many times the channel exchanges messages. The
channel was defined with ``user_data`` pointing to the integer ``message_count``. Since listeners
run with the notifying channel already locked, this code belongs in a thread, such as a subscriber
thread, rather than in a listener of the same channel.

.. code-block:: c

   if (!zbus_chan_claim(&control_chan, K_MSEC(200))) {
           int *message_counting = (int *) zbus_chan_user_data(&control_chan);
           *message_counting += 1;
           zbus_chan_finish(&control_chan);
   }

The following code has the same behavior as the code in :ref:`publishing to a channel`.

.. code-block:: c

   if (!zbus_chan_claim(&acc_chan, K_MSEC(200))) {
           struct acc_msg *acc1 = (struct acc_msg *) zbus_chan_msg(&acc_chan);
           acc1->x = 1;
           acc1->y = 1;
           acc1->z = 1;
           zbus_chan_finish(&acc_chan);
           zbus_chan_notify(&acc_chan, K_SECONDS(1));
   }

The following code has the same behavior as the code in :ref:`reading from a channel`.

.. code-block:: c

   if (!zbus_chan_claim(&acc_chan, K_MSEC(200))) {
           const struct acc_msg *acc1 = (const struct acc_msg *) zbus_chan_const_msg(&acc_chan);
           // access the acc_msg fields directly.
           zbus_chan_finish(&acc_chan);
   }


Runtime observer registration
-----------------------------

It is possible to add observers to channels at runtime. This feature uses the heap to allocate the
nodes dynamically, so the heap size limits the number of dynamic observers zbus can create.
Set the :kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS` to enable the feature. It is possible to
adjust the heap size by changing the configuration :kconfig:option:`CONFIG_HEAP_MEM_POOL_SIZE`. The
following example illustrates the runtime registration usage.



.. code-block:: c

   ZBUS_LISTENER_DEFINE(my_listener, callback);
   // ...
   void thread_entry(void) {
           // ...
           /* Adding the observer to channel chan1 */
           zbus_chan_add_obs(&chan1, &my_listener, K_NO_WAIT);
           /* Removing the observer from channel chan1 */
           zbus_chan_rm_obs(&chan1, &my_listener, K_NO_WAIT);
   }


Samples
*******

For a complete overview of zbus usage, take a look at the samples.
The following samples are available:

* :zephyr:code-sample:`zbus-hello-world` illustrates the code used above in action;
* :zephyr:code-sample:`zbus-work-queue` shows how to define and use different kinds of observers.
  It also includes an example of using a work queue as an execution option instead of executing
  the logic in the listener;
* :zephyr:code-sample:`zbus-msg-subscriber` illustrates how to use message subscribers;
* :zephyr:code-sample:`zbus-dyn-channel` demonstrates how to exchange dynamically allocated data
  in zbus;
* :zephyr:code-sample:`zbus-uart-bridge` shows an example of sending the operation of the channel to
  a host via serial;
* :zephyr:code-sample:`zbus-remote-mock` illustrates how to implement an external mock (on the host)
  to send and receive messages to and from the bus;
* :zephyr:code-sample:`zbus-priority-boost` illustrates the zbus priority boost feature with a
  priority inversion scenario;
* :zephyr:code-sample:`zbus-runtime-obs-registration` illustrates a way of using the runtime
  observer registration feature;
* :zephyr:code-sample:`zbus-confirmed-channel` shows a way of implementing a confirmed channel
  using only subscribers;
* :zephyr:code-sample:`zbus-benchmark` implements a benchmark with different combinations of inputs.

Suggested Uses
**************

Use zbus to transfer data (messages) between threads in one-to-one, one-to-many, and many-to-many
topologies, synchronously or asynchronously. Choosing the proper observer type is crucial. Use
subscribers for scenarios that can tolerate message losses and duplications; when they cannot, use
message subscribers (if you need a thread) or listeners (if you need to be lean and fast). In
addition to the listener, another asynchronous message processing mechanism (like :ref:`message
queues <message_queues_v2>`) may be necessary to retain the pending message until it gets processed.

.. note::
    ZBus can be used to transfer streams from the producer to the consumer. However, this can
    increase zbus' communication latency, so a Pipe may be a better alternative for this
    communication topology.

Configuration Options
*********************

To enable zbus, set the :kconfig:option:`CONFIG_ZBUS` option.

Related configuration options:

* :kconfig:option:`CONFIG_ZBUS_PRIORITY_BOOST` enables the zbus Highest Locker Protocol
  implementation;
* :kconfig:option:`CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY` determines the :c:macro:`SYS_INIT`
  priority used by zbus to organize the channel observations by channel;
* :kconfig:option:`CONFIG_ZBUS_CHANNEL_NAME` makes the names of channels available inside the
  channels' metadata. The log uses this information to show the channels' names;
* :kconfig:option:`CONFIG_ZBUS_OBSERVER_NAME` makes the names of observers available inside the
  observers' metadata;
* :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER` enables the message subscriber observer type;
* :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC` uses the heap to allocate message
  buffers;
* :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_STATIC` uses the stack to allocate message
  buffers;
* :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE` sets the number of message buffers
  that can be used simultaneously;
* :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION` enables the developer to
  isolate a pool for the message subscriber for a set of channels;
* :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE` sets the size of the biggest
  zbus channel message that can be transported in a message buffer;
* :kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS` enables the runtime observer registration.

API Reference
*************

.. doxygengroup:: zbus_apis