1 /*
2  * Copyright (c) 2022 Intel Corporation.
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 #include <zephyr/sys/__assert.h>
7 #include <zephyr/logging/log.h>
8 #include <zephyr/rtio/rtio.h>
9 #include <zephyr/sensing/sensing_sensor.h>
10 #include "sensor_mgmt.h"
11 
12 LOG_MODULE_DECLARE(sensing, CONFIG_SENSING_LOG_LEVEL);
13 
14 /* check whether it is right time for client to consume this sample */
sensor_test_consume_time(struct sensing_sensor * sensor,struct sensing_connection * conn,uint64_t cur_time)15 static inline bool sensor_test_consume_time(struct sensing_sensor *sensor,
16 				     struct sensing_connection *conn,
17 				     uint64_t cur_time)
18 {
19 	LOG_DBG("sensor:%s next_consume_time:%lld cur_time:%lld",
20 			sensor->dev->name, conn->next_consume_time, cur_time);
21 
22 	return conn->next_consume_time <= cur_time;
23 }
24 
update_client_consume_time(struct sensing_sensor * sensor,struct sensing_connection * conn)25 static void update_client_consume_time(struct sensing_sensor *sensor,
26 				       struct sensing_connection *conn)
27 {
28 	uint32_t interval = conn->interval;
29 
30 	if (conn->next_consume_time == 0) {
31 		conn->next_consume_time = get_us();
32 	}
33 
34 	conn->next_consume_time += interval;
35 }
36 
37 /* send data to clients based on interval and sensitivity */
send_data_to_clients(struct sensing_sensor * sensor,void * data)38 static int send_data_to_clients(struct sensing_sensor *sensor,
39 				void *data)
40 {
41 	struct sensing_sensor *client;
42 	struct sensing_connection *conn;
43 
44 	for_each_client_conn(sensor, conn) {
45 		client = conn->sink;
46 		LOG_DBG("sensor:%s send data to client:%p", conn->source->dev->name, conn);
47 
48 		if (!is_client_request_data(conn)) {
49 			continue;
50 		}
51 
52 		/* sensor_test_consume_time(), check whether time is ready or not:
53 		 * true: it's time for client consuming the data
54 		 * false: client time not arrived yet, not consume the data
55 		 */
56 		if (!sensor_test_consume_time(sensor, conn, get_us())) {
57 			continue;
58 		}
59 
60 		update_client_consume_time(sensor, conn);
61 
62 		if (!conn->callback_list->on_data_event) {
63 			LOG_WRN("sensor:%s event callback not registered",
64 					conn->source->dev->name);
65 			continue;
66 		}
67 		conn->callback_list->on_data_event(conn, data,
68 				conn->callback_list->context);
69 	}
70 
71 	return 0;
72 }
73 
/* Declare the start/end symbols of the sensing_sensor struct section so that
 * STRUCT_SECTION_START()/STRUCT_SECTION_END() can be used in this file to
 * range-check pointers against that section.
 */
STRUCT_SECTION_START_EXTERN(sensing_sensor);
STRUCT_SECTION_END_EXTERN(sensing_sensor);
76 
dispatch_task(void * a,void * b,void * c)77 static void dispatch_task(void *a, void *b, void *c)
78 {
79 	uint8_t *data = NULL;
80 	uint32_t data_len = 0;
81 	int rc;
82 	int get_data_rc;
83 
84 	ARG_UNUSED(a);
85 	ARG_UNUSED(b);
86 	ARG_UNUSED(c);
87 
88 	if (IS_ENABLED(CONFIG_USERSPACE) && !k_is_user_context()) {
89 		rtio_access_grant(&sensing_rtio_ctx, k_current_get());
90 		k_thread_user_mode_enter(dispatch_task, a, b, c);
91 	}
92 
93 	while (true) {
94 		struct rtio_cqe cqe;
95 
96 		rc = rtio_cqe_copy_out(&sensing_rtio_ctx, &cqe, 1, K_FOREVER);
97 		if (rc < 1) {
98 			continue;
99 		}
100 
101 		/* Cache the data from the CQE */
102 		rc = cqe.result;
103 
104 		/* Get the associated data */
105 		get_data_rc =
106 			rtio_cqe_get_mempool_buffer(&sensing_rtio_ctx, &cqe, &data, &data_len);
107 		if (get_data_rc != 0 || data_len == 0) {
108 			continue;
109 		}
110 
111 		if ((uintptr_t)cqe.userdata >=
112 			    (uintptr_t)STRUCT_SECTION_START(sensing_sensor) &&
113 		    (uintptr_t)cqe.userdata < (uintptr_t)STRUCT_SECTION_END(sensing_sensor)) {
114 			struct sensing_sensor *sensor = cqe.userdata;
115 
116 			send_data_to_clients(sensor, data);
117 		}
118 
119 		rtio_release_buffer(&sensing_rtio_ctx, data, data_len);
120 	}
121 }
122 
/* Statically define the "sensing_dispatch" thread running dispatch_task()
 * with three NULL entry arguments. Stack size and priority come from
 * Kconfig; the trailing 0, 0 are the thread options and start delay.
 */
K_THREAD_DEFINE(sensing_dispatch, CONFIG_SENSING_DISPATCH_THREAD_STACK_SIZE, dispatch_task,
		NULL, NULL, NULL, CONFIG_SENSING_DISPATCH_THREAD_PRIORITY, 0, 0);
125