From 0132455b393d4c54a5a03893b47829ec240f77e9 Mon Sep 17 00:00:00 2001
From: Antonio Andelic
Date: Fri, 19 Jan 2024 15:28:02 +0000
Subject: [PATCH 1/7] Reduce Keeper memory usage

---
 src/Coordination/KeeperSnapshotManager.cpp    |  38 +++---
 src/Coordination/KeeperSnapshotManager.h      |   3 +-
 src/Coordination/KeeperStorage.cpp            |  71 +++++-----
 src/Coordination/KeeperStorage.h              |  45 +++++--
 src/Coordination/SnapshotableHashTable.h      | 123 ++++++++++++++----
 src/Coordination/ZooKeeperDataReader.cpp      |   1 -
 src/Coordination/tests/gtest_coordination.cpp |  16 +--
 utils/keeper-data-dumper/main.cpp             |   2 +-
 8 files changed, 202 insertions(+), 97 deletions(-)

diff --git a/src/Coordination/KeeperSnapshotManager.cpp b/src/Coordination/KeeperSnapshotManager.cpp
index fffa6eaa9414..ee5935015e4f 100644
--- a/src/Coordination/KeeperSnapshotManager.cpp
+++ b/src/Coordination/KeeperSnapshotManager.cpp
@@ -74,7 +74,10 @@ namespace
 
         /// Serialize ACL
         writeBinary(node.acl_id, out);
-        writeBinary(node.is_sequental, out);
+        /// Write is_sequential for backwards compatibility
+        if (version < SnapshotVersion::V6)
+            writeBinary(false, out);
+
         /// Serialize stat
         writeBinary(node.stat.czxid, out);
         writeBinary(node.stat.mzxid, out);
@@ -84,16 +87,15 @@ namespace
         writeBinary(node.stat.cversion, out);
         writeBinary(node.stat.aversion, out);
         writeBinary(node.stat.ephemeralOwner, out);
-        writeBinary(node.stat.dataLength, out);
+        if (version < SnapshotVersion::V6)
+            writeBinary(static_cast<int32_t>(node.getData().size()), out);
         writeBinary(node.stat.numChildren, out);
         writeBinary(node.stat.pzxid, out);
 
         writeBinary(node.seq_num, out);
 
-        if (version >= SnapshotVersion::V4)
-        {
-            writeBinary(node.size_bytes, out);
-        }
+        if (version >= SnapshotVersion::V4 && version <= SnapshotVersion::V5)
+            writeBinary(node.sizeInBytes(), out);
     }
 
     void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map)
@@ -129,7 +131,11 @@ namespace
 
         acl_map.addUsage(node.acl_id);
 
-        readBinary(node.is_sequental, in);
+        if (version < SnapshotVersion::V6)
+        {
+            bool is_sequential = false;
+            readBinary(is_sequential, in);
+        }
 
         /// Deserialize stat
         readBinary(node.stat.czxid, in);
@@ -140,14 +146,19 @@ namespace
         readBinary(node.stat.cversion, in);
         readBinary(node.stat.aversion, in);
         readBinary(node.stat.ephemeralOwner, in);
-        readBinary(node.stat.dataLength, in);
+        if (version < SnapshotVersion::V6)
+        {
+            int32_t data_length = 0;
+            readBinary(data_length, in);
+        }
         readBinary(node.stat.numChildren, in);
         readBinary(node.stat.pzxid, in);
 
         readBinary(node.seq_num, in);
 
-        if (version >= SnapshotVersion::V4)
+        if (version >= SnapshotVersion::V4 && version <= SnapshotVersion::V5)
         {
-            readBinary(node.size_bytes, in);
+            uint64_t size_bytes = 0;
+            readBinary(size_bytes, in);
         }
     }
 
@@ -354,7 +365,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
 
     const auto is_node_empty = [](const auto & node)
     {
-        return node.getData().empty() && node.stat == Coordination::Stat{};
+        return node.getData().empty() && node.stat == KeeperStorage::Node::Stat{};
     };
 
     for (size_t nodes_read = 0; nodes_read < snapshot_container_size; ++nodes_read)
@@ -398,9 +409,6 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
                         "If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true",
                         error_msg);
             }
-
-            // we always ignore the written size for this node
-            node.recalculateSize();
         }
 
         storage.container.insertOrReplace(path, node);
@@ -417,7 +425,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
     {
         auto parent_path = parentNodePath(itr.key);
         storage.container.updateValue(
-            parent_path, [version, path = itr.key](KeeperStorage::Node & value) { value.addChild(getBaseNodeName(path), /*update_size*/ version < SnapshotVersion::V4); });
+            parent_path, [path = itr.key](KeeperStorage::Node & value) { value.addChild(getBaseNodeName(path)); });
     }
 }
 
diff --git a/src/Coordination/KeeperSnapshotManager.h b/src/Coordination/KeeperSnapshotManager.h
index 9bb287b9276d..6096ba318da2 100644
--- a/src/Coordination/KeeperSnapshotManager.h
+++ b/src/Coordination/KeeperSnapshotManager.h
@@ -24,9 +24,10 @@ enum SnapshotVersion : uint8_t
     V3 = 3, /// compress snapshots with ZSTD codec
     V4 = 4, /// add Node size to snapshots
     V5 = 5, /// add ZXID and digest to snapshots
+    V6 = 6, /// remove is_sequential, per node size, data length
 };
 
-static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V5;
+static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V6;
 
 /// What is stored in binary snapshot
 struct SnapshotDeserializationResult
diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp
index 41e6f5b5e2b5..c128d7c2f98f 100644
--- a/src/Coordination/KeeperStorage.cpp
+++ b/src/Coordination/KeeperStorage.cpp
@@ -24,7 +24,6 @@
 #include
 #include
-#include
 #include
 #include
 #include
@@ -167,7 +166,7 @@ KeeperStorage::ResponsesForSessions processWatchesImpl(
 }
 
 // When this function is updated, update CURRENT_DIGEST_VERSION!!
-uint64_t calculateDigest(std::string_view path, std::string_view data, const Coordination::Stat & stat)
+uint64_t calculateDigest(std::string_view path, std::string_view data, const KeeperStorage::Node::Stat & stat)
 {
     SipHash hash;
 
@@ -184,7 +183,7 @@ uint64_t calculateDigest(std::string_view path, std::string_view data, const Coo
     hash.update(stat.cversion);
     hash.update(stat.aversion);
     hash.update(stat.ephemeralOwner);
-    hash.update(stat.dataLength);
+    hash.update(data.length());
     hash.update(stat.numChildren);
     hash.update(stat.pzxid);
 
@@ -193,36 +192,56 @@ uint64_t calculateDigest(std::string_view path, std::string_view data, const Coo
 }
 
+void KeeperStorage::Node::setResponseStat(Coordination::Stat & response_stat) const
+{
+    response_stat.czxid = stat.czxid;
+    response_stat.mzxid = stat.mzxid;
+    response_stat.ctime = stat.ctime;
+    response_stat.mtime = stat.mtime;
+    response_stat.version = stat.version;
+    response_stat.cversion = stat.cversion;
+    response_stat.aversion = stat.aversion;
+    response_stat.ephemeralOwner = stat.ephemeralOwner;
+    response_stat.dataLength = static_cast<int32_t>(data.size());
+    response_stat.numChildren = stat.numChildren;
+    response_stat.pzxid = stat.pzxid;
+
+}
+
+uint64_t KeeperStorage::Node::sizeInBytes() const
+{
+    return sizeof(Node) + children.size() * sizeof(StringRef) + data.size();
+}
+
 void KeeperStorage::Node::setData(String new_data)
 {
-    size_bytes = size_bytes - data.size() + new_data.size();
     data = std::move(new_data);
 }
 
-void KeeperStorage::Node::addChild(StringRef child_path, bool update_size)
+void KeeperStorage::Node::addChild(StringRef child_path)
 {
-    if (update_size) [[likely]]
-        size_bytes += sizeof child_path;
     children.insert(child_path);
 }
 
 void KeeperStorage::Node::removeChild(StringRef child_path)
 {
-    size_bytes -= sizeof child_path;
     children.erase(child_path);
 }
 
 void KeeperStorage::Node::invalidateDigestCache() const
 {
-    cached_digest.reset();
+    has_cached_digest = false;
 }
 
 UInt64 KeeperStorage::Node::getDigest(const std::string_view path) const
 {
-    if (!cached_digest)
+    if (!has_cached_digest)
+    {
         cached_digest = calculateDigest(path, data, stat);
+        has_cached_digest = true;
+    }
 
-    return *cached_digest;
+    return cached_digest;
 };
 
@@ -233,13 +252,6 @@ void KeeperStorage::Node::shallowCopy(const KeeperStorage::Node & other)
     cached_digest = other.cached_digest;
 }
 
-void KeeperStorage::Node::recalculateSize()
-{
-    size_bytes = sizeof(Node);
-    size_bytes += children.size() * sizeof(decltype(children)::value_type);
-    size_bytes += data.size();
-}
-
 KeeperStorage::KeeperStorage(
     int64_t tick_time_ms, const String & superdigest_, const KeeperContextPtr & keeper_context_, const bool initialize_system_nodes)
     : session_expiry_queue(tick_time_ms), keeper_context(keeper_context_), superdigest(superdigest_)
@@ -650,7 +662,6 @@ Coordination::Error KeeperStorage::commit(int64_t commit_zxid)
                         path,
                         std::move(operation.data),
                         operation.stat,
-                        operation.is_sequental,
                         std::move(operation.acls)))
                         onStorageInconsistency();
 
@@ -729,8 +740,7 @@ Coordination::Error KeeperStorage::commit(int64_t commit_zxid)
 bool KeeperStorage::createNode(
     const std::string & path,
     String data,
-    const Coordination::Stat & stat,
-    bool is_sequental,
+    const KeeperStorage::Node::Stat & stat,
     Coordination::ACLs node_acls)
 {
     auto parent_path = parentNodePath(path);
@@ -753,7 +763,6 @@ bool KeeperStorage::createNode(
     created_node.acl_id = acl_id;
     created_node.stat = stat;
     created_node.setData(std::move(data));
-    created_node.is_sequental = is_sequental;
     auto [map_key, _] = container.insert(path, created_node);
     /// Take child path from key owned by map.
     auto child_path = getBaseNodeName(map_key->getKey());
@@ -1012,7 +1021,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
         new_deltas.emplace_back(std::string{parent_path}, zxid, KeeperStorage::UpdateNodeDelta{std::move(parent_update)});
 
-        Coordination::Stat stat;
+        KeeperStorage::Node::Stat stat;
         stat.czxid = zxid;
         stat.mzxid = zxid;
         stat.pzxid = zxid;
@@ -1022,13 +1031,12 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
         stat.version = 0;
         stat.aversion = 0;
         stat.cversion = 0;
-        stat.dataLength = static_cast<int32_t>(request.data.length());
         stat.ephemeralOwner = request.is_ephemeral ? session_id : 0;
 
         new_deltas.emplace_back(
             std::move(path_created),
             zxid,
-            KeeperStorage::CreateNodeDelta{stat, request.is_sequential, std::move(node_acls), request.data});
+            KeeperStorage::CreateNodeDelta{stat, std::move(node_acls), request.data});
 
         digest = storage.calculateNodesDigest(digest, new_deltas);
         return new_deltas;
@@ -1126,7 +1134,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
         }
         else
         {
-            response.stat = node_it->value.stat;
+            node_it->value.setResponseStat(response.stat);
             response.data = node_it->value.getData();
             response.error = Coordination::Error::ZOK;
         }
@@ -1285,7 +1293,7 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr
         }
         else
         {
-            response.stat = node_it->value.stat;
+            node_it->value.setResponseStat(response.stat);
             response.error = Coordination::Error::ZOK;
         }
 
@@ -1345,7 +1353,6 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
                     value.stat.version++;
                     value.stat.mzxid = zxid;
                     value.stat.mtime = time;
-                    value.stat.dataLength = static_cast<int32_t>(data.length());
                     value.setData(data);
                 },
                 request.version});
@@ -1384,7 +1391,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
         if (node_it == container.end())
             onStorageInconsistency();
 
-        response.stat = node_it->value.stat;
+        node_it->value.setResponseStat(response.stat);
         response.error = Coordination::Error::ZOK;
 
         return response_ptr;
@@ -1481,7 +1488,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
             response.names.push_back(child.toString());
         }
 
-        response.stat = node_it->value.stat;
+        node_it->value.setResponseStat(response.stat);
         response.error = Coordination::Error::ZOK;
     }
 
@@ -1675,7 +1682,7 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
         auto node_it = storage.container.find(request.path);
         if (node_it == storage.container.end())
             onStorageInconsistency();
-        response.stat = node_it->value.stat;
+        node_it->value.setResponseStat(response.stat);
         response.error = Coordination::Error::ZOK;
 
         return response_ptr;
@@ -1729,7 +1736,7 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr
         }
         else
         {
-            response.stat = node_it->value.stat;
+            node_it->value.setResponseStat(response.stat);
             response.acl = storage.acl_map.convertNumber(node_it->value.acl_id);
         }
 
diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h
index ec5df74efb60..01c1413a8842 100644
--- a/src/Coordination/KeeperStorage.h
+++ b/src/Coordination/KeeperStorage.h
@@ -30,24 +30,47 @@ struct KeeperStorageSnapshot;
 class KeeperStorage
 {
 public:
+    /// Node should have as minimal size as possible to reduce memory footprint
+    /// of stored nodes
+    /// New fields should be added to the struct only if it's really necessary
     struct Node
     {
+        /// to reduce size of the Node struct we use a custom Stat without dataLength
+        struct Stat
+        {
+            int64_t czxid{0};
+            int64_t mzxid{0};
+            int64_t ctime{0};
+            int64_t mtime{0};
+            int32_t version{0};
+            int32_t cversion{0};
+            int32_t aversion{0};
+            int32_t numChildren{0}; /// NOLINT
+            int64_t ephemeralOwner{0}; /// NOLINT
+            int64_t pzxid{0};
+
+            bool operator==(const Stat &) const = default;
+        };
+
         uint64_t acl_id = 0; /// 0 -- no ACL by default
-        bool is_sequental = false;
-        Coordination::Stat stat{};
+        Stat stat{};
         int32_t seq_num = 0;
-        uint64_t size_bytes; // save size to avoid calculate every time
 
-        Node() : size_bytes(sizeof(Node)) { }
+        /// we cannot use `std::optional<uint64_t>` because we want to
+        /// pack the boolean with seq_num above
+        mutable bool has_cached_digest = false;
+        mutable uint64_t cached_digest = 0;
+
+        void setResponseStat(Coordination::Stat & response_stat) const;
 
         /// Object memory size
-        uint64_t sizeInBytes() const { return size_bytes; }
+        uint64_t sizeInBytes() const;
 
         void setData(String new_data);
 
         const auto & getData() const noexcept { return data; }
 
-        void addChild(StringRef child_path, bool update_size = true);
+        void addChild(StringRef child_path);
 
         void removeChild(StringRef child_path);
 
@@ -63,13 +86,9 @@ class KeeperStorage
         // copy only necessary information for preprocessing and digest calculation
         // (e.g. we don't need to copy list of children)
         void shallowCopy(const Node & other);
-
-        void recalculateSize();
-
     private:
         String data;
         ChildrenSet children{};
-        mutable std::optional<UInt64> cached_digest;
     };
 
     enum DigestVersion : uint8_t
@@ -158,8 +177,7 @@ class KeeperStorage
     //   - quickly commit the changes to the storage
     struct CreateNodeDelta
     {
-        Coordination::Stat stat;
-        bool is_sequental;
+        KeeperStorage::Node::Stat stat;
         Coordination::ACLs acls;
         String data;
     };
@@ -324,8 +342,7 @@ class KeeperStorage
     bool createNode(
         const std::string & path,
         String data,
-        const Coordination::Stat & stat,
-        bool is_sequental,
+        const KeeperStorage::Node::Stat & stat,
         Coordination::ACLs node_acls);
 
     // Remove node in the storage
diff --git a/src/Coordination/SnapshotableHashTable.h b/src/Coordination/SnapshotableHashTable.h
index 093126237ef1..b12e797d84ed 100644
--- a/src/Coordination/SnapshotableHashTable.h
+++ b/src/Coordination/SnapshotableHashTable.h
@@ -3,30 +3,93 @@
 #include
 #include
 #include
-#include
 #include
-#include
 
 namespace DB
 {
 
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
 template <typename V>
 struct ListNode
 {
     StringRef key;
     V value;
 
-    /// Monotonically increasing version info for snapshot
-    size_t version{0};
-    bool active_in_map{true};
-    bool free_key{false};
+    /// |* * ****** |
+    /// ^ ^ ^
+    /// active_in_map free_key version
+    /// (1 byte) (1 byte) (6 bytes)
+    uint64_t node_metadata = 0;
+
+    void setInactiveInMap()
+    {
+        node_metadata &= ~active_in_map_mask;
+    }
+
+    void setActiveInMap()
+    {
+        node_metadata |= active_in_map_mask;
+    }
+
+    bool isActiveInMap()
+    {
+        return node_metadata & active_in_map_mask;
+    }
+
+    void setFreeKey()
+    {
+        node_metadata |= free_key_mask;
+    }
+
+    bool getFreeKey()
+    {
+        return node_metadata & free_key_mask;
+    }
+
+    uint64_t getVersion()
+    {
+        return node_metadata & version_mask;
+    }
+
+    void setVersion(uint64_t version)
+    {
+        if (version > version_mask)
+            throw Exception(
+                ErrorCodes::LOGICAL_ERROR,
+                "Snapshot version {} is larger than maximum allowed value {}",
+                version,
+                ListNode::version_mask);
+
+        node_metadata &= ~version_mask;
+        node_metadata |= version;
+    }
+
+    static constexpr uint64_t active_in_map_mask = static_cast<uint64_t>(1) << 63;
+    static constexpr uint64_t free_key_mask = static_cast<uint64_t>(1) << 62;
+    static constexpr uint64_t version_mask = ~(static_cast<uint64_t>(3) << 62);
 };
 
 template <typename V>
 class SnapshotableHashTable
 {
 private:
+    struct GlobalArena
+    {
+        char * alloc(const size_t size)
+        {
+            return new char[size];
+        }
+
+        void free(const char * ptr, size_t /*size*/)
+        {
+            delete [] ptr;
+        }
+    };
+
     using ListElem = ListNode<V>;
     using List = std::list<ListElem>;
@@ -39,7 +102,12 @@ class SnapshotableHashTable
     /// Allows to avoid additional copies in updateValue function
     size_t current_version{0};
     size_t snapshot_up_to_version{0};
-    ArenaWithFreeLists arena;
+
+    /// Arena used for keys
+    /// we don't use std::string because it uses 24 bytes (because of SSO)
+    /// we want to always allocate the key on heap and use StringRef to it
+    GlobalArena arena;
+
     /// Collect invalid iterators to avoid traversing the whole list
     std::vector<typename List::iterator> snapshot_invalid_iters;
 
@@ -132,11 +200,13 @@ class SnapshotableHashTable
 
         if (!it)
         {
-            ListElem elem{copyStringInArena(arena, key), value, current_version};
+            ListElem elem{copyStringInArena(arena, key), value};
+            elem.setVersion(current_version);
             auto itr = list.insert(list.end(), std::move(elem));
             bool inserted;
             map.emplace(itr->key, it, inserted, hash_value);
-            assert(inserted);
+            itr->setActiveInMap();
+            chassert(inserted);
             it->getMapped() = itr;
 
             updateDataSize(INSERT, key.size(), value.sizeInBytes(), 0);
@@ -154,11 +224,13 @@ class SnapshotableHashTable
 
         if (it == map.end())
         {
-            ListElem elem{copyStringInArena(arena, key), value, current_version};
+            ListElem elem{copyStringInArena(arena, key), value};
+            elem.setVersion(current_version);
             auto itr = list.insert(list.end(), std::move(elem));
             bool inserted;
             map.emplace(itr->key, it, inserted, hash_value);
-            assert(inserted);
+            itr->setActiveInMap();
+            chassert(inserted);
             it->getMapped() = itr;
         }
         else
@@ -166,8 +238,9 @@ class SnapshotableHashTable
             auto list_itr = it->getMapped();
             if (snapshot_mode)
             {
-                ListElem elem{list_itr->key, value, current_version};
-                list_itr->active_in_map = false;
+                ListElem elem{list_itr->key, value};
+                elem.setVersion(current_version);
+                list_itr->setInactiveInMap();
                 auto new_list_itr = list.insert(list.end(), std::move(elem));
                 it->getMapped() = new_list_itr;
                 snapshot_invalid_iters.push_back(list_itr);
@@ -190,9 +263,9 @@ class SnapshotableHashTable
         uint64_t old_data_size = list_itr->value.sizeInBytes();
         if (snapshot_mode)
         {
-            list_itr->active_in_map = false;
+            list_itr->setInactiveInMap();
             snapshot_invalid_iters.push_back(list_itr);
-            list_itr->free_key = true;
+            list_itr->setFreeKey();
             map.erase(it->getKey());
         }
         else
@@ -215,7 +288,7 @@ class SnapshotableHashTable
     {
         size_t hash_value = map.hash(key);
         auto it = map.find(key, hash_value);
-        assert(it != map.end());
+        chassert(it != map.end());
 
         auto list_itr = it->getMapped();
         uint64_t old_value_size = list_itr->value.sizeInBytes();
@@ -228,13 +301,14 @@ class SnapshotableHashTable
             /// We in snapshot mode but updating some node which is already more
             /// fresh than snapshot distance. So it will not participate in
             /// snapshot and we don't need to copy it.
-            if (list_itr->version <= snapshot_up_to_version)
+            if (list_itr->getVersion() <= snapshot_up_to_version)
             {
                 auto elem_copy = *(list_itr);
-                list_itr->active_in_map = false;
+                list_itr->setInactiveInMap();
                 snapshot_invalid_iters.push_back(list_itr);
                 updater(elem_copy.value);
-                elem_copy.version = current_version;
+
+                elem_copy.setVersion(current_version);
                 auto itr = list.insert(list.end(), std::move(elem_copy));
                 it->getMapped() = itr;
                 ret = itr;
@@ -269,17 +343,17 @@ class SnapshotableHashTable
     const V & getValue(StringRef key) const
     {
         auto it = map.find(key);
-        assert(it);
+        chassert(it);
         return it->getMapped()->value;
     }
 
     void clearOutdatedNodes()
     {
-        for (auto & itr: snapshot_invalid_iters)
+        for (auto & itr : snapshot_invalid_iters)
         {
-            assert(!itr->active_in_map);
+            chassert(!itr->isActiveInMap());
             updateDataSize(CLEAR_OUTDATED_NODES, itr->key.size, itr->value.sizeInBytes(), 0);
-            if (itr->free_key)
+            if (itr->getFreeKey())
                 arena.free(const_cast<char *>(itr->key.data), itr->key.size);
             list.erase(itr);
         }
@@ -327,13 +401,12 @@ class SnapshotableHashTable
         approximate_data_size = 0;
         for (auto & node : list)
         {
-            node.value.recalculateSize();
             approximate_data_size += node.key.size;
             approximate_data_size += node.value.sizeInBytes();
         }
     }
 
-    uint64_t keyArenaSize() const { return arena.allocatedBytes(); }
+    uint64_t keyArenaSize() const { return 0; }
 
     iterator begin() { return list.begin(); }
     const_iterator begin() const { return list.cbegin(); }
diff --git a/src/Coordination/ZooKeeperDataReader.cpp b/src/Coordination/ZooKeeperDataReader.cpp
index 3c1550f08c83..b55ebef327ff 100644
--- a/src/Coordination/ZooKeeperDataReader.cpp
+++ b/src/Coordination/ZooKeeperDataReader.cpp
@@ -120,7 +120,6 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L
     Coordination::read(node.stat.pzxid, in);
     if (!path.empty())
     {
-        node.stat.dataLength = static_cast<int32_t>(node.getData().length());
         node.seq_num = node.stat.cversion;
         storage.container.insertOrReplace(path, node);
diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp
index dd19f0b9967a..f4e731495894 100644
--- a/src/Coordination/tests/gtest_coordination.cpp
+++ b/src/Coordination/tests/gtest_coordination.cpp
@@ -1367,11 +1367,11 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot)
     auto itr = map_snp.begin();
     EXPECT_EQ(itr->key, "/hello");
     EXPECT_EQ(itr->value, 7);
-    EXPECT_EQ(itr->active_in_map, false);
+    EXPECT_EQ(itr->isActiveInMap(), false);
     itr = std::next(itr);
     EXPECT_EQ(itr->key, "/hello");
     EXPECT_EQ(itr->value, 554);
-    EXPECT_EQ(itr->active_in_map, true);
+    EXPECT_EQ(itr->isActiveInMap(), true);
     itr = std::next(itr);
     EXPECT_EQ(itr, map_snp.end());
     for (int i = 0; i < 5; ++i)
@@ -1387,7 +1387,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot)
     {
         EXPECT_EQ(itr->key, "/hello" + std::to_string(i));
         EXPECT_EQ(itr->value, i);
-        EXPECT_EQ(itr->active_in_map, true);
+        EXPECT_EQ(itr->isActiveInMap(), true);
         itr = std::next(itr);
     }
 
@@ -1401,7 +1401,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot)
     {
         EXPECT_EQ(itr->key, "/hello" + std::to_string(i));
         EXPECT_EQ(itr->value, i);
-        EXPECT_EQ(itr->active_in_map, i != 3 && i != 2);
+        EXPECT_EQ(itr->isActiveInMap(), i != 3 && i != 2);
         itr = std::next(itr);
     }
     map_snp.clearOutdatedNodes();
@@ -1411,19 +1411,19 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot)
     itr = map_snp.begin();
     EXPECT_EQ(itr->key, "/hello");
     EXPECT_EQ(itr->value, 554);
-    EXPECT_EQ(itr->active_in_map, true);
+    EXPECT_EQ(itr->isActiveInMap(), true);
     itr = std::next(itr);
     EXPECT_EQ(itr->key, "/hello0");
     EXPECT_EQ(itr->value, 0);
-    EXPECT_EQ(itr->active_in_map, true);
+    EXPECT_EQ(itr->isActiveInMap(), true);
     itr = std::next(itr);
     EXPECT_EQ(itr->key, "/hello1");
     EXPECT_EQ(itr->value, 1);
-    EXPECT_EQ(itr->active_in_map, true);
+    EXPECT_EQ(itr->isActiveInMap(), true);
     itr = std::next(itr);
     EXPECT_EQ(itr->key, "/hello4");
     EXPECT_EQ(itr->value, 4);
-    EXPECT_EQ(itr->active_in_map, true);
+    EXPECT_EQ(itr->isActiveInMap(), true);
     itr = std::next(itr);
     EXPECT_EQ(itr, map_snp.end());
     map_snp.disableSnapshotMode();
diff --git a/utils/keeper-data-dumper/main.cpp b/utils/keeper-data-dumper/main.cpp
index aa8c0efbb265..e06b301edbfc 100644
--- a/utils/keeper-data-dumper/main.cpp
+++ b/utils/keeper-data-dumper/main.cpp
@@ -31,7 +31,7 @@ void dumpMachine(std::shared_ptr<KeeperStateMachine> machine)
             ", czxid: " << value.stat.czxid <<
             ", mzxid: " << value.stat.mzxid <<
             ", numChildren: " << value.stat.numChildren <<
-            ", dataLength: " << value.stat.dataLength <<
+            ", dataLength: " << value.getData().size() <<
             "}" << std::endl;
         std::cout << "\tData: " << storage.container.getValue(key).getData() << std::endl;
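
A minimal standalone sketch of the bit-packing scheme this patch introduces in ListNode (the struct and method names below are illustrative, not the ClickHouse API): two flags and a 62-bit snapshot version share a single 8-byte word, replacing a size_t plus two bools and their padding.

    #include <cassert>
    #include <cstdint>

    // Illustrative sketch: two flags and a 62-bit version packed into one
    // 8-byte word, mirroring ListNode::node_metadata in the patch above.
    struct PackedNodeMetadata
    {
        static constexpr uint64_t active_in_map_mask = static_cast<uint64_t>(1) << 63;
        static constexpr uint64_t free_key_mask = static_cast<uint64_t>(1) << 62;
        static constexpr uint64_t version_mask = ~(static_cast<uint64_t>(3) << 62);

        uint64_t word = 0;

        bool isActiveInMap() const { return word & active_in_map_mask; }
        void setActiveInMap() { word |= active_in_map_mask; }
        void setInactiveInMap() { word &= ~active_in_map_mask; }

        uint64_t getVersion() const { return word & version_mask; }
        void setVersion(uint64_t version)
        {
            assert(version <= version_mask); // versions above 2^62 - 1 no longer fit
            word = (word & ~version_mask) | version;
        }
    };

    static_assert(sizeof(PackedNodeMetadata) == 8, "flags and version fit in one word");

The trade-off is visible in setVersion(): the version range shrinks to 62 bits, which is why the patched setVersion() above throws LOGICAL_ERROR on overflow instead of silently corrupting the flag bits.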
From fdf8008804054238affd29bd99028cf280340b51 Mon Sep 17 00:00:00 2001
From: skyoct
Date: Mon, 22 Jan 2024 10:27:35 +0000
Subject: [PATCH 2/7] Fix redirect retry

---
 src/IO/ReadWriteBufferFromHTTP.cpp | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/IO/ReadWriteBufferFromHTTP.cpp b/src/IO/ReadWriteBufferFromHTTP.cpp
index 6dd6269e16fd..ea18b369c523 100644
--- a/src/IO/ReadWriteBufferFromHTTP.cpp
+++ b/src/IO/ReadWriteBufferFromHTTP.cpp
@@ -216,6 +216,9 @@ void ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::getHeadResponse(Poco::Net
             if (i == settings.http_max_tries - 1 || !isRetriableError(response.getStatus()))
                 throw;
 
+            if (e.code() == ErrorCodes::TOO_MANY_REDIRECTS)
+                throw;
+
             LOG_ERROR(log, "Failed to make HTTP_HEAD request to {}. Error: {}", uri.toString(), e.displayText());
         }
     }
@@ -545,6 +548,9 @@ bool ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::nextImpl()
             /// Too many open files - non-retryable.
             if (e.code() == POCO_EMFILE)
                 throw;
 
+            if (e.code() == ErrorCodes::TOO_MANY_REDIRECTS)
+                throw;
+
             /** Retry request unconditionally if nothing has been read yet.
               * Otherwise if it is GET method retry with range header.
               */
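
The shape of this fix, reduced to a hedged sketch (`attempt` and `is_permanent` are illustrative stand-ins for the real request and error-classification logic in ReadWriteBufferFromHTTPBase): a redirect-limit failure is permanent, so retrying it only burns the remaining attempts before failing with the same error.

    #include <cstddef>

    // Sketch of the retry policy after the fix: a failure classified as
    // permanent (redirect limit exceeded, out of file descriptors) is
    // rethrown immediately; transient failures are retried up to max_tries.
    template <typename Attempt, typename IsPermanent>
    void retryRequest(Attempt attempt, IsPermanent is_permanent, size_t max_tries)
    {
        for (size_t i = 0; i < max_tries; ++i)
        {
            try
            {
                attempt();
                return;
            }
            catch (...)
            {
                // Retrying cannot reduce the number of redirects the server
                // sends, so give up at once rather than on the last attempt.
                if (is_permanent() || i == max_tries - 1)
                    throw;
            }
        }
    }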
From 274c128bd78f91cac46a3b9d89b4990c71e56608 Mon Sep 17 00:00:00 2001
From: Antonio Andelic
Date: Mon, 22 Jan 2024 08:23:04 +0000
Subject: [PATCH 3/7] Free memory

---
 src/Coordination/SnapshotableHashTable.h      | 12 +++++++-----
 src/Coordination/tests/gtest_coordination.cpp |  5 +----
 2 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/src/Coordination/SnapshotableHashTable.h b/src/Coordination/SnapshotableHashTable.h
index b12e797d84ed..ac8d36745c2d 100644
--- a/src/Coordination/SnapshotableHashTable.h
+++ b/src/Coordination/SnapshotableHashTable.h
@@ -1,7 +1,6 @@
 #pragma once
 #include
 #include
-#include
 #include
 #include
 
@@ -60,10 +59,7 @@ struct ListNode
     {
         if (version > version_mask)
             throw Exception(
-                ErrorCodes::LOGICAL_ERROR,
-                "Snapshot version {} is larger than maximum allowed value {}",
-                version,
-                ListNode::version_mask);
+                ErrorCodes::LOGICAL_ERROR, "Snapshot version {} is larger than maximum allowed value {}", version, version_mask);
 
         node_metadata &= ~version_mask;
         node_metadata |= version;
@@ -193,6 +189,11 @@ class SnapshotableHashTable
     using const_iterator = typename List::const_iterator;
     using ValueUpdater = std::function<void(V & value)>;
 
+    ~SnapshotableHashTable()
+    {
+        clear();
+    }
+
     std::pair<typename List::iterator, bool> insert(const std::string & key, const V & value)
@@ -362,6 +363,7 @@ class SnapshotableHashTable
 
     void clear()
     {
+        clearOutdatedNodes();
         map.clear();
         for (auto itr = list.begin(); itr != list.end(); ++itr)
             arena.free(const_cast<char *>(itr->key.data), itr->key.size);
diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp
index 8559accbaf47..c981085359e2 100644
--- a/src/Coordination/tests/gtest_coordination.cpp
+++ b/src/Coordination/tests/gtest_coordination.cpp
@@ -45,10 +45,7 @@ struct ChangelogDirTest
     bool drop;
     explicit ChangelogDirTest(std::string path_, bool drop_ = true) : path(path_), drop(drop_)
     {
-        if (fs::exists(path))
-        {
-            EXPECT_TRUE(false) << "Path " << path << " already exists, remove it to run test";
-        }
+        EXPECT_FALSE(fs::exists(path)) << "Path " << path << " already exists, remove it to run test";
         fs::create_directory(path);
     }
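
A hedged illustration of the leak this patch closes (simplified types, not the real SnapshotableHashTable): keys live in the hand-managed GlobalArena introduced by the previous patch, so a table that is destroyed without clear(), or cleared while outdated snapshot nodes are still queued, would drop the owners of those keys without freeing the allocations.

    #include <list>

    // Simplified sketch of the ownership rule the patch enforces: every key
    // the table copied onto the heap must be freed exactly once, including
    // keys that only survive in the outdated-for-snapshot list.
    struct KeyOwner
    {
        std::list<char *> live_keys;     // keys referenced from the hash map
        std::list<char *> outdated_keys; // keys kept alive only for a snapshot

        void clearOutdated()
        {
            for (char * key : outdated_keys)
                delete[] key; // mirrors arena.free() in clearOutdatedNodes()
            outdated_keys.clear();
        }

        void clear()
        {
            clearOutdated(); // must run first, as in the patched clear()
            for (char * key : live_keys)
                delete[] key;
            live_keys.clear();
        }

        // Without this (the new ~SnapshotableHashTable), dropping the table
        // leaked every key still held in either list.
        ~KeyOwner() { clear(); }
    };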
From be3b5dc45fdfd76585cb752b8a34fae9e34d7451 Mon Sep 17 00:00:00 2001
From: avogar
Date: Tue, 23 Jan 2024 17:45:41 +0000
Subject: [PATCH 4/7] Allow to ignore schema evolution in Iceberg table engine
 under a setting

---
 docs/en/operations/settings/settings.md      |  4 ++
 src/Core/Settings.h                          |  3 +-
 src/Core/SettingsChangesHistory.h            |  3 +-
 .../DataLakes/Iceberg/IcebergMetadata.cpp    | 57 +++++++++++++++----
 .../integration/test_storage_iceberg/test.py |  7 +++
 5 files changed, 60 insertions(+), 14 deletions(-)

diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md
index f085fe1abcdd..6444b76ba0e3 100644
--- a/docs/en/operations/settings/settings.md
+++ b/docs/en/operations/settings/settings.md
@@ -5197,3 +5197,7 @@ The value 0 means that you can delete all tables without any restrictions.
 :::note
 This query setting overwrites its server setting equivalent, see [max_table_size_to_drop](/docs/en/operations/server-configuration-parameters/settings.md/#max-table-size-to-drop)
 :::
+
+## iceberg_engine_ignore_schema_evolution {#iceberg_engine_ignore_schema_evolution}
+
+Allow to ignore schema evolution in Iceberg table engine and read all data using latest schema saved on storage creation.
\ No newline at end of file
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 305d6466658e..77a8c0ed7666 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -856,7 +856,8 @@ class IColumn;
     M(UInt64, cache_warmer_threads, 4, "Only available in ClickHouse Cloud", 0) \
     M(Int64, ignore_cold_parts_seconds, 0, "Only available in ClickHouse Cloud", 0) \
     M(Int64, prefer_warmed_unmerged_parts_seconds, 0, "Only available in ClickHouse Cloud", 0) \
-    M(Bool, enable_order_by_all, true, "Enable sorting expression ORDER BY ALL.", 0)\
+    M(Bool, enable_order_by_all, true, "Enable sorting expression ORDER BY ALL.", 0) \
+    M(Bool, iceberg_engine_ignore_schema_evolution, false, "Ignore schema evolution in Iceberg table engine and read all data using latest schema saved on table creation. Note that it can lead to incorrect result", 0) \
 
     // End of COMMON_SETTINGS
     // Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS.
diff --git a/src/Core/SettingsChangesHistory.h b/src/Core/SettingsChangesHistory.h
index ef37aa5fb47c..859ba99b5f7b 100644
--- a/src/Core/SettingsChangesHistory.h
+++ b/src/Core/SettingsChangesHistory.h
@@ -98,7 +98,8 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
     {"max_rows_in_set_to_optimize_join", 100000, 0, "Disable join optimization as it prevents from read in order optimization"},
     {"output_format_pretty_color", true, "auto", "Setting is changed to allow also for auto value, disabling ANSI escapes if output is not a tty"},
     {"function_visible_width_behavior", 0, 1, "We changed the default behavior of `visibleWidth` to be more precise"},
-    {"max_estimated_execution_time", 0, 0, "Separate max_execution_time and max_estimated_execution_time"}}},
+    {"max_estimated_execution_time", 0, 0, "Separate max_execution_time and max_estimated_execution_time"},
+    {"iceberg_engine_ignore_schema_evolution", false, false, "Allow to ignore schema evolution in Iceberg table engine"}}},
     {"23.12", {{"allow_suspicious_ttl_expressions", true, false, "It is a new setting, and in previous versions the behavior was equivalent to allowing."},
     {"input_format_parquet_allow_missing_columns", false, true, "Allow missing columns in Parquet files by default"},
     {"input_format_orc_allow_missing_columns", false, true, "Allow missing columns in ORC files by default"},
diff --git a/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp
index b4ac00507a3d..e0c7e26a2e12 100644
--- a/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp
+++ b/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp
@@ -240,7 +240,7 @@ DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & t
 }
 
-std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version)
+std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution)
 {
     Poco::JSON::Object::Ptr schema;
     Int32 current_schema_id;
@@ -253,13 +253,39 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
     {
         current_schema_id = metadata_object->getValue<int>("current-schema-id");
         auto schemas = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
-        if (schemas->size() != 1)
-            throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
+        if (schemas->size() == 0)
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: schemas field is empty");
 
-        /// Now we sure that there is only one schema.
-        schema = schemas->getObject(0);
-        if (schema->getValue<int>("schema-id") != current_schema_id)
-            throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)");
+        if (ignore_schema_evolution)
+        {
+            /// If we ignore schema evolution, we will just use latest schema for all data files.
+            /// Find schema with 'schema-id' equal to 'current_schema_id'.
+            for (uint32_t i = 0; i != schemas->size(); ++i)
+            {
+                auto current_schema = schemas->getObject(i);
+                if (current_schema->getValue<int>("schema-id") == current_schema_id)
+                {
+                    schema = current_schema;
+                    break;
+                }
+            }
+
+            if (!schema)
+                throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)");
+        }
+        else
+        {
+            if (schemas->size() != 1)
+                throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
+                    "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
+                    "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
+                    "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
+
+            /// Now we sure that there is only one schema.
+            schema = schemas->getObject(0);
+            if (schema->getValue<int>("schema-id") != current_schema_id)
+                throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)");
+        }
     }
     else
     {
@@ -267,8 +293,11 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
         current_schema_id = schema->getValue<int>("schema-id");
         /// Field "schemas" is optional for version 1, but after version 2 was introduced,
         /// in most cases this field is added for new tables in version 1 as well.
-        if (metadata_object->has("schemas") && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
-            throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
+        if (!ignore_schema_evolution && metadata_object->has("schemas") && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
+            throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
+                "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
+                "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
+                "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
     }
 
     NamesAndTypesList names_and_types;
@@ -356,7 +385,7 @@ std::unique_ptr<IcebergMetadata> parseIcebergMetadata(const StorageS3::Configura
     Poco::JSON::Object::Ptr object = json.extract<Poco::JSON::Object::Ptr>();
 
     auto format_version = object->getValue<int>("format-version");
-    auto [schema, schema_id] = parseTableSchema(object, format_version);
+    auto [schema, schema_id] = parseTableSchema(object, format_version, context_->getSettingsRef().iceberg_engine_ignore_schema_evolution);
 
     auto current_snapshot_id = object->getValue<int64_t>("current-snapshot-id");
     auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();
@@ -453,8 +482,12 @@ Strings IcebergMetadata::getDataFiles()
         Poco::JSON::Parser parser;
         Poco::Dynamic::Var json = parser.parse(schema_json_string);
         Poco::JSON::Object::Ptr schema_object = json.extract<Poco::JSON::Object::Ptr>();
-        if (schema_object->getValue<int>("schema-id") != current_schema_id)
-            throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
+        if (!getContext()->getSettingsRef().iceberg_engine_ignore_schema_evolution && schema_object->getValue<int>("schema-id") != current_schema_id)
+            throw Exception(
+                ErrorCodes::UNSUPPORTED_METHOD,
+                "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
+                "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
+                "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
 
         avro::NodePtr root_node = manifest_file_reader->dataSchema().root();
         size_t leaves_num = root_node->leaves();
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 9a75dc50d619..30962dc619c5 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -399,6 +399,8 @@ def test_evolved_schema(started_cluster, format_version):
 
     assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
 
+    expected_data = instance.query(f"SELECT * FROM {TABLE_NAME} order by a, b")
+
     spark.sql(f"ALTER TABLE {TABLE_NAME} ADD COLUMNS (x bigint)")
     files = upload_directory(
         minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
@@ -407,6 +409,11 @@ def test_evolved_schema(started_cluster, format_version):
     error = instance.query_and_get_error(f"SELECT * FROM {TABLE_NAME}")
     assert "UNSUPPORTED_METHOD" in error
 
+    data = instance.query(
+        f"SELECT * FROM {TABLE_NAME} SETTINGS iceberg_engine_ignore_schema_evolution=1"
+    )
+    assert data == expected_data
+
 
 def test_row_based_deletes(started_cluster):
     instance = started_cluster.instances["node1"]
From 1fd4b26c3c3429528e8b21479654216bee98a75d Mon Sep 17 00:00:00 2001
From: avogar
Date: Tue, 23 Jan 2024 17:48:50 +0000
Subject: [PATCH 5/7] Better docs

---
 docs/en/operations/settings/settings.md | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md
index 6444b76ba0e3..75d05d553663 100644
--- a/docs/en/operations/settings/settings.md
+++ b/docs/en/operations/settings/settings.md
@@ -5200,4 +5200,10 @@ This query setting overwrites its server setting equivalent, see [max_table_size
 
 ## iceberg_engine_ignore_schema_evolution {#iceberg_engine_ignore_schema_evolution}
 
-Allow to ignore schema evolution in Iceberg table engine and read all data using latest schema saved on storage creation.
\ No newline at end of file
+Allow to ignore schema evolution in Iceberg table engine and read all data using schema specified by the user on table creation or latest schema parsed from metadata on table creation.
+
+:::note
+Enabling this setting can lead to incorrect result as in case of evolved schema all data files will be read using the same schema.
+:::
+
+Default value: 'false'.
\ No newline at end of file
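
A distilled sketch of the schema lookup patches 4 and 5 document (the function name is illustrative; the real logic lives inside parseTableSchema above): with the setting enabled, metadata containing several schemas is no longer rejected; instead the entry whose "schema-id" matches "current-schema-id" is used for every data file, so a query can opt in per read with SETTINGS iceberg_engine_ignore_schema_evolution = 1 at the cost of reading old files with the newest schema.

    #include <Poco/JSON/Array.h>
    #include <Poco/JSON/Object.h>

    // Sketch: pick the schema whose "schema-id" equals "current-schema-id",
    // as the patched parseTableSchema does when the setting is enabled.
    Poco::JSON::Object::Ptr findCurrentSchema(const Poco::JSON::Object::Ptr & metadata)
    {
        const auto current_id = metadata->getValue<int>("current-schema-id");
        auto schemas = metadata->get("schemas").extract<Poco::JSON::Array::Ptr>();
        for (unsigned i = 0; i < schemas->size(); ++i)
        {
            auto schema = schemas->getObject(i);
            if (schema->getValue<int>("schema-id") == current_id)
                return schema;
        }
        return nullptr; // the caller treats this as corrupted metadata
    }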
From 95829e074eb208ab84e6065c5907d042990ef532 Mon Sep 17 00:00:00 2001
From: skyoct
Date: Wed, 24 Jan 2024 03:49:21 +0000
Subject: [PATCH 6/7] Opt code

---
 src/IO/ReadWriteBufferFromHTTP.cpp | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/src/IO/ReadWriteBufferFromHTTP.cpp b/src/IO/ReadWriteBufferFromHTTP.cpp
index ea18b369c523..fccbf13a5196 100644
--- a/src/IO/ReadWriteBufferFromHTTP.cpp
+++ b/src/IO/ReadWriteBufferFromHTTP.cpp
@@ -213,10 +213,7 @@ void ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::getHeadResponse(Poco::Net
         }
         catch (const Poco::Exception & e)
         {
-            if (i == settings.http_max_tries - 1 || !isRetriableError(response.getStatus()))
-                throw;
-
-            if (e.code() == ErrorCodes::TOO_MANY_REDIRECTS)
+            if (i == settings.http_max_tries - 1 || e.code() == ErrorCodes::TOO_MANY_REDIRECTS || !isRetriableError(response.getStatus()))
                 throw;
 
             LOG_ERROR(log, "Failed to make HTTP_HEAD request to {}. Error: {}", uri.toString(), e.displayText());
@@ -544,11 +541,8 @@ bool ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::nextImpl()
         }
         catch (const Poco::Exception & e)
         {
-            /// Too many open files - non-retryable.
-            if (e.code() == POCO_EMFILE)
-                throw;
-
-            if (e.code() == ErrorCodes::TOO_MANY_REDIRECTS)
+            /// Too many open files or redirects - non-retryable.
+            if (e.code() == POCO_EMFILE || e.code() == ErrorCodes::TOO_MANY_REDIRECTS)
                 throw;
 
             /** Retry request unconditionally if nothing has been read yet.

From 718064bb43e98da65889d8381b81e2c64df07329 Mon Sep 17 00:00:00 2001
From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com>
Date: Wed, 24 Jan 2024 12:19:10 +0100
Subject: [PATCH 7/7] Minor change to restart CI

---
 src/IO/ReadWriteBufferFromHTTP.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/IO/ReadWriteBufferFromHTTP.cpp b/src/IO/ReadWriteBufferFromHTTP.cpp
index fccbf13a5196..723a12d0e5bf 100644
--- a/src/IO/ReadWriteBufferFromHTTP.cpp
+++ b/src/IO/ReadWriteBufferFromHTTP.cpp
@@ -4,8 +4,8 @@
 
 namespace ProfileEvents
 {
-extern const Event ReadBufferSeekCancelConnection;
-extern const Event ReadWriteBufferFromHTTPPreservedSessions;
+    extern const Event ReadBufferSeekCancelConnection;
+    extern const Event ReadWriteBufferFromHTTPPreservedSessions;
 }
 
 namespace DB