1 (*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
20 unit Thrift.Stream;
21
22 {$I Thrift.Defines.inc}
23
24 interface
25
26 uses
27 Classes,
28 SysUtils,
29 SysConst,
30 RTLConsts,
31 {$IFDEF OLD_UNIT_NAMES}
32 ActiveX,
33 {$ELSE}
34 Winapi.ActiveX,
35 {$ENDIF}
36 Thrift.Utils;
37
38 type
39 IThriftStream = interface
40 ['{3A61A8A6-3639-4B91-A260-EFCA23944F3A}']
41 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); overload;
42 procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); overload;
Readnull43 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; overload;
Readnull44 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload;
45 procedure Open;
46 procedure Close;
47 procedure Flush;
IsOpennull48 function IsOpen: Boolean;
ToArraynull49 function ToArray: TBytes;
Sizenull50 function Size : Int64;
Positionnull51 function Position : Int64;
52 end;
53
54
55 TThriftStreamImpl = class abstract( TInterfacedObject, IThriftStream)
56 strict private
57 procedure CheckSizeAndOffset( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer); overload;
58 strict protected
59 // IThriftStream
60 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); overload; inline;
61 procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); overload; virtual;
Readnull62 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; overload; inline;
Readnull63 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload; virtual;
64 procedure Open; virtual; abstract;
65 procedure Close; virtual; abstract;
66 procedure Flush; virtual; abstract;
IsOpennull67 function IsOpen: Boolean; virtual; abstract;
ToArraynull68 function ToArray: TBytes; virtual; abstract;
Sizenull69 function Size : Int64; virtual;
Positionnull70 function Position : Int64; virtual;
71 end;
72
73 TThriftStreamAdapterDelphi = class( TThriftStreamImpl)
74 strict private
75 FStream : TStream;
76 FOwnsStream : Boolean;
77 strict protected
78 // IThriftStream
79 procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
Readnull80 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
81 procedure Open; override;
82 procedure Close; override;
83 procedure Flush; override;
IsOpennull84 function IsOpen: Boolean; override;
ToArraynull85 function ToArray: TBytes; override;
Sizenull86 function Size : Int64; override;
Positionnull87 function Position : Int64; override;
88 public
89 constructor Create( const aStream: TStream; aOwnsStream : Boolean);
90 destructor Destroy; override;
91 end;
92
93 TThriftStreamAdapterCOM = class( TThriftStreamImpl)
94 strict private
95 FStream : IStream;
96 strict protected
97 // IThriftStream
98 procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
Readnull99 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
100 procedure Open; override;
101 procedure Close; override;
102 procedure Flush; override;
IsOpennull103 function IsOpen: Boolean; override;
ToArraynull104 function ToArray: TBytes; override;
Sizenull105 function Size : Int64; override;
Positionnull106 function Position : Int64; override;
107 public
108 constructor Create( const aStream: IStream);
109 end;
110
111
112 TThriftMemoryStream = class(TMemoryStream)
113 strict protected
114 FInitialCapacity : NativeInt;
115 public
116 constructor Create( const aInitialCapacity : NativeInt = 4096);
117
118 // reimplemented
119 procedure Clear;
120
121 // make it publicly visible
122 property Capacity;
123 end;
124
125
126
127 implementation
128
129 uses Thrift.Transport;
130
131
132 { TThriftMemoryStream }
133
134 constructor TThriftMemoryStream.Create( const aInitialCapacity : NativeInt);
135 begin
136 inherited Create;
137 FInitialCapacity := aInitialCapacity;
138 Clear;
139 end;
140
141
142 procedure TThriftMemoryStream.Clear;
143 // reimplemented to keep initial capacity
144 begin
145 Position := 0;
146 Size := 0;
147
148 // primary goal: minimize costly reallocations (performance!)
149 // secondary goal: prevent costly ressource over-allocations
150 if (FInitialCapacity >= 1024*1024) // if we are talking about MB
151 or ((Capacity div 2) > FInitialCapacity) // or the allocated buffer is really large
152 or (Capacity < FInitialCapacity) // or we are actually below the limit
153 then Capacity := FInitialCapacity;
154 end;
155
156
157 { TThriftStreamAdapterCOM }
158
159 procedure TThriftStreamAdapterCOM.Close;
160 begin
161 FStream := nil;
162 end;
163
164 constructor TThriftStreamAdapterCOM.Create( const aStream: IStream);
165 begin
166 inherited Create;
167 FStream := aStream;
168 end;
169
170 procedure TThriftStreamAdapterCOM.Flush;
171 begin
172 if IsOpen then begin
173 if FStream <> nil then begin
174 FStream.Commit( STGC_DEFAULT );
175 end;
176 end;
177 end;
178
Sizenull179 function TThriftStreamAdapterCOM.Size : Int64;
180 var statstg: TStatStg;
181 begin
182 FillChar( statstg, SizeOf( statstg), 0);
183 if IsOpen
184 and Succeeded( FStream.Stat( statstg, STATFLAG_NONAME ))
185 then result := statstg.cbSize
186 else result := 0;
187 end;
188
TThriftStreamAdapterCOM.Positionnull189 function TThriftStreamAdapterCOM.Position : Int64;
190 var newpos : {$IF CompilerVersion >= 29.0} UInt64 {$ELSE} Int64 {$IFEND};
191 begin
192 if SUCCEEDED( FStream.Seek( 0, STREAM_SEEK_CUR, newpos))
193 then result := Int64(newpos)
194 else raise TTransportExceptionEndOfFile.Create('Seek() error');
195 end;
196
TThriftStreamAdapterCOM.IsOpennull197 function TThriftStreamAdapterCOM.IsOpen: Boolean;
198 begin
199 Result := FStream <> nil;
200 end;
201
202 procedure TThriftStreamAdapterCOM.Open;
203 begin
204 // nothing to do
205 end;
206
Readnull207 function TThriftStreamAdapterCOM.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
208 var pTmp : PByte;
209 begin
210 inherited;
211
212 if count >= buflen-offset
213 then count := buflen-offset;
214
215 Result := 0;
216 if FStream <> nil then begin
217 if count > 0 then begin
218 pTmp := pBuf;
219 Inc( pTmp, offset);
220 FStream.Read( pTmp, count, @Result);
221 end;
222 end;
223 end;
224
ToArraynull225 function TThriftStreamAdapterCOM.ToArray: TBytes;
226 var
227 len : Int64;
228 NewPos : {$IF CompilerVersion >= 29.0} UInt64 {$ELSE} Int64 {$IFEND};
229 cbRead : Integer;
230 begin
231 len := Self.Size;
232 SetLength( Result, len );
233
234 if len > 0 then begin
235 if Succeeded( FStream.Seek( 0, STREAM_SEEK_SET, NewPos) ) then begin
236 FStream.Read( @Result[0], len, @cbRead);
237 end;
238 end;
239 end;
240
241 procedure TThriftStreamAdapterCOM.Write( const pBuf: Pointer; offset: Integer; count: Integer);
242 var nWritten : Integer;
243 pTmp : PByte;
244 begin
245 inherited;
246 if IsOpen then begin
247 if count > 0 then begin
248 pTmp := pBuf;
249 Inc( pTmp, offset);
250 FStream.Write( pTmp, count, @nWritten);
251 end;
252 end;
253 end;
254
255 { TThriftStreamImpl }
256
257 procedure TThriftStreamImpl.CheckSizeAndOffset( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer);
258 begin
259 if count > 0 then begin
260 if (offset < 0) or ( offset >= buflen) then begin
261 raise ERangeError.Create( SBitsIndexError );
262 end;
263 if count > buflen then begin
264 raise ERangeError.Create( SBitsIndexError );
265 end;
266 end;
267 end;
268
Readnull269 function TThriftStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer;
270 begin
271 if Length(buffer) > 0
272 then Result := Read( @buffer[0], Length(buffer), offset, count)
273 else Result := 0;
274 end;
275
Readnull276 function TThriftStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
277 begin
278 Result := 0;
279 CheckSizeAndOffset( pBuf, buflen, offset, count );
280 end;
281
282 procedure TThriftStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
283 begin
284 if Length(buffer) > 0
285 then Write( @buffer[0], offset, count);
286 end;
287
288 procedure TThriftStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
289 begin
290 CheckSizeAndOffset( pBuf, offset+count, offset, count);
291 end;
292
Sizenull293 function TThriftStreamImpl.Size : Int64;
294 begin
295 ASSERT(FALSE);
296 raise ENotImplemented.Create(ClassName+'.Size');
297 end;
298
Positionnull299 function TThriftStreamImpl.Position : Int64;
300 begin
301 ASSERT(FALSE);
302 raise ENotImplemented.Create(ClassName+'.Position');
303 end;
304
305
306 { TThriftStreamAdapterDelphi }
307
308 constructor TThriftStreamAdapterDelphi.Create( const aStream: TStream; aOwnsStream: Boolean);
309 begin
310 inherited Create;
311 FStream := aStream;
312 FOwnsStream := aOwnsStream;
313 end;
314
315 destructor TThriftStreamAdapterDelphi.Destroy;
316 begin
317 if FOwnsStream
318 then Close;
319
320 inherited;
321 end;
322
323 procedure TThriftStreamAdapterDelphi.Close;
324 begin
325 FStream.Free;
326 FStream := nil;
327 FOwnsStream := False;
328 end;
329
330 procedure TThriftStreamAdapterDelphi.Flush;
331 begin
332 // nothing to do
333 end;
334
Sizenull335 function TThriftStreamAdapterDelphi.Size : Int64;
336 begin
337 result := FStream.Size;
338 end;
339
TThriftStreamAdapterDelphi.Positionnull340 function TThriftStreamAdapterDelphi.Position : Int64;
341 begin
342 result := FStream.Position;
343 end;
344
TThriftStreamAdapterDelphi.IsOpennull345 function TThriftStreamAdapterDelphi.IsOpen: Boolean;
346 begin
347 Result := FStream <> nil;
348 end;
349
350 procedure TThriftStreamAdapterDelphi.Open;
351 begin
352 // nothing to do
353 end;
354
Readnull355 function TThriftStreamAdapterDelphi.Read(const pBuf : Pointer; const buflen : Integer; offset, count: Integer): Integer;
356 var pTmp : PByte;
357 begin
358 inherited;
359
360 if count >= buflen-offset
361 then count := buflen-offset;
362
363 if count > 0 then begin
364 pTmp := pBuf;
365 Inc( pTmp, offset);
366 Result := FStream.Read( pTmp^, count)
367 end
368 else Result := 0;
369 end;
370
ToArraynull371 function TThriftStreamAdapterDelphi.ToArray: TBytes;
372 var
373 OrgPos : Integer;
374 len : Integer;
375 begin
376 if FStream <> nil
377 then len := FStream.Size
378 else len := 0;
379
380 SetLength( Result, len );
381
382 if len > 0 then
383 begin
384 OrgPos := FStream.Position;
385 try
386 FStream.Position := 0;
387 FStream.ReadBuffer( Pointer(@Result[0])^, len );
388 finally
389 FStream.Position := OrgPos;
390 end;
391 end
392 end;
393
394 procedure TThriftStreamAdapterDelphi.Write(const pBuf : Pointer; offset, count: Integer);
395 var pTmp : PByte;
396 begin
397 inherited;
398 if count > 0 then begin
399 pTmp := pBuf;
400 Inc( pTmp, offset);
401 FStream.Write( pTmp^, count)
402 end;
403 end;
404
405 end.
406