|
64 | 64 | -define(WAL_RESEND_TIMEOUT, 5000).
|
65 | 65 |
|
66 | 66 | -type ra_meta_key() :: atom().
|
67 |
| --type segment_ref() :: {From :: ra_index(), To :: ra_index(), |
68 |
| - File :: file:filename_all()}. |
| 67 | +-type segment_ref() :: {ra_range:range(), File :: file:filename_all()}. |
69 | 68 | -type event_body() :: {written, ra_term(), ra:range()} |
|
70 | 69 | {segments, [{ets:tid(), ra:range()}], [segment_ref()]} |
|
71 | 70 | {resend_write, ra_index()} |
|
@@ -279,7 +278,7 @@ init(#{uid := UId,
|
279 | 278 | LastSegRefIdx = case SegRefs of
|
280 | 279 | [] ->
|
281 | 280 | -1;
|
282 |
| - [{_, L, _} | _] -> |
| 281 | + [{{_, L}, _} | _] -> |
283 | 282 | L
|
284 | 283 | end,
|
285 | 284 | LastWrittenIdx = case ra_log_wal:last_writer_seq(Wal, UId) of
|
@@ -969,8 +968,14 @@ should_snapshot(snapshot, Idx,
|
969 | 968 | % We should take a snapshot if the new snapshot index would allow us
|
970 | 969 | % to discard any segments or if the we've handled enough commands
|
971 | 970 | % since the last snapshot.
|
972 |
| - CanFreeSegments = lists:any(fun({_, To, _}) -> To =< Idx end, |
973 |
| - ra_log_reader:segment_refs(Reader)), |
| 971 | + CanFreeSegments = case ra_log_reader:range(Reader) of |
| 972 | + undefined -> |
| 973 | + false; |
| 974 | + {Start, _End} -> |
| 975 | + %% this isn't 100% guaranteed to free a segment |
| 976 | + %% but there is a good chance |
| 977 | + Idx > Start |
| 978 | + end, |
974 | 979 | CanFreeSegments orelse Idx > SnapLimit;
|
975 | 980 | should_snapshot(checkpoint, Idx,
|
976 | 981 | #?MODULE{cfg = #cfg{min_checkpoint_interval = CheckpointInter},
|
@@ -1029,7 +1034,8 @@ overview(#?MODULE{last_index = LastIndex,
|
1029 | 1034 | last_term => LastTerm,
|
1030 | 1035 | first_index => FirstIndex,
|
1031 | 1036 | last_written_index_term => LWIT,
|
1032 |
| - num_segments => length(ra_log_reader:segment_refs(Reader)), |
| 1037 | + num_segments => ra_log_reader:segment_ref_count(Reader), |
| 1038 | + segments_range => ra_log_reader:range(Reader), |
1033 | 1039 | open_segments => ra_log_reader:num_open_segments(Reader),
|
1034 | 1040 | snapshot_index => case CurrSnap of
|
1035 | 1041 | undefined -> undefined;
|
@@ -1166,9 +1172,9 @@ delete_segments(SnapIdx, #?MODULE{cfg = #cfg{log_id = LogId,
|
1166 | 1172 | ok = ra_log_segment_writer:truncate_segments(SegWriter,
|
1167 | 1173 | UId, Pivot)
|
1168 | 1174 | end),
|
1169 |
| - Active = ra_log_reader:segment_refs(Reader), |
| 1175 | + NumActive = ra_log_reader:segment_ref_count(Reader), |
1170 | 1176 | ?DEBUG("~ts: ~b obsolete segments at ~b - remaining: ~b, pivot ~0p",
|
1171 |
| - [LogId, length(Obsolete), SnapIdx, length(Active), Pivot]), |
| 1177 | + [LogId, length(Obsolete), SnapIdx, NumActive, Pivot]), |
1172 | 1178 | State = State0#?MODULE{reader = Reader},
|
1173 | 1179 | {State, log_update_effects(Readers, Pid, State)}
|
1174 | 1180 | end.
|
@@ -1331,7 +1337,7 @@ recover_ranges(UId, MtRange, SegWriter) ->
|
1331 | 1337 | [SegRef | Acc]
|
1332 | 1338 | end
|
1333 | 1339 | end, [], SegFiles),
|
1334 |
| - SegRanges = [{F, L} || {F, L, _} <- SegRefs], |
| 1340 | + SegRanges = [Range || {Range, _} <- SegRefs], |
1335 | 1341 | Ranges = [MtRange | SegRanges],
|
1336 | 1342 | {pick_range(Ranges, undefined), SegRefs}.
|
1337 | 1343 |
|
|
0 commit comments