diff --git a/src/yb/common/schema.cc b/src/yb/common/schema.cc index 806abe2ea396..399d40e2ce36 100644 --- a/src/yb/common/schema.cc +++ b/src/yb/common/schema.cc @@ -362,6 +362,7 @@ void Schema::CopyFrom(const Schema& other) { has_nullables_ = other.has_nullables_; has_statics_ = other.has_statics_; + vector_column_ids_ = other.vector_column_ids_; table_properties_ = other.table_properties_; cotable_id_ = other.cotable_id_; colocation_id_ = other.colocation_id_; @@ -376,11 +377,16 @@ void Schema::ResetColumnIds(const vector& ids) { col_ids_ = ids; id_to_index_.clear(); max_col_id_ = 0; + vector_column_ids_.clear(); for (size_t i = 0; i < ids.size(); ++i) { if (ids[i] > max_col_id_) { max_col_id_ = ids[i]; } id_to_index_.set(ids[i], narrow_cast(i)); + + if (cols_[i].is_vector()) { + vector_column_ids_.push_back(ids[i]); + } } } @@ -638,18 +644,19 @@ Result Schema::GetMissingValueByColumnId(ColumnId id) const { bool Schema::TEST_Equals(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs) && - YB_STRUCT_EQUALS(num_hash_key_columns_, - max_col_id_, - col_ids_, - col_offsets_, - name_to_index_, - name_to_index_, - id_to_index_, - has_nullables_, - has_statics_, - cotable_id_, - colocation_id_, - pgschema_name_); + YB_CLASS_EQUALS(num_hash_key_columns, + max_col_id, + col_ids, + col_offsets, + name_to_index, + name_to_index, + id_to_index, + has_nullables, + has_statics, + vector_column_ids, + cotable_id, + colocation_id, + pgschema_name); } // ============================================================================ diff --git a/src/yb/common/schema.h b/src/yb/common/schema.h index 48ef50ce8bcb..a984850a433a 100644 --- a/src/yb/common/schema.h +++ b/src/yb/common/schema.h @@ -547,7 +547,6 @@ class Schema : public MissingValueProvider { NameToIndexMap::hasher(), NameToIndexMap::key_equal(), NameToIndexMapAllocator(&name_to_index_bytes_)), - has_nullables_(false), cotable_id_(Uuid::Nil()), colocation_id_(kColocationIdNotSet), pgschema_name_("") { @@ -615,6 +614,14 @@ class Schema : public MissingValueProvider { return cols_.size(); } + bool has_vectors() const { + return !vector_column_ids_.empty(); + } + + const std::vector& vector_column_ids() const { + return vector_column_ids_; + } + // Return the length of the key prefix in this schema. size_t num_key_columns() const { return num_key_columns_; @@ -1104,11 +1111,13 @@ class Schema : public MissingValueProvider { IdMapping id_to_index_; // Cached indicator whether any columns are nullable. - bool has_nullables_; + bool has_nullables_ = false; // Cached indicator whether any columns are static. bool has_statics_ = false; + std::vector vector_column_ids_; + TableProperties table_properties_; // Uuid of the non-primary table this schema belongs to co-located in a tablet. Nil for the diff --git a/src/yb/docdb/doc_write_batch.cc b/src/yb/docdb/doc_write_batch.cc index c63da4bb2862..ef2b658c21c7 100644 --- a/src/yb/docdb/doc_write_batch.cc +++ b/src/yb/docdb/doc_write_batch.cc @@ -894,6 +894,9 @@ void DocWriteBatch::MoveToWriteBatchPB(LWKeyValueWriteBatchPB *kv_pb) const { kv_pair->dup_key(entry.key); kv_pair->dup_value(entry.value); } + if (!delete_vector_ids_.empty()) { + kv_pb->dup_delete_vector_ids(delete_vector_ids_.AsSlice()); + } MoveLocksToWriteBatchPB(kv_pb, /* is_lock= */ true); if (has_ttl()) { kv_pb->set_ttl(ttl_ns()); diff --git a/src/yb/docdb/doc_write_batch.h b/src/yb/docdb/doc_write_batch.h index 9bb2d302b52a..21ece2b2c6a4 100644 --- a/src/yb/docdb/doc_write_batch.h +++ b/src/yb/docdb/doc_write_batch.h @@ -35,6 +35,8 @@ #include "yb/util/monotime.h" #include "yb/util/operation_counter.h" +#include "yb/vector_index/vector_index_fwd.h" + namespace yb { namespace docdb { @@ -325,6 +327,10 @@ class DocWriteBatch { doc_read_context_ = doc_read_context; } + void DeleteVectorId(const vector_index::VectorId& id) { + delete_vector_ids_.Append(id.AsSlice()); + } + private: struct LazyIterator; @@ -386,6 +392,7 @@ class DocWriteBatch { EncodedDocHybridTime packed_row_write_time_; MonoDelta ttl_; + ValueBuffer delete_vector_ids_; }; // A helper handler for converting a RocksDB write batch to a string. diff --git a/src/yb/docdb/docdb.proto b/src/yb/docdb/docdb.proto index faaad1a3c4c4..00cf204eab88 100644 --- a/src/yb/docdb/docdb.proto +++ b/src/yb/docdb/docdb.proto @@ -96,6 +96,8 @@ message KeyValueWriteBatchPB { // Only relevant when the write op was launched for a session advisory lock request. Used by the // wait-queue to resume deadlocked requests. optional uint64 pg_session_req_version = 15; + + optional bytes delete_vector_ids = 17; } message PerDbFilterPB { diff --git a/src/yb/docdb/docdb_util.cc b/src/yb/docdb/docdb_util.cc index 117637e3d0ca..addbcb3c56e7 100644 --- a/src/yb/docdb/docdb_util.cc +++ b/src/yb/docdb/docdb_util.cc @@ -177,7 +177,7 @@ Status DocDBRocksDBUtil::PopulateRocksDBWriteBatch( partial_range_key_intents, /* replicated_batches_state= */ Slice(), intra_txn_write_id_, /* applier= */ nullptr); DirectWriteToWriteBatchHandler handler(rocksdb_write_batch); - RETURN_NOT_OK(writer.Apply(&handler)); + RETURN_NOT_OK(writer.Apply(handler)); intra_txn_write_id_ = writer.intra_txn_write_id(); } else { // TODO: this block has common code with docdb::PrepareExternalWriteBatch and probably diff --git a/src/yb/docdb/pgsql_operation.cc b/src/yb/docdb/pgsql_operation.cc index 0a9f3e095331..ba3a0f76bd9d 100644 --- a/src/yb/docdb/pgsql_operation.cc +++ b/src/yb/docdb/pgsql_operation.cc @@ -499,7 +499,9 @@ Status VerifyNoColsMarkedForDeletion(const Schema& schema, const PgsqlWriteReque template void InitProjection(const Schema& schema, const PB& request, dockv::ReaderProjection* projection) { - if (!request.col_refs().empty()) { + if (schema.has_vectors()) { + projection->Init(schema, request.col_refs(), schema.vector_column_ids()); + } else if (!request.col_refs().empty()) { projection->Init(schema, request.col_refs()); } else { // Compatibility: Either request indeed has no column refs, or it comes from a legacy node. @@ -590,6 +592,10 @@ class FilteringIterator { return *iterator_holder_; } + bool has_filter() const { + return filter_.has_value(); + } + private: Status InitCommon( const PgsqlReadRequestPB& request, const Schema& schema, @@ -935,9 +941,12 @@ class PgsqlVectorFilter { auto key = dockv::VectorIdKey(vector_id); // TODO(vector_index) handle failure auto ybctid = CHECK_RESULT(iter_.impl().FetchDirect(key.AsSlice())); - if (ybctid.empty()) { + if (ybctid.empty() || ybctid[0] == dockv::ValueEntryTypeAsChar::kTombstone) { return false; } + if (!iter_.has_filter()) { + return true; + } if (need_refresh_) { iter_.Refresh(); } @@ -1494,6 +1503,10 @@ Status PgsqlWriteOperation::ApplyUpdate(const DocOperationApplyData& data) { ExpressionHelper expression_helper; RETURN_NOT_OK(expression_helper.Init(schema, projection(), request_, table_row)); + if (schema.has_vectors()) { + RETURN_NOT_OK(HandleUpdatedVectorIds(data, table_row)); + } + skipped = request_.column_new_values().empty(); const size_t num_non_key_columns = schema.num_columns() - schema.num_key_columns(); if (FLAGS_ysql_enable_pack_full_row_update && @@ -1624,6 +1637,10 @@ Status PgsqlWriteOperation::ApplyDelete( RETURN_NOT_OK(data.doc_write_batch->DeleteSubDoc( DocPath(encoded_doc_key_.as_slice()), data.read_operation_data)); + if (doc_read_context_->schema().has_vectors()) { + RETURN_NOT_OK(HandleDeletedVectorIds(data, table_row)); + } + RETURN_NOT_OK(PopulateResultSet(&table_row)); response_->set_rows_affected_count(num_deleted); @@ -1878,6 +1895,40 @@ Status PgsqlWriteOperation::GetDocPaths(GetDocPathsMode mode, return Status::OK(); } +Status PgsqlWriteOperation::HandleUpdatedVectorIds( + const DocOperationApplyData& data, const dockv::PgTableRow& table_row) { + const auto& schema = doc_read_context_->schema(); + for (const auto& column_value : request_.column_new_values()) { + ColumnId column_id(column_value.column_id()); + const auto& column = VERIFY_RESULT_REF(schema.column_by_id(column_id)); + if (!column.is_vector()) { + continue; + } + RETURN_NOT_OK(FillRemovedVectorId(data, table_row, column_id)); + } + return Status::OK(); +} + +Status PgsqlWriteOperation::HandleDeletedVectorIds( + const DocOperationApplyData& data, const dockv::PgTableRow& table_row) { + for (const auto& column_id : doc_read_context_->schema().vector_column_ids()) { + RETURN_NOT_OK(FillRemovedVectorId(data, table_row, column_id)); + } + return Status::OK(); +} + +Status PgsqlWriteOperation::FillRemovedVectorId( + const DocOperationApplyData& data, const dockv::PgTableRow& table_row, ColumnId column_id) { + auto old_vector_value = table_row.GetValueByColumnId(column_id); + if (!old_vector_value) { + return Status::OK(); + } + auto vector_value = dockv::EncodedDocVectorValue::FromSlice(old_vector_value->binary_value()); + VLOG_WITH_FUNC(4) << "Old vector id: " << AsString(vector_value.DecodeId()); + data.doc_write_batch->DeleteVectorId(VERIFY_RESULT(vector_value.DecodeId())); + return Status::OK(); +} + class PgsqlReadRequestYbctidProvider { public: explicit PgsqlReadRequestYbctidProvider( diff --git a/src/yb/docdb/pgsql_operation.h b/src/yb/docdb/pgsql_operation.h index a9a5c609a0ca..d95fec3c39ed 100644 --- a/src/yb/docdb/pgsql_operation.h +++ b/src/yb/docdb/pgsql_operation.h @@ -126,6 +126,16 @@ class PgsqlWriteOperation : const PgsqlColumnValuePB& column_value, dockv::PgTableRow* returning_table_row, qlexpr::QLExprResult* result, RowPackContext* pack_context); + // Handle removal of a single vector caused by any reason. + Status FillRemovedVectorId( + const DocOperationApplyData& data, const dockv::PgTableRow& table_row, ColumnId column_id); + // Handle removal of vectors caused by applying DELETE statement. + Status HandleDeletedVectorIds( + const DocOperationApplyData& data, const dockv::PgTableRow& table_row); + // Handle removal of vectors caused by applying UPDATE statement. + Status HandleUpdatedVectorIds( + const DocOperationApplyData& data, const dockv::PgTableRow& table_row); + const dockv::ReaderProjection& projection() const; //------------------------------------------------------------------------------------------------ diff --git a/src/yb/docdb/rocksdb_writer.cc b/src/yb/docdb/rocksdb_writer.cc index 23ce5b81b0b3..767dadf3b1a8 100644 --- a/src/yb/docdb/rocksdb_writer.cc +++ b/src/yb/docdb/rocksdb_writer.cc @@ -94,7 +94,7 @@ void AddIntent( const TransactionId& transaction_id, const FixedSliceParts& key, const SliceParts& value, - rocksdb::DirectWriteHandler* handler, + rocksdb::DirectWriteHandler& handler, Slice reverse_value_prefix = Slice()) { char reverse_key_prefix[1] = { KeyEntryTypeAsChar::kTransactionId }; dockv::DocHybridTimeWordBuffer doc_ht_buffer; @@ -105,21 +105,21 @@ void AddIntent( transaction_id.AsSlice(), doc_ht_slice, }}; - handler->Put(key, value); + handler.Put(key, value); if (reverse_value_prefix.empty()) { - handler->Put(reverse_key, key); + handler.Put(reverse_key, key); } else { std::array reverse_value; reverse_value[0] = reverse_value_prefix; memcpy(&reverse_value[1], key.parts, sizeof(*key.parts) * N); - handler->Put(reverse_key, reverse_value); + handler.Put(reverse_key, reverse_value); } } template void PutApplyState( const Slice& transaction_id_slice, HybridTime commit_ht, IntraTxnWriteId write_id, - const std::array& value_parts, rocksdb::DirectWriteHandler* handler) { + const std::array& value_parts, rocksdb::DirectWriteHandler& handler) { char transaction_apply_state_value_type = KeyEntryTypeAsChar::kTransactionApplyState; char group_end_value_type = KeyEntryTypeAsChar::kGroupEnd; char hybrid_time_value_type = KeyEntryTypeAsChar::kHybridTime; @@ -134,12 +134,12 @@ void PutApplyState( Slice(&hybrid_time_value_type, 1), Slice(doc_hybrid_time_buffer, doc_hybrid_time_buffer_end), }}; - handler->Put(key_parts, value_parts); + handler.Put(key_parts, value_parts); } void HandleRegularRecord( const yb::docdb::LWKeyValuePairPB& kv_pair, HybridTime hybrid_time, - DocHybridTimeBuffer* doc_ht_buffer, rocksdb::DirectWriteHandler* handler, + DocHybridTimeBuffer* doc_ht_buffer, rocksdb::DirectWriteHandler& handler, IntraTxnWriteId* write_id) { #ifndef NDEBUG // Debug-only: ensure all keys we get in Raft replication can be decoded. @@ -167,7 +167,7 @@ void HandleRegularRecord( doc_ht_buffer->EncodeWithValueType(record_hybrid_time, *write_id), }}; Slice key_value = kv_pair.value(); - handler->Put(key_parts, SliceParts(&key_value, 1)); + handler.Put(key_parts, SliceParts(&key_value, 1)); ++(*write_id); } @@ -189,7 +189,7 @@ NonTransactionalWriter::NonTransactionalWriter( bool NonTransactionalWriter::Empty() const { return put_batch_.write_pairs().empty(); } -Status NonTransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) { +Status NonTransactionalWriter::Apply(rocksdb::DirectWriteHandler& handler) { DocHybridTimeBuffer doc_ht_buffer; IntraTxnWriteId write_id = 0; @@ -238,11 +238,11 @@ TransactionalWriter::TransactionalWriter( // // Where prefix is just a single byte prefix. TxnId, IntentType, HybridTime all prefixed with // appropriate value type. -Status TransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) { +Status TransactionalWriter::Apply(rocksdb::DirectWriteHandler& handler) { VLOG(4) << "PrepareTransactionWriteBatch(), write_id = " << write_id_; row_mark_ = GetRowMarkTypeFromPB(put_batch_); - handler_ = handler; + handler_ = &handler; if (metadata_to_store_) { auto txn_value_type = KeyEntryTypeAsChar::kTransactionId; @@ -255,7 +255,7 @@ Status TransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) { data_copy.set_metadata_write_time(GetCurrentTimeMicros()); auto value = data_copy.SerializeAsString(); Slice value_slice(value); - handler->Put(key, SliceParts(&value_slice, 1)); + handler.Put(key, SliceParts(&value_slice, 1)); } subtransaction_id_ = put_batch_.has_subtransaction() @@ -279,6 +279,24 @@ Status TransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) { partial_range_key_intents_)); } + if (put_batch_.has_delete_vector_ids()) { + auto key_type = KeyEntryTypeAsChar::kTransactionId; + DocHybridTimeBuffer doc_ht_buffer; + + std::array key = {{ + Slice(&key_type, sizeof(key_type)), + transaction_id_.AsSlice(), + doc_ht_buffer.EncodeWithValueType(hybrid_time_, write_id_++), + }}; + auto value_type = ValueEntryTypeAsChar::kDeleteVectorIds; + std::array value = {{ + Slice(&value_type, sizeof(value_type)), + Slice(put_batch_.delete_vector_ids()), + }}; + + handler.Put(key, value); + } + // Apply advisory locks. for (auto& lock_pair : put_batch_.lock_pairs()) { if (lock_pair.is_lock()) { @@ -291,11 +309,11 @@ Status TransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) { // Unlock operation. if (lock_pair.lock().key().empty()) { // Unlock All. - RETURN_NOT_OK(applier_->RemoveAdvisoryLocks(transaction_id_, handler_)); + RETURN_NOT_OK(applier_->RemoveAdvisoryLocks(transaction_id_, *handler_)); } else { // Unlock a specific key. RETURN_NOT_OK(applier_->RemoveAdvisoryLock(transaction_id_, lock_pair.lock().key(), - dockv::GetIntentTypesForLock(lock_pair.mode()), handler_)); + dockv::GetIntentTypesForLock(lock_pair.mode()), *handler_)); } } } @@ -371,7 +389,7 @@ Status TransactionalWriter::operator()( if (last_key && FLAGS_enable_transaction_sealing) { reverse_value_prefix = replicated_batches_state_; } - AddIntent(transaction_id_, key_parts, value, handler_, reverse_value_prefix); + AddIntent(transaction_id_, key_parts, value, *handler_, reverse_value_prefix); return Status::OK(); } @@ -430,7 +448,7 @@ Status TransactionalWriter::AddWeakIntent( doc_ht_buffer->EncodeWithValueType(hybrid_time_, write_id_++), }}; - AddIntent(transaction_id_, key, value, handler_); + AddIntent(transaction_id_, key, value, *handler_); return Status::OK(); } @@ -440,7 +458,7 @@ PostApplyMetadataWriter::PostApplyMetadataWriter( : metadatas_{metadatas} { } -Status PostApplyMetadataWriter::Apply(rocksdb::DirectWriteHandler* handler) { +Status PostApplyMetadataWriter::Apply(rocksdb::DirectWriteHandler& handler) { ThreadSafeArena arena; for (const auto& metadata : metadatas_) { std::array metadata_key = {{ @@ -454,7 +472,7 @@ Status PostApplyMetadataWriter::Apply(rocksdb::DirectWriteHandler* handler) { auto value = data.SerializeAsString(); Slice value_slice{value}; - handler->Put(metadata_key, SliceParts(&value_slice, 1)); + handler.Put(metadata_key, SliceParts(&value_slice, 1)); } return Status::OK(); @@ -487,7 +505,7 @@ IntentsWriter::IntentsWriter(const Slice& start_key, rocksdb::CacheRestartBlockKeys::kFalse); } -Status IntentsWriter::Apply(rocksdb::DirectWriteHandler* handler) { +Status IntentsWriter::Apply(rocksdb::DirectWriteHandler& handler) { Slice key_prefix = txn_reverse_index_prefix_.AsSlice(); key_prefix.remove_suffix(1); @@ -523,10 +541,12 @@ Status IntentsWriter::Apply(rocksdb::DirectWriteHandler* handler) { // txn_reverse_index_prefix in size, then they are identical, and we are seeked to transaction // metadata. Otherwise, we're seeked to an intent entry in the index which we may process. if (!metadata) { - if (!reverse_index_value.empty() && reverse_index_value[0] == KeyEntryTypeAsChar::kBitSet) { + if (reverse_index_value.TryConsumeByte(KeyEntryTypeAsChar::kBitSet)) { CHECK(!FLAGS_TEST_fail_on_replicated_batch_idx_set_in_txn_record); - reverse_index_value.remove_prefix(1); RETURN_NOT_OK(OneWayBitmap::Skip(&reverse_index_value)); + } else if (reverse_index_value.TryConsumeByte(ValueEntryTypeAsChar::kDeleteVectorIds)) { + RETURN_NOT_OK(context_.DeleteVectorIds(key_slice, reverse_index_value, handler)); + continue; } } @@ -585,7 +605,7 @@ ApplyIntentsContext::ApplyIntentsContext( } Result ApplyIntentsContext::StoreApplyState( - const Slice& key, rocksdb::DirectWriteHandler* handler) { + const Slice& key, rocksdb::DirectWriteHandler& handler) { SetApplyState(key, write_id_, aborted_); ApplyTransactionStatePB pb; apply_state().ToPB(&pb); @@ -617,7 +637,7 @@ void ApplyIntentsContext::Start(const boost::optional& first_key) { } Result ApplyIntentsContext::Entry( - const Slice& key, const Slice& value, bool metadata, rocksdb::DirectWriteHandler* handler) { + const Slice& key, const Slice& value, bool metadata, rocksdb::DirectWriteHandler& handler) { // Value of reverse index is a key of original intent record, so seek it and check match. if (metadata || !IsWithinBounds(key_bounds_, value)) { return false; @@ -692,7 +712,7 @@ Result ApplyIntentsContext::Entry( << ", value: " << intent_value.ToDebugString(); } #endif - handler->Put(key_parts, value_parts); + handler.Put(key_parts, value_parts); } if (vector_indexes_) { @@ -713,7 +733,7 @@ Result ApplyIntentsContext::Entry( } Status ApplyIntentsContext::ProcessVectorIndexes( - rocksdb::DirectWriteHandler* handler, Slice key, Slice value) { + rocksdb::DirectWriteHandler& handler, Slice key, Slice value) { auto sizes = VERIFY_RESULT(dockv::DocKey::EncodedPrefixAndDocKeySizes(key)); if (sizes.doc_key_size < key.size()) { auto entry_type = static_cast(key[sizes.doc_key_size]); @@ -734,7 +754,8 @@ Status ApplyIntentsContext::ProcessVectorIndexes( } if (need_reverse_entry) { auto ybctid = key.Prefix(sizes.doc_key_size).WithoutPrefix(table_key_prefix.size()); - AddVectorIndexReverseEntry(handler, ybctid, value, commit_ht_); + AddVectorIndexReverseEntry( + handler, ybctid, value, DocHybridTime(commit_ht_, write_id_)); need_reverse_entry = false; } } @@ -766,7 +787,7 @@ Status ApplyIntentsContext::ProcessVectorIndexes( template Status ApplyIntentsContext::ProcessVectorIndexesForPackedRow( - rocksdb::DirectWriteHandler* handler, size_t prefix_size, Slice key, Slice value) { + rocksdb::DirectWriteHandler& handler, size_t prefix_size, Slice key, Slice value) { value.consume_byte(); auto schema_version = narrow_cast(VERIFY_RESULT(FastDecodeUnsignedVarInt(&value))); @@ -811,14 +832,15 @@ Status ApplyIntentsContext::ProcessVectorIndexesForPackedRow( columns_added_to_vector_index.resize( std::max(columns_added_to_vector_index.size(), column_index + 1)); if (!columns_added_to_vector_index.test_set(column_index)) { - AddVectorIndexReverseEntry(handler, ybctid, *column_value, commit_ht_); + AddVectorIndexReverseEntry( + handler, ybctid, *column_value, DocHybridTime(commit_ht_, write_id_)); } } } return Status::OK(); } -Status ApplyIntentsContext::Complete(rocksdb::DirectWriteHandler* handler) { +Status ApplyIntentsContext::Complete(rocksdb::DirectWriteHandler& handler) { if (apply_state_) { char tombstone_value_type = ValueEntryTypeAsChar::kTombstone; std::array value_parts = {{Slice(&tombstone_value_type, 1)}}; @@ -836,6 +858,24 @@ Status ApplyIntentsContext::Complete(rocksdb::DirectWriteHandler* handler) { return Status::OK(); } +Status ApplyIntentsContext::DeleteVectorIds( + Slice key, Slice ids, rocksdb::DirectWriteHandler& handler) { + RSTATUS_DCHECK_EQ( + ids.size() % vector_index::VectorId::StaticSize(), 0, Corruption, + Format("Wrong size of deleted vector ids: $0", ids.ToDebugHexString())); + DocHybridTimeBuffer ht_buf; + auto encoded_write_time = ht_buf.EncodeWithValueType(DocHybridTime(commit_ht_, write_id_)); + char tombstone = dockv::ValueEntryTypeAsChar::kTombstone; + Slice value(&tombstone, 1); + while (ids.size() != 0) { + auto id = ids.Prefix(vector_index::VectorId::StaticSize()); + handler.Put( + dockv::VectorIndexReverseEntryKeyParts(id, encoded_write_time), {&value, 1}); + ids.RemovePrefix(vector_index::VectorId::StaticSize()); + } + return Status::OK(); +} + Status FrontierSchemaVersionUpdater::UpdateSchemaVersion(Slice key, Slice value) { if (!frontiers_) { return Status::OK(); @@ -916,25 +956,33 @@ RemoveIntentsContext::RemoveIntentsContext(const TransactionId& transaction_id, } Result RemoveIntentsContext::Entry( - const Slice& key, const Slice& value, bool metadata, rocksdb::DirectWriteHandler* handler) { + const Slice& key, const Slice& value, bool metadata, rocksdb::DirectWriteHandler& handler) { if (reached_records_limit()) { SetApplyState(key, 0, SubtxnSet()); return true; } - handler->SingleDelete(key); + handler.SingleDelete(key); YB_TRANSACTION_DUMP(RemoveIntent, transaction_id(), reason_, key); RegisterRecord(); if (!metadata) { - handler->SingleDelete(value); + handler.SingleDelete(value); YB_TRANSACTION_DUMP(RemoveIntent, transaction_id(), reason_, value); RegisterRecord(); } return false; } -Status RemoveIntentsContext::Complete(rocksdb::DirectWriteHandler* handler) { +Status RemoveIntentsContext::Complete(rocksdb::DirectWriteHandler& handler) { + return Status::OK(); +} + +Status RemoveIntentsContext::DeleteVectorIds( + Slice key, Slice ids, rocksdb::DirectWriteHandler& handler) { + handler.SingleDelete(key); + YB_TRANSACTION_DUMP(RemoveIntent, transaction_id(), reason_, key); + RegisterRecord(); return Status::OK(); } @@ -991,7 +1039,7 @@ Result ProcessApplyExternalTransactions( Result NonTransactionalBatchWriter::PrepareApplyExternalIntentsBatch( const Slice& original_input_value, ExternalTxnApplyStateData* apply_data, - rocksdb::DirectWriteHandler* regular_write_handler) { + rocksdb::DirectWriteHandler& regular_write_handler) { bool can_delete_entire_batch = true; auto input_value = original_input_value; DocHybridTimeBuffer doc_ht_buffer; @@ -1053,7 +1101,7 @@ Result NonTransactionalBatchWriter::PrepareApplyExternalIntentsBatch( std::array value_parts = {{ output_value, }}; - regular_write_handler->Put(key_parts, value_parts); + regular_write_handler.Put(key_parts, value_parts); ++apply_data->write_id; // Update min/max schema version. @@ -1066,7 +1114,7 @@ Result NonTransactionalBatchWriter::PrepareApplyExternalIntentsBatch( // Reads all stored external intents for provided transactions and prepares batches that will apply // them into regular db and remove from intents db. Status NonTransactionalBatchWriter::PrepareApplyExternalIntents( - ExternalTxnApplyState* apply_external_transactions, rocksdb::DirectWriteHandler* handler) { + ExternalTxnApplyState* apply_external_transactions, rocksdb::DirectWriteHandler& handler) { KeyBytes key_prefix; KeyBytes key_upperbound; @@ -1106,7 +1154,7 @@ Status NonTransactionalBatchWriter::PrepareApplyExternalIntents( Result NonTransactionalBatchWriter::AddEntryToWriteBatch( const yb::docdb::LWKeyValuePairPB& kv_pair, ExternalTxnApplyState* apply_external_transactions, - rocksdb::DirectWriteHandler* regular_write_handler, IntraTxnWriteId* write_id) { + rocksdb::DirectWriteHandler& regular_write_handler, IntraTxnWriteId* write_id) { SCHECK(!kv_pair.key().empty(), InvalidArgument, "Write pair key cannot be empty."); SCHECK(!kv_pair.value().empty(), InvalidArgument, "Write pair value cannot be empty."); @@ -1158,7 +1206,7 @@ Result NonTransactionalBatchWriter::AddEntryToWriteBatch( return false; } -Status NonTransactionalBatchWriter::Apply(rocksdb::DirectWriteHandler* handler) { +Status NonTransactionalBatchWriter::Apply(rocksdb::DirectWriteHandler& handler) { auto apply_external_transactions = VERIFY_RESULT(ProcessApplyExternalTransactions(put_batch_)); if (!apply_external_transactions.empty()) { DCHECK(intents_db_iter_.Initialized()); diff --git a/src/yb/docdb/rocksdb_writer.h b/src/yb/docdb/rocksdb_writer.h index 482911eb9195..81c5c80fc7ba 100644 --- a/src/yb/docdb/rocksdb_writer.h +++ b/src/yb/docdb/rocksdb_writer.h @@ -38,7 +38,7 @@ class NonTransactionalWriter : public rocksdb::DirectWriter { bool Empty() const; - Status Apply(rocksdb::DirectWriteHandler* handler) override; + Status Apply(rocksdb::DirectWriteHandler& handler) override; private: const LWKeyValueWriteBatchPB& put_batch_; @@ -74,7 +74,7 @@ class TransactionalWriter : public rocksdb::DirectWriter { IntraTxnWriteId intra_txn_write_id, tablet::TransactionIntentApplier* applier); - Status Apply(rocksdb::DirectWriteHandler* handler) override; + Status Apply(rocksdb::DirectWriteHandler& handler) override; IntraTxnWriteId intra_txn_write_id() const { return intra_txn_write_id_; @@ -120,7 +120,7 @@ class PostApplyMetadataWriter : public rocksdb::DirectWriter { public: explicit PostApplyMetadataWriter(std::span metadatas); - Status Apply(rocksdb::DirectWriteHandler* handler) override; + Status Apply(rocksdb::DirectWriteHandler& handler) override; private: std::span metadatas_; @@ -143,9 +143,11 @@ class IntentsWriterContext { // Returns true if we should interrupt iteration, false otherwise. virtual Result Entry( const Slice& key, const Slice& value, bool metadata, - rocksdb::DirectWriteHandler* handler) = 0; + rocksdb::DirectWriteHandler& handler) = 0; - virtual Status Complete(rocksdb::DirectWriteHandler* handler) = 0; + virtual Status DeleteVectorIds(Slice key, Slice ids, rocksdb::DirectWriteHandler& handler) = 0; + + virtual Status Complete(rocksdb::DirectWriteHandler& handler) = 0; const TransactionId& transaction_id() const { return transaction_id_; @@ -186,7 +188,7 @@ class IntentsWriter : public rocksdb::DirectWriter { bool ignore_metadata = false, const Slice& key_to_apply = Slice()); - Status Apply(rocksdb::DirectWriteHandler* handler) override; + Status Apply(rocksdb::DirectWriteHandler& handler) override; bool key_applied() { return key_applied_; } private: @@ -254,16 +256,18 @@ class ApplyIntentsContext : public IntentsWriterContext, public FrontierSchemaVe Result Entry( const Slice& key, const Slice& value, bool metadata, - rocksdb::DirectWriteHandler* handler) override; + rocksdb::DirectWriteHandler& handler) override; + + Status Complete(rocksdb::DirectWriteHandler& handler) override; - Status Complete(rocksdb::DirectWriteHandler* handler) override; + Status DeleteVectorIds(Slice key, Slice ids, rocksdb::DirectWriteHandler& handler) override; private: - Result StoreApplyState(const Slice& key, rocksdb::DirectWriteHandler* handler); - Status ProcessVectorIndexes(rocksdb::DirectWriteHandler* handler, Slice key, Slice value); + Result StoreApplyState(const Slice& key, rocksdb::DirectWriteHandler& handler); + Status ProcessVectorIndexes(rocksdb::DirectWriteHandler& handler, Slice key, Slice value); template Status ProcessVectorIndexesForPackedRow( - rocksdb::DirectWriteHandler* handler, size_t prefix_size, Slice key, Slice value); + rocksdb::DirectWriteHandler& handler, size_t prefix_size, Slice key, Slice value); bool ApplyToRegularDB() const { return apply_to_storages_.TestRegularDB(); @@ -298,9 +302,12 @@ class RemoveIntentsContext : public IntentsWriterContext { Result Entry( const Slice& key, const Slice& value, bool metadata, - rocksdb::DirectWriteHandler* handler) override; + rocksdb::DirectWriteHandler& handler) override; + + Status Complete(rocksdb::DirectWriteHandler& handler) override; + + Status DeleteVectorIds(Slice key, Slice ids, rocksdb::DirectWriteHandler& handler) override; - Status Complete(rocksdb::DirectWriteHandler* handler) override; private: uint8_t reason_; }; @@ -331,27 +338,27 @@ class NonTransactionalBatchWriter : public rocksdb::DirectWriter, rocksdb::WriteBatch* intents_write_batch, SchemaPackingProvider* schema_packing_provider); bool Empty() const; - Status Apply(rocksdb::DirectWriteHandler* handler) override; + Status Apply(rocksdb::DirectWriteHandler& handler) override; private: // Reads all stored external intents for provided transactions and prepares batches that will // apply them into regular db and remove from intents db. Status PrepareApplyExternalIntents( - ExternalTxnApplyState* apply_external_transactions, rocksdb::DirectWriteHandler* handler); + ExternalTxnApplyState* apply_external_transactions, rocksdb::DirectWriteHandler& handler); // Adds external pair to write batch. // Returns true if add was skipped because pair is a regular (non external) record. Result AddEntryToWriteBatch( const yb::docdb::LWKeyValuePairPB& kv_pair, ExternalTxnApplyState* apply_external_transactions, - rocksdb::DirectWriteHandler* regular_write_handler, IntraTxnWriteId* write_id); + rocksdb::DirectWriteHandler& regular_write_handler, IntraTxnWriteId* write_id); // Parse the merged external intent value, and write them to regular writer handler. Also updates // min/max schema version. // Returns true when the entire batch was applied, and false if some intents were skipped. Result PrepareApplyExternalIntentsBatch( const Slice& original_input_value, ExternalTxnApplyStateData* apply_data, - rocksdb::DirectWriteHandler* regular_write_handler); + rocksdb::DirectWriteHandler& regular_write_handler); private: const LWKeyValueWriteBatchPB& put_batch_; diff --git a/src/yb/docdb/vector_index.cc b/src/yb/docdb/vector_index.cc index 9bf3787515f5..858896a4b9e5 100644 --- a/src/yb/docdb/vector_index.cc +++ b/src/yb/docdb/vector_index.cc @@ -315,11 +315,11 @@ Result CreateVectorIndex( } void AddVectorIndexReverseEntry( - rocksdb::DirectWriteHandler* handler, Slice ybctid, Slice value, HybridTime write_ht) { + rocksdb::DirectWriteHandler& handler, Slice ybctid, Slice value, DocHybridTime write_ht) { DocHybridTimeBuffer ht_buf; - auto encoded_write_time = ht_buf.EncodeWithValueType({ write_ht, 0 }); - handler->Put( - dockv::VectorIndexReverseEntryKeyParts(value, encoded_write_time), {&ybctid, 1}); + auto encoded_write_time = ht_buf.EncodeWithValueType(write_ht); + handler.Put( + dockv::VectorIndexReverseEntryKeyPartsForValue(value, encoded_write_time), {&ybctid, 1}); } } // namespace yb::docdb diff --git a/src/yb/docdb/vector_index.h b/src/yb/docdb/vector_index.h index 038e8bb8afae..c9cc0c73c6ce 100644 --- a/src/yb/docdb/vector_index.h +++ b/src/yb/docdb/vector_index.h @@ -86,6 +86,6 @@ Result CreateVectorIndex( extern const std::string kVectorIndexDirPrefix; void AddVectorIndexReverseEntry( - rocksdb::DirectWriteHandler* handler, Slice ybctid, Slice value, HybridTime write_ht); + rocksdb::DirectWriteHandler& handler, Slice ybctid, Slice value, DocHybridTime write_ht); } // namespace yb::docdb diff --git a/src/yb/dockv/primitive_value.cc b/src/yb/dockv/primitive_value.cc index a01bdb115b08..b403750c10a6 100644 --- a/src/yb/dockv/primitive_value.cc +++ b/src/yb/dockv/primitive_value.cc @@ -63,6 +63,7 @@ using yb::util::DecodeDoubleFromKey; case ValueEntryType::kInvalid: [[fallthrough]]; \ case ValueEntryType::kJsonb: [[fallthrough]]; \ case ValueEntryType::kObject: [[fallthrough]]; \ + case ValueEntryType::kDeleteVectorIds: [[fallthrough]]; \ case ValueEntryType::kPackedRowV1: [[fallthrough]]; \ case ValueEntryType::kPackedRowV2: [[fallthrough]]; \ case ValueEntryType::kRedisList: [[fallthrough]]; \ @@ -352,6 +353,8 @@ std::string PrimitiveValue::ValueToString() const { return uuid_val_.ToString(); case ValueEntryType::kVectorId: return Substitute("VectorId($0)", uuid_val_.ToString()); + case ValueEntryType::kDeleteVectorIds: + return ""; case ValueEntryType::kRowLock: return "l"; case ValueEntryType::kArrayIndex: @@ -1422,6 +1425,7 @@ Status PrimitiveValue::DecodeFromValue(const Slice& rocksdb_slice) { } case ValueEntryType::kInvalid: [[fallthrough]]; + case ValueEntryType::kDeleteVectorIds: [[fallthrough]]; case ValueEntryType::kPackedRowV1: [[fallthrough]]; case ValueEntryType::kPackedRowV2: [[fallthrough]]; case ValueEntryType::kMaxByte: @@ -1663,6 +1667,7 @@ Status PrimitiveValue::DecodeToQLValuePB( break; case ValueEntryType::kInvalid: [[fallthrough]]; + case ValueEntryType::kDeleteVectorIds: [[fallthrough]]; case ValueEntryType::kPackedRowV1: [[fallthrough]]; case ValueEntryType::kPackedRowV2: [[fallthrough]]; case ValueEntryType::kRowLock: [[fallthrough]]; diff --git a/src/yb/dockv/value_type.h b/src/yb/dockv/value_type.h index 4dfd11fa4364..b3ca1becad33 100644 --- a/src/yb/dockv/value_type.h +++ b/src/yb/dockv/value_type.h @@ -179,6 +179,7 @@ namespace yb::dockv { ((kTrue, 'T')) /* ASCII code 84 */ \ ((kUInt64, 'U')) /* ASCII code 85 */ \ ((kVectorId, 'V')) /* ASCII code 86 */ \ + ((kDeleteVectorIds, 'W')) /* ASCII code 87 */ \ ((kTombstone, 'X')) /* ASCII code 88 */ \ ((kArrayIndex, '[')) /* ASCII code 91 */ \ ((kCollString, '\\')) /* ASCII code 92 */ \ diff --git a/src/yb/dockv/vector_id.cc b/src/yb/dockv/vector_id.cc index f57e239fe190..fadc512df0ed 100644 --- a/src/yb/dockv/vector_id.cc +++ b/src/yb/dockv/vector_id.cc @@ -129,7 +129,8 @@ KeyBuffer VectorIdKey(vector_index::VectorId vector_id) { return key; } -std::array VectorIndexReverseEntryKeyParts(Slice value, Slice encoded_write_time) { +std::array VectorIndexReverseEntryKeyPartsForValue( + Slice value, Slice encoded_write_time) { return std::array{ Slice(kVectorIdKeyPrefix), EncodedDocVectorValue::FromSlice(value).id, @@ -137,4 +138,12 @@ std::array VectorIndexReverseEntryKeyParts(Slice value, Slice encoded_ }; } +std::array VectorIndexReverseEntryKeyParts(Slice id, Slice encoded_write_time) { + return std::array{ + Slice(kVectorIdKeyPrefix), + id, + encoded_write_time, + }; +} + } // namespace yb::dockv diff --git a/src/yb/dockv/vector_id.h b/src/yb/dockv/vector_id.h index 39d179ee9474..64fb48e55bdb 100644 --- a/src/yb/dockv/vector_id.h +++ b/src/yb/dockv/vector_id.h @@ -62,6 +62,7 @@ class DocVectorValue final { bool IsNull(const DocVectorValue& v); KeyBuffer VectorIdKey(vector_index::VectorId vector_id); -std::array VectorIndexReverseEntryKeyParts(Slice value, Slice encoded_write_time); +std::array VectorIndexReverseEntryKeyParts(Slice id, Slice encoded_write_time); +std::array VectorIndexReverseEntryKeyPartsForValue(Slice value, Slice encoded_write_time); } // namespace yb::dockv diff --git a/src/yb/rocksdb/db/write_batch.cc b/src/yb/rocksdb/db/write_batch.cc index bcdfe3d34829..e6e003676ead 100644 --- a/src/yb/rocksdb/db/write_batch.cc +++ b/src/yb/rocksdb/db/write_batch.cc @@ -1029,7 +1029,7 @@ Result DirectInsert( } DirectWriteHandlerImpl direct_write_handler( current->mem(), mem_table_inserter->sequence_, handler_for_logging); - RETURN_NOT_OK(writer->Apply(&direct_write_handler)); + RETURN_NOT_OK(writer->Apply(direct_write_handler)); auto result = direct_write_handler.Complete(); mem_table_inserter->CheckMemtableFull(); return result; diff --git a/src/yb/rocksdb/write_batch.h b/src/yb/rocksdb/write_batch.h index f0044cd66e8f..54c80e0207e0 100644 --- a/src/yb/rocksdb/write_batch.h +++ b/src/yb/rocksdb/write_batch.h @@ -73,7 +73,7 @@ class DirectWriteHandler { // entries directly to the memtable. class DirectWriter { public: - virtual Status Apply(DirectWriteHandler* handler) = 0; + virtual Status Apply(DirectWriteHandler& handler) = 0; virtual ~DirectWriter() = default; }; diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc index 9262c961d300..5b2804852b8f 100644 --- a/src/yb/tablet/tablet.cc +++ b/src/yb/tablet/tablet.cc @@ -2362,7 +2362,7 @@ Status Tablet::RemoveIntentsImpl( Status Tablet::RemoveAdvisoryLock( const TransactionId& transaction_id, const Slice& key, - const dockv::IntentTypeSet& intent_types, rocksdb::DirectWriteHandler* handler) { + const dockv::IntentTypeSet& intent_types, rocksdb::DirectWriteHandler& handler) { VLOG_WITH_PREFIX_AND_FUNC(4) << "Transaction " << transaction_id << " is going to release lock "<< key.ToDebugString() << " with type " << yb::ToString(intent_types); @@ -2390,7 +2390,7 @@ Status Tablet::RemoveAdvisoryLock( return Status::OK(); } -Status Tablet::RemoveAdvisoryLocks(const TransactionId& id, rocksdb::DirectWriteHandler* handler) { +Status Tablet::RemoveAdvisoryLocks(const TransactionId& id, rocksdb::DirectWriteHandler& handler) { auto scoped_read_operation = CreateScopedRWOperationNotBlockingRocksDbShutdownStart(); RETURN_NOT_OK(scoped_read_operation); diff --git a/src/yb/tablet/tablet.h b/src/yb/tablet/tablet.h index 6e63948b842c..877ba420e652 100644 --- a/src/yb/tablet/tablet.h +++ b/src/yb/tablet/tablet.h @@ -1212,12 +1212,12 @@ class Tablet : public AbstractTablet, // Remove advisory lock intents for the given transaction id. Status RemoveAdvisoryLocks(const TransactionId& id, - rocksdb::DirectWriteHandler* handler) override; + rocksdb::DirectWriteHandler& handler) override; // Remove the advisory lock intent with speficied key and intent_types for the given txn id. Status RemoveAdvisoryLock( const TransactionId& transaction_id, const Slice& key, - const dockv::IntentTypeSet& intent_types, rocksdb::DirectWriteHandler* handler) override; + const dockv::IntentTypeSet& intent_types, rocksdb::DirectWriteHandler& handler) override; // Tries to find intent .SST files that could be deleted and remove them. void DoCleanupIntentFiles(); diff --git a/src/yb/tablet/tablet_vector_indexes.cc b/src/yb/tablet/tablet_vector_indexes.cc index 726da341bb2c..c3f5a24510d5 100644 --- a/src/yb/tablet/tablet_vector_indexes.cc +++ b/src/yb/tablet/tablet_vector_indexes.cc @@ -230,10 +230,10 @@ class VectorIndexBackfillHelper : public rocksdb::DirectWriter { return Status::OK(); } - Status Apply(rocksdb::DirectWriteHandler* handler) override { + Status Apply(rocksdb::DirectWriteHandler& handler) override { for (size_t i = 0; i != ybctids_.size(); ++i) { docdb::AddVectorIndexReverseEntry( - handler, ybctids_[i], entries_[i].value.AsSlice(), backfill_ht_); + handler, ybctids_[i], entries_[i].value.AsSlice(), DocHybridTime(backfill_ht_, 0)); } return Status::OK(); } diff --git a/src/yb/tablet/transaction_intent_applier.h b/src/yb/tablet/transaction_intent_applier.h index 3f3dc5fc3f7c..f42bbaad46ec 100644 --- a/src/yb/tablet/transaction_intent_applier.h +++ b/src/yb/tablet/transaction_intent_applier.h @@ -46,10 +46,10 @@ class TransactionIntentApplier { std::span metadatas) = 0; virtual Status RemoveAdvisoryLocks( - const TransactionId& transaction_id, rocksdb::DirectWriteHandler* handler) = 0; + const TransactionId& transaction_id, rocksdb::DirectWriteHandler& handler) = 0; virtual Status RemoveAdvisoryLock( const TransactionId& transaction_id, const Slice& key, - const dockv::IntentTypeSet& intent_types, rocksdb::DirectWriteHandler* handler) = 0; + const dockv::IntentTypeSet& intent_types, rocksdb::DirectWriteHandler& handler) = 0; virtual HybridTime ApplierSafeTime(HybridTime min_allowed, CoarseTimePoint deadline) = 0; diff --git a/src/yb/yql/pgwrapper/pg_vector_index-test.cc b/src/yb/yql/pgwrapper/pg_vector_index-test.cc index 0b7d3a4d936a..df76e63fa347 100644 --- a/src/yb/yql/pgwrapper/pg_vector_index-test.cc +++ b/src/yb/yql/pgwrapper/pg_vector_index-test.cc @@ -351,6 +351,8 @@ void PgVectorIndexTest::VerifyRows( "SELECT * FROM test $0$1", add_filter ? "WHERE id + 3 <= 5" : "", IndexQuerySuffix("[0.0, 0.0, 0.0]", limit < 0 ? expected.size() : make_unsigned(limit)))))); + LOG_WITH_FUNC(INFO) << " Result: " << AsString(result); + LOG_WITH_FUNC(INFO) << "Expected: " << AsString(expected); ASSERT_EQ(result.size(), expected.size()); for (size_t i = 0; i != std::min(result.size(), expected.size()); ++i) { SCOPED_TRACE(Format("Row $0", i));