% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_batch_save).

-behaviour(gen_server).

%% API
-export([start_link/2, eventually_save_doc/3, commit_now/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-include("couch_db.hrl").

-record(batch_state, {
    batch_size=1000,
    batch_interval=1000
}).

%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(BatchSize, BatchInterval) ->
%%               {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server, registered locally under the name
%% couch_batch_save. BatchSize and BatchInterval are passed to init/1 as
%% the argument list [BatchSize, BatchInterval].
%%--------------------------------------------------------------------
start_link(BatchSize, BatchInterval) ->
    ServerName = {local, couch_batch_save},
    InitArgs = [BatchSize, BatchInterval],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).

%%--------------------------------------------------------------------
%% Function: eventually_save_doc(DbName, Doc, UserCtx) -> ok
%% Description: Hands Doc to the batch process responsible for the
%% {DbName, UserCtx} pair, creating that process when none is registered.
%% The call waits for the batch process to accept the doc, not for the
%% doc to be committed.
%%--------------------------------------------------------------------
eventually_save_doc(DbName, Doc, UserCtx) ->
    % two-argument lookup: creates the batch process on a miss
    {ok, BatchPid} = batch_pid_for_db_and_user(DbName, UserCtx),
    ?LOG_DEBUG("sending doc to batch ~p",[BatchPid]),
    % assert the hand-off succeeded; the match result (ok) is the return value
    ok = send_doc_to_batch(BatchPid, Doc).

%%--------------------------------------------------------------------
%% Function: commit_now(DbName, UserCtx) -> committed
%% Description: Asks the batch process registered for the
%% {DbName, UserCtx} pair to commit, and waits for its reply. When no
%% batch process is registered for the pair there is nothing to do and
%% committed is returned immediately.
%%--------------------------------------------------------------------
commit_now(DbName, UserCtx) ->
    % Create = false: look the process up, but never start one
    {ok, MaybePid} = batch_pid_for_db_and_user(DbName, UserCtx, false),
    commit_collector(MaybePid).

% Commit through the given batch process; 'none' means no process exists.
commit_collector(none) ->
    committed;
commit_collector(Pid) ->
    ok = send_commit(Pid),
    committed.

%%--------------------------------------------------------------------
%% Function: commit_all() -> committed   (disabled)
%% Description: Commits all docs for all DBs. Does not reply until
%% the commit is complete. Kept commented out; the server has no
%% handle_call clause for the commit_now request it would send.
%%--------------------------------------------------------------------
% commit_all() ->
%     committed = gen_server:call(couch_batch_save, commit_now, infinity).
%

%%====================================================================
%% gen_server callbacks
%%====================================================================

%%--------------------------------------------------------------------
%% Function: init([BatchSize, BatchInterval]) -> {ok, State}
%% Description: Creates the public, named ets table
%% couch_batch_save_by_db (a set keyed on {DbName, UserCtx}) and stores
%% the batch settings in the server state.
%%--------------------------------------------------------------------
init([BatchSize, BatchInterval]) ->
    TableOpts = [set, public, named_table],
    ets:new(couch_batch_save_by_db, TableOpts),
    State = #batch_state{batch_size=BatchSize, batch_interval=BatchInterval},
    {ok, State}.

%%--------------------------------------------------------------------
%% Function: handle_call({make_pid, DbName, UserCtx}, From, State) ->
%%               {reply, {ok, Pid}, State}
%% Description: Returns the doc collector process for the
%% {DbName, UserCtx} pair, starting and registering one if the ets table
%% has no entry for it. Only the make_pid request is handled.
%%--------------------------------------------------------------------
handle_call({make_pid, DbName, UserCtx}, _From, #batch_state{
        batch_size=BatchSize,
        batch_interval=BatchInterval
    }=State) ->
    % Callers look the pid up outside the gen_server for parallelism and only
    % call in on a miss. Creation is serialized here, and the table is checked
    % a second time so that two racing callers cannot create duplicates.
    Key = {DbName, UserCtx},
    Reply = case ets:lookup(couch_batch_save_by_db, Key) of
    [] ->
        % still no collector: start one and record it
        ?LOG_DEBUG("making a batch pid ~p",[Key]),
        NewPid = spawn_link(fun() ->
            doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new)
        end),
        true = ets:insert_new(couch_batch_save_by_db, {Key, NewPid}),
        {ok, NewPid};
    [{_, ExistingPid}] ->
        % another caller got here first
        {ok, ExistingPid}
    end,
    {reply, Reply, State}.

