[#25770] xClusterDDLRepl: Use source namespace id for xcluster_source_table_id

Summary:
Fix DDLQueueHandler to use the source namespace id for creating the xcluster_source_table_id.

Previously we incorrectly used the target namespace id, which produces an incorrect source table id whenever the source and target databases have different oids. When we then tried to set up replication for the newly created table, we could not find it and would continuously error out.
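For illustration, here is a minimal sketch of the intended derivation. It is not the shipped code: the helper names (GetPgsqlDatabaseOid, GetPgsqlTableId) follow the conventions used elsewhere in this diff and in yb/common, but the exact signatures below are assumptions.

```cpp
// Sketch only: the xcluster source table id has to be built from the *source*
// database's oid plus the relfilenode oid reported in the replicated DDL.
// Building it from the target database's oid instead yields an id that matches
// no table on the source universe whenever the two database oids differ.
Result<TableId> MakeXClusterSourceTableId(
    const NamespaceId& source_namespace_id, uint32_t relfile_oid) {
  // Decode the PostgreSQL database oid from the YSQL namespace id.
  const uint32_t source_db_oid =
      VERIFY_RESULT(GetPgsqlDatabaseOid(source_namespace_id));
  // Combine the database oid and relation oid into a YSQL table id.
  return GetPgsqlTableId(source_db_oid, relfile_oid);
}
```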
Jira: DB-15057

Test Plan: Changed the DDL replication tests to use `use_different_database_oids`, so running all DDL replication tests verifies this fix.
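For reference, a minimal sketch of how a test fixture opts into mismatched database oids through the shared `SetupParams` (the field names are taken from the test-base diff below; the remaining defaults and fixture wiring are assumptions):

```cpp
// Hypothetical helper, assuming XClusterYsqlTestBase::SetUpClusters accepts
// the SetupParams struct shown in the test-base diff below.
Status SetUpClustersWithDifferentDatabaseOids() {
  const SetupParams params{
      // Start with no producer or consumer tables; tests create them via DDL.
      .num_consumer_tablets = {},
      .num_producer_tablets = {},
      // Give the source and target databases different oids so that a table id
      // derived from the wrong namespace id can never accidentally resolve.
      .use_different_database_oids = true,
  };
  return XClusterYsqlTestBase::SetUpClusters(params);
}
```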

Reviewers: xCluster, mlillibridge, hsunder

Reviewed By: hsunder

Subscribers: ybase

Differential Revision: https://phorge.dev.yugabyte.com/D41503
hulien22 committed Jan 29, 2025
1 parent 014ccb5 commit 99dc1ec
Showing 9 changed files with 64 additions and 44 deletions.
40 changes: 21 additions & 19 deletions src/yb/integration-tests/xcluster/xcluster_ddl_replication-test.cc
@@ -215,14 +215,14 @@ TEST_F(XClusterDDLReplicationTest, CreateTable) {
// Create a table in a new schema.
const std::string kNewSchemaName = "new_schema";
ASSERT_OK(RunOnBothClusters([&](Cluster* cluster) -> Status {
- auto conn = VERIFY_RESULT(cluster->Connect());
+ auto conn = VERIFY_RESULT(cluster->ConnectToDB(namespace_name));
// TODO(jhe) can remove this once create schema is replicated.
RETURN_NOT_OK(conn.Execute("SET yb_xcluster_ddl_replication.enable_manual_ddl_replication=1"));
RETURN_NOT_OK(conn.ExecuteFormat("CREATE SCHEMA $0", kNewSchemaName));
return Status::OK();
}));
{
- auto conn = ASSERT_RESULT(producer_cluster_.Connect());
+ auto conn = ASSERT_RESULT(producer_cluster_.ConnectToDB(namespace_name));
ASSERT_OK(conn.ExecuteFormat(
"CREATE TABLE $0.$1($2 int)", kNewSchemaName, producer_table_name.table_name(),
kKeyColumnName));
@@ -236,14 +236,14 @@ TEST_F(XClusterDDLReplicationTest, CreateTable) {
const std::string kNewUserName = "new_user";
const std::string producer_table_name_new_user_str = producer_table_name.table_name() + "newuser";
ASSERT_OK(RunOnBothClusters([&](Cluster* cluster) -> Status {
- auto conn = VERIFY_RESULT(cluster->Connect());
+ auto conn = VERIFY_RESULT(cluster->ConnectToDB(namespace_name));
RETURN_NOT_OK(conn.ExecuteFormat("CREATE USER $0 WITH PASSWORD '123'", kNewUserName));
RETURN_NOT_OK(conn.Execute("SET yb_xcluster_ddl_replication.enable_manual_ddl_replication=1"));
RETURN_NOT_OK(conn.ExecuteFormat("GRANT CREATE ON SCHEMA public TO $0", kNewUserName));
return Status::OK();
}));
{
- auto conn = ASSERT_RESULT(producer_cluster_.Connect());
+ auto conn = ASSERT_RESULT(producer_cluster_.ConnectToDB(namespace_name));
ASSERT_OK(conn.ExecuteFormat("SET ROLE $0", kNewUserName));
ASSERT_OK(conn.ExecuteFormat(
"CREATE TABLE $0($1 int)", producer_table_name_new_user_str, kKeyColumnName));
@@ -274,6 +274,8 @@ TEST_F(XClusterDDLReplicationTest, BlockMultistatementQuery) {
args.push_back(producer_cluster_.pg_host_port_.host());
args.push_back("--port");
args.push_back(AsString(producer_cluster_.pg_host_port_.port()));
args.push_back("-d");
args.push_back(namespace_name);
args.push_back("-c");
args.push_back(query);

Expand Down Expand Up @@ -306,8 +308,8 @@ TEST_F(XClusterDDLReplicationTest, CreateIndex) {
const std::string kBaseTableName = "base_table";
const std::string kColumn2Name = "a";
const std::string kColumn3Name = "b";
- auto p_conn = ASSERT_RESULT(producer_cluster_.Connect());
- auto c_conn = ASSERT_RESULT(consumer_cluster_.Connect());
+ auto p_conn = ASSERT_RESULT(producer_cluster_.ConnectToDB(namespace_name));
+ auto c_conn = ASSERT_RESULT(consumer_cluster_.ConnectToDB(namespace_name));

// Create a base table.
ASSERT_OK(p_conn.ExecuteFormat(
@@ -420,7 +422,7 @@ TEST_F(XClusterDDLReplicationTest, DuplicateTableNames) {
ASSERT_OK(InsertRowsInProducer(0, kNumRowsTable1, producer_table));

// Drop the table, it should move to HIDDEN state.
- auto producer_conn = ASSERT_RESULT(producer_cluster_.Connect());
+ auto producer_conn = ASSERT_RESULT(producer_cluster_.ConnectToDB(namespace_name));
ASSERT_OK(producer_conn.ExecuteFormat("DROP TABLE $0", producer_table_name.table_name()));

// Create a new table with the same name.
@@ -454,7 +456,7 @@ TEST_F(XClusterDDLReplicationTest, RepeatedCreateAndDropTable) {
ASSERT_OK(ToggleUniverseReplication(
consumer_cluster(), consumer_client(), kReplicationGroupId, false /* is_enabled */));

- auto producer_conn = ASSERT_RESULT(producer_cluster_.Connect());
+ auto producer_conn = ASSERT_RESULT(producer_cluster_.ConnectToDB(namespace_name));
for (int i = 0; i < kNumIterations; i++) {
ASSERT_OK(producer_conn.Execute("DROP TABLE IF EXISTS live_die_repeat"));
ASSERT_OK(producer_conn.Execute("CREATE TABLE live_die_repeat(a int)"));
@@ -468,7 +470,7 @@ TEST_F(XClusterDDLReplicationTest, RepeatedCreateAndDropTable) {
ASSERT_OK(WaitForSafeTimeToAdvanceToNow());

// Ensure table has the correct row at the end.
- auto consumer_conn = ASSERT_RESULT(consumer_cluster_.Connect());
+ auto consumer_conn = ASSERT_RESULT(consumer_cluster_.ConnectToDB(namespace_name));
ASSERT_EQ(
ASSERT_RESULT(consumer_conn.FetchRow<int64_t>("SELECT COUNT(*) FROM live_die_repeat")), 1);
ASSERT_EQ(
@@ -497,7 +499,7 @@ TEST_F(XClusterDDLReplicationTest, AddRenamedTable) {

// Rename the table.
// TODO(#23951) remove manual flag once we support ALTERs.
- auto producer_conn = ASSERT_RESULT(producer_cluster_.Connect());
+ auto producer_conn = ASSERT_RESULT(producer_cluster_.ConnectToDB(namespace_name));
ASSERT_OK(producer_conn.ExecuteFormat(
"SET yb_xcluster_ddl_replication.enable_manual_ddl_replication=1"));
ASSERT_OK(producer_conn.ExecuteFormat(
@@ -515,7 +517,7 @@ TEST_F(XClusterDDLReplicationTest, AddRenamedTable) {

// TODO(#23951) need to wait for create and manually run the DDL on the target side for now.
ASSERT_OK(StringWaiterLogSink("Successfully processed entry").WaitFor(kTimeout));
- auto consumer_conn = ASSERT_RESULT(consumer_cluster_.Connect());
+ auto consumer_conn = ASSERT_RESULT(consumer_cluster_.ConnectToDB(namespace_name));
ASSERT_OK(consumer_conn.ExecuteFormat(
"SET yb_xcluster_ddl_replication.enable_manual_ddl_replication=1"));
ASSERT_OK(consumer_conn.ExecuteFormat(
@@ -560,10 +562,10 @@ class XClusterDDLReplicationAddDropColumnTest : public XClusterDDLReplicationTes
ASSERT_OK(SetUpClusters());
ASSERT_OK(CheckpointReplicationGroup());
ASSERT_OK(CreateReplicationFromCheckpoint());
- producer_conn_ =
- std::make_unique<pgwrapper::PGConn>(ASSERT_RESULT(producer_cluster_.Connect()));
- consumer_conn_ =
- std::make_unique<pgwrapper::PGConn>(ASSERT_RESULT(consumer_cluster_.Connect()));
+ producer_conn_ = std::make_unique<pgwrapper::PGConn>(
+ ASSERT_RESULT(producer_cluster_.ConnectToDB(namespace_name)));
+ consumer_conn_ = std::make_unique<pgwrapper::PGConn>(
+ ASSERT_RESULT(consumer_cluster_.ConnectToDB(namespace_name)));
}

Status PerformStep(size_t step) {
@@ -726,10 +728,10 @@ class XClusterDDLReplicationTableRewriteTest : public XClusterDDLReplicationTest
ASSERT_OK(CheckpointReplicationGroup());
ASSERT_OK(CreateReplicationFromCheckpoint());

- producer_conn_ =
- std::make_unique<pgwrapper::PGConn>(ASSERT_RESULT(producer_cluster_.Connect()));
- consumer_conn_ =
- std::make_unique<pgwrapper::PGConn>(ASSERT_RESULT(consumer_cluster_.Connect()));
+ producer_conn_ = std::make_unique<pgwrapper::PGConn>(
+ ASSERT_RESULT(producer_cluster_.ConnectToDB(namespace_name)));
+ consumer_conn_ = std::make_unique<pgwrapper::PGConn>(
+ ASSERT_RESULT(consumer_cluster_.ConnectToDB(namespace_name)));

// Create a base table and insert some rows.
ASSERT_OK(producer_conn_->ExecuteFormat(
@@ -49,7 +49,9 @@ class XClusterPgRegressDDLReplicationTest : public XClusterDDLReplicationTestBas
google::SetVLOGLevel("cdc*", 0);
}

- Result<std::string> RunYSQLDump(Cluster& cluster, const std::string& database_name = "yugabyte") {
+ Result<std::string> RunYSQLDump(Cluster& cluster) { return RunYSQLDump(cluster, namespace_name); }
+
+ Result<std::string> RunYSQLDump(Cluster& cluster, const std::string& database_name) {
const auto output = VERIFY_RESULT(tools::RunYSQLDump(cluster.pg_host_port_, database_name));

// Filter out any lines in output that contain "binary_upgrade_set_next", since these contain
@@ -58,8 +60,11 @@ class XClusterPgRegressDDLReplicationTest : public XClusterDDLReplicationTestBas
return std::regex_replace(output, pattern, "\n<binary_upgrade_set_next>");
}

- Result<std::string> ReadEnumLabelInfo(
- Cluster& cluster, const std::string& database_name = "yugabyte") {
+ Result<std::string> ReadEnumLabelInfo(Cluster& cluster) {
+ return ReadEnumLabelInfo(cluster, namespace_name);
+ }
+
+ Result<std::string> ReadEnumLabelInfo(Cluster& cluster, const std::string& database_name) {
auto conn = VERIFY_RESULT(cluster.ConnectToDB(database_name));
return VERIFY_RESULT(conn.FetchAllAsString(
"SELECT typname, enumlabel, pg_enum.oid, enumsortorder FROM pg_enum "
@@ -40,9 +40,7 @@ void XClusterDDLReplicationTestBase::SetUp() {

Status XClusterDDLReplicationTestBase::SetUpClusters(
bool is_colocated, bool start_yb_controller_servers) {
- if (is_colocated) {
- namespace_name = "colocated_test_db";
- }
+ namespace_name = is_colocated ? "colocated_test_db" : "test_db";
const SetupParams kDefaultParams{
// By default start with no consumer or producer tables.
.num_consumer_tablets = {},
@@ -53,6 +51,7 @@ Status XClusterDDLReplicationTestBase::SetUpClusters(
.num_masters = 1,
.ranged_partitioned = false,
.is_colocated = is_colocated,
+ .use_different_database_oids = true,
.start_yb_controller_servers = start_yb_controller_servers,
};
RETURN_NOT_OK(XClusterYsqlTestBase::SetUpClusters(kDefaultParams));
12 changes: 10 additions & 2 deletions src/yb/tserver/xcluster_consumer.cc
@@ -27,6 +27,7 @@
#include "yb/common/pg_types.h"
#include "yb/common/wire_protocol.h"
#include "yb/common/xcluster_util.h"
#include "yb/common/ysql_utils.h"

#include "yb/gutil/map-util.h"

@@ -529,9 +530,16 @@ void XClusterConsumer::TriggerPollForNewTablets() {
entry.disable_stream);

if (ddl_queue_streams_.contains(producer_tablet_info.stream_id)) {
+ auto source_namespace_id = GetNamespaceIdFromYsqlTableId(producer_tablet_info.table_id);
+ if (!source_namespace_id.ok()) {
+ LOG(WARNING) << "Could not get source namespace id for table "
+ << producer_tablet_info.table_id << ": "
+ << source_namespace_id.status().ToString();
+ continue; // Don't finish creation. Try again on the next RunThread().
+ }
xcluster_poller->InitDDLQueuePoller(
- use_local_tserver, rate_limiter_.get(), consumer_namespace_name, xcluster_context_,
- connect_to_pg_func_);
+ use_local_tserver, rate_limiter_.get(), consumer_namespace_name, *source_namespace_id,
+ xcluster_context_, connect_to_pg_func_);
} else {
xcluster_poller->Init(use_local_tserver, rate_limiter_.get());
}
7 changes: 4 additions & 3 deletions src/yb/tserver/xcluster_ddl_queue_handler-test.cc
@@ -40,10 +40,11 @@ const std::string kDDLCommandCreateIndex = "CREATE INDEX";
class XClusterDDLQueueHandlerMocked : public XClusterDDLQueueHandler {
public:
explicit XClusterDDLQueueHandlerMocked(
- const client::YBTableName& table_name, MockTserverXClusterContext& xcluster_context)
+ const client::YBTableName& target_table_name, MockTserverXClusterContext& xcluster_context)
: XClusterDDLQueueHandler(
- /* local_client */ nullptr, table_name.namespace_name(), table_name.namespace_id(),
- /* log_prefix */ "", xcluster_context, /* connect_to_pg_func */ nullptr) {}
+ /* local_client */ nullptr, target_table_name.namespace_name(),
+ /* source_namespace_id */ "", target_table_name.namespace_id(), /* log_prefix */ "",
+ xcluster_context, /* connect_to_pg_func */ nullptr) {}

Status ProcessDDLQueueTable(
const std::optional<HybridTime>& apply_safe_time, int num_records = 1) {
14 changes: 8 additions & 6 deletions src/yb/tserver/xcluster_ddl_queue_handler.cc
@@ -187,11 +187,13 @@ Result<rapidjson::Document> ParseSerializedJson(const std::string& raw_json_data

XClusterDDLQueueHandler::XClusterDDLQueueHandler(
client::YBClient* local_client, const NamespaceName& namespace_name,
- const NamespaceId& namespace_id, const std::string& log_prefix,
- TserverXClusterContextIf& xcluster_context, ConnectToPostgresFunc connect_to_pg_func)
+ const NamespaceId& source_namespace_id, const NamespaceId& target_namespace_id,
+ const std::string& log_prefix, TserverXClusterContextIf& xcluster_context,
+ ConnectToPostgresFunc connect_to_pg_func)
: local_client_(local_client),
namespace_name_(namespace_name),
- namespace_id_(namespace_id),
+ source_namespace_id_(source_namespace_id),
+ target_namespace_id_(target_namespace_id),
log_prefix_(log_prefix),
xcluster_context_(xcluster_context),
connect_to_pg_func_(std::move(connect_to_pg_func)) {}
@@ -226,7 +228,7 @@ Status XClusterDDLQueueHandler::ProcessDDLQueueTable(const XClusterOutputClientR
HybridTime safe_time_ht = VERIFY_RESULT(GetXClusterSafeTimeForNamespace());
SCHECK(
!safe_time_ht.is_special(), InternalError, "Found invalid safe time $0 for namespace $1",
- safe_time_ht, namespace_id_);
+ safe_time_ht, target_namespace_id_);
SCHECK_GE(
safe_time_ht, target_safe_ht, TryAgain, "Waiting for other pollers to catch up to safe time");

@@ -314,7 +316,7 @@ Status XClusterDDLQueueHandler::ProcessNewRelations(
for (const auto& new_rel : *new_rel_map) {
VALIDATE_MEMBER(new_rel, kDDLJsonRelFileOid, Int);
VALIDATE_MEMBER(new_rel, kDDLJsonRelName, String);
- const auto db_oid = VERIFY_RESULT(GetPgsqlDatabaseOid(namespace_id_));
+ const auto db_oid = VERIFY_RESULT(GetPgsqlDatabaseOid(source_namespace_id_));
const auto relfile_oid = new_rel[kDDLJsonRelFileOid].GetInt();
const auto rel_name = new_rel[kDDLJsonRelName].GetString();
RETURN_NOT_OK(xcluster_context_.SetSourceTableMappingForCreateTable(
@@ -406,7 +408,7 @@ Status XClusterDDLQueueHandler::InitPGConnection() {

Result<HybridTime> XClusterDDLQueueHandler::GetXClusterSafeTimeForNamespace() {
return local_client_->GetXClusterSafeTimeForNamespace(
- namespace_id_, master::XClusterSafeTimeFilter::DDL_QUEUE);
+ target_namespace_id_, master::XClusterSafeTimeFilter::DDL_QUEUE);
}

Result<std::vector<std::tuple<int64, int64, std::string>>>
8 changes: 5 additions & 3 deletions src/yb/tserver/xcluster_ddl_queue_handler.h
@@ -34,8 +34,9 @@ class XClusterDDLQueueHandler {
public:
XClusterDDLQueueHandler(
client::YBClient* local_client, const NamespaceName& namespace_name,
- const NamespaceId& namespace_id, const std::string& log_prefix,
- TserverXClusterContextIf& xcluster_context, ConnectToPostgresFunc connect_to_pg_func);
+ const NamespaceId& source_namespace_id, const NamespaceId& target_namespace_id,
+ const std::string& log_prefix, TserverXClusterContextIf& xcluster_context,
+ ConnectToPostgresFunc connect_to_pg_func);
virtual ~XClusterDDLQueueHandler();

Status ProcessDDLQueueTable(const XClusterOutputClientResponse& response);
@@ -83,7 +84,8 @@ class XClusterDDLQueueHandler {

std::unique_ptr<pgwrapper::PGConn> pg_conn_;
NamespaceName namespace_name_;
- NamespaceId namespace_id_;
+ NamespaceId source_namespace_id_;
+ NamespaceId target_namespace_id_;
const std::string log_prefix_;
TserverXClusterContextIf& xcluster_context_;
ConnectToPostgresFunc connect_to_pg_func_;
7 changes: 4 additions & 3 deletions src/yb/tserver/xcluster_poller.cc
@@ -151,13 +151,14 @@ void XClusterPoller::Init(bool use_local_tserver, rocksdb::RateLimiter* rate_lim

void XClusterPoller::InitDDLQueuePoller(
bool use_local_tserver, rocksdb::RateLimiter* rate_limiter, const NamespaceName& namespace_name,
- TserverXClusterContextIf& xcluster_context, ConnectToPostgresFunc connect_to_pg_func) {
+ const NamespaceId& source_namespace_id, TserverXClusterContextIf& xcluster_context,
+ ConnectToPostgresFunc connect_to_pg_func) {
DCHECK_EQ(is_automatic_mode_, true);
Init(use_local_tserver, rate_limiter);

ddl_queue_handler_ = std::make_shared<XClusterDDLQueueHandler>(
- &local_client_, namespace_name, consumer_namespace_id_, LogPrefix(), xcluster_context,
- std::move(connect_to_pg_func));
+ &local_client_, namespace_name, source_namespace_id, consumer_namespace_id_, LogPrefix(),
+ xcluster_context, std::move(connect_to_pg_func));
}

void XClusterPoller::StartShutdown() {
4 changes: 2 additions & 2 deletions src/yb/tserver/xcluster_poller.h
@@ -68,8 +68,8 @@ class XClusterPoller : public XClusterAsyncExecutor {
void Init(bool use_local_tserver, rocksdb::RateLimiter* rate_limiter);
void InitDDLQueuePoller(
bool use_local_tserver, rocksdb::RateLimiter* rate_limiter,
- const NamespaceName& namespace_name, TserverXClusterContextIf& xcluster_context,
- ConnectToPostgresFunc connect_to_pg_func);
+ const NamespaceId& source_namespace_id, const NamespaceName& target_namespace_name,
+ TserverXClusterContextIf& xcluster_context, ConnectToPostgresFunc connect_to_pg_func);

void StartShutdown() override;
void CompleteShutdown() override;
