Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:new big key count #3009

Merged
merged 1 commit into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,7 @@ void InfoCmd::InfoData(std::string& info) {
uint64_t total_table_reader_usage = 0;
uint64_t memtable_usage = 0;
uint64_t table_reader_usage = 0;
uint64_t total_big_key_count = 0;
std::shared_lock db_rwl(g_pika_server->dbs_rw_);
for (const auto& db_item : g_pika_server->dbs_) {
if (!db_item.second) {
Expand All @@ -1417,6 +1418,9 @@ void InfoCmd::InfoData(std::string& info) {
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_CUR_SIZE_ALL_MEM_TABLES, &memtable_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_ESTIMATE_TABLE_READER_MEM, &table_reader_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors);

uint64_t big_key_count = db_item.second->storage()->GetBigKeyStatistics(db_item.first, "bigkey_property");
total_big_key_count += big_key_count;
db_item.second->DBUnlockShared();
total_memtable_usage += memtable_usage;
total_table_reader_usage += table_reader_usage;
Expand All @@ -1436,6 +1440,7 @@ void InfoCmd::InfoData(std::string& info) {
tmp_stream << "db_tablereader_usage:" << total_table_reader_usage << "\r\n";
tmp_stream << "db_fatal:" << (total_background_errors != 0 ? "1" : "0") << "\r\n";
tmp_stream << "db_fatal_msg:" << (total_background_errors != 0 ? db_fatal_msg_stream.str() : "nullptr") << "\r\n";
tmp_stream << "big_key_count:" << total_big_key_count << "\r\n";

info.append(tmp_stream.str());
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,7 @@ class Storage {
Status EnableAutoCompaction(const OptionType& option_type,
const std::string& db_type, const std::unordered_map<std::string, std::string>& options);
void GetRocksDBInfo(std::string& info);
uint64_t GetBigKeyStatistics(const std::string& db_type, const std::string& property);

private:
std::unique_ptr<RedisStrings> strings_db_;
Expand Down
18 changes: 18 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <memory>
#include <string>
#include <vector>
#include <iostream>

#include "rocksdb/db.h"
#include "rocksdb/slice.h"
Expand Down Expand Up @@ -115,6 +116,21 @@ class Redis {
Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold);
std::vector<rocksdb::ColumnFamilyHandle*> GetHandles(){ return handles_;};
void GetRocksDBInfo(std::string &info, const char *prefix);
void CheckBigKeyAndLog(const std::string& key, uint64_t size) {
static const uint64_t kBigKeyThreshold = 10000;
if (size > kBigKeyThreshold) {
std::lock_guard<std::mutex> lock(big_key_access_mutex_);
big_key_access_count_[key]++;
std::cerr << "[BIGKEY DETECTED] Key: " << key
<< ", Size: " << size
<< ", Access Count: " << big_key_access_count_[key] << std::endl;
}
}

std::unordered_map<std::string, int> GetBigKeyStatistics() {
std::lock_guard<std::mutex> lock(big_key_access_mutex_);
return big_key_access_count_;
}

protected:
Storage* const storage_;
Expand All @@ -141,6 +157,8 @@ class Redis {
Status UpdateSpecificKeyStatistics(const std::string& key, uint64_t count);
Status UpdateSpecificKeyDuration(const std::string& key, uint64_t duration);
Status AddCompactKeyTaskIfNeeded(const std::string& key, uint64_t count, uint64_t duration);
std::unordered_map<std::string, int> big_key_access_count_;
std::mutex big_key_access_mutex_;
};

} // namespace storage
Expand Down
22 changes: 22 additions & 0 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ Status RedisHashes::HDel(const Slice& key, const std::vector<std::string>& field
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
*ret = 0;
return Status::OK();
Expand Down Expand Up @@ -266,6 +267,7 @@ Status RedisHashes::HGet(const Slice& key, const Slice& field, std::string* valu
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand All @@ -290,6 +292,7 @@ Status RedisHashes::HGetall(const Slice& key, std::vector<FieldValue>* fvs) {
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand Down Expand Up @@ -321,6 +324,7 @@ Status RedisHashes::HGetallWithTTL(const Slice& key, std::vector<FieldValue>* fv
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.count() == 0) {
return Status::NotFound();
} else if (parsed_hashes_meta_value.IsStale()) {
Expand Down Expand Up @@ -368,6 +372,7 @@ Status RedisHashes::HIncrby(const Slice& key, const Slice& field, int64_t value,
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.UpdateVersion();
parsed_hashes_meta_value.set_count(1);
Expand Down Expand Up @@ -443,6 +448,7 @@ Status RedisHashes::HIncrbyfloat(const Slice& key, const Slice& field, const Sli
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.UpdateVersion();
parsed_hashes_meta_value.set_count(1);
Expand Down Expand Up @@ -509,6 +515,7 @@ Status RedisHashes::HKeys(const Slice& key, std::vector<std::string>* fields) {
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand All @@ -535,6 +542,7 @@ Status RedisHashes::HLen(const Slice& key, int32_t* ret) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
*ret = 0;
return Status::NotFound("Stale");
Expand Down Expand Up @@ -563,6 +571,7 @@ Status RedisHashes::HMGet(const Slice& key, const std::vector<std::string>& fiel
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if ((is_stale = parsed_hashes_meta_value.IsStale()) || parsed_hashes_meta_value.count() == 0) {
for (size_t idx = 0; idx < fields.size(); ++idx) {
vss->push_back({std::string(), Status::NotFound()});
Expand Down Expand Up @@ -613,6 +622,7 @@ Status RedisHashes::HMSet(const Slice& key, const std::vector<FieldValue>& fvs)
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.InitialMetaValue();
if (!parsed_hashes_meta_value.check_set_count(static_cast<int32_t>(filtered_fvs.size()))) {
Expand Down Expand Up @@ -673,6 +683,7 @@ Status RedisHashes::HSet(const Slice& key, const Slice& field, const Slice& valu
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.InitialMetaValue();
parsed_hashes_meta_value.set_count(1);
Expand Down Expand Up @@ -731,6 +742,7 @@ Status RedisHashes::HSetnx(const Slice& key, const Slice& field, const Slice& va
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.InitialMetaValue();
parsed_hashes_meta_value.set_count(1);
Expand Down Expand Up @@ -782,6 +794,7 @@ Status RedisHashes::HVals(const Slice& key, std::vector<std::string>* values) {
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand Down Expand Up @@ -832,6 +845,7 @@ Status RedisHashes::HScan(const Slice& key, int64_t cursor, const std::string& p
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
*next_cursor = 0;
return Status::NotFound();
Expand Down Expand Up @@ -896,6 +910,7 @@ Status RedisHashes::HScanx(const Slice& key, const std::string& start_field, con
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
*next_field = "";
return Status::NotFound();
Expand Down Expand Up @@ -954,6 +969,7 @@ Status RedisHashes::PKHScanRange(const Slice& key, const Slice& field_start, con
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
return Status::NotFound();
} else {
Expand Down Expand Up @@ -1013,6 +1029,7 @@ Status RedisHashes::PKHRScanRange(const Slice& key, const Slice& field_start, co
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
return Status::NotFound();
} else {
Expand Down Expand Up @@ -1163,6 +1180,7 @@ Status RedisHashes::Expire(const Slice& key, int32_t ttl) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand All @@ -1186,6 +1204,7 @@ Status RedisHashes::Del(const Slice& key) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand Down Expand Up @@ -1281,6 +1300,7 @@ Status RedisHashes::Expireat(const Slice& key, int32_t timestamp) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand All @@ -1303,6 +1323,7 @@ Status RedisHashes::Persist(const Slice& key) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
Expand All @@ -1325,6 +1346,7 @@ Status RedisHashes::TTL(const Slice& key, int64_t* timestamp) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
*timestamp = -2;
return Status::NotFound("Stale");
Expand Down
Loading
Loading