% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_changes).
-include("couch_db.hrl").

-export([handle_changes/3]).

%% Prepares a changes feed and returns a fun/1 that runs it.
%%
%% Args1 - #changes_args{} record; its `filter` field is replaced by the
%%         fun built by make_filter_fun/3 before anything else is done.
%% Req   - #httpd{} | {json_req, JsonObj()}; only forwarded to
%%         make_filter_fun/3.
%% Db    - database handle.
%%
%% The start sequence is resolved right away (the current update sequence
%% for `dir = rev`, the `since` field for `dir = fwd`); everything else
%% happens when the returned fun is applied to a Callback.
handle_changes(#changes_args{} = Args1, Req, Db) ->
    FilterFun = make_filter_fun(Args1, Req, Db),
    Args = Args1#changes_args{filter = FilterFun},
    StartSeq =
        case Args#changes_args.dir of
            rev -> couch_db:get_update_seq(Db);
            fwd -> Args#changes_args.since
        end,
    case Args#changes_args.feed of
        Feed when Feed == "continuous"; Feed == "longpoll" ->
            fun(Callback) ->
                stream_changes(Args, Callback, Db, StartSeq)
            end;
        _OneShot ->
            fun(Callback) ->
                send_changes_once(Args, Callback, Db, StartSeq)
            end
    end.

%% Drives a "continuous" or "longpoll" feed in the calling process.
%%
%% A db update notifier is started whose fun posts `db_updated` to this
%% process whenever an event for this database's name arrives. The notifier
%% is stopped and leftover `db_updated` messages are drained in the `after`
%% section, so this happens however keep_sending_changes/7 exits.
stream_changes(Args, Callback, Db, StartSeq) ->
    start_sending_changes(Callback, Args#changes_args.feed),
    Client = self(),
    OnDbEvent =
        fun({_, DbName}) when DbName == Db#db.name ->
                Client ! db_updated;
           (_) ->
                ok
        end,
    {ok, Notifier} = couch_db_update_notifier:start_link(OnDbEvent),
    {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
    couch_stats_collector:track_process_count(
        Client,
        {httpd, clients_requesting_changes}
    ),
    try
        keep_sending_changes(
            Args, Callback, Db, StartSeq, <<"">>, Timeout, TimeoutFun
        )
    after
        couch_db_update_notifier:stop(Notifier),
        % clean out any remaining update messages
        get_rest_db_updated()
    end.

%% Drives any other feed type: a single pass over the changes, enumerated
%% with the feed forced to "normal", then the stop callback with the last
%% sequence seen and the feed type originally requested.
send_changes_once(Args, Callback, Db, StartSeq) ->
    RequestedFeed = Args#changes_args.feed,
    start_sending_changes(Callback, RequestedFeed),
    {ok, {_, LastSeq, _Prepend, _, _, _, _, _}} =
        send_changes(
            Args#changes_args{feed = "normal"},
            Callback,
            Db,
            StartSeq,
            <<"">>
        ),
    end_sending_changes(Callback, LastSeq, RequestedFeed).
%% Builds the fun that decides which revisions of a change are reported.
%%
%% Req is #httpd{} | {json_req, JsonObj()} and is only handed on to
%% couch_query_servers:filter_docs/5.
%%
%% The `filter` field of the #changes_args{} record is split on "/":
%%   * no parts            -> every revision passes;
%%   * "designname/filter" -> the design document is opened up front and
%%                            the named filter is looked up in it, then each
%%                            batch of docs is run through that filter;
%%   * anything else       -> throws {bad_request, Message}.
%%
%% The returned fun takes a list of #doc_info{} records and returns a list
%% of {[{<<"rev">>, RevString}]} objects.
make_filter_fun(#changes_args{filter = FilterName}, Req, Db) ->
    Parts = [list_to_binary(couch_httpd:unquote(Part))
             || Part <- string:tokens(FilterName, "/")],
    case Parts of
        [] ->
            fun(DocInfos) ->
                % doing this as a batch is more efficient for external filters
                [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}
                 || #doc_info{revs = [#rev_info{rev = Rev} | _]} <- DocInfos]
            end;
        [DName, FName] ->
            DesignId = <<"_design/", DName/binary>>,
            DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
            % validate that the ddoc has the filter fun
            % NOTE(review): the result is discarded, so this presumably
            % relies on get_nested_json_value/2 throwing for a missing
            % path - confirm against couch_util.
            #doc{body = {Props}} = DDoc,
            couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
            fun(DocInfos) ->
                % One opened doc per doc_info, in the same order; a failed
                % open crashes with badmatch.
                Docs = [begin
                            {ok, Doc} = couch_db:open_doc(
                                Db, DInfo, [deleted, conflicts]
                            ),
                            Doc
                        end || DInfo <- DocInfos],
                {ok, Passes} = couch_query_servers:filter_docs(
                    Req, Db, DDoc, FName, Docs
                ),
                % Pair every doc_info with its own filter result. Using two
                % independent generators here would form a cross product and
                % report (and duplicate) revisions that did not pass.
                [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}
                 || {#doc_info{revs = [#rev_info{rev = Rev} | _]}, true}
                        <- lists:zip(DocInfos, Passes)]
            end;
        _Else ->
            throw({bad_request,
                "filter parameter must be of the form `designname/filtername`"})
    end.

%% Picks the wait timeout and the action taken when it expires.
%%
%% Returns {Timeout, TimeoutFun}. TimeoutFun/0 returns `stop` to end the
%% feed, or calls Callback(timeout, Feed) and returns `ok` to keep waiting.
%%
%%   heartbeat undefined, timeout undefined -> configured default, stop
%%   heartbeat undefined, timeout infinity  -> infinity, stop
%%   heartbeat undefined, timeout T         -> min(default, T), stop
%%   heartbeat true                         -> configured default, keep going
%%   heartbeat H                            -> min(default, H), keep going
%%
%% The default is read from the "httpd"/"changes_timeout" config key and
%% falls back to "60000".
get_changes_timeout(Args, Callback) ->
    #changes_args{
        heartbeat = Heartbeat,
        timeout = Timeout,
        feed = ResponseType
    } = Args,
    DefaultTimeout = list_to_integer(
        couch_config:get("httpd", "changes_timeout", "60000")
    ),
    StopFun = fun() -> stop end,
    HeartbeatFun = fun() -> Callback(timeout, ResponseType), ok end,
    case Heartbeat of
        undefined ->
            case Timeout of
                undefined ->
                    {DefaultTimeout, StopFun};
                infinity ->
                    {infinity, StopFun};
                _ ->
                    {lists:min([DefaultTimeout, Timeout]), StopFun}
            end;
        true ->
            {DefaultTimeout, HeartbeatFun};
        _ ->
            {lists:min([DefaultTimeout, Heartbeat]), HeartbeatFun}
    end.

%% Signals the start of a feed to the callback; a "continuous" feed gets no
%% start notification at all.
start_sending_changes(_Callback, "continuous") ->
    ok;
start_sending_changes(Callback, ResponseType) ->
    Callback(start, ResponseType).
%% Runs one enumeration pass over the database's changes after StartSeq.
%%
%% The fold accumulator is the 8-tuple
%%   {Db, Seq, Prepend, FilterFun, Callback, ResponseType, Limit, IncludeDocs}
%% consumed and rebuilt by changes_enumerator/2. Returns whatever
%% couch_db:changes_since/6 returns.
send_changes(Args, Callback, Db, StartSeq, Prepend) ->
    #changes_args{
        style = Style,
        include_docs = IncludeDocs,
        limit = Limit,
        feed = ResponseType,
        dir = Dir,
        filter = FilterFun
    } = Args,
    InitialAcc = {
        Db, StartSeq, Prepend, FilterFun,
        Callback, ResponseType, Limit, IncludeDocs
    },
    couch_db:changes_since(
        Db,
        Style,
        StartSeq,
        fun changes_enumerator/2,
        [{dir, Dir}],
        InitialAcc
    ).

%% Loop behind the "continuous" and "longpoll" feeds.
%%
%% Each round sends the pending changes in forward direction and closes the
%% database handle. A "longpoll" feed ends as soon as a round has used up
%% some of the limit; otherwise the loop waits for a database update (or
%% for the timeout fun to say `stop`) before going round again.
keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, TimeoutFun) ->
    #changes_args{
        feed = ResponseType,
        limit = Limit
    } = Args,
    ForwardArgs = Args#changes_args{dir = fwd},
    {ok, {_, EndSeq, NextPrepend, _, _, _, LimitLeft, _}} =
        send_changes(ForwardArgs, Callback, Db, StartSeq, Prepend),
    couch_db:close(Db),
    SentSomething = Limit > LimitLeft,
    case {SentSomething, ResponseType} of
        {true, "longpoll"} ->
            end_sending_changes(Callback, EndSeq, ResponseType);
        _ ->
            case wait_db_updated(Timeout, TimeoutFun) of
                updated ->
                    resume_sending_changes(
                        Args, Callback, Db, EndSeq,
                        NextPrepend, LimitLeft, Timeout, TimeoutFun
                    );
                stop ->
                    end_sending_changes(Callback, EndSeq, ResponseType)
            end
    end.

