Vector Read merge (xrootd#42)
* variable rpm name (#17)

* variable rpm name

* Update xrootd-ceph.spec.in

* Update makesrpm.sh

* Update makesrpm.sh

* Master cephnamelib (#16)

* Allow ceph.namelib to take params and apply translation to full path

* Reduce logging

Remove extraneous logging messages

* simplify parsing of namelib and added a log line for any remapped file

Co-authored-by: James <[email protected]>

* XRD-22 Fix ensuring the correct filename is passed to the CephFile instance. (#24)

A regression in previous commit meant that the filename was not correctly passed
to the CephFile instance. This fix ensures that the filename is set correctly.

Co-authored-by: james <[email protected]>

* XRD-12 Add timestamp information for ceph logging methods

Update the logwrapper method to print out the current timestamp in the initial section of output.

* re-introduce variable names to spec input (xrootd#27)

* Return permission denied on write attempt on existing file with EXCL set (xrootd#31)

Co-authored-by: James Walder <[email protected]>

* disable posc (xrootd#30)

posc is disabled for proxies, but not for a unified setup. XrdCeph does not support the posc flag, as it misinterprets objects as folders.

* Disk space reporting (xrootd#36)

* Provide XrdCephOss::StatLS and ceph_posix_stat_pool to enable disk space reporting. Responds to the 'xrdfs query space' command as requested by ALICE VO

* Remove ts() timestamp function and unnecessary #defines

* Read ceph.poolnames setting from XRootD config to specify reportable pools.

* Support 'xrdfs spaceinfo' via Stat() method returning XrdOssOK for stat'ing 'pool:'

* Tidy up tracing of Stat* calls

* Remove unwanted method isPathReportablePool

* Add comments for need to support stat-ing '/'

* Return -ENOMEM if malloc fails

* Return -ENOMEM if malloc fails

* Rename the disk space reporting config item to ceph.reportingpools and log if the list of names is not present. Report if the ceph_posix_stat_pool call to get the amount of used space fails

* Sanitize incoming pool name and allow for MonALISA format

* Optional tracing of Stat* incoming paths and responses. Remove double logging of ceph.reportingpools.

* Check that sanitized pool name is not marked invalid

* Use ceph namelib translation at Oss level by copying translateFileName logic from Posix level. More error checking if stat can't find pool name.

* Remove superfluous comments

* Ensure tracing of path arguments to Stat() and StatLS(). Add Doxygen-style comments to changed methods

* Make source tarball only as minimum output

* Add make-src-tar.sh to additionally place required source tarball in '--output' destination

* Change back usedSpace to totalSpace in ceph_posix_statfs

* feat: improve (vector) read implementation (xrootd#37)

Try to avoid usage of libradosstriper for readv operations
since it may impact performance significantly. To do so we explicitly
determine the objects that constitute a file and read from them using
rados only. Reads are async.

To do these async reads conveniently we introduce a class for handling
multiple async read requests.
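The layout this relies on can be sketched outside of librados: the striper stores a file's data in rados objects named `<filename>.<16-hex-digit index>`, each holding at most `object_size` bytes, so a file-level read splits into per-object chunks by simple arithmetic. The helper below is an illustrative sketch under those assumptions, not code from this commit.

```cpp
#include <algorithm>
#include <cstdio>
#include <string>
#include <vector>

// One per-object piece of a file-level read request.
struct Chunk {
    size_t obj_idx;    // index of the backing rados object
    size_t local_off;  // offset within that object
    size_t len;        // bytes to read from it
};

// Split a file-level read (offset, count) into per-object chunks:
// object i holds file bytes [i*object_size, (i+1)*object_size).
std::vector<Chunk> splitRead(size_t offset, size_t count, size_t object_size) {
    std::vector<Chunk> chunks;
    size_t obj = offset / object_size;    // first object touched
    size_t local = offset % object_size;  // where the read starts inside it
    while (count > 0) {
        size_t len = std::min(count, object_size - local);
        chunks.push_back({obj, local, len});
        count -= len;
        ++obj;
        local = 0;  // subsequent objects are read from their start
    }
    return chunks;
}

// Striper-style object name: "<file>.<16 hex digits>".
std::string objectName(const std::string& file, size_t idx) {
    char suffix[18];  // '.' + 16 hex digits + NUL
    std::snprintf(suffix, sizeof(suffix), ".%016zx", idx);
    return file + suffix;
}
```

With a 4 MiB object size (an assumed value), a 10-byte read starting 4 bytes before the first object boundary becomes two chunks: 4 bytes from the end of object 0 and 6 bytes from the start of object 1.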

* Initial implementation of ReadV at the XrdOss level

* Correct the signature of ReadV to XrdCephOssFile

* feat: do not use libradosstriper for readv operation

* feat: use atomic operations for readv requests

This should be the most efficient way of handling multiple read ops.

* feat: use nonstriper reads for pread requests

* feat: use nonstriper reads for read operations also

To do so we do a complete refactoring: the bulkAioRead class is moved to a
separate file, and its features are extended. Namely, it can now read
from files, not only objects.

* feat: print warning message if waiting for aio reads from ceph takes long

This is useful for debugging the reasons for read(v) request failures.

* Added some comments

* fix: use size_t for start_block

We can use "%zx" in sprintf, so let's unify the types of the variables in
the function. This will also allow us to raise the limit on
file size.

* feat: refactor BulkAioRead::read method, suggested during review

1. Rename end_block to last_block
2. Move variable definitions closer to their usage
3. Use 'std::min' instead of 'if' for chunk_len determination
4. Use more efficient chunk_start calculation

* feat: add options to allow one to switch to standard read mechanisms

This may be useful for testing.

* feat: rename block_size to object_size in BulkAioRead

The new name better describes reality, since we are talking about the size
of ceph objects.

* feat: rename wait_for_complete to submit_and_wait_for_complete

New name describes this function better.

* feat: use more meaningful names for the variables that loop over the operations map

op_data should describe the contents of the variables better.

* feat: move type definitions into the class

* feat: added comments with method's description

* feat: remove unnecessary semicolons

* feat: convert wait_for_complete method from void to int

This allows several improvements. Here we change the key of the operations
map to the object number instead of the full object name.

* fix: fixed comment

* fix: fixed comments

* feat: refactor bulkAioRead class

Pointers were dropped from objectReadOperation and ceph_bufferlist objects.
The objects are moved to appropriate classes to simplify memory management
and usage.

* feat: take into account completion's return value

We can retrieve the return code from the completion and use it to get a
meaningful status for the whole operation.

* feat: allow reading of sparse file

Since we do not really expect sparse files, we use a fallback mechanism:
if a read(v) fails with -ENOENT, we simply resubmit it using
striper-based functions.
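The fallback can be sketched generically; `readWithFallback` and its two function arguments are hypothetical names standing in for the non-striper and striper read paths, not APIs from this repository.

```cpp
#include <cerrno>
#include <functional>
#include <sys/types.h>

// Either read path: fills buf with count bytes at off, returns bytes
// read or a negative errno.
using ReadFn = std::function<ssize_t(char* buf, size_t count, off_t off)>;

// Try the fast non-striper read first; if a backing object is missing
// (-ENOENT, e.g. a hole in a sparse file), resubmit the whole request
// through the striper, which handles such files correctly.
ssize_t readWithFallback(const ReadFn& nonstriper, const ReadFn& striper,
                         char* buf, size_t count, off_t off) {
    ssize_t rc = nonstriper(buf, count, off);
    if (rc == -ENOENT)
        rc = striper(buf, count, off);
    return rc;
}
```

The whole request is resubmitted rather than only the missing chunk, matching the simple "just redo it via the striper" strategy described above.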

* lint: remove trailing whitespaces

* feat: use meaningful names for read(v) functions

The name now indicates whether read(v)s are striper or non-striper
ones.

* feat: fallback to striper-based read if number of stripes > 1

Just in case: such files should not be present in our production setup.

* feat: allow zero-sized reads

In principle, this is a correct request, so we should support it.

* fix: make sure we do not delete completion objects until submitted operation is completed

This is done to prevent some nasty side-effects, e.g. writing to a deleted buffer.
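The invariant can be illustrated with a minimal RAII sketch; `FakeCompletion` and `CompletionGuard` are made-up names standing in for librados::AioCompletion and the commit's wrapper class. Once a completion has been handed to an async operation, its owner must wait for it before releasing it, otherwise the in-flight operation could still write into freed buffers.

```cpp
// Stand-in for an async completion handle such as librados::AioCompletion.
struct FakeCompletion {
    bool in_flight = false;
    bool waited = false;
    void wait_for_complete() { waited = true; in_flight = false; }
};

// RAII guard: if the completion was handed to an async operation via
// use(), the destructor waits for it before letting it go, so the
// operation can never touch freed state.
class CompletionGuard {
    FakeCompletion* c_;
    bool used_ = false;
public:
    explicit CompletionGuard(FakeCompletion* c) : c_(c) {}
    FakeCompletion* use() { used_ = true; c_->in_flight = true; return c_; }
    ~CompletionGuard() {
        if (used_ && c_->in_flight) c_->wait_for_complete();
    }
};
```

A guard that was never `use()`d skips the wait, so the common error path (allocation succeeded but no operation was submitted) stays cheap.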

* fix: remove move constructor from bulkAioRead

We do not use it.

* fix: handle failure to allocate completion

Completion allocation can fail; we should take that into account.

* feat: use file reference to construct readOp objects

There is no need to extract (and then copy) the file name and object size
from the file reference to construct the read object; we can use the file
reference directly.

* feat: replace conversion operator with explicit method

Implicit conversion was making code less readable.

* feat: remove call to is_complete() in completion wrapper destructor

There is no need to check for completion, we can call wait_for_complete
multiple times.

* feat: put warning threshold to config file

It is better to have this value configurable instead of hardcoded.

* fix: initialize return code variable in ReadOpData

* Added comment

* feat: add comment for future optimization.

We should use `aio_cancel` to cancel all pending read operations in the future.

* fix: remove vim's swp file

Committed by accident

* feat: improve logging

Add file descriptor to sparse file's logging, fix typos.

* fix: minor fixes

Remove unnecessary include, move variable declaration closer to the
usage, fix spelling in the comment.

* feat: BulkAioRead::read method refactoring

Refactoring was made to increase (hopefully) readability.

* fix: better wording for comment

* feat: BulkAioRead::read -- change loop exit condition

We can exit when `to_read == 0`. This allows us to drop the `end_block`
variable.

* fix: add call to `clear` after getting results

This is to allow clients to use the same readOp object for future
operations.

---------

Co-authored-by: Ian Johnson <[email protected]>
Co-authored-by: Alexander Rogovskiy <[email protected]>

* duplicate struct definition

* move struct definition to headers

* use bufferedIO version of path

* remove MAXPATHLEN redefinition

---------

Co-authored-by: snafus <[email protected]>
Co-authored-by: James <[email protected]>
Co-authored-by: root <[email protected]>
Co-authored-by: Ian Johnson <[email protected]>
Co-authored-by: alex-rg <[email protected]>
Co-authored-by: Alexander Rogovskiy <[email protected]>

Buffered io nonstriperbuffer (xrootd#43)

* Add capability for buffer io raw to use striperless reads

* Add capability for buffer io raw to use striperless reads

* Add a maybe striper for reading in ceph posix

* Use striperless reads when bypassing the buffer

Update XrdCephBufferAlgSimple.cc (xrootd#45)

Remove verbose logging for case when cache is bypassed, as the read size is at least the size of the buffer.

2 people authored and amadio committed Feb 18, 2025
1 parent 7545f49 commit f38f10d
Showing 13 changed files with 640 additions and 49 deletions.
13 changes: 9 additions & 4 deletions src/XrdCeph/XrdCephBuffers/CephIOAdapterRaw.cc
@@ -11,8 +11,10 @@ using namespace XrdCephBuffer;
using myclock = std::chrono::steady_clock;
//using myseconds = std::chrono::duration<float,

CephIOAdapterRaw::CephIOAdapterRaw(IXrdCephBufferData * bufferdata, int fd) :
m_bufferdata(bufferdata),m_fd(fd) {
CephIOAdapterRaw::CephIOAdapterRaw(IXrdCephBufferData * bufferdata, int fd,
bool useStriperlessReads) :
m_bufferdata(bufferdata),m_fd(fd),
m_useStriperlessReads(useStriperlessReads) {
}

CephIOAdapterRaw::~CephIOAdapterRaw() {
@@ -30,7 +32,9 @@ CephIOAdapterRaw::~CephIOAdapterRaw() {
<< " write_MBs:" << write_speed
<< " nread:" << m_stats_read_req << " bytesread:" << m_stats_read_bytes << " read_s:"
<< m_stats_read_timer * 1e-3 << " readmax_s:" << m_stats_read_longest * 1e-3
<< " read_MBs:" << read_speed );
<< " read_MBs:" << read_speed
<< " striperlessRead: " << m_useStriperlessReads
);

}

@@ -60,10 +64,11 @@ ssize_t CephIOAdapterRaw::read(off64_t offset, size_t count) {
if (!buf) {
return -EINVAL;
}
ssize_t rc {0};

// no check is made whether the buffer has sufficient capacity
auto start = std::chrono::steady_clock::now();
ssize_t rc = ceph_posix_pread(m_fd,buf,count,offset);
rc = ceph_posix_maybestriper_pread(m_fd,buf,count,offset, m_useStriperlessReads);
auto end = std::chrono::steady_clock::now();
//auto elapsed = end-start;
auto int_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end-start);
4 changes: 3 additions & 1 deletion src/XrdCeph/XrdCephBuffers/CephIOAdapterRaw.hh
@@ -27,7 +27,8 @@ namespace XrdCephBuffer {
*/
class CephIOAdapterRaw: public virtual ICephIOAdapter {
public:
CephIOAdapterRaw(IXrdCephBufferData * bufferdata, int fd);
CephIOAdapterRaw(IXrdCephBufferData * bufferdata, int fd,
bool useStriperlessReads);
virtual ~CephIOAdapterRaw();

/**
@@ -57,6 +58,7 @@ class CephIOAdapterRaw: public virtual ICephIOAdapter {
private:
IXrdCephBufferData * m_bufferdata; //!< no ownership of pointer (consider shared ptrs, etc)
int m_fd;
bool m_useStriperlessReads {true}; //!< use the striperless read code

// timer and counter info
std::atomic< long> m_stats_read_timer{0}, m_stats_write_timer{0};
13 changes: 8 additions & 5 deletions src/XrdCeph/XrdCephBuffers/XrdCephBufferAlgSimple.cc
@@ -18,8 +18,10 @@ using namespace XrdCephBuffer;


XrdCephBufferAlgSimple::XrdCephBufferAlgSimple(std::unique_ptr<IXrdCephBufferData> buffer,
std::unique_ptr<ICephIOAdapter> cephio, int fd ):
m_bufferdata(std::move(buffer)), m_cephio(std::move(cephio)), m_fd(fd){
std::unique_ptr<ICephIOAdapter> cephio, int fd,
bool useStriperlessReads):
m_bufferdata(std::move(buffer)), m_cephio(std::move(cephio)), m_fd(fd),
m_useStriperlessReads(useStriperlessReads) {

}

@@ -105,13 +107,14 @@ ssize_t XrdCephBufferAlgSimple::read(volatile void *buf, off_t offset, size_t
* Invalidate the cache in anycase
*/
if (blen >= m_bufferdata->capacity()) {
BUFLOG("XrdCephBufferAlgSimple::read: Readthrough cache: fd: " << m_fd
<< " " << offset << " " << blen);
//BUFLOG("XrdCephBufferAlgSimple::read: Readthrough cache: fd: " << m_fd
// << " " << offset << " " << blen);
// larger than cache, so read through, and invalidate the cache anyway
m_bufferdata->invalidate();
m_bufferLength =0; // ensure cached data is set to zero length
// #FIXME JW: const_cast is probably a bit poor.
ssize_t rc = ceph_posix_pread(m_fd, const_cast<void*>(buf), blen, offset);

ssize_t rc = ceph_posix_maybestriper_pread (m_fd, const_cast<void*>(buf), blen, offset, m_useStriperlessReads);
if (rc > 0) {
m_stats_bytes_fromceph += rc;
m_stats_bytes_toclient += rc;
4 changes: 3 additions & 1 deletion src/XrdCeph/XrdCephBuffers/XrdCephBufferAlgSimple.hh
@@ -26,7 +26,8 @@ namespace XrdCephBuffer {

class XrdCephBufferAlgSimple : public virtual IXrdCephBufferAlg {
public:
XrdCephBufferAlgSimple(std::unique_ptr<IXrdCephBufferData> buffer, std::unique_ptr<ICephIOAdapter> cephio, int fd );
XrdCephBufferAlgSimple(std::unique_ptr<IXrdCephBufferData> buffer, std::unique_ptr<ICephIOAdapter> cephio, int fd,
bool useStriperlessReads = true );
virtual ~XrdCephBufferAlgSimple();

virtual ssize_t read_aio (XrdSfsAio *aoip) override;
@@ -49,6 +50,7 @@ class XrdCephBufferAlgSimple : public virtual IXrdCephBufferAlg {
std::unique_ptr<IXrdCephBufferData> m_bufferdata; //! this algorithm takes ownership of the buffer, and will delete it on destruction
std::unique_ptr<ICephIOAdapter> m_cephio ; // no ownership is taken here
int m_fd = -1;
bool m_useStriperlessReads {true};

off_t m_bufferStartingOffset = 0;
size_t m_bufferLength = 0;
198 changes: 198 additions & 0 deletions src/XrdCeph/XrdCephBulkAioRead.cc
@@ -0,0 +1,198 @@
#include "XrdCephBulkAioRead.hh"


bulkAioRead::bulkAioRead(librados::IoCtx* ct, logfunc_pointer logwrapper, CephFileRef* fileref) {
/**
* Constructor.
*
* @param ct Rados IoContext object
* @param logfunc_pointer Pointer to the function that will be used for logging
* @param fileref Ceph file reference
*
*/
context = ct;
file_ref = fileref;
log_func = logwrapper;
}

bulkAioRead::~bulkAioRead() {
/**
* Destructor. Just clears dynamically allocated memory.
*/
clear();
}

void bulkAioRead::clear() {
/**
* Clear all dynamically allocated memory
*/
operations.clear();
buffers.clear();
}

int bulkAioRead::addRequest(size_t obj_idx, char* out_buf, size_t size, off64_t offset) {
/**
* Prepare read request for a single ceph object. Private method.
*
* Method will allocate all (well, almost, except the string for the object name)
* necessary objects to submit read request to ceph. To submit the requests use
* `submit_and_wait_for_complete` method.
*
* @param obj_idx number of the object (starting from zero) to read
* @param out_buf output buffer, where read results should be stored
* @param size number of bytes to read
* @param offset offset in bytes where the read should start. Note that the offset is local to the
* ceph object. I.e. if offset is 0 and object number is 1, you'll be reading from the
* start of the second object, not from the beginning of the file.
*
* @return zero on success, negative error code on failure
*/

try{
auto &op_data = operations[obj_idx];
//When we start using C++17, the next two lines can be merged
buffers.emplace_back(out_buf);
auto &buf = buffers.back();
op_data.ceph_read_op.read(offset, size, &buf.bl, &buf.rc);
} catch (std::bad_alloc&) {
log_func((char*)"Memory allocation failed while reading file %s", file_ref->name.c_str());
return -ENOMEM;
}
return 0;
}

int bulkAioRead::submit_and_wait_for_complete() {
/**
* Submit previously prepared read requests and wait for their completion
*
* To prepare read requests use `read` or `addRequest` methods.
*
* @return zero on success, negative error code on failure
*
*/

for (auto &op_data: operations) {
size_t obj_idx = op_data.first;
//16 bytes for object hex number, 1 for dot and 1 for null-terminator
char object_suffix[18];
int sp_bytes_written;
sp_bytes_written = snprintf(object_suffix, sizeof(object_suffix), ".%016zx", obj_idx);
if (sp_bytes_written >= (int) sizeof(object_suffix)) {
log_func((char*)"Can not fit object suffix into buffer for file %s -- too big\n", file_ref->name.c_str());
return -EFBIG;
}

std::string obj_name;
try {
obj_name = file_ref->name + std::string(object_suffix);
} catch (std::bad_alloc&) {
log_func((char*)"Can not create object string for file %s)", file_ref->name.c_str());
return -ENOMEM;
}
context->aio_operate(obj_name, op_data.second.cmpl.use(), &op_data.second.ceph_read_op, 0);
}

for (auto &op_data: operations) {
op_data.second.cmpl.wait_for_complete();
int rval = op_data.second.cmpl.get_return_value();
/*
* Optimization is possible here: cancel all remaining read operations after the failure.
* One way to do so is the following: add context as an argument to the `use` method of CmplPtr.
* Then inside the class this pointer can be saved and used by the destructor to call
* `aio_cancel` (and probably `wait_for_complete`) before releasing the completion.
* Though one needs to clarify whether it is necessary to call `wait_for_complete` after
* `aio_cancel` (i.e. may the status variable/bufferlist still be written to or not).
*/
if (rval < 0) {
log_func((char*)"Read of the object %ld for file %s failed", op_data.first, file_ref->name.c_str());
return rval;
}
}
return 0;
}

ssize_t bulkAioRead::get_results() {
/**
* Copy the results of executed read requests from ceph's bufferlists to client's buffers
*
* Note that this method should be called only after the submission and completion of read
* requests, i.e. after `submit_and_wait_for_complete` method.
*
* @return cumulative number of bytes read (by all read operations) on success, negative
* error code on failure
*
*/

ssize_t res = 0;
for (ReadOpData &op_data: buffers) {
if (op_data.rc < 0) {
//Is it possible to get here?
log_func((char*)"One of the reads failed with rc %d", op_data.rc);
return op_data.rc;
}
op_data.bl.begin().copy(op_data.bl.length(), op_data.out_buf);
res += op_data.bl.length();
}
//We should clear used completions to allow new operations
clear();
return res;
}

int bulkAioRead::read(void* out_buf, size_t req_size, off64_t offset) {
/**
* Declare a read operation for file.
*
* Read coordinates are global, i.e. valid offsets are from 0 to the <file_size> -1, valid request sizes
* are from 0 to INF. Method can be called multiple times to declare multiple read
* operations on the same file.
*
* @param out_buf output buffer, where read results should be stored
* @param req_size number of bytes to read
* @param offset offset in bytes where the read should start. Note that the offset is global,
* i.e. refers to the whole file, not individual ceph objects
*
* @return zero on success, negative error code on failure
*
*/

if (req_size == 0) {
log_func((char*)"Zero-length read request for file %s, probably client error", file_ref->name.c_str());
return 0;
}

char* const buf_start_ptr = (char*) out_buf;

size_t object_size = file_ref->objectSize;
//The amount of bytes that is yet to be read
size_t to_read = req_size;
//block means ceph object here
size_t start_block = offset / object_size;
size_t buf_pos = 0;
size_t chunk_start = offset % object_size;

while (to_read > 0) {
size_t chunk_len = std::min(to_read, object_size - chunk_start);

if (buf_pos >= req_size) {
log_func((char*)"Internal bug! Attempt to read %lu data for block (%lu, %lu) of file %s\n", buf_pos, offset, req_size, file_ref->name.c_str());
return -EINVAL;
}

int rc = addRequest(start_block, buf_start_ptr + buf_pos, chunk_len, chunk_start);
if (rc < 0) {
log_func((char*)"Unable to submit async read request, rc=%d\n", rc);
return rc;
}

buf_pos += chunk_len;

start_block++;
chunk_start = 0;
if (chunk_len > to_read) {
log_func((char*)"Internal bug! Read %lu bytes, more than expected %lu bytes for block (%lu, %lu) of file %s\n", chunk_len, to_read, offset, req_size, file_ref->name.c_str());
return -EINVAL;
}
to_read = to_read - chunk_len;
}
return 0;
}
93 changes: 93 additions & 0 deletions src/XrdCeph/XrdCephBulkAioRead.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#include <string>
#include <map>
#include <list>
#include <tuple>
#include <rados/librados.hpp>

#include "XrdCephPosix.hh"


class bulkAioRead {
//typedef std::tuple<ceph::bufferlist*, char*, int*> ReadOpData;
typedef void (*logfunc_pointer) (char *, ...);

/**
* Class is used to execute read operations against rados striper files *without* usage of rados striper.
* Reads are based on ceph read operations.
*
* The interface is similar to the one that ceph's read operation objects has:
* 1. Instantiate the object.
* 2. Declare read operations using 'read' method, providing the output buffers, offset and length.
* 3. Submit operations and wait for results using 'submit_and_wait_for_complete' method.
* 4. Copy results to buffers with 'get_results' method.
*
* WARNING: there is no copy/move constructor in the class, so do not use temporary objects for initialization
* (i.e. something like `bulkAioRead rop = bulkAioRead(...);` will not work, use `bulkAioRead rop(...);` instead).
*/
public:
bulkAioRead(librados::IoCtx* ct, logfunc_pointer ptr, CephFileRef* fileref);
~bulkAioRead();

void clear();
int submit_and_wait_for_complete();
ssize_t get_results();
int read(void *out_buf, size_t size, off64_t offset);

private:
//Completion pointer
class CmplPtr {
librados::AioCompletion *ptr;
bool used = false;
public:
CmplPtr() {
ptr = librados::Rados::aio_create_completion();
if (NULL == ptr) {
throw std::bad_alloc();
}
}
~CmplPtr() {
if (used) {
this->wait_for_complete();
}
ptr->release();
}
void wait_for_complete() {
ptr->wait_for_complete();
}
int get_return_value() {
return ptr->get_return_value();
}
librados::AioCompletion* use() {
//If the object was converted to AioCompletion, we suppose it was passed to
//the read operation, and therefore set the flag.
used = true;
return ptr;
}
};

//Ceph read operation + completion
struct CephOpData {
librados::ObjectReadOperation ceph_read_op;
CmplPtr cmpl;
};

//Data for an individual read -- ceph's buffer, client's buffer and return code
struct ReadOpData {
ceph::bufferlist bl;
char* out_buf;
int rc;
ReadOpData(char* output_buf): out_buf(output_buf), rc(-1) {};
};



int addRequest(size_t obj_idx, char *out_buf, size_t size, off64_t offset);
librados::IoCtx* context;
std::list<ReadOpData> buffers;

//map { <object_number> : <CephOpData> }
std::map<size_t, CephOpData> operations;

logfunc_pointer log_func;
CephFileRef* file_ref;
};
