1 /*
2  * Copyright (c) 2023 Basalte bv
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <string.h>
8 #include <zephyr/logging/log.h>
9 LOG_MODULE_DECLARE(net_coap, CONFIG_COAP_LOG_LEVEL);
10 
11 #include <zephyr/net/socket.h>
12 
13 #include <zephyr/init.h>
14 #include <zephyr/kernel.h>
15 #include <zephyr/net/coap.h>
16 #include <zephyr/net/coap_link_format.h>
17 #include <zephyr/net/coap_mgmt.h>
18 #include <zephyr/net/coap_service.h>
19 #include <zephyr/posix/fcntl.h>
20 
/*
 * Priority of the CoAP server thread: the lowest priority of either the
 * cooperative or the preemptible class, depending on whether
 * CONFIG_NET_TC_THREAD_COOPERATIVE is set.
 */
#if !defined(CONFIG_NET_TC_THREAD_COOPERATIVE)
/* Lowest priority preemptible thread */
#define THREAD_PRIORITY K_PRIO_PREEMPT(CONFIG_NUM_PREEMPT_PRIORITIES - 1)
#else
/* Lowest priority cooperative thread */
#define THREAD_PRIORITY K_PRIO_COOP(CONFIG_NUM_COOP_PRIORITIES - 1)
#endif
27 
/*
 * Size of the concrete socket address that `sock` points to, selected by its
 * address family: sizeof(struct sockaddr_in) for AF_INET, otherwise
 * sizeof(struct sockaddr_in6).
 *
 * The argument is parenthesized so that pointer expressions such as `p + 1`
 * are evaluated before the cast is applied.
 */
#define ADDRLEN(sock) \
	(((const struct sockaddr *)(sock))->sa_family == AF_INET ? \
		sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))
31 
32 /* Shortened defines */
33 #define MAX_OPTIONS    CONFIG_COAP_SERVER_MESSAGE_OPTIONS
34 #define MAX_PENDINGS   CONFIG_COAP_SERVICE_PENDING_MESSAGES
35 #define MAX_OBSERVERS  CONFIG_COAP_SERVICE_OBSERVERS
36 #define MAX_POLL_FD    CONFIG_ZVFS_POLL_MAX
37 
38 BUILD_ASSERT(CONFIG_ZVFS_POLL_MAX > 0, "CONFIG_ZVFS_POLL_MAX can't be 0");
39 
40 static K_MUTEX_DEFINE(lock);
41 static int control_socks[2];
42 
43 #if defined(CONFIG_COAP_SERVER_PENDING_ALLOCATOR_STATIC)
44 K_MEM_SLAB_DEFINE_STATIC(pending_data, CONFIG_COAP_SERVER_MESSAGE_SIZE,
45 			 CONFIG_COAP_SERVER_PENDING_ALLOCATOR_STATIC_BLOCKS, 4);
46 #endif
47 
/**
 * Allocate storage for the copy of a pending message.
 *
 * The backing allocator is chosen at build time: a memory slab with blocks of
 * CONFIG_COAP_SERVER_MESSAGE_SIZE bytes, the system heap, or none at all.
 *
 * @param len Number of bytes required.
 *
 * @return Pointer to the storage, or NULL when the request cannot be served
 *         (no allocator configured, request larger than a slab block, or the
 *         allocator is exhausted).
 */
static inline void *coap_server_alloc(size_t len)
{
#if defined(CONFIG_COAP_SERVER_PENDING_ALLOCATOR_STATIC)
	void *block = NULL;

	/* Slab blocks have a fixed size, anything larger cannot be stored */
	if (len <= CONFIG_COAP_SERVER_MESSAGE_SIZE &&
	    k_mem_slab_alloc(&pending_data, &block, K_NO_WAIT) >= 0) {
		return block;
	}

	return NULL;
#elif defined(CONFIG_COAP_SERVER_PENDING_ALLOCATOR_SYSTEM_HEAP)
	return k_malloc(len);
#else
	/* No allocator configured: pending copies are never available */
	ARG_UNUSED(len);

	return NULL;
#endif
}
72 
/**
 * Release storage obtained from coap_server_alloc().
 *
 * Hands the pointer back to whichever allocator was selected at build time;
 * does nothing when no allocator is configured.
 *
 * @param ptr Storage to release.
 */
