Skip to content

Commit

Permalink
[#25737] DocDB: Fix deadlock while creating index
Browse files Browse the repository at this point in the history
Summary:
The test uncovered the following kind of deadlock:
T1:
1) lock table for read
2) shared lock on catalog manager

T2 (create table):
1) unique lock on catalog manager
2) lock table (indexed) for write and then commit

Step 1 happened for both threads, so T1 tries to acquire shared lock on catalog manager, but cannot do it since T2 holds unique lock on it. While T2 tries to acquire commit lock on table, but fails since readers never become 0.

Fixed this deadlock.

Also added conflict detection for cases where we try to acquire the catalog manager mutex while an object read lock is already held (which can result in deadlocks; see below).
When acquiring a ReadLock on an RwcLock object (e.g. TableInfo / TabletInfo), we now increment a thread-local counter which is checked when we try to shared lock the catalog manager mutex.
This detection could be enabled in rwc_lock.h using RWC_LOCK_TRACK_EXTERNAL_DEADLOCK define.

So fixed all other places found by our tests.
Jira: DB-15019

Test Plan: CppCassandraDriverTest.TestBackfillBatchingEnabled

Reviewers: zdrudi, asrivastava, xCluster, hsunder

Reviewed By: asrivastava, hsunder

Subscribers: esheng, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D41399
  • Loading branch information
spolitov committed Jan 26, 2025
1 parent 6281c17 commit b4bb966
Show file tree
Hide file tree
Showing 16 changed files with 405 additions and 284 deletions.
6 changes: 4 additions & 2 deletions src/yb/integration-tests/cassandra_cpp_driver-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2044,8 +2044,10 @@ void CppCassandraDriverTestSlowTServer::TestBackfillBatching(bool batching_enabl
num_deferred_indexes += defer;
}

for (auto& future : futures) {
ASSERT_OK(future.Wait());
auto deadline = CoarseMonoClock::Now() + 20s * kTimeMultiplier;
for (int i = 0; i < kNumIndexes; ++i) {
SCOPED_TRACE(Format("Index: $0", i));
ASSERT_OK(futures[i].WaitFor(deadline - CoarseMonoClock::Now()));
}

for (const auto& index_name : indexes) {
Expand Down
526 changes: 288 additions & 238 deletions src/yb/master/catalog_manager.cc

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,7 @@ class CatalogManager : public CatalogManagerIf, public SnapshotCoordinatorContex
const TableInfoPtr& table_info, bool succeed_if_create_in_progress);

Result<std::string> GetPgSchemaName(
const TableId& table_id, const PersistentTableInfo& table_info) REQUIRES_SHARED(mutex_);
const TableId& table_id, const PersistentTableInfo& table_info);

Result<std::unordered_map<std::string, uint32_t>> GetPgAttNameTypidMap(
const TableId& table_id, const PersistentTableInfo& table_info);
Expand Down
4 changes: 2 additions & 2 deletions src/yb/master/clone/clone_state_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,10 @@ Status CloneStateManager::StartTabletsCloning(
}

