Skip to content

Commit

Permalink
read in fixed size chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
noa-neria committed Oct 13, 2024
1 parent 53c7b45 commit 615fede
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
23 changes: 14 additions & 9 deletions cpp/streamer/impl/batch/batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,24 +122,29 @@ void Batch::read(const Config & config)
}

auto file_offset = range.start;
char * buffer = dst;
size_t num_chunks = range.size / config.fs_block_bytesize;

// seek just once because tasks are consecutive within the range
_reader->seek(file_offset);
char * buffer = dst;

for (const auto & task : tasks)
// read task's range in chunks
for (size_t i = 0; i < num_chunks; ++i)
{
// read task's range in blocks
size_t to_read = task.info.bytesize;

_reader->read(to_read, buffer);
LOG(SPAM) << "Copied " << to_read << " bytes to " << std::hex << static_cast<void*>(buffer) << " from file offset " << file_offset;
_reader->read(config.fs_block_bytesize, buffer);

file_offset += to_read;
buffer += to_read;
file_offset += config.fs_block_bytesize;
buffer += config.fs_block_bytesize;

finished_until(file_offset, common::ResponseCode::Success);
}

if (file_offset < range.end)
{
_reader->read(range.end - file_offset, buffer);
finished_until(range.end, common::ResponseCode::Success);
}

LOG(DEBUG) << "Finished reading successfuly from file " << path;
}

Expand Down
7 changes: 5 additions & 2 deletions cpp/streamer/impl/streamer/streamer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,18 @@ TEST(Async, End_Of_File_Error)
const auto data = utils::random::buffer(chunk_size);
utils::temp::File file(data);

size_t num_bulks = chunk_size/bulk_size;
size_t successful_bytesize = num_bulks * bulk_size;
size_t total = 0;
unsigned expected = 0;
while (total < chunk_size && expected < num_chunks)

while (total < successful_bytesize && expected < num_chunks)
{
total += chunks[expected];
++expected;
}

expected = (total <= chunk_size ? expected : expected - 1);
expected = (total <= successful_bytesize ? expected : expected - 1);

Config config(utils::random::number(1, 20), chunk_size, bulk_size, false /* do not enforce minimum */);
Streamer streamer(config);
Expand Down

0 comments on commit 615fede

Please sign in to comment.