1 // 2 // server.hpp 3 // ~~~~~~~~~~~~~~~ 4 // 5 // Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com) 6 // 7 // Distributed under the Boost Software License, Version 1.0. (See accompanying 8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 9 // 10 11 #ifndef CHAT_SERVER_HPP 12 #define CHAT_SERVER_HPP 13 14 #include <list> 15 #include <set> 16 #include <deque> 17 #include <utility> 18 #include "asio.hpp" 19 #include "chat_message.hpp" 20 21 //---------------------------------------------------------------------- 22 23 typedef std::deque<chat_message> chat_message_queue; 24 25 extern std::mutex server_ready; 26 27 //---------------------------------------------------------------------- 28 29 30 class chat_participant 31 { 32 public: ~chat_participant()33 virtual ~chat_participant() {} 34 virtual void deliver(const chat_message& msg) = 0; 35 }; 36 37 typedef std::shared_ptr<chat_participant> chat_participant_ptr; 38 39 //---------------------------------------------------------------------- 40 41 class chat_room 42 { 43 public: join(chat_participant_ptr participant)44 void join(chat_participant_ptr participant) 45 { 46 participants_.insert(participant); 47 for (auto msg: recent_msgs_) 48 participant->deliver(msg); 49 } 50 leave(chat_participant_ptr participant)51 void leave(chat_participant_ptr participant) 52 { 53 participants_.erase(participant); 54 } 55 deliver(const chat_message & msg)56 void deliver(const chat_message& msg) 57 { 58 recent_msgs_.push_back(msg); 59 while (recent_msgs_.size() > max_recent_msgs) 60 recent_msgs_.pop_front(); 61 62 for (auto participant: participants_) 63 participant->deliver(msg); 64 } 65 66 private: 67 std::set<chat_participant_ptr> participants_; 68 enum { max_recent_msgs = 100 }; 69 chat_message_queue recent_msgs_; 70 }; 71 72 //---------------------------------------------------------------------- 73 74 75 class chat_session 76 : public chat_participant, 77 public std::enable_shared_from_this<chat_session> 78 { 79 public: chat_session(asio::ip::tcp::socket socket,chat_room & room)80 chat_session(asio::ip::tcp::socket socket, chat_room& room) 81 : socket_(std::move(socket)), 82 room_(room) 83 { 84 } 85 start()86 void start() 87 { 88 room_.join(shared_from_this()); 89 do_read_header(); 90 } 91 deliver(const chat_message & msg)92 void deliver(const chat_message& msg) 93 { 94 bool write_in_progress = !write_msgs_.empty(); 95 write_msgs_.push_back(msg); 96 if (!write_in_progress) 97 { 98 do_write(); 99 } 100 } 101 102 private: do_read_header()103 void do_read_header() 104 { 105 auto self(shared_from_this()); 106 asio::async_read(socket_, 107 asio::buffer(read_msg_.data(), chat_message::header_length), 108 [this, self](std::error_code ec, std::size_t /*length*/) 109 { 110 if (!ec && read_msg_.decode_header()) 111 { 112 do_read_body(); 113 } 114 else 115 { 116 room_.leave(shared_from_this()); 117 } 118 }); 119 } 120 do_read_body()121 void do_read_body() 122 { 123 auto self(shared_from_this()); 124 asio::async_read(socket_, 125 asio::buffer(read_msg_.body(), read_msg_.body_length()), 126 [this, self](std::error_code ec, std::size_t /*length*/) 127 { 128 if (!ec) 129 { 130 ESP_LOGD("asio-chat:", "%s", read_msg_.body()); 131 room_.deliver(read_msg_); 132 do_read_header(); 133 } 134 else 135 { 136 room_.leave(shared_from_this()); 137 } 138 }); 139 } 140 do_write()141 void do_write() 142 { 143 auto self(shared_from_this()); 144 asio::async_write(socket_, 145 asio::buffer(write_msgs_.front().data(), 146 write_msgs_.front().length()), 147 [this, self](std::error_code ec, std::size_t /*length*/) 148 { 149 if (!ec) 150 { 151 write_msgs_.pop_front(); 152 if (!write_msgs_.empty()) 153 { 154 do_write(); 155 } 156 } 157 else 158 { 159 room_.leave(shared_from_this()); 160 } 161 }); 162 } 163 164 asio::ip::tcp::socket socket_; 165 chat_room& room_; 166 chat_message read_msg_; 167 chat_message_queue write_msgs_; 168 }; 169 170 //---------------------------------------------------------------------- 171 172 class chat_server 173 { 174 public: chat_server(asio::io_context & io_context,const asio::ip::tcp::endpoint & endpoint)175 chat_server(asio::io_context& io_context, 176 const asio::ip::tcp::endpoint& endpoint) 177 : acceptor_(io_context, endpoint) 178 { 179 do_accept(); 180 } 181 182 private: do_accept()183 void do_accept() 184 { 185 std::lock_guard<std::mutex> guard(server_ready); 186 acceptor_.async_accept( 187 [this](std::error_code ec, asio::ip::tcp::socket socket) 188 { 189 if (!ec) 190 { 191 std::make_shared<chat_session>(std::move(socket), room_)->start(); 192 } 193 194 do_accept(); 195 }); 196 } 197 198 asio::ip::tcp::acceptor acceptor_; 199 chat_room room_; 200 }; 201 202 #endif // CHAT_SERVER_HPP 203