Skip to content

Commit

Permalink
feat: run compactions through the controller
Browse files Browse the repository at this point in the history
  • Loading branch information
heywhy committed Jun 19, 2024
1 parent 1ad6bb6 commit 2b49769
Show file tree
Hide file tree
Showing 13 changed files with 324 additions and 118 deletions.
4 changes: 2 additions & 2 deletions lib/elasticlunr/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ defmodule Elasticlunr.Application do

use Application

alias Elasticlunr.CompactionController
alias Elasticlunr.Compaction.Controller
alias Elasticlunr.PubSub

@impl true
def start(_type, _args) do
children = [
PubSub,
FlakeIdWorker,
CompactionController,
Controller,
{Registry, name: Elasticlunr.Fs, keys: :unique},
{Registry, name: Elasticlunr.IndexRegistry, keys: :unique},
{Task.Supervisor, name: Elasticlunr.BackgroundTaskSupervisor}
Expand Down
20 changes: 17 additions & 3 deletions lib/elasticlunr/compaction.ex
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
defmodule Elasticlunr.Compaction do
alias Elasticlunr.FileMeta
alias Elasticlunr.Manifest.Changes
alias Elasticlunr.Options

@enforce_keys [:level]
defstruct [:level, inputs: [], parent_inputs: [], changes: %Changes{}]
@enforce_keys [:level, :new_file_number, :options]
defstruct [
:dir,
:level,
:options,
:owner,
:new_file_number,
inputs: [],
parent_inputs: [],
changes: %Changes{}
]

@type t :: %__MODULE__{
owner: nil | pid(),
dir: nil | Path.t(),
level: non_neg_integer(),
options: Options.t(),
changes: Changes.t(),
inputs: [FileMeta.t()],
parent_inputs: [FileMeta.t()]
parent_inputs: [FileMeta.t()],
new_file_number: (-> pos_integer())
}
end
97 changes: 97 additions & 0 deletions lib/elasticlunr/compaction/controller.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
defmodule Elasticlunr.Compaction.Controller do
use GenServer

alias Elasticlunr.BackgroundTaskSupervisor
alias Elasticlunr.Compaction
alias Elasticlunr.Manifest.Changes
alias Elasticlunr.SSTable

require Logger

defstruct [:task, count: 0, compactions: []]

@spec process(Compaction.t()) :: :ok
def process(%Compaction{} = compaction) do
GenServer.call(__MODULE__, {:process, compaction})
end

@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__, hibernate_after: 5_000)
end

@impl true
def init([]), do: {:ok, %__MODULE__{}}

@impl true
def handle_call(
{:process, compaction},
_from,
%__MODULE__{compactions: compactions, count: count} = state
) do
compactions = [{count, compaction} | compactions]

%{state | compactions: compactions, count: count + 1}
|> maybe_start_compaction()
|> then(&{:reply, :ok, &1})
end

@impl true
def handle_info(
{ref, {tag, changes}},
%__MODULE__{task: %Task{ref: ref}, compactions: compactions} = state
) do
Process.demonitor(ref, [:flush])

{[{_tag, compaction}], compactions} = Enum.split_with(compactions, &match?({^tag, _}, &1))

# override the changes with what was derived from the merge task
compaction = %{compaction | changes: changes}

# TODO: log error
with true <- Process.alive?(compaction.owner),
:ok <- GenServer.call(compaction.owner, {:apply_compaction_changes, compaction}) do
%{state | task: nil, compactions: compactions}
|> maybe_start_compaction()
|> then(&{:noreply, &1})
else
false ->
Logger.warning(
"process for the compaction task is dead #{compaction.dir} @ #{compaction.level}"
)

%{state | task: nil, compactions: compactions}
|> maybe_start_compaction()
|> then(&{:noreply, &1})
end
end

# TODO: compaction with a single file move to the parent level should
# be processed first followed by inputs with least total file size
defp maybe_start_compaction(%{task: nil, compactions: [{tag, compaction} | _]} = state) do
%{
dir: dir,
level: level,
options: options,
new_file_number: new_file_number,
changes: changes
} = compaction