static inline void coap_server_free(void *ptr)
{
#if defined(CONFIG_COAP_SERVER_PENDING_ALLOCATOR_STATIC)
	/* Return the block to the pending message slab */
	k_mem_slab_free(&pending_data, ptr);
#elif defined(CONFIG_COAP_SERVER_PENDING_ALLOCATOR_SYSTEM_HEAP)
	/* Return the memory to the system heap */
	k_free(ptr);
#else
	/* Nothing was ever allocated */
	ARG_UNUSED(ptr);
#endif
}
83 
coap_service_remove_observer(const struct coap_service * service,struct coap_resource * resource,const struct sockaddr * addr,const uint8_t * token,uint8_t tkl)84 static int coap_service_remove_observer(const struct coap_service *service,
85 					struct coap_resource *resource,
86 					const struct sockaddr *addr,
87 					const uint8_t *token, uint8_t tkl)
88 {
89 	struct coap_observer *obs;
90 
91 	if (tkl > 0 && addr != NULL) {
92 		/* Prefer addr+token to find the observer */
93 		obs = coap_find_observer(service->data->observers, MAX_OBSERVERS, addr, token, tkl);
94 	} else if (tkl > 0) {
95 		/* Then try to find the observer by token */
96 		obs = coap_find_observer_by_token(service->data->observers, MAX_OBSERVERS, token,
97 						  tkl);
98 	} else if (addr != NULL) {
99 		obs = coap_find_observer_by_addr(service->data->observers, MAX_OBSERVERS, addr);
100 	} else {
101 		/* Either a token or an address is required */
102 		return -EINVAL;
103 	}
104 
105 	if (obs == NULL) {
106 		return 0;
107 	}
108 
109 	if (resource == NULL) {
110 		COAP_SERVICE_FOREACH_RESOURCE(service, it) {
111 			if (coap_remove_observer(it, obs)) {
112 				memset(obs, 0, sizeof(*obs));
113 				return 1;
114 			}
115 		}
116 	} else if (coap_remove_observer(resource, obs)) {
117 		memset(obs, 0, sizeof(*obs));
118 		return 1;
119 	}
120 
121 	return 0;
122 }
123 
coap_server_process(int sock_fd)124 static int coap_server_process(int sock_fd)
125 {
126 	static uint8_t buf[CONFIG_COAP_SERVER_MESSAGE_SIZE];
127 
128 	struct sockaddr client_addr;
129 	socklen_t client_addr_len = sizeof(client_addr);
130 	struct coap_service *service = NULL;
131 	struct coap_packet request;
132 	struct coap_pending *pending;
133 	struct coap_option options[MAX_OPTIONS] = { 0 };
134 	uint8_t opt_num = MAX_OPTIONS;
135 	uint8_t type;
136 	ssize_t received;
137 	int ret;
138 	int flags = ZSOCK_MSG_DONTWAIT;
139 
140 	if (IS_ENABLED(CONFIG_COAP_SERVER_TRUNCATE_MSGS)) {
141 		flags |= ZSOCK_MSG_TRUNC;
142 	}
143 
144 	received = zsock_recvfrom(sock_fd, buf, sizeof(buf), flags, &client_addr, &client_addr_len);
145 
146 	if (received < 0) {
147 		if (errno == EWOULDBLOCK) {
148 			return 0;
149 		}
150 
151 		LOG_ERR("Failed to process client request (%d)", -errno);
152 		return -errno;
153 	}
154 
155 	ret = coap_packet_parse(&request, buf, MIN(received, sizeof(buf)), options, opt_num);
156 	if (ret < 0) {
157 		LOG_ERR("Failed To parse coap message (%d)", ret);
158 		return ret;
159 	}
160 
161 	(void)k_mutex_lock(&lock, K_FOREVER);
162 	/* Find the active service */
163 	COAP_SERVICE_FOREACH(svc) {
164 		if (svc->data->sock_fd == sock_fd) {
165 			service = svc;
166 			break;
167 		}
168 	}
169 	if (service == NULL) {
170 		ret = -ENOENT;
171 		goto unlock;
172 	}
173 
174 	type = coap_header_get_type(&request);
175 
176 	if (received > sizeof(buf)) {
177 		/* The message was truncated and can't be processed further */
178 		struct coap_packet response;
179 		uint8_t token[COAP_TOKEN_MAX_LEN];
180 		uint8_t tkl = coap_header_get_token(&request, token);
181 		uint16_t id = coap_header_get_id(&request);
182 
183 		if (type == COAP_TYPE_CON) {
184 			type = COAP_TYPE_ACK;
185 		} else {
186 			type = COAP_TYPE_NON_CON;
187 		}
188 
189 		ret = coap_packet_init(&response, buf, sizeof(buf), COAP_VERSION_1, type, tkl,
190 				       token, COAP_RESPONSE_CODE_REQUEST_TOO_LARGE, id);
191 		if (ret < 0) {
192 			LOG_ERR("Failed to init response (%d)", ret);
193 			goto unlock;
194 		}
195 
196 		ret = coap_append_option_int(&response, COAP_OPTION_SIZE1,
197 					     CONFIG_COAP_SERVER_MESSAGE_SIZE);
198 		if (ret < 0) {
199 			LOG_ERR("Failed to add SIZE1 option (%d)", ret);
200 			goto unlock;
201 		}
202 
203 		ret = coap_service_send(service, &response, &client_addr, client_addr_len, NULL);
204 		if (ret < 0) {
205 			LOG_ERR("Failed to reply \"Request Entity Too Large\" (%d)", ret);
206 			goto unlock;
207 		}
208 
209 		goto unlock;
210 	}
211 
212 	pending = coap_pending_received(&request, service->data->pending, MAX_PENDINGS);
213 	if (pending) {
214 		uint8_t token[COAP_TOKEN_MAX_LEN];
215 		uint8_t tkl;
216 
217 		switch (type) {
218 		case COAP_TYPE_RESET:
219 			tkl = coap_header_get_token(&request, token);
220 			coap_service_remove_observer(service, NULL, &client_addr, token, tkl);
221 			__fallthrough;
222 		case COAP_TYPE_ACK:
223 			coap_server_free(pending->data);
224 			coap_pending_clear(pending);
225 			break;
226 		default:
227 			LOG_WRN("Unexpected pending type %d", type);
228 			ret = -EINVAL;
229 			goto unlock;
230 		}
231 
232 		goto unlock;
233 	} else if (type == COAP_TYPE_ACK || type == COAP_TYPE_RESET) {
234 		LOG_WRN("Unexpected type %d without pending packet", type);
235 		ret = -EINVAL;
236 		goto unlock;
237 	}
238 
239 	if (IS_ENABLED(CONFIG_COAP_SERVER_WELL_KNOWN_CORE) &&
240 	    coap_header_get_code(&request) == COAP_METHOD_GET &&
241 	    coap_uri_path_match(COAP_WELL_KNOWN_CORE_PATH, options, opt_num)) {
242 		uint8_t well_known_buf[CONFIG_COAP_SERVER_MESSAGE_SIZE];
243 		struct coap_packet response;
244 
245 		ret = coap_well_known_core_get_len(service->res_begin,
246 						   COAP_SERVICE_RESOURCE_COUNT(service),
247 						   &request, &response,
248 						   well_known_buf, sizeof(well_known_buf));
249 		if (ret < 0) {
250 			LOG_ERR("Failed to build well known core for %s (%d)", service->name, ret);
251 			goto unlock;
252 		}
253 
254 		ret = coap_service_send(service, &response, &client_addr, client_addr_len, NULL);
255 	} else {
256 		ret = coap_handle_request_len(&request, service->res_begin,
257 					      COAP_SERVICE_RESOURCE_COUNT(service),
258 					      options, opt_num, &client_addr, client_addr_len);
259 
260 		/* Translate errors to response codes */
261 		switch (ret) {
262 		case -ENOENT:
263 			ret = COAP_RESPONSE_CODE_NOT_FOUND;
264 			break;
265 		case -ENOTSUP:
266 			ret = COAP_RESPONSE_CODE_BAD_REQUEST;
267 			break;
268 		case -EPERM:
269 			ret = COAP_RESPONSE_CODE_NOT_ALLOWED;
270 			break;
271 		}
272 
273 		/* Shortcut for replying a code without a body */
274 		if (ret > 0 && type == COAP_TYPE_CON) {
275 			/* Minimal sized ack buffer */
276 			uint8_t ack_buf[COAP_TOKEN_MAX_LEN + 4U];
277 			struct coap_packet ack;
278 
279 			ret = coap_ack_init(&ack, &request, ack_buf, sizeof(ack_buf), (uint8_t)ret);
280 			if (ret < 0) {
281 				LOG_ERR("Failed to init ACK (%d)", ret);
282 				goto unlock;
283 			}
284 
285 			ret = coap_service_send(service, &ack, &client_addr, client_addr_len, NULL);
286 		}
287 	}
288 
289 unlock:
290 	(void)k_mutex_unlock(&lock);
291 
292 	return ret;
293 }
294 
coap_server_retransmit(void)295 static void coap_server_retransmit(void)
296 {
297 	struct coap_pending *pending;
298 	int64_t remaining;
299 	int64_t now = k_uptime_get();
300 	int ret;
301 
302 	(void)k_mutex_lock(&lock, K_FOREVER);
303 
304 	COAP_SERVICE_FOREACH(service) {
305 		if (service->data->sock_fd < 0) {
306 			continue;
307 		}
308 
309 		pending = coap_pending_next_to_expire(service->data->pending, MAX_PENDINGS);
310 		if (pending == NULL) {
311 			/* No work to be done */
312 			continue;
313 		}
314 
315 		/* Check if the pending request has expired */
316 		remaining = pending->t0 + pending->timeout - now;
317 		if (remaining > 0) {
318 			continue;
319 		}
320 
321 		if (coap_pending_cycle(pending)) {
322 			ret = zsock_sendto(service->data->sock_fd, pending->data, pending->len, 0,
323 					   &pending->addr, ADDRLEN(&pending->addr));
324 			if (ret < 0) {
325 				LOG_ERR("Failed to send pending retransmission for %s (%d)",
326 					service->name, ret);
327 			}
328 			__ASSERT_NO_MSG(ret == pending->len);
329 		} else {
330 			LOG_WRN("Packet retransmission failed for %s", service->name);
331 
332 			coap_service_remove_observer(service, NULL, &pending->addr, NULL, 0U);
333 			coap_server_free(pending->data);
334 			coap_pending_clear(pending);
335 		}
336 	}
337 
338 	(void)k_mutex_unlock(&lock);
339 }
340 
coap_server_poll_timeout(void)341 static int coap_server_poll_timeout(void)
342 {
343 	struct coap_pending *pending;
344 	int64_t result = INT64_MAX;
345 	int64_t remaining;
346 	int64_t now = k_uptime_get();
347 
348 	COAP_SERVICE_FOREACH(svc) {
349 		if (svc->data->sock_fd < -1) {
350 			continue;
351 		}
352 
353 		pending = coap_pending_next_to_expire(svc->data->pending, MAX_PENDINGS);
354 		if (pending == NULL) {
355 			continue;
356 		}
357 
358 		remaining = pending->t0 + pending->timeout - now;
359 		if (result > remaining) {
360 			result = remaining;
361 		}
362 	}
363 
364 	if (result == INT64_MAX) {
365 		return -1;
366 	}
367 
368 	return MAX(result, 0);
369 }
370 
coap_server_update_services(void)371 static void coap_server_update_services(void)
372 {
373 	if (zsock_send(control_socks[1], &(char){0}, 1, 0) < 0) {
374 		LOG_ERR("Failed to notify server thread (%d)", errno);
375 	}
376 }
377 
coap_service_in_section(const struct coap_service * service)378 static inline bool coap_service_in_section(const struct coap_service *service)
379 {
380 	STRUCT_SECTION_START_EXTERN(coap_service);
381 	STRUCT_SECTION_END_EXTERN(coap_service);
382 
383 	return STRUCT_SECTION_START(coap_service) <= service &&
384 	       STRUCT_SECTION_END(coap_service) > service;
385 }
386 
coap_service_raise_event(const struct coap_service * service,uint32_t mgmt_event)387 static inline void coap_service_raise_event(const struct coap_service *service, uint32_t mgmt_event)
388 {
389 #if defined(CONFIG_NET_MGMT_EVENT_INFO)
390 	const struct net_event_coap_service net_event = {
391 		.service = service,
392 	};
393 
394 	net_mgmt_event_notify_with_info(mgmt_event, NULL, (void *)&net_event, sizeof(net_event));
395 #else
396 	ARG_UNUSED(service);
397 
398 	net_mgmt_event_notify(mgmt_event, NULL);
399 #endif
400 }
401 
coap_service_start(const struct coap_service * service)402 int coap_service_start(const struct coap_service *service)
403 {
404 	int ret;
405 
406 	uint8_t af;
407 	socklen_t len;
408 	struct sockaddr_storage addr_storage;
409 	union {
410 		struct sockaddr *addr;
411 		struct sockaddr_in *addr4;
412 		struct sockaddr_in6 *addr6;
413 	} addr_ptrs = {
414 		.addr = (struct sockaddr *)&addr_storage,
415 	};
416 
417 	if (!coap_service_in_section(service)) {
418 		__ASSERT_NO_MSG(false);
419 		return -EINVAL;
420 	}
421 
422 	k_mutex_lock(&lock, K_FOREVER);
423 
424 	if (service->data->sock_fd >= 0) {
425 		ret = -EALREADY;
426 		goto end;
427 	}
428 
429 	/* set the default address (in6addr_any / INADDR_ANY are all 0) */
430 	addr_storage = (struct sockaddr_storage){0};
431 	if (IS_ENABLED(CONFIG_NET_IPV6) && service->host != NULL &&
432 	    zsock_inet_pton(AF_INET6, service->host, &addr_ptrs.addr6->sin6_addr) == 1) {
433 		/* if a literal IPv6 address is provided as the host, use IPv6 */
434 		af = AF_INET6;
435 		len = sizeof(struct sockaddr_in6);
436 
437 		addr_ptrs.addr6->sin6_family = AF_INET6;
438 		addr_ptrs.addr6->sin6_port = htons(*service->port);
439 	} else if (IS_ENABLED(CONFIG_NET_IPV4) && service->host != NULL &&
440 		   zsock_inet_pton(AF_INET, service->host, &addr_ptrs.addr4->sin_addr) == 1) {
441 		/* if a literal IPv4 address is provided as the host, use IPv4 */
442 		af = AF_INET;
443 		len = sizeof(struct sockaddr_in);
444 
445 		addr_ptrs.addr4->sin_family = AF_INET;
446 		addr_ptrs.addr4->sin_port = htons(*service->port);
447 	} else if (IS_ENABLED(CONFIG_NET_IPV6)) {
448 		/* prefer IPv6 if both IPv6 and IPv4 are supported */
449 		af = AF_INET6;
450 		len = sizeof(struct sockaddr_in6);
451 
452 		addr_ptrs.addr6->sin6_family = AF_INET6;
453 		addr_ptrs.addr6->sin6_port = htons(*service->port);
454 	} else if (IS_ENABLED(CONFIG_NET_IPV4)) {
455 		af = AF_INET;
456 		len = sizeof(struct sockaddr_in);
457 
458 		addr_ptrs.addr4->sin_family = AF_INET;
459 		addr_ptrs.addr4->sin_port = htons(*service->port);
460 	} else {
461 		ret = -ENOTSUP;
462 		goto end;
463 	}
464 
465 	service->data->sock_fd = zsock_socket(af, SOCK_DGRAM, IPPROTO_UDP);
466 	if (service->data->sock_fd < 0) {
467 		ret = -errno;
468 		goto end;
469 	}
470 
471 	ret = zsock_fcntl(service->data->sock_fd, F_SETFL, O_NONBLOCK);
472 	if (ret < 0) {
473 		ret = -errno;
474 		goto close;
475 	}
476 
477 	ret = zsock_bind(service->data->sock_fd, addr_ptrs.addr, len);
478 	if (ret < 0) {
479 		ret = -errno;
480 		goto close;
481 	}
482 
483 	if (*service->port == 0) {
484 		/* ephemeral port - read back the port number */
485 		len = sizeof(addr_storage);
486 		ret = zsock_getsockname(service->data->sock_fd, addr_ptrs.addr, &len);
487 		if (ret < 0) {
488 			goto close;
489 		}
490 
491 		if (af == AF_INET6) {
492 			*service->port = addr_ptrs.addr6->sin6_port;
493 		} else {
494 			*service->port = addr_ptrs.addr4->sin_port;
495 		}
496 	}
497 
498 end:
499 	k_mutex_unlock(&lock);
500 
501 	coap_server_update_services();
502 
503 	coap_service_raise_event(service, NET_EVENT_COAP_SERVICE_STARTED);
504 
505 	return ret;
506 
507 close:
508 	(void)zsock_close(service->data->sock_fd);
509 	service->data->sock_fd = -1;
510 
511 	k_mutex_unlock(&lock);
512 
513 	return ret;
514 }
515 
coap_service_stop(const struct coap_service * service)516 int coap_service_stop(const struct coap_service *service)
517 {
518 	int ret;
519 
520 	if (!coap_service_in_section(service)) {
521 		__ASSERT_NO_MSG(false);
522 		return -EINVAL;
523 	}
524 
525 	k_mutex_lock(&lock, K_FOREVER);
526 
527 	if (service->data->sock_fd < 0) {
528 		k_mutex_unlock(&lock);
529 		return -EALREADY;
530 	}
531 
532 	/* Closing a socket will trigger a poll event */
533 	ret = zsock_close(service->data->sock_fd);
534 	service->data->sock_fd = -1;
535 
536 	k_mutex_unlock(&lock);
537 
538 	coap_service_raise_event(service, NET_EVENT_COAP_SERVICE_STOPPED);
539 
540 	return ret;
541 }
542 
coap_service_is_running(const struct coap_service * service)543 int coap_service_is_running(const struct coap_service *service)
544 {
545 	int ret;
546 
547 	if (!coap_service_in_section(service)) {
548 		__ASSERT_NO_MSG(false);
549 		return -EINVAL;
550 	}
551 
552 	k_mutex_lock(&lock, K_FOREVER);
553 
554 	ret = (service->data->sock_fd < 0) ? 0 : 1;
555 
556 	k_mutex_unlock(&lock);
557 
558 	return ret;
559 }
560 
/**
 * Send a CoAP message through a service socket.
 *
 * Confirmable messages are additionally registered as pending, with a private
 * copy of the packet data, so that the server thread can retransmit them. If
 * that bookkeeping fails the message is still sent once.
 *
 * The socket descriptor is captured while the lock is held, so the send that
 * follows the unlock does not re-read the shared service state.
 *
 * @param service  Service to send from; must be part of the coap_service
 *                 section.
 * @param cpkt     Packet to send.
 * @param addr     Destination address.
 * @param addr_len Length of @p addr.
 * @param params   Transmission parameters passed to coap_pending_init().
 *
 * @return 0 on success, -EINVAL for a service outside the section, -EBADF if
 *         the service has no socket, or the negative errno of a failed send.
 */
