1 //
2 // server.hpp
3 // ~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 
11 #ifndef CHAT_SERVER_HPP
12 #define CHAT_SERVER_HPP
13 
14 #include <list>
15 #include <set>
16 #include <deque>
17 #include <utility>
18 #include "asio.hpp"
19 #include "chat_message.hpp"
20 
21 //----------------------------------------------------------------------
22 
23 typedef std::deque<chat_message> chat_message_queue;
24 
25 extern std::mutex server_ready;
26 
27 //----------------------------------------------------------------------
28 
29 
30 class chat_participant
31 {
32 public:
~chat_participant()33   virtual ~chat_participant() {}
34   virtual void deliver(const chat_message& msg) = 0;
35 };
36 
37 typedef std::shared_ptr<chat_participant> chat_participant_ptr;
38 
39 //----------------------------------------------------------------------
40 
41 class chat_room
42 {
43 public:
join(chat_participant_ptr participant)44   void join(chat_participant_ptr participant)
45   {
46     participants_.insert(participant);
47     for (auto msg: recent_msgs_)
48       participant->deliver(msg);
49   }
50 
leave(chat_participant_ptr participant)51   void leave(chat_participant_ptr participant)
52   {
53     participants_.erase(participant);
54   }
55 
deliver(const chat_message & msg)56   void deliver(const chat_message& msg)
57   {
58     recent_msgs_.push_back(msg);
59     while (recent_msgs_.size() > max_recent_msgs)
60       recent_msgs_.pop_front();
61 
62     for (auto participant: participants_)
63       participant->deliver(msg);
64   }
65 
66 private:
67   std::set<chat_participant_ptr> participants_;
68   enum { max_recent_msgs = 100 };
69   chat_message_queue recent_msgs_;
70 };
71 
72 //----------------------------------------------------------------------
73 
74 
75 class chat_session
76   : public chat_participant,
77     public std::enable_shared_from_this<chat_session>
78 {
79 public:
chat_session(asio::ip::tcp::socket socket,chat_room & room)80   chat_session(asio::ip::tcp::socket socket, chat_room& room)
81     : socket_(std::move(socket)),
82       room_(room)
83   {
84   }
85 
start()86   void start()
87   {
88     room_.join(shared_from_this());
89     do_read_header();
90   }
91 
deliver(const chat_message & msg)92   void deliver(const chat_message& msg)
93   {
94     bool write_in_progress = !write_msgs_.empty();
95     write_msgs_.push_back(msg);
96     if (!write_in_progress)
97     {
98       do_write();
99     }
100   }
101 
102 private:
do_read_header()103   void do_read_header()
104   {
105     auto self(shared_from_this());
106     asio::async_read(socket_,
107         asio::buffer(read_msg_.data(), chat_message::header_length),
108         [this, self](std::error_code ec, std::size_t /*length*/)
109         {
110           if (!ec && read_msg_.decode_header())
111           {
112             do_read_body();
113           }
114           else
115           {
116             room_.leave(shared_from_this());
117           }
118          });
119    }
120 
do_read_body()121    void do_read_body()
122    {
123      auto self(shared_from_this());
124      asio::async_read(socket_,
125          asio::buffer(read_msg_.body(), read_msg_.body_length()),
126          [this, self](std::error_code ec, std::size_t /*length*/)
127          {
128            if (!ec)
129            {
130              ESP_LOGD("asio-chat:", "%s", read_msg_.body());
131              room_.deliver(read_msg_);
132              do_read_header();
133            }
134            else
135            {
136              room_.leave(shared_from_this());
137            }
138          });
139    }
140 
do_write()141    void do_write()
142    {
143      auto self(shared_from_this());
144      asio::async_write(socket_,
145          asio::buffer(write_msgs_.front().data(),
146            write_msgs_.front().length()),
147          [this, self](std::error_code ec, std::size_t /*length*/)
148          {
149            if (!ec)
150            {
151              write_msgs_.pop_front();
152              if (!write_msgs_.empty())
153              {
154                do_write();
155              }
156            }
157            else
158            {
159              room_.leave(shared_from_this());
160            }
161          });
162    }
163 
164    asio::ip::tcp::socket socket_;
165    chat_room& room_;
166    chat_message read_msg_;
167    chat_message_queue write_msgs_;
168  };
169 
170 //----------------------------------------------------------------------
171 
172 class chat_server
173 {
174 public:
chat_server(asio::io_context & io_context,const asio::ip::tcp::endpoint & endpoint)175   chat_server(asio::io_context& io_context,
176       const asio::ip::tcp::endpoint& endpoint)
177     : acceptor_(io_context, endpoint)
178   {
179     do_accept();
180   }
181 
182 private:
do_accept()183   void do_accept()
184   {
185     std::lock_guard<std::mutex> guard(server_ready);
186     acceptor_.async_accept(
187        [this](std::error_code ec, asio::ip::tcp::socket socket)
188        {
189          if (!ec)
190          {
191            std::make_shared<chat_session>(std::move(socket), room_)->start();
192          }
193 
194          do_accept();
195        });
196   }
197 
198   asio::ip::tcp::acceptor acceptor_;
199   chat_room room_;
200 };
201 
202 #endif // CHAT_SERVER_HPP
203