Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ArrowStream as internal data format in ChdbResultSet #4

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,18 @@ hs_err_pid*
replay_pid*
/cmake-build-debug/
*.lst

##############################
## Maven
##############################
target/

##############################
## IntelliJ
##############################
out/
.idea/
.idea_modules/
*.iml
*.ipr
*.iws
19 changes: 19 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<maven.compiler.source>22</maven.compiler.source>
<maven.compiler.target>22</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>18.1.0</arrow.version>
</properties>

<dependencies>
Expand All @@ -23,6 +24,23 @@
<version>5.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-unsafe</artifactId>
<version>${arrow.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-compression</artifactId>
<version>${arrow.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -38,6 +56,7 @@
<includes>
<include>**/*Test.java</include>
</includes>
<argLine>--add-opens=java.base/java.nio=ALL-UNNAMED</argLine>
</configuration>
</plugin>
</plugins>
Expand Down
6 changes: 4 additions & 2 deletions src/jni/chdb_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,21 @@ local_result_v2 * queryToBuffer(
}


JNIEXPORT jobject JNICALL Java_org_chdb_jdbc_ChdbJniUtil_executeQuery(JNIEnv *env, jclass clazz, jstring query) {
JNIEXPORT jobject JNICALL Java_org_chdb_jdbc_ChdbJniUtil_executeQuery(JNIEnv *env, jclass clazz, jstring query, jstring format) {
// 1. Convert Java String to C++ string

std::cout << "call func: ChdbJniUtil_executeQuery!" << std::endl;

const char *queryStr = env->GetStringUTFChars(query, nullptr);
const char *formatStr = env->GetStringUTFChars(format, nullptr);

if (queryStr == nullptr) {
std::cerr << "Error: Failed to convert Java string to C++ string" << std::endl;
return nullptr;
}

// 2. Call the native query function
local_result_v2 *result = queryToBuffer(queryStr);
local_result_v2 *result = queryToBuffer(queryStr, formatStr);

// 3. Release the Java string resources
env->ReleaseStringUTFChars(query, queryStr);
Expand Down
3 changes: 1 addition & 2 deletions src/jni/chdb_jni.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
extern "C" {
#endif

JNIEXPORT jobject JNICALL Java_org_chdb_jdbc_ChdbJniUtil_executeQuery(JNIEnv *, jclass, jstring);
//JNIEXPORT jstring JNICALL Java_org_chdb_jdbc_ChdbJniUtil_executeQuery(JNIEnv *, jclass, jstring);
JNIEXPORT jobject JNICALL Java_org_chdb_jdbc_ChdbJniUtil_executeQuery(JNIEnv *, jclass, jstring, jstring);

#ifdef __cplusplus
}
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/org/chdb/jdbc/ChdbJniUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ public class ChdbJniUtil {
System.loadLibrary("chdbjni");
}

public static native LocalResultV2 executeQuery(String query);
// public static native String executeQuery(String query);
/**
 * Runs {@code query} and returns the result in the default {@code "CSV"} output format.
 *
 * @param query the SQL text to execute
 * @return the result produced by {@link #executeQuery(String, String)}
 */
public static LocalResultV2 executeQuery(String query) {
    final String defaultFormat = "CSV";
    return executeQuery(query, defaultFormat);
}

public static native LocalResultV2 executeQuery(String query, String format);

// public static void main(String[] args) {
// String query = "SELECT 1";
Expand Down
99 changes: 74 additions & 25 deletions src/main/java/org/chdb/jdbc/ChdbResultSet.java
Original file line number Diff line number Diff line change
@@ -1,33 +1,38 @@
package org.chdb.jdbc;

import java.io.*;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.chdb.jdbc.memory.ArrowMemoryManger;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;

public class ChdbResultSet implements ResultSet {
private LocalResultV2 result;
private int cursor = -1;
private List<String> records;
private int loadedRows = 0;
private ArrowStreamReader arrowStreamReader;
private VectorSchemaRoot batch;
private int batchCursor;

public ChdbResultSet(LocalResultV2 result) throws IOException {
this.result = result;
this.records = new ArrayList<>();
parseData(result.getBuf());
parseData(result.getBuf());
}

private void parseData(ByteBuffer buffer) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(toByteArray(buffer)), StandardCharsets.UTF_8));
String line;
while ((line = reader.readLine()) != null) {
records.add(line);
}
ByteArrayInputStream inputStream = new ByteArrayInputStream(toByteArray(buffer));
this.arrowStreamReader = new ArrowStreamReader(inputStream, ArrowMemoryManger.ROOT_ALLOCATOR);
}

