[#24918] CDC: Enable tablet split for logical replication
Summary:
A bug in the hidden tablet deletion logic could leave hidden parent tablets undeleted when logical replication is in use. To avoid getting into this state, automatic tablet splitting had been disabled whenever logical replication was in use.

This diff fixes the bug and re-enables automatic tablet splitting along with logical replication. Currently, a stream approves the deletion of a hidden tablet if any one of the following three conditions is satisfied:

  # Both its children are polled.
  # The state table entry for the hidden tablet is already deleted.
  # The hidden tablet is unpolled. This is inferred from the state table entry's checkpoint; with consistent snapshot streams, this criterion is never satisfied.
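
As a rough illustration of the three pre-existing conditions (not the actual master code), the per-stream approval can be sketched as below. The struct and function names are hypothetical, and each condition is reduced to a boolean that the real logic would derive from the cdc_state table:

```cpp
#include <cassert>

// Hypothetical, simplified view of what one stream knows about a hidden parent tablet.
struct StreamViewOfHiddenTablet {
  bool both_children_polled;       // Condition 1.
  bool state_entry_deleted;        // Condition 2.
  bool checkpoint_marks_unpolled;  // Condition 3; never true for consistent snapshot streams.
};

// Returns true if this stream approves deletion of the hidden tablet.
bool StreamApprovesDeletion(const StreamViewOfHiddenTablet& view) {
  return view.both_children_polled || view.state_entry_deleted ||
         view.checkpoint_marks_unpolled;
}
```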

However, for the tablets of an unpolled table, none of these conditions is satisfied. In this diff, we calculate the minimum restart time (given by the `record_id_commit_time` field of the slot entry) across all the slots over the hidden tablet. We also store the tablet split time (given by the hide time of the parent tablet) along with the parent-children mapping in the `retained_by_cdcsdk_` map in master. In addition to the conditions above, we now allow hidden tablet deletion if:

  # The tablet split time of a hidden tablet is less than the min restart time across all the slots, as this implies that no slot will ever stream a record with commit time <= split time of this hidden tablet.
  # The cdc_state entry for the hidden parent tablet has expired or is no longer of interest. In this case, the retention barriers are lifted and the tablet becomes unusable from the CDC point of view.

In both of these cases, we remove the state table entry for the hidden parent tablet.
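
The two additional conditions can be sketched as follows. This is only an illustration under stated assumptions: `HybridTimeMicros`, `MinRestartTime` and `CanDeleteHiddenParent` are hypothetical names, hybrid times are modelled as plain integers, the expiry check is reduced to a boolean input, and a tablet with no slots is assumed not to qualify through the restart-time condition:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical stand-in for HybridTime: a plain integer timestamp.
using HybridTimeMicros = std::uint64_t;

// Minimum restart time across all slots, or nullopt when there are no slots.
std::optional<HybridTimeMicros> MinRestartTime(
    const std::vector<HybridTimeMicros>& slot_restart_times) {
  if (slot_restart_times.empty()) {
    return std::nullopt;
  }
  return *std::min_element(slot_restart_times.begin(), slot_restart_times.end());
}

// Returns true if either of the two new conditions allows deleting the hidden parent.
bool CanDeleteHiddenParent(
    HybridTimeMicros split_time, const std::vector<HybridTimeMicros>& slot_restart_times,
    bool expired_or_not_of_interest) {
  if (expired_or_not_of_interest) {
    return true;  // Retention barriers are lifted, so CDC can no longer use the tablet.
  }
  // No slot will ever stream a record with commit time <= split_time.
  const auto min_restart = MinRestartTime(slot_restart_times);
  return min_restart.has_value() && split_time < *min_restart;
}
```

For example, with a split time of 100 and slot restart times of 150 and 90, the slower slot (90) keeps the hidden parent alive until it either advances past 100 or its entry expires.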

In addition, in the case of logical replication, at the time of a tablet split we add entries for the child tablets to the cdc_state table with their parent's checkpoint. This is required because a user can alter the publication at any time to add a previously unpolled table whose tablets have split. Since the split parent's state table entry will be removed, the children's entries must inherit the parent's checkpoint at the time of the split.
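
A minimal sketch of this checkpoint inheritance, assuming an in-memory map keyed by (tablet_id, stream_id) as a stand-in for the cdc_state table. The `OpId` struct, the `CdcStateTable` alias and `InheritParentCheckpoint` are hypothetical and only mirror the idea, not the real schema or API:

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

// Hypothetical, simplified checkpoint.
struct OpId {
  std::int64_t term;
  std::int64_t index;
};

bool operator==(const OpId& lhs, const OpId& rhs) {
  return lhs.term == rhs.term && lhs.index == rhs.index;
}

// Key: (tablet_id, stream_id). Value: checkpoint stored for that pair.
using StateKey = std::pair<std::string, std::string>;
using CdcStateTable = std::map<StateKey, OpId>;

// Copies the parent's checkpoint into entries for both children of the given stream.
// Returns false if the parent has no entry for this stream.
bool InheritParentCheckpoint(
    CdcStateTable& table, const std::string& stream_id, const std::string& parent_tablet_id,
    const std::array<std::string, 2>& child_tablet_ids) {
  const auto it = table.find(StateKey{parent_tablet_id, stream_id});
  if (it == table.end()) {
    return false;
  }
  const OpId parent_checkpoint = it->second;
  for (const auto& child_id : child_tablet_ids) {
    // emplace leaves an already existing child entry untouched.
    table.emplace(StateKey{child_id, stream_id}, parent_checkpoint);
  }
  return true;
}
```

Because the children carry the checkpoint from the moment of the split, removing the parent's entry later does not lose the position from which a newly added table should be streamed.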

**Note:** All the changes apply only to the logical replication model of CDC. Also, after this commit, the flags `cdc_intent_retention_ms` and `cdcsdk_tablet_not_of_interest_timeout_secs` must be set on both master and tserver.
Jira: DB-14049

Test Plan:
./yb_build.sh --cxx-test integration-tests_cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestHiddenTabletDeletionWithUnusedSlot
./yb_build.sh --cxx-test integration-tests_cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestHiddenTabletDeletionOfUnpolledTable
./yb_build.sh --cxx-test integration-tests_cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestDynamicTablesAdditionAfterHiddenTabletDeletion

Reviewers: skumar, siddharth.shah, xCluster, hsunder

Reviewed By: siddharth.shah

Subscribers: ybase, ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D41406
Sumukh-Phalgaonkar committed Jan 29, 2025
1 parent 2575f61 commit 6448b6d
Showing 6 changed files with 409 additions and 29 deletions.
266 changes: 266 additions & 0 deletions src/yb/integration-tests/cdcsdk_consumption_consistent_changes-test.cc
@@ -3754,5 +3754,271 @@ TEST_F(CDCSDKConsumptionConsistentChangesTest, TestCompactionWithReplicaIdentity
  ASSERT_EQ(get_consistent_changes_resp.records.size(), 6);
}

TEST_F(CDCSDKConsumptionConsistentChangesTest, TestHiddenTabletDeletionOfUnpolledTable) {
  auto parent_tablet_deletion_task_interval = 1;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_parent_tablet_deletion_task_retry_secs) =
      parent_tablet_deletion_task_interval;
  ASSERT_OK(SetUpWithParams(
      3 /* replication_factor */, 1 /* num_masters */, false /* colocated */,
      true /* cdc_populate_safepoint_record */));

  const uint32_t num_tablets = 1;
  const uint32_t num_tables = 2;
  vector<string> table_suffix = {"_1", "_2"};
  vector<YBTableName> tables(num_tables);
  vector<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> tablets(num_tables);

  // Create two tables, each having one tablet.
  for (uint32_t idx = 0; idx < num_tables; idx++) {
    tables[idx] = ASSERT_RESULT(
        CreateTable(&test_cluster_, kNamespaceName, kTableName + table_suffix[idx], num_tablets));
    ASSERT_OK(test_client()->GetTablets(
        tables[idx], 0, &tablets[idx], nullptr /* partition_list_version */));
    ASSERT_EQ(tablets[idx].size(), num_tablets);
  }

  // Create a slot, and start a VWAL instance for test_table_1 only.
  auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());
  ASSERT_OK(InitVirtualWAL(stream_id, {tables[0].table_id()}, kVWALSessionId1));

  // Get the state table entry of the tablet for test_table_2.
  auto parent_tablet_row =
      ASSERT_RESULT(ReadFromCdcStateTable(stream_id, tablets[1].Get(0).tablet_id()));

  // Write 100 rows to test_table_2 and then split it.
  ASSERT_OK(
      WriteRowsHelper(0, 100, &test_cluster_, true, 2, (kTableName + table_suffix[1]).c_str()));
  ASSERT_OK(WaitForFlushTables({tables[1].table_id()}, false, 1000, true));
  WaitUntilSplitIsSuccesful(tablets[1].Get(0).tablet_id(), tables[1], 2);

  // Sleep to ensure that parent tablet deletion task has run for at least one iteration.
  SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier * parent_tablet_deletion_task_interval));

  // Get all the tablets including hidden tablets for test_table_2. We should get 3 tablets, i.e. 2
  // children and 1 un-deleted hidden parent tablet, since the restart time is behind the tablet
  // split time.
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> get_tablets_res;
  ASSERT_OK(test_client()->GetTablets(
      tables[1], 3, &get_tablets_res, nullptr, RequireTabletsRunning::kFalse,
      master::IncludeInactive::kTrue));
  ASSERT_EQ(get_tablets_res.size(), 3);

  // Now, we insert and consume records from test_table_1. This will move the restart time forward
  // and eventually it will become greater than the tablet split time. As a result the hidden parent
  // tablet will be deleted.
  auto i = 0;
  ASSERT_OK(WaitFor(
      [&]() -> Result<bool> {
        RETURN_NOT_OK(WriteRowsHelper(
            i, i + 100, &test_cluster_, true, 2, (kTableName + table_suffix[0]).c_str()));
        auto get_consistent_changes_resp = VERIFY_RESULT(GetAllPendingTxnsFromVirtualWAL(
            stream_id, {tables[0].table_id()}, 100 /* expected_dml_records*/,
            false /* init_virtual_wal */));
        get_tablets_res.Clear();
        auto s = test_client()->GetTablets(
            tables[1], 3, &get_tablets_res, nullptr, RequireTabletsRunning::kFalse,
            master::IncludeInactive::kTrue);
        i += 100;
        SleepFor(MonoDelta::FromSeconds(5));
        return get_tablets_res.size() == 2;
      },
      MonoDelta::FromSeconds(120 * kTimeMultiplier),
      "Timed out waiting for hidden tablet deletion"));

  // Assert that children tablet entries have the parent entry's checkpoint.
  for (auto child_tablet : get_tablets_res) {
    auto child_tablet_row =
        ASSERT_RESULT(ReadFromCdcStateTable(stream_id, child_tablet.tablet_id()));
    ASSERT_EQ(parent_tablet_row.op_id, child_tablet_row.op_id);
  }
}

TEST_F(CDCSDKConsumptionConsistentChangesTest, TestHiddenTabletDeletionWithUnusedSlot) {
  auto parent_tablet_deletion_task_interval = 1;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_parent_tablet_deletion_task_retry_secs) =
      parent_tablet_deletion_task_interval;

  ASSERT_OK(SetUpWithParams(
      3 /* replication_factor */, 1 /* num_masters */, false /* colocated */,
      true /* cdc_populate_safepoint_record */));

  // Create one table with single tablet.
  auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName));
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
  ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr /* partition_list_version */));
  ASSERT_EQ(tablets.size(), 1);

  // Create 2 slots, only one of them will be used for polling.
  auto polled_stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());
  auto unpolled_stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());

  // Get the state table entries of the parent tablet.
  auto parent_tablet_row_polled_stream =
      ASSERT_RESULT(ReadFromCdcStateTable(polled_stream_id, tablets.Get(0).tablet_id()));
  auto parent_tablet_row_unpolled_stream =
      ASSERT_RESULT(ReadFromCdcStateTable(unpolled_stream_id, tablets.Get(0).tablet_id()));

  ASSERT_OK(WriteRowsHelper(0, 10, &test_cluster_, true));

  // Split the tablet.
  ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, true));
  WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table, 2);

  // Consume the 10 inserted records with the first slot.
  auto get_consistent_changes_resp = ASSERT_RESULT(GetAllPendingTxnsFromVirtualWAL(
      polled_stream_id, {table.table_id()}, 10 /* expected_dml_records*/,
      true /* init_virtual_wal */));

  // Sleep to ensure that parent tablet deletion task has run for at least one iteration.
  SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier * parent_tablet_deletion_task_interval));

  // Get all the tablets including hidden tablets. We should get 3 tablets, i.e. 2 children and 1
  // un-deleted hidden parent tablet, since one slot is unpolled leaving restart time across all
  // slots behind the tablet split time.
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> get_tablets_res;
  ASSERT_OK(test_client()->GetTablets(
      table, 3, &get_tablets_res, nullptr, RequireTabletsRunning::kFalse,
      master::IncludeInactive::kTrue));
  ASSERT_EQ(get_tablets_res.size(), 3);

  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_intent_retention_ms) = 0;

  // Now, since cdc_intent_retention_ms has been set to zero, the retention interval has already
  // elapsed, so the hidden parent tablet's cdc_state entries count as expired and the tablet will
  // be deleted.
  ASSERT_OK(WaitFor(
      [&]() -> Result<bool> {
        get_tablets_res.Clear();
        auto s = test_client()->GetTablets(
            table, 3, &get_tablets_res, nullptr, RequireTabletsRunning::kFalse,
            master::IncludeInactive::kTrue);

        return get_tablets_res.size() == 2;
      },
      MonoDelta::FromSeconds(120 * kTimeMultiplier),
      "Timed out waiting for hidden tablet deletion"));

  // Assert that children tablet entries have the parent entry's checkpoint.
  for (auto child_tablet : get_tablets_res) {
    auto child_tablet_row_polled_stream =
        ASSERT_RESULT(ReadFromCdcStateTable(polled_stream_id, child_tablet.tablet_id()));
    auto child_tablet_row_unpolled_stream =
        ASSERT_RESULT(ReadFromCdcStateTable(unpolled_stream_id, child_tablet.tablet_id()));

    ASSERT_EQ(parent_tablet_row_polled_stream.op_id, child_tablet_row_polled_stream.op_id);
    ASSERT_EQ(parent_tablet_row_unpolled_stream.op_id, child_tablet_row_unpolled_stream.op_id);
  }
}

// This test verifies that it is safe to add a table to the polling list (publication) after its
// tablets have been split and the hidden tablets deleted.
TEST_F(CDCSDKConsumptionConsistentChangesTest, TestDynamicTablesAdditionAfterHiddenTabletDeletion) {
  uint64_t publication_refresh_interval = 5;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_publication_list_refresh_interval_secs) =
      publication_refresh_interval;

  ASSERT_OK(SetUpWithParams(
      3 /* replication_factor */, 1 /* num_masters */, false /* colocated */,
      true /* cdc_populate_safepoint_record */));

  const uint32_t num_tablets = 1;
  const uint32_t num_tables = 2;
  vector<string> table_suffix = {"_1", "_2"};
  vector<YBTableName> tables(num_tables);
  vector<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> tablets(num_tables);

  // Create two tables, each having one tablet.
  for (uint32_t idx = 0; idx < num_tables; idx++) {
    tables[idx] = ASSERT_RESULT(
        CreateTable(&test_cluster_, kNamespaceName, kTableName + table_suffix[idx], num_tablets));
    ASSERT_OK(test_client()->GetTablets(
        tables[idx], 0, &tablets[idx], nullptr /* partition_list_version */));
    ASSERT_EQ(tablets[idx].size(), num_tablets);
  }

  // Create a slot.
  auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot());

  // Get the state table entry of the tablet for test_table_2.
  auto parent_tablet_row =
      ASSERT_RESULT(ReadFromCdcStateTable(stream_id, tablets[1].Get(0).tablet_id()));

  // Write 100 rows to test_table_2 and then split it.
  ASSERT_OK(
      WriteRowsHelper(0, 100, &test_cluster_, true, 2, (kTableName + table_suffix[1]).c_str()));
  ASSERT_OK(WaitForFlushTables({tables[1].table_id()}, false, 1000, true));
  WaitUntilSplitIsSuccesful(tablets[1].Get(0).tablet_id(), tables[1], 2);

  // Get all the tablets including hidden tablets for test_table_2. We should get 3 tablets, i.e. 2
  // children and 1 un-deleted hidden parent tablet, since the restart time is behind the tablet
  // split time.
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> get_tablets_res;
  ASSERT_OK(test_client()->GetTablets(
      tables[1], 3, &get_tablets_res, nullptr, RequireTabletsRunning::kFalse,
      master::IncludeInactive::kTrue));
  ASSERT_EQ(get_tablets_res.size(), 3);

  // Init Virtual Wal with only test_table_1.
  ASSERT_OK(InitVirtualWAL(stream_id, {tables[0].table_id()}, kVWALSessionId1));

  // Insert and consume records from test_table_1 so that its restart time moves ahead, leading to
  // the deletion of hidden tablet belonging to test_table_2. Once the hidden tablet has been
  // deleted, add test_table_2 to the polling list.
  auto i = 0;
  ASSERT_OK(WaitFor(
      [&]() -> Result<bool> {
        RETURN_NOT_OK(WriteRowsHelper(
            i, i + 100, &test_cluster_, true, 2, (kTableName + table_suffix[0]).c_str()));
        auto get_consistent_changes_resp = VERIFY_RESULT(GetAllPendingTxnsFromVirtualWAL(
            stream_id, {tables[0].table_id()}, 100 /* expected_dml_records*/,
            false /* init_virtual_wal */));
        get_tablets_res.Clear();
        auto s = test_client()->GetTablets(
            tables[1], 3, &get_tablets_res, nullptr, RequireTabletsRunning::kFalse,
            master::IncludeInactive::kTrue);
        i += 100;
        SleepFor(MonoDelta::FromSeconds(5));
        return get_tablets_res.size() == 2;
      },
      MonoDelta::FromSeconds(120 * kTimeMultiplier),
      "Timed out waiting for hidden tablet deletion"));

  // Assert that children tablet entries have the parent entry's checkpoint.
  for (auto child_tablet : get_tablets_res) {
    auto child_tablet_row =
        ASSERT_RESULT(ReadFromCdcStateTable(stream_id, child_tablet.tablet_id()));
    ASSERT_EQ(parent_tablet_row.op_id, child_tablet_row.op_id);
  }

  // Sleep to ensure that we receive a pub refresh record.
  SleepFor(MonoDelta::FromSeconds(2 * publication_refresh_interval));

  // Perform a txn inserting a record each in both the tables.
  auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
  ASSERT_OK(conn.Execute("BEGIN;"));
  ASSERT_OK(conn.Execute("INSERT INTO test_table_1 values (9999999,1)"));
  ASSERT_OK(conn.Execute("INSERT INTO test_table_2 values (9999999,1)"));
  ASSERT_OK(conn.Execute("COMMIT;"));

  auto change_resp = ASSERT_RESULT(GetConsistentChangesFromCDC(stream_id));
  ASSERT_EQ(change_resp.cdc_sdk_proto_records_size(), 0);
  ASSERT_TRUE(
      change_resp.has_needs_publication_table_list_refresh() &&
      change_resp.needs_publication_table_list_refresh() &&
      change_resp.has_publication_refresh_time());
  ASSERT_GT(change_resp.publication_refresh_time(), 0);

  // Update the publication's tables list.
  ASSERT_OK(UpdatePublicationTableList(stream_id, {tables[0].table_id(), tables[1].table_id()}));

  auto get_consistent_changes_resp = ASSERT_RESULT(GetAllPendingTxnsFromVirtualWAL(
      stream_id, {} /* table_ids */, 2 /* expected_dml_records*/, false /* init_virtual_wal */));

  // We will receive the records from the above txn belonging to both the tables.
  ASSERT_EQ(
      get_consistent_changes_resp.records[1].row_message().table(), kTableName + table_suffix[0]);
  ASSERT_EQ(
      get_consistent_changes_resp.records[2].row_message().table(), kTableName + table_suffix[1]);
}

} // namespace cdc
} // namespace yb
2 changes: 1 addition & 1 deletion src/yb/integration-tests/cdcsdk_test_base.cc
@@ -533,7 +533,7 @@ Result<xrepl::StreamId> CDCSDKTestBase::CreateConsistentSnapshotStream(

 Result<xrepl::StreamId> CDCSDKTestBase::CreateDBStreamBasedOnCheckpointType(
     CDCCheckpointType checkpoint_type) {
-  return checkpoint_type == CDCCheckpointType::EXPLICIT ? CreateDBStreamWithReplicationSlot()
+  return checkpoint_type == CDCCheckpointType::EXPLICIT ? CreateConsistentSnapshotStream()
                                                         : CreateDBStream(IMPLICIT);
 }

2 changes: 0 additions & 2 deletions src/yb/integration-tests/cdcsdk_test_base.h
@@ -144,8 +144,6 @@ class CDCSDKTestBase : public YBTest {

     ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_enable_packed_row_for_colocated_table) = true;

-    ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_tablet_split_of_replication_slot_streamed_tables) =
-        true;
   }

   void TearDown() override;
2 changes: 1 addition & 1 deletion src/yb/master/catalog_manager.cc
@@ -501,7 +501,7 @@ DEFINE_RUNTIME_bool(enable_tablet_split_of_cdcsdk_streamed_tables, true,
     "When set, it enables automatic tablet splitting for tables that are part of a "
     "CDCSDK stream");

-DEFINE_RUNTIME_bool(enable_tablet_split_of_replication_slot_streamed_tables, false,
+DEFINE_RUNTIME_bool(enable_tablet_split_of_replication_slot_streamed_tables, true,
     "When set, it enables automatic tablet splitting for tables that are part of replication "
     "slot's stream metadata");

16 changes: 15 additions & 1 deletion src/yb/master/catalog_manager.h
@@ -2240,6 +2240,7 @@ class CatalogManager : public CatalogManagerIf, public SnapshotCoordinatorContex
     TableId table_id_;
     std::string parent_tablet_id_;
     std::array<TabletId, kNumSplitParts> split_tablets_;
+    HybridTime hide_time_;
   };
   std::unordered_map<TabletId, HiddenReplicationParentTabletInfo> retained_by_cdcsdk_
       GUARDED_BY(mutex_);
@@ -2802,7 +2803,20 @@ class CatalogManager : public CatalogManagerIf, public SnapshotCoordinatorContex
       CoarseTimePoint deadline,
       bool* const bootstrap_required);

-  std::unordered_set<xrepl::StreamId> GetCDCSDKStreamsForTable(const TableId& table_id) const;
+  std::unordered_set<xrepl::StreamId> GetCDCSDKStreamsForTable(const TableId& table_id) const
+      REQUIRES_SHARED(mutex_);
+
+  // Reads the slot entries for all the stream_ids provided and returns the minimum restart time
+  // across them.
+  Result<HybridTime> GetMinRestartTimeAcrossSlots(
+      const std::unordered_set<xrepl::StreamId>& stream_ids) REQUIRES_SHARED(mutex_);
+
+  Result<std::unordered_map<xrepl::StreamId, std::optional<HybridTime>>>
+  GetCDCSDKStreamCreationTimeMap(const std::unordered_set<xrepl::StreamId>& stream_ids)
+      REQUIRES_SHARED(mutex_);
+
+  bool IsCDCSDKTabletExpiredOrNotOfInterest(
+      HybridTime last_active_time, std::optional<HybridTime> stream_creation_time);

   Status AddNamespaceEntriesToPB(
       const std::vector<TableDescription>& tables,