diff --git a/src/yb/master/sys_catalog.cc b/src/yb/master/sys_catalog.cc index 48c1581b22f..a8677460d80 100644 --- a/src/yb/master/sys_catalog.cc +++ b/src/yb/master/sys_catalog.cc @@ -158,6 +158,9 @@ DEFINE_RUNTIME_uint64(delete_systable_rows_batch_bytes, 500_KB, DEFINE_test_flag(int32, sys_catalog_write_rejection_percentage, 0, "Reject specified percentage of sys catalog writes."); +DEFINE_test_flag(double, simulate_catalog_message_read_failure, 0.0, + "Inject random failure of pg_yb_invalidation_messages read from sys_catalog."); + namespace yb { namespace master { @@ -1688,6 +1691,10 @@ Result SysCatalogTable::ReadPgYbTablegroupOid(const uint32_t database_ Result SysCatalogTable::ReadYsqlCatalogInvalationMessages() { + if (RandomActWithProbability(FLAGS_TEST_simulate_catalog_message_read_failure)) { + return STATUS(InternalError, "Injected pg_yb_invalidation_messages read failure for testing."); + } + TRACE_EVENT0("master", "ReadYsqlCatalogInvalationMessages"); auto read_data = VERIFY_RESULT(TableReadData(kTemplate1Oid, kPgYbInvalidationMessagesTableOid, diff --git a/src/yb/tserver/heartbeater.cc b/src/yb/tserver/heartbeater.cc index 5fd0ec30cf7..fd4d7ccd908 100644 --- a/src/yb/tserver/heartbeater.cc +++ b/src/yb/tserver/heartbeater.cc @@ -564,6 +564,18 @@ Status Heartbeater::Thread::TryHeartbeat() { << last_hb_response_.db_catalog_inval_messages_data().ShortDebugString(); } server_->SetYsqlDBCatalogVersions(last_hb_response_.db_catalog_version_data()); + if (FLAGS_TEST_yb_enable_invalidation_messages) { + if (last_hb_response_.has_db_catalog_inval_messages_data()) { + server_->SetYsqlDBCatalogInvalMessages( + last_hb_response_.db_catalog_inval_messages_data()); + } else { + // If we only have catalog versions but no invalidation messages, it means that the last + // heartbeat response was only able to read pg_yb_catalog_version, but the reading of + // pg_yb_invalidation_messages failed. 
Clear the fingerprint so that next heartbeat + // can read pg_yb_invalidation_messages again. + server_->ResetCatalogVersionsFingerprint(); + } + } } else { // The master does not pass back any catalog versions. This can happen in // several cases: diff --git a/src/yb/tserver/tablet_server.cc b/src/yb/tserver/tablet_server.cc index f4ba9085408..c41f8050e9a 100644 --- a/src/yb/tserver/tablet_server.cc +++ b/src/yb/tserver/tablet_server.cc @@ -237,6 +237,9 @@ DEFINE_RUNTIME_uint32(ysql_min_new_version_ignored_count, 10, DECLARE_bool(TEST_enable_object_locking_for_table_locks); +DEFINE_RUNTIME_uint32(ysql_max_invalidation_message_queue_size, 1024, + "Maximum number of invalidation messages we keep for a given database."); + DECLARE_bool(enable_pg_cron); DEFINE_test_flag(bool, enable_pg_client_mock, false, "Enable mocking of PgClient service in tests"); @@ -1131,6 +1134,9 @@ void TabletServer::SetYsqlDBCatalogVersions( shared_object().SetYsqlCatalogVersion(new_version); ysql_last_breaking_catalog_version_ = new_breaking_version; } + // Create the entry for db_oid if it does not exist yet; insert() keeps an existing queue. + ysql_db_invalidation_messages_map_.insert(make_pair( + db_oid, std::deque>>())); } } if (!catalog_version_table_in_perdb_mode_.has_value() && @@ -1158,6 +1164,7 @@ void TabletServer::SetYsqlDBCatalogVersions( // Mark the corresponding shared memory array db_catalog_versions_ slot as free. (*ysql_db_catalog_version_index_used_)[shm_index] = false; it = ysql_db_catalog_version_map_.erase(it); + ysql_db_invalidation_messages_map_.erase(db_oid); // Also reset the shared memory array db_catalog_versions_ slot to 0 to assist // debugging the shared memory array db_catalog_versions_ (e.g., when we can dump // the shared memory file to examine its contents). 
@@ -1193,6 +1200,128 @@ void TabletServer::SetYsqlDBCatalogVersions(
   }
 }
 
+// Clears the cached catalog versions fingerprint by storing std::nullopt into
+// catalog_versions_fingerprint_. The heartbeater calls this when a heartbeat response carried
+// catalog versions but no invalidation messages, so that a later heartbeat reads them again.
+void TabletServer::ResetCatalogVersionsFingerprint() {
+  LOG(INFO) << "reset catalog_versions_fingerprint_";
+  catalog_versions_fingerprint_.store(std::nullopt, std::memory_order_release);
+}
+
+// Merges the invalidation message history carried by a master heartbeat response into
+// ysql_db_invalidation_messages_map_.
+//
+// The incoming entries are (db_oid, current_version, messages) sorted by (db_oid,
+// current_version). For a db_oid that already has a queue in the map, entries overlapping the
+// queue are checked against it and entries newer than the queue's last version are appended;
+// the queue is then trimmed from the front to at most
+// FLAGS_ysql_max_invalidation_message_queue_size entries. Entries of a db_oid without a queue
+// are skipped with a warning.
+void TabletServer::SetYsqlDBCatalogInvalMessages(
+    const master::DBCatalogInvalMessagesDataPB& db_catalog_inval_messages_data) {
+  if (db_catalog_inval_messages_data.db_catalog_inval_messages_size() == 0) {
+    return;
+  }
+  std::lock_guard l(lock_);
+  uint32_t current_db_oid = 0;
+  InvalidationMessagesQueue* db_message_lists = nullptr;
+  InvalidationMessagesQueue::iterator it;
+  bool check_done = false;
+  // Trims the queue of the DB that has just been processed (if any): appending may have grown
+  // it past the max size, so drop the oldest entries.
+  const auto trim_current_queue = [&current_db_oid, &db_message_lists] {
+    if (!db_message_lists) {
+      return;
+    }
+    VLOG(2) << "db_oid " << current_db_oid << " message queue size: "
+            << db_message_lists->size();
+    while (db_message_lists->size() > FLAGS_ysql_max_invalidation_message_queue_size) {
+      db_message_lists->pop_front();
+    }
+  };
+  // ysql_db_invalidation_messages_map_ is just an extended history of pg_yb_invalidation_messages
+  // except message_time column. Merge the incoming db_catalog_inval_messages_data from the
+  // heartbeat response which has an ordered array of (db_oid, current_version, messages) into
+  // ysql_db_invalidation_messages_map_.
+  for (const auto& db_inval_messages : db_catalog_inval_messages_data.db_catalog_inval_messages()) {
+    const uint32_t db_oid = db_inval_messages.db_oid();
+    const uint64_t current_version = db_inval_messages.current_version();
+    // db_catalog_inval_messages_data has the message history in a sorted order of
+    // (db_oid, current_version).
+    if (current_db_oid != db_oid) {
+      // We see a new db_oid. Finish the DB that we have just completed processing before we
+      // start processing the message history of the new one.
+      trim_current_queue();
+
+      check_done = false;
+      current_db_oid = db_oid;
+      // Find the entry for the db_oid in ysql_db_invalidation_messages_map_ if there is one.
+      auto it2 = ysql_db_invalidation_messages_map_.find(db_oid);
+      if (it2 != ysql_db_invalidation_messages_map_.end()) {
+        // The db_oid isn't new: it already exists in ysql_db_invalidation_messages_map_.
+        db_message_lists = &it2->second;
+        // The messages are in ascending order of current_version. We may have a picture like
+        // Existing: x, x+1, x+2, ..., x+i, x+i+1, x+i+2
+        // Incoming: x+i, x+i+1, x+i+2, x+i+3, ... x+i+j
+        // Find the position of x+i.
+        it = std::find_if(
+            db_message_lists->begin(), db_message_lists->end(),
+            [current_version](const InvalidationMessagesQueue::value_type& entry) {
+          return entry.first == current_version;
+        });
+      } else {
+        // The db_oid does not exist in ysql_db_invalidation_messages_map_ yet. This is possible
+        // because at master side we read pg_yb_catalog_version and pg_yb_invalidation_messages
+        // separately (not a transactional read). As a result, a newly created database may not
+        // have been entered into pg_yb_catalog_version yet at the time the master reads
+        // pg_yb_catalog_version, but by the time the master reads pg_yb_invalidation_messages
+        // the new database is already inserted there. This case should be rare so we
+        // skip processing this db_oid.
+        db_message_lists = nullptr;
+        LOG(WARNING) << "db_oid " << db_oid << " not found in ysql_db_invalidation_messages_map_";
+      }
+    }
+    if (!db_message_lists) {
+      continue;
+    }
+    // A missing message_list is kept as nullopt, which is distinct from an empty string.
+    std::optional<std::string> message_list;
+    if (db_inval_messages.has_message_list()) {
+      message_list = db_inval_messages.message_list();
+    }
+    // We should see the suffix of db_message_lists matches the prefix of incoming one.
+    if (!check_done && it != db_message_lists->end()) {
+      if (it->first == current_version) {
+        VLOG(2) << "current_version matched: " << current_version;
+      } else {
+        LOG(DFATAL) << "current_version mismatch";
+      }
+      // same version should have same message.
+      if (it->second != message_list) {
+        LOG(DFATAL) << "message_list mismatch";
+      }
+      // Keep moving until we pass the common section x+i, x+i+1, x+i+2 as shown in the
+      // above example.
+      ++it;
+      continue;
+    }
+    // From now on we only append for this db_oid. emplace_back invalidates the deque iterator
+    // `it`, so check_done guards against dereferencing it again.
+    check_done = true;
+    if (!db_message_lists->empty() && current_version <= db_message_lists->back().first) {
+      // The queue already covers this version (e.g. the matching entry was trimmed from the
+      // front of the queue, so the overlap search above found nothing). Appending it would
+      // duplicate entries and break the ascending order of current_version.
+      VLOG(2) << "db_oid " << db_oid << " skip stale current_version: " << current_version;
+      continue;
+    }
+    // Append the new message starting from x+i+3 to the end of the queue of db_oid.
+    db_message_lists->emplace_back(current_version, std::move(message_list));
+  }
+  // Trim the queue of the last DB that we have just completed processing.
+  trim_current_queue();
+}
+
 void TabletServer::WriteServerMetaCacheAsJson(JsonWriter* writer) {
   writer->StartObject();
 
diff --git a/src/yb/tserver/tablet_server.h b/src/yb/tserver/tablet_server.h
index 29eb06638d6..6aa2ba592a4 100644
--- a/src/yb/tserver/tablet_server.h
+++ b/src/yb/tserver/tablet_server.h
@@ -235,6 +235,10 @@ class TabletServer : public DbServerBase, public TabletServerIf {
   void SetYsqlCatalogVersion(uint64_t new_version, uint64_t new_breaking_version) EXCLUDES(lock_);
   void SetYsqlDBCatalogVersions(const master::DBCatalogVersionDataPB& db_catalog_version_data)
       EXCLUDES(lock_);
+  void SetYsqlDBCatalogInvalMessages(
+      const master::DBCatalogInvalMessagesDataPB& db_catalog_inval_messages_data)
+      EXCLUDES(lock_);
+  void ResetCatalogVersionsFingerprint() EXCLUDES(lock_);
 
   void get_ysql_catalog_version(uint64_t* current_version,
                                 uint64_t* last_breaking_version) const EXCLUDES(lock_) override {
@@ -462,6 +466,19 @@ class TabletServer : public DbServerBase, public TabletServerIf {
   uint64_t ysql_catalog_version_ = 0;
   uint64_t ysql_last_breaking_catalog_version_ = 0;
   tserver::DbOidToCatalogVersionInfoMap ysql_db_catalog_version_map_ GUARDED_BY(lock_);
+
+  // This map represents an extended history of pg_yb_invalidation_messages except message_time
+  // (i.e., db_oid, current_version, inval messages). For each db_oid, it stores a queue of
+  // (current_version, inval messages) pairs. If the message value is nullopt, it means a SQL
+  // null value. If it is an empty string, it means there are no invalidation messages associated
+  // with this (db_oid, current_version). A PG backend needs to do a catalog cache refresh on
+  // a SQL null value, but treats an empty string as a noop because an empty string represents
+  // that there is no invalidation message. There is nothing in the PG catalog cache that can
+  // be invalidated by an empty string.
+  using InvalidationMessagesQueue = std::deque<std::pair<uint64_t, std::optional<std::string>>>;
+  using DbOidToInvalidationMessagesMap = std::unordered_map<uint32_t, InvalidationMessagesQueue>;
+  DbOidToInvalidationMessagesMap ysql_db_invalidation_messages_map_ GUARDED_BY(lock_);
+
   // See same variable comments in CatalogManager.
   std::optional catalog_version_table_in_perdb_mode_{std::nullopt};