private byte[] toByteArray(ByteBuffer buffer) {
Expand All @@ -38,24 +43,54 @@ private byte[] toByteArray(ByteBuffer buffer) {

@Override
public boolean next() throws SQLException {
if (cursor < records.size() - 1) {
if (cursor < loadedRows - 1) {
cursor++;
batchCursor++;
return true;
}
return false;
try {
if (arrowStreamReader.loadNextBatch()) {
AutoCloseables.close(batch);
batch = arrowStreamReader.getVectorSchemaRoot();
loadedRows += batch.getRowCount();
cursor++;
batchCursor = 0;
return true;
} else {
return false;
}
} catch (Exception e) {
throw new SQLException(e);
}
}

private String getValue(int columnIndex) throws SQLException {
if (cursor < 0 || cursor >= records.size()) {
throw new SQLException("Cursor out of bounds");
/**
 * Checks whether the given index is a valid column number and throws an
 * <code>SQLException</code> if it is not. The index is out of bounds if it is
 * less than <code>1</code> or greater than the number of columns in the
 * current batch.
 * <p>
 * This method is called internally by the <code>getXXX</code> methods.
 *
 * @param idx the number of a column, must be between <code>1</code>
 *        and the number of columns in the current batch
 * @throws SQLException if no batch has been loaded yet (i.e. <code>next()</code>
 *         has not been called successfully) or the given index is out of bounds
 */
private void checkColumnIndex(int idx) throws SQLException {
    // Without a loaded batch there is no schema to validate against; report it
    // as an SQLException instead of letting a NullPointerException escape.
    if (batch == null) {
        throw new SQLException("No current row: next() must be called before reading column values");
    }
    if (idx < 1 || idx > getColumnCount()) {
        throw new SQLException("Column index " + idx + " is out of bound[1, " + getColumnCount() +"]");
    }
}

return records.get(cursor);
/**
 * Returns the number of fields declared in the schema of the current batch.
 */
private int getColumnCount() {
    final int fieldCount = batch.getSchema().getFields().size();
    return fieldCount;
}

@Override
/**
 * Returns the value of the given column in the current row as a string.
 *
 * @param columnIndex 1-based column number
 * @return the string form of the value, or {@code null} when the cell is NULL
 * @throws SQLException if the column index is out of bounds
 */
public String getString(int columnIndex) throws SQLException {
    checkColumnIndex(columnIndex);
    // JDBC columns are 1-based, Arrow vectors are 0-based.
    Object value = batch.getVector(columnIndex - 1).getObject(batchCursor);
    // getObject() yields null for a NULL cell; JDBC expects null back, not an NPE.
    return value == null ? null : value.toString();
}

@Override
Expand All @@ -75,12 +110,22 @@ public short getShort(int i) throws SQLException {

@Override
public int getInt(int columnIndex) throws SQLException {
return Integer.parseInt(getValue(columnIndex));
checkColumnIndex(columnIndex);
FieldVector vector = batch.getVector(columnIndex - 1);
if (vector instanceof IntVector) {
return ((IntVector) vector).get(batchCursor);
}
throw new SQLException("Column [" + vector.getField() + "] is not int");
}

@Override
public long getLong(int i) throws SQLException {
throw new SQLException("This method has not been implemented yet.");
/**
 * Returns the value of the given column in the current row as a {@code long}.
 *
 * @param columnIndex 1-based column number
 * @return the 64-bit integer stored in the column for the current row
 * @throws SQLException if the column index is out of bounds or the column is
 *         not backed by a {@code BigIntVector}
 */
public long getLong(int columnIndex) throws SQLException {
    checkColumnIndex(columnIndex);
    // JDBC columns are 1-based, Arrow vectors are 0-based (same mapping as getInt/getString).
    FieldVector vector = batch.getVector(columnIndex - 1);
    if (vector instanceof BigIntVector) {
        return ((BigIntVector) vector).get(batchCursor);
    }
    throw new SQLException("Column [" + vector.getField() + "] is not long");
}

@Override
Expand Down Expand Up @@ -135,7 +180,11 @@ public InputStream getBinaryStream(int i) throws SQLException {

@Override
public void close() throws SQLException {
// No resources to close
try {
AutoCloseables.close(batch, arrowStreamReader);
} catch (Exception e) {
throw new SQLException(e);
}
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/chdb/jdbc/ChdbStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public ChdbStatement(ChdbConnection connection) {

@Override
public ResultSet executeQuery(String sql) throws SQLException {
LocalResultV2 result = ChdbJniUtil.executeQuery(sql);
LocalResultV2 result = ChdbJniUtil.executeQuery(sql, "ArrowStream");
System.out.println("sql: " + sql);
try {
return new ChdbResultSet(result);
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/chdb/jdbc/memory/ArrowMemoryManger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.chdb.jdbc.memory;

import org.apache.arrow.memory.RootAllocator;

/**
 * Holds the process-wide Arrow allocator used by the JDBC driver.
 */
public class ArrowMemoryManger {
    /** Shared root allocator; final so it cannot be swapped out after class initialization. */
    public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator();
}
7 changes: 5 additions & 2 deletions src/test/java/ChdbJdbcTest.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import org.junit.jupiter.api.Test;
import org.chdb.jdbc.memory.ArrowMemoryManger;

import java.sql.Connection;
import java.sql.DriverManager;
Expand All @@ -7,8 +7,10 @@

import static org.junit.jupiter.api.Assertions.*;

public class ChdbJdbcTest {
public class ChdbJdbcTest {

// TODO: this fails when run by JUnit (the JVM crashes)
// Note: add the JVM option --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED when running
public static void main(String[] args) throws Exception {
// Load the JDBC driver
Class.forName("org.chdb.jdbc.ChdbDriver");
Expand All @@ -31,5 +33,6 @@ public static void main(String[] args) throws Exception {
rs.close();
conn.close();

assertEquals(0, ArrowMemoryManger.ROOT_ALLOCATOR.getAllocatedMemory());
}
}