%% Reopens the database by name, with the user context of the previous
%% handle, and continues the loop with the remaining limit. The feed is
%% ended when the database cannot be opened.
resume_sending_changes(Args, Callback, Db, EndSeq, Prepend, LimitLeft,
                       Timeout, TimeoutFun) ->
    case couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]) of
        {ok, ReopenedDb} ->
            keep_sending_changes(
                Args#changes_args{limit = LimitLeft},
                Callback,
                ReopenedDb,
                EndSeq,
                Prepend,
                Timeout,
                TimeoutFun
            );
        _Else ->
            end_sending_changes(Callback, EndSeq, Args#changes_args.feed)
    end.

%% Tells the callback that the feed is finished and which sequence it
%% ended on.
end_sending_changes(Callback, EndSeq, ResponseType) ->
    Callback({stop, EndSeq}, ResponseType).
%% Fold fun handed to couch_db:changes_since/6 by send_changes/5.
%%
%% DocInfos is a non-empty list of #doc_info{} records; id, sequence,
%% deleted flag and revision of the reported row are taken from the first.
%% The filter fun is applied to the whole list and `null` results are
%% dropped. If anything is left, one row is passed to the callback and the
%% limit in the accumulator is decremented.
%%
%% Returns {Go, NewAcc}, Go being `stop` when the incoming limit is =< 1
%% and `ok` otherwise - regardless of whether a row was sent.
changes_enumerator(DocInfos, {Db, _, Prepend, FilterFun, Callback, ResponseType, Limit, IncludeDocs}) ->
    [#doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del, rev = Rev} | _]} | _] = DocInfos,
    Filtered = FilterFun(DocInfos),
    Results = [Result || Result <- Filtered, Result /= null],
    Go =
        case Limit =< 1 of
            true -> stop;
            false -> ok
        end,
    {RowPrepend, SkippedPrepend, SentPrepend} = prepend_plan(ResponseType, Prepend),
    case Results of
        [] ->
            {Go, {Db, Seq, SkippedPrepend, FilterFun, Callback, ResponseType, Limit, IncludeDocs}};
        _ ->
            ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs),
            Callback({change, ChangesRow, RowPrepend}, ResponseType),
            {Go, {Db, Seq, SentPrepend, FilterFun, Callback, ResponseType, Limit - 1, IncludeDocs}}
    end.

%% Chooses the prepend values for one enumeration step as
%% {ForThisRow, NextWhenNothingSent, NextWhenRowSent}.
%%
%% "continuous" rows are always sent with an empty prepend and the
%% accumulator keeps `nil`; every other feed sends the current prepend and
%% switches to <<",\n">> once a row has gone out.
prepend_plan("continuous", _Prepend) ->
    {<<"">>, nil, nil};
prepend_plan(_ResponseType, Prepend) ->
    {Prepend, Prepend, <<",\n">>}.

%% Builds the row object for one change. With include_docs = true the
%% members returned by couch_httpd_view:doc_member/2 are appended.
changes_row(Db, Seq, Id, Del, Results, Rev, true) ->
    {change_members(Seq, Id, Del, Results)
        ++ couch_httpd_view:doc_member(Db, {Id, Rev})};
changes_row(_Db, Seq, Id, Del, Results, _Rev, false) ->
    {change_members(Seq, Id, Del, Results)}.

%% Members shared by every change row: seq, id, changes and, for deleted
%% documents, the deleted marker.
change_members(Seq, Id, Del, Results) ->
    [{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}]
        ++ deleted_item(Del).

%% Extra row member for deleted documents; empty for anything but `true`.
%% NOTE(review): the key is the atom `deleted` while the other row keys are
%% binaries - confirm that consumers of the row expect this.
deleted_item(true) ->
    [{deleted, true}];
deleted_item(_) ->
    [].

%% Blocks until a `db_updated` message arrives and then swallows any
%% further ones that are already queued, returning `updated`.
%%
%% Whenever Timeout elapses first, TimeoutFun/0 is consulted: `ok` starts
%% another wait, `stop` is returned to the caller.
wait_db_updated(Timeout, TimeoutFun) ->
    receive
        db_updated ->
            get_rest_db_updated()
    after Timeout ->
        on_wait_timeout(Timeout, TimeoutFun)
    end.

%% Acts on the timeout fun's verdict for wait_db_updated/2.
on_wait_timeout(Timeout, TimeoutFun) ->
    case TimeoutFun() of
        ok ->
            wait_db_updated(Timeout, TimeoutFun);
        stop ->
            stop
    end.
%% Removes every `db_updated` message currently queued for the calling
%% process, without blocking, and returns `updated`.
get_rest_db_updated() ->
    Pending =
        receive
            db_updated -> true
        after 0 ->
            false
        end,
    case Pending of
        true -> get_rest_db_updated();
        false -> updated
    end.