1%%
2%% Licensed to the Apache Software Foundation (ASF) under one
3%% or more contributor license agreements. See the NOTICE file
4%% distributed with this work for additional information
5%% regarding copyright ownership. The ASF licenses this file
6%% to you under the Apache License, Version 2.0 (the
7%% "License"); you may not use this file except in compliance
8%% with the License. You may obtain a copy of the License at
9%%
10%%   http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing,
13%% software distributed under the License is distributed on an
14%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15%% KIND, either express or implied. See the License for the
16%% specific language governing permissions and limitations
17%% under the License.
18%%
19
20%%% Todo: this might be better off as a gen_server type of transport
21%%%       that handles stuff like group commit, similar to TFileTransport
22%%%       in cpp land
23-module(thrift_disk_log_transport).
24
25-behaviour(thrift_transport).
26
27%% API
28-export([new/2, new_transport_factory/2, new_transport_factory/3]).
29
30%% thrift_transport callbacks
31-export([read/2, write/2, force_flush/1, flush/1, close/1]).
32
%% state
%%   log            - name of the already-open disk_log this transport
%%                    writes to (the LogName given to new/2)
%%   close_on_close - when true, close/1 also closes the disk_log
%%   sync_every     - period passed to timer:apply_interval/4 by new/2 for
%%                    the background sync; the default `infinity' means no
%%                    timer is started
%%   sync_tref      - timer reference returned by timer:apply_interval/4;
%%                    left unset when no timer was started
-record(dl_transport, {log,
                       close_on_close = false,
                       sync_every = infinity,
                       sync_tref}).
-type state() :: #dl_transport{}.
39-include("thrift_transport_behaviour.hrl").
40
41
%% Build a transport on top of a disk_log that is already open.
%%
%% Recognised entries in Opts:
%%   {close_on_close, boolean()} - when true, closing the transport also
%%                                 closes the log via disk_log:lclose().
%%   {sync_every, pos_integer()} - start an interval timer that calls
%%                                 force_flush/1 every that many milliseconds.
new(LogName, Opts) when is_atom(LogName), is_list(Opts) ->
    Configured = parse_opts(Opts, #dl_transport{log = LogName}),
    thrift_transport:new(?MODULE, with_sync_timer(Configured)).

%% Start the periodic sync timer when a positive interval is configured and
%% remember its reference; otherwise hand the state back unchanged.
with_sync_timer(State = #dl_transport{sync_every = Interval})
  when is_integer(Interval), Interval > 0 ->
    {ok, TimerRef} =
        timer:apply_interval(Interval, ?MODULE, force_flush, [State]),
    State#dl_transport{sync_tref = TimerRef};
with_sync_timer(State) ->
    State.
58
59
%% Fold the option list into the transport state, one option at a time.
%% An unrecognised or ill-typed option fails with function_clause.
parse_opts(Opts, State) ->
    lists:foldl(fun apply_opt/2, State, Opts).

%% Apply a single validated option to the state.
apply_opt({close_on_close, Flag}, State) when is_boolean(Flag) ->
    State#dl_transport{close_on_close = Flag};
apply_opt({sync_every, Millis}, State) when is_integer(Millis), Millis > 0 ->
    State#dl_transport{sync_every = Millis}.
66
67
68%%%% TRANSPORT IMPLENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
69
%% Reading is not supported: a disk_log transport can only be written to.
%% Always answers {error, no_read_from_disk_log} and returns the state as is.
read(This, _NumBytes) ->
    Reply = {error, no_read_from_disk_log},
    {This, Reply}.
73
%% Flatten Data (an iolist) to a binary and log it with disk_log:balog/2.
%% Returns {State, Result} where Result is whatever balog/2 returned.
write(This = #dl_transport{}, Data) ->
    Bytes = erlang:iolist_to_binary(Data),
    Result = disk_log:balog(This#dl_transport.log, Bytes),
    {This, Result}.
76
%% Log an info message and sync the disk_log to disk. This is the function
%% invoked by the interval timer that new/2 starts. Unlike the other
%% callbacks it returns the bare result of disk_log:sync/1.
force_flush(State = #dl_transport{}) ->
    error_logger:info_msg("~p syncing~n", [?MODULE]),
    LogName = State#dl_transport.log,
    disk_log:sync(LogName).
80
%% Flush callback: make sure logged data reaches the disk.
%%
%% When a periodic sync is configured (sync_every is a positive integer) the
%% interval timer already calls force_flush/1, so nothing is done here.
%% Otherwise -- sync_every left at its record default of `infinity' -- the
%% log is synced immediately.
%%
%% Returns {State, ok} or, if the immediate sync fails,
%% {State, {error, Reason}} as reported by disk_log:sync/1.
flush(This = #dl_transport{sync_every = SE}) when is_integer(SE), SE > 0 ->
    %% sync will happen automagically via the interval timer
    {This, ok};
flush(This = #dl_transport{log = Log}) ->
    %% No time-based sync. The previous code compared sync_every against
    %% `undefined', a value the record never holds (its default is
    %% `infinity'), so this branch was unreachable and flush/1 never synced.
    {This, disk_log:sync(Log)}.
89
90
91
92
%% On close, stop the periodic sync timer (if one was started) and close the
%% underlying log if we're configured to do so.
%%
%% Without the cancel, the interval timer would keep calling force_flush/1
%% on a transport that is no longer in use (and possibly on a log that has
%% just been closed).
%%
%% Returns {State, Result}: Result is `ok' when the log is left open,
%% otherwise the result of disk_log:lclose/1. The returned state has its
%% timer reference cleared.
close(This = #dl_transport{close_on_close = false, sync_tref = TRef}) ->
    cancel_sync_timer(TRef),
    {This#dl_transport{sync_tref = undefined}, ok};
close(This = #dl_transport{log = Log, sync_tref = TRef}) ->
    cancel_sync_timer(TRef),
    {This#dl_transport{sync_tref = undefined}, disk_log:lclose(Log)}.

%% Cancel the interval timer, if any. The result of timer:cancel/1 is
%% deliberately ignored: an error only means the timer is already gone, and
%% closing must proceed regardless.
cancel_sync_timer(undefined) ->
    ok;
cancel_sync_timer(TRef) ->
    _ = timer:cancel(TRef),
    ok.
98
99
100%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
101
%% Same as new_transport_factory/3 with default transport options: the log
%% is closed together with the transport and synced every 500 ms.
new_transport_factory(Name, ExtraLogOpts) ->
    DefaultTransportOpts = [{close_on_close, true}, {sync_every, 500}],
    new_transport_factory(Name, ExtraLogOpts, DefaultTransportOpts).
105
%% Return {ok, Factory} where Factory is a zero-arity fun. Each call to the
%% fun opens the disk_log and builds a transport on it (see factory_impl/3);
%% nothing is opened until the fun is invoked.
new_transport_factory(Name, ExtraLogOpts, TransportOpts) ->
    {ok, fun() -> factory_impl(Name, ExtraLogOpts, TransportOpts) end}.
109
%% Open (or repair) the wrap-type, externally formatted disk_log called Name
%% and wrap it in a transport. ExtraLogOpts are appended to the fixed
%% disk_log options; TransportOpts are handed to new/2. Any other result
%% from disk_log:open/1 crashes with case_clause.
factory_impl(Name, ExtraLogOpts, TransportOpts) ->
    FixedOpts = [{name, Name}, {format, external}, {type, wrap}],
    OpenResult = disk_log:open(FixedOpts ++ ExtraLogOpts),
    OpenedLog =
        case OpenResult of
            {repaired, Repaired, RecoveredInfo, BadBytesInfo} ->
                error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n",
                                      [Repaired, RecoveredInfo, BadBytesInfo]),
                Repaired;
            {ok, Opened} ->
                Opened
        end,
    new(OpenedLog, TransportOpts).
124