Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nhse o32 upstream.d31 #13

Merged
merged 4 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/leveled.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
-define(MAX_SSTSLOTS, 256).
-define(MAX_MERGEBELOW, 24).
-define(LOADING_PAUSE, 1000).
-define(LOADING_BATCH, 1000).
-define(LOADING_BATCH, 200).
-define(CACHE_SIZE_JITTER, 25).
-define(JOURNAL_SIZE_JITTER, 20).
-define(LONG_RUNNING, 1000000).
Expand Down
2 changes: 2 additions & 0 deletions src/leveled_cdb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,8 @@ delete_pending({call, From}, cdb_close, State) ->
State#state.filename,
State#state.waste_path),
{stop_and_reply, normal, [{reply, From, ok}]};
delete_pending({call, From}, Event, State) ->
handle_sync_event(Event, From, State);
delete_pending(cast, delete_confirmed, State=#state{delete_point=ManSQN}) ->
leveled_log:log(cdb04, [State#state.filename, ManSQN]),
close_pendingdelete(State#state.handle,
Expand Down
106 changes: 65 additions & 41 deletions src/leveled_inker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -558,10 +558,13 @@ handle_call({fold,
Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)),
Folder =
fun() ->
fold_from_sequence(StartSQN,
{FilterFun, InitAccFun, FoldFun},
Acc,
Manifest)
fold_from_sequence(
StartSQN,
State#state.journal_sqn,
{FilterFun, InitAccFun, FoldFun},
Acc,
Manifest
)
end,
case By of
as_ink ->
Expand Down Expand Up @@ -1211,8 +1214,12 @@ start_new_activejournal(SQN, RootPath, CDBOpts) ->



-spec fold_from_sequence(integer(), {fun(), fun(), fun()}, any(), list())
-> any().
-spec fold_from_sequence(
non_neg_integer(),
pos_integer(),
{fun(), fun(), fun()},
any(),
list()) -> any().
%% @doc
%%
%% Scan from the starting sequence number to the end of the Journal. Apply
Expand All @@ -1226,62 +1233,79 @@ start_new_activejournal(SQN, RootPath, CDBOpts) ->
%% over in batches using foldfile_between_sequence/7. The batch is a range of
%% sequence numbers (so the batch size may be << ?LOADING_BATCH) in compacted
%% files
fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) ->
fold_from_sequence(_MinSQN, _JournalSQN, _FoldFuns, Acc, []) ->
Acc;
fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
fold_from_sequence(MinSQN, JournalSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
when LowSQN >= MinSQN ->
{NextMinSQN, Acc0} = foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN),
fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest);
fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
{NextMinSQN, Acc0} =
foldfile_between_sequence(
MinSQN,
MinSQN + ?LOADING_BATCH,
JournalSQN,
FoldFuns,
Acc,
Pid,
undefined,
FN
),
fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest);
fold_from_sequence(
MinSQN, JournalSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
% If this file has a LowSQN less than the minimum, we can skip it if the
% next file also has a LowSQN below the minimum
{NextMinSQN, Acc0} =
case Rest of
[] ->
foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN);
foldfile_between_sequence(
MinSQN,
MinSQN + ?LOADING_BATCH,
JournalSQN,
FoldFuns,
Acc,
Pid,
undefined,
FN
);
[{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN ->
foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN);
foldfile_between_sequence(
MinSQN,
MinSQN + ?LOADING_BATCH,
JournalSQN,
FoldFuns,
Acc,
Pid,
undefined,
FN
);
_ ->
{MinSQN, Acc}
end,
fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest).
fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest).

foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
Acc, CDBpid, StartPos, FN) ->
foldfile_between_sequence(
MinSQN, MaxSQN, JournalSQN, FoldFuns, Acc, CDBpid, StartPos, FN) ->
{FilterFun, InitAccFun, FoldFun} = FoldFuns,
InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)},

