// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;


namespace Thrift.Transport.Client
{
    /// <summary>
    /// Thrift transport that buffers written bytes in memory and sends them as the body of an HTTP POST on
    /// <see cref="FlushAsync"/>; the response body then becomes the stream served by <see cref="ReadAsync"/>.
    /// </summary>
    // ReSharper disable once InconsistentNaming
    public class THttpTransport : TEndpointTransport
    {
        private readonly X509Certificate[] _certificates;
        private readonly Uri _uri;

        private int _connectTimeout = 30000; // Timeouts in milliseconds
        private HttpClient _httpClient;
        private Stream _inputStream;
        private MemoryStream _outputStream = new MemoryStream();
        private bool _isDisposed;

        /// <summary>
        /// Creates a transport for <paramref name="uri"/> without client certificates.
        /// </summary>
        /// <param name="uri">Endpoint that requests are posted to.</param>
        /// <param name="config">Thrift configuration object.</param>
        /// <param name="customRequestHeaders">Optional headers added to the client's default request headers.</param>
        /// <param name="userAgent">Optional user agent; the default is used when null or empty.</param>
        public THttpTransport(Uri uri, TConfiguration config, IDictionary<string, string> customRequestHeaders = null, string userAgent = null)
            : this(uri, config, Enumerable.Empty<X509Certificate>(), customRequestHeaders, userAgent)
        {
        }

        /// <summary>
        /// Creates a transport for <paramref name="uri"/> that owns its own <c>HttpClient</c>.
        /// </summary>
        /// <param name="uri">Endpoint that requests are posted to.</param>
        /// <param name="config">Thrift configuration object.</param>
        /// <param name="certificates">Client certificates for the handler; null is treated as empty.</param>
        /// <param name="customRequestHeaders">Optional headers added to the client's default request headers.</param>
        /// <param name="userAgent">Optional user agent; the default is used when null or empty.</param>
        public THttpTransport(Uri uri, TConfiguration config, IEnumerable<X509Certificate> certificates,
            IDictionary<string, string> customRequestHeaders, string userAgent = null)
            : base(config)
        {
            _uri = uri;
            _certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray();

            if (!string.IsNullOrEmpty(userAgent))
                UserAgent = userAgent;

            // due to current bug with performance of Dispose in netcore https://github.com/dotnet/corefx/issues/8809
            // this can be switched to default way (create client->use->dispose per flush) later
            _httpClient = CreateClient(customRequestHeaders);
            ConfigureClient(_httpClient);
        }

        /// <summary>
        /// Constructor that takes a <c>HttpClient</c> instance to support using <c>IHttpClientFactory</c>.
        /// </summary>
        /// <remarks>As the <c>HttpMessageHandler</c> of the client must be configured at the time of creation, it
        /// is assumed that the consumer has already added any certificates and configured decompression methods. The
        /// consumer can use the <c>CreateHttpClientHandler</c> method to get a handler with these set.</remarks>
        /// <param name="httpClient">Client configured with the desired message handler, user agent, and URI if not
        /// specified in the <c>uri</c> parameter. A default user agent will be used if not set.</param>
        /// <param name="config">Thrift configuration object</param>
        /// <param name="uri">Optional URI to use for requests, if not specified the base address of <c>httpClient</c>
        /// is used.</param>
        /// <exception cref="ArgumentNullException"><paramref name="httpClient"/> is null.</exception>
        public THttpTransport(HttpClient httpClient, TConfiguration config, Uri uri = null)
            : base(config)
        {
            // Fail with a descriptive exception instead of a NullReferenceException a few lines below.
            _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));

            _uri = uri ?? httpClient.BaseAddress;
            httpClient.BaseAddress = _uri;

            // Adopt the caller's user agent (if any) before ConfigureClient rewrites the header from the field.
            var userAgent = _httpClient.DefaultRequestHeaders.UserAgent.ToString();
            if (!string.IsNullOrEmpty(userAgent))
                UserAgent = userAgent;

