Skip to content

Compression feature #96

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 50 additions & 39 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,24 @@ OPT ?= -g2 -Wall -Werror # (B) Debug mode, w/ full line-level debugging sym
CC = gcc
CXX = g++

SHARED_CFLAGS = -fPIC
SHARED_LDFLAGS = -shared -Wl,-soname -Wl,

INCPATH += -I./src -I./include -I./src/leveldb/include -I./src/leveldb $(DEPS_INCPATH)
CFLAGS += -std=c99 $(OPT) $(SHARED_CFLAGS) $(INCPATH)
CXXFLAGS += $(OPT) $(SHARED_CFLAGS) $(INCPATH)
LDFLAGS += -rdynamic $(DEPS_LDPATH) $(DEPS_LDFLAGS) -lpthread -lrt -lz -ldl
PLATFORM=OS_LINUX
PLATFORM_LDFLAGS=-pthread
PLATFORM_LIBS=
PLATFORM_CCFLAGS= -fno-builtin-memcmp -pthread -DOS_LINUX -DLEVELDB_PLATFORM_POSIX -DLEVELDB_ATOMIC_PRESENT -DSNAPPY
PLATFORM_CXXFLAGS=-std=c++0x -fno-builtin-memcmp -lpthread -DLEVELDB_PLATFORM_POSIX -DLEVELDB_ATOMIC_PRESENT -DSNAPPY
PLATFORM_SHARED_CFLAGS=-fPIC
PLATFORM_SHARED_EXT=so
PLATFORM_SHARED_LDFLAGS=-shared -Wl,-soname -Wl,
PLATFORM_SHARED_VERSIONED=true

SHARED_CFLAGS = -fPIC
SHARED_LDFLAGS = -shared -Wl,-soname -Wl,

INCPATH += -I./src/leveldb -I./src/leveldb/include -I./src -I./include $(DEPS_INCPATH)
#### CFLAGS += -std=c99 $(OPT) $(SHARED_CFLAGS) $(INCPATH) $(PLATFORM_CCFLAGS)
CFLAGS += $(OPT) $(SHARED_CFLAGS) $(INCPATH) $(PLATFORM_CCFLAGS)
CXXFLAGS += $(OPT) $(SHARED_CFLAGS) $(INCPATH) $(PLATFORM_CXXFLAGS)
LDFLAGS += -rdynamic $(DEPS_LDPATH) $(DEPS_LDFLAGS) -lpthread -lrt -lz -ldl

