//
// Copyright (c) 2010-2025 Antmicro
// Copyright (c) 2011-2015 Realtime Embedded
//
// This file is licensed under the MIT License.
// Full license text is available in 'licenses/MIT.txt'.
//
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using Antmicro.Renode.Logging;
using Antmicro.Renode.Sockets;
using Antmicro.Renode.Exceptions;

namespace Antmicro.Renode.Utilities
{
    /// <summary>
    /// TCP server that serves one client connection at a time.
    /// Outgoing data is queued with <see cref="SendByte"/> / <see cref="Send(byte[])"/> and written
    /// by a dedicated writer thread; incoming data is delivered through <see cref="DataReceived"/>
    /// and <see cref="DataBlockReceived"/> from a dedicated reader thread.
    /// </summary>
    public class SocketServerProvider : IDisposable
    {
        /// <param name="emitConfigBytes">When true, a Telnet option negotiation sequence is written to every newly accepted client.</param>
        /// <param name="flushOnConnect">When true, data queued before a client connected is discarded on connect.</param>
        /// <param name="serverName">Passed to the sockets manager as the name appendix of the listening socket.</param>
        public SocketServerProvider(bool emitConfigBytes = true, bool flushOnConnect = false, string serverName = "")
        {
            queue = new ConcurrentQueue<byte[]>();
            enqueuedEvent = new AutoResetEvent(false);
            this.emitConfigBytes = emitConfigBytes;
            this.flushOnConnect = flushOnConnect;
            this.serverName = serverName;
        }

        /// <summary>
        /// Acquires the server socket for the given port (on any local IPv4 address) from the sockets manager
        /// and starts the background listener thread.
        /// </summary>
        /// <param name="port">TCP port used to build the local endpoint.</param>
        public void Start(int port)
        {
            server = SocketsManager.Instance.AcquireSocket(null, AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp, new IPEndPoint(IPAddress.Any, port), listeningBacklog: 1, nameAppendix: this.serverName);

            listenerThread = new Thread(ListenerThreadBody)
            {
                IsBackground = true,
                Name = GetType().Name
            };

            stopRequested = false;
            listenerThread.Start();
        }

        /// <summary>
        /// Drops the server socket, closes the current client socket (if any), cancels the worker threads
        /// and waits for the listener thread to finish, unless called from one of the provider's own
        /// reader/listener threads.
        /// </summary>
        public void Stop()
        {
            if(server != null)
            {
                if(!SocketsManager.Instance.TryDropSocket(server))
                {
                    Logger.LogAs(this, LogLevel.Debug, "Failed to drop socket from the manager");
                }
            }
            socket?.Close();
            stopRequested = true;
            cancellationToken?.Cancel();

            var currentThreadId = Thread.CurrentThread.ManagedThreadId;
            var listener = listenerThread;
            // Event handlers run on the reader thread (DataReceived, DataBlockReceived) or on the
            // listener thread (ConnectionAccepted, ConnectionClosed). If one of them calls Stop,
            // joining the listener would wait on the calling thread itself (directly, or through
            // the listener's Join on the reader) and never return.
            var calledFromReader = readerThread?.ManagedThreadId == currentThreadId;
            var calledFromListener = listener?.ManagedThreadId == currentThreadId;
            if(!calledFromReader && !calledFromListener)
            {
                listener?.Join();
                listenerThread = null;
            }
        }

        /// <summary>
        /// Equivalent to <see cref="Stop"/>.
        /// </summary>
        public void Dispose()
        {
            Stop();
        }

        /// <summary>
        /// Queues a single byte for transmission and wakes the writer thread.
        /// </summary>
        public void SendByte(byte b)
        {
            queue.Enqueue(new byte[] { b });
            enqueuedEvent.Set();
        }

        /// <summary>
        /// Queues a block of bytes for transmission and wakes the writer thread.
        /// The array is enqueued as-is (not copied).
        /// </summary>
        public void Send(byte[] bytes)
        {
            queue.Enqueue(bytes);
            enqueuedEvent.Set();
        }

        /// <summary>
        /// Materializes the sequence into an array and queues it for transmission.
        /// </summary>
        public void Send(IEnumerable<byte> bytes)
        {
            Send(bytes.ToArray());
        }

        /// <summary>
        /// Size of the buffer used for a single read from the client stream.
        /// A change is picked up by the reader thread before its next read.
        /// </summary>
        public int BufferSize { get; set; } = 1;

        /// <summary>
        /// True when at least one of the receive events has a subscriber.
        /// The reader thread delivers data to each event independently, so either one is enough.
        /// </summary>
        public bool IsAnythingReceiving => DataReceived != null || DataBlockReceived != null;

        /// <summary>
        /// Local port of the server socket, or null when there is no server socket
        /// or its local endpoint is not an <see cref="IPEndPoint"/>.
        /// </summary>
        public int? Port => (server?.LocalEndPoint as IPEndPoint)?.Port;

        // Raised on the listener thread after both worker threads of a connection have finished.
        public event Action ConnectionClosed;
        // Raised on the listener thread after a client has been accepted, before the worker threads start.
        public event Action<Stream> ConnectionAccepted;
        // Raised on the reader thread once per received byte.
        public event Action<int> DataReceived;
        // Raised on the reader thread once per successful read, with a copy of the received bytes.
        public event Action<byte[]> DataBlockReceived;

