From 6448b6dd79f2e521d9dc819f76bccc9f676c72a5 Mon Sep 17 00:00:00 2001 From: Sumukh-Phalgaonkar Date: Wed, 29 Jan 2025 00:38:00 +0530 Subject: [PATCH] [#24918] CDC: Enable tablet split for logical replication Summary: There exists a bug in the hidden tablet deletion logic because of which we might end up not deleting the hidden parent tablets when logical replication is in use. To prevent getting into this state, automatic tablet splitting had been disabled when logical replication is in use. This diff fixes the bug and re-enables automatic tablet splitting along with logical replication. Currently, a stream gives approval for the deletion of a hidden tablet if one of the below three conditions are satisfied: # Both its children are polled. # The state table entry for the hidden table is already deleted. # The hidden tablet is unpolled. This is inferred from the state table entry's checkpoint and with consistent snapshot streams this criteria is never satisfied However for the tablets of an unpolled table none of these conditions are satisfied. In this diff we calculate the min restart time (given by the `record_id_commit_time` field of the slot entry) across all the slots over the hidden tablet. We also store the tablet split time (which is given by the hide time of the parent tablet) along with parent-children mapping in the `retained_by_cdcsdk_` map in master. In addition to above mentioned conditions, we now allow for the hidden tablet deletion if: # The tablet split time of a hidden tablet is less than the min restart time across all the slots, as it implies that no slot will ever stream a record with commit time <= split time of this hidden tablet. # The cdc_state entry for hidden parent tablet has expired or become not of interest. In this case the retention barriers will be lifted and such a tablet will become unusable from CDC POV. In both these cases we remove the state table entry for the hidden parent tablet. In addition to this, in case of logical replication, at the time of tablet split, we add children tablet entries to the cdc_state table with their parent's checkpoint. This is required because a user can perform an alter publication anytime adding a previously unpolled table with split tablets. Since the split parent's state table entry will be removed, we need to inherit the parent's checkpoint into children entries at the time of split. **Note:** All the changes are applicable to logical replication model of CDC only. Also post this commit, we will have to set the flag `cdc_intent_retention_ms` and `cdcsdk_tablet_not_of_interest_timeout_secs ` on both master and tserver. Jira: DB-14049 Test Plan: ./yb_build.sh --cxx-test integration-tests_cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestHiddenTabletDeletionWithUnusedSlot ./yb_build.sh --cxx-test integration-tests_cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestHiddenTabletDeletionOfUnpolledTable ./yb_build.sh --cxx-test integration-tests_cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestDynamicTablesAdditionAfterHiddenTabletDeletion Reviewers: skumar, siddharth.shah, xCluster, hsunder Reviewed By: siddharth.shah Subscribers: ybase, ycdcxcluster Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D41406 --- ...sdk_consumption_consistent_changes-test.cc | 266 ++++++++++++++++++ src/yb/integration-tests/cdcsdk_test_base.cc | 2 +- src/yb/integration-tests/cdcsdk_test_base.h | 2 - src/yb/master/catalog_manager.cc | 2 +- src/yb/master/catalog_manager.h | 16 +- src/yb/master/xrepl_catalog_manager.cc | 150 ++++++++-- 6 files changed, 409 insertions(+), 29 deletions(-) diff --git a/src/yb/integration-tests/cdcsdk_consumption_consistent_changes-test.cc b/src/yb/integration-tests/cdcsdk_consumption_consistent_changes-test.cc index f460bd82f102..bcac47ed2b1c 100644 --- a/src/yb/integration-tests/cdcsdk_consumption_consistent_changes-test.cc +++ b/src/yb/integration-tests/cdcsdk_consumption_consistent_changes-test.cc @@ -3754,5 +3754,271 @@ TEST_F(CDCSDKConsumptionConsistentChangesTest, TestCompactionWithReplicaIdentity ASSERT_EQ(get_consistent_changes_resp.records.size(), 6); } +TEST_F(CDCSDKConsumptionConsistentChangesTest, TestHiddenTabletDeletionOfUnpolledTable) { + auto parent_tablet_deletion_task_interval = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_parent_tablet_deletion_task_retry_secs) = + parent_tablet_deletion_task_interval; + ASSERT_OK(SetUpWithParams( + 3 /* replication_factor */, 1 /* num_masters */, false /* colocated */, + true /* cdc_populate_safepoint_record */)); + + const uint32_t num_tablets = 1; + const uint32_t num_tables = 2; + vector table_suffix = {"_1", "_2"}; + vector tables(num_tables); + vector> tablets(num_tables); + + // Create two tables, each having one tablet. + for (uint32_t idx = 0; idx < num_tables; idx++) { + tables[idx] = ASSERT_RESULT( + CreateTable(&test_cluster_, kNamespaceName, kTableName + table_suffix[idx], num_tablets)); + ASSERT_OK(test_client()->GetTablets( + tables[idx], 0, &tablets[idx], nullptr /* partition_list_version */)); + ASSERT_EQ(tablets[idx].size(), num_tablets); + } + + // Create a slot, and start a VWAL instance for test_table_1 only. + auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot()); + ASSERT_OK(InitVirtualWAL(stream_id, {tables[0].table_id()}, kVWALSessionId1)); + + // Get the state table entry of the tablet for test_table_2. + auto parent_tablet_row = + ASSERT_RESULT(ReadFromCdcStateTable(stream_id, tablets[1].Get(0).tablet_id())); + + // Write 100 rows to test_table_2 and then split it. + ASSERT_OK( + WriteRowsHelper(0, 100, &test_cluster_, true, 2, (kTableName + table_suffix[1]).c_str())); + ASSERT_OK(WaitForFlushTables({tables[1].table_id()}, false, 1000, true)); + WaitUntilSplitIsSuccesful(tablets[1].Get(0).tablet_id(), tables[1], 2); + + // Sleep to ensure that parent tablet deletion task has run for atleast one iteration. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier * parent_tablet_deletion_task_interval)); + + // Get all the tablets including hidden tablets for test_table_2. We should get 3 tablets, i.e. 2 + // children and 1 un-deleted hidden parent tablet, since the restart time is behind the tablet + // split time. + google::protobuf::RepeatedPtrField get_tablets_res; + ASSERT_OK(test_client()->GetTablets( + tables[1], 3, &get_tablets_res, nullptr, RequireTabletsRunning::kFalse, + master::IncludeInactive::kTrue)); + ASSERT_EQ(get_tablets_res.size(), 3); + + // Now, we insert and consume records from test_table_1. This will move the restart time forward + // and eventually it will become greater than the tablet split time. As a result the hidden parent + // tablet will be deleted. + auto i = 0; + ASSERT_OK(WaitFor( + [&]() -> Result { + RETURN_NOT_OK(WriteRowsHelper( + i, i + 100, &test_cluster_, true, 2, (kTableName + table_suffix[0]).c_str())); + auto get_consistent_changes_resp = VERIFY_RESULT(GetAllPendingTxnsFromVirtualWAL( + stream_id, {tables[0].table_id()}, 100 /* expected_dml_records*/, + false /* init_virtual_wal */)); + get_tablets_res.Clear(); + auto s = test_client()->GetTablets( + tables[1], 3, &get_tablets_res, nullptr, RequireTabletsRunning::kFalse, + master::IncludeInactive::kTrue); + i += 100; + SleepFor(MonoDelta::FromSeconds(5)); + return get_tablets_res.size() == 2; + }, + MonoDelta::FromSeconds(120 * kTimeMultiplier), + "Timed out waiting for hidden tablet deletion")); + + // Assert that children tablet entries have the parent entry's checkpoint. + for (auto child_tablet : get_tablets_res) { + auto child_tablet_row = + ASSERT_RESULT(ReadFromCdcStateTable(stream_id, child_tablet.tablet_id())); + ASSERT_EQ(parent_tablet_row.op_id, child_tablet_row.op_id); + } +} + +TEST_F(CDCSDKConsumptionConsistentChangesTest, TestHiddenTabletDeletionWithUnusedSlot) { + auto parent_tablet_deletion_task_interval = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_parent_tablet_deletion_task_retry_secs) = + parent_tablet_deletion_task_interval; + + ASSERT_OK(SetUpWithParams( + 3 /* replication_factor */, 1 /* num_masters */, false /* colocated */, + true /* cdc_populate_safepoint_record */)); + + // Create one table with single tablet. + auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName)); + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr /* partition_list_version */)); + ASSERT_EQ(tablets.size(), 1); + + // Create 2 slots, only one of them will be used for polling. + auto polled_stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot()); + auto unpolled_stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot()); + + // Get the state table entries of the parent tablet. + auto parent_tablet_row_polled_stream = + ASSERT_RESULT(ReadFromCdcStateTable(polled_stream_id, tablets.Get(0).tablet_id())); + auto parent_tablet_row_unpolled_stream = + ASSERT_RESULT(ReadFromCdcStateTable(unpolled_stream_id, tablets.Get(0).tablet_id())); + + ASSERT_OK(WriteRowsHelper(0, 10, &test_cluster_, true)); + + // Split the tablet. + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, true)); + WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table, 2); + + // Consume the 10 inserted records with the first slot. + auto get_consistent_changes_resp = ASSERT_RESULT(GetAllPendingTxnsFromVirtualWAL( + polled_stream_id, {table.table_id()}, 10 /* expected_dml_records*/, + true /* init_virtual_wal */)); + + // Sleep to ensure that parent tablet deletion task has run for atleast one iteration. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier * parent_tablet_deletion_task_interval)); + + // Get all the tablets including hidden tablets. We should get 3 tablets, i.e 2 children and 1 + // un-deleted hidden parent tablet, since one slot is unpolled leaving restart time across all + // slots behind the tablet split time. + google::protobuf::RepeatedPtrField get_tablets_res; + ASSERT_OK(test_client()->GetTablets( + table, 3, &get_tablets_res, nullptr, RequireTabletsRunning::kFalse, + master::IncludeInactive::kTrue)); + ASSERT_EQ(get_tablets_res.size(), 3); + + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_intent_retention_ms) = 0; + + // Now, since cdc_wal_retention_time_secs has been set to zero, we will delete the hidden parent + // tablet as now cdc_wal_retention_time_secs interval has passed since the tablet split time. + ASSERT_OK(WaitFor( + [&]() -> Result { + get_tablets_res.Clear(); + auto s = test_client()->GetTablets( + table, 3, &get_tablets_res, nullptr, RequireTabletsRunning::kFalse, + master::IncludeInactive::kTrue); + + return get_tablets_res.size() == 2; + }, + MonoDelta::FromSeconds(120 * kTimeMultiplier), + "Timed out waiting for hidden tablet deletion")); + + // Assert that children tablet entries have the parent entry's checkpoint. + for (auto child_tablet : get_tablets_res) { + auto child_tablet_row_polled_stream = + ASSERT_RESULT(ReadFromCdcStateTable(polled_stream_id, child_tablet.tablet_id())); + auto child_tablet_row_unpolled_stream = + ASSERT_RESULT(ReadFromCdcStateTable(unpolled_stream_id, child_tablet.tablet_id())); + + ASSERT_EQ(parent_tablet_row_polled_stream.op_id, child_tablet_row_polled_stream.op_id); + ASSERT_EQ(parent_tablet_row_unpolled_stream.op_id, child_tablet_row_unpolled_stream.op_id); + } +} + +// This test verifies that it is safe to add a table to polling list (publication) after its tablets +// have been split and hidden tablets deleted. +TEST_F(CDCSDKConsumptionConsistentChangesTest, TestDynamicTablesAdditionAfterHiddenTabletDeletion) { + uint64_t publication_refresh_interval = 5; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_publication_list_refresh_interval_secs) = + publication_refresh_interval; + + ASSERT_OK(SetUpWithParams( + 3 /* replication_factor */, 1 /* num_masters */, false /* colocated */, + true /* cdc_populate_safepoint_record */)); + + const uint32_t num_tablets = 1; + const uint32_t num_tables = 2; + vector table_suffix = {"_1", "_2"}; + vector tables(num_tables); + vector> tablets(num_tables); + + // Create two tables, each having one tablet. + for (uint32_t idx = 0; idx < num_tables; idx++) { + tables[idx] = ASSERT_RESULT( + CreateTable(&test_cluster_, kNamespaceName, kTableName + table_suffix[idx], num_tablets)); + ASSERT_OK(test_client()->GetTablets( + tables[idx], 0, &tablets[idx], nullptr /* partition_list_version */)); + ASSERT_EQ(tablets[idx].size(), num_tablets); + } + + // Create a slot. + auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStreamWithReplicationSlot()); + + // Get the state table entry of the tablet for test_table_2. + auto parent_tablet_row = + ASSERT_RESULT(ReadFromCdcStateTable(stream_id, tablets[1].Get(0).tablet_id())); + + // Write 100 rows to test_table_2 and then split it. + ASSERT_OK( + WriteRowsHelper(0, 100, &test_cluster_, true, 2, (kTableName + table_suffix[1]).c_str())); + ASSERT_OK(WaitForFlushTables({tables[1].table_id()}, false, 1000, true)); + WaitUntilSplitIsSuccesful(tablets[1].Get(0).tablet_id(), tables[1], 2); + + // Get all the tablets including hidden tablets for test_table_2. We should get 3 tablets, i.e. 2 + // children and 1 un-deleted hidden parent tablet, since the restart time is behind the tablet + // split time. + google::protobuf::RepeatedPtrField get_tablets_res; + ASSERT_OK(test_client()->GetTablets( + tables[1], 3, &get_tablets_res, nullptr, RequireTabletsRunning::kFalse, + master::IncludeInactive::kTrue)); + ASSERT_EQ(get_tablets_res.size(), 3); + + // Init Virtual Wal with only test_table_1. + ASSERT_OK(InitVirtualWAL(stream_id, {tables[0].table_id()}, kVWALSessionId1)); + + // Insert and consume records from test_table_1 so that its restart time moves ahead, leading to + // the deletion of hidden tablet belonging to test_table_2. Once the hidden tablet has been + // deleted, add test_table_2 to the polling list. + auto i = 0; + ASSERT_OK(WaitFor( + [&]() -> Result { + RETURN_NOT_OK(WriteRowsHelper( + i, i + 100, &test_cluster_, true, 2, (kTableName + table_suffix[0]).c_str())); + auto get_consistent_changes_resp = VERIFY_RESULT(GetAllPendingTxnsFromVirtualWAL( + stream_id, {tables[0].table_id()}, 100 /* expected_dml_records*/, + false /* init_virtual_wal */)); + get_tablets_res.Clear(); + auto s = test_client()->GetTablets( + tables[1], 3, &get_tablets_res, nullptr, RequireTabletsRunning::kFalse, + master::IncludeInactive::kTrue); + i += 100; + SleepFor(MonoDelta::FromSeconds(5)); + return get_tablets_res.size() == 2; + }, + MonoDelta::FromSeconds(120 * kTimeMultiplier), + "Timed out waiting for hidden tablet deletion")); + + // Assert that children tablet entries have the parent entry's checkpoint. + for (auto child_tablet : get_tablets_res) { + auto child_tablet_row = + ASSERT_RESULT(ReadFromCdcStateTable(stream_id, child_tablet.tablet_id())); + ASSERT_EQ(parent_tablet_row.op_id, child_tablet_row.op_id); + } + + // Sleep to ensure that we receive a pub refresh record. + SleepFor(MonoDelta::FromSeconds(2 * publication_refresh_interval)); + + // Perform a txn inserting a record each in both the tables. + auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); + ASSERT_OK(conn.Execute("BEGIN;")); + ASSERT_OK(conn.Execute("INSERT INTO test_table_1 values (9999999,1)")); + ASSERT_OK(conn.Execute("INSERT INTO test_table_2 values (9999999,1)")); + ASSERT_OK(conn.Execute("COMMIT;")); + + auto change_resp = ASSERT_RESULT(GetConsistentChangesFromCDC(stream_id)); + ASSERT_EQ(change_resp.cdc_sdk_proto_records_size(), 0); + ASSERT_TRUE( + change_resp.has_needs_publication_table_list_refresh() && + change_resp.needs_publication_table_list_refresh() && + change_resp.has_publication_refresh_time()); + ASSERT_GT(change_resp.publication_refresh_time(), 0); + + // Update the publication's tables list. + ASSERT_OK(UpdatePublicationTableList(stream_id, {tables[0].table_id(), tables[1].table_id()})); + + auto get_consistent_changes_resp = ASSERT_RESULT(GetAllPendingTxnsFromVirtualWAL( + stream_id, {} /* table_ids */, 2 /* expected_dml_records*/, false /* init_virtual_wal */)); + + // We will receive the records from the above txn belonging to both the tables + ASSERT_EQ( + get_consistent_changes_resp.records[1].row_message().table(), kTableName + table_suffix[0]); + ASSERT_EQ( + get_consistent_changes_resp.records[2].row_message().table(), kTableName + table_suffix[1]); +} + } // namespace cdc } // namespace yb diff --git a/src/yb/integration-tests/cdcsdk_test_base.cc b/src/yb/integration-tests/cdcsdk_test_base.cc index 50326916fa07..5b76e3c09267 100644 --- a/src/yb/integration-tests/cdcsdk_test_base.cc +++ b/src/yb/integration-tests/cdcsdk_test_base.cc @@ -533,7 +533,7 @@ Result CDCSDKTestBase::CreateConsistentSnapshotStream( Result CDCSDKTestBase::CreateDBStreamBasedOnCheckpointType( CDCCheckpointType checkpoint_type) { - return checkpoint_type == CDCCheckpointType::EXPLICIT ? CreateDBStreamWithReplicationSlot() + return checkpoint_type == CDCCheckpointType::EXPLICIT ? CreateConsistentSnapshotStream() : CreateDBStream(IMPLICIT); } diff --git a/src/yb/integration-tests/cdcsdk_test_base.h b/src/yb/integration-tests/cdcsdk_test_base.h index e10b0b4a1aab..62627805298d 100644 --- a/src/yb/integration-tests/cdcsdk_test_base.h +++ b/src/yb/integration-tests/cdcsdk_test_base.h @@ -144,8 +144,6 @@ class CDCSDKTestBase : public YBTest { ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_enable_packed_row_for_colocated_table) = true; - ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_tablet_split_of_replication_slot_streamed_tables) = - true; } void TearDown() override; diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 6743f2f3164e..7abad2ab025b 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -501,7 +501,7 @@ DEFINE_RUNTIME_bool(enable_tablet_split_of_cdcsdk_streamed_tables, true, "When set, it enables automatic tablet splitting for tables that are part of a " "CDCSDK stream"); -DEFINE_RUNTIME_bool(enable_tablet_split_of_replication_slot_streamed_tables, false, +DEFINE_RUNTIME_bool(enable_tablet_split_of_replication_slot_streamed_tables, true, "When set, it enables automatic tablet splitting for tables that are part of replication " "slot's stream metadata"); diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index ea773640740c..c8d7e66c6cfa 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -2240,6 +2240,7 @@ class CatalogManager : public CatalogManagerIf, public SnapshotCoordinatorContex TableId table_id_; std::string parent_tablet_id_; std::array split_tablets_; + HybridTime hide_time_; }; std::unordered_map retained_by_cdcsdk_ GUARDED_BY(mutex_); @@ -2802,7 +2803,20 @@ class CatalogManager : public CatalogManagerIf, public SnapshotCoordinatorContex CoarseTimePoint deadline, bool* const bootstrap_required); - std::unordered_set GetCDCSDKStreamsForTable(const TableId& table_id) const; + std::unordered_set GetCDCSDKStreamsForTable(const TableId& table_id) const + REQUIRES_SHARED(mutex_); + + // Reads the slot entries for all the stream_ids provided and returns the minimum restart time + // across them. + Result GetMinRestartTimeAcrossSlots( + const std::unordered_set& stream_ids) REQUIRES_SHARED(mutex_); + + Result>> + GetCDCSDKStreamCreationTimeMap(const std::unordered_set& stream_ids) + REQUIRES_SHARED(mutex_); + + bool IsCDCSDKTabletExpiredOrNotOfInterest( + HybridTime last_active_time, std::optional stream_creation_time); Status AddNamespaceEntriesToPB( const std::vector& tables, diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index 22f6eb53aa84..e4e9eeb166bf 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -144,6 +144,8 @@ DECLARE_bool(ysql_yb_allow_replication_slot_lsn_types); DECLARE_bool(yb_enable_cdc_consistent_snapshot_streams); DECLARE_bool(ysql_yb_enable_replica_identity); DECLARE_uint32(cdc_wal_retention_time_secs); +DECLARE_uint64(cdc_intent_retention_ms); +DECLARE_uint32(cdcsdk_tablet_not_of_interest_timeout_secs); DECLARE_bool(cdcsdk_enable_dynamic_table_addition_with_table_cleanup); @@ -359,7 +361,8 @@ void CatalogManager::RecordCDCSDKHiddenTablets( .table_id_ = hidden_tablet->table()->id(), .parent_tablet_id_ = tablet_pb.has_split_parent_tablet_id() ? tablet_pb.split_parent_tablet_id() : "", - .split_tablets_ = {tablet_pb.split_tablet_ids(0), tablet_pb.split_tablet_ids(1)}}; + .split_tablets_ = {tablet_pb.split_tablet_ids(0), tablet_pb.split_tablet_ids(1)}, + .hide_time_ = HybridTime(tablet_pb.hide_hybrid_time())}; retained_by_cdcsdk_.emplace(hidden_tablet->id(), std::move(info)); } @@ -3415,9 +3418,11 @@ Status CatalogManager::UpdateCDCProducerOnTabletSplit( std::optional parent_entry_opt; if (stream_type == cdc::CDCSDK) { - parent_entry_opt = VERIFY_RESULT(cdc_state_table_->TryFetchEntry( - {split_tablet_ids.source, stream->StreamId()}, - cdc::CDCStateTableEntrySelector().IncludeActiveTime().IncludeCDCSDKSafeTime())); + parent_entry_opt = VERIFY_RESULT(cdc_state_table_->TryFetchEntry( + {split_tablet_ids.source, stream->StreamId()}, cdc::CDCStateTableEntrySelector() + .IncludeCheckpoint() + .IncludeActiveTime() + .IncludeCDCSDKSafeTime())); DCHECK(parent_entry_opt); } @@ -3443,17 +3448,24 @@ Status CatalogManager::UpdateCDCProducerOnTabletSplit( } } - // Insert children entries into cdc_state now, set the opid to 0.0 and the timestamp to - // NULL. When we process the parent's SPLIT_OP in GetChanges, we will update the opid to - // the SPLIT_OP so that the children pollers continue from the next records. When we process - // the first GetChanges for the children, then their timestamp value will be set. We use - // this information to know that the children has been polled for. Once both children have - // been polled for, then we can delete the parent tablet via the bg task + // Insert children entries into cdc_state now. In case of logical replication set the opid to + // parent entry's opid. The split will be detected in the immediate next GetChanges call on + // this tablet and we will transition to the children tablets. In other cases, set the opid to + // 0.0 and the timestamp to NULL. When we process the parent's SPLIT_OP in GetChanges, we will + // update the opid to the SPLIT_OP so that the children pollers continue from the next + // records. When we process the first GetChanges for the children, then their timestamp value + // will be set. We use this information to know that the children has been polled for. Once + // both children have been polled for, then we can delete the parent tablet via the bg task // DoProcessXClusterParentTabletDeletion. for (const auto& child_tablet_id : {split_tablet_ids.children.first, split_tablet_ids.children.second}) { cdc::CDCStateTableEntry entry(child_tablet_id, stream->StreamId()); - entry.checkpoint = OpId().Min(); + if (!stream->GetCdcsdkYsqlReplicationSlotName().empty()) { + DCHECK(parent_entry_opt->checkpoint); + entry.checkpoint = *parent_entry_opt->checkpoint; + } else { + entry.checkpoint = OpId().Min(); + } if (stream_type == cdc::CDCSDK) { entry.active_time = last_active_time; @@ -4435,7 +4447,6 @@ bool CatalogManager::IsTablePartOfCDCSDK( std::unordered_set CatalogManager::GetCDCSDKStreamsForTable( const TableId& table_id) const { - SharedLock lock(mutex_); DCHECK(xrepl_maps_loaded_); auto table_ids = FindOrNull(cdcsdk_tables_to_stream_map_, table_id); if (!table_ids) { @@ -4444,6 +4455,74 @@ std::unordered_set CatalogManager::GetCDCSDKStreamsForTable( return *table_ids; } +Result CatalogManager::GetMinRestartTimeAcrossSlots( + const std::unordered_set& stream_ids) { + if (stream_ids.empty()) { + return HybridTime::kInvalid; + } + + // All these streams are on the same tablet (hence same DB). We can only have either logical + // replication or gRPC replication on any DB. If the first stream present in cdc_stream_map_ + // among the ones passed, does't contain replication slot name it means that this DB has gRPC + // streams. + for (const auto stream_id : stream_ids) { + if (cdc_stream_map_.contains(stream_id)) { + if (cdc_stream_map_[stream_id]->GetCdcsdkYsqlReplicationSlotName().empty()) { + return HybridTime::kInvalid; + } + break; + } + } + + HybridTime min_restart_time_across_all_slots; + for (const auto& stream_id : stream_ids) { + auto entry_opt = VERIFY_RESULT(cdc_state_table_->TryFetchEntry( + {kCDCSDKSlotEntryTabletId, stream_id}, + cdc::CDCStateTableEntrySelector().IncludeRecordIdCommitTime())); + RSTATUS_DCHECK(entry_opt, NotFound, "Slot entry not found for stream_id : {}", stream_id); + RSTATUS_DCHECK( + entry_opt->record_id_commit_time.has_value(), NotFound, + "Restart time not found for slot with stream_id : {}", stream_id); + min_restart_time_across_all_slots = + std::min(min_restart_time_across_all_slots, HybridTime(*entry_opt->record_id_commit_time)); + } + + return min_restart_time_across_all_slots; +} + +Result>> +CatalogManager::GetCDCSDKStreamCreationTimeMap( + const std::unordered_set& stream_ids) { + std::unordered_map> stream_creation_time_map; + for (const auto stream_id : stream_ids) { + auto stream = FindPtrOrNull(cdc_stream_map_, stream_id); + RSTATUS_DCHECK( + stream, NotFound, "Entry for stream: {} not found in cdc_stream_map_", stream_id); + + auto stream_lock_pb = stream->LockForRead()->pb; + if (stream_lock_pb.has_stream_creation_time()) { + stream_creation_time_map.emplace(stream_id, stream_lock_pb.stream_creation_time()); + } else { + stream_creation_time_map.emplace(stream_id, std::nullopt); + } + } + return stream_creation_time_map; +} + +bool CatalogManager::IsCDCSDKTabletExpiredOrNotOfInterest( + HybridTime last_active_time, std::optional stream_creation_time) { + if (last_active_time.AddMilliseconds(FLAGS_cdc_intent_retention_ms) < Clock()->Now()) { + return true; + } + + auto not_of_interest_limit_secs = + GetAtomicFlag(&FLAGS_cdcsdk_tablet_not_of_interest_timeout_secs) + 2; + if (!stream_creation_time.has_value() || last_active_time != *stream_creation_time || + last_active_time.AddSeconds(not_of_interest_limit_secs) > Clock()->Now()) { + return false; + } + return true; +} void CatalogManager::RunXReplBgTasks(const LeaderEpoch& epoch) { if (!FLAGS_TEST_cdcsdk_disable_deleted_stream_cleanup) { @@ -4724,7 +4803,6 @@ Status CatalogManager::DoProcessCDCSDKTabletDeletion() { } std::unordered_set tablets_to_delete; - std::vector entries_to_update; std::vector entries_to_delete; // Check cdc_state table to see if the children tablets are being polled. @@ -4735,8 +4813,18 @@ Status CatalogManager::DoProcessCDCSDKTabletDeletion() { continue; } - // For each hidden tablet, check if for each stream we have an entry in the mapping for them. - const auto stream_ids = GetCDCSDKStreamsForTable(hidden_tablet.table_id_); + // For each hidden tablet, get all the streams on it and related info about the streams. + std::unordered_set stream_ids; + HybridTime min_restart_time_across_slots; + std::unordered_map> stream_creation_time_map; + { + SharedLock lock(mutex_); + stream_ids = GetCDCSDKStreamsForTable(hidden_tablet.table_id_); + min_restart_time_across_slots = VERIFY_RESULT(GetMinRestartTimeAcrossSlots(stream_ids)); + if (min_restart_time_across_slots.is_valid()) { + stream_creation_time_map = VERIFY_RESULT(GetCDCSDKStreamCreationTimeMap(stream_ids)); + } + } size_t count_tablet_streams_to_delete = 0; size_t count_streams_already_deleted = 0; @@ -4746,8 +4834,10 @@ Status CatalogManager::DoProcessCDCSDKTabletDeletion() { // If the entry for the tablet does not exist, then we can go ahead with deletion of the // tablet. auto entry_opt = VERIFY_RESULT(cdc_state_table_->TryFetchEntry( - {tablet_id, stream_id}, - cdc::CDCStateTableEntrySelector().IncludeCheckpoint().IncludeLastReplicationTime())); + {tablet_id, stream_id}, cdc::CDCStateTableEntrySelector() + .IncludeCheckpoint() + .IncludeLastReplicationTime() + .IncludeActiveTime())); // This means we already deleted the entry for this stream in a previous iteration. if (!entry_opt) { @@ -4801,6 +4891,24 @@ Status CatalogManager::DoProcessCDCSDKTabletDeletion() { // Also delete the parent tablet from cdc_state for all completed streams. entries_to_delete.emplace_back(cdc::CDCStateTableKey{tablet_id, stream_id}); count_tablet_streams_to_delete++; + continue; + } + + // This is the case where the tablet is not being polled by the replication slot corresponding + // to the stream id. We can delete the hidden tablet if: + // 1. The min_restart_time_across_slots is greater than the hide time of the hidden tablet. + // 2. If the tablet has expired or become not of interest. This is because if a tablet + // has expired or become not of interest, all its barriers will be lifted, hence making it + // unconsumable for CDC. + // Also delete the parent tablet entry from cdc_state table. + DCHECK(entry_opt->active_time); + auto last_active_time = HybridTime::FromMicros(*entry_opt->active_time); + if (min_restart_time_across_slots.is_valid() && + (hidden_tablet.hide_time_ < min_restart_time_across_slots || + IsCDCSDKTabletExpiredOrNotOfInterest( + last_active_time, stream_creation_time_map.at(stream_id)))) { + entries_to_delete.emplace_back(cdc::CDCStateTableKey{tablet_id, stream_id}); + count_tablet_streams_to_delete++; } } @@ -4809,13 +4917,7 @@ Status CatalogManager::DoProcessCDCSDKTabletDeletion() { } } - Status s = cdc_state_table_->UpdateEntries(entries_to_update); - if (!s.ok()) { - LOG(ERROR) << "Unable to flush operations to update cdc streams: " << s; - return s.CloneAndPrepend("Error updating cdc stream rows from cdc_state table"); - } - - s = cdc_state_table_->DeleteEntries(entries_to_delete); + auto s = cdc_state_table_->DeleteEntries(entries_to_delete); if (!s.ok()) { LOG(ERROR) << "Unable to flush operations to delete cdc streams: " << s; return s.CloneAndPrepend("Error deleting cdc stream rows from cdc_state table");