Merge remote-tracking branch 'upstream/main' into testcases_tensor_sparse
Ami11111 committed Jul 25, 2024
2 parents 2bb9477 + 777435f commit 9766673
Showing 22 changed files with 149 additions and 78 deletions.
46 changes: 23 additions & 23 deletions conf/infinity_conf.toml
@@ -1,56 +1,56 @@
[general]
version = "0.3.0"
time_zone = "utc-8"

[network]
server_address = "0.0.0.0"
postgres_port = 5432
http_port = 23820
client_port = 23817
connection_pool_size = 128

[log]
log_filename = "infinity.log"
log_dir = "/var/infinity/log"
log_to_stdout = false
log_file_max_size = "10GB"
log_file_rotate_count = 10

# trace/info/warning/error/critical 5 log levels, default: info
log_level = "info"

[storage]
data_dir = "/var/infinity/data"

# periodically activates garbage collection:
# 0 means real-time,
# s means seconds, for example "60s", 60 seconds
# m means minutes, for example "60m", 60 minutes
# h means hours, for example "1h", 1 hour
cleanup_interval = "60s"
compact_interval = "120s"

# dump the memory index entry when it reaches the capacity
mem_index_capacity = 1048576

[buffer]
buffer_manager_size = "4GB"
lru_num = 7
temp_dir = "/var/infinity/tmp"

[wal]
wal_dir = "/var/infinity/wal"
full_checkpoint_interval = "86400s"
delta_checkpoint_interval = "60s"
# delta_checkpoint_threshold = 1000000000
wal_compact_threshold = "1GB"

# flush_at_once: write and flush log each commit
# only_write: write log, OS control when to flush the log, default
# flush_per_second: logs are written after each commit and flushed to disk per second.
wal_flush = "only_write"

[resource]
resource_dir = "/var/infinity/resource"

[persistence]
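
The interval settings in this file (`cleanup_interval`, `compact_interval`, `full_checkpoint_interval`, `delta_checkpoint_interval`) all use the duration syntax described in the comments above: `0` for real-time, otherwise a number suffixed with `s`, `m`, or `h`. As a rough illustration of that format only — `ParseInterval` below is a hypothetical helper, not Infinity's actual configuration parser:

```cpp
#include <chrono>
#include <stdexcept>
#include <string>

// Illustrative only: parse strings such as "0", "60s", "60m", or "1h"
// into seconds, per the format documented in infinity_conf.toml.
std::chrono::seconds ParseInterval(const std::string &text) {
    if (text == "0") {
        return std::chrono::seconds{0};  // 0 means real-time
    }
    if (text.size() < 2) {
        throw std::invalid_argument("interval too short: " + text);
    }
    const long long value = std::stoll(text.substr(0, text.size() - 1));
    switch (text.back()) {
        case 's': return std::chrono::seconds{value};
        case 'm': return std::chrono::minutes{value};
        case 'h': return std::chrono::hours{value};
        default:  throw std::invalid_argument("unknown interval unit: " + text);
    }
}

// Example: ParseInterval("60s").count() == 60, ParseInterval("1h").count() == 3600.
```
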
29 changes: 15 additions & 14 deletions docs/references/pysdk_api_reference.md
@@ -578,40 +578,41 @@ table_instance.insert({"c1": 1, "c7": "Tom", "c12": True})

```python
# Create a table with an integer column and a 3-d vector column:
table_instance = db_instance.create_table("vector_table", {"c1": {"type": "integer", "default": 2024}, "vector_column": {"type": "vector,3,float"}})
table_instance = db_instance.create_table("vector_table", {"c1": {"type": "integer", "default": 0}, "vector_column": {"type": "vector,3,float"}})

# Insert one complete row into the table:
table_obj.insert({"c1": 2023, "vector_column": [1.1, 2.2, 3.3]})
# Insert one incomplete row into the table:
# Note that the 'c1' cell defaults to 0.
table_obj.insert({"vector_column": [1.1, 2.2, 3.3]})

# Insert three rows into the table:
table_obj.insert([{"vector_column": [1.1, 2.2, 3.3]}, {"vector_column": [4.4, 5.5, 6.6]}, {"vector_column": [7.7, 8.8, 9.9]}])
# Insert two incomplete rows into the table:
# Note that the 'c1' cells default to 0.
table_obj.insert([{"vector_column": [1.1, 2.2, 3.3]}, {"vector_column": [4.4, 5.5, 6.6]}])
```
#### Insert sparse vectors

```python
# Create a table with an integer column and a 100-d sparse vector column:
table_instance = db_instance.create_table("sparse_vector_table", {"c1": {"type": "integer"}, "sparse_column": {"type": "sparse,100,float,int"}})
# Create a table with a 100-d sparse vector column only:
table_instance = db_instance.create_table("sparse_vector_table", {"sparse_column": {"type": "sparse,100,float,int"}})

# Insert three rows into the table:
# Insert one row into the table:
# `indices` specifies the indices corresponding to the values in `values`.
# Note that the second row sets "c1" as 2024 by default.
table_instance.insert([{"c1": 2022, "sparse_column": {"indices": [10, 20, 30], "values": [1.1, 2.2, 3.3]}, {"sparse_column": {"indices": [70, 80, 90], "values": [7.7, 8.8, 9.9]}}}])
table_instance.insert({"sparse_column": {"indices": [10, 20, 30], "values": [1.1, 2.2, 3.3]}})
```

#### Insert tensors

```python
# Create a table with a tensor column:
table_instance = db_instance.create_table("tensor_table", {"c1": {"type": "integer", "default": 2024}, "tensor_column": {"type": "tensor,4,float"}})
# Create a table with a tensor column only:
table_instance = db_instance.create_table("tensor_table", {"tensor_column": {"type": "tensor,4,float"}})

# Insert one row into the table, with the "c1" column defaulting to 2024:
# Insert one row into the table:
table_instance.insert([{"tensor_column": [[1.0, 0.0, 0.0, 0.0], [1.1, 0.0, 0.0, 0.0]]}])
```

#### Insert tensor arrays

```python
# Create a table with only one tensor array column:
# Create a table with a tensor array column only:
table_instance = db_instance.create_table("tensor_array_table", {"tensor_array_column": {"type": "tensorarray,2,float"}})

table_instance.insert([{"tensor_array_column": [[[1.0, 2.0], [3.0, 4.0]], [[5.0, 6.0]]]}])
```
@@ -67,7 +67,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

private:
EmbeddingDataType GetType() const;
@@ -136,7 +136,7 @@ void AnnIVFFlatIndexFileWorker<DataType>::WriteToFileImpl(bool to_spill, bool &p
}

template <typename DataType>
void AnnIVFFlatIndexFileWorker<DataType>::ReadFromFileImpl() {
void AnnIVFFlatIndexFileWorker<DataType>::ReadFromFileImpl(SizeT file_size) {
data_ = new AnnIVFFlatIndexData<DataType>();
auto *index = static_cast<AnnIVFFlatIndexData<DataType> *>(data_);
index->ReadIndexInner(*file_handler_);
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/bmp_index_file_worker.cpp
@@ -101,7 +101,7 @@ void BMPIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
prepare_success = true;
}

void BMPIndexFileWorker::ReadFromFileImpl() {
void BMPIndexFileWorker::ReadFromFileImpl(SizeT file_size) {
if (data_ != nullptr) {
UnrecoverableError("Data is already allocated.");
}
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/bmp_index_file_worker.cppm
@@ -50,7 +50,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

private:
SizeT index_size_{};
3 changes: 1 addition & 2 deletions src/storage/buffer/file_worker/data_file_worker.cpp
@@ -93,10 +93,9 @@ void DataFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
prepare_success = true; // Not run defer_fn
}

void DataFileWorker::ReadFromFileImpl() {
void DataFileWorker::ReadFromFileImpl(SizeT file_size) {
LocalFileSystem fs;

SizeT file_size = fs.GetFileSize(*file_handler_);
if (file_size < sizeof(u64) * 3) {
Status status = Status::DataIOError(fmt::format("Incorrect file length {}.", file_size));
RecoverableError(status);
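
The signature change running through these diffs follows one pattern: `FileWorker::ReadFromFile` now resolves the file size once — from the object-cache part size when the persistence manager is enabled, otherwise from the filesystem — and passes it to every `ReadFromFileImpl(SizeT file_size)` override, so workers such as `DataFileWorker` no longer call `GetFileSize` themselves. A minimal sketch of that call pattern, using simplified stand-in types rather than Infinity's actual classes:

```cpp
#include <cstddef>
#include <cstdio>

// Simplified stand-ins, shown only to illustrate the new
// ReadFromFileImpl(SizeT) call pattern from this commit.
using SizeT = std::size_t;

struct FileWorkerSketch {
    virtual ~FileWorkerSketch() = default;

    // The base class resolves the size once, then hands it to the override.
    void ReadFromFile(bool use_object_cache, SizeT part_size, std::FILE *file) {
        SizeT file_size = 0;
        if (use_object_cache) {
            file_size = part_size;          // size of this worker's slice in a packed object
        } else {
            std::fseek(file, 0, SEEK_END);  // stand-in for fs.GetFileSize(*file_handler_)
            file_size = static_cast<SizeT>(std::ftell(file));
            std::fseek(file, 0, SEEK_SET);
        }
        ReadFromFileImpl(file_size);
    }

protected:
    // Overrides now receive the size instead of querying the filesystem.
    virtual void ReadFromFileImpl(SizeT file_size) = 0;
};

struct DataWorkerSketch : FileWorkerSketch {
protected:
    void ReadFromFileImpl(SizeT file_size) override {
        // A worker can validate the size it was handed up front, much like
        // DataFileWorker's header-length check in the diff above.
        if (file_size < sizeof(unsigned long long) * 3) {
            std::fprintf(stderr, "Incorrect file length %zu.\n", file_size);
            return;
        }
        // ... read header and payload using file_size ...
    }
};
```
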
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/data_file_worker.cppm
@@ -40,7 +40,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

private:
const SizeT buffer_size_;
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/emvb_index_file_worker.cpp
@@ -92,7 +92,7 @@ void EMVBIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success)
prepare_success = true;
}

void EMVBIndexFileWorker::ReadFromFileImpl() {
void EMVBIndexFileWorker::ReadFromFileImpl(SizeT file_size) {
if (data_) {
const auto error_message = "Data is already allocated.";
UnrecoverableError(error_message);
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/emvb_index_file_worker.cppm
@@ -49,7 +49,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

private:
const EmbeddingInfo *GetEmbeddingInfo() const;
19 changes: 13 additions & 6 deletions src/storage/buffer/file_worker/file_worker.cpp
@@ -89,14 +89,17 @@ void FileWorker::ReadFromFile(bool from_spill) {
} else {
read_path = fmt::format("{}/{}", ChooseFileDir(from_spill), *file_name_);
}

SizeT file_size = 0;
u8 flags = FileFlags::READ_FLAG;
auto [file_handler, status] = fs.OpenFile(read_path, flags, FileLockType::kReadLock);
if(!status.ok()) {
UnrecoverableError(status.message());
}
if (use_object_cache) {
fs.Seek(*file_handler, obj_addr_.part_offset_);
file_size = obj_addr_.part_size_;
} else {
file_size = fs.GetFileSize(*file_handler);
}
file_handler_ = std::move(file_handler);
DeferFn defer_fn([&]() {
@@ -106,14 +109,10 @@
InfinityContext::instance().persistence_manager()->PutObjCache(obj_addr_);
}
});
ReadFromFileImpl();
ReadFromFileImpl(file_size);
}

void FileWorker::MoveFile() {
if (InfinityContext::instance().persistence_manager() != nullptr) {
LOG_DEBUG(fmt::format("Skipped MoveFile file since persistence manager is enabled: {}", *file_name_));
return;
}
LocalFileSystem fs;

String src_path = fmt::format("{}/{}", ChooseFileDir(true), *file_name_);
@@ -130,9 +129,17 @@
// UnrecoverableError(fmt::format("File {} was already been created before.", dest_path));
// }
fs.Rename(src_path, dest_path);
if (InfinityContext::instance().persistence_manager() != nullptr) {
obj_addr_ = InfinityContext::instance().persistence_manager()->Persist(dest_path);
fs.DeleteFile(dest_path);
}
}

void FileWorker::CleanupFile() const {
if (InfinityContext::instance().persistence_manager() != nullptr) {
InfinityContext::instance().persistence_manager()->Cleanup(obj_addr_);
return;
}
LocalFileSystem fs;

String path = fmt::format("{}/{}", ChooseFileDir(false), *file_name_);
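
The `MoveFile`/`CleanupFile` changes above also rework the persistence-manager handling: instead of skipping the move entirely when a persistence manager exists, the spill file is always renamed into place, then handed to the persistence manager and removed locally, and cleanup is delegated to the persistence manager for the resulting object address. A simplified sketch of that flow — `PersistenceSketch`, `ObjAddrSketch`, and `WorkerSketch` are illustrative stand-ins, not the project's types:

```cpp
#include <cstdio>
#include <string>

// Illustrative stand-ins for the persistence manager and object address.
struct ObjAddrSketch { std::string obj_key; };

struct PersistenceSketch {
    ObjAddrSketch Persist(const std::string &local_path) {
        // Pack the local file into a managed object and return its address.
        return ObjAddrSketch{"obj:" + local_path};
    }
    void Cleanup(const ObjAddrSketch &addr) {
        std::printf("cleanup %s\n", addr.obj_key.c_str());
    }
};

struct WorkerSketch {
    PersistenceSketch *persistence = nullptr;  // null when persistence is disabled
    ObjAddrSketch obj_addr;

    void MoveFile(const std::string &src_path, const std::string &dest_path) {
        std::rename(src_path.c_str(), dest_path.c_str());  // always move into place first
        if (persistence != nullptr) {
            obj_addr = persistence->Persist(dest_path);     // then persist ...
            std::remove(dest_path.c_str());                 // ... and drop the local copy
        }
    }

    void CleanupFile(const std::string &path) {
        if (persistence != nullptr) {
            persistence->Cleanup(obj_addr);                 // persisted object: delegate
            return;
        }
        std::remove(path.c_str());                          // otherwise delete the local file
    }
};
```
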
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/file_worker.cppm
@@ -66,7 +66,7 @@ public:
protected:
virtual void WriteToFileImpl(bool to_spill, bool &prepare_success) = 0;

virtual void ReadFromFileImpl() = 0;
virtual void ReadFromFileImpl(SizeT file_size) = 0;

private:
String ChooseFileDir(bool spill) const { return spill ? fmt::format("{}{}", *temp_dir_, *file_dir_) : *file_dir_; }
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/hnsw_file_worker.cpp
@@ -109,7 +109,7 @@ void HnswFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
prepare_success = true;
}

void HnswFileWorker::ReadFromFileImpl() {
void HnswFileWorker::ReadFromFileImpl(SizeT file_size) {
if (data_ != nullptr) {
UnrecoverableError("Data is already allocated.");
}
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/hnsw_file_worker.cppm
@@ -59,7 +59,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

private:
SizeT index_size_{};
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/raw_file_worker.cpp
@@ -68,7 +68,7 @@ void RawFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
prepare_success = true; // Not run defer_fn
}

void RawFileWorker::ReadFromFileImpl() {
void RawFileWorker::ReadFromFileImpl(SizeT file_size) {
LocalFileSystem fs;
buffer_size_ = fs.GetFileSize(*file_handler_);
data_ = static_cast<void *>(new char[buffer_size_]);
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/raw_file_worker.cppm
@@ -44,7 +44,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

private:
SizeT buffer_size_;
@@ -74,7 +74,7 @@ void SecondaryIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_succ
}
}

void SecondaryIndexFileWorker::ReadFromFileImpl() {
void SecondaryIndexFileWorker::ReadFromFileImpl(SizeT file_size) {
if (!data_) [[likely]] {
auto index = GetSecondaryIndexData(column_def_->type(), row_count_, false);
index->ReadIndexInner(*file_handler_);
@@ -142,7 +142,7 @@ void SecondaryIndexFileWorkerParts::WriteToFileImpl(bool to_spill, bool &prepare
}
}

void SecondaryIndexFileWorkerParts::ReadFromFileImpl() {
void SecondaryIndexFileWorkerParts::ReadFromFileImpl(SizeT file_size) {
if (row_count_ < part_id_ * 8192) {
String error_message = fmt::format("ReadFromFileImpl: row_count_: {} < part_id_ * 8192: {}", row_count_, part_id_ * 8192);
UnrecoverableError(error_message);
@@ -55,7 +55,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

const u32 row_count_{};
};
@@ -81,7 +81,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

const u32 row_count_;
const u32 part_id_;
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/version_file_worker.cpp
@@ -73,7 +73,7 @@ void VersionFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
}
}

void VersionFileWorker::ReadFromFileImpl() {
void VersionFileWorker::ReadFromFileImpl(SizeT file_size) {
if (data_ != nullptr) {
String error_message = "Data is already allocated.";
UnrecoverableError(error_message);
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/version_file_worker.cppm
@@ -42,7 +42,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

private:
SizeT capacity_{};