From 2c687580f6fa434564e7abedd3c413d8c01e3061 Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Tue, 11 Mar 2025 21:22:57 +0000 Subject: [PATCH 1/2] Very simple option to change the IO device. --- src/osiris_log.erl | 78 +++++++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 32 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 8e80578..4d5849f 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -105,6 +105,8 @@ -define(SKIP_SEARCH_JUMP, 2048). +-define(DEFAULT_IO_MODULE, file). + %% Specification of the Log format. %% %% Notes: @@ -577,14 +579,14 @@ init(#{dir := Dir, ?DEBUG_(Name, " next offset ~b first offset ~b", [element(1, TailInfo), FstChId]), - {ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE), + {ok, SegFd} = open_seg(Filename, ?FILE_OPTS_WRITE), {ok, Size} = file:position(SegFd, Size), %% maybe_fix_corrupted_files has truncated the index to the last %% record pointing %% at a valid chunk we can now truncate the segment to size in %% case there is trailing data ok = file:truncate(SegFd), - {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), + {ok, IdxFd} = open_idx(IdxFilename, ?FILE_OPTS_WRITE), {ok, IdxEof} = file:position(IdxFd, eof), NumChunks = (IdxEof - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B, #?MODULE{cfg = Cfg, @@ -600,8 +602,8 @@ init(#{dir := Dir, index = IdxFilename, last = undefined}, _} -> %% the empty log case - {ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE), - {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), + {ok, SegFd} = open_seg(Filename, ?FILE_OPTS_WRITE), + {ok, IdxFd} = open_idx(IdxFilename, ?FILE_OPTS_WRITE), {ok, _} = file:position(SegFd, ?LOG_HEADER_SIZE), counters:put(Cnt, ?C_SEGMENTS, 1), %% the segment could potentially have trailing data here so we'll @@ -641,7 +643,7 @@ maybe_fix_corrupted_files([IdxFile]) -> true -> % the only index doesn't contain a single valid record % make sure it has a valid header - {ok, IdxFd} = file:open(IdxFile, ?FILE_OPTS_WRITE), + {ok, IdxFd} = 
open_idx(IdxFile, ?FILE_OPTS_WRITE), ok = file:write(IdxFd, ?IDX_HEADER), ok = file:close(IdxFd); false -> @@ -651,7 +653,7 @@ maybe_fix_corrupted_files([IdxFile]) -> true -> % the only segment doesn't contain a single valid chunk % make sure it has a valid header - {ok, SegFd} = file:open(SegFile, ?FILE_OPTS_WRITE), + {ok, SegFd} = open_seg(SegFile, ?FILE_OPTS_WRITE), ok = file:write(SegFd, ?LOG_HEADER), ok = file:close(SegFd); false -> @@ -698,7 +700,7 @@ truncate_invalid_idx_records(IdxFile, SegSize) -> % add an option to perform a full segment scan and reconstruct % the index for the valid chunks. SegFile = segment_from_index_file(IdxFile), - {ok, IdxFd} = open(IdxFile, [raw, binary, write, read]), + {ok, IdxFd} = open_idx(IdxFile, [raw, binary, write, read]), {ok, Pos} = position_at_idx_record_boundary(IdxFd, eof), ok = skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos), ok = file:truncate(IdxFd), @@ -956,9 +958,9 @@ truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) -> %% the Chunk id was found and has the right epoch %% lets truncate to this point %% FilePos could be eof here which means the next offset - {ok, Fd} = file:open(File, [read, write, binary, raw]), + {ok, Fd} = open_seg(File, [read, write, binary, raw]), _ = file:advise(Fd, 0, 0, random), - {ok, IdxFd} = file:open(IdxFile, [read, write, binary, raw]), + {ok, IdxFd} = open_idx(IdxFile, [read, write, binary, raw]), NextPos = next_chunk_pos(Fd, Pos), {ok, _} = file:position(Fd, NextPos), @@ -1047,7 +1049,7 @@ init_data_reader_at(ChunkId, FilePos, File, #{dir := Dir, name := Name, shared := Shared, readers_counter_fun := CountersFun} = Config) -> - case file:open(File, [raw, binary, read]) of + case open_seg(File, [raw, binary, read]) of {ok, Fd} -> Cnt = make_counter(Config), counters:put(Cnt, ?C_OFFSET, ChunkId - 1), @@ -1157,7 +1159,7 @@ init_offset_reader0({timestamp, Ts}, #{} = Conf) -> SegmentFile = segment_from_index_file(IdxFile), open_offset_reader_at(SegmentFile, ChunkId, 
FilePos, Conf); {first_in, IdxFile} -> - {ok, Fd} = file:open(IdxFile, [raw, binary, read]), + {ok, Fd} = open_idx(IdxFile, [raw, binary, read]), {ok, ?IDX_MATCH(ChunkId, _, FilePos)} = first_idx_record(Fd), SegmentFile = segment_from_index_file(IdxFile), open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); @@ -1268,7 +1270,7 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos, readers_counter_fun := ReaderCounterFun, options := Options} = Conf) -> - {ok, Fd} = open(SegmentFile, [raw, binary, read]), + {ok, Fd} = open_seg(SegmentFile, [raw, binary, read]), Cnt = make_counter(Conf), ReaderCounterFun(1), FilterMatcher = case Options of @@ -1308,7 +1310,7 @@ last_user_chunk_id0(_, []) -> not_found; last_user_chunk_id0(Name, [IdxFile | Rest]) -> %% Do not read-ahead since we read the index file backwards chunk by chunk. - {ok, IdxFd} = open(IdxFile, [read, raw, binary]), + {ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]), {ok, EofPos} = position_at_idx_record_boundary(IdxFd, eof), Last = last_user_chunk_id_in_index(EofPos - ?INDEX_RECORD_SIZE_B, IdxFd), _ = file:close(IdxFd), @@ -1607,7 +1609,7 @@ iter_all_records({X, I}, Acc0) -> is_valid_chunk_on_disk(SegFile, Pos) -> %% read a chunk from a specified location in the segment %% then checks the CRC - case open(SegFile, [read, raw, binary]) of + case open_seg(SegFile, [read, raw, binary]) of {ok, SegFd} -> IsValid = case file:pread(SegFd, Pos, ?HEADER_SIZE_B) of {ok, @@ -1897,7 +1899,7 @@ last_idx_record(IdxFd) -> nth_last_idx_record(IdxFd, 1). nth_last_idx_record(IdxFile, N) when ?IS_STRING(IdxFile) -> - {ok, IdxFd} = open(IdxFile, [read, raw, binary]), + {ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]), IdxRecord = nth_last_idx_record(IdxFd, N), _ = file:close(IdxFd), IdxRecord; @@ -1910,7 +1912,7 @@ nth_last_idx_record(IdxFd, N) -> end. 
last_valid_idx_record(IdxFile) -> - {ok, IdxFd} = open(IdxFile, [read, raw, binary]), + {ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]), case position_at_idx_record_boundary(IdxFd, eof) of {ok, Pos} -> SegFile = segment_from_index_file(IdxFile), @@ -1955,7 +1957,7 @@ position_at_idx_record_boundary(IdxFd, At) -> end. build_segment_info(SegFile, LastChunkPos, IdxFile) -> - {ok, Fd} = open(SegFile, [read, binary, raw]), + {ok, Fd} = open_seg(SegFile, [read, binary, raw]), %% we don't want to read blocks into page cache we are unlikely to need _ = file:advise(Fd, 0, 0, random), case file:pread(Fd, ?LOG_HEADER_SIZE, ?HEADER_SIZE_B) of @@ -2215,7 +2217,7 @@ last_epoch_chunk_ids0([], undefined) -> %% the empty stream []; last_epoch_chunk_ids0([IdxFile | _] = Files, undefined) -> - {ok, Fd} = open(IdxFile, [read, raw, binary]), + {ok, Fd} = open_idx(IdxFile, [read, raw, binary]), case first_idx_record(Fd) of {ok, ?IDX_MATCH(FstChId, FstEpoch, _)} -> ok = file:close(Fd), @@ -2229,7 +2231,7 @@ last_epoch_chunk_ids0([IdxFile | Rem], {PrevE, _PrevChId, EOs} = Acc0) -> case last_valid_idx_record(IdxFile) of {ok, ?IDX_MATCH(_LstChId, LstEpoch, _)} when LstEpoch > PrevE -> - {ok, Fd} = open(IdxFile, [read, raw, binary]), + {ok, Fd} = open_idx(IdxFile, [read, raw, binary]), Acc = idx_skip_search(Fd, ?IDX_HEADER_SIZE, fun leo_search_fun/3, Acc0), @@ -2474,7 +2476,7 @@ send(ssl, Sock, Data) -> ssl:send(Sock, Data). last_timestamp_in_index_file(IdxFile) -> - case file:open(IdxFile, [raw, binary, read]) of + case open_idx(IdxFile, [raw, binary, read]) of {ok, IdxFd} -> case last_idx_record(IdxFd) of {ok, <<_O:64/unsigned, @@ -2493,7 +2495,7 @@ last_timestamp_in_index_file(IdxFile) -> end. first_timestamp_from_index_files([IdxFile | _]) -> - case file:open(IdxFile, [raw, binary, read]) of + case open_idx(IdxFile, [raw, binary, read]) of {ok, IdxFd} -> case first_idx_record(IdxFd) of {ok, <<_FstO:64/unsigned, @@ -2525,14 +2527,14 @@ chunk_id_range_from_idx_files(Files) -> end. 
chunk_id_range_from_idx_files(FstIdxFile, LstIdxFile) -> - {ok, LstFd} = open(LstIdxFile, [read, raw, binary]), + {ok, LstFd} = open_idx(LstIdxFile, [read, raw, binary]), case position_at_idx_record_boundary(LstFd, eof) of {ok, Pos} -> case file:pread(LstFd, Pos - ?INDEX_RECORD_SIZE_B, ?INDEX_RECORD_SIZE_B) of {ok, ?IDX_MATCH(LstChId, _, _)} -> ok = file:close(LstFd), - {ok, FstFd} = open(FstIdxFile, [read, raw, binary]), + {ok, FstFd} = open_idx(FstIdxFile, [read, raw, binary]), case file:pread(FstFd, ?IDX_HEADER_SIZE, ?INDEX_RECORD_SIZE_B) of {ok, ?IDX_MATCH(FstChId, _, _)} -> @@ -2650,11 +2652,11 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name, IdxFilename = make_file_name(NextOffset, "index"), ?DEBUG_(Name, "~ts", [Filename]), {ok, IdxFd} = - file:open( + open_idx( filename:join(Dir, IdxFilename), ?FILE_OPTS_WRITE), ok = file:write(IdxFd, ?IDX_HEADER), {ok, Fd} = - file:open( + open_seg( filename:join(Dir, Filename), ?FILE_OPTS_WRITE), ok = file:write(Fd, ?LOG_HEADER), %% we always move to the end of the file @@ -2669,7 +2671,7 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name, mode = Write#write{segment_size = {?LOG_HEADER_SIZE, 0}}}. open_index_read(File) -> - {ok, Fd} = open(File, [read, raw, binary, read_ahead]), + {ok, Fd} = open_idx(File, [read, raw, binary, read_ahead]), %% We can't use the assertion that index header is correct because of a %% race condition between opening the file and writing the header %% It seems to happen when retention policies are applied @@ -2687,7 +2689,7 @@ offset_idx_scan(Name, Offset, #seg_info{index = IndexFile} = SegmentInfo) -> true -> offset_out_of_range; false -> - {ok, IdxFd} = open(IndexFile, + {ok, IdxFd} = open_idx(IndexFile, [read, raw, binary]), _ = file:advise(IdxFd, 0, 0, random), {Offset, SearchResult} = @@ -2720,8 +2722,21 @@ throw_missing({error, enoent}) -> throw_missing(Any) -> Any. 
+open_idx(File, Options) -> + IOMod = application:get_env(osiris, io_idx_module, + ?DEFAULT_IO_MODULE), + open(IOMod, File, Options). +open_seg(File, Options) -> + IOMod = application:get_env(osiris, io_segment_module, + ?DEFAULT_IO_MODULE), + open_seg(IOMod, File, Options). +open_seg(IOMod, File, Options) -> + open(IOMod, File, Options). + open(File, Options) -> - throw_missing(file:open(File, Options)). + open(file, File, Options). +open(Mod, File, Options) -> + throw_missing(Mod:open(File, Options)). chunk_location_for_timestamp(Idx, Ts) -> %% TODO: optimise using skip search approach @@ -2796,7 +2811,7 @@ recover_tracking(#?MODULE{cfg = #cfg{directory = Dir, %% we need to open a new file handle here as we cannot use the one that is %% being used for appending to the segment as pread _may_ move the file %% position on some systems (such as windows) - {ok, Fd} = open(filename:join(Dir, File), [read, raw, binary]), + {ok, Fd} = open_seg(filename:join(Dir, File), [read, raw, binary]), _ = file:advise(Fd, 0, 0, random), %% TODO: if the first chunk in the segment isn't a tracking snapshot and %% there are prior segments we could scan at least two segments increasing @@ -2946,7 +2961,7 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir, %% log but would cause an infinite loop if it does {end_of_stream, State}; false -> - case file:open(filename:join(Dir, SegFile), + case open_seg(filename:join(Dir, SegFile), [raw, binary, read]) of {ok, Fd2} -> ok = file:close(Fd), @@ -3019,7 +3034,7 @@ index_file_first_offset(IdxFile) when is_binary(IdxFile) -> binary_to_integer(filename:basename(IdxFile, <<".index">>)). first_last_timestamps(IdxFile) -> - case file:open(IdxFile, [raw, read, binary]) of + case open_idx(IdxFile, [raw, read, binary]) of {ok, Fd} -> _ = file:advise(Fd, 0, 0, random), case first_idx_record(Fd) of @@ -3107,7 +3122,6 @@ close_fd(Fd) -> _ = file:close(Fd), ok. 
- dump_init(File) -> {ok, Fd} = file:open(File, [raw, binary, read]), {ok, <<"OSIL", _V:4/binary>> } = file:read(Fd, ?LOG_HEADER_SIZE), From 3bb09b300580fb8279b6a5a22eaebe0eda81fb1b Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Tue, 11 Mar 2025 21:32:56 +0000 Subject: [PATCH 2/2] Fix warning --- src/osiris_log.erl | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 4d5849f..5020c59 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -2733,8 +2733,6 @@ open_seg(File, Options) -> open_seg(IOMod, File, Options) -> open(IOMod, File, Options). -open(File, Options) -> - open(file, File, Options). open(Mod, File, Options) -> throw_missing(Mod:open(File, Options)).