Skip to content

Commit

Permalink
Merge branch 'ClickHouse:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex-Cheng authored Jan 24, 2024
2 parents 10aaf2c + 89bafbc commit 3e795cc
Show file tree
Hide file tree
Showing 14 changed files with 277 additions and 121 deletions.
10 changes: 10 additions & 0 deletions docs/en/operations/settings/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -5197,3 +5197,13 @@ The value 0 means that you can delete all tables without any restrictions.
:::note
This query setting overwrites its server setting equivalent, see [max_table_size_to_drop](/docs/en/operations/server-configuration-parameters/settings.md/#max-table-size-to-drop)
:::

## iceberg_engine_ignore_schema_evolution {#iceberg_engine_ignore_schema_evolution}

Allows ignoring schema evolution in the Iceberg table engine and reading all data using the schema specified by the user on table creation, or the latest schema parsed from metadata on table creation.

:::note
Enabling this setting can lead to incorrect results: if the schema has evolved, all data files will still be read using the same schema.
:::

Default value: 'false'.
38 changes: 23 additions & 15 deletions src/Coordination/KeeperSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ namespace

/// Serialize ACL
writeBinary(node.acl_id, out);
writeBinary(node.is_sequental, out);
/// Write is_sequential for backwards compatibility
if (version < SnapshotVersion::V6)
writeBinary(false, out);

/// Serialize stat
writeBinary(node.stat.czxid, out);
writeBinary(node.stat.mzxid, out);
Expand All @@ -84,16 +87,15 @@ namespace
writeBinary(node.stat.cversion, out);
writeBinary(node.stat.aversion, out);
writeBinary(node.stat.ephemeralOwner, out);
writeBinary(node.stat.dataLength, out);
if (version < SnapshotVersion::V6)
writeBinary(static_cast<int32_t>(node.getData().size()), out);
writeBinary(node.stat.numChildren, out);
writeBinary(node.stat.pzxid, out);

writeBinary(node.seq_num, out);

if (version >= SnapshotVersion::V4)
{
writeBinary(node.size_bytes, out);
}
if (version >= SnapshotVersion::V4 && version <= SnapshotVersion::V5)
writeBinary(node.sizeInBytes(), out);
}

void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map)
Expand Down Expand Up @@ -129,7 +131,11 @@ namespace

acl_map.addUsage(node.acl_id);

readBinary(node.is_sequental, in);
if (version < SnapshotVersion::V6)
{
bool is_sequential = false;
readBinary(is_sequential, in);
}

/// Deserialize stat
readBinary(node.stat.czxid, in);
Expand All @@ -140,14 +146,19 @@ namespace
readBinary(node.stat.cversion, in);
readBinary(node.stat.aversion, in);
readBinary(node.stat.ephemeralOwner, in);
readBinary(node.stat.dataLength, in);
if (version < SnapshotVersion::V6)
{
int32_t data_length = 0;
readBinary(data_length, in);
}
readBinary(node.stat.numChildren, in);
readBinary(node.stat.pzxid, in);
readBinary(node.seq_num, in);

if (version >= SnapshotVersion::V4)
if (version >= SnapshotVersion::V4 && version <= SnapshotVersion::V5)
{
readBinary(node.size_bytes, in);
uint64_t size_bytes = 0;
readBinary(size_bytes, in);
}
}

Expand Down Expand Up @@ -354,7 +365,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial

const auto is_node_empty = [](const auto & node)
{
return node.getData().empty() && node.stat == Coordination::Stat{};
return node.getData().empty() && node.stat == KeeperStorage::Node::Stat{};
};

for (size_t nodes_read = 0; nodes_read < snapshot_container_size; ++nodes_read)
Expand Down Expand Up @@ -398,9 +409,6 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
"If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true",
error_msg);
}

// we always ignore the written size for this node
node.recalculateSize();
}

storage.container.insertOrReplace(path, node);
Expand All @@ -417,7 +425,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
{
auto parent_path = parentNodePath(itr.key);
storage.container.updateValue(
parent_path, [version, path = itr.key](KeeperStorage::Node & value) { value.addChild(getBaseNodeName(path), /*update_size*/ version < SnapshotVersion::V4); });
parent_path, [path = itr.key](KeeperStorage::Node & value) { value.addChild(getBaseNodeName(path)); });
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/Coordination/KeeperSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ enum SnapshotVersion : uint8_t
V3 = 3, /// compress snapshots with ZSTD codec
V4 = 4, /// add Node size to snapshots
V5 = 5, /// add ZXID and digest to snapshots
V6 = 6, /// remove is_sequential, per node size, data length
};