int coap_service_send(const struct coap_service *service, const struct coap_packet *cpkt,
		      const struct sockaddr *addr, socklen_t addr_len,
		      const struct coap_transmission_parameters *params)
{
	int sock_fd;
	int ret;

	if (!coap_service_in_section(service)) {
		__ASSERT_NO_MSG(false);
		return -EINVAL;
	}

	(void)k_mutex_lock(&lock, K_FOREVER);

	sock_fd = service->data->sock_fd;
	if (sock_fd < 0) {
		(void)k_mutex_unlock(&lock);
		return -EBADF;
	}

	/*
	 * Check if we should start with retransmits, if creating a pending message fails we still
	 * try to send.
	 */
	if (coap_header_get_type(cpkt) == COAP_TYPE_CON) {
		struct coap_pending *pending = coap_pending_next_unused(service->data->pending,
									MAX_PENDINGS);

		if (pending == NULL) {
			LOG_WRN("No pending message available for %s", service->name);
			goto send;
		}

		ret = coap_pending_init(pending, cpkt, addr, params);
		if (ret < 0) {
			LOG_WRN("Failed to init pending message for %s (%d)", service->name, ret);
			goto send;
		}

		/* Replace tracked data with our allocated copy */
		pending->data = coap_server_alloc(pending->len);
		if (pending->data == NULL) {
			LOG_WRN("Failed to allocate pending message data for %s", service->name);
			coap_pending_clear(pending);
			goto send;
		}
		memcpy(pending->data, cpkt->data, pending->len);

		coap_pending_cycle(pending);

		/* Trigger event in receive loop to schedule retransmit */
		coap_server_update_services();
	}

send:
	(void)k_mutex_unlock(&lock);

	ret = zsock_sendto(sock_fd, cpkt->data, cpkt->offset, 0, addr, addr_len);
	if (ret < 0) {
		/* Report the actual cause instead of the bare -1 of the socket call */
		ret = -errno;
		LOG_ERR("Failed to send CoAP message (%d)", ret);
		return ret;
	}
	__ASSERT_NO_MSG(ret == cpkt->offset);

	return 0;
}
625 
/**
 * Send a CoAP message through the service that owns a resource.
 *
 * @param resource Resource whose owning service is used for sending.
 * @param cpkt     Packet to send.
 * @param addr     Destination address.
 * @param addr_len Length of @p addr.
 * @param params   Transmission parameters, forwarded to coap_service_send().
 *
 * @return The result of coap_service_send(), or -ENOENT if no service
 *         contains @p resource.
 */