PROTO_FILES := $(wildcard src/proto/*.proto)
PROTO_OUT_CC := $(PROTO_FILES:.proto=.pb.cc)
Expand All @@ -23,7 +34,7 @@ PROTO_OUT_H := $(PROTO_FILES:.proto=.pb.h)
OTHER_SRC := src/trace_flags.cc src/fs_inotify.cc
SDK_SRC := $(wildcard src/sdk/*.cc)
COMMON_SRC := $(wildcard src/common/*.cc)
UTIL_SRC := $(wildcard src/util/*.cc)
UTIL_SRC := $(wildcard src/utils/*.cc)
PROTO_SRC := $(filter-out %.pb.cc, $(wildcard src/proto/*.cc)) $(PROTO_OUT_CC)
VERSION_SRC := src/version.cc
MDTTOOL_SRC := $(wildcard src/mdt-tool/mdt-tool.cc)
Expand All @@ -37,7 +48,7 @@ SCAN_TEST_SRC := $(wildcard src/benchmark/scan_test.cc)
C_SAMPLE_SRC := $(wildcard src/sample/c_sample.c)

FTRACE_SRC := $(wildcard src/ftrace/*.cc)
FTRACE_TEST_SRC := $(wildcard src/ftrace/test/*.cc)
###FTRACE_TEST_SRC := $(wildcard src/ftrace/test/*.cc)

AGENT_SRC := $(wildcard src/agent/*.cc)
COLLECTOR_SRC := $(wildcard src/collector/*.cc)
Expand All @@ -61,7 +72,7 @@ SCAN_TEST_OBJ := $(SCAN_TEST_SRC:.cc=.o)
C_SAMPLE_OBJ := $(C_SAMPLE_SRC:.c=.o)

FTRACE_OBJ := $(FTRACE_SRC:.cc=.o)
FTRACE_TEST_OBJ := $(FTRACE_TEST_SRC:.cc=.o)
#FTRACE_TEST_OBJ := $(FTRACE_TEST_SRC:.cc=.o)
AGENT_OBJ := $(AGENT_SRC:.cc=.o)
COLLECTOR_OBJ := $(COLLECTOR_SRC:.cc=.o)
SCHEDULER_OBJ := $(SCHEDULER_SRC:.cc=.o)
Expand All @@ -76,7 +87,7 @@ LEVELDB_LIB := src/leveldb/libleveldb.a
############################################################
PROGRAM = agent_main collector_main scheduler_main
FTRACELIBRARY = libftrace.a
FTRACE_TEST = TEST_log
#FTRACE_TEST = TEST_log

LIBRARY = libmdt.a
SAMPLE = sample
Expand Down Expand Up @@ -115,53 +126,53 @@ cleanall:
src/leveldb/libleveldb.a: FORCE
$(MAKE) -C src/leveldb

sample: $(SAMPLE_OBJ) $(LIBRARY)
$(CXX) -o $@ $(SAMPLE_OBJ) $(LIBRARY) $(LDFLAGS)
sample: $(SAMPLE_OBJ) $(LIBRARY) $(LEVELDB_LIB)
$(CXX) -o $@ $(SAMPLE_OBJ) $(LIBRARY) $(LDFLAGS) $(LEVELDB_LIB)

mdt-tool: $(MDTTOOL_OBJ) $(LIBRARY) $(OTHER_OBJ)
$(CXX) -o $@ $(MDTTOOL_OBJ) $(LIBRARY) $(OTHER_OBJ) $(LDFLAGS) -lreadline -lhistory -lncurses
mdt-tool: $(MDTTOOL_OBJ) $(LIBRARY) $(OTHER_OBJ) $(LEVELDB_LIB)
$(CXX) -o $@ $(MDTTOOL_OBJ) $(LIBRARY) $(OTHER_OBJ) $(LDFLAGS) $(LEVELDB_LIB) -lreadline -lhistory -lncurses

test_update_schema: $(UPDATESCHEMA_OBJ) $(LIBRARY)
$(CXX) -o $@ $(UPDATESCHEMA_OBJ) $(LIBRARY) $(LDFLAGS)
test_update_schema: $(UPDATESCHEMA_OBJ) $(LIBRARY) $(LEVELDB_LIB)
$(CXX) -o $@ $(UPDATESCHEMA_OBJ) $(LIBRARY) $(LDFLAGS) $(LEVELDB_LIB)

write_test: $(WRITE_TEST_OBJ) $(LIBRARY)
$(CXX) -o $@ $(WRITE_TEST_OBJ) $(LIBRARY) $(LDFLAGS)
write_test: $(WRITE_TEST_OBJ) $(LIBRARY) $(LEVELDB_LIB)
$(CXX) -o $@ $(WRITE_TEST_OBJ) $(LIBRARY) $(LDFLAGS) $(LEVELDB_LIB)

dumpfile: $(DUMPFILE_OBJ) $(LIBRARY)
$(CXX) -o $@ $(DUMPFILE_OBJ) $(LIBRARY) $(LDFLAGS)
dumpfile: $(DUMPFILE_OBJ) $(LIBRARY) $(LEVELDB_LIB)
$(CXX) -o $@ $(DUMPFILE_OBJ) $(LIBRARY) $(LDFLAGS) $(LEVELDB_LIB)

sync_write_test: $(SYNC_WRITE_TEST_OBJ) $(LIBRARY)
$(CXX) -o $@ $(SYNC_WRITE_TEST_OBJ) $(LIBRARY) $(LDFLAGS)
sync_write_test: $(SYNC_WRITE_TEST_OBJ) $(LIBRARY) $(LEVELDB_LIB)
$(CXX) -o $@ $(SYNC_WRITE_TEST_OBJ) $(LIBRARY) $(LDFLAGS) $(LEVELDB_LIB)

mulcli_write_test: $(MULWRITE_TEST_OBJ) $(LIBRARY)
$(CXX) -o $@ $(MULWRITE_TEST_OBJ) $(LIBRARY) $(LDFLAGS)
mulcli_write_test: $(MULWRITE_TEST_OBJ) $(LIBRARY) $(LEVELDB_LIB)
$(CXX) -o $@ $(MULWRITE_TEST_OBJ) $(LIBRARY) $(LDFLAGS) $(LEVELDB_LIB)

scan_test: $(SCAN_TEST_OBJ) $(LIBRARY)
$(CXX) -o $@ $(SCAN_TEST_OBJ) $(LIBRARY) $(LDFLAGS)
scan_test: $(SCAN_TEST_OBJ) $(LIBRARY) $(LEVELDB_LIB)
$(CXX) -o $@ $(SCAN_TEST_OBJ) $(LIBRARY) $(LDFLAGS) $(LEVELDB_LIB)

c_sample: $(C_SAMPLE_OBJ) $(LIBRARY)
$(CXX) -o $@ $(C_SAMPLE_OBJ) $(LIBRARY) $(LDFLAGS)
c_sample: $(C_SAMPLE_OBJ) $(LIBRARY) $(LEVELDB_LIB)
$(CXX) -o $@ $(C_SAMPLE_OBJ) $(LIBRARY) $(LDFLAGS) $(LEVELDB_LIB)

libmdt.a: $(SDK_OBJ) $(COMMON_OBJ) $(UTIL_OBJ) $(PROTO_OBJ) $(VERSION_OBJ)
$(AR) -rs $@ $(SDK_OBJ) $(COMMON_OBJ) $(UTIL_OBJ) $(PROTO_OBJ) $(VERSION_OBJ)
libmdt.a: $(SDK_OBJ) $(COMMON_OBJ) $(UTIL_OBJ) $(PROTO_OBJ) $(VERSION_OBJ) $(LEVELDB_LIB)
$(AR) -rs $@ $(SDK_OBJ) $(COMMON_OBJ) $(UTIL_OBJ) $(PROTO_OBJ) $(VERSION_OBJ) $(LEVELDB_LIB)

libftrace.a: $(FTRACE_OBJ) $(PROTO_OBJ)
$(AR) -rs $@ $(FTRACE_OBJ) $(PROTO_OBJ)

TEST_log: $(FTRACE_TEST_OBJ) $(FTRACELIBRARY)
$(CXX) -o $@ $(FTRACE_TEST_OBJ) $(FTRACELIBRARY) $(LDFLAGS)
#TEST_log: $(FTRACE_TEST_OBJ) $(FTRACELIBRARY)
# $(CXX) -o $@ $(FTRACE_TEST_OBJ) $(FTRACELIBRARY) $(LDFLAGS)

agent_main: $(AGENT_OBJ) $(PROTO_OBJ) $(VERSION_OBJ) $(LEVELDB_LIB) $(OTHER_OBJ)
$(CXX) -o agent_main $(AGENT_OBJ) $(PROTO_OBJ) $(VERSION_OBJ) $(LDFLAGS) $(LEVELDB_LIB) $(OTHER_OBJ)

collector_main: $(COLLECTOR_OBJ) $(LIBRARY) $(OTHER_OBJ)
$(CXX) -o collector_main $(COLLECTOR_OBJ) $(LIBRARY) $(OTHER_OBJ) $(LDFLAGS)
collector_main: $(COLLECTOR_OBJ) $(LIBRARY) $(OTHER_OBJ) $(LEVELDB_LIB)
$(CXX) -o collector_main $(COLLECTOR_OBJ) $(LIBRARY) $(OTHER_OBJ) $(LDFLAGS) $(LEVELDB_LIB)

scheduler_main: $(SCHEDULER_OBJ) $(PROTO_OBJ) $(VERSION_OBJ) $(OTHER_OBJ)
$(CXX) -o scheduler_main $(SCHEDULER_OBJ) $(PROTO_OBJ) $(VERSION_OBJ) $(OTHER_OBJ) $(LDFLAGS)
scheduler_main: $(SCHEDULER_OBJ) $(PROTO_OBJ) $(VERSION_OBJ) $(OTHER_OBJ) $(LEVELDB_LIB)
$(CXX) -o scheduler_main $(SCHEDULER_OBJ) $(PROTO_OBJ) $(VERSION_OBJ) $(OTHER_OBJ) $(LDFLAGS) $(LEVELDB_LIB)

$(CXX_OBJ): %.o: %.cc $(PROTO_OUT_H)
$(CXX) $(CXXFLAGS) -c $< -o $@
$(CXX_OBJ): %.o: %.cc $(PROTO_OUT_H) $(LEVELDB_LIB)
$(CXX) $(CXXFLAGS) -c $< -o $@ $(LEVELDB_LIB)

$(C_OBJ): %.o: %.c
$(CC) $(CFLAGS) -c $< -o $@
Expand Down
2 changes: 1 addition & 1 deletion src/agent/agent_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include "proto/agent.pb.h"
#include "proto/scheduler.pb.h"
#include "agent/log_stream.h"
#include "util/event.h"
#include "utils/event.h"
#include <stdio.h>

namespace mdt {
Expand Down
2 changes: 1 addition & 1 deletion src/agent/log_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "agent/options.h"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include "util/coding.h"
#include "utils/coding.h"
#include <sys/time.h>

#include "leveldb/slice.h"
Expand Down
6 changes: 3 additions & 3 deletions src/agent/log_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
#include "agent/options.h"
#include "proto/query.pb.h"
#include "rpc/rpc_client.h"
#include "util/event.h"
#include <sys/time.h>
#include "util/counter.h"
#include "util/thread_pool.h"
#include "utils/event.h"
#include "utils/counter.h"
#include "utils/thread_pool.h"

namespace mdt {
namespace agent {
Expand Down
6 changes: 3 additions & 3 deletions src/benchmark/dump_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
#include "sdk/db.h"
#include "sdk/sdk.h"
#include "sdk/table.h"
#include "util/counter.h"
#include "util/env.h"
#include "util/mutex.h"
#include "utils/counter.h"
#include "utils/env.h"
#include "utils/mutex.h"
#include <gflags/gflags.h>
#include <glog/logging.h>

Expand Down
24 changes: 12 additions & 12 deletions src/benchmark/mulcli_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
#include "sdk/sdk.h"
#include "sdk/db.h"
#include "sdk/table.h"
#include "util/env.h"
#include "util/counter.h"
#include "util/mutex.h"
#include "utils/env.h"
#include "utils/counter.h"
#include "utils/mutex.h"

struct WriteTask {
mdt::Database* db_;
Expand All @@ -29,14 +29,14 @@ void StoreCallback_Test(mdt::Table* table, mdt::StoreRequest* request,
WriteTask* task = (WriteTask*)callback_param;
if (task->counter_->Dec() == 0) {
task->cond_->Signal();
}
}
}

void* write_task(void* arg) {
WriteTask* task = (WriteTask*)arg;
std::string value_str(3000, 'x');
uint64_t key_len = task->num_keys_ > 10 ? (task->num_keys_ / 10): task->num_keys_;

std::cout << "open table ..." << std::endl;
mdt::Table* table;
table = OpenTable(task->db_, task->table_name_);
Expand All @@ -52,7 +52,7 @@ void* write_task(void* arg) {
char* col_ptr = colbuf;
uint64_t col_id = i + task->start_key_;
col_ptr += snprintf(col_ptr, 11, "%011lu", col_id);
std::string col = colbuf;
std::string col = colbuf;

mdt::Index query, costtime, service;
query.index_name = "Query";
Expand All @@ -62,7 +62,7 @@ void* write_task(void* arg) {
costtime.index_name = "Costtime";
costtime.index_key = "5ms";
costtime.index_key += col;

service.index_name = "Service";
service.index_key = "bs module";
service.index_key += col;
Expand All @@ -81,7 +81,7 @@ void* write_task(void* arg) {
return NULL;
}

void dispatch_task(mdt::Database* db, const std::string& table_name,
void dispatch_task(mdt::Database* db, const std::string& table_name,
mdt::Mutex* mu, mdt::CondVar* cond,
mdt::Counter* counter, uint64_t num_task, uint64_t num_keys) {
for (uint64_t i = 0; i < num_task; i++) {
Expand All @@ -91,7 +91,7 @@ void dispatch_task(mdt::Database* db, const std::string& table_name,
task->table_name_ = table_name;
task->mu_ = mu;
task->cond_ = cond;
task->counter_ = counter;
task->counter_ = counter;
task->start_key_ = num_keys * i;
task->num_keys_ = num_keys;
pthread_create(&tid, NULL, write_task, task);
Expand All @@ -102,7 +102,7 @@ int main(int ac, char* av[]) {
::google::ParseCommandLineFlags(&ac, &av, true);
std::string db_name = "z020";
std::string table_name = "kepler001";

std::cout << "open db ..." << std::endl;
mdt::Database* db;
db = mdt::OpenDatabase(db_name);
Expand All @@ -115,11 +115,11 @@ int main(int ac, char* av[]) {
counter.Set((int64_t)(num_task * num_keys) + 1);
std::cout << "Test put ..." << std::endl;
dispatch_task(db, table_name, &mu, &cond, &counter, num_task, num_keys);

if (counter.Dec() != 0) {
cond.Wait();
}

std::cout << "done" << std::endl;
return 0;
}
24 changes: 12 additions & 12 deletions src/benchmark/scan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,24 @@
#include "sdk/sdk.h"
#include "sdk/db.h"
#include "sdk/table.h"
#include "util/env.h"
#include "util/coding.h"
#include "utils/env.h"
#include "utils/coding.h"

void primary_key_scan(mdt::Table* table, std::string& primary_key) {
mdt::SearchRequest* search_req = new mdt::SearchRequest;
mdt::SearchResponse* search_resp = new mdt::SearchResponse;
search_req->primary_key = primary_key;
std::cout << "scan table, case 1, primary key scan ..." << std::endl;

LOG(INFO) << "magicscan primary key: begin scan.. ";
table->Get(search_req, search_resp);
LOG(INFO) << "magicscan primary key: end scan.. ";
for (uint32_t i = 0; i < search_resp->result_stream.size(); i++) {
const mdt::ResultStream& result = search_resp->result_stream[i];
LOG(INFO) << "primary key: " << result.primary_key
LOG(INFO) << "primary key: " << result.primary_key
<< ", num of item " << result.result_data_list.size() << std::endl;
for (uint32_t j = 0; j < result.result_data_list.size(); j++) {
std::cout << " data(size=" << result.result_data_list[j].size()
std::cout << " data(size=" << result.result_data_list[j].size()
<< ") " << result.result_data_list[j] << std::endl;
}
}
Expand Down Expand Up @@ -61,10 +61,10 @@ void index_key_scan(mdt::Table* table) {
LOG(INFO) << "magicscan index key: end scan.. ";
for (uint32_t i = 0; i < search_resp->result_stream.size(); i++) {
const mdt::ResultStream& result = search_resp->result_stream[i];
LOG(INFO) << "primary key: " << result.primary_key
LOG(INFO) << "primary key: " << result.primary_key
<< ", num of item " << result.result_data_list.size() << std::endl;
for (uint32_t j = 0; j < result.result_data_list.size(); j++) {
std::cout << " data(size=" << result.result_data_list[j].size()
std::cout << " data(size=" << result.result_data_list[j].size()
<< ") " << result.result_data_list[j] << std::endl;
}
}
Expand All @@ -77,7 +77,7 @@ int main(int ac, char* av[]) {
::google::ParseCommandLineFlags(&ac, &av, true);
std::string db_name = "z012";
std::string table_name = "kepler001";

// create db
std::cout << "open db ..." << std::endl;
mdt::Database* db;
Expand All @@ -91,11 +91,11 @@ int main(int ac, char* av[]) {
// search test: case 1: primary key Scan
LOG(INFO) << "primary key scan";
std::string primary_key = "0000000056";
primary_key_scan(table, primary_key);
primary_key_scan(table, primary_key);

LOG(INFO) << "index key scan";
index_key_scan(table);

LOG(INFO) << "index key scan";
index_key_scan(table);

std::cout << "done" << std::endl;
return 0;
}
11 changes: 6 additions & 5 deletions src/benchmark/write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
#include "sdk/sdk.h"
#include "sdk/db.h"
#include "sdk/table.h"
#include "util/env.h"
#include "util/counter.h"
#include "util/mutex.h"
#include "utils/env.h"
#include "utils/counter.h"
#include "utils/mutex.h"
#include "utils/timer.h"

DEFINE_string(dbname, "z012", "production name");
DEFINE_string(tablename, "kepler001", "table name");
Expand Down Expand Up @@ -74,7 +75,7 @@ void* write_task(void* arg) {
req->index_list.push_back(costtime);
req->index_list.push_back(service);
req->data = value_str;
req->timestamp = time(NULL);
req->timestamp = ::mdt::timer::get_micros();

mdt::StoreResponse* resp = new mdt::StoreResponse();
mdt::StoreCallback TestCallback = StoreCallback_Test;
Expand Down Expand Up @@ -120,7 +121,7 @@ int main(int ac, char* av[]) {
mdt::Mutex mu;
mdt::CondVar cond(&mu);
mdt::Counter counter;
uint64_t num_task = 50;
uint64_t num_task = 1;
std::vector<WriteTask*> tasks;
for (uint64_t i = 0; i < num_task; i++) {
WriteTask* task = new WriteTask;
Expand Down
2 changes: 1 addition & 1 deletion src/collector/query_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <gflags/gflags.h>
#include "collector/query_service.h"
#include <sofa/pbrpc/pbrpc.h>
#include "util/status.h"
#include "utils/status.h"
#include <glog/logging.h>

DECLARE_string(se_service_port);
Expand Down
Loading