opts = [max_file_size: options.max_file_size]
inputs = Enum.concat(compaction.inputs, compaction.parent_inputs)

task =
Task.Supervisor.async_nolink(BackgroundTaskSupervisor, fn ->
with {:ok, files} <- SSTable.merge(inputs, dir, new_file_number, opts) do
changes
|> Changes.add_files(level + 1, files)
|> Changes.delete_files(inputs)
|> then(&{tag, &1})
end
end)

%{state | task: task}
end

defp maybe_start_compaction(state), do: state
end
37 changes: 0 additions & 37 deletions lib/elasticlunr/compaction_controller.ex

This file was deleted.

9 changes: 5 additions & 4 deletions lib/elasticlunr/index/writer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ defmodule Elasticlunr.Index.Writer do
find_log_files() >>>
recover_from_logs() >>>
reuse_last_log() >>>
remove_obsolete_files() >>>
patch_writer()
patch_writer() >>>
bind(remove_obsolete_files)
end

defp remove_obsolete_files(%{dir: dir, manifest: manifest} = state) do
@spec remove_obsolete_files(t()) :: t()
def remove_obsolete_files(%__MODULE__{dir: dir, manifest: manifest} = writer) do
known_files = Manifest.known_files(manifest)

keep? = fn path ->
Expand All @@ -72,7 +73,7 @@ defmodule Elasticlunr.Index.Writer do

Enum.each(files_to_delete, &File.rm/1)

{:ok, state}
writer
end

defp patch_writer(%{wal: wal, manifest: manifest, mem_table: mem_table, writer: writer}) do
Expand Down
108 changes: 67 additions & 41 deletions lib/elasticlunr/manifest.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ defmodule Elasticlunr.Manifest do

use Rop

@fields ~w[fd l0_compaction_trigger max_bytes_for_base_level max_bytes_for_level_multiplier max_level number next_file_number]a
@enforce_keys [:fd, :options]
@fields ~w[fd options number next_file_number]a

@enforce_keys @fields
defstruct @fields ++
Expand All @@ -20,14 +21,11 @@ defmodule Elasticlunr.Manifest do

@type t :: %__MODULE__{
fd: File.io_device(),
options: Options.t(),
number: non_neg_integer(),
max_level: non_neg_integer(),
log_number: non_neg_integer(),
next_file_number: AtomicInt.t(),
l0_compaction_trigger: pos_integer(),
compaction_score: {float(), integer()},
max_bytes_for_base_level: pos_integer(),
max_bytes_for_level_multiplier: pos_integer(),
files: %{non_neg_integer() => [FileMeta.t()]}
}

Expand All @@ -39,12 +37,9 @@ defmodule Elasticlunr.Manifest do

attrs = %{
number: number,
options: options,
fd: File.open!(path, @opts),
max_level: options.max_level,
next_file_number: AtomicInt.new(0),
l0_compaction_trigger: options.l0_compaction_trigger,
max_bytes_for_base_level: options.max_bytes_for_base_level,
max_bytes_for_level_multiplier: options.max_bytes_for_level_multiplier
next_file_number: AtomicInt.new(0)
}

struct!(__MODULE__, attrs)
Expand Down Expand Up @@ -88,12 +83,14 @@ defmodule Elasticlunr.Manifest do
@spec needs_compaction?(t()) :: boolean()
def needs_compaction?(%__MODULE__{compaction_score: {score, _level}}), do: score >= 1

@spec pick_compaction(t()) :: {:ok, Compaction.t()} | {:error, term()}
def pick_compaction(%__MODULE__{
compaction_score: {_score, level},
files: files,
max_level: max_level
})
@spec pick_compaction(t()) :: {:ok, Compaction.t(), t()} | {:error, term()}
def pick_compaction(
%__MODULE__{
files: files,
options: options,
compaction_score: {_score, level}
} = manifest
)
when level >= 0 do
file_meta =
files
Expand All @@ -103,16 +100,33 @@ defmodule Elasticlunr.Manifest do
params = %{
files: files,
level: level,
max_level: max_level,
compaction: %Compaction{level: level, inputs: [file_meta]}
max_level: options.max_level,
compaction: %Compaction{
level: level,
options: options,
inputs: [file_meta],
new_file_number: new_file_number_fn(manifest)
}
}