int coap_resource_send(const struct coap_resource *resource, const struct coap_packet *cpkt,
		       const struct sockaddr *addr, socklen_t addr_len,
		       const struct coap_transmission_parameters *params)
{
	const struct coap_service *owner = NULL;

	/* Find owning service */
	COAP_SERVICE_FOREACH(svc) {
		if (COAP_SERVICE_HAS_RESOURCE(svc, resource)) {
			owner = svc;
			break;
		}
	}

	if (owner == NULL) {
		return -ENOENT;
	}

	return coap_service_send(owner, cpkt, addr, addr_len, params);
}
639 
coap_resource_parse_observe(struct coap_resource * resource,const struct coap_packet * request,const struct sockaddr * addr)640 int coap_resource_parse_observe(struct coap_resource *resource, const struct coap_packet *request,
641 				const struct sockaddr *addr)
642 {
643 	const struct coap_service *service = NULL;
644 	int ret;
645 	uint8_t token[COAP_TOKEN_MAX_LEN];
646 	uint8_t tkl;
647 
648 	if (!coap_packet_is_request(request)) {
649 		return -EINVAL;
650 	}
651 
652 	ret = coap_get_option_int(request, COAP_OPTION_OBSERVE);
653 	if (ret < 0) {
654 		return ret;
655 	}
656 
657 	/* Find owning service */
658 	COAP_SERVICE_FOREACH(svc) {
659 		if (COAP_SERVICE_HAS_RESOURCE(svc, resource)) {
660 			service = svc;
661 			break;
662 		}
663 	}
664 
665 	if (service == NULL) {
666 		return -ENOENT;
667 	}
668 
669 	tkl = coap_header_get_token(request, token);
670 	if (tkl == 0) {
671 		return -EINVAL;
672 	}
673 
674 	(void)k_mutex_lock(&lock, K_FOREVER);
675 
676 	if (ret == 0) {
677 		struct coap_observer *observer;
678 
679 		/* RFC7641 section 4.1 - Check if the current observer already exists */
680 		observer = coap_find_observer(service->data->observers, MAX_OBSERVERS, addr, token,
681 					      tkl);
682 		if (observer != NULL) {
683 			/* Client refresh */
684 			goto unlock;
685 		}
686 
687 		/* New client */
688 		observer = coap_observer_next_unused(service->data->observers, MAX_OBSERVERS);
689 		if (observer == NULL) {
690 			ret = -ENOMEM;
691 			goto unlock;
692 		}
693 
694 		coap_observer_init(observer, request, addr);
695 		coap_register_observer(resource, observer);
696 	} else if (ret == 1) {
697 		ret = coap_service_remove_observer(service, resource, addr, token, tkl);
698 		if (ret < 0) {
699 			LOG_WRN("Failed to remove observer (%d)", ret);
700 		}
701 	}
702 
703 unlock:
704 	(void)k_mutex_unlock(&lock);
705 
706 	return ret;
707 }
708 
coap_resource_remove_observer(struct coap_resource * resource,const struct sockaddr * addr,const uint8_t * token,uint8_t token_len)709 static int coap_resource_remove_observer(struct coap_resource *resource,
710 					 const struct sockaddr *addr,
711 					 const uint8_t *token, uint8_t token_len)
712 {
713 	const struct coap_service *service = NULL;
714 	int ret;
715 
716 	/* Find owning service */
717 	COAP_SERVICE_FOREACH(svc) {
718 		if (COAP_SERVICE_HAS_RESOURCE(svc, resource)) {
719 			service = svc;
720 			break;
721 		}
722 	}
723 
724 	if (service == NULL) {
725 		return -ENOENT;
726 	}
727 
728 	(void)k_mutex_lock(&lock, K_FOREVER);
729 	ret = coap_service_remove_observer(service, resource, addr, token, token_len);
730 	(void)k_mutex_unlock(&lock);
731 
732 	if (ret == 1) {
733 		/* An observer was found and removed */
734 		return 0;
735 	} else if (ret == 0) {
736 		/* No matching observer found */
737 		return -ENOENT;
738 	}
739 
740 	/* An error occurred */
741 	return ret;
742 }
743 
/**
 * Remove the observer of a resource that matches an address.
 *
 * @param resource Resource to remove the observer from.
 * @param addr     Address of the observer.
 *
 * @return The result of coap_resource_remove_observer() called without a
 *         token.
 */