            ConfigureClient(_httpClient);
        }

        // According to RFC 2616 section 3.8, the "User-Agent" header may not carry a version number
        public readonly string UserAgent = "Thrift netstd THttpClient";

        /// <summary>
        /// Timeout in milliseconds. While a client exists the value is written to and read from
        /// <c>HttpClient.Timeout</c>; otherwise the locally stored value is returned.
        /// </summary>
        public int ConnectTimeout
        {
            set
            {
                _connectTimeout = value;
                if (_httpClient != null)
                    _httpClient.Timeout = TimeSpan.FromMilliseconds(_connectTimeout);
            }
            get
            {
                if (_httpClient == null)
                    return _connectTimeout;
                return (int)_httpClient.Timeout.TotalMilliseconds;
            }
        }

        /// <summary>Always reports <c>true</c>.</summary>
        public override bool IsOpen => true;

        /// <summary>Default request headers of the underlying client.</summary>
        public HttpRequestHeaders RequestHeaders => _httpClient.DefaultRequestHeaders;

        /// <summary>
        /// Content type sent with each request; <c>application/x-thrift</c> is used when this is null.
        /// </summary>
        public MediaTypeHeaderValue ContentType { get; set; }

        /// <summary>
        /// Performs no work beyond honoring <paramref name="cancellationToken"/>.
        /// </summary>
        public override Task OpenAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Disposes and clears the response stream, the output buffer and the HTTP client.
        /// </summary>
        public override void Close()
        {
            if (_inputStream != null)
            {
                _inputStream.Dispose();
                _inputStream = null;
            }

            if (_outputStream != null)
            {
                _outputStream.Dispose();
                _outputStream = null;
            }

            if (_httpClient != null)
            {
                _httpClient.Dispose();
                _httpClient = null;
            }
        }

        /// <summary>
        /// Reads up to <paramref name="length"/> bytes of the last response into <paramref name="buffer"/>.
        /// </summary>
        /// <param name="buffer">Destination buffer.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
        /// <param name="length">Maximum number of bytes to read.</param>
        /// <param name="cancellationToken">Token checked before reading and passed to the stream read.</param>
        /// <returns>The number of bytes read.</returns>
        /// <exception cref="TTransportException">No response stream is available, or the read raised an
        /// <see cref="IOException"/>.</exception>
        public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            if (_inputStream == null)
                throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent");

            CheckReadBytesAvailable(length);

            try
            {
#if NETSTANDARD2_0
                var ret = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken);
#else
                var ret = await _inputStream.ReadAsync(new Memory<byte>(buffer, offset, length), cancellationToken);
#endif
                // NOTE(review): Stream.ReadAsync is documented to signal end of stream with 0, not -1, so this
                // branch is presumably unreachable. Left unchanged so callers observe the same behavior.
                if (ret == -1)
                {
                    throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available");
                }

                CountConsumedMessageBytes(ret);
                return ret;
            }
            catch (IOException iox)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
            }
        }

        /// <summary>
        /// Appends bytes to the in-memory request buffer; nothing is sent until <see cref="FlushAsync"/>.
        /// </summary>
        /// <param name="buffer">Source buffer.</param>
        /// <param name="offset">Index of the first byte to write.</param>
        /// <param name="length">Number of bytes to write.</param>
        /// <param name="cancellationToken">Token checked before writing and passed to the stream write.</param>
        /// <exception cref="TTransportException">The output buffer was released by <see cref="Close"/>.</exception>
        public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // Close() nulls the buffer; report that explicitly rather than via a NullReferenceException.
            if (_outputStream == null)
                throw new TTransportException(TTransportException.ExceptionType.NotOpen, "Transport has been closed");

#if NETSTANDARD2_0
            await _outputStream.WriteAsync(buffer, offset, length, cancellationToken);
#else
            await _outputStream.WriteAsync(buffer.AsMemory(offset, length), cancellationToken);
