[#25562] CDC: Move restart time forward when there are no writes
Summary:
In this diff, we introduce the concept of Virtual WAL (VWAL) safe time. The VWAL safe time is defined as the commit time of the last COMMIT or SAFEPOINT record popped from the priority queue. When the VWAL is initialised, the VWAL safe time is set to the restart time (the `record_id_commit_time` field of the slot entry). The VWAL safe time moves forward as streaming progresses. It acts as a threshold: any record the VWAL encounters with a commit time less than the safe time is filtered out, and no LSN is generated for it.
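
The filtering rule above can be illustrated with a minimal, standalone sketch. It is not the VWAL implementation: `Record`, `Op`, `LaterCommitTime` and `SafeTimeSketch` are hypothetical names invented for this example, and the real code works on `CDCSDKUniqueRecordID` and per-tablet queues.

```cpp
#include <cstdint>
#include <optional>
#include <queue>
#include <vector>

// Hypothetical, simplified record: only the fields this sketch needs.
enum class Op { kInsert, kCommit, kSafepoint };

struct Record {
  uint64_t commit_time;
  Op op;
};

// Comparator that makes the priority queue pop the smallest commit time first.
struct LaterCommitTime {
  bool operator()(const Record& a, const Record& b) const {
    return a.commit_time > b.commit_time;
  }
};

class SafeTimeSketch {
 public:
  // The safe time starts out equal to the restart time.
  explicit SafeTimeSketch(uint64_t restart_time) : safe_time_(restart_time) {}

  // Returns false when the record is filtered out, i.e. its commit time is
  // strictly less than the current safe time. Equal commit times are kept.
  bool Add(const Record& r) {
    if (r.commit_time < safe_time_) return false;
    queue_.push(r);
    return true;
  }

  // Pops the next record. Only COMMIT and SAFEPOINT records move the safe time.
  std::optional<Record> Pop() {
    if (queue_.empty()) return std::nullopt;
    Record r = queue_.top();
    queue_.pop();
    if (r.op == Op::kCommit || r.op == Op::kSafepoint) {
      safe_time_ = r.commit_time;
    }
    return r;
  }

  uint64_t safe_time() const { return safe_time_; }

 private:
  uint64_t safe_time_;
  std::priority_queue<Record, std::vector<Record>, LaterCommitTime> queue_;
};
```

Because records are filtered on insertion and popped in commit-time order, the safe time in this sketch never decreases, which mirrors the invariant the diff checks in `ValidateAndUpdateVWALSafeTime`.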

In addition, this diff moves the restart time forward when the VWAL has received acknowledgement for every record it has shipped. A new tserver flag `cdcsdk_update_restart_time_interval_secs` (default 60 seconds) governs how often we check whether the last received restart LSN is the same as the last LSN shipped by the VWAL. If the two are equal, we update the restart time to the VWAL safe time. All the maps are updated accordingly, and the pub refresh times list is trimmed as per the new restart time. The new restart time is also persisted in the slot entry in the cdc_state table.
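
As a rough illustration of the update described above, the sketch below advances a restart time and trims an ordered set of pub refresh times. `SlotSketch` and `MaybeAdvanceRestartTime` are hypothetical names for this example only; the actual change goes through `UpdateAndPersistLSNInternal` and also persists the result to cdc_state, which is omitted here.

```cpp
#include <cstdint>
#include <iterator>
#include <set>

// Hypothetical, simplified slot state; field names are illustrative only.
struct SlotSketch {
  uint64_t restart_time = 0;
  uint64_t last_pub_refresh_time = 0;
  std::set<uint64_t> pub_refresh_times;
};

// Moves the restart time to safe_time when every shipped record has been
// acknowledged (restart LSN equals last shipped LSN). Returns true on update.
bool MaybeAdvanceRestartTime(
    uint64_t last_received_restart_lsn, uint64_t last_shipped_lsn,
    uint64_t safe_time, SlotSketch* slot) {
  if (last_received_restart_lsn != last_shipped_lsn) return false;

  // upper_bound returns the first entry > safe_time, so [begin, end) holds
  // exactly the entries <= safe_time that must be trimmed.
  auto end = slot->pub_refresh_times.upper_bound(safe_time);
  if (end != slot->pub_refresh_times.begin()) {
    // Remember the largest trimmed entry as the last pub refresh time.
    slot->last_pub_refresh_time = *std::prev(end);
  }
  slot->pub_refresh_times.erase(slot->pub_refresh_times.begin(), end);
  slot->restart_time = safe_time;
  return true;
}
```

When nothing is trimmed, the sketch leaves `last_pub_refresh_time` unchanged, which corresponds to the diff falling back to `record_metadata.last_pub_refresh_time` when `last_trimmed_pub_refresh_time` is 0.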

All the changes for calculating the VWAL safe time and moving the restart time forward are guarded by the runtime flag `cdcsdk_update_restart_time_when_nothing_to_stream`, which defaults to true.
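
The flag guard and the periodic check can be sketched as follows. This is an assumption-laden illustration: `PeriodicCheckSketch` is a hypothetical class, a `std::atomic<bool>` stands in for the runtime flag, and `std::chrono::steady_clock` stands in for the hybrid time used in the diff.

```cpp
#include <atomic>
#include <chrono>
#include <optional>

// Hypothetical helper: decides whether the restart LSN comparison should run.
class PeriodicCheckSketch {
 public:
  using Clock = std::chrono::steady_clock;

  PeriodicCheckSketch(const std::atomic<bool>* enabled, std::chrono::seconds interval)
      : enabled_(enabled), interval_(interval) {}

  // Returns true when the feature is enabled and either no check has run yet
  // or at least `interval_` has elapsed since the last check.
  bool ShouldCheck(Clock::time_point now) {
    if (!enabled_->load()) return false;
    if (last_check_.has_value() && now - *last_check_ < interval_) return false;
    last_check_ = now;
    return true;
  }

 private:
  const std::atomic<bool>* enabled_;       // Stand-in for the runtime flag.
  std::chrono::seconds interval_;          // Stand-in for the interval flag.
  std::optional<Clock::time_point> last_check_;
};
```

Passing the time in as a parameter keeps the sketch deterministic to test; a caller would typically pass `PeriodicCheckSketch::Clock::now()`.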
Jira: DB-14815

Test Plan:
./yb_build.sh --cxx-test integration-tests_cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestMovingRestartTimeForwardWhenNothingToStream
./yb_build.sh --cxx-test integration-tests_cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestVWALSafeTimeWithDynamicTableAddition

Reviewers: skumar, siddharth.shah, utkarsh.munjal

Reviewed By: siddharth.shah

Subscribers: ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D41834
Sumukh-Phalgaonkar committed Mar 4, 2025
1 parent f85bbca commit a011f3b
Showing 6 changed files with 283 additions and 10 deletions.
2 changes: 2 additions & 0 deletions src/yb/cdc/cdcsdk_unique_record_id.cc
@@ -79,6 +79,8 @@ CDCSDKUniqueRecordID::CDCSDKUniqueRecordID(

uint64_t CDCSDKUniqueRecordID::GetCommitTime() const { return commit_time_; }

RowMessage_Op CDCSDKUniqueRecordID::GetOp() const { return op_; }

VWALRecordType CDCSDKUniqueRecordID::GetVWALRecordTypeFromOp(
const bool& is_publication_refresh, const RowMessage_Op& op) {
if (is_publication_refresh) {
2 changes: 2 additions & 0 deletions src/yb/cdc/cdcsdk_unique_record_id.h
@@ -59,6 +59,8 @@ class CDCSDKUniqueRecordID {

uint64_t GetCommitTime() const;

RowMessage_Op GetOp() const;

std::string ToString() const;

bool IsPublicationRefreshRecord() const;
126 changes: 118 additions & 8 deletions src/yb/cdc/cdcsdk_virtual_wal.cc
@@ -77,6 +77,17 @@ DEFINE_RUNTIME_bool(cdc_use_byte_threshold_for_vwal_changes, true,
"cdcsdk_max_consistent_records flag.");
TAG_FLAG(cdc_use_byte_threshold_for_vwal_changes, advanced);

DEFINE_RUNTIME_uint64(cdcsdk_update_restart_time_interval_secs, 60 /* 1 min */,
"We will check if the restart lsn is equal to the last shipped lsn periodically and move the "
"restart time forward. This flag determines the periodicity of this operation.");
TAG_FLAG(cdcsdk_update_restart_time_interval_secs, advanced);

DEFINE_RUNTIME_bool(cdcsdk_update_restart_time_when_nothing_to_stream, true,
"When this flag is enabled, the restart time would be moved forward to the commit time of the "
"most recently popped COMMIT / SAFEPOINT record from the priority queue iff restart lsn is "
"equal to the last shipped lsn");
TAG_FLAG(cdcsdk_update_restart_time_when_nothing_to_stream, advanced);

DECLARE_uint64(cdc_stream_records_threshold_size_bytes);
DECLARE_bool(ysql_yb_enable_consistent_replication_from_hash_range);

@@ -430,6 +441,12 @@ Status CDCSDKVirtualWAL::InitLSNAndTxnIDGenerators(
Format(
"Couldnt find restart_lsn on the slot's cdc_state entry for stream_id: $0", stream_id_));

RSTATUS_DCHECK_GT(
*entry_opt.confirmed_flush_lsn, 0, NotFound,
Format(
"Couldnt find confirmed_flush_lsn on the slot's cdc_state entry for stream_id: $0",
stream_id_));

RSTATUS_DCHECK_GT(
*entry_opt.xmin, 0, NotFound,
Format("Couldnt find xmin on the slot's cdc_state entry for stream_id: $0", stream_id_));
@@ -449,6 +466,7 @@ Status CDCSDKVirtualWAL::InitLSNAndTxnIDGenerators(

last_seen_lsn_ = *entry_opt.restart_lsn;
last_received_restart_lsn = *entry_opt.restart_lsn;
last_received_confirmed_flush_lsn_ = *entry_opt.confirmed_flush_lsn;

last_seen_txn_id_ = *entry_opt.xmin;

@@ -475,6 +493,8 @@ Status CDCSDKVirtualWAL::InitLSNAndTxnIDGenerators(
last_shipped_commit.commit_record_unique_id = last_seen_unique_record_id_;
last_shipped_commit.last_pub_refresh_time = last_pub_refresh_time;

virtual_wal_safe_time_ = HybridTime(commit_time);

return Status::OK();
}

@@ -536,6 +556,7 @@ Status CDCSDKVirtualWAL::GetConsistentChangesInternal(

// We never ship safepoint record to the walsender.
if (record->row_message().op() == RowMessage_Op_SAFEPOINT) {
RETURN_NOT_OK(ValidateAndUpdateVWALSafeTime(*unique_id));
continue;
}

@@ -664,6 +685,7 @@ Status CDCSDKVirtualWAL::GetConsistentChangesInternal(
!metadata.is_last_txn_fully_sent) {
metadata.is_last_txn_fully_sent = true;
}
RETURN_NOT_OK(ValidateAndUpdateVWALSafeTime(*unique_id));
break;
}

@@ -683,6 +705,11 @@ Status CDCSDKVirtualWAL::GetConsistentChangesInternal(
}
}

auto s = UpdateRestartTimeIfRequired();
if (!s.ok()) {
LOG_WITH_PREFIX(WARNING) << "Could not update restart time for stream id: " << stream_id_
<< ", because: " << s.ToString();
}
std::ostringstream oss;
if (resp->cdc_sdk_proto_records_size() == 0) {
oss.clear();
@@ -935,6 +962,19 @@ Status CDCSDKVirtualWAL::AddRecordToVirtualWalPriorityQueue(
if (result) {
auto unique_id = std::make_shared<CDCSDKUniqueRecordID>(
CDCSDKUniqueRecordID(is_publication_refresh_record, record));

if (GetAtomicFlag(&FLAGS_cdcsdk_update_restart_time_when_nothing_to_stream) &&
virtual_wal_safe_time_.is_valid() &&
unique_id->GetCommitTime() < virtual_wal_safe_time_.ToUint64()) {
VLOG_WITH_PREFIX(3) << "Received a record with commit time lesser than virtual wal "
"safe time. The unique id for filtered record: "
<< unique_id->ToString()
<< " . virtual_wal_safe_time_: " << virtual_wal_safe_time_.ToUint64()
<< ". The filtered record is: " << record->ShortDebugString();
tablet_queue->pop();
continue;
}

sorted_records->push({tablet_id, {unique_id, record}});
break;
} else {
@@ -999,6 +1039,54 @@ Result<TabletRecordInfoPair> CDCSDKVirtualWAL::FindConsistentRecord(
return tablet_record_info_pair;
}

Status CDCSDKVirtualWAL::ValidateAndUpdateVWALSafeTime(const CDCSDKUniqueRecordID& popped_record) {
if (!GetAtomicFlag(&FLAGS_cdcsdk_update_restart_time_when_nothing_to_stream)) {
return Status::OK();
}

if (popped_record.GetOp() != RowMessage_Op_COMMIT &&
popped_record.GetOp() != RowMessage_Op_SAFEPOINT) {
return Status::OK();
}

DCHECK(virtual_wal_safe_time_.is_valid());
// The virtual wal safe time should be non decreasing. We allow the popped record's commit time to
// be equal to virtual_wal_safe_time_ here because there can be multiple commit / safepoint
// records with same commit time in the priority queue. Ideally we should never get a record which
// fails this check since we filter while inserting to the priority queue.
RSTATUS_DCHECK(
popped_record.GetCommitTime() >= virtual_wal_safe_time_.ToUint64(), IllegalState,
"Received a record with commit time: {} lesser than the Virtual WAL safe "
"time: {}. This record will not be shipped, filtered record: {}",
popped_record.GetCommitTime(), virtual_wal_safe_time_.ToUint64(), popped_record.ToString());

virtual_wal_safe_time_ = HybridTime(popped_record.GetCommitTime());
return Status::OK();
}

Status CDCSDKVirtualWAL::UpdateRestartTimeIfRequired() {
if (!GetAtomicFlag(&FLAGS_cdcsdk_update_restart_time_when_nothing_to_stream)) {
return Status::OK();
}

auto current_time = HybridTime::FromMicros(GetCurrentTimeMicros());
if (last_restart_lsn_read_time_.is_valid() &&
current_time.PhysicalDiff(last_restart_lsn_read_time_) <
static_cast<int64>(GetAtomicFlag(&FLAGS_cdcsdk_update_restart_time_interval_secs))) {
return Status::OK();
}

last_restart_lsn_read_time_ = current_time;

if (last_received_restart_lsn == last_seen_lsn_) {
RETURN_NOT_OK(UpdateAndPersistLSNInternal(
last_received_confirmed_flush_lsn_, last_received_restart_lsn,
true /* use_vwal_safe_time */));
}

return Status::OK();
}

Result<uint64_t> CDCSDKVirtualWAL::GetRecordLSN(
const std::shared_ptr<CDCSDKUniqueRecordID>& curr_unique_record_id) {
// We want to stream all records with the same commit_time as a single transaction even if the
@@ -1057,7 +1145,8 @@ Status CDCSDKVirtualWAL::AddEntryForBeginRecord(const RecordInfo& record_info) {
}

Result<uint64_t> CDCSDKVirtualWAL::UpdateAndPersistLSNInternal(
- const uint64_t confirmed_flush_lsn, const uint64_t restart_lsn_hint) {
+ const uint64_t confirmed_flush_lsn, const uint64_t restart_lsn_hint,
+ const bool use_vwal_safe_time) {
if (restart_lsn_hint < last_received_restart_lsn) {
return STATUS_FORMAT(
IllegalState, Format(
@@ -1067,6 +1156,10 @@ Result<uint64_t> CDCSDKVirtualWAL::UpdateAndPersistLSNInternal(

CommitRecordMetadata record_metadata = last_shipped_commit;
if (restart_lsn_hint < last_shipped_commit.commit_lsn) {
RSTATUS_DCHECK(
!use_vwal_safe_time, IllegalState,
"When trying to move the restart time to VWAL safe time we should always have restart_lsn "
"equal to the last_seen_lsn.");
RETURN_NOT_OK(TruncateMetaMap(restart_lsn_hint));
record_metadata = commit_meta_and_last_req_map_.begin()->second.record_metadata;
VLOG_WITH_PREFIX(2) << "Restart_lsn " << restart_lsn_hint
@@ -1084,13 +1177,25 @@ Result<uint64_t> CDCSDKVirtualWAL::UpdateAndPersistLSNInternal(
commit_meta_and_last_req_map_.erase(commit_meta_and_last_req_map_.begin(), pos);
}

- // Remove the entries from pub_refresh_times which are <= record_metadata.last_pub_refresh_time.
+ auto pub_refresh_trim_time = use_vwal_safe_time ? virtual_wal_safe_time_.ToUint64()
+ : record_metadata.last_pub_refresh_time;
+
+ // Find the last pub_refresh_time that will be trimmed, i.e. the entry in pub_refresh_times with
+ // largest value that is <= pub_refresh_trim_time.
+ auto itr = pub_refresh_times.upper_bound(pub_refresh_trim_time);
+ uint64_t last_trimmed_pub_refresh_time = 0;
+ if (itr != pub_refresh_times.begin()) {
+ last_trimmed_pub_refresh_time = *(--itr);
+ }
+
+ // Remove the entries from pub_refresh_times which are <= pub_refresh_trim_time.
  pub_refresh_times.erase(
- pub_refresh_times.begin(),
- pub_refresh_times.upper_bound(record_metadata.last_pub_refresh_time));
+ pub_refresh_times.begin(), pub_refresh_times.upper_bound(pub_refresh_trim_time));

- RETURN_NOT_OK(UpdateSlotEntryInCDCState(confirmed_flush_lsn, record_metadata));
+ RETURN_NOT_OK(UpdateSlotEntryInCDCState(
+ confirmed_flush_lsn, record_metadata, use_vwal_safe_time, last_trimmed_pub_refresh_time));
  last_received_restart_lsn = restart_lsn_hint;
+ last_received_confirmed_flush_lsn_ = confirmed_flush_lsn;

return record_metadata.commit_lsn;
}
@@ -1125,16 +1230,21 @@ Status CDCSDKVirtualWAL::TruncateMetaMap(const uint64_t restart_lsn) {
}

Status CDCSDKVirtualWAL::UpdateSlotEntryInCDCState(
- const uint64_t confirmed_flush_lsn, const CommitRecordMetadata& record_metadata) {
+ const uint64_t confirmed_flush_lsn, const CommitRecordMetadata& record_metadata,
+ const bool use_vwal_safe_time, const uint64_t last_trimmed_pub_refresh_time) {
CDCStateTableEntry entry(kCDCSDKSlotEntryTabletId, stream_id_);
entry.confirmed_flush_lsn = confirmed_flush_lsn;
// Also update the return value sent from UpdateAndPersistLSNInternal if the restart_lsn value is
// changed here.
entry.restart_lsn = record_metadata.commit_lsn;
entry.xmin = record_metadata.commit_txn_id;
- entry.record_id_commit_time = record_metadata.commit_record_unique_id->GetCommitTime();
+ entry.record_id_commit_time = use_vwal_safe_time
+ ? virtual_wal_safe_time_.ToUint64()
+ : record_metadata.commit_record_unique_id->GetCommitTime();
  entry.cdc_sdk_safe_time = entry.record_id_commit_time;
- entry.last_pub_refresh_time = record_metadata.last_pub_refresh_time;
+ entry.last_pub_refresh_time = (use_vwal_safe_time && last_trimmed_pub_refresh_time > 0)
+ ? last_trimmed_pub_refresh_time
+ : record_metadata.last_pub_refresh_time;
entry.pub_refresh_times = GetPubRefreshTimesString();
// Doing an update instead of upsert since we expect an entry for the slot to already exist in
// cdc_state.
24 changes: 22 additions & 2 deletions src/yb/cdc/cdcsdk_virtual_wal.h
@@ -49,7 +49,8 @@ class CDCSDKVirtualWAL {

// Returns the actually persisted restart_lsn.
Result<uint64_t> UpdateAndPersistLSNInternal(
- const uint64_t confirmed_flush_lsn, const uint64_t restart_lsn_hint);
+ const uint64_t confirmed_flush_lsn, const uint64_t restart_lsn_hint,
+ const bool use_vwal_safe_time = false);

Status UpdatePublicationTableListInternal(
const std::unordered_set<TableId>& new_tables, const HostPort hostport,
@@ -153,7 +154,8 @@ class CDCSDKVirtualWAL {
Status TruncateMetaMap(const uint64_t restart_lsn);

Status UpdateSlotEntryInCDCState(
- const uint64_t confirmed_flush_lsn, const CommitRecordMetadata& record_metadata);
+ const uint64_t confirmed_flush_lsn, const CommitRecordMetadata& record_metadata,
+ const bool use_vwal_safe_time = false, const uint64_t last_trimmed_pub_refresh_time = 0);

void ResetCommitDecisionVariables();

@@ -186,6 +188,10 @@ class CDCSDKVirtualWAL {

Status CheckHashRangeConstraints(const CDCStateTableEntry& slot_entry);

Status ValidateAndUpdateVWALSafeTime(const CDCSDKUniqueRecordID& popped_record);

Status UpdateRestartTimeIfRequired();

CDCServiceImpl* cdc_service_;

xrepl::StreamId stream_id_;
@@ -225,6 +231,10 @@ class CDCSDKVirtualWAL {
// initialised by the restart_lsn stored in the cdc_state's entry for slot.
uint64_t last_received_restart_lsn;

// This will hold the confirmed_flush_lsn value received in the UpdateAndPersistLSN RPC call. It
// will be initialised by the confirmed_flush_lsn stored in the cdc_state's entry for slot.
uint64_t last_received_confirmed_flush_lsn_;

// Set to true when a BEGIN record is shipped. Reset to false after we ship the commit record for
// the pg_txn.
bool is_txn_in_progress = false;
@@ -296,6 +306,16 @@ class CDCSDKVirtualWAL {
std::unique_ptr<ReplicationSlotHashRange> slot_hash_range_;

uint64_t consistent_snapshot_time_;

// The Virtual WAL safe time is a threshold time, whereby the records having commit time lesser
// than this will be filtered out. LSN will not be generated for such records. It is calculated as
// the commit time of the last commit record or the safepoint record popped from the priority
// queue.
HybridTime virtual_wal_safe_time_ = HybridTime::kInvalid;

// The time at which slot entry was last read to compare restart lsn with the last shipped lsn.
HybridTime last_restart_lsn_read_time_ = HybridTime::kInvalid;

};

} // namespace cdc