static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V5;
static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V6;

/// What is stored in binary snapshot
struct SnapshotDeserializationResult
Expand Down
71 changes: 39 additions & 32 deletions src/Coordination/KeeperStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <Coordination/KeeperStorage.h>
#include <Coordination/KeeperDispatcher.h>

#include <mutex>
#include <functional>
#include <base/defines.h>
#include <filesystem>
Expand Down Expand Up @@ -167,7 +166,7 @@ KeeperStorage::ResponsesForSessions processWatchesImpl(
}

// When this function is updated, update CURRENT_DIGEST_VERSION!!
uint64_t calculateDigest(std::string_view path, std::string_view data, const Coordination::Stat & stat)
uint64_t calculateDigest(std::string_view path, std::string_view data, const KeeperStorage::Node::Stat & stat)
{
SipHash hash;

Expand All @@ -184,7 +183,7 @@ uint64_t calculateDigest(std::string_view path, std::string_view data, const Coo
hash.update(stat.cversion);
hash.update(stat.aversion);
hash.update(stat.ephemeralOwner);
hash.update(stat.dataLength);
hash.update(data.length());
hash.update(stat.numChildren);
hash.update(stat.pzxid);

Expand All @@ -193,36 +192,56 @@ uint64_t calculateDigest(std::string_view path, std::string_view data, const Coo

}

/// Fills `response_stat` from this node's stored `stat`.
/// Every field is copied as-is except `dataLength`, which is not read from the stored stat
/// but derived from the current size of `data`.
void KeeperStorage::Node::setResponseStat(Coordination::Stat & response_stat) const
{
    /// *zxid fields
    response_stat.czxid = stat.czxid;
    response_stat.mzxid = stat.mzxid;
    response_stat.pzxid = stat.pzxid;

    /// *time fields
    response_stat.ctime = stat.ctime;
    response_stat.mtime = stat.mtime;

    /// *version fields
    response_stat.version = stat.version;
    response_stat.cversion = stat.cversion;
    response_stat.aversion = stat.aversion;

    response_stat.ephemeralOwner = stat.ephemeralOwner;
    response_stat.numChildren = stat.numChildren;

    /// Computed on the fly; the size of `data` is narrowed to the response's int32_t.
    response_stat.dataLength = static_cast<int32_t>(data.size());
}

/// Returns the sum of: the size of the `Node` object itself, one `StringRef` per entry
/// in `children`, and the number of bytes held in `data`.
uint64_t KeeperStorage::Node::sizeInBytes() const
{
    const auto children_bytes = children.size() * sizeof(StringRef);
    const auto data_bytes = data.size();
    return sizeof(Node) + children_bytes + data_bytes;
}

void KeeperStorage::Node::setData(String new_data)
{
size_bytes = size_bytes - data.size() + new_data.size();
data = std::move(new_data);
}

void KeeperStorage::Node::addChild(StringRef child_path, bool update_size)
void KeeperStorage::Node::addChild(StringRef child_path)
{
if (update_size) [[likely]]
size_bytes += sizeof child_path;
children.insert(child_path);
}

void KeeperStorage::Node::removeChild(StringRef child_path)
{
size_bytes -= sizeof child_path;
children.erase(child_path);
}

void KeeperStorage::Node::invalidateDigestCache() const
{
cached_digest.reset();
has_cached_digest = false;
}

UInt64 KeeperStorage::Node::getDigest(const std::string_view path) const
{
if (!cached_digest)
if (!has_cached_digest)
{
cached_digest = calculateDigest(path, data, stat);
has_cached_digest = true;
}

return *cached_digest;
return cached_digest;
};

void KeeperStorage::Node::shallowCopy(const KeeperStorage::Node & other)
Expand All @@ -233,13 +252,6 @@ void KeeperStorage::Node::shallowCopy(const KeeperStorage::Node & other)
cached_digest = other.cached_digest;
}

void KeeperStorage::Node::recalculateSize()
{
size_bytes = sizeof(Node);
size_bytes += children.size() * sizeof(decltype(children)::value_type);
size_bytes += data.size();
}

KeeperStorage::KeeperStorage(
int64_t tick_time_ms, const String & superdigest_, const KeeperContextPtr & keeper_context_, const bool initialize_system_nodes)
: session_expiry_queue(tick_time_ms), keeper_context(keeper_context_), superdigest(superdigest_)
Expand Down Expand Up @@ -650,7 +662,6 @@ Coordination::Error KeeperStorage::commit(int64_t commit_zxid)
path,
std::move(operation.data),
operation.stat,
operation.is_sequental,
std::move(operation.acls)))
onStorageInconsistency();

