1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <errno.h>
21 #include <netdb.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <string.h>
25 #include <unistd.h>
26
27 #include <thrift/c_glib/thrift.h>
28 #include <thrift/c_glib/transport/thrift_transport.h>
29 #include <thrift/c_glib/transport/thrift_buffered_transport.h>
30
31 /* object properties */
32 enum _ThriftBufferedTransportProperties
33 {
34 PROP_0,
35 PROP_THRIFT_BUFFERED_TRANSPORT_TRANSPORT,
36 PROP_THRIFT_BUFFERED_TRANSPORT_READ_BUFFER_SIZE,
37 PROP_THRIFT_BUFFERED_TRANSPORT_WRITE_BUFFER_SIZE,
38 PROP_THRIFT_BUFFERED_TRANSPORT_CONFIGURATION,
39 PROP_THRIFT_BUFFERED_TRANSPORT_REMAINING_MESSAGE_SIZE,
40 PROP_THRIFT_BUFFERED_TRANSPORT_KNOW_MESSAGE_SIZE
41 };
42
G_DEFINE_TYPE(ThriftBufferedTransport,thrift_buffered_transport,THRIFT_TYPE_TRANSPORT)43 G_DEFINE_TYPE(ThriftBufferedTransport, thrift_buffered_transport, THRIFT_TYPE_TRANSPORT)
44
45 /* implements thrift_transport_is_open */
46 gboolean
47 thrift_buffered_transport_is_open (ThriftTransport *transport)
48 {
49 ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
50 return THRIFT_TRANSPORT_GET_CLASS (t->transport)->is_open (t->transport);
51 }
52
53 /* overrides thrift_transport_peek */
54 gboolean
thrift_buffered_transport_peek(ThriftTransport * transport,GError ** error)55 thrift_buffered_transport_peek (ThriftTransport *transport, GError **error)
56 {
57 ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
58 return (t->r_buf->len > 0) || thrift_transport_peek (t->transport, error);
59 }
60
61 /* implements thrift_transport_open */
62 gboolean
thrift_buffered_transport_open(ThriftTransport * transport,GError ** error)63 thrift_buffered_transport_open (ThriftTransport *transport, GError **error)
64 {
65 ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
66 return THRIFT_TRANSPORT_GET_CLASS (t->transport)->open (t->transport, error);
67 }
68
69 /* implements thrift_transport_close */
70 gboolean
thrift_buffered_transport_close(ThriftTransport * transport,GError ** error)71 thrift_buffered_transport_close (ThriftTransport *transport, GError **error)
72 {
73 ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
74 return THRIFT_TRANSPORT_GET_CLASS (t->transport)->close (t->transport, error);
75 }
76
77 /* the actual read is "slow" because it calls the underlying transport */
78 gint32
thrift_buffered_transport_read_slow(ThriftTransport * transport,gpointer buf,guint32 len,GError ** error)79 thrift_buffered_transport_read_slow (ThriftTransport *transport, gpointer buf,
80 guint32 len, GError **error)
81 {
82 ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
83 gint ret = 0;
84 guint32 want = len;
85 guint32 got = 0;
86 guchar *tmpdata = g_new0 (guchar, len);
87 guint32 have = t->r_buf->len;
88
89
90 /* we shouldn't hit this unless the buffer doesn't have enough to read */
91 g_assert (t->r_buf->len < want);
92
93 /* first copy what we have in our buffer. */
94 if (have > 0)
95 {
96 memcpy (buf, t->r_buf, t->r_buf->len);
97 want -= t->r_buf->len;
98 t->r_buf = g_byte_array_remove_range (t->r_buf, 0, t->r_buf->len);
99 }
100
101 /* if the buffer is still smaller than what we want to read, then just
102 * read it directly. otherwise, fill the buffer and then give out
103 * enough to satisfy the read. */
104 if (t->r_buf_size < want)
105 {
106 if ((ret = THRIFT_TRANSPORT_GET_CLASS (t->transport)->read (t->transport,
107 tmpdata,
108 want,
109 error)) < 0) {
110 g_free (tmpdata);
111 return ret;
112 }
113 got += ret;
114
115 /* copy the data starting from where we left off */
116 memcpy ((guint8 *)buf + have, tmpdata, got);
117 g_free (tmpdata);
118 return got + have;
119 } else {
120 guint32 give;
121
122 if ((ret = THRIFT_TRANSPORT_GET_CLASS (t->transport)->read (t->transport,
123 tmpdata,
124 want,
125 error)) < 0) {
126 g_free (tmpdata);
127 return ret;
128 }
129 got += ret;
130 t->r_buf = g_byte_array_append (t->r_buf, tmpdata, got);
131 g_free (tmpdata);
132 /* hand over what we have up to what the caller wants */
133 give = want < t->r_buf->len ? want : t->r_buf->len;
134
135
136 memcpy ((guint8 *)buf + len - want, t->r_buf->data, give);
137 t->r_buf = g_byte_array_remove_range (t->r_buf, 0, give);
138 want -= give;
139
140 return (len - want);
141 }
142 }
143
144 /* implements thrift_transport_read */
145 gint32
thrift_buffered_transport_read(ThriftTransport * transport,gpointer buf,guint32 len,GError ** error)146 thrift_buffered_transport_read (ThriftTransport *transport, gpointer buf,
147 guint32 len, GError **error)
148 {
149 ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
150 ThriftTransportClass *ttc = THRIFT_TRANSPORT_GET_CLASS (transport);
151 if(!ttc->checkReadBytesAvailable (transport, len, error))
152 {
153 return -1;
154 }
155
156 /* if we have enough buffer data to fulfill the read, just use
157 * a memcpy */
158 if (len <= t->r_buf->len)
159 {
160 memcpy (buf, t->r_buf->data, len);
161 g_byte_array_remove_range (t->r_buf, 0, len);
162 return len;
163 }
164
165 return thrift_buffered_transport_read_slow (transport, buf, len, error);
166 }
167
168 /* implements thrift_transport_read_end
169 * called when write is complete. nothing to do on our end. */
170 gboolean
thrift_buffered_transport_read_end(ThriftTransport * transport,GError ** error)171 thrift_buffered_transport_read_end (ThriftTransport *transport, GError **error)
172 {
173 /* satisfy -Wall */
174 THRIFT_UNUSED_VAR (transport);
175 THRIFT_UNUSED_VAR (error);
176 return TRUE;
177 }
178
179 gboolean
thrift_buffered_transport_write_slow(ThriftTransport * transport,gpointer buf,guint32 len,GError ** error)180 thrift_buffered_transport_write_slow (ThriftTransport *transport, gpointer buf,
181 guint32 len, GError **error)
182 {
183 ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
184 guint32 have_bytes = t->w_buf->len;
185 guint32 space = t->w_buf_size - t->w_buf->len;
186
187 /* we need two syscalls because the buffered data plus the buffer itself
188 * is too big. */
189 if ((have_bytes + len >= 2*t->w_buf_size) || (have_bytes == 0))
190 {
191 if (have_bytes > 0)
192 {
193 if (!THRIFT_TRANSPORT_GET_CLASS (t->transport)->write (t->transport,
194 t->w_buf->data,
195 have_bytes,
196 error)) {
197 return FALSE;
198 }
199 t->w_buf = g_byte_array_remove_range (t->w_buf, 0, have_bytes);
200 }
201 if (!THRIFT_TRANSPORT_GET_CLASS (t->transport)->write (t->transport,
202 buf, len, error)) {
203 return FALSE;
204 }
205 return TRUE;
206 }
207
208 t->w_buf = g_byte_array_append (t->w_buf, buf, space);
209 if (!THRIFT_TRANSPORT_GET_CLASS (t->transport)->write (t->transport,
210 t->w_buf->data,
211 t->w_buf->len,
212 error)) {
213 return FALSE;
214 }
215
216 t->w_buf = g_byte_array_remove_range (t->w_buf, 0, t->w_buf->len);
217 t->w_buf = g_byte_array_append (t->w_buf, (guint8 *)buf + space, len-space);
218
219 return TRUE;
220 }
221
222 /* implements thrift_transport_write */
223 gboolean
thrift_buffered_transport_write(ThriftTransport * transport,const gpointer buf,const guint32 len,GError ** error)224 thrift_buffered_transport_write (ThriftTransport *transport,
225 const gpointer buf,
226 const guint32 len, GError **error)
227 {
228 ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
229
230 /* the length of the current buffer plus the length of the data being read */
231 if (t->w_buf->len + len <= t->w_buf_size)
232 {
233 t->w_buf = g_byte_array_append (t->w_buf, buf, len);
234 return len;
235 }
236
237 return thrift_buffered_transport_write_slow (transport, buf, len, error);
238 }
239
240 /* implements thrift_transport_write_end
241 * called when write is complete. nothing to do on our end. */
242 gboolean
thrift_buffered_transport_write_end(ThriftTransport * transport,GError ** error)243 thrift_buffered_transport_write_end (ThriftTransport *transport, GError **error)
244 {
245 /* satisfy -Wall */
246 THRIFT_UNUSED_VAR (transport);
247 THRIFT_UNUSED_VAR (error);
248 return TRUE;
249 }
250
251 /* implements thrift_transport_flush */
252 gboolean
thrift_buffered_transport_flush(ThriftTransport * transport,GError ** error)253 thrift_buffered_transport_flush (ThriftTransport *transport, GError **error)
254 {
255 ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
256 ThriftTransportClass *ttc = THRIFT_TRANSPORT_GET_CLASS (transport);
257
258 if(!ttc->resetConsumedMessageSize (transport, -1, error))
259 {
260 return FALSE;
261 }
262
263 if (t->w_buf != NULL && t->w_buf->len > 0)
264 {
265 /* write the buffer and then empty it */
266 if (!THRIFT_TRANSPORT_GET_CLASS (t->transport)->write (t->transport,
267 t->w_buf->data,
268 t->w_buf->len,
269 error)) {
270 return FALSE;
271 }
272 t->w_buf = g_byte_array_remove_range (t->w_buf, 0, t->w_buf->len);
273 }
274 THRIFT_TRANSPORT_GET_CLASS (t->transport)->flush (t->transport,
275 error);
276
277 return TRUE;
278 }
279
280 /* initializes the instance */
281 static void
thrift_buffered_transport_init(ThriftBufferedTransport * transport)282 thrift_buffered_transport_init (ThriftBufferedTransport *transport)
283 {
284 transport->transport = NULL;
285 transport->r_buf = g_byte_array_new ();
286 transport->w_buf = g_byte_array_new ();
287 }
288
289 /* destructor */
290 static void
thrift_buffered_transport_finalize(GObject * object)291 thrift_buffered_transport_finalize (GObject *object)
292 {
293 ThriftBufferedTransport *transport = THRIFT_BUFFERED_TRANSPORT (object);
294
295 if (transport->r_buf != NULL)
296 {
297 g_byte_array_free (transport->r_buf, TRUE);
298 }
299 transport->r_buf = NULL;
300
301 if (transport->w_buf != NULL)
302 {
303 g_byte_array_free (transport->w_buf, TRUE);
304 }
305 transport->w_buf = NULL;
306 }
307
308 /* property accessor */
309 void
thrift_buffered_transport_get_property(GObject * object,guint property_id,GValue * value,GParamSpec * pspec)310 thrift_buffered_transport_get_property (GObject *object, guint property_id,
311 GValue *value, GParamSpec *pspec)
312 {
313 ThriftBufferedTransport *transport = THRIFT_BUFFERED_TRANSPORT (object);
314
315 ThriftTransport *tt = THRIFT_TRANSPORT (object);
316
317 THRIFT_UNUSED_VAR (pspec);
318
319 switch (property_id)
320 {
321 case PROP_THRIFT_BUFFERED_TRANSPORT_TRANSPORT:
322 g_value_set_object (value, transport->transport);
323 break;
324 case PROP_THRIFT_BUFFERED_TRANSPORT_READ_BUFFER_SIZE:
325 g_value_set_uint (value, transport->r_buf_size);
326 break;
327 case PROP_THRIFT_BUFFERED_TRANSPORT_WRITE_BUFFER_SIZE:
328 g_value_set_uint (value, transport->w_buf_size);
329 break;
330 case PROP_THRIFT_BUFFERED_TRANSPORT_CONFIGURATION:
331 g_value_set_object (value, tt->configuration);
332 break;
333 case PROP_THRIFT_BUFFERED_TRANSPORT_REMAINING_MESSAGE_SIZE:
334 g_value_set_long (value, tt->remainingMessageSize_);
335 break;
336 case PROP_THRIFT_BUFFERED_TRANSPORT_KNOW_MESSAGE_SIZE:
337 g_value_set_long (value, tt->knowMessageSize_);
338 break;
339 }
340 }
341
342 /* property mutator */
343 void
thrift_buffered_transport_set_property(GObject * object,guint property_id,const GValue * value,GParamSpec * pspec)344 thrift_buffered_transport_set_property (GObject *object, guint property_id,
345 const GValue *value, GParamSpec *pspec)
346 {
347 ThriftBufferedTransport *transport = THRIFT_BUFFERED_TRANSPORT (object);
348
349 ThriftTransport *tt = THRIFT_TRANSPORT (object);
350
351 THRIFT_UNUSED_VAR (pspec);
352
353 switch (property_id)
354 {
355 case PROP_THRIFT_BUFFERED_TRANSPORT_TRANSPORT:
356 transport->transport = g_value_get_object (value);
357 break;
358 case PROP_THRIFT_BUFFERED_TRANSPORT_READ_BUFFER_SIZE:
359 transport->r_buf_size = g_value_get_uint (value);
360 break;
361 case PROP_THRIFT_BUFFERED_TRANSPORT_WRITE_BUFFER_SIZE:
362 transport->w_buf_size = g_value_get_uint (value);
363 break;
364 case PROP_THRIFT_BUFFERED_TRANSPORT_CONFIGURATION:
365 tt->configuration = g_value_dup_object (value);
366 break;
367 case PROP_THRIFT_BUFFERED_TRANSPORT_REMAINING_MESSAGE_SIZE:
368 tt->remainingMessageSize_ = g_value_get_long (value);
369 break;
370 case PROP_THRIFT_BUFFERED_TRANSPORT_KNOW_MESSAGE_SIZE:
371 tt->knowMessageSize_ = g_value_get_long (value);
372 break;
373 }
374 }
375
376 /* initializes the class */
377 static void
thrift_buffered_transport_class_init(ThriftBufferedTransportClass * cls)378 thrift_buffered_transport_class_init (ThriftBufferedTransportClass *cls)
379 {
380 ThriftTransportClass *ttc = THRIFT_TRANSPORT_CLASS (cls);
381 GObjectClass *gobject_class = G_OBJECT_CLASS (cls);
382 GParamSpec *param_spec = NULL;
383
384 /* setup accessors and mutators */
385 gobject_class->get_property = thrift_buffered_transport_get_property;
386 gobject_class->set_property = thrift_buffered_transport_set_property;
387
388 param_spec = g_param_spec_object ("transport", "transport (construct)",
389 "Thrift transport",
390 THRIFT_TYPE_TRANSPORT,
391 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY);
392 g_object_class_install_property (gobject_class,
393 PROP_THRIFT_BUFFERED_TRANSPORT_TRANSPORT,
394 param_spec);
395
396 param_spec = g_param_spec_uint ("r_buf_size",
397 "read buffer size (construct)",
398 "Set the read buffer size",
399 0, /* min */
400 1048576, /* max, 1024*1024 */
401 512, /* default value */
402 G_PARAM_CONSTRUCT_ONLY |
403 G_PARAM_READWRITE);
404 g_object_class_install_property (gobject_class,
405 PROP_THRIFT_BUFFERED_TRANSPORT_READ_BUFFER_SIZE,
406 param_spec);
407
408 param_spec = g_param_spec_uint ("w_buf_size",
409 "write buffer size (construct)",
410 "Set the write buffer size",
411 0, /* min */
412 1048576, /* max, 1024*1024 */
413 512, /* default value */
414 G_PARAM_CONSTRUCT_ONLY |
415 G_PARAM_READWRITE);
416 g_object_class_install_property (gobject_class,
417 PROP_THRIFT_BUFFERED_TRANSPORT_WRITE_BUFFER_SIZE,
418 param_spec);
419
420 param_spec = g_param_spec_object ("configuration",
421 "configuration (construct)",
422 "Thrift Configuration",
423 THRIFT_TYPE_CONFIGURATION,
424 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY);
425 g_object_class_install_property (gobject_class,
426 PROP_THRIFT_BUFFERED_TRANSPORT_CONFIGURATION,
427 param_spec);
428
429 param_spec = g_param_spec_long ("remainingmessagesize",
430 "remainingmessagesize (construct)",
431 "Set the remaining message size",
432 0, /* min */
433 G_MAXINT32, /* max */
434 DEFAULT_MAX_MESSAGE_SIZE, /* default by construct */
435 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY);
436 g_object_class_install_property (gobject_class,
437 PROP_THRIFT_BUFFERED_TRANSPORT_REMAINING_MESSAGE_SIZE,
438 param_spec);
439
440 param_spec = g_param_spec_long ("knowmessagesize",
441 "knowmessagesize (construct)",
442 "Set the known size of the message",
443 0, /* min */
444 G_MAXINT32, /* max */
445 DEFAULT_MAX_MESSAGE_SIZE, /* default by construct */
446 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY);
447 g_object_class_install_property (gobject_class,
448 PROP_THRIFT_BUFFERED_TRANSPORT_KNOW_MESSAGE_SIZE,
449 param_spec);
450
451 gobject_class->finalize = thrift_buffered_transport_finalize;
452 ttc->is_open = thrift_buffered_transport_is_open;
453 ttc->peek = thrift_buffered_transport_peek;
454 ttc->open = thrift_buffered_transport_open;
455 ttc->close = thrift_buffered_transport_close;
456 ttc->read = thrift_buffered_transport_read;
457 ttc->read_end = thrift_buffered_transport_read_end;
458 ttc->write = thrift_buffered_transport_write;
459 ttc->write_end = thrift_buffered_transport_write_end;
460 ttc->flush = thrift_buffered_transport_flush;
461 }
462