        /// <summary>
        /// Drains the outgoing queue into the client stream until the connection is cancelled or writing fails.
        /// </summary>
        private void WriterThreadBody(Stream stream)
        {
            try
            {
                // This thread will poll for bytes constantly for `MaxReadThreadPoolingTimeMs` to assert we have the lowest possible latency while transmitting packet.
                var watch = new Stopwatch();
                while(!cancellationToken.IsCancellationRequested)
                {
                    watch.Start();
                    while(watch.ElapsedMilliseconds < MaxReadThreadPoolingTimeMs)
                    {
                        while(queue.TryDequeue(out var dequeued))
                        {
                            stream.Write(dequeued, 0, dequeued.Length);
                        }
                    }
                    watch.Reset();
                    // Polling window elapsed; sleep until new data is enqueued or the reader signals shutdown.
                    enqueuedEvent.WaitOne();
                }
            }
            catch(OperationCanceledException)
            {
            }
            catch(IOException)
            {
            }
            catch(ObjectDisposedException)
            {
            }
            // Make sure the reader loop observes that this connection is finished.
            cancellationToken.Cancel();
        }

        /// <summary>
        /// Reads from the client stream and forwards the data to subscribers until the peer closes
        /// the connection, reading fails or the connection is cancelled.
        /// </summary>
        private void ReaderThreadBody(Stream stream)
        {
            var size = BufferSize;
            var buffer = new byte[size];

            while(!cancellationToken.IsCancellationRequested)
            {
                if(size != BufferSize)
                {
                    size = BufferSize;
                    buffer = new byte[size];
                }
                try
                {
                    var count = stream.Read(buffer, 0, size);

                    if(count == 0)
                    {
                        // Zero bytes read means the end of the stream was reached.
                        break;
                    }

                    DataBlockReceived?.Invoke(buffer.Take(count).ToArray());

                    var dataReceived = DataReceived;
                    if(dataReceived != null)
                    {
                        for(var i = 0; i < count; i++)
                        {
                            dataReceived((int)buffer[i]);
                        }
                    }
                }
                catch(IOException)
                {
                    break;
                }
                catch(ObjectDisposedException)
                {
                    // The stream was closed concurrently; treated like any other disconnect,
                    // consistently with the writer and listener threads.
                    break;
                }
            }

            Logger.LogAs(this, LogLevel.Debug, "Client disconnected, stream closed.");
            cancellationToken.Cancel();
            // Wake the writer in case it is blocked waiting for data, so it can notice the cancellation.
            enqueuedEvent.Set();
        }

        /// <summary>
        /// Accepts clients one at a time. For each client: optionally performs the Telnet handshake,
        /// notifies subscribers, runs the writer and reader threads until both finish,
        /// releases the connection and notifies subscribers again.
        /// </summary>
        private void ListenerThreadBody()
        {
            while(!stopRequested)
            {
                Socket client;
                NetworkStream stream;
                try
                {
                    socket = server.Accept();
                    client = socket;
                    stream = new NetworkStream(client);
                }
                catch(SocketException)
                {
                    break;
                }
                catch(ObjectDisposedException)
                {
                    break;
                }
                try
                {
                    if(emitConfigBytes)
                    {
                        var initBytes = new byte[] {
                            255, 253, 0, // IAC DO BINARY
                            255, 251, 1, // IAC WILL ECHO
                            255, 251, 3, // IAC WILL SUPPRESS_GO_AHEAD
                            255, 252, 34, // IAC WONT LINEMODE
                        };
                        stream.Write(initBytes, 0, initBytes.Length);
                        // we expect 9 bytes as a result of sending
                        // config bytes
                        for(int i = 0; i < 9; i++)
                        {
                            stream.ReadByte();
                        }
                    }
                }
                catch(OperationCanceledException)
                {
                }
                catch(IOException)
                {
                }
                catch(ObjectDisposedException)
                {
                }

                var connectionAccepted = ConnectionAccepted;
                if(connectionAccepted != null)
                {
                    connectionAccepted(stream);
                }

                if(flushOnConnect)
                {
                    // creating a new queue not to have to lock accesses to it.
                    queue = new ConcurrentQueue<byte[]>();
                }

                cancellationToken = new CancellationTokenSource();
                writerThread = new Thread(() => WriterThreadBody(stream))
                {
                    Name = GetType().Name + "_WriterThread",
                    IsBackground = true
                };

                readerThread = new Thread(() => ReaderThreadBody(stream))
                {
                    Name = GetType().Name + "_ReaderThread",
                    IsBackground = true
                };

                writerThread.Start();
                readerThread.Start();

                writerThread.Join();
                readerThread.Join();

                writerThread = null;
                readerThread = null;

                // The session is over. The stream was created without ownership of the socket,
                // so both have to be released explicitly; otherwise every finished connection
                // would keep its socket open until Stop is called.
                stream.Dispose();
                client.Close();

                var connectionClosed = ConnectionClosed;
                if(connectionClosed != null)
                {
                    connectionClosed();
                }
            }
            listenerThread = null;
        }

        private ConcurrentQueue<byte[]> queue;

        private CancellationTokenSource cancellationToken;
        private readonly AutoResetEvent enqueuedEvent;
        private readonly bool emitConfigBytes;
        private readonly bool flushOnConnect;
        private readonly string serverName;
        private volatile bool stopRequested;
        private Thread listenerThread;
        private Thread readerThread;
        private Thread writerThread;
        private Socket server;
        private Socket socket;

        private const int MaxReadThreadPoolingTimeMs = 60;
    }
}