diff --git a/dbcon/dmlpackage/CMakeLists.txt b/dbcon/dmlpackage/CMakeLists.txt
index ccd0b09856..2e05bae5f1 100644
--- a/dbcon/dmlpackage/CMakeLists.txt
+++ b/dbcon/dmlpackage/CMakeLists.txt
@@ -33,6 +33,7 @@ ADD_LIBRARY(dmlpackage SHARED
     commanddmlpackage.cpp
     dmlpkg.cpp
     dmlparser.cpp
+    vacuumpartitiondmlpackage.cpp
     ${BISON_dml_gram_OUTPUTS}
     ${FLEX_dml_scan_OUTPUTS}
 )
diff --git a/dbcon/dmlpackage/dmlpkg.h b/dbcon/dmlpackage/dmlpkg.h
index c85bbb25f0..fea27b9b3a 100644
--- a/dbcon/dmlpackage/dmlpkg.h
+++ b/dbcon/dmlpackage/dmlpkg.h
@@ -98,7 +98,8 @@ enum DML_TYPE
   DML_UPDATE,
   DML_DELETE,
   DML_COMMAND,
-  DML_INVALID_TYPE
+  DML_INVALID_TYPE,
+  DML_VACUUM_PARTITION
 };
 
 /** @brief SqlStatement represents a toplevel
diff --git a/dbcon/dmlpackage/vacuumpartitiondmlpackage.cpp b/dbcon/dmlpackage/vacuumpartitiondmlpackage.cpp
new file mode 100644
index 0000000000..cef4a8c027
--- /dev/null
+++ b/dbcon/dmlpackage/vacuumpartitiondmlpackage.cpp
@@ -0,0 +1,98 @@
+/* Copyright (C) 2024 MariaDB Corporation
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License
+   as published by the Free Software Foundation; version 2 of
+   the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+
+   MA 02110-1301, USA. */
+
+#define VACUUMPARTITIONDMLPKG_DLLEXPORT
+#include "vacuumpartitiondmlpackage.h"
+#undef VACUUMPARTITIONDMLPKG_DLLEXPORT
+
+#include <stdexcept>
+#include <utility>
+
+namespace dmlpackage
+{
+
+/** @brief Builds a vacuum request for one logical partition of schemaName.tableName. */
+VacuumPartitionDMLPackage::VacuumPartitionDMLPackage(std::string schemaName, std::string tableName,
+                                                     std::string dmlStatement, int sessionID,
+                                                     BRM::LogicalPartition partition)
+ : CalpontDMLPackage(std::move(schemaName), std::move(tableName), std::move(dmlStatement), sessionID)
+ , fPartition(partition)
+{
+}
+
+VacuumPartitionDMLPackage::~VacuumPartitionDMLPackage() = default;
+
+/** @brief Serializes the package: type tag, session id, statement, schema, table, partition.
+ *
+ * @return always 1
+ */
+int VacuumPartitionDMLPackage::write(messageqcpp::ByteStream& bytestream)
+{
+  bytestream << static_cast<uint8_t>(DML_VACUUM_PARTITION);
+  bytestream << static_cast<uint32_t>(fSessionID);
+
+  bytestream << fDMLStatement;
+  bytestream << fSchemaName;
+  bytestream << fTableName;
+
+  fPartition.serialize(bytestream);
+
+  return 1;
+}
+
+/** @brief Restores the fields written by write(), in the same order.
+ *
+ * NOTE(review): the leading DML_VACUUM_PARTITION byte emitted by write() is not
+ * consumed here; presumably the dispatcher strips it before calling read() - confirm.
+ *
+ * @return always 1
+ */
+int VacuumPartitionDMLPackage::read(messageqcpp::ByteStream& bytestream)
+{
+  bytestream >> fSessionID;
+
+  bytestream >> fDMLStatement;
+  bytestream >> fSchemaName;
+  bytestream >> fTableName;
+
+  fPartition.unserialize(bytestream);
+
+  return 1;
+}
+
+/** @brief Not supported for vacuum packages; always throws std::logic_error. */
+int VacuumPartitionDMLPackage::buildFromSqlStatement(SqlStatement& /*sqlStatement*/)
+{
+  throw std::logic_error("VacuumPartitionDMLPackage::buildFromSqlStatement is not implemented yet.");
+}
+
+/** @brief Not supported for vacuum packages; always throws std::logic_error. */
+int VacuumPartitionDMLPackage::buildFromBuffer(std::string& /*buffer*/, int /*columns*/, int /*rows*/)
+{
+  throw std::logic_error("VacuumPartitionDMLPackage::buildFromBuffer is not implemented yet.");
+}
+
+/** @brief Not supported for vacuum packages; always throws std::logic_error. */
+int VacuumPartitionDMLPackage::buildFromMysqlBuffer(ColNameList& /*colNameList*/,
+                                                    TableValuesMap& /*tableValuesMap*/, int /*columns*/,
+                                                    int /*rows*/, NullValuesBitset& /*nullValues*/)
+{
+  throw std::logic_error("VacuumPartitionDMLPackage::buildFromMysqlBuffer is not implemented yet.");
+}
+
+}  // namespace dmlpackage
diff --git a/dbcon/dmlpackage/vacuumpartitiondmlpackage.h b/dbcon/dmlpackage/vacuumpartitiondmlpackage.h
new file mode 100644
index 0000000000..91c0f67803
--- /dev/null
+++ b/dbcon/dmlpackage/vacuumpartitiondmlpackage.h
@@ -0,0 +1,100 @@
+/* Copyright (C) 2024 MariaDB Corporation
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License
+   as published by the Free Software Foundation; version 2 of
+   the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+
+   MA 02110-1301, USA. */
+
+#pragma once
+
+#include <string>
+#include "calpontdmlpackage.h"
+#include "bytestream.h"
+
+#define EXPORT
+
+namespace dmlpackage
+{
+/** @brief concrete implementation of a CalpontDMLPackage
+ * Specifically for representing MCS_VACUUM_PARTITION_BLOAT Statements
+ */
+class VacuumPartitionDMLPackage : public CalpontDMLPackage
+{
+ public:
+  EXPORT VacuumPartitionDMLPackage() = default;
+
+  /** @brief ctor
+   *
+   * @param schemaName the schema of the table being operated on
+   * @param tableName the name of the table being operated on
+   * @param dmlStatement the dml statement
+   * @param sessionID the session ID
+   * @param partition partition to be vacuumed
+   */
+  EXPORT VacuumPartitionDMLPackage(std::string schemaName, std::string tableName, std::string dmlStatement,
+                                   int sessionID, BRM::LogicalPartition partition);
+
+  /** @brief dtor
+   */
+  EXPORT virtual ~VacuumPartitionDMLPackage();
+
+  /** @brief write a VacuumPartitionDMLPackage to a ByteStream
+   *
+   * @param bytestream the ByteStream to write to
+   */
+  EXPORT int write(messageqcpp::ByteStream& bytestream) override;
+
+  /** @brief read a VacuumPartitionDMLPackage from a ByteStream
+   *
+   * @param bytestream the ByteStream to read from
+   */
+  EXPORT int read(messageqcpp::ByteStream& bytestream) override;
+
+  /** @brief not implemented for vacuum packages; always throws std::logic_error
+   *
+   * @param buffer unused
+   * @param columns unused
+   * @param rows unused
+   */
+  EXPORT int buildFromBuffer(std::string& buffer, int columns, int rows) override;
+
+  /** @brief not implemented for vacuum packages; always throws std::logic_error
+   *
+   * @param sqlStatement unused
+   */
+  EXPORT int buildFromSqlStatement(SqlStatement& sqlStatement) override;
+
+  /** @brief not implemented for vacuum packages; always throws std::logic_error
+   *
+   * @param colNameList unused
+   * @param tableValuesMap unused
+   * @param columns unused
+   * @param rows unused
+   * @param nullValues unused
+   */
+  EXPORT int buildFromMysqlBuffer(ColNameList& colNameList, TableValuesMap& tableValuesMap, int columns,
+                                  int rows, NullValuesBitset& nullValues) override;
+
+  /** @brief the logical partition this package asks to vacuum */
+  BRM::LogicalPartition getPartition() const
+  {
+    return fPartition;
+  }
+
+ private:
+  BRM::LogicalPartition fPartition;  // Logical partition to vacuum
+};
+}  // namespace dmlpackage
+
+#undef EXPORT
diff --git a/dbcon/dmlpackageproc/CMakeLists.txt b/dbcon/dmlpackageproc/CMakeLists.txt
index 6717045e83..d2d17be366 100644
--- a/dbcon/dmlpackageproc/CMakeLists.txt
+++ b/dbcon/dmlpackageproc/CMakeLists.txt
@@ -11,7 +11,8 @@ set(dmlpackageproc_LIB_SRCS
     updatepackageprocessor.cpp
     commandpackageprocessor.cpp
     autoincrementdata.cpp
-    tablelockdata.cpp)
+    tablelockdata.cpp
+    vacuumpartitionpackageprocessor.cpp)
 
 add_library(dmlpackageproc SHARED ${dmlpackageproc_LIB_SRCS})
 
