1 //
2 // Copyright (c) 2010-2025 Antmicro
3 // Copyright (c) 2011-2015 Realtime Embedded
4 //
5 // This file is licensed under the MIT License.
6 // Full license text is available in 'licenses/MIT.txt'.
7 //
8 using System;
9 using System.Collections.Concurrent;
10 using System.Collections.Generic;
11 using System.Diagnostics;
12 using System.IO;
13 using System.Linq;
14 using System.Net;
15 using System.Net.Sockets;
16 using System.Threading;
17 using Antmicro.Renode.Logging;
18 using Antmicro.Renode.Sockets;
19 using Antmicro.Renode.Exceptions;
20 
21 namespace Antmicro.Renode.Utilities
22 {
23     public class SocketServerProvider : IDisposable
24     {
SocketServerProvider(bool emitConfigBytes = true, bool flushOnConnect = false, string serverName = R)25         public SocketServerProvider(bool emitConfigBytes = true, bool flushOnConnect = false, string serverName = "")
26         {
27             queue = new ConcurrentQueue<byte[]>();
28             enqueuedEvent = new AutoResetEvent(false);
29             this.emitConfigBytes = emitConfigBytes;
30             this.flushOnConnect = flushOnConnect;
31             this.serverName = serverName;
32         }
33 
Start(int port)34         public void Start(int port)
35         {
36             server = SocketsManager.Instance.AcquireSocket(null ,AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp, new IPEndPoint(IPAddress.Any, port), listeningBacklog: 1, nameAppendix: this.serverName);
37 
38             listenerThread = new Thread(ListenerThreadBody)
39             {
40                 IsBackground = true,
41                 Name = GetType().Name
42             };
43 
44             stopRequested = false;
45             listenerThread.Start();
46         }
47 
Stop()48         public void Stop()
49         {
50             if(server != null)
51             {
52                 if(!SocketsManager.Instance.TryDropSocket(server))
53                 {
54                     Logger.LogAs(this, LogLevel.Debug, "Failed to drop socket from the manager");
55                 }
56             }
57             socket?.Close();
58             stopRequested = true;
59             cancellationToken?.Cancel();
60 
61             var currentThreadId = Thread.CurrentThread.ManagedThreadId;
62             if(readerThread?.ManagedThreadId != currentThreadId)
63             {
64                 listenerThread?.Join();
65                 listenerThread = null;
66             }
67         }
68 
Dispose()69         public void Dispose()
70         {
71             Stop();
72         }
73 
SendByte(byte b)74         public void SendByte(byte b)
75         {
76             queue.Enqueue(new byte[] { b });
77             enqueuedEvent.Set();
78         }
79 
Send(byte[] bytes)80         public void Send(byte[] bytes)
81         {
82             queue.Enqueue(bytes);
83             enqueuedEvent.Set();
84         }
85 
Send(IEnumerable<byte> bytes)86         public void Send(IEnumerable<byte> bytes)
87         {
88             Send(bytes.ToArray());
89         }
90 
91         public int BufferSize { get; set; } = 1;
92 
93         public bool IsAnythingReceiving => DataReceived != null && DataBlockReceived != null;
94 
95         public int? Port => (server?.LocalEndPoint as IPEndPoint)?.Port;
96 
97         public event Action ConnectionClosed;
98         public event Action<Stream> ConnectionAccepted;
99         public event Action<int> DataReceived;
100         public event Action<byte[]> DataBlockReceived;
101 
WriterThreadBody(Stream stream)102         private void WriterThreadBody(Stream stream)
103         {
104             try
105             {
106                 // This thread will poll for bytes constantly for `MaxReadThreadPoolingTimeMs` to assert we have the lowest possible latency while transmiting packet.
107                 var watch = new Stopwatch();
108                 while(!cancellationToken.IsCancellationRequested)
109                 {
110                     watch.Start();
111                     while(watch.ElapsedMilliseconds < MaxReadThreadPoolingTimeMs)
112                     {
113                         while(queue.TryDequeue(out var dequeued))
114                         {
115                             stream.Write(dequeued, 0, dequeued.Length);
116                         }
117                     }
118                     watch.Reset();
119                     enqueuedEvent.WaitOne();
120                 }
121             }
122             catch(OperationCanceledException)
123             {
124             }
125             catch(IOException)
126             {
127             }
128             catch(ObjectDisposedException)
129             {
130             }
131             cancellationToken.Cancel();
132         }
133 
ReaderThreadBody(Stream stream)134         private void ReaderThreadBody(Stream stream)
135         {
136             var size = BufferSize;
137             var buffer = new byte[size];
138 
139             while(!cancellationToken.IsCancellationRequested)
140             {
141                 if(size != BufferSize)
142                 {
143                     size = BufferSize;
144                     buffer = new byte[size];
145                 }
146                 try
147                 {
148                     var count = stream.Read(buffer, 0, size);
149 
150                     if(count == 0)
151                     {
152                         break;
153                     }
154 
155                     DataBlockReceived?.Invoke(buffer.Take(count).ToArray());
156 
157                     var dataReceived = DataReceived;
158                     if(dataReceived != null)
159                     {
160                         foreach(var b in buffer.Take(count))
161                         {
162                             dataReceived((int)b);
163                         }
164                     }
165                 }
166                 catch(IOException)
167                 {
168                     break;
169                 }
170             }
171 
172             Logger.LogAs(this, LogLevel.Debug, "Client disconnected, stream closed.");
173             cancellationToken.Cancel();
174             enqueuedEvent.Set();
175         }
176 
ListenerThreadBody()177         private void ListenerThreadBody()
178         {
179             NetworkStream stream;
180             while(!stopRequested)
181             {
182                 try
183                 {
184                     socket = server.Accept();
185                     stream = new NetworkStream(socket);
186                 }
187                 catch(SocketException)
188                 {
189                     break;
190                 }
191                 catch(ObjectDisposedException)
192                 {
193                     break;
194                 }
195                 try
196                 {
197                     if(emitConfigBytes)
198                     {
199                         var initBytes = new byte[] {
200                             255, 253,   0, // IAC DO    BINARY
201                             255, 251,   1, // IAC WILL  ECHO
202                             255, 251,   3, // IAC WILL  SUPPRESS_GO_AHEAD
203                             255, 252,  34, // IAC WONT  LINEMODE
204                         };
205                         stream.Write(initBytes, 0, initBytes.Length);
206                         // we expect 9 bytes as a result of sending
207                         // config bytes
208                         for (int i = 0; i < 9; i++)
209                         {
210                             stream.ReadByte();
211                         }
212                     }
213                 }
214                 catch(OperationCanceledException)
215                 {
216                 }
217                 catch(IOException)
218                 {
219                 }
220                 catch(ObjectDisposedException)
221                 {
222                 }
223 
224                 var connectionAccepted = ConnectionAccepted;
225                 if(connectionAccepted != null)
226                 {
227                     connectionAccepted(stream);
228                 }
229 
230                 if(flushOnConnect)
231                 {
232                     // creating a new queue not to have to lock accesses to it.
233                     queue = new ConcurrentQueue<byte[]>();
234                 }
235 
236                 cancellationToken = new CancellationTokenSource();
237                 writerThread = new Thread(() => WriterThreadBody(stream))
238                 {
239                     Name = GetType().Name + "_WriterThread",
240                     IsBackground = true
241                 };
242 
243                 readerThread = new Thread(() => ReaderThreadBody(stream))
244                 {
245                     Name = GetType().Name + "_ReaderThread",
246                     IsBackground = true
247                 };
248 
249                 writerThread.Start();
250                 readerThread.Start();
251 
252                 writerThread.Join();
253                 readerThread.Join();
254 
255                 writerThread = null;
256                 readerThread = null;
257 
258                 var connectionClosed = ConnectionClosed;
259                 if(connectionClosed != null)
260                 {
261                     connectionClosed();
262                 }
263             }
264             listenerThread = null;
265         }
266 
267         private ConcurrentQueue<byte[]> queue;
268 
269         private CancellationTokenSource cancellationToken;
270         private AutoResetEvent enqueuedEvent;
271         private bool emitConfigBytes;
272         private bool flushOnConnect;
273         private readonly string serverName;
274         private volatile bool stopRequested;
275         private Thread listenerThread;
276         private Thread readerThread;
277         private Thread writerThread;
278         private Socket server;
279         private Socket socket;
280 
281         private const int MaxReadThreadPoolingTimeMs = 60;
282     }
283 }
284