// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 using System;
19 using System.Collections.Generic;
20 using System.IO;
21 using System.Linq;
22 using System.Net.Http;
23 using System.Net.Http.Headers;
24 using System.Security.Cryptography.X509Certificates;
25 using System.Threading;
26 using System.Threading.Tasks;
27 
28 
29 namespace Thrift.Transport.Client
30 {
31     // ReSharper disable once InconsistentNaming
32     public class THttpTransport : TEndpointTransport
33     {
34         private readonly X509Certificate[] _certificates;
35         private readonly Uri _uri;
36 
37         private int _connectTimeout = 30000; // Timeouts in milliseconds
38         private HttpClient _httpClient;
39         private Stream _inputStream;
40         private MemoryStream _outputStream = new MemoryStream();
41         private bool _isDisposed;
42 
THttpTransport(Uri uri, TConfiguration config, IDictionary<string, string> customRequestHeaders = null, string userAgent = null)43         public THttpTransport(Uri uri, TConfiguration config, IDictionary<string, string> customRequestHeaders = null, string userAgent = null)
44             : this(uri, config, Enumerable.Empty<X509Certificate>(), customRequestHeaders, userAgent)
45         {
46         }
47 
THttpTransport(Uri uri, TConfiguration config, IEnumerable<X509Certificate> certificates, IDictionary<string, string> customRequestHeaders, string userAgent = null)48         public THttpTransport(Uri uri, TConfiguration config, IEnumerable<X509Certificate> certificates,
49             IDictionary<string, string> customRequestHeaders, string userAgent = null)
50             : base(config)
51         {
52             _uri = uri;
53             _certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray();
54 
55             if (!string.IsNullOrEmpty(userAgent))
56                 UserAgent = userAgent;
57 
58             // due to current bug with performance of Dispose in netcore https://github.com/dotnet/corefx/issues/8809
59             // this can be switched to default way (create client->use->dispose per flush) later
60             _httpClient = CreateClient(customRequestHeaders);
61             ConfigureClient(_httpClient);
62         }
63 
64         /// <summary>
65         /// Constructor that takes a <c>HttpClient</c> instance to support using <c>IHttpClientFactory</c>.
66         /// </summary>
67         /// <remarks>As the <c>HttpMessageHandler</c> of the client must be configured at the time of creation, it
68         /// is assumed that the consumer has already added any certificates and configured decompression methods. The
69         /// consumer can use the <c>CreateHttpClientHandler</c> method to get a handler with these set.</remarks>
70         /// <param name="httpClient">Client configured with the desired message handler, user agent, and URI if not
71         /// specified in the <c>uri</c> parameter. A default user agent will be used if not set.</param>
72         /// <param name="config">Thrift configuration object</param>
73         /// <param name="uri">Optional URI to use for requests, if not specified the base address of <c>httpClient</c>
74         /// is used.</param>
THttpTransport(HttpClient httpClient, TConfiguration config, Uri uri = null)75         public THttpTransport(HttpClient httpClient, TConfiguration config, Uri uri = null)
76             : base(config)
77         {
78             _httpClient = httpClient;
79 
80             _uri = uri ?? httpClient.BaseAddress;
81             httpClient.BaseAddress = _uri;
82 
83             var userAgent = _httpClient.DefaultRequestHeaders.UserAgent.ToString();
84             if (!string.IsNullOrEmpty(userAgent))
85                 UserAgent = userAgent;
86 
87             ConfigureClient(_httpClient);
88         }
89 
90         // According to RFC 2616 section 3.8, the "User-Agent" header may not carry a version number
91         public readonly string UserAgent = "Thrift netstd THttpClient";
92 
93         public int ConnectTimeout
94         {
95             set
96             {
97                 _connectTimeout = value;
98                 if(_httpClient != null)
99                     _httpClient.Timeout = TimeSpan.FromMilliseconds(_connectTimeout);
100             }
101             get
102             {
103                 if (_httpClient == null)
104                     return _connectTimeout;
105                 return (int)_httpClient.Timeout.TotalMilliseconds;
106             }
107         }
108 
109         public override bool IsOpen => true;
110 
111         public HttpRequestHeaders RequestHeaders => _httpClient.DefaultRequestHeaders;
112 
113         public MediaTypeHeaderValue ContentType { get; set; }
114 
OpenAsync(CancellationToken cancellationToken)115         public override Task OpenAsync(CancellationToken cancellationToken)
116         {
117             cancellationToken.ThrowIfCancellationRequested();
118             return Task.CompletedTask;
119         }
120 
Close()121         public override void Close()
122         {
123             if (_inputStream != null)
124             {
125                 _inputStream.Dispose();
126                 _inputStream = null;
127             }
128 
129             if (_outputStream != null)
130             {
131                 _outputStream.Dispose();
132                 _outputStream = null;
133             }
134 
135             if (_httpClient != null)
136             {
137                 _httpClient.Dispose();
138                 _httpClient = null;
139             }
140         }
141 
ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)142         public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
143         {
144             cancellationToken.ThrowIfCancellationRequested();
145 
146             if (_inputStream == null)
147                 throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent");
148 
149             CheckReadBytesAvailable(length);
150 
151             try
152             {
153 #if NETSTANDARD2_0
154                 var ret = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken);
155 #else
156                 var ret = await _inputStream.ReadAsync(new Memory<byte>(buffer, offset, length), cancellationToken);
157 #endif
158                 if (ret == -1)
159                 {
160                     throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available");
161                 }
162 
163                 CountConsumedMessageBytes(ret);
164                 return ret;
165             }
166             catch (IOException iox)
167             {
168                 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
169             }
170         }
171 
WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)172         public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
173         {
174             cancellationToken.ThrowIfCancellationRequested();
175 
176 #if NETSTANDARD2_0
177             await _outputStream.WriteAsync(buffer, offset, length, cancellationToken);
178 #else
179             await _outputStream.WriteAsync(buffer.AsMemory(offset, length), cancellationToken);
180 #endif
181         }
182 
183         /// <summary>
184         /// Get a client handler configured with recommended properties to use with the <c>HttpClient</c> constructor
185         /// and an <c>IHttpClientFactory</c>.
186         /// </summary>
187         /// <param name="certificates">An optional array of client certificates to associate with the handler.</param>
188         /// <returns>
189         /// A client handler with deflate and gZip compression-decompression algorithms and any client
190         /// certificates passed in via <c>certificates</c>.
191         /// </returns>
CreateHttpClientHandler(X509Certificate[] certificates = null)192         public virtual HttpClientHandler CreateHttpClientHandler(X509Certificate[] certificates = null)
193         {
194             var handler = new HttpClientHandler();
195             if (certificates != null)
196                 handler.ClientCertificates.AddRange(certificates);
197             handler.AutomaticDecompression = System.Net.DecompressionMethods.Deflate | System.Net.DecompressionMethods.GZip;
198             return handler;
199         }
200 
CreateClient(IDictionary<string, string> customRequestHeaders)201         private HttpClient CreateClient(IDictionary<string, string> customRequestHeaders)
202         {
203             var handler = CreateHttpClientHandler(_certificates);
204             var httpClient = new HttpClient(handler);
205 
206 
207             if (customRequestHeaders != null)
208             {
209                 foreach (var item in customRequestHeaders)
210                 {
211                     httpClient.DefaultRequestHeaders.Add(item.Key, item.Value);
212                 }
213             }
214 
215             return httpClient;
216         }
217 
ConfigureClient(HttpClient httpClient)218         private void ConfigureClient(HttpClient httpClient)
219         {
220             if (_connectTimeout > 0)
221             {
222                 httpClient.Timeout = TimeSpan.FromMilliseconds(_connectTimeout);
223             }
224 
225             httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/x-thrift"));
226 
227             // Clear any user agent values to avoid drift with the field value
228             httpClient.DefaultRequestHeaders.UserAgent.Clear();
229             httpClient.DefaultRequestHeaders.UserAgent.TryParseAdd(UserAgent);
230 
231             httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("deflate"));
232             httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("gzip"));
233         }
234 
FlushAsync(CancellationToken cancellationToken)235         public override async Task FlushAsync(CancellationToken cancellationToken)
236         {
237             try
238             {
239                 _outputStream.Seek(0, SeekOrigin.Begin);
240 
241                 using (var contentStream = new StreamContent(_outputStream))
242                 {
243                     contentStream.Headers.ContentType = ContentType ?? new MediaTypeHeaderValue(@"application/x-thrift");
244 
245                     var response = (await _httpClient.PostAsync(_uri, contentStream, cancellationToken)).EnsureSuccessStatusCode();
246 
247                     _inputStream?.Dispose();
248 #if NETSTANDARD2_0 || NETSTANDARD2_1
249                     _inputStream = await response.Content.ReadAsStreamAsync();
250 #else
251                     _inputStream = await response.Content.ReadAsStreamAsync(cancellationToken);
252 #endif
253                     if (_inputStream.CanSeek)
254                     {
255                         _inputStream.Seek(0, SeekOrigin.Begin);
256                     }
257                 }
258             }
259             catch (IOException iox)
260             {
261                 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
262             }
263             catch (HttpRequestException wx)
264             {
265                 throw new TTransportException(TTransportException.ExceptionType.Unknown,
266                     "Couldn't connect to server: " + wx);
267             }
268             catch (Exception ex)
269             {
270                 throw new TTransportException(TTransportException.ExceptionType.Unknown, ex.Message);
271             }
272             finally
273             {
274                 _outputStream = new MemoryStream();
275                 ResetConsumedMessageSize();
276             }
277         }
278 
279 
280         // IDisposable
Dispose(bool disposing)281         protected override void Dispose(bool disposing)
282         {
283             if (!_isDisposed)
284             {
285                 if (disposing)
286                 {
287                     _inputStream?.Dispose();
288                     _outputStream?.Dispose();
289                     _httpClient?.Dispose();
290                 }
291             }
292             _isDisposed = true;
293         }
294     }
295 }
296