diff --git a/apps/couch/src/couch_db.erl b/apps/couch/src/couch_db.erl
index 96c49886..5f5f45c1 100644
--- a/apps/couch/src/couch_db.erl
+++ b/apps/couch/src/couch_db.erl
@@ -73,16 +73,16 @@ open_int(DbName, Options) ->
 % it ensures that the http userCtx is a valid reader
 open(DbName, Options) ->
     case couch_server:open(DbName, Options) of
-        {ok, Db} ->
-            try
-                check_is_reader(Db),
-                {ok, Db}
-            catch
-                throw:Error ->
-                    close(Db),
-                    throw(Error)
-            end;
-        Else -> Else
+        {ok, Db} ->
+            try
+                check_is_reader(Db),
+                {ok, Db}
+            catch
+                throw:Error ->
+                    close(Db),
+                    throw(Error)
+            end;
+        Else -> Else
     end.
 
 reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
@@ -154,7 +154,7 @@ apply_open_options({ok, Doc},Options) ->
     apply_open_options2(Doc,Options);
 apply_open_options(Else,_Options) ->
     Else.
-    
+
 apply_open_options2(Doc,[]) ->
     {ok, Doc};
 apply_open_options2(#doc{atts=Atts,revs=Revs}=Doc,
@@ -421,22 +421,22 @@ update_docs(Db, Docs) ->
 % group_alike_docs groups the sorted documents into sublist buckets, by id.
 % ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
 group_alike_docs(Docs) ->
-    Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
+    Sorted = lists:sort(fun({#doc{id=A},_},{#doc{id=B},_})-> A < B end, Docs),
     group_alike_docs(Sorted, []).
 
 group_alike_docs([], Buckets) ->
     lists:reverse(Buckets);
 group_alike_docs([Doc|Rest], []) ->
     group_alike_docs(Rest, [[Doc]]);
-group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
-    [#doc{id=BucketId}|_] = Bucket,
+group_alike_docs([{Doc,Ref}|Rest], [Bucket|RestBuckets]) ->
+    [{#doc{id=BucketId},_Ref}|_] = Bucket,
     case Doc#doc.id == BucketId of
     true ->
         % add to existing bucket
-        group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]);
+        group_alike_docs(Rest, [[{Doc,Ref}|Bucket]|RestBuckets]);
     false ->
         % add to new bucket
-        group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
+        group_alike_docs(Rest, [[{Doc,Ref}]|[Bucket|RestBuckets]])
     end.
 
 validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
@@ -523,10 +523,8 @@ prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
     {AccPrepped, AccFatalErrors};
 prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
         AllowConflict, AccPrepped, AccErrors) ->
-    [#doc{id=Id}|_]=DocBucket,
-    % no existing revs are known,
     {PreppedBucket, AccErrors3} = lists:foldl(
-        fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
+        fun({#doc{revs=Revs}=Doc, Ref}, {AccBucket, AccErrors2}) ->
             case couch_doc:has_stubs(Doc) of
             true ->
                 couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
@@ -536,13 +534,13 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
             {0, []} ->
                 case validate_doc_update(Db, Doc, fun() -> nil end) of
                 ok ->
-                    {[Doc | AccBucket], AccErrors2};
+                    {[{Doc, Ref} | AccBucket], AccErrors2};
                 Error ->
-                    {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
+                    {AccBucket, [{Ref, Error} | AccErrors2]}
                 end;
             _ ->
                 % old revs specified but none exist, a conflict
-                {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
+                {AccBucket, [{Ref, conflict} | AccErrors2]}
             end
         end,
         {[], AccErrors}, DocBucket),
@@ -556,14 +554,14 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets],
     LeafRevsDict = dict:from_list([{{Start, RevId}, {Del, Ptr, Revs}} ||
            {#leaf{deleted=Del, ptr=Ptr}, {Start, [RevId|_]}=Revs} <- Leafs]),
     {PreppedBucket, AccErrors3} = lists:foldl(
-        fun(Doc, {Docs2Acc, AccErrors2}) ->
+        fun({Doc,Ref}, {Docs2Acc, AccErrors2}) ->
             case prep_and_validate_update(Db, Doc, OldFullDocInfo,
                     LeafRevsDict, AllowConflict) of
             {ok, Doc2} ->
-                {[Doc2 | Docs2Acc], AccErrors2};
-            {Error, #doc{id=Id,revs=Revs}} ->
+                {[{Doc2, Ref} | Docs2Acc], AccErrors2};
+            {Error, #doc{}} ->
                 % Record the error
-                {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
+                {Docs2Acc, [{Ref, Error} |AccErrors2]}
             end
         end,
         {[], AccErrors}, DocBucket),
@@ -583,7 +581,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
     case OldInfo of
     not_found ->
         {ValidatedBucket, AccErrors3} = lists:foldl(
-            fun(Doc, {AccPrepped2, AccErrors2}) ->
+            fun({Doc,Ref}, {AccPrepped2, AccErrors2}) ->
                 case couch_doc:has_stubs(Doc) of
                 true ->
                     couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
@@ -591,7 +589,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
                 end,
                 case validate_doc_update(Db, Doc, fun() -> nil end) of
                 ok ->
-                    {[Doc | AccPrepped2], AccErrors2};
+                    {[{Doc,Ref} | AccPrepped2], AccErrors2};
                 Error ->
                     {AccPrepped2, [{Doc, Error} | AccErrors2]}
                 end
@@ -600,7 +598,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
         prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
     {ok, #full_doc_info{rev_tree=OldTree}} ->
         NewRevTree = lists:foldl(
-            fun(NewDoc, AccTree) ->
+            fun({NewDoc,_Ref}, AccTree) ->
                 {NewTree, _} = couch_key_tree:merge(AccTree,
                     couch_db:doc_to_tree(NewDoc), Db#db.revs_limit),
                 NewTree
@@ -610,16 +608,16 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
         LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]),
         {ValidatedBucket, AccErrors3} =
             lists:foldl(
-            fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) ->
+            fun({#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, Ref}, {AccValidated, AccErrors2}) ->
                 case dict:find({Pos, RevId}, LeafRevsFullDict) of
                 {ok, {Start, Path}} ->
                     % our unflushed doc is a leaf node. Go back on the path
                     % to find the previous rev that's on disk.
-
+
                     LoadPrevRevFun = fun() ->
                                 make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
                             end,
-
+
                     case couch_doc:has_stubs(Doc) of
                     true ->
                         DiskDoc = LoadPrevRevFun(),
@@ -629,10 +627,10 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
                         Doc2 = Doc,
                         GetDiskDocFun = LoadPrevRevFun
                     end,
-
+
                     case validate_doc_update(Db, Doc2, GetDiskDocFun) of
                     ok ->
-                        {[Doc2 | AccValidated], AccErrors2};
+                        {[{Doc2, Ref} | AccValidated], AccErrors2};
                     Error ->
                         {AccValidated, [{Doc, Error} | AccErrors2]}
                     end;
@@ -664,10 +662,10 @@ new_revs([], OutBuckets, IdRevsAcc) ->
     {lists:reverse(OutBuckets), IdRevsAcc};
 new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
     {NewBucket, IdRevsAcc3} = lists:mapfoldl(
-        fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
+        fun({#doc{revs={Start, RevIds}}=Doc, Ref}, IdRevsAcc2)->
         NewRevId = new_revid(Doc),
-        {Doc#doc{revs={Start+1, [NewRevId | RevIds]}},
-         [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
+        {{Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, Ref},
+         [{Ref, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
     end, IdRevsAcc, Bucket),
     new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
 
@@ -686,16 +684,21 @@ check_dup_atts2(_) ->
 
 update_docs(Db, Docs, Options, replicated_changes) ->
     increment_stat(Db, {couchdb, database_writes}),
-    DocBuckets = group_alike_docs(Docs),
+
+    % associate reference with each doc in order to track duplicates
+    Docs2 = lists:map(fun(Doc) ->
+        {Doc, make_ref()}
+    end,Docs),
+    DocBuckets = group_alike_docs(Docs2),
 
     case (Db#db.validate_doc_funs /= []) orelse
         lists:any(
-            fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
-            (#doc{atts=Atts}) ->
+            fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>},_Ref}) -> true;
+            ({#doc{atts=Atts},_Ref}) ->
                 Atts /= []
-            end, Docs) of
+            end, Docs2) of
     true ->
-        Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+        Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
         ExistingDocs = get_full_doc_infos(Db, Ids),
 
         {DocBuckets2, DocErrors} =
@@ -705,8 +708,8 @@ update_docs(Db, Docs, Options, replicated_changes) ->
         DocErrors = [],
         DocBuckets3 = DocBuckets
     end,
-    DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
-            || Doc <- Bucket] || Bucket <- DocBuckets3],
+    DocBuckets4 = [[{doc_flush_atts(check_dup_atts(Doc), Db#db.fd), Ref}
+            || {Doc, Ref} <- Bucket] || Bucket <- DocBuckets3],
     {ok, []} = write_and_commit(Db, DocBuckets4, [],
         [merge_conflicts | Options]),
     {ok, DocErrors};
@@ -715,28 +718,33 @@ update_docs(Db, Docs, Options, interactive_edit) ->
     AllOrNothing = lists:member(all_or_nothing, Options),
     % go ahead and generate the new revision ids for the documents.
     % separate out the NonRep documents from the rest of the documents
-    {Docs2, NonRepDocs} = lists:foldl(
-        fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
+
+    % associate reference with each doc in order to track duplicates
+    Docs2 = lists:map(fun(Doc) ->
+        {Doc, make_ref()}
+    end,Docs),
+    {Docs3, NonRepDocs} = lists:foldl(
+        fun({#doc{id=Id},_Ref}=Doc, {DocsAcc, NonRepDocsAcc}) ->
             case Id of
             <<?LOCAL_DOC_PREFIX, _/binary>> ->
                 {DocsAcc, [Doc | NonRepDocsAcc]};
             Id->
                 {[Doc | DocsAcc], NonRepDocsAcc}
             end
-        end, {[], []}, Docs),
-
-    DocBuckets = group_alike_docs(Docs2),
+        end, {[], []}, Docs2),
+
+    DocBuckets = group_alike_docs(Docs3),
 
     case (Db#db.validate_doc_funs /= []) orelse
         lists:any(
-            fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+            fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>},_Ref}) ->
                 true;
-            (#doc{atts=Atts}) ->
+            ({#doc{atts=Atts},_Ref}) ->
                 Atts /= []
-            end, Docs2) of
+            end, Docs3) of
     true ->
         % lookup the doc by id and get the most recent
-        Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+        Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
         ExistingDocInfos = get_full_doc_infos(Db, Ids),
 
         {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
@@ -750,29 +758,32 @@ update_docs(Db, Docs, Options, interactive_edit) ->
     end,
 
     if (AllOrNothing) and (PreCommitFailures /= []) ->
-        {aborted, lists:map(
-            fun({{Id,{Pos, [RevId|_]}}, Error}) ->
-                {{Id, {Pos, RevId}}, Error};
-            ({{Id,{0, []}}, Error}) ->
-                {{Id, {0, <<>>}}, Error}
-            end, PreCommitFailures)};
+        {aborted,
+            lists:foldl(fun({#doc{id=Id,revs={Pos, RevIds}}, Ref},Acc) ->
+                case lists:keyfind(Ref,1,PreCommitFailures) of
+                {Ref, Error} ->
+                    [{{Id,{Pos,RevIds}}, Error} | Acc];
+                false ->
+                    Acc
+                end
+            end,[],Docs3)};
     true ->
         Options2 = if AllOrNothing -> [merge_conflicts];
                 true -> [] end ++ Options,
         DocBuckets3 = [[
-                doc_flush_atts(set_new_att_revpos(
-                        check_dup_atts(Doc)), Db#db.fd)
-                || Doc <- B] || B <- DocBuckets2],
+                {doc_flush_atts(set_new_att_revpos(
+                        check_dup_atts(Doc)), Db#db.fd), Ref}
+                || {Doc, Ref} <- B] || B <- DocBuckets2],
         {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
-
+
         {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
-
+
         ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
         {ok, lists:map(
-            fun(#doc{id=Id,revs={Pos, RevIds}}) ->
-                {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict),
+            fun({#doc{}, Ref}) ->
+                {ok, Result} = dict:find(Ref, ResultsDict),
                 Result
-            end, Docs)}
+            end, Docs2)}
     end.
 
 % Returns the first available document on disk. Input list is a full rev path
@@ -831,7 +842,7 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets,
     % This can happen if the db file we wrote to was swapped out by
     % compaction. Retry by reopening the db and writing to the current file
     {ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
-    DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+    DocBuckets2 = [[{doc_flush_atts(Doc, Db2#db.fd), Ref} || {Doc,Ref} <- Bucket] || Bucket <- DocBuckets],
     % We only retry once
     close(Db2),
     Pid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
@@ -852,7 +863,7 @@ set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
     (Att) ->
         Att#att{revpos=RevPos+1}
     end, Atts)}.
-
+
 doc_flush_atts(Doc, Fd) ->
     Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}.
 
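Every couch_db.erl change above follows one pattern: each incoming #doc{} is paired with a unique reference from make_ref(), the {Doc, Ref} pair is carried through sorting, bucketing, validation and attachment flushing, and errors and results are keyed by that reference instead of by {Id, Revs}. A minimal, self-contained sketch of that idea follows; it is illustrative only, not part of the patch, and the module and function names are made up:

-module(ref_tag_sketch).
-export([update_docs/1, demo/0]).

%% Tag every doc with a unique reference, key the (stand-in) write results by
%% that reference, and map them back in request order. Two identical docs in
%% one batch now get their own result instead of colliding on {Id, Revs}.
update_docs(Docs) ->
    Tagged = [{Doc, make_ref()} || Doc <- Docs],
    ResultsDict = dict:from_list([{Ref, {ok, written}} || {_Doc, Ref} <- Tagged]),
    [begin {ok, Res} = dict:find(Ref, ResultsDict), Res end
        || {_Doc, Ref} <- Tagged].

demo() ->
    %% duplicate ids are fine: each occurrence is tracked by its own ref
    update_docs([{doc, <<"a">>}, {doc, <<"a">>}, {doc, <<"b">>}]).

This mirrors how update_docs/4 above builds Docs2, keys PreCommitFailures and IdRevs by Ref, and resolves the final per-document results from ResultsDict.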
diff --git a/apps/couch/src/couch_db_updater.erl b/apps/couch/src/couch_db_updater.erl
index 9bf52ee0..c551a18e 100644
--- a/apps/couch/src/couch_db_updater.erl
+++ b/apps/couch/src/couch_db_updater.erl
@@ -275,8 +275,8 @@ merge_updates([], RestB, AccOutGroups) ->
     lists:reverse(AccOutGroups, RestB);
 merge_updates(RestA, [], AccOutGroups) ->
     lists:reverse(AccOutGroups, RestA);
-merge_updates([[{_, #doc{id=IdA}}|_]=GroupA | RestA],
-        [[{_, #doc{id=IdB}}|_]=GroupB | RestB], AccOutGroups) ->
+merge_updates([[{_, {#doc{id=IdA},_}}|_]=GroupA | RestA],
+        [[{_, {#doc{id=IdB},_}}|_]=GroupB | RestB], AccOutGroups) ->
     if IdA == IdB ->
         merge_updates(RestA, RestB, [GroupA ++ GroupB | AccOutGroups]);
     IdA < IdB ->
@@ -285,6 +285,7 @@ merge_updates([[{_, #doc{id=IdA}}|_]=GroupA | RestA],
         merge_updates([GroupA | RestA], RestB, [GroupB | AccOutGroups])
     end.
 
+
 collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
     receive
     % Only collect updates with the same MergeConflicts flag and without
@@ -531,9 +532,9 @@ flush_trees(#db{fd=Fd,header=Header}=Db,
     flush_trees(Db, RestUnflushed,
         [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
 
-send_result(Client, Id, OriginalRevs, NewResult) ->
+send_result(Client, Ref, NewResult) ->
     % used to send a result to the client
-    catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}).
+    catch(Client ! {result, self(), {Ref, NewResult}}).
 
 merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
     {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
@@ -542,12 +543,12 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
     #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
             = OldDocInfo,
     NewRevTree = lists:foldl(
-        fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) ->
+        fun({Client, {#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, Ref}}, AccTree) ->
             if not MergeConflicts ->
                 case couch_key_tree:merge(AccTree, couch_db:doc_to_tree(NewDoc),
                     Limit) of
                 {_NewTree, conflicts} when (not OldDeleted) ->
-                    send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+                    send_result(Client, Ref, conflict),
                     AccTree;
                 {NewTree, conflicts} when PrevRevs /= [] ->
                     % Check to be sure if prev revision was specified, it's
@@ -559,7 +560,7 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
                     if IsPrevLeaf ->
                         NewTree;
                     true ->
-                        send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+                        send_result(Client, Ref, conflict),
                         AccTree
                     end;
                 {NewTree, no_conflicts} when AccTree == NewTree ->
@@ -578,11 +579,10 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
                         {NewTree2, _} = couch_key_tree:merge(AccTree,
                             couch_db:doc_to_tree(NewDoc2), Limit),
                         % we changed the rev id, this tells the caller we did
-                        send_result(Client, Id, {Pos-1,PrevRevs},
-                            {ok, {OldPos + 1, NewRevId}}),
+                        send_result(Client, Ref, {ok, {OldPos + 1, NewRevId}}),
                         NewTree2;
                     true ->
-                        send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+                        send_result(Client, Ref, conflict),
                         AccTree
                     end;
                 {NewTree, _} ->
@@ -630,7 +630,7 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
         update_seq = LastSeq,
         revs_limit = RevsLimit
         } = Db,
-    Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
+    Ids = [Id || [{_Client, {#doc{id=Id},_Ref}}|_] <- DocsList],
     % lookup up the old documents, if they exist.
     OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
     OldDocInfos = lists:zipwith(
@@ -692,10 +692,10 @@ compute_data_sizes([FullDocInfo | RestDocInfos], Acc) ->
 update_local_docs(Db, []) ->
     {ok, Db};
 update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
-    Ids = [Id || {_Client, #doc{id=Id}} <- Docs],
+    Ids = [Id || {_Client, {#doc{id=Id},_Ref}} <- Docs],
     OldDocLookups = couch_btree:lookup(Btree, Ids),
     BtreeEntries = lists:zipwith(
-        fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}},
+        fun({Client, {#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body},Ref}},
             _OldDocLookup) ->
             case PrevRevs of
             [RevStr|_] ->
@@ -713,14 +713,13 @@ update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
             % true ->
                 case Delete of
                     false ->
-                        send_result(Client, Id, {0, PrevRevs}, {ok,
+                        send_result(Client, Ref, {ok,
                                 {0, ?l2b(integer_to_list(PrevRev + 1))}}),
                         {update, {Id, {PrevRev + 1, Body}}};
                     true ->
-                        send_result(Client, Id, {0, PrevRevs},
-                                {ok, {0, <<"0">>}}),
+                        send_result(Client, Ref, {ok, {0, <<"0">>}}),
                         {remove, Id}
-                end%;
+                end
             % false ->
             %     send_result(Client, Id, {0, PrevRevs}, conflict),
             %     ignore
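In couch_db_updater.erl the same reference travels with each doc into the updater, and the reworked send_result/3 replies to the waiting client keyed by that reference rather than by {Id, Revs}. A small runnable sketch of that message shape, for illustration only (the module name and demo/0 are hypothetical, not CouchDB API):

-module(send_result_sketch).
-export([demo/0]).

%% Mirrors the patched send_result/3: the reply carries the caller's ref,
%% so the caller can match the result without knowing {Id, Revs}.
send_result(Client, Ref, Result) ->
    catch Client ! {result, self(), {Ref, Result}}.

demo() ->
    Client = self(),
    Ref = make_ref(),
    spawn(fun() -> send_result(Client, Ref, {ok, {1, <<"deadbeef">>}}) end),
    receive
        {result, _UpdaterPid, {Ref, Result}} -> Result
    after 1000 ->
        timeout
    end.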