
Very simple option to change the IO device. #180


Draft: wants to merge 2 commits into main
78 changes: 45 additions & 33 deletions src/osiris_log.erl
@@ -105,6 +105,8 @@

-define(SKIP_SEARCH_JUMP, 2048).

+-define(DEFAULT_IO_MODULE, file).

%% Specification of the Log format.
%%
%% Notes:
@@ -577,14 +579,14 @@ init(#{dir := Dir,
?DEBUG_(Name, " next offset ~b first offset ~b",
[element(1, TailInfo),
FstChId]),
-{ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE),
+{ok, SegFd} = open_seg(Filename, ?FILE_OPTS_WRITE),
{ok, Size} = file:position(SegFd, Size),
%% maybe_fix_corrupted_files has truncated the index to the last
%% record pointing
%% at a valid chunk we can now truncate the segment to size in
%% case there is trailing data
ok = file:truncate(SegFd),
-{ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE),
+{ok, IdxFd} = open_idx(IdxFilename, ?FILE_OPTS_WRITE),
{ok, IdxEof} = file:position(IdxFd, eof),
NumChunks = (IdxEof - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B,
#?MODULE{cfg = Cfg,
@@ -600,8 +602,8 @@ init(#{dir := Dir,
index = IdxFilename,
last = undefined}, _} ->
%% the empty log case
-{ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE),
-{ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE),
+{ok, SegFd} = open_seg(Filename, ?FILE_OPTS_WRITE),
+{ok, IdxFd} = open_idx(IdxFilename, ?FILE_OPTS_WRITE),
{ok, _} = file:position(SegFd, ?LOG_HEADER_SIZE),
counters:put(Cnt, ?C_SEGMENTS, 1),
%% the segment could potentially have trailing data here so we'll
@@ -641,7 +643,7 @@ maybe_fix_corrupted_files([IdxFile]) ->
true ->
% the only index doesn't contain a single valid record
% make sure it has a valid header
-{ok, IdxFd} = file:open(IdxFile, ?FILE_OPTS_WRITE),
+{ok, IdxFd} = open_idx(IdxFile, ?FILE_OPTS_WRITE),
ok = file:write(IdxFd, ?IDX_HEADER),
ok = file:close(IdxFd);
false ->
@@ -651,7 +653,7 @@ maybe_fix_corrupted_files([IdxFile]) ->
true ->
% the only segment doesn't contain a single valid chunk
% make sure it has a valid header
-{ok, SegFd} = file:open(SegFile, ?FILE_OPTS_WRITE),
+{ok, SegFd} = open_seg(SegFile, ?FILE_OPTS_WRITE),
ok = file:write(SegFd, ?LOG_HEADER),
ok = file:close(SegFd);
false ->
@@ -698,7 +700,7 @@ truncate_invalid_idx_records(IdxFile, SegSize) ->
% add an option to perform a full segment scan and reconstruct
% the index for the valid chunks.
SegFile = segment_from_index_file(IdxFile),
-{ok, IdxFd} = open(IdxFile, [raw, binary, write, read]),
+{ok, IdxFd} = open_idx(IdxFile, [raw, binary, write, read]),
{ok, Pos} = position_at_idx_record_boundary(IdxFd, eof),
ok = skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos),
ok = file:truncate(IdxFd),
@@ -956,9 +958,9 @@ truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) ->
%% the Chunk id was found and has the right epoch
%% lets truncate to this point
%% FilePos could be eof here which means the next offset
-{ok, Fd} = file:open(File, [read, write, binary, raw]),
+{ok, Fd} = open_seg(File, [read, write, binary, raw]),
_ = file:advise(Fd, 0, 0, random),
-{ok, IdxFd} = file:open(IdxFile, [read, write, binary, raw]),
+{ok, IdxFd} = open_idx(IdxFile, [read, write, binary, raw]),

NextPos = next_chunk_pos(Fd, Pos),
{ok, _} = file:position(Fd, NextPos),
@@ -1047,7 +1049,7 @@ init_data_reader_at(ChunkId, FilePos, File,
#{dir := Dir, name := Name,
shared := Shared,
readers_counter_fun := CountersFun} = Config) ->
-case file:open(File, [raw, binary, read]) of
+case open_seg(File, [raw, binary, read]) of
{ok, Fd} ->
Cnt = make_counter(Config),
counters:put(Cnt, ?C_OFFSET, ChunkId - 1),
@@ -1157,7 +1159,7 @@ init_offset_reader0({timestamp, Ts}, #{} = Conf) ->
SegmentFile = segment_from_index_file(IdxFile),
open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
{first_in, IdxFile} ->
-{ok, Fd} = file:open(IdxFile, [raw, binary, read]),
+{ok, Fd} = open_idx(IdxFile, [raw, binary, read]),
{ok, ?IDX_MATCH(ChunkId, _, FilePos)} = first_idx_record(Fd),
SegmentFile = segment_from_index_file(IdxFile),
open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
@@ -1268,7 +1270,7 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
readers_counter_fun := ReaderCounterFun,
options := Options} =
Conf) ->
-{ok, Fd} = open(SegmentFile, [raw, binary, read]),
+{ok, Fd} = open_seg(SegmentFile, [raw, binary, read]),
Cnt = make_counter(Conf),
ReaderCounterFun(1),
FilterMatcher = case Options of
@@ -1308,7 +1310,7 @@ last_user_chunk_id0(_, []) ->
not_found;
last_user_chunk_id0(Name, [IdxFile | Rest]) ->
%% Do not read-ahead since we read the index file backwards chunk by chunk.
-{ok, IdxFd} = open(IdxFile, [read, raw, binary]),
+{ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]),
{ok, EofPos} = position_at_idx_record_boundary(IdxFd, eof),
Last = last_user_chunk_id_in_index(EofPos - ?INDEX_RECORD_SIZE_B, IdxFd),
_ = file:close(IdxFd),
@@ -1607,7 +1609,7 @@ iter_all_records({X, I}, Acc0) ->
is_valid_chunk_on_disk(SegFile, Pos) ->
%% read a chunk from a specified location in the segment
%% then checks the CRC
-case open(SegFile, [read, raw, binary]) of
+case open_seg(SegFile, [read, raw, binary]) of
{ok, SegFd} ->
IsValid = case file:pread(SegFd, Pos, ?HEADER_SIZE_B) of
{ok,
@@ -1897,7 +1899,7 @@ last_idx_record(IdxFd) ->
nth_last_idx_record(IdxFd, 1).

nth_last_idx_record(IdxFile, N) when ?IS_STRING(IdxFile) ->
-{ok, IdxFd} = open(IdxFile, [read, raw, binary]),
+{ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]),
IdxRecord = nth_last_idx_record(IdxFd, N),
_ = file:close(IdxFd),
IdxRecord;
@@ -1910,7 +1912,7 @@ nth_last_idx_record(IdxFd, N) ->
end.

last_valid_idx_record(IdxFile) ->
-{ok, IdxFd} = open(IdxFile, [read, raw, binary]),
+{ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]),
case position_at_idx_record_boundary(IdxFd, eof) of
{ok, Pos} ->
SegFile = segment_from_index_file(IdxFile),
@@ -1955,7 +1957,7 @@ position_at_idx_record_boundary(IdxFd, At) ->
end.

build_segment_info(SegFile, LastChunkPos, IdxFile) ->
-{ok, Fd} = open(SegFile, [read, binary, raw]),
+{ok, Fd} = open_seg(SegFile, [read, binary, raw]),
%% we don't want to read blocks into page cache we are unlikely to need
_ = file:advise(Fd, 0, 0, random),
case file:pread(Fd, ?LOG_HEADER_SIZE, ?HEADER_SIZE_B) of
@@ -2215,7 +2217,7 @@ last_epoch_chunk_ids0([], undefined) ->
%% the empty stream
[];
last_epoch_chunk_ids0([IdxFile | _] = Files, undefined) ->
-{ok, Fd} = open(IdxFile, [read, raw, binary]),
+{ok, Fd} = open_idx(IdxFile, [read, raw, binary]),
case first_idx_record(Fd) of
{ok, ?IDX_MATCH(FstChId, FstEpoch, _)} ->
ok = file:close(Fd),
@@ -2229,7 +2231,7 @@ last_epoch_chunk_ids0([IdxFile | Rem], {PrevE, _PrevChId, EOs} = Acc0) ->
case last_valid_idx_record(IdxFile) of
{ok, ?IDX_MATCH(_LstChId, LstEpoch, _)}
when LstEpoch > PrevE ->
-{ok, Fd} = open(IdxFile, [read, raw, binary]),
+{ok, Fd} = open_idx(IdxFile, [read, raw, binary]),
Acc = idx_skip_search(Fd, ?IDX_HEADER_SIZE,
fun leo_search_fun/3,
Acc0),
@@ -2474,7 +2476,7 @@ send(ssl, Sock, Data) ->
ssl:send(Sock, Data).

last_timestamp_in_index_file(IdxFile) ->
-case file:open(IdxFile, [raw, binary, read]) of
+case open_idx(IdxFile, [raw, binary, read]) of
{ok, IdxFd} ->
case last_idx_record(IdxFd) of
{ok, <<_O:64/unsigned,
@@ -2493,7 +2495,7 @@ last_timestamp_in_index_file(IdxFile) ->
end.

first_timestamp_from_index_files([IdxFile | _]) ->
-case file:open(IdxFile, [raw, binary, read]) of
+case open_idx(IdxFile, [raw, binary, read]) of
{ok, IdxFd} ->
case first_idx_record(IdxFd) of
{ok, <<_FstO:64/unsigned,
@@ -2525,14 +2527,14 @@ chunk_id_range_from_idx_files(Files) ->
end.

chunk_id_range_from_idx_files(FstIdxFile, LstIdxFile) ->
-{ok, LstFd} = open(LstIdxFile, [read, raw, binary]),
+{ok, LstFd} = open_idx(LstIdxFile, [read, raw, binary]),
case position_at_idx_record_boundary(LstFd, eof) of
{ok, Pos} ->
case file:pread(LstFd, Pos - ?INDEX_RECORD_SIZE_B,
?INDEX_RECORD_SIZE_B) of
{ok, ?IDX_MATCH(LstChId, _, _)} ->
ok = file:close(LstFd),
-{ok, FstFd} = open(FstIdxFile, [read, raw, binary]),
+{ok, FstFd} = open_idx(FstIdxFile, [read, raw, binary]),
case file:pread(FstFd, ?IDX_HEADER_SIZE,
?INDEX_RECORD_SIZE_B) of
{ok, ?IDX_MATCH(FstChId, _, _)} ->
@@ -2650,11 +2652,11 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name,
IdxFilename = make_file_name(NextOffset, "index"),
?DEBUG_(Name, "~ts", [Filename]),
{ok, IdxFd} =
-file:open(
+open_idx(
filename:join(Dir, IdxFilename), ?FILE_OPTS_WRITE),
ok = file:write(IdxFd, ?IDX_HEADER),
{ok, Fd} =
-file:open(
+open_seg(
filename:join(Dir, Filename), ?FILE_OPTS_WRITE),
ok = file:write(Fd, ?LOG_HEADER),
%% we always move to the end of the file
@@ -2669,7 +2671,7 @@
mode = Write#write{segment_size = {?LOG_HEADER_SIZE, 0}}}.

open_index_read(File) ->
-{ok, Fd} = open(File, [read, raw, binary, read_ahead]),
+{ok, Fd} = open_idx(File, [read, raw, binary, read_ahead]),
%% We can't use the assertion that index header is correct because of a
%% race condition between opening the file and writing the header
%% It seems to happen when retention policies are applied
@@ -2687,7 +2689,7 @@ offset_idx_scan(Name, Offset, #seg_info{index = IndexFile} = SegmentInfo) ->
true ->
offset_out_of_range;
false ->
-{ok, IdxFd} = open(IndexFile,
+{ok, IdxFd} = open_idx(IndexFile,
                    [read, raw, binary]),
_ = file:advise(IdxFd, 0, 0, random),
{Offset, SearchResult} =
@@ -2720,8 +2722,8 @@ throw_missing({error, enoent}) ->
throw_missing(Any) ->
Any.

-open(File, Options) ->
-    throw_missing(file:open(File, Options)).
+open_idx(File, Options) ->
+    IOMod = application:get_env(osiris, io_idx_module,
+                                ?DEFAULT_IO_MODULE),
+    open(IOMod, File, Options).
+
+open_seg(File, Options) ->
+    IOMod = application:get_env(osiris, io_segment_module,
+                                ?DEFAULT_IO_MODULE),
+    open(IOMod, File, Options).
+
+open(Mod, File, Options) ->
+    throw_missing(Mod:open(File, Options)).

chunk_location_for_timestamp(Idx, Ts) ->
%% TODO: optimise using skip search approach
@@ -2796,7 +2809,7 @@ recover_tracking(#?MODULE{cfg = #cfg{directory = Dir,
%% we need to open a new file handle here as we cannot use the one that is
%% being used for appending to the segment as pread _may_ move the file
%% position on some systems (such as windows)
-{ok, Fd} = open(filename:join(Dir, File), [read, raw, binary]),
+{ok, Fd} = open_seg(filename:join(Dir, File), [read, raw, binary]),
_ = file:advise(Fd, 0, 0, random),
%% TODO: if the first chunk in the segment isn't a tracking snapshot and
%% there are prior segments we could scan at least two segments increasing
@@ -2946,7 +2959,7 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir,
%% log but would cause an infinite loop if it does
{end_of_stream, State};
false ->
-case file:open(filename:join(Dir, SegFile),
+case open_seg(filename:join(Dir, SegFile),
               [raw, binary, read]) of
{ok, Fd2} ->
ok = file:close(Fd),
@@ -3019,7 +3032,7 @@ index_file_first_offset(IdxFile) when is_binary(IdxFile) ->
binary_to_integer(filename:basename(IdxFile, <<".index">>)).

first_last_timestamps(IdxFile) ->
-case file:open(IdxFile, [raw, read, binary]) of
+case open_idx(IdxFile, [raw, read, binary]) of
{ok, Fd} ->
_ = file:advise(Fd, 0, 0, random),
case first_idx_record(Fd) of
@@ -3107,7 +3120,6 @@ close_fd(Fd) ->
_ = file:close(Fd),
ok.


dump_init(File) ->
{ok, Fd} = file:open(File, [raw, binary, read]),
{ok, <<"OSIL", _V:4/binary>> } = file:read(Fd, ?LOG_HEADER_SIZE),
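With this patch, the modules used to open index and segment files are looked up in the osiris application environment under the keys io_idx_module and io_segment_module, both defaulting to file via ?DEFAULT_IO_MODULE. A minimal sketch of how a deployment might select a custom module; my_io is a hypothetical module name, not part of this change:

%% Sketch: point osiris at a custom IO module before any log is opened.
%% io_idx_module and io_segment_module are the keys read by open_idx/2
%% and open_seg/2 above; my_io is a hypothetical user-supplied module.
ok = application:set_env(osiris, io_idx_module, my_io),
ok = application:set_env(osiris, io_segment_module, my_io).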
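Note that only open/2 is dispatched through the configured module; the rest of osiris_log still calls file:position/2, file:pread/3, file:truncate/1 and friends directly on the returned handle, so a custom module's open/2 must return something those file functions accept. A pass-through sketch of such a module, under that assumption (my_io again being hypothetical):

-module(my_io).
-export([open/2]).

%% Hypothetical IO module sketch. Only open/2 is routed through this
%% module by the patch; reads, writes and positioning still go to the
%% file module, so we must return a plain file:io_device() here.
open(File, Options) ->
    %% a real implementation could rewrite the path, adjust the open
    %% options, or record metrics before delegating to file:open/2
    file:open(File, Options).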