%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State}
%% Description: No casts are used; every cast is ignored.
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State}
%% Description: Every non call/cast message is ignored. The clause for
%% 'EXIT' messages below is disabled.
%%--------------------------------------------------------------------
% handle_info({'EXIT', Pid, Reason}, State) ->
%     {noreply, State};
handle_info(_Info, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored. Currently performs no cleanup.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
    % todo shutdown the interval loop
    % todo kill all the Pids and drop the ets table
    ok.

%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed. The state is
%% returned unchanged.
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

% Write Docs to the database DbName, opened with UserCtx as user context.
% Returns {ok, Revs} from couch_db:update_docs/2; an empty doc list returns
% {ok, []} without opening the database. The database is closed even when
% the update fails. Any couch_db:open/2 result other than {ok, Db} is thrown.
commit_user_docs(_DbName, _UserCtx, []) ->
    {ok, []};
commit_user_docs(DbName, UserCtx, Docs) ->
    ?LOG_INFO("Committing ~p batch docs to ~p",[length(Docs), DbName]),
    case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
    {ok, Db} ->
        try
            {ok, Revs} = couch_db:update_docs(Db, Docs),
            ?LOG_INFO("Committed ~p batch docs to ~p",[length(Docs), DbName]),
            {ok, Revs}
        after
            couch_db:close(Db)
        end;
    Error ->
        throw(Error)
    end.

% spawned to trigger commits on an interval
% Sleeps BatchInterval milliseconds (a receive with no clauses never consumes
% a message), asks Pid to commit, and repeats. The loop ends when
% send_commit/1 exits this process because Pid is down.
commit_every_ms(Pid, BatchInterval) ->
    receive
    after BatchInterval ->
        ok = send_commit(Pid),
        commit_every_ms(Pid, BatchInterval)
    end.

% Ask the collector Pid to commit and wait for the outcome.
% Returns ok once Pid answers {Pid, committed}. If Pid is down (already dead,
% or it exits without answering) the calling process exits with reason normal.
%
% The monitor is set up here rather than relied upon from the caller: without
% it a caller that holds no monitor on Pid would never receive a 'DOWN'
% message and would block in this receive forever.
send_commit(Pid) ->
    MRef = erlang:monitor(process, Pid),
    Pid ! {self(), commit},
    receive
        {Pid, committed} ->
            % drop our monitor and any 'DOWN' it may already have delivered
            erlang:demonitor(MRef, [flush]),
            ok;
        % deliberately not tied to MRef, so a 'DOWN' from a monitor the
        % caller set up earlier on the same Pid is honoured as well
        {'DOWN', _, _, Pid, _} ->
            exit(normal)
    end.

% Find the batch process for the {DbName, UserCtx} pair, creating it on a miss.
batch_pid_for_db_and_user(DbName, UserCtx) ->
    batch_pid_for_db_and_user(DbName, UserCtx, true).

% Look up the batch process for the {DbName, UserCtx} pair in the ets table.
% Returns {ok, Pid} when one is registered. On a miss, returns the server's
% reply to make_pid (asserted to be {ok, Pid}) when Create is true, and
% {ok, none} otherwise.
batch_pid_for_db_and_user(DbName, UserCtx, Create) ->
    % look in the ets table
    case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of
        [{_, Pid}] ->
            % we have a pid
            {ok, Pid};
        [] when Create =:= true ->
            % no match: have the server create (or find) the process
            {ok, _NewPid} = Reply = gen_server:call(couch_batch_save,
                {make_pid, DbName, UserCtx}, infinity),
            Reply;
        [] ->
            % no match and not asked to create
            {ok, none}
    end.

% Hand Doc to the collector Pid and wait until it acknowledges the doc.
% Returns ok on acknowledgement. The collector is monitored while waiting, so
% if it is already dead or dies before acknowledging, the caller exits with
% {batch_collector_down, Reason} instead of blocking forever.
send_doc_to_batch(Pid, Doc) ->
    MRef = erlang:monitor(process, Pid),
    Pid ! {self(), add_doc, Doc},
    receive
        {Pid, doc_added} ->
            % drop the monitor and any 'DOWN' it may already have delivered
            erlang:demonitor(MRef, [flush]),
            ok;
        {'DOWN', MRef, process, Pid, Reason} ->
            exit({batch_collector_down, Reason})
    end.

% the loop that holds documents between commits
% Called with 'new' as the last argument when the process starts; called with
% a list of already collected docs to (re-)enter the collecting loop.
doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new) ->
    % start a process that triggers commit every BatchInterval milliseconds
    Me = self(),
    spawn_link(fun() ->
        erlang:monitor(process, Me),
        commit_every_ms(Me, BatchInterval)
    end),
    doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, []);
doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) ->
    % count once here; the loop keeps the count up to date from then on
    collect_docs(DbName, UserCtx, {BatchSize, BatchInterval}, Docs,
        length(Docs)).

% Collecting loop. Count is the number of docs in Docs, carried along so the
% batch-full check does not walk the list on every message.
collect_docs(DbName, UserCtx, {BatchSize, BatchInterval}, Docs, Count)
        when Count >= BatchSize ->
    % batch is full: commit and finish; nobody is waiting for a reply
    collector_commit(DbName, UserCtx, BatchInterval, Docs),
    exit(normal);
collect_docs(DbName, UserCtx, {BatchSize, BatchInterval}, Docs, Count) ->
    receive
        {From, add_doc, Doc} ->
            From ! {self(), doc_added},
            collect_docs(DbName, UserCtx, {BatchSize, BatchInterval},
                [Doc|Docs], Count + 1);
        {From, commit} ->
            collector_commit(DbName, UserCtx, BatchInterval, Docs),
            From ! {self(), committed},
            exit(normal)
    end.

% Take this collector out of service and write what it holds: unregister it,
% keep accepting docs that are still arriving, then commit everything.
collector_commit(DbName, UserCtx, BatchInterval, Docs) ->
    % unregister
    unregister_collector(DbName, UserCtx, self()),
    % wait and collect
    Docs2 = shutdown_collector(DbName, UserCtx, BatchInterval, Docs),
    {ok, _Revs} = commit_user_docs(DbName, UserCtx, Docs2).

% Remove the {{DbName, UserCtx}, Pid} entry from the ets table. An entry for
% the same key that holds a different pid is left alone.
unregister_collector(DbName, UserCtx, Pid) ->
    % remove from ets
    ets:delete_object(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}).

% Keep acknowledging and collecting add_doc messages until BatchInterval
% milliseconds pass without one, then return all collected docs. Messages of
% any other shape are left in the mailbox.
shutdown_collector(DbName, UserCtx, BatchInterval, Docs) ->
    receive
        {From, add_doc, Doc} ->
            From ! {self(), doc_added},
            shutdown_collector(DbName, UserCtx, BatchInterval, [Doc|Docs])
    % this interval will be waited for each time ensure-full-commit is called
    after BatchInterval ->
        Docs
    end.