% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_stream).
-behaviour(gen_server).

% Sizes of the two fields read at a block boundary by the old (0.9) stream
% reader: a pointer followed by an offset (see old_stream_data/6).
-define(FILE_POINTER_BYTES, 8).
-define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).

-define(STREAM_OFFSET_BYTES, 4).
-define(STREAM_OFFSET_BITS, 8*(?STREAM_OFFSET_BYTES)).

-define(HUGE_CHUNK, 1000000000). % Huge chunk size when reading all in one go

-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data

-export([open/1, close/1, write/2, foldl/4, foldl/5, old_foldl/5,old_copy_to_new_stream/4]).
-export([copy_to_new_stream/3,old_read_term/2]).
-export([init/1, terminate/2, handle_call/3]).
-export([handle_cast/2,code_change/3,handle_info/2]).

-include("couch_db.hrl").

% gen_server state of one write stream.
-record(stream,
    {fd = 0,                % file handle passed to couch_file:append_binary/2
    written_pointers=[],    % positions returned by append_binary, newest first
    buffer_list = [],       % iodata not yet appended to the file, newest first
    buffer_len = 0,         % total byte size of buffer_list
    max_buffer = 4096,      % buffer is flushed once it would exceed this size
    written_len = 0,        % total bytes appended to the file so far
    md5                     % running MD5 context over the appended data
    }).


%%% Interface functions %%%

% Starts a linked stream process that writes to Fd.
% Returns whatever gen_server:start_link/3 returns ({ok, Pid} on success).
open(Fd) ->
    gen_server:start_link(couch_stream, Fd, []).

% Flushes any buffered data and stops the stream process.
% Returns the reply of the `close` call: {PosList, TotalLen, Md5}.
close(Pid) ->
    gen_server:call(Pid, close, infinity).

% Copies every chunk referenced by PosList in Fd into a new stream on DestFd.
% Returns the result of close/1 on the destination stream.
copy_to_new_stream(Fd, PosList, DestFd) ->
    {ok, Dest} = open(DestFd),
    foldl(Fd, PosList,
        fun(Bin, _) ->
            % assert each write so a failed write crashes the copy
            ok = write(Dest, Bin)
        end, ok),
    close(Dest).


% 09 UPGRADE CODE
% Copies Len bytes of an old-format stream starting at Pos in Fd into a new
% stream on DestFd. Returns the result of close/1 on the destination stream.
old_copy_to_new_stream(Fd, Pos, Len, DestFd) ->
    {ok, Dest} = open(DestFd),
    old_foldl(Fd, Pos, Len,
        fun(Bin, _) ->
            ok = write(Dest, Bin)
        end, ok),
    close(Dest).

% 09 UPGRADE CODE
% Folds Fun(Bin, Acc) over Len bytes of an old-format stream, reading at most
% ?DEFAULT_STREAM_CHUNK bytes per call. A `null` pointer with length 0 is an
% empty stream and returns Acc untouched.
old_foldl(_Fd, null, 0, _Fun, Acc) ->
    Acc;
old_foldl(Fd, OldPointer, Len, Fun, Acc) when is_tuple(OldPointer)->
    {ok, Acc2, _} = old_stream_data(Fd, OldPointer, Len, ?DEFAULT_STREAM_CHUNK, Fun, Acc),
    Acc2.

% Folds Fun(Bin, Acc) over the chunks stored at each position of the list,
% in list order. Each chunk is read with couch_file:pread_iolist/2; a failed
% read crashes with badmatch.
foldl(_Fd, [], _Fun, Acc) ->
    Acc;
foldl(Fd, [Pos|Rest], Fun, Acc) ->
    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
    foldl(Fd, Rest, Fun, Fun(Bin, Acc)).

% Same as foldl/4, but additionally checks the MD5 of the data read against
% Md5. An empty binary as Md5 skips the check. A mismatch crashes with
% badmatch.
foldl(Fd, PosList, <<>>, Fun, Acc) ->
    foldl(Fd, PosList, Fun, Acc);
foldl(Fd, PosList, Md5, Fun, Acc) ->
    foldl(Fd, PosList, Md5, erlang:md5_init(), Fun, Acc).

% Internal worker for foldl/5: Md5Acc is the running MD5 context.
foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
    Md5 = erlang:md5_final(Md5Acc),
    Acc;
foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
    % Last chunk: verify the digest before handing the final chunk to Fun.
    Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, Bin)),
    Fun(Bin, Acc);
foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
    foldl(Fd, Rest, Md5, erlang:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)).

% Appends Bin to the stream. Writing an empty binary is a no-op that does not
% contact the stream process.
write(_Pid, <<>>) ->
    ok;
