1 /*
2  * Copyright (c) 2023 Basalte bv
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <string.h>
8 #include <zephyr/logging/log.h>
9 LOG_MODULE_DECLARE(net_coap, CONFIG_COAP_LOG_LEVEL);
10 
11 #include <zephyr/net/socket.h>
12 
13 #include <zephyr/init.h>
14 #include <zephyr/kernel.h>
15 #include <zephyr/net/coap.h>
16 #include <zephyr/net/coap_link_format.h>
17 #include <zephyr/net/coap_mgmt.h>
18 #include <zephyr/net/coap_service.h>
19 #include <zephyr/posix/fcntl.h>
20 
21 #if defined(CONFIG_NET_TC_THREAD_COOPERATIVE)
22 /* Lowest priority cooperative thread */
23 #define THREAD_PRIORITY K_PRIO_COOP(CONFIG_NUM_COOP_PRIORITIES - 1)
24 #else
25 #define THREAD_PRIORITY K_PRIO_PREEMPT(CONFIG_NUM_PREEMPT_PRIORITIES - 1)
26 #endif
27 
28 #define ADDRLEN(sock) \
29 	(((struct sockaddr *)sock)->sa_family == AF_INET ? \
30 		sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))
31 
32 /* Shortened defines */
33 #define MAX_OPTIONS    CONFIG_COAP_SERVER_MESSAGE_OPTIONS
34 #define MAX_PENDINGS   CONFIG_COAP_SERVICE_PENDING_MESSAGES
35 #define MAX_OBSERVERS  CONFIG_COAP_SERVICE_OBSERVERS
36 #define MAX_POLL_FD    CONFIG_ZVFS_POLL_MAX
37 
38 BUILD_ASSERT(CONFIG_ZVFS_POLL_MAX > 0, "CONFIG_ZVFS_POLL_MAX can't be 0");
39 
40 static K_MUTEX_DEFINE(lock);
41 static int control_socks[2];
42 
43 #if defined(CONFIG_COAP_SERVER_PENDING_ALLOCATOR_STATIC)
44 K_MEM_SLAB_DEFINE_STATIC(pending_data, CONFIG_COAP_SERVER_MESSAGE_SIZE,
45 			 CONFIG_COAP_SERVER_PENDING_ALLOCATOR_STATIC_BLOCKS, 4);
46 #endif
47 
coap_server_alloc(size_t len)48 static inline void *coap_server_alloc(size_t len)
49 {
50 #if defined(CONFIG_COAP_SERVER_PENDING_ALLOCATOR_STATIC)
51 	void *ptr;
52 	int ret;
53 
54 	if (len > CONFIG_COAP_SERVER_MESSAGE_SIZE) {
55 		return NULL;
56 	}
57 
58 	ret = k_mem_slab_alloc(&pending_data, &ptr, K_NO_WAIT);
59 	if (ret < 0) {
60 		return NULL;
61 	}
62 
63 	return ptr;
64 #elif defined(CONFIG_COAP_SERVER_PENDING_ALLOCATOR_SYSTEM_HEAP)
65 	return k_malloc(len);
66 #else
67 	ARG_UNUSED(len);
68 
69 	return NULL;
70 #endif
71 }
72 
coap_server_free(void * ptr)73 static inline void coap_server_free(void *ptr)
74 {
75 #if defined(CONFIG_COAP_SERVER_PENDING_ALLOCATOR_STATIC)
76 	k_mem_slab_free(&pending_data, ptr);
77 #elif defined(CONFIG_COAP_SERVER_PENDING_ALLOCATOR_SYSTEM_HEAP)
78 	k_free(ptr);
79 #else
80 	ARG_UNUSED(ptr);
81 #endif
82 }
83 
coap_service_remove_observer(const struct coap_service * service,struct coap_resource * resource,const struct sockaddr * addr,const uint8_t * token,uint8_t tkl)84 static int coap_service_remove_observer(const struct coap_service *service,
85 					struct coap_resource *resource,
86 					const struct sockaddr *addr,
87 					const uint8_t *token, uint8_t tkl)
88 {
89 	struct coap_observer *obs;
90 
91 	if (tkl > 0 && addr != NULL) {
92 		/* Prefer addr+token to find the observer */
93 		obs = coap_find_observer(service->data->observers, MAX_OBSERVERS, addr, token, tkl);
94 	} else if (tkl > 0) {
95 		/* Then try to find the observer by token */
96 		obs = coap_find_observer_by_token(service->data->observers, MAX_OBSERVERS, token,
97 						  tkl);
98 	} else if (addr != NULL) {
99 		obs = coap_find_observer_by_addr(service->data->observers, MAX_OBSERVERS, addr);
100 	} else {
101 		/* Either a token or an address is required */
102 		return -EINVAL;
103 	}
104 
105 	if (obs == NULL) {
106 		return 0;
107 	}
108 
109 	if (resource == NULL) {
110 		COAP_SERVICE_FOREACH_RESOURCE(service, it) {
111 			if (coap_remove_observer(it, obs)) {
112 				memset(obs, 0, sizeof(*obs));
113 				return 1;
114 			}
115 		}
116 	} else if (coap_remove_observer(resource, obs)) {
117 		memset(obs, 0, sizeof(*obs));
118 		return 1;
119 	}
120 
121 	return 0;
122 }
123 
coap_server_process(int sock_fd)124 static int coap_server_process(int sock_fd)
125 {
126 	static uint8_t buf[CONFIG_COAP_SERVER_MESSAGE_SIZE];
127 
128 	struct sockaddr client_addr;
129 	socklen_t client_addr_len = sizeof(client_addr);
130 	struct coap_service *service = NULL;
131 	struct coap_packet request;
132 	struct coap_pending *pending;
133 	struct coap_option options[MAX_OPTIONS] = { 0 };
134 	uint8_t opt_num = MAX_OPTIONS;
135 	uint8_t type;
136 	ssize_t received;
137 	int ret;
138 
139 	received = zsock_recvfrom(sock_fd, buf, sizeof(buf), ZSOCK_MSG_DONTWAIT, &client_addr,
140 				  &client_addr_len);
141 	__ASSERT_NO_MSG(received <= sizeof(buf));
142 
143 	if (received < 0) {
144 		if (errno == EWOULDBLOCK) {
145 			return 0;
146 		}
147 
148 		LOG_ERR("Failed to process client request (%d)", -errno);
149 		return -errno;
150 	}
151 
152 	ret = coap_packet_parse(&request, buf, received, options, opt_num);
153 	if (ret < 0) {
154 		LOG_ERR("Failed To parse coap message (%d)", ret);
155 		return ret;
156 	}
157 
158 	(void)k_mutex_lock(&lock, K_FOREVER);
159 	/* Find the active service */
160 	COAP_SERVICE_FOREACH(svc) {
161 		if (svc->data->sock_fd == sock_fd) {
162 			service = svc;
163 			break;
164 		}
165 	}
166 	if (service == NULL) {
167 		ret = -ENOENT;
168 		goto unlock;
169 	}
170 
171 	type = coap_header_get_type(&request);
172 
173 	pending = coap_pending_received(&request, service->data->pending, MAX_PENDINGS);
174 	if (pending) {
175 		uint8_t token[COAP_TOKEN_MAX_LEN];
176 		uint8_t tkl;
177 
178 		switch (type) {
179 		case COAP_TYPE_RESET:
180 			tkl = coap_header_get_token(&request, token);
181 			coap_service_remove_observer(service, NULL, &client_addr, token, tkl);
182 			__fallthrough;
183 		case COAP_TYPE_ACK:
184 			coap_server_free(pending->data);
185 			coap_pending_clear(pending);
186 			break;
187 		default:
188 			LOG_WRN("Unexpected pending type %d", type);
189 			ret = -EINVAL;
190 			goto unlock;
191 		}
192 
193 		goto unlock;
194 	} else if (type == COAP_TYPE_ACK || type == COAP_TYPE_RESET) {
195 		LOG_WRN("Unexpected type %d without pending packet", type);
196 		ret = -EINVAL;
197 		goto unlock;
198 	}
199 
200 	if (IS_ENABLED(CONFIG_COAP_SERVER_WELL_KNOWN_CORE) &&
201 	    coap_header_get_code(&request) == COAP_METHOD_GET &&
202 	    coap_uri_path_match(COAP_WELL_KNOWN_CORE_PATH, options, opt_num)) {
203 		uint8_t well_known_buf[CONFIG_COAP_SERVER_MESSAGE_SIZE];
204 		struct coap_packet response;
205 
206 		ret = coap_well_known_core_get_len(service->res_begin,
207 						   COAP_SERVICE_RESOURCE_COUNT(service),
208 						   &request, &response,
209 						   well_known_buf, sizeof(well_known_buf));
210 		if (ret < 0) {
211 			LOG_ERR("Failed to build well known core for %s (%d)", service->name, ret);
212 			goto unlock;
213 		}
214 
215 		ret = coap_service_send(service, &response, &client_addr, client_addr_len, NULL);
216 	} else {
217 		ret = coap_handle_request_len(&request, service->res_begin,
218 					      COAP_SERVICE_RESOURCE_COUNT(service),
219 					      options, opt_num, &client_addr, client_addr_len);
220 
221 		/* Translate errors to response codes */
222 		switch (ret) {
223 		case -ENOENT:
224 			ret = COAP_RESPONSE_CODE_NOT_FOUND;
225 			break;
226 		case -ENOTSUP:
227 			ret = COAP_RESPONSE_CODE_BAD_REQUEST;
228 			break;
229 		case -EPERM:
230 			ret = COAP_RESPONSE_CODE_NOT_ALLOWED;
231 			break;
232 		}
233 
234 		/* Shortcut for replying a code without a body */
235 		if (ret > 0 && type == COAP_TYPE_CON) {
236 			/* Minimal sized ack buffer */
237 			uint8_t ack_buf[COAP_TOKEN_MAX_LEN + 4U];
238 			struct coap_packet ack;
239 
240 			ret = coap_ack_init(&ack, &request, ack_buf, sizeof(ack_buf), (uint8_t)ret);
241 			if (ret < 0) {
242 				LOG_ERR("Failed to init ACK (%d)", ret);
243 				goto unlock;
244 			}
245 
246 			ret = coap_service_send(service, &ack, &client_addr, client_addr_len, NULL);
247 		}
248 	}
249 
250 unlock:
251 	(void)k_mutex_unlock(&lock);
252 
253 	return ret;
254 }
255 
coap_server_retransmit(void)256 static void coap_server_retransmit(void)
257 {
258 	struct coap_pending *pending;
259 	int64_t remaining;
260 	int64_t now = k_uptime_get();
261 	int ret;
262 
263 	(void)k_mutex_lock(&lock, K_FOREVER);
264 
265 	COAP_SERVICE_FOREACH(service) {
266 		if (service->data->sock_fd < 0) {
267 			continue;
268 		}
269 
270 		pending = coap_pending_next_to_expire(service->data->pending, MAX_PENDINGS);
271 		if (pending == NULL) {
272 			/* No work to be done */
273 			continue;
274 		}
275 
276 		/* Check if the pending request has expired */
277 		remaining = pending->t0 + pending->timeout - now;
278 		if (remaining > 0) {
279 			continue;
280 		}
281 
282 		if (coap_pending_cycle(pending)) {
283 			ret = zsock_sendto(service->data->sock_fd, pending->data, pending->len, 0,
284 					   &pending->addr, ADDRLEN(&pending->addr));
285 			if (ret < 0) {
286 				LOG_ERR("Failed to send pending retransmission for %s (%d)",
287 					service->name, ret);
288 			}
289 			__ASSERT_NO_MSG(ret == pending->len);
290 		} else {
291 			LOG_WRN("Packet retransmission failed for %s", service->name);
292 
293 			coap_service_remove_observer(service, NULL, &pending->addr, NULL, 0U);
294 			coap_server_free(pending->data);
295 			coap_pending_clear(pending);
296 		}
297 	}
298 
299 	(void)k_mutex_unlock(&lock);
300 }
301 
coap_server_poll_timeout(void)302 static int coap_server_poll_timeout(void)
303 {
304 	struct coap_pending *pending;
305 	int64_t result = INT64_MAX;
306 	int64_t remaining;
307 	int64_t now = k_uptime_get();
308 
309 	COAP_SERVICE_FOREACH(svc) {
310 		if (svc->data->sock_fd < -1) {
311 			continue;
312 		}
313 
314 		pending = coap_pending_next_to_expire(svc->data->pending, MAX_PENDINGS);
315 		if (pending == NULL) {
316 			continue;
317 		}
318 
319 		remaining = pending->t0 + pending->timeout - now;
320 		if (result > remaining) {
321 			result = remaining;
322 		}
323 	}
324 
325 	if (result == INT64_MAX) {
326 		return -1;
327 	}
328 
329 	return MAX(result, 0);
330 }
331 
coap_server_update_services(void)332 static void coap_server_update_services(void)
333 {
334 	if (zsock_send(control_socks[1], &(char){0}, 1, 0) < 0) {
335 		LOG_ERR("Failed to notify server thread (%d)", errno);
336 	}
337 }
338 
coap_service_in_section(const struct coap_service * service)339 static inline bool coap_service_in_section(const struct coap_service *service)
340 {
341 	STRUCT_SECTION_START_EXTERN(coap_service);
342 	STRUCT_SECTION_END_EXTERN(coap_service);
343 
344 	return STRUCT_SECTION_START(coap_service) <= service &&
345 	       STRUCT_SECTION_END(coap_service) > service;
346 }
347 
coap_service_raise_event(const struct coap_service * service,uint32_t mgmt_event)348 static inline void coap_service_raise_event(const struct coap_service *service, uint32_t mgmt_event)
349 {
350 #if defined(CONFIG_NET_MGMT_EVENT_INFO)
351 	const struct net_event_coap_service net_event = {
352 		.service = service,
353 	};
354 
355 	net_mgmt_event_notify_with_info(mgmt_event, NULL, (void *)&net_event, sizeof(net_event));
356 #else
357 	ARG_UNUSED(service);
358 
359 	net_mgmt_event_notify(mgmt_event, NULL);
360 #endif
361 }
362 
coap_service_start(const struct coap_service * service)363 int coap_service_start(const struct coap_service *service)
364 {
365 	int ret;
366 
367 	uint8_t af;
368 	socklen_t len;
369 	struct sockaddr_storage addr_storage;
370 	union {
371 		struct sockaddr *addr;
372 		struct sockaddr_in *addr4;
373 		struct sockaddr_in6 *addr6;
374 	} addr_ptrs = {
375 		.addr = (struct sockaddr *)&addr_storage,
376 	};
377 
378 	if (!coap_service_in_section(service)) {
379 		__ASSERT_NO_MSG(false);
380 		return -EINVAL;
381 	}
382 
383 	k_mutex_lock(&lock, K_FOREVER);
384 
385 	if (service->data->sock_fd >= 0) {
386 		ret = -EALREADY;
387 		goto end;
388 	}
389 
390 	/* set the default address (in6addr_any / INADDR_ANY are all 0) */
391 	addr_storage = (struct sockaddr_storage){0};
392 	if (IS_ENABLED(CONFIG_NET_IPV6) && service->host != NULL &&
393 	    zsock_inet_pton(AF_INET6, service->host, &addr_ptrs.addr6->sin6_addr) == 1) {
394 		/* if a literal IPv6 address is provided as the host, use IPv6 */
395 		af = AF_INET6;
396 		len = sizeof(struct sockaddr_in6);
397 
398 		addr_ptrs.addr6->sin6_family = AF_INET6;
399 		addr_ptrs.addr6->sin6_port = htons(*service->port);
400 	} else if (IS_ENABLED(CONFIG_NET_IPV4) && service->host != NULL &&
401 		   zsock_inet_pton(AF_INET, service->host, &addr_ptrs.addr4->sin_addr) == 1) {
402 		/* if a literal IPv4 address is provided as the host, use IPv4 */
403 		af = AF_INET;
404 		len = sizeof(struct sockaddr_in);
405 
406 		addr_ptrs.addr4->sin_family = AF_INET;
407 		addr_ptrs.addr4->sin_port = htons(*service->port);
408 	} else if (IS_ENABLED(CONFIG_NET_IPV6)) {
409 		/* prefer IPv6 if both IPv6 and IPv4 are supported */
410 		af = AF_INET6;
411 		len = sizeof(struct sockaddr_in6);
412 
413 		addr_ptrs.addr6->sin6_family = AF_INET6;
414 		addr_ptrs.addr6->sin6_port = htons(*service->port);
415 	} else if (IS_ENABLED(CONFIG_NET_IPV4)) {
416 		af = AF_INET;
417 		len = sizeof(struct sockaddr_in);
418 
419 		addr_ptrs.addr4->sin_family = AF_INET;
420 		addr_ptrs.addr4->sin_port = htons(*service->port);
421 	} else {
422 		ret = -ENOTSUP;
423 		goto end;
424 	}
425 
426 	service->data->sock_fd = zsock_socket(af, SOCK_DGRAM, IPPROTO_UDP);
427 	if (service->data->sock_fd < 0) {
428 		ret = -errno;
429 		goto end;
430 	}
431 
432 	ret = zsock_fcntl(service->data->sock_fd, F_SETFL, O_NONBLOCK);
433 	if (ret < 0) {
434 		ret = -errno;
435 		goto close;
436 	}
437 
438 	ret = zsock_bind(service->data->sock_fd, addr_ptrs.addr, len);
439 	if (ret < 0) {
440 		ret = -errno;
441 		goto close;
442 	}
443 
444 	if (*service->port == 0) {
445 		/* ephemeral port - read back the port number */
446 		len = sizeof(addr_storage);
447 		ret = zsock_getsockname(service->data->sock_fd, addr_ptrs.addr, &len);
448 		if (ret < 0) {
449 			goto close;
450 		}
451 
452 		if (af == AF_INET6) {
453 			*service->port = addr_ptrs.addr6->sin6_port;
454 		} else {
455 			*service->port = addr_ptrs.addr4->sin_port;
456 		}
457 	}
458 
459 end:
460 	k_mutex_unlock(&lock);
461 
462 	coap_server_update_services();
463 
464 	coap_service_raise_event(service, NET_EVENT_COAP_SERVICE_STARTED);
465 
466 	return ret;
467 
468 close:
469 	(void)zsock_close(service->data->sock_fd);
470 	service->data->sock_fd = -1;
471 
472 	k_mutex_unlock(&lock);
473 
474 	return ret;
475 }
476 
coap_service_stop(const struct coap_service * service)477 int coap_service_stop(const struct coap_service *service)
478 {
479 	int ret;
480 
481 	if (!coap_service_in_section(service)) {
482 		__ASSERT_NO_MSG(false);
483 		return -EINVAL;
484 	}
485 
486 	k_mutex_lock(&lock, K_FOREVER);
487 
488 	if (service->data->sock_fd < 0) {
489 		k_mutex_unlock(&lock);
490 		return -EALREADY;
491 	}
492 
493 	/* Closing a socket will trigger a poll event */
494 	ret = zsock_close(service->data->sock_fd);
495 	service->data->sock_fd = -1;
496 
497 	k_mutex_unlock(&lock);
498 
499 	coap_service_raise_event(service, NET_EVENT_COAP_SERVICE_STOPPED);
500 
501 	return ret;
502 }
503 
coap_service_is_running(const struct coap_service * service)504 int coap_service_is_running(const struct coap_service *service)
505 {
506 	int ret;
507 
508 	if (!coap_service_in_section(service)) {
509 		__ASSERT_NO_MSG(false);
510 		return -EINVAL;
511 	}
512 
513 	k_mutex_lock(&lock, K_FOREVER);
514 
515 	ret = (service->data->sock_fd < 0) ? 0 : 1;
516 
517 	k_mutex_unlock(&lock);
518 
519 	return ret;
520 }
521 
coap_service_send(const struct coap_service * service,const struct coap_packet * cpkt,const struct sockaddr * addr,socklen_t addr_len,const struct coap_transmission_parameters * params)522 int coap_service_send(const struct coap_service *service, const struct coap_packet *cpkt,
523 		      const struct sockaddr *addr, socklen_t addr_len,
524 		      const struct coap_transmission_parameters *params)
525 {
526 	int ret;
527 
528 	if (!coap_service_in_section(service)) {
529 		__ASSERT_NO_MSG(false);
530 		return -EINVAL;
531 	}
532 
533 	(void)k_mutex_lock(&lock, K_FOREVER);
534 
535 	if (service->data->sock_fd < 0) {
536 		(void)k_mutex_unlock(&lock);
537 		return -EBADF;
538 	}
539 
540 	/*
541 	 * Check if we should start with retransmits, if creating a pending message fails we still
542 	 * try to send.
543 	 */
544 	if (coap_header_get_type(cpkt) == COAP_TYPE_CON) {
545 		struct coap_pending *pending = coap_pending_next_unused(service->data->pending,
546 									MAX_PENDINGS);
547 
548 		if (pending == NULL) {
549 			LOG_WRN("No pending message available for %s", service->name);
550 			goto send;
551 		}
552 
553 		ret = coap_pending_init(pending, cpkt, addr, params);
554 		if (ret < 0) {
555 			LOG_WRN("Failed to init pending message for %s (%d)", service->name, ret);
556 			goto send;
557 		}
558 
559 		/* Replace tracked data with our allocated copy */
560 		pending->data = coap_server_alloc(pending->len);
561 		if (pending->data == NULL) {
562 			LOG_WRN("Failed to allocate pending message data for %s", service->name);
563 			coap_pending_clear(pending);
564 			goto send;
565 		}
566 		memcpy(pending->data, cpkt->data, pending->len);
567 
568 		coap_pending_cycle(pending);
569 
570 		/* Trigger event in receive loop to schedule retransmit */
571 		coap_server_update_services();
572 	}
573 
574 send:
575 	(void)k_mutex_unlock(&lock);
576 
577 	ret = zsock_sendto(service->data->sock_fd, cpkt->data, cpkt->offset, 0, addr, addr_len);
578 	if (ret < 0) {
579 		LOG_ERR("Failed to send CoAP message (%d)", ret);
580 		return ret;
581 	}
582 	__ASSERT_NO_MSG(ret == cpkt->offset);
583 
584 	return 0;
585 }
586 
coap_resource_send(const struct coap_resource * resource,const struct coap_packet * cpkt,const struct sockaddr * addr,socklen_t addr_len,const struct coap_transmission_parameters * params)587 int coap_resource_send(const struct coap_resource *resource, const struct coap_packet *cpkt,
588 		       const struct sockaddr *addr, socklen_t addr_len,
589 		       const struct coap_transmission_parameters *params)
590 {
591 	/* Find owning service */
592 	COAP_SERVICE_FOREACH(svc) {
593 		if (COAP_SERVICE_HAS_RESOURCE(svc, resource)) {
594 			return coap_service_send(svc, cpkt, addr, addr_len, params);
595 		}
596 	}
597 
598 	return -ENOENT;
599 }
600 
coap_resource_parse_observe(struct coap_resource * resource,const struct coap_packet * request,const struct sockaddr * addr)601 int coap_resource_parse_observe(struct coap_resource *resource, const struct coap_packet *request,
602 				const struct sockaddr *addr)
603 {
604 	const struct coap_service *service = NULL;
605 	int ret;
606 	uint8_t token[COAP_TOKEN_MAX_LEN];
607 	uint8_t tkl;
608 
609 	if (!coap_packet_is_request(request)) {
610 		return -EINVAL;
611 	}
612 
613 	ret = coap_get_option_int(request, COAP_OPTION_OBSERVE);
614 	if (ret < 0) {
615 		return ret;
616 	}
617 
618 	/* Find owning service */
619 	COAP_SERVICE_FOREACH(svc) {
620 		if (COAP_SERVICE_HAS_RESOURCE(svc, resource)) {
621 			service = svc;
622 			break;
623 		}
624 	}
625 
626 	if (service == NULL) {
627 		return -ENOENT;
628 	}
629 
630 	tkl = coap_header_get_token(request, token);
631 	if (tkl == 0) {
632 		return -EINVAL;
633 	}
634 
635 	(void)k_mutex_lock(&lock, K_FOREVER);
636 
637 	if (ret == 0) {
638 		struct coap_observer *observer;
639 
640 		/* RFC7641 section 4.1 - Check if the current observer already exists */
641 		observer = coap_find_observer(service->data->observers, MAX_OBSERVERS, addr, token,
642 					      tkl);
643 		if (observer != NULL) {
644 			/* Client refresh */
645 			goto unlock;
646 		}
647 
648 		/* New client */
649 		observer = coap_observer_next_unused(service->data->observers, MAX_OBSERVERS);
650 		if (observer == NULL) {
651 			ret = -ENOMEM;
652 			goto unlock;
653 		}
654 
655 		coap_observer_init(observer, request, addr);
656 		coap_register_observer(resource, observer);
657 	} else if (ret == 1) {
658 		ret = coap_service_remove_observer(service, resource, addr, token, tkl);
659 		if (ret < 0) {
660 			LOG_WRN("Failed to remove observer (%d)", ret);
661 		}
662 	}
663 
664 unlock:
665 	(void)k_mutex_unlock(&lock);
666 
667 	return ret;
668 }
669 
coap_resource_remove_observer(struct coap_resource * resource,const struct sockaddr * addr,const uint8_t * token,uint8_t token_len)670 static int coap_resource_remove_observer(struct coap_resource *resource,
671 					 const struct sockaddr *addr,
672 					 const uint8_t *token, uint8_t token_len)
673 {
674 	const struct coap_service *service = NULL;
675 	int ret;
676 
677 	/* Find owning service */
678 	COAP_SERVICE_FOREACH(svc) {
679 		if (COAP_SERVICE_HAS_RESOURCE(svc, resource)) {
680 			service = svc;
681 			break;
682 		}
683 	}
684 
685 	if (service == NULL) {
686 		return -ENOENT;
687 	}
688 
689 	(void)k_mutex_lock(&lock, K_FOREVER);
690 	ret = coap_service_remove_observer(service, resource, addr, token, token_len);
691 	(void)k_mutex_unlock(&lock);
692 
693 	if (ret == 1) {
694 		/* An observer was found and removed */
695 		return 0;
696 	} else if (ret == 0) {
697 		/* No matching observer found */
698 		return -ENOENT;
699 	}
700 
701 	/* An error occurred */
702 	return ret;
703 }
704 
coap_resource_remove_observer_by_addr(struct coap_resource * resource,const struct sockaddr * addr)705 int coap_resource_remove_observer_by_addr(struct coap_resource *resource,
706 					  const struct sockaddr *addr)
707 {
708 	return coap_resource_remove_observer(resource, addr, NULL, 0);
709 }
710 
coap_resource_remove_observer_by_token(struct coap_resource * resource,const uint8_t * token,uint8_t token_len)711 int coap_resource_remove_observer_by_token(struct coap_resource *resource,
712 					   const uint8_t *token, uint8_t token_len)
713 {
714 	return coap_resource_remove_observer(resource, NULL, token, token_len);
715 }
716 
coap_server_thread(void * p1,void * p2,void * p3)717 static void coap_server_thread(void *p1, void *p2, void *p3)
718 {
719 	struct zsock_pollfd sock_fds[MAX_POLL_FD];
720 	int sock_nfds;
721 	int ret;
722 
723 	ARG_UNUSED(p1);
724 	ARG_UNUSED(p2);
725 	ARG_UNUSED(p3);
726 
727 	/* Create a socket pair to wake zsock_poll */
728 	ret = zsock_socketpair(AF_UNIX, SOCK_STREAM, 0, control_socks);
729 	if (ret < 0) {
730 		LOG_ERR("Failed to create socket pair (%d)", ret);
731 		return;
732 	}
733 
734 	for (int i = 0; i < 2; ++i) {
735 		ret = zsock_fcntl(control_socks[i], F_SETFL, O_NONBLOCK);
736 
737 		if (ret < 0) {
738 			zsock_close(control_socks[0]);
739 			zsock_close(control_socks[1]);
740 
741 			LOG_ERR("Failed to set socket pair [%d] non-blocking (%d)", i, ret);
742 			return;
743 		}
744 	}
745 
746 	COAP_SERVICE_FOREACH(svc) {
747 		if (svc->flags & COAP_SERVICE_AUTOSTART) {
748 			ret = coap_service_start(svc);
749 			if (ret < 0) {
750 				LOG_ERR("Failed to autostart service %s (%d)", svc->name, ret);
751 			}
752 		}
753 	}
754 
755 	while (true) {
756 		sock_nfds = 0;
757 		COAP_SERVICE_FOREACH(svc) {
758 			if (svc->data->sock_fd < 0) {
759 				continue;
760 			}
761 			if (sock_nfds >= MAX_POLL_FD) {
762 				LOG_ERR("Maximum active CoAP services reached (%d), "
763 					"increase CONFIG_ZVFS_POLL_MAX to support more.",
764 					MAX_POLL_FD);
765 				break;
766 			}
767 
768 			sock_fds[sock_nfds].fd = svc->data->sock_fd;
769 			sock_fds[sock_nfds].events = ZSOCK_POLLIN;
770 			sock_fds[sock_nfds].revents = 0;
771 			sock_nfds++;
772 		}
773 
774 		/* Add socket pair FD to allow wake up */
775 		if (sock_nfds < MAX_POLL_FD) {
776 			sock_fds[sock_nfds].fd = control_socks[0];
777 			sock_fds[sock_nfds].events = ZSOCK_POLLIN;
778 			sock_fds[sock_nfds].revents = 0;
779 			sock_nfds++;
780 		}
781 
782 		__ASSERT_NO_MSG(sock_nfds > 0);
783 
784 		ret = zsock_poll(sock_fds, sock_nfds, coap_server_poll_timeout());
785 		if (ret < 0) {
786 			LOG_ERR("Poll error (%d)", -errno);
787 			k_msleep(10);
788 		}
789 
790 		for (int i = 0; i < sock_nfds; ++i) {
791 			/* Check the wake up event */
792 			if (sock_fds[i].fd == control_socks[0] &&
793 			    sock_fds[i].revents & ZSOCK_POLLIN) {
794 				char tmp;
795 
796 				zsock_recv(sock_fds[i].fd, &tmp, 1, 0);
797 				continue;
798 			}
799 
800 			/* Check if socket can receive/was closed first */
801 			if (sock_fds[i].revents & ZSOCK_POLLIN) {
802 				coap_server_process(sock_fds[i].fd);
803 				continue;
804 			}
805 
806 			if (sock_fds[i].revents & ZSOCK_POLLERR) {
807 				LOG_ERR("Poll error on %d", sock_fds[i].fd);
808 			}
809 			if (sock_fds[i].revents & ZSOCK_POLLHUP) {
810 				LOG_ERR("Poll hup on %d", sock_fds[i].fd);
811 			}
812 			if (sock_fds[i].revents & ZSOCK_POLLNVAL) {
813 				LOG_ERR("Poll invalid on %d", sock_fds[i].fd);
814 			}
815 		}
816 
817 		/* Process retransmits */
818 		coap_server_retransmit();
819 	}
820 }
821 
822 K_THREAD_DEFINE(coap_server_id, CONFIG_COAP_SERVER_STACK_SIZE,
823 		coap_server_thread, NULL, NULL, NULL,
824 		THREAD_PRIORITY, 0, 0);
825