Skip to content

Commit

Permalink
fix style
Browse files Browse the repository at this point in the history
  • Loading branch information
Ziy1-Tan committed Jul 6, 2024
1 parent da2c018 commit 14f944e
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 110 deletions.
185 changes: 87 additions & 98 deletions dbcon/execplan/mock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "constantcolumn.h"
#include "functioncolumn.h"
#include "rowgroup.h"
#include "sessionmanager.h"
#include "simplecolumn.h"
#include "returnedcolumn.h"
#include "simplefilter.h"
Expand All @@ -19,169 +20,158 @@ namespace
{
using RowSize = uint32_t;

RowSize doProcessQuery(const CalpontExecutionPlan& csep, std::shared_ptr<execplan::ClientRotator> exeMgr)
RowSize processQuery(const CalpontSelectExecutionPlan& csep)
{
messageqcpp::ByteStream msg;
auto exeMgr = std::make_unique<execplan::ClientRotator>(csep.sessionID(), "ExeMgr");
try
{
exeMgr->connect(5);
}
catch (...)
{
throw logging::IDBExcept(logging::ERR_LOST_CONN_EXEMGR);
}

messageqcpp::ByteStream msgCSEP;

// send code to indicat tuple
messageqcpp::ByteStream::quadbyte qb = 4;
msg << qb;
exeMgr->write(msg);
msg.restart();
msgCSEP << qb;
exeMgr->write(msgCSEP);
msgCSEP.restart();

// Send the CalpontSelectExecutionPlan to ExeMgr.
csep.serialize(msg);
exeMgr->write(msg);
csep.serialize(msgCSEP);
exeMgr->write(msgCSEP);

// Get the table oid for the system table being queried.
uint32_t tableOID = IDB_VTABLE_ID;
uint16_t status = 0;
msgCSEP.restart();
msgCSEP = exeMgr->read();

// Send the request for the table.
qb = static_cast<messageqcpp::ByteStream::quadbyte>(tableOID);
messageqcpp::ByteStream bs;
bs << qb;
exeMgr->write(bs);
std::unique_ptr<rowgroup::RowGroup> rowGroup;
rowgroup::RGData rgData;

msg.restart();
bs.restart();
messageqcpp::ByteStream msg;
msg = exeMgr->read();
bs = exeMgr->read();

if (bs.length() == 0)
if (msg.length() == 0)
{
throw logging::IDBExcept(logging::ERR_LOST_CONN_EXEMGR);
}

std::string emsgStr;
bs >> emsgStr;
bool err = false;
std::string errMsg;
msg >> errMsg;

if (msg.length() == 4)
bool hasErr = false;
if (msgCSEP.length() == 4)
{
msg >> qb;
msgCSEP >> qb;

if (qb != 0)
err = true;
hasErr = true;
}
else
{
err = true;
hasErr = true;
}

if (err)
if (hasErr)
{
throw runtime_error(emsgStr);
throw runtime_error(errMsg);
}

std::unique_ptr<rowgroup::RowGroup> rowGroup;
rowgroup::RGData rgData;
RowSize result = 0;
while (true)
{
bs.restart();
bs = exeMgr->read();
msg.restart();
msg = exeMgr->read();

// @bug 1782. check ExeMgr connection lost
if (bs.length() == 0)
// ExeMgr connection lost
if (msg.length() == 0)
{
throw logging::IDBExcept(logging::ERR_LOST_CONN_EXEMGR);
}

if (!rowGroup)
{
rowGroup.reset(new rowgroup::RowGroup());
rowGroup->deserialize(bs);
rowGroup->deserialize(msg);
continue;
}
else
{
rgData.deserialize(bs, true);
rgData.deserialize(msg, true);
rowGroup->setData(&rgData);
}

if ((status = rowGroup->getStatus()) != 0)
if (uint32_t status = rowGroup->getStatus(); status != 0)
{
if (status >= 1000) // new error system
{
// bs.advance(rowGroup->getDataSize());
bs >> emsgStr;
throw logging::IDBExcept(emsgStr, rowGroup->getStatus());
msg >> errMsg;
throw logging::IDBExcept(errMsg, rowGroup->getStatus());
}
else
{
throw logging::IDBExcept(logging::ERR_SYSTEM_CATALOG);
throw logging::IDBExcept(logging::ERR_LOST_CONN_EXEMGR);
}
}

if (rowGroup->getRowCount() > 0)
// rowGroup->addToSysDataList(sysDataList);
rowGroup->getColumnCount();
else
break;
}

bs.reset();
qb = 0;
bs << qb;
exeMgr->write(bs);
return 0;
}

RowSize processQuery(const CalpontExecutionPlan& cesp)
{
RowSize result = 0;
auto exeMgr = std::make_shared<execplan::ClientRotator>(0, "ExeMgr");
int maxTries = 0;
while (maxTries < 5)
{
maxTries++;

try
if (uint32_t rowSize = rowGroup->getRowCount(); rowSize > 0)
{
result = doProcessQuery(cesp, exeMgr);
break;
}
// error already occurred. this is not a broken pipe
catch (logging::IDBExcept&)
{
throw;
}
catch (...)
{
// may be a broken pipe. re-establish exeMgr and send the message
exeMgr.reset(new ClientRotator(0, "ExeMgr"));
try
rowgroup::Row row;
rowGroup->initRow(&row);
for (uint32_t i = 0; i < rowSize; i++)
{
exeMgr->connect(5);
}
catch (...)
{
throw logging::IDBExcept(logging::ERR_LOST_CONN_EXEMGR);
rowGroup->getRow(i, &row);
result = row.getUintField(0);
}
}
else
break;
}
if (maxTries >= 5)
throw logging::IDBExcept(logging::ERR_SYSTEM_CATALOG);

msg.reset();
qb = 0;
msg << qb;
exeMgr->write(msg);
return result;
}

RowSize getAUXPartitionSize(BRM::LogicalPartition partitionNum,
execplan::CalpontSystemCatalog::TableName tableNameObj)
execplan::CalpontSystemCatalog::TableName tableNameObj, int sessionID)
{
CalpontSelectExecutionPlan csep;
csep.sessionID(sessionID);

CalpontSystemCatalog csc;
CalpontSystemCatalog::RIDList oidList = csc.columnRIDs(tableNameObj);
auto csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
csc->identity(execplan::CalpontSystemCatalog::FE);
CalpontSystemCatalog::RIDList oidList = csc->columnRIDs(tableNameObj);
if (oidList.empty())
{
return 0;
}

std::string colName = csc.colName(oidList[0].objnum).column;
SessionManager sm;
BRM::TxnID txnID;
txnID = sm.getTxnID(sessionID);

if (!txnID.valid)
{
txnID.id = 0;
txnID.valid = true;
}

BRM::QueryContext verID;
verID = sm.verID();
csep.txnID(txnID.id);
csep.verID(verID);
csep.sessionID(sessionID);

std::string colName = csc->colName(oidList[0].objnum).column;

// SQL Statement: select count(col) from target_table from idbpartition(col) = "*.*.*";
std::string schemaTablePrefix = std::format("{}.{}.", tableNameObj.schema, tableNameObj.table);
FunctionColumn* fc1 = new FunctionColumn("count", schemaTablePrefix + colName);
FunctionColumn* fc2 = new FunctionColumn("idbpartition", schemaTablePrefix + colName);
SimpleColumn* sc = new SimpleColumn(colName);
FunctionColumn* fc1 = new FunctionColumn("count", schemaTablePrefix + colName, sessionID);
FunctionColumn* fc2 = new FunctionColumn("idbpartition", schemaTablePrefix + colName, sessionID);
SimpleColumn* sc = new SimpleColumn(colName, sessionID);

CalpontSelectExecutionPlan::ColumnMap colMap;
SRCP srcp;
Expand Down Expand Up @@ -219,14 +209,14 @@ RowSize getAUXPartitionSize(BRM::LogicalPartition partitionNum,

return processQuery(csep);
}

} // namespace

