Lines Matching refs:client
21 static void client_reset(struct mqtt_client *client) in client_reset() argument
23 MQTT_STATE_INIT(client); in client_reset()
25 client->internal.last_activity = 0U; in client_reset()
26 client->internal.rx_buf_datalen = 0U; in client_reset()
27 client->internal.remaining_payload = 0U; in client_reset()
31 static void tx_buf_init(struct mqtt_client *client, struct buf_ctx *buf) in tx_buf_init() argument
33 memset(client->tx_buf, 0, client->tx_buf_size); in tx_buf_init()
34 buf->cur = client->tx_buf; in tx_buf_init()
35 buf->end = client->tx_buf + client->tx_buf_size; in tx_buf_init()
38 void event_notify(struct mqtt_client *client, const struct mqtt_evt *evt) in event_notify() argument
40 if (client->evt_cb != NULL) { in event_notify()
41 mqtt_mutex_unlock(client); in event_notify()
43 client->evt_cb(client, evt); in event_notify()
45 mqtt_mutex_lock(client); in event_notify()
49 static void client_disconnect(struct mqtt_client *client, int result, in client_disconnect() argument
54 err_code = mqtt_transport_disconnect(client); in client_disconnect()
60 client_reset(client); in client_disconnect()
69 event_notify(client, &evt); in client_disconnect()
73 static int client_connect(struct mqtt_client *client) in client_connect() argument
78 err_code = mqtt_transport_connect(client); in client_connect()
83 tx_buf_init(client, &packet); in client_connect()
84 MQTT_SET_STATE(client, MQTT_STATE_TCP_CONNECTED); in client_connect()
86 err_code = connect_request_encode(client, &packet); in client_connect()
92 err_code = mqtt_transport_write(client, packet.cur, in client_connect()
98 client->internal.last_activity = mqtt_sys_tick_in_ms_get(); in client_connect()
101 client->unacked_ping = 0; in client_connect()
108 client_disconnect(client, err_code, false); in client_connect()
112 static int client_read(struct mqtt_client *client) in client_read() argument
116 if (client->internal.remaining_payload > 0) { in client_read()
120 err_code = mqtt_handle_rx(client); in client_read()
122 client_disconnect(client, err_code, true); in client_read()
128 static int client_write(struct mqtt_client *client, const uint8_t *data, in client_write() argument
133 NET_DBG("[%p]: Transport writing %d bytes.", client, datalen); in client_write()
135 err_code = mqtt_transport_write(client, data, datalen); in client_write()
139 client_disconnect(client, err_code, true); in client_write()
143 NET_DBG("[%p]: Transport write complete.", client); in client_write()
144 client->internal.last_activity = mqtt_sys_tick_in_ms_get(); in client_write()
149 static int client_write_msg(struct mqtt_client *client, in client_write_msg() argument
154 NET_DBG("[%p]: Transport writing message.", client); in client_write_msg()
156 err_code = mqtt_transport_write_msg(client, message); in client_write_msg()
160 client_disconnect(client, err_code, true); in client_write_msg()
164 NET_DBG("[%p]: Transport write complete.", client); in client_write_msg()
165 client->internal.last_activity = mqtt_sys_tick_in_ms_get(); in client_write_msg()
170 void mqtt_client_init(struct mqtt_client *client) in mqtt_client_init() argument
172 NULL_PARAM_CHECK_VOID(client); in mqtt_client_init()
174 memset(client, 0, sizeof(*client)); in mqtt_client_init()
176 MQTT_STATE_INIT(client); in mqtt_client_init()
177 mqtt_mutex_init(client); in mqtt_client_init()
179 client->protocol_version = MQTT_VERSION_3_1_1; in mqtt_client_init()
180 client->clean_session = MQTT_CLEAN_SESSION; in mqtt_client_init()
181 client->keepalive = MQTT_KEEPALIVE; in mqtt_client_init()
185 int mqtt_client_set_proxy(struct mqtt_client *client, in mqtt_client_set_proxy() argument
190 if (!client || !proxy_addr) { in mqtt_client_set_proxy()
194 client->transport.proxy.addrlen = addrlen; in mqtt_client_set_proxy()
195 memcpy(&client->transport.proxy.addr, proxy_addr, addrlen); in mqtt_client_set_proxy()
204 int mqtt_connect(struct mqtt_client *client) in mqtt_connect() argument
208 NULL_PARAM_CHECK(client); in mqtt_connect()
209 NULL_PARAM_CHECK(client->client_id.utf8); in mqtt_connect()
211 mqtt_mutex_lock(client); in mqtt_connect()
213 if ((client->tx_buf == NULL) || (client->rx_buf == NULL)) { in mqtt_connect()
218 err_code = client_connect(client); in mqtt_connect()
222 client_reset(client); in mqtt_connect()
225 mqtt_mutex_unlock(client); in mqtt_connect()
230 static int verify_tx_state(const struct mqtt_client *client) in verify_tx_state() argument
232 if (!MQTT_HAS_STATE(client, MQTT_STATE_CONNECTED)) { in verify_tx_state()
239 int mqtt_publish(struct mqtt_client *client, in mqtt_publish() argument
247 NULL_PARAM_CHECK(client); in mqtt_publish()
251 "Data size 0x%08x", client, client->internal.state, in mqtt_publish()
255 mqtt_mutex_lock(client); in mqtt_publish()
257 tx_buf_init(client, &packet); in mqtt_publish()
259 err_code = verify_tx_state(client); in mqtt_publish()
279 err_code = client_write_msg(client, &msg); in mqtt_publish()
283 client, client->internal.state, err_code); in mqtt_publish()
285 mqtt_mutex_unlock(client); in mqtt_publish()
290 int mqtt_publish_qos1_ack(struct mqtt_client *client, in mqtt_publish_qos1_ack() argument
296 NULL_PARAM_CHECK(client); in mqtt_publish_qos1_ack()
300 client, client->internal.state, param->message_id); in mqtt_publish_qos1_ack()
302 mqtt_mutex_lock(client); in mqtt_publish_qos1_ack()
304 tx_buf_init(client, &packet); in mqtt_publish_qos1_ack()
306 err_code = verify_tx_state(client); in mqtt_publish_qos1_ack()
316 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_publish_qos1_ack()
320 client, client->internal.state, err_code); in mqtt_publish_qos1_ack()
322 mqtt_mutex_unlock(client); in mqtt_publish_qos1_ack()
327 int mqtt_publish_qos2_receive(struct mqtt_client *client, in mqtt_publish_qos2_receive() argument
333 NULL_PARAM_CHECK(client); in mqtt_publish_qos2_receive()
337 client, client->internal.state, param->message_id); in mqtt_publish_qos2_receive()
339 mqtt_mutex_lock(client); in mqtt_publish_qos2_receive()
341 tx_buf_init(client, &packet); in mqtt_publish_qos2_receive()
343 err_code = verify_tx_state(client); in mqtt_publish_qos2_receive()
353 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_publish_qos2_receive()
357 client, client->internal.state, err_code); in mqtt_publish_qos2_receive()
359 mqtt_mutex_unlock(client); in mqtt_publish_qos2_receive()
364 int mqtt_publish_qos2_release(struct mqtt_client *client, in mqtt_publish_qos2_release() argument
370 NULL_PARAM_CHECK(client); in mqtt_publish_qos2_release()
374 client, client->internal.state, param->message_id); in mqtt_publish_qos2_release()
376 mqtt_mutex_lock(client); in mqtt_publish_qos2_release()
378 tx_buf_init(client, &packet); in mqtt_publish_qos2_release()
380 err_code = verify_tx_state(client); in mqtt_publish_qos2_release()
390 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_publish_qos2_release()
394 client, client->internal.state, err_code); in mqtt_publish_qos2_release()
396 mqtt_mutex_unlock(client); in mqtt_publish_qos2_release()
401 int mqtt_publish_qos2_complete(struct mqtt_client *client, in mqtt_publish_qos2_complete() argument
407 NULL_PARAM_CHECK(client); in mqtt_publish_qos2_complete()
411 client, client->internal.state, param->message_id); in mqtt_publish_qos2_complete()
413 mqtt_mutex_lock(client); in mqtt_publish_qos2_complete()
415 tx_buf_init(client, &packet); in mqtt_publish_qos2_complete()
417 err_code = verify_tx_state(client); in mqtt_publish_qos2_complete()
427 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_publish_qos2_complete()
434 client, client->internal.state, err_code); in mqtt_publish_qos2_complete()
436 mqtt_mutex_unlock(client); in mqtt_publish_qos2_complete()
441 int mqtt_disconnect(struct mqtt_client *client) in mqtt_disconnect() argument
446 NULL_PARAM_CHECK(client); in mqtt_disconnect()
448 mqtt_mutex_lock(client); in mqtt_disconnect()
450 tx_buf_init(client, &packet); in mqtt_disconnect()
452 err_code = verify_tx_state(client); in mqtt_disconnect()
462 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_disconnect()
467 client_disconnect(client, 0, true); in mqtt_disconnect()
470 mqtt_mutex_unlock(client); in mqtt_disconnect()
475 int mqtt_subscribe(struct mqtt_client *client, in mqtt_subscribe() argument
481 NULL_PARAM_CHECK(client); in mqtt_subscribe()
485 "topic count 0x%04x", client, client->internal.state, in mqtt_subscribe()
488 mqtt_mutex_lock(client); in mqtt_subscribe()
490 tx_buf_init(client, &packet); in mqtt_subscribe()
492 err_code = verify_tx_state(client); in mqtt_subscribe()
502 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_subscribe()
506 client, client->internal.state, err_code); in mqtt_subscribe()
508 mqtt_mutex_unlock(client); in mqtt_subscribe()
513 int mqtt_unsubscribe(struct mqtt_client *client, in mqtt_unsubscribe() argument
519 NULL_PARAM_CHECK(client); in mqtt_unsubscribe()
522 mqtt_mutex_lock(client); in mqtt_unsubscribe()
524 tx_buf_init(client, &packet); in mqtt_unsubscribe()
526 err_code = verify_tx_state(client); in mqtt_unsubscribe()
536 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_unsubscribe()
539 mqtt_mutex_unlock(client); in mqtt_unsubscribe()
544 int mqtt_ping(struct mqtt_client *client) in mqtt_ping() argument
549 NULL_PARAM_CHECK(client); in mqtt_ping()
551 mqtt_mutex_lock(client); in mqtt_ping()
553 tx_buf_init(client, &packet); in mqtt_ping()
555 err_code = verify_tx_state(client); in mqtt_ping()
565 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_ping()
567 if (client->unacked_ping >= INT8_MAX) { in mqtt_ping()
570 client->unacked_ping++; in mqtt_ping()
574 mqtt_mutex_unlock(client); in mqtt_ping()
579 int mqtt_abort(struct mqtt_client *client) in mqtt_abort() argument
581 NULL_PARAM_CHECK(client); in mqtt_abort()
583 mqtt_mutex_lock(client); in mqtt_abort()
585 if (client->internal.state != MQTT_STATE_IDLE) { in mqtt_abort()
586 client_disconnect(client, -ECONNABORTED, true); in mqtt_abort()
589 mqtt_mutex_unlock(client); in mqtt_abort()
594 int mqtt_live(struct mqtt_client *client) in mqtt_live() argument
600 NULL_PARAM_CHECK(client); in mqtt_live()
602 mqtt_mutex_lock(client); in mqtt_live()
605 client->internal.last_activity); in mqtt_live()
606 if ((client->keepalive > 0) && in mqtt_live()
607 (elapsed_time >= (client->keepalive * 1000))) { in mqtt_live()
608 err_code = mqtt_ping(client); in mqtt_live()
612 mqtt_mutex_unlock(client); in mqtt_live()
621 int mqtt_keepalive_time_left(const struct mqtt_client *client) in mqtt_keepalive_time_left() argument
624 client->internal.last_activity); in mqtt_keepalive_time_left()
625 uint32_t keepalive_ms = 1000U * client->keepalive; in mqtt_keepalive_time_left()
627 if (client->keepalive == 0) { in mqtt_keepalive_time_left()
639 int mqtt_input(struct mqtt_client *client) in mqtt_input() argument
643 NULL_PARAM_CHECK(client); in mqtt_input()
645 mqtt_mutex_lock(client); in mqtt_input()
647 NET_DBG("state:0x%08x", client->internal.state); in mqtt_input()
649 if (MQTT_HAS_STATE(client, MQTT_STATE_TCP_CONNECTED)) { in mqtt_input()
650 err_code = client_read(client); in mqtt_input()
655 mqtt_mutex_unlock(client); in mqtt_input()
660 static int read_publish_payload(struct mqtt_client *client, void *buffer, in read_publish_payload() argument
665 NULL_PARAM_CHECK(client); in read_publish_payload()
667 mqtt_mutex_lock(client); in read_publish_payload()
669 if (client->internal.remaining_payload == 0U) { in read_publish_payload()
674 if (client->internal.remaining_payload < length) { in read_publish_payload()
675 length = client->internal.remaining_payload; in read_publish_payload()
678 ret = mqtt_transport_read(client, buffer, length, shall_block); in read_publish_payload()
688 client_disconnect(client, ret, true); in read_publish_payload()
692 client->internal.remaining_payload -= ret; in read_publish_payload()
695 mqtt_mutex_unlock(client); in read_publish_payload()
700 int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer, in mqtt_read_publish_payload() argument
703 return read_publish_payload(client, buffer, length, false); in mqtt_read_publish_payload()
706 int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer, in mqtt_read_publish_payload_blocking() argument
709 return read_publish_payload(client, buffer, length, true); in mqtt_read_publish_payload_blocking()
712 int mqtt_readall_publish_payload(struct mqtt_client *client, uint8_t *buffer, in mqtt_readall_publish_payload() argument
718 int ret = mqtt_read_publish_payload_blocking(client, buffer, in mqtt_readall_publish_payload()