#endif
        }

        /// <summary>
        /// Get a client handler configured with recommended properties to use with the <c>HttpClient</c> constructor
        /// and an <c>IHttpClientFactory</c>.
        /// </summary>
        /// <param name="certificates">An optional array of client certificates to associate with the handler.</param>
        /// <returns>
        /// A client handler with deflate and gZip compression-decompression algorithms and any client
        /// certificates passed in via <c>certificates</c>.
        /// </returns>
        public virtual HttpClientHandler CreateHttpClientHandler(X509Certificate[] certificates = null)
        {
            var handler = new HttpClientHandler();
            if (certificates != null)
                handler.ClientCertificates.AddRange(certificates);
            handler.AutomaticDecompression = System.Net.DecompressionMethods.Deflate | System.Net.DecompressionMethods.GZip;
            return handler;
        }

        /// <summary>
        /// Builds a client on a handler from <see cref="CreateHttpClientHandler"/> and adds the custom headers.
        /// </summary>
        /// <param name="customRequestHeaders">Optional header name/value pairs; null adds nothing.</param>
        private HttpClient CreateClient(IDictionary<string, string> customRequestHeaders)
        {
            var handler = CreateHttpClientHandler(_certificates);
            var httpClient = new HttpClient(handler);

            if (customRequestHeaders != null)
            {
                foreach (var item in customRequestHeaders)
                {
                    httpClient.DefaultRequestHeaders.Add(item.Key, item.Value);
                }
            }

            return httpClient;
        }

        /// <summary>
        /// Applies the timeout, Accept, User-Agent and Accept-Encoding defaults to <paramref name="httpClient"/>.
        /// </summary>
        private void ConfigureClient(HttpClient httpClient)
        {
            if (_connectTimeout > 0)
            {
                httpClient.Timeout = TimeSpan.FromMilliseconds(_connectTimeout);
            }

            var headers = httpClient.DefaultRequestHeaders;

            // A caller-supplied client may already carry these values (pre-configured, or shared by several
            // transports), so add each one only when it is missing to avoid duplicate header entries.
            var thriftMediaType = new MediaTypeWithQualityHeaderValue("application/x-thrift");
            if (!headers.Accept.Contains(thriftMediaType))
                headers.Accept.Add(thriftMediaType);

            // Clear any user agent values to avoid drift with the field value
            headers.UserAgent.Clear();
            headers.UserAgent.TryParseAdd(UserAgent);

            foreach (var encoding in new[] { "deflate", "gzip" })
            {
                var encodingValue = new StringWithQualityHeaderValue(encoding);
                if (!headers.AcceptEncoding.Contains(encodingValue))
                    headers.AcceptEncoding.Add(encodingValue);
            }
        }

        /// <summary>
        /// Posts the buffered bytes to the endpoint and keeps the response body for subsequent reads.
        /// </summary>
        /// <remarks>
        /// The output buffer is replaced with a fresh one and the consumed-message size is reset whether or not the
        /// request succeeds. Every exception raised while sending is rethrown as a
        /// <see cref="TTransportException"/>.
        /// </remarks>
        /// <param name="cancellationToken">Token passed to the HTTP request.</param>
        /// <exception cref="TTransportException">The request failed for any reason.</exception>
        public override async Task FlushAsync(CancellationToken cancellationToken)
        {
            try
            {
                _outputStream.Seek(0, SeekOrigin.Begin);

                using (var contentStream = new StreamContent(_outputStream))
                {
                    contentStream.Headers.ContentType = ContentType ?? new MediaTypeHeaderValue(@"application/x-thrift");

                    var response = (await _httpClient.PostAsync(_uri, contentStream, cancellationToken)).EnsureSuccessStatusCode();

                    _inputStream?.Dispose();
#if NETSTANDARD2_0 || NETSTANDARD2_1
                    _inputStream = await response.Content.ReadAsStreamAsync();
#else
                    _inputStream = await response.Content.ReadAsStreamAsync(cancellationToken);
#endif
                    if (_inputStream.CanSeek)
                    {
                        _inputStream.Seek(0, SeekOrigin.Begin);
                    }
                }
            }
            catch (IOException iox)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
            }
            catch (HttpRequestException wx)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown,
                    "Couldn't connect to server: " + wx);
            }
            catch (Exception ex)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown, ex.Message);
            }
            finally
            {
                _outputStream = new MemoryStream();
                ResetConsumedMessageSize();
            }
        }


        // IDisposable
        /// <summary>
        /// Disposes the response stream, the output buffer and the HTTP client on the first disposing call.
        /// </summary>
        /// <param name="disposing">True to release the managed resources held by this transport.</param>
        protected override void Dispose(bool disposing)
        {
            if (!_isDisposed)
            {
                if (disposing)
                {
                    _inputStream?.Dispose();
                    _outputStream?.Dispose();
                    _httpClient?.Dispose();
                }
            }
            _isDisposed = true;
        }
    }
}