Expand Down Expand Up @@ -729,8 +740,7 @@ Coordination::Error KeeperStorage::commit(int64_t commit_zxid)
bool KeeperStorage::createNode(
const std::string & path,
String data,
const Coordination::Stat & stat,
bool is_sequental,
const KeeperStorage::Node::Stat & stat,
Coordination::ACLs node_acls)
{
auto parent_path = parentNodePath(path);
Expand All @@ -753,7 +763,6 @@ bool KeeperStorage::createNode(
created_node.acl_id = acl_id;
created_node.stat = stat;
created_node.setData(std::move(data));
created_node.is_sequental = is_sequental;
auto [map_key, _] = container.insert(path, created_node);
/// Take child path from key owned by map.
auto child_path = getBaseNodeName(map_key->getKey());
Expand Down Expand Up @@ -1012,7 +1021,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr

new_deltas.emplace_back(std::string{parent_path}, zxid, KeeperStorage::UpdateNodeDelta{std::move(parent_update)});

Coordination::Stat stat;
KeeperStorage::Node::Stat stat;
stat.czxid = zxid;
stat.mzxid = zxid;
stat.pzxid = zxid;
Expand All @@ -1022,13 +1031,12 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
stat.version = 0;
stat.aversion = 0;
stat.cversion = 0;
stat.dataLength = static_cast<Int32>(request.data.length());
stat.ephemeralOwner = request.is_ephemeral ? session_id : 0;

new_deltas.emplace_back(
std::move(path_created),
zxid,
KeeperStorage::CreateNodeDelta{stat, request.is_sequential, std::move(node_acls), request.data});
KeeperStorage::CreateNodeDelta{stat, std::move(node_acls), request.data});

digest = storage.calculateNodesDigest(digest, new_deltas);
return new_deltas;
Expand Down Expand Up @@ -1126,7 +1134,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
}
else
{
response.stat = node_it->value.stat;
node_it->value.setResponseStat(response.stat);
response.data = node_it->value.getData();
response.error = Coordination::Error::ZOK;
}
Expand Down Expand Up @@ -1285,7 +1293,7 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr
}
else
{
response.stat = node_it->value.stat;
node_it->value.setResponseStat(response.stat);
response.error = Coordination::Error::ZOK;
}

Expand Down Expand Up @@ -1345,7 +1353,6 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
value.stat.version++;
value.stat.mzxid = zxid;
value.stat.mtime = time;
value.stat.dataLength = static_cast<Int32>(data.length());
value.setData(data);
},
request.version});
Expand Down Expand Up @@ -1384,7 +1391,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
if (node_it == container.end())
onStorageInconsistency();

response.stat = node_it->value.stat;
node_it->value.setResponseStat(response.stat);
response.error = Coordination::Error::ZOK;

return response_ptr;
Expand Down Expand Up @@ -1481,7 +1488,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
response.names.push_back(child.toString());
}

response.stat = node_it->value.stat;
node_it->value.setResponseStat(response.stat);
response.error = Coordination::Error::ZOK;
}

Expand Down Expand Up @@ -1675,7 +1682,7 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
auto node_it = storage.container.find(request.path);
if (node_it == storage.container.end())
onStorageInconsistency();
response.stat = node_it->value.stat;
node_it->value.setResponseStat(response.stat);
response.error = Coordination::Error::ZOK;

return response_ptr;
Expand Down Expand Up @@ -1729,7 +1736,7 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr
}
else
{
response.stat = node_it->value.stat;
node_it->value.setResponseStat(response.stat);
response.acl = storage.acl_map.convertNumber(node_it->value.acl_id);
}

Expand Down
Loading

0 comments on commit 3e795cc

Please sign in to comment.