Skip to content

Commit

Permalink
Async refreshing slots mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
clanchun committed May 13, 2019
1 parent c57c8b0 commit 664987c
Showing 1 changed file with 44 additions and 21 deletions.
65 changes: 44 additions & 21 deletions src/eredis_cluster_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ connect(InitServers) ->
gen_server:call(?MODULE,{connect,InitServers}).

refresh_mapping(Version) ->
gen_server:call(?MODULE,{reload_slots_map,Version}).
gen_server:cast(?MODULE,{reload_slots_map,Version}).

%% =============================================================================
%% @doc Given a slot return the link (Redis instance) to the mapped
Expand Down Expand Up @@ -65,6 +65,9 @@ get_all_pools() ->
%% =============================================================================
-spec get_pool_by_slot(Slot::integer(), State::#state{}) ->
{PoolName::atom() | undefined, Version::integer()}.
get_pool_by_slot(_Slot, #state{slots = {}, version = Version}) ->
{undefined, Version};

get_pool_by_slot(Slot, State) ->
Index = element(Slot+1,State#state.slots),
Cluster = element(Index,State#state.slots_maps),
Expand All @@ -86,7 +89,13 @@ reload_slots_map(State) ->
[close_connection(SlotsMap)
|| SlotsMap <- tuple_to_list(State#state.slots_maps)],

ClusterSlots = get_cluster_slots(State#state.init_nodes),
ClusterSlots =
case get_cluster_slots(State#state.init_nodes) of
{error, cannot_connect_to_cluster} ->
[];
CSlots ->
CSlots
end,

SlotsMaps = parse_cluster_slots(ClusterSlots),
ConnectedSlotsMaps = connect_all_slots(SlotsMaps),
Expand All @@ -104,25 +113,30 @@ reload_slots_map(State) ->

-spec get_cluster_slots([#node{}]) -> [[bitstring() | [bitstring()]]].
get_cluster_slots([]) ->
throw({error,cannot_connect_to_cluster});
{error, cannot_connect_to_cluster};
get_cluster_slots([Node|T]) ->
case safe_eredis_start_link(Node#node.address, Node#node.port) of
{ok,Connection} ->
case eredis:q(Connection, ["CLUSTER", "SLOTS"]) of
{error,<<"ERR unknown command 'CLUSTER'">>} ->
get_cluster_slots_from_single_node(Node);
{error,<<"ERR This instance has cluster support disabled">>} ->
get_cluster_slots_from_single_node(Node);
{ok, ClusterInfo} ->
eredis:stop(Connection),
ClusterInfo;
try
case safe_eredis_start_link(Node#node.address, Node#node.port) of
{ok,Connection} ->
case eredis:q(Connection, ["CLUSTER", "SLOTS"]) of
{error,<<"ERR unknown command 'CLUSTER'">>} ->
get_cluster_slots_from_single_node(Node);
{error,<<"ERR This instance has cluster support disabled">>} ->
get_cluster_slots_from_single_node(Node);
{ok, ClusterInfo} ->
eredis:stop(Connection),
ClusterInfo;
_ ->
eredis:stop(Connection),
get_cluster_slots(T)
end;
_ ->
eredis:stop(Connection),
get_cluster_slots(T)
end;
_ ->
end
catch
_: _ ->
get_cluster_slots(T)
end.
end.

-spec get_cluster_slots_from_single_node(#node{}) ->
[[bitstring() | [bitstring()]]].
Expand Down Expand Up @@ -217,20 +231,29 @@ connect_(InitNodes) ->
init(_Args) ->
ets:new(?MODULE, [protected, set, named_table, {read_concurrency, true}]),
InitNodes = application:get_env(eredis_cluster, init_nodes, []),
erlang:send_after(0, self(), refresh_mapping),
{ok, connect_(InitNodes)}.

handle_call({reload_slots_map,Version}, _From, #state{version=Version} = State) ->
{reply, ok, reload_slots_map(State)};
handle_call({reload_slots_map,_}, _From, State) ->
{reply, ok, State};
handle_call({connect, InitServers}, _From, _State) ->
{reply, ok, connect_(InitServers)};
handle_call(_Request, _From, State) ->
{reply, ignored, State}.

handle_cast({reload_slots_map,Version}, #state{version=Version} = State) ->
{noreply, reload_slots_map(State)};
handle_cast({reload_slots_map,_}, State) ->
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(refresh_mapping, #state{slots = {}} = State) ->
erlang:send_after(2000, self(), refresh_mapping),
{noreply, reload_slots_map(State)};

handle_info(refresh_mapping, State) ->
erlang:send_after(2000, self(), refresh_mapping),
{noreply, State};

handle_info(_Info, State) ->
{noreply, State}.

Expand Down

0 comments on commit 664987c

Please sign in to comment.