diff --git a/include/xdc_replicator.hrl b/include/xdc_replicator.hrl index f7f4a3db9f..64ab566263 100644 --- a/include/xdc_replicator.hrl +++ b/include/xdc_replicator.hrl @@ -10,6 +10,9 @@ %% License for the specific language governing permissions and limitations under %% the License. +-ifndef(_XDC_COMMON__HRL_). +-define(_XDC_COMMON__HRL_,). + %% couchdb headers -include("couch_db.hrl"). -include("couch_js_functions.hrl"). @@ -26,7 +29,9 @@ to_binary/1 ]). -%% constants used by XDCR +%% ------------------------------------%% +%% constants and macros used by XDCR %% +%% ------------------------------------%% -define(REP_ID_VERSION, 2). %% capture the last 10 entries of checkpoint history per bucket replicator -define(XDCR_CHECKPOINT_HISTORY, 10). @@ -34,8 +39,21 @@ -define(XDCR_ERROR_HISTORY, 10). %% interval (secs) to compute rate stats -define(XDCR_RATE_STAT_INTERVAL, 1). +%% constants used by XMEM +-define(XDCR_XMEM_CONNECTION_ATTEMPTS, 16). +-define(XDCR_XMEM_CONNECTION_TIMEOUT, 120000). %% timeout in ms +%% builder of error/warning/debug msgs +-define(format_msg(Msg, Args), lists:flatten(io_lib:format(Msg, Args))). + +%% by default we rely on remote memcached to do conflict resolution, +%% leave a switch to do local conflict resolution in case it is necessary +-define(XDCR_LOCAL_CONFLICT_RESOLUTION, false). + -%% data structures + + +%% -------------------------%% +%% XDCR data structures %% +%% -------------------------%% %% replication settings used by bucket level and vbucket level replicators -record(rep, { @@ -134,6 +152,7 @@ %% bucket level replication state used by module xdc_replication -record(replication, { rep = #rep{}, % the basic replication settings + mode, % replication mode vbucket_sup, % the supervisor for vb replicators vbs = [], % list of vb we should be replicating num_tokens = 0, % number of available tokens used by throttles @@ -156,6 +175,12 @@ status = #rep_vb_status{}, %% time the vb replicator intialized rep_start_time, + + %% xmem server process + xmem_srv, + %% remote node + xmem_remote, + throttle, parent, source_name, @@ -234,6 +259,7 @@ target = #httpdb{}, %% target db changes_manager, %% process to queue changes from storage max_conns, %% max connections + xmem_server, %% XMem server process opt_rep_threshold %% optimistic replication threshold }). @@ -246,3 +272,56 @@ worker_item_checked = 0, worker_item_replicated = 0 }). + +%%-----------------------------------------%% +%% XDCR-MEMCACHED %% +%%-----------------------------------------%% +% statistics +-record(xdc_vb_rep_xmem_statistics, { + item_replicated = 0, + data_replicated = 0, + ckpt_issued = 0, + ckpt_failed = 0 + }). + +%% information needed to talk to remote memcached +-record(xdc_rep_xmem_remote, { + ip, %% inet:ip_address(), + port, %% inet:port_number(), + bucket = "default", + username = "_admin", + password = "_admin", + options = [] + }). + +%% xmem server state +-record(xdc_vb_rep_xmem_srv_state, { + vb, + parent_vb_rep, + num_workers, + pid_workers, + statistics = #xdc_vb_rep_xmem_statistics{}, + remote = #xdc_rep_xmem_remote{}, + seed, + enable_pipeline = false, + error_reports + }). + +%% xmem worker state +-record(xdc_vb_rep_xmem_worker_state, { + id, + vb, + parent_server_pid, + status, + statistics = #xdc_vb_rep_xmem_statistics{}, + socket, %% inet:socket(), + time_connected, + time_init, + options, + error_reports + }). + + +-endif. 
+ +%% end of xdc_replicator.hrl diff --git a/src/mc_client_binary.erl b/src/mc_client_binary.erl index 42d47d8e74..dc3bf5e2a8 100644 --- a/src/mc_client_binary.erl +++ b/src/mc_client_binary.erl @@ -56,7 +56,11 @@ disable_traffic/1, wait_for_checkpoint_persistence/3, get_tap_docs_estimate/3, - get_mass_tap_docs_estimate/2 + map_status/1, + process_error_response/1, + get_mass_tap_docs_estimate/2, + ext/2, + rev_to_mcd_ext/1 ]). -type recv_callback() :: fun((_, _, _) -> any()) | undefined. diff --git a/src/ns_config_default.erl b/src/ns_config_default.erl index c0260e3971..e8c94d920f 100644 --- a/src/ns_config_default.erl +++ b/src/ns_config_default.erl @@ -76,6 +76,17 @@ default() -> %% when doc body size is no greater than the threshold {xdcr_optimistic_replication_threshold, 256}, + %% xdcr replication mode: + %% "capi": replicating to ns_server:capi_replication layer + %% "xmem": replicating to memcached directly + {xdcr_replication_mode, "xmem"}, + %% # of worker processes per vb rep xmem server + {xdcr_xmem_worker, 1}, + %% enable pipelined memcached operations + {xdcr_enable_pipeline_ops, true}, + %% inverse of probability to dump non-critical trace + {xdcr_trace_dump_inverse_prob, 1000}, + {directory, path_config:component_path(data, "config")}, {index_aware_rebalance_disabled, false}, {max_bucket_count, 10}, diff --git a/src/remote_clusters_info.erl b/src/remote_clusters_info.erl index 963f0739bc..ae805d2845 100644 --- a/src/remote_clusters_info.erl +++ b/src/remote_clusters_info.erl @@ -50,7 +50,7 @@ %% Construct remote bucket reference that can be used by %% get_remote_bucket_by_ref functions. %% -%% - get_memcached_vbucket_info_by_ref/4 +%% - get_memcached_vbucket_info_by_ref/{3, 4} %% %% -> {ok, {Host :: binary(), MemcachedPort :: integer()}, #remote_bucket{}} %% | {error, _} | {error, _, _} % see get_remote_bucket_by_ref for errors @@ -86,6 +86,7 @@ remote_bucket_reference/2, parse_remote_bucket_reference/1, invalidate_remote_bucket/2, invalidate_remote_bucket_by_ref/1, find_cluster_by_uuid/1, + get_memcached_vbucket_info_by_ref/3, get_memcached_vbucket_info_by_ref/4]). %% gen_server callbacks @@ -251,6 +252,9 @@ invalidate_remote_bucket(ClusterName, Bucket) -> {invalidate_remote_bucket, Cluster, Bucket}, infinity) end. +get_memcached_vbucket_info_by_ref(Reference, ForceRefresh, VBucket) -> + get_memcached_vbucket_info_by_ref(Reference, ForceRefresh, VBucket, ?GET_BUCKET_TIMEOUT). 
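For reference, a minimal usage sketch (not part of the patch) of the new get_memcached_vbucket_info_by_ref/3 wrapper added above; the helper name target_memcached_endpoint and its arguments are placeholders, and the /3 arity simply defaults the timeout to ?GET_BUCKET_TIMEOUT:

%% Hypothetical caller: resolve the memcached endpoint that owns one vbucket
%% of the replication target.
target_memcached_endpoint(TargetRef, Vb) ->
    case remote_clusters_info:get_memcached_vbucket_info_by_ref(TargetRef, false, Vb) of
        {ok, {Host, MemcachedPort}, _RemoteBucket} ->
            {ok, {Host, MemcachedPort}};
        Error ->
            %% {error, _} or {error, _, _}; see get_remote_bucket_by_ref
            Error
    end.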
+ get_memcached_vbucket_info_by_ref(Reference, ForceRefresh, VBucket, Timeout) -> case get_remote_bucket_by_ref(Reference, ForceRefresh, Timeout) of {ok, RemoteBucket} -> diff --git a/src/xdc_rep_manager.erl b/src/xdc_rep_manager.erl index 7efca8c818..32f73eb929 100644 --- a/src/xdc_rep_manager.erl +++ b/src/xdc_rep_manager.erl @@ -303,9 +303,22 @@ dump_parameters() -> {value, DefaultRestartWaitTime} = ns_config:search(xdcr_failure_restart_interval), RestartWaitTime = misc:getenv_int("XDCR_FAILURE_RESTART_INTERVAL", DefaultRestartWaitTime), + RepMode = xdc_rep_utils:get_replication_mode(), OptRepThreshold = xdc_rep_utils:get_opt_replication_threshold(), + {NumXMemWorker, Pipeline} + = case RepMode of + "xmem" -> + DefNumXMemWorker = xdc_rep_utils:get_xmem_worker(), + EnablePipeline = xdc_rep_utils:enable_pipeline_ops(), + {DefNumXMemWorker, EnablePipeline}; + _ -> + {undefined, undefined} + end, + ?xdcr_debug("default XDCR parameters:~n \t" + "replication mode: ~p (pipeline: ~p, " + "num xmem worker per vb replicator: ~p);~n \t" "optimistic replication threshold: ~p bytes;~n \t" "number of max concurrent reps per bucket: ~p;~n \t" "checkpoint interval in secs: ~p;~n \t" @@ -315,7 +328,10 @@ dump_parameters() -> "max number HTTP connections per vb replicator: ~p;~n \t" "max number retries per connection: ~p;~n \t" "vb replicator waiting time before restart: ~p ", - [OptRepThreshold, + [RepMode, + Pipeline, + NumXMemWorker, + OptRepThreshold, MaxConcurrentReps, IntervalSecs, DefBatchSize, DocBatchSizeKB, diff --git a/src/xdc_rep_utils.erl b/src/xdc_rep_utils.erl index 0aa19fa61b..0414dd20f2 100644 --- a/src/xdc_rep_utils.erl +++ b/src/xdc_rep_utils.erl @@ -25,6 +25,9 @@ -export([get_master_db/1, get_checkpoint_log_id/2]). -export([get_opt_replication_threshold/0]). -export([update_options/1]). +-export([get_replication_mode/0, get_replication_batch_size/0]). +-export([enable_pipeline_ops/0, get_trace_dump_invprob/0]). +-export([get_xmem_worker/0]). -include("xdc_replicator.hrl"). @@ -169,10 +172,11 @@ make_options(Props) -> "optimistic replication threshold: ~p bytes, " "worker processes: ~p, " "worker batch size (# of mutations): ~p, " + "socket options: ~p, " "HTTP connections: ~p, " "connection timeout (ms): ~p," "num of retries per request: ~p]", - [OptRepThreshold, DefWorkers, DefBatchSize, DefConns, DefTimeout, DefRetries]), + [OptRepThreshold, DefWorkers, DefBatchSize, DefSocketOptions, DefConns, DefTimeout, DefRetries]), lists:ukeymerge(1, Options, lists:keysort(1, [ {connection_timeout, DefTimeout}, @@ -321,7 +325,6 @@ get_opt_replication_threshold() -> Threshold end. - %% get xdc replication options, log them if changed -spec update_options(list()) -> list(). update_options(Options) -> @@ -396,5 +399,58 @@ update_options(Options) -> {worker_processes, DefWorkers}, {opt_rep_threshold, Threshold}]), Options). +-spec get_replication_mode() -> list(). +get_replication_mode() -> + {value, DefaultRepMode} = ns_config:search(xdcr_replication_mode), + + EnvVar = case (catch string:to_lower(os:getenv("XDCR_REPLICATION_MODE"))) of + "capi" -> + "capi"; + "xmem" -> + "xmem"; + _ -> + undefined + end, + + %% env var overrides ns_config parameter, use default ns_config parameter + %% only when env var is undefined + case EnvVar of + undefined -> + DefaultRepMode; + _ -> + EnvVar + end. +-spec get_replication_batch_size() -> integer(). 
+get_replication_batch_size() -> + %% env parameter can override the ns_config parameter + {value, DefaultDocBatchSize} = ns_config:search(xdcr_worker_batch_size), + DocBatchSize = misc:getenv_int("XDCR_WORKER_BATCH_SIZE", DefaultDocBatchSize), + 1024*DocBatchSize. + +-spec enable_pipeline_ops() -> boolean(). +enable_pipeline_ops() -> + %% env parameter can override the ns_config parameter + {value, EnablePipeline} = ns_config:search(xdcr_enable_pipeline_ops), + case os:getenv("XDCR_ENABLE_PIPELINE") of + "true" -> + true; + "false" -> + false; + _ -> + EnablePipeline + end. +%% inverse probability to dump non-critical datapath trace, +%% trace will be dumped by probability 1/N +-spec get_trace_dump_invprob() -> integer(). +get_trace_dump_invprob() -> + %% env parameter can override the ns_config parameter + {value, DefInvProb} = ns_config:search(xdcr_trace_dump_inverse_prob), + misc:getenv_int("XDCR_TRACE_DUMP_INVERSE_PROB", DefInvProb). + +-spec get_xmem_worker() -> integer(). +get_xmem_worker() -> + %% env parameter can override the ns_config parameter + {value, DefNumXMemWorker} = ns_config:search(xdcr_xmem_worker), + misc:getenv_int("XDCR_XMEM_WORKER", DefNumXMemWorker). diff --git a/src/xdc_replication.erl b/src/xdc_replication.erl index 332352f872..f11b950084 100644 --- a/src/xdc_replication.erl +++ b/src/xdc_replication.erl @@ -73,7 +73,7 @@ init([#rep{source = SrcBucketBinary} = Rep]) -> (_Evt) -> ok end, - + RepMode = xdc_rep_utils:get_replication_mode(), {ok, _} = couch_db_update_notifier:start_link(NotifyFun), ?xdcr_debug("couch_db update notifier started", []), {ok, InitThrottle} = concurrency_throttle:start_link(MaxConcurrentReps, self()), @@ -85,6 +85,7 @@ init([#rep{source = SrcBucketBinary} = Rep]) -> {ok, SrcBucketConfig} -> Vbs = xdc_rep_utils:my_active_vbuckets(SrcBucketConfig), RepState0 = #replication{rep = Rep, + mode = RepMode, vbs = Vbs, num_tokens = MaxConcurrentReps, init_throttle = InitThrottle, @@ -95,6 +96,7 @@ init([#rep{source = SrcBucketBinary} = Rep]) -> ?xdcr_error("fail to fetch a valid bucket config and no vb replicator " "would be created (error: ~p)", [Error]), RepState = #replication{rep = Rep, + mode = RepMode, num_tokens = MaxConcurrentReps, init_throttle = InitThrottle, work_throttle = WorkThrottle, @@ -384,6 +386,7 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. start_vb_replicators(#replication{rep = Rep, + mode = RepMode, vbucket_sup = Sup, init_throttle = InitThrottle, work_throttle = WorkThrottle, @@ -408,7 +411,8 @@ start_vb_replicators(#replication{rep = Rep, Vb, InitThrottle, WorkThrottle, - self()) + self(), + RepMode) end, misc:shuffle(NewVbs)), Replication#replication{vb_rep_dict = Dict2}. diff --git a/src/xdc_vbucket_rep.erl b/src/xdc_vbucket_rep.erl index 66f36e6b9c..b7229dd245 100644 --- a/src/xdc_vbucket_rep.erl +++ b/src/xdc_vbucket_rep.erl @@ -35,7 +35,7 @@ -behaviour(gen_server). %% public functions --export([start_link/5]). +-export([start_link/6]). %% gen_server callbacks -export([init/1, terminate/2, code_change/3]). @@ -47,13 +47,15 @@ -record(init_state, { rep, vb, + mode, init_throttle, work_throttle, parent}). 
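The getters added to xdc_rep_utils above all follow the same pattern: read the ns_config default, then let an environment variable override it. Non-critical debug traces elsewhere in this patch are sampled with probability 1/N using get_trace_dump_invprob/0. A hypothetical wrapper (not part of the patch) illustrating that sampling idiom:

%% Runs Fun() on average once every InvProb calls, where InvProb comes from
%% ns_config (xdcr_trace_dump_inverse_prob) or XDCR_TRACE_DUMP_INVERSE_PROB.
maybe_trace(Fun) ->
    case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of
        1 -> Fun();
        _ -> ok
    end.
%% usage sketch: maybe_trace(fun () -> ?xdcr_debug("worker reported seq ~p", [Seq]) end)

A helper like this would avoid repeating the case expression at every trace site; the patch below inlines the check instead.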
-start_link(Rep, Vb, InitThrottle, WorkThrottle, Parent) -> +start_link(Rep, Vb, InitThrottle, WorkThrottle, Parent, RepMode) -> InitState = #init_state{rep = Rep, vb = Vb, + mode = RepMode, init_throttle = InitThrottle, work_throttle = WorkThrottle, parent = Parent}, @@ -146,14 +148,20 @@ handle_call({report_seq_done, #worker_stat{seq = Seq, _ -> NewThroughSeq0 end, - ?xdcr_debug("Replicator of vbucket ~p: worker reported seq ~p, through seq was ~p, " - "new through seq is ~p, highest seq done was ~p, " - "new highest seq done is ~p~n" - "Seqs in progress were: ~p~nSeqs in progress are now: ~p" - "(total docs checked: ~p, total docs written: ~p)", - [Vb, Seq, ThroughSeq, NewThroughSeq, HighestDone, - NewHighestDone, SeqsInProgress, NewSeqsInProgress, - TotalChecked, TotalWritten]), + + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("Replicator of vbucket ~p: worker reported seq ~p, through seq was ~p, " + "new through seq is ~p, highest seq done was ~p, " + "new highest seq done is ~p~n" + "Seqs in progress were: ~p~nSeqs in progress are now: ~p" + "(total docs checked: ~p, total docs written: ~p)", + [Vb, Seq, ThroughSeq, NewThroughSeq, HighestDone, + NewHighestDone, SeqsInProgress, NewSeqsInProgress, + TotalChecked, TotalWritten]); + _ -> + ok + end, SourceCurSeq = xdc_vbucket_rep_ckpt:source_cur_seq(State), %% get stats @@ -205,17 +213,22 @@ handle_call({report_seq_done, #worker_stat{seq = Seq, {noreply, update_status_to_parent(NewState)}; handle_call({worker_done, Pid}, _From, - #rep_state{workers = Workers, status = VbStatus, parent = Parent} = State) -> + #rep_state{workers = Workers, status = VbStatus, xmem_srv = XMemSrv, parent = Parent} = State) -> case Workers -- [Pid] of Workers -> {stop, {unknown_worker_done, Pid}, ok, State}; [] -> %% all workers completed. Now shutdown everything and prepare for %% more changes from src - %% before return my token to throttle, check if user has changed number of tokens Parent ! 
check_tokens, - + %% disconnect all xmem workers + case XMemSrv of + nil -> + ok; + _ -> + xdc_vbucket_rep_xmem_srv:disconnect(XMemSrv) + end, %% allow another replicator to go State2 = replication_turn_is_done(State), couch_api_wrap:db_close(State2#rep_state.source), @@ -241,16 +254,22 @@ handle_call({worker_done, Pid}, _From, NumFailedCkpts = VbStatus2#rep_vb_status.num_failedckpts, LastCkptTime = State2#rep_state.last_checkpoint_time, StartRepTime = State2#rep_state.rep_start_time, - ?xdcr_debug("Replicator of vbucket ~p done, return token to throttle: ~p~n" - "(highest seq done is ~p, number of changes left: ~p~n" - "total docs checked: ~p, total docs written: ~p (total data repd: ~p)~n" - "total number of succ ckpts: ~p (failed ckpts: ~p)~n" - "last succ ckpt time: ~p, replicator start time: ~p.", - [Vb, Throttle, HighestDone, ChangesLeft, TotalChecked, TotalWritten, TotalDataRepd, - NumCkpts, NumFailedCkpts, - calendar:now_to_local_time(LastCkptTime), - calendar:now_to_local_time(StartRepTime) - ]), + + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("Replicator of vbucket ~p done, return token to throttle: ~p~n" + "(highest seq done is ~p, number of changes left: ~p~n" + "total docs checked: ~p, total docs written: ~p (total data repd: ~p)~n" + "total number of succ ckpts: ~p (failed ckpts: ~p)~n" + "last succ ckpt time: ~p, replicator start time: ~p.", + [Vb, Throttle, HighestDone, ChangesLeft, TotalChecked, TotalWritten, TotalDataRepd, + NumCkpts, NumFailedCkpts, + calendar:now_to_local_time(LastCkptTime), + calendar:now_to_local_time(StartRepTime) + ]); + _ -> + ok + end, %% we mark the vb rep status to idle NewRateStat = (VbStatus2#rep_vb_status.ratestat)#ratestat{curr_rate_item = 0, @@ -287,8 +306,13 @@ handle_cast(checkpoint, #rep_state{status = VbStatus} = State) -> NewState2 = NewState#rep_state{timer = xdc_vbucket_rep_ckpt:start_timer(State), status = VbStatus2}, Vb = (NewState2#rep_state.status)#rep_vb_status.vb, - ?xdcr_debug("checkpoint issued during replication for vb ~p, " - "commit time: ~p", [Vb, CommitTime]), + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("checkpoint issued during replication for vb ~p, " + "commit time: ~p", [Vb, CommitTime]); + _ -> + ok + end, {ok, NewState2}; {checkpoint_commit_failure, ErrorMsg, NewState} -> %% update the failed ckpt stats to bucket replicator @@ -313,6 +337,13 @@ handle_cast(checkpoint, #rep_state{status = VbStatus} = State) -> Result end; + +handle_cast({report_error, Err}, + #rep_state{parent = Parent} = State) -> + %% relay error from child to parent bucket replicator + gen_server:cast(Parent, {report_error, Err}), + {noreply, State}; + handle_cast({report_seq, Seq}, #rep_state{seqs_in_progress = SeqsInProgress} = State) -> NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress), @@ -352,7 +383,15 @@ terminate(Reason, #rep_state{ terminate_cleanup(State). 
-terminate_cleanup(State0) -> +terminate_cleanup(#rep_state{xmem_srv = XMemSrv} = State0) -> + %% shutdown xmem server + ok = case XMemSrv of + nil -> + ok; + _ -> + xdc_vbucket_rep_xmem_srv:stop(XMemSrv) + end, + State = xdc_vbucket_rep_ckpt:cancel_timer(State0), Dbs = [State#rep_state.source, State#rep_state.target, @@ -421,6 +460,7 @@ update_status_to_parent(#rep_state{parent = Parent, init_replication_state(#init_state{rep = Rep, vb = Vb, + mode = RepMode, work_throttle = Throttle, parent = Parent}) -> #rep{ @@ -435,7 +475,6 @@ init_replication_state(#init_state{rep = Rep, TgtDb = xdc_rep_utils:parse_rep_db(TgtURI), {ok, Source} = couch_api_wrap:db_open(SrcVbDb, []), {ok, Target} = couch_api_wrap:db_open(TgtDb, []), - {ok, SourceInfo} = couch_api_wrap:get_db_info(Source), {ok, TargetInfo} = couch_api_wrap:get_db_info(Target), @@ -446,6 +485,16 @@ init_replication_state(#init_state{rep = Rep, xdc_rep_utils:get_master_db(Target), []), + XMemRemote = case RepMode of + "xmem" -> + {ok, {Ip, Port}, _Bucket} = + remote_clusters_info:get_memcached_vbucket_info_by_ref(Tgt, false, Vb), + #xdc_rep_xmem_remote{ip = binary_to_list(Ip), port = Port, + username= "_admin", password = "_admin", options = []}; + _ -> + nil + end, + %% We have to pass the vbucket database along with the master database %% because the replication log id needs to be prefixed with the vbucket id %% at both the source and the destination. @@ -458,9 +507,14 @@ init_replication_state(#init_state{rep = Rep, TotalDocsWritten, TotalDataReplicated, History} = compare_replication_logs(SourceLog, TargetLog), - ?xdcr_debug("history log at src and dest: startseq: ~p, docs checked: ~p," - "docs_written: ~p, data replicated: ~p", - [StartSeq0, TotalDocsChecked, TotalDocsWritten, TotalDataReplicated]), + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("history log at src and dest: startseq: ~p, docs checked: ~p," + "docs_written: ~p, data replicated: ~p", + [StartSeq0, TotalDocsChecked, TotalDocsWritten, TotalDataReplicated]); + _ -> + ok + end, StartSeq = get_value(since_seq, Options, StartSeq0), #doc{body={CheckpointHistory}} = SourceLog, couch_db:close(Source), @@ -468,6 +522,7 @@ init_replication_state(#init_state{rep = Rep, couch_api_wrap:db_close(TgtMasterDb), couch_api_wrap:db_close(Target), couch_api_wrap:db_close(TgtMasterDb), + RepState = #rep_state{ rep_details = Rep, throttle = Throttle, @@ -494,6 +549,9 @@ init_replication_state(#init_state{rep = Rep, %% initialize the work start time in start_replication() work_start_time = 0, session_id = couch_uuids:random(), + %% XMem not started + xmem_srv = nil, + xmem_remote = XMemRemote, status = #rep_vb_status{vb = Vb, pid = self(), %% init per vb replication stats from checkpoint doc @@ -514,7 +572,8 @@ init_replication_state(#init_state{rep = Rep, }, source_seq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ) }, - ?xdcr_debug("vb ~p replication state initialized: ~p", [Vb, RepState]), + ?xdcr_debug("vb ~p replication state initialized: (local db: ~p, remote db: ~p, mode: ~p)", + [Vb, RepState#rep_state.source_name, RepState#rep_state.target_name, RepMode]), RepState. 
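In "xmem" mode, init_replication_state above resolves the memcached endpoint owning the vbucket once, via get_memcached_vbucket_info_by_ref/3, and stores it in #rep_state.xmem_remote; in "capi" mode the field stays nil. Purely illustrative, with made-up address values, the stored record looks roughly like:

%% Illustrative only; ip and port are made-up values.
XMemRemote = #xdc_rep_xmem_remote{ip = "10.1.2.3",
                                  port = 11210,
                                  bucket = "default",
                                  username = "_admin",
                                  password = "_admin",
                                  options = []}.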
start_replication(#rep_state{ @@ -522,7 +581,8 @@ start_replication(#rep_state{ target_name = TargetName, current_through_seq = StartSeq, last_checkpoint_time = LastCkptTime, - rep_details = #rep{id = Id, options = Opt} + rep_details = #rep{id = Id, options = Opt}, + xmem_remote = Remote } = State) -> WorkStart = now(), @@ -561,16 +621,56 @@ start_replication(#rep_state{ [ChangesReader, ChangesManager]), Changes = couch_db:count_changes_since(Source, StartSeq), + + %% start xmem server if it has not started + Vb = (State#rep_state.status)#rep_vb_status.vb, + XPid = case Remote of + nil -> + nil; + %% xmem replication mode + _XMemRemote -> + XMemSrvPid = case State#rep_state.xmem_srv of + nil -> + {ok, XMemSrv} = xdc_vbucket_rep_xmem_srv:start_link(Vb, Remote, self()), + XMemSrv; + Pid -> + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("xmem remote server already started (vb: ~p, pid: ~p)", + [Vb, Pid]), + ok; + _ -> + ok + end, + Pid + end, + ok = xdc_vbucket_rep_xmem_srv:connect(XMemSrvPid), + ok = xdc_vbucket_rep_xmem_srv:select_bucket(XMemSrvPid), + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("xmem remote node connected and bucket selected " + "(remote bucket: ~p, vb: ~b, remote ip: ~p, port: ~p, xmem srv pid: ~p)", + [Remote#xdc_rep_xmem_remote.bucket, + Vb, + Remote#xdc_rep_xmem_remote.ip, + Remote#xdc_rep_xmem_remote.port, + XMemSrvPid]); + _ -> + ok + end, + XMemSrvPid + end, + %% build start option for worker process WorkerOption = #rep_worker_option{ cp = self(), source = Source, target = Target, changes_manager = ChangesManager, max_conns = MaxConns, - opt_rep_threshold = OptRepThreshold}, + opt_rep_threshold = OptRepThreshold, xmem_server = XPid}, Workers = lists:map( fun(_) -> - {ok, Pid} = xdc_vbucket_rep_worker:start_link(WorkerOption), - Pid + {ok, WorkerPid} = xdc_vbucket_rep_worker:start_link(WorkerOption), + WorkerPid end, lists:seq(1, NumWorkers)), @@ -595,13 +695,21 @@ start_replication(#rep_state{ {value, DefaultIntervalSecs} = ns_config:search(xdcr_checkpoint_interval), IntervalSecs = misc:getenv_int("XDCR_CHECKPOINT_INTERVAL", DefaultIntervalSecs), TimeSinceLastCkpt = timer:now_diff(now(), LastCkptTime) div 1000000, - ?xdcr_debug("Worker pids are: ~p, last checkpt time: ~p" - "secs since last ckpt: ~p, ckpt interval: ~p)", - [Workers, calendar:now_to_local_time(LastCkptTime), - TimeSinceLastCkpt, IntervalSecs]), + + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("Worker pids are: ~p, last checkpt time: ~p" + "secs since last ckpt: ~p, ckpt interval: ~p)", + [Workers, calendar:now_to_local_time(LastCkptTime), + TimeSinceLastCkpt, IntervalSecs]), + ok; + _ -> + ok + end, %% check if we need do checkpointing, replicator will crash if checkpoint failure State1 = State#rep_state{ + xmem_srv = XPid, source = Source, target = Target, src_master_db = SrcMasterDb, @@ -615,6 +723,7 @@ start_replication(#rep_state{ _ -> {ok, <<"no checkpoint">>, State1} end, + CommitTime = timer:now_diff(now(), Start) div 1000, TotalCommitTime = CommitTime + NewState#rep_state.status#rep_vb_status.commit_time, @@ -633,11 +742,15 @@ start_replication(#rep_state{ }), %% finally crash myself if fail to commit, after posting status to parent - Vb = (ResultState#rep_state.status)#rep_vb_status.vb, case Succ of ok -> - ?xdcr_debug("checkpoint at start of replication for vb ~p " - "commit time: ~p ms, msg: ~p", [Vb, CommitTime, ErrorMsg]), + case 
random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("checkpoint at start of replication for vb ~p " + "commit time: ~p ms, msg: ~p", [Vb, CommitTime, ErrorMsg]); + _ -> + ok + end, ok; checkpoint_commit_failure -> ?xdcr_error("checkpoint commit failure at start of replication for vb ~p, " @@ -645,11 +758,15 @@ start_replication(#rep_state{ exit(ErrorMsg) end, + %% finally the vb replicator has been started Src = ResultState#rep_state.source_name, Tgt = ResultState#rep_state.target_name, - ?xdcr_info("replicator of vb ~p for replication from src ~p to target ~p has been started.", - [Vb, Src, Tgt]), + ?xdcr_info("replicator of vb ~p for replication from src ~p to target ~p has been " + "started (xmem remote (ip: ~p, port: ~p, bucket: ~p), xmem srv: ~p).", + [Vb, Src, Tgt, Remote#xdc_rep_xmem_remote.ip, + Remote#xdc_rep_xmem_remote.port, Remote#xdc_rep_xmem_remote.bucket, + XPid]), ResultState. diff --git a/src/xdc_vbucket_rep_ckpt.erl b/src/xdc_vbucket_rep_ckpt.erl index 9d35c1f50e..c056ab2c56 100644 --- a/src/xdc_vbucket_rep_ckpt.erl +++ b/src/xdc_vbucket_rep_ckpt.erl @@ -33,7 +33,12 @@ start_timer(State) -> %% start a new timer case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of {ok, Ref} -> - ?xdcr_debug("schedule next checkpoint in ~p seconds (ref: ~p)", [AfterSecs, Ref]), + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("schedule next checkpoint in ~p seconds (ref: ~p)", [AfterSecs, Ref]); + _ -> + ok + end, Ref; Error -> ?xdcr_error("Replicator, error scheduling checkpoint: ~p", [Error]), @@ -41,11 +46,15 @@ start_timer(State) -> end. cancel_timer(#rep_state{timer = nil} = State) -> - ?xdcr_debug("no checkpoint timer to cancel"), State; cancel_timer(#rep_state{timer = Timer} = State) -> {ok, cancel} = timer:cancel(Timer), - ?xdcr_debug("checkpoint timer has been cancelled (ref: ~p)", [Timer]), + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("checkpoint timer has been cancelled (ref: ~p)", [Timer]); + _ -> + ok + end, State#rep_state{timer = nil}. -spec do_checkpoint(#rep_state{}) -> {ok, binary(), #rep_state{}} | @@ -71,7 +80,8 @@ do_checkpoint(State) -> src_starttime = SrcInstanceStartTime, tgt_starttime = TgtInstanceStartTime, session_id = SessionId, - status = Status + status = Status, + xmem_srv = XMemSrvPid } = State, #rep_vb_status{docs_checked = Checked, @@ -83,7 +93,15 @@ do_checkpoint(State) -> num_checkpoints = NumCkpts, num_failedckpts = NumFailedCkpts} = Status, - CheckpointResult = case commit_to_both(Source, Target) of + + CommitResult = case XMemSrvPid of + nil -> + commit_to_both_remote_capi(Source, Target); + _ -> + commit_to_both_remote_xmem(Source, XMemSrvPid) + end, + + CheckpointResult = case CommitResult of {source_error, Reason} -> {checkpoint_commit_failure, <<"Failure on source commit: ", (to_binary(Reason))/binary>>}; @@ -202,7 +220,12 @@ update_checkpoint(Db, #doc{id = LogId, body = LogBody, rev = Rev} = Doc) -> end end. -commit_to_both(Source, Target) -> + +%% do remote checkpoint via CAPI protocol +-spec commit_to_both_remote_capi(#db{}, #httpdb{}) -> {ok, _} | + {source_error, _} | + {target_error, _}. +commit_to_both_remote_capi(Source, Target) -> %% commit the src async ParentPid = self(), SrcCommitPid = spawn_link( @@ -234,6 +257,40 @@ commit_to_both(Source, Target) -> {target_error, TargetError} end. 
+%% do remote checkpoint via memcached protocol +-spec commit_to_both_remote_xmem(#db{}, pid()) -> {ok, _} | + {source_error, _} | + {target_error, _}. +commit_to_both_remote_xmem(Source, XMemSrv) -> + %% commit the src async + ParentPid = self(), + SrcCommitPid = spawn_link( + fun() -> + Result = (catch couch_api_wrap:ensure_full_commit(Source)), + ParentPid ! {self(), Result} + end), + + %% commit tgt sync + TargetResult = xdc_vbucket_rep_xmem_srv:ensure_full_commit(XMemSrv), + SourceResult = receive + {SrcCommitPid, Result} -> + unlink(SrcCommitPid), + receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end, + Result; + {'EXIT', SrcCommitPid, Reason} -> + {error, Reason} + end, + case TargetResult of + {ok, TargetStartTime} -> + case SourceResult of + {ok, SourceStartTime} -> + {SourceStartTime, TargetStartTime}; + SourceError -> + {source_error, SourceError} + end; + TargetError -> + {target_error, TargetError} + end. source_cur_seq(#rep_state{source = #db{} = Db, source_seq = Seq}) -> {ok, Info} = couch_api_wrap:get_db_info(Db), diff --git a/src/xdc_vbucket_rep_sup.erl b/src/xdc_vbucket_rep_sup.erl index cc400fd399..0b25b62829 100644 --- a/src/xdc_vbucket_rep_sup.erl +++ b/src/xdc_vbucket_rep_sup.erl @@ -12,7 +12,7 @@ -module(xdc_vbucket_rep_sup). -behaviour(supervisor2). --export([start_link/1, shutdown/1, start_vbucket_rep/6, stop_vbucket_rep/2]). +-export([start_link/1, shutdown/1, start_vbucket_rep/7, stop_vbucket_rep/2]). -export([vbucket_reps/1]). -export([init/1]). @@ -24,14 +24,15 @@ start_link(ChildSpecs) -> ?xdcr_debug("xdc vbucket replicator supervisor started: ~p", [Sup]), {ok, Sup}. -start_vbucket_rep(Sup, Rep, Vb, InitThrottle, WorkThrottle, Parent) -> +start_vbucket_rep(Sup, Rep, Vb, InitThrottle, WorkThrottle, Parent, RepMode) -> {value, DefaultRestartWaitTime} = ns_config:search(xdcr_failure_restart_interval), RestartWaitTime = misc:getenv_int("XDCR_FAILURE_RESTART_INTERVAL", DefaultRestartWaitTime), - ?xdcr_debug("start xdc vbucket replicator (vb: ~p, restart wait time: ~p, parent pid: ~p)", - [Vb, RestartWaitTime, Parent]), + ?xdcr_debug("start xdc vbucket replicator (vb: ~p, restart wait time: ~p, " + "parent pid: ~p, mode: ~p)", + [Vb, RestartWaitTime, Parent, RepMode]), Spec = {Vb, - {xdc_vbucket_rep, start_link, [Rep, Vb, InitThrottle, WorkThrottle, Parent]}, + {xdc_vbucket_rep, start_link, [Rep, Vb, InitThrottle, WorkThrottle, Parent, RepMode]}, {permanent, RestartWaitTime}, 100, worker, diff --git a/src/xdc_vbucket_rep_worker.erl b/src/xdc_vbucket_rep_worker.erl index 6bd9101c83..6ecbc42407 100644 --- a/src/xdc_vbucket_rep_worker.erl +++ b/src/xdc_vbucket_rep_worker.erl @@ -21,36 +21,61 @@ %% the target should always from remote with record #httpdb{}. 
There is %% no intra-cluster XDCR start_link(#rep_worker_option{cp = Cp, source = Source, target = Target, - changes_manager = ChangesManager, opt_rep_threshold = OptRepThreshold} = _WorkerOption) -> + changes_manager = ChangesManager, + opt_rep_threshold = OptRepThreshold, + xmem_server = XMemSrv} = _WorkerOption) -> Pid = spawn_link(fun() -> erlang:monitor(process, ChangesManager), - queue_fetch_loop(Source, Target, Cp, ChangesManager, OptRepThreshold) + queue_fetch_loop(Source, Target, Cp, ChangesManager, OptRepThreshold, XMemSrv) end), - ?xdcr_debug("create queue_fetch_loop process (pid: ~p) within replicator (pid: ~p) " - "Source: ~p, Target: ~p, ChangesManager: ~p, latency optimized: ~p", - [Pid, Cp, Source#db.name, Target#httpdb.url, ChangesManager, OptRepThreshold]), + + + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("create queue_fetch_loop process (pid: ~p) within replicator (pid: ~p) " + "Source: ~p, Target: ~p, ChangesManager: ~p, latency optimized: ~p", + [Pid, Cp, Source#db.name, Target#httpdb.url, ChangesManager, OptRepThreshold]); + _ -> + ok + end, {ok, Pid}. -queue_fetch_loop(Source, Target, Cp, ChangesManager, OptRepThreshold) -> - ?xdcr_debug("fetch changes from changes manager at ~p (target: ~p)", - [ChangesManager, Target#httpdb.url]), +queue_fetch_loop(Source, Target, Cp, ChangesManager, OptRepThreshold, XMemSrv) -> + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("fetch changes from changes manager at ~p (target: ~p)", + [ChangesManager, Target#httpdb.url]); + _ -> + ok + end, ChangesManager ! {get_changes, self()}, receive {'DOWN', _, _, _, _} -> ok = gen_server:call(Cp, {worker_done, self()}, infinity); {changes, ChangesManager, Changes, ReportSeq} -> %% get docinfo of missing ids - {MissingDocInfoList, MetaLatency} = find_missing(Changes, Target, OptRepThreshold), + {MissingDocInfoList, MetaLatency} = find_missing(Changes, Target, OptRepThreshold, XMemSrv), NumChecked = length(Changes), NumWritten = length(MissingDocInfoList), %% use ptr in docinfo to fetch document from storage Start = now(), {ok, DataRepd} = local_process_batch( - MissingDocInfoList, Cp, Source, Target, #batch{}), + MissingDocInfoList, Cp, Source, Target, #batch{}, XMemSrv), - %% latency in millisecond - DocLatency = timer:now_diff(now(), Start) div 1000, + %% the latency returned should be coupled with batch size, for example, + %% if we send N docs in a batch, the latency returned to stats should be the latency + %% for all N docs. XMem mode XDCR is now single doc based, therefore the latency + %% should be single doc replication latency latency in millisecond. 
+ DocLatency = case is_pid(XMemSrv) of + true -> + BatchLatency = timer:now_diff(now(), Start) div 1000, + try BatchLatency / NumWritten + catch error:badarith -> 0 + end; + false -> + timer:now_diff(now(), Start) div 1000 + end, %% report seq done and stats to vb replicator ok = gen_server:call(Cp, {report_seq_done, #worker_stat{ @@ -60,31 +85,44 @@ queue_fetch_loop(Source, Target, Cp, ChangesManager, OptRepThreshold) -> worker_data_replicated = DataRepd, worker_item_checked = NumChecked, worker_item_replicated = NumWritten}}, infinity), - ?xdcr_debug("Worker reported completion of seq ~p", [ReportSeq]), - queue_fetch_loop(Source, Target, Cp, ChangesManager, OptRepThreshold) + + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("Worker reported completion of seq ~p, num docs written: ~p " + "data replicated: ~p bytes, latency: ~p ms.", + [ReportSeq, NumWritten, DataRepd, DocLatency]); + _ -> + ok + end, + queue_fetch_loop(Source, Target, Cp, ChangesManager, OptRepThreshold, XMemSrv) end. -local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}) -> +local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, _XMemSrv) -> {ok, 0}; local_process_batch([], Cp, #db{} = Source, #httpdb{} = Target, - #batch{docs = Docs, size = Size}) -> - ?xdcr_debug("worker process flushing a batch docs of total size ~p bytes", - [Size]), - ok = flush_docs(Target, Docs), - {ok, DataRepd1} = local_process_batch([], Cp, Source, Target, #batch{}), + #batch{docs = Docs, size = Size}, XMemSrv) -> + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("worker process flushing a batch docs of total size ~p bytes", + [Size]); + _ -> + ok + end, + ok = flush_docs_helper(Target, Docs, XMemSrv), + {ok, DataRepd1} = local_process_batch([], Cp, Source, Target, #batch{}, XMemSrv), {ok, DataRepd1 + Size}; local_process_batch([DocInfo | Rest], Cp, #db{} = Source, - #httpdb{} = Target, Batch) -> - {ok, {_, DocList, _}} = fetch_doc( + #httpdb{} = Target, Batch, XMemSrv) -> + {ok, {_, DocsList, _}} = fetch_doc( Source, DocInfo, fun local_doc_handler/2, {Target, [], Cp}), {Batch2, DataFlushed} = lists:foldl( fun(Doc, {Batch0, DataFlushed1}) -> - maybe_flush_docs(Target, Batch0, Doc, DataFlushed1) + maybe_flush_docs(Target, Batch0, Doc, DataFlushed1, XMemSrv) end, - {Batch, 0}, DocList), - {ok, DataFlushed2} = local_process_batch(Rest, Cp, Source, Target, Batch2), + {Batch, 0}, DocsList), + {ok, DataFlushed2} = local_process_batch(Rest, Cp, Source, Target, Batch2, XMemSrv), %% return total data flushed {ok, DataFlushed + DataFlushed2}. @@ -93,53 +131,49 @@ local_process_batch([DocInfo | Rest], Cp, #db{} = Source, fetch_doc(Source, #doc_info{body_ptr = _BodyPtr} = DocInfo, DocHandler, Acc) -> couch_api_wrap:open_doc(Source, DocInfo, [deleted], DocHandler, Acc). -local_doc_handler({ok, Doc}, {Target, DocList, Cp}) -> - {ok, {Target, [Doc | DocList], Cp}}; +local_doc_handler({ok, Doc}, {Target, DocsList, Cp}) -> + {ok, {Target, [Doc | DocsList], Cp}}; local_doc_handler(_, Acc) -> {ok, Acc}. 
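To make the per-document latency adjustment in queue_fetch_loop above concrete, a small worked example with made-up numbers; capi mode reports the whole-batch flush latency, while xmem mode, which replicates document by document, reports the average per document (the badarith catch covers NumWritten = 0):

%% Made-up numbers, illustration only.
BatchLatency = 250,                        %% ms for the whole batch
NumWritten   = 500,                        %% docs written in that batch
CapiLatency  = BatchLatency,               %% capi: 250 ms reported for the batch
XmemLatency  = BatchLatency / NumWritten.  %% xmem: 0.5 ms reported per document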
-maybe_flush_docs(#httpdb{} = Target, Batch, Doc, DataFlushed) -> - #batch{docs = DocAcc, size = SizeAcc} = Batch, - JsonDoc = couch_doc:to_json_base64(Doc), - - %% env parameter can override the ns_config parameter - {value, DefaultDocBatchSize} = ns_config:search(xdcr_doc_batch_size_kb), - DocBatchSize = misc:getenv_int("XDCR_DOC_BATCH_SIZE_KB", DefaultDocBatchSize), - - DocBatchSizeByte = 1024*DocBatchSize, - case SizeAcc + iolist_size(JsonDoc) of - SizeAcc2 when SizeAcc2 > DocBatchSizeByte -> - ?xdcr_debug("Worker flushing doc batch of size ~p bytes " - "(batch limit: ~p)", [SizeAcc2, DocBatchSizeByte]), - flush_docs(Target, [JsonDoc | DocAcc]), - %% data flushed, return empty batch and size of data flushed - {#batch{}, SizeAcc2 + DataFlushed}; - SizeAcc2 -> - %% no data flushed in this turn, return the new batch - {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, DataFlushed} +-spec maybe_flush_docs(#httpdb{}, #batch{}, #doc{}, integer(), pid()) -> {#batch{}, integer()}. +maybe_flush_docs(#httpdb{} = Target, Batch, Doc, DataFlushed, XMemSrv) -> + case is_pid(XMemSrv) of + false -> + maybe_flush_docs_capi(Target, Batch, Doc, DataFlushed); + true -> + maybe_flush_docs_xmem(XMemSrv, Batch, Doc, DataFlushed) end. -flush_docs(_Target, []) -> - ok; -flush_docs(Target, DocList) -> - case couch_api_wrap:update_docs(Target, DocList, [delay_commit], - replicated_changes) of +flush_docs_helper(Target, DocsList, XMemSrv) -> + {RepMode,RV} = + case is_pid(XMemSrv) of + false -> + {"capi", flush_docs_capi(Target, DocsList)}; + true -> + {"xmem", flush_docs_xmem(XMemSrv, DocsList)} + end, + + case RV of ok -> - ?xdcr_debug("worker process replicated ~p docs to target ~p", - [length(DocList), Target#httpdb.url]), + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("replication mode: ~p, worker process replicated ~p docs to target ~p", + [RepMode, length(DocsList), Target#httpdb.url]); + _ -> + ok + end, ok; - {ok, {Props}} -> - DbUri = couch_api_wrap:db_uri(Target), - ?xdcr_error("Replicator: couldn't write document `~s`, revision `~s`," - " to target database `~s`. Error: `~s`, reason: `~200s`.", - [get_value(<<"id">>, Props, ""), get_value(<<"rev">>, Props, ""), DbUri, - get_value(<<"error">>, Props, ""), get_value(<<"reason">>, Props, "")]), - exit({failed_write, Props}) + {failed_write, Error} -> + ?xdcr_error("replication mode: ~p, unable to replicate ~p docs to target ~p", + [RepMode, length(DocsList), Target#httpdb.url]), + exit({failed_write, Error}) end. + %% return list of Docsinfos of missing keys --spec find_missing(list(), #httpdb{}, boolean()) -> {list(), integer()}. -find_missing(DocInfos, Target, OptRepThreshold) -> +-spec find_missing(list(), #httpdb{}, integer(), pid()) -> {list(), integer()}. 
+find_missing(DocInfos, Target, OptRepThreshold, XMemSrv) -> Start = now(), %% depending on doc body size, we separate all keys into two groups: @@ -174,7 +208,7 @@ find_missing(DocInfos, Target, OptRepThreshold) -> {Missing, MissingBigDocCount} = case length(BigDocIdRevs) of V when V > 0 -> - {ok , MissingBigIdRevs} = couch_api_wrap:get_missing_revs(Target, BigDocIdRevs), + MissingBigIdRevs = find_missing_helper(Target, BigDocIdRevs, XMemSrv), {lists:flatten([SmallDocIdRevs | MissingBigIdRevs]), length(MissingBigIdRevs)}; _ -> {SmallDocIdRevs, 0} @@ -194,15 +228,133 @@ find_missing(DocInfos, Target, OptRepThreshold) -> DocInfos), %% latency in millisecond - Latency = round(timer:now_diff(now(), Start) div 1000), + TotalLatency = round(timer:now_diff(now(), Start) div 1000), + Latency = case is_pid(XMemSrv) of + true -> + %% xmem is single doc based + try (TotalLatency div BigDocCount) of + X -> X + catch + error:badarith -> 0 + end; + _ -> + TotalLatency + end, + - ?xdcr_debug("out of all ~p docs, number of small docs (including dels: ~p) is ~p, " - "number of big docs is ~p, threshold is ~p bytes, ~n\t" - "after conflict resolution at target (~p), out of all big ~p docs " - "the number of docs we need to replicate is: ~p; ~n\t " - "total # of docs to be replicated is: ~p, total latency: ~p ms", - [AllRevsCount, DelCount, SmallDocCount, BigDocCount, OptRepThreshold, - Target#httpdb.url, BigDocCount, MissingBigDocCount, - length(MissingDocInfoList), Latency]), + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + RepMode = case is_pid(XMemSrv) of + true -> + "xmem"; + _ -> + "capi" + end, + ?xdcr_debug("[replication mode: ~p] out of all ~p docs, number of small docs (including dels: ~p) is ~p, " + "number of big docs is ~p, threshold is ~p bytes, ~n\t" + "after conflict resolution at target (~p), out of all big ~p docs " + "the number of docs we need to replicate is: ~p; ~n\t " + "total # of docs to be replicated is: ~p, total latency: ~p ms", + [RepMode, AllRevsCount, DelCount, SmallDocCount, BigDocCount, OptRepThreshold, + Target#httpdb.url, BigDocCount, MissingBigDocCount, + length(MissingDocInfoList), Latency]); + _ -> + ok + end, {MissingDocInfoList, Latency}. + +-spec find_missing_helper(#httpdb{}, list(), pid()) -> list(). +find_missing_helper(Target, BigDocIdRevs, XMemSrv) -> + MissingIdRevs = case is_pid(XMemSrv) of + false -> + {ok, IdRevs} = couch_api_wrap:get_missing_revs(Target, BigDocIdRevs), + IdRevs; + true -> + {ok, IdRevs} = xdc_vbucket_rep_xmem_srv:find_missing(XMemSrv, BigDocIdRevs), + IdRevs + end, + MissingIdRevs. + +%% ================================================= %% +%% ========= FLUSHING DOCS USING CAPI ============== %% +%% ================================================= %% +-spec maybe_flush_docs_capi(#httpdb{}, #batch{}, #doc{}, integer()) -> {#batch{}, integer()}. 
+maybe_flush_docs_capi(#httpdb{} = Target, Batch, Doc, DataFlushed) -> + #batch{docs = DocAcc, size = SizeAcc} = Batch, + JsonDoc = couch_doc:to_json_base64(Doc), + + DocBatchSizeByte = xdc_rep_utils:get_replication_batch_size(), + case SizeAcc + iolist_size(JsonDoc) of + SizeAcc2 when SizeAcc2 > DocBatchSizeByte -> + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("Worker flushing doc batch of size ~p bytes " + "(batch limit: ~p)", [SizeAcc2, DocBatchSizeByte]); + _ -> + ok + end, + flush_docs_capi(Target, [JsonDoc | DocAcc]), + %% data flushed, return empty batch and size of data flushed + {#batch{}, SizeAcc2 + DataFlushed}; + SizeAcc2 -> %% no data flushed in this turn, return the new batch + {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, DataFlushed} + end. + +-spec flush_docs_capi(#httpdb{}, list()) -> ok | {failed_write, term()}. +flush_docs_capi(_Target, []) -> + ok; +flush_docs_capi(Target, DocsList) -> + case couch_api_wrap:update_docs(Target, DocsList, [delay_commit], + replicated_changes) of + ok -> + ok; + {ok, {Props}} -> + DbUri = couch_api_wrap:db_uri(Target), + ?xdcr_error("Replicator: couldn't write document `~s`, revision `~s`," + " to target database `~s`. Error: `~s`, reason: `~200s`.", + [get_value(<<"id">>, Props, ""), get_value(<<"rev">>, Props, ""), DbUri, + get_value(<<"error">>, Props, ""), get_value(<<"reason">>, Props, "")]), + {failed_write, Props} + end. + + +%% ================================================= %% +%% ========= FLUSHING DOCS USING XMEM ============== %% +%% ================================================= %% +-spec flush_docs_xmem(pid(), list()) -> ok | {failed_write, term()}. +flush_docs_xmem(_XMemSrv, []) -> + ok; +flush_docs_xmem(XMemSrv, DocsList) -> + xdc_vbucket_rep_xmem_srv:flush_docs(XMemSrv, DocsList). + +-spec maybe_flush_docs_xmem(pid(), #batch{}, #doc{}, integer()) -> {#batch{}, integer()}. +maybe_flush_docs_xmem(XMemSrv, Batch, Doc0, DocsFlushed) -> + #batch{docs = DocAcc, size = SizeAcc} = Batch, + + DocBatchSizeByte = xdc_rep_utils:get_replication_batch_size(), + %% uncompress it if necessary + Doc = couch_doc:with_uncompressed_body(Doc0), + DocSize = case Doc#doc.deleted of + true -> + 0; + _ -> + iolist_size(Doc#doc.body) + end, + + %% if reach the limit in terms of docs, flush them + case SizeAcc + DocSize >= DocBatchSizeByte of + true -> + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("Worker flushing doc batch of ~p bytes ", [SizeAcc + DocSize]); + _ -> + ok + end, + DocsList = [Doc| DocAcc], + ok = flush_docs_xmem(XMemSrv, DocsList), + %% data flushed, return empty batch and size of # of docs flushed + {#batch{}, DocsFlushed + SizeAcc + DocSize}; + _ -> %% no data flushed in this turn, return the new batch + {#batch{docs = [Doc | DocAcc], size = SizeAcc + DocSize}, DocsFlushed} + end. diff --git a/src/xdc_vbucket_rep_xmem_srv.erl b/src/xdc_vbucket_rep_xmem_srv.erl new file mode 100644 index 0000000000..926dd1b2b4 --- /dev/null +++ b/src/xdc_vbucket_rep_xmem_srv.erl @@ -0,0 +1,349 @@ +%% @author Couchbase +%% @copyright 2011 Couchbase, Inc. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); you may not +%% use this file except in compliance with the License. 
You may obtain a copy of +%% the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +%% License for the specific language governing permissions and limitations under +%% the License. + +%% This module is responsible for communication with the remote memcached process for +%% an individual vbucket. It is started and stopped by the vbucket replicator module + + +-module(xdc_vbucket_rep_xmem_srv). +-behaviour(gen_server). + +%% public functions +-export([start_link/3]). +-export([init/1, terminate/2, code_change/3]). +-export([handle_call/3, handle_cast/2, handle_info/2]). + +-export([connect/1, disconnect/1, select_bucket/1, stop/1]). +-export([find_missing/2, flush_docs/2, ensure_full_commit/1]). + +-include("xdc_replicator.hrl"). +-include("remote_clusters_info.hrl"). + + +%% -------------------------------------------------------------------------- %% +%% --- public functions --- %% +%% -------------------------------------------------------------------------- %% +start_link(Vb, RemoteXMem, ParentVbRep) -> + %% prepare parameters to start xmem server process + DefaultWorkers = xdc_rep_utils:get_xmem_worker(), + Options = {Vb, RemoteXMem, ParentVbRep, DefaultWorkers}, + {ok, Pid} = gen_server:start_link(?MODULE, Options, []), + ?xdcr_debug("xmem server started (vb: ~p, pid: ~p, options: ~p)", + [Vb, Pid, Options]), + {ok, Pid}. + +%% gen_server behavior callback functions +init({Vb, RemoteXMem, ParentVbRep, NumWorkers}) -> + process_flag(trap_exit, true), + %% signal to self to initialize + {ok, AllWorkers} = start_worker_process(Vb, NumWorkers), + {T1, T2, T3} = now(), + random:seed(T1, T2, T3), + Errs = ringbuffer:new(?XDCR_ERROR_HISTORY), + Pipeline = xdc_rep_utils:enable_pipeline_ops(), + InitState = #xdc_vb_rep_xmem_srv_state{vb = Vb, + parent_vb_rep = ParentVbRep, + remote = RemoteXMem, + statistics = #xdc_vb_rep_xmem_statistics{}, + pid_workers = AllWorkers, + num_workers = NumWorkers, + enable_pipeline = Pipeline, + seed = {T1, T2, T3}, + error_reports = Errs}, + + ?xdcr_debug("xmem server (vb: ~p) initialized (remote ip: ~p, port: ~p, " + "# of xmem workers: ~p)", + [Vb, + RemoteXMem#xdc_rep_xmem_remote.ip, + RemoteXMem#xdc_rep_xmem_remote.port, + dict:size(AllWorkers)]), + + {ok, InitState}. + +connect(Server) -> + gen_server:call(Server, connect, infinity). + +disconnect(Server) -> + gen_server:call(Server, disconnect, infinity). + +stop(Server) -> + gen_server:cast(Server, stop). + +select_bucket(Server) -> + gen_server:call(Server, select_bucket, infinity). + +-spec find_missing(pid(), list()) -> {ok, list()} | + {error, term()}. +find_missing(Server, IdRevs) -> + gen_server:call(Server, {find_missing, IdRevs}, infinity). + +-spec flush_docs(pid(), list()) -> ok | {error, term()}. +flush_docs(Server, DocsList) -> + gen_server:call(Server, {flush_docs, DocsList}, infinity). + +ensure_full_commit(Server) -> + gen_server:call(Server, ensure_full_commit). + +handle_info({'EXIT',_Pid, normal}, St) -> + {noreply, St}; + +handle_info({'EXIT',_Pid, Reason}, St) -> + {stop, Reason, St}. 
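The clauses below implement the server side of the calls exported above. Pieced together from how xdc_vbucket_rep.erl and xdc_vbucket_rep_ckpt.erl use this module elsewhere in the patch, the intended call sequence is roughly the following sketch (Vb, Remote, IdRevs and DocsList are placeholders, not patch code):

{ok, XMemSrv} = xdc_vbucket_rep_xmem_srv:start_link(Vb, Remote, self()),
ok = xdc_vbucket_rep_xmem_srv:connect(XMemSrv),        %% workers connect to remote memcached
ok = xdc_vbucket_rep_xmem_srv:select_bucket(XMemSrv),  %% workers select the target bucket
%% per batch, from the vb rep worker:
{ok, MissingIdRevs} = xdc_vbucket_rep_xmem_srv:find_missing(XMemSrv, IdRevs),
ok = xdc_vbucket_rep_xmem_srv:flush_docs(XMemSrv, DocsList),
%% at checkpoint time, and when the replication turn ends or the vb replicator terminates:
_ = xdc_vbucket_rep_xmem_srv:ensure_full_commit(XMemSrv),
ok = xdc_vbucket_rep_xmem_srv:disconnect(XMemSrv),
xdc_vbucket_rep_xmem_srv:stop(XMemSrv).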
+ +handle_call(connect, {_Pid, _Tag}, + #xdc_vb_rep_xmem_srv_state{ + remote = Remote, + pid_workers = Workers, + vb = Vb} = State) -> + + %% ask workers to connect to remote memcached + ConnectWorkers = lists:foldl( + fun({Id, {Worker, idle}}, Acc) -> + RV = xdc_vbucket_rep_xmem_worker:connect(Worker, Remote), + Acc1 = case RV of + ok -> + dict:store(Id, {Worker, connected}, Acc); + _ -> + ?xdcr_error("Error! Worker ~p (pid: ~p, vb: ~p) " + "failed to connect remote ~p", + [Id, Worker, Vb, Remote]), + Acc + end, + Acc1 + end, + dict:new(), + dict:to_list(Workers)), + NewState = State#xdc_vb_rep_xmem_srv_state{pid_workers = ConnectWorkers}, + {reply, ok, NewState}; + +handle_call(disconnect, {_Pid, _Tag}, + #xdc_vb_rep_xmem_srv_state{remote = Remote, + pid_workers = Workers, vb = Vb} = State) -> + %% ask workers to disconnect from remote memcached + IdleWorkers = lists:foldl( + fun({Id, {Worker, _Status}}, Acc) -> + RV = xdc_vbucket_rep_xmem_worker:disconnect(Worker), + Acc1 = case RV of + ok -> + dict:store(Id, {Worker, idle}, Acc); + _ -> + ?xdcr_error("Error! Worker ~p (pid: ~p, vb: ~p) " + "failed to disconnect remote ~p", + [Id, Worker, Vb, Remote]), + Acc + end, + Acc1 + end, + dict:new(), + dict:to_list(Workers)), + {reply, ok, State#xdc_vb_rep_xmem_srv_state{pid_workers = IdleWorkers}}; + +handle_call(select_bucket, {_Pid, _Tag}, + #xdc_vb_rep_xmem_srv_state{ + remote = Remote, + pid_workers = Workers, + vb = Vb} = State) -> + ConnectWorkers = lists:foldl( + fun({Id, {Worker, connected}}, Acc) -> + RV = xdc_vbucket_rep_xmem_worker:select_bucket(Worker, Remote), + Acc1 = case RV of + ok -> + dict:store(Id, {Worker, bucket_selected}, Acc); + _ -> + ?xdcr_error("Error! worker ~p (pid: ~p, vb: ~p) " + "failed to select target bucket at remote ~p", + [Id, Worker, Vb, Remote]), + Acc + end, + Acc1 + end, + dict:new(), + dict:to_list(Workers)), + NewState = State#xdc_vb_rep_xmem_srv_state{pid_workers = ConnectWorkers}, + {reply, ok, NewState}; + +handle_call({find_missing, IdRevs}, _From, + #xdc_vb_rep_xmem_srv_state{vb = Vb, pid_workers = Workers, enable_pipeline = Pipeline} = State) -> + + WorkerPid = load_balancer(Vb, Workers), + TimeStart = now(), + {ok, MissingIdRevs} = + case Pipeline of + false -> + xdc_vbucket_rep_xmem_worker:find_missing(WorkerPid, IdRevs); + _ -> + xdc_vbucket_rep_xmem_worker:find_missing_pipeline(WorkerPid, IdRevs) + end, + TimeSpent = timer:now_diff(now(), TimeStart) div 1000, + NumIdRevs = length(IdRevs), + AvgLatency = TimeSpent div NumIdRevs, + + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("[xmem_srv for vb ~p]: out of ~p keys, we need to send ~p " + "(worker: ~p, avg latency: ~p ms).", + [Vb, NumIdRevs, length(MissingIdRevs), WorkerPid, AvgLatency]); + _ -> + ok + end, + + {reply, {ok, MissingIdRevs}, State}; + +handle_call({flush_docs, DocsList}, _From, + #xdc_vb_rep_xmem_srv_state{vb = Vb, pid_workers = Workers, + enable_pipeline = Pipeline} = State) -> + + WorkerPid = load_balancer(Vb, Workers), + TimeStart = now(), + {ok, NumDocRepd, NumDocRejected} = + case Pipeline of + false -> + xdc_vbucket_rep_xmem_worker:flush_docs(WorkerPid, DocsList); + _ -> + xdc_vbucket_rep_xmem_worker:flush_docs_pipeline(WorkerPid, DocsList) + end, + TimeSpent = timer:now_diff(now(), TimeStart) div 1000, + AvgLatency = TimeSpent div length(DocsList), + + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("[xmem_srv for vb ~p]: out of total ~p docs, " + "# of docs accepted by remote: ~p " + "# of docs rejected by 
remote: ~p" + "(worker: ~p," + "time spent in ms: ~p, avg latency per doc in ms: ~p)", + [Vb, length(DocsList), + NumDocRepd, NumDocRejected, + WorkerPid, TimeSpent, AvgLatency]); + _ -> + ok + end, + + {reply, ok, State}; + + +handle_call(ensure_full_commit, _From, + #xdc_vb_rep_xmem_srv_state{vb = Vb, remote = Remote, + pid_workers = Workers} = State) -> + WorkerPid = load_balancer(Vb, Workers), + RV = xdc_vbucket_rep_xmem_worker:ensure_full_commit(WorkerPid, Remote#xdc_rep_xmem_remote.bucket), + {reply, RV, State}; + +handle_call(stats, _From, + #xdc_vb_rep_xmem_srv_state{} = State) -> + Props = [], + NewState = State, + {reply, {ok, Props}, NewState}; + +handle_call(Msg, From, #xdc_vb_rep_xmem_srv_state{vb = Vb} = State) -> + ?xdcr_error("[xmem_srv for vb ~p]: received unexpected call ~p from process ~p", + [Vb, Msg, From]), + {stop, {error, {unexpected_call, Msg, From}}, State}. + +handle_cast(stop, #xdc_vb_rep_xmem_srv_state{vb = Vb} = State) -> + %% let terminate() do the cleanup + ?xdcr_debug("[xmem_srv for vb ~p]: receive stop, let terminate() clean up", [Vb]), + {stop, normal, State}; + +handle_cast({report_error, Err}, #xdc_vb_rep_xmem_srv_state{error_reports = Errs} = State) -> + {noreply, State#xdc_vb_rep_xmem_srv_state{error_reports = ringbuffer:add(Err, Errs)}}; + +handle_cast(Msg, #xdc_vb_rep_xmem_srv_state{vb = Vb} = State) -> + ?xdcr_error("[xmem_srv for vb ~p]: received unexpected cast ~p", [Vb, Msg]), + {stop, {error, {unexpected_cast, Msg}}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(Reason, State) when Reason == normal orelse Reason == shutdown -> + terminate_cleanup(State); + +terminate(Reason, #xdc_vb_rep_xmem_srv_state{vb = Vb, parent_vb_rep = Par} = State) -> + + ?xdcr_error("[xmem_srv for vb ~p]: shutdown xmem server for reason: ~p", + [Vb, Reason]), + report_error(Reason, Vb, Par), + terminate_cleanup(State), + ok. + +terminate_cleanup(#xdc_vb_rep_xmem_srv_state{vb = Vb, pid_workers = Workers} = _State) -> + %% close sock and shutdown each worker process + {Gone, Shutdown} = + lists:foldl( + fun({_Id, {WorkerPid, _Status}}, {WorkersGone, WorkersShutdown}) -> + case process_info(WorkerPid) of + undefined -> + %% already gone + ?xdcr_debug("worker (pid: ~p) already gone", [WorkerPid]), + WorkersGone1 = lists:flatten([WorkerPid | WorkersGone]), + {WorkersGone1, WorkersShutdown}; + _ -> + ok = xdc_vbucket_rep_xmem_worker:disconnect(WorkerPid), + ok = xdc_vbucket_rep_xmem_worker:stop(WorkerPid), + WorkersShutdown1 = lists:flatten([WorkerPid | WorkersShutdown]), + {WorkersGone, WorkersShutdown1} + end + end, + {[], []}, + dict:to_list(Workers)), + + ?xdcr_debug("[xmem_srv for vb ~p]: worker process (~p) already gone, shutdown processes (~p)", + [Vb, Gone, Shutdown]), + + ok. + + +%% -------------------------------------------------------------------------- %% +%% --- internal helper functions --- %% +%% -------------------------------------------------------------------------- %% +report_error(Err, _Vb, _Parent) when Err == normal orelse Err == shutdown -> + ok; +report_error(Err, Vb, Parent) -> + %% return raw erlang time to make it sortable + RawTime = erlang:localtime(), + Time = misc:iso_8601_fmt(RawTime), + String = iolist_to_binary(io_lib:format("~s XMem error replicating vbucket ~p: ~p", + [Time, Vb, Err])), + gen_server:cast(Parent, {report_error, {RawTime, String}}). + + +-spec start_worker_process(integer(), integer()) -> dict(). 
+start_worker_process(Vb, NumWorkers) -> + WorkerDict = dict:new(), + AllWorkers = lists:foldl( + fun(Id, Acc) -> + {ok, Pid} = xdc_vbucket_rep_xmem_worker:start_link(Vb, Id, self(), []), + dict:store(Id, {Pid, idle}, Acc) + end, + WorkerDict, + lists:seq(1, NumWorkers)), + + ?xdcr_debug("all xmem worker processes have started (vb: ~p, num of workers: ~p)", + [Vb, dict:size(AllWorkers)]), + {ok, AllWorkers}. + +-spec load_balancer(integer(), list()) -> pid(). +load_balancer(Vb, Workers) -> + NumWorkers = dict:size(Workers), + Index = random:uniform(NumWorkers), + {Id, {WorkerPid, bucket_selected}} = lists:nth(Index, dict:to_list(Workers)), + case random:uniform(xdc_rep_utils:get_trace_dump_invprob()) of + 1 -> + ?xdcr_debug("[xmem_srv for vb ~p]: pick up worker process (id: ~p, pid: ~p)", [Vb, Id, WorkerPid]); + _ -> + ok + end, + WorkerPid. + diff --git a/src/xdc_vbucket_rep_xmem_worker.erl b/src/xdc_vbucket_rep_xmem_worker.erl new file mode 100644 index 0000000000..f54e53a621 --- /dev/null +++ b/src/xdc_vbucket_rep_xmem_worker.erl @@ -0,0 +1,693 @@ +%% @author Couchbase +%% @copyright 2011 Couchbase, Inc. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); you may not +%% use this file except in compliance with the License. You may obtain a copy of +%% the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +%% License for the specific language governing permissions and limitations under +%% the License. + +-module(xdc_vbucket_rep_xmem_worker). +-behaviour(gen_server). + +%% public functions +-export([start_link/4]). +-export([init/1, terminate/2, code_change/3]). +-export([handle_call/3, handle_cast/2, handle_info/2]). +-export([find_missing/2, flush_docs/2, ensure_full_commit/2]). +-export([connect/2, select_bucket/2, disconnect/1, stop/1]). + +-export([find_missing_pipeline/2, flush_docs_pipeline/2]). + +-include("xdc_replicator.hrl"). +-include("remote_clusters_info.hrl"). +-include("mc_constants.hrl"). +-include("mc_entry.hrl"). + + +%% -------------------------------------------------------------------------- %% +%% --- public functions --- %% +%% -------------------------------------------------------------------------- %% +start_link(Vb, Id, Parent, Options) -> + {ok, Pid} = gen_server:start_link(?MODULE, {Vb, Id, Parent, Options}, []), + {ok, Pid}. + + +%% gen_server behavior callback functions +init({Vb, Id, Parent, Options}) -> + process_flag(trap_exit, true), + + Errs = ringbuffer:new(?XDCR_ERROR_HISTORY), + InitState = #xdc_vb_rep_xmem_worker_state{ + id = Id, + vb = Vb, + parent_server_pid = Parent, + options = Options, + status = init, + statistics = #xdc_vb_rep_xmem_statistics{}, + socket = undefined, + time_init = calendar:now_to_local_time(erlang:now()), + time_connected = 0, + error_reports = Errs}, + + {ok, InitState}. + +select_bucket(Server, Remote) -> + gen_server:call(Server, {select_bucket, Remote}, infinity). + +-spec find_missing(pid(), list()) -> {ok, list()} | {error, term()}. +find_missing(Server, IdRevs) -> + gen_server:call(Server, {find_missing, IdRevs}, infinity). + +-spec find_missing_pipeline(pid(), list()) -> {ok, list()} | {error, term()}. +find_missing_pipeline(Server, IdRevs) -> + gen_server:call(Server, {find_missing_pipeline, IdRevs}, infinity). 
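Continuing the worker's public API below: the xmem server above drives each worker through the status transitions it records in pid_workers (idle -> connected -> bucket_selected). A rough per-worker sketch using the non-pipelined entry points (Vb, Id, Remote, IdRevs and DocsList are placeholders; the {ok, _, _} shape of the flush_docs reply is what the xmem server matches on):

{ok, Worker} = xdc_vbucket_rep_xmem_worker:start_link(Vb, Id, self(), []),
ok = xdc_vbucket_rep_xmem_worker:connect(Worker, Remote),
ok = xdc_vbucket_rep_xmem_worker:select_bucket(Worker, Remote),
{ok, Missing} = xdc_vbucket_rep_xmem_worker:find_missing(Worker, IdRevs),
{ok, _NumFlushed, _NumRejected} = xdc_vbucket_rep_xmem_worker:flush_docs(Worker, DocsList),
ok = xdc_vbucket_rep_xmem_worker:disconnect(Worker),
xdc_vbucket_rep_xmem_worker:stop(Worker).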
+
+-spec flush_docs(pid(), list()) -> {ok, integer(), integer()} | {error, term()}.
+flush_docs(Server, DocsList) ->
+    gen_server:call(Server, {flush_docs, DocsList}, infinity).
+
+-spec flush_docs_pipeline(pid(), list()) -> {ok, integer(), integer()} | {error, term()}.
+flush_docs_pipeline(Server, DocsList) ->
+    gen_server:call(Server, {flush_docs_pipeline, DocsList}, infinity).
+
+-spec connect(pid(), #xdc_rep_xmem_remote{}) -> ok.
+connect(Server, Remote) ->
+    gen_server:call(Server, {connect, Remote}, infinity).
+
+disconnect(Server) ->
+    gen_server:call(Server, disconnect, infinity).
+
+ensure_full_commit(Server, RemoteBucket) ->
+    gen_server:call(Server, {ensure_full_commit, RemoteBucket}, infinity).
+
+stop(Server) ->
+    gen_server:cast(Server, stop).
+
+handle_info({'EXIT', _Pid, normal}, St) ->
+    {noreply, St};
+
+handle_info({'EXIT', _Pid, Reason}, St) ->
+    {stop, Reason, St}.
+
+handle_call({connect, #xdc_rep_xmem_remote{} = Remote}, {_Pid, _Tag},
+            #xdc_vb_rep_xmem_worker_state{id = Id, vb = Vb} = State) ->
+    %% establish connection to remote memcached
+    Socket = case connect_internal(Remote) of
+                 {ok, S} ->
+                     S;
+                 _ ->
+                     ?xdcr_error("[xmem_worker ~p for vb ~p]: unable to connect to remote memcached "
+                                 "(ip: ~p, port: ~p) after ~p attempts",
+                                 [Id, Vb,
+                                  Remote#xdc_rep_xmem_remote.ip,
+                                  Remote#xdc_rep_xmem_remote.port,
+                                  ?XDCR_XMEM_CONNECTION_ATTEMPTS]),
+                     nil
+             end,
+    {reply, ok, State#xdc_vb_rep_xmem_worker_state{socket = Socket,
+                                                   time_connected = calendar:now_to_local_time(erlang:now()),
+                                                   status = connected}};
+
+handle_call(disconnect, {_Pid, _Tag}, #xdc_vb_rep_xmem_worker_state{} = State) ->
+    State1 = close_connection(State),
+    {reply, ok, State1};
+
+handle_call({select_bucket, #xdc_rep_xmem_remote{} = Remote}, {_Pid, _Tag},
+            #xdc_vb_rep_xmem_worker_state{id = Id, vb = Vb, socket = Socket} = State) ->
+    %% select the target bucket on the remote memcached
+    case select_bucket_internal(Remote, Socket) of
+        ok ->
+            ok;
+        _ ->
+            ?xdcr_error("[xmem_worker ~p for vb ~p]: unable to select remote bucket ~p at node ~p",
+                        [Id, Vb,
+                         Remote#xdc_rep_xmem_remote.bucket,
+                         Remote#xdc_rep_xmem_remote.ip])
+    end,
+    {reply, ok, State#xdc_vb_rep_xmem_worker_state{status = bucket_selected}};
+
+handle_call({find_missing, IdRevs}, _From,
+            #xdc_vb_rep_xmem_worker_state{vb = Vb,
+                                          socket = Socket} = State) ->
+    TimeStart = now(),
+    MissingIdRevs =
+        lists:foldr(fun({Key, Rev}, Acc) ->
+                            Missing = is_missing(Socket, Vb, {Key, Rev}),
+                            Acc1 = case Missing of
+                                       true ->
+                                           [{Key, Rev} | Acc];
+                                       _ ->
+                                           Acc
+                                   end,
+                            Acc1
+                    end,
+                    [], IdRevs),
+
+    TimeSpent = timer:now_diff(now(), TimeStart) div 1000,
+    _AvgLatency = TimeSpent div length(IdRevs),
+    {reply, {ok, MissingIdRevs}, State};
+
+%% ----------- Pipelined Memcached Ops --------------%%
+handle_call({find_missing_pipeline, IdRevs}, _From,
+            #xdc_vb_rep_xmem_worker_state{vb = Vb,
+                                          socket = Sock} = State) ->
+    TimeStart = now(),
+    McHeader = #mc_header{vbucket = Vb},
+    %% send out all keys
+    Pid = spawn_link(fun () ->
+                             _NumKeysSent =
+                                 lists:foldr(fun({Key, _Rev}, Acc) ->
+                                                     Entry = #mc_entry{key = Key},
+                                                     ok = mc_cmd_pipeline(?CMD_GET_META, Sock,
+                                                                          {McHeader, Entry}, undefined),
+                                                     Acc + 1
+                                             end,
+                                             0, IdRevs)
+                     end),
+
+    %% receive all responses
+    MissingIdRevs =
+        lists:foldr(fun({Key, SrcMeta}, Acc) ->
+                            Missing = case receive_remote_meta_pipeline(Sock, Vb) of
+                                          {key_enoent, _Error, _CAS} ->
+                                              true;
+                                          {ok, DestMeta, _CAS} ->
+                                              case max(SrcMeta, DestMeta) of
+                                                  SrcMeta ->
+                                                      true;   %% need to replicate to remote
+                                                  DestMeta ->
+                                                      false;  %% no need to replicate
+                                                  _ ->
+                                                      false
+                                              end;
+                                          {error, Error, Msg} ->
+                                              ?xdcr_error("Error! err ~p (error msg: ~p) found in replication, "
+                                                          "abort the worker thread", [Error, Msg]),
+                                              throw({bad_request, Error, Msg})
+                                      end,
+                            case Missing of
+                                true ->
+                                    [{Key, SrcMeta} | Acc];
+                                _ ->
+                                    Acc
+                            end
+                    end,
+                    [], IdRevs),
+
+    erlang:unlink(Pid),
+    erlang:exit(Pid, kill),
+    misc:wait_for_process(Pid, infinity),
+
+    TimeSpent = timer:now_diff(now(), TimeStart) div 1000,
+    _AvgLatency = TimeSpent div length(IdRevs),
+    {reply, {ok, MissingIdRevs}, State};
+
+handle_call({flush_docs, DocsList}, _From,
+            #xdc_vb_rep_xmem_worker_state{id = Id, vb = VBucket,
+                                          socket = Socket} = State) ->
+    TimeStart = now(),
+    %% enumerate all docs and update them
+    {NumDocsRepd, NumDocsRejected, Errors} =
+        lists:foldr(
+          fun (#doc{id = Key, rev = Rev} = Doc, {AccRepd, AccRejd, ErrorAcc}) ->
+                  case flush_single_doc(Id, Socket, VBucket, Doc, 2) of
+                      {ok, flushed} ->
+                          {(AccRepd + 1), AccRejd, ErrorAcc};
+                      {ok, rejected} ->
+                          {AccRepd, AccRejd, ErrorAcc};
+                      {error, Error} ->
+                          ?xdcr_error("Error! unable to flush doc (key: ~p, rev: ~p) due to error ~p",
+                                      [Key, Rev, Error]),
+                          ErrorsAcc1 = [{{Key, Rev}, Error} | ErrorAcc],
+                          {AccRepd, AccRejd, ErrorsAcc1}
+                  end
+          end,
+          {0, 0, []}, DocsList),
+
+    TimeSpent = timer:now_diff(now(), TimeStart) div 1000,
+    _AvgLatency = TimeSpent div length(DocsList),
+
+    %% dump error msg if timeout
+    {value, DefaultConnTimeout} = ns_config:search(xdcr_connection_timeout),
+    DefTimeoutSecs = misc:getenv_int("XDCR_CONNECTION_TIMEOUT", DefaultConnTimeout),
+    TimeSpentSecs = TimeSpent div 1000,
+    case TimeSpentSecs > DefTimeoutSecs of
+        true ->
+            ?xdcr_error("[xmem_worker ~p for vb ~p]: updating ~p docs took too long to finish! "
+                        "(total time spent: ~p secs, default connection timeout: ~p secs)",
+                        [Id, VBucket, length(DocsList), TimeSpentSecs, DefTimeoutSecs]);
+        _ ->
+            ok
+    end,
+
+    case Errors of
+        [] ->
+            ok;
+        _ ->
+            %% for some reason we can only return one error. Thus
+            %% we're logging everything else here
+            ?xdcr_error("[xmem_worker ~p for vb ~p]: Error, could not "
+                        "update docs. Time spent in ms: ~p, "
+                        "# of docs trying to update: ~p, error msg: ~n~p",
+                        [Id, VBucket, TimeSpent, length(DocsList), Errors]),
+            ok
+    end,
+
+    {reply, {ok, NumDocsRepd, NumDocsRejected}, State};
+
+%% ----------- Pipelined Memcached Ops --------------%%
+handle_call({flush_docs_pipeline, DocsList}, _From,
+            #xdc_vb_rep_xmem_worker_state{id = Id, vb = VBucket,
+                                          socket = Sock} = State) ->
+    TimeStart = now(),
+
+    %% send out all docs
+    Pid = spawn_link(
+            fun () ->
+                    _NumDocsSent =
+                        lists:foldr(
+                          fun (#doc{id = Key, rev = Rev, deleted = Deleted,
+                                    body = DocValue} = _Doc, Acc) ->
+                                  {OpCode, Data} = case Deleted of
+                                                       true ->
+                                                           {?CMD_DEL_WITH_META, <<>>};
+                                                       _ ->
+                                                           {?CMD_SET_WITH_META, DocValue}
+                                                   end,
+                                  McHeader = #mc_header{vbucket = VBucket, opcode = OpCode},
+                                  Ext = mc_client_binary:rev_to_mcd_ext(Rev),
+                                  %% CAS does not matter since the remote ep_engine has the capability
+                                  %% to do getMeta internally before doing setWithMeta or delWithMeta
+                                  CAS = 0,
+                                  McBody = #mc_entry{key = Key, data = Data, ext = Ext, cas = CAS},
+
+                                  ok = mc_cmd_pipeline(OpCode, Sock, {McHeader, McBody}, undefined),
+                                  Acc + 1
+                          end,
+                          0,
+                          DocsList)
+            end),
+
+    %% receive all responses
+    {Flushed, Enoent, Eexist, NotMyVb, Einval, Timeout} =
+        lists:foldr(fun(#doc{id = _Key} = _Doc,
+                        {FlushedAcc, EnoentAcc, EexistsAcc, NotMyVbAcc, EinvalAcc, TimeoutAcc}) ->
+                            case get_flush_response_pipeline(Sock, VBucket) of
+                                {ok, _, _} ->
+                                    {FlushedAcc + 1, EnoentAcc, EexistsAcc, NotMyVbAcc, EinvalAcc, TimeoutAcc};
+                                {memcached_error, key_enoent, _} ->
+                                    {FlushedAcc, EnoentAcc + 1, EexistsAcc, NotMyVbAcc, EinvalAcc, TimeoutAcc};
+                                {memcached_error, key_eexists, _} ->
+                                    {FlushedAcc, EnoentAcc, EexistsAcc + 1, NotMyVbAcc, EinvalAcc, TimeoutAcc};
+                                {memcached_error, not_my_vbucket, _} ->
+                                    {FlushedAcc, EnoentAcc, EexistsAcc, NotMyVbAcc + 1, EinvalAcc, TimeoutAcc};
+                                {memcached_error, einval, _} ->
+                                    {FlushedAcc, EnoentAcc, EexistsAcc, NotMyVbAcc, EinvalAcc + 1, TimeoutAcc};
+                                {memcached_error, timeout, _} ->
+                                    {FlushedAcc, EnoentAcc, EexistsAcc, NotMyVbAcc, EinvalAcc, TimeoutAcc + 1}
+                            end
+                    end,
+                    {0, 0, 0, 0, 0, 0}, DocsList),
+
+    erlang:unlink(Pid),
+    erlang:exit(Pid, kill),
+    misc:wait_for_process(Pid, infinity),
+
+    TimeSpent = timer:now_diff(now(), TimeStart) div 1000,
+    _AvgLatency = TimeSpent div length(DocsList),
+
+    %% dump error msg if timeout
+    {value, DefaultConnTimeout} = ns_config:search(xdcr_connection_timeout),
+    DefTimeoutSecs = misc:getenv_int("XDCR_CONNECTION_TIMEOUT", DefaultConnTimeout),
+    TimeSpentSecs = TimeSpent div 1000,
+    case TimeSpentSecs > DefTimeoutSecs of
+        true ->
+            ?xdcr_error("[xmem_worker ~p for vb ~p]: updating ~p docs took too long to finish! "
+                        "(total time spent: ~p secs, default connection timeout: ~p secs)",
+                        [Id, VBucket, length(DocsList), TimeSpentSecs, DefTimeoutSecs]);
+        _ ->
+            ok
+    end,
+
+    DocsListSize = length(DocsList),
+    RV =
+        case (Flushed + Eexist) == DocsListSize of
+            true ->
+                {reply, {ok, Flushed, Eexist}, State};
+            _ ->
+                %% for some reason we can only return one error. Thus
+                %% we're logging everything else here
+                ?xdcr_error("out of ~p docs, flushed ~p docs successfully, failed to flush the others "
+                            "(by error type, enoent: ~p, not-my-vb: ~p, einval: ~p, timeout: ~p)",
+                            [DocsListSize, (Flushed + Eexist), Enoent, NotMyVb, Einval, Timeout]),
+                {stop, {error, {Flushed, Eexist, Enoent, NotMyVb, Einval, Timeout}}, State}
+        end,
+    RV;
+
+handle_call({ensure_full_commit, Bucket}, _From,
+            #xdc_vb_rep_xmem_worker_state{vb = VBucket,
+                                          socket = Socket,
+                                          statistics = OldStat} = State) ->
+    RV = ensure_full_commit_internal(Socket, Bucket, VBucket),
+    CkptIssued = OldStat#xdc_vb_rep_xmem_statistics.ckpt_issued,
+    CkptFailed = OldStat#xdc_vb_rep_xmem_statistics.ckpt_failed,
+
+    Stat = case RV of
+               {ok, _} ->
+                   OldStat#xdc_vb_rep_xmem_statistics{ckpt_issued = CkptIssued + 1};
+               _ ->
+                   ?xdcr_error("failed to issue a ckpt for vb ~p (bucket: ~p), "
+                               "total ckpt issued (succ: ~p, fail: ~p)",
+                               [VBucket, Bucket, CkptIssued, (CkptFailed + 1)]),
+                   OldStat#xdc_vb_rep_xmem_statistics{ckpt_failed = CkptFailed + 1}
+           end,
+    {reply, RV, State#xdc_vb_rep_xmem_worker_state{statistics = Stat}};
+
+handle_call(Msg, From, #xdc_vb_rep_xmem_worker_state{vb = Vb,
+                                                     id = Id} = State) ->
+    ?xdcr_error("[xmem_worker ~p for vb ~p]: received unexpected call ~p from process ~p",
+                [Id, Vb, Msg, From]),
+    {stop, {error, {unexpected_call, Msg, From}}, State}.
+
+
+%% --- handle_cast --- %%
+handle_cast(stop, #xdc_vb_rep_xmem_worker_state{} = State) ->
+    %% let terminate() do the cleanup
+    {stop, normal, State};
+
+handle_cast({report_error, Err}, #xdc_vb_rep_xmem_worker_state{error_reports = Errs} = State) ->
+    {noreply, State#xdc_vb_rep_xmem_worker_state{error_reports = ringbuffer:add(Err, Errs)}};
+
+handle_cast(Msg, #xdc_vb_rep_xmem_worker_state{id = Id, vb = Vb} = State) ->
+    ?xdcr_error("[xmem_worker ~p for vb ~p]: received unexpected cast ~p",
+                [Id, Vb, Msg]),
+    {stop, {error, {unexpected_cast, Msg}}, State}.
+
+
+%% default gen_server callbacks
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+terminate(Reason, State) when Reason == normal orelse Reason == shutdown ->
+    terminate_cleanup(State);
+
+terminate(Reason, #xdc_vb_rep_xmem_worker_state{vb = Vb,
+                                                id = Id,
+                                                parent_server_pid = Par} = State) ->
+    report_error(Reason, Vb, Par),
+    ?xdcr_error("[xmem_worker ~p for vb ~p]: shutting down xmem worker, reason: ~p",
+                [Id, Vb, Reason]),
+    terminate_cleanup(State),
+    ok.
+
+terminate_cleanup(#xdc_vb_rep_xmem_worker_state{} = State) ->
+    close_connection(State),
+    ok.
+
+%% -------------------------------------------------------------------------- %%
+%% ---                  internal helper functions                        --- %%
+%% -------------------------------------------------------------------------- %%
+
+report_error(Err, _Vb, _Parent) when Err == normal orelse Err == shutdown ->
+    ok;
+report_error(Err, Vb, Parent) ->
+    %% return raw erlang time to make it sortable
+    RawTime = erlang:localtime(),
+    Time = misc:iso_8601_fmt(RawTime),
+    String = iolist_to_binary(io_lib:format("~s - Error replicating vbucket ~p: ~p",
+                                            [Time, Vb, Err])),
+    gen_server:cast(Parent, {report_error, {RawTime, String}}).
+
+close_connection(#xdc_vb_rep_xmem_worker_state{socket = Socket} = State) ->
+    case Socket of
+        undefined ->
+            ok;
+        _ ->
+            ok = gen_tcp:close(Socket)
+    end,
+    State#xdc_vb_rep_xmem_worker_state{socket = undefined, status = idle}.
+
+connect_internal(#xdc_rep_xmem_remote{} = Remote) ->
+    connect_internal(?XDCR_XMEM_CONNECTION_ATTEMPTS, Remote).
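+
+%% connect_internal/2 below retries the TCP connect plus PLAIN SASL auth up to
+%% ?XDCR_XMEM_CONNECTION_ATTEMPTS times, sleeping one second between attempts,
+%% and finally gives up with {error, couldnt_connect_to_remote_memcached}.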
+connect_internal(0, _) ->
+    {error, couldnt_connect_to_remote_memcached};
+connect_internal(Tries, #xdc_rep_xmem_remote{} = Remote) ->
+    Ip = Remote#xdc_rep_xmem_remote.ip,
+    Port = Remote#xdc_rep_xmem_remote.port,
+    User = Remote#xdc_rep_xmem_remote.username,
+    Pass = Remote#xdc_rep_xmem_remote.password,
+    try
+        {ok, S} = gen_tcp:connect(Ip, Port, [binary, {packet, 0}, {active, false}]),
+        ok = mc_client_binary:auth(S, {<<"PLAIN">>,
+                                       {list_to_binary(User),
+                                        list_to_binary(Pass)}}),
+        S of
+        Sock -> {ok, Sock}
+    catch
+        E:R ->
+            ?xdcr_debug("Unable to connect: ~p, retrying.", [{E, R}]),
+            timer:sleep(1000), % Avoid reconnecting too fast.
+            NewTries = Tries - 1,
+            connect_internal(NewTries, Remote)
+    end.
+
+-spec select_bucket_internal(#xdc_rep_xmem_remote{}, inet:socket()) -> ok.
+select_bucket_internal(#xdc_rep_xmem_remote{bucket = Bucket} = _Remote, Socket) ->
+    mc_client_binary:select_bucket(Socket, Bucket).
+
+-spec is_missing(inet:socket(), integer(), {term(), term()}) -> boolean().
+is_missing(Socket, VBucket, {Key, LocalMeta}) ->
+    case get_remote_meta(Socket, VBucket, Key) of
+        {key_enoent, _Error, _CAS} ->
+            true;
+        {not_my_vbucket, Error} ->
+            throw({bad_request, not_my_vbucket, Error});
+        {ok, RemoteMeta, _CAS} ->
+            case max(RemoteMeta, LocalMeta) of
+                LocalMeta ->
+                    true;   %% need to replicate to remote
+                RemoteMeta ->
+                    false;  %% no need to replicate
+                _ ->
+                    false
+            end
+    end.
+
+-spec get_remote_meta(inet:socket(), integer(), term()) -> {ok, term(), integer()} |
+                                                           {key_enoent, list(), integer()} |
+                                                           {not_my_vbucket, list()}.
+get_remote_meta(Socket, VBucket, Key) ->
+    %% issue get_meta to remote memcached
+    Reply = case mc_client_binary:get_meta(Socket, Key, VBucket) of
+                {memcached_error, key_enoent, RemoteCAS} ->
+                    {key_enoent, "remote_memcached_error: key does not exist", RemoteCAS};
+                {memcached_error, not_my_vbucket, _} ->
+                    ErrorMsg = ?format_msg("remote_memcached_error: not my vbucket (vb: ~p)", [VBucket]),
+                    {not_my_vbucket, ErrorMsg};
+                {ok, RemoteFullMeta, RemoteCAS, _Flags} ->
+                    {ok, RemoteFullMeta, RemoteCAS}
+            end,
+    Reply.
+
+-spec flush_single_doc(integer(), inet:socket(), integer(), #doc{}, integer()) ->
+                              {ok, flushed | rejected} | {error, {term(), term()}}.
+flush_single_doc(Id, _Socket, VBucket, #doc{id = DocId} = _Doc, 0) ->
+    ?xdcr_error("[xmem_worker ~p for vb ~p]: Error, unable to flush doc (key: ~p) "
+                "to destination, maximum retry reached.",
+                [Id, VBucket, DocId]),
+    {error, {bad_request, max_retry}};
+
+flush_single_doc(Id, Socket, VBucket,
+                 #doc{id = DocId, rev = DocRev, body = DocValue,
+                      deleted = DocDeleted} = Doc, Retry) ->
+    {SrcSeqNo, SrcRevId} = DocRev,
+    ConflictRes = case ?XDCR_LOCAL_CONFLICT_RESOLUTION of
+                      false ->
+                          {key_enoent, "no local conflict resolution, send it optimistically", 0};
+                      _ ->
+                          get_remote_meta(Socket, VBucket, DocId)
+                  end,
+
+    RV = case ConflictRes of
+             {key_enoent, _ErrorMsg, DstCAS} ->
+                 flush_single_doc_remote(Socket, VBucket, DocId, DocValue, DocRev, DocDeleted, DstCAS);
+             {not_my_vbucket, _} ->
+                 {error, {bad_request, not_my_vbucket}};
+             {ok, {DstSeqNo, DstRevId}, DstCAS} ->
+                 DstFullMeta = {DstSeqNo, DstRevId},
+                 SrcFullMeta = {SrcSeqNo, SrcRevId},
+                 case max(SrcFullMeta, DstFullMeta) of
+                     DstFullMeta ->
+                         ok;
+                     %% replicate src doc to destination, using
+                     %% the same CAS returned from the get_remote_meta() above.
+                     SrcFullMeta ->
+                         flush_single_doc_remote(Socket, VBucket, DocId, DocValue, DocRev, DocDeleted, DstCAS)
+                 end
+         end,
+
+    case RV of
+        retry ->
+            flush_single_doc(Id, Socket, VBucket, Doc, Retry - 1);
+        ok ->
+            {ok, flushed};
+        {memcached_error, key_eexists} ->
+            {ok, rejected};
+        _Other ->
+            RV
+    end.
+
+
+flush_single_doc_remote(Socket, VBucket, Key, Value, Rev, DocDeleted, CAS) ->
+    case mc_client_binary:update_with_rev(Socket, VBucket, Key, Value, Rev, DocDeleted, CAS) of
+        {ok, _, _} ->
+            ok;
+        {memcached_error, key_eexists, _} ->
+            {memcached_error, key_eexists};
+        {memcached_error, key_enoent, _} ->
+            retry;
+        {memcached_error, not_my_vbucket, _} ->
+            {error, {bad_request, not_my_vbucket}};
+        {memcached_error, einval, _} ->
+            {error, {bad_request, einval}}
+    end.
+
+-spec ensure_full_commit_internal(inet:socket(), list(), integer()) -> {ok, binary()} | {error, term()}.
+ensure_full_commit_internal(Socket, Bucket, VBucket) ->
+    %% create a new open checkpoint
+    StartTime = now(),
+    {ok, OpenCheckpointId, PersistedCkptId} = mc_client_binary:create_new_checkpoint(Socket, VBucket),
+
+    Result = case PersistedCkptId >= (OpenCheckpointId - 1) of
+                 true ->
+                     ?xdcr_debug("replication to remote (bucket: ~p, vbucket ~p) issues an empty open ckpt, "
+                                 "no need to wait (open ckpt: ~p, persisted ckpt: ~p)",
+                                 [Bucket, VBucket, OpenCheckpointId, PersistedCkptId]),
+                     ok;
+                 _ ->
+                     %% wait for the persisted ckpt to catch up, time out in milliseconds
+                     ?xdcr_debug("replication to (bucket: ~p, vbucket ~p) waits for priority chkpt to be persisted: ~p, "
+                                 "current persisted chkpt: ~p",
+                                 [Bucket, VBucket, (OpenCheckpointId - 1), PersistedCkptId]),
+                     wait_priority_checkpoint_persisted(Socket, Bucket, VBucket, (OpenCheckpointId - 1))
+             end,
+
+    RV = case Result of
+             ok ->
+                 {ok, Stats2} = mc_binary:quick_stats(Socket, <<>>,
+                                                      fun (K, V, Acc) ->
+                                                              [{K, V} | Acc]
+                                                      end, []),
+
+                 EpStartupTime = proplists:get_value(<<"ep_startup_time">>, Stats2),
+                 WorkTime = timer:now_diff(now(), StartTime) div 1000,
+                 ?xdcr_debug("persisted open ckpt: ~p for rep (bucket: ~p, vbucket ~p), "
+                             "time spent in millisecs: ~p",
+                             [OpenCheckpointId, Bucket, VBucket, WorkTime]),
+
+                 {ok, EpStartupTime};
+             timeout ->
+                 ?xdcr_error("Alert! timeout when rep (bucket ~p, vb: ~p) waiting for open checkpoint "
+                             "(id: ~p) to be persisted.",
+                             [Bucket, VBucket, OpenCheckpointId]),
+                 {error, time_out_polling}
+         end,
+    RV.
+
+-spec wait_priority_checkpoint_persisted(inet:socket(), list(), integer(), integer()) -> ok | timeout.
+wait_priority_checkpoint_persisted(Socket, Bucket, VBucket, CheckpointId) ->
+    case mc_client_binary:wait_for_checkpoint_persistence(Socket, VBucket, CheckpointId) of
+        ok ->
+            ok;
+        {memcached_error, etmpfail, _} ->
+            ?xdcr_error("rep (bucket: ~p, vbucket ~p) failed to persist priority chkpt: ~p",
+                        [Bucket, VBucket, CheckpointId]),
+            timeout;
+        ErrorMsg ->
+            ?xdcr_error("rep (bucket: ~p, vbucket ~p) failed to persist priority chkpt: ~p (unrecognized error: ~p)",
+                        [Bucket, VBucket, CheckpointId, ErrorMsg]),
+            timeout
+    end.
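+
+%% Pipelined helpers: find_missing_pipeline and flush_docs_pipeline (above) spawn
+%% a sender that pushes one request per key or doc through mc_cmd_pipeline/4
+%% without waiting for replies; the caller then reads back exactly one response
+%% per request, in order, using the receive functions below.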
+
+%% pipeline %%
+receive_remote_meta_pipeline(Sock, VBucket) ->
+    Response = mc_binary:recv(Sock, res, ?XDCR_XMEM_CONNECTION_TIMEOUT),
+    Raw = case Response of
+              %% got the meta of the key successfully
+              {ok, #mc_header{status=?SUCCESS}, #mc_entry{ext = Ext, cas = CAS}} ->
+                  %% NOTE: this binary layout is a reconstruction (the pattern was garbled
+                  %% in the original patch); it assumes the ep-engine getMeta ext format of
+                  %% meta flags, item flags, expiration and seqno, with the rev id built
+                  %% from CAS, expiration and item flags.
+                  <<MetaFlags:32/big, ItemFlags:32/big,
+                    Expiration:32/big, SeqNo:64/big>> = Ext,
+                  RevId = <<CAS:64/big, Expiration:32/big, ItemFlags:32/big>>,
+                  RemoteRev = {SeqNo, RevId},
+                  {ok, RemoteRev, CAS, MetaFlags};
+              %% key not found, which is ok if replicating new items
+              {ok, #mc_header{status=?KEY_ENOENT}, #mc_entry{cas=CAS}} ->
+                  {ok, key_enoent, CAS};
+              %% timeout
+              {error, timeout} ->
+                  {memcached_error, timeout, ?format_msg("remote memcached timeout at sock: ~p, vb: ~p",
+                                                         [Sock, VBucket])};
+              %% other errors
+              {ok, RecvHeader, RecvEntry} ->
+                  ?xdcr_error("unrecognized response from memcached: ~p (sock: ~p, vb: ~p)",
+                              [Response, Sock, VBucket]),
+                  mc_client_binary:process_error_response({ok, RecvHeader, RecvEntry, undefined})
+          end,
+
+    Reply = case Raw of
+                {ok, RemoteFullMeta, RemoteCAS, _Flags} ->
+                    {ok, RemoteFullMeta, RemoteCAS};
+                {ok, key_enoent, RemoteCAS} ->
+                    {key_enoent, "key does not exist at remote", RemoteCAS};
+                {memcached_error, not_my_vbucket, _} ->
+                    ErrorMsg = ?format_msg("remote_memcached_error: not my vbucket (vb: ~p)", [VBucket]),
+                    {error, not_my_vbucket, ErrorMsg};
+                {memcached_error, Status, Msg} ->
+                    ErrorMsg = ?format_msg("remote_memcached_error: status ~p, msg: ~p (vb: ~p)",
+                                           [Status, Msg, VBucket]),
+                    {error, memcached_error, ErrorMsg};
+                OtherErr ->
+                    ErrorMsg = ?format_msg("other error from remote: ~p (vb: ~p)",
+                                           [OtherErr, VBucket]),
+                    {error, error, ErrorMsg}
+            end,
+    Reply.
+
+get_flush_response_pipeline(Sock, VBucket) ->
+    Response = mc_binary:recv(Sock, res, ?XDCR_XMEM_CONNECTION_TIMEOUT),
+    Reply = case Response of
+                {ok, #mc_header{status=?SUCCESS} = McHdr, #mc_entry{} = McBody} ->
+                    {ok, McHdr, McBody};
+                {ok, #mc_header{status=?KEY_ENOENT}, #mc_entry{cas = CAS} = _McBody} ->
+                    {memcached_error, key_enoent, CAS};
+                {ok, #mc_header{status=?KEY_EEXISTS}, #mc_entry{cas = CAS} = _McBody} ->
+                    {memcached_error, key_eexists, CAS};
+                {ok, #mc_header{status=?NOT_MY_VBUCKET}, #mc_entry{cas = CAS} = _McBody} ->
+                    {memcached_error, not_my_vbucket, CAS};
+                {ok, #mc_header{status=?EINVAL}, #mc_entry{cas = CAS} = _McBody} ->
+                    {memcached_error, einval, CAS};
+                {error, timeout} ->
+                    {memcached_error, timeout, ?format_msg("remote memcached timeout at sock ~p, vb: ~p",
+                                                           [Sock, VBucket])};
+                _Other ->
+                    ?xdcr_error("unrecognized response from memcached: ~p (sock: ~p, vb: ~p)",
+                                [Response, Sock, VBucket]),
+                    mc_client_binary:process_error_response(Response)
+            end,
+    Reply.
+
+%% send a single pipelined request; the matching response is read later by
+%% receive_remote_meta_pipeline/2 or get_flush_response_pipeline/2
+mc_cmd_pipeline(Opcode, Sock, {Header, Entry}, _Timeout) ->
+    ok = mc_binary:send(Sock, req,
+                        Header#mc_header{opcode = Opcode}, mc_client_binary:ext(Opcode, Entry)),
+    ok.
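+
+%% Illustrative call sequence for a single worker, as a sketch only (the real
+%% driver is the vbucket-level xmem server, whose exact flow may differ):
+%%
+%%   {ok, Worker} = xdc_vbucket_rep_xmem_worker:start_link(VBucket, 1, self(), []),
+%%   ok = xdc_vbucket_rep_xmem_worker:connect(Worker, Remote),
+%%   ok = xdc_vbucket_rep_xmem_worker:select_bucket(Worker, Remote),
+%%   {ok, Missing} = xdc_vbucket_rep_xmem_worker:find_missing(Worker, IdRevs),
+%%   {ok, NumFlushed, NumRejected} = xdc_vbucket_rep_xmem_worker:flush_docs(Worker, DocsList),
+%%   {ok, _} = xdc_vbucket_rep_xmem_worker:ensure_full_commit(Worker, RemoteBucket),
+%%   ok = xdc_vbucket_rep_xmem_worker:disconnect(Worker).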