// Export snapshot info.
HybridTime restore_ht(clone_state->LockForRead()->pb.restore_time());
auto [snapshot_info, not_snapshotted_tablets] = VERIFY_RESULT(
external_funcs_->GenerateSnapshotInfoFromScheduleForClone(
snapshot_schedule_id, HybridTime(clone_state->LockForRead()->pb.restore_time()),
deadline));
snapshot_schedule_id, restore_ht, deadline));
auto source_snapshot_id = VERIFY_RESULT(FullyDecodeTxnSnapshotId(snapshot_info.id()));
VLOG(2) << Format(
"The generated SnapshotInfoPB as of time: $0, snapshot_info: $1 ",
Expand Down
17 changes: 8 additions & 9 deletions src/yb/master/master-path-handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1174,15 +1174,16 @@ void MasterPathHandlers::HandleAllTables(
}

for (const auto& table : tables) {
string keyspace = master_->catalog_manager()->GetNamespaceName(table->namespace_id());
TableType table_cat = GetTableType(*table);

auto table_locked = table->LockForRead();
if (!table_locked->is_running()) {
continue;
}

string table_uuid = table->id();
string keyspace = master_->catalog_manager()->GetNamespaceName(table->namespace_id());

TableType table_cat = GetTableType(*table);
// Skip non-user tables if we should.
if (only_user_tables && table_cat != kUserIndex && table_cat != kUserTable) {
continue;
Expand Down Expand Up @@ -1368,13 +1369,13 @@ void MasterPathHandlers::HandleAllTablesJSON(
}

for (const auto& table : tables) {
string keyspace = master_->catalog_manager()->GetNamespaceName(table->namespace_id());
auto table_locked = table->LockForRead();
if (!table_locked->is_running()) {
continue;
}

string table_uuid = table->id();
string keyspace = master_->catalog_manager()->GetNamespaceName(table->namespace_id());

TableType table_cat = GetTableType(*table);
if (only_user_tables && table_cat != kUserIndex && table_cat != kUserTable) {
Expand Down Expand Up @@ -1694,8 +1695,8 @@ void MasterPathHandlers::HandleTablePage(const Webserver::WebRequest& req,
TableName table_name;
TabletInfos tablets;
{
auto l = table->LockForRead();
keyspace_name = master_->catalog_manager()->GetNamespaceName(table->namespace_id());
auto l = table->LockForRead();
table_name = l->name();
*output << "<h1>Table: "
<< EscapeForHtmlToString(server::TableLongName(keyspace_name, table_name))
Expand Down Expand Up @@ -2003,10 +2004,9 @@ void MasterPathHandlers::HandleTablePageJSON(const Webserver::WebRequest& req,
dockv::PartitionSchema partition_schema;
TabletInfos tablets;
{
NamespaceName keyspace_name;
auto keyspace_name = master_->catalog_manager()->GetNamespaceName(table->namespace_id());
TableName table_name;
auto l = table->LockForRead();
keyspace_name = master_->catalog_manager()->GetNamespaceName(table->namespace_id());
table_name = l->name();
jw.String("table_name");
jw.String(server::TableLongName(keyspace_name, table_name));
Expand Down Expand Up @@ -3510,14 +3510,13 @@ void MasterPathHandlers::RenderLoadBalancerViewPanel(

// Table rows.
for (const auto& table : tables) {
auto keyspace = master_->catalog_manager()->GetNamespaceName(table->namespace_id());
auto table_cat = GetTableType(*table);
auto table_locked = table->LockForRead();
if (!table_locked->is_running()) {
continue;
}

const string& keyspace = master_->catalog_manager()->GetNamespaceName(table->namespace_id());

const auto& table_cat = GetTableType(*table);
// Skip non-user tables if we should.
if (table_cat != kUserIndex && table_cat != kUserTable) {
continue;
Expand Down
4 changes: 1 addition & 3 deletions src/yb/master/master_cluster_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,8 @@ Status MasterClusterHandler::RemoveTabletServer(

Status MasterClusterHandler::GetLoadMoveCompletionPercent(
GetLoadMovePercentResponsePB* resp, bool blacklist_leader) {
auto l = catalog_manager_->ClusterConfig()->LockForRead();

// Fine to pass in empty defaults if server_blacklist or leader_blacklist is not filled.
const auto& state = GetBlacklist(l->pb, blacklist_leader);
auto state = GetBlacklist(catalog_manager_->ClusterConfig()->LockForRead()->pb, blacklist_leader);
int64_t blacklist_replicas = catalog_manager_->GetNumRelevantReplicas(state, blacklist_leader);
int64_t initial_load =
(blacklist_leader) ? state.initial_leader_load() : state.initial_replica_load();
Expand Down
14 changes: 9 additions & 5 deletions src/yb/master/master_snapshot_coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2299,12 +2299,16 @@ void MasterSnapshotCoordinator::Impl::ScheduleOperation<TabletRestoreOperation>(
task->SetMetadata(tablet_info->table()->LockForRead()->pb);
// Populate metadata for colocated tables.
if (tablet_info->colocated()) {
auto lock = tablet_info->LockForRead();
if (lock->pb.hosted_tables_mapped_by_parent_id()) {
SetTaskMetadataForColocatedTable(tablet_info->GetTableIds(), task.get());
} else {
SetTaskMetadataForColocatedTable(lock->pb.table_ids(), task.get());
std::vector<TableId> table_ids;
{
auto lock = tablet_info->LockForRead();
if (lock->pb.hosted_tables_mapped_by_parent_id()) {
table_ids = tablet_info->GetTableIds();
} else {
table_ids.assign(lock->pb.table_ids().begin(), lock->pb.table_ids().end());
}
}
SetTaskMetadataForColocatedTable(table_ids, task.get());
}
}
// For sequences_data_table, we should set partial restore and db_oid.
Expand Down
4 changes: 3 additions & 1 deletion src/yb/master/xcluster/xcluster_bootstrap_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
#include "yb/cdc/xcluster_types.h"
#include "yb/master/leader_epoch.h"
#include "yb/master/master_fwd.h"
#include "yb/util/cow_object.h"
#include "yb/util/status_fwd.h"

namespace yb {

template<class State>
class CowWriteLock;

namespace rpc {
class RpcContext;
} // namespace rpc
Expand Down
4 changes: 3 additions & 1 deletion src/yb/master/xcluster/xcluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
#include "yb/cdc/cdc_types.h"
#include "yb/gutil/thread_annotations.h"
#include "yb/master/leader_epoch.h"
#include "yb/util/cow_object.h"
#include "yb/util/status_fwd.h"

namespace yb {

template<class State>
class CowWriteLock;

namespace master {

class SysCatalogTable;
Expand Down
7 changes: 4 additions & 3 deletions src/yb/master/xcluster/xcluster_outbound_replication_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,10 @@ Result<std::optional<NamespaceCheckpointInfo>>
XClusterOutboundReplicationGroup::GetNamespaceCheckpointInfo(
const NamespaceId& namespace_id,
const std::vector<std::pair<TableName, PgSchemaName>>& table_names) const {
auto all_tables = VERIFY_RESULT(helper_functions_.get_tables_func(
namespace_id, /*include_sequences_data=*/(
AutomaticDDLMode() && FLAGS_TEST_xcluster_enable_sequence_replication)));

SharedLock mutex_lock(mutex_);
auto l = VERIFY_RESULT(LockForRead());
const auto* namespace_info = VERIFY_RESULT(GetNamespaceInfo(namespace_id));
Expand All @@ -642,9 +646,6 @@ XClusterOutboundReplicationGroup::GetNamespaceCheckpointInfo(
NamespaceCheckpointInfo ns_info;
ns_info.initial_bootstrap_required = namespace_info->initial_bootstrap_required();

auto all_tables = VERIFY_RESULT(helper_functions_.get_tables_func(
namespace_id, /*include_sequences_data=*/(
AutomaticDDLMode() && FLAGS_TEST_xcluster_enable_sequence_replication)));
std::vector<TableDesignator> table_descriptors;

if (!table_names.empty()) {
Expand Down
5 changes: 3 additions & 2 deletions src/yb/master/xcluster/xcluster_replication_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,13 @@ std::optional<NamespaceId> GetProducerNamespaceIdInternal(
Result<bool> ShouldAddTableToReplicationGroup(
UniverseReplicationInfo& universe, const TableInfo& table_info,
CatalogManager& catalog_manager) {
const auto& table_pb = table_info.old_pb();

if (!IsTableEligibleForXClusterReplication(table_info)) {
return false;
}

auto table_lock = table_info.LockForRead();
const auto& table_pb = table_lock->pb;

auto l = universe.LockForRead();
const auto& universe_pb = l->pb;
if (l->is_deleted_or_failed()) {
Expand Down
5 changes: 3 additions & 2 deletions src/yb/master/xcluster/xcluster_target_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -583,8 +583,9 @@ Result<XClusterInboundReplicationGroupStatus> XClusterTargetManager::GetUniverse
SCHECK_FORMAT(replication_info, NotFound, "Replication group $0 not found", replication_group_id);

const auto cluster_config = VERIFY_RESULT(catalog_manager_.GetClusterConfig());
auto l = replication_info->LockForRead();
return GetUniverseReplicationInfo(l->pb, cluster_config);
// Make pb copy to avoid potential deadlock while calling GetUniverseReplicationInfo.
auto pb = replication_info->LockForRead()->pb;
return GetUniverseReplicationInfo(pb, cluster_config);
}

Status XClusterTargetManager::ClearXClusterSourceTableId(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,22 +249,25 @@ Status AlterUniverseReplicationHelper::AddTablesToReplication(
// Only add new tables. Ignore tables that are currently being replicated.
std::unordered_set<std::string> new_tables(
add_table_data.source_table_ids_to_add.begin(), add_table_data.source_table_ids_to_add.end());
auto original_universe_l = universe->LockForRead();
auto& original_universe_pb = original_universe_l->pb;
XClusterSetupUniverseReplicationData setup_data;
{
auto original_universe_l = universe->LockForRead();
auto& original_universe_pb = original_universe_l->pb;

for (const auto& table_id : original_universe_pb.tables()) {
new_tables.erase(table_id);
for (const auto& table_id : original_universe_pb.tables()) {
new_tables.erase(table_id);
}
SCHECK(
!new_tables.empty(), InvalidArgument,
"xCluster ReplicationGroup already contains all requested tables",
add_table_data.ToString());

// 1. Create an ALTER table request that mirrors the original 'setup_replication'.
setup_data.replication_group_id = alter_replication_group_id;
setup_data.source_masters.CopyFrom(original_universe_pb.producer_master_addresses());
setup_data.transactional = original_universe_pb.transactional();
setup_data.automatic_ddl_mode = original_universe_pb.db_scoped_info().automatic_ddl_mode();
}
SCHECK(
!new_tables.empty(), InvalidArgument,
"xCluster ReplicationGroup already contains all requested tables", add_table_data.ToString());

// 1. Create an ALTER table request that mirrors the original 'setup_replication'.
XClusterSetupUniverseReplicationData setup_data;
setup_data.replication_group_id = alter_replication_group_id;
setup_data.source_masters.CopyFrom(original_universe_pb.producer_master_addresses());
setup_data.transactional = original_universe_pb.transactional();
setup_data.automatic_ddl_mode = original_universe_pb.db_scoped_info().automatic_ddl_mode();

if (add_table_data.HasSourceNamespaceToAdd()) {
setup_data.source_namespace_ids.push_back(add_table_data.source_namespace_to_add.id());
Expand Down
5 changes: 4 additions & 1 deletion src/yb/util/debug/lock_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@

#include "yb/util/debug/lock_debug.h"

#include "yb/util/logging.h"
#include "yb/util/debug-util.h"
#include "yb/util/logging.h"
#include "yb/util/rwc_lock.h"

using namespace std::literals;

Expand All @@ -28,6 +29,8 @@ thread_local NonRecursiveSharedLockBase* head = nullptr;

NonRecursiveSharedLockBase::NonRecursiveSharedLockBase(void* mutex)
: mutex_(mutex), next_(head) {
RWCLock::CheckNoReadLockConflict(mutex);

auto current = head;
while (current != nullptr) {
LOG_IF(DFATAL, current->mutex_ == mutex) << "Recursive shared lock";
Expand Down
37 changes: 37 additions & 0 deletions src/yb/util/rwc_lock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ TAG_FLAG(slow_rwc_lock_log_ms, advanced);

using namespace std::literals;

#define RWC_LOCK_COLLECT_READ_LOCK_STACK_TRACE 1

namespace yb {

namespace {
Expand All @@ -66,13 +68,28 @@ const size_t kWriteActive = 1ULL << 60;
const size_t kWritePending = kWriteActive << 1;
const size_t kReadersMask = kWriteActive - 1;

#if RWC_LOCK_TRACK_EXTERNAL_DEADLOCK
thread_local size_t rwc_read_lock_counter = 0;
std::atomic<void*> rwc_conflicting_mutex{nullptr};
#if RWC_LOCK_COLLECT_READ_LOCK_STACK_TRACE
thread_local StackTrace rwc_read_lock_first_stack_trace;
#endif
#endif

} // namespace

RWCLock::~RWCLock() {
CHECK_EQ(reader_counter_.load(), 0);
}

void RWCLock::ReadLock() {
#if RWC_LOCK_TRACK_EXTERNAL_DEADLOCK
if (++rwc_read_lock_counter == 1) {
#if RWC_LOCK_COLLECT_READ_LOCK_STACK_TRACE
rwc_read_lock_first_stack_trace.Collect();
#endif
}
#endif
for (;;) {
if (!(reader_counter_.fetch_add(1) & kWriteActive)) {
return;
Expand All @@ -91,7 +108,27 @@ void RWCLock::ReadLock() {
}
}

#if RWC_LOCK_TRACK_EXTERNAL_DEADLOCK
void RWCLock::CheckNoReadLockConflict(void* mutex) {
if (mutex == rwc_conflicting_mutex.load()) {
CHECK_EQ(rwc_read_lock_counter, 0)
#if RWC_LOCK_COLLECT_READ_LOCK_STACK_TRACE
<< rwc_read_lock_first_stack_trace.Symbolize();
#else
<< "";
#endif
}
}

void RWCLock::SetConflictingMutex(void* mutex) {
rwc_conflicting_mutex.store(mutex);
}
#endif

void RWCLock::ReadUnlock() {
#if RWC_LOCK_TRACK_EXTERNAL_DEADLOCK
--rwc_read_lock_counter;
#endif
if (reader_counter_.fetch_sub(1) - 1 == kWritePending) {
no_readers_.notify_one();
}
Expand Down
18 changes: 18 additions & 0 deletions src/yb/util/rwc_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@
#include "yb/util/mutex.h"
#include "yb/util/stack_trace.h"

// Enable mechanism to detect deadlocks with mutex that does not belong to RWCLock.
// For instance it could detect the following scenario:
// T1:
// acquire read lock on RWCLock
// acquire read lock on some external mutex
// T2:
// acquire write lock on the same external mutex
// acquire write and then commit lock on the same RWCLock
#define RWC_LOCK_TRACK_EXTERNAL_DEADLOCK 0

namespace yb {

// A read-write-commit lock.
Expand Down Expand Up @@ -91,6 +101,14 @@ class RWCLock {
RWCLock() = default;
~RWCLock();

#if RWC_LOCK_TRACK_EXTERNAL_DEADLOCK
static void CheckNoReadLockConflict(void* mutex);
static void SetConflictingMutex(void* mutex);
#else
static void CheckNoReadLockConflict(void* mutex) {}
static void SetConflictingMutex(void* mutex) {}
#endif

// Acquire the lock in read mode. Upon return, guarantees that:
// - Other threads may concurrently hold the lock for Read.
// - Either zero or one thread may hold the lock for Write.
Expand Down

0 comments on commit b4bb966

Please sign in to comment.