Skip to content

Optimise read planning #487

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
@@ -64,8 +64,7 @@
-define(WAL_RESEND_TIMEOUT, 5000).

-type ra_meta_key() :: atom().
-type segment_ref() :: {From :: ra_index(), To :: ra_index(),
File :: file:filename_all()}.
-type segment_ref() :: {ra_range:range(), File :: file:filename_all()}.
-type event_body() :: {written, ra_term(), ra:range()} |
{segments, [{ets:tid(), ra:range()}], [segment_ref()]} |
{resend_write, ra_index()} |
@@ -279,7 +278,7 @@ init(#{uid := UId,
LastSegRefIdx = case SegRefs of
[] ->
-1;
[{_, L, _} | _] ->
[{{_, L}, _} | _] ->
L
end,
LastWrittenIdx = case ra_log_wal:last_writer_seq(Wal, UId) of
@@ -969,8 +968,14 @@ should_snapshot(snapshot, Idx,
% We should take a snapshot if the new snapshot index would allow us
% to discard any segments or if the we've handled enough commands
% since the last snapshot.
CanFreeSegments = lists:any(fun({_, To, _}) -> To =< Idx end,
ra_log_reader:segment_refs(Reader)),
CanFreeSegments = case ra_log_reader:range(Reader) of
undefined ->
false;
{Start, _End} ->
%% this isn't 100% guaranteed to free a segment
%% but there is a good chance
Idx > Start
end,
CanFreeSegments orelse Idx > SnapLimit;
should_snapshot(checkpoint, Idx,
#?MODULE{cfg = #cfg{min_checkpoint_interval = CheckpointInter},
@@ -1029,7 +1034,8 @@ overview(#?MODULE{last_index = LastIndex,
last_term => LastTerm,
first_index => FirstIndex,
last_written_index_term => LWIT,
num_segments => length(ra_log_reader:segment_refs(Reader)),
num_segments => ra_log_reader:segment_ref_count(Reader),
segments_range => ra_log_reader:range(Reader),
open_segments => ra_log_reader:num_open_segments(Reader),
snapshot_index => case CurrSnap of
undefined -> undefined;
@@ -1166,9 +1172,9 @@ delete_segments(SnapIdx, #?MODULE{cfg = #cfg{log_id = LogId,
ok = ra_log_segment_writer:truncate_segments(SegWriter,
UId, Pivot)
end),
Active = ra_log_reader:segment_refs(Reader),
NumActive = ra_log_reader:segment_ref_count(Reader),
?DEBUG("~ts: ~b obsolete segments at ~b - remaining: ~b, pivot ~0p",
[LogId, length(Obsolete), SnapIdx, length(Active), Pivot]),
[LogId, length(Obsolete), SnapIdx, NumActive, Pivot]),
State = State0#?MODULE{reader = Reader},
{State, log_update_effects(Readers, Pid, State)}
end.
@@ -1331,7 +1337,7 @@ recover_ranges(UId, MtRange, SegWriter) ->
[SegRef | Acc]
end
end, [], SegFiles),
SegRanges = [{F, L} || {F, L, _} <- SegRefs],
SegRanges = [Range || {Range, _} <- SegRefs],
Ranges = [MtRange | SegRanges],
{pick_range(Ranges, undefined), SegRefs}.

394 changes: 254 additions & 140 deletions src/ra_log_reader.erl

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/ra_log_segment.erl
Original file line number Diff line number Diff line change
@@ -386,9 +386,9 @@ filename(#state{cfg = #cfg{filename = Fn}}) ->
option(ra_log:segment_ref()).
segref(#state{range = undefined}) ->
undefined;
segref(#state{range = {Start, End},
segref(#state{range = Range,
cfg = #cfg{filename = Fn}}) ->
{Start, End, filename:basename(Fn)};
{Range, filename:basename(Fn)};
segref(Filename) ->
{ok, Seg} = open(Filename, #{mode => read}),
SegRef = segref(Seg),
9 changes: 5 additions & 4 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
@@ -179,7 +179,7 @@ handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir,
?DEBUG("segment_writer in '~w': completed flush of ~b writers from wal file ~s in ~bms",
[System, length(RangesList), WalFile, Diff]),
{noreply, State};
handle_cast({truncate_segments, Who, {_From, _To, Name} = SegRef},
handle_cast({truncate_segments, Who, {_Range, Name} = SegRef},
#state{segment_conf = SegConf,
system = System} = State0) ->
%% remove all segments below the provided SegRef
@@ -298,7 +298,7 @@ flush_mem_table_ranges({ServerUId, TidRanges0},
%% order they are kept by the ra_log
SegRefs = lists:reverse(
lists:foldl(
fun ({_, _, FILE}, [{_, _, FILE} | _] = Acc) ->
fun ({_, FILE}, [{_, FILE} | _] = Acc) ->
Acc;
(Seg, Acc) ->
[Seg | Acc]
@@ -443,7 +443,7 @@ append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) ->
exit({segment_writer_append_error, FileName, Posix})
end
catch _:badarg ->
?ERROR("segment_writer: uid ~s ets table deleted", [UId]),
?INFO("segment_writer: uid ~s ets table deleted", [UId]),
%% ets table has been deleted.
%% this could be due to two reasons
%% 1. the ra server has been deleted.
@@ -460,7 +460,8 @@ find_segment_files(Dir) ->
segment_files(Dir) ->
case prim_file:list_dir(Dir) of
{ok, Files0} ->
Files = [filename:join(Dir, F) || F <- Files0, filename:extension(F) == ".segment"],
Files = [filename:join(Dir, F) || F <- Files0,
filename:extension(F) == ".segment"],
lists:sort(Files);
{error, enoent} ->
[]
192 changes: 192 additions & 0 deletions src/ra_lol.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
-module(ra_lol).
%% sorted list of list

-export([
new/0,
append/2,
search/2,
takewhile/2,
from_list/1,
from_list/2,
to_list/1,
len/1
]).

-define(MAX_ROW_LEN, 64).

-type row() :: [term()].
-type gt_fun() :: fun((Item, Item) -> boolean()).

-record(?MODULE, {len = 0 :: non_neg_integer(),
append_row_len = 0 :: non_neg_integer(),
gt_fun :: gt_fun(),
rows = [] :: [row()]}).

-opaque state() :: #?MODULE{}.

%% a search continuation
-opaque cont() :: [row()].


-export_type([state/0,
cont/0]).

-spec new() -> state().
new() ->
#?MODULE{gt_fun = fun erlang:'>'/2}.

-spec new(gt_fun()) -> state().
new(GtFun) ->
#?MODULE{gt_fun = GtFun}.

%% @doc append an item that is greater than the last appended item
-spec append(Item, state()) ->
state() | out_of_order
when Item :: term().
append(Item, #?MODULE{rows = []} = State) ->
State#?MODULE{rows = [[Item]],
len = 1,
append_row_len = 0};
append(Item,
#?MODULE{len = Len,
gt_fun = GtFun,
append_row_len = RowLen,
rows = [[LastItem | _] = Row | Rows]} = State) ->
case GtFun(Item, LastItem) of
true ->
case RowLen of
?MAX_ROW_LEN ->
%% time for a new row
State#?MODULE{rows = [[Item], Row | Rows],
len = Len + 1,
append_row_len = 1};
_ ->
State#?MODULE{rows = [[Item | Row] | Rows],
len = Len + 1,
append_row_len = RowLen + 1}
end;
false ->
out_of_order
end.


-spec search(fun((term()) -> higher | lower | equal),
state() | cont()) ->
{term(), cont()} | undefined.
search(SearchFun, #?MODULE{rows = Rows}) ->
search(SearchFun, Rows);
search(SearchFun, Rows) when is_list(Rows) ->
case find_row(SearchFun, Rows) of
[] ->
undefined;
[SearchRow | RemRows] ->
case search_row(SearchFun, SearchRow) of
undefined ->
undefined;
{Item, Rem} ->
{Item, [Rem | RemRows]}
end
end.

-spec takewhile(fun((Item) -> boolean()), state()) ->
{[Item], state()}
when Item :: term().
takewhile(Fun, #?MODULE{gt_fun = GtFun} = State) ->
%% not the most efficient but rarely used
{Taken, Left} = lists:splitwith(Fun, to_list(State)),
{Taken, from_list(GtFun, lists:reverse(Left))}.


%% @doc initialise from a list sorted in ascending order
-spec from_list(list()) -> state().
from_list(List) ->
from_list(fun erlang:'>'/2, List).

-spec from_list(gt_fun(), list()) -> state().
from_list(GtFun, List)
when is_list(List) ->
lists:foldl(fun append/2, new(GtFun), List).

-spec to_list(state()) -> list().
to_list(#?MODULE{rows = Rows}) ->
lists:append(Rows).

-spec len(state()) -> non_neg_integer().
len(#?MODULE{len = Len}) ->
Len.


%% Internals

search_row(_SearchFun, []) ->
undefined;
search_row(SearchFun, [Item | Rem]) ->
case SearchFun(Item) of
equal ->
{Item, Rem};
lower ->
search_row(SearchFun, Rem);
higher ->
undefined
end.


find_row(SearchFun, [_Row, Row | Rem] = Rows) ->
%% if last item of the second rows is higher than searching for
%% then return all rows
case SearchFun(hd(Row)) of
higher ->
Rows;
_ ->
%% else keep searching
find_row(SearchFun, [Row | Rem])
end;
find_row(_SearchFun, Rows) ->
Rows.

%%% ===================
%%% Internal unit tests
%%% ===================

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

basic_test() ->
Items = lists:seq(1, 100),
L0 = ra_lol:from_list(Items),
?assertEqual(100, ra_lol:len(L0)),
?assertEqual(Items, lists:reverse(ra_lol:to_list(L0))),
?assertMatch(out_of_order, ra_lol:append(1, L0)),
L1 = ra_lol:append(101, L0),
?assertEqual(101, ra_lol:len(L1)),
SearchFun = fun (T) ->
fun (Item) ->
if T == Item -> equal;
T > Item -> higher;
true -> lower
end
end
end,
[begin
{T, _} = ra_lol:search(SearchFun(T), L1)
end || T <- Items ++ [101]],

%% test searching with a continuation
_ = lists:foldl(fun (T, Acc) ->
{T, Cont} = ra_lol:search(SearchFun(T), Acc),
Cont
end, L1, lists:reverse(Items ++ [101])),

TakeFun = fun(Item) -> Item > 50 end,

{Taken, L2} = takewhile(TakeFun, L1),
?assertEqual(50, ra_lol:len(L2)),
?assertEqual(51, length(Taken)),
?assertMatch(out_of_order, ra_lol:append(50, L2)),
L3 = ra_lol:append(51, L2),
?assertEqual(51, ra_lol:len(L3)),

ok.


-endif.
14 changes: 7 additions & 7 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
@@ -331,12 +331,11 @@ validate_reads_for_overlapped_writes(Config) ->
Log4 = write_and_roll(350, 500, 2, Log3),
Log5 = write_n(500, 551, 2, Log4),
% Log6 = deliver_all_log_events(Log5, 200),
% ct:pal("LAST ~p", [ra_log:last_written(Log6)]),
Log6 = deliver_log_events_cond(
Log5, fun (L) ->
{W, _} = ra_log:last_written(L),
W >= 550
end, 100),
Log5, fun (L) ->
{W, _} = ra_log:last_written(L),
W >= 550
end, 100),

Log7 = validate_fold(1, 199, 1, Log6),
Log8 = validate_fold(200, 550, 2, Log7),
@@ -345,6 +344,9 @@ validate_reads_for_overlapped_writes(Config) ->
read_segment := M2}} = ra_counters:overview(),
?assertEqual(550, M1 + M2),
ra_log:close(Log8),
%% re open to test init with overlapping segments
Log = ra_log_init(Config, #{counter => ra_counters:fetch(?FUNCTION_NAME)}),
ra_log:close(Log),
ok.

read_opt(Config) ->
@@ -394,7 +396,6 @@ sparse_read_out_of_range_2(Config) ->
%% but only process events for 9
Log1 = deliver_all_log_events(write_n(10, 20, 2,
write_and_roll(1, 10, 2, Log0)), 50),
ct:pal("log1 ~p", [ra_log:overview(Log1)]),
SnapIdx = 10,
%% do snapshot in
{Log2, _} = ra_log:update_release_cursor(SnapIdx, #{}, 2,
@@ -409,7 +410,6 @@ sparse_read_out_of_range_2(Config) ->
end,
Log4 = deliver_all_log_events(Log3, 100),

ct:pal("log ~p", [ra_log:overview(Log4)]),
{SnapIdx, 2} = ra_log:snapshot_index_term(Log4),

?assertMatch({[{11, _, _}], _},
6 changes: 2 additions & 4 deletions test/ra_log_segment_SUITE.erl
Original file line number Diff line number Diff line change
@@ -92,8 +92,6 @@ corrupted_segment(Config) ->
% write_trunc_until_full(Fn),

{ok, SegR} = ra_log_segment:open(Fn, #{mode => read}),
%% ct:pal("Range ~p", [ra_log_segment:segref(SegR)]),
%% ct:pal("SegR ~p", [SegR]),
[{1, 2, Data}] =
ra_log_segment:fold(SegR, 1, 1,
fun ra_lib:id/1,
@@ -205,7 +203,7 @@ segref(Config) ->
{ok, Seg0} = ra_log_segment:open(Fn, #{max_count => 128}),
undefined = ra_log_segment:segref(Seg0),
{ok, Seg1} = ra_log_segment:append(Seg0, 1, 2, <<"Adsf">>),
{1, 1, "seg1.seg"} = ra_log_segment:segref(Seg1),
{{1, 1}, "seg1.seg"} = ra_log_segment:segref(Seg1),
ok.


@@ -330,7 +328,7 @@ overwrite(Config) ->
Fn = filename:join(Dir, "seg1.seg"),
Data = make_data(1024),
{ok, Seg0} = ra_log_segment:open(Fn),
{ok, Seg1} = ra_log_segment:append(Seg0, 5, 2, Data),
{ok, Seg1} = ra_log_segment:append(Seg0, 5, 1, Data),
% overwrite - simulates follower receiving entries from new leader
{ok, Seg2} = ra_log_segment:append(Seg1, 2, 2, Data),
{2, 2} = ra_log_segment:range(Seg2),
54 changes: 27 additions & 27 deletions test/ra_log_segment_writer_SUITE.erl
Original file line number Diff line number Diff line change
@@ -99,7 +99,7 @@ accept_mem_tables(Config) ->
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges,
make_wal(Config, "w1.wal")),
receive
{ra_log_event, {segments, TidRanges, [{1, 3, SegFile}]}} ->
{ra_log_event, {segments, TidRanges, [{{1, 3}, SegFile}]}} ->
SegmentFile = filename:join(?config(server_dir, Config), SegFile),
{ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}),
% assert Entries have been fully transferred
@@ -139,7 +139,7 @@ accept_mem_tables_append(Config) ->
make_wal(Config, "w2.wal")),
AllEntries = Entries ++ Entries2,
receive
{ra_log_event, {segments, [{Tid, {4, 5}}], [{1, 5, Fn}]}} ->
{ra_log_event, {segments, [{Tid, {4, 5}}], [{{1, 5}, Fn}]}} ->
SegmentFile = filename:join(?config(server_dir, Config), Fn),
{ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}),
% assert Entries have been fully transferred
@@ -164,10 +164,10 @@ accept_mem_tables_overwrite(Config) ->
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges,
make_wal(Config, "w1.wal")),
receive
{ra_log_event, {segments, [{Tid, {3, 5}}], [{3, 5, Fn}]}} ->
{ra_log_event, {segments, [{Tid, {3, 5}}], [{{3, 5}, Fn}]}} ->
SegmentFile = filename:join(?config(server_dir, Config), Fn),
{ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}),
?assertMatch({3, 5, _}, ra_log_segment:segref(Seg)),
?assertMatch({{3, 5}, _}, ra_log_segment:segref(Seg)),
ra_log_segment:close(Seg),
ok
after 3000 ->
@@ -181,10 +181,10 @@ accept_mem_tables_overwrite(Config) ->
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges2,
make_wal(Config, "w2.wal")),
receive
{ra_log_event, {segments, [{Tid2, {1, 3}}], [{1, 3, Fn2}]}} ->
{ra_log_event, {segments, [{Tid2, {1, 3}}], [{{1, 3}, Fn2}]}} ->
SegmentFile2 = filename:join(?config(server_dir, Config), Fn2),
{ok, Seg2} = ra_log_segment:open(SegmentFile2, #{mode => read}),
?assertMatch({1, 3, _}, ra_log_segment:segref(Seg2)),
?assertMatch({{1, 3}, _}, ra_log_segment:segref(Seg2)),
C2 = term_to_binary(c2),
[{1, 43, _}, {2, 43, _}] = read_sparse(Seg2, [1, 2]),
[{3, 43, C2}] = read_sparse(Seg2, [3]),
@@ -217,10 +217,10 @@ accept_mem_tables_overwrite_same_wal(Config) ->
make_wal(Config, "w2.wal")),
receive
{ra_log_event,
{segments, [{Tid2, {4, 6}}, {Tid, {2, 5}}], [{2, 6, Fn}]}} ->
{segments, [{Tid2, {4, 6}}, {Tid, {2, 5}}], [{{2, 6}, Fn}]}} ->
SegmentFile = filename:join(?config(server_dir, Config), Fn),
{ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}),
?assertMatch({2, 6, _}, ra_log_segment:segref(Seg)),
?assertMatch({{2, 6}, _}, ra_log_segment:segref(Seg)),
[{2, 42, _},
{3, 42, _},
{4, 43, _},
@@ -253,7 +253,7 @@ accept_mem_tables_multi_segment(Config) ->
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges,
make_wal(Config, "w.wal")),
receive
{ra_log_event, {segments, TidRanges, [{9, 10, _Seg2}, {1, 8, _Seg1}]}} ->
{ra_log_event, {segments, TidRanges, [{{9, 10}, _Seg2}, {{1, 8}, _Seg1}]}} ->
ok
after 3000 ->
flush(),
@@ -281,7 +281,7 @@ accept_mem_tables_multi_segment_overwrite(Config) ->
make_wal(Config, "w.wal")),
LastFile =
receive
{ra_log_event, {segments, TidRanges, [{9, 10, Seg2}, {1, 8, _Seg1}]}} ->
{ra_log_event, {segments, TidRanges, [{{9, 10}, Seg2}, {{1, 8}, _Seg1}]}} ->
Seg2
% ok
after 3000 ->
@@ -298,7 +298,7 @@ accept_mem_tables_multi_segment_overwrite(Config) ->
make_wal(Config, "w2.wal")),
receive
{ra_log_event, {segments, TidRanges2,
[{13, 15, _}, {7, 12, LastFile}]}} ->
[{{13, 15}, _}, {{7, 12}, LastFile}]}} ->
ok
after 3000 ->
flush(),
@@ -336,7 +336,7 @@ accept_mem_tables_for_down_server(Config) ->
ok = file:write_file(WalFile, <<"waldata">>),
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, WalFile),
receive
{ra_log_event, {segments, [{Tid2, {1, 3}}], [{1, 3, Fn}]}} ->
{ra_log_event, {segments, [{Tid2, {1, 3}}], [{{1, 3}, Fn}]}} ->
SegmentFile = filename:join(?config(server_dir, Config), Fn),
{ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}),
% assert Entries have been fully transferred
@@ -395,7 +395,7 @@ accept_mem_tables_with_deleted_server(Config) ->
WalFile = make_wal(Config, "00001.wal"),
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, WalFile),
receive
{ra_log_event, {segments, [{Tid2, {1, 3}}], [{1, 3, Fn}]}} ->
{ra_log_event, {segments, [{Tid2, {1, 3}}], [{{1, 3}, Fn}]}} ->
SegmentFile = filename:join(?config(server_dir, Config), Fn),
{ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}),
% assert Entries have been fully transferred
@@ -446,7 +446,7 @@ accept_mem_tables_with_corrupt_segment(Config) ->
file:write_file(filename:join(?config(server_dir, Config), "0000001.segment"), <<>>),
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, WalFile),
receive
{ra_log_event, {segments, TidRanges, [{1, 3, SegFile}]}} ->
{ra_log_event, {segments, TidRanges, [{{1, 3}, SegFile}]}} ->
SegmentFile = filename:join(?config(server_dir, Config), SegFile),
{ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}),
% assert Entries have been fully transferred
@@ -486,10 +486,10 @@ accept_mem_tables_multiple_ranges(Config)->
receive
{ra_log_event, {segments, _TidRanges, SegRefs}} ->
?assertMatch([
{49, 64, _},
{33, 48, _},
{17, 32, _},
{1, 16, _}
{{49, 64}, _},
{{33, 48}, _},
{{17, 32}, _},
{{1, 16}, _}
], SegRefs),
ok
after 3000 ->
@@ -546,14 +546,14 @@ truncate_segments(Config) ->
WalFile = make_wal(Config, "0000001.wal"),
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, WalFile),
receive
{ra_log_event, {segments, TidRanges, [{25, 32, S} = Cur | Rem]}} ->
{ra_log_event, {segments, TidRanges, [{{25, 32}, S} = Cur | Rem]}} ->
% test a lower index _does not_ delete the file
SegmentFile = filename:join(?config(server_dir, Config), S),
?assert(filelib:is_file(SegmentFile)),
ok = ra_log_segment_writer:truncate_segments(TblWriterPid,
UId, Cur),
ra_log_segment_writer:await(?SEGWR),
[{_, _, S1}, {_, _, S2}] = Rem,
[{_, S1}, {_, S2}] = Rem,
SegmentFile1 = filename:join(?config(server_dir, Config), S1),
?assertNot(filelib:is_file(SegmentFile1)),
SegmentFile2 = filename:join(?config(server_dir, Config), S2),
@@ -587,7 +587,7 @@ truncate_segments_with_pending_update(Config) ->
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges2,
make_wal(Config, "w2.erl")),
receive
{ra_log_event, {segments, _Tid, [{25, 32, S} = Cur | Rem]}} ->
{ra_log_event, {segments, _Tid, [{{25, 32}, S} = Cur | Rem]}} ->
% this is the event from the first call to accept_mem_tables,
% the Cur segments has been appended to since so should _not_
% be deleted when it is provided as the cutoff segref for
@@ -598,7 +598,7 @@ truncate_segments_with_pending_update(Config) ->
UId, Cur),
ra_log_segment_writer:await(?SEGWR),
?assert(filelib:is_file(SegmentFile)),
[{_, _, S1}, {_, _, S2}] = Rem,
[{_, S1}, {_, S2}] = Rem,
SegmentFile1 = filename:join(?config(server_dir, Config), S1),
?assertNot(filelib:is_file(SegmentFile1)),
SegmentFile2 = filename:join(?config(server_dir, Config), S2),
@@ -633,7 +633,7 @@ truncate_segments_with_pending_overwrite(Config) ->
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges2,
make_wal(Config, "w2.wal")),
receive
{ra_log_event, {segments, _Tid, [{25, 32, S} = Cur | Rem]}} ->
{ra_log_event, {segments, _Tid, [{{25, 32}, S} = Cur | Rem]}} ->
% test a lower index _does not_ delete the file
SegmentFile = filename:join(?config(server_dir, Config), S),
?assert(filelib:is_file(SegmentFile)),
@@ -642,7 +642,7 @@ truncate_segments_with_pending_overwrite(Config) ->
_ = ra_log_segment_writer:await(?SEGWR),
SegmentFile = filename:join(?config(server_dir, Config), S),
?assert(filelib:is_file(SegmentFile)),
[{_, _, S1}, {_, _, S2}] = Rem,
[{_, S1}, {_, S2}] = Rem,
SegmentFile1 = filename:join(?config(server_dir, Config), S1),
?assertNot(filelib:is_file(SegmentFile1)),
SegmentFile2 = filename:join(?config(server_dir, Config), S2),
@@ -654,7 +654,7 @@ truncate_segments_with_pending_overwrite(Config) ->
throw(ra_log_event_timeout)
end,
receive
{ra_log_event, {segments, _, [{16, 25, F} = Cur2, {12, 15, F2}]}} ->
{ra_log_event, {segments, _, [{{16, 25}, F} = Cur2, {{12, 15}, F2}]}} ->
?assertMatch([_, _], segments_for(UId, Dir)),
ok = ra_log_segment_writer:truncate_segments(TblWriterPid,
UId, Cur2),
@@ -689,7 +689,7 @@ my_segments(Config) ->
WalFile = make_wal(Config, "00001.wal"),
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, WalFile),
receive
{ra_log_event, {segments, TidRanges, [{1, 3, Fn}]}} ->
{ra_log_event, {segments, TidRanges, [{{1, 3}, Fn}]}} ->
SegmentFile = filename:join(?config(server_dir, Config), Fn),
[MyFile] = ra_log_segment_writer:my_segments(?SEGWR,UId),
?assertEqual(SegmentFile, unicode:characters_to_binary(MyFile)),
@@ -721,7 +721,7 @@ skip_entries_lower_than_snapshot_index(Config) ->
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges,
make_wal(Config, "w1.wal")),
receive
{ra_log_event, {segments, _Tid, [{4, 5, Fn}]}} ->
{ra_log_event, {segments, _Tid, [{{4, 5}, Fn}]}} ->
SegmentFile = filename:join(?config(server_dir, Config), Fn),
{ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}),
% assert only entries with a higher index than the snapshot