std::vector<bool> getPartitionDeletedBitmap(BRM::LogicalPartition partitionNum,
execplan::CalpontSystemCatalog::TableName tableName)
execplan::CalpontSystemCatalog::TableName tableName,
int sessionID)
{
std::vector<bool> auxColMock;
uint32_t rowSize = getAUXPartitionSize(partitionNum, tableName);
uint32_t rowSize = getAUXPartitionSize(partitionNum, tableName, sessionID);
auxColMock.reserve(rowSize);

std::random_device rd;
Expand All @@ -241,5 +231,4 @@ std::vector<bool> getPartitionDeletedBitmap(BRM::LogicalPartition partitionNum,

return auxColMock;
}

} // namespace execplan
3 changes: 2 additions & 1 deletion dbcon/execplan/mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ namespace execplan
* @return std::vector<bool> deleted bitmap,`true` means deleted and `false` means not deleted
*/
std::vector<bool> getPartitionDeletedBitmap(BRM::LogicalPartition partitionNum,
execplan::CalpontSystemCatalog::TableName tableName);
execplan::CalpontSystemCatalog::TableName tableName,
int sessionID);

} // namespace execplan
#endif // __MOCK_H__
25 changes: 14 additions & 11 deletions dbcon/mysql/ha_mcs_partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ extern "C"
return result;
}