ensure_level_below_max_level(params) >>>
maybe_include_level0_overlapping_files() >>>
include_boundary_files() >>>
include_overlapping_files_in_parent() >>>
include_boundary_files_in_parent() >>>
bind((fn %{compaction: c} -> c end).())
bind((fn %{compaction: c} -> c end).()) >>>
remove_compaction_inputs(manifest)
end

defp remove_compaction_inputs(compaction, %{files: files} = manifest) do
file_nums =
compaction.inputs
|> Enum.concat(compaction.parent_inputs)
|> Enum.map(& &1.number)

updated_files = remove_deleted_files(files, file_nums)

{:ok, compaction, %{manifest | files: updated_files}}
end

defp ensure_level_below_max_level(%{level: level, max_level: max_level} = params) do
Expand Down Expand Up @@ -308,18 +322,18 @@ defmodule Elasticlunr.Manifest do

defp compute_compaction_score(%{manifest: manifest} = params) do
score_fn = fn
files, 0 = level ->
files, 0 = level, options ->
files
|> level_files(level)
|> Enum.count()
|> Kernel./(manifest.l0_compaction_trigger)
|> Kernel./(options.l0_compaction_trigger)

files, level ->
files, level, options ->
max_bytes_for_level =
level_max_bytes(
level,
manifest.max_bytes_for_base_level,
manifest.max_bytes_for_level_multiplier
options.max_bytes_for_base_level,
options.max_bytes_for_level_multiplier
)

files
Expand All @@ -328,16 +342,16 @@ defmodule Elasticlunr.Manifest do
|> Kernel./(max_bytes_for_level)
end

{_, best_score, best_level} =
{_, _, best_score, best_level} =
Enum.reduce(
0..manifest.max_level,
{manifest.files, -1, -1},
fn level, {files, best_score, _best_level} = acc ->
score = score_fn.(files, level)
0..manifest.options.max_level,
{manifest.files, manifest.options, -1, -1},
fn level, {files, options, best_score, _best_level} = acc ->
score = score_fn.(files, level, options)

case score > best_score do
false -> acc
true -> {files, score, level}
true -> {files, options, score, level}
end
end
)
Expand All @@ -349,19 +363,31 @@ defmodule Elasticlunr.Manifest do

defp level_max_bytes(level, max_size, multiplier), do: max_size * multiplier ** level

defp merge_files(%{changes: changes, manifest: manifest} = params) do
%__MODULE__{files: files} = manifest
%Changes{new_files: new_files} = changes
defp merge_files(%{changes: changes, manifest: %__MODULE__{files: files} = manifest} = params) do
%Changes{delete_files: delete_files, new_files: new_files} = changes

files =
Enum.reduce(new_files, files, fn {level, file}, files ->
files
|> level_files(level)
|> then(&([file] ++ &1))
|> then(&Map.put(files, level, &1))
end)
files
|> remove_deleted_files(delete_files)
|> add_new_files(new_files)
|> then(&%{manifest | files: &1})
|> then(&{:ok, %{params | manifest: &1}})
end

{:ok, %{params | manifest: %{manifest | files: files}}}
defp remove_deleted_files(files, files_to_delete) do
Enum.reduce(files, %{}, fn {level, files}, result ->
files
|> Enum.reject(&(&1.number in files_to_delete))
|> then(&Map.put(result, level, &1))
end)
end

defp add_new_files(files, new_files) do
Enum.reduce(new_files, files, fn {level, file}, files ->
files
|> level_files(level)
|> then(&[file | &1])
|> then(&Map.put(files, level, &1))
end)
end

defp log_changes(%{changes: changes, manifest: manifest}) do
Expand Down
Loading

0 comments on commit 2b49769

Please sign in to comment.