case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of
{eof, {AccMinSQN, _AccMaxSQN, BatchAcc}} ->
{AccMinSQN, FoldFun(BatchAcc, Acc)};
{_LastPosition, {AccMinSQN, _AccMaxSQN, BatchAcc}}
when AccMinSQN >= JournalSQN ->
{AccMinSQN, FoldFun(BatchAcc, Acc)};
{LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
UpdAcc = FoldFun(BatchAcc, Acc),
NextSQN = MaxSQN + 1,
foldfile_between_sequence(NextSQN,
NextSQN + ?LOADING_BATCH,
FoldFuns,
UpdAcc,
CDBpid,
LastPosition,
FN)
foldfile_between_sequence(
NextSQN,
NextSQN + ?LOADING_BATCH,
JournalSQN,
FoldFuns,
UpdAcc,
CDBpid,
LastPosition,
FN
)
end.


Expand Down
2 changes: 1 addition & 1 deletion src/leveled_pclerk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ do_merge(

add_entry(empty, FileName, _TS1, Additions) ->
leveled_log:log(pc013, [FileName]),
{[], [], Additions};
{Additions, [], []};
add_entry({ok, Pid, Reply, Bloom}, FileName, TS1, Additions) ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
Entry =
Expand Down
147 changes: 147 additions & 0 deletions test/end_to_end/riak_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
fetchclocks_modifiedbetween/1,
crossbucket_aae/1,
handoff/1,
handoff_close/1,
handoff_withcompaction/1,
dollar_bucket_index/1,
dollar_key_index/1,
bigobject_memorycheck/1,
Expand All @@ -23,6 +25,8 @@ all() -> [
fetchclocks_modifiedbetween,
crossbucket_aae,
handoff,
handoff_close,
handoff_withcompaction,
dollar_bucket_index,
dollar_key_index,
bigobject_memorycheck,
Expand Down Expand Up @@ -1633,6 +1637,149 @@ dollar_key_index(_Config) ->
ok = leveled_bookie:book_close(Bookie1),
testutil:reset_filestructure().

%% @doc Test that a handoff-style fold (an object fold in sqn_order with a
%% snapshot) completes promptly even when a large volume of objects is
%% loaded after the fold snapshot has been taken.  The fold must see only
%% the objects present at snapshot time (KeyCount div 10), and the last
%% object must have been visited close to query completion - i.e. the
%% fold should not scan the post-snapshot load.
handoff_close(_Config) ->
    RootPath = testutil:reset_filestructure(),
    KeyCount = 500000,
    Bucket = {<<"BType">>, <<"BName">>},
    StartOpts1 =
        [
            {root_path, RootPath},
            % Journal capacity above KeyCount, so all objects stay in a
            % single active journal file
            {max_journalobjectcount, KeyCount + 1},
            {max_pencillercachesize, 12000},
            {sync_strategy, testutil:sync_strategy()}
        ],
    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
    % First tenth of the keyspace - loaded before the fold snapshot
    ObjList1 =
        testutil:generate_objects(
            KeyCount div 10,
            {fixed_binary, 1}, [],
            % crypto:strong_rand_bytes/1 used directly for consistency
            % with handoff_withcompaction/1
            crypto:strong_rand_bytes(512),
            fun() -> [] end,
            Bucket
        ),
    % Remaining nine tenths - loaded after the fold snapshot
    ObjList2 =
        testutil:generate_objects(
            KeyCount - (KeyCount div 10),
            {fixed_binary, KeyCount div 10 + 1}, [],
            crypto:strong_rand_bytes(512),
            fun() -> [] end,
            Bucket
        ),
    testutil:riakload(Bookie1, ObjList1),
    % Accumulate a timestamp per visited object, so both the count and
    % the time of the last visit can be checked
    FoldObjectsFun =
        fun(_, _, _, Acc) ->
            [os:timestamp()|Acc]
        end,
    {async, Runner} =
        leveled_bookie:book_objectfold(
            Bookie1,
            ?RIAK_TAG,
            {FoldObjectsFun, []},
            true,
            sqn_order
        ),
    % Load the bulk of the keyspace while the snapshot is held
    testutil:riakload(Bookie1, ObjList2),
    TSList = Runner(),
    QueryCompletionTime = os:timestamp(),
    % Timestamps were consed onto the accumulator, so the head is the
    % most recently visited object
    LastTS = hd(TSList),
    io:format(
        "Found ~w objects with Last TS ~w completion time ~w~n",
        [length(TSList), LastTS, QueryCompletionTime]
    ),
    % Only the pre-snapshot objects should have been folded over
    true = KeyCount div 10 == length(TSList),
    TimeSinceLastObjectTouchedMS =
        timer:now_diff(QueryCompletionTime, LastTS) div 1000,
    % The last object should have been touched within 1s of completion -
    % the fold must not have trailed off scanning post-snapshot data
    true = TimeSinceLastObjectTouchedMS < 1000,
    leveled_bookie:book_destroy(Bookie1),
    testutil:reset_filestructure().


%% @doc Check that a handoff-style (sqn_order) object fold still returns
%% every object when journal compaction runs between taking the fold
%% snapshot and executing the fold - so some cdb files will be in the
%% delete_pending state while the fold is running.
handoff_withcompaction(_Config) ->
    RootPath = testutil:reset_filestructure(),
    KeyCount = 100000,
    Quarter = KeyCount div 4,
    Bucket = {<<"BType">>, <<"BName">>},
    StartOpts =
        [
            {root_path, RootPath},
            % Journal rolls every quarter of the keyspace
            {max_journalobjectcount, Quarter},
            {max_pencillercachesize, 12000},
            {sync_strategy, testutil:sync_strategy()},
            {max_run_length, 4}
        ],
    {ok, Bookie} = leveled_bookie:book_start(StartOpts),
    % Load the keyspace in four quarters so multiple journal files exist;
    % each quarter's object list is kept for the update phase below
    GenAndLoad =
        fun(StartKey) ->
            ObjL =
                testutil:generate_objects(
                    Quarter,
                    {fixed_binary, StartKey}, [],
                    crypto:strong_rand_bytes(512),
                    fun() -> [] end,
                    Bucket
                ),
            testutil:riakload(Bookie, ObjL),
            ObjL
        end,
    ObjLists =
        lists:map(
            GenAndLoad,
            [1, Quarter + 1, Quarter * 2 + 1, Quarter * 3 + 1]),
    % Re-write a subset of each quarter, creating replaced journal
    % entries to prompt compaction
    lists:foreach(
        fun(ObjL) ->
            testutil:update_some_objects(Bookie, ObjL, KeyCount div 8)
        end,
        ObjLists),

    % Setup a handoff-style fold to snapshot the journal
    KeyFoldFun =
        fun(_B, K, _V, Acc) ->
            [K|Acc]
        end,
    {async, FoldRunner} =
        leveled_bookie:book_objectfold(
            Bookie,
            ?RIAK_TAG,
            {KeyFoldFun, []},
            true,
            sqn_order
        ),

    % Compact the journal while the snapshot is held - twice, to be sure
    ok = leveled_bookie:book_compactjournal(Bookie, 30000),
    testutil:wait_for_compaction(Bookie),
    ok = leveled_bookie:book_compactjournal(Bookie, 30000),
    testutil:wait_for_compaction(Bookie),

    % Run the fold - some cdb files should now be delete_pending, but
    % every object must still be found
    {FoldMicros, FoldKeys} = timer:tc(FoldRunner),
    io:format(
        "Found ~w objects in ~w ms~n",
        [length(FoldKeys), FoldMicros div 1000]
    ),
    true = KeyCount == length(FoldKeys),
    leveled_bookie:book_destroy(Bookie),
    testutil:reset_filestructure().


%% @doc test that the riak specific $bucket indexes can be iterated
%% using leveled's existing folders
dollar_bucket_index(_Config) ->
Expand Down
Loading