Skip to content

Commit

Permalink
refactor for comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
LindaSummer committed Feb 3, 2025
1 parent c23f106 commit f0f2f3d
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 77 deletions.
10 changes: 5 additions & 5 deletions src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,8 @@ rocksdb::Status HyperLogLogMetadata::Decode(Slice *input) {

void TDigestMetadata::Encode(std::string *dst) const {
Metadata::Encode(dst);
PutFixed64(dst, compression);
PutFixed64(dst, capacity);
PutFixed32(dst, compression);
PutFixed32(dst, capacity);
PutFixed64(dst, unmerged_nodes);
PutFixed64(dst, merged_nodes);
PutFixed64(dst, total_weight);
Expand All @@ -518,12 +518,12 @@ rocksdb::Status TDigestMetadata::Decode(Slice *input) {
return s;
}

if (input->size() < (sizeof(uint64_t) * 8 + sizeof(double) * 2)) {
if (input->size() < (sizeof(uint32_t) * 2 + sizeof(uint64_t) * 6 + sizeof(double) * 2)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}

GetFixed64(input, &compression);
GetFixed64(input, &capacity);
GetFixed32(input, &compression);
GetFixed32(input, &capacity);
GetFixed64(input, &unmerged_nodes);
GetFixed64(input, &merged_nodes);
GetFixed64(input, &total_weight);
Expand Down
10 changes: 5 additions & 5 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,18 +341,18 @@ class HyperLogLogMetadata : public Metadata {

class TDigestMetadata : public Metadata {
public:
uint64_t compression;
uint64_t capacity;
uint32_t compression;
uint32_t capacity;
uint64_t unmerged_nodes = 0;
uint64_t merged_nodes = 0;
uint64_t total_weight = 0;
uint64_t merged_weight = 0;
double minimum = std::numeric_limits<double>::infinity();
double maximum = -1 * std::numeric_limits<double>::infinity();
double minimum = std::numeric_limits<double>::max();
double maximum = std::numeric_limits<double>::lowest();
uint64_t total_observations = 0;
uint64_t merge_times = 0;

explicit TDigestMetadata(uint64_t compression, uint64_t capacity, bool generate_version = true)
explicit TDigestMetadata(uint32_t compression, uint32_t capacity, bool generate_version = true)
: Metadata(kRedisTDigest, generate_version), compression(compression), capacity(capacity) {}
explicit TDigestMetadata(bool generate_version = true) : TDigestMetadata(0, 0, generate_version) {}
void Encode(std::string *dst) const override;
Expand Down
31 changes: 20 additions & 11 deletions src/types/redis_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,12 @@ class DummyCentroids {
};

std::unique_ptr<Iterator> Begin() { return std::make_unique<Iterator>(centroids_.cbegin(), centroids_); }
std::unique_ptr<Iterator> End() { return std::make_unique<Iterator>(centroids_.cend(), centroids_); }
std::unique_ptr<Iterator> End() {
if (centroids_.empty()) {
return std::make_unique<Iterator>(centroids_.cend(), centroids_);
}
return std::make_unique<Iterator>(std::prev(centroids_.cend()), centroids_);
}
double TotalWeight() const { return static_cast<double>(meta_data_.total_weight); }
double Min() const { return meta_data_.minimum; }
double Max() const { return meta_data_.maximum; }
Expand All @@ -101,10 +106,15 @@ class DummyCentroids {
const std::vector<Centroid>& centroids_;
};

uint64_t constexpr kMaxElements = 1 * 1024; // 1k doubles
uint32_t constexpr kMaxElements = 1 * 1024; // 1k doubles
uint32_t constexpr kMaxCompression = 1000; // limit the compression to 1k

std::optional<rocksdb::Status> TDigest::Create(engine::Context& ctx, const Slice& digest_name,
const TDigestCreateOptions& options) {
if (options.compression > kMaxCompression) {
return rocksdb::Status::InvalidArgument(fmt::format("compression should be less than {}", kMaxCompression));
}

auto ns_key = AppendNamespacePrefix(digest_name);
auto capacity = options.compression * 6 + 10;
capacity = ((capacity < kMaxElements) ? capacity : kMaxElements);
Expand Down Expand Up @@ -207,14 +217,14 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name
}

std::vector<Centroid> centroids;
if (auto status = dumpCentroidsAndBuffer(ctx, ns_key, metadata, &centroids); !status.ok()) {
if (auto status = dumpCentroids(ctx, ns_key, metadata, &centroids); !status.ok()) {
return status;
}

auto dump_centroids = DummyCentroids(metadata, centroids);

for (auto q : qs) {
auto status_or_value = TDigestQuantile(dump_centroids, q, Lerp);
auto status_or_value = TDigestQuantile(dump_centroids, q);
if (!status_or_value) {
return rocksdb::Status::InvalidArgument(status_or_value.Msg());
}
Expand Down Expand Up @@ -345,8 +355,8 @@ rocksdb::Status TDigest::dumpCentroidsAndBuffer(engine::Context& ctx, const std:
for (uint64_t i = 0; i < metadata.unmerged_nodes; ++i) {
double tmp_value = std::numeric_limits<double>::quiet_NaN();
if (!GetDouble(&buffer_slice, &tmp_value)) {
return rocksdb::Status::Corruption(
fmt::format("metadata has {} records, but get {} failed", metadata.unmerged_nodes, i));
LOG(ERROR) << "metadata has " << metadata.unmerged_nodes << " records, but get " << i << " failed";
return rocksdb::Status::Corruption("corrupted tdigest buffer value");
}
buffer->emplace_back(tmp_value);
}
Expand Down Expand Up @@ -378,11 +388,10 @@ rocksdb::Status TDigest::dumpCentroidsAndBuffer(engine::Context& ctx, const std:
return status;
}
centroids->emplace_back(centroid);
}

if (clean_after_dump_batch != nullptr) {
if (auto status = (*clean_after_dump_batch)->DeleteRange(cf_handle_, start_key, guard_key); !status.ok()) {
return status;
if (clean_after_dump_batch != nullptr) {
if (auto status = (*clean_after_dump_batch)->Delete(cf_handle_, iter->key()); !status.ok()) {
return status;
}
}
}

Expand Down
34 changes: 22 additions & 12 deletions src/types/redis_tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,14 @@ struct CentroidWithKey {
};

struct TDigestCreateOptions {
uint64_t compression;
uint32_t compression;
};

struct TDigestMergeOptions {};

struct TDigestCDFResult {};

struct TDigestQuantitleResult {
std::vector<double> quantiles;
};

class RedisTDigestTest;
class TDigest : public SubKeyScanner {
friend class RedisTDigestTest;

public:
using Slice = rocksdb::Slice;
explicit TDigest(engine::Storage* storage, const std::string& ns)
Expand All @@ -73,10 +66,27 @@ class TDigest : public SubKeyScanner {
rocksdb::Status appendBuffer(engine::Context& ctx, ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch,
const std::string& ns_key, const std::vector<double>& inputs, TDigestMetadata* metadata);

rocksdb::Status dumpCentroidsAndBuffer(
engine::Context& ctx, const std::string& ns_key, const TDigestMetadata& metadata,
std::vector<Centroid>* centroids, std::vector<double>* buffer = nullptr,
ObserverOrUniquePtr<rocksdb::WriteBatchBase>* clean_after_dump_batch = nullptr);
rocksdb::Status dumpCentroids(engine::Context& ctx, const std::string& ns_key, const TDigestMetadata& metadata,
std::vector<Centroid>* centroids) {
return dumpCentroidsAndBuffer(ctx, ns_key, metadata, centroids, nullptr, nullptr);
}

/**
* @brief Dumps the centroids and buffer of the t-digest.
*
* This function reads the centroids and buffer from persistent storage and removes them from the storage.
* @param ctx The context of the operation.
* @param ns_key The namespace key of the t-digest.
* @param metadata The metadata of the t-digest.
* @param centroids The output vector to store the centroids.
* @param buffer The output vector to store the buffer. If it is nullptr, the buffer will not be read.
* @param clean_after_dump_batch The write batch to store the clean operations. If it is nullptr, the clean operations
* @return rocksdb::Status
*/
rocksdb::Status dumpCentroidsAndBuffer(engine::Context& ctx, const std::string& ns_key,
const TDigestMetadata& metadata, std::vector<Centroid>* centroids,
std::vector<double>* buffer,
ObserverOrUniquePtr<rocksdb::WriteBatchBase>* clean_after_dump_batch);
rocksdb::Status applyNewCentroids(ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, const std::string& ns_key,
const TDigestMetadata& metadata, const std::vector<Centroid>& centroids);

Expand Down
68 changes: 33 additions & 35 deletions src/types/tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ refer to https://github.com/apache/arrow/blob/27bbd593625122a4a25d9471c8aaf5df54

#include <algorithm>
#include <iterator>
#include <memory>
#include <queue>

#include "status.h"
#include "common/status.h"

namespace {
// scale function K0: linear function, as baseline
Expand All @@ -57,26 +56,6 @@ struct ScalerK1 {
};
} // namespace

class TDigest {
public:
explicit TDigest(uint64_t delta);

TDigest(const TDigest&) = delete;
TDigest& operator=(const TDigest&) = delete;
TDigest(TDigest&& rhs) = default;
~TDigest() = default;

void Merge(const std::vector<TDigest>& others);
void Add(const std::vector<double>& items);
void Reset(const CentroidsWithDelta& centroid_list);
void Reset();
CentroidsWithDelta DumpCentroids() const;

private:
class TDigestImpl;
std::unique_ptr<TDigestImpl> impl_;
};

template <typename T = ScalerK1>
class TDigestMerger : private T {
public:
Expand Down Expand Up @@ -135,7 +114,7 @@ class TDigestMerger : private T {
std::vector<Centroid>* tdigest_;
};

class TDigest::TDigestImpl {
class TDigestImpl {
public:
using Status = rocksdb::Status;
explicit TDigestImpl(uint32_t delta) : delta_(delta > 10 ? delta : 10), merger_(delta_) {
Expand Down Expand Up @@ -372,7 +351,27 @@ class TDigest::TDigestImpl {
int current_;
};

TDigest::TDigest(uint64_t delta) : impl_(std::make_unique<TDigestImpl>(delta)) { Reset({}); }
class TDigest {
public:
explicit TDigest(uint64_t delta);

TDigest(const TDigest&) = delete;
TDigest& operator=(const TDigest&) = delete;
TDigest(TDigest&& rhs) = default;
~TDigest() = default;

void Merge(const std::vector<TDigest>& others);
void Add(const std::vector<double>& items);
void Reset(const CentroidsWithDelta& centroid_list);
void Reset();
CentroidsWithDelta DumpCentroids() const;

private:
// class TDigestImpl;
TDigestImpl impl_;
};

TDigest::TDigest(uint64_t delta) : impl_(TDigestImpl(delta)) { Reset({}); }

void TDigest::Merge(const std::vector<TDigest>& others) {
if (others.empty()) {
Expand All @@ -382,30 +381,29 @@ void TDigest::Merge(const std::vector<TDigest>& others) {
std::vector<const TDigestImpl*> impls;
impls.reserve(others.size());

std::transform(others.cbegin(), others.cend(), std::back_inserter(impls),
[](const TDigest& i) { return i.impl_.get(); });
std::transform(others.cbegin(), others.cend(), std::back_inserter(impls), [](const TDigest& i) { return &i.impl_; });

impl_->Merge(impls);
impl_.Merge(impls);
}

void TDigest::Reset(const CentroidsWithDelta& centroids_list) {
impl_->Reset(centroids_list.centroids, centroids_list.min, centroids_list.max, centroids_list.total_weight);
impl_.Reset(centroids_list.centroids, centroids_list.min, centroids_list.max, centroids_list.total_weight);
}

void TDigest::Reset() { impl_->Reset(); }
void TDigest::Reset() { impl_.Reset(); }

CentroidsWithDelta TDigest::DumpCentroids() const {
auto centroids = impl_->Centroids();
auto centroids = impl_.Centroids();
return {
.centroids = std::move(centroids),
.delta = impl_->Delta(),
.min = impl_->Min(),
.max = impl_->Max(),
.total_weight = impl_->TotalWeight(),
.delta = impl_.Delta(),
.min = impl_.Min(),
.max = impl_.Max(),
.total_weight = impl_.TotalWeight(),
};
}

void TDigest::Add(const std::vector<double>& items) { impl_->MergeInput(items); }
void TDigest::Add(const std::vector<double>& items) { impl_.MergeInput(items); }

StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<CentroidsWithDelta>& centroids_list) {
if (centroids_list.empty()) {
Expand Down
21 changes: 12 additions & 9 deletions src/types/tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

#include <vector>

#include "status.h"
#include "common/status.h"

struct Centroid {
double mean;
Expand All @@ -38,8 +38,6 @@ struct Centroid {

std::string ToString() const { return fmt::format("centroid<mean: {}, weight: {}>", mean, weight); }

Centroid(const Centroid& centroid) = default;

explicit Centroid() = default;
explicit Centroid(double mean, double weight) : mean(mean), weight(weight) {}
};
Expand Down Expand Up @@ -81,10 +79,10 @@ class TDSample {
// https://github.com/apache/arrow/blob/27bbd593625122a4a25d9471c8aaf5df54a6dcf9/cpp/src/arrow/util/tdigest.cc#L38
static inline double Lerp(double a, double b, double t) { return a + t * (b - a); }

template <typename TD, typename Lerp>
inline StatusOr<double> TDigestQuantile(TD&& td, double q, Lerp lerp) {
template <typename TD>
inline StatusOr<double> TDigestQuantile(TD&& td, double q) {
if (q < 0 || q > 1 || td.Size() == 0) {
return NAN;
return Status{Status::InvalidArgument, "invalid quantile or empty tdigest"};
}

const double index = q * td.TotalWeight();
Expand All @@ -104,6 +102,11 @@ inline StatusOr<double> TDigestQuantile(TD&& td, double q, Lerp lerp) {
}
}

// since index is in (1, total_weight - 1), iter should be valid
if (!iter->Valid()) {
return Status{Status::InvalidArgument, "invalid iterator during decoding tdigest centroid"};
}

auto centroid = GET_OR_RET(iter->GetCentroid());

// deviation of index from the centroid center
Expand All @@ -122,15 +125,15 @@ inline StatusOr<double> TDigestQuantile(TD&& td, double q, Lerp lerp) {
// index larger than center of last bin
auto c = GET_OR_RET(ci_left->GetCentroid());
DCHECK_GE(c.weight, 2);
return lerp(c.mean, td.Max(), diff / (c.weight / 2));
return Lerp(c.mean, td.Max(), diff / (c.weight / 2));
}
ci_right->Next();
} else {
if (ci_left == td.Begin()) {
// index smaller than center of first bin
auto c = GET_OR_RET(ci_left->GetCentroid());
DCHECK_GE(c.weight, 2);
return lerp(td.Min(), c.mean, index / (c.weight / 2));
return Lerp(td.Min(), c.mean, index / (c.weight / 2));
}
ci_left->Prev();
auto lc = GET_OR_RET(ci_left->GetCentroid());
Expand All @@ -143,5 +146,5 @@ inline StatusOr<double> TDigestQuantile(TD&& td, double q, Lerp lerp) {

// interpolate from adjacent centroids
diff /= (lc.weight / 2 + rc.weight / 2);
return lerp(lc.mean, rc.mean, diff);
return Lerp(lc.mean, rc.mean, diff);
}

0 comments on commit f0f2f3d

Please sign in to comment.