diff --git a/dbcon/dmlpackageproc/dmlpackageprocessor.h b/dbcon/dmlpackageproc/dmlpackageprocessor.h
index ec7956c817..b584229037 100644
--- a/dbcon/dmlpackageproc/dmlpackageprocessor.h
+++ b/dbcon/dmlpackageproc/dmlpackageprocessor.h
@@ -99,7 +99,8 @@ class DMLPackageProcessor
     JOB_ERROR,
     JOB_CANCELED,
     DBRM_READ_ONLY,
-    PP_LOST_CONNECTION
+    PP_LOST_CONNECTION,
+    VACUUM_ERROR
   };
 
   enum DebugLevel /** @brief Debug level type enumeration */
diff --git
a/dbcon/dmlpackageproc/vacuumpartitionpackageprocessor.cpp b/dbcon/dmlpackageproc/vacuumpartitionpackageprocessor.cpp new file mode 100644 index 0000000000..d1f66310b2 --- /dev/null +++ b/dbcon/dmlpackageproc/vacuumpartitionpackageprocessor.cpp @@ -0,0 +1,927 @@ +/* Copyright (C) 2024 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include + +#include "vacuumpartitionpackageprocessor.h" + +#include "messagelog.h" +#include "simplecolumn.h" +#include "messagelog.h" +#include "sqllogger.h" +#include "dbrm.h" +#include "idberrorinfo.h" +#include "errorids.h" +#include "rowgroup.h" +#include "bytestream.h" +#include "we_messages.h" +#include "oamcache.h" +#include "tablelockdata.h" +#include "bytestream.h" +#include "mockutils.h" + +namespace dmlpackageprocessor +{ +DMLPackageProcessor::DMLResult VacuumPartitionPackageProcessor::processPackageInternal( + dmlpackage::CalpontDMLPackage& cpackage) +{ + SUMMARY_INFO("VacuumPartitionPackageProcessor::processPackageInternal"); + + DMLResult result; + result.result = NO_ERROR; + + BRM::TxnID txnID; + txnID.id = cpackage.get_TxnID(); + txnID.valid = true; + + if (int rc = fDbrm->isReadWrite(); rc != 0) + { + logging::Message::Args args; + logging::Message message(9); + args.add("Unable to execute the statement due to DBRM is read only"); + message.format(args); + result.result = 
VACUUM_ERROR; + result.message = message; + fSessionManager.rolledback(txnID); + return result; + } + + auto vacuumPartitionPkg = dynamic_cast(&cpackage); + if (!vacuumPartitionPkg) + { + logging::Message::Args args; + logging::Message message(9); + args.add("VacuumPartitionDMLPackage wrong cast"); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + return result; + } + + fSessionID = cpackage.get_SessionID(); + VERBOSE_INFO("VacuumPartitionPackageProcessor is processing CalpontDMLPackage ..."); + + uint64_t uniqueId = 0; + try + { + uniqueId = fDbrm->getUnique64(); + } + catch (std::exception& ex) + { + logging::Message::Args args; + logging::Message message(9); + args.add(ex.what()); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + fSessionManager.rolledback(txnID); + return result; + } + catch (...) + { + logging::Message::Args args; + logging::Message message(9); + args.add("Unknown error occurred while getting unique number."); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + fSessionManager.rolledback(txnID); + return result; + } + fWEClient->addQueue(uniqueId); + + TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID); + + auto systemCatalogPtr = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID); + execplan::CalpontSystemCatalog::TableName tableName(cpackage.get_SchemaName(), cpackage.get_TableName()); + execplan::CalpontSystemCatalog::ROPair roPair; + try + { + string stmt = + cpackage.get_SQLStatement() + "|" + cpackage.get_SchemaName() + "|" + cpackage.get_TableName(); + logging::SQLLogger sqlLogger(stmt, DMLLoggingId, fSessionID, txnID.id); + + roPair = systemCatalogPtr->tableRID(tableName); + // Check whether this table is locked already for this session + uint64_t tableLockId = tablelockData->getTablelockId(roPair.objnum); + if (!tableLockId) + { + uint32_t processID = ::getpid(); + int32_t txnId = 
txnID.id; + std::string processName("DMLProc"); + int32_t sessionId = fSessionID; + oam::OamCache* oamcache = oam::OamCache::makeOamCache(); + + std::vector pmList = oamcache->getModuleIds(); + std::vector pms; + for (unsigned i = 0; i < pmList.size(); i++) + { + pms.push_back((uint32_t)pmList[i]); + } + + try + { + tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnId, + BRM::LOADING); + } + catch (std::exception&) + { + throw std::runtime_error(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_HARD_FAILURE)); + } + + // Retry 30 times when failed to get lock + if (!tableLockId) + { + int waitPeriod = 10; + int sleepTime = 100; // sleep 100 milliseconds between checks + int maxTries = 30; // try 30 times (3 seconds) + waitPeriod = WriteEngine::Config::getWaitPeriod(); + maxTries = waitPeriod * 10; + struct timespec rm_ts; + + rm_ts.tv_sec = sleepTime / 1000; + rm_ts.tv_nsec = sleepTime % 1000 * 1000000; + + int numTries = 0; + for (; numTries < maxTries; numTries++) + { + struct timespec abs_ts; + + do + { + abs_ts.tv_sec = rm_ts.tv_sec; + abs_ts.tv_nsec = rm_ts.tv_nsec; + } while (nanosleep(&abs_ts, &rm_ts) < 0); + + try + { + processID = ::getpid(); + txnId = txnID.id; + sessionId = fSessionID; + processName = "DMLProc"; + tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, + &txnId, BRM::LOADING); + } + catch (std::exception&) + { + throw std::runtime_error(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_HARD_FAILURE)); + } + + if (tableLockId > 0) + break; + + if (numTries >= maxTries) + { + result.result = VACUUM_ERROR; + logging::Message::Args args; + args.add("Vacuum"); + args.add(processName); + args.add((uint64_t)processID); + args.add(sessionId); + throw std::runtime_error( + logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_TABLE_LOCKED, args)); + } + } + } + } + + tablelockData->setTablelock(roPair.objnum, tableLockId); + 
execplan::CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->columnRIDs(tableName); + execplan::CalpontSystemCatalog::ColType colType; + + for (const auto& [_, objnum] : ridList) + { + // If user hit ctrl+c in the mysql console, fRollbackPending will be true. + if (fRollbackPending) + { + result.result = JOB_CANCELED; + break; + } + + colType = systemCatalogPtr->colType(objnum); + + if (colType.autoincrement) + { + try + { + uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); + fDbrm->startAISequence(objnum, nextVal, colType.colWidth, colType.colDataType); + break; + } + catch (std::exception& ex) + { + result.result = VACUUM_ERROR; + throw std::runtime_error(ex.what()); + } + } + } + + uint32_t rowsProcessed = doVacuumRows(*vacuumPartitionPkg, result, uniqueId, roPair.objnum); + + if (result.result == JOB_CANCELED) + throw std::runtime_error("Query execution was interrupted"); + + if ((result.result != 0) && (result.result != DMLPackageProcessor::IDBRANGE_WARNING)) + throw std::runtime_error(result.message.msg()); + + result.rowCount = rowsProcessed; + + logging::logDML(cpackage.get_SessionID(), txnID.id, cpackage.get_SQLStatement(), + cpackage.get_SchemaName()); + } + catch (exception& ex) + { + if (checkPPLostConnection(ex)) + { + result.result = PP_LOST_CONNECTION; + } + else + { + cerr << "VacuumPartitionPackageProcessor::processPackage: " << ex.what() << endl; + + if (result.result == 0) + { + result.result = VACUUM_ERROR; + } + + result.message = logging::Message(ex.what()); + } + } + catch (...) + { + cerr << "VacuumPartitionPackageProcessor::processPackage: caught unknown exception!" 
<< endl; + logging::Message::Args args; + logging::Message message(7); + args.add("Delete Failed: "); + args.add("encountered unknown exception"); + args.add(result.message.msg()); + args.add(""); + message.format(args); + + result.result = VACUUM_ERROR; + result.message = message; + } + + std::map oids; + int rc = 0; + + if (result.result == NO_ERROR) + { + rc = flushDataFiles(result.result, oids, uniqueId, txnID, roPair.objnum); + + if (rc != NO_ERROR) + { + cerr << "VacuumPartitionPackageProcessor::processPackage: write data to disk failed" << endl; + + if (!fRollbackPending) + { + logging::Message::Args args; + logging::Message message(7); + args.add("vacuum Failed: "); + args.add("error when writing data to disk"); + args.add(""); + args.add(""); + message.format(args); + + result.result = VACUUM_ERROR; + result.message = message; + } + + result.rowCount = 0; + rc = endTransaction(uniqueId, txnID, false); + + if ((rc != NO_ERROR) && (!fRollbackPending)) + { + logging::Message::Args args; + logging::Message message(7); + args.add("Delete Failed: "); + args.add("error when cleaning up data files"); + args.add(""); + args.add(""); + message.format(args); + + result.result = VACUUM_ERROR; + result.message = message; + result.rowCount = 0; + } + } + else + { + if (fRollbackPending) + rc = endTransaction(uniqueId, txnID, false); + else + rc = endTransaction(uniqueId, txnID, true); + } + } + else + { + rc = flushDataFiles(result.result, oids, uniqueId, txnID, roPair.objnum); + result.rowCount = 0; + rc = endTransaction(uniqueId, txnID, false); + } + + if (fRollbackPending) + { + result.result = JOB_CANCELED; + logging::Message::Args args; + args.add("Query execution was interrupted"); + result.message.format(args); + } + + fWEClient->removeQueue(uniqueId); + + VERBOSE_INFO("Finished Processing Delete DML Package"); + return result; +} + +uint64_t VacuumPartitionPackageProcessor::doVacuumRows(dmlpackage::VacuumPartitionDMLPackage& package, + DMLResult& result, const 
uint64_t uniqueId, + const uint32_t tableOid) +{ + messageqcpp::ByteStream bs, bsCpy, bsErrMsg; + rowgroup::RGData rgData; + uint32_t qb = 4; + bs << qb; + std::unique_ptr rowGroup; + uint64_t rowsProcessed = 0; + uint32_t dbroot = 1; + oam::OamCache* oamCache = oam::OamCache::makeOamCache(); + std::vector pmIds = oamCache->getModuleIds(); + std::map pmState; + std::string errMsg; + bool err = false; + + try + { + for (const int& pmId : pmIds) + { + pmState[pmId] = true; + } + + fExeMgr->write(bs); + // TODO select * from t where idbpartition(col) = "*.*.*" + fExeMgr->write(*(package.get_ExecutionPlan())); + + bs.restart(); + bs = fExeMgr->read(); + if (bs.length() == 4) + { + bs >> qb; + + if (qb != 0) + err = true; + } + else + { + qb = 999; + err = true; + } + + if (err) + { + logging::Message::Args args; + logging::Message message(2); + args.add("Update Failed: ExeMgr Error"); + args.add((int)qb); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + return rowsProcessed; + } + + bsErrMsg.restart(); + bsErrMsg = fExeMgr->read(); + if (bsErrMsg.empty()) + { + logging::Message::Args args; + logging::Message message(2); + args.add("Update Failed: "); + args.add("Lost connection to ExeMgr"); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + return rowsProcessed; + } + + bsErrMsg >> errMsg; + + while (true) + { + if (fRollbackPending) + { + break; + } + + bs.restart(); + bs = fExeMgr->read(); + if (bs.empty()) + { + cerr << "VacuumPartitionPackageProcessor::processPackage::doVacuumRows" << endl; + logging::Message::Args args; + logging::Message message(2); + args.add("Update Failed: "); + args.add("Lost connection to ExeMgr"); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + break; + } + else + { + bsCpy.restart(); + bsCpy = bs; + + if (rowGroup.get() == NULL) + { + // This is mete data, need to send all PMs. 
+ processMetaRG(bsCpy, result, uniqueId, package, pmState, dbroot); + rowGroup = std::make_unique(); + rowGroup->deserialize(bs); + bs.restart(); + qb = 100; + bs << qb; + fExeMgr->write(bs); + continue; + } + + rgData.deserialize(bs, true); + rowGroup->setData(&rgData); + + if (bool err = (rowGroup->getStatus() != 0); err) + { + string errorMsg; + bs >> errorMsg; + logging::Message::Args args; + logging::Message message(2); + args.add("Update Failed: "); + args.add(errorMsg); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + DMLResult tmpResult; + receiveAll(tmpResult, uniqueId, pmIds, pmState, tableOid); + break; + } + + if (rowGroup->getRGData() == NULL) + { + bs.restart(); + } + // done fetching + if (rowGroup->getRowCount() == 0) + { + err = receiveAll(result, uniqueId, pmIds, pmState, tableOid); + break; + } + + if (rowGroup->getBaseRid() == (uint64_t)(-1)) + { + continue; + } + + dbroot = rowGroup->getDBRoot(); + + if (bool err = processRG(bsCpy, result, uniqueId, package, pmState, dbroot); err) + { + logging::LoggingID logid(DMLLoggingId, fSessionID, package.get_TxnID()); + logging::Message::Args args; + logging::Message msg(1); + args.add("SQL statement erroring out, need to receive all messages from WES"); + msg.format(args); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(logging::LOG_TYPE_DEBUG, msg, logid); + + DMLResult tmpResult; + receiveAll(tmpResult, uniqueId, pmIds, pmState, tableOid); + + logging::Message::Args args2; + logging::Message msg2(1); + args2.add("SQL statement erroring out, received all messages from WES"); + msg2.format(args2); + logger.logMessage(logging::LOG_TYPE_DEBUG, msg2, logid); + break; + } + + rowsProcessed += rowGroup->getRowCount(); + } + } + + if (fRollbackPending) + { + err = true; + cerr << "VacuumPartitionPackageProcessor::processPackage::doVacuumRows Rollback Pending" << endl; + result.result = JOB_CANCELED; + + logging::LoggingID logid(DMLLoggingId, fSessionID, 
package.get_TxnID()); + logging::Message::Args args1; + logging::Message msg1(1); + args1.add("SQL statement canceled by user"); + msg1.format(args1); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(logging::LOG_TYPE_DEBUG, msg1, logid); + + DMLResult tmpResult; + receiveAll(tmpResult, uniqueId, pmIds, pmState, tableOid); + } + + if (!err) + { + qb = 3; + bs.restart(); + bs << qb; + fExeMgr->write(bs); + bs = fExeMgr->read(); + bs >> result.queryStats; + bs >> result.extendedStats; + bs >> result.miniStats; + result.stats.unserialize(bs); + } + + if (err) + { + bs.restart(); + bs << qb; + fExeMgr->write(bs); + } + + return rowsProcessed; + } + catch (runtime_error& ex) + { + cerr << "VacuumPartitionPackageProcessor::processPackage::doVacuumRows" << ex.what() << endl; + logging::Message::Args args; + logging::Message message(2); + args.add("Update Failed: "); + args.add(ex.what()); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + qb = 0; + bs.restart(); + bs << qb; + fExeMgr->write(bs); + return rowsProcessed; + } + catch (...) 
+ { + cerr << "VacuumPartitionPackageProcessor::processPackage::doVacuumRows" << endl; + logging::Message::Args args; + logging::Message message(2); + args.add("Update Failed: "); + args.add("Unknown error caught when communicating with ExeMgr"); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + qb = 0; + bs.restart(); + bs << qb; + fExeMgr->write(bs); + return rowsProcessed; + } + + return rowsProcessed; +} + +bool VacuumPartitionPackageProcessor::processMetaRG(messageqcpp::ByteStream& bsRowGroup, DMLResult& result, + const uint64_t uniqueId, + dmlpackage::VacuumPartitionDMLPackage& package, + std::map& pmState, uint32_t dbroot) +{ + bool err = false; + const uint32_t pmNum = (*fDbRootPMMap)[dbroot]; + + messageqcpp::ByteStream bsSend; + bsSend << (uint8_t)WriteEngine::ServerMessages::WE_SVR_VACUUM_PARTITION; + bsSend << uniqueId; + bsSend << pmNum; + bsSend << (uint32_t)package.get_TxnID(); + + execplan::CalpontSystemCatalog::TableName tableName(package.get_SchemaName(), package.get_TableName()); + vector bitmap = mockutils::getPartitionDeletedBitmap(package.getPartition(), tableName); + bsSend << (uint32_t)bitmap.size(); + for (const bool flag : bitmap) + { + bsSend << (uint8_t)flag; + } + + bsSend += bsRowGroup; + + boost::shared_ptr bsRecv; + bsRecv.reset(new messageqcpp::ByteStream()); + string errorMsg; + + uint32_t msgRecv = 0; + package.write(bsSend); + fWEClient->write_to_all(bsSend); + + while (1) + { + if (msgRecv == fWEClient->getPmCount()) + break; + + fWEClient->read(uniqueId, bsRecv); + + if (bsRecv->empty()) + { + err = true; + break; + } + + messageqcpp::ByteStream::byte tmp8; + *bsRecv >> tmp8; + if (tmp8 > 0) + { + *bsRecv >> errorMsg; + err = true; + logging::Message::Args args; + logging::Message message(2); + args.add("Update Failed: "); + args.add(errorMsg); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + break; + } + + msgRecv++; + } + + return err; +} + +bool 
VacuumPartitionPackageProcessor::processRG(messageqcpp::ByteStream& bsRowGroup, DMLResult& result, + const uint64_t uniqueId, + dmlpackage::VacuumPartitionDMLPackage& cpackage, + std::map& pmState, uint32_t dbroot) +{ + bool err = false; + const uint32_t pmNum = (*fDbRootPMMap)[dbroot]; + + messageqcpp::ByteStream bsSend; + bsSend << (uint8_t)WriteEngine::ServerMessages::WE_SVR_VACUUM_PARTITION; + bsSend << uniqueId; + bsSend << pmNum; + bsSend << (uint32_t)cpackage.get_TxnID(); + bsSend += bsRowGroup; + + boost::shared_ptr bsRecv; + bsRecv.reset(new messageqcpp::ByteStream()); + string errorMsg; + + if (pmState[pmNum]) + { + try + { + fWEClient->write(bsSend, (uint32_t)pmNum); + pmState[pmNum] = false; + } + catch (runtime_error& ex) + { + err = true; + logging::Message::Args args; + logging::Message message(2); + args.add("Update Failed: "); + args.add(ex.what()); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + } + catch (...) + { + err = true; + logging::Message::Args args; + logging::Message message(2); + args.add("Update Failed: "); + args.add("Unknown error caught when communicating with WES"); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + } + } + else + { + while (1) + { + bsRecv.reset(new messageqcpp::ByteStream()); + + try + { + fWEClient->read(uniqueId, bsRecv); + if (bsRecv->empty()) + { + err = true; + errorMsg = "Lost connection to Write Engine Server while updating"; + throw std::runtime_error(errorMsg); + } + + messageqcpp::ByteStream::byte tmp8; + *bsRecv >> tmp8; + *bsRecv >> errorMsg; + + if (tmp8 == IDBRANGE_WARNING) + { + result.result = IDBRANGE_WARNING; + logging::Message::Args args; + logging::Message message(2); + args.add(errorMsg); + message.format(args); + result.message = message; + } + else if (tmp8 > 0) + { + result.stats.fErrorNo = tmp8; + err = (tmp8 != 0); + } + + if (err) + throw std::runtime_error(errorMsg); + + uint32_t tmp32; + *bsRecv >> tmp32; + 
pmState[tmp32] = true; + + uint64_t blocksChanged = 0; + *bsRecv >> blocksChanged; + result.stats.fBlocksChanged += blocksChanged; + + if (tmp32 == (uint32_t)pmNum) + { + fWEClient->write(bsSend, (uint32_t)pmNum); + pmState[pmNum] = false; + break; + } + } + catch (runtime_error& ex) + { + err = true; + logging::Message::Args args; + logging::Message message(2); + args.add("Update Failed: "); + args.add(ex.what()); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + break; + } + catch (...) + { + err = true; + logging::Message::Args args; + logging::Message message(2); + args.add("Update Failed: "); + args.add("Unknown error caught when communicating with WES"); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + break; + } + } + } + + return err; +} + +bool VacuumPartitionPackageProcessor::receiveAll(DMLResult& result, const uint64_t uniqueId, + std::vector& pmIds, std::map& pmState, + const uint32_t tableOid) +{ + bool err = false; + + // check how many message we need to receive + uint32_t msgNotRecv = 0; + for (const int& pmId : pmIds) + { + if (!pmState[pmId]) + msgNotRecv++; + } + + boost::shared_ptr bsRecv; + string errMsg; + + if (msgNotRecv > 0) + { + logging::LoggingID logid(DMLLoggingId, fSessionID, fSessionID); + + if (msgNotRecv > fWEClient->getPmCount()) + { + logging::Message::Args args1; + logging::Message msg(1); + args1.add("Update outstanding messages exceed PM count , need to receive messages:PMcount = "); + ostringstream oss; + oss << msgNotRecv << ":" << fWEClient->getPmCount(); + args1.add(oss.str()); + msg.format(args1); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(logging::LOG_TYPE_ERROR, msg, logid); + err = true; + logging::Message::Args args; + logging::Message message(2); + args.add("Update Failed: "); + args.add("One of WriteEngineServer went away."); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + return err; + } 
+ + bsRecv.reset(new messageqcpp::ByteStream()); + + uint32_t msgRecv = 0; + while (1) + { + if (msgRecv == msgNotRecv) + break; + + bsRecv.reset(new messageqcpp::ByteStream()); + + try + { + fWEClient->read(uniqueId, bsRecv); + + if (bsRecv->empty()) + { + err = true; + errMsg = "Lost connection to Write Engine Server while updating"; + throw std::runtime_error(errMsg); + } + + messageqcpp::ByteStream::byte tmp8; + *bsRecv >> tmp8; + *bsRecv >> errMsg; + + if (tmp8 == IDBRANGE_WARNING) + { + result.result = IDBRANGE_WARNING; + logging::Message::Args args; + logging::Message message(2); + args.add(errMsg); + message.format(args); + result.message = message; + } + else + { + result.stats.fErrorNo = tmp8; + err = (tmp8 != 0); + } + + if (err) + { + throw std::runtime_error(errMsg); + } + messageqcpp::ByteStream::quadbyte tmp32; + + uint64_t blocksChanged = 0; + *bsRecv >> blocksChanged; + + *bsRecv >> tmp32; + pmState[tmp32] = true; + + msgRecv++; + result.stats.fBlocksChanged += blocksChanged; + } + catch (runtime_error& ex) + { + err = true; + logging::Message::Args args; + logging::Message message(2); + args.add("Update Failed: "); + args.add(ex.what()); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + break; + } + catch (...) 
+ { + err = true; + logging::Message::Args args; + logging::Message message(2); + args.add("Update Failed: "); + args.add("Unknown error caught when communicating with WES"); + message.format(args); + result.result = VACUUM_ERROR; + result.message = message; + break; + } + } + } + + return err; +} + +} // namespace dmlpackageprocessor diff --git a/dbcon/dmlpackageproc/vacuumpartitionpackageprocessor.h b/dbcon/dmlpackageproc/vacuumpartitionpackageprocessor.h new file mode 100644 index 0000000000..e890c1e03e --- /dev/null +++ b/dbcon/dmlpackageproc/vacuumpartitionpackageprocessor.h @@ -0,0 +1,59 @@ +/* Copyright (C) 2024 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include +#include "dmlpackageprocessor.h" +#include "vacuumpartitiondmlpackage.h" + +#define EXPORT + +namespace dmlpackageprocessor +{ +/** @brief concrete implementation of a DMLPackageProcessor. + * Specifically for interacting with the Write Engine to + * process VACUUM_PARTITION_BLOAT statements. 
+ */ +class VacuumPartitionPackageProcessor : public DMLPackageProcessor +{ + public: + VacuumPartitionPackageProcessor(BRM::DBRM* dbrm, uint32_t sid) : DMLPackageProcessor(dbrm, sid) + { + } + + private: + DMLResult processPackageInternal(dmlpackage::CalpontDMLPackage& cpackage) override; + + uint64_t doVacuumRows(dmlpackage::VacuumPartitionDMLPackage& package, DMLResult& result, + const uint64_t uniqueId, const uint32_t tableOid); + + bool processMetaRG(messageqcpp::ByteStream& bsRowGroup, DMLResult& result, const uint64_t uniqueId, + dmlpackage::VacuumPartitionDMLPackage& package, std::map& pmState, + uint32_t dbroot = 1); + + bool processRG(messageqcpp::ByteStream& bsRowGroup, DMLResult& result, const uint64_t uniqueId, + dmlpackage::VacuumPartitionDMLPackage& package, std::map& pmState, + uint32_t dbroot = 1); + + bool receiveAll(DMLResult& result, const uint64_t uniqueId, std::vector& fPMs, + std::map& pmState, const uint32_t tableOid); +}; + +} // namespace dmlpackageprocessor + +#undef EXPORT diff --git a/dbcon/execplan/CMakeLists.txt b/dbcon/execplan/CMakeLists.txt index e13609f91f..2ceabb0324 100755 --- a/dbcon/execplan/CMakeLists.txt +++ b/dbcon/execplan/CMakeLists.txt @@ -45,7 +45,8 @@ set(execplan_LIB_SRCS treenodeimpl.cpp vendorexecutionplan.cpp windowfunctioncolumn.cpp - udafcolumn.cpp) + udafcolumn.cpp + mockutils.cpp) add_library(execplan SHARED ${execplan_LIB_SRCS}) diff --git a/dbcon/execplan/mockutils.cpp b/dbcon/execplan/mockutils.cpp new file mode 100644 index 0000000000..ddd3119f0c --- /dev/null +++ b/dbcon/execplan/mockutils.cpp @@ -0,0 +1,14 @@ + +#include "mockutils.h" + +namespace mockutils +{ + +std::vector getPartitionDeletedBitmap(const BRM::LogicalPartition& partitionNum, + const execplan::CalpontSystemCatalog::TableName& tableName) +{ + std::vector auxColMock(MAX_PARTITION_SIZE, true); + std::fill_n(auxColMock.begin(), MAX_PARTITION_SIZE / 2, false); + return auxColMock; +} +} // namespace mockutils diff --git 
a/dbcon/execplan/mockutils.h b/dbcon/execplan/mockutils.h new file mode 100644 index 0000000000..daa0566533 --- /dev/null +++ b/dbcon/execplan/mockutils.h @@ -0,0 +1,24 @@ +#ifndef __MOCKUTILS_H__ +#define __MOCKUTILS_H__ + +#include + +#include "logicalpartition.h" +#include "calpontsystemcatalog.h" + +namespace mockutils +{ +static constexpr auto MAX_PARTITION_SIZE = 8388608; + +/** + * @brief Get partition deleted bitmap + * + * @param partitionNum + * @param tableName + * @return deleted bitmap,`true` means deleted and `false` means not deleted + */ +std::vector getPartitionDeletedBitmap(const BRM::LogicalPartition& partitionNum, + const execplan::CalpontSystemCatalog::TableName& tableName); + +} // namespace mockutils +#endif // __MOCKUTILS_H__ diff --git a/dbcon/mysql/ha_mcs_partition.cpp b/dbcon/mysql/ha_mcs_partition.cpp index d817b464bf..8155d0befb 100644 --- a/dbcon/mysql/ha_mcs_partition.cpp +++ b/dbcon/mysql/ha_mcs_partition.cpp @@ -27,6 +27,8 @@ #include #include #include +#include +#include // #include #include using namespace std; @@ -38,6 +40,7 @@ using namespace std; #include "blocksize.h" #include "calpontsystemcatalog.h" #include "objectidmanager.h" +#include "mockutils.h" using namespace execplan; #include "mastersegmenttable.h" @@ -66,6 +69,9 @@ using namespace logging; #include using namespace boost; +#include "vacuumpartitiondmlpackage.h" +#include "dmlpackageprocessor.h" + namespace { datatypes::SimpleValue getStartVal(const datatypes::SessionParam& sp, const CalpontSystemCatalog::ColType& ct, @@ -540,6 +546,73 @@ std::string ha_mcs_impl_droppartitions_(execplan::CalpontSystemCatalog::TableNam return msg; } +std::string formatBloatInfo(const LogicalPartition& lp, const vector& deletedBitMap) +{ + uint32_t emptyValueCount = std::ranges::count(deletedBitMap, true /* target value */); + return std::format("\n {:<20} {:<.2f}%", std::format("{}.{}.{}", lp.dbroot, lp.pp, lp.seg), + (static_cast(emptyValueCount) * 100.0 / deletedBitMap.size())); +} 
+ +int processVacuumDMLPackage(dmlpackage::CalpontDMLPackage* package) +{ + cout << "Sending to DMLProc" << endl; + ByteStream bs; + package->write(bs); + MessageQueueClient mq("DMLProc"); + ByteStream::byte b = 0; + int rc = 0; + THD* thd = current_thd; + string emsg; + + try + { + mq.write(bs); + bs = mq.read(); + + if (bs.empty()) + { + rc = 1; + thd->get_stmt_da()->set_overwrite_status(true); + thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc"); + } + else + { + bs >> b; + bs >> emsg; + rc = b; + } + } + catch (runtime_error&) + { + rc = 1; + thd->get_stmt_da()->set_overwrite_status(true); + thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc"); + } + catch (...) + { + rc = 1; + thd->get_stmt_da()->set_overwrite_status(true); + thd->raise_error_printf(ER_INTERNAL_ERROR, "Unknown error caught"); + } + + return rc; +} + +std::string ha_mcs_impl_vacuum_partition_bloat(const execplan::CalpontSystemCatalog::TableName& tableName, + const LogicalPartition& partitionNum) +{ + auto dmlPackage = std::make_unique( + tableName.schema, tableName.table, "mcs_vacuum_partition_bloat", tid2sid(current_thd->thread_id), + partitionNum); /* third argument is the DML statement label stored in the package */ + + std::string msg = "Partitions are vacuumed successfully"; + int rc = processVacuumDMLPackage(dmlPackage.get()); + if (rc != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR) + msg = std::format("Failed to vacuum partition, ErrorCode: {}", rc); + + return msg; +} + extern "C" { /** @@ -1343,6 +1416,453 @@ extern "C" *length = output.str().length(); return initid->ptr; } + + /** + * mcs_analyze_partition_bloat + */ + my_bool mcs_analyze_partition_bloat_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + bool hasErr = false; + + if (args->arg_count < 2 || args->arg_count > 3) + { + hasErr = true; + } + else if (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT || + (args->arg_count == 3 && args->arg_type[2] != STRING_RESULT)) + { + hasErr = true; + } + else if
(!args->args[0] || !args->args[1] || (args->arg_count == 3 && !args->args[2])) + { + hasErr = true; + } + + if (hasErr) + { + strcpy(message, "Usage: MCS_ANALYZE_PARTITION_BLOAT ([schema,] table, partition_num)"); + return 1; + } + + return 0; + } + + void mcs_analyze_partition_bloat_deinit(UDF_INIT* initid) + { + delete[] initid->ptr; + } + + const char* mcs_analyze_partition_bloat(UDF_INIT* initid, UDF_ARGS* args, char* result, + unsigned long* length, char* is_null, char* error) + { + BRM::DBRM::refreshShm(); + DBRM dbrm; + + CalpontSystemCatalog::TableName tableName; + string schema, table, partitionNumStr; + set partitionNums; + LogicalPartition partitionNum; + vector deletedBitMap; + + string errMsg; + try + { + int offset = 2; + if (args->arg_count == 3) + { + schema = (char*)(args->args[0]); + table = (char*)(args->args[1]); + partitionNumStr = (char*)(args->args[2]); + } + else + { + if (current_thd->db.length) + { + schema = current_thd->db.str; + } + else + { + throw IDBExcept(ERR_PARTITION_NO_SCHEMA); + } + + table = (char*)(args->args[0]); + partitionNumStr = (char*)(args->args[1]); + offset = 1; + } + + parsePartitionString(args, offset, partitionNums, errMsg, tableName); + if (!errMsg.empty() || partitionNums.empty()) /* an empty set must not reach begin() below */ + { + Message::Args args; + args.add(errMsg.empty() ? string("No partition number specified") : errMsg); + throw IDBExcept(ERR_INVALID_FUNC_ARGUMENT, args); + } + partitionNum = *partitionNums.begin(); + + tableName = make_table(schema, table, lower_case_table_names); + deletedBitMap = mockutils::getPartitionDeletedBitmap(partitionNum, tableName); + } + catch (IDBExcept& ex) + { + current_thd->get_stmt_da()->set_overwrite_status(true); + current_thd->raise_error_printf(ER_INTERNAL_ERROR, ex.what()); + return result; + } + catch (...)
+ { + current_thd->get_stmt_da()->set_overwrite_status(true); + current_thd->raise_error_printf(ER_INTERNAL_ERROR, + "Error occurred when calling MCS_ANALYZE_PARTITION_BLOAT"); + return result; + } + + ostringstream output; + output << std::format("{:<20} {:<}", "Part#", "Empty Rate"); + output << formatBloatInfo(partitionNum, deletedBitMap); + + initid->ptr = new char[output.str().length() + 1]; + memcpy(initid->ptr, output.str().c_str(), output.str().length()); + *length = output.str().length(); + return initid->ptr; + } + + /** + * mcs_analyze_table_bloat + */ + my_bool mcs_analyze_table_bloat_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + bool hasErr = false; + + if (args->arg_count < 1 || args->arg_count > 2) + { + hasErr = true; + } + else if (args->arg_type[0] != STRING_RESULT || + (args->arg_count == 2 && args->arg_type[1] != STRING_RESULT)) + { + hasErr = true; + } + else if (!args->args[0] || (args->arg_count == 2 && !args->args[1])) + { + hasErr = true; + } + + if (hasErr) + { + strcpy(message, "Usage: MCS_ANALYZE_TABLE_BLOAT ([schema,] table)"); + return 1; + } + + return 0; + } + + void mcs_analyze_table_bloat_deinit(UDF_INIT* initid) + { + delete[] initid->ptr; + } + + const char* mcs_analyze_table_bloat(UDF_INIT* initid, UDF_ARGS* args, char* result, unsigned long* length, + char* is_null, char* error) + { + BRM::DBRM::refreshShm(); + DBRM dbrm; + + CalpontSystemCatalog::TableName tableName; + std::string schema, table; + std::vector partitionNums; + std::vector entries; + + string errMsg; + try + { + if (args->arg_count == 2) + { + schema = (char*)(args->args[0]); + table = (char*)(args->args[1]); + } + else + { + if (current_thd->db.length) + { + schema = current_thd->db.str; + } + else + { + throw IDBExcept(ERR_PARTITION_NO_SCHEMA); + } + + table = (char*)(args->args[0]); + } + + if (!errMsg.empty()) + { + Message::Args args; + args.add(errMsg); + throw IDBExcept(ERR_INVALID_FUNC_ARGUMENT, args); + } + + tableName = make_table(schema, 
table, lower_case_table_names); + CalpontSystemCatalog csc; + csc.identity(CalpontSystemCatalog::FE); + OID_t auxOID = csc.tableAUXColumnOID(tableName); + if (auxOID == -1) + { + Message::Args args; + args.add(std::format("'{}.{}'", schema, table)); + throw IDBExcept(ERR_TABLE_NOT_IN_CATALOG, args); + } + + CHECK(dbrm.getExtents(auxOID, entries, false, false, true)); + + for (const auto& entry : entries) + { + partitionNums.emplace_back(entry.dbRoot, entry.partitionNum, entry.segmentNum); + } + } + catch (IDBExcept& ex) + { + current_thd->get_stmt_da()->set_overwrite_status(true); + current_thd->raise_error_printf(ER_INTERNAL_ERROR, ex.what()); + return result; + } + catch (...) + { + current_thd->get_stmt_da()->set_overwrite_status(true); + current_thd->raise_error_printf(ER_INTERNAL_ERROR, + "Error occurred when calling MCS_ANALYZE_TABLE_BLOAT"); + return result; + } + + ostringstream output; + output << std::format("{:<20} {:<}", "Part#", "Empty Rate"); + for (const auto& partitionNum : partitionNums) + { + vector deletedBitMap = mockutils::getPartitionDeletedBitmap(partitionNum, tableName); + output << formatBloatInfo(partitionNum, deletedBitMap); + } + + initid->ptr = new char[output.str().length() + 1]; + memcpy(initid->ptr, output.str().c_str(), output.str().length()); + *length = output.str().length(); + return initid->ptr; + } + + /** + * mcs_vacuum_partition_bloat + */ + my_bool mcs_vacuum_partition_bloat_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + bool hasErr = false; + + if (args->arg_count < 2 || args->arg_count > 3) + { + hasErr = true; + } + else if (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT || + (args->arg_count == 3 && args->arg_type[2] != STRING_RESULT)) + { + hasErr = true; + } + else if (!args->args[0] || !args->args[1] || (args->arg_count == 3 && !args->args[2])) + { + hasErr = true; + } + + if (hasErr) + { + strcpy(message, "Usage: MCS_VACUUM_PARTITION_BLOAT ([schema,] table, partition_num)"); + 
return 1; + } + + return 0; + } + + void mcs_vacuum_partition_bloat_deinit(UDF_INIT* initid) + { + delete[] initid->ptr; + } + + const char* mcs_vacuum_partition_bloat(UDF_INIT* initid, UDF_ARGS* args, char* result, + unsigned long* length, char* is_null, char* error) + { + BRM::DBRM::refreshShm(); + DBRM dbrm; + + CalpontSystemCatalog::TableName tableName; + std::string schema, table; + std::set partitionNums; + LogicalPartition partitionNum; + + string errMsg; + try + { + int offset = 2; + if (args->arg_count == 3) + { + schema = (char*)(args->args[0]); + table = (char*)(args->args[1]); + } + else + { + if (current_thd->db.length) + { + schema = current_thd->db.str; + } + else + { + throw IDBExcept(ERR_PARTITION_NO_SCHEMA); + } + + table = (char*)(args->args[0]); + offset = 1; + } + + parsePartitionString(args, offset, partitionNums, errMsg, tableName); + if (!errMsg.empty() || partitionNums.empty()) /* an empty set must not reach begin() below */ + { + Message::Args args; + args.add(errMsg.empty() ? string("No partition number specified") : errMsg); + throw IDBExcept(ERR_INVALID_FUNC_ARGUMENT, args); + } + partitionNum = *partitionNums.begin(); + + tableName = make_table(schema, table, lower_case_table_names); + } + catch (IDBExcept& ex) + { + current_thd->get_stmt_da()->set_overwrite_status(true); + current_thd->raise_error_printf(ER_INTERNAL_ERROR, ex.what()); + return result; + } + catch (...)
+ { + current_thd->get_stmt_da()->set_overwrite_status(true); + current_thd->raise_error_printf(ER_INTERNAL_ERROR, + "Error occurred when calling MCS_VACUUM_PARTITION_BLOAT"); + return result; + } + + std::string vacuumResult = ha_mcs_impl_vacuum_partition_bloat(tableName, partitionNum); + + memcpy(result, vacuumResult.c_str(), vacuumResult.length()); + *length = vacuumResult.length(); + return result; /* the message lives in the caller-supplied buffer; initid->ptr is never allocated by this UDF */ + } + + /** + * mcs_vacuum_table_bloat + */ + my_bool mcs_vacuum_table_bloat_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + bool hasErr = false; + + if (args->arg_count < 1 || args->arg_count > 2) + { + hasErr = true; + } + else if (args->arg_type[0] != STRING_RESULT || + (args->arg_count == 2 && args->arg_type[1] != STRING_RESULT)) + { + hasErr = true; + } + else if (!args->args[0] || (args->arg_count == 2 && !args->args[1])) + { + hasErr = true; + } + + if (hasErr) + { + strcpy(message, "Usage: MCS_VACUUM_TABLE_BLOAT ([schema,] table)"); + return 1; + } + + return 0; + } + + void mcs_vacuum_table_bloat_deinit(UDF_INIT* initid) + { + delete[] initid->ptr; + } + + const char* mcs_vacuum_table_bloat(UDF_INIT* initid, UDF_ARGS* args, char* result, unsigned long* length, + char* is_null, char* error) + { + BRM::DBRM::refreshShm(); + DBRM dbrm; + + CalpontSystemCatalog::TableName tableName; + std::string schema, table; + std::vector partitionNums; + std::vector entries; + + string errMsg; + try + { + if (args->arg_count == 2) + { + schema = (char*)(args->args[0]); + table = (char*)(args->args[1]); + } + else + { + if (current_thd->db.length) + { + schema = current_thd->db.str; + } + else + { + throw IDBExcept(ERR_PARTITION_NO_SCHEMA); + } + + table = (char*)(args->args[0]); + } + + if (!errMsg.empty()) + { + Message::Args args; + args.add(errMsg); + throw IDBExcept(ERR_INVALID_FUNC_ARGUMENT, args); + } + + tableName = make_table(schema, table, lower_case_table_names); + CalpontSystemCatalog csc; + csc.identity(CalpontSystemCatalog::FE); + OID_t auxOID =
csc.tableAUXColumnOID(tableName); + if (auxOID == -1) + { + Message::Args args; + args.add(std::format("'{}.{}'", schema, table)); + throw IDBExcept(ERR_TABLE_NOT_IN_CATALOG, args); + } + + CHECK(dbrm.getExtents(auxOID, entries, false, false, true)); + + for (const auto& entry : entries) + { + partitionNums.emplace_back(entry.dbRoot, entry.partitionNum, entry.segmentNum); + } + } + catch (IDBExcept& ex) + { + current_thd->get_stmt_da()->set_overwrite_status(true); + current_thd->raise_error_printf(ER_INTERNAL_ERROR, ex.what()); + return result; + } + catch (...) + { + current_thd->get_stmt_da()->set_overwrite_status(true); + current_thd->raise_error_printf(ER_INTERNAL_ERROR, + "Error occurred when calling MCS_VACUUM_TABLE_BLOAT"); + return result; + } + + ostringstream output; + + initid->ptr = new char[output.str().length() + 1]; + memcpy(initid->ptr, output.str().c_str(), output.str().length()); + *length = output.str().length(); + return initid->ptr; + } } } // namespace diff --git a/dbcon/mysql/install_mcs_mysql.sh.in b/dbcon/mysql/install_mcs_mysql.sh.in index 7d3913cd70..f0c447ea26 100755 --- a/dbcon/mysql/install_mcs_mysql.sh.in +++ b/dbcon/mysql/install_mcs_mysql.sh.in @@ -79,6 +79,10 @@ CREATE OR REPLACE FUNCTION caldisablepartitionsbyvalue RETURNS STRING SONAME 'ha CREATE OR REPLACE FUNCTION calenablepartitionsbyvalue RETURNS STRING SONAME 'ha_columnstore.so'; CREATE OR REPLACE FUNCTION calshowpartitionsbyvalue RETURNS STRING SONAME 'ha_columnstore.so'; CREATE OR REPLACE AGGREGATE FUNCTION moda RETURNS STRING SONAME 'libregr_mysql.so'; +CREATE OR REPLACE FUNCTION mcs_analyze_partition_bloat RETURNS STRING SONAME 'ha_columnstore.so'; +CREATE OR REPLACE FUNCTION mcs_analyze_table_bloat RETURNS STRING SONAME 'ha_columnstore.so'; +CREATE OR REPLACE FUNCTION mcs_vacuum_partition_bloat RETURNS STRING SONAME 'ha_columnstore.so'; +CREATE OR REPLACE FUNCTION mcs_vacuum_table_bloat RETURNS STRING SONAME 'ha_columnstore.so'; CREATE DATABASE IF NOT EXISTS 
infinidb_querystats; CREATE TABLE IF NOT EXISTS infinidb_querystats.querystats diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index 34529212f2..f4860f1bab 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -45,6 +45,9 @@ using namespace boost; #include "sqllogger.h" #include "we_messages.h" #include "dmlprocessor.h" +#include "vacuumpartitiondmlpackage.h" +#include "vacuumpartitionpackageprocessor.h" + using namespace BRM; using namespace config; using namespace execplan; @@ -1125,6 +1128,16 @@ void PackageHandler::run() } } break; + + case dmlpackage::DML_VACUUM_PARTITION: + { + dmlpackage::VacuumPartitionDMLPackage vacuumPkg; + vacuumPkg.read(*(fByteStream.get())); + fProcessor.reset( + new dmlpackageprocessor::VacuumPartitionPackageProcessor(fDbrm, vacuumPkg.get_SessionID())); + result = fProcessor->processPackage(vacuumPkg); + } + break; } // Log errors diff --git a/mysql-test/columnstore/basic/r/mcol4889-mcs_analyze_partition_bloat_function.result b/mysql-test/columnstore/basic/r/mcol4889-mcs_analyze_partition_bloat_function.result new file mode 100644 index 0000000000..452144be64 --- /dev/null +++ b/mysql-test/columnstore/basic/r/mcol4889-mcs_analyze_partition_bloat_function.result @@ -0,0 +1,27 @@ +DROP DATABASE IF EXISTS mcol4889_db1; +CREATE DATABASE IF NOT EXISTS mcol4889_db1; +USE mcol4889_db1; +CREATE TABLE t1(col1 INT, col2 INT, col3 CHAR(8)) ENGINE=COLUMNSTORE; +LOAD DATA LOCAL infile 'MTR_SUITE_DIR/../std_data/100Krows.dat' INTO TABLE t1 FIELDS TERMINATED BY '|';; +SELECT COUNT(*) FROM t1; +COUNT(*) +100001 +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +SELECT COUNT(*) FROM t1; +COUNT(*) +12800128 +SELECT mcs_analyze_partition_bloat('t1', '0.0.1'); +mcs_analyze_partition_bloat('t1', '0.0.1') +Part# Empty Rate + 1.0.0 50.00% 
+SELECT mcs_analyze_partition_bloat('t1', '1.0.1'); +mcs_analyze_partition_bloat('t1', '1.0.1') +Part# Empty Rate + 1.1.0 50.00% +DROP DATABASE mcol4889_db1; diff --git a/mysql-test/columnstore/basic/r/mcol4889-mcs_analyze_table_bloat_function.result b/mysql-test/columnstore/basic/r/mcol4889-mcs_analyze_table_bloat_function.result new file mode 100644 index 0000000000..722ec9493a --- /dev/null +++ b/mysql-test/columnstore/basic/r/mcol4889-mcs_analyze_table_bloat_function.result @@ -0,0 +1,24 @@ +DROP DATABASE IF EXISTS mcol4889_db2; +CREATE DATABASE IF NOT EXISTS mcol4889_db2; +USE mcol4889_db2; +CREATE TABLE t1(col1 INT, col2 INT, col3 CHAR(8)) ENGINE=COLUMNSTORE; +LOAD DATA LOCAL infile 'MTR_SUITE_DIR/../std_data/100Krows.dat' INTO TABLE t1 FIELDS TERMINATED BY '|';; +SELECT COUNT(*) FROM t1; +COUNT(*) +100001 +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +SELECT COUNT(*) FROM t1; +COUNT(*) +12800128 +SELECT mcs_analyze_table_bloat('t1'); +mcs_analyze_table_bloat('t1') +Part# Empty Rate + 1.0.0 50.00% + 1.0.1 50.00% +DROP DATABASE mcol4889_db2; diff --git a/mysql-test/columnstore/basic/t/mcol4889-mcs_analyze_partition_bloat_function.test b/mysql-test/columnstore/basic/t/mcol4889-mcs_analyze_partition_bloat_function.test new file mode 100644 index 0000000000..39fed5ed42 --- /dev/null +++ b/mysql-test/columnstore/basic/t/mcol4889-mcs_analyze_partition_bloat_function.test @@ -0,0 +1,49 @@ +# +# Test mcs_analyze_partition_bloat() function +# Author: Ziy1-Tan, tanziyi0925@gmail.com +# +-- source ../include/have_columnstore.inc + +--disable_warnings +DROP DATABASE IF EXISTS mcol4889_db1; +--enable_warnings + +CREATE DATABASE IF NOT EXISTS mcol4889_db1; +USE mcol4889_db1; + +CREATE TABLE t1(col1 INT, col2 INT, col3 CHAR(8)) ENGINE=COLUMNSTORE; +--replace_result $MTR_SUITE_DIR
MTR_SUITE_DIR +--eval LOAD DATA LOCAL infile '$MTR_SUITE_DIR/../std_data/100Krows.dat' INTO TABLE t1 FIELDS TERMINATED BY '|'; + +SELECT COUNT(*) FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +SELECT COUNT(*) FROM t1; + +let $func_exists=`SELECT COUNT(*) FROM mysql.func WHERE name='mcs_analyze_partition_bloat'`; + +--disable_query_log +if (!$func_exists) +{ + CREATE FUNCTION mcs_analyze_partition_bloat RETURNS STRING SONAME "ha_columnstore.so"; +} +--enable_query_log + +SELECT mcs_analyze_partition_bloat('t1', '0.0.1'); + +SELECT mcs_analyze_partition_bloat('t1', '1.0.1'); + +--disable_query_log +if (!$func_exists) +{ + DROP FUNCTION mcs_analyze_partition_bloat; +} +--enable_query_log + +# Clean UP +DROP DATABASE mcol4889_db1; diff --git a/mysql-test/columnstore/basic/t/mcol4889-mcs_analyze_table_bloat_function.test b/mysql-test/columnstore/basic/t/mcol4889-mcs_analyze_table_bloat_function.test new file mode 100644 index 0000000000..b898ed698f --- /dev/null +++ b/mysql-test/columnstore/basic/t/mcol4889-mcs_analyze_table_bloat_function.test @@ -0,0 +1,47 @@ +# +# Test mcs_analyze_table_bloat() function +# Author: Ziy1-Tan, tanziyi0925@gmail.com +# +-- source ../include/have_columnstore.inc + +--disable_warnings +DROP DATABASE IF EXISTS mcol4889_db2; +--enable_warnings + +CREATE DATABASE IF NOT EXISTS mcol4889_db2; +USE mcol4889_db2; + +CREATE TABLE t1(col1 INT, col2 INT, col3 CHAR(8)) ENGINE=COLUMNSTORE; +--replace_result $MTR_SUITE_DIR MTR_SUITE_DIR +--eval LOAD DATA LOCAL infile '$MTR_SUITE_DIR/../std_data/100Krows.dat' INTO TABLE t1 FIELDS TERMINATED BY '|'; + +SELECT COUNT(*) FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT
* FROM t1; +INSERT INTO t1 SELECT * FROM t1; +SELECT COUNT(*) FROM t1; + +let $func_exists=`SELECT COUNT(*) FROM mysql.func WHERE name='mcs_analyze_table_bloat'`; + +--disable_query_log +if (!$func_exists) +{ + CREATE FUNCTION mcs_analyze_table_bloat RETURNS STRING SONAME "ha_columnstore.so"; +} +--enable_query_log + +SELECT mcs_analyze_table_bloat('t1'); + +--disable_query_log +if (!$func_exists) +{ + DROP FUNCTION mcs_analyze_table_bloat; +} +--enable_query_log + +# Clean UP +DROP DATABASE mcol4889_db2; diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp index 79f62ad34a..934928b736 100644 --- a/writeengine/server/we_dmlcommandproc.cpp +++ b/writeengine/server/we_dmlcommandproc.cpp @@ -3951,6 +3951,12 @@ uint8_t WE_DMLCommandProc::getWrittenLbids(messageqcpp::ByteStream& bs, std::str return rc; } +uint8_t WE_DMLCommandProc::processVacuumPartition(messageqcpp::ByteStream& bs, std::string& err, + messageqcpp::ByteStream::quadbyte& PMId) +{ + return 1; +} + uint8_t WE_DMLCommandProc::processFlushFiles(messageqcpp::ByteStream& bs, std::string& err) { uint8_t rc = 0; diff --git a/writeengine/server/we_dmlcommandproc.h b/writeengine/server/we_dmlcommandproc.h index 3418e36b79..d477565d58 100644 --- a/writeengine/server/we_dmlcommandproc.h +++ b/writeengine/server/we_dmlcommandproc.h @@ -96,6 +96,8 @@ class WE_DMLCommandProc EXPORT uint8_t processEndTransaction(ByteStream& bs, std::string& err); EXPORT uint8_t processFixRows(ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId); EXPORT uint8_t getWrittenLbids(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId); + EXPORT uint8_t processVacuumPartition(messageqcpp::ByteStream& bs, std::string& err, + messageqcpp::ByteStream::quadbyte& PMId); int validateColumnHWMs(execplan::CalpontSystemCatalog::RIDList& ridList, boost::shared_ptr systemCatalogPtr, const std::vector& segFileInfo, const char* stage, diff --git a/writeengine/server/we_messages.h
b/writeengine/server/we_messages.h index 15c1f916e6..9b1858e4c2 100644 --- a/writeengine/server/we_messages.h +++ b/writeengine/server/we_messages.h @@ -81,6 +81,7 @@ enum ServerMessages WE_SVR_WRITE_CREATE_SYSCOLUMN, WE_SVR_BATCH_INSERT_BINARY, WE_SVR_GET_WRITTEN_LBIDS, + WE_SVR_VACUUM_PARTITION, WE_CLT_SRV_DATA = 100, WE_CLT_SRV_EOD, diff --git a/writeengine/server/we_readthread.cpp b/writeengine/server/we_readthread.cpp index 8f6eb600dd..b272290d6c 100644 --- a/writeengine/server/we_readthread.cpp +++ b/writeengine/server/we_readthread.cpp @@ -168,6 +168,12 @@ void DmlReadThread::operator()() break; } + case WE_SVR_VACUUM_PARTITION: + { + rc = fWeDMLprocessor->processVacuumPartition(ibs, errMsg, PMId); + break; + } + case WE_SVR_BATCH_INSERT_END: { rc = fWeDMLprocessor->processBatchInsertHwm(ibs, errMsg);