Skip to content

Commit

Permalink
discovery api
Browse files Browse the repository at this point in the history
  • Loading branch information
lmangani committed Nov 14, 2024
1 parent 110580d commit 6d7a211
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 1 deletion.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ set(LOADABLE_EXTENSION_NAME ${TARGET_NAME}_loadable_extension)
project(${TARGET_NAME})
include_directories(src/include duckdb/third_party/httplib duckdb/parquet/include)

set(EXTENSION_SOURCES src/httpserver_extension.cpp)
set(EXTENSION_SOURCES src/httpserver_extension.cpp src/discovery.cpp)

if(MINGW)
set(OPENSSL_USE_STATIC_LIBS TRUE)
Expand Down
102 changes: 102 additions & 0 deletions src/discovery.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#include "discovery.hpp"
#include "mbedtls_wrapper.hpp"
#include "duckdb/common/common.hpp"
#include <chrono>
#include <iostream>

namespace duckdb {

// Static member definition
std::unique_ptr<PeerDiscovery> PeerDiscovery::instance = nullptr;

// Constructor implementation
PeerDiscovery::PeerDiscovery(DatabaseInstance& database) : db(database) {
initTables();
}

void PeerDiscovery::Initialize(DatabaseInstance& db) {
instance = std::unique_ptr<PeerDiscovery>(new PeerDiscovery(db));
}

void PeerDiscovery::initTables() {
Connection conn(db);
conn.Query("CREATE TABLE IF NOT EXISTS peers ("
"hash VARCHAR, peer_id VARCHAR, name VARCHAR, endpoint VARCHAR,"
"source_address VARCHAR, ttl BIGINT, metadata VARCHAR,"
"registered_at TIMESTAMP, PRIMARY KEY (hash, peer_id))");

conn.Query("CREATE INDEX IF NOT EXISTS idx_peers_ttl ON peers(registered_at, ttl)");
}

std::string PeerDiscovery::generateDeterministicId(const std::string& name, const std::string& endpoint) {
std::string combined = name + ":" + endpoint;
hash_bytes hash;
duckdb_mbedtls::MbedTlsWrapper::ComputeSha256Hash(combined.c_str(), combined.length(), (char*)hash);

std::string result;
for (int i = 0; i < 16; i++) {
char buf[3];
snprintf(buf, sizeof(buf), "%02x", hash[i]);
result += buf;
}
return result;
}


void PeerDiscovery::registerPeer(const std::string& hash, const PeerData& data) {
Connection conn(db);
std::string peerId = generateDeterministicId(data.name, data.endpoint);

auto stmt = conn.Prepare(
"INSERT INTO peers (hash, peer_id, name, endpoint, source_address, ttl, metadata, registered_at) "
"VALUES ($1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP)");

vector<Value> params;
params.push_back(Value(hash));
params.push_back(Value(peerId));
params.push_back(Value(data.name));
params.push_back(Value(data.endpoint));
params.push_back(Value(data.sourceAddress));
params.push_back(Value::BIGINT(data.ttl));
params.push_back(Value(data.metadata));

stmt->Execute(params);
}

std::unique_ptr<MaterializedQueryResult> PeerDiscovery::getPeers(const std::string& hash, bool ndjson) {
Connection conn(db);
auto stmt = conn.Prepare(
"SELECT name, endpoint, source_address as sourceAddress, "
"peer_id as peerId, metadata, "
"CAST(EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - registered_at)) AS INTEGER) as age, "
"ttl FROM peers WHERE hash = $1 AND "
"EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - registered_at)) < ttl "
"ORDER BY age ASC");

vector<Value> params;
params.push_back(Value(hash));

auto result = stmt->Execute(params);
return unique_ptr<MaterializedQueryResult>((MaterializedQueryResult*)result.release());
}

void PeerDiscovery::removePeer(const std::string& hash, const std::string& peerId) {
Connection conn(db);
auto stmt = conn.Prepare("DELETE FROM peers WHERE hash = $1 AND peer_id = $2");
stmt->Execute(hash, peerId);
}

void PeerDiscovery::updateHeartbeat(const std::string& hash, const std::string& peerId) {
Connection conn(db);
auto stmt = conn.Prepare("UPDATE peers SET registered_at = CURRENT_TIMESTAMP "
"WHERE hash = $1 AND peer_id = $2");
stmt->Execute(hash, peerId);
}

void PeerDiscovery::cleanupExpired() {
Connection conn(db);
conn.Query("DELETE FROM peers WHERE EXTRACT(EPOCH FROM "
"(CURRENT_TIMESTAMP - registered_at)) >= ttl");
}

} // namespace duckdb
139 changes: 139 additions & 0 deletions src/httpserver_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#define CPPHTTPLIB_OPENSSL_SUPPORT
#include "httplib.hpp"

