From 1448e51c899b2657472c405312c392ace5d4916e Mon Sep 17 00:00:00 2001 From: Denis Khalikov Date: Fri, 18 Oct 2024 13:19:48 +0000 Subject: [PATCH] MCOL-5724 Move journal to FDB --- storage-manager/src/IOCoordinator.cpp | 341 ++++++------------ storage-manager/src/IOCoordinator.h | 10 +- storage-manager/src/KVPrefixes.cpp | 9 +- .../src/{KVPrefixes.hpp => KVPrefixes.h} | 7 +- storage-manager/src/MetadataFile.cpp | 2 +- storage-manager/src/Ownership.cpp | 2 +- storage-manager/src/PrefixCache.cpp | 21 +- storage-manager/src/Replicator.cpp | 280 ++++++-------- storage-manager/src/Replicator.h | 3 +- storage-manager/src/Synchronizer.cpp | 114 +++--- 10 files changed, 320 insertions(+), 469 deletions(-) rename storage-manager/src/{KVPrefixes.hpp => KVPrefixes.h} (88%) diff --git a/storage-manager/src/IOCoordinator.cpp b/storage-manager/src/IOCoordinator.cpp index 5a6adaf9a8..4bc9388343 100644 --- a/storage-manager/src/IOCoordinator.cpp +++ b/storage-manager/src/IOCoordinator.cpp @@ -16,6 +16,8 @@ MA 02110-1301, USA. */ #include "IOCoordinator.h" +#include "KVStorageInitializer.h" +#include "KVPrefixes.h" #include "MetadataFile.h" #include "Synchronizer.h" #include @@ -24,6 +26,7 @@ #include #include #include +#include "cstring" #include #define BOOST_SPIRIT_THREADSAFE #include @@ -189,7 +192,8 @@ ssize_t IOCoordinator::read(const char* _filename, uint8_t* data, off_t offset, } vector relevants = meta.metadataRead(offset, length); - map journalFDs, objectFDs; + map objectFDs; + unordered_set journalFDs; map keyToJournalName, keyToObjectName; utils::VLArray fdMinders(relevants.size() * 2); int mindersIndex = 0; @@ -211,29 +215,21 @@ ssize_t IOCoordinator::read(const char* _filename, uint8_t* data, off_t offset, // later. not thinking about it for now. // open all of the journal files that exist - string jFilename = (journalPath / firstDir / (key + ".journal")).string(); - int fd = ::open(jFilename.c_str(), O_RDONLY); - if (fd >= 0) + const string journal = getJournalName((journalPath / firstDir / (key + ".journal")).string()); + auto kvStorage = KVStorageInitializer::getStorageInstance(); + auto tnx = kvStorage->createTransaction(); + auto resultPairJournal = tnx->get(journal); + if (resultPairJournal.first) { - keyToJournalName[key] = jFilename; - journalFDs[key] = fd; - fdMinders[mindersIndex++].fd = fd; - // fdMinders.push_back(SharedCloser(fd)); - } - else if (errno != ENOENT) - { - int l_errno = errno; - fileLock.unlock(); - cache->doneReading(firstDir, keys); - logger->log(LOG_CRIT, "IOCoordinator::read(): Got an unexpected error opening %s, error was '%s'", - jFilename.c_str(), strerror_r(l_errno, buf, 80)); - errno = l_errno; - return -1; + keyToJournalName[key] = journal; + journalFDs.insert(key); } + // else + // Journal already merged with object file. // open all of the objects string oFilename = (cachePath / firstDir / key).string(); - fd = ::open(oFilename.c_str(), O_RDONLY); + auto fd = ::open(oFilename.c_str(), O_RDONLY); if (fd < 0) { int l_errno = errno; @@ -247,9 +243,7 @@ ssize_t IOCoordinator::read(const char* _filename, uint8_t* data, off_t offset, keyToObjectName[key] = oFilename; objectFDs[key] = fd; fdMinders[mindersIndex++].fd = fd; - // fdMinders.push_back(SharedCloser(fd)); } - // fileLock.unlock(); // ^^^ TODO: The original version unlocked the file at the above position. On second glance, // I'm not sure how we can guarantee the journal files won't be modified during the loads below. 
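A note on the hunk above: instead of probing the filesystem with open(2), read() now asks the KV store whether a journal blob exists for each cache key, and only the base objects keep file descriptors. A minimal sketch of that existence check, assuming nothing beyond the getStorageInstance()/createTransaction()/get() calls already used in this hunk (get() returns a pair whose first member signals presence); the helper name is hypothetical and not part of the patch:

// Sketch only: does a journal blob exist for this cache key?
// Mirrors the lookup pattern used in IOCoordinator::read() above.
static bool journalExistsInKV(const bf::path& journalPath, const std::string& firstDir, const std::string& key)
{
  const std::string journal = getJournalName((journalPath / firstDir / (key + ".journal")).string());
  auto kvStorage = KVStorageInitializer::getStorageInstance();
  auto tnx = kvStorage->createTransaction();
  auto result = tnx->get(journal);  // pair<bool, std::string>
  return result.first;              // true when the journal key is present
}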
@@ -857,7 +851,14 @@ int IOCoordinator::_truncate(const bf::path& bfpath, size_t newSize, ScopedFileL if (result & 0x1) replicator->remove(cachePath / firstDir / objects[i].key); if (result & 0x2) - replicator->remove(journalPath / firstDir / (objects[i].key + ".journal")); + { + const auto journalName = + getJournalName((journalPath / firstDir / (objects[i].key + ".journal")).string()); + const auto journalSizeName = + getJournalName((journalPath / firstDir / (objects[i].key + "_size.journal")).string()); + replicator->removeJournal(journalName); + replicator->removeJournalSize(journalSizeName); + } deletedObjects.push_back(objects[i].key); } if (!deletedObjects.empty()) @@ -905,7 +906,11 @@ void IOCoordinator::deleteMetaFile(const bf::path& file) if (result & 0x2) { ++iocFilesDeleted; - replicator->remove(journalPath / firstDir / (object.key + ".journal")); + const auto journalName = getJournalName((journalPath / firstDir / (object.key + ".journal")).string()); + const auto journalSizeName = + getJournalName((journalPath / firstDir / (object.key + "_size.journal")).string()); + replicator->removeJournalSize(journalSizeName); + replicator->removeJournal(journalName); } deletedObjects.push_back(object.key); } @@ -1021,20 +1026,6 @@ int IOCoordinator::copyFile(const char* _filename1, const char* _filename2) int err; char errbuf[80]; - // since we don't implement mkdir(), assume the caller did that and - // create any necessary parent dirs for filename2 - try - { - bf::create_directories(metaFile2.parent_path()); - } - catch (bf::filesystem_error& e) - { - logger->log(LOG_CRIT, "IOCoordinator::copyFile(): failed to create directory %s. Got %s", - metaFile2.parent_path().string().c_str(), strerror_r(e.code().value(), errbuf, 80)); - errno = e.code().value(); - return -1; - } - vector> newJournalEntries; ScopedReadLock lock(this, filename1); ScopedWriteLock lock2(this, filename2); @@ -1060,8 +1051,6 @@ int IOCoordinator::copyFile(const char* _filename1, const char* _filename2) { for (const auto& object : objects) { - bf::path journalFile = journalPath / firstDir1 / (object.key + ".journal"); - // originalLength = the length of the object before journal entries. 
// the length in the metadata is the length after journal entries size_t originalLength = MetadataFile::getLengthFromKey(object.key); @@ -1095,27 +1084,36 @@ int IOCoordinator::copyFile(const char* _filename1, const char* _filename2) object.key + ": " + strerror_r(errno, errbuf, 80)); } - // if there's a journal file for this object, make a copy - if (bf::exists(journalFile)) + const auto journalName = getJournalName((journalPath / firstDir1 / (object.key + ".journal")).string()); + auto keyGen = std::make_shared(); + FDBCS::BlobHandler journalReader(keyGen); + auto kvStorage = KVStorageInitializer::getStorageInstance(); + auto resultPair = journalReader.readBlob(kvStorage, journalName); + if (resultPair.first) { - bf::path newJournalFile = journalPath / firstDir2 / (newObj.key + ".journal"); - try - { - bf::copy_file(journalFile, newJournalFile); - size_t tmp = bf::file_size(newJournalFile); - ++iocJournalsCreated; - iocBytesRead += tmp; - iocBytesWritten += tmp; - cache->newJournalEntry(firstDir2, tmp); - newJournalEntries.push_back(pair(newObj.key, tmp)); - } - catch (bf::filesystem_error& e) + const auto& journalData = resultPair.second; + const std::string oldJournalDataHeader = resultPair.second; + const auto newJournalName = + getJournalName((journalPath / firstDir2 / (newObj.key + ".journal")).string()); + const auto newJournalSizeName = + getJournalName((journalPath / firstDir2 / (newObj.key + "_size.journal")).string()); + + FDBCS::BlobHandler journalWriter(keyGen); + if (!journalWriter.writeBlob(kvStorage, newJournalName, journalData)) + logger->log(LOG_CRIT, "Cannot write new journal, while copying files."); + { - throw CFException(e.code().value(), string("IOCoordinator::copyFile(): source = ") + filename1 + - ", dest = " + filename2 + ". Got an error copying " + - journalFile.string() + ": " + - strerror_r(e.code().value(), errbuf, 80)); + auto tnx = kvStorage->createTransaction(); + tnx->set(newJournalSizeName, to_string(journalData.size())); + if (!tnx->commit()) + logger->log(LOG_CRIT, "Cannot write new journal size, while copying files."); } + + ++iocJournalsCreated; + iocBytesRead += journalData.size(); + iocBytesWritten += journalData.size(); + cache->newJournalEntry(firstDir2, journalData.size()); + newJournalEntries.push_back(pair(newObj.key, journalData.size())); } } } @@ -1126,9 +1124,12 @@ int IOCoordinator::copyFile(const char* _filename1, const char* _filename2) cs->deleteObject(newObject.key); for (auto& jEntry : newJournalEntries) { - bf::path fullJournalPath = journalPath / firstDir2 / (jEntry.first + ".journal"); + const auto fullJournalPath = + getJournalName((journalPath / firstDir2 / (jEntry.first + ".journal")).string()); + auto kvStorage = KVStorageInitializer::getStorageInstance(); + auto tnx = kvStorage->createTransaction(); + tnx->remove(fullJournalPath); cache->deletedJournal(firstDir2, bf::file_size(fullJournalPath)); - bf::remove(fullJournalPath); } errno = e.l_errno; return -1; @@ -1163,28 +1164,21 @@ const bf::path& IOCoordinator::getMetadataPath() const // first byte after the header. // update: had to make it also return the header; the boost json parser does not stop at either // a null char or the end of an object. 
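Before the function-level diffs below, it helps to spell out two conventions this patch relies on, reconstructed from the hunks themselves rather than stated anywhere explicitly. First, a journal blob keeps the old on-disk format: a JSON header of the form { "version" : "001", "max_offset" : "..." } terminated by a single 0 byte, followed by repeated entries, each a 16-byte pair of raw uint64 values (object offset, payload length) and then that many bytes of payload. Second, every journal stored under getJournalName(<path>.journal) has a companion key getJournalName(<path>_size.journal) holding the blob's byte count as a decimal string. A rough sketch of both, using only calls that already appear in the patch; walkJournal and journalSizeFromKV are hypothetical helpers, assuming <cstring>, <cstdint> and <string>:

// Sketch: iterate the entries of an in-memory journal blob, as returned by
// FDBCS::BlobHandler::readBlob() in this patch.
static void walkJournal(const std::string& journalData)
{
  size_t pos = journalData.find('\0');  // the JSON header ends at the first 0 byte
  if (pos == std::string::npos)
    return;                             // malformed: no header terminator
  ++pos;                                // first entry starts right after the 0 byte
  while (pos + 16 <= journalData.size())
  {
    uint64_t offlen[2];                 // offlen[0] = offset in the object, offlen[1] = payload length
    std::memcpy(offlen, &journalData[pos], 16);
    pos += 16;
    // the payload for this entry is journalData[pos .. pos + offlen[1])
    pos += offlen[1];
  }
}

// Sketch: read the companion size key, 0 if it is absent.
static size_t journalSizeFromKV(const std::string& journalSizeName)
{
  auto kvStorage = KVStorageInitializer::getStorageInstance();
  auto tnx = kvStorage->createTransaction();
  auto result = tnx->get(journalSizeName);  // pair<bool, std::string>
  return result.first ? static_cast<size_t>(std::stoull(result.second)) : 0;
}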
-std::shared_ptr seekToEndOfHeader1(int fd, size_t* _bytesRead) +std::shared_ptr seekToEndOfHeader1_(const std::string& dataStr, size_t* _bytesRead) { - //::lseek(fd, 0, SEEK_SET); - std::shared_ptr ret(new char[100]); - int err; + const size_t numBytesToRead = dataStr.size(); + std::shared_ptr ret(new char[numBytesToRead]); + std::memcpy(ret.get(), &dataStr[0], numBytesToRead); - err = ::read(fd, ret.get(), 100); - if (err < 0) - { - char buf[80]; - throw runtime_error("seekToEndOfHeader1 got: " + string(strerror_r(errno, buf, 80))); - } - for (int i = 0; i < err; i++) + for (uint32_t i = 0; i < numBytesToRead; i++) { if (ret[i] == 0) { - ::lseek(fd, i + 1, SEEK_SET); *_bytesRead = i + 1; return ret; } } - throw runtime_error("seekToEndOfHeader1: did not find the end of the header"); + throw runtime_error("seekToEndOfHeader1_: did not find the end of the header"); } int IOCoordinator::mergeJournal(int objFD, int journalFD, uint8_t* buf, off_t offset, size_t* len) const @@ -1195,7 +1189,7 @@ int IOCoordinator::mergeJournal(int objFD, int journalFD, uint8_t* buf, off_t of std::shared_ptr IOCoordinator::mergeJournal(const char* object, const char* journal, off_t offset, size_t len, size_t* _bytesReadOut) const { - int objFD, journalFD; + int objFD; std::shared_ptr ret; size_t l_bytesRead = 0; @@ -1208,7 +1202,6 @@ std::shared_ptr IOCoordinator::mergeJournal(const char* object, const ScopedCloser s1(objFD); ret.reset(new uint8_t[len]); - // read the object into memory size_t count = 0; if (offset != 0) @@ -1249,29 +1242,34 @@ std::shared_ptr IOCoordinator::mergeJournal(const char* object, const return ret; } - journalFD = ::open(journal, O_RDONLY); - if (journalFD < 0) + // Read journal. + auto keyGen = std::make_shared(); + FDBCS::BlobHandler journalReader(keyGen); + auto kvStorage = KVStorageInitializer::getStorageInstance(); + auto resultPairJournal = journalReader.readBlob(kvStorage, journal); + if (!resultPairJournal.first) { *_bytesReadOut = l_bytesRead; return ret; } - ScopedCloser s2(journalFD); - std::shared_ptr headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead); + const std::string& journalData = resultPairJournal.second; + size_t journalOffset = 0; + std::shared_ptr headertxt = seekToEndOfHeader1_(journalData, &journalOffset); stringstream ss; ss << headertxt.get(); boost::property_tree::ptree header; boost::property_tree::json_parser::read_json(ss, header); assert(header.get("version") == 1); + l_bytesRead += journalOffset; + const size_t journalSize = journalData.size(); // start processing the entries - while (1) + while (journalOffset < journalSize) { uint64_t offlen[2]; - int err = ::read(journalFD, &offlen, 16); - if (err == 0) // got EOF - break; - assert(err == 16); + std::memcpy(&offlen, &journalData[journalOffset], 16); + journalOffset += 16; l_bytesRead += 16; // if this entry overlaps, read the overlapping section @@ -1284,45 +1282,15 @@ std::shared_ptr IOCoordinator::mergeJournal(const char* object, const // seek to the portion of the entry to start reading at if (startReadingAt != offlen[0]) - ::lseek(journalFD, startReadingAt - offlen[0], SEEK_CUR); - - uint count = 0; - while (count < lengthOfRead) - { - err = ::read(journalFD, &ret[startReadingAt - offset + count], lengthOfRead - count); - if (err < 0) - { - int l_errno = errno; - char buf[80]; - logger->log(LOG_ERR, "mergeJournal: got %s", strerror_r(l_errno, buf, 80)); - ret.reset(); - errno = l_errno; - l_bytesRead += count; - goto out; - } - else if (err == 0) - { - logger->log(LOG_ERR, - "mergeJournal: 
got early EOF. offset=%ld, len=%ld, jOffset=%ld, jLen=%ld," - " startReadingAt=%ld, lengthOfRead=%ld", - offset, len, offlen[0], offlen[1], startReadingAt, lengthOfRead); - ret.reset(); - l_bytesRead += count; - goto out; - } - count += err; - } - l_bytesRead += lengthOfRead; - - // advance the file pos if we didn't read to the end of the entry - if (startReadingAt - offlen[0] + lengthOfRead != offlen[1]) - ::lseek(journalFD, offlen[1] - (lengthOfRead + startReadingAt - offlen[0]), SEEK_CUR); + journalOffset += startReadingAt - offlen[0]; + //::lseek(journalFD, startReadingAt - offlen[0], SEEK_CUR); + std::memcpy(&ret[startReadingAt - offset], &journalData[journalOffset], lengthOfRead); + journalOffset += lengthOfRead; } else // skip over this journal entry - ::lseek(journalFD, offlen[1], SEEK_CUR); + journalOffset += offlen[1]; } -out: *_bytesReadOut = l_bytesRead; return ret; } @@ -1332,19 +1300,18 @@ std::shared_ptr IOCoordinator::mergeJournal(const char* object, const int IOCoordinator::mergeJournalInMem(std::shared_ptr& objData, size_t len, const char* journalPath, size_t* _bytesReadOut) const { - // if the journal is over some size threshold (100MB for now why not), - // use the original low-mem-usage version - if (len > (100 << 20)) - return mergeJournalInMem_bigJ(objData, len, journalPath, _bytesReadOut); - size_t l_bytesRead = 0; - int journalFD = ::open(journalPath, O_RDONLY); - if (journalFD < 0) + auto kvStorage = KVStorageInitializer::getStorageInstance(); + auto keyGen = std::make_shared(); + FDBCS::BlobHandler blobReader(keyGen); + auto resultPair = blobReader.readBlob(kvStorage, journalPath); + if (!resultPair.first) return -1; - ScopedCloser s(journalFD); + const std::string& journalData = resultPair.second; + size_t journalOffset = 0; // grab the journal header and make sure the version is 1 - std::shared_ptr headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead); + std::shared_ptr headertxt = seekToEndOfHeader1_(journalData, &l_bytesRead); stringstream ss; ss << headertxt.get(); boost::property_tree::ptree header; @@ -1352,136 +1319,46 @@ int IOCoordinator::mergeJournalInMem(std::shared_ptr& objData, size_t assert(header.get("version") == 1); // read the journal file into memory - size_t journalBytes = ::lseek(journalFD, 0, SEEK_END) - l_bytesRead; - ::lseek(journalFD, l_bytesRead, SEEK_SET); - boost::scoped_array journalData(new uint8_t[journalBytes]); + size_t journalBytes = journalData.size() - l_bytesRead; + journalOffset += l_bytesRead; size_t readCount = 0; - while (readCount < journalBytes) - { - ssize_t err = ::read(journalFD, &journalData[readCount], journalBytes - readCount); - if (err < 0) - { - char buf[80]; - int l_errno = errno; - logger->log(LOG_ERR, "mergeJournalInMem: got %s", strerror_r(errno, buf, 80)); - errno = l_errno; - return -1; - } - else if (err == 0) - { - logger->log(LOG_ERR, "mergeJournalInMem: got early EOF"); - errno = ENODATA; // is there a better errno for early EOF? - return -1; - } - readCount += err; - l_bytesRead += err; - } + readCount += journalBytes; + l_bytesRead += journalBytes; + const size_t journalSize = journalData.size(); // start processing the entries - size_t offset = 0; - while (offset < journalBytes) + while (journalOffset < journalSize) { - if (offset + 16 >= journalBytes) + if (journalOffset + 16 >= journalSize) { logger->log(LOG_ERR, "mergeJournalInMem: got early EOF"); errno = ENODATA; // is there a better errno for early EOF? 
return -1; } - uint64_t* offlen = (uint64_t*)&journalData[offset]; - offset += 16; + uint64_t* offlen = (uint64_t*)&journalData[journalOffset]; + journalOffset += 16; uint64_t startReadingAt = offlen[0]; uint64_t lengthOfRead = offlen[1]; + if (lengthOfRead == 0) + break; if (startReadingAt > len) { - offset += offlen[1]; + journalOffset += offlen[1]; continue; } if (startReadingAt + lengthOfRead > len) lengthOfRead = len - startReadingAt; - if (offset + lengthOfRead > journalBytes) + if (journalOffset + lengthOfRead > journalSize) { logger->log(LOG_ERR, "mergeJournalInMem: got early EOF"); errno = ENODATA; // is there a better errno for early EOF? return -1; } - memcpy(&objData[startReadingAt], &journalData[offset], lengthOfRead); - offset += offlen[1]; - } - *_bytesReadOut = l_bytesRead; - return 0; -} - -int IOCoordinator::mergeJournalInMem_bigJ(std::shared_ptr& objData, size_t len, - const char* journalPath, size_t* _bytesReadOut) const -{ - size_t l_bytesRead = 0; - int journalFD = ::open(journalPath, O_RDONLY); - if (journalFD < 0) - return -1; - ScopedCloser s(journalFD); - - // grab the journal header and make sure the version is 1 - std::shared_ptr headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead); - stringstream ss; - ss << headertxt.get(); - boost::property_tree::ptree header; - boost::property_tree::json_parser::read_json(ss, header); - assert(header.get("version") == 1); - - // start processing the entries - while (1) - { - uint64_t offlen[2]; - int err = ::read(journalFD, &offlen, 16); - if (err == 0) // got EOF - break; - else if (err < 16) - { - // punting on this - cout << "mergeJournalInMem: failed to read a journal entry header in one attempt. fixme..." << endl; - errno = ENODATA; - return -1; - } - l_bytesRead += 16; - - uint64_t startReadingAt = offlen[0]; - uint64_t lengthOfRead = offlen[1]; - - if (startReadingAt > len) - { - ::lseek(journalFD, offlen[1], SEEK_CUR); - continue; - } - - if (startReadingAt + lengthOfRead > len) - lengthOfRead = len - startReadingAt; - - uint count = 0; - while (count < lengthOfRead) - { - err = ::read(journalFD, &objData[startReadingAt + count], lengthOfRead - count); - if (err < 0) - { - char buf[80]; - int l_errno = errno; - logger->log(LOG_ERR, "mergeJournalInMem: got %s", strerror_r(errno, buf, 80)); - errno = l_errno; - return -1; - } - else if (err == 0) - { - logger->log(LOG_ERR, "mergeJournalInMem: got early EOF"); - errno = ENODATA; // is there a better errno for early EOF? - return -1; - } - count += err; - } - l_bytesRead += lengthOfRead; - if (lengthOfRead < offlen[1]) - ::lseek(journalFD, offlen[1] - lengthOfRead, SEEK_CUR); + std::memcpy(&objData[startReadingAt], &journalData[journalOffset], lengthOfRead); + journalOffset += offlen[1]; } *_bytesReadOut = l_bytesRead; return 0; diff --git a/storage-manager/src/IOCoordinator.h b/storage-manager/src/IOCoordinator.h index f499aad3e2..6ad0ab5f6c 100644 --- a/storage-manager/src/IOCoordinator.h +++ b/storage-manager/src/IOCoordinator.h @@ -37,7 +37,7 @@ namespace storagemanager { -std::shared_ptr seekToEndOfHeader1(int fd, size_t* bytesRead); +std::shared_ptr seekToEndOfHeader1_(const std::string& dataStr, size_t* bytesRead); class IOCoordinator : public boost::noncopyable { @@ -58,16 +58,12 @@ class IOCoordinator : public boost::noncopyable // The shared logic for merging a journal file with its base file. 
// len should be set to the length of the data requested std::shared_ptr mergeJournal(const char* objectPath, const char* journalPath, off_t offset, - size_t len, size_t* sizeRead) const; + size_t len, size_t* sizeRead) const; // this version modifies object data in memory, given the journal filename. Processes the whole object // and whole journal file. int mergeJournalInMem(std::shared_ptr& objData, size_t len, const char* journalPath, - size_t* sizeRead) const; - - // this version of MJIM has a higher IOPS requirement and lower mem usage. - int mergeJournalInMem_bigJ(std::shared_ptr& objData, size_t len, const char* journalPath, - size_t* sizeRead) const; + size_t* sizeRead) const; // this version takes already-open file descriptors, and an already-allocated buffer as input. // file descriptor are positioned, eh, best not to assume anything about their positions diff --git a/storage-manager/src/KVPrefixes.cpp b/storage-manager/src/KVPrefixes.cpp index 8183d9f695..0e5bc29865 100644 --- a/storage-manager/src/KVPrefixes.cpp +++ b/storage-manager/src/KVPrefixes.cpp @@ -15,10 +15,15 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -#include "KVPrefixes.hpp" +#include "KVPrefixes.h" namespace storagemanager { // FDB recommends keep the key size up to 32 bytes. -const char* KVPrefixes[2] = {/*OWNERSHIP*/ "SM_O_", /*META*/ "SM_M_"}; +const char* KVPrefixes[3] = {/*OWNERSHIP*/ "SM_O_", /*META*/ "SM_M_", /*JOURNAL*/ "SM_J_"}; + +std::string getJournalName(const std::string& key) +{ + return KVPrefixes[static_cast(KVPrefixId::SM_JOURNAL)] + key; +} } // namespace storagemanager diff --git a/storage-manager/src/KVPrefixes.hpp b/storage-manager/src/KVPrefixes.h similarity index 88% rename from storage-manager/src/KVPrefixes.hpp rename to storage-manager/src/KVPrefixes.h index 92d8b191c6..34ad4ce96b 100644 --- a/storage-manager/src/KVPrefixes.hpp +++ b/storage-manager/src/KVPrefixes.h @@ -16,14 +16,19 @@ MA 02110-1301, USA. */ #pragma once +#include +#include namespace storagemanager { enum class KVPrefixId { SM_OWNERSHIP = 0, - SM_META + SM_META, + SM_JOURNAL }; extern const char* KVPrefixes[]; + +std::string getJournalName(const std::string &key); } // namespace storagemanager diff --git a/storage-manager/src/MetadataFile.cpp b/storage-manager/src/MetadataFile.cpp index 8cf964c692..7070deac56 100644 --- a/storage-manager/src/MetadataFile.cpp +++ b/storage-manager/src/MetadataFile.cpp @@ -45,7 +45,7 @@ #include #include #include "fdbcs.hpp" -#include "KVPrefixes.hpp" +#include "KVPrefixes.h" #define max(x, y) (x > y ? x : y) #define min(x, y) (x < y ? 
x : y) diff --git a/storage-manager/src/Ownership.cpp b/storage-manager/src/Ownership.cpp index 35c8a19b4f..972f922609 100644 --- a/storage-manager/src/Ownership.cpp +++ b/storage-manager/src/Ownership.cpp @@ -20,7 +20,7 @@ #include "Cache.h" #include "Synchronizer.h" #include "KVStorageInitializer.h" -#include "KVPrefixes.hpp" +#include "KVPrefixes.h" #include "fdbcs.hpp" #include #include diff --git a/storage-manager/src/PrefixCache.cpp b/storage-manager/src/PrefixCache.cpp index 220346aedd..584b10d65b 100644 --- a/storage-manager/src/PrefixCache.cpp +++ b/storage-manager/src/PrefixCache.cpp @@ -17,6 +17,8 @@ #include "PrefixCache.h" #include "Cache.h" +#include "KVPrefixes.h" +#include "KVStorageInitializer.h" #include "Config.h" #include "Downloader.h" #include "Synchronizer.h" @@ -575,7 +577,8 @@ void PrefixCache::rename(const string& oldKey, const string& newKey, ssize_t siz int PrefixCache::ifExistsThenDelete(const string& key) { bf::path cachedPath = cachePrefix / key; - bf::path journalPath = journalPrefix / (key + ".journal"); + const auto journalName = getJournalName((journalPrefix / (key + ".journal")).string()); + const auto journalSizeName = getJournalName((journalPrefix / (key + "_size.journal")).string()); boost::unique_lock s(lru_mutex); bool objectExists = false; @@ -593,16 +596,18 @@ int PrefixCache::ifExistsThenDelete(const string& key) else // let makeSpace() delete it if it's already in progress return 0; } - bool journalExists = bf::exists(journalPath); - // assert(objectExists == bf::exists(cachedPath)); + auto kvStorage = KVStorageInitializer::getStorageInstance(); + auto tnx = kvStorage->createTransaction(); + auto journalResult = tnx->get(journalName); + auto journalSizeResult = tnx->get(journalSizeName); + bool journalExists = journalSizeResult.first; size_t objectSize = (objectExists ? bf::file_size(cachedPath) : 0); - // size_t objectSize = (objectExists ? MetadataFile::getLengthFromKey(key) : 0); - size_t journalSize = (journalExists ? bf::file_size(journalPath) : 0); - currentCacheSize -= (objectSize + journalSize); - - // assert(!objectExists || objectSize == bf::file_size(cachedPath)); + size_t journalSize = 0; + if (journalExists) + journalSize = std::atoi(journalSizeResult.second.c_str()); + currentCacheSize -= (objectSize + journalSize); return (objectExists ? 1 : 0) | (journalExists ? 
2 : 0);
}
diff --git a/storage-manager/src/Replicator.cpp b/storage-manager/src/Replicator.cpp
index c4778a9907..606b5ffecc 100644
--- a/storage-manager/src/Replicator.cpp
+++ b/storage-manager/src/Replicator.cpp
@@ -20,6 +20,9 @@
 #include "SMLogging.h"
 #include "Utilities.h"
 #include "Cache.h"
+#include "KVStorageInitializer.h"
+#include "KVPrefixes.h"
+#include "fdbcs.hpp"
 #include
 #include
 #include
@@ -121,9 +124,17 @@ void Replicator::printKPIs() const
 int Replicator::newObject(const boost::filesystem::path& filename, const uint8_t* data, off_t offset, size_t length)
 {
-  int fd, err;
-  string objectFilename = msCachePath + "/" + filename.string();
+  const string objectFilename = msCachePath + "/" + filename.string();
+  auto kvStorage = KVStorageInitializer::getStorageInstance();
+  auto keyGen = std::make_shared();
+  FDBCS::BlobHandler blobWriter(keyGen);
+  const std::string dataValue((char*)data, length);
+  if (!blobWriter.writeBlob(kvStorage, objectFilename, dataValue))
+  {
+    return -1;
+  }
+  int fd, err;
   OPEN(objectFilename.c_str(), O_WRONLY | O_CREAT);
   size_t count = 0;
   while (count < length)
@@ -147,6 +158,14 @@ int Replicator::newNullObject(const boost::filesystem::path& filename, size_t le
 {
   int fd, err;
   string objectFilename = msCachePath + "/" + filename.string();
+  auto kvStorage = KVStorageInitializer::getStorageInstance();
+  auto keyGen = std::make_shared();
+  FDBCS::BlobHandler blobWriter(keyGen);
+  const std::string emptyString(length, 0);
+  if (!blobWriter.writeBlob(kvStorage, objectFilename, emptyString))
+  {
+    return -1;
+  }
   OPEN(objectFilename.c_str(), O_WRONLY | O_CREAT);
   err = ftruncate(fd, length);
@@ -215,54 +234,48 @@ ssize_t Replicator::_write(int fd, const void* data, size_t length)
    Benefits would be data integrity, and possibly add'l parallelism.
    The downside is of course, a higher number of IO ops for the same operation.
 */
+
 int Replicator::addJournalEntry(const boost::filesystem::path& filename, const uint8_t* data, off_t offset, size_t length)
 {
-  int fd, err;
   uint64_t offlen[] = {(uint64_t)offset, length};
-  size_t count = 0;
-  int version = 1;
-  int l_errno;
-  char errbuf[80];
-  bool bHeaderChanged = false;
-  string headerRollback = "";
-  string journalFilename = msJournalPath + "/" + filename.string() + ".journal";
+  const int version = 1;
+  const auto journalName = getJournalName(msJournalPath + "/" + filename.string() + ".journal");
+  const auto journalSizeName = getJournalName(msJournalPath + "/" + filename.string() + "_size" + ".journal");
   boost::filesystem::path firstDir = *((filename).begin());
-  uint64_t thisEntryMaxOffset = (offset + length - 1);
-
-  uint64_t currentMaxOffset = 0;
-  bool exists = boost::filesystem::exists(journalFilename);
-  OPEN(journalFilename.c_str(), (exists ?
O_RDWR : O_WRONLY | O_CREAT)) - - if (!exists) + const uint64_t thisEntryMaxOffset = (offset + length - 1); + string dataStr; + size_t dataStrOffset = 0; + + auto kvStorage = KVStorageInitializer::getStorageInstance(); + auto keyGen = std::make_shared(); + FDBCS::BlobHandler journalHandler(keyGen); + auto resultPair = journalHandler.readBlob(kvStorage, journalName); + const std::string& journalData = resultPair.second; + const bool journalExists = resultPair.first; + if (!journalExists) { - bHeaderChanged = true; // create new journal file with header string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset) .str(); - err = _write(fd, header.c_str(), header.length() + 1); - l_errno = errno; - repHeaderDataWritten += (header.length() + 1); - if ((uint)err != (header.length() + 1)) - { - // return the error because the header for this entry on a new journal file failed - mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Writing journal header failed (%s).", - strerror_r(l_errno, errbuf, 80)); - errno = l_errno; - return err; - } - Cache::get()->newJournalEntry(firstDir, header.length() + 1); + const size_t headerLength = header.length(); + dataStr.resize(headerLength + 1 + JOURNAL_ENTRY_HEADER_SIZE + length); + std::memcpy(&dataStr[dataStrOffset], header.c_str(), headerLength); + // Specifies the end of the header. + dataStr[headerLength] = 0; + dataStrOffset = headerLength + 1; + repHeaderDataWritten += headerLength + 1; + Cache::get()->newJournalEntry(firstDir, headerLength + 1); ++replicatorJournalsCreated; } else { - // read the existing header and check if max_offset needs to be updated size_t tmp; std::shared_ptr headertxt; try { - headertxt = seekToEndOfHeader1(fd, &tmp); + headertxt = seekToEndOfHeader1_(journalData, &tmp); } catch (std::runtime_error& e) { @@ -278,7 +291,6 @@ int Replicator::addJournalEntry(const boost::filesystem::path& filename, const u } stringstream ss; ss << headertxt.get(); - headerRollback = headertxt.get(); boost::property_tree::ptree header; try { @@ -297,164 +309,61 @@ int Replicator::addJournalEntry(const boost::filesystem::path& filename, const u return -1; } assert(header.get("version") == 1); - uint64_t currentMaxOffset = header.get("max_offset"); + const uint64_t currentMaxOffset = header.get("max_offset"); + dataStr.resize(journalData.size() + JOURNAL_ENTRY_HEADER_SIZE + length); + size_t journalOffset = 0; + if (thisEntryMaxOffset > currentMaxOffset) { - bHeaderChanged = true; string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset) .str(); - err = _pwrite(fd, header.c_str(), header.length() + 1, 0); - l_errno = errno; - repHeaderDataWritten += (header.length() + 1); - if ((uint)err != (header.length() + 1)) - { - // only the header was possibly changed rollback attempt - mpLogger->log(LOG_CRIT, - "Replicator::addJournalEntry: Updating journal header failed. 
" - "Attempting to rollback and continue."); - int rollbackErr = _pwrite(fd, headerRollback.c_str(), headerRollback.length() + 1, 0); - if ((uint)rollbackErr == (headerRollback.length() + 1)) - mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header success."); - else - mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed!"); - errno = l_errno; - if (err < 0) - return err; - else - return 0; - } + const size_t headerLenght = header.length(); + std::memcpy(&dataStr[0], header.c_str(), headerLenght); + dataStr[headerLenght] = 0; + dataStrOffset = headerLenght + 1; + journalOffset = headerLenght + 1; + repHeaderDataWritten += headerLenght + 1; } - } - off_t entryHeaderOffset = ::lseek(fd, 0, SEEK_END); + std::memcpy(&dataStr[dataStrOffset], &journalData[journalOffset], journalData.size() - journalOffset); + dataStrOffset = journalData.size(); + } - err = _write(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE); - l_errno = errno; + std::memcpy(&dataStr[dataStrOffset], offlen, JOURNAL_ENTRY_HEADER_SIZE); + dataStrOffset += JOURNAL_ENTRY_HEADER_SIZE; repHeaderDataWritten += JOURNAL_ENTRY_HEADER_SIZE; - if (err != JOURNAL_ENTRY_HEADER_SIZE) + std::memcpy(&dataStr[dataStrOffset], data, length); + dataStrOffset += length; + assert(dataStr.size() == dataStrOffset); + + if (journalExists && !journalHandler.removeBlob(kvStorage, journalName)) { - // this entry failed so if the header was updated roll it back - if (bHeaderChanged) - { - mpLogger->log(LOG_CRIT, - "Replicator::addJournalEntry: write journal entry header failed. Attempting to rollback " - "and continue."); - // attempt to rollback top level header - int rollbackErr = _pwrite(fd, headerRollback.c_str(), headerRollback.length() + 1, 0); - if ((uint)rollbackErr != (headerRollback.length() + 1)) - { - mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed! (%s)", - strerror_r(errno, errbuf, 80)); - errno = l_errno; - if (err < 0) - return err; - else - return 0; - } - } - int rollbackErr = ::ftruncate(fd, entryHeaderOffset); - if (rollbackErr != 0) - { - mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Truncate to previous EOF failed! (%s)", - strerror_r(errno, errbuf, 80)); - errno = l_errno; - if (err < 0) - return err; - else - return 0; - } - l_errno = errno; - return err; + mpLogger->log(LOG_CRIT, "Cannot remove journal blob."); + errno = EIO; + return -1; } - while (count < length) + + if (!journalHandler.writeBlob(kvStorage, journalName, dataStr)) + { + mpLogger->log(LOG_CRIT, "Cannot write journal blob."); + errno = EIO; + return -1; + } + { - err = _write(fd, &data[count], length - count); - if (err < 0) + auto tnx = kvStorage->createTransaction(); + tnx->set(journalSizeName, std::to_string(dataStr.size())); + if (!tnx->commit()) { - l_errno = errno; - /* XXXBEN: Attempt to update entry header with the partial write and write it. - IF the write fails to update entry header report an error to IOC and logging. - */ - if (count > 0) // return what was successfully written - { - mpLogger->log(LOG_CRIT, - "Replicator::addJournalEntry: Got '%s' writing a journal entry. 
" - "Attempting to update and continue.", - strerror_r(l_errno, errbuf, 80)); - // Update the file header max_offset if necessary and possible - thisEntryMaxOffset = (offset + count - 1); - if (thisEntryMaxOffset > currentMaxOffset) - { - string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % - thisEntryMaxOffset) - .str(); - int rollbackErr = _pwrite(fd, header.c_str(), header.length() + 1, 0); - if ((uint)rollbackErr != (header.length() + 1)) - { - mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Update of journal header failed! (%s)", - strerror_r(errno, errbuf, 80)); - errno = l_errno; - return err; - } - } - // Update the journal entry header - offlen[1] = count; - int rollbackErr = _pwrite(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE, entryHeaderOffset); - if ((uint)rollbackErr != JOURNAL_ENTRY_HEADER_SIZE) - { - mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Update of journal entry header failed! (%s)", - strerror_r(errno, errbuf, 80)); - errno = l_errno; - return err; - } - // return back what we did write - mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Partial write success."); - repUserDataWritten += count; - return count; - } - else - { - // If the header was changed rollback and reset EOF - // Like this never happened - // Currently since this returns the err from the first write. IOC returns -1 and writeTask returns an - // error So system is likely broken in some way - if (bHeaderChanged) - { - mpLogger->log(LOG_CRIT, - "Replicator::addJournalEntry: write journal entry failed (%s). " - "Attempting to rollback and continue.", - strerror_r(l_errno, errbuf, 80)); - // attempt to rollback top level header - string header = - (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % 0).str(); - int rollbackErr = _pwrite(fd, header.c_str(), header.length() + 1, 0); - if ((uint)rollbackErr != (header.length() + 1)) - { - mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed (%s)!", - strerror_r(errno, errbuf, 80)); - errno = l_errno; - return err; - } - } - int rollbackErr = ::ftruncate(fd, entryHeaderOffset); - if (rollbackErr != 0) - { - mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Remove entry header failed (%s)!", - strerror_r(errno, errbuf, 80)); - errno = l_errno; - return err; - } - mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Write failed. 
Journal file restored.");
 errno = l_errno;
 return err;
 }
 }
 count += err;
 }
 repUserDataWritten += count;
 return count;
+  repUserDataWritten += length;
+  return length;
 }
 int Replicator::remove(const boost::filesystem::path& filename, Flags flags)
@@ -484,6 +393,27 @@ int Replicator::remove(const boost::filesystem::path& filename, Flags flags)
   return ret;
 }
+int Replicator::removeJournal(const boost::filesystem::path& filename)
+{
+  auto kvStorage = KVStorageInitializer::getStorageInstance();
+  auto keyGen = std::make_shared();
+  FDBCS::BlobHandler journalHandler(keyGen);
+  if (!journalHandler.removeBlob(kvStorage, filename.string()))
+  {
+    return -1;
+  }
+  return 0;
+}
+
+int Replicator::removeJournalSize(const boost::filesystem::path& filename)
+{
+  auto kvStorage = KVStorageInitializer::getStorageInstance();
+  auto tnx = kvStorage->createTransaction();
+  tnx->remove(filename.string());
+  return tnx->commit();
+}
+
+
 int Replicator::updateMetadata(MetadataFile& meta)
 {
   return meta.writeMetadata();
diff --git a/storage-manager/src/Replicator.h b/storage-manager/src/Replicator.h
index 165c60e456..bc237b3086 100644
--- a/storage-manager/src/Replicator.h
+++ b/storage-manager/src/Replicator.h
@@ -43,13 +43,14 @@ class Replicator
     LOCAL_ONLY = 0x1,
     NO_LOCAL = 0x2
   };
-
   int addJournalEntry(const boost::filesystem::path& filename, const uint8_t* data, off_t offset, size_t length);
   int newObject(const boost::filesystem::path& filename, const uint8_t* data, off_t offset, size_t length);
   int newNullObject(const boost::filesystem::path& filename, size_t length);
   int remove(const boost::filesystem::path& file, Flags flags = NONE);
+  int removeJournal(const boost::filesystem::path& file);
+  int removeJournalSize(const boost::filesystem::path& file);
   int updateMetadata(MetadataFile& meta);
diff --git a/storage-manager/src/Synchronizer.cpp b/storage-manager/src/Synchronizer.cpp
index 62315d9902..99a7ff0fb5 100644
--- a/storage-manager/src/Synchronizer.cpp
+++ b/storage-manager/src/Synchronizer.cpp
@@ -20,6 +20,8 @@
 #include "IOCoordinator.h"
 #include "MetadataFile.h"
 #include "Utilities.h"
+#include "KVStorageInitializer.h"
+#include "KVPrefixes.h"
 #include
 #include
@@ -238,8 +240,12 @@ void Synchronizer::flushObject(const bf::path& prefix, const string& _key)
       sleep(5);
     }
   } while (err);
-  journalExists = bf::exists(journalPath / (key + ".journal"));
+  const auto journalName = getJournalName((journalPath / (key + ".journal")).string());
+  auto kvStorage = KVStorageInitializer::getStorageInstance();
+  auto tnx = kvStorage->createTransaction();
+  auto resultPair = tnx->get(journalName);
+  journalExists = resultPair.first;
   if (journalExists)
   {
     logger->log(LOG_DEBUG,
@@ -292,8 +298,8 @@ void Synchronizer::periodicSync()
     else
       ++flushesTriggeredByTimer;
   }
-  // cout << "Sync'ing " << pendingOps.size() << " objects" << " queue size is " <<
-  // threadPool.currentQueueSize() << endl;
+  // cout << "Sync'ing " << pendingOps.size() << " objects"
+  //      << " queue size is " << threadPool->currentQueueSize() << endl;
   for (auto& job : pendingOps)
     makeJob(job.first);
   for (auto it = uncommittedJournalSize.begin(); it != uncommittedJournalSize.end(); ++it)
@@ -575,11 +581,18 @@ void Synchronizer::synchronizeWithJournal(const string& sourceFile, list
       cache->deletedObject(prefix, cloudKey, objSize);
       cs->deleteObject(cloudKey);
     }
-    bf::path jPath = journalPath / (key + ".journal");
-    if (bf::exists(jPath))
+
+    const auto journalName =
      getJournalName((journalPath / (key + ".journal")).string());
+    const auto journalSizeName = getJournalName((journalPath / (key + "_size.journal")).string());
+    auto kvStorage = KVStorageInitializer::getStorageInstance();
+    auto tnx = kvStorage->createTransaction();
+    auto resultPairJournal = tnx->get(journalName);
+    auto resultPairJournalSize = tnx->get(journalSizeName);
+    if (resultPairJournal.first && resultPairJournalSize.first)
     {
-      size_t jSize = bf::file_size(jPath);
-      replicator->remove(jPath);
+      size_t jSize = std::atoi(resultPairJournalSize.second.c_str());
+      replicator->removeJournal(journalName);
+      replicator->removeJournalSize(journalSizeName);
       cache->deletedJournal(prefix, jSize);
     }
   }
@@ -605,43 +618,48 @@ void Synchronizer::synchronizeWithJournal(const string& sourceFile, list
   // sync queue
   bf::path oldCachePath = cachePath / key;
-  string journalName = (journalPath / (key + ".journal")).string();
-
-  if (!bf::exists(journalName))
+  const string journalName = getJournalName((journalPath / (key + ".journal")).string());
+  const auto journalSizeName = getJournalName((journalPath / (key + "_size.journal")).string());
   {
-    logger->log(LOG_DEBUG, "synchronizeWithJournal(): no journal file found for %s", key.c_str());
-
-    // sanity check + add'l info.  Test whether the object exists in cloud storage.  If so, complain,
-    // and run synchronize() instead.
-    bool existsOnCloud;
-    int err = cs->exists(cloudKey, &existsOnCloud);
-    if (err)
-      throw runtime_error(string("Synchronizer: cs->exists() failed: ") + strerror_r(errno, buf, 80));
-    if (!existsOnCloud)
+    auto kvStorage = KVStorageInitializer::getStorageInstance();
+    auto tnx = kvStorage->createTransaction();
+    auto resultPair = tnx->get(journalName);
+    if (!resultPair.first)
     {
-      if (cache->exists(prefix, cloudKey))
+      logger->log(LOG_DEBUG, "synchronizeWithJournal(): no journal file found for %s", key.c_str());
+
+      // sanity check + add'l info.  Test whether the object exists in cloud storage.  If so, complain,
+      // and run synchronize() instead.
+      bool existsOnCloud;
+      int err = cs->exists(cloudKey, &existsOnCloud);
+      if (err)
+        throw runtime_error(string("Synchronizer: cs->exists() failed: ") + strerror_r(errno, buf, 80));
+      if (!existsOnCloud)
      {
-        logger->log(LOG_DEBUG,
-                    "synchronizeWithJournal(): %s has no journal and does not exist in the cloud, calling "
-                    "synchronize() instead.  Need to explain how this happens.",
-                    key.c_str());
-        s.unlock();
-        synchronize(sourceFile, lit);
+        if (cache->exists(prefix, cloudKey))
+        {
+          logger->log(LOG_DEBUG,
+                      "synchronizeWithJournal(): %s has no journal and does not exist in the cloud, calling "
+                      "synchronize() instead.  Need to explain how this happens.",
+                      key.c_str());
+          s.unlock();
+          synchronize(sourceFile, lit);
+        }
+        else
+          logger->log(LOG_DEBUG,
+                      "synchronizeWithJournal(): %s has no journal, and does not exist in the cloud or in "
+                      " the local cache.  Need to explain how this happens.",
+                      key.c_str());
+        return;
      }
      else
        logger->log(LOG_DEBUG,
-                   "synchronizeWithJournal(): %s has no journal, and does not exist in the cloud or in "
-                   " the local cache.  Need to explain how this happens.",
+                   "synchronizeWithJournal(): %s has no journal, but it does exist in the cloud. 
" - " This indicates that an overlapping syncWithJournal() call handled it properly.", - key.c_str()); - - return; } int err; @@ -649,7 +667,6 @@ void Synchronizer::synchronizeWithJournal(const string& sourceFile, list size_t count = 0, size = mdEntry.length, originalSize = 0; bool oldObjIsCached = cache->exists(prefix, cloudKey); - // get the base object if it is not already cached // merge it with its journal file if (!oldObjIsCached) @@ -685,7 +702,10 @@ void Synchronizer::synchronizeWithJournal(const string& sourceFile, list err = ioc->mergeJournalInMem(data, size, journalName.c_str(), &_bytesRead); if (err) { - if (!bf::exists(journalName)) + auto kvStorage = KVStorageInitializer::getStorageInstance(); + auto tnx = kvStorage->createTransaction(); + auto resultPair = tnx->get(journalName); + if (!resultPair.first) logger->log(LOG_DEBUG, "synchronizeWithJournal(): journal %s was deleted mid-operation, check locking", journalName.c_str()); @@ -704,7 +724,10 @@ void Synchronizer::synchronizeWithJournal(const string& sourceFile, list data = ioc->mergeJournal(oldCachePath.string().c_str(), journalName.c_str(), 0, size, &_bytesRead); if (!data) { - if (!bf::exists(journalName)) + auto kvStorage = KVStorageInitializer::getStorageInstance(); + auto tnx = kvStorage->createTransaction(); + auto resultPair = tnx->get(journalName); + if (!resultPair.first) logger->log(LOG_DEBUG, "synchronizeWithJournal(): journal %s was deleted mid-operation, check locking", journalName.c_str()); @@ -790,8 +813,17 @@ void Synchronizer::synchronizeWithJournal(const string& sourceFile, list rename(key, newKey); // delete the old object & journal file - cache->deletedJournal(prefix, bf::file_size(journalName)); - replicator->remove(journalName); + auto kvStorage = KVStorageInitializer::getStorageInstance(); + { + auto tnx = kvStorage->createTransaction(); + auto result = tnx->get(journalSizeName); + if (result.first) + cache->deletedJournal(prefix, std::atoi(result.second.c_str())); + } + + // Remove journal size name from kv storage. + replicator->removeJournalSize(journalSizeName); + replicator->removeJournal(journalName); cs->deleteObject(cloudKey); } @@ -887,7 +919,7 @@ void Synchronizer::configListener() { maxUploads = 20; threadPool->setMaxThreads(maxUploads); - logger->log(LOG_INFO, "max_concurrent_uploads = %u",maxUploads); + logger->log(LOG_INFO, "max_concurrent_uploads = %u", maxUploads); } if (stmp.empty()) {