void caldisablepartitions_deinit(UDF_INIT* initid)
void caldisablepartitions_deinit(UDF_INIT* initid)
{
}

Expand Down Expand Up @@ -853,7 +853,7 @@ extern "C"
return result;
}

void calenablepartitions_deinit(UDF_INIT* initid)
void calenablepartitions_deinit(UDF_INIT* initid)
{
}

Expand Down Expand Up @@ -1045,7 +1045,7 @@ extern "C"
}

const char* caldisablepartitionsbyvalue(UDF_INIT* initid, UDF_ARGS* args, char* result,
unsigned long* length, char* is_null, char* error)
unsigned long* length, char* is_null, char* error)
{
string msg;
set<LogicalPartition> partSet;
Expand Down Expand Up @@ -1131,7 +1131,7 @@ extern "C"
/**
* CalShowPartitionsByValue
*/
my_bool calshowpartitionsbyvalue_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
my_bool calshowpartitionsbyvalue_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
bool err = false;

Expand Down Expand Up @@ -1164,7 +1164,7 @@ extern "C"
return 0;
}

void calshowpartitionsbyvalue_deinit(UDF_INIT* initid)
void calshowpartitionsbyvalue_deinit(UDF_INIT* initid)
{
delete[] initid->ptr;
}
Expand Down Expand Up @@ -1351,23 +1351,23 @@ extern "C"
*/
my_bool mcs_analyze_partition_bloat_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
bool err = false;
bool hasErr = false;

if (args->arg_count < 2 || args->arg_count > 3)
{
err = true;
hasErr = true;
}
else if (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT ||
(args->arg_count == 3 && args->arg_type[2] != STRING_RESULT))
{
err = true;
hasErr = true;
}
else if (!args->args[0] || !args->args[1] || (args->arg_count == 3 && !args->args[2]))
{
err = true;
hasErr = true;
}

if (err)
if (hasErr)
{
strcpy(message, "usage: MCS_ANALYZE_PARTITION_BLOAT (schema, table, partition_num)");
return 1;
Expand Down Expand Up @@ -1396,6 +1396,8 @@ extern "C"

ostringstream output;

THD* thd = current_thd;

try
{
schema = (char*)(args->args[0]);
Expand All @@ -1412,7 +1414,8 @@ extern "C"
partitionNum = *partitionNums.begin();

tableNameObj = make_table(schema, table, lower_case_table_names);
vector<bool> deletedBitMap = getPartitionDeletedBitmap(partitionNum, tableNameObj);
vector<bool> deletedBitMap =
getPartitionDeletedBitmap(partitionNum, tableNameObj, tid2sid(thd->thread_id));

uint32_t emptyValueCount = std::ranges::count(deletedBitMap, true /* target value */);

Expand Down

0 comments on commit 14f944e

Please sign in to comment.