int coap_resource_remove_observer_by_addr(struct coap_resource *resource,
					  const struct sockaddr *addr)
{
	const uint8_t *no_token = NULL;

	return coap_resource_remove_observer(resource, addr, no_token, 0);
}
749 
/**
 * Remove the observer of a resource that matches a token.
 *
 * @param resource  Resource to remove the observer from.
 * @param token     Token of the observer.
 * @param token_len Length of @p token.
 *
 * @return The result of coap_resource_remove_observer() called without an
 *         address.
 */
int coap_resource_remove_observer_by_token(struct coap_resource *resource,
					   const uint8_t *token, uint8_t token_len)
{
	const struct sockaddr *no_addr = NULL;

	return coap_resource_remove_observer(resource, no_addr, token, token_len);
}
755 
coap_server_thread(void * p1,void * p2,void * p3)756 static void coap_server_thread(void *p1, void *p2, void *p3)
757 {
758 	struct zsock_pollfd sock_fds[MAX_POLL_FD];
759 	int sock_nfds;
760 	int ret;
761 
762 	ARG_UNUSED(p1);
763 	ARG_UNUSED(p2);
764 	ARG_UNUSED(p3);
765 
766 	/* Create a socket pair to wake zsock_poll */
767 	ret = zsock_socketpair(AF_UNIX, SOCK_STREAM, 0, control_socks);
768 	if (ret < 0) {
769 		LOG_ERR("Failed to create socket pair (%d)", ret);
770 		return;
771 	}
772 
773 	for (int i = 0; i < 2; ++i) {
774 		ret = zsock_fcntl(control_socks[i], F_SETFL, O_NONBLOCK);
775 
776 		if (ret < 0) {
777 			zsock_close(control_socks[0]);
778 			zsock_close(control_socks[1]);
779 
780 			LOG_ERR("Failed to set socket pair [%d] non-blocking (%d)", i, ret);
781 			return;
782 		}
783 	}
784 
785 	COAP_SERVICE_FOREACH(svc) {
786 		if (svc->flags & COAP_SERVICE_AUTOSTART) {
787 			ret = coap_service_start(svc);
788 			if (ret < 0) {
789 				LOG_ERR("Failed to autostart service %s (%d)", svc->name, ret);
790 			}
791 		}
792 	}
793 
794 	while (true) {
795 		sock_nfds = 0;
796 		COAP_SERVICE_FOREACH(svc) {
797 			if (svc->data->sock_fd < 0) {
798 				continue;
799 			}
800 			if (sock_nfds >= MAX_POLL_FD) {
801 				LOG_ERR("Maximum active CoAP services reached (%d), "
802 					"increase CONFIG_ZVFS_POLL_MAX to support more.",
803 					MAX_POLL_FD);
804 				break;
805 			}
806 
807 			sock_fds[sock_nfds].fd = svc->data->sock_fd;
808 			sock_fds[sock_nfds].events = ZSOCK_POLLIN;
809 			sock_fds[sock_nfds].revents = 0;
810 			sock_nfds++;
811 		}
812 
813 		/* Add socket pair FD to allow wake up */
814 		if (sock_nfds < MAX_POLL_FD) {
815 			sock_fds[sock_nfds].fd = control_socks[0];
816 			sock_fds[sock_nfds].events = ZSOCK_POLLIN;
817 			sock_fds[sock_nfds].revents = 0;
818 			sock_nfds++;
819 		}
820 
821 		__ASSERT_NO_MSG(sock_nfds > 0);
822 
823 		ret = zsock_poll(sock_fds, sock_nfds, coap_server_poll_timeout());
824 		if (ret < 0) {
825 			LOG_ERR("Poll error (%d)", -errno);
826 			k_msleep(10);
827 		}
828 
829 		for (int i = 0; i < sock_nfds; ++i) {
830 			/* Check the wake up event */
831 			if (sock_fds[i].fd == control_socks[0] &&
832 			    sock_fds[i].revents & ZSOCK_POLLIN) {
833 				char tmp;
834 
835 				zsock_recv(sock_fds[i].fd, &tmp, 1, 0);
836 				continue;
837 			}
838 
839 			/* Check if socket can receive/was closed first */
840 			if (sock_fds[i].revents & ZSOCK_POLLIN) {
841 				coap_server_process(sock_fds[i].fd);
842 				continue;
843 			}
844 
845 			if (sock_fds[i].revents & ZSOCK_POLLERR) {
846 				LOG_ERR("Poll error on %d", sock_fds[i].fd);
847 			}
848 			if (sock_fds[i].revents & ZSOCK_POLLHUP) {
849 				LOG_ERR("Poll hup on %d", sock_fds[i].fd);
850 			}
851 			if (sock_fds[i].revents & ZSOCK_POLLNVAL) {
852 				LOG_ERR("Poll invalid on %d", sock_fds[i].fd);
853 			}
854 		}
855 
856 		/* Process retransmits */
857 		coap_server_retransmit();
858 	}
859 }
860 
861 K_THREAD_DEFINE(coap_server_id, CONFIG_COAP_SERVER_STACK_SIZE,
862 		coap_server_thread, NULL, NULL, NULL,
863 		THREAD_PRIORITY, 0, 0);
864