% Licensed under the Apache License, Version 2.0 (the "License"); you may not % use this file except in compliance with the License. You may obtain a copy of % the License at % % http://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, software % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the % License for the specific language governing permissions and limitations under % the License. -module(couch_rep_missing_revs). -behaviour(gen_server). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([start_link/4, next/1, stop/1]). -define(BUFFER_SIZE, 1000). -include("couch_db.hrl"). -record (state, { changes_loop, changes_from = nil, parent, complete = false, count = 0, reply_to = nil, rows = queue:new(), high_source_seq = 0, high_missing_seq = 0, high_committed_seq = 0 }). start_link(Parent, Target, ChangesFeed, PostProps) -> gen_server:start_link(?MODULE, [Parent, Target, ChangesFeed, PostProps], []). next(Server) -> gen_server:call(Server, next_missing_revs, infinity). stop(Server) -> gen_server:call(Server, stop). init([Parent, _Target, ChangesFeed, _PostProps]) -> process_flag(trap_exit, true), Self = self(), Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Parent) end), {ok, #state{changes_loop=Pid, parent=Parent}}. handle_call({add_missing_revs, {HighSeq, Revs}}, From, State) -> State#state.parent ! {update_stats, missing_revs, length(Revs)}, handle_add_missing_revs(HighSeq, Revs, From, State); handle_call(next_missing_revs, From, State) -> handle_next_missing_revs(From, State). handle_cast({update_committed_seq, N}, State) -> if State#state.high_committed_seq < N -> ?LOG_DEBUG("missing_revs updating committed seq to ~p", [N]); true -> ok end, {noreply, State#state{high_committed_seq=N}}. handle_info({'EXIT', Pid, Reason}, #state{changes_loop=Pid} = State) -> handle_changes_loop_exit(Reason, State); handle_info(Msg, State) -> ?LOG_INFO("unexpected message ~p", [Msg]), {noreply, State}. terminate(_Reason, #state{changes_loop=Pid}) when is_pid(Pid) -> exit(Pid, shutdown), ok; terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. %internal funs handle_add_missing_revs(HighSeq, [], _From, State) -> NewState = State#state{high_source_seq=HighSeq}, maybe_checkpoint(NewState), {reply, ok, NewState}; handle_add_missing_revs(HighSeq, Revs, From, #state{reply_to=nil} = State) -> #state{rows=Rows, count=Count} = State, NewState = State#state{ rows = queue:join(Rows, queue:from_list(Revs)), count = Count + length(Revs), high_source_seq = HighSeq, high_missing_seq = HighSeq }, if NewState#state.count < ?BUFFER_SIZE -> {reply, ok, NewState}; true -> {noreply, NewState#state{changes_from=From}} end; handle_add_missing_revs(HighSeq, Revs, _From, #state{count=0} = State) -> gen_server:reply(State#state.reply_to, {HighSeq, Revs}), NewState = State#state{ high_source_seq = HighSeq, high_missing_seq = HighSeq, reply_to = nil }, {reply, ok, NewState}. handle_next_missing_revs(From, #state{count=0} = State) -> if State#state.complete -> {stop, normal, complete, State}; true -> {noreply, State#state{reply_to=From}} end; handle_next_missing_revs(_From, State) -> #state{ changes_from = ChangesFrom, high_missing_seq = HighSeq, rows = Rows } = State, if ChangesFrom =/= nil -> gen_server:reply(ChangesFrom, ok); true -> ok end, NewState = State#state{count=0, changes_from=nil, rows=queue:new()}, {reply, {HighSeq, queue:to_list(Rows)}, NewState}. handle_changes_loop_exit(normal, State) -> if State#state.reply_to =/= nil -> gen_server:reply(State#state.reply_to, complete), {stop, normal, State}; true -> {noreply, State#state{complete=true, changes_loop=nil}} end; handle_changes_loop_exit(Reason, State) -> {stop, Reason, State#state{changes_loop=nil}}. changes_loop(OurServer, SourceChangesServer, Parent) -> case couch_rep_changes_feed:next(SourceChangesServer) of complete -> exit(normal); Changes -> {ok, Target} = gen_server:call(Parent, get_target_db, infinity), MissingRevs = get_missing_revs(Target, Changes), gen_server:call(OurServer, {add_missing_revs, MissingRevs}, infinity) end, changes_loop(OurServer, SourceChangesServer, Parent). get_missing_revs(#http_db{}=Target, Changes) -> Transform = fun({Props}) -> C = couch_util:get_value(<<"changes">>, Props), Id = couch_util:get_value(<<"id">>, Props), {Id, [R || {[{<<"rev">>, R}]} <- C]} end, IdRevsList = [Transform(Change) || Change <- Changes], SeqDict = changes_dictionary(Changes), {LastProps} = lists:last(Changes), HighSeq = couch_util:get_value(<<"seq">>, LastProps), Request = Target#http_db{ resource = "_missing_revs", method = post, body = {IdRevsList} }, {Resp} = couch_rep_httpc:request(Request), case couch_util:get_value(<<"missing_revs">>, Resp) of {MissingRevs} -> X = [{Id, dict:fetch(Id, SeqDict), couch_doc:parse_revs(RevStrs)} || {Id,RevStrs} <- MissingRevs], {HighSeq, X}; _ -> exit({target_error, couch_util:get_value(<<"error">>, Resp)}) end; get_missing_revs(Target, Changes) -> Transform = fun({Props}) -> C = couch_util:get_value(<<"changes">>, Props), Id = couch_util:get_value(<<"id">>, Props), {Id, [couch_doc:parse_rev(R) || {[{<<"rev">>, R}]} <- C]} end, IdRevsList = [Transform(Change) || Change <- Changes], SeqDict = changes_dictionary(Changes), {LastProps} = lists:last(Changes), HighSeq = couch_util:get_value(<<"seq">>, LastProps), {ok, Results} = couch_db:get_missing_revs(Target, IdRevsList), {HighSeq, [{Id, dict:fetch(Id, SeqDict), Revs} || {Id, Revs, _} <- Results]}. changes_dictionary(ChangeList) -> KVs = [{couch_util:get_value(<<"id">>,C), couch_util:get_value(<<"seq">>,C)} || {C} <- ChangeList], dict:from_list(KVs). %% save a checkpoint if no revs are missing on target so we don't %% rescan metadata unnecessarily maybe_checkpoint(#state{high_missing_seq=N, high_committed_seq=N} = State) -> #state{ parent = Parent, high_source_seq = SourceSeq } = State, Parent ! {missing_revs_checkpoint, SourceSeq}; maybe_checkpoint(_State) -> ok.