Skip to content

Commit

Permalink
Merge pull request #10 from konrads/gh-9-pre-check-riak-ensemble
Browse files Browse the repository at this point in the history
pre-check to ensure ensembles aren't created, also introduced backoff
  • Loading branch information
loucash committed Mar 19, 2015
2 parents 891c7a6 + 46c17bf commit 0678daa
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 17 deletions.
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
{deps, [
{reltool_util, ".*", {git, "https://github.com/okeuday/reltool_util.git", {tag, "v1.4.0"}}},
{riak_core, ".*", {git, "git://github.com/Regulators/riak_core", {tag,"2.0.1jb3"}}},
{rafter, ".*", {git, "https://github.com/andrewjstone/rafter.git", "bef9c9b6c5d8cad14685b2f3045dd83d9915e513"}}
{rafter, ".*", {git, "https://github.com/andrewjstone/rafter.git", "bef9c9b6c5d8cad14685b2f3045dd83d9915e513"}},
{backoff, ".*", {git, "https://github.com/ferd/backoff.git", "e66e80a874dd7c7dfdd658c8ee2c2fc2231ad333"}}
]}.

{cover_enabled, true}.
Expand Down
40 changes: 26 additions & 14 deletions src/riak_governor_ensemble_master.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

%% API
-export([ring_changed/0]).
-export([ensure_ensembles/0]).
-export([start_link/0]).

%% gen_server callbacks
Expand All @@ -15,17 +16,22 @@

-record(state, {
ensemble_size :: pos_integer(),
ringhash :: binary()
ringhash :: binary(),
retry_backoff :: backoff:backoff()
}).

-define(RETRY_DELAY, 1000).
-define(RETRY_DELAY_MIN, 100).
-define(RETRY_DELAY_MAX, 10000).

%%%===================================================================
%%% API
%%%===================================================================
%% @doc Sends an asynchronous `ring_changed' notification to the server
%% registered under this module's name. Fire-and-forget: the cast returns
%% immediately without waiting for the server to act on it.
-spec ring_changed() -> ok.
ring_changed() ->
    Notification = ring_changed,
    gen_server:cast(?MODULE, Notification).

%% @doc Asynchronously asks the server registered under this module's name
%% to (re)attempt starting its ensembles. Fire-and-forget: the cast returns
%% immediately without waiting for the attempt to complete.
-spec ensure_ensembles() -> ok.
ensure_ensembles() ->
    Request = ensure_ensembles,
    gen_server:cast(?MODULE, Request).

%% @doc Starts this module as a gen_server linked to the calling process and
%% registers it locally under the module name. `init/1' receives an empty
%% argument list and no gen_server options are set.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).

Expand All @@ -37,9 +43,12 @@ init([]) ->
EnsembleSize = riak_governor_util:get_ensemble_size(),
RingHash = get_ring_hash(),
_ = ets:new(?MODULE, [named_table, public]),
timer:apply_after(500, gen_server, cast, [?MODULE, init_ensembles]),
timer:apply_after(?RETRY_DELAY_MIN, gen_server, cast, [?MODULE, ensure_ensembles]),
RetryDelayMin = application:get_env(riak_governor, ensemble_creation_retry_delay_min, ?RETRY_DELAY_MIN),
RetryDelayMax = application:get_env(riak_governor, ensemble_creation_retry_delay_max, ?RETRY_DELAY_MAX),
{ok, #state{ensemble_size=EnsembleSize,
ringhash=RingHash}}.
ringhash=RingHash,
retry_backoff=backoff:init(RetryDelayMin, RetryDelayMax)}}.

handle_call(_Request, _From, State) ->
Reply = ok,
Expand All @@ -50,12 +59,12 @@ handle_cast(ring_changed, #state{ringhash=RingHash}=State) ->
RingHash ->
{noreply, State};
NewRingHash ->
try_start_ensembles(State),
{noreply, State#state{ringhash=NewRingHash}}
State2 = try_start_ensembles(State),
{noreply, State2#state{ringhash=NewRingHash}}
end;
handle_cast(init_ensembles, State) ->
try_start_ensembles(State),
{noreply, State}.
handle_cast(ensure_ensembles, State) ->
State2 = try_start_ensembles(State),
{noreply, State2}.

%% @private
%% Any message delivered outside of call/cast is ignored; the server state
%% is handed back unchanged.
handle_info(_Info, ServerState) ->
    {noreply, ServerState}.
Expand All @@ -73,14 +82,17 @@ get_ring_hash() ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
crypto:hash(md5, term_to_binary(Ring)).

try_start_ensembles(State) ->
try_start_ensembles(#state{retry_backoff=Backoff}=State) ->
case ensure_ensembles_started(State) of
ok -> ignore;
ok ->
{_, Backoff2} = backoff:succeed(Backoff),
State#state{retry_backoff=Backoff2};
{error, Error} ->
% schedule another ensemble start attempt
lager:debug("Failed to create ensembles due to: ~p, will retry in ~bms", [Error, ?RETRY_DELAY]),
RetryDelay = application:get_env(riak_governor, ensemble_creation_retry_delay, ?RETRY_DELAY),
timer:apply_after(RetryDelay, ?MODULE, ring_changed, [])
{RetryDelay, Backoff2} = backoff:fail(Backoff),
lager:debug("Failed to create ensembles due to: ~p, will retry in ~bms", [Error, RetryDelay]),
timer:apply_after(RetryDelay, ?MODULE, ensure_ensembles, []),
State#state{retry_backoff=Backoff2}
end.

ensure_ensembles_started(#state{ensemble_size=EnsembleSize}) ->
Expand Down
13 changes: 11 additions & 2 deletions src/riak_governor_spi_riak_ensemble.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,14 @@ get_leader(Id) ->
riak_ensemble_manager:get_leader(Id).

start_ensemble(Name,Peers) ->
riak_ensemble_manager:create_ensemble(Name, {Name,node()}, Peers, ?ENSEMBLE_BACKEND, []).

% Ensemble creation may report a failure/timeout and yet actually succeed,
% so check that the ensemble does not already exist before (re)creating it.
case riak_ensemble_manager:known_ensembles() of
{ok, Known} ->
case proplists:is_defined(Name, Known) of
true -> ok;
false -> riak_ensemble_manager:create_ensemble(Name, {Name,node()}, Peers, ?ENSEMBLE_BACKEND, [])
end;
Other ->
Other
end.

0 comments on commit 0678daa

Please sign in to comment.