Skip to content

feat(tdigest): Add support for QUANTILE command #2849

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
dc1bb1a
added command class for quantile command
SharonIV0x86 Mar 23, 2025
a2e99dc
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Mar 24, 2025
7bc9b81
Implemented QUANTILE command
SharonIV0x86 Mar 30, 2025
a33b2cf
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Mar 30, 2025
562798d
Merge branch 'unstable' into feat/quantile-command
PragmaTwice Apr 9, 2025
6e0a7f0
Merge branch 'unstable' into feat/quantile-command
mapleFU Apr 13, 2025
64ee2e7
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 16, 2025
aa04e94
Tested working of TDIGEST.QUANTILE locally.
SharonIV0x86 Apr 16, 2025
185898f
Added go test case for quantile command
SharonIV0x86 Apr 16, 2025
359f151
Fixed return status for non existent key
SharonIV0x86 Apr 16, 2025
63c34f3
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 17, 2025
7937777
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 19, 2025
42e4667
Merge branch 'unstable' into feat/quantile-command
PragmaTwice Apr 20, 2025
d77f5d6
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 21, 2025
b1b951b
Made suggested changes
SharonIV0x86 Apr 21, 2025
6d8f169
Made suggested changes
SharonIV0x86 Apr 21, 2025
d42bea1
Made go linter happy
SharonIV0x86 Apr 21, 2025
e1a6796
Merge branch 'unstable' into feat/quantile-command
PragmaTwice Apr 21, 2025
a51b5d4
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 22, 2025
e21933d
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 23, 2025
84ad11b
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 25, 2025
a302c79
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 26, 2025
cb2e4d5
Merge branch 'unstable' into feat/quantile-command
aleksraiden Apr 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/commands/cmd_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,47 @@ class CommandTDigestMax : public CommandTDigestMinMax {
public:
CommandTDigestMax() : CommandTDigestMinMax(false) {}
};
class CommandTDigestQuantile : public Commander {
Status Parse(const std::vector<std::string> &args) override {
key_name_ = args[1];
values_.reserve(args.size() - 2);
for (size_t i = 2; i < args.size(); i++) {
auto value = ParseFloat(args[i]);
if (!value) {
return {Status::RedisParseErr, errValueIsNotFloat};
}
values_.push_back(*value);
}
return Status::OK();
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
TDigest tdigest(srv->storage, conn->GetNamespace());
TDigestQuantitleResult result;
auto s = tdigest.Quantile(ctx, key_name_, values_, &result);
if (!s.ok()) {
if (s.IsNotFound()) {
return {Status::RedisExecErr, errKeyNotFound};
}
return {Status::RedisExecErr, s.ToString()};
}
std::vector<std::string> quantile_strings;
quantile_strings.reserve(result.quantiles.size());
for (const auto &q : result.quantiles) {
quantile_strings.push_back(std::to_string(q));
}
*output = conn->MultiBulkString(quantile_strings);
return Status::OK();
}

private:
std::string key_name_;
std::vector<double> values_;
};
REDIS_REGISTER_COMMANDS(TDigest, MakeCmdAttr<CommandTDigestCreate>("tdigest.create", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestInfo>("tdigest.info", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestAdd>("tdigest.add", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMax>("tdigest.max", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestQuantile>("tdigest.quantile", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, "write", 1, 1, 1));
} // namespace redis
40 changes: 40 additions & 0 deletions tests/gocase/unit/type/tdigest/tdigest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package tdigest

import (
"context"
"strconv"
"testing"

"github.com/apache/kvrocks/tests/gocase/util"
Expand Down Expand Up @@ -310,4 +311,43 @@ func tdigestTests(t *testing.T, configs util.KvrocksServerConfigs) {
infoAfterEmptyReset := toTdigestInfo(t, rsp.Val())
require.EqualValues(t, 100, infoAfterEmptyReset.Compression)
})
t.Run("tdigest.quantile with different arguments", func(t *testing.T) {
keyPrefix := "t_qt_"

//Testing with no arguments
require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.QUANTILE").Err(), errMsgWrongNumberArg)

// Quantile on a non existent key
require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.QUANTILE", keyPrefix+"iDoNotExist").Err(), errMsgKeyNotExist)

// Creating a key
require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"01", "compression", "100").Err())

key := keyPrefix + "01"
//Adding some data to tdigest 1 2 2 3 3 3 4 4 4 4 5 5 5 5 5
require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", key, "1", "2", "2", "3", "3", "3", "4", "4", "4", "4", "5", "5", "5", "5", "5").Err())

// Getting quantiles 0 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1
rsp := rdb.Do(ctx, "TDIGEST.QUANTILE", key, "0", "0.1", "0.2", "0.3", "0.4", "0.5", "0.6", "0.7", "0.8", "0.9", "1")
require.NoError(t, rsp.Err())

vals, err := rsp.Slice()
require.NoError(t, err)
require.Len(t, vals, 11)

// Expected values
expected := []float64{
1.0, 2.0, 2.5, 3.0, 3.5,
4.0, 4.0, 5.0, 5.0, 5.0, 5.0,
}
for i, v := range vals {
str, ok := v.(string)
require.True(t, ok, "expected string but got %T at index %d", v, i)

got, err := strconv.ParseFloat(str, 64)
require.NoError(t, err, "could not parse value at index %d", i)

require.InEpsilon(t, expected[i], got, 0.0001, "mismatch at index %d", i)
}
})
}