Skip to content

Commit

Permalink
[#26093] xClusterDDLRepl: Add in-memory safeguard on target for repea…
Browse files Browse the repository at this point in the history
…ted failed DDLs

Summary:
To prevent the ddl_queue handler from repeatedly failing the same DDL on the target, this diff
adds a guardrail that pauses replication once the same DDL has failed
xcluster_ddl_queue_max_retries_per_ddl times (default 5).

This state is kept in memory in the ddl_queue poller; once the limit is reached, that poller
stops processing any more DDLs. Once the DDL is manually fixed, replication can be resumed by
recreating the ddl_queue poller (e.g., a leader stepdown of the ddl_queue tablet, or a pause+resume
of the replication group).
Jira: DB-15425

Test Plan:
```
ybd --cxx-test xcluster_ddl_replication-test --gtest_filter "XClusterDDLReplicationTest.PauseTargetOnRepeatedFailures"
```

Reviewers: xCluster, hsunder, slingam

Reviewed By: slingam

Subscribers: ybase, yql, hsunder

Differential Revision: https://phorge.dev.yugabyte.com/D42244
  • Loading branch information
hulien22 committed Mar 4, 2025
1 parent 1fb63d5 commit 1713a7e
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 11 deletions.
41 changes: 41 additions & 0 deletions src/yb/integration-tests/xcluster/xcluster_ddl_replication-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "yb/util/tsan_util.h"

DECLARE_uint32(xcluster_consistent_wal_safe_time_frequency_ms);
DECLARE_int32(xcluster_ddl_queue_max_retries_per_ddl);

DECLARE_bool(TEST_xcluster_ddl_queue_handler_fail_at_end);
DECLARE_bool(TEST_xcluster_ddl_queue_handler_fail_at_start);
Expand Down Expand Up @@ -489,6 +490,44 @@ TEST_F(XClusterDDLReplicationTest, DDLsWithinTransaction) {
InsertRowsIntoProducerTableAndVerifyConsumer(producer_table->name());
}

// Verifies that the target stops processing the ddl_queue after the same DDL keeps failing, and
// that replication only resumes after the pollers are recreated (here via pause + resume).
TEST_F(XClusterDDLReplicationTest, PauseTargetOnRepeatedFailures) {
  ASSERT_OK(SetUpClusters());
  ASSERT_OK(CheckpointReplicationGroup());
  ASSERT_OK(CreateReplicationFromCheckpoint());

  auto producer_conn = ASSERT_RESULT(producer_cluster_.ConnectToDB(namespace_name));
  ASSERT_OK(producer_conn.Execute("CREATE TABLE test_table_1 (key int PRIMARY KEY);"));
  ASSERT_OK(WaitForSafeTimeToAdvanceToNow());

  // From this point on, DDLs fail on the target.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_xcluster_ddl_queue_handler_fail_ddl) = true;

  ASSERT_OK(producer_conn.Execute("ALTER TABLE test_table_1 RENAME TO test_table_2;"));

  // The rename keeps failing on the target, so wait for the handler to report that it is paused.
  ASSERT_OK(
      StringWaiterLogSink("DDL replication is paused due to repeated failures").WaitFor(kTimeout));

  // Let DDLs succeed again.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_xcluster_ddl_queue_handler_fail_ddl) = false;

  // Clearing the failure injection alone must not resume replication: the paused state lives in
  // the existing ddl_queue poller.
  ASSERT_NOK(WaitForSafeTimeToAdvanceToNow());

  // Pause and then unpause the replication group, which recreates the pollers.
  ASSERT_OK(ToggleUniverseReplication(
      consumer_cluster(), consumer_client(), kReplicationGroupId, false /* is_enabled */));
  SleepFor(MonoDelta::FromSeconds(1));
  ASSERT_OK(ToggleUniverseReplication(
      consumer_cluster(), consumer_client(), kReplicationGroupId, true /* is_enabled */));

  // With fresh pollers the rename goes through and rows replicate to the renamed table.
  auto renamed_table_name = ASSERT_RESULT(
      GetYsqlTable(&producer_cluster_, namespace_name, /*schema_name*/ "", "test_table_2"));
  auto producer_table = ASSERT_RESULT(GetProducerTable(renamed_table_name));
  InsertRowsIntoProducerTableAndVerifyConsumer(producer_table->name());
}

TEST_F(XClusterDDLReplicationTest, DuplicateTableNames) {
// Test that when there are multiple tables with the same name, we are able to correctly link the
// target tables to the correct source tables.
Expand Down Expand Up @@ -826,6 +865,8 @@ TEST_F(XClusterDDLReplicationTest, CreateColocatedTableWithTargetFailures) {

// Allow DDLs through but fail them at the end of execution.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_xcluster_ddl_queue_handler_fail_ddl) = true;
// Bump up the number of retries to ensure that we don't hit the limit.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_xcluster_ddl_queue_max_retries_per_ddl) = 1000;

auto producer_conn = ASSERT_RESULT(producer_cluster_.ConnectToDB(namespace_name));
ASSERT_OK(producer_conn.ExecuteFormat("CREATE TABLE $0 (key int primary key)", kNewTableName));
Expand Down
3 changes: 1 addition & 2 deletions src/yb/master/catalog_entity_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -573,8 +573,7 @@ struct PersistentTableInfo : public Persistent<SysTablesEntryPB> {
bool IsXClusterDDLReplicationReplicatedDDLsTable() const;

// Returns true when this table satisfies either xCluster DDL replication predicate:
// IsXClusterDDLReplicationDDLQueueTable() or IsXClusterDDLReplicationReplicatedDDLsTable().
bool IsXClusterDDLReplicationTable() const {
  return IsXClusterDDLReplicationDDLQueueTable() || IsXClusterDDLReplicationReplicatedDDLsTable();
}

Result<uint32_t> GetPgTableOid(const std::string& id) const;
Expand Down
2 changes: 2 additions & 0 deletions src/yb/tserver/xcluster_ddl_queue_handler-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class XClusterDDLQueueHandlerMocked : public XClusterDDLQueueHandler {
return rows_;
}

// Test override: always returns OK, so the mocked handler never reports a paused state.
Status CheckForFailedQuery() override { return Status::OK(); }

// Test override: returns OK for every query without running anything.
Status ProcessDDLQuery(const DDLQueryInfo& query_info) override { return Status::OK(); }

// Test override: always answers false, i.e. no query is reported as already processed.
Result<bool> CheckIfAlreadyProcessed(const DDLQueryInfo& query_info) override { return false; }
Expand Down
56 changes: 47 additions & 9 deletions src/yb/tserver/xcluster_ddl_queue_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
#include "yb/util/scope_exit.h"
#include "yb/yql/pgwrapper/libpq_utils.h"

// Limit on consecutive failures of the same DDL. The ddl_queue handler compares its in-memory
// failure counter against this value with >=, so processing pauses once the counter reaches it.
DEFINE_RUNTIME_int32(xcluster_ddl_queue_max_retries_per_ddl, 5,
"Maximum number of retries per DDL before we pause processing of the ddl_queue table.");

DEFINE_test_flag(bool, xcluster_ddl_queue_handler_cache_connection, true,
"Whether we should cache the ddl_queue handler's connection, or always recreate it.");

Expand Down Expand Up @@ -246,6 +249,7 @@ Status XClusterDDLQueueHandler::ProcessDDLQueueTable(const XClusterOutputClientR
safe_time_ht, target_safe_ht, TryAgain, "Waiting for other pollers to catch up to safe time");

RETURN_NOT_OK(InitPGConnection());
RETURN_NOT_OK(CheckForFailedQuery());

// TODO(#20928): Make these calls async.
auto rows = VERIFY_RESULT(GetRowsToProcess(target_safe_ht));
Expand Down Expand Up @@ -394,14 +398,47 @@ Status XClusterDDLQueueHandler::ProcessDDLQuery(const DDLQueryInfo& query_info)
}

RETURN_NOT_OK(RunAndLogQuery(setup_query.str()));
RETURN_NOT_OK(RunAndLogQuery(query_info.query));
RETURN_NOT_OK(ProcessFailedDDLQuery(RunAndLogQuery(query_info.query), query_info));
RETURN_NOT_OK(
// The SELECT here can't be last; otherwise, RunAndLogQuery complains that rows are returned.
RunAndLogQuery("SELECT pg_catalog.yb_xcluster_set_next_oid_assignments('{}');"
"SET yb_skip_data_insert_for_table_rewrite=false;"));
return Status::OK();
}

// Records the outcome of running a DDL so that repeated failures of the same DDL can be detected.
//
// s:          status returned by executing the DDL.
// query_info: the DDL that was executed; identified by (ddl_end_time, query_id).
//
// On success the failure-tracking state (counter and last failed query) is cleared and OK is
// returned. On failure the counter for this DDL is started or incremented, the failing status is
// remembered for CheckForFailedQuery(), and the same status is returned to the caller.
Status XClusterDDLQueueHandler::ProcessFailedDDLQuery(
    const Status& s, const DDLQueryInfo& query_info) {
  if (s.ok()) {
    num_fails_for_this_ddl_ = 0;
    last_failed_query_.reset();
    return Status::OK();
  }

  // A tracked failure is expected to belong to the DDL being retried; a different DDL failing
  // while one is still tracked is unexpected (debug-only check). Release builds fall through and
  // restart the count for the new DDL.
  DCHECK(!last_failed_query_ || last_failed_query_->MatchesQueryInfo(query_info));
  if (last_failed_query_ && last_failed_query_->MatchesQueryInfo(query_info)) {
    ++num_fails_for_this_ddl_;
  } else {
    last_failed_query_ = QueryIdentifier{query_info.ddl_end_time, query_info.query_id};
    num_fails_for_this_ddl_ = 1;
  }

  // Evaluate the limit after every failure, including the first one recorded for a DDL, so that
  // the pause is always logged whenever CheckForFailedQuery() will start rejecting (e.g. when the
  // limit is configured as 1).
  if (num_fails_for_this_ddl_ >= FLAGS_xcluster_ddl_queue_max_retries_per_ddl) {
    LOG_WITH_PREFIX(ERROR) << "Failed to process DDL after " << num_fails_for_this_ddl_
                           << " retries. Pausing DDL replication.";
  }

  last_failed_status_ = s;
  return s;
}

// Gate run before processing the ddl_queue: returns OK while the failure counter is below
// xcluster_ddl_queue_max_retries_per_ddl. Once the counter has reached the limit, returns the
// last failure status prefixed with a message explaining that replication is paused.
Status XClusterDDLQueueHandler::CheckForFailedQuery() {
  const bool retry_limit_reached =
      num_fails_for_this_ddl_ >= FLAGS_xcluster_ddl_queue_max_retries_per_ddl;
  if (!retry_limit_reached) {
    return Status::OK();
  }

  return last_failed_status_.CloneAndPrepend(
      "DDL replication is paused due to repeated failures. Manual fix is required, followed by a "
      "leader stepdown of the target's ddl_queue tablet. ");
}

Status XClusterDDLQueueHandler::ProcessManualExecutionQuery(const DDLQueryInfo& query_info) {
rapidjson::Document doc;
doc.SetObject();
Expand All @@ -411,8 +448,8 @@ Status XClusterDDLQueueHandler::ProcessManualExecutionQuery(const DDLQueryInfo&
doc.AddMember(rapidjson::StringRef(kDDLJsonManualReplication), true, doc.GetAllocator());

RETURN_NOT_OK(RunAndLogQuery(Format(
"EXECUTE $0($1, $2, '$3')", kDDLPrepStmtManualInsert, query_info.ddl_end_time,
query_info.query_id, common::WriteRapidJsonToString(doc))));
"EXECUTE $0($1, $2, $4$3$4)", kDDLPrepStmtManualInsert, query_info.ddl_end_time,
query_info.query_id, common::WriteRapidJsonToString(doc), "$manual_query$")));
return Status::OK();
}

