Skip to content

Commit

Permalink
fix failover and slowdown connection repeat on errors, add back trans…
Browse files Browse the repository at this point in the history
…action
  • Loading branch information
Adrien Moreau committed Oct 10, 2015
1 parent 2a9cce9 commit 845b5d9
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 44 deletions.
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
The MIT License (MIT)

Copyright (c) 2015 Adrien Moreau

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
eredis_cluster is a wrapper for eredis to support cluster mode of redis 3.0.0+
This project is under development

**Todo**
## TODO

- Enhance the way poolboy is used
- Fix/Add specs of functions
- Add dialyzer
- Add safeguard if keys of a pipeline command is not located in the same server
- Improve unit tests

## Compilation && Test

Expand All @@ -32,7 +35,7 @@ To configure the redis cluster, you can use an application variable (probably in
}

You don't need to specify all nodes of your configuration as eredis_cluster will
retrieve them through the command `CLUSTER NODES` at runtime.
retrieve them through the command `CLUSTER SLOTS` at runtime.

## Usage

Expand Down
2 changes: 1 addition & 1 deletion src/eredis_cluster.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, eredis_cluster, [
{description, "Eredis Cluster"},
{vsn, "0.2"},
{vsn, "0.3.0"},
{modules, []},
{registered, []},
{applications, [
Expand Down
50 changes: 32 additions & 18 deletions src/eredis_cluster.erl
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
-module(eredis_cluster).

-define(REDIS_CLUSTER_REQUEST_TTL,16).
-define(REDIS_CLUSTER_HASH_SLOTS,16384).

-export([connect/1]).
-export([q/1]).
-export([qp/1]).
-export([transaction/1]).
-export([has_same_key/1]).
-export([get_slots_map/0]).


connect(InitServers) ->
eredis_cluster_monitor:connect(InitServers).

qp(Commands) ->
q(Commands).
q(Command) ->
q(Command,0,false).
q(_,?REDIS_CLUSTER_REQUEST_TTL,_) ->
Expand All @@ -30,6 +27,12 @@ q(Command,Counter,TryRandomNode) ->
TryRandomNode =:= false ->
eredis_cluster_monitor:get_pool_by_slot(Slot);
true ->
if
Counter > 1 ->
timer:sleep(100);
true ->
ok
end,
eredis_cluster_monitor:get_random_pool()
end,

Expand Down Expand Up @@ -57,6 +60,14 @@ q(Command,Counter,TryRandomNode) ->
end
end.

qp(Commands) ->
q(Commands).

transaction(Commands) ->
Transaction = [["multi"]|Commands] ++ [["exec"]],
Result = qp(Transaction),
lists:nth(erlang:length(Result),Result).

query_eredis_pool(PoolName,[[X|Y]|Z]) when is_list(X) ->
query_eredis_pool(PoolName,[[X|Y]|Z],qp);
query_eredis_pool(PoolName, Command) ->
Expand Down Expand Up @@ -101,34 +112,37 @@ get_key_slot(Key) ->

%% =============================================================================
%% @doc Return the first key in the command arguments.
%% In a normal query, the second term will be returned
%%
%% Currently we just return the second argument
%% after the command name.
%% If it is a pipeline query we will use the second term of the first term, we
%% will assume that all keys are in the same server and the query can be
%% performed
%%
%% This is indeed the key for most commands, and when it is not true
%% the cluster redirection will point us to the right node anyway.
%% If the pipeline query starts with multi (transaction), we will look at the
%% second term of the second command
%%
%% For commands that don't make sense in the context of cluster
%% return value will be undefined.
%% @end
%% =============================================================================

-spec get_key_from_command([string()]) -> string() | undefined.
get_key_from_command([[X|Y]|_]) when is_list(X) ->
get_key_from_command([X|Y]);
get_key_from_command([[X|Y]|Z]) when is_list(X) ->
case string:to_lower(X) of
"multi" ->
get_key_from_command(Z);
_ ->
get_key_from_command([X|Y])
end;
get_key_from_command([Term1,Term2|_]) ->
case string:to_lower(Term1) of
"info" ->
undefined;
"multi" ->
undefined;
"exec" ->
undefined;
"slaveof" ->
"info" ->
undefined;
"config" ->
undefined;
"shutdown" ->
"shutdown" ->
undefined;
"slaveof" ->
undefined;
_ ->
Term2
Expand Down
42 changes: 22 additions & 20 deletions src/eredis_cluster_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ initialize_slots_cache() ->
get_random_pool() ->
gen_server:call(?MODULE,get_random_pool).

remove_pool(Connection) ->
gen_server:call(?MODULE,{remove_pool,Connection}).
remove_pool(Pool) ->
gen_server:call(?MODULE,{remove_pool,Pool}).

get_pool_by_slot(Slot) ->
gen_server:call(?MODULE,{get_pool_by_slot,Slot}).
Expand All @@ -79,27 +79,32 @@ get_slots_map() ->
get_pool_by_slot(State,Slot) ->
Index = lists:nth(Slot+1,State#state.slots),
Cluster = lists:nth(Index,State#state.slots_maps),
Cluster#slots_map.node#node.pool.
if
Cluster#slots_map.node =/= undefined ->
Cluster#slots_map.node#node.pool;
true ->
undefined
end.

get_slots_map(State) ->
State#state.slots.

remove_pool(State,Connection) ->
remove_pool(State,Pool) ->
SlotsMaps = State#state.slots_maps,
eredis_cluster_pools_sup:stop_eredis_pool(Connection),
NewSlotsMaps = [remove_node_by_connection(SlotsMap,Connection) || SlotsMap <- SlotsMaps],
eredis_cluster_pools_sup:stop_eredis_pool(Pool),
NewSlotsMaps = [remove_node_by_pool(SlotsMap,Pool) || SlotsMap <- SlotsMaps],
State#state{slots_maps=NewSlotsMaps}.

remove_node_by_connection(SlotsMap,Connection) ->
remove_node_by_pool(SlotsMap,Pool) ->
if
SlotsMap#slots_map.node#node.pool =:= Connection ->
SlotsMap#slots_map.node#node.pool =:= Pool ->
SlotsMap#slots_map{node=undefined};
true ->
SlotsMap
end.

%% =============================================================================
%% @doc Return a link to a random node, or raise an error if no node can be
%% @doc Return a link to a random pool, or raise an error if no pool can be
%% contacted. This function is only called when we can't reach the node
%% associated with a given hash slot, or when we don't know the right
%% mapping.
Expand All @@ -110,20 +115,17 @@ get_random_pool(State) ->
SlotsMaps = State#state.slots_maps,
NbSlotsRange = erlang:length(SlotsMaps),
Index = random:uniform(NbSlotsRange),
ArrangedList = lists_shift(SlotsMaps,Index),
find_connection(ArrangedList).

lists_shift(List,Index) ->
lists:sublist(List,Index) ++ lists:nthtail(Index,List).
ArrangedList = eredis_cluster_utils:lists_shift(SlotsMaps,Index),
find_pool(ArrangedList).

find_connection([]) ->
find_pool([]) ->
cluster_down;
find_connection([H|T]) ->
find_pool([H|T]) ->
if
H#slots_map.node =/= undefined ->
H#slots_map.node;
H#slots_map.node#node.pool;
true ->
find_connection(T)
find_pool(T)
end.

initialize_slots_cache(State) ->
Expand Down Expand Up @@ -266,8 +268,8 @@ handle_call(initialize_slots_cache, _From, State) ->
{reply, ok, initialize_slots_cache(State)};
handle_call(get_random_pool, _From, State) ->
{reply, get_random_pool(State), State};
handle_call({remove_pool, Connection}, _From, State) ->
{reply, ok, remove_pool(State,Connection)};
handle_call({remove_pool, Pool}, _From, State) ->
{reply, ok, remove_pool(State,Pool)};
handle_call({get_pool_by_slot, Slot}, _From, State) ->
{reply, get_pool_by_slot(State,Slot), State};
handle_call({connect, InitServers}, _From, _State) ->
Expand Down
6 changes: 6 additions & 0 deletions src/eredis_cluster_utils.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-module(eredis_cluster_utils).

-export([lists_shift/2]).

lists_shift(List,Index) ->
lists:sublist(List,Index) ++ lists:nthtail(Index,List).
18 changes: 16 additions & 2 deletions src/eredis_cluster_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,30 @@ start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).

init(Args) ->
%%process_flag(trap_exit, true),
Hostname = proplists:get_value(host, Args),
Port = proplists:get_value(port, Args),
PoolName = proplists:get_value(pool_name, Args),
eredis_cluster_pools_sup:register_worker_connection(PoolName),
{ok, Conn} = eredis:start_link(Hostname,Port),

process_flag(trap_exit, true),
Result = eredis:start_link(Hostname,Port),
process_flag(trap_exit, false),

Conn = case Result of
{ok,Connection} ->
Connection;
_ ->
undefined
end,

{ok, #state{conn=Conn}}.

handle_call({q, _}, _From, #state{conn=undefined}=State) ->
{reply, {error,no_connection}, State};
handle_call({q, Params}, _From, #state{conn=Conn}=State) ->
{reply, eredis:q(Conn,Params), State};
handle_call({qp, _}, _From, #state{conn=undefined}=State) ->
{reply, {error,no_connection}, State};
handle_call({qp, Params}, _From, #state{conn=Conn}=State) ->
{reply, eredis:qp(Conn,Params), State};
handle_call(_Request, _From, State) ->
Expand Down
7 changes: 7 additions & 0 deletions test/eredis_cluster_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ basic_test_() ->
?assertNotMatch([{ok, _},{ok, _},{ok, _}], eredis_cluster:qp([["SET", "a1", "aaa"], ["SET", "a2", "aaa"], ["SET", "a3", "aaa"]])),
?assertMatch([{ok, _},{ok, _},{ok, _}], eredis_cluster:qp([["LPUSH", "a", "aaa"], ["LPUSH", "a", "bbb"], ["LPUSH", "a", "ccc"]]))
end
},

{ "transaction",
fun () ->
?assertMatch({ok,[_,_,_]}, eredis_cluster:transaction([["get","abc"],["get","abcd"],["get","abcd1"]])),
?assertMatch({error,_}, eredis_cluster:transaction([["get","abc"],["get","abcde"],["get","abcd1"]]))
end
}

]
Expand Down

0 comments on commit 845b5d9

Please sign in to comment.