diff --git a/src/ra_log.erl b/src/ra_log.erl index 8a490344..5a3f5398 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -64,8 +64,7 @@ -define(WAL_RESEND_TIMEOUT, 5000). -type ra_meta_key() :: atom(). --type segment_ref() :: {From :: ra_index(), To :: ra_index(), - File :: file:filename_all()}. +-type segment_ref() :: {ra_range:range(), File :: file:filename_all()}. -type event_body() :: {written, ra_term(), ra:range()} | {segments, [{ets:tid(), ra:range()}], [segment_ref()]} | {resend_write, ra_index()} | @@ -279,7 +278,7 @@ init(#{uid := UId, LastSegRefIdx = case SegRefs of [] -> -1; - [{_, L, _} | _] -> + [{{_, L}, _} | _] -> L end, LastWrittenIdx = case ra_log_wal:last_writer_seq(Wal, UId) of @@ -969,7 +968,7 @@ should_snapshot(snapshot, Idx, % We should take a snapshot if the new snapshot index would allow us % to discard any segments or if the we've handled enough commands % since the last snapshot. - CanFreeSegments = lists:any(fun({_, To, _}) -> To =< Idx end, + CanFreeSegments = lists:any(fun({{_, To}, _}) -> To =< Idx end, ra_log_reader:segment_refs(Reader)), CanFreeSegments orelse Idx > SnapLimit; should_snapshot(checkpoint, Idx, @@ -1331,7 +1330,7 @@ recover_ranges(UId, MtRange, SegWriter) -> [SegRef | Acc] end end, [], SegFiles), - SegRanges = [{F, L} || {F, L, _} <- SegRefs], + SegRanges = [Range || {Range, _} <- SegRefs], Ranges = [MtRange | SegRanges], {pick_range(Ranges, undefined), SegRefs}. diff --git a/src/ra_log_reader.erl b/src/ra_log_reader.erl index ddc249c5..83eb4d25 100644 --- a/src/ra_log_reader.erl +++ b/src/ra_log_reader.erl @@ -36,12 +36,10 @@ access_pattern = random :: access_pattern() }). --type segment_ref() :: {From :: ra_index(), To :: ra_index(), - File :: string()}. +-type segment_ref() :: ra_log:segment_ref(). -record(?STATE, {cfg :: #cfg{}, - % first_index = 0 :: ra_index(), last_index = 0 :: ra:index(), - segment_refs = [] :: [segment_ref()], + segment_refs :: ra_lol:state(), open_segments :: ra_flru:state() }). @@ -52,7 +50,7 @@ -export_type([ state/0, read_plan/0 - ]). + ]). %% PUBLIC @@ -81,11 +79,15 @@ init(UId, Dir, MaxOpen, AccessPattern, SegRefs, #{}, Counter) _ -> 0 end, + #?STATE{cfg = Cfg, open_segments = ra_flru:new(MaxOpen, FlruHandler), - % first_index = FirstIdx, last_index = LastIdx, - segment_refs = SegRefs}. + segment_refs = + ra_lol:from_list(fun seg_ref_gt/2, lists:reverse(SegRefs))}. + +seg_ref_gt({{Start, _}, Fn1}, {{_, End}, Fn2}) -> + Start > End andalso Fn1 > Fn2. -spec close(state()) -> ok. close(#?STATE{open_segments = Open}) -> @@ -95,18 +97,21 @@ close(#?STATE{open_segments = Open}) -> -spec update_segments([segment_ref()], state()) -> state(). update_segments(NewSegmentRefs, #?STATE{open_segments = Open0, - segment_refs = SegmentRefs0} = State) -> - SegmentRefs = compact_seg_refs(NewSegmentRefs, SegmentRefs0), + segment_refs = SegRefs0} = State) -> + + SegmentRefs0 = ra_lol:to_list(SegRefs0), + SegmentRefs = compact_segrefs(NewSegmentRefs, SegmentRefs0), + SegRefs = ra_lol:from_list(fun seg_ref_gt/2, lists:reverse(SegmentRefs)), %% check if any of the updated segrefs refer to open segments %% we close these segments so that they can be re-opened with updated %% indexes if needed - Open = lists:foldl(fun ({_, _, F}, Acc0) -> - case ra_flru:evict(F, Acc0) of + Open = lists:foldl(fun ({_, Fn}, Acc0) -> + case ra_flru:evict(Fn, Acc0) of {_, Acc} -> Acc; error -> Acc0 end end, Open0, SegmentRefs), - State#?MODULE{segment_refs = SegmentRefs, + State#?MODULE{segment_refs = SegRefs, open_segments = Open}. -spec handle_log_update({ra_log_update, undefined | pid(), ra_index(), @@ -120,38 +125,42 @@ handle_log_update({ra_log_update, From, _FstIdx, SegRefs}, %% reply to the updater process From ! ra_log_update_processed end, - State#?MODULE{segment_refs = SegRefs, - % first_index = FstIdx, + State#?MODULE{segment_refs = ra_lol:from_list(fun seg_ref_gt/2, + lists:reverse(SegRefs)), open_segments = Open}. -spec update_first_index(ra_index(), state()) -> {state(), [segment_ref()]}. update_first_index(FstIdx, #?STATE{segment_refs = SegRefs0, open_segments = OpenSegs0} = State) -> - case lists:partition(fun({_, To, _}) - when To >= FstIdx -> true; - (_) -> false - end, SegRefs0) of - {_, []} -> - {State, []}; - {Active, Obsolete} -> - ObsoleteKeys = [element(3, O) || O <- Obsolete], - % close any open segments - OpenSegs = lists:foldl(fun (K, OS0) -> - case ra_flru:evict(K, OS0) of - {_, OS} -> OS; - error -> OS0 - end - end, OpenSegs0, ObsoleteKeys), - {State#?STATE{open_segments = OpenSegs, - % first_index = FstIdx, - segment_refs = Active}, - Obsolete} + %% TODO: refactor this so that ra_lol just returns plain lists on both sides? + case ra_lol:takewhile(fun({{_, To}, _}) -> + To >= FstIdx + end, SegRefs0) of + {Active, Obsolete0} -> + case ra_lol:len(Obsolete0) of + 0 -> + {State, []}; + _ -> + Obsolete = ra_lol:to_list(Obsolete0), + ObsoleteKeys = [K || {_, K} <- Obsolete], + % close any open segments + OpenSegs = lists:foldl(fun (K, OS0) -> + case ra_flru:evict(K, OS0) of + {_, OS} -> OS; + error -> OS0 + end + end, OpenSegs0, ObsoleteKeys), + {State#?STATE{open_segments = OpenSegs, + segment_refs = ra_lol:from_list(fun seg_ref_gt/2, + lists:reverse(Active))}, + Obsolete} + end end. -spec segment_refs(state()) -> [segment_ref()]. segment_refs(#?STATE{segment_refs = SegmentRefs}) -> - SegmentRefs. + ra_lol:to_list(SegmentRefs). -spec num_open_segments(state()) -> non_neg_integer(). num_open_segments(#?STATE{open_segments = Open}) -> @@ -170,7 +179,7 @@ fold(_FromIdx, _ToIdx, _Fun, Acc, #?STATE{} = State) -> -spec sparse_read(state(), [ra_index()], [log_entry()]) -> {[log_entry()], state()}. sparse_read(#?STATE{cfg = #cfg{} = Cfg} = State, Indexes, Entries0) -> - {Open, _, SegC, Entries} = (catch segment_sparse_read(State, Indexes, Entries0)), + {Open, SegC, Entries} = (catch segment_sparse_read(State, Indexes, Entries0)), ok = incr_counter(Cfg, ?C_RA_LOG_READ_SEGMENT, SegC), {Entries, State#?MODULE{open_segments = Open}}. @@ -206,21 +215,30 @@ fetch_term(Idx, #?STATE{cfg = #cfg{} = Cfg} = State0) -> %% LOCAL -segment_read_plan(_RegRefs, [], Acc) -> +segment_read_plan(_SegRefs, [], Acc) -> lists:reverse(Acc); -segment_read_plan([], _Indexes, Acc) -> - %% not all indexes were found - lists:reverse(Acc); -segment_read_plan([{To, From, Fn} | SegRefs], Indexes, Acc) -> - %% TODO: address unnecessary allocation here - Range = {To, From}, - case sparse_read_split(fun (I) -> - ra_range:in(I, Range) - end, Indexes, []) of - {[], _} -> - segment_read_plan(SegRefs, Indexes, Acc); - {Idxs, Rem} -> - segment_read_plan(SegRefs, Rem, [{Idxs, Fn} | Acc]) +segment_read_plan(SegRefs, [Idx | _] = Indexes, Acc) -> + case ra_lol:search(seg_ref_search_fun(Idx), SegRefs) of + {{Range, Fn}, Cont} -> + case sparse_read_split(fun (I) -> + ra_range:in(I, Range) + end, Indexes, []) of + {[], _} -> + segment_read_plan(Cont, Indexes, Acc); + {Idxs, Rem} -> + segment_read_plan(Cont, Rem, [{Idxs, Fn} | Acc]) + end; + undefined -> + %% not found + lists:reverse(Acc) + end. + +seg_ref_search_fun(Idx) -> + fun({{Start, End}, _}) -> + if Idx > End -> higher; + Idx < Start -> lower; + true -> equal + end end. segment_term_query(Idx, #?MODULE{segment_refs = SegRefs, @@ -229,87 +247,80 @@ segment_term_query(Idx, #?MODULE{segment_refs = SegRefs, {Result, Open} = segment_term_query0(Idx, SegRefs, OpenSegs, Cfg), {Result, State#?MODULE{open_segments = Open}}. -segment_term_query0(Idx, [{From, To, Filename} | _], Open0, +segment_term_query0(Idx, SegRefs, Open0, #cfg{directory = Dir, - access_pattern = AccessPattern} = Cfg) - when Idx >= From andalso Idx =< To -> - case ra_flru:fetch(Filename, Open0) of - {ok, Seg, Open} -> - Term = ra_log_segment:term_query(Seg, Idx), - {Term, Open}; - error -> - AbsFn = filename:join(Dir, Filename), - {ok, Seg} = ra_log_segment:open(AbsFn, - #{mode => read, - access_pattern => AccessPattern}), - - incr_counter(Cfg, ?C_RA_LOG_OPEN_SEGMENTS, 1), - Term = ra_log_segment:term_query(Seg, Idx), - {Term, ra_flru:insert(Filename, Seg, Open0)} - end; -segment_term_query0(Idx, [_ | Tail], Open, Cfg) -> - segment_term_query0(Idx, Tail, Open, Cfg); -segment_term_query0(_Idx, [], Open, _) -> - {undefined, Open}. + access_pattern = AccessPattern} = Cfg) -> + case ra_lol:search(seg_ref_search_fun(Idx), SegRefs) of + {{_Range, Fn}, _Cont} -> + case ra_flru:fetch(Fn, Open0) of + {ok, Seg, Open} -> + Term = ra_log_segment:term_query(Seg, Idx), + {Term, Open}; + error -> + AbsFn = filename:join(Dir, Fn), + {ok, Seg} = ra_log_segment:open(AbsFn, + #{mode => read, + access_pattern => AccessPattern}), -segrefs_to_read(From0, To0, _SegRefs, Acc) - when To0 < From0 -> + incr_counter(Cfg, ?C_RA_LOG_OPEN_SEGMENTS, 1), + Term = ra_log_segment:term_query(Seg, Idx), + {Term, ra_flru:insert(Fn, Seg, Open0)} + end; + undefined -> + {undefined, Open0} + end. + +segment_fold_plan(_SegRefs, undefined, Acc) -> Acc; -segrefs_to_read(From0, To0, [{SStart, SEnd, FileName} | SegRefs], Acc) - when SStart =< To0 andalso - SEnd >= From0 -> - %% TODO: use ra_range:range_overlap/2 here? - From = max(From0, SStart), - To = min(To0, SEnd), - Spec = {From, To, FileName}, - segrefs_to_read(From0, SStart - 1, SegRefs, [Spec | Acc]); -segrefs_to_read(From0, To0, [_ | SegRefs], Acc) -> - segrefs_to_read(From0, To0, SegRefs, Acc). +segment_fold_plan(SegRefs, {_ReqStart, ReqEnd} = ReqRange, Acc) -> + case ra_lol:search(seg_ref_search_fun(ReqEnd), SegRefs) of + {{Range, Fn}, Cont} -> + This = ra_range:overlap(ReqRange, Range), + ReqRem = case ra_range:subtract(This, ReqRange) of + [] -> + undefined; + [Rem] -> + Rem + end, + segment_fold_plan(Cont, ReqRem, [{This, Fn} | Acc]); + undefined -> + %% not found + Acc + end. segment_fold(#?STATE{segment_refs = SegRefs, open_segments = OpenSegs, cfg = Cfg} = State, RStart, REnd, Fun, Acc) -> - SegRefsToReadFrom = segrefs_to_read(RStart, REnd, SegRefs, []), + Plan = segment_fold_plan(SegRefs, {RStart, REnd}, []), {Op, A} = lists:foldl( - fun ({From, To, Fn}, {Open0, Ac0}) -> + fun ({{Start, End}, Fn}, {Open0, Ac0}) -> {Seg, Open} = get_segment(Cfg, Open0, Fn), - {Open, ra_log_segment:fold(Seg, From, To, + {Open, ra_log_segment:fold(Seg, Start, End, fun binary_to_term/1, Fun, Ac0)} - end, {OpenSegs, Acc}, SegRefsToReadFrom), + end, {OpenSegs, Acc}, Plan), {State#?MODULE{open_segments = Op}, A}. segment_sparse_read(#?STATE{open_segments = Open}, [], Entries0) -> - {Open, [], 0, Entries0}; + {Open, 0, Entries0}; segment_sparse_read(#?STATE{segment_refs = SegRefs, open_segments = OpenSegs, cfg = Cfg}, Indexes, Entries0) -> + Plan = segment_read_plan(SegRefs, Indexes, []), lists:foldl( - fun(_, {_, [], _, _} = Acc) -> - %% we're done reading - throw(Acc); - ({From, To, Fn}, {Open0, [NextIdx | _] = Idxs, C, En0}) - when NextIdx >= From andalso NextIdx =< To -> + fun ({Idxs, Fn}, {Open0, C, En0}) -> {Seg, Open} = get_segment(Cfg, Open0, Fn), - {ReadIdxs, RemIdxs} = - sparse_read_split(fun (I) -> - I >= From andalso I =< To - end, Idxs, []), {ReadSparseCount, Entries} = - ra_log_segment:read_sparse(Seg, ReadIdxs, + ra_log_segment:read_sparse(Seg, Idxs, fun (I, T, B, Acc) -> [{I, T, binary_to_term(B)} | Acc] - end, - []), - {Open, RemIdxs, C + ReadSparseCount, - lists:reverse(Entries, En0)}; - (_Segref, Acc) -> - Acc - end, {OpenSegs, Indexes, 0, Entries0}, SegRefs). + end, []), + {Open, C + ReadSparseCount, lists:reverse(Entries, En0)} + end, {OpenSegs, 0, Entries0}, Plan). %% like lists:splitwith but without reversing the accumulator sparse_read_split(Fun, [E | Rem] = All, Acc) -> @@ -361,18 +372,24 @@ get_segment_ext(Dir, Open0, Fn) -> end end. -compact_seg_refs([], PreviousSegRefs) -> - PreviousSegRefs; -compact_seg_refs(NewSegRefs, []) -> - NewSegRefs; -compact_seg_refs(NewSegRefs, - [{_, _, SegFile} | RemSegRefs] = PreviousSegRefs) -> - case lists:last(NewSegRefs) of - {_, _, SegFile} -> - % update information about the last previously seen segment - NewSegRefs ++ RemSegRefs; - _ -> - NewSegRefs ++ PreviousSegRefs +compact_segrefs(New, Cur) -> + %% all are in descending order + lists:foldr( + fun + (S, []) -> + [S]; + ({{Start, _}, _} = SegRef, Prev) -> + [SegRef | limit(Start, Prev)] + end, Cur, New). + +limit(_LimitIdx, []) -> + []; +limit(LimitIdx, [{PrevRange, PrevFn} | PrevRem]) -> + case ra_range:limit(LimitIdx, PrevRange) of + undefined -> + limit(LimitIdx, PrevRem); + NewPrevRange -> + [{NewPrevRange, PrevFn} | PrevRem] end. incr_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined -> @@ -389,24 +406,100 @@ decr_counter(#cfg{counter = undefined}, _, _) -> -include_lib("eunit/include/eunit.hrl"). compact_seg_refs_test() -> - % {From, To, File} - NewRefs = [{10, 100, "2"}], - PrevRefs = [{10, 75, "2"}, {1, 9, "1"}], - ?assertEqual([{10, 100, "2"}, {1, 9, "1"}], compact_seg_refs(NewRefs, PrevRefs)). + NewRefs = [{{10, 100}, "2"}], + PrevRefs = [{{10, 75}, "2"}, {{1, 9}, "1"}], + ?assertEqual([{{10, 100}, "2"}, {{1, 9}, "1"}], + compact_segrefs(NewRefs, PrevRefs)). + +compact_segref_3_test() -> + Data = [ + {{2, 7}, "B"}, + %% this entry has overwritten the prior two + {{5, 10}, "B"}, + {{1, 4}, "A"} + ], + Res = compact_segrefs(Data, []), + ?assertMatch([{{2, 7}, "B"}, + {{1, 1}, "A"}], Res), + ok. + +compact_segref_2_test() -> + Data = [ + {{80, 89}, "80"}, + %% this entry has overwritten the prior two + {{56, 79}, "71"}, + {{70, 85}, "70"}, + {{60, 69}, "60"}, + {{50, 59}, "50"} + ], + Res = compact_segrefs(Data, []), + ?assertMatch([{{80, 89}, "80"}, + {{56, 79}, "71"}, + {{50, 55}, "50"} + ], Res), + ok. + +compact_segref_1_test() -> + Data = [ + {{80, 89}, "80"}, + %% this entry has overwritten the prior one + {{70, 79}, "71"}, + {{70, 85}, "70"}, + %% partial overwrite + {{65, 69}, "65"}, + {{60, 69}, "60"}, + {{50, 59}, "50"}, + {{40, 49}, "40"} + ], + + Res = compact_segrefs(Data, [ + {{30, 39}, "30"}, + {{20, 29}, "20"} + ]), + + %% overwritten entry is no longer there + %% and the segment prior to the partial overwrite has been limited + %% to provide a continuous range + ?assertMatch([{{80, 89}, "80"}, + {{70, 79}, "71"}, + {{65, 69}, "65"}, + {{60, 64}, "60"}, + {{50, 59}, "50"}, + {{40, 49}, "40"}, + {{30, 39}, "30"}, + {{20, 29}, "20"} + ], Res), + ok. + segrefs_to_read_test() -> - SegRefs = [{412,499,"00000005.segment"}, - {284,411,"00000004.segment"}, - {284,310,"00000004b.segment"}, - {200,285,"00000003.segment"}, - {128,255,"00000002.segment"}, - {0,127,"00000001.segment"}], - - ?assertEqual([{199,199,"00000002.segment"}, - {200,283,"00000003.segment"}, - {284,411,"00000004.segment"}, - {412,499,"00000005.segment"}], - segrefs_to_read(199, 499, SegRefs, [])), + + SegRefs = ra_lol:from_list( + fun seg_ref_gt/2, + lists:reverse( + compact_segrefs( + [{{412,499},"00000006.segment"}, + {{284,411},"00000005.segment"}, + %% this segment got overwritten + {{284,500},"00000004.segment"}, + {{200,285},"00000003.segment"}, + {{128,255},"00000002.segment"}, + {{0,127},"00000001.segment"}], []))), + + + ?assertEqual([{{199, 199}, "00000002.segment"}, + {{200, 283}, "00000003.segment"}, + {{284, 411}, "00000005.segment"}, + {{412, 499}, "00000006.segment"}], + segment_fold_plan(SegRefs, {199, 499}, [])), + + %% out of range + ?assertEqual([], segment_fold_plan(SegRefs, {500, 500}, [])), + ?assertEqual([ + {{127,127},"00000001.segment"}, + {{128,128},"00000002.segment"} + ], + segment_fold_plan(SegRefs, {127, 128}, [])), ok. -endif. diff --git a/src/ra_log_segment.erl b/src/ra_log_segment.erl index 84b436e3..a76e8c9a 100644 --- a/src/ra_log_segment.erl +++ b/src/ra_log_segment.erl @@ -386,9 +386,9 @@ filename(#state{cfg = #cfg{filename = Fn}}) -> option(ra_log:segment_ref()). segref(#state{range = undefined}) -> undefined; -segref(#state{range = {Start, End}, +segref(#state{range = Range, cfg = #cfg{filename = Fn}}) -> - {Start, End, filename:basename(Fn)}; + {Range, filename:basename(Fn)}; segref(Filename) -> {ok, Seg} = open(Filename, #{mode => read}), SegRef = segref(Seg), diff --git a/src/ra_log_segment_writer.erl b/src/ra_log_segment_writer.erl index 7d20dab5..2c378f8b 100644 --- a/src/ra_log_segment_writer.erl +++ b/src/ra_log_segment_writer.erl @@ -179,7 +179,7 @@ handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir, ?DEBUG("segment_writer in '~w': completed flush of ~b writers from wal file ~s in ~bms", [System, length(RangesList), WalFile, Diff]), {noreply, State}; -handle_cast({truncate_segments, Who, {_From, _To, Name} = SegRef}, +handle_cast({truncate_segments, Who, {_Range, Name} = SegRef}, #state{segment_conf = SegConf, system = System} = State0) -> %% remove all segments below the provided SegRef @@ -298,7 +298,7 @@ flush_mem_table_ranges({ServerUId, TidRanges0}, %% order they are kept by the ra_log SegRefs = lists:reverse( lists:foldl( - fun ({_, _, FILE}, [{_, _, FILE} | _] = Acc) -> + fun ({_, FILE}, [{_, FILE} | _] = Acc) -> Acc; (Seg, Acc) -> [Seg | Acc] @@ -460,7 +460,8 @@ find_segment_files(Dir) -> segment_files(Dir) -> case prim_file:list_dir(Dir) of {ok, Files0} -> - Files = [filename:join(Dir, F) || F <- Files0, filename:extension(F) == ".segment"], + Files = [filename:join(Dir, F) || F <- Files0, + filename:extension(F) == ".segment"], lists:sort(Files); {error, enoent} -> [] diff --git a/src/ra_lol.erl b/src/ra_lol.erl new file mode 100644 index 00000000..7dcd0269 --- /dev/null +++ b/src/ra_lol.erl @@ -0,0 +1,192 @@ +-module(ra_lol). +%% sorted list of list + +-export([ + new/0, + append/2, + search/2, + takewhile/2, + from_list/1, + from_list/2, + to_list/1, + len/1 + ]). + +-define(MAX_ROW_LEN, 64). + +-type row() :: [term()]. +-type gt_fun() :: fun((Item, Item) -> boolean()). + +-record(?MODULE, {len = 0 :: non_neg_integer(), + row_len = 0 :: non_neg_integer(), + gt_fun :: gt_fun(), + rows = [] :: [row()]}). + +-opaque state() :: #?MODULE{}. + +%% a search continuation +-opaque cont() :: [row()]. + + +-export_type([state/0, + cont/0]). + +-spec new() -> state(). +new() -> + #?MODULE{gt_fun = fun erlang:'>'/2}. + +-spec new(gt_fun()) -> state(). +new(GtFun) -> + #?MODULE{gt_fun = GtFun}. + +%% @doc append an item that is greater than the last appended item +-spec append(Item, state()) -> + state() | out_of_order + when Item :: term(). +append(Item, #?MODULE{rows = []} = State) -> + State#?MODULE{rows = [[Item]], + len = 1, + row_len = 0}; +append(Item, + #?MODULE{len = Len, + gt_fun = GtFun, + row_len = RowLen, + rows = [[LastItem | _] = Row | Rows]} = State) -> + case GtFun(Item, LastItem) of + true -> + case RowLen of + ?MAX_ROW_LEN -> + %% time for a new row + State#?MODULE{rows = [[Item], Row | Rows], + len = Len + 1, + row_len = 1}; + _ -> + State#?MODULE{rows = [[Item | Row] | Rows], + len = Len + 1, + row_len = RowLen + 1} + end; + false -> + out_of_order + end. + + +-spec search(fun((term()) -> higher | lower | equal), + state() | cont()) -> + {term(), cont()} | undefined. +search(SearchFun, #?MODULE{rows = Rows}) -> + search(SearchFun, Rows); +search(SearchFun, Rows) when is_list(Rows) -> + case find_row(SearchFun, Rows) of + [] -> + undefined; + [SearchRow | RemRows] -> + case search_row(SearchFun, SearchRow) of + undefined -> + undefined; + {Item, Rem} -> + {Item, [Rem | RemRows]} + end + end. + +-spec takewhile(fun((Item) -> boolean()), state()) -> + {[Item], state()} + when Item :: term(). +takewhile(Fun, #?MODULE{gt_fun = GtFun} = State) -> + %% not the most efficient but rarely used + {Taken, Left} = lists:splitwith(Fun, to_list(State)), + {Taken, from_list(GtFun, lists:reverse(Left))}. + + +%% @doc initialise an slist from a list sorted in ascending order +-spec from_list(list()) -> state(). +from_list(List) -> + from_list(fun erlang:'>'/2, List). + +-spec from_list(gt_fun(), list()) -> state(). +from_list(GtFun, List) + when is_list(List) -> + lists:foldl(fun append/2, new(GtFun), List). + +-spec to_list(state()) -> list(). +to_list(#?MODULE{rows = Rows}) -> + lists:append(Rows). + +-spec len(state()) -> non_neg_integer(). +len(#?MODULE{len = Len}) -> + Len. + + +%% Internals + +search_row(_SearchFun, []) -> + undefined; +search_row(SearchFun, [Item | Rem]) -> + case SearchFun(Item) of + equal -> + {Item, Rem}; + lower -> + search_row(SearchFun, Rem); + higher -> + undefined + end. + + +find_row(SearchFun, [_Row, Row | Rem] = Rows) -> + %% if last item of the second rows is higher than searching for + %% then return all rows + case SearchFun(hd(Row)) of + higher -> + Rows; + _ -> + %% else keep searching + find_row(SearchFun, [Row | Rem]) + end; +find_row(_SearchFun, Rows) -> + Rows. + +%%% =================== +%%% Internal unit tests +%%% =================== + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +basic_test() -> + Items = lists:seq(1, 100), + L0 = ra_lol:from_list(Items), + ?assertEqual(100, ra_lol:len(L0)), + ?assertEqual(Items, lists:reverse(ra_lol:to_list(L0))), + ?assertMatch(out_of_order, ra_lol:append(1, L0)), + L1 = ra_lol:append(101, L0), + ?assertEqual(101, ra_lol:len(L1)), + SearchFun = fun (T) -> + fun (Item) -> + if T == Item -> equal; + T > Item -> higher; + true -> lower + end + end + end, + [begin + {T, _} = ra_lol:search(SearchFun(T), L1) + end || T <- Items ++ [101]], + + %% test searching with a continuation + _ = lists:foldl(fun (T, Acc) -> + {T, Cont} = ra_lol:search(SearchFun(T), Acc), + Cont + end, L1, lists:reverse(Items ++ [101])), + + TakeFun = fun(Item) -> Item > 50 end, + + {Taken, L2} = takewhile(TakeFun, L1), + ?assertEqual(50, ra_lol:len(L2)), + ?assertEqual(51, length(Taken)), + ?assertMatch(out_of_order, ra_lol:append(50, L2)), + L3 = ra_lol:append(51, L2), + ?assertEqual(51, ra_lol:len(L3)), + + ok. + + +-endif. diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index d552924b..1001c9e5 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -331,12 +331,11 @@ validate_reads_for_overlapped_writes(Config) -> Log4 = write_and_roll(350, 500, 2, Log3), Log5 = write_n(500, 551, 2, Log4), % Log6 = deliver_all_log_events(Log5, 200), - % ct:pal("LAST ~p", [ra_log:last_written(Log6)]), Log6 = deliver_log_events_cond( - Log5, fun (L) -> - {W, _} = ra_log:last_written(L), - W >= 550 - end, 100), + Log5, fun (L) -> + {W, _} = ra_log:last_written(L), + W >= 550 + end, 100), Log7 = validate_fold(1, 199, 1, Log6), Log8 = validate_fold(200, 550, 2, Log7), diff --git a/test/ra_log_segment_SUITE.erl b/test/ra_log_segment_SUITE.erl index 63079fe3..9c9589ed 100644 --- a/test/ra_log_segment_SUITE.erl +++ b/test/ra_log_segment_SUITE.erl @@ -92,8 +92,6 @@ corrupted_segment(Config) -> % write_trunc_until_full(Fn), {ok, SegR} = ra_log_segment:open(Fn, #{mode => read}), - %% ct:pal("Range ~p", [ra_log_segment:segref(SegR)]), - %% ct:pal("SegR ~p", [SegR]), [{1, 2, Data}] = ra_log_segment:fold(SegR, 1, 1, fun ra_lib:id/1, @@ -205,7 +203,7 @@ segref(Config) -> {ok, Seg0} = ra_log_segment:open(Fn, #{max_count => 128}), undefined = ra_log_segment:segref(Seg0), {ok, Seg1} = ra_log_segment:append(Seg0, 1, 2, <<"Adsf">>), - {1, 1, "seg1.seg"} = ra_log_segment:segref(Seg1), + {{1, 1}, "seg1.seg"} = ra_log_segment:segref(Seg1), ok. @@ -330,7 +328,7 @@ overwrite(Config) -> Fn = filename:join(Dir, "seg1.seg"), Data = make_data(1024), {ok, Seg0} = ra_log_segment:open(Fn), - {ok, Seg1} = ra_log_segment:append(Seg0, 5, 2, Data), + {ok, Seg1} = ra_log_segment:append(Seg0, 5, 1, Data), % overwrite - simulates follower receiving entries from new leader {ok, Seg2} = ra_log_segment:append(Seg1, 2, 2, Data), {2, 2} = ra_log_segment:range(Seg2), diff --git a/test/ra_log_segment_writer_SUITE.erl b/test/ra_log_segment_writer_SUITE.erl index 6ae04fab..b19a4fe0 100644 --- a/test/ra_log_segment_writer_SUITE.erl +++ b/test/ra_log_segment_writer_SUITE.erl @@ -99,7 +99,7 @@ accept_mem_tables(Config) -> ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, make_wal(Config, "w1.wal")), receive - {ra_log_event, {segments, TidRanges, [{1, 3, SegFile}]}} -> + {ra_log_event, {segments, TidRanges, [{{1, 3}, SegFile}]}} -> SegmentFile = filename:join(?config(server_dir, Config), SegFile), {ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}), % assert Entries have been fully transferred @@ -139,7 +139,7 @@ accept_mem_tables_append(Config) -> make_wal(Config, "w2.wal")), AllEntries = Entries ++ Entries2, receive - {ra_log_event, {segments, [{Tid, {4, 5}}], [{1, 5, Fn}]}} -> + {ra_log_event, {segments, [{Tid, {4, 5}}], [{{1, 5}, Fn}]}} -> SegmentFile = filename:join(?config(server_dir, Config), Fn), {ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}), % assert Entries have been fully transferred @@ -164,10 +164,10 @@ accept_mem_tables_overwrite(Config) -> ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, make_wal(Config, "w1.wal")), receive - {ra_log_event, {segments, [{Tid, {3, 5}}], [{3, 5, Fn}]}} -> + {ra_log_event, {segments, [{Tid, {3, 5}}], [{{3, 5}, Fn}]}} -> SegmentFile = filename:join(?config(server_dir, Config), Fn), {ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}), - ?assertMatch({3, 5, _}, ra_log_segment:segref(Seg)), + ?assertMatch({{3, 5}, _}, ra_log_segment:segref(Seg)), ra_log_segment:close(Seg), ok after 3000 -> @@ -181,10 +181,10 @@ accept_mem_tables_overwrite(Config) -> ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges2, make_wal(Config, "w2.wal")), receive - {ra_log_event, {segments, [{Tid2, {1, 3}}], [{1, 3, Fn2}]}} -> + {ra_log_event, {segments, [{Tid2, {1, 3}}], [{{1, 3}, Fn2}]}} -> SegmentFile2 = filename:join(?config(server_dir, Config), Fn2), {ok, Seg2} = ra_log_segment:open(SegmentFile2, #{mode => read}), - ?assertMatch({1, 3, _}, ra_log_segment:segref(Seg2)), + ?assertMatch({{1, 3}, _}, ra_log_segment:segref(Seg2)), C2 = term_to_binary(c2), [{1, 43, _}, {2, 43, _}] = read_sparse(Seg2, [1, 2]), [{3, 43, C2}] = read_sparse(Seg2, [3]), @@ -217,10 +217,10 @@ accept_mem_tables_overwrite_same_wal(Config) -> make_wal(Config, "w2.wal")), receive {ra_log_event, - {segments, [{Tid2, {4, 6}}, {Tid, {2, 5}}], [{2, 6, Fn}]}} -> + {segments, [{Tid2, {4, 6}}, {Tid, {2, 5}}], [{{2, 6}, Fn}]}} -> SegmentFile = filename:join(?config(server_dir, Config), Fn), {ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}), - ?assertMatch({2, 6, _}, ra_log_segment:segref(Seg)), + ?assertMatch({{2, 6}, _}, ra_log_segment:segref(Seg)), [{2, 42, _}, {3, 42, _}, {4, 43, _}, @@ -253,7 +253,7 @@ accept_mem_tables_multi_segment(Config) -> ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, make_wal(Config, "w.wal")), receive - {ra_log_event, {segments, TidRanges, [{9, 10, _Seg2}, {1, 8, _Seg1}]}} -> + {ra_log_event, {segments, TidRanges, [{{9, 10}, _Seg2}, {{1, 8}, _Seg1}]}} -> ok after 3000 -> flush(), @@ -281,7 +281,7 @@ accept_mem_tables_multi_segment_overwrite(Config) -> make_wal(Config, "w.wal")), LastFile = receive - {ra_log_event, {segments, TidRanges, [{9, 10, Seg2}, {1, 8, _Seg1}]}} -> + {ra_log_event, {segments, TidRanges, [{{9, 10}, Seg2}, {{1, 8}, _Seg1}]}} -> Seg2 % ok after 3000 -> @@ -298,7 +298,7 @@ accept_mem_tables_multi_segment_overwrite(Config) -> make_wal(Config, "w2.wal")), receive {ra_log_event, {segments, TidRanges2, - [{13, 15, _}, {7, 12, LastFile}]}} -> + [{{13, 15}, _}, {{7, 12}, LastFile}]}} -> ok after 3000 -> flush(), @@ -336,7 +336,7 @@ accept_mem_tables_for_down_server(Config) -> ok = file:write_file(WalFile, <<"waldata">>), ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, WalFile), receive - {ra_log_event, {segments, [{Tid2, {1, 3}}], [{1, 3, Fn}]}} -> + {ra_log_event, {segments, [{Tid2, {1, 3}}], [{{1, 3}, Fn}]}} -> SegmentFile = filename:join(?config(server_dir, Config), Fn), {ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}), % assert Entries have been fully transferred @@ -395,7 +395,7 @@ accept_mem_tables_with_deleted_server(Config) -> WalFile = make_wal(Config, "00001.wal"), ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, WalFile), receive - {ra_log_event, {segments, [{Tid2, {1, 3}}], [{1, 3, Fn}]}} -> + {ra_log_event, {segments, [{Tid2, {1, 3}}], [{{1, 3}, Fn}]}} -> SegmentFile = filename:join(?config(server_dir, Config), Fn), {ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}), % assert Entries have been fully transferred @@ -446,7 +446,7 @@ accept_mem_tables_with_corrupt_segment(Config) -> file:write_file(filename:join(?config(server_dir, Config), "0000001.segment"), <<>>), ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, WalFile), receive - {ra_log_event, {segments, TidRanges, [{1, 3, SegFile}]}} -> + {ra_log_event, {segments, TidRanges, [{{1, 3}, SegFile}]}} -> SegmentFile = filename:join(?config(server_dir, Config), SegFile), {ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}), % assert Entries have been fully transferred @@ -486,10 +486,10 @@ accept_mem_tables_multiple_ranges(Config)-> receive {ra_log_event, {segments, _TidRanges, SegRefs}} -> ?assertMatch([ - {49, 64, _}, - {33, 48, _}, - {17, 32, _}, - {1, 16, _} + {{49, 64}, _}, + {{33, 48}, _}, + {{17, 32}, _}, + {{1, 16}, _} ], SegRefs), ok after 3000 -> @@ -546,14 +546,14 @@ truncate_segments(Config) -> WalFile = make_wal(Config, "0000001.wal"), ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, WalFile), receive - {ra_log_event, {segments, TidRanges, [{25, 32, S} = Cur | Rem]}} -> + {ra_log_event, {segments, TidRanges, [{{25, 32}, S} = Cur | Rem]}} -> % test a lower index _does not_ delete the file SegmentFile = filename:join(?config(server_dir, Config), S), ?assert(filelib:is_file(SegmentFile)), ok = ra_log_segment_writer:truncate_segments(TblWriterPid, UId, Cur), ra_log_segment_writer:await(?SEGWR), - [{_, _, S1}, {_, _, S2}] = Rem, + [{_, S1}, {_, S2}] = Rem, SegmentFile1 = filename:join(?config(server_dir, Config), S1), ?assertNot(filelib:is_file(SegmentFile1)), SegmentFile2 = filename:join(?config(server_dir, Config), S2), @@ -587,7 +587,7 @@ truncate_segments_with_pending_update(Config) -> ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges2, make_wal(Config, "w2.erl")), receive - {ra_log_event, {segments, _Tid, [{25, 32, S} = Cur | Rem]}} -> + {ra_log_event, {segments, _Tid, [{{25, 32}, S} = Cur | Rem]}} -> % this is the event from the first call to accept_mem_tables, % the Cur segments has been appended to since so should _not_ % be deleted when it is provided as the cutoff segref for @@ -598,7 +598,7 @@ truncate_segments_with_pending_update(Config) -> UId, Cur), ra_log_segment_writer:await(?SEGWR), ?assert(filelib:is_file(SegmentFile)), - [{_, _, S1}, {_, _, S2}] = Rem, + [{_, S1}, {_, S2}] = Rem, SegmentFile1 = filename:join(?config(server_dir, Config), S1), ?assertNot(filelib:is_file(SegmentFile1)), SegmentFile2 = filename:join(?config(server_dir, Config), S2), @@ -633,7 +633,7 @@ truncate_segments_with_pending_overwrite(Config) -> ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges2, make_wal(Config, "w2.wal")), receive - {ra_log_event, {segments, _Tid, [{25, 32, S} = Cur | Rem]}} -> + {ra_log_event, {segments, _Tid, [{{25, 32}, S} = Cur | Rem]}} -> % test a lower index _does not_ delete the file SegmentFile = filename:join(?config(server_dir, Config), S), ?assert(filelib:is_file(SegmentFile)), @@ -642,7 +642,7 @@ truncate_segments_with_pending_overwrite(Config) -> _ = ra_log_segment_writer:await(?SEGWR), SegmentFile = filename:join(?config(server_dir, Config), S), ?assert(filelib:is_file(SegmentFile)), - [{_, _, S1}, {_, _, S2}] = Rem, + [{_, S1}, {_, S2}] = Rem, SegmentFile1 = filename:join(?config(server_dir, Config), S1), ?assertNot(filelib:is_file(SegmentFile1)), SegmentFile2 = filename:join(?config(server_dir, Config), S2), @@ -654,7 +654,7 @@ truncate_segments_with_pending_overwrite(Config) -> throw(ra_log_event_timeout) end, receive - {ra_log_event, {segments, _, [{16, 25, F} = Cur2, {12, 15, F2}]}} -> + {ra_log_event, {segments, _, [{{16, 25}, F} = Cur2, {{12, 15}, F2}]}} -> ?assertMatch([_, _], segments_for(UId, Dir)), ok = ra_log_segment_writer:truncate_segments(TblWriterPid, UId, Cur2), @@ -689,7 +689,7 @@ my_segments(Config) -> WalFile = make_wal(Config, "00001.wal"), ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, WalFile), receive - {ra_log_event, {segments, TidRanges, [{1, 3, Fn}]}} -> + {ra_log_event, {segments, TidRanges, [{{1, 3}, Fn}]}} -> SegmentFile = filename:join(?config(server_dir, Config), Fn), [MyFile] = ra_log_segment_writer:my_segments(?SEGWR,UId), ?assertEqual(SegmentFile, unicode:characters_to_binary(MyFile)), @@ -721,7 +721,7 @@ skip_entries_lower_than_snapshot_index(Config) -> ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, make_wal(Config, "w1.wal")), receive - {ra_log_event, {segments, _Tid, [{4, 5, Fn}]}} -> + {ra_log_event, {segments, _Tid, [{{4, 5}, Fn}]}} -> SegmentFile = filename:join(?config(server_dir, Config), Fn), {ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}), % assert only entries with a higher index than the snapshot