Commit

Merge pull request #704 from basho/merge-develop-2.2-to-develop

Merge develop-2.2 into develop

JeetKunDoug authored Nov 15, 2016
2 parents 78996cd + 5dc0f26 commit 0268231
Showing 30 changed files with 1,273 additions and 1,085 deletions.
254 changes: 49 additions & 205 deletions docs/BATCHING.md

926 changes: 481 additions & 445 deletions docs/yz-batching-overview.graffle

Binary file modified docs/yz-batching-overview.png
5 changes: 4 additions & 1 deletion include/yokozuna.hrl
@@ -113,7 +113,10 @@
%% take care of it.
-type component() :: search | index.

-type solr_entry() :: {bkey(), obj(), write_reason(), p(), short_preflist(),

-type object_pair() :: {obj(), obj() | no_old_object}.

-type solr_entry() :: {bkey(), object_pair(), write_reason(), p(), short_preflist(),
hash()}.
-type solr_entries() :: [solr_entry()].
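For context, the new object_pair() carries the previous version of an object (or the atom no_old_object when none exists) alongside the new one through the indexing path. A minimal, purely illustrative sketch of a caller, mirroring the yz_kv:index/3 call used in the riak_test changes further down (the function name index_one/2 is hypothetical):

%% Illustrative only: index a single object for which no previous
%% version is known, using the new object_pair() shape.
-spec index_one(obj(), p()) -> ok.
index_one(NewObj, Partition) ->
    ok = yz_kv:index({NewObj, no_old_object}, put, Partition).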

5 changes: 1 addition & 4 deletions priv/yokozuna.schema
@@ -253,15 +253,12 @@
%% - purge_index -> Removes all items associated with one random
%% erroring (references to fuses blown in the code) index in
%% order to get below the search.queue.high_watermark.
%% - purge_all -> Removes all items associated with all
%% erroring (references to fuses blown in the code) indices in
%% order to get below the search.queue.high_watermark.
%% - off -> purging is disabled
{mapping, "search.queue.high_watermark.purge_strategy",
"yokozuna.solrq_hwm_purge_strategy", [
{default, purge_one},
{commented, purge_one},
{datatype, {enum, [purge_one, purge_index, purge_all, off]}}
{datatype, {enum, [purge_one, purge_index, off]}}
]}.
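For illustration only (not part of this diff): in riak.conf the mapping above is set as search.queue.high_watermark.purge_strategy, and after cuttlefish translation the value lands in the yokozuna application environment. A hedged console-style sanity check, assuming the mapping shown here:

%% Hypothetical check: purge_all is no longer a legal value after this change.
Strategy = application:get_env(yokozuna, solrq_hwm_purge_strategy, purge_one),
true = lists:member(Strategy, [purge_one, purge_index, off]).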

%% @doc The amount of time to wait before a drain operation times out.
27 changes: 16 additions & 11 deletions riak_test/intercepts/yz_solrq_drain_fsm_intercepts.erl
@@ -27,19 +27,24 @@
prepare_crash(start, State) ->
{stop, {error, something_bad_happened}, State}.

%% Put a 1 second sleep in front of resume_workers.
resume_workers_sleep_1s(Pid) ->
timer:sleep(1000),
?M:resume_workers_orig(Pid).

%% Put a 5 second sleep in front of prepare.
prepare_sleep_5s(start, State) ->
timer:sleep(5000),
%% restore the original prepare
prepare_orig(start, State) ->
?M:prepare_orig(start, State).

%% restore the original resume_workers
resume_workers_orig(Pid) ->
?M:resume_workers_orig(Pid).

%% Put a 1 second sleep in front of prepare.
prepare_sleep_1s(start, State) ->
timer:sleep(1000),
?M:prepare_orig(start, State).
%% Timeout on a cancel, full stop
cancel_timeout(_Pid, _CancelTimeout) ->
lager:log(info, self(), "Intercepting cancel/2 and returning timeout"),
timeout.


%% restore the original
prepare_orig(start, State) ->
?M:prepare_orig(start, State).
%% restore the original cancel
cancel_orig(Pid, CancelTimeout) ->
?M:cancel_orig(Pid, CancelTimeout).
5 changes: 4 additions & 1 deletion riak_test/yokozuna_essential.erl
@@ -418,7 +418,10 @@ verify_correct_solrqs(Cluster) ->
?assertEqual(ok, rt:wait_until(Cluster, fun check_queues_match/1)).

check_queues_match(Node) ->
CurrentIndexes = rpc:call(Node, yz_index, get_indexes_from_meta, []),
%% Current Indexes includes ?YZ_INDEX_TOMBSTONE because we need to write the entries
%% for non-indexed data to the YZ AAE tree. Excluding them makes the solrq supervisor
%% constantly start and stop these queues.
CurrentIndexes = rpc:call(Node, yz_index, get_indexes_from_meta, []) ++ [?YZ_INDEX_TOMBSTONE],
OwnedPartitions = rt:partitions_for_node(Node),
ActiveQueues = rpc:call(Node, yz_solrq_sup, active_queues, []),
ExpectedQueueus = [{Index, Partition} || Index <- CurrentIndexes, Partition <- OwnedPartitions],
70 changes: 39 additions & 31 deletions riak_test/yz_aae_test.erl
@@ -10,31 +10,34 @@
-define(BUCKET_TYPE, <<"data">>).
-define(INDEX1, <<"fruit_aae">>).
-define(INDEX2, <<"fruitpie_aae">>).
-define(BUCKETWITHTYPE,
{?BUCKET_TYPE, ?INDEX2}).
-define(BUCKETWITHTYPE, {?BUCKET_TYPE, ?INDEX2}).
-define(BUCKET, ?INDEX1).
-define(REPAIR_MFA, {yz_exchange_fsm, repair, 2}).
-define(SPACER, "testfor spaces ").
-define(AAE_THROTTLE_LIMITS, [{-1, 0}, {10000, 10}]).
-define(CFG,
[{riak_core,
[
{ring_creation_size, 16},
{default_bucket_props, [{n_val, ?N}]},
{handoff_concurrency, 10},
{vnode_management_timer, 1000}
]},
{yokozuna,
[
{enabled, true},
{?SOLRQ_DRAIN_ENABLE, true},
{anti_entropy_tick, 1000},
%% allow AAE to build trees and exchange rapidly
{anti_entropy_build_limit, {100, 1000}},
{anti_entropy_concurrency, 8},
{aae_throttle_limits, ?AAE_THROTTLE_LIMITS}
]}
]).
-define(CFG, [
{riak_core, [
{ring_creation_size, 16},
{default_bucket_props, [{n_val, ?N}]},
{handoff_concurrency, 10},
{vnode_management_timer, 1000}
]},
{riak_kv, [
{force_hashtree_upgrade, true},
{anti_entropy_tick, 1000},
{anti_entropy_build_limit, {100, 1000}},
{anti_entropy_concurrency, 8}
]},
{yokozuna, [
{enabled, true},
{?SOLRQ_DRAIN_ENABLE, true},
{anti_entropy_tick, 1000},
%% allow AAE to build trees and exchange rapidly
{anti_entropy_build_limit, {100, 1000}},
{anti_entropy_concurrency, 8},
{aae_throttle_limits, ?AAE_THROTTLE_LIMITS}
]}
]).

confirm() ->
Cluster = rt:build_cluster(5, ?CFG),
@@ -101,15 +104,14 @@ aae_run(Cluster, Bucket, Index) ->

RepairCountBefore = get_cluster_repair_count(Cluster),
yz_rt:count_calls(Cluster, ?REPAIR_MFA),
NumKeys = [{Bucket, K} || K <- yz_rt:random_keys(?NUM_KEYS)],
NumKeysSpaces = [{Bucket, add_space_to_key(K)} ||
RandomBKeys = [{Bucket, K} || K <- yz_rt:random_keys(?NUM_KEYS)],
RandomBKeysWithSpaces = [{Bucket, add_space_to_key(K)} ||
K <- yz_rt:random_keys(?NUM_KEYS_SPACES)],
{DelNumKeys, _ChangeKeys} = lists:split(length(NumKeys) div 2,
NumKeys),
{DelNumKeysSpaces, _ChangeKeysSpaces} = lists:split(
length(NumKeysSpaces) div 2,
NumKeysSpaces),
AllDelKeys = DelNumKeys ++ DelNumKeysSpaces,
{RandomBKeysToDelete, _} = lists:split(length(RandomBKeys) div 2, RandomBKeys),
{RandomBKeysWithSpacesToDelete, _} = lists:split(
length(RandomBKeysWithSpaces) div 2,
RandomBKeysWithSpaces),
AllDelKeys = RandomBKeysToDelete ++ RandomBKeysWithSpacesToDelete,
lager:info("Deleting ~p keys", [length(AllDelKeys)]),
[delete_key_in_solr(Cluster, Index, K) || K <- AllDelKeys],
lager:info("Verify Solr indexes missing"),
@@ -174,7 +176,7 @@ create_orphan_postings(Cluster, Index, Bucket, Keys) ->
Keys2 = [{Bucket, ?INT_TO_BIN(K)} || K <- Keys],
lager:info("Create orphan postings with keys ~p", [Keys]),
ObjNodePs = [create_obj_node_partition_tuple(Cluster, Key) || Key <- Keys2],
[ok = rpc:call(Node, yz_kv, index, [Obj, put, P])
[ok = rpc:call(Node, yz_kv, index, [{Obj, no_old_object}, put, P])
|| {Obj, Node, P} <- ObjNodePs],
yz_rt:commit(Cluster, Index),
ok.
@@ -330,7 +332,8 @@ verify_count_and_repair_after_error_value(Cluster, {BType, _Bucket}, Index,
%% 1. write KV data to non-indexed bucket
Conn = yz_rt:select_random(PBConns),
lager:info("write 1 bad search field to bucket ~p", [Bucket]),
Obj = riakc_obj:new(Bucket, <<"akey_bad_data">>, <<"{\"date_register\":3333}">>,
Key = <<"akey_bad_data">>,
Obj = riakc_obj:new(Bucket, Key, <<"{\"date_register\":3333}">>,
"application/json"),

ok = riakc_pb_socket:put(Conn, Obj),
@@ -349,6 +352,11 @@ verify_count_and_repair_after_error_value(Cluster, {BType, _Bucket}, Index,
%% 5. verify count after expiration
verify_exchange_after_expire(Cluster, Index),

%% 6. Because it's possible we'll try to repair this key again
%% after clearing trees, delete it from KV
ok = riakc_pb_socket:delete(Conn, Bucket, Key),
yz_rt:commit(Cluster, Index),

ok;
verify_count_and_repair_after_error_value(_Cluster, _Bucket, _Index, _PBConns) ->
ok.
18 changes: 11 additions & 7 deletions riak_test/yz_rt.erl
@@ -27,6 +27,9 @@
-type search_type() :: solr | yokozuna.
-type cluster() :: cluster().

-type intercept() :: {{TargetFunctionName::atom(), TargetArity::non_neg_integer()}, InterceptFunctionName::atom()}.
-type intercepts() :: [intercept()].

-export_type([prop/0, props/0, cluster/0]).

%% @doc Get {Host, Port} from `Cluster'.
@@ -552,6 +555,7 @@ remove_index(Node, BucketType) ->
ok = rpc:call(Node, riak_core_bucket_type, update, [BucketType, Props]).

really_remove_index(Cluster, {BucketType, Bucket}, Index, PBConn) ->
lager:info("Removing index ~p", [Index]),
Node = hd(Cluster),
F = fun(_) ->
Props = [{?YZ_INDEX, ?YZ_INDEX_TOMBSTONE}],
@@ -965,17 +969,17 @@ check_fuse_status(Node, Partition, Indices, FuseCheckFunction) ->

-spec intercept_index_batch(node() | cluster(), module()) -> ok | [ok].
intercept_index_batch(Cluster, Intercept) ->
add_intercept(
add_intercepts(
Cluster,
yz_solr, index_batch, 2, Intercept).
yz_solr, [{{index_batch, 2}, Intercept}]).

-spec add_intercept(node() | cluster(), module(), atom(), non_neg_integer(), module()) -> ok | [ok].
add_intercept(Cluster, Module, Function, Arity, Intercept) when is_list(Cluster) ->
[add_intercept(Node, Module, Function, Arity, Intercept) || Node <- Cluster];
add_intercept(Node, Module, Function, Arity, Intercept) ->
-spec add_intercepts(node() | cluster(), module(), intercepts()) -> ok | [ok].
add_intercepts(Cluster, Module, Intercepts) when is_list(Cluster) ->
[add_intercepts(Node, Module, Intercepts) || Node <- Cluster];
add_intercepts(Node, Module, Intercepts) ->
rt_intercept:add(
Node,
{Module, [{{Function, Arity}, Intercept}]}).
{Module, Intercepts}).
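The new add_intercepts/3 installs a whole batch of {{Function, Arity}, Intercept} pairs in a single rt_intercept:add/2 call. For example, mirroring the calls in yz_solrq_test.erl below:

%% Install two drain-FSM intercepts across the whole cluster at once.
yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm,
                     [{{resume_workers, 1}, resume_workers_sleep_1s},
                      {{cancel, 2}, cancel_timeout}]).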

-spec set_yz_aae_mode(node() | cluster(), automatic | manual) -> ok | [ok].
set_yz_aae_mode(Cluster, Mode) when is_list(Cluster) ->
31 changes: 22 additions & 9 deletions riak_test/yz_solrq_test.erl
@@ -111,17 +111,18 @@ confirm() ->
pass.

confirm_drain_fsm_failure(Cluster) ->
lager:info("Starting confirm_drain_fsm_failure"),
yz_stat:reset(),
try
yz_rt:load_intercept_code(Cluster),
yz_rt:add_intercept(Cluster, yz_solrq_drain_fsm, prepare, 2, prepare_crash),
yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm, [{{prepare, 2}, prepare_crash}]),
%% drain solrqs and wait until the drain failure stats are touched
yz_rt:drain_solrqs(Cluster),
yz_rt:wait_until(Cluster, fun check_drain_failure_stats/1),

lager:info("confirm_drain_fsm_failure ok")
after
yz_rt:add_intercept(Cluster, yz_solrq_drain_fsm, prepare, 2, prepare_orig)
yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm, [{{prepare, 2}, prepare_orig}])
end.

check_drain_failure_stats(Node) ->
@@ -138,19 +139,20 @@ check_drain_failure_stats(Node) ->
yz_rt:check_stat_values(Stats, Pairs).

confirm_drain_fsm_timeout(Cluster) ->
lager:info("Starting confirm_drain_fsm_timeout"),
yz_stat:reset(),
[rpc:call(
Node, application, set_env, [?YZ_APP_NAME, ?SOLRQ_DRAIN_TIMEOUT, 500])
Node, application, set_env, [?YZ_APP_NAME, ?SOLRQ_DRAIN_TIMEOUT, 250])
|| Node <- Cluster],
try
yz_rt:load_intercept_code(Cluster),
yz_rt:add_intercept(Cluster, yz_solrq_drain_fsm, prepare, 2, prepare_sleep_1s),
yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm, [{{resume_workers, 1}, resume_workers_sleep_1s}]),
yz_rt:drain_solrqs(Cluster),
yz_rt:wait_until(Cluster, fun check_drain_timeout_stats/1),

lager:info("confirm_drain_fsm_timeout ok")
after
yz_rt:add_intercept(Cluster, yz_solrq_drain_fsm, prepare, 2, prepare_orig),
yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm, [{{resume_workers, 1}, resume_workers_orig}]),
[rpc:call(
Node, application, set_env, [?YZ_APP_NAME, ?SOLRQ_DRAIN_TIMEOUT, 60000])
|| Node <- Cluster]
@@ -170,26 +172,31 @@ check_drain_timeout_stats(Node) ->
yz_rt:check_stat_values(Stats, Pairs).

confirm_drain_fsm_kill(Cluster) ->
lager:info("Starting confirm_drain_fsm_kill"),
[rpc:call(
Node, application, set_env, [?YZ_APP_NAME, ?SOLRQ_DRAIN_TIMEOUT, 10])
|| Node <- Cluster],
%% technically not needed for this test (because the cancel intercept will
%% just return timeout), but added for completeness
[rpc:call(
Node, application, set_env, [?YZ_APP_NAME, ?SOLRQ_DRAIN_CANCEL_TIMEOUT, 10])
|| Node <- Cluster],
try
yz_test_listener:start(),
yz_rt:load_intercept_code(Cluster),
yz_rt:add_intercept(Cluster, yz_solrq_drain_fsm, prepare, 2, prepare_sleep_5s),
yz_rt:add_intercept(Cluster, yz_solrq_drain_mgr, unlink_and_kill, 2, count_unlink_and_kill),
yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm, [{{resume_workers, 1}, resume_workers_sleep_1s},
{{cancel, 2}, cancel_timeout}]),
yz_rt:add_intercepts(Cluster, yz_solrq_drain_mgr, [{{unlink_and_kill, 2}, count_unlink_and_kill}]),
yz_rt:drain_solrqs(Cluster),
yz_rt:wait_until(Cluster, fun check_drain_cancel_timeout_stats/1),

?assertEqual(1, length(yz_test_listener:messages())),

lager:info("confirm_drain_fsm_kill ok")
after
yz_rt:add_intercept(Cluster, yz_solrq_drain_fsm, prepare, 2, prepare_orig),
yz_rt:add_intercept(Cluster, yz_solrq_drain_mgr, unlink_and_kill, 2, unlink_and_kill_orig),
yz_rt:add_intercepts(Cluster, yz_solrq_drain_fsm, [{{resume_workers, 1}, resume_workers_orig},
{{cancel, 2}, cancel_orig}]),
yz_rt:add_intercepts(Cluster, yz_solrq_drain_mgr, [{{unlink_and_kill, 2}, unlink_and_kill_orig}]),
yz_test_listener:stop(),
[rpc:call(
Node, application, set_env, [?YZ_APP_NAME, ?SOLRQ_DRAIN_TIMEOUT, 60000])
@@ -214,6 +221,7 @@ check_drain_cancel_timeout_stats(Node) ->


confirm_batch_size(Cluster, PBConn, BKey, Index) ->
lager:info("Starting confirm_batch_size"),
%% First, put one less than the min batch size and expect that there are no
%% search results (because the index operations are queued).
Count = ?SOLRQ_BATCH_MIN_SETTING - 1,
@@ -246,6 +254,7 @@ confirm_batch_size(Cluster, PBConn, BKey, Index) ->
ok.

confirm_hwm(Cluster, PBConn, Bucket, Index, HWM) ->
lager:info("Starting confirm_hwm"),
yz_rt:drain_solrqs(Cluster),
{OldMin, OldMax, OldDelay} = set_index(Cluster, Index, 1, 100, 100),
try
@@ -267,6 +276,7 @@ confirm_hwm(Cluster, PBConn, Bucket, Index, HWM) ->
gteq(A, B) -> A >= B.

confirm_draining(Cluster, PBConn, Bucket, Index) ->
lager:info("Starting confirm_draining"),
Count = ?SOLRQ_BATCH_MIN_SETTING - 1,
Count = put_objects(PBConn, Bucket, Count),
yz_rt:commit(Cluster, Index),
@@ -278,6 +288,7 @@ confirm_draining(Cluster, PBConn, Bucket, Index) ->
ok.

confirm_requeue_undelivered([Node|_] = Cluster, PBConn, BKey, Index) ->
lager:info("Starting confirm_requeue_undelivered"),
yz_rt:load_intercept_code(Node),
yz_rt:intercept_index_batch(Node, index_batch_returns_other_error),

@@ -300,6 +311,7 @@ confirm_requeue_undelivered([Node|_] = Cluster, PBConn, BKey, Index) ->
ok.

confirm_no_contenttype_data(Cluster, PBConn, BKey, Index) ->
lager:info("Starting confirm_no_contenttype_data"),
yz_rt:set_index(Cluster, Index, 1, 100, 100),
Count = 1,
Count = put_no_contenttype_objects(PBConn, BKey, Count),
@@ -309,6 +321,7 @@ confirm_no_contenttype_data(Cluster, PBConn, BKey, Index) ->
ok.

confirm_purge_strategy(Cluster, PBConn) ->
lager:info("Starting confirm_purge_strategy"),
confirm_purge_one_strategy(Cluster, PBConn,
{?BUCKET5, ?INDEX5}),
confirm_purge_idx_strategy(Cluster, PBConn,