Expand All @@ -432,8 +469,8 @@ Status XClusterDDLQueueHandler::InitPGConnection() {
if (pg_conn_ && FLAGS_TEST_xcluster_ddl_queue_handler_cache_connection) {
return Status::OK();
}
auto se = ScopeExit([this] { pg_conn_.reset(); });
// Create pg connection if it doesn't exist.
// TODO(#20693) Create prepared statements as part of opening the connection.
CoarseTimePoint deadline = CoarseMonoClock::Now() + local_client_->default_rpc_timeout();
pg_conn_ = std::make_unique<pgwrapper::PGConn>(
VERIFY_RESULT(connect_to_pg_func_(namespace_name_, deadline)));
Expand All @@ -445,15 +482,16 @@ Status XClusterDDLQueueHandler::InitPGConnection() {
query << "SET yb_non_ddl_txn_for_sys_tables_allowed = 1;";
// Prepare replicated_ddls insert for manually replicated ddls.
query << "PREPARE " << kDDLPrepStmtManualInsert << "(bigint, bigint, text) AS "
<< "INSERT INTO " << xcluster::kDDLQueuePgSchemaName << "."
<< xcluster::kDDLReplicatedTableName << " VALUES ($1, $2, $3::jsonb);";
<< "INSERT INTO " << kReplicatedDDLsFullTableName << " VALUES ($1, $2, $3::jsonb);";
// Prepare replicated_ddls select query.
query << "PREPARE " << kDDLPrepStmtAlreadyProcessed << "(bigint, bigint) AS "
<< "SELECT EXISTS(SELECT 1 FROM " << xcluster::kDDLQueuePgSchemaName << "."
<< xcluster::kDDLReplicatedTableName << " WHERE " << xcluster::kDDLQueueDDLEndTimeColumn
<< " = $1 AND " << xcluster::kDDLQueueQueryIdColumn << " = $2);";
<< "SELECT EXISTS(SELECT 1 FROM " << kReplicatedDDLsFullTableName << " WHERE "
<< xcluster::kDDLQueueDDLEndTimeColumn << " = $1 AND " << xcluster::kDDLQueueQueryIdColumn
<< " = $2);";

RETURN_NOT_OK(pg_conn_->Execute(query.str()));

se.Cancel();
return Status::OK();
}

Expand Down
18 changes: 18 additions & 0 deletions src/yb/tserver/xcluster_ddl_queue_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class XClusterDDLQueueHandler {

virtual Status ProcessDDLQuery(const DDLQueryInfo& query_info);

virtual Status ProcessFailedDDLQuery(const Status& s, const DDLQueryInfo& query_info);

virtual Result<bool> CheckIfAlreadyProcessed(const DDLQueryInfo& query_info);

Status ProcessManualExecutionQuery(const DDLQueryInfo& query_info);
Expand All @@ -75,6 +77,8 @@ class XClusterDDLQueueHandler {
virtual Result<std::vector<std::tuple<int64, int64, std::string>>> GetRowsToProcess(
const HybridTime& apply_safe_time);

virtual Status CheckForFailedQuery();

// Sets xcluster_context with the mapping of table name -> source table id.
Status ProcessNewRelations(
rapidjson::Document& doc, const std::string& schema,
Expand All @@ -92,6 +96,20 @@ class XClusterDDLQueueHandler {
TserverXClusterContextIf& xcluster_context_;
ConnectToPostgresFunc connect_to_pg_func_;

// Identifies a DDL by its (ddl_end_time, query_id) pair.
struct QueryIdentifier {
  int64 ddl_end_time;
  int64 query_id;

  // True when both fields equal the corresponding fields of query_info.
  bool MatchesQueryInfo(const DDLQueryInfo& query_info) const {
    if (ddl_end_time != query_info.ddl_end_time) {
      return false;
    }
    return query_id == query_info.query_id;
  }
};

// Keep track of how many times we've repeatedly failed a DDL.
// Count of failures recorded for last_failed_query_; reset to 0 when a DDL succeeds.
int num_fails_for_this_ddl_ = 0;
// Identifier of the DDL whose failures are being counted; empty when none is tracked.
std::optional<QueryIdentifier> last_failed_query_;
// Status of the most recent failed attempt; used to build the "paused" error. Not cleared on
// success.
Status last_failed_status_;

// Whether we have applied new rows to the ddl_queue table since the last apply_safe_time update.
// If false we can skip processing new DDLs.
// Set to true initially to handle any rows written but not processed from previous pollers.
Expand Down
6 changes: 6 additions & 0 deletions src/yb/tserver/xcluster_poller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,12 @@ void XClusterPoller::HandleApplyChangesResponse(XClusterOutputClientResponse res
poll_stats_history_.SetError(std::move(s));
}

// If we're paused or failed, then stop processing the DDL queue table.
if (is_paused_ || is_failed_) {
SchedulePoll();
return;
}

// If processing ddl_queue table fails, then retry just this part (don't repeat ApplyChanges).
ScheduleFuncWithDelay(
GetAtomicFlag(&FLAGS_xcluster_safe_time_update_interval_secs),
Expand Down

0 comments on commit 1713a7e

Please sign in to comment.