Skip to content

Commit

Permalink
feat: include fetch_add and fetch_sub in nif
Browse files Browse the repository at this point in the history
  • Loading branch information
heywhy committed Jun 20, 2024
1 parent 9ad7089 commit ee3c42a
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 29 deletions.
83 changes: 74 additions & 9 deletions c_src/atomic_int.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include <atomic>
#include <cstdio>
#include <erl_nif.h>

using std::atomic_int64_t;
Expand All @@ -12,7 +11,7 @@ struct atomic_int {

static ErlNifResourceType *ATOMICS_RESOURCE_TYPE;

static ERL_NIF_TERM make_atom(ErlNifEnv *env, const char *value);
ERL_NIF_TERM make_atom(ErlNifEnv *env, const char *value);

ERL_NIF_TERM init(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
ErlNifSInt64 value;
Expand Down Expand Up @@ -48,6 +47,24 @@ ERL_NIF_TERM add(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
return enif_make_badarg(env);
}

ERL_NIF_TERM sub(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
ErlNifUInt64 count;
atomic_int **variable;

if (!enif_get_uint64(env, argv[1], &count)) {
return enif_make_badarg(env);
}

if (enif_get_resource(env, argv[0], ATOMICS_RESOURCE_TYPE,
(void **)&variable)) {
(*variable)->value -= count;

return make_atom(env, "ok");
}

return enif_make_badarg(env);
}

ERL_NIF_TERM add_get(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
ErlNifUInt64 count;
atomic_int **variable;
Expand Down Expand Up @@ -76,14 +93,56 @@ ERL_NIF_TERM sub_get(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {

if (enif_get_resource(env, argv[0], ATOMICS_RESOURCE_TYPE,
(void **)&variable)) {
(*variable)->value += count;
(*variable)->value -= count;

return enif_make_int64(env, (*variable)->value);
}

return enif_make_badarg(env);
}

ERL_NIF_TERM fetch_add(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
ErlNifUInt64 count;
atomic_int **variable;

if (!enif_get_uint64(env, argv[1], &count)) {
return enif_make_badarg(env);
}

if (enif_get_resource(env, argv[0], ATOMICS_RESOURCE_TYPE,
(void **)&variable)) {

int64_t value = (*variable)->value;

(*variable)->value += count;

return enif_make_int64(env, value);
}

return enif_make_badarg(env);
}

ERL_NIF_TERM fetch_sub(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
ErlNifUInt64 count;
atomic_int **variable;

if (!enif_get_uint64(env, argv[1], &count)) {
return enif_make_badarg(env);
}

if (enif_get_resource(env, argv[0], ATOMICS_RESOURCE_TYPE,
(void **)&variable)) {

int64_t value = (*variable)->value;

(*variable)->value -= count;

return enif_make_int64(env, value);
}

return enif_make_badarg(env);
}

ERL_NIF_TERM put(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
atomic_int **variable;
ErlNifUInt64 value;
Expand Down Expand Up @@ -115,25 +174,31 @@ ERL_NIF_TERM get(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
return enif_make_badarg(env);
}

ErlNifFunc nif_funcs[] = {{"init", 1, init}, {"get", 1, get},
{"add", 2, add}, {"put", 2, put},
{"add_get", 2, add_get}, {"sub_get", 2, sub_get}};
ErlNifFunc nif_funcs[] = {{"init", 1, init},
{"get", 1, get},
{"put", 2, put},
{"add", 2, add},
{"sub", 2, sub},
{"add_get", 2, add_get},
{"sub_get", 2, sub_get},
{"fetch_add", 2, fetch_add},
{"fetch_sub", 2, fetch_sub}};

void desctructor(ErlNifEnv *env, void *ptr) {
atomic_int **resource = reinterpret_cast<atomic_int **>(ptr);

delete *resource;
}

static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) {
int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) {
ATOMICS_RESOURCE_TYPE = enif_open_resource_type(
env, NULL, "atomics.ref", desctructor, ERL_NIF_RT_CREATE, NULL);

return 0;
}

static int upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data,
ERL_NIF_TERM load_info) {
int upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data,
ERL_NIF_TERM load_info) {
return 0;
}

Expand Down
16 changes: 9 additions & 7 deletions lib/elasticlunr/atomic_int.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@ defmodule Elasticlunr.AtomicInt do
@spec new(integer()) :: t()
def new(value \\ 0), do: init(value)

@spec fetch_add(t(), non_neg_integer()) :: integer()
def fetch_add(ref, incr) do
ref
|> get()
|> tap(fn _ -> add(ref, incr) end)
end

# nif methods
def init(_value), do: :erlang.nif_error(:not_loaded)

Expand All @@ -35,9 +28,18 @@ defmodule Elasticlunr.AtomicInt do
@spec add(t(), non_neg_integer()) :: :ok
def add(_ref, _incr), do: :erlang.nif_error(:not_loaded)

@spec sub(t(), non_neg_integer()) :: :ok
def sub(_ref, _incr), do: :erlang.nif_error(:not_loaded)

@spec add_get(t(), non_neg_integer()) :: integer()
def add_get(_ref, _incr), do: :erlang.nif_error(:not_loaded)

@spec sub_get(t(), non_neg_integer()) :: integer()
def sub_get(_ref, _incr), do: :erlang.nif_error(:not_loaded)

@spec fetch_add(t(), non_neg_integer()) :: integer()
def fetch_add(_ref, _incr), do: :erlang.nif_error(:not_loaded)

@spec fetch_sub(t(), non_neg_integer()) :: integer()
def fetch_sub(_ref, _incr), do: :erlang.nif_error(:not_loaded)
end
20 changes: 7 additions & 13 deletions lib/elasticlunr/index/writer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ defmodule Elasticlunr.Index.Writer do
find_log_files() >>>
recover_from_logs() >>>
reuse_last_log() >>>
patch_writer() >>>
bind(patch_writer) >>>
bind(remove_obsolete_files)
end

@spec remove_obsolete_files(t()) :: t()
def remove_obsolete_files(%__MODULE__{dir: dir, manifest: manifest} = writer) do
known_files = Manifest.known_files(manifest)

keep? = fn path ->
keep? = fn path, manifest ->
case Filename.parse(path) do
{:current, _number} -> true
{:log, number} -> number >= manifest.log_number
Expand All @@ -61,34 +61,28 @@ defmodule Elasticlunr.Index.Writer do
end
end

files_to_delete =
:ok =
dir
|> Fs.db_files()
|> Enum.reduce([], fn path, acc ->
case keep?.(path) do
false -> [path] ++ acc
true -> acc
end
end)

Enum.each(files_to_delete, &File.rm/1)
|> Enum.each(&unless keep?.(&1, manifest), do: File.rm(&1))

writer
end

defp patch_writer(%{wal: wal, manifest: manifest, mem_table: mem_table, writer: writer}) do
{:ok, %{writer | manifest: manifest, mem_table: mem_table, wal: wal}}
%{writer | manifest: manifest, mem_table: mem_table, wal: wal}
end

defp reuse_last_log(
%{log_files: log_files, compactions: compactions, dir: dir, manifest: manifest} = params
)
when log_files == [] or compactions >= 1 do
number = Manifest.new_file_number(manifest)
wal = Wal.create(dir, number)
changes = Changes.set_log_number(%Changes{}, number)

with {:ok, manifest} <- Manifest.apply_and_log(manifest, changes) do
wal = Wal.create(dir, number)

params
|> Map.put(:wal, wal)
|> Map.put(:manifest, manifest)
Expand Down

0 comments on commit ee3c42a

Please sign in to comment.