#include "discovery.hpp"

// Include yyjson for JSON handling
#include "yyjson.hpp"

Expand Down Expand Up @@ -348,6 +350,123 @@ void HandleHttpRequest(const duckdb_httplib_openssl::Request& req, duckdb_httpli
}
}

// Discovery Functions
void HandleDiscoverySubscribe(const duckdb_httplib_openssl::Request& req, duckdb_httplib_openssl::Response& res) {

string path = req.path;
string hash = path.substr(path.find_last_of('/') + 1);

auto doc = yyjson_read(req.body.c_str(), req.body.length(), 0);
if (!doc) {
res.status = 400;
res.set_content("Invalid JSON", "text/plain");
return;
}

auto root = yyjson_doc_get_root(doc);
PeerData data;
auto name_val = yyjson_obj_get(root, "name");
auto endpoint_val = yyjson_obj_get(root, "endpoint");
auto ttl_val = yyjson_obj_get(root, "ttl");
auto metadata_val = yyjson_obj_get(root, "metadata");

data.name = name_val ? yyjson_get_str(name_val) : "";
data.endpoint = endpoint_val ? yyjson_get_str(endpoint_val) : "";
data.ttl = ttl_val ? yyjson_get_int(ttl_val) : 300;
data.metadata = metadata_val ? yyjson_get_str(metadata_val) : "{}";
data.sourceAddress = req.remote_addr;

try {
PeerDiscovery::Instance().registerPeer(hash, data);
std::string peerId = PeerDiscovery::generateDeterministicId(data.name, data.endpoint);

std::cerr << "GOR PEERID for: " << peerId << std::endl;

auto rdoc = yyjson_mut_doc_new(nullptr);
auto rroot = yyjson_mut_obj(rdoc);
yyjson_mut_doc_set_root(rdoc, rroot);

yyjson_mut_obj_add_str(rdoc, rroot, "peerId", peerId.c_str());
yyjson_mut_obj_add_str(rdoc, rroot, "message", "Successfully registered");
yyjson_mut_obj_add_int(rdoc, rroot, "ttl", data.ttl);

char* json = yyjson_mut_write(rdoc, 0, nullptr);

std::cerr << "JSON OK for: " << peerId << std::endl;

res.set_content(json, "application/json");
free(json);
yyjson_mut_doc_free(rdoc);
} catch (const Exception& ex) {
res.status = 500;
res.set_content(ex.what(), "text/plain");
}

yyjson_doc_free(doc);
}

void HandleDiscoveryGet(const duckdb_httplib_openssl::Request& req, duckdb_httplib_openssl::Response& res) {

string path = req.path;
string hash = path.substr(path.find_last_of('/') + 1);
bool ndjson = false;

if ((path.find_last_of('/') + 2)){
ndjson = true;
}

// const auto& hash = req.path_params.at("secretHash");
// bool ndjson = req.get_param_value("format") == "ndjson";

try {
auto result = PeerDiscovery::Instance().getPeers(hash, ndjson);
if (!result || result->HasError()) {
res.status = 500;
res.set_content(result ? result->GetError() : "Query failed", "text/plain");
return;
}

auto materialized = unique_ptr<MaterializedQueryResult>(reinterpret_cast<MaterializedQueryResult*>(result.release()));

if (ndjson) {
res.set_content(ConvertResultToNDJSON(*materialized), "application/x-ndjson");
} else {
ReqStats stats{0.0, 0, (int64_t)materialized->RowCount()};
res.set_content(ConvertResultToJSON(*materialized, stats), "application/json");
}
} catch (const Exception& ex) {
res.status = 500;
res.set_content(ex.what(), "text/plain");
}
}