write(Pid, Bin) ->
    gen_server:call(Pid, {write, Bin}, infinity).


%%% gen_server callbacks %%%

init(Fd) ->
    {ok, #stream{fd=Fd, md5=erlang:md5_init()}}.

terminate(_Reason, _Stream) ->
    ok.

% {write, Bin}: buffers Bin; once the buffer would exceed max_buffer the
% buffered data plus Bin is appended to the file as a single chunk.
handle_call({write, Bin}, _From, Stream) ->
    BinSize = iolist_size(Bin),
    #stream{
        fd = Fd,
        written_len = WrittenLen,
        written_pointers = Written,
        buffer_len = BufferLen,
        buffer_list = Buffer,
        max_buffer = Max,
        md5 = Md5} = Stream,
    if BinSize + BufferLen > Max ->
        % Buffer is kept newest-first; restore write order and add Bin last.
        WriteBin = lists:reverse(Buffer, [Bin]),
        Md5_2 = erlang:md5_update(Md5, WriteBin),
        {ok, Pos} = couch_file:append_binary(Fd, WriteBin),
        {reply, ok, Stream#stream{
                        written_len=WrittenLen + BufferLen + BinSize,
                        written_pointers=[Pos|Written],
                        buffer_list=[],
                        buffer_len=0,
                        md5=Md5_2}};
    true ->
        {reply, ok, Stream#stream{
                        buffer_list=[Bin|Buffer],
                        buffer_len=BufferLen + BinSize}}
    end;
% close: flushes what is still buffered, replies with
% {PositionsInWriteOrder, TotalBytesWritten, Md5Digest} and stops the process.
handle_call(close, _From, Stream) ->
    #stream{
        fd = Fd,
        written_len = WrittenLen,
        written_pointers = Written,
        buffer_len = BufferLen,
        buffer_list = Buffer,
        md5 = Md5} = Stream,

    Result =
        case Buffer of
        [] ->
            {lists:reverse(Written), WrittenLen, erlang:md5_final(Md5)};
        _ ->
            WriteBin = lists:reverse(Buffer),
            Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin)),
            {ok, Pos} = couch_file:append_binary(Fd, WriteBin),
            {lists:reverse(Written, [Pos]), WrittenLen + BufferLen, Md5Final}
        end,
    {stop, normal, Result, Stream}.

handle_cast(_Msg, State) ->
    {noreply,State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

handle_info(_Info, State) ->
    {noreply, State}.


% 09 UPGRADE CODE
% Reads one length-prefixed term from an old-format stream at pointer Sp:
% first a ?STREAM_OFFSET_BYTES-wide length, then that many bytes of external
% term format. Returns {ok, Term}.
% NOTE(review): binary_to_term/1 is called without the `safe` option; this is
% only appropriate if the file contents are trusted.
old_read_term(Fd, Sp) ->
    {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
        = old_read(Fd, Sp, ?STREAM_OFFSET_BYTES),
    {ok, Bin, _Sp3} = old_read(Fd, Sp2, TermLen),
    {ok, binary_to_term(Bin)}.

% Reads exactly Num bytes from an old-format stream in one go.
% Returns {ok, Binary, NextStreamPointer}.
old_read(Fd, Sp, Num) ->
    {ok, RevBin, Sp2} = old_stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> [Bin | Acc] end, []),
    Bin = list_to_binary(lists:reverse(RevBin)),
    {ok, Bin, Sp2}.

% 09 UPGRADE CODE
% Folds Fun(Bin, Acc) over Num bytes of an old-format stream starting at the
% stream pointer {Pos, Offset}, where Offset is the number of bytes left in
% the current block. Reads at most MaxChunk bytes per call to Fun.
% Returns {ok, Acc, NextStreamPointer}.
old_stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
    {ok, Acc, Sp};
old_stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
    % Current block is exhausted: the next pointer and its offset are stored
    % at Pos.
    {ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>}
        = couch_file:old_pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
    Sp = {NextPos, NextOffset},
    % Check NextPos is past current Pos (this is always true in a stream)
    % Guards against potential infinite loops caused by corruption.
    case NextPos > Pos of
        true -> ok;
        false -> throw({error, stream_corruption})
    end,
    old_stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
old_stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
    ReadAmount = lists:min([MaxChunk, Num, Offset]),
    {ok, Bin} = couch_file:old_pread(Fd, Pos, ReadAmount),
    Sp = {Pos + ReadAmount, Offset - ReadAmount},
    old_stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Fun(Bin, Acc)).


% Tests moved to tests/etap/050-stream.t