Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into fix_http_1
Browse files Browse the repository at this point in the history
  • Loading branch information
Ami11111 committed Aug 2, 2024
2 parents db83ac5 + 92f673a commit 3cad7f7
Show file tree
Hide file tree
Showing 11 changed files with 335 additions and 45 deletions.
1 change: 1 addition & 0 deletions src/main/infinity_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ void InfinityContext::UnInit() {
task_scheduler_.reset();

resource_manager_.reset();
persistence_manager_.reset();

Logger::Shutdown();

Expand Down
2 changes: 1 addition & 1 deletion src/storage/invertedindex/column_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void ColumnIndexReader::Open(optionflag_t flag, String &&index_dir, Map<SegmentI
auto [chunk_index_entries, memory_indexer] = segment_index_entry->GetFullTextIndexSnapshot();
// segment_readers
for (u32 i = 0; i < chunk_index_entries.size(); ++i) {
String full_dir = fmt::format("{}/{}", *chunk_index_entries[i]->base_dir_, index_dir_);
String full_dir = (Path(*chunk_index_entries[i]->base_dir_) / index_dir_).string();
SharedPtr<DiskIndexSegmentReader> segment_reader =
MakeShared<DiskIndexSegmentReader>(full_dir, chunk_index_entries[i]->base_name_, chunk_index_entries[i]->base_rowid_, flag);
segment_readers_.push_back(std::move(segment_reader));
Expand Down
33 changes: 27 additions & 6 deletions src/storage/invertedindex/disk_segment_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,54 @@ import byte_slice_reader;
import infinity_exception;
import status;
import logger;
import persistence_manager;
import infinity_context;

namespace infinity {

// Opens one on-disk index segment: derives the dictionary and posting file paths from
// index_dir/base_name, constructs the DictionaryReader, and memory-maps the posting file.
// When InfinityContext exposes a PersistenceManager, both paths are first translated
// through PersistenceManager::GetObjCache() and the translated paths are what get opened.
// NOTE(review): this hunk shows replaced and replacement lines together without +/- markers
// (dict_file and rc are each declared twice) — confirm against the commit which variant is
// current before editing the code.
DiskIndexSegmentReader::DiskIndexSegmentReader(const String &index_dir, const String &base_name, RowID base_row_id, optionflag_t flag)
: base_row_id_(base_row_id) {
Path path = Path(index_dir) / base_name;
String path_str = path.string();
String dict_file = path_str;
dict_file.append(DICT_SUFFIX);
// Keep the untranslated dictionary path as a member; the destructor passes it to PutObjCache().
dict_file_ = path_str;
dict_file_.append(DICT_SUFFIX);
PersistenceManager *pm = InfinityContext::instance().persistence_manager();
String dict_file = dict_file_;
if (nullptr != pm) {
dict_file = pm->GetObjCache(dict_file);
}
dict_reader_ = MakeShared<DictionaryReader>(dict_file, PostingFormatOption(flag));
posting_file_ = path_str;
posting_file_.append(POSTING_SUFFIX);
int rc = fs_.MmapFile(posting_file_, data_ptr_, data_len_);
String posting_file = posting_file_;
if (nullptr != pm) {
posting_file = pm->GetObjCache(posting_file);
// Remember the translated path; the destructor hands it to MunmapFile().
posting_file_obj_ = posting_file;
}
int rc = fs_.MmapFile(posting_file, data_ptr_, data_len_);
// With assertions enabled the assert fires first; the Status path below only runs in
// builds compiled with NDEBUG.
assert(rc == 0);
if (rc != 0) {
Status status = Status::MmapFileError(posting_file_);
Status status = Status::MmapFileError(posting_file);
RecoverableError(status);
}
}

// Unmaps the posting file and, when a PersistenceManager is present, returns the dictionary
// and posting paths to it via PutObjCache().
// NOTE(review): this hunk shows replaced and replacement lines together without +/- markers
// (rc and status are each declared twice) — confirm against the commit which variant is current.
// NOTE(review): RecoverableError() is called from a destructor; if it throws, that is
// hazardous during stack unwinding — confirm its behavior.
DiskIndexSegmentReader::~DiskIndexSegmentReader() {
int rc = fs_.MunmapFile(posting_file_);
PersistenceManager *pm = InfinityContext::instance().persistence_manager();
String posting_file = posting_file_;
if (nullptr != pm) {
// Unmap using the translated path that the constructor stored after GetObjCache().
posting_file = posting_file_obj_;
}
int rc = fs_.MunmapFile(posting_file);
// With assertions enabled the assert fires first; the Status path below only runs in
// builds compiled with NDEBUG.
assert(rc == 0);
if (rc != 0) {
Status status = Status::MunmapFileError(posting_file_);
Status status = Status::MunmapFileError(posting_file);
RecoverableError(status);
}
if (nullptr != pm) {
// PutObjCache() takes the original (untranslated) paths, matching what was passed to
// GetObjCache() in the constructor.
pm->PutObjCache(dict_file_);
pm->PutObjCache(posting_file_);
}
}

bool DiskIndexSegmentReader::GetSegmentPosting(const String &term, SegmentPosting &seg_posting, bool fetch_position) const {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/invertedindex/disk_segment_reader.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ private:
RowID base_row_id_{INVALID_ROWID};
SharedPtr<DictionaryReader> dict_reader_;
String posting_file_{};
String posting_file_obj_{};
String dict_file_{};
u8 *data_ptr_{};
SizeT data_len_{};
LocalFileSystem fs_{};
Expand Down
35 changes: 23 additions & 12 deletions src/storage/invertedindex/memory_indexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,20 @@ void MemoryIndexer::Dump(bool offline, bool spill) {
while (GetInflightTasks() > 0) {
CommitSync(100);
}
// LOG_INFO("MemoryIndexer::Dump begin");
Path path = Path(index_dir_) / base_name_;
String index_prefix = path.string();
LocalFileSystem fs;
String posting_file = index_prefix + POSTING_SUFFIX + (spill ? SPILL_SUFFIX : "");
SharedPtr<FileWriter> posting_file_writer = MakeShared<FileWriter>(fs, posting_file, 128000);
String dict_file = index_prefix + DICT_SUFFIX + (spill ? SPILL_SUFFIX : "");

PersistenceManager *pm = InfinityContext::instance().persistence_manager();
bool use_object_cache = pm != nullptr && !spill;
if (use_object_cache) {
pm->ObjCreateRefCount(posting_file);
pm->ObjCreateRefCount(dict_file);
}

SharedPtr<FileWriter> posting_file_writer = MakeShared<FileWriter>(fs, posting_file, 128000);
SharedPtr<FileWriter> dict_file_writer = MakeShared<FileWriter>(fs, dict_file, 128000);
TermMetaDumper term_meta_dumpler((PostingFormatOption(flag_)));

Expand Down Expand Up @@ -297,6 +304,20 @@ void MemoryIndexer::Dump(bool offline, bool spill) {
}

String column_length_file = index_prefix + LENGTH_SUFFIX + (spill ? SPILL_SUFFIX : "");
if (use_object_cache) {
pm->ObjCreateRefCount(column_length_file);
}
DeferFn defer_fn([&]() {
if (!use_object_cache) {
return;
}
pm->PutObjCache(posting_file);
pm->PutObjCache(dict_file);
pm->PutObjCache(column_length_file);
std::filesystem::remove(posting_file);
std::filesystem::remove(dict_file);
std::filesystem::remove(column_length_file);
});
auto [file_handler, status] = fs.OpenFile(column_length_file, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kNoLock);
if (!status.ok()) {
UnrecoverableError(status.message());
Expand All @@ -308,16 +329,6 @@ void MemoryIndexer::Dump(bool offline, bool spill) {

is_spilled_ = spill;
Reset();

// persist
PersistenceManager *pm = InfinityContext::instance().persistence_manager();
if (pm && !spill) {
pm->Persist(posting_file);
pm->Persist(dict_file);
pm->Persist(column_length_file);
}

// LOG_INFO("MemoryIndexer::Dump end");
}

// Similar to DiskIndexSegmentReader::GetSegmentPosting
Expand Down
22 changes: 20 additions & 2 deletions src/storage/persistence/persistence_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ void ObjAddr::ReadBuf(const char *buf) {
part_size_ = ReadBufAdv<SizeT>(buf);
}

// Constructs a manager from the workspace path, the local data directory and the per-object
// size limit. The stored data directory is normalized so that it always ends with '/'.
PersistenceManager::PersistenceManager(const String &workspace, const String &data_dir, SizeT object_size_limit)
    : workspace_(workspace), local_data_dir_(data_dir), object_size_limit_(object_size_limit) {
    // Take the key of the current object from ObjCreate() and reset its size counter.
    current_object_key_ = ObjCreate();
    current_object_size_ = 0;

    // An empty directory string also receives a '/' here.
    const bool ends_with_slash = !local_data_dir_.empty() && local_data_dir_.back() == '/';
    if (!ends_with_slash) {
        local_data_dir_ += '/';
    }
}

ObjAddr PersistenceManager::Persist(const String &file_path, bool allow_compose) {
std::error_code ec;
fs::path src_fp = file_path;
Expand Down Expand Up @@ -158,7 +168,8 @@ void PersistenceManager::CurrentObjFinalizeNoLock() {
}
}

String PersistenceManager::GetObjCache(const String &local_path) {
String PersistenceManager::GetObjCache(const String &file_path) {
String local_path = RemovePrefix(file_path);
std::lock_guard<std::mutex> lock(mtx_);
auto it = local_path_obj_.find(local_path);
if (it == local_path_obj_.end()) {
Expand Down Expand Up @@ -238,7 +249,6 @@ ObjAddr PersistenceManager::ObjCreateRefCount(const String &file_path) {
if (fs::exists(dst_fp)) {
fs::remove(dst_fp);
}
std::error_code ec;
fs::create_symlink(src_fp, dst_fp);
} catch (const fs::filesystem_error &e) {
String error_message = fmt::format("Failed to link file {}.", file_path);
Expand Down Expand Up @@ -325,6 +335,14 @@ String PersistenceManager::RemovePrefix(const String &path) {
return "";
}

// Returns the sum of ref_count_ over every entry currently held in objects_.
// NOTE(review): no lock is taken here, whereas GetObjCache() holds mtx_ — confirm callers
// cannot race with concurrent updates.
SizeT PersistenceManager::SumRefCounts() {
    SizeT total = 0;
    for (const auto &[obj_key, stat] : objects_) {
        total += stat.ref_count_;
    }
    return total;
}

void PersistenceManager::Cleanup(const String &file_path) {
String local_path = RemovePrefix(file_path);
if (local_path.empty()) {
Expand Down
9 changes: 4 additions & 5 deletions src/storage/persistence/persistence_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,8 @@ export struct ObjStat {
export class PersistenceManager {
public:
// TODO: build cache from existing files under workspace
PersistenceManager(const String &workspace, const String &data_dir, SizeT object_size_limit)
: workspace_(workspace), local_data_dir_(data_dir), object_size_limit_(object_size_limit) {
current_object_key_ = ObjCreate();
current_object_size_ = 0;
}
PersistenceManager(const String &workspace, const String &data_dir, SizeT object_size_limit);

~PersistenceManager() {}

// Create new object or append to current object, and returns the location.
Expand Down Expand Up @@ -95,6 +92,8 @@ public:

void Deserialize(const nlohmann::json &obj);

SizeT SumRefCounts();

private:
String ObjCreate();

Expand Down
8 changes: 7 additions & 1 deletion src/unit_test/base_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class BaseTestWithParam : public std::conditional_t<std::is_same_v<T, void>, ::t

void SetUp() override {}
void TearDown() override {}
public:
static constexpr const char* NULL_CONFIG_PATH = "";
static constexpr const char* VFS_CONFIG_PATH = "test/data/config/test_vfs.toml";

protected:
const char *GetHomeDir() { return "/var/infinity"; }
Expand All @@ -56,10 +59,12 @@ class BaseTestWithParam : public std::conditional_t<std::is_same_v<T, void>, ::t

const char *GetFullTmpDir() { return "/var/infinity/tmp"; }

const char *GetFullPersistDir() { return "/var/infinity/persistence"; }

const char *GetTmpDir() { return "tmp"; }

void CleanupDbDirs() {
const char *infinity_db_dirs[] = {GetFullDataDir(), GetFullWalDir(), GetFullLogDir(), GetFullTmpDir()};
const char *infinity_db_dirs[] = {GetFullDataDir(), GetFullWalDir(), GetFullLogDir(), GetFullTmpDir(), GetFullPersistDir()};
for (auto &dir : infinity_db_dirs) {
CleanupDirectory(dir);
}
Expand Down Expand Up @@ -138,3 +143,4 @@ class BaseTestWithParam : public std::conditional_t<std::is_same_v<T, void>, ::t
};

using BaseTest = BaseTestWithParam<void>;
using BaseTestParamStr = BaseTestWithParam<std::string>;
64 changes: 46 additions & 18 deletions src/unit_test/storage/invertedindex/memory_indexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ import inmem_index_segment_reader;
import segment_posting;
import global_resource_usage;
import infinity_context;
import third_party;

using namespace infinity;

class MemoryIndexerTest : public BaseTest {
class MemoryIndexerTest : public BaseTestParamStr {
public:
struct ExpectedPosting {
String term;
Expand All @@ -58,9 +59,20 @@ class MemoryIndexerTest : public BaseTest {
optionflag_t flag_{OPTION_FLAG_ALL};
SharedPtr<ColumnVector> column_;
Vector<ExpectedPosting> expected_postings_;
String config_path_{};

public:
void SetUp() override {
system(("mkdir -p " + String(GetFullPersistDir())).c_str());
system(("mkdir -p " + String(GetFullDataDir())).c_str());
system(("mkdir -p " + String(GetFullTmpDir())).c_str());
CleanupDbDirs();
config_path_ = GetParam();
if (config_path_ != BaseTestParamStr::NULL_CONFIG_PATH) {
std::shared_ptr<std::string> config_path = std::make_shared<std::string>(config_path_);
infinity::InfinityContext::instance().Init(config_path);
}

// https://en.wikipedia.org/wiki/Finite-state_transducer
const char *paragraphs[] = {
R"#(A finite-state transducer (FST) is a finite-state machine with two memory tapes, following the terminology for Turing machines: an input tape and an output tape. This contrasts with an ordinary finite-state automaton, which has a single tape. An FST is a type of finite-state automaton (FSA) that maps between two sets of symbols.[1] An FST is more general than an FSA. An FSA defines a formal language by defining a set of accepted strings, while an FST defines a relation between sets of strings.)#",
Expand All @@ -80,7 +92,14 @@ class MemoryIndexerTest : public BaseTest {
expected_postings_ = {{"fst", {0, 1, 2}, {4, 2, 2}}, {"automaton", {0, 3}, {2, 5}}, {"transducer", {0, 4}, {1, 4}}};
}

void TearDown() override {}
void TearDown() override {
if (config_path_ != BaseTestParamStr::NULL_CONFIG_PATH) {
if (InfinityContext::instance().persistence_manager() != nullptr) {
ASSERT_TRUE(InfinityContext::instance().persistence_manager()->SumRefCounts() == 0);
}
infinity::InfinityContext::instance().UnInit();
}
}

void Check(ColumnIndexReader &reader) {
for (SizeT i = 0; i < expected_postings_.size(); ++i) {
Expand Down Expand Up @@ -111,15 +130,24 @@ class MemoryIndexerTest : public BaseTest {
}
};

TEST_F(MemoryIndexerTest, Insert) {
// Runs every TEST_P of MemoryIndexerTest once per config path: the empty path
// (NULL_CONFIG_PATH, for which SetUp does not initialize InfinityContext) and the VFS
// config file (VFS_CONFIG_PATH).
INSTANTIATE_TEST_SUITE_P(
TestWithDifferentParams,
MemoryIndexerTest,
::testing::Values(
BaseTestParamStr::NULL_CONFIG_PATH,
BaseTestParamStr::VFS_CONFIG_PATH
)
);

TEST_P(MemoryIndexerTest, Insert) {
// prepare fake segment index entry
auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullTmpDir());
MemoryIndexer indexer1(GetFullTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard");
auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullDataDir());
MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard");
indexer1.Insert(column_, 0, 1);
indexer1.Insert(column_, 1, 3);
indexer1.Dump();

auto indexer2 = MakeUnique<MemoryIndexer>(GetFullTmpDir(), "chunk2", RowID(0U, 4U), flag_, "standard");
auto indexer2 = MakeUnique<MemoryIndexer>(GetFullDataDir(), "chunk2", RowID(0U, 4U), flag_, "standard");
indexer2->Insert(column_, 4, 1);
while (indexer2->GetInflightTasks() > 0) {
sleep(1);
Expand All @@ -130,13 +158,13 @@ TEST_F(MemoryIndexerTest, Insert) {
fake_segment_index_entry_1->SetMemoryIndexer(std::move(indexer2));
Map<SegmentID, SharedPtr<SegmentIndexEntry>> index_by_segment = {{0, fake_segment_index_entry_1}};
ColumnIndexReader reader;
reader.Open(flag_, GetFullTmpDir(), std::move(index_by_segment));
reader.Open(flag_, GetFullDataDir(), std::move(index_by_segment));
Check(reader);
}

TEST_F(MemoryIndexerTest, test2) {
auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullTmpDir());
MemoryIndexer indexer1(GetFullTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard");
TEST_P(MemoryIndexerTest, test2) {
auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullDataDir());
MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard");
indexer1.Insert(column_, 0, 2, true);
indexer1.Insert(column_, 2, 2, true);
indexer1.Insert(column_, 4, 1, true);
Expand All @@ -146,13 +174,13 @@ TEST_F(MemoryIndexerTest, test2) {
Map<SegmentID, SharedPtr<SegmentIndexEntry>> index_by_segment = {{1, fake_segment_index_entry_1}};

ColumnIndexReader reader;
reader.Open(flag_, GetFullTmpDir(), std::move(index_by_segment));
reader.Open(flag_, GetFullDataDir(), std::move(index_by_segment));
Check(reader);
}

TEST_F(MemoryIndexerTest, SpillLoadTest) {
auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullTmpDir());
auto indexer1 = MakeUnique<MemoryIndexer>(GetFullTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard");
TEST_P(MemoryIndexerTest, SpillLoadTest) {
auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullDataDir());
auto indexer1 = MakeUnique<MemoryIndexer>(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard");
indexer1->Insert(column_, 0, 2);
indexer1->Insert(column_, 2, 2);
indexer1->Insert(column_, 4, 1);
Expand All @@ -162,7 +190,7 @@ TEST_F(MemoryIndexerTest, SpillLoadTest) {
}

indexer1->Dump(false, true);
UniquePtr<MemoryIndexer> loaded_indexer = MakeUnique<MemoryIndexer>(GetFullTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard");
UniquePtr<MemoryIndexer> loaded_indexer = MakeUnique<MemoryIndexer>(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard");

loaded_indexer->Load();
SharedPtr<InMemIndexSegmentReader> segment_reader = MakeShared<InMemIndexSegmentReader>(loaded_indexer.get());
Expand Down Expand Up @@ -190,7 +218,7 @@ TEST_F(MemoryIndexerTest, SpillLoadTest) {
}
}

TEST_F(MemoryIndexerTest, SeekPosition) {
TEST_P(MemoryIndexerTest, SeekPosition) {
// "A B C" repeats 7 times
String paragraph(R"#(A B C A B C A B C A B C A B C A B C A B C)#");
auto column = ColumnVector::Make(MakeShared<DataType>(LogicalType::kVarchar));
Expand All @@ -200,8 +228,8 @@ TEST_F(MemoryIndexerTest, SeekPosition) {
column->AppendValue(v);
}

auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullTmpDir());
MemoryIndexer indexer1(GetFullTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard");
auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullDataDir());
MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard");
indexer1.Insert(column, 0, 8192);
while (indexer1.GetInflightTasks() > 0) {
sleep(1);
Expand Down
Loading

0 comments on commit 3cad7f7

Please sign in to comment.