% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_api_wrap_httpc).

-include("couch_db.hrl").
-include("couch_api_wrap.hrl").
-include("../ibrowse/ibrowse.hrl").

-export([setup/1]).
-export([send_req/3]).
-export([full_url/2]).

-import(couch_util, [
    get_value/2,
    get_value/3
]).

% Pause (passed to timer:sleep/1) before re-sending a request whose
% connection was reported closed by ibrowse.
-define(RETRY_LATER_WAIT, 100).
% Insert or overwrite the {K, V} pair in the key/value list L.
-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).


% Starts a couch_httpc_pool for the host and port parsed out of the
% database URL and returns {ok, HttpDb} with the httpc_pool field set to
% the pool pid. Only matches a record whose httpc_pool is still nil.
setup(#httpdb{httpc_pool = nil, url = BaseUrl} = HttpDb) ->
    ParsedUrl = ibrowse_lib:parse_url(BaseUrl),
    #url{host = Host, port = Port} = ParsedUrl,
    {ok, PoolPid} = couch_httpc_pool:start_link(Host, Port),
    {ok, HttpDb#httpdb{httpc_pool = PoolPid}}.
% Sends one HTTP request described by Params against HttpDb and hands the
% raw ibrowse result to process_response/5.
%
% Params keys read here: method (default get), headers, body,
% ibrowse_options and direct (default false). path and qs are consumed by
% full_url/2 and oauth_header/2.
%
% When direct is true a dedicated, linked ibrowse worker is spawned and
% the worker handle is {direct, Pid}; otherwise a worker is taken from the
% connection pool stored in the #httpdb record.
send_req(#httpdb{headers = BaseHeaders, httpc_pool = Pool} = HttpDb, Params, Callback) ->
    Method = get_value(method, Params, get),
    ReqHeaders = get_value(headers, Params, []) ++ BaseHeaders,
    Body = get_value(body, Params, []),
    IsEmptyBody = (Body =:= []) orelse (Body =:= <<>>),
    IsBodyMethod = (Method =:= put) orelse (Method =:= post),
    % A PUT/POST with no payload gets an explicit zero Content-Length.
    SizedHeaders = case IsEmptyBody andalso IsBodyMethod of
        true -> ?replace(ReqHeaders, "Content-Length", 0);
        false -> ReqHeaders
    end,
    FixedOptions = [
        {response_format, binary},
        {inactivity_timeout, HttpDb#httpdb.timeout},
        {socket_options, [{reuseaddr, true}, {keepalive, true}]}
    ],
    IbrowseOptions = FixedOptions ++
        (HttpDb#httpdb.proxy_options ++ HttpDb#httpdb.ssl_options ++
            get_value(ibrowse_options, Params, [])),
    AllHeaders = oauth_header(HttpDb, Params) ++ SizedHeaders,
    Url = full_url(HttpDb, Params),
    % Worker is the handle later given to stop_worker/2; Conn is the pid
    % ibrowse actually sends the request through.
    {Worker, Conn} = case get_value(direct, Params, false) of
        true ->
            {ok, DirectPid} = ibrowse:spawn_link_worker_process(Url),
            {{direct, DirectPid}, DirectPid};
        false ->
            {ok, PoolWorker} = couch_httpc_pool:get_worker(Pool),
            {PoolWorker, PoolWorker}
    end,
    Response = ibrowse:send_req_direct(
        Conn, Url, AllHeaders, Method, Body, IbrowseOptions, infinity),
    process_response(Response, Worker, HttpDb, Params, Callback).
% Dispatches on the value returned by ibrowse:send_req_direct/7.
%
%   {error, sel_conn_closed}        - sleep ?RETRY_LATER_WAIT and re-send
%   {error, {'EXIT', {normal, _}}}  - re-send immediately
%   {ibrowse_req_id, ReqId}         - streamed response, see
%                                     process_stream_response/5
%   {ok, Code, Headers, Body}       - complete response; 200, 201 and 4xx
%                                     go to Callback(Code, Headers, EJson),
%                                     301/302/303 are followed, any other
%                                     code goes through maybe_retry/5
%   anything else                   - maybe_retry/5
%
% The worker is released exactly once per response: here on the terminal
% 200/201/4xx branch, and by do_redirect/6 or maybe_retry/5 (directly or
% through report_error/4) on the other branches. Releasing it up front as
% well would hand the same worker back twice.
process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) ->
    ok = timer:sleep(?RETRY_LATER_WAIT),
    send_req(HttpDb, Params, Callback);
process_response({error, {'EXIT', {normal, _}}}, _Worker, HttpDb, Params, Cb) ->
    % ibrowse worker terminated because remote peer closed the socket
    % -> not an error
    send_req(HttpDb, Params, Cb);
process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
    process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
    case list_to_integer(Code) of
        Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
            % Terminal response: nothing downstream releases the worker.
            stop_worker(Worker, HttpDb),
            EJson = case Body of
                <<>> ->
                    null;
                Json ->
                    ?JSON_DECODE(Json)
            end,
            Callback(Ok, Headers, EJson);
        R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
            % do_redirect/6 releases the worker before re-sending.
            do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
        Error ->
            % maybe_retry/5 releases the worker on both of its paths.
            maybe_retry({code, Error}, Worker, HttpDb, Params, Callback)
    end;
process_response(Error, Worker, HttpDb, Params, Callback) ->
    maybe_retry(Error, Worker, HttpDb, Params, Callback).


% Handles the first message of a streamed (asynchronous) ibrowse response
% identified by ReqId.
%
% On 200, 201 or 4xx the Callback receives the status code, the headers
% and a zero-arity fun that pulls the body chunk by chunk via
% stream_data_self/5. The worker is released only after the Callback
% returns, because the Callback is what consumes the stream. Redirects
% are followed; any other status code is reported as a failure through
% report_error/4 with no retry. An {error, _} delivered instead of the
% headers goes through maybe_retry/5.
process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
    receive
        {ibrowse_async_headers, ReqId, Code, Headers} ->
            case ?l2i(Code) of
                Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
                    StreamDataFun = fun() ->
                        stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
                    end,
                    Ret = Callback(Ok, Headers, StreamDataFun),
                    stop_worker(Worker, HttpDb),
                    Ret;
                R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
                    do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
                Error ->
                    report_error(Worker, HttpDb, Params, {code, Error})
            end;
        {ibrowse_async_response, ReqId, {error, _} = Error} ->
            maybe_retry(Error, Worker, HttpDb, Params, Callback)
    end.
% Gives up a worker obtained in send_req/3.
%
% A {direct, Pid} worker is unlinked, any 'EXIT' message from it that is
% already in the mailbox is dropped, and the ibrowse process is stopped on
% a best-effort basis (failures are swallowed by catch). Any other worker
% is handed back to the connection pool of the #httpdb record.
stop_worker({direct, Pid}, _HttpDb) ->
    unlink(Pid),
    receive
        {'EXIT', Pid, _} -> ok
    after 0 ->
        ok
    end,
    catch ibrowse:stop_worker_process(Pid);
stop_worker(PoolWorker, #httpdb{httpc_pool = Pool}) ->
    ok = couch_httpc_pool:release_worker(Pool, PoolWorker).


% Re-sends the request after a failure while retries remain.
%
% With no retries left the failure is reported through report_error/4 as
% {error, Error}. Otherwise the worker is released, the process sleeps
% for the current wait (milliseconds), and the request is re-sent with
% one retry fewer and the wait doubled.
maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
    report_error(Worker, HttpDb, Params, {error, Error});
maybe_retry(Error, Worker, #httpdb{} = HttpDb, Params, Cb) ->
    #httpdb{retries = RetriesLeft, wait = WaitMs} = HttpDb,
    stop_worker(Worker, HttpDb),
    Verb = string:to_upper(atom_to_list(get_value(method, Params, get))),
    SafeUrl = couch_util:url_strip_password(full_url(HttpDb, Params)),
    ?LOG_INFO("Retrying ~s request to ~s in ~p seconds due to error ~p",
        [Verb, SafeUrl, WaitMs / 1000, Error]),
    ok = timer:sleep(WaitMs),
    NextDb = HttpDb#httpdb{retries = RetriesLeft - 1, wait = WaitMs * 2},
    send_req(NextDb, Params, Cb).


% Logs a request failure, releases the worker and terminates the calling
% process with exit({http_request_failed, Method, Url, Error}).
%
% The reasons timeout and {error, req_timedout} are first rewritten to
% {timeout, Timeout}, using the timeout configured in the #httpdb record.
% The URL used in the log and in the exit reason has its password
% stripped.
report_error(Worker, #httpdb{timeout = Timeout} = HttpDb, Params, Reason)
        when Reason =:= timeout ; Reason =:= {error, req_timedout} ->
    report_error(Worker, HttpDb, Params, {timeout, Timeout});
report_error(Worker, HttpDb, Params, Reason) ->
    MethodAtom = get_value(method, Params, get),
    Verb = string:to_upper(atom_to_list(MethodAtom)),
    SafeUrl = couch_util:url_strip_password(full_url(HttpDb, Params)),
    do_report_error(SafeUrl, Verb, Reason),
    stop_worker(Worker, HttpDb),
    exit({http_request_failed, Verb, SafeUrl, Reason}).


% Writes the error log line matching the failure kind: a generic
% {error, _}, an unexpected HTTP status {code, _}, or an inactivity
% {timeout, _}.
do_report_error(Url, Verb, {error, Reason}) ->
    ?LOG_ERROR("Replicator, request ~s to ~p failed due to error ~p",
        [Verb, Url, Reason]);
do_report_error(Url, Verb, {code, Status}) ->
    ?LOG_ERROR("Replicator, request ~s to ~p failed. The received "
        "HTTP error code is ~p", [Verb, Url, Status]);
do_report_error(Url, Verb, {timeout, Millis}) ->
    ?LOG_ERROR("Replicator, request ~s to ~p failed. Inactivity timeout "
        " (~p milliseconds).", [Verb, Url, Millis]).
% Requests the next chunk of the streamed response ReqId from ibrowse and
% waits for it.
%
% Returns {Data, NextFun}, where NextFun is a zero-arity fun that fetches
% the following chunk. At the end of the response it returns {<<>>, Fun};
% calling that Fun reports a more_data_expected failure through
% report_error/4. An {error, _} from ibrowse goes through maybe_retry/5.
stream_data_self(HttpDb, Params, Worker, ReqId, Cb) ->
    ibrowse:stream_next(ReqId),
    receive
        {ibrowse_async_response, ReqId, {error, _} = Error} ->
            maybe_retry(Error, Worker, HttpDb, Params, Cb);
        {ibrowse_async_response, ReqId, Data} ->
            {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
        {ibrowse_async_response_end, ReqId} ->
            {<<>>, fun() ->
                report_error(Worker, HttpDb, Params, {error, more_data_expected})
            end}
    end.


% Builds the request URL: the database base URL, followed by the path
% entry of Params, followed by the qs entry rendered as a query string.
full_url(#httpdb{url = BaseUrl}, Params) ->
    Path = get_value(path, Params, []),
    QueryArgs = get_value(qs, Params, []),
    BaseUrl ++ Path ++ query_args_to_string(QueryArgs, []).


% Renders a list of {Key, Value} pairs as "?k1=v1&k2=v2", or "" when the
% list is empty. Keys are strings and values are iolists; both are
% emitted as given, with no percent-encoding applied here.
query_args_to_string([], []) ->
    "";
query_args_to_string([], Acc) ->
    "?" ++ string:join(lists:reverse(Acc), "&");
query_args_to_string([{K, V} | Rest], Acc) ->
    Kv = K ++ "=" ++ ?b2l(iolist_to_binary(V)),
    query_args_to_string(Rest, [Kv | Acc]).


% Returns the OAuth "Authorization" header for the request as a
% one-element header list, or [] when the database has no OAuth
% credentials configured.
%
% The HTTP method string is derived from the method atom by upper-casing
% its name, so every method is signed (get -> "GET", delete -> "DELETE",
% ...) instead of only a fixed subset. This matches how maybe_retry/5 and
% report_error/4 render the method.
oauth_header(#httpdb{oauth = nil}, _ConnParams) ->
    [];
oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) ->
    Consumer = {
        OAuth#oauth.consumer_key,
        OAuth#oauth.consumer_secret,
        OAuth#oauth.signature_method
    },
    Method = string:to_upper(atom_to_list(get_value(method, ConnParams, get))),
    OAuthParams = oauth:signed_params(Method,
        BaseUrl ++ get_value(path, ConnParams, []),
        get_value(qs, ConnParams, []),
        Consumer, OAuth#oauth.token, OAuth#oauth.token_secret),
    [{"Authorization",
        "OAuth " ++ oauth_uri:params_to_header_string(OAuthParams)}].


% Follows an HTTP redirect: releases the current worker, works out the
% target URL from the response headers and re-sends the request against
% it with parameters adjusted by after_redirect/4.
do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
    stop_worker(Worker, HttpDb),
    RedirectUrl = redirect_url(Headers, Url),
    {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params),
    send_req(HttpDb2, Params2, Cb).
% Computes the URL to re-send a redirected request to.
%
% Protocol, host, port and path come from the "Location" response header.
% The user name and password of the original URL are carried over when
% both are present, so the redirected request is sent with the same
% credentials.
redirect_url(RespHeaders, OrigUrl) ->
    Location = mochiweb_headers:get_value(
        "Location", mochiweb_headers:make(RespHeaders)),
    Target = ibrowse_lib:parse_url(Location),
    #url{
        protocol = Proto,
        host = Host,
        port = Port,
        path = Path  % includes query string
    } = Target,
    #url{username = User, password = Passwd} = ibrowse_lib:parse_url(OrigUrl),
    Creds = case {User, Passwd} of
        {U, P} when is_list(U), is_list(P) ->
            U ++ ":" ++ P ++ "@";
        _ ->
            []
    end,
    Scheme = atom_to_list(Proto),
    PortStr = integer_to_list(Port),
    Scheme ++ "://" ++ Creds ++ Host ++ ":" ++ PortStr ++ Path.


% Adjusts the request for a redirect with the given status code. A 303
% response forces the method to get; every other code keeps the original
% method. Returns {HttpDb, Params} for the follow-up request.
after_redirect(RedirectUrl, Code, HttpDb, Params) ->
    NextParams = case Code of
        303 -> ?replace(Params, method, get);
        _ -> Params
    end,
    after_redirect(RedirectUrl, HttpDb, NextParams).


% Points HttpDb at RedirectUrl and drops the qs and path entries from
% Params, since the redirect URL already contains the path and query
% string.
after_redirect(RedirectUrl, HttpDb, Params) ->
    Stripped = lists:foldl(
        fun(Key, Acc) -> lists:keydelete(Key, 1, Acc) end,
        Params,
        [qs, path]),
    {HttpDb#httpdb{url = RedirectUrl}, Stripped}.