%%
%% Licensed to the Apache Software Foundation (ASF) under one
%% or more contributor license agreements. See the NOTICE file
%% distributed with this work for additional information
%% regarding copyright ownership. The ASF licenses this file
%% to you under the Apache License, Version 2.0 (the
%% "License"); you may not use this file except in compliance
%% with the License. You may obtain a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%

%%% Write-only Thrift transport that appends its data to an OTP disk_log.
%%%
%%% Todo: this might be better off as a gen_server type of transport
%%%       that handles stuff like group commit, similar to TFileTransport
%%%       in cpp land
-module(thrift_disk_log_transport).

-behaviour(thrift_transport).

%% API
-export([new/2, new_transport_factory/2, new_transport_factory/3]).

%% thrift_transport callbacks
-export([read/2, write/2, force_flush/1, flush/1, close/1]).

%% state
-record(dl_transport, {%% Name of the disk_log this transport writes to.
                       log,
                       %% true if close/1 should also close the disk_log.
                       close_on_close = false,
                       %% Period, in milliseconds, of the timed sync started
                       %% by new/2; `infinity' means no timed sync.
                       sync_every = infinity,
                       %% Timer reference returned by timer:apply_interval/4
                       %% when a timed sync is running, otherwise undefined.
                       sync_tref}).
-type state() :: #dl_transport{}.
-include("thrift_transport_behaviour.hrl").


%% Create a transport attached to an already open log.
%% If you'd like this transport to close the disk_log using disk_log:lclose()
%% when the transport is closed, pass a {close_on_close, true} tuple in the
%% Opts list.
%% Options understood by new/2 (see parse_opts/2):
%%   {close_on_close, boolean()} - whether close/1 also closes the disk_log
%%   {sync_every, pos_integer()} - sync the log every N milliseconds from a
%%                                 timer; without it flush/1 syncs directly
%% Returns whatever thrift_transport:new/2 returns for this module and state.
new(LogName, Opts) when is_atom(LogName), is_list(Opts) ->
    State = parse_opts(Opts, #dl_transport{log = LogName}),

    State2 =
        case State#dl_transport.sync_every of
            N when is_integer(N), N > 0 ->
                %% The timer calls force_flush/1, which only needs the log
                %% name, so the state without the timer reference is enough.
                {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, [State]),
                State#dl_transport{sync_tref = TRef};
            _ ->
                State
        end,

    thrift_transport:new(?MODULE, State2).


%% Folds the option list into the transport state. An unrecognised or
%% malformed option is not handled and crashes with function_clause.
parse_opts([], State) ->
    State;
parse_opts([{close_on_close, Bool} | Rest], State) when is_boolean(Bool) ->
    parse_opts(Rest, State#dl_transport{close_on_close = Bool});
parse_opts([{sync_every, Int} | Rest], State) when is_integer(Int), Int > 0 ->
    parse_opts(Rest, State#dl_transport{sync_every = Int}).


%%%% TRANSPORT IMPLEMENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% disk_log_transport is write-only
read(State, _Len) ->
    {State, {error, no_read_from_disk_log}}.

%% Appends Data to the log as a single binary term via disk_log:balog/2 and
%% returns that call's result.
write(This = #dl_transport{log = Log}, Data) ->
    {This, disk_log:balog(Log, erlang:iolist_to_binary(Data))}.

%% Syncs the log unconditionally. This is the function the interval timer
%% started in new/2 invokes; it returns the bare disk_log:sync/1 result.
force_flush(#dl_transport{log = Log}) ->
    error_logger:info_msg("~p syncing~n", [?MODULE]),
    disk_log:sync(Log).

%% Flushes the transport.
%% With a timed sync configured the interval timer takes care of syncing, so
%% nothing is done here. Otherwise sync_every still holds the record default
%% `infinity' and the log is synced right away; the result of that sync is
%% returned to the caller instead of being dropped.
flush(This = #dl_transport{sync_every = SE}) when is_integer(SE), SE > 0 ->
    {This, ok};
flush(This = #dl_transport{log = Log}) ->
    {This, disk_log:sync(Log)}.


%% On close, stop the timed sync (if any) and close the underlying log if
%% we're configured to do so. The timer is cancelled first so that it does
%% not keep calling force_flush/1 for a transport that is gone.
close(This = #dl_transport{sync_tref = TRef}) ->
    ok = cancel_sync_timer(TRef),
    {This#dl_transport{sync_tref = undefined}, close_log(This)}.

%% Cancels the interval timer created in new/2; no-op when none was started.
cancel_sync_timer(undefined) ->
    ok;
cancel_sync_timer(TRef) ->
    %% The result is ignored on purpose: the timer may already be gone and
    %% that must not make close/1 fail.
    _ = timer:cancel(TRef),
    ok.

%% Closes the disk_log unless close_on_close is false.
close_log(#dl_transport{close_on_close = false}) ->
    ok;
close_log(#dl_transport{log = Log}) ->
    disk_log:lclose(Log).


%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% Same as new_transport_factory/3 with the default transport options:
%% close the log on close and sync it every 500 milliseconds.
new_transport_factory(Name, ExtraLogOpts) ->
    new_transport_factory(Name, ExtraLogOpts, [{close_on_close, true},
                                               {sync_every, 500}]).

%% Returns {ok, Fun} where Fun/0 opens the log and builds a transport on it
%% each time it is called.
new_transport_factory(Name, ExtraLogOpts, TransportOpts) ->
    F = fun() -> factory_impl(Name, ExtraLogOpts, TransportOpts) end,
    {ok, F}.

%% Opens the disk_log called Name as an external-format wrap log, with
%% ExtraLogOpts appended to those options, and attaches a new transport
%% configured by TransportOpts to it. A repaired log is reported through
%% error_logger before use. Any other result of disk_log:open/1 is not
%% handled and raises case_clause.
factory_impl(Name, ExtraLogOpts, TransportOpts) ->
    OpenArgs = [{name, Name}, {format, external}, {type, wrap} | ExtraLogOpts],
    case disk_log:open(OpenArgs) of
        {ok, Opened} ->
            new(Opened, TransportOpts);
        {repaired, Repaired, Recovered, BadBytes} ->
            error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n",
                                  [Repaired, Recovered, BadBytes]),
            new(Repaired, TransportOpts)
    end.