Skip to content

Commit

Permalink
[#23785] YSQL: Incrementally refresh PG backend catcaches (part 4)
Browse files Browse the repository at this point in the history
Summary:
In part 3, I have made changes to extend the tserver to master heartbeat response
message to also include the contents of `pg_yb_invalidation_messages` along with
the contents of `pg_yb_catalog_version` when there is a change in
`pg_yb_catalog_version` (via the existing fingerprint mechanism). The
invalidation messages is set in the `db_catalog_inval_messages_data` proto field
of the heartbeat response message.

This diff reads the `db_catalog_inval_messages_data` from the heartbeat response
message, and store it in the tserver private memory. A new map
`ysql_db_invalidation_messages_map_` is added: for each database, it stores a
doubly ended queue. Each element of the queue is a pair: (current_version,
messages). The current_version is the catalog version, and messages is a blob
representing the list of catalog cache invalidation messages generated by PG for
the current_version.

The maximum size of the queue is controlled by --ysql_max_invalidation_message_queue_size,
default to 1024. This means that for each database, we can store a history of
up to 1024 catalog versions. Logically this map simply extends the history of
`ysql_db_invalidation_messages_map_`. Because by default we only store a history
of 10 seconds of invalidation messages for each database, it is likely that in
the heartbeat response we see much less than 1024 versions. In that case, the
new versions and their associated messages are merged into the in-memory map
`ysql_db_invalidation_messages_map_` while old entries are removed from the front
of the queue. In this way, we let each tserver keep a more extended history of
catalog versions and their invalidation messages, so that we can tolerate a PG
transaction block that can run longer: a PG transaction block cannot do catalog
cache refresh (whether incremental or full refresh) until the transaction
completes. As a result, by the time the PG transaction block completes, there
may be many DDL statements already executed. For example, if the PG local
catalog version is 1 when it starts a transaction block, by the time the
transaction block completes, 100 DDLs have been executed and 50 of them have
incremented catalog versions, the latest catalog version that PG reads from
shared memory is now 51. PG will need to read the entire sequence of 2, 3, ...,
51 and their invalidation messages in order to do a valid incremental cache
refresh. By having a history of up to 1024 catalog versions, we can allow a
longer running PG transaction block to still do incremental refresh.

Note that even though `pg_yb_catalog_version` and `pg_yb_invalidation_messages`
are written transactionally, they are not read transactionally at the master
side when it prepares for the heartbeat response. Therefore we do not try to
process the `db_catalog_version_data` and `db_catalog_inval_messages_data`
atomically at tserver side since they could be out of sync in rare cases when
the following sequence of events happens at master side:
(1) pg_yb_catalog_version is read and put in response
(2) pg_yb_catalog_version and pg_yb_invalidation_messages are updated
transactionally (current_version = current_version + 1, with the messages)
(3) pg_yb_invalidation_messages is read and put in response

Test Plan:
(1) Run
YB_EXTRA_MASTER_FLAGS="--TEST_yb_enable_invalidation_messages=true --log_ysql_catalog_versions=true --vmodule=catalog_manager=2,heartbeater=2,master_heartbeat_service=2,pg_catversions=2 --TEST_simulate_catalog_message_read_failure=0.5" YB_EXTRA_TSERVER_FLAGS="--TEST_yb_enable_invalidation_messages=true --log_ysql_catalog_versions=true --vmodule=heartbeater=2,tablet_server=2,pg_catversions=2 --ysql_max_invalidation_message_queue_size=15" ./yb_build.sh --cxx-test pg_catalog_version-test

Also look at the test logs indicating some code coverage:

```
[m-1] W0228 00:36:14.667902 4069245 master_heartbeat_service.cc:358] Could not get YSQL invalidation
messages for heartbeat response: Internal error (yb/master/sys_catalog.cc:1695): Injected pg_yb_invalidation_messages read failure for testing.
```

```
[ts-3] I0228 00:36:54.320386 4069668 tablet_server.cc:1204] reset catalog_versions_fingerprint_

```

```
[ts-2] W0228 00:29:16.253911 4061495 tablet_server.cc:1265] db_oid 16384 not found in ysql_db_invalidation_messages_map_
```

```
[ts-3] I0228 00:32:06.696022 4064671 tablet_server.cc:1234] vlog2: db_oid 1 message queue size: 4
```

(2) Manual test:
```
yugabyte=# \i /tmp/t1.sql
create table foo (id int);
CREATE TABLE
alter table foo add column id2 text;
ALTER TABLE
alter table foo drop column id2;
ALTER TABLE
alter table foo add column id2 text;
ALTER TABLE
alter table foo drop column id2;
ALTER TABLE
alter table foo add column id2 text;
ALTER TABLE
alter table foo drop column id2;
ALTER TABLE
alter table foo add column id2 text;
ALTER TABLE
alter table foo drop column id2;
ALTER TABLE
alter table foo add column id2 text;
ALTER TABLE
alter table foo drop column id2;
ALTER TABLE
alter table foo add column id2 text;
ALTER TABLE
alter table foo drop column id2;
ALTER TABLE
alter table foo add column id2 text;
ALTER TABLE
alter table foo drop column id2;
ALTER TABLE
alter table foo add column id2 text;
ALTER TABLE
alter table foo drop column id2;
ALTER TABLE
alter table foo add column id2 text;
ALTER TABLE
alter table foo drop column id2;
ALTER TABLE
alter table foo add column id2 text;
ALTER TABLE
alter table foo drop column id2;
ALTER TABLE
yugabyte=# alter table foo add column id2 text;
ALTER TABLE
yugabyte=# alter table foo drop column id2;
ALTER TABLE
```
The last 2 alter table commands are manually typed (not from /tmp/t1.sql).
Look at yb-tserver log:
```
I0228 02:52:11.464120 4124142 tablet_server.cc:1297] vlog2: db_oid 13515 message queue size: 2
I0228 02:52:12.469295 4124142 tablet_server.cc:1297] vlog2: db_oid 13515 message queue size: 7
I0228 02:52:13.474404 4124142 tablet_server.cc:1297] vlog2: db_oid 13515 message queue size: 11
I0228 02:52:14.480101 4124142 tablet_server.cc:1297] vlog2: db_oid 13515 message queue size: 15
I0228 02:52:15.485495 4124142 tablet_server.cc:1297] vlog2: db_oid 13515 message queue size: 20
I0228 02:52:49.621912 4124142 tablet_server.cc:1297] vlog2: db_oid 13515 message queue size: 16
I0228 02:52:52.636390 4124142 tablet_server.cc:1297] vlog2: db_oid 13515 message queue size: 16
```
We can see that about 5 back-to-back alter DDLs were executed per heartbeat interval.
When manually execute one by one, the message queue size remained at 16 (before pop_front is called).
So the message queue max size of 15 is checked and respected.

Reviewers: kfranz, sanketh, mihnea

Reviewed By: kfranz

Subscribers: yql

Differential Revision: https://phorge.dev.yugabyte.com/D42226
  • Loading branch information
myang2021 committed Mar 4, 2025
1 parent 6222c84 commit 6380954
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/yb/master/sys_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ DEFINE_RUNTIME_uint64(delete_systable_rows_batch_bytes, 500_KB,
DEFINE_test_flag(int32, sys_catalog_write_rejection_percentage, 0,
"Reject specified percentage of sys catalog writes.");

DEFINE_test_flag(double, simulate_catalog_message_read_failure, 0.0,
"Inject random failure of pg_yb_invalidation_messages read from sys_catalog.");

namespace yb {
namespace master {

Expand Down Expand Up @@ -1688,6 +1691,10 @@ Result<uint32_t> SysCatalogTable::ReadPgYbTablegroupOid(const uint32_t database_

Result<DbOidVersionToMessageListMap>
SysCatalogTable::ReadYsqlCatalogInvalationMessages() {
if (RandomActWithProbability(FLAGS_TEST_simulate_catalog_message_read_failure)) {
return STATUS(InternalError, "Injected pg_yb_invalidation_messages read failure for testing.");
}

TRACE_EVENT0("master", "ReadYsqlCatalogInvalationMessages");

auto read_data = VERIFY_RESULT(TableReadData(kTemplate1Oid, kPgYbInvalidationMessagesTableOid,
Expand Down
12 changes: 12 additions & 0 deletions src/yb/tserver/heartbeater.cc
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,18 @@ Status Heartbeater::Thread::TryHeartbeat() {
<< last_hb_response_.db_catalog_inval_messages_data().ShortDebugString();
}
server_->SetYsqlDBCatalogVersions(last_hb_response_.db_catalog_version_data());
if (FLAGS_TEST_yb_enable_invalidation_messages) {
if (last_hb_response_.has_db_catalog_inval_messages_data()) {
server_->SetYsqlDBCatalogInvalMessages(
last_hb_response_.db_catalog_inval_messages_data());
} else {
// If we only have catalog versions but not invalidation messages, it means that the last
// heartbeat response was only able to read pg_yb_catalog_version, but the reading of
// pg_yb_invalidation_messages failed. Clear the fingerprint so that next heartbeat
// can read pg_yb_invalidation_messages again.
server_->ResetCatalogVersionsFingerprint();
}
}
} else {
// The master does not pass back any catalog versions. This can happen in
// several cases:
Expand Down
106 changes: 106 additions & 0 deletions src/yb/tserver/tablet_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ DEFINE_RUNTIME_uint32(ysql_min_new_version_ignored_count, 10,

DECLARE_bool(TEST_enable_object_locking_for_table_locks);

DEFINE_RUNTIME_uint32(ysql_max_invalidation_message_queue_size, 1024,
"Maximum number of invalidation messages we keep for a given database.");

DECLARE_bool(enable_pg_cron);

DEFINE_test_flag(bool, enable_pg_client_mock, false, "Enable mocking of PgClient service in tests");
Expand Down Expand Up @@ -1131,6 +1134,9 @@ void TabletServer::SetYsqlDBCatalogVersions(
shared_object().SetYsqlCatalogVersion(new_version);
ysql_last_breaking_catalog_version_ = new_breaking_version;
}
// Create the entry of db_oid if not exists.
ysql_db_invalidation_messages_map_.insert(make_pair(
db_oid, std::deque<std::pair<uint64_t, std::optional<std::string>>>()));
}
}
if (!catalog_version_table_in_perdb_mode_.has_value() &&
Expand Down Expand Up @@ -1158,6 +1164,7 @@ void TabletServer::SetYsqlDBCatalogVersions(
// Mark the corresponding shared memory array db_catalog_versions_ slot as free.
(*ysql_db_catalog_version_index_used_)[shm_index] = false;
it = ysql_db_catalog_version_map_.erase(it);
ysql_db_invalidation_messages_map_.erase(db_oid);
// Also reset the shared memory array db_catalog_versions_ slot to 0 to assist
// debugging the shared memory array db_catalog_versions_ (e.g., when we can dump
// the shared memory file to examine its contents).
Expand Down Expand Up @@ -1193,6 +1200,105 @@ void TabletServer::SetYsqlDBCatalogVersions(
}
}

void TabletServer::ResetCatalogVersionsFingerprint() {
LOG(INFO) << "reset catalog_versions_fingerprint_";
catalog_versions_fingerprint_.store(std::nullopt, std::memory_order_release);
}

void TabletServer::SetYsqlDBCatalogInvalMessages(
const master::DBCatalogInvalMessagesDataPB& db_catalog_inval_messages_data) {
if (db_catalog_inval_messages_data.db_catalog_inval_messages_size() == 0) {
return;
}
std::lock_guard l(lock_);
uint32_t current_db_oid = 0;
InvalidationMessagesQueue *db_message_lists = nullptr;
InvalidationMessagesQueue::iterator it;
bool check_done = false;
// ysql_db_invalidation_messages_map_ is just an extended history of pg_yb_invalidation_messages
// except message_time column. Merge the incoming db_catalog_inval_messages_data from the
// heartbeat response which has an ordered array of (db_oid, current_version, messages) into
// ysql_db_invalidation_messages_map_.
for (const auto& db_inval_messages : db_catalog_inval_messages_data.db_catalog_inval_messages()) {
const uint32_t db_oid = db_inval_messages.db_oid();
const uint64_t current_version = db_inval_messages.current_version();
// db_catalog_inval_messages_data has the message history in a sorted order of
// (db_oid, current_version).
if (current_db_oid != db_oid) {
// We see a new db_oid, start processing its message history. Note that db_message_lists
// is for the DB that we have just completed processing. We may have appended more messages
// to its queue that exceeded the max size.
if (db_message_lists) {
VLOG(2) << "db_oid " << current_db_oid << " message queue size: "
<< db_message_lists->size();
while (db_message_lists->size() > FLAGS_ysql_max_invalidation_message_queue_size) {
db_message_lists->pop_front();
}
}

check_done = false;
current_db_oid = db_oid;
// Find the entry for the db_oid in ysql_db_invalidation_messages_map_ if there is one.
auto it2 = ysql_db_invalidation_messages_map_.find(db_oid);
if (it2 != ysql_db_invalidation_messages_map_.end()) {
// The db_oid isn't new and it is already exists in ysql_db_invalidation_messages_map_.
db_message_lists = &it2->second;
// The messages are in ascending order of current_version. We may have a picture like
// Existing: x, x+1, x+2, ..., x+i, x+i+1, x+i+2
// Incoming: x+i, x+i+1, x+i+2, x+i+3, ... x+i+j
// Find the position of x+i.
it = std::find_if(
db_message_lists->begin(), db_message_lists->end(),
[current_version](const std::pair<uint64_t, std::optional<std::string>>& p) {
return p.first == current_version;
});
} else {
// The db_oid does not exist in ysql_db_invalidation_messages_map_ yet. This is possible
// because at master side we read pg_yb_catalog_version and pg_yb_invalidation_messages
// separately (not a transactional read). As a result, a newly created database may not
// have been entered into pg_yb_catalog_version yet at the time when master reading
// pg_yb_catalog_version but by the time master reads pg_yb_invalidation_messages
// the new database is already inserted there. This case should be rare so we
// skip processing this db_oid.
db_message_lists = nullptr;
LOG(WARNING) << "db_oid " << db_oid << " not found in ysql_db_invalidation_messages_map_";
}
}
if (!db_message_lists) {
continue;
}
// We should see the suffix of db_message_lists matches the prefix of incoming one.
const std::optional<std::string>& message_list = db_inval_messages.has_message_list() ?
std::optional<std::string>(db_inval_messages.message_list()) : std::nullopt;
if (!check_done && it != db_message_lists->end()) {
if (it->first == current_version) {
VLOG(2) << "current_version matched: " << current_version;
} else {
LOG(DFATAL) << "current_version mismatch";
}
// same version should have same message.
if (it->second != message_list) {
LOG(DFATAL) << "message_list mismatch";
}
// Keep moving until we pass the common section x+i, x+i+1, x+i+2 as shown in the
// above example.
++it;
} else {
// Append the new message starting from x+i+3 to the end of the queue of db_oid.
db_message_lists->emplace_back(std::make_pair(current_version, message_list));
check_done = true;
}
}
// Note that db_message_lists is for the last DB that we have just completed processing.
// We may have appended more messages to its queue that exceeded the max size.
if (db_message_lists) {
VLOG(2) << "db_oid " << current_db_oid << " message queue size: " << db_message_lists->size();
while (db_message_lists->size() > FLAGS_ysql_max_invalidation_message_queue_size) {
db_message_lists->pop_front();
}
}
}

void TabletServer::WriteServerMetaCacheAsJson(JsonWriter* writer) {
writer->StartObject();

Expand Down
17 changes: 17 additions & 0 deletions src/yb/tserver/tablet_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ class TabletServer : public DbServerBase, public TabletServerIf {
void SetYsqlCatalogVersion(uint64_t new_version, uint64_t new_breaking_version) EXCLUDES(lock_);
void SetYsqlDBCatalogVersions(const master::DBCatalogVersionDataPB& db_catalog_version_data)
EXCLUDES(lock_);
void SetYsqlDBCatalogInvalMessages(
const master::DBCatalogInvalMessagesDataPB& db_catalog_inval_messages_data)
EXCLUDES(lock_);
void ResetCatalogVersionsFingerprint() EXCLUDES(lock_);

void get_ysql_catalog_version(uint64_t* current_version,
uint64_t* last_breaking_version) const EXCLUDES(lock_) override {
Expand Down Expand Up @@ -462,6 +466,19 @@ class TabletServer : public DbServerBase, public TabletServerIf {
uint64_t ysql_catalog_version_ = 0;
uint64_t ysql_last_breaking_catalog_version_ = 0;
tserver::DbOidToCatalogVersionInfoMap ysql_db_catalog_version_map_ GUARDED_BY(lock_);

// This map represents an extended history of pg_yb_invalidation_messages except message_time
// (i.e., db_oid, current_version, inval messages). For each db_oid, it stores a queue of
// (current_version, inval messages) pairs. If the message value is nullopt, it means a SQL
// null value. If it is empty string, it means there is no invalidation messages associated
// with this (db_oid, current_version). A PG backend needs to do a catalog cache refresh on
// a SQL null value, but treats an empty string as a noop because an empty string represents
// that there is no invalidation message. There is nothing in the PG catalog cache that can
// be invalidated by an empty string.
using InvalidationMessagesQueue = std::deque<std::pair<uint64_t, std::optional<std::string>>>;
using DbOidToInvalidationMessagesMap = std::unordered_map<uint32_t, InvalidationMessagesQueue>;
DbOidToInvalidationMessagesMap ysql_db_invalidation_messages_map_ GUARDED_BY(lock_);

// See same variable comments in CatalogManager.
std::optional<bool> catalog_version_table_in_perdb_mode_{std::nullopt};

Expand Down

0 comments on commit 6380954

Please sign in to comment.