Skip to content

Commit

Permalink
[#24064] DocDB: Write tombstone to reverse vector index when vector i…
Browse files Browse the repository at this point in the history
…s updated or removed

Summary:
When vector column is updated we generate new vector id and add reverse index entry to the regular DB.
Then during vector index search we query reverse index for found vector id and check whether existing column has the same id.
It is used to understand whether this vector is valid for particular read time.

The similar logic is applied when row is deleted. The referenced row should exist.

So to check single vector we need 2 queries to RocksDB.
One to fetch reverse index record, then query main table to check whether particular row was updated or deleted.

This diff adds logic to write tombstone entries for obsolete vector ids.
So we don't have to perform query to main table to check whether vector is valid.

**Upgrade/Rollback safety:** New fields are used only by not yet released code.
Jira: DB-12955

Test Plan: PgVectorIndexTest.DeleteAndUpdate/*

Reviewers: arybochkin

Reviewed By: arybochkin

Subscribers: ybase, yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D42176
  • Loading branch information
spolitov committed Mar 4, 2025
1 parent c24c294 commit 4789974
Show file tree
Hide file tree
Showing 23 changed files with 251 additions and 89 deletions.
31 changes: 19 additions & 12 deletions src/yb/common/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ void Schema::CopyFrom(const Schema& other) {

has_nullables_ = other.has_nullables_;
has_statics_ = other.has_statics_;
vector_column_ids_ = other.vector_column_ids_;
table_properties_ = other.table_properties_;
cotable_id_ = other.cotable_id_;
colocation_id_ = other.colocation_id_;
Expand All @@ -376,11 +377,16 @@ void Schema::ResetColumnIds(const vector<ColumnId>& ids) {
col_ids_ = ids;
id_to_index_.clear();
max_col_id_ = 0;
vector_column_ids_.clear();
for (size_t i = 0; i < ids.size(); ++i) {
if (ids[i] > max_col_id_) {
max_col_id_ = ids[i];
}
id_to_index_.set(ids[i], narrow_cast<int>(i));

if (cols_[i].is_vector()) {
vector_column_ids_.push_back(ids[i]);
}
}
}

Expand Down Expand Up @@ -638,18 +644,19 @@ Result<const QLValuePB&> Schema::GetMissingValueByColumnId(ColumnId id) const {

bool Schema::TEST_Equals(const Schema& lhs, const Schema& rhs) {
return lhs.Equals(rhs) &&
YB_STRUCT_EQUALS(num_hash_key_columns_,
max_col_id_,
col_ids_,
col_offsets_,
name_to_index_,
name_to_index_,
id_to_index_,
has_nullables_,
has_statics_,
cotable_id_,
colocation_id_,
pgschema_name_);
YB_CLASS_EQUALS(num_hash_key_columns,
max_col_id,
col_ids,
col_offsets,
name_to_index,
name_to_index,
id_to_index,
has_nullables,
has_statics,
vector_column_ids,
cotable_id,
colocation_id,
pgschema_name);
}

// ============================================================================
Expand Down
13 changes: 11 additions & 2 deletions src/yb/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,6 @@ class Schema : public MissingValueProvider {
NameToIndexMap::hasher(),
NameToIndexMap::key_equal(),
NameToIndexMapAllocator(&name_to_index_bytes_)),
has_nullables_(false),
cotable_id_(Uuid::Nil()),
colocation_id_(kColocationIdNotSet),
pgschema_name_("") {
Expand Down Expand Up @@ -615,6 +614,14 @@ class Schema : public MissingValueProvider {
return cols_.size();
}

bool has_vectors() const {
return !vector_column_ids_.empty();
}

const std::vector<ColumnId>& vector_column_ids() const {
return vector_column_ids_;
}

// Return the length of the key prefix in this schema.
size_t num_key_columns() const {
return num_key_columns_;
Expand Down Expand Up @@ -1104,11 +1111,13 @@ class Schema : public MissingValueProvider {
IdMapping id_to_index_;

// Cached indicator whether any columns are nullable.
bool has_nullables_;
bool has_nullables_ = false;

// Cached indicator whether any columns are static.
bool has_statics_ = false;

std::vector<ColumnId> vector_column_ids_;

TableProperties table_properties_;

// Uuid of the non-primary table this schema belongs to co-located in a tablet. Nil for the
Expand Down
3 changes: 3 additions & 0 deletions src/yb/docdb/doc_write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,9 @@ void DocWriteBatch::MoveToWriteBatchPB(LWKeyValueWriteBatchPB *kv_pb) const {
kv_pair->dup_key(entry.key);
kv_pair->dup_value(entry.value);
}
if (!delete_vector_ids_.empty()) {
kv_pb->dup_delete_vector_ids(delete_vector_ids_.AsSlice());
}
MoveLocksToWriteBatchPB(kv_pb, /* is_lock= */ true);
if (has_ttl()) {
kv_pb->set_ttl(ttl_ns());
Expand Down
7 changes: 7 additions & 0 deletions src/yb/docdb/doc_write_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include "yb/util/monotime.h"
#include "yb/util/operation_counter.h"

#include "yb/vector_index/vector_index_fwd.h"

namespace yb {
namespace docdb {

Expand Down Expand Up @@ -325,6 +327,10 @@ class DocWriteBatch {
doc_read_context_ = doc_read_context;
}

void DeleteVectorId(const vector_index::VectorId& id) {
delete_vector_ids_.Append(id.AsSlice());
}

private:
struct LazyIterator;

Expand Down Expand Up @@ -386,6 +392,7 @@ class DocWriteBatch {
EncodedDocHybridTime packed_row_write_time_;

MonoDelta ttl_;
ValueBuffer delete_vector_ids_;
};

// A helper handler for converting a RocksDB write batch to a string.
Expand Down
2 changes: 2 additions & 0 deletions src/yb/docdb/docdb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ message KeyValueWriteBatchPB {
// Only relevant when the write op was launched for a session advisory lock request. Used by the
// wait-queue to resume deadlocked requests.
optional uint64 pg_session_req_version = 15;

optional bytes delete_vector_ids = 17;
}

message PerDbFilterPB {
Expand Down
2 changes: 1 addition & 1 deletion src/yb/docdb/docdb_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ Status DocDBRocksDBUtil::PopulateRocksDBWriteBatch(
partial_range_key_intents, /* replicated_batches_state= */ Slice(), intra_txn_write_id_,
/* applier= */ nullptr);
DirectWriteToWriteBatchHandler handler(rocksdb_write_batch);
RETURN_NOT_OK(writer.Apply(&handler));
RETURN_NOT_OK(writer.Apply(handler));
intra_txn_write_id_ = writer.intra_txn_write_id();
} else {
// TODO: this block has common code with docdb::PrepareExternalWriteBatch and probably
Expand Down
55 changes: 53 additions & 2 deletions src/yb/docdb/pgsql_operation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,9 @@ Status VerifyNoColsMarkedForDeletion(const Schema& schema, const PgsqlWriteReque

template<class PB>
void InitProjection(const Schema& schema, const PB& request, dockv::ReaderProjection* projection) {
if (!request.col_refs().empty()) {
if (schema.has_vectors()) {
projection->Init(schema, request.col_refs(), schema.vector_column_ids());
} else if (!request.col_refs().empty()) {
projection->Init(schema, request.col_refs());
} else {
// Compatibility: Either request indeed has no column refs, or it comes from a legacy node.
Expand Down Expand Up @@ -590,6 +592,10 @@ class FilteringIterator {
return *iterator_holder_;
}

bool has_filter() const {
return filter_.has_value();
}

private:
Status InitCommon(
const PgsqlReadRequestPB& request, const Schema& schema,
Expand Down Expand Up @@ -935,9 +941,12 @@ class PgsqlVectorFilter {
auto key = dockv::VectorIdKey(vector_id);
// TODO(vector_index) handle failure
auto ybctid = CHECK_RESULT(iter_.impl().FetchDirect(key.AsSlice()));
if (ybctid.empty()) {
if (ybctid.empty() || ybctid[0] == dockv::ValueEntryTypeAsChar::kTombstone) {
return false;
}
if (!iter_.has_filter()) {
return true;
}
if (need_refresh_) {
iter_.Refresh();
}
Expand Down Expand Up @@ -1494,6 +1503,10 @@ Status PgsqlWriteOperation::ApplyUpdate(const DocOperationApplyData& data) {
ExpressionHelper expression_helper;
RETURN_NOT_OK(expression_helper.Init(schema, projection(), request_, table_row));

if (schema.has_vectors()) {
RETURN_NOT_OK(HandleUpdatedVectorIds(data, table_row));
}

skipped = request_.column_new_values().empty();
const size_t num_non_key_columns = schema.num_columns() - schema.num_key_columns();
if (FLAGS_ysql_enable_pack_full_row_update &&
Expand Down Expand Up @@ -1624,6 +1637,10 @@ Status PgsqlWriteOperation::ApplyDelete(
RETURN_NOT_OK(data.doc_write_batch->DeleteSubDoc(
DocPath(encoded_doc_key_.as_slice()), data.read_operation_data));

if (doc_read_context_->schema().has_vectors()) {
RETURN_NOT_OK(HandleDeletedVectorIds(data, table_row));
}

RETURN_NOT_OK(PopulateResultSet(&table_row));

response_->set_rows_affected_count(num_deleted);
Expand Down Expand Up @@ -1878,6 +1895,40 @@ Status PgsqlWriteOperation::GetDocPaths(GetDocPathsMode mode,
return Status::OK();
}

Status PgsqlWriteOperation::HandleUpdatedVectorIds(
const DocOperationApplyData& data, const dockv::PgTableRow& table_row) {
const auto& schema = doc_read_context_->schema();
for (const auto& column_value : request_.column_new_values()) {
ColumnId column_id(column_value.column_id());
const auto& column = VERIFY_RESULT_REF(schema.column_by_id(column_id));
if (!column.is_vector()) {
continue;
}
RETURN_NOT_OK(FillRemovedVectorId(data, table_row, column_id));
}
return Status::OK();
}

Status PgsqlWriteOperation::HandleDeletedVectorIds(
const DocOperationApplyData& data, const dockv::PgTableRow& table_row) {
for (const auto& column_id : doc_read_context_->schema().vector_column_ids()) {
RETURN_NOT_OK(FillRemovedVectorId(data, table_row, column_id));
}
return Status::OK();
}

Status PgsqlWriteOperation::FillRemovedVectorId(
const DocOperationApplyData& data, const dockv::PgTableRow& table_row, ColumnId column_id) {
auto old_vector_value = table_row.GetValueByColumnId(column_id);
if (!old_vector_value) {
return Status::OK();
}
auto vector_value = dockv::EncodedDocVectorValue::FromSlice(old_vector_value->binary_value());
VLOG_WITH_FUNC(4) << "Old vector id: " << AsString(vector_value.DecodeId());
data.doc_write_batch->DeleteVectorId(VERIFY_RESULT(vector_value.DecodeId()));
return Status::OK();
}

class PgsqlReadRequestYbctidProvider {
public:
explicit PgsqlReadRequestYbctidProvider(
Expand Down
10 changes: 10 additions & 0 deletions src/yb/docdb/pgsql_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ class PgsqlWriteOperation :
const PgsqlColumnValuePB& column_value, dockv::PgTableRow* returning_table_row,
qlexpr::QLExprResult* result, RowPackContext* pack_context);

// Handle removal of a single vector caused by any reason.
Status FillRemovedVectorId(
const DocOperationApplyData& data, const dockv::PgTableRow& table_row, ColumnId column_id);
// Handle removal of vectors caused by applying DELETE statement.
Status HandleDeletedVectorIds(
const DocOperationApplyData& data, const dockv::PgTableRow& table_row);
// Handle removal of vectors caused by applying UPDATE statement.
Status HandleUpdatedVectorIds(
const DocOperationApplyData& data, const dockv::PgTableRow& table_row);

const dockv::ReaderProjection& projection() const;

//------------------------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 4789974

Please sign in to comment.