Restructure Matrix read operations #314

Open · wants to merge 3 commits into base: main
46 changes: 35 additions & 11 deletions src/include/detail/linalg/tdb_matrix.h
@@ -351,20 +351,44 @@ class tdbBlockedMatrix : public MatrixBase {

auto layout_order = schema_.cell_order();

// Create a query
// Read TileDB data
size_t read_batch_size_cells = get_read_batch_size_cells(ctx_);
size_t total_size = elements_to_load * dimension;
size_t offset = 0;
tiledb::Query query(ctx_, *array_);
query.set_subarray(subarray)
.set_layout(layout_order)
.set_data_buffer(attr_name, this->data(), elements_to_load * dimension);
tiledb_helpers::submit_query(tdb_func__, uri_, query);
query.set_subarray(subarray).set_layout(layout_order);
tiledb::Query::Status status;
do {
Contributor:

Does this result in incomplete queries whenever read_batch_size_cells is less than total_size? I don't think we should batch reads unless we actually need to. That is, we should always try to read total_size and then iterate only if we don't get total_size.

Having incomplete queries here is a rare edge case. Reading in batches may or may not solve that issue, but it adds latency to the common case.

Contributor (PR author):

The point of read_batch_size_cells is to set a cap on the size of each individual request to TileDB. The cap is configurable, so we could set it to a large value in order to perform the whole read in one go.

To explain why I expect this to reduce our memory footprint without a large latency overhead, let's use an example:

  • Assume we want to read 15 GB of data from a tiledb:// array
  • Assume that TileDB requires some extra memory (20%) to run the filter pipeline (decompression, etc.)
  • Assume read I/O throughput and network speed are 200 MB/s, and all operations are I/O bound (no CPU overhead)
  • Assume each extra call to core adds 500 ms of latency overhead
  • One go
    • Peak allocated memory on client: 15 GB
    • Peak allocated memory on server: 15 GB + 3 GB = 18 GB
    • Peak allocated memory (REST + client when collocated during a realtime UDF): 33 GB
    • Latency: 500 ms + 2 * 15 GB / 200 MB/s = 150.5 s
  • With batch size 1 GB
    • Peak allocated memory on client: 15 GB
    • Peak allocated memory on server: 1 GB + 0.3 GB = 1.3 GB
    • Peak allocated memory (REST + client when collocated during a realtime UDF): 16.3 GB
    • Latency: 15 * 500 ms + 2 * 15 GB / 200 MB/s = 157.5 s

Contributor:

One rule in software is "don't pay for what you don't need." It is true that with this particular set of assumptions the penalty is only about 5%. However, I expect the server is going to deal with its memory footprint in its own way, which is why we might get incomplete reads. I would say just request what is needed and let the server decide how much memory it wants to devote to that.

Do other apps try to be parsimonious this way?

I guess, on the other hand, since this is configurable, we can always just make it really large.

// Submit query and get status
size_t request_size = read_batch_size_cells;
if (offset + read_batch_size_cells > total_size) {
request_size = total_size - offset;
}
query.set_data_buffer(attr_name, this->data() + offset, request_size);
query.submit();
status = query.query_status();

auto num_results = query.result_buffer_elements()[attr_name].second;
if (num_results == 0) {
throw std::runtime_error(
"Read error: Got empty results while expecting to retrieve more "
"values.");
}
offset += num_results;
} while (status == tiledb::Query::Status::INCOMPLETE &&
offset < total_size);
// Handle errors
if (status == tiledb::Query::Status::COMPLETE && offset != total_size) {
throw std::runtime_error(
"Read error: Read status COMPLETE but result size was different "
"than expected: " +
std::to_string(offset) + " != " + std::to_string(total_size));
}
if (status != tiledb::Query::Status::COMPLETE) {
throw std::runtime_error("Read error: Query status not COMPLETE");
}
_memory_data.insert_entry(
tdb_func__, elements_to_load * dimension * sizeof(T));

// @todo Handle incomplete queries.
if (tiledb::Query::Status::COMPLETE != query.query_status()) {
throw std::runtime_error("Query status is not complete");
}

num_loads_++;
return true;
}
47 changes: 37 additions & 10 deletions src/include/detail/linalg/tdb_matrix_with_ids.h
@@ -224,19 +224,46 @@ class tdbBlockedMatrixWithIds

auto layout_order = ids_schema_.cell_order();
this->ids().resize(elements_to_load * dimension);
// Create a query

// Read TileDB data
size_t read_batch_size_cells = get_read_batch_size_cells(this->ctx_);
size_t total_size = elements_to_load * dimension;
size_t offset = 0;
auto ptr = this->ids().data();
tiledb::Query query(this->ctx_, *ids_array_);
query.set_subarray(subarray)
.set_layout(layout_order)
.set_data_buffer(attr_name, this->ids());
tiledb_helpers::submit_query(tdb_func__, ids_uri_, query);
query.set_subarray(subarray).set_layout(layout_order);
tiledb::Query::Status status;
do {
// Submit query and get status
size_t request_size = read_batch_size_cells;
if (offset + read_batch_size_cells > total_size) {
request_size = total_size - offset;
}
query.set_data_buffer(attr_name, ptr + offset, request_size);
tiledb_helpers::submit_query(tdb_func__, ids_uri_, query);
status = query.query_status();

auto num_results = query.result_buffer_elements()[attr_name].second;
if (num_results == 0) {
throw std::runtime_error(
"Read error: Got empty results while expecting to retrieve more "
"values.");
}
offset += num_results;
} while (status == tiledb::Query::Status::INCOMPLETE &&
offset < total_size);
// Handle errors
if (status == tiledb::Query::Status::COMPLETE && offset != total_size) {
throw std::runtime_error(
"Read error: Read status COMPLETE but result size was different "
"than expected: " +
std::to_string(offset) + " != " + std::to_string(total_size));
}
if (status != tiledb::Query::Status::COMPLETE) {
throw std::runtime_error("Read error: Query status not COMPLETE");
}
_memory_data.insert_entry(
tdb_func__, elements_to_load * dimension * sizeof(T));
// @todo Handle incomplete queries.
if (tiledb::Query::Status::COMPLETE != query.query_status()) {
throw std::runtime_error("Query status for IDs is not complete");
}

return true;
}
}; // tdbBlockedMatrixWithIds
51 changes: 36 additions & 15 deletions src/include/detail/linalg/tdb_partitioned_matrix.h
@@ -559,24 +559,45 @@ class tdbPartitionedMatrix

auto cell_order = partitioned_vectors_schema_.cell_order();
auto layout_order = cell_order;

tiledb::Query query(ctx_, *(this->partitioned_vectors_array_));

auto ptr = this->data();
query.set_subarray(subarray)
.set_layout(layout_order)
.set_data_buffer(attr_name, ptr, col_count * dimension);
// tiledb_helpers::submit_query(tdb_func__, partitioned_vectors_uri_,
// query);
query.submit();
_memory_data.insert_entry(tdb_func__, col_count * dimension * sizeof(T));

// assert(tiledb::Query::Status::COMPLETE == query.query_dstatus());
auto qs = query.query_status();
// @todo Handle incomplete queries.
if (tiledb::Query::Status::COMPLETE != query.query_status()) {
throw std::runtime_error("Query status is not complete -- fix me");
// Read TileDB data
size_t read_batch_size_cells = get_read_batch_size_cells(ctx_);
size_t total_size = col_count * dimension;
size_t offset = 0;
tiledb::Query query(ctx_, *(this->partitioned_vectors_array_));
query.set_subarray(subarray).set_layout(layout_order);
tiledb::Query::Status status;
do {
// Submit query and get status
size_t request_size = read_batch_size_cells;
if (offset + read_batch_size_cells > total_size) {
request_size = total_size - offset;
}
query.set_data_buffer(attr_name, ptr + offset, request_size);
query.submit();
status = query.query_status();

auto num_results = query.result_buffer_elements()[attr_name].second;
if (num_results == 0) {
throw std::runtime_error(
"Read error: Got empty results while expecting to retrieve more "
"values.");
}
offset += num_results;
} while (status == tiledb::Query::Status::INCOMPLETE &&
offset < total_size);
// Handle errors
if (status == tiledb::Query::Status::COMPLETE && offset != total_size) {
throw std::runtime_error(
"Read error: Read status COMPLETE but result size was different "
"than expected: " +
std::to_string(offset) + " != " + std::to_string(total_size));
}
if (status != tiledb::Query::Status::COMPLETE) {
throw std::runtime_error("Read error: Query status not COMPLETE");
}
_memory_data.insert_entry(tdb_func__, col_count * dimension * sizeof(T));
}

/**
22 changes: 22 additions & 0 deletions src/include/tdb_defs.h
@@ -36,6 +36,28 @@
#include <string>
#include <tiledb/tiledb>

// Default batch size for all TileDB read operations.
// This is expressed in number of array cells read per request.
constexpr size_t DEFAULT_READ_BATCH_SIZE_CELLS = 100000000;
Contributor:

I suggest using 100'000'000 for legibility.

constexpr char READ_BATCH_SIZE_CELLS_CONFIG_KEY[] =
"vectorsearch.read_batch_size_cells";
static size_t get_read_batch_size_cells(const tiledb::Context& ctx) {
auto config = ctx.config();
if (config.contains(READ_BATCH_SIZE_CELLS_CONFIG_KEY)) {
auto tmp_str = config.get(READ_BATCH_SIZE_CELLS_CONFIG_KEY);
try {
size_t read_batch_size_cells = std::stoull(tmp_str);
return read_batch_size_cells;
} catch (const std::invalid_argument& e) {
throw std::invalid_argument(
"Failed to convert 'vectorsearch.read_batch_size_cells' to size_t "
"('" +
tmp_str + "')");
}
}
return DEFAULT_READ_BATCH_SIZE_CELLS;
}

template <class... T>
constexpr bool always_false = false;

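The config lookup in `get_read_batch_size_cells` can be exercised without a TileDB context by modeling the config as a plain string map. One caveat worth noting: `std::stoull` can also throw `std::out_of_range` for values that overflow, which the diff as written does not catch. The sketch below (hypothetical names `kDefaultBatchCells` and `batch_size_from`) handles both exception types:

```cpp
#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>

constexpr size_t kDefaultBatchCells = 100'000'000;

// Stand-in for reading "vectorsearch.read_batch_size_cells" from a
// tiledb::Config; `cfg` models the key/value store.
size_t batch_size_from(const std::map<std::string, std::string>& cfg) {
  auto it = cfg.find("vectorsearch.read_batch_size_cells");
  if (it == cfg.end()) return kDefaultBatchCells;  // key absent -> default
  try {
    return std::stoull(it->second);
  } catch (const std::exception&) {  // invalid_argument or out_of_range
    throw std::invalid_argument(
        "Failed to convert 'vectorsearch.read_batch_size_cells' to size_t "
        "('" + it->second + "')");
  }
}
```

For example, `batch_size_from({})` falls back to the default, while a config carrying `"1000000"` for the key yields one million cells per request.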