feat:new big key count (#3009)
Co-authored-by: chejinge <[email protected]>
chejinge and brother-jin authored Feb 11, 2025
1 parent f0bea9a commit edacc5a
Showing 8 changed files with 130 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/pika_admin.cc
@@ -1406,6 +1406,7 @@ void InfoCmd::InfoData(std::string& info) {
uint64_t total_table_reader_usage = 0;
uint64_t memtable_usage = 0;
uint64_t table_reader_usage = 0;
uint64_t total_big_key_count = 0;
std::shared_lock db_rwl(g_pika_server->dbs_rw_);
for (const auto& db_item : g_pika_server->dbs_) {
if (!db_item.second) {
@@ -1417,6 +1418,9 @@ void InfoCmd::InfoData(std::string& info) {
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_CUR_SIZE_ALL_MEM_TABLES, &memtable_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_ESTIMATE_TABLE_READER_MEM, &table_reader_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors);

uint64_t big_key_count = db_item.second->storage()->GetBigKeyStatistics(db_item.first, "bigkey_property");
total_big_key_count += big_key_count;
db_item.second->DBUnlockShared();
total_memtable_usage += memtable_usage;
total_table_reader_usage += table_reader_usage;
@@ -1436,6 +1440,7 @@ void InfoCmd::InfoData(std::string& info) {
tmp_stream << "db_tablereader_usage:" << total_table_reader_usage << "\r\n";
tmp_stream << "db_fatal:" << (total_background_errors != 0 ? "1" : "0") << "\r\n";
tmp_stream << "db_fatal_msg:" << (total_background_errors != 0 ? db_fatal_msg_stream.str() : "nullptr") << "\r\n";
tmp_stream << "big_key_count:" << total_big_key_count << "\r\n";

info.append(tmp_stream.str());
}
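With this change, the output of InfoCmd::InfoData (the data section of the INFO reply) gains one field, big_key_count, summed across all DBs. Illustrative reply fragment only: the field names come from the hunks above, but the values here are made up.

db_memtable_usage:8388608
db_tablereader_usage:2097152
db_fatal:0
db_fatal_msg:nullptr
big_key_count:3
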
1 change: 1 addition & 0 deletions src/storage/include/storage/storage.h
@@ -1100,6 +1100,7 @@ class Storage {
Status EnableAutoCompaction(const OptionType& option_type,
const std::string& db_type, const std::unordered_map<std::string, std::string>& options);
void GetRocksDBInfo(std::string& info);
uint64_t GetBigKeyStatistics(const std::string& db_type, const std::string& property);

private:
std::unique_ptr<RedisStrings> strings_db_;
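The matching storage.cc definition is among the changed files not shown in this excerpt (4 of the 8 changed files are loaded). Below is a minimal sketch of one way Storage could aggregate the per-instance counters exposed by Redis::GetBigKeyStatistics(); it is an assumption, not the commit's actual code. The member strings_db_ is visible above, while hashes_db_, lists_db_, sets_db_ and zsets_db_ are assumed sibling members, and the db_type/property arguments are ignored purely for illustration.

// Hypothetical aggregation sketch; the real definition lives in a file not shown here.
uint64_t Storage::GetBigKeyStatistics(const std::string& db_type, const std::string& property) {
  (void)db_type;   // illustrative only: parameters unused in this sketch
  (void)property;
  uint64_t total = 0;
  Redis* instances[] = {strings_db_.get(), hashes_db_.get(), lists_db_.get(),
                        sets_db_.get(), zsets_db_.get()};  // assumed member names
  for (Redis* inst : instances) {
    if (inst == nullptr) {
      continue;
    }
    // Redis::GetBigKeyStatistics() returns the per-key access-count map (see redis.h below).
    for (const auto& [key, count] : inst->GetBigKeyStatistics()) {
      total += static_cast<uint64_t>(count);
    }
  }
  return total;
}
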
18 changes: 18 additions & 0 deletions src/storage/src/redis.h
@@ -9,6 +9,7 @@
#include <memory>
#include <string>
#include <vector>
#include <iostream>

#include "rocksdb/db.h"
#include "rocksdb/slice.h"
@@ -115,6 +116,21 @@ class Redis {
Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold);
std::vector<rocksdb::ColumnFamilyHandle*> GetHandles(){ return handles_;};
void GetRocksDBInfo(std::string &info, const char *prefix);
void CheckBigKeyAndLog(const std::string& key, uint64_t size) {
static const uint64_t kBigKeyThreshold = 10000;
if (size > kBigKeyThreshold) {
std::lock_guard<std::mutex> lock(big_key_access_mutex_);
big_key_access_count_[key]++;
std::cerr << "[BIGKEY DETECTED] Key: " << key
<< ", Size: " << size
<< ", Access Count: " << big_key_access_count_[key] << std::endl;
}
}

std::unordered_map<std::string, int> GetBigKeyStatistics() {
std::lock_guard<std::mutex> lock(big_key_access_mutex_);
return big_key_access_count_;
}

protected:
Storage* const storage_;
@@ -141,6 +157,8 @@ class Redis {
Status UpdateSpecificKeyStatistics(const std::string& key, uint64_t count);
Status UpdateSpecificKeyDuration(const std::string& key, uint64_t duration);
Status AddCompactKeyTaskIfNeeded(const std::string& key, uint64_t count, uint64_t duration);
std::unordered_map<std::string, int> big_key_access_count_;
std::mutex big_key_access_mutex_;
};

} // namespace storage
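CheckBigKeyAndLog is the new hot-path hook: callers pass a key's element count, and anything above the hard-coded threshold (kBigKeyThreshold = 10000) is counted per key and logged to stderr under big_key_access_mutex_. Note that the new members rely on std::mutex and std::unordered_map while only <iostream> is added explicitly here, so those headers are presumably reached transitively. For illustration only, a self-contained sketch of the same detection logic outside the Redis class:

#include <cstdint>
#include <iostream>
#include <mutex>
#include <string>
#include <unordered_map>

// Stripped-down copy of the detection logic above, purely to show the behavior;
// kBigKeyThreshold mirrors the value in redis.h.
class BigKeyTracker {
 public:
  void CheckBigKeyAndLog(const std::string& key, uint64_t size) {
    static const uint64_t kBigKeyThreshold = 10000;
    if (size > kBigKeyThreshold) {
      std::lock_guard<std::mutex> lock(mutex_);
      access_count_[key]++;
      std::cerr << "[BIGKEY DETECTED] Key: " << key << ", Size: " << size
                << ", Access Count: " << access_count_[key] << std::endl;
    }
  }

  std::unordered_map<std::string, int> GetBigKeyStatistics() {
    std::lock_guard<std::mutex> lock(mutex_);
    return access_count_;  // copy out under the lock
  }

 private:
  std::unordered_map<std::string, int> access_count_;
  std::mutex mutex_;
};

int main() {
  BigKeyTracker tracker;
  tracker.CheckBigKeyAndLog("small_hash", 10);     // below threshold, ignored
  tracker.CheckBigKeyAndLog("huge_hash", 50000);   // logged, access count = 1
  tracker.CheckBigKeyAndLog("huge_hash", 50001);   // logged, access count = 2
  std::cout << "tracked keys: " << tracker.GetBigKeyStatistics().size() << std::endl;
  return 0;
}
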
22 changes: 22 additions & 0 deletions src/storage/src/redis_hashes.cc
@@ -214,6 +214,7 @@ Status RedisHashes::HDel(const Slice& key, const std::vector<std::string>& field
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
*ret = 0;
return Status::OK();
@@ -266,6 +267,7 @@ Status RedisHashes::HGet(const Slice& key, const Slice& field, std::string* valu
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
@@ -290,6 +292,7 @@ Status RedisHashes::HGetall(const Slice& key, std::vector<FieldValue>* fvs) {
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
@@ -321,6 +324,7 @@ Status RedisHashes::HGetallWithTTL(const Slice& key, std::vector<FieldValue>* fv
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.count() == 0) {
return Status::NotFound();
} else if (parsed_hashes_meta_value.IsStale()) {
@@ -368,6 +372,7 @@ Status RedisHashes::HIncrby(const Slice& key, const Slice& field, int64_t value,
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.UpdateVersion();
parsed_hashes_meta_value.set_count(1);
@@ -443,6 +448,7 @@ Status RedisHashes::HIncrbyfloat(const Slice& key, const Slice& field, const Sli
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.UpdateVersion();
parsed_hashes_meta_value.set_count(1);
@@ -509,6 +515,7 @@ Status RedisHashes::HKeys(const Slice& key, std::vector<std::string>* fields) {
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
@@ -535,6 +542,7 @@ Status RedisHashes::HLen(const Slice& key, int32_t* ret) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
*ret = 0;
return Status::NotFound("Stale");
@@ -563,6 +571,7 @@ Status RedisHashes::HMGet(const Slice& key, const std::vector<std::string>& fiel
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if ((is_stale = parsed_hashes_meta_value.IsStale()) || parsed_hashes_meta_value.count() == 0) {
for (size_t idx = 0; idx < fields.size(); ++idx) {
vss->push_back({std::string(), Status::NotFound()});
@@ -613,6 +622,7 @@ Status RedisHashes::HMSet(const Slice& key, const std::vector<FieldValue>& fvs)
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.InitialMetaValue();
if (!parsed_hashes_meta_value.check_set_count(static_cast<int32_t>(filtered_fvs.size()))) {
@@ -673,6 +683,7 @@ Status RedisHashes::HSet(const Slice& key, const Slice& field, const Slice& valu
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.InitialMetaValue();
parsed_hashes_meta_value.set_count(1);
@@ -731,6 +742,7 @@ Status RedisHashes::HSetnx(const Slice& key, const Slice& field, const Slice& va
char meta_value_buf[4] = {0};
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
version = parsed_hashes_meta_value.InitialMetaValue();
parsed_hashes_meta_value.set_count(1);
@@ -782,6 +794,7 @@ Status RedisHashes::HVals(const Slice& key, std::vector<std::string>* values) {
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
@@ -832,6 +845,7 @@ Status RedisHashes::HScan(const Slice& key, int64_t cursor, const std::string& p
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
*next_cursor = 0;
return Status::NotFound();
@@ -896,6 +910,7 @@ Status RedisHashes::HScanx(const Slice& key, const std::string& start_field, con
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
*next_field = "";
return Status::NotFound();
@@ -954,6 +969,7 @@ Status RedisHashes::PKHScanRange(const Slice& key, const Slice& field_start, con
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
return Status::NotFound();
} else {
@@ -1013,6 +1029,7 @@ Status RedisHashes::PKHRScanRange(const Slice& key, const Slice& field_start, co
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
return Status::NotFound();
} else {
@@ -1163,6 +1180,7 @@ Status RedisHashes::Expire(const Slice& key, int32_t ttl) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
@@ -1186,6 +1204,7 @@ Status RedisHashes::Del(const Slice& key) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
@@ -1281,6 +1300,7 @@ Status RedisHashes::Expireat(const Slice& key, int32_t timestamp) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
@@ -1303,6 +1323,7 @@ Status RedisHashes::Persist(const Slice& key) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.count() == 0) {
@@ -1325,6 +1346,7 @@ Status RedisHashes::TTL(const Slice& key, int64_t* timestamp) {
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
if (parsed_hashes_meta_value.IsStale()) {
*timestamp = -2;
return Status::NotFound("Stale");
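All of the RedisHashes hunks above apply the same one-line instrumentation: right after the meta value is parsed, the hash's field count is reported, before any staleness or emptiness checks. The pattern, distilled from the hunks above with command-specific logic elided:

// Instrumentation pattern repeated across the RedisHashes methods above.
std::string meta_value;
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
  ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
  // New in this commit: count/log the key if it holds more than kBigKeyThreshold fields.
  CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.count());
  if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.count() == 0) {
    return Status::NotFound();
  }
  // ... per-command logic continues here ...
}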