Master buffered ceph io (#18)
* Buffer implementation for XrdCeph

* Better error return code values

* Add timing into BufferIO

* Add timing into BufferSimple

* Utils code area

* Update raw data access and copy

* Adding Extents

* ReadV simple logic

* Add to own files the readV implementations

* Add to own files the readV implementations; cmake updated

* Logging improvements and write buffer updates

* Add IOadapter with blocking aio access

* Use IOadapter with blocking aio access

* Small logging update

* Reduce logging information; fix timing to ms

* Reduce logging information;

* Reduced logging, and better use of aggregated metrics

* comment clean and typo fixes

* Remove unnecessary file close

* Additional logging in case of problems

* Additional logging in case of problems

* allow option for buffering with IO or AIO buffer

Co-authored-by: james <[email protected]>
Co-authored-by: root <[email protected]>

merge variable rpm name into bufferedIO (#19)

* variable rpm name

* Update xrootd-ceph.spec.in

* Update makesrpm.sh

* Update makesrpm.sh

Fixes to remove warnings from devtoolset-9 compilation

Master buffered ceph io (#20)

* Buffer implementation for XrdCeph

* Better error return code values

* Add timing into BufferIO

* Add timing into BufferSimple

* Utils code area

* Update raw data access and copy

* Adding Extents

* ReadV simple logic

* Add to own files the readV implementations

* Add to own files the readV implementations; cmake updated

* Logging improvements and write buffer updates

* Add IOadapter with blocking aio access

* Use IOadapter with blocking aio access

* Small logging update

* Reduce logging information; fix timing to ms

* Reduce logging information;

* Reduced logging, and better use of aggregated metrics

* comment clean and typo fixes

* Remove unnecessary file close

* Additional logging in case of problems

* Additional logging in case of problems

* allow option for buffering with IO or AIO buffer

* fix conflicts

* Allow for finite retries on EBUSY, else fail with EIO.

It is possible for a read/write from the buffer to return EBUSY due to an underlying issue.
In these cases, if the -EBUSY is returned out of XrdCeph, a large number of retries can originate.
It is better at this point for the transfer to be flagged as failed, and retried properly.
The code allows for 5 retries with a 1s sleep between them. If this doesn't work - which it might not -
then an -EIO error is returned to xrootd.
Other error messages are not affected.

* Better summary stats output for CephIOAdapterRaw

* Comment out a comment

Co-authored-by: james <[email protected]>
Co-authored-by: root <[email protected]>
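
The EBUSY handling described above (a bounded number of retries with a pause between them, then a conversion to -EIO) can be sketched roughly as follows. This is only an illustration, not the code added by the commit: `retryOnBusy` is a hypothetical helper, and whether the first attempt counts towards the 5 retries is an assumption made here.

```cpp
#include <cassert>
#include <cerrno>
#include <chrono>
#include <functional>
#include <thread>
#include <sys/types.h>

// Hypothetical helper (not part of XrdCeph): run `op`, and while it reports
// -EBUSY, sleep and try again up to `maxRetries` more times.
// The defaults (5 retries, 1 s pause) are the values quoted in the commit message.
ssize_t retryOnBusy(const std::function<ssize_t()> &op,
                    int maxRetries = 5,
                    std::chrono::milliseconds pause = std::chrono::milliseconds(1000))
{
    ssize_t rc = op();
    for (int attempt = 0; rc == -EBUSY && attempt < maxRetries; ++attempt) {
        std::this_thread::sleep_for(pause);
        rc = op();
    }
    // Still busy after all retries: report a hard I/O failure instead,
    // so the caller fails the transfer rather than retrying indefinitely.
    // Any other return value (success or a different error) passes through.
    return (rc == -EBUSY) ? -EIO : rc;
}
```

A caller would wrap the buffered read or write in a lambda and pass it as `op`; error codes other than -EBUSY are returned unchanged, matching the note that other error messages are not affected.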

variable version/release for template (#21)

Update bufferedIO with updates from master (#26)

* variable rpm name (#17)

* variable rpm name

* Update xrootd-ceph.spec.in

* Update makesrpm.sh

* Update makesrpm.sh

* Master cephnamelib (#16)

* Allow ceph.namelib to take params and apply translation to full path

* Reduce logging

Remove extraneous logging messages

* simplify parsing of namelib and added a log line for any remapped file

Co-authored-by: James <[email protected]>

* XRD-22 Fix ensuring the correct filename is passed to the CephFile instance. (#24)

A regression in previous commit meant that the filename was not correctly passed
to the CephFile instance. This fix ensures that the filename is set correctly.

Co-authored-by: james <[email protected]>

* re-introduce variable names to spec input (xrootd#27)

Co-authored-by: Jo-stfc <[email protected]>
Co-authored-by: James <[email protected]>

Decreased logging for bufferedIO operations. (#25)

Reduced printouts. Only summary stats now produced, rather than the logging per read.

Co-authored-by: James Walder <[email protected]>

Updates from master to buffered io needed for 550 2 (xrootd#32)

* XRD-12 Add timestamp information for ceph logging methods

Update the logwrapper method to print out the current timestamp in the initial section of output.

* Return permission denied on write attempt on existing file with EXCL set (xrootd#31)

Co-authored-by: James Walder <[email protected]>

* disable posc (xrootd#30)

posc is disabled for proxies, but not for a unified setup. XrdCeph does not support the posc flag as it misinterprets objects as folders

Co-authored-by: James Walder <[email protected]>
Co-authored-by: Jo-stfc <[email protected]>

Buffered io multibuffers (xrootd#38)

* Add multiple buffer support for reads in case of simultaneous threads reading the same file.

* Further refinements to the simultaneous file reads code

 - Ensure all relevant read / write methods will create a buffer if needed
 - Validity check on close that a buffer was actually created (or bypass code if not)
 - Bugfix in case of odd read sizes combined with multi/split buffer reads (critical)
 - Clean of comments included for development

* Enhanced logging for cluster metrics and readV layer improvements (xrootd#35)

- dumpCLusterInfo to check on the rados connection info
  - extra logging in a delete to give info on delete times
  - update the readV basic alg to do a simple bulk request

Co-authored-by: James Walder <[email protected]>

* Add time taken to unlink a file in the logging message

  - Logging an unlink now includes the time taken, in cases of (un)successful deletes
  - Remove some extraneous comments

* - Fix issue with buffer passthrough read
 - Add maximum number of simultaneous buffers for a given file
Once a given number of opens have been made against the same file, don't
create a large buffer, and only create a 1MiB buffer for each new file.
This should avoid issues with small paged reads, although the passthrough
mode would normally be expected to trigger for each such read.

* Additional statistics on buffered reading added.

 - Will report bytes read from ceph, bytes read but bypassed the cache, and the cache hit fraction

---------

Co-authored-by: James Walder <[email protected]>
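
The cap on simultaneous buffers described above amounts to a small sizing policy. The sketch below is a guess at its shape, not the committed code: the function name, the cap, and the large-buffer size are all hypothetical parameters, and only the 1 MiB fallback comes from the commit message.

```cpp
#include <cassert>
#include <cstddef>

// The 1 MiB fallback size is quoted in the commit message; every other
// name and value in this sketch is an illustrative assumption.
constexpr std::size_t kSmallBufferBytes = 1024 * 1024; // 1 MiB

// Pick the buffer size for a new open of a file.
//   openCount       : opens already made against the same file
//   maxLargeBuffers : hypothetical cap on full-size buffers per file
//   largeBufferBytes: hypothetical configured full buffer size
std::size_t chooseBufferSize(std::size_t openCount,
                             std::size_t maxLargeBuffers,
                             std::size_t largeBufferBytes)
{
    // Below the cap: allocate the full-size buffer.
    // At or above the cap: fall back to the small 1 MiB buffer.
    return (openCount < maxLargeBuffers) ? largeBufferBytes : kSmallBufferBytes;
}
```

Bounding the number of large buffers keeps memory use predictable when many threads open the same file, at the cost of less read-ahead for the later openers.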

Bug fix for writes with bufferedIO when extending over buffer range.  (xrootd#40)

* Bug fix for writes with bufferedIO when extending over buffer range.
 - Fix for case where multiple writes to the buffer are needed for a given xrd write request
 - Previously threw an error; now will correctly perform the multiple writes as required.
 - Set the Simple Data buffer capacity to the input size, rather than the capacity of the vector, which could be larger.

---------

Co-authored-by: James Walder <[email protected]>
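
The fix above concerns a single write request that does not fit in the space left in the buffer and so needs several buffer writes. As a rough sketch of that splitting, assuming a fixed-capacity buffer that is flushed whenever it fills (the helper `splitWrite` and its parameters are hypothetical, not names from the commit):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: plan how one incoming write of `len` bytes is split
// into pieces for a buffer of `capacity` bytes that already holds `used`
// bytes. Whenever the buffer is full it is assumed to be flushed (emptied)
// before the next piece is written.
std::vector<std::size_t> splitWrite(std::size_t len, std::size_t capacity, std::size_t used)
{
    std::vector<std::size_t> pieces;
    if (capacity == 0 || used > capacity)
        return pieces; // invalid input: nothing can be planned
    while (len > 0) {
        if (used == capacity)
            used = 0; // buffer full: flushed before continuing
        const std::size_t n = std::min(len, capacity - used);
        pieces.push_back(n);
        used += n;
        len -= n;
    }
    return pieces;
}
```

For example, a 10-byte write into a 4-byte buffer that already holds 1 byte is planned as pieces of 3, 4 and 3 bytes.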

variable rpm name (#17)

* variable rpm name

* Update xrootd-ceph.spec.in

* Update makesrpm.sh

* Update makesrpm.sh

re-introduce variable names to spec input (xrootd#27)
snafus authored and amadio committed Feb 19, 2025
1 parent 9a4c261 commit 2464412
Showing 28 changed files with 2,771 additions and 18 deletions.
1 change: 1 addition & 0 deletions packaging/makesrpm.sh
@@ -26,6 +26,7 @@ function printHelp()
SOURCEPATH=$(realpath $(dirname $0)/..)
OUTPUTPATH="."
PRINTHELP=0
RPM_NAME="xrootd-ceph"

while test ${#} -ne 0; do
if test x${1} = x--help; then
169 changes: 169 additions & 0 deletions src/XrdCeph/XrdCephBuffers/BufferUtils.cc
@@ -0,0 +1,169 @@

#include "BufferUtils.hh"
#include <algorithm> // std::max

using namespace XrdCephBuffer;

#ifdef CEPHBUFDEBUG
// to synchronise logging statements
std::mutex cephbuf_iolock;
#endif

// ------------------------------------------------------ //
// Extent //

bool Extent::in_extent(off_t pos) const
{
// half-open range [begin, end): begin() itself is inside the extent
return ((pos >= begin()) && (pos < end()));
}

bool Extent::isContiguous(const Extent &rhs) const
{
// does the rhs connect directly to the end of the first
if (end() != rhs.begin())
return false;
return true;
}

bool Extent::allInExtent(off_t pos, size_t len) const
{
// is all the range in this extent
if ((pos < begin()) || (pos >= end()))
return false;

if (off_t(pos + len) > end())
return false;
return true;
}
bool Extent::someInExtent(off_t pos, size_t len) const
{ // is some of the range in this extent
// a range ending exactly at begin() shares no bytes with this extent
if ((off_t(pos + len) <= begin()) || (pos >= end()))
return false;
return true;
}

Extent Extent::containedExtent(off_t pos, size_t len) const
{
// return the subset of input range that is in this extent
off_t subbeg = std::max(begin(), pos);
off_t subend = std::min(end(), off_t(pos + len));

return Extent(subbeg, subend - subbeg);
}
Extent Extent::containedExtent(const Extent &rhs) const
{
return containedExtent(rhs.begin(), rhs.len());
}

bool Extent::operator<(const Extent &rhs) const
{
// comparison primarily on begin values
// use end values if begin values are equal.

if (begin() > rhs.begin()) return false;
if (begin() < rhs.begin()) return true;
if (end() < rhs.end() ) return true;
return false;
}
bool Extent::operator==(const Extent &rhs) const
{
// equivalence based only on start and end
if (begin() != rhs.begin())
return false;
if (end() != rhs.end())
return false;
return true;
}

// ------------------------------------------------------ //
// ExtentHolder //

ExtentHolder::ExtentHolder() {}

ExtentHolder::ExtentHolder(size_t elements)
{
m_extents.reserve(elements);
}

ExtentHolder::ExtentHolder(const ExtentContainer &extents)
{
m_extents.reserve(extents.size());
// iterate over the input container, not the (still empty) member
for (ExtentContainer::const_iterator vit = extents.cbegin(); vit != extents.cend(); ++vit) {
push_back(*vit);
}

}
ExtentHolder::~ExtentHolder()
{
m_extents.clear();
}

void ExtentHolder::push_back(const Extent & in) {
if (size()) {
m_begin = std::min(m_begin, in.begin());
m_end = std::max(m_end, in.end());
} else {
m_begin = in.begin();
m_end = in.end();
}
m_extents.push_back(in);
}



Extent ExtentHolder::asExtent() const {
// if (!size()) return Extent(0,0);
// ExtentContainer se = getSortedExtents();
// off_t b = se.front().begin();
// off_t e = se.back().end();

return Extent(m_begin, m_end-m_begin);

}

size_t ExtentHolder::bytesContained() const {
size_t nbytes{0};
for (ExtentContainer::const_iterator vit = m_extents.cbegin(); vit != m_extents.cend(); ++vit) {
nbytes += vit->len();
}
return nbytes;
}

size_t ExtentHolder::bytesMissing() const {
size_t bytesUsed = bytesContained();
size_t totalRange = asExtent().len(); //might be expensive to call
return totalRange - bytesUsed;
}


void ExtentHolder::sort() {
std::sort(m_extents.begin(), m_extents.end());
}


ExtentContainer ExtentHolder::getSortedExtents() const {
ExtentContainer v;
v.assign(m_extents.begin(), m_extents.end() );
std::sort(v.begin(), v.end());
return v;
}

ExtentContainer ExtentHolder::getExtents() const {
ExtentContainer v;
v.assign(m_extents.begin(), m_extents.end() );
return v;
}

// ------------------------------------------------------ //
// Timer ns //

Timer_ns::Timer_ns(long &output) : m_output_val(output)
{
m_start = std::chrono::steady_clock::now();
}

Timer_ns::~Timer_ns()
{
auto end = std::chrono::steady_clock::now();
m_output_val = std::chrono::duration_cast<std::chrono::nanoseconds>(end - m_start).count();
}
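
Because `Timer_ns` only writes its result in the destructor, the output variable is meaningful only after the timer has gone out of scope. The sketch below illustrates that usage pattern with a stand-in class of the same shape; `ScopedTimerNs` and `timedSleep` are hypothetical names, redefined here only so the example compiles on its own.

```cpp
#include <cassert>
#include <chrono>
#include <thread>

// Stand-in with the same shape as Timer_ns (hypothetical name): starts
// timing on construction and writes the elapsed nanoseconds on destruction.
class ScopedTimerNs
{
public:
    explicit ScopedTimerNs(long &out)
        : m_out(out), m_start(std::chrono::steady_clock::now()) {}
    ~ScopedTimerNs()
    {
        auto end = std::chrono::steady_clock::now();
        m_out = std::chrono::duration_cast<std::chrono::nanoseconds>(end - m_start).count();
    }

private:
    long &m_out;
    std::chrono::steady_clock::time_point m_start;
};

// Hypothetical workload: sleep for `d` and return the measured time in ns.
long timedSleep(std::chrono::milliseconds d)
{
    long elapsed_ns = 0;
    {
        ScopedTimerNs timer(elapsed_ns); // timing starts here
        std::this_thread::sleep_for(d);
    } // timer destroyed here: elapsed_ns is written at this point
    return elapsed_ns;
}
```

The inner braces are the important part: reading `elapsed_ns` before the closing brace would still see the initial value of 0.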
152 changes: 152 additions & 0 deletions src/XrdCeph/XrdCephBuffers/BufferUtils.hh
@@ -0,0 +1,152 @@
#ifndef __CEPH_BUFFER_UTILS_HH__
#define __CEPH_BUFFER_UTILS_HH__

// holder of various small utility classes for debugging, profiling, logging, and general stuff

#include <list>
#include <vector>
#include <atomic>
#include <chrono>
#include <sys/types.h>
#include <memory>
#include <mutex>
#include <sstream>
#include <iostream> // std::clog, used by BUFLOG
#include <iomanip>
#include <ctime>


// basic logging
// #TODO; merge this into the xrootd logging, when xrootd is available
#define CEPHBUFDEBUG 1
#ifdef CEPHBUFDEBUG
extern std::mutex cephbuf_iolock;
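// take the global mutex for the duration of the statement; the lock must be constructed from cephbuf_iolock, not named after it
#define BUFLOG(x) {std::unique_lock<std::mutex> _buflock(cephbuf_iolock); std::stringstream _bs; _bs << x; std::clog << _bs.str() << std::endl;}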
#else
#define BUFLOG(x)
#endif

namespace XrdCephBuffer
{


class Timer_ns
{
/**
* @brief RAII based timer information outputting a long value of ns
* Almost trivial class to time something and to pass the duration as a long
* to an output variable (specified in the constructor) at destruction.
* Create the object to start the timer. The timer stops when its destructor is called.
* #TODO improve to template the output type and the time ratio
*/
public:
explicit Timer_ns(long &output_ns);
~Timer_ns();

private:
std::chrono::steady_clock::time_point m_start;
long &m_output_val; //!< reference to the external variable to store the output.

}; //Timer_ns



class Extent
{
/**
* @brief Encapsulates an offset and length, with added functionality
* Class that represents an offset position and a length.
* Simplest use case is to avoid passing two values around, however this class
* provides additional functionality for manipulation of extents (e.g. merging, splitting)
* which may prove useful.
*/

public:
Extent(off_t offset, size_t len) : m_offset(offset), m_len(len){}
inline off_t offset() const { return m_offset; }
inline size_t len() const { return m_len; }
inline off_t begin() const { return m_offset; } //!< Same as offset, but a bit more stl container like
inline off_t end() const { return m_offset + m_len; } //!< similar to stl vector end.
inline bool empty() const {return m_len == 0;}

/**
* Does the start of the rhs continue directly from the
* end of this Extent
*/
bool isContiguous(const Extent& rhs) const;

inline off_t last_pos() const { return m_offset + m_len - 1; } //!< last real position

bool in_extent(off_t pos) const; //!< is this position within the range of this extent
bool allInExtent(off_t pos, size_t len) const; //!< is all the range in this extent
bool someInExtent(off_t pos, size_t len) const; //!< is some of the range in this extent

Extent containedExtent(off_t pos, size_t len) const; //!< return the subset of range that is in this extent
Extent containedExtent(const Extent &in) const; //!<

bool operator<(const Extent &rhs) const;
bool operator==(const Extent &rhs) const;


private:
off_t m_offset;
size_t m_len;
};

/**
* @brief Container definition for Extents
* Typedef to provide a container of extents as a simple stl vector container
*/
typedef std::vector<Extent> ExtentContainer;

/**
* @brief Designed to hold individual extents, but itself provide Extent-like capabilities
* Useful in cases of combining extents, or needing to hold a range of extents and extract
* information about (or aggregated from) the contained objects.
* Could be useful to inherit from Extent if improvements needed.
*
*
*/
class ExtentHolder {
// holder of a list of extent objects
public:
ExtentHolder();
explicit ExtentHolder(size_t elements); //!< reserve memory only
explicit ExtentHolder(const ExtentContainer& extents);
~ExtentHolder();

off_t begin() const {return m_begin;}
off_t end() const {return m_end;}
size_t len() const {return m_end - m_begin;} //!< Total range in bytes of the extents

bool empty() const {return m_extents.empty();}
size_t size() const {return m_extents.size();} //!< number of extent elements

Extent asExtent() const; // return an extent covering the whole range


size_t bytesContained() const; // number of bytes across the extent not considering overlaps!
size_t bytesMissing() const; // number of bytes missing across the extent, not considering overlaps!

void push_back(const Extent & in);
void sort(); //!< inplace sort by offset of contained extents

const ExtentContainer & extents() const {return m_extents;}
//ExtentContainer & extents() {return m_extents;}

ExtentContainer getSortedExtents() const;
ExtentContainer getExtents() const;



protected:
ExtentContainer m_extents;

off_t m_begin{0}; //lowest offset value
off_t m_end{0}; // one past end of last byte used.

};


}

#endif
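
To make the half-open `[begin, end)` semantics of `Extent` and the aggregate quantities of `ExtentHolder` concrete, here is a small self-contained sketch. It does not use the classes from the diff: `Range`, `overlap`, `Summary` and `summarise` are hypothetical stand-ins. One deliberate difference, stated as an assumption: `overlap` returns an empty range when the inputs do not intersect, a case the diff's `containedExtent` does not guard against.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <sys/types.h>
#include <vector>

// Minimal stand-in for Extent (hypothetical): half-open range [begin, end).
struct Range
{
    off_t offset;
    std::size_t len;
    off_t begin() const { return offset; }
    off_t end() const { return offset + static_cast<off_t>(len); }
};

// Overlap of two ranges; a zero-length range when they do not intersect.
Range overlap(const Range &a, const Range &b)
{
    const off_t lo = std::max(a.begin(), b.begin());
    const off_t hi = std::min(a.end(), b.end());
    if (hi <= lo)
        return Range{lo, 0};
    return Range{lo, static_cast<std::size_t>(hi - lo)};
}

// Aggregates comparable to asExtent(), bytesContained() and bytesMissing().
// As in the header comments, overlaps between the inputs are not considered.
struct Summary
{
    Range bounds;          // smallest range covering every input
    std::size_t contained; // sum of the input lengths
    std::size_t missing;   // gap bytes inside `bounds`
};

Summary summarise(const std::vector<Range> &ranges)
{
    Summary s{Range{0, 0}, 0, 0};
    if (ranges.empty())
        return s;
    off_t lo = ranges.front().begin();
    off_t hi = ranges.front().end();
    for (const Range &r : ranges) {
        lo = std::min(lo, r.begin());
        hi = std::max(hi, r.end());
        s.contained += r.len;
    }
    s.bounds = Range{lo, static_cast<std::size_t>(hi - lo)};
    s.missing = s.bounds.len - s.contained; // valid only for non-overlapping inputs
    return s;
}
```

A vector read for bytes `[0, 10)` and `[20, 30)` therefore summarises to a bounding range of 30 bytes with 20 contained and 10 missing, which is the kind of figure a bulk read could use to decide whether fetching the whole span is worthwhile.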