Skip to content

Commit

Permalink
Add atomic field update
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrien Moreau committed Jan 14, 2016
1 parent 3fde747 commit 3d4b3e0
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 24 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.5.2
0.5.3
2 changes: 1 addition & 1 deletion include/eredis_cluster.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
}).

-define(REDIS_CLUSTER_HASH_SLOTS, 16384).
-define(OPTIMISTIC_LOCK_TRANSACTION_TTL, 16).
-define(OL_TRANSACTION_TTL, 16).
-define(REDIS_CLUSTER_REQUEST_TTL, 16).
-define(REDIS_RETRY_DELAY, 100).

Expand Down
2 changes: 1 addition & 1 deletion src/eredis_cluster.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, eredis_cluster, [
{description, "Eredis Cluster"},
{vsn, "0.5.2"},
{vsn, "0.5.3"},
{modules, []},
{registered, []},
{applications, [
Expand Down
60 changes: 39 additions & 21 deletions src/eredis_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@

% API.
-export([start/0, stop/0, connect/1]). % Application Management.
-export([q/1, qp/1, qw/2, transaction/1, transaction/2]). % Generic redis call
-export([flushdb/0]). % Specific redis command implementation
-export([update_key/2]). % Helper functions
-export([optimistic_lock_transaction/3]).

% Generic redis call
-export([q/1, qp/1, qw/2, transaction/1, transaction/2]).

% Specific redis command implementation
-export([flushdb/0]).

% Helper functions
-export([update_key/2]).
-export([update_hash_field/3]).
-export([optimistic_locking_transaction/3]).

-include("eredis_cluster.hrl").

Expand Down Expand Up @@ -60,18 +67,14 @@ transaction(Commands) ->
-spec transaction(fun((Worker::pid()) -> redis_result()), anystring()) -> any().
transaction(Transaction, PoolKey) ->
Slot = get_key_slot(PoolKey),
query(Transaction, Slot, 0).
transaction(Transaction, Slot, undefined, 0).

transaction_retry(Transaction, PoolKey) ->
Slot = get_key_slot(PoolKey),
transaction_retry(Transaction, Slot, ?OPTIMISTIC_LOCK_TRANSACTION_TTL).

transaction_retry(_, _, 0) ->
{error, undefined};
transaction_retry(Transaction, Slot, Counter) ->
transaction(Transaction, Slot, _, 0) ->
query(Transaction, Slot, 0);
transaction(Transaction, Slot, ExpectedValue, Counter) ->
case query(Transaction, Slot, 0) of
{ok, undefined} ->
transaction_retry(Transaction, Slot, Counter - 1);
ExpectedValue ->
transaction(Transaction, Slot, ExpectedValue, Counter - 1);
Payload ->
Payload
end.
Expand Down Expand Up @@ -129,8 +132,8 @@ throttle_retries(0) -> ok;
throttle_retries(_) -> timer:sleep(?REDIS_RETRY_DELAY).

%% =============================================================================
%% @doc Update a key value in redis using a function passed as an argument.
%% The update is made in a transaction.
%% @doc Update the value of a key by applying the function passed in the
%% argument. The operation is done atomically
%% @end
%% =============================================================================
-spec update_key(Key::anystring(), UpdateFunction::fun((any()) -> any())) ->
Expand All @@ -140,17 +143,32 @@ update_key(Key, UpdateFunction) ->
{ok, Var} = GetResult,
[["SET", Key, UpdateFunction(Var)]]
end,
optimistic_lock_transaction(Key, ["GET", Key], UpdateFunction2).
optimistic_locking_transaction(Key, ["GET", Key], UpdateFunction2).

%% =============================================================================
%% @doc Update the value of a field stored in a hash by applying the function
%% passed in the argument. The operation is done atomically
%% @end
%% =============================================================================
-spec update_hash_field(Key::anystring(), Field::anystring(),
UpdateFunction::fun((any()) -> any())) -> redis_transaction_result().
update_hash_field(Key, Field, UpdateFunction) ->
UpdateFunction2 = fun(GetResult) ->
{ok, Var} = GetResult,
[["HSET", Key, Field, UpdateFunction(Var)]]
end,
optimistic_locking_transaction(Key, ["HGET", Key, Field], UpdateFunction2).

%% =============================================================================
%% @doc Optimistic lock transaction helper, based on Redis documentation :
%% @doc Optimistic locking transaction helper, based on Redis documentation :
%% http://redis.io/topics/transactions
%% @end
%% =============================================================================
-spec optimistic_lock_transaction(Key::anystring(), redis_command(),
-spec optimistic_locking_transaction(Key::anystring(), redis_command(),
UpdateFunction::fun((redis_result()) -> redis_pipeline_command())) ->
redis_transaction_result().
optimistic_lock_transaction(WatchedKey, GetCommand, UpdateFunction) ->
optimistic_locking_transaction(WatchedKey, GetCommand, UpdateFunction) ->
Slot = get_key_slot(WatchedKey),
Transaction = fun(Worker) ->
%% Watch given key
qw(Worker,["WATCH", WatchedKey]),
Expand All @@ -161,7 +179,7 @@ optimistic_lock_transaction(WatchedKey, GetCommand, UpdateFunction) ->
Result = qw(Worker, SetCommand),
lists:last(Result)
end,
transaction_retry(Transaction, WatchedKey).
transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL).

%% =============================================================================
%% @doc Perform a given query on all node of a redis cluster
Expand Down
10 changes: 10 additions & 0 deletions test/eredis_cluster_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ basic_test_() ->
?assertEqual({ok, <<"7">>}, eredis_cluster:q(["get", "hij"]))
end
}
,

{ "atomic hget hset",
fun () ->
eredis_cluster:q(["hset", "klm", "nop", 2]),
Incr = fun(Var) -> binary_to_integer(Var) + 1 end,
rpc:pmap({eredis_cluster, update_hash_field}, ["nop", Incr], lists:duplicate(5, "klm")),
?assertEqual({ok, <<"7">>}, eredis_cluster:q(["hget", "klm", "nop"]))
end
}

]
}
Expand Down

0 comments on commit 3d4b3e0

Please sign in to comment.