void HandleHeartbeat(const duckdb_httplib_openssl::Request& req, duckdb_httplib_openssl::Response& res) {
const auto& hash = req.path_params.at("secretHash");
const auto& peerId = req.path_params.at("peerId");

try {
PeerDiscovery::Instance().updateHeartbeat(hash, peerId);
res.set_content("{\"message\":\"Heartbeat received\"}", "application/json");
} catch (const Exception& ex) {
res.status = 404;
res.set_content("{\"error\":\"Peer not found\"}", "application/json");
}
}

void HandleUnsubscribe(const duckdb_httplib_openssl::Request& req, duckdb_httplib_openssl::Response& res) {
const auto& hash = req.path_params.at("secretHash");
const auto& peerId = req.path_params.at("peerId");

try {
PeerDiscovery::Instance().removePeer(hash, peerId);
res.set_content("{\"message\":\"Successfully unsubscribed\"}", "application/json");
} catch (const Exception& ex) {
res.status = 500;
res.set_content(ex.what(), "text/plain");
}
}

// Server Start
void HttpServerStart(DatabaseInstance& db, string_t host, int32_t port, string_t auth = string_t()) {
if (global_state.is_running) {
throw IOException("HTTP server is already running");
Expand Down Expand Up @@ -377,11 +496,31 @@ void HttpServerStart(DatabaseInstance& db, string_t host, int32_t port, string_t
global_state.server->Get("/", HandleHttpRequest);
global_state.server->Post("/", HandleHttpRequest);

// Handle Discovery API
global_state.server->Post("/subscribe/[^/]+", [&](const duckdb_httplib_openssl::Request& req, duckdb_httplib_openssl::Response& res) {
HandleDiscoverySubscribe(req, res);
});

global_state.server->Get("/discovery/[^/]+", [&](const duckdb_httplib_openssl::Request& req, duckdb_httplib_openssl::Response& res) {
HandleDiscoveryGet(req, res);
});

global_state.server->Post("/heartbeat/[^/]+/[^/]+", [&](const duckdb_httplib_openssl::Request& req, duckdb_httplib_openssl::Response& res) {
HandleHeartbeat(req, res);
});

global_state.server->Delete("/unsubscribe/[^/]+/[^/]+", [&](const duckdb_httplib_openssl::Request& req, duckdb_httplib_openssl::Response& res) {
HandleUnsubscribe(req, res);
});

// Health check endpoint
global_state.server->Get("/ping", [](const duckdb_httplib_openssl::Request& req, duckdb_httplib_openssl::Response& res) {
res.set_content("OK", "text/plain");
});

// Initialize PeerDiscovery with the database instance
PeerDiscovery::Initialize(db);

string host_str = host.GetString();

const char* run_in_same_thread_env = std::getenv("DUCKDB_HTTPSERVER_FOREGROUND");
Expand Down
39 changes: 39 additions & 0 deletions src/include/discovery.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#pragma once

#include "duckdb.hpp"
#include "duckdb/common/types.hpp"
#include <memory>
#include <string>

namespace duckdb {

using hash_bytes = uint8_t[32];

struct PeerData {
std::string name;
std::string endpoint;
std::string sourceAddress;
int64_t ttl;
std::string metadata;
};

class PeerDiscovery {
static std::unique_ptr<PeerDiscovery> instance;
DatabaseInstance& db;

void initTables();
PeerDiscovery(DatabaseInstance& database);

public:
static void Initialize(DatabaseInstance& db);
static PeerDiscovery& Instance() { return *instance; }
static std::string generateDeterministicId(const std::string& name, const std::string& endpoint);

void registerPeer(const std::string& hash, const PeerData& data);
std::unique_ptr<MaterializedQueryResult> getPeers(const std::string& hash, bool ndjson = false);
void removePeer(const std::string& hash, const std::string& peerId);
void updateHeartbeat(const std::string& hash, const std::string& peerId);
void cleanupExpired();
};

} // namespace duckdb

0 comments on commit 6d7a211

Please sign in to comment.