Skip to content

Commit

Permalink
Merge branch 'feature/duck_flock' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
lmangani authored Nov 15, 2024
2 parents 110580d + c5f872e commit a127955
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 2 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ set(LOADABLE_EXTENSION_NAME ${TARGET_NAME}_loadable_extension)
project(${TARGET_NAME})
include_directories(src/include duckdb/third_party/httplib duckdb/parquet/include)

set(EXTENSION_SOURCES src/httpserver_extension.cpp)
set(EXTENSION_SOURCES src/httpserver_extension.cpp src/duck_flock.cpp)

if(MINGW)
set(OPENSSL_USE_STATIC_LIBS TRUE)
Expand Down
78 changes: 78 additions & 0 deletions src/duck_flock.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#ifndef DUCK_FLOCK_H
#define DUCK_FLOCK_H
#include "httpserver_extension.hpp"
namespace duckdb {
struct DuckFlockData : FunctionData{
vector<unique_ptr<Connection>> conn;
vector<unique_ptr<QueryResult>> results;
unique_ptr<FunctionData> Copy() const override {
throw std::runtime_error("not implemented");
}
bool Equals(const FunctionData &other) const override {
throw std::runtime_error("not implemented");
};
};



unique_ptr<FunctionData> DuckFlockBind(ClientContext &context, TableFunctionBindInput &input,
vector<LogicalType> &return_types, vector<string> &names) {
auto data = make_uniq<DuckFlockData>();
auto strQuery = input.inputs[0].GetValue<string>();
vector<string> flock;
auto &raw_flock = ListValue::GetChildren(input.inputs[1]);
for (auto &duck : raw_flock) {
flock.push_back(duck.ToString());
auto conn = make_uniq<Connection>(*context.db);
conn->Query("INSTALL json;LOAD json;INSTALL httpfs;LOAD httpfs;");
auto req = conn->Prepare("SELECT * FROM read_json($2 || '/?q=' || url_encode($1::VARCHAR))");
if (req->HasError()) {
throw std::runtime_error("duck_flock: error: " + req->GetError());
}
data->conn.push_back(std::move(conn));
data->results.push_back(std::move(req->Execute(strQuery.c_str(), duck.ToString())));
}
if (data->results[0]->HasError()) {
throw std::runtime_error("duck_flock: error: " + data->results[0]->GetError());
}
return_types.clear();
copy(data->results[0]->types.begin(), data->results[0]->types.end(), back_inserter(return_types));
names.clear();
copy(data->results[0]->names.begin(), data->results[0]->names.end(), back_inserter(names));
return std::move(data);
}

void DuckFlockImplementation(ClientContext &context, duckdb::TableFunctionInput &data_p,
DataChunk &output) {
auto &data = data_p.bind_data->Cast<DuckFlockData>();
for (const auto &res : data.results) {
ErrorData error_data;
unique_ptr<DataChunk> data_chunk = make_uniq<DataChunk>();
if (res->TryFetch(data_chunk, error_data)) {
if (data_chunk != nullptr) {
output.Append(*data_chunk);
return;
}
}
}
}

TableFunction DuckFlockTableFunction() {
TableFunction f(
"duck_flock",
{LogicalType::VARCHAR, LogicalType::LIST(LogicalType::VARCHAR)},
DuckFlockImplementation,
DuckFlockBind,
nullptr,
nullptr
);
return f;
}


}




#endif
4 changes: 3 additions & 1 deletion src/httpserver_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ static HttpServerState global_state;
int64_t read_rows;
};



// Convert the query result to JSON format
static std::string ConvertResultToJSON(MaterializedQueryResult &result, ReqStats &req_stats) {
auto doc = yyjson_mut_doc_new(nullptr);
Expand Down Expand Up @@ -469,7 +471,7 @@ static void LoadInternal(DatabaseInstance &instance) {

ExtensionUtil::RegisterFunction(instance, httpserve_start);
ExtensionUtil::RegisterFunction(instance, httpserve_stop);

ExtensionUtil::RegisterFunction(instance, DuckFlockTableFunction());
// Register the cleanup function to be called at exit
std::atexit(HttpServerCleanup);
}
Expand Down
2 changes: 2 additions & 0 deletions src/include/httpserver_extension.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ struct HttpServerState;
void HttpServerStart(DatabaseInstance& db, string_t host, int32_t port);
void HttpServerStop();

TableFunction DuckFlockTableFunction();

} // namespace duckdb

0 comments on commit a127955

Please sign in to comment.